2015-02-18 24 views
6

Tôi đang triển khai người học luồng để phân loại văn bản. Có một số thông số có giá trị duy nhất trong triển khai của tôi cần được cập nhật khi các mục hàng mới đến. Ví dụ, tôi muốn thay đổi tỷ lệ học tập như những dự đoán mới được thực hiện. Tuy nhiên, tôi nghi ngờ rằng có một cách để phát sóng các biến sau khi phát sóng ban đầu. Vì vậy, những gì sẽ xảy ra nếu tôi cần phát sóng một biến mỗi khi tôi cập nhật nó. Nếu có cách để thực hiện hoặc giải pháp cho những gì tôi muốn thực hiện trong Phát trực tuyến, tôi rất vui khi được nghe về điều đó.Phát sóng định kỳ trong Apache Spark Phát trực tuyến

Xin cảm ơn trước.

Trả lời

1

Sự hiểu biết của tôi là một khi biến phát sóng ban đầu được gửi đi, đó là 'chỉ đọc'. Tôi tin rằng bạn có thể cập nhật các biến phát sóng trên các nút địa phương, nhưng không phải trên các nút từ xa.

Có thể bạn cần cân nhắc thực hiện điều này 'Spark bên ngoài'. Làm thế nào về việc sử dụng một cửa hàng noSQL (Cassandra .. etc) hoặc thậm chí Memcache? Sau đó, bạn có thể cập nhật biến từ một nhiệm vụ và kiểm tra định kỳ cửa hàng này từ các tác vụ khác?

0

Tốt nhất là bạn thu thập dữ liệu vào trình điều khiển và sau đó phát chúng đến tất cả các nút.

Sử dụng Dstream # foreachRDD để thu thập RDD được tính tại trình điều khiển và khi bạn biết cần thay đổi tốc độ học, sau đó sử dụng SparkContext # broadcast (value) để gửi giá trị mới cho tất cả các nút.

tôi mong chờ các mã để trông giống như sau:

dStreamContainingBroadcastValue.foreachRDD{ rdd => 
     val valueToBroadcast = rdd.collect() 
     sc.broadcast(valueToBroadcast) 
} 

Bạn cũng có thể tìm this thread hữu ích, từ người sử dụng tia lửa mailing list. Cho tôi biết nếu nó hiệu quả.

+0

Vì vậy, làm cách nào để bạn có thể đoán được rằng mình có thể đọc biến phát sóng từ đoạn mã đó? Nó đánh bại mục đích của nó để trả lại 'Đơn vị'. – kareblak

1

Tôi đã chơi xấu, nhưng nó đã hoạt động! Chúng tôi có thể tìm cách nhận giá trị phát sóng từ một đối tượng phát sóng. https://github.com/apache/spark/blob/master/core/src/main/scala/org/apache/spark/broadcast/TorrentBroadcast.scala#L114 chỉ bằng id phát.

vì vậy tôi định kỳ phát lại qua cùng một id phát sóng.

val broadcastFactory = new TorrentBroadcastFactory() 
broadcastFactory.unbroadcast(BroadcastId, true, true) 
// append some ids to initIds 
val broadcastcontent = broadcastFactory.newBroadcast[.Set[String]](initIds, false, BroadcastId) 

và tôi có thể nhận BroadcastId từ giá trị phát sóng đầu tiên.

val ids = ssc.sparkContext.broadcast(initIds) 
// broadcast id 
val BroadcastId = broadcastIds.id 

sau đó sử dụng id công việc làm Loại phát sóng như bình thường.

def func(record: Array[Byte], bc: Broadcast[Set[String]]) = ??? 
1
bkc.unpersist(true) 
bkc.destroy() 
bkc = sc.broadcast(tableResultMap) 
bkv = bkc.value 

Bạn có thể thử điều này, tôi không đảm bảo hiệu quả cho dù

1

tôi đã làm việc này bằng cách tạo ra một lớp wrapper qua biến phát sóng. Phương thức updateAndGet của lớp wrapper trả về biến phát sóng được làm mới. Tôi gọi chức năng này bên dStream.transform -> theo Documentation Spark

http://spark.apache.org/docs/latest/streaming-programming-guide.html#transform-operation

Chuyển Operation trạng thái:. "chức năng cung cấp được gọi trong mỗi khoảng thời gian hàng loạt này cho phép bạn làm tốn nhiều thời gian các hoạt động RDD khác nhau, đó là các hoạt động RDD, số phân vùng, các biến phát sóng, v.v.có thể thay đổi giữa các lô "

lớp BroadcastWrapper sẽ trông giống như:.

public class BroadcastWrapper { 
private Broadcast<ReferenceData> broadcastVar; 
private Date lastUpdatedAt = Calendar.getInstance().getTime(); 

private static BroadcastWrapper obj = new BroadcastWrapper(); 

private BroadcastWrapper(){} 

public static BroadcastWrapper getInstance() { 
     return obj; 
} 

public JavaSparkContext getSparkContext(SparkContext sc) { 
     JavaSparkContext jsc = JavaSparkContext.fromSparkContext(sc); 
     return jsc; 
} 

public Broadcast<ReferenceData> updateAndGet(SparkContext sparkContext){ 
     Date currentDate = Calendar.getInstance().getTime(); 
     long diff = currentDate.getTime()-lastUpdatedAt.getTime(); 
     if (var == null || diff > 60000) { //Lets say we want to refresh every 1 min = 60000 ms 
      if (var != null) 
       var.unpersist(); 
      lastUpdatedAt = new Date(System.currentTimeMillis()); 

      //Your logic to refresh 
      ReferenceData data = getRefData(); 

      var = getSparkContext(sparkContext).broadcast(data); 
     } 
     return var; 
} 
} 

Bạn có thể sử dụng chức năng phát sóng updateAndGet biến trong phương pháp stream.transform cho phép RDD-RDD biến đổi

objectStream.transform(stream -> { 

    Broadcast<Object> var = BroadcastWrapper.getInstance().updateAndGet(stream.context()); 

/**Your code to manipulate stream **/ 
}); 

Tham khảo câu trả lời đầy đủ của tôi từ vị trí này: https://stackoverflow.com/a/41259333/3166245

Ho pe nó giúp

Các vấn đề liên quan