2016-03-01 18 views
6

Tôi là người mới trên Spark (Phiên bản của tôi là 1.6.0) và bây giờ tôi đang cố giải quyết vấn đề được đưa ra bên dưới:Cách thực hiện thao tác "Tra cứu" trên các khung dữ liệu Spark cho nhiều điều kiện

Giả sử ở đó là hai tệp nguồn:

  • Chữ cái đầu tiên (viết tắt là A) chứa cột có tên A1, B1, C1 và 80 cột khác. Có 230K bản ghi bên trong.
  • Chữ cái thứ hai (viết tắt là B) là bảng tra cứu nhỏ chứa các cột có tên A2, B2, C2 và D2. Có 250 hồ sơ bên trong.

Bây giờ chúng ta cần phải chèn một cột mới vào A, cho logic dưới đây:

  • tra cứu đầu tiên A1, B1 và ​​C1 trong B (cột tương ứng là A2, B2 và C2), nếu thành công , trả lại D2 làm giá trị của cột được thêm mới. Nếu không tìm thấy gì ...
  • Sau đó tra cứu A1, B1 trong B. Nếu thành công, hãy trả lại D2. Nếu không tìm thấy gì ...
  • Thiết lập giá trị mặc định "NA"

Tôi đã đọc trong các tập tin và chuyển đổi chúng thành các khung dữ liệu. Đối với tình huống đầu tiên, tôi nhận được kết quả bằng cách bên ngoài bên trái tham gia cùng họ. Nhưng tôi không thể tìm ra cách tốt trong bước tiếp theo.

Cố gắng hiện tại của tôi là tạo khung dữ liệu mới bằng cách tham gia A và B sử dụng điều kiện ít nghiêm ngặt hơn. Tuy nhiên tôi không có đầu mối làm thế nào để cập nhật các khung dữ liệu hiện tại từ một trong những khác. Hay có cách nào trực quan và hiệu quả hơn để giải quyết toàn bộ vấn đề?

Cảm ơn tất cả các câu trả lời.

----------------------------- Cập nhật vào 20160309 -------------- ------------------

Cuối cùng chấp nhận câu trả lời của @mlk. Vẫn còn tuyệt vời nhờ @ zero323 cho ý kiến ​​tuyệt vời của mình trên UDF so với tham gia, việc tạo mã Vonfram thực sự là một vấn đề khác mà chúng ta đang phải đối mặt bây giờ. Nhưng kể từ khi chúng ta cần phải làm chục tra cứu và trung bình 4 điều kiện cho mỗi tra cứu, giải pháp trước đây là phù hợp hơn ...

Các giải pháp cuối cùng là bằng cách nào đó trông giống như bên dưới đoạn mã:

``` 
import sqlContext.implicits._ 
import com.github.marklister.collections.io._ 

case class TableType(A: String, B: String, C: String, D: String) 
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("...")) 
val lkupD = udf { 
    (aStr: String, bStr: String, cStr: String) => 
    tableBroadcast.value.find { 
     case TableType(a, b, c, _) => 
     (a == aStr && b == bStr && c == cStr) || 
     (a == aStr && b == bStr) 
    }.getOrElse(TableType("", "", "", "NA")).D 
} 
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C")) 
``` 

Trả lời

4

Như B là Tôi nghĩ cách tốt nhất để làm điều này sẽ là một biến phát sóng và hàm do người dùng định nghĩa.

// However you get the data... 
case class BType(A2: Int, B2: Int, C2 : Int, D2 : String) 
val B = Seq(BType(1,1,1,"B111"), BType(1,1,2, "B112"), BType(2,0,0, "B200")) 

val A = sc.parallelize(Seq((1,1,1, "DATA"), (1,1,2, "DATA"), (2, 0, 0, "DATA"), (2, 0, 1, "NONE"), (3, 0, 0, "NONE"))).toDF("A1", "B1", "C1", "OTHER") 


// Broadcast B so all nodes have a copy of it. 
val Bbradcast = sc.broadcast(B) 

// A user defined function to find the value for D2. This I'm sure could be improved by whacking it into maps. But this is a small example. 
val findD = udf {(a: Int, b : Int, c: Int) => Bbradcast.value.find(x => x.A2 == a && x.B2 == b && x.C2 == c).getOrElse(Bbradcast.value.find(x => x.A2 == a && x.B2 == b).getOrElse(BType(0,0,0,"NA"))).D2 } 

// Use the UDF in a select 
A.select($"A1", $"B1", $"C1", $"OTHER", findD($"A1", $"B1", $"C1").as("D")).show 
+1

Đây có thể là cách để thực hiện. Tôi cung cấp một giải pháp thay thế với 'tham gia' là tốt. – zero323

+0

Cảm ơn mlk. Nếu bảng tra cứu là lớn (500K * 50), vẫn còn tốt để phát sóng nó? –

+0

Và một câu hỏi khác của tôi là, giả sử tôi cần thực hiện 30 tra cứu trên các cột khác nhau và viết 50 UDF, hiệu suất có bị ảnh hưởng không? –

2

Chỉ cần để tham khảo một giải pháp mà không UDFs:

val b1 = broadcast(b.toDF("A2_1", "B2_1", "C2_1", "D_1")) 
val b2 = broadcast(b.toDF("A2_2", "B2_2", "C2_2", "D_2")) 

// Match A, B and C 
val expr1 = ($"A1" === $"A2_1") && ($"B1" === $"B2_1") && ($"C1" === $"C2_1") 
// Match A and B mismatch C 
val expr2 = ($"A1" === $"A2_2") && ($"B1" === $"B2_2") && ($"C1" !== $"C2_2") 

val toDrop = b1.columns ++ b2.columns 

toDrop.foldLeft(a 
    .join(b1, expr1, "leftouter") 
    .join(b2, expr2, "leftouter") 
    // If there is match on A, B, C then D_1 should be not NULL 
    // otherwise we fall-back to D_2 
    .withColumn("D", coalesce($"D_1", $"D_2")) 
)((df, c) => df.drop(c)) 

này giả định có ít nhất một trận đấu trong mỗi thể loại (cả ba cột, hoặc hai người đầu tiên) hoặc lặp lại hàng trong đầu ra là mong muốn.

UDF vs THAM GIA:

Có nhiều yếu tố để xem xét và không có câu trả lời đơn giản ở đây:

Nhược điểm:

  • phát sóng joins yêu cầu thông qua dữ liệu hai lần đến nút công nhân. Hiện tại, broadcasted bảng không được lưu trong bộ nhớ cache (SPARK-3863) và không có khả năng thay đổi trong tương lai gần nhất (Độ phân giải: Sau).
  • join hoạt động được áp dụng hai lần ngay cả khi có kết quả khớp đầy đủ.

Ưu:

  • joincoalesce là trong suốt đối với tôi ưu hoa khi UDFs thì không.
  • hoạt động trực tiếp với các biểu thức SQL có thể được hưởng lợi từ tất cả các tối ưu hóa Tungsten bao gồm tạo mã trong khi UDF không thể.
Các vấn đề liên quan