2016-12-14 18 views
5

Tôi đang viết một ứng dụng sẽ thực thi một nhóm các chuỗi nhiệm vụ đồng bộ một cách không đồng bộ.Nhóm các chuỗi với các đối số vị trí trong các tác vụ một phần bằng cách sử dụng Celery

Nói cách khác, tôi có thể có đường ống foo(a,b,c) -> boo(a,b,c) cho một số danh sách bs.

Hiểu biết của tôi là tạo một chuỗi foo(a,b,c) | boo(a,b,c) cho mỗi b trong danh sách này. Những chuỗi này sau đó sẽ hình thành một nhóm cần tây, có thể được áp dụng không đồng bộ.

Mã của tôi để làm điều này là dưới đây:

my_app.py

#!/usr/bin/env python3 

import functools 
import time 

from celery import chain, group, Celery 
from celery.utils.log import get_task_logger 

logger = get_task_logger(__name__) 

app = Celery("my_app", broker='redis://localhost:6379/0', backend='redis://localhost:6379/0') 

@app.task 
def foo(a, b, c): 
    logger.info("foo from {0}!".format(b)) 
    return b 

@app.task 
def boo(a, b, c): 
    logger.info("boo from {0}!".format(b)) 
    return b 

def break_up_tasks(tasks): 
    try: 
     first_task, *remaining_tasks = tasks 
    except ValueError as e: 
     first_task, remaining_tasks = [], [] 
    return first_task, remaining_tasks 

def do_tasks(a, bs, c, opts): 
    tasks = [foo, boo] 

    # There should be an option for each task 
    if len(opts) != len(tasks): 
     raise ValueError("There should be {0} provided options".format(len(tasks))) 

    # Create a list of tasks that should be included per the list of options' boolean values 
    tasks = [task for opt, task in zip(opts, tasks) if opt] 

    first_task, remaining_tasks = break_up_tasks(tasks) 

    # If there are no tasks, we're done. 
    if not first_task: return 

    chains = (
     functools.reduce(
      # `a` should be provided by `apply_async`'s `args` kwarg 
      # `b` should be provided by previous partials in chain 
      lambda x, y: x | y.s(c), 
      remaining_tasks, first_task.s(a, b, c) 
     ) for b in bs 
    ) 

    g = group(*chains) 
    res = g.apply_async(args=(a,), queue="default") 
    print("Applied async... waiting for termination.") 

    total_tasks = len(tasks) 

    while not res.ready(): 
     print("Waiting... {0}/{1} tasks complete".format(res.completed_count(), total_tasks)) 
     time.sleep(1) 

if __name__ == "__main__": 
    a = "whatever" 
    bs = ["hello", "world"] 
    c = "baz" 

    opts = [ 
     # do "foo" 
     True, 
     # do "boo" 
     True 
    ] 

    do_tasks(a, bs, c, opts) 

Chạy cần tây

celery worker -A my_app -l info -c 5 -Q default 

Những gì tôi đang tìm kiếm, mặc dù , đó là khi tôi chạy trên, khách hàng máy chủ của tôi chạy một vòng lặp vô hạn vì boo là mất tích một cuộc tranh cãi:

TypeError: boo() missing 1 required positional argument: 'c'

sự hiểu biết của tôi là apply_async sẽ cung cấp args kwarg cho mỗi chuỗi và rằng các liên kết trước đó trong chuỗi sẽ cung cấp giá trị trả về của họ cho các liên kết tiếp theo.

Tại sao boo không nhận được đối số đúng cách? Tôi chắc rằng những nhiệm vụ này không được viết tốt vì đây là bước đột phá đầu tiên của tôi vào Celery. Nếu bạn có những gợi ý khác, tôi rất vui khi được giải trí.

Trả lời

3

Sau khi gỡ lỗi mã của bạn (Tôi cũng mới sử dụng Celery! :)) Tôi đã học được rằng mỗi hàm xích sẽ nhận được đối số đầu tiên được thay thế bằng kết quả của cuộc gọi hàm trước đó. giải pháp cho vấn đề của bạn là thêm một đối số bị thiếu (số thứ hai) vào ys trong phần giảm:

chains = (
    functools.reduce(
     # `a` should be provided by `apply_async`'s `args` kwarg 
     # `b` should be provided by previous partials in chain 
     lambda x, y: x | y.s(b,c), # <- here is the 'new guy' 
     remaining_tasks, first_task.s(a, b, c) 
    ) for b in bs 
) 

Hy vọng điều đó sẽ hữu ích.

+0

Chắc chắn, nhưng việc cung cấp 'args' vào' Group.apply_async' cũng giống như vậy. Tại sao nó không hoạt động? – erip

+0

Tôi tin rằng các đối số từ các hàm apply_async được áp dụng cho hàm đầu tiên trong chuỗi - hàm tiếp theo sẽ chỉ nhận được đối số đầu tiên từ đối số của nó. – olgierdh

+0

Đó là loại giải thích ở đây - http://docs.celeryproject.org/en/latest/userguide/calling.html#linking-callbacks-errbacks - thật khó để tìm tài liệu về cách thức hoạt động của chức năng này trên chuỗi. – olgierdh

Các vấn đề liên quan