Tôi đang viết một ứng dụng mà cần phải thực hiện một loạt các nhiệm vụ song song và sau đó là một nhiệm vụ duy nhất với kết quả của tất cả các nhiệm vụ chạy:Chạy một nhiệm vụ sau khi tất cả các nhiệm vụ đã được hoàn thành
@celery.task
def power(value, expo):
return value ** expo
@celery.task
def amass(values):
print str(values)
Đó là một ví dụ rất giả tạo và quá minh mẫn, nhưng hy vọng rằng điểm này sẽ xuất hiện tốt. Về cơ bản, tôi có nhiều mặt hàng cần chạy qua power
, nhưng tôi chỉ muốn chạy amass
về kết quả từ tất cả các tác vụ. Tất cả điều này sẽ xảy ra không đồng bộ, và tôi không cần bất cứ điều gì trở lại từ phương pháp amass
.
Có ai biết cách thiết lập điều này trong cần tây để mọi thứ được thực hiện không đồng bộ và một cuộc gọi lại với danh sách kết quả được gọi sau khi tất cả được nói và thực hiện?
tôi đã thiết lập ví dụ này để chạy với một chord
như Alexander Afanasiev đề nghị:
from time import sleep
import random
tasks = []
for i in xrange(10):
tasks.append(power.s((i, 2)))
sleep(random.randint(10, 1000)/1000.0) # sleep for 10-1000ms
callback = amass.s()
r = chord(tasks)(callback)
Thật không may, trong ví dụ trên, tất cả các nhiệm vụ trong tasks
được bắt đầu chỉ khi các phương pháp chord
được gọi. Có cách nào mà mỗi nhiệm vụ có thể bắt đầu riêng và sau đó tôi có thể thêm một cuộc gọi lại vào nhóm để chạy khi mọi thứ đã kết thúc?
Xin chào, có vẻ như ở trên là một cách tiếp cận tốt, ít nhất là khái niệm. Tuy nhiên, khi tôi đã thử nó ra, chính xác cùng một mã như trên, nó ném lỗi dưới đây: 'EncodeError: không phải là JSON serializable' Sẽ thực sự đánh giá cao một số trợ giúp ở đây . –
qre0ct
Ok, tôi đã giải quyết lỗi ở trên bằng cách chuyển final_task() một danh sách các taskIds trực tiếp thay vì truyền danh sách các đối tượng nhiệm vụ như đang được thực hiện trong mẫu mã ở trên. Cảm ơn anyway cho câu trả lời. Nó đã giúp rất nhiều. – qre0ct