2009-11-24 35 views
8

Mới bắt đầu với python và thực hiện một số bước đầu với threading - đang thực hiện một số chuyển đổi tệp nhạc và muốn có thể sử dụng nhiều lõi trên máy của tôi (một chuỗi chuyển đổi hoạt động trên mỗi lõi).Làm cách nào để hạn chế số lượng chuỗi hoạt động trong python?

class EncodeThread(threading.Thread): 
    # this is hacked together a bit, but should give you an idea 
    def run(self): 
     decode = subprocess.Popen(["flac","--decode","--stdout",self.src], 
          stdout=subprocess.PIPE) 
     encode = subprocess.Popen(["lame","--quiet","-",self.dest], 
           stdin=decode.stdout) 
     encode.communicate() 

# some other code puts these threads with various src/dest pairs in a list 

for proc in threads: # `threads` is my list of `threading.Thread` objects 
    proc.start() 

Mọi thứ hoạt động, tất cả các tệp được mã hóa, bravo! ... tuy nhiên, tất cả các quy trình đều xuất hiện ngay lập tức, nhưng tôi chỉ muốn chạy hai lần tại một thời điểm (một cho mỗi lõi). Ngay sau khi kết thúc, tôi muốn nó chuyển sang danh sách tiếp theo trong danh sách cho đến khi nó kết thúc, sau đó tiếp tục với chương trình.

Làm cách nào để thực hiện việc này?

(Tôi đã xem xét các chức năng bơi thread và hàng đợi nhưng tôi không thể tìm thấy một câu trả lời đơn giản.)

Edit: có lẽ tôi nên thêm rằng mỗi chủ đề của tôi đang sử dụng để chạy một subprocess.Popen dòng lệnh riêng biệt bộ giải mã (flac) được truyền tới đường truyền được đưa vào dòng lệnh bộ mã hóa (lame/mp3).

+0

Tại sao phải bận tâm? Có gì sai khi để chủ đề của bạn cạnh tranh với nhau? Nó nhanh hơn để có mỗi lõi triệt để bão hòa với công việc. –

+0

Vâng, tôi đoán tôi đã không nghĩ về nó như vậy ... với một thư viện âm nhạc của hơn 2.000 tập tin tôi nghĩ rằng sinh sản (đồng thời) 2.000 giải mã quá trình (flac) đường ống đến 2.000 quá trình mã hóa (lame) cùng một lúc sẽ là phụ tối ưu. Liệu tôi có sai? – thornomad

+0

@thornomad: Vâng, bạn đã sai. Hạn chế chính xác 2 quy trình vì bạn có 2 lõi sai. Một quá trình sẽ không bão hòa một lõi với công việc. Ngay cả một đường ống dẫn ba phần có thể có đủ I/O mà cốt lõi không bị chiếm đóng hoàn toàn. –

Trả lời

4

"Mỗi chủ đề của tôi đang sử dụng subprocess.Popen để chạy một dòng lệnh riêng biệt [quy trình]".

Tại sao có một loạt chuỗi quản lý một loạt các quy trình? Đó chính là điều mà một hệ điều hành làm cho bạn. Tại sao vi quản lý những gì hệ điều hành đã quản lý?

Thay vì đánh lừa xung quanh với các chủ đề giám sát quy trình, chỉ cần tắt các quy trình. Bảng xử lý của bạn có lẽ không thể xử lý 2000 quy trình, nhưng nó có thể xử lý vài chục (có thể vài trăm) khá dễ dàng.

Bạn muốn có nhiều hơn hoạt động hơn CPU của bạn có thể xử lý được hàng đợi. Câu hỏi thực sự là một trong những bộ nhớ - không phải là quy trình hoặc chủ đề. Nếu tổng của tất cả các dữ liệu hoạt động cho tất cả các quá trình vượt quá bộ nhớ vật lý, sau đó dữ liệu phải được hoán đổi, và điều đó sẽ làm chậm bạn xuống.

Nếu quy trình của bạn có dung lượng bộ nhớ khá nhỏ, bạn có thể có rất nhiều và nhiều thao tác chạy. Nếu các quy trình của bạn có dung lượng bộ nhớ lớn, bạn không thể chạy nhiều.

+0

Heh. Tôi thấy bây giờ tomfoolerly của tôi hacked cùng nhau tiếp cận - nó là một chút dư thừa. Vì vậy, là có một cách với subprocess để quản lý một "hồ bơi" (như những người khác đã đề nghị). Cảm ơn vì đầu vào của bạn. Học như tôi đi ... nó sẽ chỉ là một vấn đề của việc sử dụng 'subprocess.poll()' để xem những gì được thực hiện và những gì vẫn đang chạy? Cảm ơn một lần nữa. – thornomad

+0

Chính xác. Bạn có thể sử dụng một bộ quy trình đơn giản; loại bỏ những người đã hoàn thành. Thêm những cái và giữ kích thước của bộ dưới một số giới hạn. Nó chỉ là một tập hợp với 'add' và' remove'. –

1

Nếu bạn đang sử dụng phiên bản "cpython" mặc định thì điều này sẽ không giúp bạn, bởi vì chỉ có một chuỗi có thể thực hiện tại một thời điểm; tra cứu Global Interpreter Lock. Thay vào đó, tôi khuyên bạn nên xem xét multiprocessingmodule bằng Python 2.6 - nó làm cho việc lập trình song song trở nên cinch. Bạn có thể tạo đối tượng Pool với các quy trình 2*num_threads và cung cấp cho bạn một loạt nhiệm vụ cần thực hiện. Nó sẽ thực hiện lên đến 2*num_threads tác vụ cùng một lúc, cho đến khi tất cả được thực hiện. Tại nơi làm việc, gần đây tôi đã di chuyển một loạt các công cụ Python XML (một bộ biến đổi xpath, xpath và xslt khác) để sử dụng, và có kết quả rất tốt với hai tiến trình trên mỗi bộ xử lý.

+1

Nếu các quy trình con của bạn sẽ thực hiện các chức năng trong mã Python của bạn thì mô-đun đa xử lý rất tuyệt. Nếu bạn đang gọi một chương trình bên ngoài thì mô-đun này sẽ không cung cấp một lợi thế hơn mô-đun subprocess ... bởi vì các chương trình bên ngoài đó sẽ không có bất kỳ phương tiện trả về kết quả của chúng cho phụ huynh khác ngoài tệp tạm thời hoặc đường ống vv Các lợi thế IPC lớn của mô-đun đa xử lý bị mất trên các chương trình bên ngoài mà bạn muốn thực hiện. (Có mỗi quá trình trong một tiến trình xử lý cuộc gọi đa xử lý âm thanh khá ngớ ngẩn, ví dụ). –

0

Tôi không phải là chuyên gia trong lĩnh vực này, nhưng tôi đã đọc nội dung về "Khóa". This article có thể giúp bạn ra

Hope this helps

1

Dường với tôi rằng những gì bạn muốn là một hồ bơi của một số loại, và trong hồ bơi mà bạn muốn đều có n luồng trong đó n == số bộ xử lý trên hệ thống của bạn. Sau đó, bạn sẽ có một luồng khác mà chỉ có công việc là nạp các công việc vào hàng đợi mà các chuỗi công nhân có thể nhận và xử lý khi chúng trở nên miễn phí (vì vậy đối với một máy mã kép, bạn có ba luồng nhưng luồng chính sẽ làm rất ít).

Vì bạn mới sử dụng Python mặc dù tôi cho rằng bạn không biết về số GIL và các tác dụng phụ của nó đối với luồng. Nếu bạn đọc bài viết tôi liên kết bạn sẽ sớm hiểu tại sao các giải pháp đa luồng truyền thống không phải lúc nào cũng tốt nhất trong thế giới Python. Thay vào đó, bạn nên cân nhắc sử dụng mô-đun multiprocessing (mới trong Python 2.6, trong 2.5 bạn có thể use this backport) để đạt được hiệu quả tương tự. Nó phụ bước vấn đề của GIL bằng cách sử dụng nhiều quy trình như thể chúng là các chủ đề trong cùng một ứng dụng. Có một số hạn chế về cách bạn chia sẻ dữ liệu (bạn đang làm việc trong các không gian bộ nhớ khác nhau) nhưng thực sự không phải là điều xấu: chúng chỉ khuyến khích thực hành tốt như giảm thiểu các điểm tiếp xúc giữa các luồng (hoặc các quy trình trong trường hợp này).

Trong trường hợp của bạn, có thể bạn đang sử dụng một hồ bơi theo quy định here.

+0

Cảm ơn - Tôi sẽ xem xét đa xử lý ... và tôi đã chỉnh sửa câu hỏi của mình để có chi tiết hơn một chút ... có vẻ như subprocess.Popen không phân loại và tự làm điều đó. – thornomad

+0

Mô-đun đa xử lý, BTW, là một bổ sung tuyệt vời cho 2.6 (từ mô-đun bên thứ ba xử lý hỗ trợ 2.4 và 2.5). Tuy nhiên, không phù hợp với việc chạy các chương trình bên ngoài. Những ưu điểm chính của mô đun đa xử lý nằm trong các cách mà nó được mô hình hoá sau khi hỗ trợ luồng. Bạn có thể tạo Queue() s làm cơ chế liên lạc chính (thread/process) để loại bỏ hầu hết sự cần thiết cho khóa rõ ràng của riêng bạn. (Queue() s cung cấp hỗ trợ mạch lạc cho nhiều nhà sản xuất và người tiêu dùng của các đối tượng tùy ý). Tuyệt vời nếu các con chạy mã Python. –

1

Câu trả lời ngắn: không sử dụng chuỗi.

Ví dụ làm việc, bạn có thể xem một số thứ tôi vừa mới ném cùng nhau tại nơi làm việc. Đó là một trình bao bọc nhỏ xung quanh ssh chạy một số cấu hình là Popen() các quy trình con. Tôi đã đăng nó tại: Bitbucket: classh (Cluster Admin's ssh Wrapper).

Như đã lưu ý, tôi không sử dụng chuỗi; Tôi chỉ sinh ra những đứa trẻ, lặp lại chúng gọi các phương thức .poll() của chúng và kiểm tra thời gian chờ (cũng có thể cấu hình được) và bổ sung hồ bơi khi tôi thu thập kết quả. Tôi đã chơi với các giá trị sleep() khác nhau và trước đây tôi đã viết phiên bản (trước khi mô-đun subprocess được thêm vào Python) sử dụng mô-đun tín hiệu (SIGCHLD và SIGALRM) và os.fork()os.execve() chức năng --- mà trên đường ống của tôi và mô tả đường ống dẫn nước, vv).

Trong trường hợp của tôi, tôi đang in kết quả khi tôi thu thập chúng ... và ghi nhớ tất cả chúng để tóm tắt ở cuối (khi tất cả các công việc đã hoàn thành hoặc bị giết vì vượt quá thời gian chờ).

Tôi chạy, như được đăng, trên danh sách 25.000 máy chủ nội bộ (nhiều trong số đó đã ngừng hoạt động, đã nghỉ hưu, được đặt quốc tế, không thể truy cập vào tài khoản thử nghiệm của tôi, v.v.). Nó hoàn thành công việc chỉ trong hơn hai giờ và không có vấn đề gì. (Có khoảng 60 trong số đó là thời gian chờ do các hệ thống ở trạng thái thoái hóa/đập vỡ - chứng minh rằng việc xử lý thời gian chờ của tôi hoạt động chính xác).

Vì vậy, tôi biết mô hình này hoạt động đáng tin cậy. Chạy 100 quy trình ssh hiện tại với mã này dường như không gây ra bất kỳ tác động đáng chú ý nào. (Đó là một hộp FreeBSD cũ vừa phải). Tôi đã từng chạy phiên bản cũ (pre- subprocess) với 100 quy trình đồng thời trên máy tính xách tay 512MB cũ của tôi mà không gặp sự cố).

(BTW: Tôi dự định dọn dẹp và thêm các tính năng vào nó; cảm thấy tự do để đóng góp hoặc sao chép chi nhánh của riêng bạn; đó là những gì Bitbucket.org dành cho).

+0

Cảm ơn - Hôm nay tôi sẽ xem xét kỹ hơn một chút. Tôi đã nhanh chóng đưa ra một tập hợp khá đơn giản trong khi các vòng lặp dường như chỉ hoạt động khi kiểm tra phương thức 'p.communicate()'. (PS: Tôi nghĩ rằng bạn đang thiếu một '' '' 'đóng trên dòng 4 trong nguồn.) – thornomad

30

Nếu bạn muốn giới hạn số lượng các chủ đề song song, sử dụng một semaphore:

threadLimiter = threading.BoundedSemaphore(maximumNumberOfThreads) 

class EncodeThread(threading.Thread): 

    def run(self): 
     threadLimiter.acquire() 
     try: 
      <your code here> 
     finally: 
      threadLimiter.release() 

Bắt đầu tất cả các chủ đề cùng một lúc. Tất cả trừ maximumNumberOfThreads sẽ đợi trong threadLimiter.acquire() và chuỗi chờ sẽ chỉ tiếp tục khi một chuỗi khác đi qua threadLimiter.release().

+1

Điều này chính xác trả lời câu hỏi ban đầu. Tuyệt vời cho những người kết thúc ở đây tìm kiếm trên Google. –

Các vấn đề liên quan