8

Tôi đang xem xét việc thay thế thư viện xử lý nhật ký trong nhà trông rất gần với ReactiveStreams với io.projectreactor. Mục tiêu là để giảm mã chúng tôi duy trì và tận dụng lợi thế của bất kỳ tính năng mới nào được cộng đồng thêm vào (liên kết với nhà điều hành).Luồng phản hồi - theo đợt với thời gian chờ

Khi bắt đầu, tôi cần tiêu thụ bộ lọc và hợp nhất các mục nhập nhật ký đa dòng thành các đốm màu văn bản sẽ chảy xuống đường ống. Trường hợp sử dụng được giải thích chi tiết trên multiline log entries chương của các tài liệu Filebeat (ngoại trừ chúng tôi muốn nó inprocess).

Cho đến nay các mã tôi có là:

BufferedReader input = new BufferedReader(new InputStreamReader(System.in)); 
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); })); 
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner()); 
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper()); 
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length())) 
      .subscribe();   

này sẽ chăm sóc về nhiều dòng sáp nhập khi một tiêu đề đăng nhập mới được phát hiện, nhưng trong thư viện hiện có, chúng tôi cũng tuôn ra những dòng tích lũy sau một thời gian chờ (nghĩa là nếu không nhận được văn bản nào trong vòng 5 giây, hãy xóa bản ghi).

Cách thích hợp để mô hình hóa điều này trong Lò phản ứng là gì? Tôi có cần phải viết toán tử của riêng mình hay tôi có thể tùy chỉnh bất kỳ toán tử nào hiện có không?

Bất kỳ con trỏ nào cho các ví dụ và tài liệu liên quan để đạt được trường hợp sử dụng này trong Project Reactor hoặc RxJava sẽ được đánh giá rất nhiều.

+1

Bạn có thấy toán tử 'buffer (long timespan, TimeUnit unit)' (rxjava) không? – zella

+0

Bộ đệm trông thực sự gần gũi, nhưng không có sự quá tải nào phù hợp với những gì tôi cần - Tôi cần sự kết hợp của các chiến lược đóng "bufferClosingSelector" và "timespan" - tùy theo điều kiện nào xảy ra trước. – ddimitrov

Trả lời

3

Nó phụ thuộc vào cách bạn xác định điểm bắt đầu và kết thúc mỗi bộ đệm, do đó RxJava 2 mã sau đây được dự định như là một gợi ý về việc sử dụng giá trị nguồn chính để mở và đóng cửa của đệm:

TestScheduler scheduler = new TestScheduler(); 
PublishProcessor<String> pp = PublishProcessor.create(); 

Function<Flowable<String>, Flowable<List<String>>> f = o -> 
     o.buffer(o.filter(v -> v.contains("Start")), 
       v -> Flowable.merge(o.filter(w -> w.contains("End")), 
            Flowable.timer(5, TimeUnit.MINUTES, scheduler))); 

pp.publish(f) 
.subscribe(System.out::println); 

pp.onNext("Start"); 
pp.onNext("A"); 
pp.onNext("B"); 
pp.onNext("End"); 

pp.onNext("Start"); 
pp.onNext("C"); 

scheduler.advanceTimeBy(5, TimeUnit.MINUTES); 

pp.onNext("Start"); 
pp.onNext("D"); 
pp.onNext("End"); 
pp.onComplete(); 

Prints:

[Start, A, B, End] 
[Start, C] 
[Start, D, End] 

Nó hoạt động bằng cách chia sẻ nguồn qua publish cho phép sử dụng lại cùng một giá trị từ phía trên mà không cần nhiều bản sao nguồn chạy cùng một lúc. Việc mở được điều chỉnh bởi việc phát hiện chuỗi "Bắt đầu" trên dòng. Việc đóng được điều chỉnh bằng cách phát hiện chuỗi "Kết thúc" hoặc hẹn giờ kích hoạt sau một thời gian gia hạn.

Edit:

Nếu "Start" cũng là chỉ số cho các lô hàng tiếp theo, bạn có thể thay thế "Kết thúc" kiểm tra với "Start" và thay đổi các nội dung của bộ đệm vì nó sẽ bao gồm các tiêu đề mới trong bộ đệm trước khác:

pp.publish(f) 
.doOnNext(v -> { 
    int s = v.size(); 
    if (s > 1 && v.get(s - 1).contains("Start")) { 
     v.remove(s - 1); 
    } 
}) 
.subscribe(System.out::println); 
+0

Làm thế nào về trường hợp không có END, nhưng bộ đệm được đóng lại khi chúng ta thấy START tiếp theo, hoặc một tumeout hết hạn? Tôi bắt đầu nghi ngờ giao tiếp của tôi - có điều gì không rõ ràng trong câu hỏi không? – ddimitrov

1

buffer nhà điều hành có vẻ là giải pháp phù hợp và đơn giản nhất đối với tôi.

Nó có các chiến lược dựa trên kích thước và thời gian. Bạn có đăng nhập, vì vậy tôi nghĩ, bạn có thể diễn giải các dòng được tính là kích thước bộ đệm.

Ở đây ví dụ - làm thế nào phát ra mục nhóm 4 hoặc 5 giây khoảng thời gian:

Observable<String> lineReader = Observable.<String>create(subscriber -> { 
     try { 
      BufferedReader br = new BufferedReader(new InputStreamReader(System.in)); 
      for (String line = br.readLine(); line != null; line = br.readLine()) { 
       subscriber.onNext(line); 
      } 
     } catch (IOException e) { 
      throw new UncheckedIOException(e); 
     } 
    }).subscribeOn(Schedulers.newThread()); 

    lineReader 
     .buffer(5, TimeUnit.SECONDS,4) 
     .filter(lines -> !lines.isEmpty()) 
     .subscribe(System.out::println); 
+0

Tôi cần nhóm theo tiêu đề nhật ký với tumeout. I E. nếu tôi đã đăng nhập một tin nhắn 2 dòng, tiếp theo là tin nhắn 1 dòng, tiếp theo là stacktrace, sau đó là một dòng khác và sau đó không có gì trong khoảng thời gian tumeout.Tôi hy vọng sẽ nhận được 3 tin nhắn ngay lập tức, lên đến stacktrace, và một tin nhắn thứ tư sau khi tumeout. – ddimitrov

Các vấn đề liên quan