2015-12-02 60 views
5

Tôi có thể đặt mức độ song song khác nhau cho một phần khác nhau của tác vụ trong chương trình của chúng tôi trong Flink không? Ví dụ: Flink giải thích mã mẫu sau đây như thế nào? Hai học viên tùy chỉnh MyPartitioner1, MyPartitioner2, phân vùng dữ liệu đầu vào hai phân vùng 4 và 2.Mức độ song song trong Apache Flink

partitionedData1 = inputData1 
    .partitionCustom(new MyPartitioner1(), 1); 
env.setParallelism(4); 
DataSet<Tuple2<Integer, Integer>> output1 = partitionedData1 
    .mapPartition(new calculateFun()); 

partitionedData2 = inputData2 
    .partitionCustom(new MyPartitioner2(), 2); 
env.setParallelism(2); 
DataSet<Tuple2<Integer, Integer>> output2 = partitionedData2 
    .mapPartition(new calculateFun()); 

tôi nhận được lỗi sau cho mã này:

Exception in thread "main" org.apache.flink.runtime.client.JobExecutionException: Job execution failed. 
    at org.apache.flink.runtime.jobmanager.JobManager$$anonfun$receiveWithLogMessages$1.applyOrElse(JobManager.scala:314) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply$mcVL$sp(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:33) 
    at scala.runtime.AbstractPartialFunction$mcVL$sp.apply(AbstractPartialFunction.scala:25) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:36) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.apply(ActorLogMessages.scala:29) 
    at scala.PartialFunction$class.applyOrElse(PartialFunction.scala:118) 
    at org.apache.flink.runtime.ActorLogMessages$$anon$1.applyOrElse(ActorLogMessages.scala:29) 
    at akka.actor.Actor$class.aroundReceive(Actor.scala:465) 
    at org.apache.flink.runtime.jobmanager.JobManager.aroundReceive(JobManager.scala:92) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:516) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:487) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:254) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:221) 
    at akka.dispatch.Mailbox.exec(Mailbox.scala:231) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 
Caused by: java.lang.ArrayIndexOutOfBoundsException: 2 
    at org.apache.flink.runtime.io.network.api.writer.RecordWriter.emit(RecordWriter.java:80) 
    at org.apache.flink.runtime.operators.shipping.OutputCollector.collect(OutputCollector.java:65) 
    at org.apache.flink.runtime.operators.NoOpDriver.run(NoOpDriver.java:92) 
    at org.apache.flink.runtime.operators.RegularPactTask.run(RegularPactTask.java:496) 
    at org.apache.flink.runtime.operators.RegularPactTask.invoke(RegularPactTask.java:362) 
    at org.apache.flink.runtime.taskmanager.Task.run(Task.java:559) 
    at java.lang.Thread.run(Unknown Source) 

Trả lời

5

ExecutionEnvironment.setParallelism() đặt song song cho toàn bộ chương trình, nghĩa là, tất cả các nhà khai thác của chương trình.

Bạn có thể chỉ định song song cho từng toán tử riêng lẻ bằng cách gọi phương thức setParallelism() trên toán tử.

ArrayIndexOutOfBoundsException được ném vì trình phân hoạch tùy chỉnh của bạn trả lại số phân vùng không hợp lệ có thể do mức độ song song bất ngờ. Trình phân hoạch tùy chỉnh nhận được sự song song thực sự của người nhận dưới dạng tham số trong phương thức partition(K key, int numPartitions) của nó.

Các vấn đề liên quan