2013-02-15 21 views
111

BookSleeve của tôi. Tôi đang cố gắng tạo một kịch bản chuyển đổi dự phòng bus Redis với ứng dụng SignalR.Sử dụng SignalR với Redis chuyển đổi messagebus bằng ConnectionUtils.Connect()

Lúc đầu, chúng tôi đã thử chuyển đổi dự phòng cân bằng tải phần cứng đơn giản, chỉ cần theo dõi hai máy chủ Redis. Ứng dụng SignalR chỉ vào điểm cuối HLB số ít. Sau đó tôi đã thất bại một máy chủ, nhưng không thể nhận được bất kỳ tin nhắn nào thành công trên máy chủ Redis thứ hai mà không cần tái chế nhóm ứng dụng SignalR. Có lẽ điều này là bởi vì nó cần phải phát hành các lệnh thiết lập cho xe buýt tin nhắn Redis mới.

Do SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus sử dụng Booksleeve's RedisConnection() để kết nối với một Redis đơn lẻ cho pub/sub.

Tôi đã tạo một lớp mới, RedisMessageBusCluster() sử dụng ConnectionUtils.Connect() của Booksleeve để kết nối với một trong một cụm máy chủ Redis.

using System; 
using System.Collections.Generic; 
using System.Linq; 
using System.Threading; 
using System.Threading.Tasks; 
using BookSleeve; 
using Microsoft.AspNet.SignalR.Infrastructure; 

namespace Microsoft.AspNet.SignalR.Redis 
{ 
    /// <summary> 
    /// WIP: Getting scaleout for Redis working 
    /// </summary> 
    public class RedisMessageBusCluster : ScaleoutMessageBus 
    { 
     private readonly int _db; 
     private readonly string[] _keys; 
     private RedisConnection _connection; 
     private RedisSubscriberConnection _channel; 
     private Task _connectTask; 

     private readonly TaskQueue _publishQueue = new TaskQueue(); 

     public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver) 
      : base(resolver) 
     { 
      _db = db; 
      _keys = keys.ToArray(); 

      // uses a list of connections 
      _connection = ConnectionUtils.Connect(serverList); 

      //_connection = new RedisConnection(host: server, port: port, password: password); 

      _connection.Closed += OnConnectionClosed; 
      _connection.Error += OnConnectionError; 


      // Start the connection - TODO: can remove this Open as the connection is already opened, but there's the _connectTask is used later on 
      _connectTask = _connection.Open().Then(() => 
      { 
       // Create a subscription channel in redis 
       _channel = _connection.GetOpenSubscriberChannel(); 

       // Subscribe to the registered connections 
       _channel.Subscribe(_keys, OnMessage); 

       // Dirty hack but it seems like subscribe returns before the actual 
       // subscription is properly setup in some cases 
       while (_channel.SubscriptionCount == 0) 
       { 
        Thread.Sleep(500); 
       } 
      }); 
     } 


     protected override Task Send(Message[] messages) 
     { 
      return _connectTask.Then(msgs => 
      { 
       var taskCompletionSource = new TaskCompletionSource<object>(); 

       // Group messages by source (connection id) 
       var messagesBySource = msgs.GroupBy(m => m.Source); 

       SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource); 

       return taskCompletionSource.Task; 
      }, 
      messages); 
     } 

     private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource) 
     { 
      if (!enumerator.MoveNext()) 
      { 
       taskCompletionSource.TrySetResult(null); 
      } 
      else 
      { 
       IGrouping<string, Message> group = enumerator.Current; 

       // Get the channel index we're going to use for this message 
       int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length; 

       string key = _keys[index]; 

       // Increment the channel number 
       _connection.Strings.Increment(_db, key) 
            .Then((id, k) => 
            { 
             var message = new RedisMessage(id, group.ToArray()); 

             return _connection.Publish(k, message.GetBytes()); 
            }, key) 
            .Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource) 
            .ContinueWithNotComplete(taskCompletionSource); 
      } 
     } 

     private void OnConnectionClosed(object sender, EventArgs e) 
     { 
      // Should we auto reconnect? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e) 
     { 
      // How do we bubble errors? 
      if (true) 
      { 
       ; 
      } 
     } 

     private void OnMessage(string key, byte[] data) 
     { 
      // The key is the stream id (channel) 
      var message = RedisMessage.Deserialize(data); 

      _publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages)); 
     } 

     protected override void Dispose(bool disposing) 
     { 
      if (disposing) 
      { 
       if (_channel != null) 
       { 
        _channel.Unsubscribe(_keys); 
        _channel.Close(abort: true); 
       } 

       if (_connection != null) 
       { 
        _connection.Close(abort: true); 
       }     
      } 

      base.Dispose(disposing); 
     } 
    } 
} 

Booksleeve có cơ chế riêng để xác định tổng thể và sẽ tự động chuyển sang máy chủ khác và hiện đang thử nghiệm điều này với SignalR.Chat.

Trong web.config, tôi thiết lập danh sách các máy chủ có sẵn:

<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/> 

Sau đó, trong Application_Start():

 // Redis cluster server list 
     string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"]; 

     List<string> eventKeys = new List<string>(); 
     eventKeys.Add("SignalR.Redis.FailoverTest"); 
     GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys); 

tôi bổ sung thêm hai phương pháp bổ sung cho Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions:

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys) 
{ 
    return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys); 
} 

public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys) 
{ 
    var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver)); 
    resolver.Register(typeof(IMessageBus),() => bus.Value); 

    return resolver; 
} 

Bây giờ vấn đề là khi tôi có một số điểm ngắt được kích hoạt, u ntil sau khi tên người dùng đã được thêm vào, sau đó vô hiệu hóa tất cả các điểm ngắt, ứng dụng hoạt động như mong đợi. Tuy nhiên, với các điểm ngắt bị vô hiệu hóa ngay từ đầu, có vẻ như có một số điều kiện chủng tộc có thể bị lỗi trong quá trình kết nối.

Như vậy, trong RedisMessageCluster():

// Start the connection 
    _connectTask = _connection.Open().Then(() => 
    { 
     // Create a subscription channel in redis 
     _channel = _connection.GetOpenSubscriberChannel(); 

     // Subscribe to the registered connections 
     _channel.Subscribe(_keys, OnMessage); 

     // Dirty hack but it seems like subscribe returns before the actual 
     // subscription is properly setup in some cases 
     while (_channel.SubscriptionCount == 0) 
     { 
      Thread.Sleep(500); 
     } 
    }); 

Tôi đã thử thêm cả một Task.Wait, và thậm chí thêm một Sleep() (không hiển thị ở trên) - đã được chờ đợi/etc, nhưng vẫn nhận được lỗi.

Các lỗi lặp đi lặp lại có vẻ là trong Booksleeve.MessageQueue.cs ~ ln 71:

A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll 
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821 
    --- End of inner exception stack trace --- 
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed 
    at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71 
    at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910 
    at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826 
    at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277 
    at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90 
    at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893 
    at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<--- 



public void Enqueue(RedisMessage item, bool highPri) 
{ 
    lock (stdPriority) 
    { 
     if (closed) 
     { 
      throw new InvalidOperationException("The queue is closed"); 
     } 

Trường hợp ngoại lệ hàng đợi khép kín đã được ném.

Tôi thấy trước một vấn đề khác: Vì kết nối Redis được thực hiện trong Application_Start() có thể có một số sự cố trong "kết nối lại" với máy chủ khác. Tuy nhiên, tôi nghĩ rằng điều này là hợp lệ khi sử dụng số ít RedisConnection(), trong đó chỉ có một kết nối để chọn. Tuy nhiên, với sự xâm nhập của ConnectionUtils.Connect() Tôi muốn nghe từ @dfowler hoặc các tín hiệu SignalR khác trong cách kịch bản này được xử lý trong SignalR.

+0

Tôi sẽ xem xét, nhưng: điều đầu tiên xảy ra là bạn không cần phải gọi 'Mở' vì kết nối bạn nên * đã * mở. Mặc dù vậy, tôi sẽ không thể nhìn ngay được, khi tôi chuẩn bị sẵn sàng cho chuyến bay –

+0

Tôi tin rằng có hai vấn đề ở đây. 1) cách Booksleeve đối phó với một chuyển đổi dự phòng; 2) Cách SignalR sử dụng con trỏ để theo dõi khách hàng. Khi một bus tin nhắn mới được khởi tạo, tất cả các con trỏ từ mb1 không xuất hiện trên mb2. Vì vậy, khi đặt lại pool ứng dụng SignalR, nó sẽ bắt đầu hoạt động - không phải trước đây, mà rõ ràng không phải là một lựa chọn khả thi. – ElHaix

+2

Liên kết mô tả cách SignalR sử dụng con trỏ: http://stackoverflow.com/questions/13054592/how-does-signalr-redis-work-under-the-hood/13063449#13063449 – ElHaix

Trả lời

14

Nhóm SignalR hiện đã hỗ trợ cho nhà máy kết nối tùy chỉnh với StackExchange.Redis, người kế thừa BookSleeve, hỗ trợ kết nối Redis dự phòng thông qua ConnectionMultiplexer.

Vấn đề ban đầu gặp phải là mặc dù việc tạo các phương thức mở rộng của riêng tôi trong BookSleeve để chấp nhận một tập hợp các máy chủ, nhưng không thể thực hiện được.

Bây giờ, với sự phát triển của BookSleeve sang StackExchange.Redis, chúng tôi hiện có thể configure bộ sưu tập các máy chủ/cổng ngay trong khởi tạo Connect.

Việc thực hiện mới là đơn giản hơn nhiều so với con đường tôi đang đi xuống, trong việc tạo ra một phương pháp UseRedisCluster, và back-end pluming bây giờ hỗ trợ đúng fail-over:

var conn = ConnectionMultiplexer.Connect("redisServer1:6380,redisServer2:6380,redisServer3:6380,allowAdmin=true"); 

StackExchange.Redis cũng cho phép thêm cấu hình thủ công như đã nêu trong phần Automatic and Manual Configuration của tài liệu:

ConfigurationOptions config = new ConfigurationOptions 
{ 
    EndPoints = 
    { 
     { "redis0", 6379 }, 
     { "redis1", 6380 } 
    }, 
    CommandMap = CommandMap.Create(new HashSet<string> 
    { // EXCLUDE a few commands 
     "INFO", "CONFIG", "CLUSTER", 
     "PING", "ECHO", "CLIENT" 
    }, available: false), 
    KeepAlive = 180, 
    DefaultVersion = new Version(2, 8, 8), 
    Password = "changeme" 
}; 

về bản chất, khả năng để khởi tạo môi trường quy mô-out SignalR của chúng tôi với một tập hợp các máy chủ hiện nay giải quyết proble ban đầu m.

+0

Tôi có nên thưởng cho câu trả lời của bạn với 500 tiền thưởng đại diện không? ;) – nicael

+0

Vâng, nếu bạn tin rằng bây giờ là * câu trả lời * :) – ElHaix

+0

@ElHaix kể từ khi bạn đặt câu hỏi, bạn có thể đủ điều kiện để nói câu trả lời của bạn có kết luận hay không hay chỉ là một phần trong câu đố - tôi đề nghị thêm một câu để cho biết liệu có thể giải quyết vấn đề của bạn như thế nào –

Các vấn đề liên quan