Tôi đang khám phá lập trình phản ứng và RxJava. Đó là niềm vui, nhưng tôi bị mắc kẹt trên một vấn đề mà tôi không thể tìm thấy câu trả lời. Câu hỏi cơ bản của tôi: một cách phản ứng thích hợp để chấm dứt một quan sát vô hạn khác là gì? Tôi cũng hoan nghênh các phê bình và các phương pháp hay nhất về mã của tôi.RxJava - Chấm dứt các dòng vô hạn
Làm bài tập, tôi viết tiện ích đuôi tệp nhật ký. Luồng các dòng trong tệp nhật ký được biểu thị bằng một số Observable<String>
. Để có được BufferedReader
để tiếp tục đọc văn bản được thêm vào tệp, tôi bỏ qua kiểm tra chấm dứt reader.readLine() == null
thông thường và thay vào đó giải thích nó có nghĩa là chuỗi của tôi sẽ ngủ và đợi thêm văn bản trình ghi nhật ký.
Nhưng trong khi tôi có thể chấm dứt Observer bằng cách sử dụng takeUntil
, tôi cần tìm một cách rõ ràng để chấm dứt trình xem tệp khác chạy vô hạn. Tôi có thể viết phương thức/lĩnh vực của riêng mình là terminateWatcher
, nhưng nó phá vỡ sự đóng gói Quan sát/Quan sát - và tôi muốn giữ chặt chẽ mô hình phản ứng càng tốt.
Đây là mã Observable<String>
:
public class FileWatcher implements OnSubscribeFunc<String> {
private Path path = . . .;
@Override
// The <? super String> generic is pointless but required by the compiler
public Subscription onSubscribe(Observer<? super String> observer) {
try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) {
String newLine = "";
while (!Thread.interrupted()) { // How do I terminate this reactively?
if ((newLine = reader.readLine()) != null)
observer.onNext(newLine);
else
try {
// Wait for more text
Thread.sleep(250);
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
}
observer.onCompleted();
} catch (Exception e) {
observer.onError(e);
}
return null; // Not sure what Subscription I should return
}
}
Đây là mã Observer rằng in các dòng mới như họ đến:
public static void main(String... args) {
. . .
Observable<String> lines = Observable.create(createWatcher(file));
lines = lines.takeWhile(new Func1<String, Boolean>() {
@Override
public Boolean call(String line) {
// Predicate for which to continue processing
return !line.contains("shutdown");
}
}).subscribeOn(Schedulers.threadPoolForIO())
.observeOn(Schedulers.currentThread());
// Seems like I should use subscribeOn() and observeOn(), but they
// make my tailer terminate without reading any text.
Subscription subscription = lines.subscribe(new Action1<String>() {
@Override
public void call(String line) {
System.out.printf("%20s\t%s\n", file, line);
}
});
}
hai câu hỏi của tôi là:
- gì là một cách phản ứng nhất quán để chấm dứt một luồng chạy vô hạn khác?
- Lỗi nào khác trong mã của tôi khiến bạn khóc? :)
Vì vậy, trình lên lịch có thể xác định tần suất tệp nhật ký nên được đọc; sau đó, khi trình lên lịch chạy FileWatcher, FileWatcher sẽ kéo ra các dòng cho đến khi 'readLine() == null'. Sau đó, Observer có thể hủy đăng ký như bình thường, hoặc sử dụng 'takeUntil()' để tự động hủy đăng ký. Tôi sẽ phải xem xét các phương pháp này để xem nó có hiệu quả không, nhưng tôi thích ý tưởng đó. – MonkeyWithDarts