Tôi đang cố gắng sử dụng CloseableHttpAsyncClient
để đọc từ điểm cuối, marshall chuỗi vào đối tượng (sử dụng javax.json) rồi chuyển đổi mảng trên đối tượng thành các thành phần riêng lẻ:Chuyển đổi chuỗi thành mảng thành các đối tượng trong quan sát
CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build();
client.start();
Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client)
.toObservable();
Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> {
String stringVal = new String(bb);
StringReader reader = new StringReader(stringVal);
JsonObject jobj = Json.createReader(reader).readObject();
return jobj.getJsonArray("elements");
})).share();
tôi cần phải nhận được Json array, sau đó lọc các đối tượng của mảng:
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
làm thế nào để chuyển đổi các Observable<JsonArray>
thành một ObservableJsonObject>
?
Vì không đồng bộ, tôi không thể sử dụng forEach để tạo một số loại mảng để đệm dữ liệu.
UPDATE:
Vì vậy, Nhìn vào sử dụng CloseableHttpAsyncClient thể không phải là giải pháp tốt nhất cho những gì tôi đang cố gắng để đạt được - tôi nhận ra sáng nay (trong phòng tắm của tất cả mọi thứ) mà tôi đang cố gắng để quá trình dữ liệu không đồng bộ để thực hiện cuộc gọi không đồng bộ.
Lý tưởng nhất là thực hiện cuộc gọi đến CloseableHttpClient (đồng bộ) và chuyển dữ liệu đến Observable để lọc sẽ là phương pháp lý tưởng hơn (tôi không cần cuộc gọi đầu tiên để quản lý nhiều hơn một cuộc gọi http).
CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build();
StringBuffer result = new StringBuffer();
try {
HttpGet request = new HttpGet(uri);
HttpResponse response = client.execute(request);
BufferedReader rd = new BufferedReader(
new InputStreamReader(response.getEntity().getContent()));
String line;
while ((line = rd.readLine()) != null) {
result.append(line);
}
} catch(ClientProtocolException cpe) { } catch(IOException ioe) { }
StringReader reader = new StringReader(result.toString());
JsonObject jobj = Json.createReader(reader).readObject();
JsonArray elements = jobj.getJsonArray("elements");
List<JsonObject> objects = elements.getValuesAs(JsonObject.class);
Observable<JsonObject> shareable = Observable.from(objects).share();
Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1"));
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2"));
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3"));
firstStream.subscribe(record -> {
//connect to SOTS/Facebook and store the results
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
secondStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
thirdStream.subscribe(record -> {
ByteArrayOutputStream baos = new ByteArrayOutputStream();
Json.createWriter(baos).writeObject(record);
System.out.println(baos.toString());
});
Bạn có thể gặp rắc rối với một này vì flatMap yêu cầu đầu vào của nó để hỗ trợ áp suất ngược và chỉ số Observable.create của bạn ement không hỗ trợ backpressure. Tôi muốn đề nghị cho rằng bit bạn sử dụng Observable.create (new AbstractOnSubscribe() {...}) mà cung cấp hỗ trợ backpressure. –
Đối với Observable.create bạn cũng có thể xây dựng một danh sách và trả về Observable.from (danh sách). –
Vì vậy, bằng cách sử dụng các giải pháp trên, tôi đã kết thúc phải dematerialize các Observable outcoming (nó trả về Observerable> do đó, nó gần gũi.Nhưng sử dụng dematerialize kết thúc trở lại Quan sát