2016-11-24 18 views
7

Tôi có 2 chủ đề Kafka đang phát nội dung chính xác từ nhiều nguồn khác nhau để tôi có thể có tính sẵn sàng cao trong trường hợp một trong các nguồn không thành công. Tôi đang cố gắng hợp nhất 2 chủ đề thành 1 chủ đề đầu ra bằng cách sử dụng Luồng Kafka 0.10.1.0 sao cho tôi không bỏ lỡ bất kỳ thông báo nào về lỗi và không có bản sao nào khi tất cả các nguồn đều lên.Hợp nhất nhiều chủ đề Kafka Stream giống hệt nhau

Khi sử dụng phương pháp leftJoin của KStream, một trong các chủ đề có thể không hoạt động (chủ đề phụ), nhưng khi chủ đề chính bị hỏng, không có gì được gửi đến chủ đề đầu ra. Đây dường như là bởi vì, theo Kafka Streams developer guide,

KStream-KStream leftJoin luôn được thúc đẩy bởi các hồ sơ đến từ luồng chính

vì vậy nếu không có hồ sơ đến từ các luồng chính, nó sẽ không sử dụng các bản ghi từ luồng thứ cấp ngay cả khi chúng tồn tại. Khi luồng chính trở lại trực tuyến, đầu ra sẽ tiếp tục bình thường.

Tôi cũng đã cố gắng sử dụng outerJoin (có thêm bản ghi trùng lặp) tiếp theo là một chuyển đổi sang một KTable và groupByKey để thoát khỏi bản sao,

KStream mergedStream = stream1.outerJoin(stream2, 
    (streamVal1, streamVal2) -> (streamVal1 == null) ? streamVal2 : streamVal1, 
    JoinWindows.of(2000L)) 

mergedStream.groupByKey() 
      .reduce((value1, value2) -> value1, TimeWindows.of(2000L), stateStore)) 
      .toStream((key,value) -> value) 
      .to(outputStream) 

nhưng tôi vẫn nhận được bản sao một lần trong một thời gian. Tôi cũng đang sử dụng commit.interval.ms=200 để nhận được KTable gửi tới luồng đầu ra thường xuyên.

Cách tốt nhất để tiếp cận hợp nhất này để có được kết quả chính xác một lần từ nhiều chủ đề đầu vào giống hệt nhau là gì?

+0

Nói chung, tôi khuyên bạn nên sử dụng Processor API để giải quyết vấn đề. Bạn cũng có thể thử chuyển sang phiên bản 'trunk' hiện tại (không chắc chắn điều này là có thể cho bạn). Tham gia đã được làm lại của họ, và điều này có thể giải quyết vấn đề của bạn: https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics Các ngữ nghĩa nối mới sẽ được bao gồm trong Kafka '0.10.2' có ngày phát hành mục tiêu tháng 1 năm 2017 (https://cwiki.apache.org/confluence/display/KAFKA/Time+Based+Release+Plan). –

+0

@ MatthiasJ.Sax Tôi chuyển sang thân cây và có vẻ như 'leftJoin' bây giờ hoạt động giống như một' outerJoin' cho KStream-KStream tham gia, vì vậy tôi nghĩ tôi sẽ quay lại với ngữ nghĩa 10.1. Những gì tôi đang cố gắng bây giờ là tạo ra một luồng giả tạo ra các giá trị rỗng mà tôi sẽ sử dụng làm phần chính trong một leftJoin với cái được sử dụng làm phần chính và sử dụng kết hợp đó trong một leftJoin với phần thứ hai. Tôi hy vọng điều này sẽ dẫn đến luôn luôn có giá trị trong dòng chính, ngay cả khi chính của tôi là xuống (như tôi sẽ chỉ nhận được null từ leftJoin đầu tiên). –

+0

'leftJoin' mới kích hoạt từ cả hai phía như cũ' outerJoin' cũng vậy (tôi đoán đó là ý của bạn "có vẻ như leftJoin bây giờ cư xử như một outerJoin"?) - điều này gần với ngữ nghĩa SQL hơn 'leftJoin' cũ - nhưng' leftJoin' vẫn khác với 'externalJoin': nếu trình kích hoạt bên phải và không tìm thấy đối tác tham gia, nó sẽ xóa bản ghi và không có kết quả nào được phát ra . –

Trả lời

5

Sử dụng bất kỳ loại tham gia nào sẽ không giải quyết được sự cố của bạn vì bạn sẽ luôn kết thúc hoặc thiếu kết quả (tham gia bên trong trường hợp một số quầy hàng) hoặc "trùng lặp" với null (tham gia bên trái hoặc tham gia bên ngoài trường hợp cả hai luồng đều trực tuyến). Xem https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams+Join+Semantics để biết chi tiết về cách tham gia ngữ nghĩa trong Luồng Kafka.

Vì vậy, tôi khuyên bạn nên sử dụng API bộ xử lý mà bạn có thể kết hợp và kết hợp với DSL bằng cách sử dụng KStreamprocess(), transform() hoặc transformValues(). Xem How to filter keys and value with a Processor using Kafka Stream DSL để biết thêm chi tiết.

Bạn cũng có thể thêm cửa hàng tùy chỉnh vào bộ xử lý của mình (How to add a custom StateStore to the Kafka Streams DSL processor?) để tạo khả năng chịu lỗi trùng lặp lọc.

Các vấn đề liên quan