2016-12-08 15 views
6

Luồng của tôi có cột được gọi là 'danh mục' và tôi có siêu dữ liệu tĩnh bổ sung cho mỗi 'danh mục' trong một cửa hàng khác, nó được cập nhật vài ngày một lần. Cách đúng để thực hiện việc tra cứu này là gì? Có hai tùy chọn với luồng KafkaCách lý tưởng để làm giàu KStream với dữ liệu tra cứu

  1. Tải dữ liệu tĩnh bên ngoài Kafka Suối và chỉ sử dụng KStreams#map() để thêm siêu dữ liệu. Điều này có thể là Kafka Streams chỉ là một thư viện.

  2. Tải siêu dữ liệu đến chủ đề Kafka, tải nó vào một số KTable và làm KStreams#leftJoin(), điều này có vẻ tự nhiên hơn và rời khỏi phân vùng vv thành luồng Kafka. Tuy nhiên, điều này yêu cầu chúng tôi giữ cho KTable được tải với tất cả các giá trị. Lưu ý rằng chúng tôi sẽ phải tải toàn bộ dữ liệu tra cứu và không chỉ thay đổi.

    • Ví dụ: ban đầu chỉ có một danh mục 'c1'. Ứng dụng luồng Kafka đã dừng lại một cách duyên dáng và khởi động lại. Sau khi khởi động lại, một danh mục mới 'c2' đã được thêm vào. Giả định của tôi là, table = KStreamBuilder(). Table ('metadataTopic') sẽ chỉ có giá trị 'c2', vì đó là điều duy nhất đã thay đổi kể từ khi ứng dụng bắt đầu lần thứ hai. Tôi muốn nó có 'c1' và 'c2'.
    • Nếu nó có 'c1', dữ liệu sẽ bị xóa khỏi KTable (có thể bằng cách đặt gửi thông điệp key = null?)?

Cách nào dưới đây là đúng cách để tìm kiếm siêu dữ liệu?

Có thể luôn chỉ buộc một luồng được đọc ngay từ đầu khi khởi động lại, điều này là để tất cả siêu dữ liệu có thể được tải vào KTable.

Có cách nào khác sử dụng cửa hàng không?

Trả lời

3

Quan sát tổng thể của bạn là chính xác và điều đó phụ thuộc vào sự cân bằng nào quan trọng hơn đối với bạn. Nếu siêu dữ liệu của bạn nhỏ, tùy chọn 1 có vẻ là tốt hơn. Nếu siêu dữ liệu lớn, có vẻ như tùy chọn 2 là cách để đi.

Nếu bạn sử dụng map(), bạn cần có bản sao siêu dữ liệu hoàn chỉnh trong từng trường hợp ứng dụng (vì bạn không thể biết chính xác cách Luồng sẽ phân đoạn bạn KStream dữ liệu). Do đó, nếu siêu dữ liệu của bạn không vừa với bộ nhớ chính sử dụng map() sẽ không hoạt động dễ dàng.

Nếu bạn sử dụng KTable, Luồng sẽ chăm sóc siêu dữ liệu được phân phối chính xác trên tất cả các phiên bản ứng dụng đang chạy, sao cho không yêu cầu sao chép dữ liệu. Hơn nữa, KTable sử dụng RocksDB làm công cụ lưu trữ trạng thái và do đó có thể tràn vào đĩa.

EDIT BEGIN

Về việc có tất cả dữ liệu trong KTable: nếu bạn có hai loại cho chìa khóa tương tự, giá trị thứ hai sẽ ghi đè lên giá trị đầu tiên nếu bạn đọc dữ liệu trực tiếp từ chủ đề này thành một KTable qua builder.table(...) (ngữ nghĩa changelog). Tuy nhiên, bạn có thể giải quyết vấn đề này dễ dàng bằng cách đọc chủ đề dưới dạng luồng bản ghi (ví dụ: builder.stream(...) và áp dụng tổng hợp để tính toán KTable. Kết hợp của bạn sẽ đơn giản phát ra danh sách tất cả các giá trị cho mỗi khóa.

Giới thiệu về xóa: KTable sử dụng ngữ nghĩa thay đổi và hiểu thông điệp bia mộ để xóa cặp khóa-giá trị. Do đó, nếu bạn đọc KTable từ một chủ đề và chủ đề chứa thông báo <key:null>, bản ghi hiện tại trong KTable bằng phím này sẽ bị xóa. Đây là khó khăn hơn để đạt được khi KTable là kết quả của một tập hợp, vì bản ghi đầu vào tổng hợp với giá trị null hoặc null sẽ đơn giản bị bỏ qua và không cập nhật kết quả tổng hợp.

Cách giải quyết sẽ có thêm một bước map() trước khi tập hợp và giới thiệu một giá trị NULL (ví dụ, một người dùng định nghĩa "đối tượng" mà đại diện cho bia mộ nhưng không phải là null - trong trường hợp của bạn, bạn có thể gọi nó là một null-category). Trong tập hợp của bạn, bạn chỉ cần trả lại giá trị null làm kết quả tích cực nếu bản ghi đầu vào có giá trị null-category. Điều này sau đó sẽ dịch trong một thông điệp tombstone cho KTable của bạn và xóa danh sách các danh mục hiện tại cho khóa này.

EDIT END

Và tất nhiên bạn luôn có thể xây dựng một giải pháp tùy chỉnh thông qua bộ xử lý API. Tuy nhiên, nếu DSL có thể cung cấp cho bạn muốn bạn cần, không có lý do chính đáng để làm điều này.

+0

Đã cập nhật câu hỏi bằng các ví dụ để tải toàn bộ dữ liệu tra cứu. Tôi có thể đã hiểu lầm những gì một KTable giữ, sẽ chờ đợi cho bản cập nhật của bạn). –

+0

Cảm ơn. Tôi đã cập nhật câu trả lời của mình. –

6
  1. Tải dữ liệu tĩnh bên ngoài Kafka Suối và chỉ sử dụng KStreams # map() để thêm siêu dữ liệu. Điều này có thể là Kafka Streams chỉ là một thư viện.

Công trình này hoạt động. Nhưng thông thường mọi người chọn tùy chọn tiếp theo bạn liệt kê, vì dữ liệu bên cạnh làm phong phú luồng đầu vào thường không hoàn toàn tĩnh; đúng hơn, nó đang thay đổi nhưng phần nào không thường xuyên:

  1. tải siêu dữ liệu đến một chủ đề Kafka, tải nó vào một KTable và làm KStreams # leftJoin(), điều này có vẻ tự nhiên hơn và lá phân vùng vv để Kafka Streams. Tuy nhiên, điều này đòi hỏi chúng ta phải giữ cho KTable được nạp với tất cả các giá trị. Lưu ý rằng chúng tôi sẽ phải tải toàn bộ dữ liệu tra cứu và không chỉ thay đổi.

Đây là cách tiếp cận thông thường và tôi khuyên bạn nên tuân thủ điều này trừ khi bạn có lý do cụ thể.

Tuy nhiên, điều này yêu cầu chúng tôi giữ cho KTable được tải với tất cả các giá trị. Lưu ý rằng chúng tôi sẽ phải tải toàn bộ dữ liệu tra cứu và không chỉ thay đổi.

Vì vậy, tôi đoán bạn cũng thích tùy chọn thứ hai, nhưng bạn lo lắng về việc liệu điều này có hiệu quả hay không.

Câu trả lời ngắn gọn là: Có, KTable sẽ được tải bằng tất cả các giá trị (mới nhất) cho mỗi khóa. Bảng sẽ chứa toàn bộ dữ liệu tra cứu, nhưng hãy nhớ rằng KTable được phân đoạn phía sau hậu trường: nếu, ví dụ, chủ đề đầu vào của bạn (cho bảng) có phân vùng 3, sau đó bạn có thể chạy tới 3 phiên bản ứng dụng của mình , mỗi phân vùng nhận được phân vùng 1 của bảng (giả sử dữ liệu được trải đều trên các phân vùng, sau đó mỗi phân vùng/chia sẻ của bảng sẽ chứa khoảng 1/3 dữ liệu của bảng).Vì vậy, trong thực tế nhiều khả năng hơn nó không "chỉ hoạt động".

Có thể luôn chỉ buộc một luồng được đọc ngay từ đầu khi khởi động lại, điều này là để tất cả siêu dữ liệu có thể được tải vào KTable.

Bạn không cần phải lo lắng về điều đó. Nói một cách đơn giản, nếu không có "bản sao" cục bộ của bảng có sẵn, thì API luồng sẽ tự động đảm bảo rằng dữ liệu của bảng được đọc hoàn toàn từ đầu. Nếu có sẵn một bản sao cục bộ, thì ứng dụng của bạn sẽ sử dụng lại bản sao đó (và cập nhật bản sao cục bộ của nó bất cứ khi nào có dữ liệu mới trong chủ đề đầu vào của bảng). câu trả lời

dài hơn với các ví dụ

Hãy tưởng tượng các dữ liệu đầu vào sau (nghĩ: dòng changelog) cho KTable của bạn, lưu ý cách nhập này bao gồm 6 thông điệp:

(alice, 1) -> (bob, 40) -> (alice, 2) -> (charlie, 600), (alice, 5), (bob, 22) 

Và đây là những trạng thái khác nhau của "hợp lý" KTable mà kết quả từ đầu vào này là, trong đó mỗi thông báo đầu vào mới nhận được (chẳng hạn như (alice, 1)) sẽ dẫn đến trạng thái mới của bảng:

Key  Value 
-------------- 
alice | 1 // (alice, 1) received 

| 
V 

Key  Value 
-------------- 
alice | 1 
bob  | 40 // (bob, 40) received 

| 
V 

Key  Value 
-------------- 
alice | 2 // (alice, 2) received 
bob  | 40 

| 
V 

Key  Value 
-------------- 
alice | 2 
bob  | 40 
charlie | 600 // (charlie, 600) received 

| 
V 

Key  Value 
-------------- 
alice | 5 // (alice, 5) received 
bob  | 40 
charlie | 600 

| 
V 

Key  Value 
-------------- 
alice | 5 
bob  | 22 // (bob, 22) received 
charlie | 600 

Điều bạn có thể thấy ở đây là, mặc dù dữ liệu đầu vào có thể có nhiều, nhiều thư (hoặc "thay đổi" như bạn đã nói; ở đây, chúng tôi có 6), số lượng mục/hàng trong kết quả KTable (đang trải qua các đột biến liên tục dựa trên đầu vào mới nhận được) là số lượng khóa duy nhất trong đầu vào (ở đây: bắt đầu với 1, tăng lên đến 3), thường ít hơn đáng kể so với số lượng tin nhắn. Vì vậy, nếu số lượng tin nhắn trong đầu vào là N và số lượng khóa duy nhất cho các tin nhắn này là M, thì thường là M << N (M nhỏ hơn đáng kể so với N; cộng với bản ghi, chúng tôi có bất biến M <= N). Đây là lý do đầu tiên tại sao "điều này đòi hỏi chúng ta phải giữ KTable nạp với tất cả các giá trị" thường không phải là một vấn đề, bởi vì chỉ có giá trị mới nhất được giữ lại cho mỗi khóa. Lý do thứ hai giúp ích là, như Matthias J. Sax đã chỉ ra, Kafka Streams sử dụng RocksDB làm công cụ lưu trữ mặc định cho các bảng như vậy (chính xác hơn: nhà nước lưu trữ lại bảng). RocksDB cho phép bạn duy trì các bảng lớn hơn bộ nhớ chính/vùng nhớ Java có sẵn của ứng dụng của bạn vì nó có thể tràn vào đĩa cục bộ.

Cuối cùng, lý do thứ ba là số KTable được phân đoạn. Vì vậy, nếu chủ đề đầu vào của bạn cho bảng là (nói) được cấu hình với phân vùng 3, thì điều xảy ra đằng sau hậu trường là bản thân số KTable được phân đoạn (suy nghĩ: được phân đoạn) theo cách tương tự.Trong ví dụ trên, dưới đây là những gì bạn có thể kết thúc, mặc dù "chia tách" chính xác phụ thuộc vào cách dữ liệu đầu vào ban đầu được trải rộng trên các phân đoạn của chủ đề đầu vào của bảng:

KTable hợp lý (trạng thái cuối cùng của tôi trình bày ở trên):

Key  Value 
-------------- 
alice | 5 
bob  | 22 
charlie | 600 

thực tế KTable, phân chia (giả sử 3 phân vùng cho chủ đề đầu vào của bảng, cộng với phím = tên người dùng được trải đều trên các phân vùng):

Key  Value 
-------------- 
alice | 5 // Assuming that all data for `alice` is in partition 1 

Key  Value 
-------------- 
bob  | 22 // ...for `bob` is in partition 2 

Key  Value 
-------------- 
charlie | 600 // ...for `charlie` is in partition 3 

trong thực tế, phân vùng này của inpu t dữ liệu - trong số những thứ khác - cho phép bạn "kích thước" các biểu hiện thực tế của KTable.

Một ví dụ khác:

  • Hãy tưởng tượng tình trạng mới nhất của KTable của bạn sẽ thường có kích thước của 1 TB (một lần nữa, kích thước xấp xỉ là một chức năng của số lượng các phím nhắn duy nhất trong dữ liệu đầu vào của bảng, nhân với kích thước trung bình của giá trị tin nhắn được liên kết).
  • Nếu chủ đề đầu vào của bảng chỉ có phân vùng 1, thì bản thân KTable cũng chỉ có phân vùng 1, với kích thước 1 TB. Ở đây, vì chủ đề đầu vào có nhưng phân vùng 1, bạn có thể chạy ứng dụng của mình với tối đa 1 phiên bản ứng dụng (do đó không thực sự là toàn bộ nhiều tính song song, heh).
  • Nếu chủ đề nhập của bảng có phân vùng 500, thì KTable cũng có phân vùng 500, với kích thước ~ 2 GB mỗi (giả sử dữ liệu được trải đều trên các phân vùng). Tại đây, bạn có thể chạy ứng dụng của mình với tối đa 500 phiên bản ứng dụng. Nếu bạn chạy chính xác 500 trường hợp, thì mỗi cá thể ứng dụng sẽ nhận được chính xác 1 phân vùng/phân đoạn của KTable hợp lý, do đó kết thúc bằng 2 GB dữ liệu bảng; nếu bạn chỉ chạy 100 trường hợp, thì mỗi trường hợp sẽ nhận được phân vùng 500/100 = 5 phân đoạn/bảng của bảng, kết thúc bằng khoảng 2 GB * 5 = 10 GB dữ liệu bảng.
+1

Điều gì sẽ xảy ra khi luồng đầu vào có nhiều phân vùng, luồng siêu dữ liệu chỉ có một phân vùng và có một số phiên bản ứng dụng? Mỗi trường hợp của ứng dụng sẽ tải luồng siêu dữ liệu hay một trong số đó tải nó và những người khác bằng cách nào đó sẽ nhận được giá trị từ ví dụ đó? –

Các vấn đề liên quan