2016-01-27 16 views
10

Tôi đang chạy Spark Streaming với hai cửa sổ khác nhau (trên cửa sổ để đào tạo một mô hình với SKLearn và người kia để dự đoán giá trị dựa trên mô hình đó) và tôi tự hỏi làm thế nào tôi có thể tránh một cửa sổ (cửa sổ đào tạo "chậm") để đào tạo một mô hình, mà không "chặn" cửa sổ dự đoán "nhanh".
đang đơn giản hóa của tôi trông như sau:Làm thế nào để tránh một cửa sổ Spark Streaming chặn một cửa sổ khác với cả việc chạy một số mã Python gốc

conf = SparkConf() 
conf.setMaster("local[4]") 
sc = SparkContext(conf=conf) 
ssc = StreamingContext(sc, 1) 

stream = ssc.socketTextStream("localhost", 7000) 


import Custom_ModelContainer 

### Window 1 ### 
### predict data based on model computed in window 2 ### 

def predict(time, rdd): 
    try: 
     # ... rdd conversion to df, feature extraction etc... 

     # regular python code 
     X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
     pred = Custom_ModelContainer.getmodel().predict(X) 

     # send prediction to GUI 

    except Exception, e: print e 

predictionStream = stream.window(60,60) 
predictionStream.foreachRDD(predict) 


### Window 2 ### 
### fit new model ### 

def trainModel(time, rdd): 
try: 
    # ... rdd conversion to df, feature extraction etc... 

    X = np.array(df.map(lambda lp: lp.features.toArray()).collect()) 
    y = np.array(df.map(lambda lp: lp.label).collect()) 

    # train test split etc... 

    model = SVR().fit(X_train, y_train) 
    Custom_ModelContainer.setModel(model) 

except Exception, e: print e 

modelTrainingStream = stream.window(600,600) 
modelTrainingStream.foreachRDD(trainModel) 

(Lưu ý: Custom_ModelContainer là một lớp học tôi đã viết để lưu và lấy các mô hình đào tạo)

Thiết lập của tôi nói chung là hoạt động tốt, ngoại trừ rằng mỗi lần một mô hình mới được đào tạo trong cửa sổ thứ hai (mất khoảng một phút), các cửa sổ đầu tiên không tính toán các dự đoán cho đến khi việc đào tạo mô hình kết thúc. Trên thực tế, tôi đoán rằng điều này có ý nghĩa, vì mô hình phù hợp và dự đoán đều được tính trên nút chính (trong một thiết lập không phân phối - do SKLearn).

Vì vậy, câu hỏi của tôi là như sau: Có thể đào tạo mô hình trên một nút công nhân duy nhất (thay vì nút chính) không? Nếu vậy, làm thế nào tôi có thể đạt được thứ hai và điều đó thực sự sẽ giải quyết vấn đề của tôi?

Nếu không, bất kỳ đề xuất nào khác về cách tôi có thể thực hiện công việc thiết lập như vậy mà không trì hoãn tính toán trong cửa sổ 1?

Bất kỳ trợ giúp nào được đánh giá cao.

EDIT: Tôi đoán câu hỏi tổng quát hơn sẽ là: Làm cách nào để chạy hai tác vụ khác nhau trên hai công nhân khác nhau song song?

Trả lời

2

Tuyên bố từ chối trách nhiệm: Đây chỉ là một tập hợp các ý tưởng. Không ai trong số này đã được thử nghiệm trong thực tế.


Một vài điều bạn có thể thử:

  1. Đừng collect để predict. scikit-learn mô hình thường serializable vì vậy quá trình dự đoán có thể dễ dàng xử lý trên cụm:

    def predict(time, rdd): 
        ... 
    
        model = Custom_ModelContainer.getmodel() 
        pred = (df.rdd.map(lambda lp: lp.features.toArray()) 
         .mapPartitions(lambda iter: model.predict(np.array(list(iter))))) 
        ... 
    

    Nó không nên chỉ parallelize dự đoán mà còn, nếu dữ liệu thô không được đưa vào giao diện đồ họa, làm giảm lượng dữ liệu phải được thu thập .

  2. Cố gắng collect và gửi dữ liệu một cách không đồng bộ. PySpark không cung cấp collectAsync phương pháp nhưng bạn có thể cố gắng để đạt được một cái gì đó tương tự với concurrent.futures:

    from pyspark.rdd import RDD 
    from concurrent.futures import ThreadPoolExecutor 
    
    executor = ThreadPoolExecutor(max_workers=4) 
    
    def submit_to_gui(*args): ... 
    
    def submit_if_success(f): 
        if not f.exception(): 
         executor.submit(submit_to_gui, f.result()) 
    

    tiếp tục từ 1.

    def predict(time, rdd): 
        ... 
        f = executor.submit(RDD.collect, pred) 
        f.add_done_callback(submit_if_success) 
        ... 
    
  3. Nếu bạn thực sự muốn sử dụng địa phương scikit-learn mô hình cố gắng collectfit sử dụng tương lai như trên. Bạn cũng có thể cố gắng thu thập chỉ một lần, đặc biệt là nếu dữ liệu không được lưu trữ: Quá trình đào tạo

    def collect_and_train(df): 
        y, X = zip(*((p.label, p.features.toArray()) for p in df.collect())) 
        ... 
        return SVR().fit(X_train, y_train) 
    
    def set_if_success(f): 
        if not f.exception(): 
         Custom_ModelContainer.setModel(f.result()) 
    
    def trainModel(time, rdd): 
        ... 
        f = excutor.submit(collect_and_train, df) 
        f.add_done_callback(set_if_success) 
        ... 
    
  4. Di chuyển đến cụm hoặc sử dụng các giải pháp đã tồn tại như spark-sklearn hoặc phương pháp tùy chỉnh:

    • giải pháp ngây thơ - chuẩn bị dữ liệu của bạn, coalesce(1) và đào tạo một mô hình duy nhất bằng cách sử dụng mapPartitions.
    • giải pháp phân tán - tạo và xác thực một mô hình riêng biệt trên mỗi phân vùng bằng cách sử dụng mapPartitions, thu thập các mô hình và sử dụng như một tập hợp ví dụ bằng cách lấy dự đoán trung bình hoặc trung bình.
  5. Ném đi scikit-learn và sử dụng một mô hình có thể được đào tạo và duy trì trong một môi trường phân phối, truyền trực tuyến (ví dụ StreamingLinearRegressionWithSGD).

    Cách tiếp cận hiện tại của bạn khiến Spark lỗi thời. Nếu bạn có thể đào tạo mô hình tại địa phương có một cơ hội tốt mà bạn có thể thực hiện tất cả các nhiệm vụ khác nhanh hơn nhiều trên máy địa phương. Nếu không, chương trình của bạn sẽ bị lỗi trên collect.

1

Tôi nghĩ rằng những gì bạn đang tìm kiếm là thuộc tính: "spark.streaming.concurrentJobs" mặc định là 1. Tăng điều này sẽ cho phép bạn chạy nhiều hàm foreachRDD song song.

In JobScheduler.scala:

private val numConcurrentJobs = ssc.conf.getInt("spark.streaming.concurrentJobs", 1) 

Chỉ cần một lời nhắc nhở đến cũng phải nhận thức về an toàn thread trên mô hình chứa tùy chỉnh của bạn nếu bạn đang đi để được biến đổi và đọc song song. :)

Các vấn đề liên quan