Mô-đun sản xuất của ứng dụng của tôi được điều hành bởi những người dùng muốn gửi công việc được thực hiện trên một cụm nhỏ. Nó gửi các mục đăng ký dưới dạng JSON thông qua nhà môi giới tin nhắn RabbitMQ.Mẫu tốt nhất để thiết kế một ứng dụng RPC không đồng bộ bằng Python, Pika và AMQP là gì?
Tôi đã thử một số chiến lược, và tốt nhất cho đến nay là những điều sau đây, mà vẫn không hoàn toàn làm việc:
Mỗi máy cụm chạy một mô-đun tiêu dùng, mà đặt mua riêng của mình vào hàng đợi AMQP và các vấn đề một prefetch_count để cho người môi giới biết số lượng tác vụ có thể chạy cùng một lúc.
Tôi có thể làm cho nó hoạt động bằng cách sử dụng SelectConnection từ thư viện Pika AMQP. Cả người tiêu dùng và nhà sản xuất đều bắt đầu hai kênh, một kênh được kết nối với mỗi hàng đợi. Nhà sản xuất gửi yêu cầu trên kênh [A] và chờ phản hồi trong kênh [B] và người tiêu dùng chờ yêu cầu trên kênh [A] và gửi phản hồi trên kênh [B]. Có vẻ như, tuy nhiên, khi người tiêu dùng chạy cuộc gọi lại để tính toán phản hồi, nó chặn, vì vậy tôi chỉ có một nhiệm vụ được thực hiện tại mỗi người tiêu dùng mỗi lần.
Những gì tôi cần cuối cùng:
- người tiêu dùng [A] đặt mua nhiệm vụ của mình (khoảng 5k mỗi lần) với cụm
- người môi giới công văn N tin nhắn/yêu cầu cho mỗi người tiêu dùng, trong đó N là số lượng tác vụ đồng thời có thể xử lý
- khi một tác vụ hoàn tất, người tiêu dùng trả lời người môi giới/nhà sản xuất với kết quả
- nhà sản xuất nhận được trả lời, cập nhật trạng thái tính toán và cuối cùng, in một số báo cáo
Hạn chế:
- Nếu người dùng khác nộp công việc, tất cả các nhiệm vụ của mình sẽ được xếp hàng sau khi người dùng trước đó (tôi đoán đây là tự động thực từ hệ thống hàng đợi, nhưng tôi đã không nghĩ về những tác động về môi trường luồng)
- Nhiệm vụ đã một trật tự phải nộp, nhưng thứ tự mà chúng được trả lời là không quan trọng
UP NGÀY
Tôi đã nghiên cứu thêm một chút và vấn đề thực tế của tôi có vẻ là tôi sử dụng hàm đơn giản làm gọi lại hàm SelectConnection.channel.basic_consume() của pika. Ý tưởng cuối cùng (không được thực hiện) của tôi là chuyển một hàm luồng, thay vì một hàm thông thường, do đó cuộc gọi lại sẽ không chặn và người tiêu dùng có thể tiếp tục nghe.
vấn đề rất giống với những gì tôi gặp! –