2017-09-02 57 views
7

Tôi có hai luồng dữ liệu mà tôi muốn kết hợp. Vấn đề là một luồng dữ liệu có tần số cao hơn nhiều so với luồng dữ liệu kia và có những lúc mà một luồng không nhận được sự kiện nào cả. Có thể sử dụng sự kiện cuối cùng từ một luồng và tham gia cùng với luồng khác trên mọi sự kiện sắp diễn ra không?Kết hợp hai luồng trong Apache Flink bất kể thời gian cửa sổ

Giải pháp duy nhất tôi tìm thấy là sử dụng chức năng kết nối, nhưng bạn phải chỉ định một cửa sổ chung, nơi bạn có thể áp dụng hàm nối. Đây không phải là cửa sổ, khi một luồng không nhận được bất kỳ sự kiện nào.

Có khả năng áp dụng hàm kết nối trên mọi sự kiện đến từ luồng này hoặc luồng khác và duy trì trạng thái của sự kiện được tiêu thụ cuối cùng và sử dụng sự kiện này cho hàm nối?

Cảm ơn bạn trước vì bất kỳ mẹo hữu ích nào!

Trả lời

5

Bạn muốn sử dụng Flink's ConnectedStream s với số RichCoFlatMapFunction hoặc CoProcessFunction. Một trong hai cách này sẽ cho phép bạn giữ trạng thái được quản lý (tức là yếu tố cuối cùng từ luồng cập nhật không thường xuyên) và tham gia với luồng nhanh hơn. CoProcessFunction thêm khả năng làm việc với bộ hẹn giờ, mà bạn nên sử dụng để xóa trạng thái cho các khóa đã hết hạn, nếu điều đó có liên quan.

Có một bài tập trên trang web đào tạo Flink về việc triển khai tham gia như vậy: Low-latency Event Time Join.

Cập nhật: Trong Flink 1.5 (chưa được phát hành kể từ tháng 2 năm 2018), thư viện SQL has an implementation of non-windowed stream joins. Nó lưu trữ các bản ghi trong trạng thái Flink, sử dụng MapState<Long, Record> trong đó Long là dấu thời gian và tham gia bằng cách lặp qua các bản đồ này và so sánh dấu thời gian. So với ví dụ từ việc đào tạo (xem liên kết ở trên), điều này có lợi thế là chỉ deserializing hồ sơ khi họ là cần thiết.

+0

Cảm ơn bạn! Điều này thật đúng với gì mà tôi đã tìm kiếm ! – FLoppix

+0

@DavidAnderson Ngược lại, ví dụ sẽ khác khi tham gia hai luồng nhanh (thường xuyên)? Bạn sẽ thay đổi điều gì? – Beckham

Các vấn đề liên quan