BookSleeve của tôi. Tôi đang cố gắng tạo một kịch bản chuyển đổi dự phòng bus Redis với ứng dụng SignalR.Sử dụng SignalR với Redis chuyển đổi messagebus bằng ConnectionUtils.Connect()
Lúc đầu, chúng tôi đã thử chuyển đổi dự phòng cân bằng tải phần cứng đơn giản, chỉ cần theo dõi hai máy chủ Redis. Ứng dụng SignalR chỉ vào điểm cuối HLB số ít. Sau đó tôi đã thất bại một máy chủ, nhưng không thể nhận được bất kỳ tin nhắn nào thành công trên máy chủ Redis thứ hai mà không cần tái chế nhóm ứng dụng SignalR. Có lẽ điều này là bởi vì nó cần phải phát hành các lệnh thiết lập cho xe buýt tin nhắn Redis mới.
Do SignalR RC1, Microsoft.AspNet.SignalR.Redis.RedisMessageBus
sử dụng Booksleeve's RedisConnection()
để kết nối với một Redis đơn lẻ cho pub/sub.
Tôi đã tạo một lớp mới, RedisMessageBusCluster()
sử dụng ConnectionUtils.Connect()
của Booksleeve để kết nối với một trong một cụm máy chủ Redis.
using System;
using System.Collections.Generic;
using System.Linq;
using System.Threading;
using System.Threading.Tasks;
using BookSleeve;
using Microsoft.AspNet.SignalR.Infrastructure;
namespace Microsoft.AspNet.SignalR.Redis
{
/// <summary>
/// WIP: Getting scaleout for Redis working
/// </summary>
public class RedisMessageBusCluster : ScaleoutMessageBus
{
private readonly int _db;
private readonly string[] _keys;
private RedisConnection _connection;
private RedisSubscriberConnection _channel;
private Task _connectTask;
private readonly TaskQueue _publishQueue = new TaskQueue();
public RedisMessageBusCluster(string serverList, int db, IEnumerable<string> keys, IDependencyResolver resolver)
: base(resolver)
{
_db = db;
_keys = keys.ToArray();
// uses a list of connections
_connection = ConnectionUtils.Connect(serverList);
//_connection = new RedisConnection(host: server, port: port, password: password);
_connection.Closed += OnConnectionClosed;
_connection.Error += OnConnectionError;
// Start the connection - TODO: can remove this Open as the connection is already opened, but there's the _connectTask is used later on
_connectTask = _connection.Open().Then(() =>
{
// Create a subscription channel in redis
_channel = _connection.GetOpenSubscriberChannel();
// Subscribe to the registered connections
_channel.Subscribe(_keys, OnMessage);
// Dirty hack but it seems like subscribe returns before the actual
// subscription is properly setup in some cases
while (_channel.SubscriptionCount == 0)
{
Thread.Sleep(500);
}
});
}
protected override Task Send(Message[] messages)
{
return _connectTask.Then(msgs =>
{
var taskCompletionSource = new TaskCompletionSource<object>();
// Group messages by source (connection id)
var messagesBySource = msgs.GroupBy(m => m.Source);
SendImpl(messagesBySource.GetEnumerator(), taskCompletionSource);
return taskCompletionSource.Task;
},
messages);
}
private void SendImpl(IEnumerator<IGrouping<string, Message>> enumerator, TaskCompletionSource<object> taskCompletionSource)
{
if (!enumerator.MoveNext())
{
taskCompletionSource.TrySetResult(null);
}
else
{
IGrouping<string, Message> group = enumerator.Current;
// Get the channel index we're going to use for this message
int index = Math.Abs(group.Key.GetHashCode()) % _keys.Length;
string key = _keys[index];
// Increment the channel number
_connection.Strings.Increment(_db, key)
.Then((id, k) =>
{
var message = new RedisMessage(id, group.ToArray());
return _connection.Publish(k, message.GetBytes());
}, key)
.Then((enumer, tcs) => SendImpl(enumer, tcs), enumerator, taskCompletionSource)
.ContinueWithNotComplete(taskCompletionSource);
}
}
private void OnConnectionClosed(object sender, EventArgs e)
{
// Should we auto reconnect?
if (true)
{
;
}
}
private void OnConnectionError(object sender, BookSleeve.ErrorEventArgs e)
{
// How do we bubble errors?
if (true)
{
;
}
}
private void OnMessage(string key, byte[] data)
{
// The key is the stream id (channel)
var message = RedisMessage.Deserialize(data);
_publishQueue.Enqueue(() => OnReceived(key, (ulong)message.Id, message.Messages));
}
protected override void Dispose(bool disposing)
{
if (disposing)
{
if (_channel != null)
{
_channel.Unsubscribe(_keys);
_channel.Close(abort: true);
}
if (_connection != null)
{
_connection.Close(abort: true);
}
}
base.Dispose(disposing);
}
}
}
Booksleeve có cơ chế riêng để xác định tổng thể và sẽ tự động chuyển sang máy chủ khác và hiện đang thử nghiệm điều này với SignalR.Chat
.
Trong web.config
, tôi thiết lập danh sách các máy chủ có sẵn:
<add key="redis.serverList" value="dbcache1.local:6379,dbcache2.local:6379"/>
Sau đó, trong Application_Start()
:
// Redis cluster server list
string redisServerlist = ConfigurationManager.AppSettings["redis.serverList"];
List<string> eventKeys = new List<string>();
eventKeys.Add("SignalR.Redis.FailoverTest");
GlobalHost.DependencyResolver.UseRedisCluster(redisServerlist, eventKeys);
tôi bổ sung thêm hai phương pháp bổ sung cho Microsoft.AspNet.SignalR.Redis.DependencyResolverExtensions
:
public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, IEnumerable<string> eventKeys)
{
return UseRedisCluster(resolver, serverList, db: 0, eventKeys: eventKeys);
}
public static IDependencyResolver UseRedisCluster(this IDependencyResolver resolver, string serverList, int db, IEnumerable<string> eventKeys)
{
var bus = new Lazy<RedisMessageBusCluster>(() => new RedisMessageBusCluster(serverList, db, eventKeys, resolver));
resolver.Register(typeof(IMessageBus),() => bus.Value);
return resolver;
}
Bây giờ vấn đề là khi tôi có một số điểm ngắt được kích hoạt, u ntil sau khi tên người dùng đã được thêm vào, sau đó vô hiệu hóa tất cả các điểm ngắt, ứng dụng hoạt động như mong đợi. Tuy nhiên, với các điểm ngắt bị vô hiệu hóa ngay từ đầu, có vẻ như có một số điều kiện chủng tộc có thể bị lỗi trong quá trình kết nối.
Như vậy, trong RedisMessageCluster()
:
// Start the connection
_connectTask = _connection.Open().Then(() =>
{
// Create a subscription channel in redis
_channel = _connection.GetOpenSubscriberChannel();
// Subscribe to the registered connections
_channel.Subscribe(_keys, OnMessage);
// Dirty hack but it seems like subscribe returns before the actual
// subscription is properly setup in some cases
while (_channel.SubscriptionCount == 0)
{
Thread.Sleep(500);
}
});
Tôi đã thử thêm cả một Task.Wait
, và thậm chí thêm một Sleep()
(không hiển thị ở trên) - đã được chờ đợi/etc, nhưng vẫn nhận được lỗi.
Các lỗi lặp đi lặp lại có vẻ là trong Booksleeve.MessageQueue.cs
~ ln 71:
A first chance exception of type 'System.InvalidOperationException' occurred in BookSleeve.dll
iisexpress.exe Error: 0 : SignalR exception thrown by Task: System.AggregateException: One or more errors occurred. ---> System.InvalidOperationException: The queue is closed
at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821
--- End of inner exception stack trace ---
---> (Inner Exception #0) System.InvalidOperationException: The queue is closed
at BookSleeve.MessageQueue.Enqueue(RedisMessage item, Boolean highPri) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\MessageQueue.cs:line 71
at BookSleeve.RedisConnectionBase.EnqueueMessage(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 910
at BookSleeve.RedisConnectionBase.ExecuteInt64(RedisMessage message, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\RedisConnectionBase.cs:line 826
at BookSleeve.RedisConnection.IncrementImpl(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 277
at BookSleeve.RedisConnection.BookSleeve.IStringCommands.Increment(Int32 db, String key, Int64 value, Boolean queueJump) in c:\Projects\Frameworks\BookSleeve-1.2.0.5\BookSleeve\IStringCommands.cs:line 270
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.SendImpl(IEnumerator`1 enumerator, TaskCompletionSource`1 taskCompletionSource) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 90
at Microsoft.AspNet.SignalR.Redis.RedisMessageBusCluster.<Send>b__2(Message[] msgs) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Redis\RedisMessageBusCluster.cs:line 67
at Microsoft.AspNet.SignalR.TaskAsyncHelper.GenericDelegates`4.<>c__DisplayClass57.<ThenWithArgs>b__56() in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 893
at Microsoft.AspNet.SignalR.TaskAsyncHelper.TaskRunners`2.<>c__DisplayClass42.<RunTask>b__41(Task t) in c:\Projects\Frameworks\SignalR\SignalR.1.0RC1\SignalR\src\Microsoft.AspNet.SignalR.Core\TaskAsyncHelper.cs:line 821<---
public void Enqueue(RedisMessage item, bool highPri)
{
lock (stdPriority)
{
if (closed)
{
throw new InvalidOperationException("The queue is closed");
}
Trường hợp ngoại lệ hàng đợi khép kín đã được ném.
Tôi thấy trước một vấn đề khác: Vì kết nối Redis được thực hiện trong Application_Start()
có thể có một số sự cố trong "kết nối lại" với máy chủ khác. Tuy nhiên, tôi nghĩ rằng điều này là hợp lệ khi sử dụng số ít RedisConnection()
, trong đó chỉ có một kết nối để chọn. Tuy nhiên, với sự xâm nhập của ConnectionUtils.Connect()
Tôi muốn nghe từ @dfowler
hoặc các tín hiệu SignalR khác trong cách kịch bản này được xử lý trong SignalR.
Tôi sẽ xem xét, nhưng: điều đầu tiên xảy ra là bạn không cần phải gọi 'Mở' vì kết nối bạn nên * đã * mở. Mặc dù vậy, tôi sẽ không thể nhìn ngay được, khi tôi chuẩn bị sẵn sàng cho chuyến bay –
Tôi tin rằng có hai vấn đề ở đây. 1) cách Booksleeve đối phó với một chuyển đổi dự phòng; 2) Cách SignalR sử dụng con trỏ để theo dõi khách hàng. Khi một bus tin nhắn mới được khởi tạo, tất cả các con trỏ từ mb1 không xuất hiện trên mb2. Vì vậy, khi đặt lại pool ứng dụng SignalR, nó sẽ bắt đầu hoạt động - không phải trước đây, mà rõ ràng không phải là một lựa chọn khả thi. – ElHaix
Liên kết mô tả cách SignalR sử dụng con trỏ: http://stackoverflow.com/questions/13054592/how-does-signalr-redis-work-under-the-hood/13063449#13063449 – ElHaix