23

Tôi có một lò phản ứng mà lấy về các thông điệp từ một nhà môi giới RabbitMQ và gây nên các phương pháp nhân để xử lý các thông điệp trong một hồ bơi quá trình, một cái gì đó như thế này:Làm thế nào để xử lý các kết nối SQLAlchemy trong ProcessPool?

Reactor

này được thực hiện sử dụng python asyncio, loop.run_in_executor()concurrent.futures.ProcessPoolExecutor.

Bây giờ tôi muốn truy cập cơ sở dữ liệu trong các phương thức công nhân sử dụng SQLAlchemy. Chủ yếu là xử lý sẽ rất đơn giản và nhanh chóng hoạt động CRUD.

Lò phản ứng sẽ xử lý 10-50 tin nhắn mỗi giây ngay từ đầu, do đó không thể mở kết nối cơ sở dữ liệu mới cho mọi yêu cầu. Thay vào đó, tôi muốn duy trì một kết nối liên tục cho mỗi quá trình.

Câu hỏi của tôi là: Làm thế nào tôi có thể thực hiện việc này? Tôi có thể lưu trữ chúng trong một biến toàn cục không? Hồ bơi kết nối SQA có xử lý điều này cho tôi không? Làm thế nào để làm sạch khi lò phản ứng dừng lại?

[Cập nhật]

  • Cơ sở dữ liệu là MySQL với InnoDB.

Tại sao nên chọn mẫu này với bể xử lý?

Triển khai hiện tại sử dụng một mẫu khác nhau nơi mỗi người tiêu dùng chạy theo chuỗi riêng của mình. Bằng cách nào đó điều này không hoạt động rất tốt. Đã có khoảng 200 người tiêu dùng mỗi người chạy theo chủ đề của riêng họ và hệ thống đang phát triển nhanh chóng. Để mở rộng quy mô tốt hơn, ý tưởng là phân tách các mối quan tâm và tiêu thụ thông báo trong vòng lặp I/O và ủy quyền xử lý cho một nhóm. Tất nhiên, hiệu suất của toàn bộ hệ thống chủ yếu là I/O bị ràng buộc. Tuy nhiên, CPU là một vấn đề khi xử lý các tập kết quả lớn.

Lý do khác là "dễ sử dụng". Trong khi việc xử lý kết nối và tiêu thụ thông điệp được triển khai không đồng bộ, mã trong công nhân có thể đồng bộ và đơn giản.

Chẳng bao lâu nó trở nên hiển nhiên khi truy cập hệ thống từ xa thông qua kết nối mạng liên tục từ bên trong công nhân là một vấn đề. Đây là những gì các CommunicationChannels là cho: Bên trong công nhân, tôi có thể cấp yêu cầu cho các tin nhắn xe buýt thông qua các kênh này.

Một trong những ý tưởng hiện tại của tôi là xử lý truy cập DB theo cách tương tự: Chuyển câu lệnh qua hàng đợi đến vòng lặp sự kiện nơi chúng được gửi tới DB. Tuy nhiên, tôi không có ý tưởng làm thế nào để làm điều này với SQLAlchemy. Đâu là điểm vào? Đối tượng cần phải là pickled khi chúng được chuyển qua hàng đợi. Làm cách nào để có được một đối tượng như vậy từ một truy vấn SQA? Giao tiếp với cơ sở dữ liệu phải hoạt động không đồng bộ để không chặn vòng lặp sự kiện. Tôi có thể sử dụng ví dụ: aiomysql như một trình điều khiển cơ sở dữ liệu cho SQA?

+0

Vì vậy, mỗi công nhân là quá trình riêng của mình? Không thể chia sẻ các kết nối sau đó, vì vậy có thể bạn nên khởi tạo từng nhóm SQA (cục bộ) với giới hạn kết nối tối đa 1 hoặc 2. Sau đó quan sát, có thể thông qua cơ sở dữ liệu (mà db?) Những gì các kết nối đang được sinh ra/giết chết. Bị bỏng nặng chỉ vì điều này - điều bạn không muốn làm là triển khai hồ bơi ngây thơ của riêng bạn trên đầu trang của SQA.Hoặc cố gắng để xác định nếu một kết nối SQA được đóng hay không. –

+0

@JLPeyret: Tôi đã cập nhật câu hỏi với thông tin bạn yêu cầu. Và không ... Tôi không định triển khai hồ bơi kết nối của riêng mình. – roman

+0

Vì vậy, tôi nghĩ rằng tôi nhớ rằng các kết nối không thể vượt qua các quy trình (trong ý nghĩa hệ điều hành của từ, để phân biệt các chủ đề). Và tôi biết rằng các mối liên hệ không êm dịu chút nào. Bạn sẽ có thể thông báo các câu lệnh sql "chết" (string) nhưng tôi tin rằng bạn sẽ gặp khó khăn khi đi qua các kết nối db, tôi nghĩ rằng có thể bao gồm các kết quả SQA. Suy đoán về kết thúc của tôi, nhưng với mức độ nào đó chơi với cách sử dụng SQA lẻ để biện minh cho nó. –

Trả lời

6

yêu cầu của bạn của kết nối một cơ sở dữ liệu cho mỗi quá trình quá trình bơi có thể dễ dàng thỏa mãn nếu một số chăm sóc được thực hiện vào cách bạn nhanh chóng session, giả sử bạn đang làm việc với orm, trong quá trình lao động.

Một giải pháp đơn giản sẽ có một thế giới session mà bạn sử dụng lại trên yêu cầu:

# db.py 
engine = create_engine("connection_uri", pool_size=1, max_overflow=0) 
DBSession = scoped_session(sessionmaker(bind=engine)) 

Và vào các nhiệm vụ công nhân:

# task.py 
from db import engine, DBSession 
def task(): 
    DBSession.begin() # each task will get its own transaction over the global connection 
    ... 
    DBSession.query(...) 
    ... 
    DBSession.close() # cleanup on task end 

Arguments pool_sizemax_overflowcustomize mặc định QueuePool sử dụng bởi create_engine. pool_size sẽ đảm bảo quá trình của bạn chỉ giữ được 1 kết nối còn tồn tại cho mỗi quá trình trong nhóm xử lý.

Nếu bạn muốn kết nối lại, bạn có thể sử dụng DBSession.remove() để xóa phiên khỏi sổ đăng ký và sẽ kết nối lại vào lần sử dụng DBSession tiếp theo. Bạn cũng có thể sử dụng đối số recycle của Pool để kết nối lại kết nối sau khoảng thời gian được chỉ định.

Trong quá trình phát triển/gỡ lỗi, bạn có thể sử dụng AssertionPool sẽ tăng ngoại lệ nếu nhiều kết nối được kiểm tra từ hồ bơi, xem switching pool implementations về cách thực hiện điều đó.

+0

Vì vậy, về cơ bản bạn đề nghị rằng tôi không nên lo lắng vì hồ bơi SQA sẽ xử lý quyền ra khỏi hộp? Điều này sẽ được tốt đẹp! Tôi sẽ di chuyển ứng dụng chính của chúng tôi với 200 người tiêu dùng và 20.000 dòng mã cho kiến ​​trúc phần mềm mới trong vài ngày tới và xem nó có hoạt động hay không. – roman

+0

@roman Chúc may mắn với refactor của bạn, nếu bạn có bất kỳ vấn đề không ngần ngại gửi bình luận ở đây, và nếu bạn cảm thấy rằng tôi trả lời câu hỏi của bạn nó sẽ được tốt đẹp để đánh dấu này là chấp nhận :). – olokki

+0

Dường như làm việc tốt cho đến nay! :) Phần này trong tài liệu nên được đề cập đến Tôi nghĩ rằng http://docs.sqlalchemy.org/en/rel_1_1/core/pooling.html?highlight=multiprocessing#using-connection-pools-with-multiprocessing. Người ta phải chăm sóc đặc biệt liên quan đến đa xử lý. – roman

0

@roman: Rất vui khi bạn có mặt ở đó.

Tôi đã ở trong một tình huống tương tự trước vì vậy đây là 2 của tôi cent: trừ khi chỉ tiêu dùng này "đọc""viết" tin nhắn, mà không làm bất cứ proccessing thực sự của nó, bạn có thể tái thiết kế người tiêu dùng này là người tiêu dùng/nhà sản xuất sẽ tiêu thụ thông báo, nó sẽ xử lý thông báo và sau đó sẽ đưa kết quả vào hàng đợi khác, hàng đợi đó (tin nhắn được xử lý để nói) có thể được đọc bởi 1..N non các quy trình không đồng bộ chung có thể mở kết nối DB trong toàn bộ vòng đời của nó.

Tôi có thể mở rộng câu trả lời, nhưng tôi không biết liệu phương pháp này có phù hợp với nhu cầu của bạn hay không, nếu có, tôi có thể cung cấp cho bạn chi tiết hơn về thiết kế mở rộng.

+0

Tôi đã xem xét một cách tiếp cận như vậy tuy nhiên tôi nghĩ rằng nó sẽ rất khó khăn để có được quyền xử lý giao dịch. Tôi nghĩ tôi không muốn thử xây dựng trình quản lý giao dịch được phân phối của riêng mình. – roman

0

Phương pháp tiếp cận đã phục vụ tôi thực sự tốt là sử dụng máy chủ web để xử lý và mở rộng quy mô nhóm xử lý. bình-sqlalchemy ngay cả trong trạng thái mặc định của nó sẽ giữ một hồ bơi kết nối và không đóng mỗi kết nối trên mỗi chu kỳ phản ứng yêu cầu.

Trình thực thi asyncio chỉ có thể gọi các điểm kết thúc url để thực thi các chức năng của bạn. Lợi ích bổ sung là vì tất cả các quy trình thực hiện công việc đều nằm sau url, bạn có thể chia tỷ lệ hồ bơi công nhân của mình trên các máy mutliple, thêm nhiều quy trình hơn thông qua gunicorn hoặc một trong nhiều phương pháp khác để mở rộng một máy chủ wsgi đơn giản. Thêm vào đó bạn nhận được tất cả lòng tốt khoan dung lỗi.

Nhược điểm là bạn có thể chuyển nhiều thông tin hơn trên mạng. Tuy nhiên khi bạn nói vấn đề là CPU bị ràng buộc và có thể bạn sẽ chuyển nhiều dữ liệu hơn đến và đi từ cơ sở dữ liệu.

+0

Khi tôi nói CPU là một vấn đề, tôi không có nghĩa là tải công việc lớn là CPU bị ràng buộc! Nó không phải là ... Như với cách tiếp cận khác ở trên, tôi thấy một vấn đề nghiêm trọng với xử lý giao dịch ở đây. Để có kết nối mạng không trạng thái ở giữa logic nghiệp vụ và lớp kiên trì có vẻ đáng sợ. – roman

Các vấn đề liên quan