2016-02-28 14 views
7

Sau khi đọc một số trang tài liệu của Apache Flink (official documentation, dataartisans) cũng như các ví dụ được cung cấp trong official repository, tôi tiếp tục xem các ví dụ nơi chúng sử dụng làm nguồn dữ liệu để truyền tệp đã được tải xuống, luôn kết nối với máy chủ cục bộ.Nhận các phần tử JSON từ một trang web có Apache Flink

Tôi đang cố sử dụng Apache Flink để tải xuống các tệp JSON chứa dữ liệu động. Mục đích của tôi là cố gắng thiết lập url nơi tôi có thể truy cập tệp JSON làm nguồn đầu vào của Apache Flink, thay vì tải xuống nó bằng một hệ thống khác và xử lý tệp đã tải xuống với Apache Flink.

Có thể thiết lập kết nối mạng này với Apache Flink không?

Trả lời

4

Bạn có thể xác định URL bạn muốn tải xuống làm dữ liệu nhập DataStream và sau đó tải xuống tài liệu từ trong một số MapFunction. Mã sau đây minh họa điều này:

StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment(); 

DataStream<String> inputURLs = env.fromElements("http://www.json.org/index.html"); 

inputURLs.map(new MapFunction<String, String>() { 
    @Override 
    public String map(String s) throws Exception { 
     URL url = new URL(s); 
     InputStream is = url.openStream(); 

     BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(is)); 

     StringBuilder builder = new StringBuilder(); 
     String line; 

     try { 
      while ((line = bufferedReader.readLine()) != null) { 
       builder.append(line + "\n"); 
      } 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     try { 
      bufferedReader.close(); 
     } catch (IOException ioe) { 
      ioe.printStackTrace(); 
     } 

     return builder.toString(); 
    } 
}).print(); 

env.execute("URL download job"); 
+0

Tôi chạy mã mẫu, nhưng nó chỉ chạy một lần và đọc tất cả tệp. Tuy nhiên Iit không phải là streaming, tôi nghĩ rằng nó sẽ contiune đọc khi có incease trong tập tin json. – zt1983811

+0

Đối với điều đó bạn sẽ phải sử dụng 'ContinuousFileMonitoringFunction'. Phát trực tuyến mỗi lần không có nghĩa là công việc sẽ chạy vô hạn lâu. Điều này chỉ xảy ra nếu bạn có một nguồn không hữu hạn. Nhưng trong trường hợp này, hàm 'env.fromElements' tạo ra một nguồn phát hữu hạn. Khi nguồn này đạt đến kết thúc, chương trình sẽ chấm dứt. –

Các vấn đề liên quan