2013-02-27 24 views
16

Khi tôi có một cái gì đó như saucần tây - các nhóm chuỗi và nhiệm vụ phụ. -> ra lệnh thực hiện

group1 = group(task1.si(), task1.si(), task1.si()) 
group2 = group(task2.si(), task2.si(), task2.si()) 

workflow = chain(group1, group2, task3.si()) 

Việc giải thích trực quan là task3 chỉ nên thực hiện sau khi tất cả các nhiệm vụ trong nhóm 2 đã kết thúc.

Thực tế, nhiệm vụ 3 thực thi trong khi nhóm1 đã bắt đầu nhưng chưa hoàn thành.

Tôi đang làm gì sai?

+0

Bất kỳ cập nhật nào về điều này với phiên bản cần tây mới? – JohnnyM

Trả lời

11

Vì vậy, khi nó quay ra, trong cần tây bạn không thể chuỗi hai nhóm lại với nhau.
tôi nghi ngờ điều này là do nhóm xích với nhiệm vụ tự động trở thành một hợp âm
- tài liệu> Cần tây: http://docs.celeryproject.org/en/latest/userguide/canvas.html

Chaining một nhóm cùng với công việc khác sẽ tự động nâng cấp nó trở thành một hợp âm:

Nhóm trả về tác vụ gốc. Khi chuỗi hai nhóm lại với nhau, tôi nghi ngờ rằng khi nhóm đầu tiên hoàn thành, hợp âm bắt đầu gọi lại "nhiệm vụ". Tôi nghi ngờ "nhiệm vụ" này thực sự là "nhiệm vụ phụ huynh" của nhóm thứ hai. Tôi tiếp tục nghi ngờ rằng nhiệm vụ phụ huynh này hoàn thành ngay sau khi nó kết thúc khởi động tất cả các nhiệm vụ phụ trong nhóm và kết quả là mục tiếp theo sau khi nhóm thứ hai được thực thi.

Để chứng minh điều này ở đây là một số mã mẫu. Bạn sẽ cần phải có một trường hợp cần tây chạy.

# celery_experiment.py 

from celery import task, group, chain, chord 
from celery.signals import task_sent, task_postrun, task_prerun 

import time 
import logging 

import random 
random.seed() 

logging.basicConfig(level=logging.DEBUG) 

### HANDLERS ###  
@task_prerun.connect() 
def task_starting_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, **kwds):  
    try: 
     logging.info('[%s] starting' % kwargs['id']) 
    except KeyError: 
     pass 

@task_postrun.connect() 
def task_finished_handler(sender=None, task_id=None, task=None, args=None, kwargs=None, retval=None, state=None, **kwds): 
    try:  
     logging.info('[%s] finished' % kwargs['id']) 
    except KeyError: 
     pass 


def random_sleep(id): 
    slp = random.randint(1, 3) 
    logging.info('[%s] sleep for %ssecs' % (id, slp)) 
    time.sleep(slp) 

@task() 
def thing(id): 
    logging.info('[%s] begin' % id) 
    random_sleep(id) 
    logging.info('[%s] end' % id) 


def exec_exp(): 
    st = thing.si(id='st') 
    st_arr = [thing.si(id='st_arr1_a'), thing.si(id='st_arr1_b'), thing.si(id='st_arr1_c'),] 
    st_arr2 = [thing.si(id='st_arr2_a'), thing.si(id='st_arr2_b'),] 
    st2 = thing.si(id='st2') 
    st3 = thing.si(id='st3') 
    st4 = thing.si(id='st4') 

    grp1 = group(st_arr) 
    grp2 = group(st_arr2) 

    # chn can chain two groups together because they are seperated by a single subtask 
    chn = (st | grp1 | st2 | grp2 | st3 | st4) 

    # in chn2 you can't chain two groups together. what will happen is st3 will start before grp2 finishes 
    #chn2 = (st | st2 | grp1 | grp2 | st3 | st4) 

    r = chn() 
    #r2 = chn2() 
+0

Cảm ơn vì điều này. Thật không may cho tôi, công việc của tôi sẽ không cho phép tôi sử dụng một nhiệm vụ 'có liên quan' giữa các nhóm. Vì vậy, tôi đã kết thúc việc tạo ra một nhiệm vụ giả mạo 'def fake_celery_task(): pass' để chạy giữa các nhóm ... – lukik

13

Tôi có cùng vấn đề với cần tây, cố gắng có quy trình làm việc trong đó bước đầu tiên là "đẻ trứng hàng triệu tác vụ". Đã cố gắng nhóm các nhóm, nhiệm vụ phụ, cuối cùng bước 2 của tôi khởi động trước khi bước 1 kết thúc.

dài truyện ngắn tôi có thể đã tìm thấy một giải pháp với việc sử dụng các hợp âm và một Finisher câm:

@celery.task 
def chordfinisher(*args, **kwargs): 
    return "OK" 

Làm gì nhiều, nhưng nó cho phép tôi làm điều này:

tasks = [] 
for id in ids: 
    tasks.append(mytask.si(id)) 
step1 = chord(group(tasks), chordfinisher.si()) 

step2 = ... 

workflow = chain(step1, step2) 

Nguyên Tôi muốn có step1 trong một subtask nhưng cho cùng một lý do như bị nghi ngờ, hành động gọi một nhóm kết thúc, nhiệm vụ được coi là hoàn thành, và công việc của tôi di chuyển trên ...

Nếu som eone có cái gì đó tốt hơn, tôi quan tâm!

+1

Xin chào, đây là những gì tôi đã làm rất nhiều. Một điều cần ghi nhớ, bạn cần người dumbfinisher trả lại kết quả thực hiện nhóm. nếu không nếu bất cứ điều gì trong nhóm không thành công, chuỗi của bạn sẽ không dừng lại ở bước 1. (điều này có thể hoặc không thể là những gì bạn muốn) –

Các vấn đề liên quan