2015-11-20 15 views
7

là có anyway để nói java rx sử dụng các chủ đề hiện tại trong các chức năng quan sát? Tôi đang viết mã cho syncadapter android và tôi muốn kết quả được quan sát trong thread adapter đồng bộ và không nằm trong chủ đề chính.Làm thế nào để quan sátBên trong chuỗi gọi trong java rx?

Một ví dụ cuộc gọi mạng với Retrofit + RX Java trông giống như thế:

MyRetrofitApi.getInstance().getObjects() 
.subscribeOn(Schedulers.io()) 
.observeOn(<current_thread>) 
.subscribe(new Subscriber<Object>() { 
    //do stuff on the sync adapter thread 

} 

Tôi đã cố gắng sử dụng sử dụng

... 
.observeOn(AndroidSchedulers.handlerThread(new Handler(Looper.myLooper()))) 
... 

đó là cùng một cách android rx tạo ra lịch trình cho các chủ đề chính nhưng không hoạt động ngay khi tôi thay thế Looper.myLooper() cho Looper.getMainLooper().

Tôi có thể sử dụng Schedulers.newThread() nhưng là mã đồng bộ phức tạp với nhiều cuộc gọi máy chủ. Tôi sẽ liên tục tạo chuỗi mới chỉ để kích hoạt cuộc gọi mạng mới tạo lại chủ đề mới để khởi chạy nhiều cuộc gọi mạng hơn . Có cách nào để làm việc này không? Hoặc là cách tiếp cận của tôi chính nó hoàn toàn sai?

+0

Đây là một chút suy đoán, vì vậy tôi không gửi bài này như một Trả lời: Từ phiên bản 2.0-beta2 trở đi, Retrofit không đưa yêu cầu mạng vào một chuỗi khác nữa - xem tại đây: https://github.com/square/retrofit/commit/38ce2bee70342ac1ab08115d74802d3a54d85511 Vì vậy, nếu bạn đang sử dụng phiên bản hiện tại của Retrofit bạn sẽ có thể chỉ cần bỏ qua 'subscribeOn' và' observOn' hoàn toàn và chỉ ở lại trên thread adapter đồng bộ toàn bộ thời gian. Hoặc tôi đã hiểu sai câu hỏi của bạn và bạn muốn tạo chủ đề mới nhưng họ chỉ nên quay trở lại chủ đề mà bạn đã bắt đầu? –

Trả lời

1

Ồ, tôi chỉ thấy điều này trong wiki tại địa chỉ: https://github.com/ReactiveX/RxAndroid#observing-on-arbitrary-threads

new Thread(new Runnable() { 
    @Override 
    public void run() { 
     final Handler handler = new Handler(); // bound to this thread 
     Observable.just("one", "two", "three", "four", "five") 
       .subscribeOn(Schedulers.newThread()) 
       .observeOn(HandlerScheduler.from(handler)) 
       .subscribe(/* an Observer */) 

     // perform work, ... 
    } 
}, "custom-thread-1").start(); 

Tôi nghĩ rằng điều này sẽ làm việc cho trường hợp của bạn, quá - ngoại trừ việc tạo ra một chủ đề mới, tất nhiên ... Vì vậy chỉ:

final Handler handler = new Handler(); // bound to this thread 
MyRetrofitApi.getInstance().getObjects() 
    .subscribeOn(Schedulers.io()) 
    .observeOn(HandlerScheduler.from(handler)) 
    .subscribe(new Subscriber<Object>() { 
     //do stuff on the sync adapter thread 

    } 
+0

Cảm ơn bạn đã trả lời nhanh chóng. Tôi nâng cấp android rx (tôi đã sử dụng một phiên bản cũ) và đã thử ví dụ. Thật không may nó đã không làm việc, nhưng tôi tạo ra một số xét nghiệm và phát hiện ra rằng onNext() được gọi là nhưng các thuê bao không nhận được nó. Chỉ sau khi thêm Looper.loop() vào cuối nó dường như hoạt động (đó là một vòng lặp infite xử lý thông điệp). Ngay cả ví dụ tương tự cũng cho tôi biết tôi cần sử dụng Looper.prepare() trước khi tạo Trình xử lý. Có lẽ tôi đang làm một cái gì đó về cơ bản sai hoặc nó có thể là một vấn đề với rx android chính nó? – tiqz

+0

Tôi sợ tôi không thể nói cho bạn biết những gì đang xảy ra ở đây ... hãy xem những gì người khác phải nói ... –

+0

@tiqz 'phát hiện ra rằng onNext() được gọi nhưng Người đăng ký không nhận được nó 'onNext là phương thức của người đăng ký để những gì bạn viết có chút lạ đối với tôi. Về lý thuyết câu trả lời của david là tốt. Tôi khuyên bạn nên đăng các nội dung chi tiết hơn về những gì bạn đang cố gắng để chúng tôi có thể giúp bạn. – Diolor

3

hãy thử sử dụng Schedulers.immediate()

MyRetrofitApi.getInstance().getObjects() 
.subscribeOn(Schedulers.io()) 
.observeOn(Schedulers.immediate()) 
.subscribe(new Subscriber<Object>() { 
    //do stuff on the sync adapter thread 

} 

mô tả của nó nói: Creates and returns a Scheduler that executes work immediately on the current thread.

LƯU Ý:
tôi nghĩ rằng nó không quan trọng để giữ tất cả các công việc trên chủ đề của SyncAdapter bởi vì nó đã được sử dụng một sợi khác nhau

+0

Schedulers.immediate() không hoạt động đối với tôi. Tôi cần quan sát trên GlThread nhưng nó quan sát trên thread từ subscribeOn. Điều gì có thể là lý do? –

+0

@DmitriyPuchkov thứ tự của các chức năng 'subscribeOn' và 'observOn' của bạn là gì? –

+0

Thứ tự giống như bạn viết. Tôi giải quyết vấn đề của tôi bằng cách tạo ra tùy chỉnh thực thi mà đăng runnables để GLThread. –

Các vấn đề liên quan