2016-07-02 20 views
5

Tài liệu của Elasticsaerch chỉ bao gồm việc tải một chỉ mục hoàn chỉnh tới Spark.Cách truy vấn chỉ mục Elasticsearch bằng cách sử dụng Pyspark và Dataframes

from pyspark.sql import SQLContext 
sqlContext = SQLContext(sc) 
df = sqlContext.read.format("org.elasticsearch.spark.sql").load("index/type") 
df.printSchema() 

Làm cách nào bạn có thể thực hiện truy vấn trả về dữ liệu từ chỉ mục Elasticsearch và tải chúng vào Spark dưới dạng DataFrame bằng pyspark?

Trả lời

4

Dưới đây là cách tôi thực hiện.

thiết lập môi trường chung và lệnh:

export SPARK_HOME=/home/ezerkar/spark-1.6.0-bin-hadoop2.6 
export PYSPARK_DRIVER_PYTHON=ipython2 

./spark-1.6.0-bin-hadoop2.6/bin/pyspark --driver-class-path=/home/eyald/spark-1.6.0-bin-hadoop2.6/lib/elasticsearch-hadoop-2.3.1.jar 

Code:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

conf = SparkConf().setAppName("ESTest") 
sc = SparkContext(conf=conf) 
sqlContext = SQLContext(sc) 

q ="""{ 
    "query": { 
    "filtered": { 
     "filter": { 
     "exists": { 
      "field": "label" 
     } 
     }, 
     "query": { 
     "match_all": {} 
     } 
    } 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "titanic/passenger", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

Bạn cũng có thể xác định các cột dữ liệu-frame. Tham khảo Here để biết thêm thông tin.

Hy vọng rằng điều đó sẽ hữu ích!

+0

Đó là những gì tôi đã làm ngay bây giờ, tôi đã hy vọng có một cách để trực tiếp lấy một DataFrame lọc –

+1

Tôi không chắc chắn rằng nó có thể với API mới nhất của kết nối ES-Hadoop Spark. –

+1

Có cách nào để viết một khung dữ liệu để elasticsearch sử dụng API này không? –

0

Tôi đang chạy mã của mình trong cụm EMR từ Amazon bằng pyspark. Sau đó, cách tôi đã làm cho nó làm việc được làm theo các bước sau:

1) Đặt hành động bootstrap này trong việc tạo ra cụm (để tạo máy chủ elasticsearch localhost):

s3://awssupportdatasvcs.com/bootstrap-actions/elasticsearch/elasticsearch_install.4.0.0.rb 

2) Tôi chạy các lệnh này để cư cơ sở dữ liệu elasticsearch với một số dữ liệu:

curl -XPUT "http://localhost:9200/movies/movie/1" -d' { 
    "title": "The Godfather", 
    "director": "Francis Ford Coppola", 
    "year": 1972 
    }' 

bạn cũng có thể chạy lệnh curl khác nếu bạn muốn, như:

curl -XGET http://localhost:9200/_search?pretty=true&q={'matchAll':{''}} 

3) Tôi inited pyspark sử dụng các thông số sau:

pyspark --driver-memory 5G --executor-memory 10G --executor-cores 2 --jars=elasticsearch-hadoop-5.5.1.jar 

Tôi đã tải client elasticsearch python trước

4) Tôi chạy đoạn mã sau:

from pyspark import SparkConf 
from pyspark.sql import SQLContext 

q ="""{ 
    "query": { 
    "match_all": {} 
    } 
}""" 

es_read_conf = { 
    "es.nodes" : "localhost", 
    "es.port" : "9200", 
    "es.resource" : "movies/movie", 
    "es.query" : q 
} 

es_rdd = sc.newAPIHadoopRDD(
    inputFormatClass="org.elasticsearch.hadoop.mr.EsInputFormat", 
    keyClass="org.apache.hadoop.io.NullWritable", 
    valueClass="org.elasticsearch.hadoop.mr.LinkedMapWritable", 
    conf=es_read_conf) 

sqlContext.createDataFrame(es_rdd).collect() 

Sau đó, cuối cùng tôi nhận được kết quả thành công từ lệnh.

0

Tôi đã gặp sự cố tương tự như vậy để nhận dữ liệu được lọc theo địa lý vào một DataFrame PySpark. Tôi đang sử dụng elasticsearch-spark-20_2.11-5.2.2.jar với phiên bản Spark 2.1.1 và ES phiên bản 5.2. Tôi đã có thể nạp dữ liệu vào một DataFrame bằng cách xác định truy vấn của tôi như là một tùy chọn trong khi tạo ra các DataFrame

My geo-query

q ="""{ 
    "query": { 
     "bool" : { 
      "must" : { 
       "match_all" : {} 
      }, 
      "filter" : { 
       "geo_distance" : { 
        "distance" : "100km", 
        "location" : { 
         "lat" : 35.825, 
         "lon" : -87.99 
        } 
       } 
      } 
     } 
    } 
}""" 

tôi đã sử dụng lệnh sau để tải dữ liệu vào DataFrame

spark_df = spark.read.format("es").option("es.query", q).load("index_name") 
Các vấn đề liên quan