2016-11-18 19 views
11

Tôi đang sử dụng Celery với RabbitMQ để xử lý dữ liệu từ các yêu cầu API. Quá trình này diễn ra như sau:Đa luồng trong một công nhân cần thiết

Yêu cầu -> API -> RabbitMQ -> Cần tây Worker -> Return

Lý tưởng nhất là tôi sẽ đẻ trứng nhiều người lao động cần tây nhưng tôi bị hạn chế bởi bộ nhớ hạn chế.

Hiện tại, nút cổ chai trong quy trình của tôi đang tìm nạp và tải xuống dữ liệu từ các URL được chuyển vào công nhân. Roughy, quá trình này trông giống như sau:

celery_gets_job(url): 
    var data = fetches_url(url) # takes 0.1s to 1.0s (bottleneck) 
    var result = processes_data(data) # takes 0.1ss 
    return result 

Điều này là không thể chấp nhận khi công nhân bị khóa trong một thời gian khi tìm nạp URL. Tôi đang tìm kiếm nâng cao này thông qua luồng, nhưng tôi không chắc chắn những gì thực tiễn tốt nhất là:

  • Có cách nào để làm cho việc tải xuống công nhân cần tây dữ liệu đến không đồng bộ khi xử lý dữ liệu cùng lúc trong một chủ đề khác nhau?

  • Tôi có nên có công nhân riêng lẻ tìm nạp và xử lý, với một số hình thức gửi tin nhắn, có thể thông qua RabbitMQ?

+1

Bạn có thể xem xét sử dụng một cái gì đó như [ống đa xử lý] (https://docs.python.org/3/library/multiprocessing.html#multiprocessing.Pipe) trong tác vụ cần tây bằng cách tạo hai bộ đa xử lý. Ofcourse quá trình đa xử lý của bạn nên được hạn chế bởi hồ bơi. Chia sẻ dữ liệu lớn của url được tìm nạp trên phần phụ trợ kết quả thỏmq/kết quả sẽ không tốt nếu tôi không sai. Celi cấp thấp của Celery cũng có thể có một số chức năng tương tự. –

+1

Tôi không biết về RabbitMQ nhưng những gì tôi nghĩ là đa xử lý sẽ phù hợp hơn cho bạn hơn là đa luồng vì 'celery_gets_job' có nhiều hoạt động phi nguyên tử và điều này sẽ tạo ra các vấn đề trong khi sử dụng đa luồng. Bạn có thể sử dụng Queue nơi dữ liệu được điền bởi nhóm các tiến trình đang chạy 'fetches_url (url)' và một tiến trình khác để thực hiện 'processes_data (dữ liệu)' – shrishinde

+0

Đây có thể là những gì bạn đang tìm kiếm: http: // stackoverflow. com/questions/28315657/celery-eventlet-non-blocking-requests – fpbhb

Trả lời

1

Sử dụng thư viện eventlet, bạn có thể vá các thư viện chuẩn để làm cho chúng không đồng bộ.

đầu tiên nhập khẩu các async urllib2:

from eventlet.green import urllib2 

Vì vậy, bạn sẽ nhận được cơ thể url với:

def fetch(url): 
    body = urllib2.urlopen(url).read() 
    return body 

Xem thêm eventlet ví dụ here.

+2

Ngoài ra, trực tiếp sử dụng các pool thực thi eventlet http://docs.celeryproject.org/en/latest/userguide/concurrency/eventlet.html sẽ tự động cập nhật các cuộc gọi io khỉ. – dyeray

+0

Nhưng sau đó sẽ không 'processes_data (dữ liệu)' vẫn chặn và làm cho kết quả kết hợp chậm hơn trước? – ostrokach

0

Tôi sẽ tạo hai tác vụ, một để tải xuống dữ liệu và một tác vụ khác để xử lý dữ liệu sau khi tải xuống. Bằng cách này bạn có thể mở rộng hai nhiệm vụ một cách độc lập. Xem: Routing, Chains.

Các vấn đề liên quan