5

Tôi đang cố gắng truyền một số dữ liệu từ google PubSub vào BigQuery bằng cách sử dụng một luồng dữ liệu python. Đối với mục đích thử nghiệm tôi đã thích nghi đoạn mã sau https://github.com/GoogleCloudPlatform/DataflowSDK-examples/blob/master/python/dataflow_examples/cookbook/bigquery_schema.py vào một đường ống trực tuyến bằng cách thiết lậpTruyền trực tuyến từ Pub/Sub sang BigQuery

options.view_as(StandardOptions).streaming = True 

Vì vậy, sau đó tôi đã thay đổi các đường ống dẫn record_ids để đọc từ Pub/Sub

# ADDED THIS 
lines = p | 'Read PubSub' >> beam.io.ReadStringsFromPubSub(INPUT_TOPIC) | beam.WindowInto(window.FixedWindows(15)) 
# CHANGED THIS # record_ids = p | 'CreateIDs' >> beam.Create(['1', '2', '3', '4', '5']) 
record_ids = lines | 'Split' >> (beam.FlatMap(split_fn).with_output_types(unicode)) 
records = record_ids | 'CreateRecords' >> beam.Map(create_random_record) 
records | 'Write' >> beam.io.Write(
    beam.io.BigQuerySink(
     OUTPUT, 
     schema=table_schema, 
     create_disposition=beam.io.BigQueryDisposition.CREATE_IF_NEEDED, 
     write_disposition=beam.io.BigQueryDisposition.WRITE_TRUNCATE)) 

Lưu ý: Tôi đã được danh sách trắng bởi google để chạy mã (trong alpha)

Bây giờ khi tôi dùng thử, tôi gặp lỗi

Workfl không thành công. Nguyên nhân: (f215df7c8fcdbb00): Unknown luồng chìm: bigquery

Bạn có thể tìm thấy mã đầy đủ ở đây: https://github.com/marcorigodanzo/gcp_streaming_test/blob/master/my_bigquery_schema.py

Tôi nghĩ rằng điều này đã làm với các đường ống dẫn là hiện nay loại trực tuyến, bất cứ ai có thể vui lòng cho tôi biết thế nào thế nào để làm một bigQuery viết trong một đường ống trực tuyến?

Trả lời

2

Python chùm không hỗ trợ viết thư cho BigQuery từ các tuyến đường truyền trực tuyến. Hiện tại, bạn cần sử dụng Beam Java - bạn có thể sử dụng tương ứng PubsubIO.readStrings()BigQueryIO.writeTableRows().

+0

OK, cảm ơn Eugene. Tôi đã hy vọng sử dụng python. Bạn có biết điều này sẽ thay đổi trong tương lai không? Bạn cũng có thể giải quyết cho tôi một ví dụ về đọc mã từ Pub/Sub và viết thư cho BigQuery trong java không? –

+0

Tôi tin rằng ví dụ này sử dụng cả https://github.com/apache/beam/blob/master/examples/java8/src/main/java/org/apache/beam/examples/complete/game/LeaderBoard.java. Có, Python cuối cùng sẽ bắt kịp với Java (có khả năng thông qua khung công tác di chuyển hiện tại của Beam, cho phép các đường ống Python sử dụng các biến đổi Java), nhưng tôi không thể dự đoán được dòng thời gian sẽ là gì. – jkff

Các vấn đề liên quan