6

Tôi muốn chèn dữ liệu thư PubSub đến từ một chủ đề vào bảng BigQuery bằng Google Cloud Dataflow. Mọi thứ hoạt động tốt nhưng trong bảng BigQuery tôi có thể thấy các chuỗi không đọc được như "߈ ". Đây là đường ống dẫn của tôi:Chèn thư PubSub vào BigQuery thông qua Google Cloud Dataflow

p.apply(PubsubIO.Read.named("ReadFromPubsub").topic("projects/project-name/topics/topic-name")) 
.apply(ParDo.named("Transformation").of(new StringToRowConverter())) 
.apply(BigQueryIO.Write.named("Write into BigQuery").to("project-name:dataset-name.table") 
    .withSchema(schema) 
    .withCreateDisposition(BigQueryIO.Write.CreateDisposition.CREATE_IF_NEEDED)) 

và chức năng StringToRowConverter đơn giản của tôi là:

class StringToRowConverter extends DoFn<String, TableRow> { 
private static final long serialVersionUID = 0; 

@Override 
public void processElement(ProcessContext c) { 
    for (String word : c.element().split(",")) { 
     if (!word.isEmpty()) { 
      System.out.println(word); 
     c.output(new TableRow().set("data", word)); 
     } 
    } 
} 
} 

Và đây là thông điệp mà tôi gửi thông qua một yêu cầu POST:

POST https://pubsub.googleapis.com/v1/projects/project-name/topics/topic-name:publish 
{ 
"messages": [ 
    { 
    "attributes":{ 
"key": "tablet, smartphone, desktop", 
"value": "eng" 
    }, 
    "data": "34gf5ert" 
    } 
] 
} 

tôi đang thiếu gì ? Cảm ơn bạn!

+0

[This] (https://github.com/bomboradata/pubsub-to-bigquery) là một mã nguồn mở bạn có thể sử dụng để chuyển hướng pub/sub tới BQ – PUG

Trả lời

6

Theo https://cloud.google.com/pubsub/reference/rest/v1/PubsubMessage, tải trọng JSON của thông điệp pubsub được mã hóa base64. PubsubIO trong Dataflow, theo mặc định, sử dụng coder String UTF8. Chuỗi ví dụ bạn đã cung cấp "34gf5ert", khi giải mã base64 và sau đó được hiểu là chuỗi UTF-8, cung cấp chính xác "߈ ".

2

Đây là cách tôi giải nén thông điệp pubsub tôi:

@Override 
public void processElement(ProcessContext c) { 

    String json = c.element(); 

    HashMap<String,String> items = new Gson().fromJson(json, new TypeToken<HashMap<String, String>>(){}.getType()); 
    String unpacked = items.get("JsonKey"); 

Hy vọng hữu ích của nó đối với bạn.

Các vấn đề liên quan