Tôi đang viết một ứng dụng sẽ thực thi một nhóm các chuỗi nhiệm vụ đồng bộ một cách không đồng bộ.Nhóm các chuỗi với các đối số vị trí trong các tác vụ một phần bằng cách sử dụng Celery
Nói cách khác, tôi có thể có đường ống foo(a,b,c) -> boo(a,b,c)
cho một số danh sách bs
.
Hiểu biết của tôi là tạo một chuỗi foo(a,b,c) | boo(a,b,c)
cho mỗi b trong danh sách này. Những chuỗi này sau đó sẽ hình thành một nhóm cần tây, có thể được áp dụng không đồng bộ.
Mã của tôi để làm điều này là dưới đây:
my_app.py
#!/usr/bin/env python3
import functools
import time
from celery import chain, group, Celery
from celery.utils.log import get_task_logger
logger = get_task_logger(__name__)
app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0')
@app.task
def foo(a, b, c):
logger.info("foo from {0}!".format(b))
return b
@app.task
def boo(a, b, c):
logger.info("boo from {0}!".format(b))
return b
def break_up_tasks(tasks):
try:
first_task, *remaining_tasks = tasks
except ValueError as e:
first_task, remaining_tasks = [], []
return first_task, remaining_tasks
def do_tasks(a, bs, c, opts):
tasks = [foo, boo]
# There should be an option for each task
if len(opts) != len(tasks):
raise ValueError("There should be {0} provided options".format(len(tasks)))
# Create a list of tasks that should be included per the list of options' boolean values
tasks = [task for opt, task in zip(opts, tasks) if opt]
first_task, remaining_tasks = break_up_tasks(tasks)
# If there are no tasks, we're done.
if not first_task: return
chains = (
functools.reduce(
# `a` should be provided by `apply_async`'s `args` kwarg
# `b` should be provided by previous partials in chain
lambda x, y: x | y.s(c),
remaining_tasks, first_task.s(a, b, c)
) for b in bs
)
g = group(*chains)
res = g.apply_async(args=(a,), queue="default")
print("Applied async... waiting for termination.")
total_tasks = len(tasks)
while not res.ready():
print("Waiting... {0}/{1} tasks complete".format(res.completed_count(), total_tasks))
time.sleep(1)
if __name__ == "__main__":
a = "whatever"
bs = ["hello", "world"]
c = "baz"
opts = [
# do "foo"
True,
# do "boo"
True
]
do_tasks(a, bs, c, opts)
Chạy cần tây
celery worker -A my_app -l info -c 5 -Q default
Những gì tôi đang tìm kiếm, mặc dù , đó là khi tôi chạy trên, khách hàng máy chủ của tôi chạy một vòng lặp vô hạn vì boo
là mất tích một cuộc tranh cãi:
TypeError: boo() missing 1 required positional argument: 'c'
sự hiểu biết của tôi là apply_async
sẽ cung cấp args
kwarg cho mỗi chuỗi và rằng các liên kết trước đó trong chuỗi sẽ cung cấp giá trị trả về của họ cho các liên kết tiếp theo.
Tại sao boo
không nhận được đối số đúng cách? Tôi chắc rằng những nhiệm vụ này không được viết tốt vì đây là bước đột phá đầu tiên của tôi vào Celery. Nếu bạn có những gợi ý khác, tôi rất vui khi được giải trí.
Chắc chắn, nhưng việc cung cấp 'args' vào' Group.apply_async' cũng giống như vậy. Tại sao nó không hoạt động? – erip
Tôi tin rằng các đối số từ các hàm apply_async được áp dụng cho hàm đầu tiên trong chuỗi - hàm tiếp theo sẽ chỉ nhận được đối số đầu tiên từ đối số của nó. – olgierdh
Đó là loại giải thích ở đây - http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks - thật khó để tìm tài liệu về cách thức hoạt động của chức năng này trên chuỗi. – olgierdh