2014-07-27 16 views
7

Tôi đã đọc trong một số nguồn rằng lệnh in không an toàn cho luồng và cách giải quyết khác là sử dụng lệnh sys.stdout.write thay vào đó, nhưng vẫn không hoạt động đối với tôi và văn bản STDOUT không phải là nguyên tử.Làm thế nào tôi có thể làm một ghi nguyên tử để stdout trong python?

Dưới đây là một ví dụ ngắn (gọi là tập tin parallelExperiment.py này):

import os 
    import sys 
    from multiprocessing import Pool 

    def output(msg): 
    msg = '%s%s' % (msg, os.linesep) 
    sys.stdout.write(msg) 

    def func(input): 
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input))) 

    def executeFunctionInParallel(funcName, inputsList, maxParallelism): 
     output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
      funcName.__name__, len(inputsList), maxParallelism)) 
     parallelismPool = Pool(processes=maxParallelism) 
     executeBooleanResultsList = parallelismPool.map(funcName, inputsList) 
     parallelismPool.close() 
     output(u'Function %s executed on input of size %d with maximum parallelism of %d' % (
      funcName.__name__, len(inputsList), maxParallelism)) 
     # if all parallel executions executed well - the boolean results list should all be True 
     return all(executeBooleanResultsList) 

    if __name__ == "__main__": 
    inputsList=[str(i) for i in range(20)] 
    executeFunctionInParallel(func, inputsList, 4) 

Nhìn vào kết quả:

i. Sản lượng gọi python parallelExperiment.py (chú ý rằng từ "pid" là điều sai lầm trong một số dòng):

Executing function func on input of size 20 with maximum parallelism of 4 
ppid:2240 got input "0" 
id:4960 got input "2" 
pid:4716 got input "4" 
pid:4324 got input "6" 
ppid:2240 got input "1" 
id:4960 got input "3" 
pid:4716 got input "5" 
pid:4324 got input "7" 
ppid:4960 got input "8" 
id:2240 got input "10" 
pid:4716 got input "12" 
pid:4324 got input "14" 
ppid:4960 got input "9" 
id:2240 got input "11" 
pid:4716 got input "13" 
pid:4324 got input "15" 
ppid:4960 got input "16" 
id:2240 got input "18" 
ppid:2240 got input "19" 
id:4960 got input "17" 
Function func executed on input of size 20 with maximum parallelism of 4 

ii. Sản lượng gọi python parallelExperiment.py> parallelExperiment.log, có nghĩa là chuyển hướng stdout đến tập tin parallelExperiment.log (chú ý rằng thứ tự của dòng này là không tốt, bởi vì trước và sau khi gọi đến executeFunctionInParallel mà các cuộc gọi func song song, một thông báo sẽ được in):

pid:3244 got input "4" 
pid:3244 got input "5" 
pid:3244 got input "12" 
pid:3244 got input "13" 
pid:240 got input "0" 
pid:240 got input "1" 
pid:240 got input "8" 
pid:240 got input "9" 
pid:240 got input "16" 
pid:240 got input "17" 
pid:1268 got input "2" 
pid:1268 got input "3" 
pid:1268 got input "10" 
pid:1268 got input "11" 
pid:1268 got input "18" 
pid:1268 got input "19" 
pid:3332 got input "6" 
pid:3332 got input "7" 
pid:3332 got input "14" 
pid:3332 got input "15" 
Executing function func on input of size 20 with maximum parallelism of 4 
Function func executed on input of size 20 with maximum parallelism of 4 
+0

Kết quả sai lầm là những gì được xem là có nhiều luồng. Bạn sinh ra các luồng mà tất cả đều có dòng thời gian riêng của chúng. Thực tế là bash messes lên chuỗi * pid * là không thực sự không phổ biến. Đó là chủ yếu, như bạn nói, rằng nó không thực sự chấp nhận nhiều thụ thể tốt. Nó chỉ in những gì nó được và đôi khi messes callbacks chồng chéo. –

Trả lời

7

đó xảy ra vì multiprocessing.Pool thực sự sử dụng trình con thay vì đề. Bạn cần sử dụng rõ ràng synchronization giữa các quá trình. Lưu ý, ví dụ về liên kết, nó giải quyết vấn đề của bạn.

import os 
import sys 
from multiprocessing import Pool, Lock 

lock = Lock() 

def output(msg): 
    msg = '%s%s' % (msg, os.linesep) 
    with lock: 
     sys.stdout.write(msg) 

def func(input): 
    output(u'pid:%d got input \"%s\"' % (os.getpid(), str(input))) 

def executeFunctionInParallel(funcName, inputsList, maxParallelism): 
    output(u'Executing function %s on input of size %d with maximum parallelism of %d' % (
     funcName.__name__, len(inputsList), maxParallelism)) 
    parallelismPool = Pool(processes=maxParallelism) 
    executeBooleanResultsList = parallelismPool.map(funcName, inputsList) 
    parallelismPool.close() 
    output(u'Function %s executed on input of size %d with maximum parallelism of %d' % (
     funcName.__name__, len(inputsList), maxParallelism)) 
    # if all parallel executions executed well - the boolean results list should all be True 
    return all(executeBooleanResultsList) 

if __name__ == "__main__": 
    inputsList=[str(i) for i in range(20)] 
    executeFunctionInParallel(func, inputsList, 4) 
+0

và tôi có thể sử dụng phương pháp Pool.map không? ví dụ là sử dụng đối tượng "quá trình" –

+0

Tất nhiên bạn có thể. Bản thân Pool sử dụng các đối tượng xử lý nội bộ. –

+0

Tôi xin lỗi nhưng tôi không theo dõi .. Tôi nên thay đổi điều gì trong các phương thức ** executeFunctionInParallel ** và ** func **? –

1

Nếu bạn muốn tránh khóa và sẵn lòng đi đến một giao diện cấp thấp hơn, bạn có thể nhận được POSIX hành vi O_APPEND với os.open, os.write (nếu hệ thống của bạn hỗ trợ nó); và xem Is file append atomic in UNIX?.

Các vấn đề liên quan