2012-02-12 27 views
5

Một vài ngày trước, tôi đã hỏi một câu hỏi về việc giúp tôi thiết kế một mô hình để cấu trúc nhiều yêu cầu HTTPSự khác biệt về thông lượng khi sử dụng coroutines vs luồng

Đây là kịch bản. Tôi muốn có một hệ thống đa người sản xuất, đa người tiêu dùng. Các nhà sản xuất của tôi thu thập dữ liệu và loại bỏ một vài trang web và thêm các liên kết mà nó tìm thấy vào hàng đợi. Vì tôi sẽ thu thập dữ liệu nhiều trang web, tôi muốn có nhiều nhà sản xuất/trình thu thập thông tin.

Người tiêu dùng/công nhân cung cấp hàng đợi này, đặt yêu cầu TCP/UDP cho các liên kết này và lưu kết quả vào DB Django của tôi. Tôi cũng muốn có nhiều công nhân vì mỗi mục xếp hàng hoàn toàn độc lập với nhau.

Mọi người đề xuất sử dụng thư viện coroutine cho Gevent hoặc Eventlet này. Chưa bao giờ làm việc với coroutines, tôi đọc rằng mặc dù mô hình lập trình tương tự như mô hình luồng, chỉ có một luồng đang hoạt động tích cực nhưng khi chặn cuộc gọi xảy ra - chẳng hạn như cuộc gọi I/O - ngăn xếp được chuyển vào bộ nhớ và màu xanh lá cây khác chuỗi tiếp quản cho đến khi nó gặp phải một số loại cuộc gọi I/O chặn. Hy vọng rằng tôi có quyền này? Dưới đây là các mã từ một trong những bài viết SO tôi:

import gevent 
from gevent.queue import * 
import time 
import random 

q = JoinableQueue() 
workers = [] 
producers = [] 


def do_work(wid, value): 
    gevent.sleep(random.randint(0,2)) 
    print 'Task', value, 'done', wid 


def worker(wid): 
    while True: 
     item = q.get() 
     try: 
      print "Got item %s" % item 
      do_work(wid, item) 
     finally: 
      print "No more items" 
      q.task_done() 


def producer(): 
    while True: 
     item = random.randint(1, 11) 
     if item == 10: 
      print "Signal Received" 
      return 
     else: 
      print "Added item %s" % item 
      q.put(item) 


for i in range(4): 
    workers.append(gevent.spawn(worker, random.randint(1, 100000))) 

# This doesn't work. 
for j in range(2): 
    producers.append(gevent.spawn(producer)) 

# Uncommenting this makes this script work. 
# producer() 

q.join() 

này hoạt động tốt vì sleep gọi là chặn cuộc gọi và khi một sự kiện xảy ra sleep, một chủ đề màu xanh lá cây tiếp quản. Điều này nhanh hơn rất nhiều so với thực thi tuần tự. Như bạn có thể thấy, tôi không có bất kỳ mã nào trong chương trình của tôi cố tình tạo ra một chuỗi cho một luồng khác. Tôi không thấy cách này phù hợp với kịch bản ở trên như tôi muốn có tất cả các chủ đề thực hiện đồng thời.

Tất cả hoạt động tốt, nhưng tôi cảm thấy thông lượng mà tôi đã đạt được bằng cách sử dụng Gevent/Eventlets cao hơn chương trình chạy tuần tự ban đầu nhưng thấp hơn đáng kể so với những gì có thể đạt được bằng cách sử dụng luồng thực.

Nếu tôi thực hiện lại chương trình của mình bằng cách sử dụng cơ chế luồng, mỗi nhà sản xuất và người tiêu dùng của tôi có thể hoạt động đồng thời mà không cần trao đổi ngăn xếp trong và ngoài như coroutines.

Điều này có nên được triển khai lại bằng luồng không? Thiết kế của tôi có sai không? Tôi đã không nhìn thấy những lợi ích thực sự của việc sử dụng coroutines.

Có lẽ các khái niệm của tôi hơi lầy lội nhưng đây là những gì tôi đã đồng hóa. Bất kỳ sự giúp đỡ hoặc làm rõ về mô hình và khái niệm của tôi sẽ là tuyệt vời.

Cảm ơn

+0

Tại sao không sử dụng nhiều quy trình? –

+0

Tôi không biết ưu và nhược điểm của đa luồng so với đa xử lý vì vậy tôi không biết liệu nó có ổn hay không. –

+1

không có thứ gì như "luồng thực" (chỉ có một chuỗi hệ điều hành thực hiện tại bất kỳ thời điểm nào) trong các chương trình Python mà không cần đến các phần mở rộng C (hoặc các quy trình OS nặng) vì Khóa thông dịch toàn cầu. –

Trả lời

5

Như bạn có thể thấy, tôi không có bất kỳ mã nào trong chương trình của mình có chủ đích cho phép thực hiện một chuỗi cho một chuỗi khác. Tôi không thấy cách điều này phù hợp với kịch bản ở trên vì tôi muốn có tất cả các chủ đề thi hành đồng thời.

Có một chuỗi hệ điều hành duy nhất nhưng một số greenlet. Trong trường hợp của bạn gevent.sleep() cho phép người lao động thực thi đồng thời. Chặn các cuộc gọi IO chẳng hạn như urllib2.urlopen(url).read() làm tương tự nếu bạn sử dụng urllib2 được vá để hoạt động với gevent (bằng cách gọi gevent.monkey.patch_*()).

Xem thêm A Curious Course on Coroutines and Concurrency để hiểu cách mã có thể hoạt động đồng thời trong môi trường luồng đơn.

Để so sánh sự khác biệt thông giữa gevent, luồng, đa xử bạn có thể viết mã mà tương thích với tất cả các aproaches:

#!/usr/bin/env python 
concurrency_impl = 'gevent' # single process, single thread 
##concurrency_impl = 'threading' # single process, multiple threads 
##concurrency_impl = 'multiprocessing' # multiple processes 

if concurrency_impl == 'gevent': 
    import gevent.monkey; gevent.monkey.patch_all() 

import logging 
import time 
import random 
from itertools import count, islice 

info = logging.info 

if concurrency_impl in ['gevent', 'threading']: 
    from Queue import Queue as JoinableQueue 
    from threading import Thread 
if concurrency_impl == 'multiprocessing': 
    from multiprocessing import Process as Thread, JoinableQueue 

Phần còn lại của kịch bản là như nhau cho tất cả các triển khai đồng thời:

def do_work(wid, value): 
    time.sleep(random.randint(0,2)) 
    info("%d Task %s done" % (wid, value)) 

def worker(wid, q): 
    while True: 
     item = q.get() 
     try: 
      info("%d Got item %s" % (wid, item)) 
      do_work(wid, item) 
     finally: 
      q.task_done() 
      info("%d Done item %s" % (wid, item)) 

def producer(pid, q): 
    for item in iter(lambda: random.randint(1, 11), 10): 
     time.sleep(.1) # simulate a green blocking call that yields control 
     info("%d Added item %s" % (pid, item)) 
     q.put(item) 
    info("%d Signal Received" % (pid,)) 

Không thực thi mã ở cấp mô-đun đặt ở số main():

def main(): 
    logging.basicConfig(level=logging.INFO, 
         format="%(asctime)s %(process)d %(message)s") 

    q = JoinableQueue() 
    it = count(1) 
    producers = [Thread(target=producer, args=(i, q)) for i in islice(it, 2)] 
    workers = [Thread(target=worker, args=(i, q)) for i in islice(it, 4)] 
    for t in producers+workers: 
     t.daemon = True 
     t.start() 

    for t in producers: t.join() # put items in the queue 
    q.join() # wait while it is empty 
    # exit main thread (daemon workers die at this point) 

if __name__=="__main__":  
    main() 
+0

Hi Sebastian, tôi đã xem xét mã của tôi và thấy rằng các nhà sản xuất và người tiêu dùng của tôi đang làm việc cùng một lúc. Khi một hoạt động chặn xảy ra ở một trong các greenlet của tôi, nó sẽ kiểm soát các greenlet khác. Tôi đã thêm cuộc gọi thiếu 'monkey_patch' để mô-đun ổ cắm không bị chặn nhưng tôi không thể lấy đủ bộ xử lý của mình. Một máy tính thông thường có đủ nước để có nhiều kết nối đồng thời và nhiều greenlet hơn nhưng tôi không có đủ tốc độ. Tôi rất lạc lõng và bối rối vì sao nó không sử dụng nhiều bộ xử lý hơn và hoạt động nhanh hơn. Bạn có thể giúp tôi hiểu không? Tôi rất lạc. Cảm ơn. –

+0

@Mridang Agarwalla: Tôi đã nhận xét về mã mà bạn đã đăng trong câu hỏi của mình. 'producer' * không * làm việc đồng thời trong đó. – jfs

+1

@Mridang Agarwalla: nếu vấn đề của bạn là IO bị ràng buộc (đĩa, mạng) thì không quan trọng CPU của bạn nhanh như thế nào, nếu bạn có thể ghi vào đĩa chỉ với 50MB/s thì không quan trọng CPU của bạn có thể xử lý 1GB/s. Ngoài ra, chương trình của bạn có thể tiêu thụ các tài nguyên hữu hạn khác như số lượng tệp mở. Nếu bạn sử dụng 'gevent' đảm bảo rằng tất cả các cuộc gọi chặn là" xanh lục "nghĩa là, chúng không chặn, ví dụ: trình điều khiển cơ sở dữ liệu của bạn có thể không tương thích với' gevent'. – jfs

1

gevent là tuyệt vời khi bạn có rất nhiều chủ đề (màu lục). Tôi đã thử nghiệm nó với hàng ngàn và nó hoạt động rất tốt. bạn phải chắc chắn rằng tất cả các thư viện mà bạn sử dụng cho cả việc cạo và để lưu vào db sẽ có màu xanh lá cây. afaik nếu họ sử dụng socket của python, gevent injection nên làm việc. tuy nhiên, các tiện ích mở rộng được viết bằng C (ví dụ: mysqldb) sẽ chặn và bạn cần phải sử dụng các tương đương màu xanh lá cây thay thế.

nếu bạn sử dụng gevent bạn chủ yếu có thể làm đi với hàng đợi, đẻ trứng mới (màu xanh lá cây) thread cho mỗi nhiệm vụ, mã cho các thread được đơn giản như db.save(web.get(address)). gevent sẽ chăm sóc việc sử dụng khi một số thư viện trong khối db hoặc web. nó sẽ hoạt động miễn là nhiệm vụ của bạn phù hợp với bộ nhớ.

0

Trong trường hợp này, sự cố của bạn không có tốc độ chương trình (nghĩa là lựa chọn gevent hoặc luồng), nhưng thông lượng IO của mạng. Đó là (nên là) nút cổ chai xác định chương trình chạy nhanh như thế nào.

Gevent là một cách hay để đảm bảo rằng nút cổ chai chứ không phải là kiến ​​trúc của chương trình.

Đây là loại quá trình bạn muốn:

import gevent 
from gevent.queue import Queue, JoinableQueue 
from gevent.monkey import patch_all 


patch_all() # Patch urllib2, etc 


def worker(work_queue, output_queue): 
    for work_unit in work_queue: 
     finished = do_work(work_unit) 
     output_queue.put(finished) 
     work_queue.task_done() 


def producer(input_queue, work_queue): 
    for url in input_queue: 
     url_list = crawl(url) 
     for work in url_list: 
      work_queue.put(work) 
     input_queue.task_done() 


def do_work(work): 
    gevent.sleep(0) # Actually proces link here 
    return work 


def crawl(url): 
    gevent.sleep(0) 
    return list(url) # Actually process url here 

input = JoinableQueue() 
work = JoinableQueue() 
output = Queue() 

workers = [gevent.spawn(worker, work, output) for i in range(0, 10)] 
producers = [gevent.spawn(producer, input, work) for i in range(0, 10)] 


list_of_urls = ['foo', 'bar'] 

for url in list_of_urls: 
    input.put(url) 

# Wait for input to finish processing 
input.join() 
print 'finished producing' 
# Wait for workers to finish processing work 
work.join() 
print 'finished working' 

# We now have output! 
print 'output:' 
for message in output: 
    print message 
# Or if you'd like, you could use the output as it comes! 

Bạn không cần phải chờ đợi cho đầu vào và công việc hàng đợi để kết thúc, tôi vừa chứng minh rằng ở đây.

Các vấn đề liên quan