Tôi là người mới trên Spark (Phiên bản của tôi là 1.6.0) và bây giờ tôi đang cố giải quyết vấn đề được đưa ra bên dưới:Cách thực hiện thao tác "Tra cứu" trên các khung dữ liệu Spark cho nhiều điều kiện
Giả sử ở đó là hai tệp nguồn:
- Chữ cái đầu tiên (viết tắt là A) chứa cột có tên A1, B1, C1 và 80 cột khác. Có 230K bản ghi bên trong.
- Chữ cái thứ hai (viết tắt là B) là bảng tra cứu nhỏ chứa các cột có tên A2, B2, C2 và D2. Có 250 hồ sơ bên trong.
Bây giờ chúng ta cần phải chèn một cột mới vào A, cho logic dưới đây:
- tra cứu đầu tiên A1, B1 và C1 trong B (cột tương ứng là A2, B2 và C2), nếu thành công , trả lại D2 làm giá trị của cột được thêm mới. Nếu không tìm thấy gì ...
- Sau đó tra cứu A1, B1 trong B. Nếu thành công, hãy trả lại D2. Nếu không tìm thấy gì ...
- Thiết lập giá trị mặc định "NA"
Tôi đã đọc trong các tập tin và chuyển đổi chúng thành các khung dữ liệu. Đối với tình huống đầu tiên, tôi nhận được kết quả bằng cách bên ngoài bên trái tham gia cùng họ. Nhưng tôi không thể tìm ra cách tốt trong bước tiếp theo.
Cố gắng hiện tại của tôi là tạo khung dữ liệu mới bằng cách tham gia A và B sử dụng điều kiện ít nghiêm ngặt hơn. Tuy nhiên tôi không có đầu mối làm thế nào để cập nhật các khung dữ liệu hiện tại từ một trong những khác. Hay có cách nào trực quan và hiệu quả hơn để giải quyết toàn bộ vấn đề?
Cảm ơn tất cả các câu trả lời.
----------------------------- Cập nhật vào 20160309 -------------- ------------------
Cuối cùng chấp nhận câu trả lời của @mlk. Vẫn còn tuyệt vời nhờ @ zero323 cho ý kiến tuyệt vời của mình trên UDF so với tham gia, việc tạo mã Vonfram thực sự là một vấn đề khác mà chúng ta đang phải đối mặt bây giờ. Nhưng kể từ khi chúng ta cần phải làm chục tra cứu và trung bình 4 điều kiện cho mỗi tra cứu, giải pháp trước đây là phù hợp hơn ...
Các giải pháp cuối cùng là bằng cách nào đó trông giống như bên dưới đoạn mã:
```
import sqlContext.implicits._
import com.github.marklister.collections.io._
case class TableType(A: String, B: String, C: String, D: String)
val tableBroadcast = sparkContext.broadcast(CsvParser(TableType).parseFile("..."))
val lkupD = udf {
(aStr: String, bStr: String, cStr: String) =>
tableBroadcast.value.find {
case TableType(a, b, c, _) =>
(a == aStr && b == bStr && c == cStr) ||
(a == aStr && b == bStr)
}.getOrElse(TableType("", "", "", "NA")).D
}
df = df.withColumn("NEW_COL", lkupD($"A", $"B", $"C"))
```
Đây có thể là cách để thực hiện. Tôi cung cấp một giải pháp thay thế với 'tham gia' là tốt. – zero323
Cảm ơn mlk. Nếu bảng tra cứu là lớn (500K * 50), vẫn còn tốt để phát sóng nó? –
Và một câu hỏi khác của tôi là, giả sử tôi cần thực hiện 30 tra cứu trên các cột khác nhau và viết 50 UDF, hiệu suất có bị ảnh hưởng không? –