Tôi có trường hợp sử dụng nhỏ trong Apache flink, đó là, một hệ thống xử lý hàng loạt. Tôi cần xử lý một tập hợp các tập tin. Việc xử lý từng tệp phải được xử lý bởi một máy. Tôi có mã dưới đây. Tất cả thời gian chỉ có một khe nhiệm vụ là chiếm đóng, và các tập tin được xử lý một sau khi khác. Tôi có 6 nút (6 trình quản lý tác vụ) và cấu hình 4 khe nhiệm vụ trong mỗi nút. Vì vậy, tôi mong đợi 24 tập tin được xử lý tại một thời điểm.công việc flink không được phân phối trên các máy
class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] {
override def mapPartition(
myfiles: java.lang.Iterable[java.io.File],
out:org.apache.flink.util.Collector[Int])
: Unit = {
var temp = myfiles.iterator()
while(temp.hasNext()){
val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh")
val file = new File(temp.next().toURI)
Process(
"/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv",
new File(fp1.getAbsoluteFile.getParent))
.lines
.foreach{println}
out.collect(1)
}
}
}
Tôi đã khởi chạy flink dưới dạng lệnh ./bin/start-cluster.sh và giao diện người dùng web hiển thị nó có 6 trình quản lý tác vụ, 24 vùng tác vụ.
Thư mục chứa khoảng 49 tệp. Khi tôi tạo mapPartition trên bộ sưu tập này, tôi mong đợi 49 quy trình song song được kéo dài. Nhưng sau đó, trong cơ sở hạ tầng của tôi, tất cả chúng đều được xử lý sau cái kia. Điều này có nghĩa là chỉ có một máy (một người quản lý tác vụ) xử lý tất cả 49 tên tập tin. Những gì tôi muốn là, như được cấu hình 2 nhiệm vụ cho mỗi khe, tôi mong đợi 24 tập tin được xử lý đồng thời.
Mọi con trỏ chắc chắn sẽ trợ giúp tại đây. Tôi có các tham số này trong tệp flink-conf.yaml
jobmanager.heap.mb: 2048
taskmanager.heap.mb: 1024
taskmanager.numberOfTaskSlots: 4
taskmanager.memory.preallocate: false
parallelism.default: 24
Xin cảm ơn trước. Ai đó có thể ném tôi vào chỗ tôi đang đi sai không?
Cố gắng thêm ** setParallelism (49) ** sau ** mapPartition (MyMapPartitionFunction()) mới **. ** env.fromCollection() ** sẽ tạo toán tử song song 1 (mặc dù bạn đã cấu hình song song công việc thành 24 trong flink-conf.yaml, vì nó sử dụng định dạng đầu vào ** NonParallelInput **). Nếu không thiết lập song song, toán tử * map map * sẽ kế thừa tính song song của nó từ nguồn. – David