2012-04-17 29 views
21

Có cách nào để chỉ định mỗi nhân viên trong một nhóm đa xử lý python một ID duy nhất theo cách mà công việc đang được điều hành bởi một nhân viên cụ thể trong hồ bơi có thể biết nhân viên nào đang chạy nó? Theo tài liệu, số Processname nhưngNhận một ID duy nhất cho nhân viên trong hồ đa xử lý python

Tên là chuỗi chỉ được sử dụng cho mục đích nhận dạng. Nó không có ngữ nghĩa. Nhiều quy trình có thể được đặt cùng tên.

Trường hợp sử dụng cụ thể của tôi, tôi muốn chạy một loạt công việc trên một nhóm bốn GPU và cần đặt số thiết bị cho GPU mà công việc sẽ chạy. Vì các công việc có độ dài không đồng đều, tôi muốn chắc chắn rằng tôi không có xung đột trên GPU của công việc đang cố gắng chạy trên GPU trước khi công việc trước đó hoàn thành (vì vậy điều này ngăn cản việc gán một ID cho đơn vị công việc trước thời hạn).

+1

Tại sao không sử dụng một cái gì đó ngẫu nhiên như uuid? –

+0

@LuperRouch - Bạn có thể mở rộng những gì bạn muốn nói không? – JoshAdel

+1

Ví dụ: 'process = Process (target = foo, name = uuid.uuid4(). Hex)' 'sẽ cung cấp tên duy nhất cho các process của bạn. –

Trả lời

38

Có vẻ như những gì bạn muốn đơn giản: multiprocessing.current_process(). Ví dụ:

import multiprocessing 

def f(x): 
    print multiprocessing.current_process() 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

Output:

$ python foo.py 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-3, started daemon)> 
<Process(PoolWorker-1, started daemon)> 
<Process(PoolWorker-2, started daemon)> 
<Process(PoolWorker-4, started daemon)> 
[0, 1, 4, 9, 16, 25] 

này trả về đối tượng quá trình tự, vì vậy quá trình này có thể được bản sắc riêng của mình. Bạn cũng có thể gọi id trên đó để có một id số duy nhất - trong cpython, đây là địa chỉ bộ nhớ của đối tượng quá trình, vì vậy tôi không nghĩ rằng có khả năng chồng chéo nào đó. Cuối cùng, bạn có thể sử dụng thuộc tính ident hoặc thuộc tính pid của quy trình - nhưng điều đó chỉ được đặt khi quá trình được bắt đầu.

Hơn nữa, nhìn qua nguồn, có vẻ như với tôi rất có khả năng tên được tạo tự động (như được minh họa bằng giá trị đầu tiên trong chuỗi repr Process ở trên) là duy nhất. multiprocessing duy trì đối tượng itertools.counter cho mọi quá trình, được sử dụng để tạo ra một bộ nhái _identity cho bất kỳ quá trình con nào mà nó sinh ra. Vì vậy, quy trình cấp cao nhất tạo ra tiến trình con với các id giá trị đơn, và chúng sinh ra quá trình với các id hai giá trị, và cứ thế. Sau đó, nếu không có tên nào được chuyển đến hàm tạo Process, nó chỉ đơn giản là autogenerates the name dựa trên _identity, sử dụng ':'.join(...). Sau đó, Poolalters the name của quá trình sử dụng replace, để nguyên id được tạo tự động.

Kết quả cuối cùng của tất cả điều này là mặc dù hai Process es thể có cùng tên, bởi vì bạn có thể gán cùng tên với họ khi bạn tạo chúng, họ là duy nhất nếu bạn không chạm vào tên tham số. Ngoài ra, bạn về lý thuyết có thể sử dụng _identity làm định danh duy nhất; nhưng tôi thu thập họ đã biến biến đó thành riêng tư vì một lý do!

Một ví dụ ở trên trong hành động:

import multiprocessing 

def f(x): 
    created = multiprocessing.Process() 
    current = multiprocessing.current_process() 
    print 'running:', current.name, current._identity 
    print 'created:', created.name, created._identity 
    return x * x 

p = multiprocessing.Pool() 
print p.map(f, range(6)) 

Output:

$ python foo.py 
running: PoolWorker-1 (1,) 
created: Process-1:1 (1, 1) 
running: PoolWorker-2 (2,) 
created: Process-2:1 (2, 1) 
running: PoolWorker-3 (3,) 
created: Process-3:1 (3, 1) 
running: PoolWorker-1 (1,) 
created: Process-1:2 (1, 2) 
running: PoolWorker-2 (2,) 
created: Process-2:2 (2, 2) 
running: PoolWorker-4 (4,) 
created: Process-4:1 (4, 1) 
[0, 1, 4, 9, 16, 25] 
1

Bạn có thể sử dụng multiprocessing.Queue để lưu trữ các id và sau đó nhận được id lúc khởi tạo của quá trình hồ bơi.

Ưu điểm:

  • Bạn không cần phải dựa vào internals.
  • Nếu trường hợp sử dụng của bạn là quản lý tài nguyên/thiết bị thì bạn có thể đặt trực tiếp vào số thiết bị. Điều này cũng sẽ đảm bảo rằng không có thiết bị nào được sử dụng hai lần: Nếu bạn có nhiều quy trình trong nhóm của mình hơn thiết bị, các quy trình bổ sung sẽ chặn trên queue.get() và sẽ không thực hiện bất kỳ công việc nào (Điều này sẽ không chặn sơ đồ của bạn hoặc ít nhất nó không khi tôi thử nghiệm).

Nhược điểm:

  • Bạn có overhead giao tiếp bổ sung và đẻ trứng hồ quá trình mất một chút nhỏ dài hơn: Nếu không có sự sleep(1) trong ví dụ tất cả công việc có thể được thực hiện theo quy trình đầu tiên, như những người khác chưa được khởi tạo xong.
  • Bạn cần một toàn cầu (hoặc ít nhất tôi không biết một con đường xung quanh nó)

Ví dụ:

import multiprocessing 
from time import sleep 

def init(queue): 
    global idx 
    idx = queue.get() 

def f(x): 
    global idx 
    process = multiprocessing.current_process() 
    sleep(1) 
    return (idx, process.pid, x * x) 

ids = [0, 1, 2, 3] 
manager = multiprocessing.Manager() 
idQueue = manager.Queue() 

for i in ids: 
    idQueue.put(i) 

p = multiprocessing.Pool(8, init, (idQueue,)) 
print(p.map(f, range(8))) 

Output:

[(0, 8289, 0), (1, 8290, 1), (2, 8294, 4), (3, 8291, 9), (0, 8289, 16), (1, 8290, 25), (2, 8294, 36), (3, 8291, 49)] 

Lưu ý, rằng có chỉ có 4 pid khác nhau, mặc dù hồ bơi chứa 8 quy trình và một idx chỉ được sử dụng bởi một quy trình.

0

Tôi đã làm điều này với luồng và kết thúc bằng cách sử dụng a queue để xử lý công việc quản lý. Đây là đường cơ sở. Phiên bản hoàn chỉnh của tôi có một loạt các try-catches (đặc biệt là trong công nhân, để đảm bảo rằng q.task_done() được gọi là ngay cả trên thất bại).

from threading import Thread 
from queue import Queue 
import time 
import random 


def run(idx, *args): 
    time.sleep(random.random() * 1) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     while True: 
      args = q.get() 
      run(idx, *args) 
      q.task_done() 

    for job in jobs: 
     q.put(job) 

    for i in range(0, workers): 
     t = Thread(target=worker, args=[i]) 
     t.daemon = True 
     t.start() 

    q.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

Tôi không cần phải sử dụng đa xử lý (nhân viên của tôi chỉ để gọi một quy trình bên ngoài), nhưng điều này có thể được gia hạn. API cho đa thay đổi nó một cảm ứng, dưới đây là cách bạn có thể thích ứng:

from multiprocessing import Process, Queue 
from Queue import Empty 
import time 
import random 

def run(idx, *args): 
    time.sleep(random.random() * i) 
    print idx, ':', args 


def run_jobs(jobs, workers=1): 
    q = Queue() 
    def worker(idx): 
     try: 
      while True: 
       args = q.get(timeout=1) 
       run(idx, *args) 
     except Empty: 
      return 

    for job in jobs: 
     q.put(job) 

    processes = [] 
    for i in range(0, workers): 
     p = Process(target=worker, args=[i]) 
     p.daemon = True 
     p.start() 
     processes.append(p) 

    for p in processes: 
     p.join() 


if __name__ == "__main__": 
    run_jobs([('job', i) for i in range(0,10)], workers=5) 

Cả hai phiên bản sẽ đưa ra một cái gì đó như:

0 : ('job', 0) 
1 : ('job', 2) 
1 : ('job', 6) 
3 : ('job', 3) 
0 : ('job', 5) 
1 : ('job', 7) 
2 : ('job', 1) 
4 : ('job', 4) 
3 : ('job', 8) 
0 : ('job', 9) 
Các vấn đề liên quan