2011-07-26 39 views
62

Chương trình tạo nhiều quy trình hoạt động trên hàng đợi có thể tham gia, Q và cuối cùng có thể thao tác từ điển toàn cầu D để lưu trữ kết quả. (do đó, mỗi quy trình con có thể sử dụng D để lưu trữ kết quả của nó và cũng xem kết quả của các quá trình con khác đang tạo ra)Đa xử lý Python: Làm cách nào để chia sẻ một dict giữa nhiều quy trình?

Nếu tôi in từ điển D trong quá trình con, tôi thấy các sửa đổi đã được thực hiện trên đó (tức là trên D). Nhưng sau khi quá trình chính tham gia Q, nếu tôi in D, đó là một dict trống!

Tôi hiểu đó là sự cố đồng bộ hóa/khóa. Ai đó có thể cho tôi biết những gì đang xảy ra ở đây và làm thế nào tôi có thể đồng bộ hóa quyền truy cập vào D?

Trả lời

15

đa xử lý không giống như luồng. Mỗi tiến trình con sẽ nhận được một bản sao của bộ nhớ của quá trình chính. Nói chung trạng thái được chia sẻ thông qua giao tiếp (ống/ổ cắm), tín hiệu hoặc bộ nhớ dùng chung.

Đa làm cho một số trừu tượng có sẵn đối với trường hợp sử dụng của bạn - nhà nước chia sẻ đó là coi như địa phương bằng cách sử dụng proxy hoặc chia sẻ bộ nhớ: http://docs.python.org/library/multiprocessing.html#sharing-state-between-processes

phần liên quan:

+0

Thanks a lot. Bạn đã dẫn tôi đến/a giải pháp: multiprocessing.Manager(). Dict(). – dop

92

Câu trả lời chung liên quan đến việc sử dụng đối tượng Manager. Chuyển thể từ các tài liệu:

from multiprocessing import Process, Manager 

def f(d): 
    d[1] += '1' 
    d['2'] += 2 

if __name__ == '__main__': 
    manager = Manager() 

    d = manager.dict() 
    d[1] = '1' 
    d['2'] = 2 

    p1 = Process(target=f, args=(d,)) 
    p2 = Process(target=f, args=(d,)) 
    p1.start() 
    p2.start() 
    p1.join() 
    p2.join() 

    print d 

Output:

$ python mul.py 
{1: '111', '2': 6} 
+2

Cảm ơn người gửi. Thật vậy, D = multiprocessing.Manager(). Dict() giải quyết vấn đề của tôi. Tôi đã sử dụng D = dict(). – dop

+0

làm việc với Manager(). Dict() nhưng không phải Manager(). List() –

+0

@Coc, hoạt động hoàn hảo cho tôi, khi tôi thay đổi chuỗi thành int và preallocated danh sách. Bạn phải làm lỗi ở đâu đó. – senderle

1

Có lẽ bạn có thể thử pyshmht, chia sẻ bộ nhớ mở rộng bảng băm dựa cho Python.

Thông báo

  1. Nó không kiểm tra đầy đủ, chỉ để bạn tham khảo.

  2. Hiện tại, thiếu cơ chế khóa/bán để xử lý đa.

8

Tôi muốn chia sẻ công việc của mình nhanh hơn quản lý của dict và đơn giản hơn và ổn định hơn thư viện pyshmht sử dụng tấn bộ nhớ và không hoạt động trên Mac OS. Mặc dù dict của tôi chỉ hoạt động cho các chuỗi đơn giản và không thay đổi. Tôi sử dụng triển khai thăm dò tuyến tính và lưu trữ các cặp khóa và giá trị trong một khối bộ nhớ riêng biệt sau bảng.

from mmap import mmap 
import struct 
from timeit import default_timer 
from multiprocessing import Manager 
from pyshmht import HashTable 


class shared_immutable_dict: 
    def __init__(self, a): 
     self.hs = 1 << (len(a) * 3).bit_length() 
     kvp = self.hs * 4 
     ht = [0xffffffff] * self.hs 
     kvl = [] 
     for k, v in a.iteritems(): 
      h = self.hash(k) 
      while ht[h] != 0xffffffff: 
       h = (h + 1) & (self.hs - 1) 
      ht[h] = kvp 
      kvp += self.kvlen(k) + self.kvlen(v) 
      kvl.append(k) 
      kvl.append(v) 

     self.m = mmap(-1, kvp) 
     for p in ht: 
      self.m.write(uint_format.pack(p)) 
     for x in kvl: 
      if len(x) <= 0x7f: 
       self.m.write_byte(chr(len(x))) 
      else: 
       self.m.write(uint_format.pack(0x80000000 + len(x))) 
      self.m.write(x) 

    def hash(self, k): 
     h = hash(k) 
     h = (h + (h >> 3) + (h >> 13) + (h >> 23)) * 1749375391 & (self.hs - 1) 
     return h 

    def get(self, k, d=None): 
     h = self.hash(k) 
     while True: 
      x = uint_format.unpack(self.m[h * 4:h * 4 + 4])[0] 
      if x == 0xffffffff: 
       return d 
      self.m.seek(x) 
      if k == self.read_kv(): 
       return self.read_kv() 
      h = (h + 1) & (self.hs - 1) 

    def read_kv(self): 
     sz = ord(self.m.read_byte()) 
     if sz & 0x80: 
      sz = uint_format.unpack(chr(sz) + self.m.read(3))[0] - 0x80000000 
     return self.m.read(sz) 

    def kvlen(self, k): 
     return len(k) + (1 if len(k) <= 0x7f else 4) 

    def __contains__(self, k): 
     return self.get(k, None) is not None 

    def close(self): 
     self.m.close() 

uint_format = struct.Struct('>I') 


def uget(a, k, d=None): 
    return to_unicode(a.get(to_str(k), d)) 


def uin(a, k): 
    return to_str(k) in a 


def to_unicode(s): 
    return s.decode('utf-8') if isinstance(s, str) else s 


def to_str(s): 
    return s.encode('utf-8') if isinstance(s, unicode) else s 


def mmap_test(): 
    n = 1000000 
    d = shared_immutable_dict({str(i * 2): '1' for i in xrange(n)}) 
    start_time = default_timer() 
    for i in xrange(n): 
     if bool(d.get(str(i))) != (i % 2 == 0): 
      raise Exception(i) 
    print 'mmap speed: %d gets per sec' % (n/(default_timer() - start_time)) 


def manager_test(): 
    n = 100000 
    d = Manager().dict({str(i * 2): '1' for i in xrange(n)}) 
    start_time = default_timer() 
    for i in xrange(n): 
     if bool(d.get(str(i))) != (i % 2 == 0): 
      raise Exception(i) 
    print 'manager speed: %d gets per sec' % (n/(default_timer() - start_time)) 


def shm_test(): 
    n = 1000000 
    d = HashTable('tmp', n) 
    d.update({str(i * 2): '1' for i in xrange(n)}) 
    start_time = default_timer() 
    for i in xrange(n): 
     if bool(d.get(str(i))) != (i % 2 == 0): 
      raise Exception(i) 
    print 'shm speed: %d gets per sec' % (n/(default_timer() - start_time)) 


if __name__ == '__main__': 
    mmap_test() 
    manager_test() 
    shm_test() 

Mở kết quả thực hiện máy tính xách tay của tôi là:

mmap speed: 247288 gets per sec 
manager speed: 33792 gets per sec 
shm speed: 691332 gets per sec 

đơn giản ví dụ sử dụng:

ht = shared_immutable_dict({'a': '1', 'b': '2'}) 
print ht.get('a') 
+5

Github? Tài liệu? làm thế nào chúng ta có thể sử dụng công cụ này? – Pavlos

Các vấn đề liên quan