2012-11-12 36 views
6

Trong ví dụ này https://stackoverflow.com/a/9980346/93647 và tại đây Why is my disruptor example so slow? (ở cuối câu hỏi) có 1 nhà xuất bản xuất bản các mặt hàng và 1 người tiêu dùng.Ví dụ về gián đoạn với 1 nhà xuất bản và 4 người tiêu dùng song song

Nhưng trong trường hợp của tôi, công việc của người tiêu dùng phức tạp hơn nhiều và mất chút thời gian. Vì vậy, tôi muốn 4 người tiêu dùng xử lý dữ liệu song song.

Vì vậy, ví dụ nếu nhà sản xuất số sản phẩm: 1,2,3,4,5,6,7,8,9,10,11 ..

Tôi muốn consumer1 để bắt 1,5,9, ... consumer2 để bắt 2,6,10, ... consumer3 để bắt 3,7,11, ... consumer4 để bắt 4,8,12 ... (cũng không chính xác những con số này, ý tưởng là dữ liệu đó nên được xử lý song song, tôi không quan tâm một số nào đó được xử lý đối với người tiêu dùng nào)

Và nhớ điều này cần được thực hiện song song vì trong ứng dụng thực tế của người tiêu dùng khá đắt. Tôi mong đợi người tiêu dùng được thực hiện trong các chủ đề khác nhau để sử dụng sức mạnh của các hệ thống đa lõi.

Tất nhiên tôi chỉ có thể tạo 4 ringbuffers và đính kèm 1 người tiêu dùng vào 1 vòng đệm. Bằng cách này tôi có thể sử dụng ví dụ ban đầu. Nhưng tôi cảm thấy nó sẽ không chính xác. Có khả năng sẽ là chính xác khi tạo 1 nhà xuất bản (1 người đeo vòng) và 4 người tiêu dùng - vì đây là những gì tôi cần.

Thêm liên kết đến một câu hỏi rất simular trong nhóm google: https://groups.google.com/forum/#!msg/lmax-disruptor/-CLapWuwWLU/GHEP4UkxrAEJ

Vì vậy, chúng ta có hai lựa chọn:

  • một vòng nhiều người tiêu dùng (mỗi người tiêu dùng sẽ "đánh thức" trên mỗi Thêm vào đó, tất cả người tiêu dùng nên có cùng WaitStrategy)
  • nhiều "một vòng - một người tiêu dùng" (mỗi người tiêu dùng sẽ chỉ thức dậy trên dữ liệu mà nó sẽ xử lý. mỗi người tiêu dùng có thể có WaitStrategy riêng).

Trả lời

1

EDIT: Tôi quên đề cập đến mã được lấy một phần từ the FAQ. Tôi không biết liệu phương pháp này tốt hơn hay tệ hơn lời đề nghị của Frank.

Dự án bị xem xét nghiêm ngặt, thật đáng tiếc vì nó trông đẹp.
Dù sao thử snip sau (dựa trên liên kết đầu tiên của bạn) - thử nghiệm trên mono và có vẻ là OK:

using System; 
using System.Threading.Tasks; 
using Disruptor; 
using Disruptor.Dsl; 

namespace DisruptorTest 
{ 
    public sealed class ValueEntry 
    { 
     public long Value { get; set; } 
    } 

    public class MyHandler : IEventHandler<ValueEntry> 
    { 
     private static int _consumers = 0; 
     private readonly int _ordinal; 

     public MyHandler() 
     { 
      this._ordinal = _consumers++; 
     } 

     public void OnNext(ValueEntry data, long sequence, bool endOfBatch) 
     { 
      if ((sequence % _consumers) == _ordinal) 
       Console.WriteLine("Event handled: Value = {0}, event {1} processed by {2}", data.Value, sequence, _ordinal); 
      else 
       Console.WriteLine("Event {0} rejected by {1}", sequence, _ordinal);      
     } 
    } 

    class Program 
    { 
     private static readonly Random _random = new Random(); 
     private const int SIZE = 16; // Must be multiple of 2 
     private const int WORKERS = 4; 

     static void Main() 
     { 
      var disruptor = new Disruptor.Dsl.Disruptor<ValueEntry>(() => new ValueEntry(), SIZE, TaskScheduler.Default); 
      for (int i=0; i < WORKERS; i++) 
       disruptor.HandleEventsWith(new MyHandler()); 
      var ringBuffer = disruptor.Start(); 

      while (true) 
      { 
       long sequenceNo = ringBuffer.Next(); 
       ringBuffer[sequenceNo].Value = _random.Next();; 
       ringBuffer.Publish(sequenceNo); 
       Console.WriteLine("Published entry {0}, value {1}", sequenceNo, ringBuffer[sequenceNo].Value); 
       Console.ReadKey(); 
      } 
     } 
    } 
} 
+0

cảm ơn! sẽ thử! – javapowered

+0

Bạn có biết nếu gọi đến nhạc chuông 'Next()' 'Phương thức Publish' có phải là chủ đề an toàn không? tôi có thể gọi chúng song song không? Tôi có thể gọi từ hai chủ đề khác nhau về phương pháp 'bẻ khóa' tiếp theo không? – javapowered

+0

cũng làm thế nào disruptor làm cho nội bộ này? bao nhiêu chủ đề được tạo ra? làm gián đoạn tạo ra thread riêng biệt cho mỗi người tiêu dùng? hoặc một số loại threadpool được sử dụng? – javapowered

0

Từ thông số kỹ thuật của bộ đệm vòng, bạn sẽ thấy rằng mọi người tiêu dùng sẽ cố gắng xử lý ValueEvent của bạn. trong trường hợp của bạn, bạn không cần điều đó.

Tôi giải quyết nó như thế này:

Thêm một lĩnh vực xử lý để ValueEvent của bạn và khi một người tiêu dùng có sự kiện ông thử nghiệm trên lĩnh vực đó, nếu nó đã được xử lý, ông chuyển sang lĩnh vực tiếp theo.

Không phải là cách đẹp nhất, nhưng đó là cách hoạt động của bộ đệm.

+0

bạn cần đồng bộ hóa trên cánh đồng đó? bạn khai báo nó 'volatile' hoặc sử dụng lớp' Interlock' để cập nhật trường 'bool'? Ngoài ra làm thế nào để đính kèm nhiều hơn một người tiêu dùng để ringbuffer? tôi chỉ có thể chuyển một người tiêu dùng đến phương thức 'HandleEventsWith'.Cho đến giờ với tôi, có vẻ dễ dàng hơn khi tạo 4 vòng đệm và chuyển qua chúng bằng cách sử dụng bộ đệm vòng tiếp theo để xuất bản mỗi lần :) – javapowered

+0

Nếu bạn tạo 4 bộ đệm vòng, bạn sẽ mất tính năng "cân bằng tải" của vòng bộ đệm, và tôi cho rằng đó là lý do tại sao bạn đang sử dụng nó. Trên Q khác của bạn tôi đang sử dụng bộ đệm JAVA vì vậy tôi không thể hiển thị cho bạn mã, nhưng chỉ cần làm theo các ví dụ, họ là khá rõ ràng. – Frank

+0

trong trường hợp cụ thể này tôi không cần "cân bằng tải" như nhiệm vụ của tôi là gần như "bằng nhau" và chỉ tách chúng giữa 4 là ok. Tuy nhiên đó là tính năng thú vị. Tôi có nên làm theo các ví dụ Java không? Bởi vì tôi gần như không thể tìm thấy bất kỳ C# ví dụ. – javapowered

Các vấn đề liên quan