2015-07-21 17 views
5

Tôi đang cố gắng sử dụng CloseableHttpAsyncClient để đọc từ điểm cuối, marshall chuỗi vào đối tượng (sử dụng javax.json) rồi chuyển đổi mảng trên đối tượng thành các thành phần riêng lẻ:Chuyển đổi chuỗi thành mảng thành các đối tượng trong quan sát

CloseableHttpAsyncClient client = HttpAsyncClientBuilder.create().setDefaultCredentialsProvider(provider).build(); 

client.start(); 

Observable<ObservableHttpResponse> observable = ObservableHttp.createRequest(HttpAsyncMethods.createGet(uri), client) 
      .toObservable(); 

Observable<JsonArray> shareable = observable.flatMap(response -> response.getContent().map(bb -> { 
     String stringVal = new String(bb); 
     StringReader reader = new StringReader(stringVal); 
     JsonObject jobj = Json.createReader(reader).readObject(); 
     return jobj.getJsonArray("elements"); 
    })).share(); 

tôi cần phải nhận được Json array, sau đó lọc các đối tượng của mảng:

Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); 
Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); 
Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 

làm thế nào để chuyển đổi các Observable<JsonArray> thành một ObservableJsonObject>?

Vì không đồng bộ, tôi không thể sử dụng forEach để tạo một số loại mảng để đệm dữ liệu.

UPDATE:

Vì vậy, Nhìn vào sử dụng CloseableHttpAsyncClient thể không phải là giải pháp tốt nhất cho những gì tôi đang cố gắng để đạt được - tôi nhận ra sáng nay (trong phòng tắm của tất cả mọi thứ) mà tôi đang cố gắng để quá trình dữ liệu không đồng bộ để thực hiện cuộc gọi không đồng bộ.

Lý tưởng nhất là thực hiện cuộc gọi đến CloseableHttpClient (đồng bộ) và chuyển dữ liệu đến Observable để lọc sẽ là phương pháp lý tưởng hơn (tôi không cần cuộc gọi đầu tiên để quản lý nhiều hơn một cuộc gọi http).

CloseableHttpClient client = HttpClientBuilder.create().setDefaultCredentialsProvider(provider).build(); 

    StringBuffer result = new StringBuffer(); 

    try { 
     HttpGet request = new HttpGet(uri); 
     HttpResponse response = client.execute(request); 

     BufferedReader rd = new BufferedReader(
       new InputStreamReader(response.getEntity().getContent())); 

     String line; 
     while ((line = rd.readLine()) != null) { 
      result.append(line); 
     } 
    } catch(ClientProtocolException cpe) { } catch(IOException ioe) { } 

    StringReader reader = new StringReader(result.toString()); 
    JsonObject jobj = Json.createReader(reader).readObject(); 
    JsonArray elements = jobj.getJsonArray("elements"); 

    List<JsonObject> objects = elements.getValuesAs(JsonObject.class); 


    Observable<JsonObject> shareable = Observable.from(objects).share(); 

    Observable<JsonObject> firstStream = shareable.filter(item -> item.getString("type").equals("TYPE_1")); 
    Observable<JsonObject> secondStream = shareable.filter(item -> item.getString("type").equals("TYPE_2")); 
    Observable<JsonObject> thirdStream = shareable.filter(item -> item.getString("type").equals("TYPE_3")); 


    firstStream.subscribe(record -> { 
     //connect to SOTS/Facebook and store the results 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

    secondStream.subscribe(record -> { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

    thirdStream.subscribe(record -> { 
     ByteArrayOutputStream baos = new ByteArrayOutputStream(); 
     Json.createWriter(baos).writeObject(record); 
     System.out.println(baos.toString()); 
    }); 

Trả lời

2

Hãy thử mã này:

String myjson = "{\"elements\": [{\"text\":\"Obj1\"},{\"text\":\"Obj2\"}, {\"text\":\"Obj3\"}]}"; 

    Observable.just(myjson) 
      .map(jsonStr -> new StringReader(myjson)) 
      .map(reader -> Json.createReader(reader).readObject()) 
      .map(jobj -> jobj.getJsonArray("elements")) 
      .map(elements -> elements.toArray(new JsonObject[elements.size()])) 
      .flatMap(jsonObjects -> Observable.from(jsonObjects)) 
      .subscribe(
        (jsonObject) -> System.out.println(jsonObject.getString("text")), 
        throwable -> throwable.printStackTrace(), 
        () -> System.out.println("On complete")); 

Kết quả:

07-22 12: 19: 22,362 8032-8032/com.mediamanagment.app I/System.out ﹕ Obj1
07-22 12: 19: 22.362 8032-8032/com.mediamanagment.app I/System.out ﹕ Obj2
07-22 12: 19: 22.362 8032-8032/com .mediamanagment.app Tôi/Hệ thống.ra: Obj3

LƯU Ý:
Bạn nên sử dụng phụ thuộc này:

compile 'org.glassfish:javax.json:1.0.4' 

Thay vào đó này:

compile 'javax.json:javax.json-api:1.0' 

Nếu bạn sẽ sử dụng 'javax.json:javax.json-api:1.0' bạn sẽ nhận được javax.json.JsonException: Provider org.glassfish.json.JsonProviderImpl not found ở bước :

.map(reader -> Json.createReader(reader).readObject()) 

của chúng, xin vui lòng, sử dụng 'org.glassfish:javax.json:1.0.4'

UPDATE: Ngoài ra, thay vì

.flatMap(jsonObjects -> Observable.from(jsonObjects)) 

Bạn có thể sử dụng flatMapIterable( ):

.flatMapIterable(jsonObjects -> jsonObjects) 
0

Bạn sẽ có thể sử dụng một flatMap() cuộc gọi thay vì map() gọi bạn đang sử dụng. Sau đó sử dụng Observable.create() phát ra JsonObject s

Observable<JsonObject> shareable = 
     observable 
       .flatMap(response -> response.getContent() 
         .flatMap(bb -> { 
          String stringVal = new String(bb); 
          StringReader reader = new StringReader(stringVal); 
          JsonObject jobj = Json.createReader(reader).readObject(); 
          JsonArray elements = jobj.getJsonArray("elements"); 
          return Observable.create(subscriber -> { 
           for (int i = 0; i < elements.length(); i++) { 
            subscriber.onNext(elements.getJSONObject(i)); 
           } 
           subscriber.onCompleted(); 
          }); 
         })) 
       .share(); 
+2

Bạn có thể gặp rắc rối với một này vì flatMap yêu cầu đầu vào của nó để hỗ trợ áp suất ngược và chỉ số Observable.create của bạn ement không hỗ trợ backpressure. Tôi muốn đề nghị cho rằng bit bạn sử dụng Observable.create (new AbstractOnSubscribe() {...}) mà cung cấp hỗ trợ backpressure. –

+0

Đối với Observable.create bạn cũng có thể xây dựng một danh sách và trả về Observable.from (danh sách). –

+0

Vì vậy, bằng cách sử dụng các giải pháp trên, tôi đã kết thúc phải dematerialize các Observable outcoming (nó trả về Observerable > do đó, nó gần gũi.Nhưng sử dụng dematerialize kết thúc trở lại Quan sát (dường như không thể làm thế nào để phân tích cú pháp đối tượng trở lại chính xác) – user2266870

Các vấn đề liên quan