2017-09-30 12 views
5

Tôi đã chơi với Java Flow offer toán tử nhưng sau khi đọc tài liệu và làm bài kiểm tra của tôi, tôi không hiểu.Java 9 Hành vi của luồng gửi Phương pháp cung cấp nhà xuất bản

đây thử nghiệm của tôi

@Test 
public void offer() throws InterruptedException { 
    //Create Publisher for expected items Strings 
    SubmissionPublisher<String> publisher = new SubmissionPublisher<>(); 
    //Register Subscriber 
    publisher.subscribe(new CustomSubscriber<>()); 
    publisher.subscribe(new CustomSubscriber<>()); 
    publisher.subscribe(new CustomSubscriber<>()); 
    publisher.offer("item", (subscriber, value) -> false); 
    Thread.sleep(500); 
} 

Nhà điều hành đề nghị nhận được một item được phát ra và một hàm BiPredicate, và như xa như tôi hiểu đọc tài liệu hướng dẫn, chỉ trong trường hợp đó các chức năng làm vị là đúng mục nó sẽ được phát ra.

Bur sau khi vượt qua bài kiểm tra kết quả là

Subscription done: 
Subscription done: 
Subscription done: 
Got : item --> onNext() callback 
Got : item --> onNext() callback 
Got : item --> onNext() callback 

There's không thay đổi kết quả nếu thay vì sai tôi trở thành sự thật.

Bất kỳ ai cũng có thể giải thích cho tôi điều này tốt hơn một chút.

Trả lời

5

Nope, chức năng ngữ được sử dụng để quyết định xem có nên retry hoạt động xuất bản như đã đề cập trong docs:

onDrop - nếu không null, xử lý gọi khi giảm đến một thuê bao, với các đối số của người đăng ký và mục; nếu nó trả về đúng sự thật, một lời đề nghị được thực hiện lại (một lần)

Nó không ảnh hưởng đến việc mục đó có được gửi ban đầu hay không.

EDIT: Một ví dụ về cách giọt có thể xảy ra khi sử dụng phương pháp offer

tôi đã đưa ra một ví dụ về cách giảm có thể xảy ra khi gọi phương thức offer. Tôi không nghĩ rằng đầu ra là 100% xác định, nhưng có một sự khác biệt rõ ràng khi nó được chạy nhiều lần. Bạn chỉ có thể thay đổi trình xử lý để trả về true thay vì false, để xem cách thử lại làm giảm các giọt do bộ đệm bão hòa. Trong ví dụ này, sự sụt giảm thường xảy ra bởi vì dung lượng bộ đệm tối đa là nhỏ một cách rõ ràng (được truyền cho hàm tạo của SubmissionPublisher). Nhưng khi thử lại được kích hoạt sau một thời gian ngủ nhỏ, những giọt được loại bỏ:

public class SubmissionPubliserDropTest { 

    public static void main(String[] args) throws InterruptedException { 
     // Create Publisher for expected items Strings 
     // Note the small buffer max capacity to be able to cause drops 
     SubmissionPublisher<String> publisher = 
           new SubmissionPublisher<>(ForkJoinPool.commonPool(), 2); 
     // Register Subscriber 
     publisher.subscribe(new CustomSubscriber<>()); 
     publisher.subscribe(new CustomSubscriber<>()); 
     publisher.subscribe(new CustomSubscriber<>()); 
     // publish 3 items for each subscriber 
     for(int i = 0; i < 3; i++) { 
      int result = publisher.offer("item" + i, (subscriber, value) -> { 
       // sleep for a small period before deciding whether to retry or not 
       try { 
        Thread.sleep(200); 
       } catch (InterruptedException e) { 
        e.printStackTrace(); 
       } 
       return false; // you can switch to true to see that drops are reduced 
      }); 
      // show the number of dropped items 
      if(result < 0) { 
       System.err.println("dropped: " + result); 
      } 
     } 
     Thread.sleep(3000); 
     publisher.close(); 
    } 
} 

class CustomSubscriber<T> implements Flow.Subscriber<T> { 

    private Subscription sub; 

    @Override 
    public void onComplete() { 
     System.out.println("onComplete"); 
    } 

    @Override 
    public void onError(Throwable th) { 
     th.printStackTrace(); 
     sub.cancel(); 
    } 

    @Override 
    public void onNext(T arg0) { 
     System.out.println("Got : " + arg0 + " --> onNext() callback"); 
     sub.request(1); 
    } 

    @Override 
    public void onSubscribe(Subscription sub) { 
     System.out.println("Subscription done"); 
     this.sub = sub; 
     sub.request(1); 
    } 

} 
+0

Tôi có thể câm, nhưng sau khi sửa đổi thử nghiệm của tôi và đọc lại doc vẫn không nhận được nó. Bạn có nghĩ rằng bạn có thể xây dựng một ví dụ rất đơn giản ?. Đừng lo lắng nếu bạn không thể tôi sẽ tìm ra. – paul

+1

Như tài liệu hướng dẫn cho biết "Mục này có thể bị giảm bởi một hoặc nhiều người đăng ký" Làm thế nào tôi có thể tái tạo hành vi đó ?. Tôi chỉ cố gắng trong lần đầu tiên onNext và trong onSubscribe lần đầu tiên, chỉ cần thực hiện một subscription.cancel() nhưng vẫn không thấy bất kỳ thay đổi trong kết quả – paul

+0

@paul Tôi cũng quan tâm đến một ví dụ như vậy (thử ngay bây giờ) . Sẽ cập nhật câu trả lời nếu tôi nghĩ ra. – manouti

4

SubmissionPublisher.offer khẳng định rằng

Các item có thể được giảm một hay nhiều thuê bao nếu giới hạn tài nguyên là vượt quá, trong trường hợp xử lý đã cho (nếu không null) là được gọi và nếu nó trả về true, hãy thử lại một lần.

Chỉ cần để hiểu, trong cả hai cuộc gọi của bạn

publisher.offer("item", (subscriber, value) -> true); // the handler would be invoked 

publisher.offer("item", (subscriber, value) -> false); // the handler wouldn't be invoked 

Nhưng vẫn publisher công bố mục nhất định, cho mỗi thuê bao hiện tại của nó. xảy ra trong trường hợp hiện tại của bạn.


Kịch bản để xác nhận nếu xử lý mà bạn đã cung cấp việc gọi hay không bằng cách cố gắng để tái tạo là khó khăn về hạn chế tài nguyên, như doc gợi ý:

Các mục có thể bị giảm bởi một hoặc nhiều người đăng ký nếu giới hạn tài nguyên bị vượt quá, trong trường hợp đó trình xử lý đã cho (nếu không trống) được gọi là được gọi và trả về đúng, hãy thử lại một lần.

Tuy nhiên, bạn có thể thử thả các mục có timeouts thiết lập để tối thiểu cơ sở sử dụng phương pháp quá tải cho offer​(T item, long timeout, TimeUnit unit, BiPredicate<Flow.Subscriber<? super T>,? super T> onDrop)

timeout - bao lâu để chờ nguồn lực cho bất kỳ thuê bao trước khi từ bỏ, trong các đơn vị của đơn vị

unit - một TIMEUNIT xác định làm thế nào để giải thích các thông số thời gian chờ

Kể từ khi offer phương pháp có thể thả các mục (hoặc ngay lập tức hoặc với bị chặn thời gian chờ), mà sẽ cung cấp một cơ hội để xen một handler và sau đó thử lại.

+0

Cảm ơn thông tin;) – paul

Các vấn đề liên quan