2011-09-13 25 views
8

Mô-đun sản xuất của ứng dụng của tôi được điều hành bởi những người dùng muốn gửi công việc được thực hiện trên một cụm nhỏ. Nó gửi các mục đăng ký dưới dạng JSON thông qua nhà môi giới tin nhắn RabbitMQ.Mẫu tốt nhất để thiết kế một ứng dụng RPC không đồng bộ bằng Python, Pika và AMQP là gì?

Tôi đã thử một số chiến lược, và tốt nhất cho đến nay là những điều sau đây, mà vẫn không hoàn toàn làm việc:

Mỗi máy cụm chạy một mô-đun tiêu dùng, mà đặt mua riêng của mình vào hàng đợi AMQP và các vấn đề một prefetch_count để cho người môi giới biết số lượng tác vụ có thể chạy cùng một lúc.

Tôi có thể làm cho nó hoạt động bằng cách sử dụng SelectConnection từ thư viện Pika AMQP. Cả người tiêu dùng và nhà sản xuất đều bắt đầu hai kênh, một kênh được kết nối với mỗi hàng đợi. Nhà sản xuất gửi yêu cầu trên kênh [A] và chờ phản hồi trong kênh [B] và người tiêu dùng chờ yêu cầu trên kênh [A] và gửi phản hồi trên kênh [B]. Có vẻ như, tuy nhiên, khi người tiêu dùng chạy cuộc gọi lại để tính toán phản hồi, nó chặn, vì vậy tôi chỉ có một nhiệm vụ được thực hiện tại mỗi người tiêu dùng mỗi lần.

Những gì tôi cần cuối cùng:

  1. người tiêu dùng [A] đặt mua nhiệm vụ của mình (khoảng 5k mỗi lần) với cụm
  2. người môi giới công văn N tin nhắn/yêu cầu cho mỗi người tiêu dùng, trong đó N là số lượng tác vụ đồng thời có thể xử lý
  3. khi một tác vụ hoàn tất, người tiêu dùng trả lời người môi giới/nhà sản xuất với kết quả
  4. nhà sản xuất nhận được trả lời, cập nhật trạng thái tính toán và cuối cùng, in một số báo cáo

Hạn chế:

  • Nếu người dùng khác nộp công việc, tất cả các nhiệm vụ của mình sẽ được xếp hàng sau khi người dùng trước đó (tôi đoán đây là tự động thực từ hệ thống hàng đợi, nhưng tôi đã không nghĩ về những tác động về môi trường luồng)
  • Nhiệm vụ đã một trật tự phải nộp, nhưng thứ tự mà chúng được trả lời là không quan trọng

UP NGÀY

Tôi đã nghiên cứu thêm một chút và vấn đề thực tế của tôi có vẻ là tôi sử dụng hàm đơn giản làm gọi lại hàm SelectConnection.channel.basic_consume() của pika. Ý tưởng cuối cùng (không được thực hiện) của tôi là chuyển một hàm luồng, thay vì một hàm thông thường, do đó cuộc gọi lại sẽ không chặn và người tiêu dùng có thể tiếp tục nghe.

+0

vấn đề rất giống với những gì tôi gặp! –

Trả lời

0

Thiết lập của bạn có vẻ tốt với tôi. Và bạn đúng, bạn chỉ có thể đặt gọi lại để bắt đầu một chuỗi và chuỗi đến một cuộc gọi lại riêng khi chuỗi kết thúc xếp hàng trả lời qua Kênh B.

Về cơ bản, người tiêu dùng của bạn phải có hàng đợi của riêng họ (kích thước của N, số lượng song song mà họ hỗ trợ). Khi một yêu cầu đến qua kênh A, nó sẽ lưu trữ kết quả trong hàng đợi được chia sẻ giữa luồng chính với Pika và các chuỗi công nhân trong nhóm luồng. Ngay sau khi nó được xếp hàng đợi, pika sẽ phản hồi lại bằng ACK và chuỗi công nhân của bạn sẽ thức dậy và bắt đầu xử lý.

Khi công nhân hoàn thành công việc, nó sẽ xếp kết quả trở lại trên hàng đợi kết quả riêng biệt và phát lại cuộc gọi đến chuỗi chính để gửi lại cho người tiêu dùng.

Bạn nên cẩn thận và đảm bảo rằng các chuỗi công nhân không can thiệp lẫn nhau nếu họ đang sử dụng bất kỳ tài nguyên được chia sẻ nào, nhưng đó là một chủ đề riêng biệt.

0

Không có kinh nghiệm về luồng, thiết lập của tôi sẽ chạy nhiều quy trình tiêu dùng (số lượng cơ bản là số lần tìm nạp trước của bạn). Mỗi người sẽ kết nối với hai hàng đợi và họ sẽ xử lý công việc một cách vui vẻ, không biết sự tồn tại của nhau.

2

Như bạn đã nhận thấy, quy trình của bạn sẽ chặn khi chạy một cuộc gọi lại. Có một số cách để giải quyết vấn đề này tùy thuộc vào những gì gọi lại của bạn.

Nếu gọi lại của bạn bị ràng buộc IO (thực hiện nhiều mạng hoặc đĩa IO), bạn có thể sử dụng một trong hai chủ đề hoặc giải pháp dựa trên greenlet, chẳng hạn như gevent, eventlet hoặc greenhouse. Tuy nhiên, hãy nhớ rằng, Python bị giới hạn bởi GIL (Global Interpreter Lock), có nghĩa là chỉ có một đoạn mã python đang chạy trong một tiến trình python duy nhất. Điều này có nghĩa rằng nếu bạn đang làm rất nhiều tính toán với mã python, các giải pháp này có thể sẽ không nhanh hơn nhiều so với những gì bạn đã có.

Một tùy chọn khác là triển khai người tiêu dùng của bạn dưới dạng nhiều quy trình sử dụng multiprocessing. Tôi đã tìm thấy đa xử lý rất hữu ích khi thực hiện công việc song song. Bạn có thể thực hiện điều này bằng cách sử dụng một Queue, có quá trình cha mẹ là người tiêu dùng và nuôi ra công việc cho con cái của nó, hoặc đơn giản là bắt đầu lên nhiều quy trình mà mỗi tiêu thụ một mình. Tôi sẽ đề nghị, trừ khi ứng dụng của bạn là rất đồng thời (1000s của công nhân), để chỉ đơn giản là bắt đầu nhiều công nhân, mỗi người trong số đó tiêu thụ từ kết nối của riêng họ. Bằng cách này, bạn có thể sử dụng tính năng xác nhận của AMQP, vì vậy nếu một người tiêu dùng chết trong khi vẫn xử lý một nhiệm vụ, thông báo sẽ được gửi trở lại hàng đợi tự động và sẽ được một nhân viên khác nhặt chứ không phải chỉ đơn giản là mất yêu cầu.

Tùy chọn cuối cùng, nếu bạn kiểm soát nhà sản xuất và nó cũng được viết bằng Python, hãy sử dụng thư viện tác vụ như celery để trừu tượng hoạt động công việc/xếp hàng cho bạn. Tôi đã sử dụng cần tây cho một số dự án lớn và đã tìm thấy nó được viết rất tốt. Nó cũng sẽ xử lý nhiều vấn đề người tiêu dùng cho bạn với cấu hình thích hợp.

+0

+1 để đề cập đến cần tây –

Các vấn đề liên quan