2016-08-22 15 views
5

Tôi đang xây dựng cấu trúc liên kết và muốn sử dụng KStream.process() để viết một số giá trị trung gian vào cơ sở dữ liệu. Bước này không thay đổi bản chất của dữ liệu và hoàn toàn không trạng thái.Làm thế nào để đăng ký một bộ xử lý không trạng thái (có vẻ như yêu cầu một StateStore)?

Thêm Processor yêu cầu tạo và chuyển phiên bản này tới hàm KStream.process() cùng với tên của cửa hàng nhà nước. Đây là những gì tôi không hiểu.

Cách thêm đối tượng StateStore vào cấu trúc liên kết vì nó yêu cầu StateStoreSupplier?

Không thêm nói StateStore cho lỗi này khi ứng dụng được bắt đầu:

ngoại lệ trong chủ đề org.apache.kafka.streams.errors.TopologyBuilderException "chính": không hợp lệ xây dựng topo: StateStore may- cửa hàng tiểu bang chưa được thêm.

Tại sao bộ xử lý cần có cửa hàng nhà nước? Dường như điều này cũng có thể là tùy chọn cho các bộ vi xử lý không có trạng thái và không duy trì trạng thái.

Xử lý tất cả các phần tử trong luồng này, mỗi lần một phần tử bằng cách áp dụng Bộ xử lý.

+0

Bạn đã đọc chương API bộ xử lý tại http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api? Nó bao gồm các phần về sử dụng các cửa hàng nhà nước. –

Trả lời

11

Đây là simple example on how to use state stores, được lấy từ Confluent Platform documentation on Kafka Streams.

Bước 1: Xác định StateStore/StateStoreSupplier:

StateStoreSupplier countStore = Stores.create("Counts") 
             .withKeys(Serdes.String()) 
             .withValues(Serdes.Long()) 
             .persistent() 
             .build(); 
  1. Tôi không thấy một cách để thêm một đối tượng StateStore để topo của tôi. Nó đòi hỏi một StateStoreSupplier là tốt mặc dù.

Bước 2: Thêm các cửa hàng nhà nước để topo của bạn.

Lựa chọn A - Khi sử dụng API Processor:

TopologyBuilder builder = new TopologyBuilder(); 

// add the source processor node that takes Kafka topic "source-topic" as input 
builder.addSource("Source", "source-topic") 
     .addProcessor("Process",() -> new WordCountProcessor(), "Source") 
     // Add the countStore associated with the WordCountProcessor processor 
     .addStateStore(countStore, "Process") 
     .addSink("Sink", "sink-topic", "Process"); 

Lựa chọn B - Khi sử dụng Kafka Streams DSL:

Ở đây bạn cần phải gọi KStreamBuilder#addStateStore("name-of-your-store") để thêm các cửa hàng nhà nước để topo vi xử lý của bạn . Sau đó, khi gọi các phương thức như KStream#process() hoặc KStream#transform(), bạn cũng phải chuyển tên của cửa hàng nhà nước - nếu không, ứng dụng của bạn sẽ không thành công khi chạy.

Tại ví dụ về KStream#transform():

KStreamBuilder builder = new KStreamBuilder(); 

// Add the countStore that will be used within the Transformer[Supplier] 
// that we pass into `transform()` below. 
builder.addStateStore(countStore); 

KStream<byte[], String> input = builder.stream("source-topic"); 

KStream<String, Long> transformed = 
    input.transform(/* your TransformerSupplier */, countStore.name()); 

Tại sao nó cần thiết cho một bộ xử lý để có một cửa hàng nhà nước? Dường như điều này cũng có thể là tùy chọn cho các bộ vi xử lý không có trạng thái và không duy trì trạng thái.

Bạn nói đúng - bạn không cần cửa hàng nhà nước nếu bộ xử lý của bạn không duy trì trạng thái.

Khi sử dụng DSL, bạn chỉ cần gọi KStreamBuilder#addStateStore("name-of-your-store") để thêm cửa hàng trạng thái vào cấu trúc liên kết bộ xử lý của bạn và tham khảo sau này.

+0

Tôi nhớ lại việc này (và tương tự) nhưng Im không sử dụng ** TopologyBuilder **. Đang cố gắng làm việc thông qua điều này bằng cách sử dụng '.foreach()'. – ethrbunny

+0

Bạn không thể sử dụng các kho lưu trữ trạng thái trong toán tử 'foreach()' của DSL bởi vì, theo định nghĩa toán tử, 'foreach()' thêm nút xử lý * stateless * vào cấu trúc liên kết. Nhưng bạn có thể sử dụng 'transform()', 'transformValues ​​()', và 'process()' trong DSL cho các phép toán trạng thái. Xem https://cwiki.apache.org/confluence/display/KAFKA/Kafka+Streams%3A+Internal+Data+Management. –

+0

Và xin lỗi tôi đã bỏ lỡ rằng bạn đang sử dụng DSL. Có một phương thức 'process()' trong cả DSL và API bộ xử lý. :-) –

Các vấn đề liên quan