2016-08-10 22 views
9

Tôi đang làm việc trên một ứng dụng truyền dữ liệu và tôi đang điều tra khả năng sử dụng Apache Flink cho dự án này. Lý do chính cho việc này là nó hỗ trợ các cấu trúc luồng cao cấp tốt đẹp, rất giống với API luồng của Java 8.Cách tra cứu và cập nhật trạng thái bản ghi từ cơ sở dữ liệu trong Apache Flink?

Tôi sẽ nhận các sự kiện tương ứng với một bản ghi cụ thể trong cơ sở dữ liệu và tôi muốn có thể xử lý các sự kiện này (đến từ một nhà môi giới thư như RabbitMQ hoặc Kafka) và cuối cùng cập nhật hồ sơ trong cơ sở dữ liệu đẩy các sự kiện được xử lý/chuyển đổi sang một bồn rửa khác (có thể là một nhà môi giới thư khác).

Sự kiện liên quan đến một bản ghi cụ thể lý tưởng cần được xử lý theo thứ tự FIFO (mặc dù sẽ có dấu thời gian giúp phát hiện các sự kiện đặt hàng), nhưng các sự kiện liên quan đến các bản ghi khác nhau có thể được xử lý song song. Tôi đã định sử dụng cấu trúc keyBy() để phân đoạn luồng theo bản ghi.

Quá trình xử lý cần được thực hiện tùy thuộc vào thông tin hiện tại trong cơ sở dữ liệu về bản ghi. Tuy nhiên, tôi không thể tìm thấy một ví dụ hoặc cách tiếp cận được đề nghị để truy vấn cơ sở dữ liệu cho các hồ sơ đó để làm phong phú thêm sự kiện mà nó đang được xử lý với thông tin bổ sung mà tôi cần để xử lý nó.

Đường ống dẫn dầu tôi có trong tâm trí như sau:

-> keyBy() trên id nhận được -> lấy các bản ghi từ cơ sở dữ liệu tương ứng với id -> thực hiện các bước xử lý trên hồ sơ -> push sự kiện đã xử lý vào hàng đợi bên ngoài và cập nhật bản ghi cơ sở dữ liệu

Bản ghi cơ sở dữ liệu sẽ cần được cập nhật vì ứng dụng khác sẽ truy vấn dữ liệu.

Có thể có thêm tối ưu hóa mà người ta có thể thực hiện sau khi đường ống này đạt được. Ví dụ người ta có thể lưu trữ bản ghi (cập nhật) trong trạng thái được quản lý để sự kiện tiếp theo trên cùng một bản ghi sẽ không cần truy vấn cơ sở dữ liệu khác. Tuy nhiên, nếu ứng dụng không biết về một bản ghi cụ thể, nó sẽ cần phải lấy nó từ cơ sở dữ liệu.

Cách tiếp cận tốt nhất để sử dụng cho loại kịch bản này trong Apache Flink là gì?

Trả lời

4

Bạn có thể thực hiện tìm kiếm cơ sở dữ liệu bằng cách mở rộng hàm phong phú cho ví dụ: một hàm RichFlatMap, khởi tạo kết nối cơ sở dữ liệu một lần trong phương pháp open() nó và sau đó xử lý mỗi sự kiện trong flatMap() phương pháp:

public static class DatabaseMapper extends RichFlatMapFunction<Event, EncrichedEvent> { 

    // Declare DB coonection and query statements 

    @Override 
    public void open(Configuration parameters) throws Exception { 
     // Initialize Database connection 
     // Prepare Query statements 
    } 

    @Override 
    public void flatMap(Event currentEvent, Collector<EncrichedEvent> out) throws Exception { 
     // look up the Database, update record, enrich event 
     out.collect(enrichedEvent);   
    } 
}) 

Và sau đó bạn có thể sử dụng DatabaseMapper như sau:

stream.keyby(id) 
     .flatmap(new DatabaseMapper()) 
     .addSink(..); 

Bạn có thể tìm here một ví dụ sử dụng dữ liệu được lưu trong bộ nhớ cache từ Redis.

+1

Cảm ơn. Vì vậy, những gì sẽ xảy ra khi Flink chạy trong một chế độ cụm phân tán. Nó có thiết lập kết nối từ mỗi nút của cụm sao không? – jbx

+0

Bạn có thể có nhiều trường hợp 'flatmap' tùy thuộc vào paralleism bạn đã đặt. Có thể có nhiều hơn một cá thể toán tử trên mỗi nút (tùy thuộc vào số lượng các vùng tác vụ được định cấu hình). Đối với mỗi cá thể song song, phương thức 'open()' được gọi chính xác một lần và 'flatmap()' được gọi cho mỗi sự kiện đến. –

+1

OK Cảm ơn. Tôi sẽ đi. Bạn có biết liệu có thể tích hợp Spring trong Flink (hoặc ngược lại) không? Có những thứ như tiêm phụ thuộc, quản lý thực thể tự động-dây vvsẽ rất hữu ích để sử dụng, nhưng tôi dường như không thể tìm thấy bất kỳ thông tin trực tuyến nào về những người đã làm điều đó (và nếu có bất kỳ cạm bẫy nào). – jbx

Các vấn đề liên quan