2010-06-29 28 views
5

xin chào tôi muốn đề tài hợp tác với nhà sản xuất và người tiêu dùng. người tiêu dùng là khá chậm, và nhà sản xuất là rất nhanh và hoạt động trong các vụ nổ.C# liên lạc giữa các luồng

ví dụ người tiêu dùng có thể xử lý một tin nhắn trong 20 giây và nhà sản xuất có thể tạo 10 tin nhắn trong một giây, nhưng thực hiện một lần trong một thời gian dài để người tiêu dùng có thể bắt kịp.

tôi muốn một cái gì đó như:

Stream commonStream; 
AutoResetEvent commonLock; 

void Producer() 
{ 
    while (true) 
    { 
    magic.BlockUntilMagicAvalible(); 
    byte[] buffer = magic.Produce(); 
    commonStream.Write(buffer); 
    commonLock.Set(); 
    } 
} 

void Consumer() 
{ 
    while(true) 
    { 
    commonLock.WaitOne(); 
    MagicalObject o = binarySerializer.Deserialize(commonStream); 
    DoSomething(o); 
    } 
} 
+0

Phiên bản .Net nào bạn đang sử dụng, có một số điều mới cho v4 cho chính xác công cụ này –

+0

.Net 3.5; Nhận xét phải dài ít nhất 15 ký tự. –

Trả lời

11

Nếu bạn có Net 4.0 hoặc cao hơn, bạn có thể làm theo cách này bằng cách sử dụng một BlockingCollection

int maxBufferCap = 500; 
BlockingCollection<MagicalObject> Collection 
          = new BlockingCollection<MagicalObject>(maxBufferCap); 
void Producer() 
{ 
    while (magic.HasMoreMagic) 
    { 
     this.Collection.Add(magic.ProduceMagic()); 
    } 
    this.Collection.CompleteAdding(); 
} 

void Consumer() 
{ 
    foreach (MagicalObject magicalObject in this.Collection.GetConsumingEnumerable()) 
    { 
     DoSomthing(magicalObject); 
    } 
} 

Dòng foreach sẽ ngủ nếu không có dữ liệu trong bộ đệm, nó cũng sẽ tự động đánh thức nó khi một cái gì đó được thêm vào bộ sưu tập.

Lý do tôi đặt bộ đệm tối đa là nếu nhà sản xuất của bạn nhanh hơn nhiều so với người tiêu dùng bạn có thể sẽ tiêu tốn rất nhiều bộ nhớ khi ngày càng có nhiều đối tượng được đưa vào bộ sưu tập. Bằng cách thiết lập kích thước bộ đệm tối đa khi bạn tạo bộ sưu tập chặn khi kích thước bộ đệm đạt đến lệnh gọi Add trên nhà sản xuất sẽ chặn cho đến khi một mục đã bị xóa khỏi bộ sưu tập của người tiêu dùng.

Một phần thưởng khác của lớp học BlockingCollection là bạn có thể có nhiều nhà sản xuất và người tiêu dùng theo ý muốn, không cần tỷ lệ 1: 1. Nếu DoSomthing hỗ trợ nó, bạn có thể có một vòng lặp foreach mỗi lõi của máy tính (hoặc thậm chí sử dụng Parallel.ForEach và sử dụng đếm được tiêu thụ như các nguồn dữ liệu)

void ConsumersInParalell() 
{ 
    //This assumes the method signature of DoSomthing is one of the following: 
    // Action<MagicalObject> 
    // Action<MagicalObject, ParallelLoopState> 
    // Action<MagicalObject, ParallelLoopState, long> 
    Paralell.ForEach(this.Collection.GetConsumingEnumerable(), DoSomthing); 
} 
+2

Lưu ý rằng TPL đã được back-ported để. NET 3.5: http://codeblog.theg2.net/2010/02/tpl-and-parallelforeach-in- net-35-using.html –

0

Bạn có thể có được những gì bạn muốn sử dụng một hàng đợi và hẹn giờ. Nhà sản xuất thêm các giá trị vào hàng đợi và bắt đầu hẹn giờ của người tiêu dùng. Sự kiện trôi qua của người tiêu dùng theo thời gian (trên luồng Threadpool) dừng bộ hẹn giờ và lặp lại hàng đợi cho đến khi nó trống, sau đó biến mất (không cần bỏ phiếu không cần thiết). Nhà sản xuất có thể thêm vào hàng đợi trong khi người tiêu dùng vẫn đang chạy.

System.Timers.Timer consumerTimer; 
Queue<byte[]> queue = new Queue<byte[]>(); 

void Producer() 
{ 
    consumerTimer = new System.Timers.Timer(1000); 
    consumerTimer.Elapsed += new System.Timers.ElapsedEventHandler(consumerTimer_Elapsed); 
    while (true) 
    { 
     magic.BlockUntilMagicAvailable(); 
     lock (queue) 
     { 
      queue.Enqueue(magic.Produce()); 
      if (!consumerTimer.Enabled) 
      { 
       consumerTimer.Start(); 
      } 
     } 
    } 
} 

void consumerTimer_Elapsed(object sender, System.Timers.ElapsedEventArgs e) 
{ 
    while (true) 
    { 
     consumerTimer.Stop(); 
     lock (queue) 
     { 
      if (queue.Count > 0) 
      { 
       DoSomething(queue.Dequeue()); 
      } 
      else 
      { 
       break; 
      } 
     } 
    } 
} 
+0

đoạn mã của bạn không phải là chủ đề an toàn ... và tôi ngụ ý không bỏ phiếu –

+0

Chủ đề không an toàn về chủ đề này? Và nó không bình chọn - bộ đếm thời gian là một ảnh chỉ được kích hoạt khi nhà sản xuất thêm vào hàng đợi. –

-1

Tôi sử dụng Mutex's. Ý tưởng là cả hai chạy trong các chủ đề khác nhau. Chủ đề người tiêu dùng được bắt đầu bị khóa bởi một mutex, nơi nó sẽ ngồi vô thời hạn cho đến khi phát hành bởi các nhà sản xuất. Sau đó nó sẽ xử lý dữ liệu song song để rời khỏi Nhà sản xuất để tiếp tục. Người tiêu dùng sẽ khóa lại khi hoàn tất.

(Mã bắt đầu thread, và bit chất lượng khác đã được bỏ qua cho ngắn gọn.)

// Pre-create mutex owned by Producer thread, then start Consumer thread. 
Mutex mutex = new Mutex(true); 
Queue<T> queue = new Queue<T>(); 

void Producer_AddData(T data) 
{ 
    lock (queue) { 
    queue.Enqueue(GetData()); 
    } 

    // Release mutex to start thread: 
    mutex.ReleaseMutex(); 
    mutex.WaitOne(); 
} 

void Consumer() 
{ 
    while(true) 
    { 
    // Wait indefinitely on mutex 
    mutex.WaitOne(); 
    mutex.ReleaseMutex(); 

    T data; 
    lock (queue) { 
     data = queue.Dequeue(); 
    } 
    DoSomething(data); 
    } 

}

này làm chậm quá trình sản xuất bởi một số rất ít mili giây trong khi đang chờ đợi cho người tiêu dùng để đánh thức và phát hành mutex. Nếu bạn có thể sống với điều đó.

+0

Sử dụng 'BlockingCollection' tốt hơn nhiều. Trước hết, nó rõ ràng hơn nhiều khi nó chính xác hơn là sử dụng mutexes, và không giống như mô hình của bạn, nhà sản xuất và người tiêu dùng có thể làm việc song song; bạn đảm bảo rằng mã của bạn là * hoặc * sản xuất * hoặc * tiêu thụ, nhưng không bao giờ cả hai. Nó cũng không quy mô tốt cho nhiều hơn một nhà sản xuất hoặc nhiều hơn một người tiêu dùng, không giống như một bộ sưu tập chặn nơi làm như vậy là tầm thường. Bạn có thể sử dụng một phương pháp tiếp cận dựa trên mutex phức tạp hơn có lợi ích của một bộ sưu tập chặn, nhưng nó sẽ là * rất nhiều công việc và ít dễ đọc hơn/có thể duy trì được. – Servy

+0

BlockingColletion không có sẵn cho tôi vì tôi không thể chạy 4.5. Nếu tôi có thể thì đây có lẽ là giải pháp đúng. Tuy nhiên, mã này chạy song song. Tôi có thể đã không được rõ ràng, nhưng hai là trong chủ đề khác nhau. Tôi sử dụng điều này để chạy các truy vấn SQL nặng trên một luồng, trong khi thu thập dữ liệu trên một luồng khác và nó hoạt động tốt cho tôi. – Ben

+0

BlockingCollection đã được thêm vào 4.0, không phải 4.5. – Servy

Các vấn đề liên quan