Đây là simple example on how to use state stores, được lấy từ Confluent Platform documentation on Kafka Streams.
Bước 1: Xác định StateStore
/StateStoreSupplier
:
StateStoreSupplier countStore = Stores.create("Counts")
.withKeys(Serdes.String())
.withValues(Serdes.Long())
.persistent()
.build();
- Tôi không thấy một cách để thêm một đối tượng StateStore để topo của tôi. Nó đòi hỏi một StateStoreSupplier là tốt mặc dù.
Bước 2: Thêm các cửa hàng nhà nước để topo của bạn.
Lựa chọn A - Khi sử dụng API Processor:
TopologyBuilder builder = new TopologyBuilder();
// add the source processor node that takes Kafka topic "source-topic" as input
builder.addSource("Source", "source-topic")
.addProcessor("Process",() -> new WordCountProcessor(), "Source")
// Add the countStore associated with the WordCountProcessor processor
.addStateStore(countStore, "Process")
.addSink("Sink", "sink-topic", "Process");
Lựa chọn B - Khi sử dụng Kafka Streams DSL:
Ở đây bạn cần phải gọi KStreamBuilder#addStateStore("name-of-your-store")
để thêm các cửa hàng nhà nước để topo vi xử lý của bạn . Sau đó, khi gọi các phương thức như KStream#process()
hoặc KStream#transform()
, bạn cũng phải chuyển tên của cửa hàng nhà nước - nếu không, ứng dụng của bạn sẽ không thành công khi chạy.
Tại ví dụ về KStream#transform()
:
KStreamBuilder builder = new KStreamBuilder();
// Add the countStore that will be used within the Transformer[Supplier]
// that we pass into `transform()` below.
builder.addStateStore(countStore);
KStream<byte[], String> input = builder.stream("source-topic");
KStream<String, Long> transformed =
input.transform(/* your TransformerSupplier */, countStore.name());
Tại sao nó cần thiết cho một bộ xử lý để có một cửa hàng nhà nước? Dường như điều này cũng có thể là tùy chọn cho các bộ vi xử lý không có trạng thái và không duy trì trạng thái.
Bạn nói đúng - bạn không cần cửa hàng nhà nước nếu bộ xử lý của bạn không duy trì trạng thái.
Khi sử dụng DSL, bạn chỉ cần gọi KStreamBuilder#addStateStore("name-of-your-store")
để thêm cửa hàng trạng thái vào cấu trúc liên kết bộ xử lý của bạn và tham khảo sau này.
Bạn đã đọc chương API bộ xử lý tại http://docs.confluent.io/3.0.0/streams/developer-guide.html#processor-api? Nó bao gồm các phần về sử dụng các cửa hàng nhà nước. –