2016-04-11 21 views
12

Tôi đang phát triển ứng dụng cho Android với đồng bộ hóa dữ liệu nền. Tôi hiện đang sử dụng RxJava để đăng một số dữ liệu trên máy chủ trong khoảng thời gian đều đặn. Ngoài ra, tôi muốn cung cấp cho người dùng một nút "buộc đồng bộ hóa" sẽ kích hoạt đồng bộ hóa ngay lập tức. Tôi biết cách sử dụng Observable.interval() để đẩy dữ liệu theo các khoảng thời gian đều đặn và tôi sẽ biết cách sử dụng Observalbe.just() để đẩy dữ liệu đó bị buộc, nhưng tôi muốn xếp hàng chúng nếu có xảy ra. chạy.Nhiệm vụ xếp hàng với RxJava trong Android

Vì vậy, hãy lấy ví dụ khi 1 phút là khoảng thời gian đồng bộ hóa tự động và giả sử rằng đồng bộ hóa kéo dài 40 giây (Tôi vượt quá cường điệu ở đây chỉ để tạo điểm dễ dàng hơn). Bây giờ nếu có bất kỳ cơ hội nào, người dùng nhấn nút "lực" khi tự động vẫn đang chạy (hoặc ngược lại - các bộ kích hoạt tự động khi nút bắt buộc vẫn đang chạy), tôi muốn xếp hàng yêu cầu đồng bộ thứ hai để thực hiện đầu tiên kết thúc.

Tôi đã vẽ hình ảnh này có thể đặt một số quan điểm nhiều hơn với nó:

enter image description here

Như bạn có thể thấy, tự động được kích hoạt (theo một số Observable.interval()), và ở giữa đồng bộ, người dùng nhấn nút "lực". Bây giờ chúng tôi muốn chờ yêu cầu đầu tiên kết thúc và sau đó bắt đầu lại cho yêu cầu bắt buộc. Tại một thời điểm, trong khi yêu cầu bắt buộc đang chạy, yêu cầu tự động mới được kích hoạt lại mà chỉ cần thêm nó vào hàng đợi. Sau khi người cuối cùng đã được hoàn thành từ hàng đợi tất cả dừng lại, và sau đó tự động được lên kế hoạch một lần nữa ít sau này.

Hy vọng ai đó có thể chỉ cho tôi để sửa nhà điều hành cách thực hiện việc này. Tôi đã thử với Observable.combineLatest(), nhưng danh sách hàng đợi đã được gửi đi lúc bắt đầu và khi tôi thêm đồng bộ mới vào hàng đợi, nó không tiếp tục khi hoạt động trước đó được hoàn thành.

Any help is appreciated rất nhiều, Darko

Trả lời

11

Bạn có thể làm điều này bằng cách kết hợp bộ đếm thời gian với nút bấm Observable/Subject, sử dụng hiệu ứng xếp hàng của onBackpressureBuffer và concatMap chế biến vào nó mà làm cho chắc chắn rằng chạy một tại một thời điểm.

PublishSubject<Long> subject = PublishSubject.create(); 

Observable<Long> periodic = Observable.interval(1, 1, TimeUnit.SECONDS); 

periodic.mergeWith(subject) 
.onBackpressureBuffer() 
.concatMap(new Func1<Long, Observable<Integer>>() { 
    @Override 
    public Observable<Integer> call(Long v) { 
     // simulates the task to run 
     return Observable.just(1) 
       .delay(300, TimeUnit.MILLISECONDS); 
    } 
} 
).subscribe(System.out::println, Throwable::printStackTrace); 

Thread.sleep(1100); 
// user clicks a button 
subject.onNext(-1L); 

Thread.sleep(800); 
+0

Ok, đó là một hướng đi tốt, nhưng tôi có một vài câu hỏi ... Có gì với chức năng ẩn danh? Tôi không sử dụng Java 8 vì tôi phải phát triển cho cấp API 16. Ngoài ra, dòng mô phỏng nhấn nút nào? –

+0

Lambda cho biết nơi đồng bộ hóa của bạn sẽ xảy ra - trong biểu mẫu Quan sát được. Subject.onNext() là nơi người dùng nhấp vào một nút. – akarnokd

+0

Thêm một câu hỏi nữa nếu bạn có thể giúp tôi. Vì vậy, tôi đã thực hiện điều này khi [@akarnokd] (http://stackoverflow.com/users/61158/akarnokd) đề xuất và tất cả đều hoạt động tốt, chỉ là những gì tôi cần. Bây giờ tôi đang cố gắng nhận lệnh 'onComplete()' sau khi chuỗi kết thúc. Nếu bạn nhìn vào hình trên, chuỗi đầu tiên sẽ kết thúc sau quá trình đồng bộ hóa thứ 3. Điều này là bởi vì có lẽ tôi muốn cập nhật giao diện người dùng sau khi chuỗi kết thúc thay vì sau đó sau mỗi lần đồng bộ hóa. –

3

Mặc dù có một câu trả lời chấp nhận với một giải pháp tốt, tôi muốn chia sẻ một tùy chọn để thực hiện điều này bằng Scheduler và SingleThreadExecutor

public static void main(String[] args) throws Exception { 
    System.out.println(" init "); 
    Observable<Long> numberObservable = 
      Observable.interval(700, TimeUnit.MILLISECONDS).take(10); 

    final Subject subject = PublishSubject.create(); 

    Executor executor = Executors.newSingleThreadExecutor(); 
    Scheduler scheduler = Schedulers.from(executor); 
    numberObservable.observeOn(scheduler).subscribe(subject); 

    subject.subscribe(onNextFunc("subscriber 1"), onErrorFunc("subscriber 1"), 
        onCompleteFunc("subscriber 1")); 

    Thread.sleep(800); 
    //simulate action 
    executor.execute(new Runnable() { 
     @Override 
     public void run() { 
      subject.onNext(333l); 
     } 
    }); 

    Thread.sleep(5000); 
} 

static Action1<Long> onNextFunc(final String who) { 
    return new Action1<Long>() { 
     public void call(Long x) { 
      System.out.println(who + " got " + x + " :: " + Thread.currentThread().getName() 
        + " -- " + System.currentTimeMillis()); 
      try { 
       //simulate some work 
       Thread.sleep(500); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
     } 
    }; 
} 

static Action1<Throwable> onErrorFunc(final String who) { 
    return new Action1<Throwable>() { 
     public void call(Throwable t) { 
      t.printStackTrace(); 
     } 
    }; 
} 

static Action0 onCompleteFunc(final String who) { 
    return new Action0() { 
     public void call() { 
      System.out.println(who + " complete"); 
     } 
    }; 
} 
Các vấn đề liên quan