2015-10-13 18 views
6

Tôi đang gặp một thời gian khó khăn thực hiện một cái gì đó có vẻ như nó nên rất dễ dàng:Performing tra cứu/dịch trong một Spark RDD hoặc dữ liệu khung sử dụng một RDD/df

Mục tiêu của tôi là làm cho bản dịch trong một RDD/dataframe bằng RDD/dataframe thứ hai dưới dạng bảng tra cứu hoặc từ điển dịch. Tôi muốn thực hiện các bản dịch này trong nhiều cột.

Cách dễ nhất để giải thích sự cố là ví dụ. Hãy nói rằng tôi có như là đầu vào của tôi hai RDDs sau:

Route SourceCityID DestinationCityID 
A  1   2 
B  1   3 
C  2   1 

CityID CityName 
1  London 
2  Paris 
3  Tokyo 

đầu ra của tôi mong muốn RDD là:

Route SourceCity DestinationCity 
A  London  Paris 
B  London  Tokyo 
C  Paris  London 

Làm thế nào tôi nên đi về nó sản xuất nó?

Đây là một vấn đề dễ dàng trong SQL, nhưng tôi không biết các giải pháp rõ ràng với RDD trong Spark. Các phương thức tham gia, cogroup, v.v. dường như không phù hợp với các RDD nhiều cột và không cho phép chỉ định cột nào sẽ tham gia.

Bất kỳ ý tưởng nào? SQLContext có phải là câu trả lời không?

+0

Sử dụng Dataframe và SparkSQL sẽ giúp bạn những gì bạn đang tìm kiếm. Về cơ bản nó là sql với một cú pháp khác. – eliasah

+0

Kích thước của bảng/RDD là gì? Là CityID/CityName RDD nhiều lần nhỏ hơn RDD Route? Trong trường hợp đó tôi sẽ thu thập các kết quả của RDD như một bản đồ và phát sóng nó, để nó có thể là một tra cứu cục bộ trên mọi Công nhân. –

Trả lời

3

Cách RDD:

routes = sc.parallelize([("A", 1, 2),("B", 1, 3), ("C", 2, 1) ]) 
cities = sc.parallelize([(1, "London"),(2, "Paris"), (3, "Tokyo")]) 


print routes.map(lambda x: (x[1], (x[0], x[2]))).join(cities) \ 
.map(lambda x: (x[1][0][1], (x[1][0][0], x[1][1]))).join(cities). \ 
map(lambda x: (x[1][0][0], x[1][0][1], x[1][1])).collect() 

nào in:

[('C', 'Paris', 'London'), ('A', 'London', 'Paris'), ('B', 'London', 'Tokyo')] 

Và cách SQLContext:

from pyspark.sql import HiveContext 
from pyspark.sql import SQLContext 

df_routes = sqlContext.createDataFrame(\ 
routes, ["Route", "SourceCityID", "DestinationCityID"]) 
df_cities = sqlContext.createDataFrame(\ 
cities, ["CityID", "CityName"]) 

temp = df_routes.join(df_cities, df_routes.SourceCityID == df_cities.CityID) \ 
.select("Route", "DestinationCityID", "CityName") 
.withColumnRenamed("CityName", "SourceCity") 

print temp.join(df_cities, temp.DestinationCityID == df_cities.CityID) \ 
.select("Route", "SourceCity", "CityName") 
.withColumnRenamed("CityName", "DestinationCity").collect() 

nào in:

[Row(Route=u'C', SourceCity=u'Paris', DestinationCity=u'London'), 
Row(Route=u'A', SourceCity=u'London', DestinationCity=u'Paris'), 
Row(Route=u'B', SourceCity=u'London', DestinationCity=u'Tokyo')] 
3

Giả sử chúng ta có hai RDDs với các tuyến đường và các thành phố:

val routes = sc.parallelize(List(("A", 1, 2),("B", 1, 3),("C", 2, 1))) 
val citiesByIDRDD = sc.parallelize(List((1, "London"), (2, "Paris"), (3, "Tokyo"))) 

Có một số cách để thực hiện việc tra cứu các thành phố. Giả sử rằng việc tra cứu thành phố có chứa ít mục so với các tuyến đường chứa nhiều mục. Trong trường hợp đó, chúng ta hãy bắt đầu với việc thu thập các thành phố như một bản đồ được gửi bởi người lái xe cho mỗi nhiệm vụ.

val citiesByID = citiesByIDRDD.collectAsMap 

routes.map{r => (r._1, citiesByID(r._2), citiesByID(r._3))}.collect 
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London)) 

Để tránh gửi bảng tra cứu đến mọi công việc, nhưng chỉ một lần cho công nhân, bạn có thể mở rộng mã hiện có đang phát bản đồ tra cứu.

val bCitiesByID = sc.broadcast(citiesByID) 

routes.map{r => (r._1, bCitiesByID.value(r._2), bCitiesByID.value(r._3))}.collect 
=> Array[(String, String, String)] = Array((A,London,Paris), (B,London,Tokyo), (C,Paris,London)) 

Tôi không thấy sự cần thiết của khung dữ liệu ở đây, nhưng nếu bạn muốn, bạn có thể:

import sqlContext.implicits._ 

case class Route(id: String, from: Int, to: Int) 
case class City(id: Int, name: String) 

val cities = List(City(1, "London"), City(2, "Paris"), City(3, "Tokyo")) 
val routes = List(Route("A", 1, 2), Route("B", 1, 3), Route("C", 2, 1)) 

val citiesDf = cities.df 
citiesDf.registerTempTable("cities") 
val routesDf = routes.df 
citiesDf.registerTempTable("routes") 

routesDf.show 
+---+----+---+ 
| id|from| to| 
+---+----+---+ 
| A| 1| 2| 
| B| 1| 3| 
| C| 2| 1| 
+---+----+---+ 

citiesDf.show 
+---+------+ 
| id| name| 
+---+------+ 
| 1|London| 
| 2| Paris| 
| 3| Tokyo| 
+---+------+ 

Bạn nói rằng nó là một vấn đề dễ dàng trong SQL vì vậy tôi giả sử bạn có thể lấy nó từ đây.Thực hiện SQL đi như thế này:

sqlContext.sql ("SELECT COUNT(*) FROM routes") 
+0

Để sử dụng thu thập, bạn phải chắc chắn rằng tất cả dữ liệu sẽ khớp với nút chính –

+1

Có, đúng vậy. Mẫu của câu hỏi gợi ý cho giải pháp này bằng cách thu thập và phát sóng, nhưng điều này chỉ có ý nghĩa khi các bảng tra cứu thành phố tương đối nhỏ so với định tuyến và đủ nhỏ để vừa với bộ nhớ của trình điều khiển/bộ điều khiển. –

Các vấn đề liên quan