2017-05-04 47 views
9

Tôi có trường hợp sử dụng nhỏ trong Apache flink, đó là, một hệ thống xử lý hàng loạt. Tôi cần xử lý một tập hợp các tập tin. Việc xử lý từng tệp phải được xử lý bởi một máy. Tôi có mã dưới đây. Tất cả thời gian chỉ có một khe nhiệm vụ là chiếm đóng, và các tập tin được xử lý một sau khi khác. Tôi có 6 nút (6 trình quản lý tác vụ) và cấu hình 4 khe nhiệm vụ trong mỗi nút. Vì vậy, tôi mong đợi 24 tập tin được xử lý tại một thời điểm.công việc flink không được phân phối trên các máy

class MyMapPartitionFunction extends RichMapPartitionFunction[java.io.File, Int] { 
    override def mapPartition(
     myfiles: java.lang.Iterable[java.io.File], 
     out:org.apache.flink.util.Collector[Int]) 
    : Unit = { 
    var temp = myfiles.iterator() 
    while(temp.hasNext()){ 
     val fp1 = getRuntimeContext.getDistributedCache.getFile("hadoopRun.sh") 
     val file = new File(temp.next().toURI) 
     Process(
     "/bin/bash ./run.sh " + argumentsList(3)+ "/" + file.getName + " " + argumentsList(7) + "/" + file.getName + ".csv", 
     new File(fp1.getAbsoluteFile.getParent)) 
     .lines 
     .foreach{println} 
     out.collect(1) 
    } 
    } 
} 

Tôi đã khởi chạy flink dưới dạng lệnh ./bin/start-cluster.sh và giao diện người dùng web hiển thị nó có 6 trình quản lý tác vụ, 24 vùng tác vụ.

Thư mục chứa khoảng 49 tệp. Khi tôi tạo mapPartition trên bộ sưu tập này, tôi mong đợi 49 quy trình song song được kéo dài. Nhưng sau đó, trong cơ sở hạ tầng của tôi, tất cả chúng đều được xử lý sau cái kia. Điều này có nghĩa là chỉ có một máy (một người quản lý tác vụ) xử lý tất cả 49 tên tập tin. Những gì tôi muốn là, như được cấu hình 2 nhiệm vụ cho mỗi khe, tôi mong đợi 24 tập tin được xử lý đồng thời.

Mọi con trỏ chắc chắn sẽ trợ giúp tại đây. Tôi có các tham số này trong tệp flink-conf.yaml

jobmanager.heap.mb: 2048 
taskmanager.heap.mb: 1024 
taskmanager.numberOfTaskSlots: 4 
taskmanager.memory.preallocate: false 
parallelism.default: 24 

Xin cảm ơn trước. Ai đó có thể ném tôi vào chỗ tôi đang đi sai không?

+1

Cố gắng thêm ** setParallelism (49) ** sau ** mapPartition (MyMapPartitionFunction()) mới **. ** env.fromCollection() ** sẽ tạo toán tử song song 1 (mặc dù bạn đã cấu hình song song công việc thành 24 trong flink-conf.yaml, vì nó sử dụng định dạng đầu vào ** NonParallelInput **). Nếu không thiết lập song song, toán tử * map map * sẽ kế thừa tính song song của nó từ nguồn. – David

Trả lời

2

Vì David mô tả sự cố là env.fromCollection(Iterable[T]) tạo một DataSource với số không song song InputFormat. Do đó, DataSource được thực hiện với sự song song là 1. Các toán tử tiếp theo (mapPartition) kế thừa tính song song này từ nguồn để chúng có thể bị xích (điều này tiết kiệm cho chúng ta một sự lộn xộn mạng).

Cách giải quyết vấn đề này là một trong hai cân bằng lại một cách rõ ràng nguồn DataSet qua

env.fromCollection(folders).rebalance() 

hoặc để thiết lập một cách rõ ràng song song chúc tại các nhà điều hành tiếp theo (mapPartition):

env.fromCollection(folders).mapPartition(...).setParallelism(49) 
+0

Cảm ơn rất nhiều Rohrmann và David. rebalance() trông sạch hơn và nó hoạt động quá !! – Bala

Các vấn đề liên quan