Tôi gặp khó khăn khi hiểu cách mở và đóng đúng các phiên cơ sở dữ liệu một cách hiệu quả, như tôi đã hiểu bởi tài liệu sqlalchemy, nếu tôi sử dụng scoped_session để xây dựng đối tượng Session của tôi, và sau đó sử dụng Session đã trả về đối tượng để tạo phiên, đó là chủ đề an toàn, vì vậy về cơ bản, mỗi chuỗi sẽ nhận được phiên riêng của nó và sẽ không có vấn đề gì với nó. Bây giờ ví dụ dưới đây hoạt động, tôi đặt nó trong một vòng lặp vô hạn để xem nó có đóng đúng các phiên và nếu tôi theo dõi nó đúng cách (trong mysql bằng cách thực thi "SHOW PROCESSLIST;"), các kết nối chỉ tiếp tục phát triển, nó không đóng chúng , mặc dù tôi đã sử dụng session.close() và thậm chí loại bỏ đối tượng scoped_session ở cuối mỗi lần chạy. Tôi đang làm gì sai? Mục tiêu của tôi trong một ứng dụng lớn hơn là sử dụng số lượng tối thiểu các kết nối cơ sở dữ liệu cần thiết, vì việc thực hiện làm việc hiện tại của tôi tạo ra một phiên mới trong mọi phương thức mà nó được yêu cầu và đóng nó trước khi trở về, điều này có vẻ không hiệu quả.SQLAlchemy xử lý phiên thích hợp trong các ứng dụng đa luồng
from sqlalchemy import create_engine
from sqlalchemy.orm import sessionmaker, scoped_session
from threading import Thread
from Queue import Queue, Empty as QueueEmpty
from models import MyModel
DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname'
class MTWorker(object):
def __init__(self, worker_count=5):
self.task_queue = Queue()
self.worker_count = worker_count
self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False)
self.DBSession = scoped_session(
sessionmaker(
autoflush=True,
autocommit=False,
bind=self.db_engine
)
)
def _worker(self):
db_session = self.DBSession()
while True:
try:
task_id = self.task_queue.get(False)
try:
item = db_session.query(MyModel).filter(MyModel.id == task_id).one()
# do something with item
except Exception as exc:
# if an error occurrs we skip it
continue
finally:
db_session.commit()
self.task_queue.task_done()
except QueueEmpty:
db_session.close()
return
def start(self):
try:
db_session = self.DBSession()
all_items = db_session.query(MyModel).all()
for item in all_items:
self.task_queue.put(item.id)
for _i in range(self.worker_count):
t = Thread(target=self._worker)
t.start()
self.task_queue.join()
finally:
db_session.close()
self.DBSession.remove()
if __name__ == '__main__':
while True:
mt_worker = MTWorker(worker_count=50)
mt_worker.start()
Cảm ơn bạn đã cung cấp thông tin. King kính trọng! – andrean