2015-02-05 20 views
8

Tôi rất mới với Apache Spark. Tôi thực sự muốn tập trung vào đặc tả API cơ bản của Spark và muốn hiểu và viết một số chương trình bằng cách sử dụng API Spark. Tôi đã viết một chương trình java bằng cách sử dụng Apache Spark để thực hiện khái niệm Tham gia.Ví dụ về Apache Spark Tham gia với Java

Khi tôi sử dụng Left Outer Join - leftOuterJoin() hoặc Right External Join - rightOuterJoin(), cả hai phương thức đều trả về một JavaPairRDD chứa một kiểu Google Options đặc biệt. Nhưng tôi không biết cách trích xuất các giá trị ban đầu từ loại Tùy chọn.

Dù sao tôi cũng muốn biết tôi có thể sử dụng cùng phương thức kết hợp để trả lại dữ liệu theo định dạng của riêng mình hay không. Tôi không tìm được cách nào để làm điều đó. Có nghĩa là khi tôi đang sử dụng Apache Spark, tôi không thể tùy chỉnh mã theo phong cách riêng của mình vì chúng đã có tất cả những thứ được xác định trước.

Hãy tìm mã dưới đây

my 2 sample input datasets 

customers_data.txt: 
4000001,Kristina,Chung,55,Pilot 
4000002,Paige,Chen,74,Teacher 
4000003,Sherri,Melton,34,Firefighter 

and 

trasaction_data.txt 
00000551,12-30-2011,4000001,092.88,Games,Dice & Dice Sets,Buffalo,New York,credit 
00004811,11-10-2011,4000001,180.35,Outdoor Play Equipment,Water Tables,Brownsville,Texas,credit 
00034388,09-11-2011,4000002,020.55,Team Sports,Beach Volleyball,Orange,California,cash 
00008996,11-21-2011,4000003,121.04,Outdoor Recreation,Fishing,Colorado Springs,Colorado,credit 
00009167,05-24-2011,4000003,194.94,Exercise & Fitness,Foam Rollers,El Paso,Texas,credit 

Đây là mã Java của tôi

**SparkJoins.java:** 

public class SparkJoins { 

    @SuppressWarnings("serial") 
    public static void main(String[] args) throws FileNotFoundException { 
     JavaSparkContext sc = new JavaSparkContext(new SparkConf().setAppName("Spark Count").setMaster("local")); 
     JavaRDD<String> customerInputFile = sc.textFile("C:/path/customers_data.txt"); 
     JavaPairRDD<String, String> customerPairs = customerInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] customerSplit = s.split(","); 
       return new Tuple2<String, String>(customerSplit[0], customerSplit[1]); 
      } 
     }).distinct(); 

     JavaRDD<String> transactionInputFile = sc.textFile("C:/path/transactions_data.txt"); 
     JavaPairRDD<String, String> transactionPairs = transactionInputFile.mapToPair(new PairFunction<String, String, String>() { 
      public Tuple2<String, String> call(String s) { 
       String[] transactionSplit = s.split(","); 
       return new Tuple2<String, String>(transactionSplit[2], transactionSplit[3]+","+transactionSplit[1]); 
      } 
     }); 

     //Default Join operation (Inner join) 
     JavaPairRDD<String, Tuple2<String, String>> joinsOutput = customerPairs.join(transactionPairs); 
     System.out.println("Joins function Output: "+joinsOutput.collect()); 

     //Left Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<String, Optional<String>>>> leftJoinOutput = customerPairs.leftOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("LeftOuterJoins function Output: "+leftJoinOutput.collect()); 

     //Right Outer join operation 
     JavaPairRDD<String, Iterable<Tuple2<Optional<String>, String>>> rightJoinOutput = customerPairs.rightOuterJoin(transactionPairs).groupByKey().sortByKey(); 
     System.out.println("RightOuterJoins function Output: "+rightJoinOutput.collect()); 

     sc.close(); 
    } 
} 

Và đây sản lượng mà tôi đang nhận được

Joins function Output: [(4000001,(Kristina,092.88,12-30-2011)), (4000001,(Kristina,180.35,11-10-2011)), (4000003,(Sherri,121.04,11-21-2011)), (4000003,(Sherri,194.94,05-24-2011)), (4000002,(Paige,020.55,09-11-2011))] 

LeftOuterJoins function Output: [(4000001,[(Kristina,Optional.of(092.88,12-30-2011)), (Kristina,Optional.of(180.35,11-10-2011))]), (4000002,[(Paige,Optional.of(020.55,09-11-2011))]), (4000003,[(Sherri,Optional.of(121.04,11-21-2011)), (Sherri,Optional.of(194.94,05-24-2011))])] 

RightOuterJoins function Output: [(4000001,[(Optional.of(Kristina),092.88,12-30-2011), (Optional.of(Kristina),180.35,11-10-2011)]), (4000002,[(Optional.of(Paige),020.55,09-11-2011)]), (4000003,[(Optional.of(Sherri),121.04,11-21-2011), (Optional.of(Sherri),194.94,05-24-2011)])] 

Tôi đang chạy chương trình này trên Nền tảng Windows

Hãy quan sát đầu ra ở trên và giúp đỡ tôi trong chiết xuất các giá trị từ kiểu Optional

Cảm ơn trước

+0

Tại sao không sử dụng Scala để thay thế? – maasg

+0

Xin chào @maasg, tôi cơ bản là một nhà phát triển java .. Tôi thực sự không biết Scala .. Nhưng tôi nghĩ Apache Spark là phù hợp nhất cho lập trình Scala rồi Java. –

+0

@ShekarPatel bạn có thể vui lòng cập nhật mã của bạn bằng cách bạn đã xóa Tùy chọn đó .. sẽ hữu ích cho người khác. – Shankar

Trả lời

8

Khi bạn làm bên ngoài trái tham gia và ngay bên ngoài tham gia, bạn có thể có giá trị null. đúng!

Vì vậy, tia lửa trả về Đối tượng tùy chọn. sau khi nhận được kết quả đó, bạn có thể ánh xạ kết quả đó theo định dạng của riêng bạn.

bạn có thể sử dụng phương thức isPresent() Tùy chọn để ánh xạ dữ liệu của bạn.

Dưới đây là ví dụ:

JavaPairRDD<String,String> firstRDD = .... 
JavaPairRDD<String,String> secondRDD =.... 
// join both rdd using left outerjoin 
JavaPairRDD<String, Tuple2<String, Optional<Boolean>>> rddWithJoin = firstRDD.leftOuterJoin(secondRDD); 


// mapping of join result 
JavaPairRDD<String, String> mappedRDD = rddWithJoin 
      .mapToPair(tuple -> { 
       if (tuple._2()._2().isPresent()) { 
        //do your operation and return 
        return new Tuple2<String, String>(tuple._1(), tuple._2()._1()); 
       } else { 
        return new Tuple2<String, String>(tuple._1(), "not present"); 
       } 
      }); 
+0

Cảm ơn bạn thân .. Nó hoạt động tốt .. –

+0

@ sms_1190 làm cách nào để ánh xạ kết quả đó đến định dạng của riêng chúng ta? tôi cũng phải đối mặt với cùng một vấn đề. – Shankar

+0

@Shankar: Tôi đã thêm ví dụ vào câu trả lời ở trên. mappedRDD là định dạng của riêng bạn. –