TL; DR - Tôi có những gì trông giống như một DStream của Strings trong một ứng dụng PySpark. Tôi muốn gửi nó dưới dạng DStream[String]
đến thư viện Scala. Tuy nhiên, các chuỗi không được Py4j chuyển đổi.Chuyển đổi PySpark RDD bằng Scala
Tôi đang làm việc trên ứng dụng PySpark để lấy dữ liệu từ Kafka bằng Spark Streaming. Tin nhắn của tôi là các chuỗi và tôi muốn gọi một phương thức trong mã Scala, chuyển cho nó một cá thể DStream[String]
. Tuy nhiên, tôi không thể nhận được các chuỗi JVM thích hợp trong mã Scala. Dường như với tôi, các chuỗi Python không được chuyển thành các chuỗi Java mà thay vào đó, được tuần tự hóa.
Câu hỏi của tôi sẽ là: cách lấy chuỗi Java ra khỏi đối tượng DStream
?
Đây là mã Python đơn giản nhất tôi đã đưa ra:
from pyspark.streaming import StreamingContext
ssc = StreamingContext(sparkContext=sc, batchDuration=int(1))
from pyspark.streaming.kafka import KafkaUtils
stream = KafkaUtils.createDirectStream(ssc, ["IN"], {"metadata.broker.list": "localhost:9092"})
values = stream.map(lambda tuple: tuple[1])
ssc._jvm.com.seigneurin.MyPythonHelper.doSomething(values._jdstream)
ssc.start()
Tôi đang chạy mã này trong PySpark, truyền cho nó đường dẫn đến JAR của tôi:
pyspark --driver-class-path ~/path/to/my/lib-0.1.1-SNAPSHOT.jar
On phía Scala, tôi có:
package com.seigneurin
import org.apache.spark.streaming.api.java.JavaDStream
object MyPythonHelper {
def doSomething(jdstream: JavaDStream[String]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(println)
})
}
}
Không w, chúng ta hãy nói rằng tôi gửi một số dữ liệu vào Kafka:
echo 'foo bar' | $KAFKA_HOME/bin/kafka-console-producer.sh --broker-list localhost:9092 --topic IN
Các println
tuyên bố trong Scala in đang cái gì đó trông giống như:
[[email protected]
tôi mong đợi để có được foo bar
để thay thế.
Bây giờ, nếu tôi thay thế các tuyên bố đơn giản println
trong mã Scala với những điều sau:
rdd.foreach(v => println(v.getClass.getCanonicalName))
tôi nhận được:
java.lang.ClassCastException: [B cannot be cast to java.lang.String
Điều này cho thấy các dây đang thực sự trôi qua như mảng byte .
Nếu tôi chỉ đơn giản là cố gắng để chuyển đổi mảng này của byte thành chuỗi (Tôi biết tôi thậm chí không xác định mã hóa):
def doSomething(jdstream: JavaDStream[Array[Byte]]) = {
val dstream = jdstream.dstream
dstream.foreachRDD(rdd => {
rdd.foreach(bytes => println(new String(bytes)))
})
}
tôi nhận được một cái gì đó trông tương tự (ký tự đặc biệt có thể là tước):
�]qXfoo barqa.
Điều này cho thấy chuỗi Python đã được tuần tự hóa (được chọn?). Làm thế nào tôi có thể lấy một chuỗi Java thích hợp để thay thế?
Hoàn toàn rõ ràng và rất hữu ích. Cảm ơn! –
Tôi rất vui vì tôi có thể giúp đỡ. Tôi có thể phóng đại một chút ở đây. Nếu mục tiêu của bạn là xây dựng các phần mở rộng ngôn ngữ độc lập thì bạn thực sự không thể tránh tinkering với internals tuy nhiên các nhà phát triển đã đưa ra quyết định có ý thức ở đây và rối tung với điều đó không dành cho những người yếu tim. – zero323
Hi @ zero323 Tôi đang làm quá trình tương tự ở đây, nhưng có vấn đề lớn trong quá trình, tôi tạo ra một đối tượng để giao tiếp ứng dụng python của tôi với một kafka kerberized. Nhưng khi tôi tạo ra đối tượng, jvm của tia lửa không thể tìm thấy chức năng của tôi trong đối tượng. Nếu tôi tạo ra một lớp, nó sẽ tìm thấy lớp đó. Nhưng không thể gửi đối tượng rdd do lỗi: 'pyKafka ([org.apache.spark.api.java.JavaRDD, class java.lang.String]) không tồn tại' Tôi đang làm theo các bước. Điều gì có thể là worng? –