2013-07-21 39 views
7

Tôi hiện đang cố gắng để quấn đầu của tôi xung quanh đồng thời với RX .NET và bị lẫn lộn bởi một cái gì đó. Tôi muốn chạy bốn nhiệm vụ tương đối chậm song song, vì vậy tôi giả định NewThreadScheduler.Default sẽ là cách để đi, vì nó "Đại diện cho một đối tượng lên lịch mỗi đơn vị công việc trên một chuỗi riêng biệt.".NewThreadScheduler.Default lịch trình tất cả các công việc trên cùng một chủ đề

Đây là mã thiết lập của tôi:

static void Test() 
    { 
     Console.WriteLine("Starting. Thread {0}", Thread.CurrentThread.ManagedThreadId); 

     var query = Enumerable.Range(1, 4); 
     var obsQuery = query.ToObservable(NewThreadScheduler.Default); 
     obsQuery.Subscribe(DoWork, Done); 

     Console.WriteLine("Last line. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

    static void DoWork(int i) 
    { 
     Thread.Sleep(500); 
     Console.WriteLine("{0} Thread {1}", i, Thread.CurrentThread.ManagedThreadId); 
    } 

    static void Done() 
    { 
     Console.WriteLine("Done. Thread {0}", Thread.CurrentThread.ManagedThreadId); 
    } 

tôi cho rằng "X Chủ đề Y" sẽ ra một chủ đề khác nhau id mọi thời gian, tuy nhiên sản lượng thực tế là:

Starting. Thread 1 
Last line. Thread 1 
1 Thread 3 
2 Thread 3 
3 Thread 3 
4 Thread 3 
Done. Thread 3 

Tất cả công việc là là một trong cùng một chủ đề mới trong thứ tự tuần tự, đó không phải là những gì tôi mong đợi.

Tôi giả sử tôi đang thiếu thứ gì đó, nhưng tôi không thể hiểu được điều gì.

Trả lời

8

Có hai phần cho một truy vấn quan sát được, chính bản thân số QuerySubscription. (Đây cũng là sự khác biệt giữa các nhà khai thác ObserveOn và SubscribeOn.)

bạn Query

Enumerable 
    .Range(1, 4) 
    .ToObservable(NewThreadScheduler.Default); 

này tạo ra một quan sát được rằng sản xuất giá trị trên mặc định NewThreadScheduler cho hệ thống đó.

đăng ký của bạn là

obsQuery.Subscribe(DoWork, Done); 

này chạy DoWork cho mỗi giá trị được tạo ra bởi QueryDone khi Query kết thúc với một cuộc gọi OnComplete. Tôi không nghĩ rằng có bất kỳ đảm bảo về những gì thread các chức năng trong phương pháp đăng ký sẽ được gọi là, trong thực tế nếu tất cả các giá trị của truy vấn được sản xuất trên cùng một chủ đề đó là chủ đề đăng ký sẽ được chạy trên. Dường như họ cũng đang làm cho nó để tất cả các cuộc gọi đăng ký được thực hiện trên cùng một chủ đề mà rất có thể được thực hiện để thoát khỏi rất nhiều lỗi đa luồng phổ biến.

Vì vậy, bạn có hai vấn đề, một là với khai thác gỗ của bạn, nếu bạn thay đổi của bạn Query để

Enumerable 
    .Range(1, 4) 
    .Do(x => Console.WriteLine("Query Value {0} produced on Thread {1}", x, Thread.CurrentThread.ManagedThreadId); 
    .ToObservable(NewThreadScheduler.Default); 

Bạn sẽ thấy mỗi giá trị sản xuất trên một chủ đề mới.

Vấn đề khác là một trong những ý định và thiết kế của Rx. Đó là dự định rằng Query là quy trình chạy dài và Subscription là một phương pháp ngắn liên quan đến kết quả. Nếu bạn muốn chạy một hàm chạy dài dưới dạng Rx Observable, tùy chọn tốt nhất của bạn là sử dụng Observable.ToAsync.

+0

Cảm ơn bạn vì điều này. Không chỉ là bạn làm rõ những gì tôi đang làm sai, nhưng tôi cũng nghĩ rằng những gì bạn nói về ý định của Rx làm cho rất nhiều ý nghĩa. –

+0

Câu trả lời ở trên dường như đã lỗi thời, phương thức Do không còn khả dụng trên Enumerable. – VivekDev

+0

Để phương thức Do hoạt động, bạn cần phải cài đặt gói gói System.Interactive – VivekDev

Các vấn đề liên quan