2016-02-02 43 views
7

Tôi muốn thiết lập Flink để nó sẽ chuyển đổi và chuyển hướng các luồng dữ liệu từ Apache Kafka đến MongoDB. Đối với mục đích thử nghiệm tôi đang xây dựng trên đầu trang của ví dụ flink-streaming-connectors.kafka (https://github.com/apache/flink).Kafka -> Flink DataStream -> MongoDB

Luồng Kafka đang được tô màu đỏ đúng cách bởi Flink, tôi có thể ánh xạ chúng, v.v., nhưng sự cố xảy ra khi tôi muốn lưu từng thư đã nhận và được chuyển đổi thành MongoDB. Ví dụ duy nhất tôi đã tìm thấy về tích hợp MongoDB là flink-mongodb-test từ github. Thật không may, nó sử dụng nguồn dữ liệu tĩnh (cơ sở dữ liệu), không phải luồng dữ liệu.

Tôi tin rằng sẽ có một số triển khai DataStream.addSink cho MongoDB, nhưng dường như không có.

Cách tốt nhất để đạt được điều đó là gì? Tôi có cần viết chức năng chìm tùy chỉnh hoặc có thể tôi đang thiếu gì đó không? Có lẽ nó nên được thực hiện theo cách khác nhau?

Tôi không gắn với bất kỳ giải pháp nào, vì vậy mọi đề xuất sẽ được đánh giá cao.

Dưới đây là ví dụ về chính xác những gì tôi nhận được làm đầu vào và những gì tôi cần lưu trữ dưới dạng đầu ra.

Apache Kafka Broker <-------------- "AAABBBCCCDDD" (String) 
Apache Kafka Broker --------------> Flink: DataStream<String> 

Flink: DataStream.map({ 
    return ("AAABBBCCCDDD").convertTo("A: AAA; B: BBB; C: CCC; D: DDD") 
}) 
.rebalance() 
.addSink(MongoDBSinkFunction); // store the row in MongoDB collection 

Như bạn có thể thấy trong ví dụ này, tôi sử dụng Flink chủ yếu cho bộ đệm luồng thư của Kafka và một số phân tích cú pháp cơ bản.

Trả lời

3

Hiện tại không có bồn rửa tuyến MongoDB nào có sẵn trong Flink.

Tuy nhiên, có hai cách để ghi dữ liệu vào MongoDB:

  • Sử dụng DataStream.write() tiếng gọi của Flink. Nó cho phép bạn sử dụng bất kỳ OutputFormat (từ Batch API) với streaming. Sử dụng HadoopOutputFormatWrapper of Flink, bạn có thể sử dụng trình kết nối MongoDB Hadoop chính thức

  • Tự mình thực hiện Sink. Việc triển khai các bồn chứa khá dễ dàng với API Streaming và tôi chắc chắn MongoDB có một thư viện Java Client tốt.

Cả hai cách tiếp cận không cung cấp bất kỳ đảm bảo xử lý tinh vi nào. Tuy nhiên, khi bạn đang sử dụng Flink với Kafka (và checkpointing được kích hoạt), bạn sẽ có các ngữ nghĩa ít nhất một lần: Trong một trường hợp lỗi, dữ liệu được truyền lại một lần nữa đến bồn chứa MongoDB. Nếu bạn đang thực hiện cập nhật không cần thiết, việc thực hiện lại các cập nhật này sẽ không gây ra bất kỳ mâu thuẫn nào.

Nếu bạn thực sự cần ngữ nghĩa chính xác một lần cho MongoDB, có lẽ bạn nên gửi JIRA in Flink và thảo luận với cộng đồng về cách triển khai điều này.

2

Để thay thế cho câu trả lời của Robert Metzger, bạn có thể viết lại kết quả cho Kafka và sau đó sử dụng một trong các đầu nối kafka duy trì để xóa nội dung của một chủ đề bên trong Cơ sở dữ liệu MongoDB của bạn.

Kafka -> Flink -> Kafka -> Mongo/Bất cứ điều gì

Với phương pháp này bạn có thể mantain các "ngữ nghĩa ở-nhất-once" behaivour.

Các vấn đề liên quan