2009-07-15 27 views
34

Tôi có một chương trình đa luồng, nơi tôi tạo ra một chức năng máy phát điện và sau đó chuyển nó đến các chủ đề mới. Tôi muốn nó được chia sẻ/toàn cầu trong tự nhiên vì vậy mỗi thread có thể nhận được giá trị tiếp theo từ máy phát điện.Are Generators Threadsafe?

Có an toàn khi sử dụng máy phát như thế này hay tôi sẽ gặp phải các vấn đề/điều kiện khi truy cập trình tạo chia sẻ từ nhiều luồng?

Nếu không, có cách nào tốt hơn để tiếp cận sự cố không? Tôi cần cái gì đó sẽ chu kỳ thông qua một danh sách và sản xuất các giá trị tiếp theo cho bất kỳ chủ đề gọi nó.

Trả lời

49

Chủ đề không an toàn; các cuộc gọi đồng thời có thể xen kẽ, và gây rối với các biến cục bộ.

Cách tiếp cận phổ biến là sử dụng mẫu phụ-phụ (bây giờ được gọi là mẫu công nhân nông dân trong PC). Tạo một luồng thứ ba tạo ra dữ liệu và thêm một Hàng đợi giữa chủ và các nô lệ, nơi mà các nô lệ sẽ đọc từ hàng đợi và chủ sẽ ghi vào nó. Module xếp hàng tiêu chuẩn cung cấp sự an toàn chủ đề cần thiết và sắp xếp để chặn master cho đến khi nô lệ sẵn sàng đọc thêm dữ liệu.

+7

Chắc chắn +1 cho hàng đợi.Queue, cách tuyệt vời để tổ chức hệ thống luồng khi có thể (hầu hết thời gian và chắc chắn cho tác vụ này). –

-7

Nó phụ thuộc vào python thực hiện bạn đang sử dụng. Trong CPython, GIL thực hiện tất cả các hoạt động trên các đối tượng python threadsafe, vì chỉ có một luồng có thể thực thi mã tại bất kỳ thời điểm nào.

http://en.wikipedia.org/wiki/Global_Interpreter_Lock

+1

"GIL làm cho tất cả các hoạt động trên các đối tượng python threadsafe" - huh? tất cả các hoạt động không phải là nguyên tử –

+6

Điều này gây hiểu nhầm nguy hiểm. GIL chỉ có nghĩa là mã Python sẽ không làm hỏng trạng thái Python trong một môi trường đa luồng: bạn không thể thay đổi các luồng ở giữa một mã byte bytecode. (Ví dụ, bạn có thể sửa đổi một dict chia sẻ mà không làm hỏng nó.) Bạn vẫn có thể thay đổi chủ đề giữa bất kỳ ops bytecode nào. –

40

Edited để thêm điểm chuẩn dưới đây.

Bạn có thể quấn máy phát điện có khóa. Ví dụ,

import threading 
class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

gen = [x*2 for x in [1,2,3,4]] 
g2 = LockedIterator(gen) 
print list(g2) 

Khóa mất 50ms trên hệ thống của tôi, Queue mất 350ms. Hàng đợi rất hữu ích khi bạn thực sự có hàng đợi; ví dụ, nếu bạn có các yêu cầu HTTP đến và bạn muốn xếp hàng chúng để xử lý bởi các luồng công nhân. (Điều đó không phù hợp với mô hình trình lặp Python - một khi một trình vòng lặp chạy hết các mục, nó được thực hiện.) Nếu bạn thực sự có một trình lặp, thì Trình thay đổi khóa (LockIterator) là một cách nhanh hơn và đơn giản hơn để làm cho nó an toàn.

from datetime import datetime 
import threading 
num_worker_threads = 4 

class LockedIterator(object): 
    def __init__(self, it): 
     self.lock = threading.Lock() 
     self.it = it.__iter__() 

    def __iter__(self): return self 

    def next(self): 
     self.lock.acquire() 
     try: 
      return self.it.next() 
     finally: 
      self.lock.release() 

def test_locked(it): 
    it = LockedIterator(it) 
    def worker(): 
     try: 
      for i in it: 
       pass 
     except Exception, e: 
      print e 
      raise 

    threads = [] 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     threads.append(t) 
     t.start() 

    for t in threads: 
     t.join() 

def test_queue(it): 
    from Queue import Queue 
    def worker(): 
     try: 
      while True: 
       item = q.get() 
       q.task_done() 
     except Exception, e: 
      print e 
      raise 

    q = Queue() 
    for i in range(num_worker_threads): 
     t = threading.Thread(target=worker) 
     t.setDaemon(True) 
     t.start() 

    t1 = datetime.now() 

    for item in it: 
     q.put(item) 

    q.join() 

start_time = datetime.now() 
it = [x*2 for x in range(1,10000)] 

test_locked(it) 
#test_queue(it) 
end_time = datetime.now() 
took = end_time-start_time 
print "took %.01f" % ((took.seconds + took.microseconds/1000000.0)*1000) 
+1

Ít hiệu quả hơn khi sử dụng Queue.Queue, nhưng được thực hiện rất đẹp. – gooli

Các vấn đề liên quan