2016-03-09 22 views
5

Tôi tạo RDD từ danh sách các url và sau đó thử tìm nạp dữ liệu với một số cuộc gọi http không đồng bộ. Tôi cần tất cả các kết quả trước khi thực hiện các phép tính khác. Lý tưởng nhất, tôi cần phải thực hiện cuộc gọi http trên các nút khác nhau để cân nhắc mở rộng quy mô.Công việc Spark với cuộc gọi HTTP Async

tôi đã làm một cái gì đó như thế này:

//init spark 
val sparkContext = new SparkContext(conf) 
val datas = Seq[String]("url1", "url2") 

//create rdd 
val rdd = sparkContext.parallelize[String](datas) 

//httpCall return Future[String] 
val requests = rdd.map((url: String) => httpCall(url)) 

//await all results (Future.sequence may be better) 
val responses = requests.map(r => Await.result(r, 10.seconds)) 

//print responses 
response.collect().foreach((s: String) => println(s)) 

//stop spark 
sparkContext.stop() 

Công việc này, nhưng Spark công việc không bao giờ kết thúc!

Vì vậy, tôi tự hỏi thực tiễn tốt nhất để xử lý tương lai bằng Spark (hoặc tương lai [RDD]) là gì.

Tôi nghĩ trường hợp sử dụng này có vẻ khá phổ biến nhưng chưa tìm thấy câu trả lời nào.

Trân trọng

Trả lời

4

trường hợp sử dụng điều này có vẻ khá phổ biến

Không thực sự, vì nó đơn giản không làm việc như bạn (có lẽ) mong đợi. Vì mỗi tác vụ hoạt động trên chuẩn Scala Iterators, các hoạt động này sẽ bị đè bẹp với nhau. Nó có nghĩa là tất cả các hoạt động sẽ bị chặn trong thực tế. Giả sử bạn có ba URL ["x", "y", "z"], mã của bạn sẽ được thực hiện theo thứ tự sau:

Await.result(httpCall("x", 10.seconds)) 
Await.result(httpCall("y", 10.seconds)) 
Await.result(httpCall("z", 10.seconds)) 

Bạn có thể dễ dàng tái tạo cùng một hành vi tại địa phương. Nếu bạn muốn thực thi mã không đồng bộ, bạn nên xử lý điều này một cách rõ ràng bằng cách sử dụng mapPartitions:

rdd.mapPartitions(iter => { 
    ??? // Submit requests 
    ??? // Wait until all requests completed and return Iterator of results 
}) 

nhưng điều này tương đối phức tạp. Không có đảm bảo tất cả dữ liệu cho một phân vùng nhất định phù hợp với bộ nhớ, do đó bạn có thể sẽ cần một số cơ chế trộn.

Tất cả những gì được nói là tôi không thể tái tạo sự cố mà bạn đã mô tả là có thể là một số vấn đề về cấu hình hoặc một vấn đề với chính mình là httpCall.

Trên ghi chú bên cho phép một lần hết thời gian chờ để tiêu diệt toàn bộ tác vụ không giống như một ý tưởng hay.

1

Điều này sẽ không hoạt động.

Bạn không thể mong đợi các đối tượng yêu cầu được phân phối và các câu trả lời được thu thập trên một cụm bởi các nút khác. Nếu bạn làm vậy thì tia lửa gọi cho tương lai sẽ không bao giờ kết thúc. Tương lai sẽ không bao giờ làm việc trong trường hợp này.

Nếu bản đồ của bạn() thực hiện yêu cầu đồng bộ (http) thì vui lòng thu thập các phản hồi trong cùng một cuộc gọi hành động/chuyển đổi và sau đó chịu kết quả (phản hồi) để tiếp tục lập bản đồ/giảm/các cuộc gọi khác.

Trong trường hợp của bạn, vui lòng viết lại logic thu thập câu trả lời cho từng cuộc gọi đồng bộ và xóa khái niệm tương lai thì tất cả sẽ ổn.

+0

Sự cố không nên có chuyển động dữ liệu giữa 'yêu cầu' và' phản hồi' để cả hai phép biến đổi phải được thực hiện trong cùng một giai đoạn, do đó cùng các thực thi và ngữ cảnh. – zero323

1

Cuối cùng tôi đã sử dụng nó bằng scalaj-http thay vì Công văn. Cuộc gọi đồng bộ, nhưng điều này khớp với trường hợp sử dụng của tôi.

Tôi nghĩ rằng Công việc Spark không bao giờ kết thúc bằng cách sử dụng Công văn vì kết nối Http không được đóng đúng cách.

Trân trọng

1

Tôi không thể tìm thấy cách dễ dàng để đạt được điều này. Nhưng sau một vài lần thử lại, đây là những gì tôi đã làm và nó làm việc cho một danh sách rất lớn các truy vấn. Về cơ bản, chúng tôi đã sử dụng điều này để thực hiện thao tác hàng loạt cho một truy vấn lớn vào nhiều truy vấn phụ.

// Break down your huge workload into smaller chunks, in this case huge query string is broken 
// down to a small set of subqueries 
// Here if needed to optimize further down, you can provide an optimal partition when parallelizing 
val queries = sqlContext.sparkContext.parallelize[String](subQueryList.toSeq) 

// Then map each one those to a Spark Task, in this case its a Future that returns a string 
val tasks: RDD[Future[String]] = queries.map(query => { 
    val task = makeHttpCall(query) // Method returns http call response as a Future[String] 
    task.recover { 
     case ex => logger.error("recover: " + ex.printStackTrace()) } 
    task onFailure { 
     case t => logger.error("execution failed: " + t.getMessage) } 
    task 
}) 

// Note:: Http call is still not invoked, you are including this as part of the lineage 

// Then in each partition you combine all Futures (means there could be several tasks in each partition) and sequence it 
// And Await for the result, in this way you making it to block untill all the future in that sequence is resolved 

val contentRdd = tasks.mapPartitions[String] { f: Iterator[Future[String]] => 
    val searchFuture: Future[Iterator[String]] = Future sequence f 
    Await.result(searchFuture, threadWaitTime.seconds) 
} 

// Note: At this point, you can do any transformations on this rdd and it will be appended to the lineage. 
// When you perform any action on that Rdd, then at that point, 
// those mapPartition process will be evaluated to find the tasks and the subqueries to perform a full parallel http requests and 
// collect those data in a single rdd. 

Nếu bạn không muốn thực hiện bất kỳ chuyển đổi về nội dung như phân tích các tải trọng phản ứng, vv Sau đó, bạn có thể sử dụng foreachPartition thay vì mapPartitions để thực hiện tất cả những http gọi ngay lập tức.

Các vấn đề liên quan