2016-09-12 18 views
5

TL; DR - Tôi có những gì trông giống như một DStream của Strings trong một ứng dụng PySpark. Tôi muốn gửi nó dưới dạng DStream[String] đến thư viện Scala. Tuy nhiên, các chuỗi không được Py4j chuyển đổi.Chuyển đổi PySpark RDD bằng Scala

Tôi đang làm việc trên ứng dụng PySpark để lấy dữ liệu từ Kafka bằng Spark Streaming. Tin nhắn của tôi là các chuỗi và tôi muốn gọi một phương thức trong mã Scala, chuyển cho nó một cá thể DStream[String]. Tuy nhiên, tôi không thể nhận được các chuỗi JVM thích hợp trong mã Scala. Dường như với tôi, các chuỗi Python không được chuyển thành các chuỗi Java mà thay vào đó, được tuần tự hóa.

Câu hỏi của tôi sẽ là: cách lấy chuỗi Java ra khỏi đối tượng DStream?


Đây là mã Python đơn giản nhất tôi đã đưa ra:

from pyspark.streaming import StreamingContext 
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1)) 

from pyspark.streaming.kafka import KafkaUtils 
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"}) 
values = stream.map(lambda tuple: tuple[1]) 

ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream) 

ssc.start() 

Tôi đang chạy mã này trong PySpark, truyền cho nó đường dẫn đến JAR của tôi:

pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar 

On phía Scala, tôi có:

package com.seigneurin 

import org.apache.spark.streaming.api.java.JavaDStream 

object MyPythonHelper { 
    def doSomething(jdstream: JavaDStream[String]) = { 
    val dstream = jdstream.dstream 
    dstream.foreachRDD(rdd => { 
     rdd.foreach(println) 
    }) 
    } 
} 

Không w, chúng ta hãy nói rằng tôi gửi một số dữ liệu vào Kafka:

echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN 

Các println tuyên bố trong Scala in đang cái gì đó trông giống như:

[[email protected] 

tôi mong đợi để có được foo bar để thay thế.

Bây giờ, nếu tôi thay thế các tuyên bố đơn giản println trong mã Scala với những điều sau:

rdd.foreach(v => println(v.getClass.getCanonicalName)) 

tôi nhận được:

java.lang.ClassCastException: [B cannot be cast to java.lang.String 

Điều này cho thấy các dây đang thực sự trôi qua như mảng byte .

Nếu tôi chỉ đơn giản là cố gắng để chuyển đổi mảng này của byte thành chuỗi (Tôi biết tôi thậm chí không xác định mã hóa):

 def doSomething(jdstream: JavaDStream[Array[Byte]]) = { 
     val dstream = jdstream.dstream 
     dstream.foreachRDD(rdd => { 
      rdd.foreach(bytes => println(new String(bytes))) 
     }) 
     } 

tôi nhận được một cái gì đó trông tương tự (ký tự đặc biệt có thể là tước):

�]qXfoo barqa. 

Điều này cho thấy chuỗi Python đã được tuần tự hóa (được chọn?). Làm thế nào tôi có thể lấy một chuỗi Java thích hợp để thay thế?

Trả lời

6

Ngắn câu chuyện ngắn không có cách nào được hỗ trợ để làm điều gì đó như thế này. Đừng thử điều này trong sản xuất. Mày đã được cảnh báo.

Nói chung, Spark không sử dụng Py4j cho bất kỳ điều gì khác ngoài một số cuộc gọi RPC cơ bản trên trình điều khiển và không khởi động cổng Py4j trên bất kỳ máy nào khác.Khi nó được yêu cầu (chủ yếu là MLlib và một số phần của SQL) Spark sử dụng Pyrolite để tuần tự hóa các đối tượng được truyền giữa JVM và Python.

Phần này của API là riêng tư (Scala) hoặc nội bộ (Python) và như vậy không nhằm mục đích sử dụng chung. Trong khi về mặt lý thuyết bạn truy cập vào nó anyway dù trên mỗi mẻ:

package dummy 

import org.apache.spark.api.java.JavaRDD 
import org.apache.spark.streaming.api.java.JavaDStream 
import org.apache.spark.sql.DataFrame 

object PythonRDDHelper { 
    def go(rdd: JavaRDD[Any]) = { 
    rdd.rdd.collect { 
     case s: String => s 
    }.take(5).foreach(println) 
    } 
} 

hoàn chỉnh dòng:

object PythonDStreamHelper { 
    def go(stream: JavaDStream[Any]) = { 
    stream.dstream.transform(_.collect { 
     case s: String => s 
    }).print 
    } 
} 

hoặc phơi bày lô cá nhân như DataFrames (lẽ là lựa chọn ít nhất ác):

object PythonDataFrameHelper { 
    def go(df: DataFrame) = { 
    df.show 
    } 
} 

và sử dụng các trình bao bọc này như sau:

from pyspark.streaming import StreamingContext 
from pyspark.mllib.common import _to_java_object_rdd 
from pyspark.rdd import RDD 

ssc = StreamingContext(spark.sparkContext, 10) 
spark.catalog.listTables() 

q = ssc.queueStream([sc.parallelize(["foo", "bar"]) for _ in range(10)]) 

# Reserialize RDD as Java RDD<Object> and pass 
# to Scala sink (only for output) 
q.foreachRDD(lambda rdd: ssc._jvm.dummy.PythonRDDHelper.go(
    _to_java_object_rdd(rdd) 
)) 

# Reserialize and convert to JavaDStream<Object> 
# This is the only option which allows further transformations 
# on DStream 
ssc._jvm.dummy.PythonDStreamHelper.go(
    q.transform(lambda rdd: RDD( # Reserialize but keep as Python RDD 
     _to_java_object_rdd(rdd), ssc.sparkContext 
    ))._jdstream 
) 

# Convert to DataFrame and pass to Scala sink. 
# Arguably there are relatively few moving parts here. 
q.foreachRDD(lambda rdd: 
    ssc._jvm.dummy.PythonDataFrameHelper.go(
     rdd.map(lambda x: (x,)).toDF()._jdf 
    ) 
) 

ssc.start() 
ssc.awaitTerminationOrTimeout(30) 
ssc.stop() 

điều này không được hỗ trợ, chưa được kiểm tra và như vậy thay vì vô ích cho bất kỳ điều gì khác ngoài các thử nghiệm với API Spark.

+1

Hoàn toàn rõ ràng và rất hữu ích. Cảm ơn! –

+0

Tôi rất vui vì tôi có thể giúp đỡ. Tôi có thể phóng đại một chút ở đây. Nếu mục tiêu của bạn là xây dựng các phần mở rộng ngôn ngữ độc lập thì bạn thực sự không thể tránh tinkering với internals tuy nhiên các nhà phát triển đã đưa ra quyết định có ý thức ở đây và rối tung với điều đó không dành cho những người yếu tim. – zero323

+0

Hi @ zero323 Tôi đang làm quá trình tương tự ở đây, nhưng có vấn đề lớn trong quá trình, tôi tạo ra một đối tượng để giao tiếp ứng dụng python của tôi với một kafka kerberized. Nhưng khi tôi tạo ra đối tượng, jvm của tia lửa không thể tìm thấy chức năng của tôi trong đối tượng. Nếu tôi tạo ra một lớp, nó sẽ tìm thấy lớp đó. Nhưng không thể gửi đối tượng rdd do lỗi: 'pyKafka ([org.apache.spark.api.java.JavaRDD, class java.lang.String]) không tồn tại' Tôi đang làm theo các bước. Điều gì có thể là worng? –

Các vấn đề liên quan