2014-12-17 19 views
11

Tôi đang sử dụng tia lửa và mông. Tôi có thể kết nối với Mongo sử dụng mã sau đây:Làm thế nào để truy vấn đến mongo bằng cách sử dụng tia lửa?

val sc = new SparkContext("local", "Hello from scala") 

val config = new Configuration() 
config.set("mongo.input.uri", "mongodb://127.0.0.1:27017/dbName.collectionName") 
val mongoRDD = sc.newAPIHadoopRDD(config, classOf[com.mongodb.hadoop.MongoInputFormat], classOf[Object], classOf[BSONObject]) 

trên mã mang lại cho tôi tất cả tài liệu từ bộ sưu tập.

Bây giờ tôi muốn áp dụng một số điều kiện về truy vấn.

Cho rằng tôi sử dụng

config.set("mongo.input.query","{customerId: 'some mongo id'}") 

này chỉ mất một điều kiện cùng một lúc. Tôi muốn thêm điều kiện nếu 'sử dụng'> 30

1) Làm cách nào để thêm nhiều điều kiện vào truy vấn mongo (bao gồm cả lớn hơn và nhỏ hơn) bằng cách sử dụng tia lửa và mongo ??

Ngoài ra tôi muốn lặp qua từng tài liệu về kết quả truy vấn bằng scala ??

2) Làm cách nào để lặp qua kết quả bằng scala ??

+0

một số phụ cờ ở đây: định dạng hadoop cho Mongo có tài nguyên vấn đề xử lý mà giữ kết nối mở. Đó là một sự kết hợp bùng nổ khi chúng tôi trộn lẫn nó với Spark. * Tránh * – maasg

+0

@maasg Có tùy chọn nào khác để kết nối mongo với tia lửa không ?? – Vishwas

Trả lời

10

Hi bạn có thể thử này:

Có một dự án tích hợp MongoDB với Spark

https://github.com/Stratio/deep-spark/tree/develop

1) làm một git clone

2) đi vào trong sâu tia lửa, sau đó để sâu-cha mẹ

3) mvn cài đặt

4) mở vỏ tia lửa với các tùy chọn này:

./spark-shell --jars YOUR_PATH/deep-core-0.7.0-SNAPSHOT.jar, YOUR_PATH/deep-commons-0.7.0-SNAPSHOT. jar, your_path/sâu MongoDB-0.7.0-SNAPSHOT.jar, your_path/Mongo-java-lái-2.12.4-sources.jar

nhớ để ghi đè lên "your_path" với đường dẫn thực

5) Thực hiện một ví dụ đơn giản trong vỏ tia lửa:

import com.stratio.deep.mongodb.config.MongoDeepJobConfig 
import com.stratio.deep.mongodb.extractor.MongoNativeDBObjectExtractor 
import com.stratio.deep.core.context.DeepSparkContext 
import com.mongodb.DBObject 
import org.apache.spark.rdd.RDD 
import com.mongodb.QueryBuilder 
import com.mongodb.BasicDBObject 

val host = "localhost:27017" 


val database = "test" 

val inputCollection = "input"; 

val deepContext: DeepSparkContext = new DeepSparkContext(sc) 

val inputConfigEntity: MongoDeepJobConfig[DBObject] = new MongoDeepJobConfig[DBObject](classOf[DBObject]) 


val query: QueryBuilder = QueryBuilder.start(); 

query.and("number").greaterThan(27).lessThan(30); 


inputConfigEntity.host(host).database(database).collection(inputCollection).filterQuery(query).setExtractorImplClass(classOf[MongoNativeDBObjectExtractor]) 


val inputRDDEntity: RDD[DBObject] = deepContext.createRDD(inputConfigEntity) 

Điều tốt nhất là bạn có thể sử dụng QueryBui lder Object để làm cho truy vấn của bạn

Ngoài ra bạn có thể vượt qua một DBObject như thế này:

{ "number" : { "$gt" : 27 , "$lt" : 30}} 

Nếu bạn muốn lặp bạn có thể sử dụng phương pháp yourRDD.collect(). Ngoài ra bạn có thể sử dụngRDD.foreach của bạn, nhưng bạn phải cung cấp một chức năng.

Có một cách khác để thêm lọ để làm sáng. Bạn có thể sửa đổi spark-env.sh và đưa dòng này ở cuối:

CONFDIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)" 
for jar in $(ls $CONFDIR/../lib/*.jar); do 
    SPARK_CLASSPATH=$SPARK_CLASSPATH:${jar} 
done 

Bên trong thư mục lib bạn đặt thư viện của bạn và đó là tất cả.

Disclaimer: Tôi hiện đang làm việc trên Stratio

+0

Dự án này đã không được chấp nhận và không còn hoạt động. Câu trả lời này nên được loại bỏ. – rjurney

2

1) Để thêm điều kiện để truy vấn của bạn chỉ cần thêm chúng vào từ điển được cung cấp với 'mongo.input.query':

config.set("mongo.input.query","{customerId: 'some mongo id', usage: {'$gt': 30}") 

Để hiểu rõ hơn về cách truy vấn làm việc tham khảo:

http://docs.mongodb.org/manual/tutorial/query-documents/

http://docs.mongodb.org/getting-started/python/query/

2) Đối với iterating qua kết quả bạn có thể muốn có một cái nhìn châm ngòi phương pháp RDD 'thu thập', thông tin thêm trong liên kết này, chỉ cần tìm phương pháp thu thập:

http://spark.apache.org/docs/latest/api/scala/#org.apache.spark.rdd.RDD

Các vấn đề liên quan