2015-06-13 21 views
8

Trường hợp sử dụng của tôi là: Tôi nhận được danh sách các liên kết cố định và cần phát hành hai yêu cầu REST cho mỗi liên kết cố định để lấy dữ liệu của chúng trong các phần. Khi cả hai yêu cầu trở lại, tôi muốn kết hợp thông tin của họ với nhau và làm một cái gì đó với nó (ở đây - in nó ra). Tôi muốn làm điều đó với mã bằng cách sử dụng toán tử zip. Đây là mã hiện tại của tôi (cùng với mocks cho thư viện tôi đang sử dụng):Xử lý lỗi cho các quan sát được nén

public class Main { 

    public static void main(String[] args) { 
     ContentManager cm = new ContentManager(); 

     Observable 
       .from(cm.getPermalinks(10)) 
       .flatMap(permalink -> Observable.zip(
         Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
         Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
         (dataContent, streamUrlContent) -> { 
          if (dataContent == null || streamUrlContent == null) { 
           System.err.println("not zipping " + dataContent + " and " + streamUrlContent); 
           return Observable.empty(); 
          } 

          return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
         })) 
       .subscribe(System.out::println); 
    } 
} 

class SubscribingRestCallback implements RestCallback { 

    private final Subscriber<? super Content> subscriber; 

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) { 
     this.subscriber = subscriber; 
    } 

    @Override 
    public void onSuccess(Content content) { 
     subscriber.onNext(content); 
     subscriber.onCompleted(); 
    } 

    @Override 
    public void onFailure(int code, String message) { 
     System.err.println(message); 
     subscriber.onNext(null); 
     subscriber.onCompleted(); 
    } 
} 

public class Content { 

    public final String permalink; 

    public final String logoUrl; 

    public final String streamUrl; 

    public Content(String permalink, String logoUrl, String streamUrl) { 
     this.permalink = permalink; 
     this.logoUrl = logoUrl; 
     this.streamUrl = streamUrl; 
    } 

    @Override 
    public String toString() { 
     return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl); 
    } 
} 

public interface RestCallback { 

    void onSuccess(Content content); 

    void onFailure(int code, String message); 
} 

class ContentManager { 

    private final Random random = new Random(); 

    public List<String> getPermalinks(int n) { 
     List<String> permalinks = new ArrayList<>(n); 
     for (int i = 1; i <= n; ++i) { 
      permalinks.add("perma_" + i); 
     } 

     return permalinks; 
    } 

    public void getDataByPermalink(String permalink, RestCallback callback) { 
     getByPermalink(permalink, callback, false); 
    } 

    public void getStreamByPermalink(String permalink, RestCallback callback) { 
     getByPermalink(permalink, callback, true); 
    } 

    private void getByPermalink(String permalink, RestCallback callback, boolean stream) { 
     // simulate network latency and unordered results 
     new Thread(() -> { 
      try { 
       Thread.sleep(random.nextInt(1000) + 200); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      if (random.nextInt(100) < 95) { 
       String logoUrl; 
       String streamUrl; 
       if (stream) { 
        logoUrl = null; 
        streamUrl = "http://" + permalink + "/stream"; 
       } else { 
        logoUrl = "http://" + permalink + "/logo.png"; 
        streamUrl = null; 
       } 
       callback.onSuccess(new Content(permalink, logoUrl, streamUrl)); 
      } else { 
       callback.onFailure(-1, permalink + " data failure"); 
      } 
     }).start(); 
    } 
} 

Nói chung, nó hoạt động, nhưng tôi không thích xử lý lỗi trong quá trình triển khai này. Về cơ bản, các yêu cầu REST có thể không thành công, trong trường hợp này, phương thức onFailure gọi subscriber.onNext(null) để phương thức zip luôn có thứ gì đó hoạt động (một yêu cầu có thể bị lỗi, nhưng yêu cầu khác có thể không và tôi không biết). Sau đó, trong hàm zip tôi cần một số if kiểm tra cả hai không phải là null (mã của tôi sẽ bị lỗi nếu bất kỳ phần nào của Content s là null).

Tôi muốn có thể lọc ra null bằng cách sử dụng toán tử filter ở đâu đó, nếu có thể. Hoặc có thể có một cách tốt hơn so với việc phát ra các giá trị null cho trường hợp lỗi nhưng do đó nó vẫn hoạt động với chức năng zip?

+0

Tại sao bạn gọi 'người đăng ký.onTiếp theo (null)' từ phương thức 'onFailure'? Bạn có nghĩa vụ phải gọi 'subscriber.onError (throwable)' trong trường hợp này. –

+0

Bởi vì gọi 'onError()' sẽ thất bại toàn bộ luồng, và tôi không muốn điều này. Tôi muốn thu thập càng nhiều kết quả càng tốt, và về cơ bản bỏ qua/lọc ra các lỗi. Hoặc là tôi sai? – wujek

+0

Thậm chí nếu bạn không muốn toàn bộ luồng thất bại, bạn vẫn cần gọi phương thức 'subscriber.onError()'. Có một số cách khác để làm cạn các lỗi. Một trong số đó là toán tử 'onErrorResumeNext'. Dưới đây là ví dụ về cách sử dụng nó - https://gist.github.com/nsk-mironov/3270b103fc3326a325e2. –

Trả lời

5

Trước hết, đúng cách để thông báo cho một Subscriber về một lỗi là để gọi subscriber.onError phương pháp:

class SubscribingRestCallback implements RestCallback { 
    private final Subscriber<? super Content> subscriber; 

    public SubscribingRestCallback(Subscriber<? super Content> subscriber) { 
     this.subscriber = subscriber; 
    } 

    @Override 
    public void onSuccess(Content content) { 
     subscriber.onNext(content); 
     subscriber.onCompleted(); 
    } 

    @Override 
    public void onFailure(int code, String message) { 
     subscriber.onError(new Exception(message)); 
    } 
} 

Thậm chí nếu bạn không muốn toàn bộ dòng thất bại, bạn vẫn cần phải gọi một subscriber.onError(). Có một số cách khác để làm cạn các lỗi. Một trong số đó là một nhà điều hành onErrorResumeNext:

Observable 
     .from(cm.getPermalinks(10)) 
     .flatMap(permalink -> Observable.zip(
       Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       (dataContent, streamUrlContent) -> { 
        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
       }).onErrorResumeNext(Observable.empty())) 
     .subscribe(System.out::println); 

EDIT

Tôi có một câu hỏi cuối cùng: nếu bạn nhận thấy chức năng dây kéo của tôi, tôi trở Observable.empty() nếu hai đối tượng có thể không được nén và khi tôi trả lại nội dung. Điều này có vẻ sai. Làm thế nào tôi nên xử lý các lỗi như vậy điều kiện trong chức năng dây kéo?

Có, trả lại Observable.empty() là hoàn toàn sai. Ném ngoại lệ từ hàm zip có vẻ như giải pháp tốt nhất:

Observable 
     .from(cm.getPermalinks(10)) 
     .flatMap(permalink -> Observable.zip(
       Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))), 
       (dataContent, streamUrlContent) -> { 
        if (!isDataValid(dataContent, streamUrlContent)) { 
         throw new RuntimeException("Something went wrong."); 
        } 
        return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl); 
       }).onErrorResumeNext(Observable.empty())) 
     .subscribe(System.out::println); 
+0

Điều cuối cùng tôi đã làm là: phương thức 'RestCallback # onFailure()' gọi 'subscriber.onError()', và trong 'RestCallback # onSuccess()' tôi hoặc gọi 'onNext()' và 'onComplete()' nếu nội dung là hợp lệ, hoặc 'onError()' nếu không. Sau đó, 'onResumeNext()' được áp dụng cho các 'có thể quan sát 'được nén với các loại lỗi nội dung này, và hàm nén của tôi đơn giản như' (dataContent, streamUrlContent) -> new Content (dataContent.permalink, dataContent. logoUrl, streamUrlContent.streamUrl)) '. Tôi đã thêm lệnh 'doOnError()' để ghi nhật ký bổ sung. Tôi thích mã ngay bây giờ. Cảm ơn bạn đã giúp đỡ. – wujek

Các vấn đề liên quan