2013-11-21 13 views
6

Tôi đang tìm một giải pháp tốt để đăng nhập thông báo không thành công, ngay sau khi vượt quá giới hạn thử lại, mà không có thỏa thuận với hàng đợi lỗi. Những gì tôi đã tìm thấy cho đến nay:Làm thế nào để đăng nhập thông báo thất bại trong masstransit?

  • tôi có thể kế thừa từ InMemoryInboundMessageTracker và ghi đè IsRetryLimitExceeded, nhưng vào thời điểm này không có thông tin về thông điệp riêng của mình trừ id.
  • tôi có thể thực hiện IInboundMessageInterceptor và nhận IConsumeContext trong Pre/PostDispatch, nhưng vào thời điểm này không có thông tin về thành công/thất bại.

Vì vậy, như một giải pháp, tôi có thể nhận IConsumeContext trong PreDispatch đặt nó trong một số loại một bộ nhớ cache sau đó nhận được nó ra khỏi một bộ nhớ cache trong IsRetryLimitExceeded khi giới hạn retry bị vượt qua.

Phương pháp này được gọi là theo thứ tự như vậy: IsRetryLimitExceeded -> PreDispatch -> PostDispatch

Vì vậy, tôi không thể tìm thấy một nơi tốt để loại bỏ thông điệp xử lý thành công từ một bộ nhớ cache.

Tất nhiên tôi có thể sử dụng bộ nhớ cache có kích thước giới hạn nhưng toàn bộ giải pháp này có vẻ lạ.

Bất kỳ suy nghĩ nào về vấn đề này sẽ được đánh giá cao.

Trả lời

2

Bạn có thể triển khai và định cấu hình Theo dõi thử lại tin nhắn của riêng mình trên xe buýt để thông báo không thành công được chuyển qua quá trình triển khai của bạn. Bạn có thể ủy quyền cho trình theo dõi thử lại mặc định và chỉ chặn các sự kiện để bạn có thể hành động với họ hoặc bạn có thể thực hiện theo dõi thử lại của riêng mình nếu cần.

MessageTrackerFactory là đại biểu để định cấu hình, tôi nghĩ giao diện ở gần.

+0

Yep, đó là trường hợp với việc thực hiện/trọng IsRetryLimitExceeded. – amstix

+0

Đó là từ giao diện _IInboundMessageTracker_. Không có thông tin về thông điệp và bối cảnh tiêu thụ ngoại trừ messageId, nhưng có một vài phương thức hữu ích _MessageWasReceivedSuccessfully_ và _MessageWasMovedToErrorQueue_ có thể được sử dụng để loại bỏ IConsumeContext khỏi bộ nhớ cache. – amstix

4

Tôi đã ednded lên với giải pháp này:

class MessageInterceptor: IInboundMessageInterceptor 
{ 
    public void PreDispatch(IConsumeContext context) 
    { 
     MessageTracker.Register(context); 
    } 

    public void PostDispatch(IConsumeContext context) 
    {} 
} 

class MessageTracker: InMemoryInboundMessageTracker 
{ 
    readonly Logger logger; 

    static readonly ConcurrentDictionary<string, IConsumeContext> DispatchingCache = new ConcurrentDictionary<string, IConsumeContext>(); 

    public MessageTracker(int retryLimit, Logger logger) 
     : base(retryLimit) 
    { 
     this.logger = logger; 
    } 

    public static void Register(IConsumeContext context) 
    { 
     DispatchingCache.GetOrAdd(context.MessageId, context); 
    } 

    public override void MessageWasReceivedSuccessfully(string id) 
    { 
     base.MessageWasReceivedSuccessfully(id); 

     IConsumeContext value; 
     DispatchingCache.TryRemove(id, out value); 
    } 

    public override bool IsRetryLimitExceeded(string id, out Exception retryException, out IEnumerable<Action> faultActions) 
    { 
     var result = base.IsRetryLimitExceeded(id, out retryException, out faultActions); 

     IConsumeContext failed; 
     if (!result || !DispatchingCache.TryRemove(id, out failed)) 
      return result; 

     // --> log failed IConsumeContext with exception 

     return true; 
    } 
} 

Và để cắm những lớp học trong

 serviceBus = ServiceBusFactory.New(config => 
     { 
      ... 
      config.AddBusConfigurator(new PostCreateBusBuilderConfigurator(sb => 
      { 
       var interceptorConfig = new InboundMessageInterceptorConfigurator(sb.InboundPipeline); 
       interceptorConfig.Create(new MessageInterceptor()); 
      })); 

      config.SetDefaultInboundMessageTrackerFactory(retryLimit => new MessageTracker(retryLimit, LogManager.GetCurrentClassLogger())); 
     }); 
Các vấn đề liên quan