2010-01-25 18 views
16

Tôi có một dự án Django và tôi đang cố gắng sử dụng Celery để gửi các tác vụ xử lý nền (http://ask.github.com/celery/introduction.html). Celery tích hợp tốt với Django và tôi đã có thể gửi các nhiệm vụ tùy chỉnh của mình và lấy lại kết quả.Làm cách nào để thiết lập Celery để gọi chức năng khởi tạo tùy chỉnh trước khi chạy tác vụ của tôi?

Vấn đề duy nhất là tôi không thể tìm thấy một cách lành mạnh để thực hiện khởi tạo tùy chỉnh trong quy trình daemon. Tôi cần phải gọi một hàm đắt tiền tải rất nhiều bộ nhớ trước khi tôi bắt đầu xử lý các nhiệm vụ, và tôi không thể đủ khả năng để gọi hàm đó mỗi lần.

Có ai có vấn đề này trước đây không? Bất kỳ ý tưởng làm thế nào để làm việc xung quanh nó mà không sửa đổi mã nguồn Celery?

Cảm ơn

+0

loại khởi tạo tùy chỉnh nào bạn cần chạy? – diegueus9

+0

tôi cần tải cấu trúc dữ liệu ~ 10MB cần thiết để xử lý mọi tác vụ (cấu trúc giống nhau cho mọi tác vụ). – xelk

Trả lời

15

Bạn có thể viết trình tải tùy chỉnh hoặc sử dụng tín hiệu.

Trình tải có phương thức on_task_init, được gọi khi công việc sắp được thực thi, và on_worker_init được gọi bằng quy trình chính cần thiết.

Sử dụng tín hiệu có lẽ là dễ nhất, các tín hiệu có sẵn là:

0.8.x:

  • task_prerun(task_id, task, args, kwargs)

    cử đến khi một nhiệm vụ sắp được thực hiện bởi các công nhân (hoặc tại địa phương nếu sử dụng apply/hoặc nếu CELERY_ALWAYS_EAGER đã được đặt).

  • task_postrun(task_id, task, args, kwargs, retval) Gửi đi sau khi tác vụ được thực thi trong cùng điều kiện như trên.

  • task_sent(task_id, task, args, kwargs, eta, taskset)

    Được gọi khi một nhiệm vụ được áp dụng (không tốt cho hoạt động lâu dài)

tín hiệu bổ sung có sẵn trong 0.9.x (hiện tại chủ chi nhánh trên github):

  • worker_init()

    Được gọi khi celeryd đã bắt đầu (trước khi nhiệm vụ được khởi tạo, vì vậy nếu trên hệ thống hỗ trợ fork, bất kỳ thay đổi bộ nhớ nào sẽ được sao chép sang các quy trình công nhân của trẻ ).

  • worker_ready()

    Được gọi khi celeryd có thể nhận nhiệm vụ.

  • worker_shutdown()

    Được gọi khi celeryd đang tắt.

Dưới đây là một ví dụ precalculating một cái gì đó lần đầu tiên một công việc được điều hành trong quá trình này:

from celery.task import Task 
from celery.registry import tasks 
from celery.signals import task_prerun 

_precalc_table = {} 

class PowersOfTwo(Task): 

    def run(self, x): 
     if x in _precalc_table: 
      return _precalc_table[x] 
     else: 
      return x ** 2 
tasks.register(PowersOfTwo) 


def _precalc_numbers(**kwargs): 
    if not _precalc_table: # it's empty, so haven't been generated yet 
     for i in range(1024): 
      _precalc_table[i] = i ** 2 


# need to use registered instance for sender argument. 
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name]) 

Nếu bạn muốn các chức năng được chạy cho tất cả các nhiệm vụ, chỉ cần bỏ qua đối số sender.

Các vấn đề liên quan