2015-04-18 21 views
5

Tôi đang thử nghiệm với RxJava và lớp hoàn thiện Javacủa Java 8 và không hoàn toàn hiểu cách xử lý các điều kiện hết thời gian chờ.Hủy tác vụ khi hết thời gian chờ ở RxJava

import static net.javacrumbs.futureconverter.java8rx.FutureConverter.toObservable; 

// ... 

    Observable<String> doSomethingSlowly() { 
     CompletableFuture<PaymentResult> task = CompletableFuture.supplyAsync(() -> { 
      // this call may be very slow - if it takes too long, 
      // we want to time out and cancel it. 
      return processor.slowExternalCall(); 

     }); 

     return toObservable(task); 
    } 

    // ... 

    doSomethingSlowly() 
     .single() 
     .timeout(3, TimeUnit.SECONDS, Observable.just("timeout")); 

Điều này về cơ bản hoạt động (nếu hết thời gian chờ ba giây, "thời gian chờ" được xuất bản). Tuy nhiên, tôi cũng muốn hủy bỏ nhiệm vụ trong tương lai mà tôi đã bao bọc trong một số Observable - có thể với phương pháp RxJava làm trung tâm không?

Tôi biết rằng một tùy chọn sẽ là tự xử lý thời gian chờ, sử dụng task.get(3, TimeUnit.SECONDS), nhưng tôi tự hỏi liệu có thể thực hiện tất cả công việc xử lý tác vụ trong RxJava hay không.

+0

Bạn có thể cho biết phương thức 'toObservable' có được triển khai không? –

+1

https://github.com/lukas-krecan/future-converter/tree/master/rxjava-java8 –

Trả lời

10

Có, bạn có thể thực hiện việc này. Bạn sẽ thêm một số Subscription vào số Subscriber.

Điều này cho phép bạn nghe trong các đăng ký hủy đăng ký, điều này sẽ xảy ra nếu bạn gọi một cách rõ ràng subscribe().unsubscribe() hoặc nếu Observable hoàn tất thành công hoặc có lỗi.

Nếu bạn thấy việc hủy đăng ký trước khi tương lai đã hoàn tất, bạn có thể giả sử đó là do unsubscribe hoặc thời gian chờ rõ ràng.

public class FutureTest { 
    public static void main(String[] args) throws IOException { 
     doSomethingSlowly() 
       .timeout(1, TimeUnit.SECONDS, Observable.just("timeout")) 
       .subscribe(System.out::println); 
     System.in.read(); // keep process alive 
    } 

    private static Observable<String> doSomethingSlowly() { 
     CompletableFuture<String> future = CompletableFuture.supplyAsync(() -> { 
      try { 
       Thread.sleep(2000); 
      } catch (InterruptedException e) { 
      } 
      return "Something"; 

     }); 
     return toObservable(future); 
    } 

    private static <T> Observable<T> toObservable(CompletableFuture<T> future) { 
     return Observable.create(subscriber -> { 
      subscriber.add(new Subscription() { 
       private boolean unsubscribed = false; 
       @Override 
       public void unsubscribe() { 
        if (!future.isDone()){ 
         future.cancel(true); 
        } 
        unsubscribed = true; 
       } 

       @Override 
       public boolean isUnsubscribed() { 
        return unsubscribed; 
       } 
      }); 

      future.thenAccept(value -> { 
       if (!subscriber.isUnsubscribed()){ 
        subscriber.onNext(value); 
        subscriber.onCompleted(); 
       } 
      }).exceptionally(throwable -> { 
       if (!subscriber.isUnsubscribed()) { 
        subscriber.onError(throwable); 
       } 
       return null; 
      }); 
     }); 
    } 
} 
+1

Cảm ơn, đó là một mô hình hữu ích cần biết. Trong khi chờ đợi, tôi phát hiện ra rằng [ngữ nghĩa của 'CompletableFuture.cancel' khác với tương lai thuần túy] (http://java.dzone.com/articles/completablefuture-cant-be) - bạn không thể ngắt một' CompletableFuture 'bằng cách sử dụng phương thức' cancel'. Giải pháp bạn đề xuất tuy nhiên dịch độc đáo cũng thành đồng bằng tương lai, vì vậy tôi sẽ đi với điều đó. – jstaffans

Các vấn đề liên quan