2014-12-30 18 views
7

Tôi đang sử dụng tia lửa với cassandra và i hava a JavaRDD<String> của khách hàng. Và đối với mỗi khách hàng, tôi muốn chọn từ cassandra Tương tác của mình như thế này:JavaSparkContext không thể nối tiếp

avaPairRDD<String, List<InteractionByMonthAndCustomer>> a = client.mapToPair(new PairFunction<String, String, List<InteractionByMonthAndCustomer>>() { 
     @Override 
     public Tuple2<String, List<InteractionByMonthAndCustomer>> call(String s) throws Exception {    
      List<InteractionByMonthAndCustomer> b = javaFunctions(sc) 
        .cassandraTable(CASSANDRA_SCHEMA, "interaction_by_month_customer") 
        .where("ctid =?", s) 
        .map(new Function<CassandraRow, InteractionByMonthAndCustomer>() { 
         @Override 
         public InteractionByMonthAndCustomer call(CassandraRow cassandraRow) throws Exception { 
          return new InteractionByMonthAndCustomer(cassandraRow.getString("channel"), 
            cassandraRow.getString("motif"), 
            cassandraRow.getDate("start"), 
            cassandraRow.getDate("end"), 
            cassandraRow.getString("ctid"), 
            cassandraRow.getString("month") 
          ); 
         } 
        }).collect(); 
      return new Tuple2<String, List<InteractionByMonthAndCustomer>>(s, b); 
     } 
    }); 

Đối với điều này tôi đang sử dụng một JavaSparkContext sc. Nhưng tôi gặp lỗi này:

Exception in thread "main" org.apache.spark.SparkException: Task not serializable 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:166) 
at org.apache.spark.util.ClosureCleaner$.clean(ClosureCleaner.scala:158) 
at org.apache.spark.SparkContext.clean(SparkContext.scala:1242) 
at org.apache.spark.rdd.RDD.map(RDD.scala:270) 
at org.apache.spark.api.java.JavaRDDLike$class.mapToPair(JavaRDDLike.scala:99) 
at org.apache.spark.api.java.JavaRDD.mapToPair(JavaRDD.scala:32) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.AllCleintInteractions(GenrateCustumorJourney.java:91) 
at fr.aid.cim.spark.dao.GenrateCustumorJourney.main(GenrateCustumorJourney.java:75) 
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method) 
at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:62) 
at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43) 
at java.lang.reflect.Method.invoke(Method.java:483) 
at org.apache.spark.deploy.SparkSubmit$.launch(SparkSubmit.scala:328) 
at org.apache.spark.deploy.SparkSubmit$.main(SparkSubmit.scala:75) 
at org.apache.spark.deploy.SparkSubmit.main(SparkSubmit.scala) 
Caused by: java.io.NotSerializableException: org.apache.spark.api.java.JavaSparkContext 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1184) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.defaultWriteFields(ObjectOutputStream.java:1548) 
at java.io.ObjectOutputStream.writeSerialData(ObjectOutputStream.java:1509) 
at java.io.ObjectOutputStream.writeOrdinaryObject(ObjectOutputStream.java:1432) 
at java.io.ObjectOutputStream.writeObject0(ObjectOutputStream.java:1178) 
at java.io.ObjectOutputStream.writeObject(ObjectOutputStream.java:348) 
at org.apache.spark.serializer.JavaSerializationStream.writeObject(JavaSerializer.scala:42) 
at org.apache.spark.serializer.JavaSerializerInstance.serialize(JavaSerializer.scala:73) 
at org.apache.spark.util.ClosureCleaner$.ensureSerializable(ClosureCleaner.scala:164) 
... 14 more 

Tôi nghĩ rằng JavaSparkContext phải được tuần tự hóa. Nhưng làm thế nào tôi có thể làm cho nó serializable xin vui lòng?

Cảm ơn bạn.

Trả lời

12

Không, JavaSparkContext không thể tuần tự hóa và không được cho là. Nó không thể được sử dụng trong một chức năng bạn gửi cho công nhân từ xa. Ở đây bạn không tham chiếu rõ ràng nhưng một tham chiếu đang được tuần tự hóa bởi vì hàm lớp bên trong vô danh của bạn không phải là static và do đó có tham chiếu đến lớp kèm theo.

Hãy thử viết lại mã của bạn bằng chức năng này dưới dạng đối tượng độc lập static.

0

Bạn không thể sử dụng SparkContext và tạo RDD khác từ bên trong một trình thực thi (hàm bản đồ của RDD).

Bạn phải tạo Cassandra RDD (sc.cassandraTable) trong trình điều khiển và sau đó thực hiện một phép nối giữa hai RDD này (RDD của khách hàng và bảng cassandra RDD).

+0

Đúng, mã không nên làm việc bất kỳ cách nào (Spark cấm chuyển đổi bên trong chuyển đổi vv ..) –

0

Khai báo nó với transient keyword:

private transient JavaSparkContext sparkContext;