Tôi muốn thiết lập Flink để nó sẽ chuyển đổi và chuyển hướng các luồng dữ liệu từ Apache Kafka đến MongoDB. Đối với mục đích thử nghiệm tôi đang xây dựng trên đầu trang của ví dụ flink-streaming-connectors.kafka (https://github.com/apache/flink).Kafka -> Flink DataStream -> MongoDB
Luồng Kafka đang được tô màu đỏ đúng cách bởi Flink, tôi có thể ánh xạ chúng, v.v., nhưng sự cố xảy ra khi tôi muốn lưu từng thư đã nhận và được chuyển đổi thành MongoDB. Ví dụ duy nhất tôi đã tìm thấy về tích hợp MongoDB là flink-mongodb-test từ github. Thật không may, nó sử dụng nguồn dữ liệu tĩnh (cơ sở dữ liệu), không phải luồng dữ liệu.
Tôi tin rằng sẽ có một số triển khai DataStream.addSink cho MongoDB, nhưng dường như không có.
Cách tốt nhất để đạt được điều đó là gì? Tôi có cần viết chức năng chìm tùy chỉnh hoặc có thể tôi đang thiếu gì đó không? Có lẽ nó nên được thực hiện theo cách khác nhau?
Tôi không gắn với bất kỳ giải pháp nào, vì vậy mọi đề xuất sẽ được đánh giá cao.
Dưới đây là ví dụ về chính xác những gì tôi nhận được làm đầu vào và những gì tôi cần lưu trữ dưới dạng đầu ra.
Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String)
Apache Kafka Broker --------------> Flink: DataStream<String>
Flink: DataStream.map({
return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD")
})
.rebalance()
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection
Như bạn có thể thấy trong ví dụ này, tôi sử dụng Flink chủ yếu cho bộ đệm luồng thư của Kafka và một số phân tích cú pháp cơ bản.