2014-07-23 15 views
7

Tôi có 2 nhiệm vụ tùy chỉnh (TaskATaskB), đều được kế thừa từ celery.Task. Trình lập lịch biểu khởi chạy TaskA mọi lúc và sau đó, và TaskA khởi chạy N lần TaskB với các đối số khác nhau mỗi lần. Nhưng vì lý do nào đó, đôi khi cùng một số TaskB, với cùng một đối số, đang được thực hiện hai lần cùng một lúc và gây ra các vấn đề khác nhau với cơ sở dữ liệu.Cần tây/Làm lại cùng một nhiệm vụ đang được thực thi nhiều lần song song

class TaskA(celery.Task): 

    def run(self, *args, **kwargs): 
     objects = MyModel.objects.filter(processed=False)\ 
           .values_list('id', flat=True) 
     task_b = TaskB() 
     for o in objects: 
      o.apply_async(args=[o, ]) 

class TaskB(celery.Task): 

    def run(self, obj_id, *args, **kwargs): 
     obj = MyModel.objects.get(id=obj_id) 
     # do some stuff with obj 

Những điều tôi đã cố gắng

tôi đã cố gắng sử dụng celery.group với hy vọng rằng nó sẽ sửa chữa các vấn đề như vậy, nhưng tất cả tôi nhận được lỗi, nói rằng run mất 2 đối số và không được cung cấp.

Đây là cách tôi đã cố gắng để khởi động TaskB sử dụng celery.group:

# somewhere in TaskA 
task_b = TaskB() 
g = celery.group([task_b.s(id) for id in objects]) 
g.apply_async() 

Tôi cũng đã thử nó như thế này:

# somewhere in TaskA 
task_b = TaskB() 
g = celery.group([task_b.run(id) for id in objects]) 
g.apply_async() 

mà thực hiện các nhiệm vụ ngay tại đó, trước khi g.apply_async().

Câu hỏi

Liệu vấn đề này xuất phát từ cách tôi khởi động nhiệm vụ hoặc là nó cái gì khác? Đó có phải là hành vi bình thường không?

Thông tin cá

Trên máy tính địa phương của tôi, tôi chạy celery 3.1.13 với RabbitMQ 3.3.4, và trên máy chủ celery 3.1.13 chạy với Redis 2.8.9. Trên máy cục bộ, tôi thấy không có hành vi nào như vậy, mọi công việc được thực hiện một lần. Trên máy chủ tôi thấy bất cứ nơi nào giữa 1 - 10 nhiệm vụ như vậy được thực thi hai lần liên tiếp.

Đây là cách tôi chạy cần tây trên máy tính cục bộ và trên máy chủ:

celery_beat: celery -A proj beat -l info 

celery1: celery -A proj worker -Q default -l info --purge -n default_worker -P eventlet -c 50 

celery2: celery -A proj worker -Q long -l info --purge -n long_worker -P eventlet -c 200 

Giải pháp mà làm việc

tôi đã giới thiệu một khóa trên TaskB dựa trên những gì lý lẽ nó nhận được. Sau khoảng 10 giờ thử nghiệm, tôi thấy chính xác những gì đang được thực hiện hai lần, nhưng khóa ngăn ngừa va chạm trên cơ sở dữ liệu. Điều này giải quyết được vấn đề của tôi, nhưng tôi vẫn muốn hiểu tại sao nó lại xảy ra.

+0

Mã của bạn sẽ hoạt động tốt. Tôi đã sao chép [mã của bạn vào một tập tin như thế này] (http://pastebin.com/f1gAf4R4) và tất cả các tác vụ đã được thực hiện sau khi gọi 'TaskA(). Apply_async()'. Bạn có thể đăng truy nguyên của mình để xem vấn đề ở đâu không? – daniula

+0

truy nguyên xuất phát từ cơ sở dữ liệu. 'MyModel' có ràng buộc duy nhất trên 2 trường. Vì vậy, khi nhiệm vụ được chạy lần đầu tiên và tạo một đối tượng mới, tất cả đều tốt, nhưng sau đó cùng một tác vụ chạy lại, và cố tạo lại cùng một đối tượng và ném 'IntegrityError'. – Neara

+0

Với mã mà bạn đã đăng, bạn không thể sao chép vấn đề của mình. Tôi nghĩ rằng bạn có thể thử tạo các cá thể TaskB riêng biệt cho mỗi tác vụ vì nó có thể là một vấn đề. Hãy thử: 'g = celery.group ([TaskB(). S (id) cho id trong các đối tượng])' – daniula

Trả lời

2

Bạn đã đặt fanout_prefixfanout_patterns như được mô tả trong tài liệu Using Redis cho cần tây? Tôi đang sử dụng cần tây với Redis và tôi không gặp vấn đề này.

+0

Tôi đã gặp phải vấn đề tương tự này, với một nhiệm vụ xếp hàng các nhiệm vụ khác và các nhiệm vụ khác được chọn và thực hiện nhiều lần. Đặt 'fanout_prefix' và' fanout_patterns' như được mô tả dường như đã khắc phục được sự cố. Sử dụng cần tây 3.1.18 và Kombu 3.0.30 –

Các vấn đề liên quan