2013-04-30 43 views
8

Tôi đang viết một ứng dụng mà cần phải thực hiện một loạt các nhiệm vụ song song và sau đó là một nhiệm vụ duy nhất với kết quả của tất cả các nhiệm vụ chạy:Chạy một nhiệm vụ sau khi tất cả các nhiệm vụ đã được hoàn thành

@celery.task 
def power(value, expo): 
    return value ** expo 

@celery.task 
def amass(values): 
    print str(values) 

Đó là một ví dụ rất giả tạo và quá minh mẫn, nhưng hy vọng rằng điểm này sẽ xuất hiện tốt. Về cơ bản, tôi có nhiều mặt hàng cần chạy qua power, nhưng tôi chỉ muốn chạy amass về kết quả từ tất cả các tác vụ. Tất cả điều này sẽ xảy ra không đồng bộ, và tôi không cần bất cứ điều gì trở lại từ phương pháp amass.

Có ai biết cách thiết lập điều này trong cần tây để mọi thứ được thực hiện không đồng bộ và một cuộc gọi lại với danh sách kết quả được gọi sau khi tất cả được nói và thực hiện?

tôi đã thiết lập ví dụ này để chạy với một chord như Alexander Afanasiev đề nghị:

from time import sleep 

import random 

tasks = [] 

for i in xrange(10): 
    tasks.append(power.s((i, 2))) 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 

callback = amass.s() 

r = chord(tasks)(callback) 

Thật không may, trong ví dụ trên, tất cả các nhiệm vụ trong tasks được bắt đầu chỉ khi các phương pháp chord được gọi. Có cách nào mà mỗi nhiệm vụ có thể bắt đầu riêng và sau đó tôi có thể thêm một cuộc gọi lại vào nhóm để chạy khi mọi thứ đã kết thúc?

Trả lời

3

Dưới đây là một giải pháp mà làm việc cho các mục đích của tôi:

tasks.py:

from time import sleep 

import random 

@celery.task 
def power(value, expo): 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 
    return value ** expo 

@celery.task 
def amass(results, tasks): 
    completed_tasks = [] 
    for task in tasks: 
     if task.ready(): 
      completed_tasks.append(task) 
      results.append(task.get()) 

    # remove completed tasks 
    tasks = list(set(tasks) - set(completed_tasks)) 

    if len(tasks) > 0: 
     # resend the task to execute at least 1 second from now 
     amass.delay(results, tasks, countdown=1) 
    else: 
     # we done 
     print results 

Trường hợp sử dụng:

tasks = [] 

for i in xrange(10): 
    tasks.append(power.delay(i, 2)) 

amass.delay([], tasks) 

này nên làm bắt đầu là tất cả các tác vụ càng sớm càng tốt một cách không đồng bộ. Khi tất cả đã được đăng lên hàng đợi, nhiệm vụ amass cũng sẽ được đăng lên hàng đợi. Nhiệm vụ tích lũy sẽ giữ lại chính nó cho đến khi tất cả các nhiệm vụ khác đã được hoàn thành.

+0

Xin chào, có vẻ như ở trên là một cách tiếp cận tốt, ít nhất là khái niệm. Tuy nhiên, khi tôi đã thử nó ra, chính xác cùng một mã như trên, nó ném lỗi dưới đây: 'EncodeError: không phải là JSON serializable' Sẽ thực sự đánh giá cao một số trợ giúp ở đây . – qre0ct

+0

Ok, tôi đã giải quyết lỗi ở trên bằng cách chuyển final_task() một danh sách các taskIds trực tiếp thay vì truyền danh sách các đối tượng nhiệm vụ như đang được thực hiện trong mẫu mã ở trên. Cảm ơn anyway cho câu trả lời. Nó đã giúp rất nhiều. – qre0ct

3

Cần tây có plenty of tools cho hầu hết các luồng công việc bạn có thể tưởng tượng.

Có vẻ như bạn cần sử dụng chord. Đây là trích dẫn từ tài liệu:

A chord is just like a group but with a callback. A chord consists of a header group and a body, where the body is a task that should execute after all of the tasks in the header are complete.

+0

Điều này chắc chắn đúng, tuy nhiên, có vấn đề với nó. Tôi đã cập nhật câu trả lời của mình với các chi tiết. –

0

Câu trả lời mà @ alexander-afanasiev cung cấp cho bạn về cơ bản là đúng: sử dụng hợp âm.

Mã của bạn là OK, nhưng tasks.append(power.s((i, 2))) không thực sự thực hiện thao tác phụ, chỉ cần thêm subtasks vào danh sách. Đó là chord(...)(...) một thư gửi nhiều thư đến người môi giới dưới dạng các mục phụ bạn đã xác định trong danh sách tasks, cộng thêm một thông báo nữa cho nhiệm vụ gọi lại. Khi bạn gọi chord, nó sẽ trả lại ngay khi có thể.

Nếu bạn muốn biết khi nào hợp âm đã kết thúc, bạn có thể thăm dò ý kiến ​​hoàn thành như với một tác vụ đơn lẻ bằng cách sử dụng r.ready() trong mẫu của bạn.

+0

Tôi muốn mỗi subtask thực thi ngay sau khi nó được đăng, không phải khi hợp âm được đăng. Điều đó có thể không? –

+0

Vâng, chỉ cần thực hiện 'power.delay (i, 2)' trong vòng lặp và kiểm tra tất cả các kết quả trung gian để hoàn thành trước khi gọi 'amass (kết quả)'. Nhưng tôi không thực sự thấy điểm. Sử dụng hợp âm sẽ thực hiện các nhiệm vụ 'power.s' ngay khi chúng có sẵn dưới dạng tin nhắn trong nhà môi giới và' tích lũy 'sau khi chúng kết thúc. Tôi nghĩ rằng bạn nên làm rõ những gì bạn muốn đạt được, bởi vì có vẻ như mong muốn của bạn để thực hiện các nhiệm vụ không đồng bộ mâu thuẫn với việc sử dụng bạn đang đề xuất. – enlavin

+0

Tôi đã đưa ra một giải pháp ở trên thể hiện những gì tôi muốn làm. –

0

Nhìn vào đoạn này từ câu hỏi của bạn, có vẻ như bạn đang đi qua một list như tiêu đề hợp âm, chứ không phải là một group:

from time import sleep 
import random 

tasks = [] 

for i in xrange(10): 
    tasks.append(power.s((i, 2))) 
    sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms 

callback = amass.s() 

r = chord(tasks)(callback) 

Chuyển đổi các list đến một group nên kết quả trong hành vi bạn đang mong đợi:

... 

callback = amass.s() 

tasks = group(tasks) 

r = chord(tasks)(callback) 
Các vấn đề liên quan