2011-10-23 44 views
23

Tôi tự hỏi liệu có tồn tại/triển khai cho ConcurrentQueue, tương tự như BlockingCollection, nơi lấy từ bộ sưu tập không chặn, nhưng thay vì không đồng bộ và sẽ gây ra một async chờ đợi cho đến khi một mục được đặt trong xếp hàng.awaitable Task based queue

Tôi đã bắt đầu triển khai của riêng mình nhưng có vẻ như nó không hoạt động như mong đợi. Tôi tự hỏi liệu tôi có đang sáng tạo lại thứ gì đó đã tồn tại không.

Dưới đây là thực hiện của tôi:

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 

    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 

    object queueSyncLock = new object(); 

    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> Dequeue() 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs=null; 
     T firstItem=default(T); 
     while (true) 
     { 
      bool ok; 
      lock (queueSyncLock) 
      { 
       ok = waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem); 
       if (ok) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        queue.TryDequeue(out firstItem); 
       } 
      } 
      if (!ok) break; 
      tcs.SetResult(firstItem); 
     } 
    } 
} 
+0

oh yuck .... ...... –

+21

@AdamSack: thực sự, nhưng nhận xét của bạn không giúp tôi. – spender

Trả lời

36

Tôi không biết của giải pháp không có khóa nhưng bạn có thể xem Dataflow library mới, một phần của Async CTP. Một đơn giản BufferBlock<T> nên là đủ, ví dụ .:

BufferBlock<int> buffer = new BufferBlock<int>(); 

Sản xuất và tiêu thụ được dễ dàng nhất thực hiện thông qua phương pháp khuyến nông về các loại khối dataflow.

sản xuất là đơn giản như:

buffer.Post(13); 

và tiêu thụ là async-ready:

int item = await buffer.ReceiveAsync(); 

tôi khuyên bạn nên sử dụng Dataflow nếu có thể; làm cho một bộ đệm như cả hiệu quả và chính xác là khó khăn hơn so với lần đầu tiên nó xuất hiện.

+0

Điều này có vẻ rất hứa hẹn ... sẽ kiểm tra nó vào ngày mai. Cảm ơn. Nó trông rất giống một cổng CCR. – spender

+2

Thay vào đó hãy xem trước khi đi ngủ! Có vẻ như Dataflow phù hợp với nhu cầu của tôi rất độc đáo. Nó dường như thu hẹp khoảng cách giữa những gì được cung cấp bởi TPL và những gì được cung cấp trong CCR (mà tôi đã sử dụng để thành công lớn). Nó khiến tôi cảm thấy tích cực rằng công việc tuyệt vời trong CCR đã không bị lãng phí. Đây là câu trả lời đúng (và một cái gì đó sáng bóng và mới để đánh răng của tôi vào!) Cảm ơn @StephenCleary. – spender

1

Nó có thể là quá mức cần thiết đối với trường hợp sử dụng của bạn (cho đường cong học tập), nhưng Reactive Extentions cung cấp tất cả các keo bao giờ bạn có thể muốn cho các thành phần không đồng bộ.

Bạn về cơ bản đăng ký các thay đổi và chúng được đẩy tới bạn khi chúng có sẵn và bạn có thể có hệ thống đẩy các thay đổi trên một chuỗi riêng biệt.

+0

Tôi ít nhất là một phần thành thạo trong Phản ứng, nhưng nó có một chút bí truyền để sử dụng trong sản xuất khi những người khác có thể phải duy trì mã.Tôi thực sự đào sự đơn giản mà async/await đang mang đến một sản phẩm máy chủ trước đây rất phức tạp, và tôi đang cố gắng giữ tất cả công nghệ không đồng bộ dưới một công nghệ duy nhất. – spender

-1

Bạn chỉ có thể sử dụng một BlockingCollection (sử dụng mặc định ConcurrentQueue) và quấn cuộc gọi đến Take trong một Task để bạn có thể await nó:

var bc = new BlockingCollection<T>(); 

T element = await Task.Run(() => bc.Take()); 
+4

Ý tưởng hay, nhưng tôi không hài lòng với việc chặn. Tôi sẽ có một vài nghìn khách hàng với hàng đợi tin nhắn của riêng họ. Bất kỳ chặn nào cũng sẽ làm chìm tàu ​​vì nó sẽ buộc các luồng không làm gì cả. Lý do tôi muốn một nhiệm vụ không bị ngăn chặn, không ngăn chặn là vì vậy tôi có thể giữ tất cả các hoạt động trong threadpool mà không gây ra nạn đói. – spender

0

Đây là việc thực hiện Tôi hiện đang sử dụng.

public class MessageQueue<T> 
{ 
    ConcurrentQueue<T> queue = new ConcurrentQueue<T>(); 
    ConcurrentQueue<TaskCompletionSource<T>> waitingQueue = 
     new ConcurrentQueue<TaskCompletionSource<T>>(); 
    object queueSyncLock = new object(); 
    public void Enqueue(T item) 
    { 
     queue.Enqueue(item); 
     ProcessQueues(); 
    } 

    public async Task<T> DequeueAsync(CancellationToken ct) 
    { 
     TaskCompletionSource<T> tcs = new TaskCompletionSource<T>(); 
     ct.Register(() => 
     { 
      lock (queueSyncLock) 
      { 
       tcs.TrySetCanceled(); 
      } 
     }); 
     waitingQueue.Enqueue(tcs); 
     ProcessQueues(); 
     return tcs.Task.IsCompleted ? tcs.Task.Result : await tcs.Task; 
    } 

    private void ProcessQueues() 
    { 
     TaskCompletionSource<T> tcs = null; 
     T firstItem = default(T); 
     lock (queueSyncLock) 
     { 
      while (true) 
      { 
       if (waitingQueue.TryPeek(out tcs) && queue.TryPeek(out firstItem)) 
       { 
        waitingQueue.TryDequeue(out tcs); 
        if (tcs.Task.IsCanceled) 
        { 
         continue; 
        } 
        queue.TryDequeue(out firstItem); 
       } 
       else 
       { 
        break; 
       } 
       tcs.SetResult(firstItem); 
      } 
     } 
    } 
} 

Nó hoạt động đủ tốt, nhưng có khá nhiều tranh cãi trên queueSyncLock, như tôi đang làm khá nhiều việc sử dụng CancellationToken để hủy bỏ một số nhiệm vụ chờ đợi. Tất nhiên, điều này dẫn đến đáng kể ít chặn tôi sẽ nhìn thấy với một BlockingCollection nhưng ...

Tôi đang tự hỏi nếu có một mượt mà, khóa phương tiện miễn phí để đạt được cùng một mục

2

atempt của tôi (nó có một sự kiện lớn lên khi một "lời hứa" được tạo ra, và nó có thể được sử dụng bởi một nhà sản xuất bên ngoài để biết khi nào để sản xuất nhiều mặt hàng):

public class AsyncQueue<T> 
{ 
    private ConcurrentQueue<T> _bufferQueue; 
    private ConcurrentQueue<TaskCompletionSource<T>> _promisesQueue; 
    private object _syncRoot = new object(); 

    public AsyncQueue() 
    { 
     _bufferQueue = new ConcurrentQueue<T>(); 
     _promisesQueue = new ConcurrentQueue<TaskCompletionSource<T>>(); 
    } 

    /// <summary> 
    /// Enqueues the specified item. 
    /// </summary> 
    /// <param name="item">The item.</param> 
    public void Enqueue(T item) 
    { 
     TaskCompletionSource<T> promise; 
     do 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return;          
      } 
     } 
     while (promise != null); 

     lock (_syncRoot) 
     { 
      if (_promisesQueue.TryDequeue(out promise) && 
       !promise.Task.IsCanceled && 
       promise.TrySetResult(item)) 
      { 
       return; 
      } 

      _bufferQueue.Enqueue(item); 
     }    
    } 

    /// <summary> 
    /// Dequeues the asynchronous. 
    /// </summary> 
    /// <param name="cancellationToken">The cancellation token.</param> 
    /// <returns></returns> 
    public Task<T> DequeueAsync(CancellationToken cancellationToken) 
    { 
     T item; 

     if (!_bufferQueue.TryDequeue(out item)) 
     { 
      lock (_syncRoot) 
      { 
       if (!_bufferQueue.TryDequeue(out item)) 
       { 
        var promise = new TaskCompletionSource<T>(); 
        cancellationToken.Register(() => promise.TrySetCanceled()); 

        _promisesQueue.Enqueue(promise); 
        this.PromiseAdded.RaiseEvent(this, EventArgs.Empty); 

        return promise.Task; 
       } 
      } 
     } 

     return Task.FromResult(item); 
    } 

    /// <summary> 
    /// Gets a value indicating whether this instance has promises. 
    /// </summary> 
    /// <value> 
    /// <c>true</c> if this instance has promises; otherwise, <c>false</c>. 
    /// </value> 
    public bool HasPromises 
    { 
     get { return _promisesQueue.Where(p => !p.Task.IsCanceled).Count() > 0; } 
    } 

    /// <summary> 
    /// Occurs when a new promise 
    /// is generated by the queue 
    /// </summary> 
    public event EventHandler PromiseAdded; 
} 
+0

Tôi nghĩ đây là giải pháp tốt nhất. Tôi đã thực hiện điều này và thử nghiệm nó rộng rãi. Một vài lưu ý: lời gọi đến! Promise.Task.IsCanceled là không cần thiết. Tôi đã thêm một ManualResetEventSlim để theo dõi khi bufferQueue trống để người gọi có thể chặn để đợi hàng đợi rỗng. –

+0

Bạn [nên được xử lý] (http://stackoverflow.com/a/21653382/298609) 'CancellationTokenRegistration' bạn nhận được từ cuộc gọi' cancelToken.Register'. – Paya