2013-12-18 18 views
9

Tôi đang khám phá lập trình phản ứng và RxJava. Đó là niềm vui, nhưng tôi bị mắc kẹt trên một vấn đề mà tôi không thể tìm thấy câu trả lời. Câu hỏi cơ bản của tôi: một cách phản ứng thích hợp để chấm dứt một quan sát vô hạn khác là gì? Tôi cũng hoan nghênh các phê bình và các phương pháp hay nhất về mã của tôi.RxJava - Chấm dứt các dòng vô hạn

Làm bài tập, tôi viết tiện ích đuôi tệp nhật ký. Luồng các dòng trong tệp nhật ký được biểu thị bằng một số Observable<String>. Để có được BufferedReader để tiếp tục đọc văn bản được thêm vào tệp, tôi bỏ qua kiểm tra chấm dứt reader.readLine() == null thông thường và thay vào đó giải thích nó có nghĩa là chuỗi của tôi sẽ ngủ và đợi thêm văn bản trình ghi nhật ký.

Nhưng trong khi tôi có thể chấm dứt Observer bằng cách sử dụng takeUntil, tôi cần tìm một cách rõ ràng để chấm dứt trình xem tệp khác chạy vô hạn. Tôi có thể viết phương thức/lĩnh vực của riêng mình là terminateWatcher, nhưng nó phá vỡ sự đóng gói Quan sát/Quan sát - và tôi muốn giữ chặt chẽ mô hình phản ứng càng tốt.

Đây là mã Observable<String>:

public class FileWatcher implements OnSubscribeFunc<String> { 
    private Path path = . . .; 

    @Override 
    // The <? super String> generic is pointless but required by the compiler 
    public Subscription onSubscribe(Observer<? super String> observer) { 
     try (BufferedReader reader = new BufferedReader(new FileReader(path.toFile()))) { 
      String newLine = ""; 
      while (!Thread.interrupted()) { // How do I terminate this reactively? 
       if ((newLine = reader.readLine()) != null) 
        observer.onNext(newLine); 
       else 
        try { 
         // Wait for more text 
         Thread.sleep(250); 
        } catch (InterruptedException e) { 
         Thread.currentThread().interrupt(); 
        } 
      } 
      observer.onCompleted(); 
     } catch (Exception e) { 
      observer.onError(e); 
     } 

     return null; // Not sure what Subscription I should return 
    } 
} 

Đây là mã Observer rằng in các dòng mới như họ đến:

public static void main(String... args) { 
    . . . 
    Observable<String> lines = Observable.create(createWatcher(file)); 
    lines = lines.takeWhile(new Func1<String, Boolean>() { 
     @Override 
     public Boolean call(String line) { 
      // Predicate for which to continue processing 
      return !line.contains("shutdown"); 
     } 
    }).subscribeOn(Schedulers.threadPoolForIO()) 
      .observeOn(Schedulers.currentThread()); 
    // Seems like I should use subscribeOn() and observeOn(), but they 
    // make my tailer terminate without reading any text. 

    Subscription subscription = lines.subscribe(new Action1<String>() { 
     @Override 
     public void call(String line) { 
      System.out.printf("%20s\t%s\n", file, line); 
     } 
    }); 
} 

hai câu hỏi của tôi là:

  1. gì là một cách phản ứng nhất quán để chấm dứt một luồng chạy vô hạn khác?
  2. Lỗi nào khác trong mã của tôi khiến bạn khóc? :)

Trả lời

6

Vì bạn đang bắt đầu xem tệp khi đăng ký được yêu cầu, nên chấm dứt khi đăng ký kết thúc, tức là khi Subscription bị đóng. Điều này giải quyết một trong các kết thúc bị mất trong các nhận xét về mã của bạn: bạn trả về một số Subscription cho biết FileWatcher để ngừng xem tệp. Sau đó, thay thế điều kiện vòng lặp của bạn sao cho nó kiểm tra xem đăng ký đã bị hủy thay vì kiểm tra xem chuỗi hiện tại có bị gián đoạn hay không.

Vấn đề là, điều đó không hữu ích nếu phương pháp onSubscribe() của bạn không bao giờ trở lại. Có lẽ FileWatcher của bạn nên yêu cầu chỉ định lịch biểu, với việc đọc đang diễn ra trên trình lên lịch đó.

+0

Vì vậy, trình lên lịch có thể xác định tần suất tệp nhật ký nên được đọc; sau đó, khi trình lên lịch chạy FileWatcher, FileWatcher sẽ kéo ra các dòng cho đến khi 'readLine() == null'. Sau đó, Observer có thể hủy đăng ký như bình thường, hoặc sử dụng 'takeUntil()' để tự động hủy đăng ký. Tôi sẽ phải xem xét các phương pháp này để xem nó có hiệu quả không, nhưng tôi thích ý tưởng đó. – MonkeyWithDarts

Các vấn đề liên quan