Tôi đang xem xét việc thay thế thư viện xử lý nhật ký trong nhà trông rất gần với ReactiveStreams với io.projectreactor
. Mục tiêu là để giảm mã chúng tôi duy trì và tận dụng lợi thế của bất kỳ tính năng mới nào được cộng đồng thêm vào (liên kết với nhà điều hành).Luồng phản hồi - theo đợt với thời gian chờ
Khi bắt đầu, tôi cần tiêu thụ bộ lọc và hợp nhất các mục nhập nhật ký đa dòng thành các đốm màu văn bản sẽ chảy xuống đường ống. Trường hợp sử dụng được giải thích chi tiết trên multiline log entries chương của các tài liệu Filebeat (ngoại trừ chúng tôi muốn nó inprocess).
Cho đến nay các mã tôi có là:
BufferedReader input = new BufferedReader(new InputStreamReader(System.in));
Flux<String> lines = Flux.generate(sink -> rethrow(() -> { while (true) sink.next(input.readLine()); }));
Flux<String> logRecordsStr = lines.concatMap(new LogRecordJoiner());
Flux<LogRecord> logRecords = logRecordsStr.map(new LogRecordMapper());
logRecords.doOnEach(r -> System.out.printf("%s payload: %d chars\n", r.timestamp, r.payload.length()))
.subscribe();
này sẽ chăm sóc về nhiều dòng sáp nhập khi một tiêu đề đăng nhập mới được phát hiện, nhưng trong thư viện hiện có, chúng tôi cũng tuôn ra những dòng tích lũy sau một thời gian chờ (nghĩa là nếu không nhận được văn bản nào trong vòng 5 giây, hãy xóa bản ghi).
Cách thích hợp để mô hình hóa điều này trong Lò phản ứng là gì? Tôi có cần phải viết toán tử của riêng mình hay tôi có thể tùy chỉnh bất kỳ toán tử nào hiện có không?
Bất kỳ con trỏ nào cho các ví dụ và tài liệu liên quan để đạt được trường hợp sử dụng này trong Project Reactor hoặc RxJava sẽ được đánh giá rất nhiều.
Bạn có thấy toán tử 'buffer (long timespan, TimeUnit unit)' (rxjava) không? – zella
Bộ đệm trông thực sự gần gũi, nhưng không có sự quá tải nào phù hợp với những gì tôi cần - Tôi cần sự kết hợp của các chiến lược đóng "bufferClosingSelector" và "timespan" - tùy theo điều kiện nào xảy ra trước. – ddimitrov