Đây là một thực hiện đơn giản của hàm:
public class ZipWithIndex {
public static void main(String[] args) throws Exception {
ExecutionEnvironment ee = ExecutionEnvironment.getExecutionEnvironment();
DataSet<String> in = ee.readTextFile("/home/robert/flink-workdir/debug/input");
// count elements in each partition
DataSet<Tuple2<Integer, Long>> counts = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Integer, Long>>() {
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Integer, Long>> out) throws Exception {
long cnt = 0;
for (String v : values) {
cnt++;
}
out.collect(new Tuple2<Integer, Long>(getRuntimeContext().getIndexOfThisSubtask(), cnt));
}
});
DataSet<Tuple2<Long, String>> result = in.mapPartition(new RichMapPartitionFunction<String, Tuple2<Long, String>>() {
long start = 0;
@Override
public void open(Configuration parameters) throws Exception {
super.open(parameters);
List<Tuple2<Integer, Long>> offsets = getRuntimeContext().getBroadcastVariable("counts");
Collections.sort(offsets, new Comparator<Tuple2<Integer, Long>>() {
@Override
public int compare(Tuple2<Integer, Long> o1, Tuple2<Integer, Long> o2) {
return ZipWithIndex.compare(o1.f0, o2.f0);
}
});
for(int i = 0; i < getRuntimeContext().getIndexOfThisSubtask(); i++) {
start += offsets.get(i).f1;
}
}
@Override
public void mapPartition(Iterable<String> values, Collector<Tuple2<Long, String>> out) throws Exception {
for(String v: values) {
out.collect(new Tuple2<Long, String>(start++, v));
}
}
}).withBroadcastSet(counts, "counts");
result.print();
}
public static int compare(int x, int y) {
return (x < y) ? -1 : ((x == y) ? 0 : 1);
}
}
Đây là cách hoạt động: Tôi đang sử dụng các hoạt động đầu tiên mapPartition()
để đi qua tất cả các yếu tố trong các phân vùng để đếm có bao nhiêu yếu tố trong đó . Tôi cần phải biết số phần tử trong mỗi phân vùng để đặt đúng các khoảng cách khi gán ID cho các phần tử. Kết quả của mapPartition
đầu tiên là một Tập dữ liệu chứa ánh xạ. Tôi đang phát Số liệu này cho tất cả các toán tử thứ hai mapPartition()
sẽ gán ID cho các phần tử từ đầu vào. Trong phương thức open()
của số mapPartition()
thứ hai, tôi tính toán độ lệch cho mỗi phân vùng.
Tôi có thể sẽ đóng góp mã cho Flink (sau khi thảo luận với các bên khác).
Nguồn
2015-06-02 14:26:11
Đó là một câu hỏi thú vị. Tôi sẽ cố gắng để đưa ra một thực hiện. –