Trường hợp sử dụng của tôi là: Tôi nhận được danh sách các liên kết cố định và cần phát hành hai yêu cầu REST cho mỗi liên kết cố định để lấy dữ liệu của chúng trong các phần. Khi cả hai yêu cầu trở lại, tôi muốn kết hợp thông tin của họ với nhau và làm một cái gì đó với nó (ở đây - in nó ra). Tôi muốn làm điều đó với mã bằng cách sử dụng toán tử zip
. Đây là mã hiện tại của tôi (cùng với mocks cho thư viện tôi đang sử dụng):Xử lý lỗi cho các quan sát được nén
public class Main {
public static void main(String[] args) {
ContentManager cm = new ContentManager();
Observable
.from(cm.getPermalinks(10))
.flatMap(permalink -> Observable.zip(
Observable.<Content>create(subscriber -> cm.getDataByPermalink(permalink, new SubscribingRestCallback(subscriber))),
Observable.<Content>create(subscriber -> cm.getStreamByPermalink(permalink, new SubscribingRestCallback(subscriber))),
(dataContent, streamUrlContent) -> {
if (dataContent == null || streamUrlContent == null) {
System.err.println("not zipping " + dataContent + " and " + streamUrlContent);
return Observable.empty();
}
return new Content(dataContent.permalink, dataContent.logoUrl, streamUrlContent.streamUrl);
}))
.subscribe(System.out::println);
}
}
class SubscribingRestCallback implements RestCallback {
private final Subscriber<? super Content> subscriber;
public SubscribingRestCallback(Subscriber<? super Content> subscriber) {
this.subscriber = subscriber;
}
@Override
public void onSuccess(Content content) {
subscriber.onNext(content);
subscriber.onCompleted();
}
@Override
public void onFailure(int code, String message) {
System.err.println(message);
subscriber.onNext(null);
subscriber.onCompleted();
}
}
public class Content {
public final String permalink;
public final String logoUrl;
public final String streamUrl;
public Content(String permalink, String logoUrl, String streamUrl) {
this.permalink = permalink;
this.logoUrl = logoUrl;
this.streamUrl = streamUrl;
}
@Override
public String toString() {
return String.format("Content [%s, %s, %s]", permalink, logoUrl, streamUrl);
}
}
public interface RestCallback {
void onSuccess(Content content);
void onFailure(int code, String message);
}
class ContentManager {
private final Random random = new Random();
public List<String> getPermalinks(int n) {
List<String> permalinks = new ArrayList<>(n);
for (int i = 1; i <= n; ++i) {
permalinks.add("perma_" + i);
}
return permalinks;
}
public void getDataByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, false);
}
public void getStreamByPermalink(String permalink, RestCallback callback) {
getByPermalink(permalink, callback, true);
}
private void getByPermalink(String permalink, RestCallback callback, boolean stream) {
// simulate network latency and unordered results
new Thread(() -> {
try {
Thread.sleep(random.nextInt(1000) + 200);
} catch (InterruptedException e) {
e.printStackTrace();
}
if (random.nextInt(100) < 95) {
String logoUrl;
String streamUrl;
if (stream) {
logoUrl = null;
streamUrl = "http://" + permalink + "/stream";
} else {
logoUrl = "http://" + permalink + "/logo.png";
streamUrl = null;
}
callback.onSuccess(new Content(permalink, logoUrl, streamUrl));
} else {
callback.onFailure(-1, permalink + " data failure");
}
}).start();
}
}
Nói chung, nó hoạt động, nhưng tôi không thích xử lý lỗi trong quá trình triển khai này. Về cơ bản, các yêu cầu REST có thể không thành công, trong trường hợp này, phương thức onFailure
gọi subscriber.onNext(null)
để phương thức zip
luôn có thứ gì đó hoạt động (một yêu cầu có thể bị lỗi, nhưng yêu cầu khác có thể không và tôi không biết). Sau đó, trong hàm zip
tôi cần một số if
kiểm tra cả hai không phải là null
(mã của tôi sẽ bị lỗi nếu bất kỳ phần nào của Content
s là null
).
Tôi muốn có thể lọc ra null
bằng cách sử dụng toán tử filter
ở đâu đó, nếu có thể. Hoặc có thể có một cách tốt hơn so với việc phát ra các giá trị null
cho trường hợp lỗi nhưng do đó nó vẫn hoạt động với chức năng zip
?
Tại sao bạn gọi 'người đăng ký.onTiếp theo (null)' từ phương thức 'onFailure'? Bạn có nghĩa vụ phải gọi 'subscriber.onError (throwable)' trong trường hợp này. –
Bởi vì gọi 'onError()' sẽ thất bại toàn bộ luồng, và tôi không muốn điều này. Tôi muốn thu thập càng nhiều kết quả càng tốt, và về cơ bản bỏ qua/lọc ra các lỗi. Hoặc là tôi sai? – wujek
Thậm chí nếu bạn không muốn toàn bộ luồng thất bại, bạn vẫn cần gọi phương thức 'subscriber.onError()'. Có một số cách khác để làm cạn các lỗi. Một trong số đó là toán tử 'onErrorResumeNext'. Dưới đây là ví dụ về cách sử dụng nó - https://gist.github.com/nsk-mironov/3270b103fc3326a325e2. –