2012-03-08 27 views
21

Tôi gặp khó khăn khi hiểu cách mở và đóng đúng các phiên cơ sở dữ liệu một cách hiệu quả, như tôi đã hiểu bởi tài liệu sqlalchemy, nếu tôi sử dụng scoped_session để xây dựng đối tượng Session của tôi, và sau đó sử dụng Session đã trả về đối tượng để tạo phiên, đó là chủ đề an toàn, vì vậy về cơ bản, mỗi chuỗi sẽ nhận được phiên riêng của nó và sẽ không có vấn đề gì với nó. Bây giờ ví dụ dưới đây hoạt động, tôi đặt nó trong một vòng lặp vô hạn để xem nó có đóng đúng các phiên và nếu tôi theo dõi nó đúng cách (trong mysql bằng cách thực thi "SHOW PROCESSLIST;"), các kết nối chỉ tiếp tục phát triển, nó không đóng chúng , mặc dù tôi đã sử dụng session.close() và thậm chí loại bỏ đối tượng scoped_session ở cuối mỗi lần chạy. Tôi đang làm gì sai? Mục tiêu của tôi trong một ứng dụng lớn hơn là sử dụng số lượng tối thiểu các kết nối cơ sở dữ liệu cần thiết, vì việc thực hiện làm việc hiện tại của tôi tạo ra một phiên mới trong mọi phương thức mà nó được yêu cầu và đóng nó trước khi trở về, điều này có vẻ không hiệu quả.SQLAlchemy xử lý phiên thích hợp trong các ứng dụng đa luồng

from sqlalchemy import create_engine 
from sqlalchemy.orm import sessionmaker, scoped_session 
from threading import Thread 
from Queue import Queue, Empty as QueueEmpty 
from models import MyModel 


DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
     self.db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
     self.DBSession = scoped_session(
      sessionmaker(
       autoflush=True, 
       autocommit=False, 
       bind=self.db_engine 
      ) 
     ) 

    def _worker(self): 
     db_session = self.DBSession() 
     while True: 
      try: 
       task_id = self.task_queue.get(False) 
       try: 
        item = db_session.query(MyModel).filter(MyModel.id == task_id).one() 
        # do something with item 
       except Exception as exc: 
        # if an error occurrs we skip it 
        continue 

       finally: 
        db_session.commit() 
        self.task_queue.task_done() 
      except QueueEmpty: 
       db_session.close() 
       return 

    def start(self): 
     try: 
      db_session = self.DBSession() 
      all_items = db_session.query(MyModel).all() 
      for item in all_items: 
       self.task_queue.put(item.id) 

      for _i in range(self.worker_count): 
       t = Thread(target=self._worker) 
       t.start() 

      self.task_queue.join() 
     finally: 
      db_session.close() 
      self.DBSession.remove() 


if __name__ == '__main__': 
    while True: 
     mt_worker = MTWorker(worker_count=50) 
     mt_worker.start() 

Trả lời

36

Bạn chỉ nên được gọi create_enginescoped_session một lần mỗi quá trình (mỗi cơ sở dữ liệu). Mỗi nhóm sẽ có một số kết nối hoặc phiên riêng của mình (tương ứng), vì vậy, bạn muốn đảm bảo rằng bạn chỉ đang tạo một hồ bơi. Chỉ cần làm cho nó một cấp độ mô-đun toàn cầu. nếu bạn cần quản lý các phiên của mình nhiều hơn trước, bạn có thể không nên sử dụng scoped_session

Một thay đổi khác cần thực hiện là sử dụng DBSession trực tiếp như thể đây là phiên . gọi phương thức phiên trên scoped_session sẽ minh bạch tạo phiên địa phương theo chủ đề, nếu cần và chuyển tiếp cuộc gọi phương thức đến phiên .

Một điều cần lưu ý là pool_size của hồ bơi kết nối, trong đó là 5 theo mặc định. Đối với nhiều ứng dụng tốt, nhưng nếu bạn đang tạo ra một số chủ đề là , bạn có thể cần phải điều chỉnh thông số đó

DATABASE_CONNECTION_INFO = 'mysql://username:[email protected]:3306/dbname' 
db_engine = create_engine(DATABASE_CONNECTION_INFO, echo=False) 
DBSession = scoped_session(
    sessionmaker(
     autoflush=True, 
     autocommit=False, 
     bind=db_engine 
    ) 
) 


class MTWorker(object): 

    def __init__(self, worker_count=5): 
     self.task_queue = Queue() 
     self.worker_count = worker_count 
# snip 
+1

Cảm ơn bạn đã cung cấp thông tin. King kính trọng! – andrean

Các vấn đề liên quan