2016-07-12 17 views
9

Có cách nào (hoặc bất kỳ gói nào) để có thể biến các bộ sưu tập phân phối Spark (RDD s, Dataframe hoặc Dataset s) trực tiếp thành Broadcast biến mà không cần collect? API công khai dường như không có bất kỳ thứ gì "ngoài hộp", nhưng có thể làm gì đó ở cấp độ thấp hơn?Làm thế nào để chuyển đổi RDD, Dataframe hoặc Dataset thẳng đến một biến Broadcast mà không cần thu thập?

Tôi có thể tưởng tượng có một số tiềm năng tăng tốc 2x (hoặc nhiều hơn?) Cho các loại hoạt động này. Để giải thích chi tiết tôi muốn nói chi tiết, hãy làm việc qua ví dụ:

val myUberMap: Broadcast[Map[String, String]] = 
    sc.broadcast(myStringPairRdd.collect().toMap) 

someOtherRdd.map(someCodeUsingTheUberMap) 

Điều này làm cho tất cả dữ liệu được thu thập cho người lái xe, sau đó dữ liệu được phát sóng. Điều này có nghĩa là dữ liệu được gửi qua mạng cơ bản hai lần.

Điều gì sẽ là tốt đẹp là một cái gì đó như thế này:

val myUberMap: Broadcast[Map[String, String]] = 
    myStringPairRdd.toBroadcast((a: Array[(String, String)]) => a.toMap) 

someOtherRdd.map(someCodeUsingTheUberMap) 

đây Spark có thể bỏ qua việc thu thập các dữ liệu hoàn toàn và chỉ cần di chuyển dữ liệu giữa các nút.

THƯỞNG

Bên cạnh đó, có thể có một API monoid-like (hơi giống combineByKey) cho các tình huống nơi .toMap hoặc bất cứ hoạt động trên Array[T] là tốn kém, nhưng có thể có thể được thực hiện song song. Ví dụ. xây dựng cấu trúc Trie nhất định có thể tốn kém, loại chức năng này có thể dẫn đến phạm vi tuyệt vời cho thiết kế thuật toán. Hoạt động CPU này cũng có thể được chạy trong khi IO đang chạy quá - trong khi cơ chế phát sóng hiện tại đang chặn (tức là tất cả IO, sau đó tất cả CPU, sau đó tất cả IO một lần nữa).

Làm rõ

Gia nhập là không (chính) sử dụng trường hợp ở đây, nó có thể được giả định rằng tôi thưa thớt sử dụng cấu trúc dữ liệu phát sóng. Ví dụ: các phím trong someOtherRdd không có nghĩa là bao gồm các phím trong myUberMap nhưng tôi không biết phím nào tôi cần cho đến khi tôi đi qua someOtherRdd VÀ giả sử tôi sử dụng myUberMap nhiều lần.

Tôi biết rằng tất cả âm thanh hơi mơ hồ, nhưng vấn đề là thiết kế thuật toán học máy tổng quát hơn.

Trả lời

6

Mặc dù về mặt lý thuyết đây là một ý tưởng thú vị nhưng tôi cho rằng mặc dù về mặt lý thuyết có thể nó có các ứng dụng thực tế rất hạn chế. Rõ ràng là tôi không thể nói cho PMC vì vậy tôi không thể nói nếu có bất kỳ kế hoạch để thực hiện loại hình này của cơ chế phát sóng ở tất cả.

có thể thực hiện:

Từ Spark đã cung cấp torrent broadcasting cơ chế mà hành vi được mô tả như sau:

Người tài xế chia đối tượng serialized thành khối nhỏ và cửa hàng những khối trong BlockManager của người lái xe.

Trên mỗi người thi hành, người thực thi đầu tiên tìm cách lấy đối tượng từ số BlockManager. Nếu nó không tồn tại, nó sẽ sử dụng các lần tìm nạp từ xa để tìm các khối nhỏ từ trình điều khiển và/hoặc các trình thực thi khác nếu có.

Sau khi đã tải các khối, nó sẽ đặt các khối theo số BlockManager của riêng mình, sẵn sàng cho những người thực thi khác tìm nạp.

có thể tái sử dụng cùng một cơ chế để phát trực tiếp nút-tới-nút trực tiếp.

Cần lưu ý rằng cách tiếp cận này không thể loại bỏ hoàn toàn việc truyền thông của trình điều khiển. Mặc dù các khối có thể được tạo cục bộ, bạn vẫn cần một nguồn chân lý duy nhất để quảng cáo một tập hợp các khối để tìm nạp.

ứng TNHH

Một vấn đề với các biến số phát sóng là có khá tốn kém. Ngay cả khi bạn có thể loại bỏ tắc nghẽn trình điều khiển hai vấn đề vẫn còn:

  • Bộ nhớ cần thiết để lưu trữ đối tượng deserialized trên mỗi người thi hành.
  • Chi phí chuyển dữ liệu được phát sóng tới mọi người thi hành.

Vấn đề đầu tiên phải tương đối rõ ràng. Nó không chỉ là về việc sử dụng bộ nhớ trực tiếp mà còn về chi phí GC và ảnh hưởng của nó đến độ trễ tổng thể. Thứ hai là khá tinh tế. Tôi đã bảo vệ một phần điều này trong câu trả lời của tôi cho Why my BroadcastHashJoin is slower than ShuffledHashJoin in Spark nhưng hãy thảo luận thêm điều này.

Từ góc độ lưu lượng truy cập mạng phát sóng toàn bộ tập dữ liệu tương đương với việc tạo ra sản phẩm Descartes. Vì vậy, nếu tập dữ liệu đủ lớn để người lái xe trở thành nút cổ chai thì có vẻ như không phải là ứng cử viên tốt cho phương pháp phát sóng và nhắm mục tiêu như tham gia băm có thể được ưu tiên trong thực tế.

Alternatives:

Có một số phương pháp có thể được sử dụng để đạt được kết quả tương tự như các vấn đề phát sóng và địa chỉ trực tiếp được liệt kê ở trên bao gồm:

  • Passing dữ liệu thông qua hệ thống tập tin phân tán.
  • Sử dụng cơ sở dữ liệu được nhân rộng được thu thập với nút công nhân.
+0

Tôi đã đưa ra một ưu tiên cho việc chỉ ra thực sự cho việc tham gia thường xuyên tham gia ngẫu nhiên thường tốt hơn. Nhưng không chấp nhận vì trường hợp sử dụng của tôi là tổng quát hơn thế - tôi đã không nói rằng tôi chỉ muốn tham gia những người tham gia đơn lẻ thường xuyên. Tôi đoán vì đó là điều mà 99% người làm với các chương trình phát sóng đó là một giả định công bằng. Tôi đã cập nhật OP của mình để rõ ràng hơn. Cảm ơn. – samthebest

+0

Ồ, tôi hiểu rồi. Thing là miễn là chúng ta không sử dụng cấu trúc off-heap GC sẽ ăn chúng ta sống nhanh hơn nhiều so với lưu lượng mạng. Hoặc ít nhất đây là những gì tôi đã nhìn thấy cho đến nay. Và nếu chúng ta bắt đầu điều chỉnh cho những vật thể rất lớn thì chúng ta sẽ đạt được hiệu suất cao hơn. Vì vậy, ứng dụng duy nhất tôi có thể nghĩ ra là các đối tượng nhỏ và điều chỉnh để xử lý gần đúng thời gian thực. Nhưng không phát trực tuyến bởi vì chúng ta không thể phá hủy và tái phát thanh lịch. – zero323

+0

Tôi nghĩ rằng sẽ hữu ích khi có thể nhanh chóng phát trực tiếp dữ liệu phát sóng từ những người thực thi. Nó sẽ cung cấp sự linh hoạt. Ví dụ: nếu biến phát sóng là kết quả của việc truy vấn nguồn dữ liệu jdbc bên ngoài rất chậm. Người ta có thể sử dụng khả năng tải song song của một cụm tia lửa để khởi tạo nhiều truy vấn trên máy chủ nguồn, có thể chạy trong các luồng song song. – ThatDataGuy

0

Tôi không biết nếu chúng ta có thể làm điều đó cho RDD nhưng bạn có thể làm điều đó cho Dataframe

import org.apache.spark.sql.functions 

val df:DataFrame = your_data_frame 

val broadcasted_df = functions.broadcast(df) 

bây giờ bạn có thể sử dụng biến broadcasted_df và nó sẽ được phát sóng để thi hành di chúc.

Đảm bảo broadcasted_df dataframe không quá lớn và có thể được gửi tới người thực thi.

broadcasted_df sẽ phát thanh viên trong các hoạt động như ví dụ

other_df.join(broadcasted_df) 

và trong trường hợp này join() hoạt động thực thi nhanh hơn vì mỗi người thi hành có 1 phân vùng của other_df và toàn broadcasted_df

Đối với câu hỏi của bạn, tôi không chắc chắn bạn có thể làm những gì bạn muốn. Bạn không thể sử dụng một rdd bên trong phương thứC#map() của một rdd khác bởi vì tia lửa không cho phép các biến đổi bên trong các phép biến đổi. Và trong trường hợp của bạn, bạn cần gọi phương thức thu thập() để tạo bản đồ từ RDD vì bạn chỉ có thể sử dụng đối tượng bản đồ thông thường bên trong phương thứC#map() mà bạn không thể sử dụng RDD ở đó.

+0

Tôi không nghĩ rằng điều này thực sự gây ra các khung dữ liệu được phân phối mà không có một thu thập "dưới mui xe" (mặc dù đó là một linh cảm). Thay vào đó nó đánh dấu các khung dữ liệu cho phát sóng nếu chúng ta sử dụng nó để tham gia. Cần lưu ý rằng việc tham gia không phải là trường hợp sử dụng (chính) ở đây, mà tôi sẽ làm rõ trong OP của tôi bằng cách chỉnh sửa. – samthebest

+0

Không phải linh cảm: nó thu thập 'DataFrame' mà không chuyển đổi thành kiểu cục bộ và phát lại nó. Tôi khá chắc chắn có một câu trả lời mô tả điều này một nơi nào đó ở đây nhưng nó có thể được vào trò chuyện. Bằng cách này hay cách khác, nó chỉ ẩn 'sưu tập'. – zero323

Các vấn đề liên quan