2016-01-19 17 views
5

Tôi đã tích hợp ELK với Pyspark.Cách ghi dữ liệu trong Elasticsearch từ Pyspark?

lưu RDD như dữ liệu ELK trên hệ thống tập tin địa phương

rdd.saveAsTextFile("/tmp/ELKdata") 
logData = sc.textFile('/tmp/ELKdata/*') 
errors = logData.filter(lambda line: "raw1-VirtualBox" in line) 
errors.count() 

giá trị tôi đã nhận là 35

errors.first() 

tôi đã nhận ra

(u'AVI0UK0KZsowGuTwoQnN', {u 'host': u'raw1-VirtualBox ', u'ident': u'NetworkManager ', u'pid': u'748 ', u'message': u '(eth0): thay đổi trạng thái thiết bị: ip-config - > secondaries (lý do 'none') [70 90 0] ", u '@ timestamp': u'2016-01-12T10: 59: 48 + 05: 30 '})

khi tôi cố gắng ghi dữ liệu trong tìm kiếm đàn hồi từ pyspark i get lỗi

errors.saveAsNewAPIHadoopFile(
    path='-', 
    outputFormatClass="org.elasticsearch.hadoop.mr.EsOutputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf= {"es.resource" : "logstash-2016.01.12/errors}) 

lớn lỗi java

 

org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
    at org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
    at org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
    at org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
    at org.apache.spark.scheduler.Task.run(Task.scala:54) 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 INFO TaskSetManager: Starting task 1.0 in stage 31.0 (TID 62, localhost, PROCESS_LOCAL, 1181 bytes) 
16/01/12 17:20:13 WARN TaskSetManager: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 ERROR TaskSetManager: Task 0 in stage 31.0 failed 1 times; aborting job 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Cancelling stage 31 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Stage 31 was cancelled 
16/01/12 17:20:13 INFO Executor: Executor is trying to kill task 1.0 in stage 31.0 (TID 62) 
16/01/12 17:20:13 INFO DAGScheduler: Failed to run saveAsNewAPIHadoopFile at PythonRDD.scala:665 
Traceback (most recent call last): 
    File "", line 6, in 
    File "/opt/spark/python/pyspark/rdd.py", line 1213, in saveAsNewAPIHadoopFile 
    keyConverter, valueConverter, jconf) 
    File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/java_gateway.py", line 538, in __call__ 
    File "/opt/spark/python/lib/py4j-0.8.2.1-src.zip/py4j/protocol.py", line 300, in get_return_value 
py4j.protocol.Py4JJavaError16/01/12 17:20:13 INFO Executor: Running task 1.0 in stage 31.0 (TID 62) 
16/01/12 17:20:13 ERROR Executor: Exception in task 1.0 in stage 31.0 (TID 62) 
org.apache.spark.TaskKilledException 
    at org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) 
    at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
    at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
    at java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 WARN TaskSetManager: Lost task 1.0 in stage 31.0 (TID 62, localhost): org.apache.spark.TaskKilledException: 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:168) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
16/01/12 17:20:13 INFO TaskSchedulerImpl: Removed TaskSet 31.0, whose tasks have all completed, from pool 
: An error occurred while calling z:org.apache.spark.api.python.PythonRDD.saveAsNewAPIHadoopFile. 
: org.apache.spark.SparkException: Job aborted due to stage failure: Task 0 in stage 31.0 failed 1 times, most recent failure: Lost task 0.0 in stage 31.0 (TID 61, localhost): org.apache.spark.SparkException: RDD element of type java.lang.String cannot be used 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:113) 
     org.apache.spark.api.python.SerDeUtil$$anonfun$pythonToPairRDD$1$$anonfun$apply$3.apply(SerDeUtil.scala:108) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     scala.collection.Iterator$$anon$11.next(Iterator.scala:328) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:921) 
     org.apache.spark.rdd.PairRDDFunctions$$anonfun$12.apply(PairRDDFunctions.scala:903) 
     org.apache.spark.scheduler.ResultTask.runTask(ResultTask.scala:62) 
     org.apache.spark.scheduler.Task.run(Task.scala:54) 
     org.apache.spark.executor.Executor$TaskRunner.run(Executor.scala:177) 
     java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145) 
     java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615) 
     java.lang.Thread.run(Thread.java:745) 
Driver stacktrace: 
    at org.apache.spark.scheduler.DAGScheduler.org$apache$spark$scheduler$DAGScheduler$$failJobAndIndependentStages(DAGScheduler.scala:1185) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1174) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$abortStage$1.apply(DAGScheduler.scala:1173) 
    at scala.collection.mutable.ResizableArray$class.foreach(ResizableArray.scala:59) 
    at scala.collection.mutable.ArrayBuffer.foreach(ArrayBuffer.scala:47) 
    at org.apache.spark.scheduler.DAGScheduler.abortStage(DAGScheduler.scala:1173) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGScheduler$$anonfun$handleTaskSetFailed$1.apply(DAGScheduler.scala:688) 
    at scala.Option.foreach(Option.scala:236) 
    at org.apache.spark.scheduler.DAGScheduler.handleTaskSetFailed(DAGScheduler.scala:688) 
    at org.apache.spark.scheduler.DAGSchedulerEventProcessActor$$anonfun$receive$2.applyOrElse(DAGScheduler.scala:1391) 
    at akka.actor.ActorCell.receiveMessage(ActorCell.scala:498) 
    at akka.actor.ActorCell.invoke(ActorCell.scala:456) 
    at akka.dispatch.Mailbox.processMailbox(Mailbox.scala:237) 
    at akka.dispatch.Mailbox.run(Mailbox.scala:219) 
    at akka.dispatch.ForkJoinExecutorConfigurator$AkkaForkJoinTask.exec(AbstractDispatcher.scala:386) 
    at scala.concurrent.forkjoin.ForkJoinTask.doExec(ForkJoinTask.java:260) 
    at scala.concurrent.forkjoin.ForkJoinPool$WorkQueue.runTask(ForkJoinPool.java:1339) 
    at scala.concurrent.forkjoin.ForkJoinPool.runWorker(ForkJoinPool.java:1979) 
    at scala.concurrent.forkjoin.ForkJoinWorkerThread.run(ForkJoinWorkerThread.java:107) 


nếu tôi đã làm nó bằng tay vẫn có thể ghi dữ liệu

errors = logData.filter(lambda line: "raw1-VirtualBox" in line) 
errors = errors.map(lambda item: ('AVI0UK0KZsowGuTwoQnP',{"host": "raw1-VirtualBox", 
    "ident": "NetworkManager", 
    "pid": "69", 
    "message": " sucess <info> (eth0): device state change: ip-config -> secondaries (reason 'none') [70 90 0]", 
    "@timestamp": "2016-01-12T10:59:48+05:30" 
    })) 

nhưng tôi muốn ghi dữ liệu được lọc & dữ liệu được quản lý trong tìm kiếm đàn hồi.

+0

sáng sau này [link] (http: //help.mortardata.com/technologies/spark/load_and_transform_data) – pyspark

Trả lời

4

Tôi đã gặp sự cố tương tự và dưới đây là cách tôi đã giải quyết được vấn đề. Đầu tiên tôi sử dụng một dataframe vs sử dụng RDD.

Once in a dataframe

from pyspark.sql import SQLContext 
df.write.format("org.elasticsearch.spark.sql").option("es.resource", "logstash-2016.01.12/errors").save() 
+1

Bạn cung cấp địa chỉ IP hoặc dns của nút ES ở đâu? – Somar

0

Tương tự như câu trả lời chấp nhận ngay bây giờ, tôi đang ở trong cùng một thuyền, cố gắng để viết ra các dữ liệu như một RDD. Câu trả lời ở trên thực sự gần gũi, nhưng có rất nhiều tùy chọn cấu hình cũng sẽ hữu ích. Trừ khi bạn đang sử dụng mặc định localhost cho nút của mình, câu trả lời này sẽ không hoạt động.

Một khung dữ liệu là con đường để đi, sạch hơn, đơn giản hơn nhiều. Nếu bạn đang sử dụng vỏ pyspark, khi bạn bắt đầu trình bao, thêm một đường dẫn đến jar hadoop elasticsearch.

Từ cli bắt đầu vỏ sử dụng:

$ pyspark2 --jars <pathtojar>/elasticsearch-hadoop-5.X.X.jar 

Bạn không nhất thiết cần dòng sau:

from pyspark.sql import SQLContext 

Khi bạn có dataframe của bạn, bạn chỉ cần những điều sau đây, cộng với khả năng thêm tùy chọn:

df.write.format("org.elasticsearch.spark.sql") 
.option("es.resource", "<index/type>") 
.option("es.nodes", "<enter node address or name>").save() 

Nếu chỉ mục/loại bạn chỉ định chưa tồn tại trong Elasticsearch, nó sẽ được tạo ra.

Bạn có thể thêm tùy chọn bổ sung, có thể được tìm thấy ở đây: https://www.elastic.co/guide/en/elasticsearch/hadoop/current/configuration.html

Các vấn đề liên quan