2011-11-26 31 views
10

Tôi muốn biết cách xử lý đa được thực hiện đúng cách. Giả sử tôi có một danh sách [1,2,3,4,5] được tạo bởi hàm f1 được ghi vào một số Queue (vòng tròn màu xanh lá cây bên trái). Bây giờ tôi bắt đầu hai quá trình kéo từ hàng đợi đó (bằng cách thực hiện f2 trong các quy trình). Họ xử lý dữ liệu, giả sử: nhân đôi giá trị và ghi nó vào hàng đợi thứ hai. Bây giờ, hàm f3 đọc dữ liệu này và in ra.Đa xử lý trong một đường ống được thực hiện ngay

layout of the data flow

Bên trong các chức năng có một loại một vòng, cố gắng đọc từ hàng đợi mãi mãi. Làm thế nào để tôi ngừng quá trình này?

Idea 1

f1 không chỉ gửi danh sách, mà còn là một đối tượng None hoặc một đối tượng custon, class PipelineTerminator: pass hoặc một số ví dụ mà chỉ được tuyên truyền tất cả các con đường xuống. f3 bây giờ chờ None để đến, khi nó ở đó, nó vỡ ra khỏi vòng lặp. Sự cố: có thể một trong hai số f2 s đọc và truyền bá số None trong khi số khác vẫn xử lý một số. Sau đó, giá trị cuối cùng bị mất.

Idea 2

f3f1. Vì vậy, chức năng f1 tạo ra dữ liệu và các đường ống, sinh ra các quy trình với f2 và cung cấp tất cả dữ liệu. Sau khi sinh sản và cho ăn, nó lắng nghe trên ống thứ hai, chỉ cần đếm và xử lý các đối tượng nhận được. Vì nó biết lượng dữ liệu được nạp, nó có thể chấm dứt các quá trình thực thi f2. Nhưng nếu mục tiêu là để thiết lập một đường ống xử lý, các bước khác nhau nên được tách ra. Vì vậy, f1, f2f3 là các thành phần khác nhau của một đường ống và các bước đắt tiền được thực hiện song song.

Idea 3

pipeline idea 3

Mỗi mảnh của đường ống là một chức năng, chức năng này sinh ra các quy trình vì nó thích và có trách nhiệm để quản lý chúng. Nó biết, bao nhiêu dữ liệu đến và bao nhiêu dữ liệu đã được trả về (với yield có thể). Vì vậy, nó an toàn để tuyên truyền một đối tượng None.

setup child processes 

execute thread one and two and wait until both finished 

thread 1: 
    while True: 
     pull from input queue 
     if None: break and set finished_flag 
     else: push to queue1 and increment counter1 

thread 2: 
    while True: 
     pull from queue2 
     increment counter2 
     yield result 
     if counter1 == counter2 and finished_flag: break 

when both threads finished: kill process pool and return. 

(Thay vì sử dụng chủ đề, có lẽ người ta có thể nghĩ ra một giải pháp thông minh hơn.)

Vậy ...

tôi đã thực hiện một giải pháp sau đây ý tưởng 2, cho ăn và chờ đợi kết quả đến, nhưng nó không thực sự là một đường ống với các chức năng độc lập được cắm vào nhau. Nó làm việc cho công việc tôi phải quản lý, nhưng rất khó để duy trì.

Tôi muốn nghe ý kiến ​​của bạn về cách bạn triển khai đường ống (dễ dàng trong một quy trình với các chức năng của máy phát, v.v., nhưng với nhiều quy trình?) Và quản lý chúng thường xuyên.

Trả lời

1

Điều gì sẽ sai khi sử dụng ý tưởng 1, nhưng với mỗi quy trình công nhân (f2) đặt một đối tượng tùy chỉnh với mã định danh của nó khi nó được thực hiện?Sau đó f3, sẽ chấm dứt công nhân đó, cho đến khi không còn quy trình công nhân nào.

Ngoài ra, mới bằng Python 3.2 là gói concurrent.futures trên thư viện chuẩn, mà nên làm những gì bạn đang cố gắng trong "đúng cách" (tm) - http://docs.python.org/dev/library/concurrent.futures.html

Có lẽ chúng ta có thể tìm một backport của concurrent.futures cho Python 2.x series.

+0

Nhưng người lao động nên biết như thế nào trong 'f2' * biết * đó là người cuối cùng? 'f1' cần phải biết có bao nhiêu công nhân và gửi số đối tượng tùy chỉnh đó. Làm như vậy, đảm bảo rằng mọi nhân viên đều nhận được thông báo này. Điều đó rõ ràng là có thể, nhưng sau đó tôi không thể "chỉ cần cắm các chức năng", tôi cần phải biết có bao nhiêu công nhân ở mỗi bước. Đó là lý do tại sao tôi thích ý tưởng 3. Và cảm ơn bạn vì những thứ 'đồng thời', đó là điều mới mẻ đối với tôi và tôi sẽ đào sâu vào nó. –

+0

Đó cũng là lý do tại sao tôi đã chọn "chấp nhận" :) –

+0

Vì đối tượng tùy chỉnh "ngừng hoạt động" được gửi bởi "F1", nó có thể bao gồm tổng số quy trình công nhân "f2". Nếu những điều này chỉ truyền đối tượng "ngừng làm việc" tới "f3", nó sẽ biết tổng số người lao động. Thông tin chi tiết có thể được gửi theo cách này - vì vậy một điều quan trọng là phải có một "lớp điều khiển" ít nhất trong "f3" (nhưng cũng có thể trong "f1") mà sẽ chỉ lo lắng về điều này và chỉ truyền xuống bất kỳ "thông báo" không các đối tượng trên hàng đợi để thực sự được xử lý. – jsbueno

1

Đối Idea 1, làm thế nào về:

import multiprocessing as mp 

sentinel=None 

def f2(inq,outq): 
    while True: 
     val=inq.get() 
     if val is sentinel: 
      break 
     outq.put(val*2) 

def f3(outq): 
    while True: 
     val=outq.get() 
     if val is sentinel: 
      break 
     print(val) 

def f1(): 
    num_workers=2 
    inq=mp.Queue() 
    outq=mp.Queue() 
    for i in range(5): 
     inq.put(i) 
    for i in range(num_workers):   
     inq.put(sentinel) 
    workers=[mp.Process(target=f2,args=(inq,outq)) for i in range(2)] 
    printer=mp.Process(target=f3,args=(outq,)) 
    for w in workers: 
     w.start() 
    printer.start() 
    for w in workers: 
     w.join() 
    outq.put(sentinel) 
    printer.join() 

if __name__=='__main__': 
    f1() 

Sự khác biệt duy từ mô tả của Idea 1 là f2 phá vỡ ra khỏi while-loop khi nó nhận được các trọng điểm (như vậy, chấm dứt chính nó). f1 khối cho đến khi công nhân được thực hiện (sử dụng w.join()) và sau đó gửi f3 sentinel (báo hiệu rằng nó thoát ra khỏi while-loop) của nó.

+0

Cảm ơn bạn, đó là tương tự như cách tiếp cận tôi đã kết thúc thực hiện, nhưng phiên bản của bạn là rất dễ đọc. Những gì tôi không thích là một thực tế là mọi thành phần của đường ống cần phải biết gì đó về đường ống, như trong trường hợp này: 'máy in' cần biết số lượng công nhân trong bước trước và cứ tiếp tục như vậy. Đó là lý do tại sao tôi nghĩ về việc đóng gói này và cho * mọi bước * trong đường ống * chính xác một * đầu vào và một đầu ra và phân nhánh và hợp nhất diễn ra trong mỗi bước. –

+0

Đó là một điểm tốt. Bạn có thể làm cho 'f3' độc lập với' num_workers' nhưng để 'f1' gửi sentinel sau khi' workers' được thực hiện. Tôi đã chỉnh sửa bài đăng để hiển thị ý của tôi. – unutbu

7

Với MPipe mô-đun, bạn chỉ cần làm điều này:

from mpipe import OrderedStage, Pipeline 

def f1(value): 
    return value * 2 

def f2(value): 
    print(value) 

s1 = OrderedStage(f1, size=2) 
s2 = OrderedStage(f2) 
p = Pipeline(s1.link(s2)) 

for task in 1, 2, 3, 4, 5, None: 
    p.put(task) 

Máy chạy trên 4 quy trình:

  • hai cho giai đoạn đầu tiên (chức năng f1)
  • một cho giai đoạn thứ hai (functi trên f2)
  • một nhiều hơn cho chương trình chính cung cấp đường dẫn.

MPipe cookbook cung cấp giải thích về cách các quy trình được tắt nội bộ bằng cách sử dụng None làm tác vụ cuối cùng.

Để chạy mã, cài đặt MPipe:

virtualenv venv 
venv/bin/pip install mpipe 
venv/bin/python prog.py 

Output:

2 
4 
6 
8 
10 
+0

Có vẻ tốt, ít nhất là ví dụ giới thiệu! Logo đẹp, nhân tiện. –

0

Cách dễ nhất để thực hiện chính xác đó là sử dụng Cột.

F1

F1 được Populating 'Queue' của bạn với dữ liệu bạn muốn để xử lý. Kết thúc thúc đẩy này, bạn đặt n 'Stop' từ khóa trong hàng đợi của bạn. n = 2 cho ví dụ của bạn, nhưng thường là số lượng công nhân tham gia. Mã sẽ trông như thế:

for n in no_of_processes: 
    tasks.put('Stop') 

F2

F2 là kéo từ hàng đợi được cung cấp bởi một -Command get. Phần tử được lấy từ hàng đợi và bị xóa trong hàng đợi.Bây giờ, bạn có thể đặt các cửa sổ pop thành một vòng lặp trong khi chú ý đến các tín hiệu dừng:

for elem in iter(tasks.get, 'STOP'): 
    do something 

F3

một Đây là một chút khéo léo. Bạn có thể tạo ra một semaphore trong F2 hoạt động như một tín hiệu cho F3. Nhưng bạn không biết khi nào tín hiệu này đến và bạn có thể mất dữ liệu. Tuy nhiên, F3 lấy dữ liệu giống như F2 và bạn có thể đặt dữ liệu đó vào một mã số try... except. queue.get tăng queue.Empty khi không có yếu tố nào trong hàng đợi. Vì vậy, kéo bạn trong F3 sẽ trông như thế:

while control: 
    try: 
     results.get() 
    except queue.Empty: 
     control = False 

Với tasksresults là hàng đợi. Vì vậy, bạn không cần bất cứ điều gì mà chưa được bao gồm trong Python.

0

Tôi sử dụng concurent.futures và ba hồ bơi, được kết nối với nhau qua future.add_done_callback. Sau đó, tôi chờ cho toàn bộ quá trình kết thúc bằng cách gọi shutdown trên mỗi hồ bơi.

from concurrent.futures import ProcessPoolExecutor 
import time 
import random 


def worker1(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe12(future): 
    pool2.submit(worker2, future.result()).add_done_callback(pipe23) 


def worker2(arg): 
    time.sleep(random.random()) 
    return arg 


def pipe23(future): 
    pool3.submit(worker3, future.result()).add_done_callback(spout) 


def worker3(arg): 
    time.sleep(random.random()) 
    return arg 


def spout(future): 
    print(future.result()) 


if __name__ == "__main__": 
    __spec__ = None # Fix multiprocessing in Spyder's IPython 
    pool1 = ProcessPoolExecutor(2) 
    pool2 = ProcessPoolExecutor(2) 
    pool3 = ProcessPoolExecutor(2) 
    for i in range(10): 
     pool1.submit(worker1, i).add_done_callback(pipe12) 
    pool1.shutdown() 
    pool2.shutdown() 
    pool3.shutdown() 
Các vấn đề liên quan