Bạn có thể viết trình tải tùy chỉnh hoặc sử dụng tín hiệu.
Trình tải có phương thức on_task_init
, được gọi khi công việc sắp được thực thi, và on_worker_init
được gọi bằng quy trình chính cần thiết.
Sử dụng tín hiệu có lẽ là dễ nhất, các tín hiệu có sẵn là:
0.8.x:
task_prerun(task_id, task, args, kwargs)
cử đến khi một nhiệm vụ sắp được thực hiện bởi các công nhân (hoặc tại địa phương nếu sử dụng apply
/hoặc nếu CELERY_ALWAYS_EAGER
đã được đặt).
task_postrun(task_id, task, args, kwargs, retval)
Gửi đi sau khi tác vụ được thực thi trong cùng điều kiện như trên.
task_sent(task_id, task, args, kwargs, eta, taskset)
Được gọi khi một nhiệm vụ được áp dụng (không tốt cho hoạt động lâu dài)
tín hiệu bổ sung có sẵn trong 0.9.x (hiện tại chủ chi nhánh trên github):
worker_init()
Được gọi khi celeryd đã bắt đầu (trước khi nhiệm vụ được khởi tạo, vì vậy nếu trên hệ thống hỗ trợ fork
, bất kỳ thay đổi bộ nhớ nào sẽ được sao chép sang các quy trình công nhân của trẻ ).
worker_ready()
Được gọi khi celeryd có thể nhận nhiệm vụ.
worker_shutdown()
Được gọi khi celeryd đang tắt.
Dưới đây là một ví dụ precalculating một cái gì đó lần đầu tiên một công việc được điều hành trong quá trình này:
from celery.task import Task
from celery.registry import tasks
from celery.signals import task_prerun
_precalc_table = {}
class PowersOfTwo(Task):
def run(self, x):
if x in _precalc_table:
return _precalc_table[x]
else:
return x ** 2
tasks.register(PowersOfTwo)
def _precalc_numbers(**kwargs):
if not _precalc_table: # it's empty, so haven't been generated yet
for i in range(1024):
_precalc_table[i] = i ** 2
# need to use registered instance for sender argument.
task_prerun.connect(_precalc_numbers, sender=tasks[PowerOfTwo.name])
Nếu bạn muốn các chức năng được chạy cho tất cả các nhiệm vụ, chỉ cần bỏ qua đối số sender
.
loại khởi tạo tùy chỉnh nào bạn cần chạy? – diegueus9
tôi cần tải cấu trúc dữ liệu ~ 10MB cần thiết để xử lý mọi tác vụ (cấu trúc giống nhau cho mọi tác vụ). – xelk