2012-01-30 29 views
7

Tôi có một trình xử lý lệnh yêu cầu một hoạt động trên một đối tượng miền sẽ kích hoạt một sự kiện khi thao tác đã được thực thi. Tôi muốn kiểm tra xem một trình xử lý sự kiện có nhận được sự kiện khi lệnh tương ứng được gửi đi hay không (xem bên dưới, một số mã được bỏ qua cho ngắn gọn). Trình xử lý sự kiện (MyEventConsumer.Consume) không bao giờ được gọi ngay cả khi thông báo sự kiện được xuất bản trên bus (bus vòng lặp trong trường hợp này). Bất kỳ ý tưởng?Cách kiểm tra hệ thống dựa trên lệnh và sự kiện với Masstransit

//Test 
[TestFixture] 
public class TestSendCommandReceiveEvent 
{ 
    [Given] 
    public void installation_of_infrastructure_objects() 
    { 
     container.Register(Component.For<MyEventConsumer>().UsingFactoryMethod(() => new MyEventConsumer(_received))); 
     container.Register(
     Component.For<IServiceBus>() 
     .UsingFactoryMethod(() => ServiceBusFactory.New(x => { x.ReceiveFrom("loopback://localhost/mt_client"); x.Subscribe(conf => conf.LoadFrom(container));              }))); 
    } 

    [When] 
    public void sending_a_command() 
    { 
     var LocalBus = container.Resolve<IServiceBus>(); 
     LocalBus.Publish(new DoSomething(_aggregateId)); 
    } 
    [Then] 
    public void corresponding_event_should_be_received_by_consumer() 
    { 
     _received.WaitOne(5000).ShouldBeTrue(); 
    } 
} 
public class MyEventConsumer : Consumes<SomethingDone>.All 
{ 
    private readonly ManualResetEvent _received; 
    public MyEventConsumer(ManualResetEvent received) 
    { 
     _received = received; 
    } 
    public void Consume(SomethingDone message) 
    { 
     _received.Set(); 
    } 
} 

//Command handler 
public class DoSomethingCommandHandler : Consumes<DoSomething>.All where T:class 
{ 
    public void Consume(DoSomething message) 
    { 
     var ar = Repository.GetById<SomeAR>(message.ArId); 
     ar.DoSomething(); 
     Repository.Save(ar, Guid.NewGuid(), null); 
    } 
} 
//Domain object 
public class SomeDomainObject : AggregateBase 
{ 
    public void DoSomething() 
    { 
     RaiseEvent(new SomethingDone(Id, 1)); 
    } 
} 
+0

Điều này làm việc trong sản xuất và chỉ thất bại trong một thử nghiệm? Nó xuất hiện rằng công cụ là okay từ mã, nhưng như tôi nghĩ rằng có một số lỗi trong mã vì vậy nó giả định rằng công cụ kết nối đúng. Tôi khuyên bạn nên tham gia danh sách gửi thư với một chút chi tiết hơn về những gì đang diễn ra. https://groups.google.com/forum/#!forum/masstransit-discuss Nếu tôi phải đoán, có thể đó là vấn đề với vùng chứa. Tôi nghĩ rằng chúng tôi đã tìm ra tất cả chúng nhưng nó có thể là một ngoại lệ. – Travis

+0

Hmm, dường như cũng là một vấn đề về sản xuất. Phải định cấu hình bus sai. Tôi sẽ có một cái nhìn. – Christian

+0

Ok, không thể thấy những gì còn thiếu ở đây (ngoại trừ sự thiếu kinh nghiệm của riêng tôi với MT/Castle). Chuyển sang danh sách gửi thư. – Christian

Trả lời

5

này đi cho tôi:

// Copyright 2012 Henrik Feldt 
// 
// Licensed under the Apache License, Version 2.0 (the "License"); you may not use 
// this file except in compliance with the License. You may obtain a copy of the 
// License at 
// 
//  http://www.apache.org/licenses/LICENSE-2.0 
// 
// Unless required by applicable law or agreed to in writing, software distributed 
// under the License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR 
// CONDITIONS OF ANY KIND, either express or implied. See the License for the 
// specific language governing permissions and limitations under the License. 

using System; 
using System.Threading; 
using Castle.MicroKernel.Registration; 
using Castle.Windsor; 
using Magnum.Extensions; 
using Magnum.TestFramework; 
using MassTransit; 
using NUnit.Framework; 

namespace ConsoleApplication11 
{ 
    [TestFixture] 
    public class TestSendCommandReceiveEvent 
    { 
     ManualResetEventSlim _received = new ManualResetEventSlim(false); 
     IWindsorContainer _container; 

     [Given] 
     public void installation_of_infrastructure_objects() 
     { 
      _container = new WindsorContainer(); 
      _container.Register(
       Component.For<IServiceBus>() 
        .UsingFactoryMethod(() => ServiceBusFactory.New(x => 
         { 
          x.ReceiveFrom("loopback://localhost/mt_client"); 
          x.Subscribe(conf => 
           { 
            conf.Consumer(() => new MyEventConsumer(_received)); 
            conf.Consumer(() => new MyCmdConsumer()); 
           }); 
         }))); 

      when(); 
     } 

     public void when() 
     { 
      var localBus = _container.Resolve<IServiceBus>(); 
      // wait for startup 
      localBus.Endpoint.InboundTransport.Receive(c1 => c2 => { }, 1.Milliseconds()); 

      localBus.Publish(new DoSomething()); 
     } 

     [Then] 
     public void corresponding_event_should_be_received_by_consumer() 
     { 
      _received.Wait(5000).ShouldBeTrue(); 
     } 
    } 

    [Serializable] 
    public class DoSomething 
    { 
    } 

    [Serializable] 
    public class SomethingDone 
    { 
    } 

    public class MyEventConsumer : Consumes<SomethingDone>.All 
    { 
     readonly ManualResetEventSlim _received; 

     public MyEventConsumer(ManualResetEventSlim received) 
     { 
      _received = received; 
     } 

     public void Consume(SomethingDone message) 
     { 
      _received.Set(); 
     } 
    } 

    public class MyCmdConsumer : Consumes<DoSomething>.Context 
    { 
     public void Consume(IConsumeContext<DoSomething> ctx) 
     { 
      Console.WriteLine("consumed cmd"); 
      ctx.Bus.Publish(new SomethingDone()); 
     } 
    } 
} 
0

Theo kinh nghiệm của tôi, có một khoảng thời gian ngắn, ngay sau khi tạo cá thể buýt, trong đó bất kỳ tin nhắn đã xuất bản nào bị mất. Phải là một loại khởi tạo không đồng bộ đang diễn ra.

Thử thêm độ trễ giữa container.Resolve<IServiceBus>()LocalBus.Publish(new DoSomething(_aggregateId)).

Thread.Sleep không hoạt động trong trường hợp của tôi, nhưng một cách đáng ngạc nhiên là Console.ReadLine()!

+4

Bạn có thể làm: 'bus.Endpoint.InboundTransport.Receive (c1 => c2 => {}, TimeSpan.FromMilliseconds (1));' thay vì Thread.Sleep. Vấn đề là các vòng nhận/gửi đi và vận chuyển được khởi tạo không đồng bộ. – Henrik

Các vấn đề liên quan