2015-06-03 13 views
7

Trong Python (2.7) Tôi cố gắng tạo ra các quy trình (với đa) trong một nhiệm vụ cần tây (celery 3.1.17) nhưng nó mang lại cho các lỗi:cần tây: ma quỉ quá trình không được phép có con

daemonic processes are not allowed to have children 

Googling nó, tôi thấy rằng phiên bản mới nhất của bi-a sửa chữa "lỗi" nhưng tôi có phiên bản mới nhất (3.3.0.20) và lỗi vẫn còn xảy ra. Tôi cũng đã cố gắng thực hiện this workaround trong nhiệm vụ cần tây của mình nhưng nó cho cùng một lỗi.

Có ai biết cách làm điều đó không? Any help is appreciated, Patrick

EDIT: đoạn mã

Nhiệm vụ:

from __future__ import absolute_import 
from celery import shared_task 
from embedder.models import Embedder 

@shared_task 
def embedder_update_task(embedder_id): 
    embedder = Embedder.objects.get(pk=embedder_id) 
    embedder.test() 

Artificial chức năng kiểm tra (from here):

def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t  

def work(num_procs): 
    print("Creating %i (daemon) workers and jobs in child." % num_procs) 
    pool = mp.Pool(num_procs) 

    result = pool.map(sleepawhile, 
     [randint(1, 5) for x in range(num_procs)]) 

    # The following is not really needed, since the (daemon) workers of the 
    # child's pool are killed when the child is terminated, but it's good 
    # practice to cleanup after ourselves anyway. 
    pool.close() 
    pool.join() 
    return result 

def test(self): 
    print("Creating 5 (non-daemon) workers and jobs in main process.") 
    pool = MyPool(5) 

    result = pool.map(work, [randint(1, 5) for x in range(5)]) 

    pool.close() 
    pool.join() 
    print(result) 

My thực chức năng:

import mulitprocessing as mp 

def test(self): 
    self.init() 
    for saveindex in range(self.start_index,self.start_index+self.nsaves): 
     self.create_storage(saveindex) 
     # process creation: 
     procs = [mp.Process(name="Process-"+str(i),target=getattr(self,self.training_method),args=(saveindex,)) for i in range(self.nproc)] 
     for p in procs: p.start() 
     for p in procs: p.join() 
    print "End of task" 

Các hàm init định nghĩa một mảng đa và một đối tượng mà chia sẻ bộ nhớ tương tự để tất cả các quá trình của tôi có thể cập nhật mảng này cùng một lúc cùng một lúc:

mp_arr = mp.Array(c.c_double, np.random.rand(1000000)) # example 
self.V = numpy.frombuffer(mp_arr.get_obj()) #all the processes can update V 

Lỗi phát sinh khi công việc được gọi là :

[2015-06-04 09:47:46,659: INFO/MainProcess] Received task: embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda] 
[2015-06-04 09:47:47,674: WARNING/Worker-5] Creating 5 (non-daemon) workers and jobs in main process. 
[2015-06-04 09:47:47,789: ERROR/MainProcess] Task embedder.tasks.embedder_update_task[09f8abae-649a-4abc-8381-bdf258d33dda]  raised unexpected: AssertionError('daemonic processes are not allowed to have children',) 
Traceback (most recent call last): 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 240, in trace_task 
    R = retval = fun(*args, **kwargs) 
    File "/usr/local/lib/python2.7/dist-packages/celery/app/trace.py", line 438, in __protected_call__ 
    return self.run(*args, **kwargs) 
    File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/tasks.py", line 21, in embedder_update_task 
    embedder.test() 
    File "/home/patrick/django/entite-tracker-master/entitetracker/embedder/models.py", line 475, in test 
    pool = MyPool(5) 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 159, in __init__ 
self._repopulate_pool() 
    File "/usr/lib/python2.7/multiprocessing/pool.py", line 223, in _repopulate_pool 
    w.start() 
    File "/usr/lib/python2.7/multiprocessing/process.py", line 124, in start 
'daemonic processes are not allowed to have children' 
AssertionError: daemonic processes are not allowed to have children 
+0

vui lòng cập nhật câu hỏi của bạn với đoạn mã gây ra ngoại lệ và ful l ngoại lệ. – scytale

+0

Xong. Cảm ơn bạn. – Patrick

+0

Đã thêm mã thực của tôi (trái ngược với ký tự * nhân tạo *). Cảm ơn bạn scytale cho sự giúp đỡ của bạn, nó rất đánh giá cao. – Patrick

Trả lời

5

billiardmultiprocessing là thư viện khác nhau - billiard là nĩa riêng Cần tây dự án của multiprocessing. Bạn sẽ cần phải nhập billiard và sử dụng nó thay vì multiprocessing

Tuy nhiên, câu trả lời tốt hơn có thể là bạn nên thay thế bằng cách sử dụng hai cách phân phối tác phẩm của bạn.

Bạn có thể làm điều này bằng cần tây canvas

from celery import group 

@app.task 
def sleepawhile(t): 
    print("Sleeping %i seconds..." % t) 
    time.sleep(t) 
    return t  

def work(num_procs): 
    return group(sleepawhile.s(randint(1, 5)) for x in range(num_procs)]) 

def test(self): 
    my_group = group(work(randint(1, 5)) for x in range(5)) 
    result = my_group.apply_async() 
    result.get() 

Tôi đã cố gắng để tạo ra một phiên bản làm việc của mã của bạn có sử dụng nguyên thủy vải thay vì đa xử lý. Tuy nhiên, vì ví dụ của bạn khá giả tạo nên không dễ để nghĩ ra điều gì đó có ý nghĩa.

Cập nhật:

Dưới đây là bản dịch của mã thực sự của bạn có sử dụng vải Cần tây:

tasks.py:

@shared_task 
run_training_method(saveindex, embedder_id): 
    embedder = Embedder.objects.get(pk=embedder_id) 
    embedder.training_method(saveindex) 

models.py:

from tasks import run_training_method 
from celery import group 

class Embedder(Model): 

    def embedder_update_task(self): 
     my_group = [] 

     for saveindex in range(self.start_index, self.start_index + self.nsaves): 
      self.create_storage(saveindex) 
      group.extend([run_training_method.subtask((saveindex, self.id)) 
         for i in range(self.nproc)]) 

     result = group(my_group).apply_async() 
+0

Cảm ơn bạn scytale, tôi sẽ thử nó! Bạn có biết nếu có một tương đương với multiprocessing.array trong billiard (cf phương pháp init của tôi) để tất cả các nhiệm vụ có thể chia sẻ cùng một biến bộ nhớ? – Patrick

+0

Không, không có - nhân viên cần tây không thể đưa ra giả định về việc có thể chia sẻ bất cứ điều gì vì chúng có thể chạy trên các máy chủ khác nhau. Nếu bạn chỉ chạy trên 1 máy thì có thể chỉ sử dụng đa xử lý là một cách tiếp cận tốt hơn vì bạn có sự tiện lợi của bộ nhớ dùng chung. Nếu bạn cần chạy trên nhiều máy thì có thể chỉ sử dụng cần tây và xử lý kiểu mapreduce trên dữ liệu của bạn – scytale

+0

Cảm ơn tất cả mọi sự giúp đỡ của bạn! Patrick – Patrick

Các vấn đề liên quan