2011-08-26 40 views
41

Tôi đã nghiên cứu trước và không thể tìm thấy câu trả lời cho câu hỏi của mình. Tôi đang cố chạy nhiều hàm song song trong Python.Python: Làm thế nào tôi có thể chạy các hàm python song song?

Tôi có một cái gì đó như thế này:

files.py 

import common #common is a util class that handles all the IO stuff 

dir1 = 'C:\folder1' 
dir2 = 'C:\folder2' 
filename = 'test.txt' 
addFiles = [25, 5, 15, 35, 45, 25, 5, 15, 35, 45] 

def func1(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir1) 
     c.getFiles(dir1) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir1) 
     c.getFiles(dir1) 

def func2(): 
    c = common.Common() 
    for i in range(len(addFiles)): 
     c.createFiles(addFiles[i], filename, dir2) 
     c.getFiles(dir2) 
     time.sleep(10) 
     c.removeFiles(addFiles[i], dir2) 
     c.getFiles(dir2) 

Tôi muốn gọi Func1 và Func2 và họ đã chạy cùng một lúc. Các chức năng không tương tác với nhau hoặc trên cùng một đối tượng. Ngay bây giờ tôi phải chờ func1 kết thúc trước khi func2 bắt đầu. Làm cách nào để tôi thực hiện điều gì đó như dưới đây:

process.py 

from files import func1, func2 

runBothFunc(func1(), func2()) 

Tôi muốn có thể tạo cả hai thư mục khá gần với cùng một thời điểm vì mỗi phút tôi đếm số lượng tệp đang được tạo. Nếu thư mục không có ở đó, nó sẽ làm mất thời gian của tôi.

+1

Cập nhật câu hỏi – lmcadory

+1

Bạn có thể muốn thiết kế lại kiến ​​trúc này; nếu bạn đang đếm số lượng tệp/thư mục mỗi phút, bạn đang tạo ra một điều kiện chủng tộc. Điều gì về việc có mỗi chức năng cập nhật một truy cập, hoặc sử dụng một lockfile để đảm bảo rằng các quá trình định kỳ không cập nhật số cho đến khi cả hai chức năng đã hoàn thành thực hiện? –

Trả lời

73

Bạn có thể sử dụng threading hoặc multiprocessing.

Do peculiarities of CPython, threading khó có thể đạt được tính song song thực sự. Vì lý do này, multiprocessing thường là đặt cược tốt hơn.

Đây là một ví dụ hoàn chỉnh:

from multiprocessing import Process 

def func1(): 
    print 'func1: starting' 
    for i in xrange(10000000): pass 
    print 'func1: finishing' 

def func2(): 
    print 'func2: starting' 
    for i in xrange(10000000): pass 
    print 'func2: finishing' 

if __name__ == '__main__': 
    p1 = Process(target=func1) 
    p1.start() 
    p2 = Process(target=func2) 
    p2.start() 
    p1.join() 
    p2.join() 

Các cơ chế khởi động/tham gia tiến trình con có thể dễ dàng được đóng gói vào một chức năng dọc theo dòng của runBothFunc của bạn:

def runInParallel(*fns): 
    proc = [] 
    for fn in fns: 
    p = Process(target=fn) 
    p.start() 
    proc.append(p) 
    for p in proc: 
    p.join() 

runInParallel(func1, func2) 
+2

Tôi đã sử dụng mã của bạn nhưng các chức năng vẫn không khởi động cùng một lúc. – lmcadory

+2

@Lamar McAdory: Vui lòng giải thích chính xác ý bạn là gì "cùng một lúc", có lẽ là ví dụ cụ thể về những gì bạn đã làm, điều bạn mong đợi xảy ra và những gì thực sự xảy ra. – NPE

+3

@ Lamar: Bạn không bao giờ có bất kỳ sự đảm bảo nào về "chính xác cùng một lúc" và nghĩ rằng bạn có thể chỉ đơn giản là sai. Tùy thuộc vào số lượng cpus bạn có, tải của máy, thời gian của nhiều thứ xảy ra trên máy tính tất cả sẽ có ảnh hưởng đến thời gian bắt đầu luồng/quá trình. Ngoài ra, vì các quá trình được bắt đầu ngay sau khi tạo, chi phí tạo quá trình cũng phải được tính toán trong chênh lệch thời gian mà bạn thấy. – Martin

3

Không có cách nào để đảm bảo rằng hai chức năng sẽ thực thi đồng bộ với nhau mà dường như là những gì bạn muốn làm.

Điều tốt nhất bạn có thể làm là chia chức năng thành nhiều bước, sau đó đợi cả hai kết thúc tại các điểm đồng bộ quan trọng bằng cách sử dụng Process.join như đề cập đến câu trả lời của @ aix.

Điều này tốt hơn time.sleep(10) vì bạn không thể đảm bảo thời gian chính xác. Với một cách rõ ràng chờ đợi, bạn đang nói rằng các chức năng phải được thực hiện bước đó trước khi chuyển sang bước tiếp theo, thay vì giả sử nó sẽ được thực hiện trong vòng 10ms mà không được đảm bảo dựa trên những gì khác đang xảy ra trên máy.

3

Nếu bạn là người dùng cửa sổ và sử dụng python 3, thì bài đăng này sẽ giúp bạn lập trình song song trong python.when bạn chạy chương trình pool của thư viện đa xử lý thông thường, bạn sẽ gặp lỗi liên quan đến chức năng chính trong chương trình của mình . Điều này là do thực tế là các cửa sổ không có chức năng fork(). Bài viết dưới đây đưa ra một giải pháp cho vấn đề được đề cập.

http://python.6.x6.nabble.com/Multiprocessing-Pool-woes-td5047050.html

Kể từ khi tôi đã sử dụng python 3, tôi đã thay đổi chương trình một chút như thế này:

from types import FunctionType 
import marshal 

def _applicable(*args, **kwargs): 
    name = kwargs['__pw_name'] 
    code = marshal.loads(kwargs['__pw_code']) 
    gbls = globals() #gbls = marshal.loads(kwargs['__pw_gbls']) 
    defs = marshal.loads(kwargs['__pw_defs']) 
    clsr = marshal.loads(kwargs['__pw_clsr']) 
    fdct = marshal.loads(kwargs['__pw_fdct']) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    del kwargs['__pw_name'] 
    del kwargs['__pw_code'] 
    del kwargs['__pw_defs'] 
    del kwargs['__pw_clsr'] 
    del kwargs['__pw_fdct'] 
    return func(*args, **kwargs) 

def make_applicable(f, *args, **kwargs): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    kwargs['__pw_name'] = f.__name__ # edited 
    kwargs['__pw_code'] = marshal.dumps(f.__code__) # edited 
    kwargs['__pw_defs'] = marshal.dumps(f.__defaults__) # edited 
    kwargs['__pw_clsr'] = marshal.dumps(f.__closure__) # edited 
    kwargs['__pw_fdct'] = marshal.dumps(f.__dict__) # edited 
    return _applicable, args, kwargs 

def _mappable(x): 
    x,name,code,defs,clsr,fdct = x 
    code = marshal.loads(code) 
    gbls = globals() #gbls = marshal.loads(gbls) 
    defs = marshal.loads(defs) 
    clsr = marshal.loads(clsr) 
    fdct = marshal.loads(fdct) 
    func = FunctionType(code, gbls, name, defs, clsr) 
    func.fdct = fdct 
    return func(x) 

def make_mappable(f, iterable): 
    if not isinstance(f, FunctionType): raise ValueError('argument must be a function') 
    name = f.__name__ # edited 
    code = marshal.dumps(f.__code__) # edited 
    defs = marshal.dumps(f.__defaults__) # edited 
    clsr = marshal.dumps(f.__closure__) # edited 
    fdct = marshal.dumps(f.__dict__) # edited 
    return _mappable, ((i,name,code,defs,clsr,fdct) for i in iterable) 

Sau khi chức năng này, các mã vấn đề trên cũng được thay đổi một chút như thế này:

from multiprocessing import Pool 
from poolable import make_applicable, make_mappable 

def cube(x): 
    return x**3 

if __name__ == "__main__": 
    pool = Pool(processes=2) 
    results = [pool.apply_async(*make_applicable(cube,x)) for x in range(1,7)] 
    print([result.get(timeout=10) for result in results]) 

Và tôi có đầu ra như:

[1, 8, 27, 64, 125, 216] 

Tôi nghĩ rằng bài đăng này có thể hữu ích cho một số người dùng windows.

Các vấn đề liên quan