2011-09-24 33 views
5

Tôi hoàn toàn mới để đa xử lý. Tôi đã đọc tài liệu về mô-đun đa xử lý. Tôi đọc về hồ bơi, chủ đề, hàng đợi, vv nhưng tôi hoàn toàn bị mất.Chiến lược nào để sử dụng với đa xử lý trong python

Điều tôi muốn làm với đa xử lý là, chuyển đổi trình tải xuống http khiêm tốn của tôi, để làm việc với nhiều người lao động. Những gì tôi đang làm hiện tại là tải xuống một trang, phân tích trang để nhận các liên kết thú vị. Tiếp tục cho đến khi tất cả các liên kết thú vị được tải xuống. Bây giờ, tôi muốn thực hiện điều này với đa xử lý. Nhưng tôi không có ý tưởng tại thời điểm này, làm thế nào để tổ chức dòng chảy công việc này. Tôi đã có hai suy nghĩ về điều này. Thứ nhất, tôi nghĩ về việc có hai hàng đợi. Một hàng đợi cho các liên kết cần được tải xuống, khác để liên kết được phân tích cú pháp. Một công nhân, tải xuống các trang và thêm chúng vào hàng đợi cho các mục cần được phân tích cú pháp. Và quá trình khác phân tích cú pháp một trang và thêm các liên kết mà nó thấy thú vị vào hàng đợi khác. Các vấn đề tôi mong đợi từ phương pháp này là; trước hết, tại sao phải tải xuống một trang tại một thời điểm và phân tích một trang tại một thời điểm. Hơn nữa, làm thế nào để một quá trình biết rằng có các mục được thêm vào hàng đợi sau này, sau khi nó cạn kiệt tất cả các mục từ hàng đợi.

Một cách tiếp cận khác mà tôi đã nghĩ đến khi sử dụng là điều đó. Có một hàm, có thể được gọi với một url làm đối số. Hàm này tải xuống tài liệu và bắt đầu phân tích cú pháp nó cho các liên kết. Mỗi khi nó gặp một liên kết thú vị, nó ngay lập tức tạo ra một luồng mới chạy hàm giống hệt như chính nó. Vấn đề tôi có với cách tiếp cận này là, làm thế nào để tôi theo dõi tất cả các quá trình sinh ra xung quanh, làm cách nào để biết liệu vẫn còn các tiến trình để chạy hay không. Ngoài ra, làm cách nào để hạn chế số lượng quy trình tối đa.

Vì vậy, tôi hoàn toàn bị mất. Bất cứ ai có thể đề xuất một chiến lược tốt, và có lẽ hiển thị một số mã ví dụ về làm thế nào để đi với ý tưởng.

+0

Điều này đã được thảo luận ở một số độ sâu [trước đây] (http://stackoverflow.com/questions/731993/multiprocessing-or-multithreading) – brc

+0

Tôi có thể đề nghị bạn xem thư viện eventlet không? Bạn có thể thấy nó phù hợp với mục đích của bạn tốt hơn sau đó sử dụng đa xử lý. –

Trả lời

3

Dưới đây là một cách tiếp cận, sử dụng đa xử lý. (Rất cám ơn @Voo, vì đã đề xuất nhiều cải tiến cho mã).

import multiprocessing as mp 
import logging 
import Queue 
import time 

logger=mp.log_to_stderr(logging.DEBUG) # or, 
# logger=mp.log_to_stderr(logging.WARN) # uncomment this to silence debug and info messages 

def worker(url_queue,seen): 
    while True: 
     url=url_queue.get() 
     if url not in seen: 
      logger.info('downloading {u}'.format(u=url)) 
      seen[url]=True 
      # Replace this with code to dowload url 
      # urllib2.open(...) 
      time.sleep(0.5) 
      content=url 
      logger.debug('parsing {c}'.format(c=content)) 
      # replace this with code that finds interesting links and 
      # puts them in url_queue 
      for i in range(3): 
       if content<5: 
        u=2*content+i-1 
        logger.debug('adding {u} to url_queue'.format(u=u)) 
        time.sleep(0.5) 
        url_queue.put(u) 
     else: 
      logger.debug('skipping {u}; seen before'.format(u=url)) 
     url_queue.task_done() 

if __name__=='__main__': 
    num_workers=4 
    url_queue=mp.JoinableQueue() 
    manager=mp.Manager() 
    seen=manager.dict() 

    # prime the url queue with at least one url 
    url_queue.put(1) 
    downloaders=[mp.Process(target=worker,args=(url_queue,seen)) 
       for i in range(num_workers)] 
    for p in downloaders: 
     p.daemon=True 
     p.start() 
    url_queue.join() 
  • Một vũng (4) quá trình lao động được tạo ra.
  • Có một số JoinableQueue, được gọi là url_queue.
  • Mỗi nhân viên nhận một url từ url_queue, tìm url mới và thêm chúng vào url_queue.
  • Chỉ sau khi thêm các mục mới, nó gọi là url_queue.task_done().
  • Quy trình chính gọi url_queue.join(). Việc này chặn quy trình chính cho đến khi task_done được gọi cho mọi công việc trong url_queue.
  • Vì quy trình công nhân có thuộc tính daemon được đặt thành True, chúng cũng kết thúc khi quá trình chính kết thúc.

Tất cả các thành phần được sử dụng trong ví dụ này cũng được giải thích trong Doug Hellman's excellent Python Module of the Week tutorial on multiprocessing.

+0

Cá nhân tôi đi mà không cần cài đặt daemon và kết thúc quá trình thường xuyên, nhưng nếu bạn làm, bạn thực sự phải tham gia với trình phân tích cú pháp chứ không phải trình tải xuống, vì nếu không thì có khả năng một số tệp sẽ được tải xuống nhưng không bao giờ được phân tích cú pháp. Và điều đó rõ ràng làm cho nó phức tạp hơn một chút - vì vậy tôi chỉ cần thêm giá trị sentinel vào hàng đợi để các quá trình biết khi nào không có dữ liệu nào nữa. Nó có lẽ tốt hơn từ một cân bằng tải pov (và làm cho logic đơn giản hơn) để chỉ có một hàng đợi/xử lý hồ bơi. – Voo

+0

@Voo: làm cách nào bạn biết khi nào cần thêm một hàng gửi vào hàng đợi? – unutbu

+0

Giải pháp của tôi cho điều này: Sử dụng 'JoinableQueue'. Chủ đề chính tạo ra nhóm xử lý, đặt các nhiệm vụ bắt đầu vào hàng đợi, bắt đầu quá trình và gia nhập hàng đợi. Vòng lặp cho mỗi quá trình làm việc: nhận được công việc, nếu công việc là một sentinel, nghỉ ngơi. Nếu không, hãy thực hiện công việc, đặt các công việc mới vào hàng đợi, vv Cuối cùng gọi 'task_done' và lặp lại. Chủ đề chính bị chặn cho đến khi tất cả các công việc được hoàn thành, khi nó unblocks nó đặt nrProcesses Sentinels vào hàng đợi và chúng ta đã hoàn thành. – Voo

1

Nội dung bạn mô tả về bản chất là truyền tải đồ thị; Hầu hết các thuật toán truyền tải đồ thị (Độ phức tạp hơn độ sâu đầu tiên), theo dõi hai bộ các nút, trong trường hợp của bạn, các nút là url.

Tập đầu tiên được gọi là "bộ đóng" và đại diện cho tất cả các nút đã được truy cập và xử lý.Nếu, trong khi bạn đang xử lý một trang, bạn tìm thấy một liên kết xảy ra trong tập hợp đã đóng, bạn có thể bỏ qua nó, nó đã được xử lý.

Tập thứ hai không được ngạc nhiên gọi là "bộ mở" và bao gồm tất cả các cạnh đã được tìm thấy, nhưng chưa được xử lý.

Cơ chế cơ bản là bắt đầu bằng cách đặt nút gốc vào bộ mở (bộ đóng lúc đầu trống, chưa có nút nào được xử lý) và bắt đầu làm việc. Mỗi công nhân nhận một nút đơn từ bộ mở, sao chép nó vào tập hợp đóng, xử lý nút và thêm bất kỳ nút nào mà nó phát hiện trở lại bộ mở (miễn là chúng chưa có trong bộ mở hoặc đóng) . Khi bộ mở đang trống, (và không có nhân viên nào vẫn đang xử lý các nút), biểu đồ đã được hoàn toàn đi ngang.

Thực tế việc này trong multiprocessing có thể có nghĩa là bạn sẽ có một nhiệm vụ chính để theo dõi các bộ mở và đóng; Nếu một nhân viên trong một nhóm công nhân chỉ ra rằng nó đã sẵn sàng cho công việc, thì nhân viên chủ sẽ quan tâm đến việc di chuyển nút từ bộ mở đến tập hợp đã đóng và khởi động công nhân đó. sau đó, người lao động có thể vượt qua tất cả của các nút họ tìm thấy mà không phải lo lắng về việc liệu họ đã đóng hay chưa, quay lại trang cái; và người chủ sẽ bỏ qua các nút đã bị đóng.

Các vấn đề liên quan