Tôi có một lò phản ứng mà lấy về các thông điệp từ một nhà môi giới RabbitMQ và gây nên các phương pháp nhân để xử lý các thông điệp trong một hồ bơi quá trình, một cái gì đó như thế này:Làm thế nào để xử lý các kết nối SQLAlchemy trong ProcessPool?
này được thực hiện sử dụng python asyncio
, loop.run_in_executor()
và concurrent.futures.ProcessPoolExecutor
.
Bây giờ tôi muốn truy cập cơ sở dữ liệu trong các phương thức công nhân sử dụng SQLAlchemy. Chủ yếu là xử lý sẽ rất đơn giản và nhanh chóng hoạt động CRUD.
Lò phản ứng sẽ xử lý 10-50 tin nhắn mỗi giây ngay từ đầu, do đó không thể mở kết nối cơ sở dữ liệu mới cho mọi yêu cầu. Thay vào đó, tôi muốn duy trì một kết nối liên tục cho mỗi quá trình.
Câu hỏi của tôi là: Làm thế nào tôi có thể thực hiện việc này? Tôi có thể lưu trữ chúng trong một biến toàn cục không? Hồ bơi kết nối SQA có xử lý điều này cho tôi không? Làm thế nào để làm sạch khi lò phản ứng dừng lại?
[Cập nhật]
- Cơ sở dữ liệu là MySQL với InnoDB.
Tại sao nên chọn mẫu này với bể xử lý?
Triển khai hiện tại sử dụng một mẫu khác nhau nơi mỗi người tiêu dùng chạy theo chuỗi riêng của mình. Bằng cách nào đó điều này không hoạt động rất tốt. Đã có khoảng 200 người tiêu dùng mỗi người chạy theo chủ đề của riêng họ và hệ thống đang phát triển nhanh chóng. Để mở rộng quy mô tốt hơn, ý tưởng là phân tách các mối quan tâm và tiêu thụ thông báo trong vòng lặp I/O và ủy quyền xử lý cho một nhóm. Tất nhiên, hiệu suất của toàn bộ hệ thống chủ yếu là I/O bị ràng buộc. Tuy nhiên, CPU là một vấn đề khi xử lý các tập kết quả lớn.
Lý do khác là "dễ sử dụng". Trong khi việc xử lý kết nối và tiêu thụ thông điệp được triển khai không đồng bộ, mã trong công nhân có thể đồng bộ và đơn giản.
Chẳng bao lâu nó trở nên hiển nhiên khi truy cập hệ thống từ xa thông qua kết nối mạng liên tục từ bên trong công nhân là một vấn đề. Đây là những gì các CommunicationChannels là cho: Bên trong công nhân, tôi có thể cấp yêu cầu cho các tin nhắn xe buýt thông qua các kênh này.
Một trong những ý tưởng hiện tại của tôi là xử lý truy cập DB theo cách tương tự: Chuyển câu lệnh qua hàng đợi đến vòng lặp sự kiện nơi chúng được gửi tới DB. Tuy nhiên, tôi không có ý tưởng làm thế nào để làm điều này với SQLAlchemy. Đâu là điểm vào? Đối tượng cần phải là pickled
khi chúng được chuyển qua hàng đợi. Làm cách nào để có được một đối tượng như vậy từ một truy vấn SQA? Giao tiếp với cơ sở dữ liệu phải hoạt động không đồng bộ để không chặn vòng lặp sự kiện. Tôi có thể sử dụng ví dụ: aiomysql như một trình điều khiển cơ sở dữ liệu cho SQA?
Vì vậy, mỗi công nhân là quá trình riêng của mình? Không thể chia sẻ các kết nối sau đó, vì vậy có thể bạn nên khởi tạo từng nhóm SQA (cục bộ) với giới hạn kết nối tối đa 1 hoặc 2. Sau đó quan sát, có thể thông qua cơ sở dữ liệu (mà db?) Những gì các kết nối đang được sinh ra/giết chết. Bị bỏng nặng chỉ vì điều này - điều bạn không muốn làm là triển khai hồ bơi ngây thơ của riêng bạn trên đầu trang của SQA.Hoặc cố gắng để xác định nếu một kết nối SQA được đóng hay không. –
@JLPeyret: Tôi đã cập nhật câu hỏi với thông tin bạn yêu cầu. Và không ... Tôi không định triển khai hồ bơi kết nối của riêng mình. – roman
Vì vậy, tôi nghĩ rằng tôi nhớ rằng các kết nối không thể vượt qua các quy trình (trong ý nghĩa hệ điều hành của từ, để phân biệt các chủ đề). Và tôi biết rằng các mối liên hệ không êm dịu chút nào. Bạn sẽ có thể thông báo các câu lệnh sql "chết" (string) nhưng tôi tin rằng bạn sẽ gặp khó khăn khi đi qua các kết nối db, tôi nghĩ rằng có thể bao gồm các kết quả SQA. Suy đoán về kết thúc của tôi, nhưng với mức độ nào đó chơi với cách sử dụng SQA lẻ để biện minh cho nó. –