2015-06-02 48 views
5

Tôi muốn chỉ định mỗi hàng đầu vào của mình là id - phải là một số từ 0 đến N - 1, trong đó N là số hàng trong đầu vào.zipWithIndex trên Apache Flink

đại khái, tôi muốn để có thể làm điều gì đó như sau:

val data = sc.textFile(textFilePath, numPartitions) 
val rdd = data.map(line => process(line)) 
val rddMatrixLike = rdd.zipWithIndex.map { case (v, idx) => someStuffWithIndex(idx, v) } 

Nhưng trong Apache Flink. Có thể không?

+0

Đó là một câu hỏi thú vị. Tôi sẽ cố gắng để đưa ra một thực hiện. –

Trả lời

6

Đây hiện là một phần của bản phát hành Apache Flink 0.10-SNAPSHOT. Ví dụ cho zipWithIndex(in)zipWithUniqueId(in) có sẵn trong Flink documentation chính thức.

5

Đây là một thực hiện đơn giản của hàm:

public class ZipWithIndex { 

public static void main(String[] args) throws Exception { 

    ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment(); 

    DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input"); 

    // count elements in each partition 
    DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() { 
     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception { 
      long cnt = 0; 
      for (String v : values) { 
       cnt++; 
      } 
      out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt)); 
     } 
    }); 

    DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() { 
     long start = 0; 

     @Override 
     public void open(Configuration parameters) throws Exception { 
      super.open(parameters); 
      List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts"); 
      Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() { 
       @Override 
       public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) { 
        return ZipWithIndex.compare(o1.f0, o2.f0); 
       } 
      }); 
      for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) { 
       start += offsets.get(i).f1; 
      } 
     } 

     @Override 
     public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception { 
      for(String v: values) { 
       out.collect(new Tuple2<Long, String>(start++, v)); 
      } 
     } 
    }).withBroadcastSet(counts, "counts"); 
    result.print(); 

} 

public static int compare(int x, int y) { 
    return (x < y) ? -1 : ((x == y) ? 0 : 1); 
} 
} 

Đây là cách hoạt động: Tôi đang sử dụng các hoạt động đầu tiên mapPartition() để đi qua tất cả các yếu tố trong các phân vùng để đếm có bao nhiêu yếu tố trong đó . Tôi cần phải biết số phần tử trong mỗi phân vùng để đặt đúng các khoảng cách khi gán ID cho các phần tử. Kết quả của mapPartition đầu tiên là một Tập dữ liệu chứa ánh xạ. Tôi đang phát Số liệu này cho tất cả các toán tử thứ hai mapPartition() sẽ gán ID cho các phần tử từ đầu vào. Trong phương thức open() của số mapPartition() thứ hai, tôi tính toán độ lệch cho mỗi phân vùng.

Tôi có thể sẽ đóng góp mã cho Flink (sau khi thảo luận với các bên khác).

+0

Cảm ơn Robert! Bạn có thể cũng có thể giải thích trong một vài từ cách hoạt động này? Ví dụ. tại sao chúng ta sử dụng 'getRuntimeContext(). getIndexOfThisSubtask()' và tại sao số lượng phát sóng của mỗi phân vùng có thể giúp ích? –

+0

Điểm tốt. Tôi sẽ sớm thêm một số mô tả. –

+0

Mô tả được thêm –

Các vấn đề liên quan