2011-06-30 30 views
7

Có một vài bài viết về điều này, và tôi làm việc này ... nhưng tôi muốn biết cách đặt số lượng chủ đề Task tối đa cho các đăng ký quan sát cùng một lúc.Xử lý hàng đợi không đồng bộ với phần mở rộng phản ứng

Tôi có sau đây để parallelize tiết kiệm async các mục nhật ký:

private BlockingCollection<ILogEntry> logEntryQueue; 

logEntryQueue = new BlockingCollection<ILogEntry>(); 
logEntryQueue.GetConsumingEnumerable().ToObservable(Scheduler.TaskPool).Subscribe(SaveLogEntry); 

Để lập kế hoạch tiết kiệm của tôi ... nhưng làm thế nào để xác định các chủ đề tối đa cho scheduler để sử dụng cùng một lúc?

+0

Nếu bạn đang tạo lịch trình chỉ giới hạn cho các chuỗi đồng thời tối đa thì hãy xem xét xem TPL Dataflow. Nó được xây dựng đặc biệt để tạo ra các đường ống dẫn nơi mỗi khối trong đường ống có các giới hạn khác nhau để đồng thời. Ít nhất thì nó cũng dễ hiểu hơn và có thể duy trì được đối với tôi khi tôi tạo ra cả hai phương pháp. – yzorg

Trả lời

8

Đây không phải là chức năng của Quan sát, mà là chức năng của Trình lập lịch biểu. Đài quan sát xác định những gì và trình lên lịch xác định trong đó.

Bạn cần chuyển qua trình lên lịch tùy chỉnh. Một cách đơn giản để làm điều này sẽ là phân lớp TaskScheduler và ghi đè thuộc tính "MaximumConcurrencyLevel".

http://msdn.microsoft.com/en-us/library/system.threading.tasks.taskscheduler.maximumconcurrencylevel.aspx

Tôi thực sự tìm thấy một mẫu xe này trên MSDN:

http://msdn.microsoft.com/en-us/library/ee789351.aspx

Edit: Bạn hỏi về làm thế nào để đi từ TaskScheduler để IScheduler. Một nhà phát triển chỉ cho tôi rằng một chút thông tin:

var ischedulerForRx = new TaskPoolScheduler 
(
    new TaskFactory 
    (
     //This is your custom scheduler 
     new LimitedConcurrencyLevelTaskScheduler(1) 
    ) 
); 
+1

Thật tuyệt vời, cảm ơn ... nhưng làm thế nào để tôi có được một bộ đếm phản ứng phản ứng cho TaskScheduler mới của tôi? Tôi không muốn thực hiện lại toàn bộ chức năng. – Jeff

2

Nếu bạn tạo "tác phẩm" của bạn như IObservable<T> với thực hiện chậm (. tức là họ muốn làm bất cứ điều gì cho đến khi đăng ký đến), bạn có thể sử dụng quá tải Merge chấp nhận một số đăng ký đồng thời tối đa:

ISubject<QueueItem> synchronizedQueue = new Subject<QueueItem>().Synchronize(); 

queue 
    .Select(item => StartWork(item)) 
    .Merge(maxConcurrent: 5) // C# 4 syntax for illustrative purposes 
    .Subscribe(); 

// To enqueue: 
synchronizedQueue.OnNext(new QueueItem()); 
+0

Làm cách nào để xử lý các chuỗi khác thêm vào hàng đợi trong khi hàng đợi đang xử lý? Nó sẽ không bị thu thập được sửa đổi trong quá trình điều tra? – Jeff

+0

Toán tử 'Merge' là luồng an toàn, nhưng tôi đã cập nhật câu trả lời của mình để sử dụng chủ đề an toàn chỉ trong trường hợp. –

+0

Tôi đoán nó chỉ là sự nhầm lẫn của tôi ở đây ... nó không phải là chủ đề an toàn mà tôi quan tâm, mà là hàng đợi đang được sửa đổi (các mặt hàng khác đang được enqueued) vì nó đang được liệt kê ... Đây không phải là một vấn đề ? – Jeff

Các vấn đề liên quan