2014-10-20 15 views
7

Khi một trong các bài kiểm tra đơn vị của tôi xóa một đối tượng SQLAlchemy, đối tượng gây nên một sự kiện after_delete mà gây nên một nhiệm vụ cần tây để xóa một tập tin từ ổ đĩa.kết nối được đóng lại khi một sự kiện SQLAlchemy gây nên một nhiệm vụ cần tây

Nhiệm vụ là CELERY_ALWAYS_EAGER = True khi thử nghiệm.

gist to reproduce the issue easily

Ví dụ này có hai thử nghiệm. Một tác nhân kích hoạt nhiệm vụ trong sự kiện, bên kia sự kiện. Chỉ có một trong sự kiện đóng kết nối.

Để nhanh chóng sao chép các lỗi bạn có thể chạy:

git clone https://gist.github.com/5762792fc1d628843697.git 
cd 5762792fc1d628843697 
virtualenv venv 
. venv/bin/activate 
pip install -r requirements.txt 
python test.py 

Các stack:

$  python test.py 
E 
====================================================================== 
ERROR: test_delete_task (__main__.CeleryTestCase) 
---------------------------------------------------------------------- 
Traceback (most recent call last): 
    File "test.py", line 73, in test_delete_task 
    db.session.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/scoping.py", line 150, in do 
    return getattr(self.registry(), name)(*args, **kwargs) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 776, in commit 
    self.transaction.commit() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 377, in commit 
    self._prepare_impl() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 357, in _prepare_impl 
    self.session.flush() 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 1919, in flush 
    self._flush(objects) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/util/langhelpers.py", line 63, in __exit__ 
    compat.reraise(type_, value, traceback) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 2037, in _flush 
    transaction.rollback(_capture_exception=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 393, in rollback 
    self._assert_active(prepared_ok=True, rollback_ok=True) 
    File "/home/brice/Code/5762792fc1d628843697/venv/local/lib/python2.7/site-packages/sqlalchemy/orm/session.py", line 223, in _assert_active 
    raise sa_exc.ResourceClosedError(closed_msg) 
ResourceClosedError: This transaction is closed 

---------------------------------------------------------------------- 
Ran 1 test in 0.014s 

FAILED (errors=1) 
+0

Vui lòng đăng lỗi và theo dõi ngăn xếp. – ACV

+0

Tôi vừa cập nhật với đầu ra thử nghiệm. Tôi cũng đã thêm các lệnh để dễ dàng tái tạo trên máy * nix cục bộ. – deBrice

Trả lời

8

Tôi nghĩ rằng tôi thấy vấn đề - đó là ở cách bạn thiết lập nhiệm vụ cần tây bạn. Nếu bạn loại bỏ các cuộc gọi bối cảnh ứng dụng từ thiết lập cần tây của bạn, tất cả mọi thứ chạy tốt:

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     # deleted --> with app.app_context(): 
     return TaskBase.__call__(self, *args, **kwargs) 

Có một cảnh báo lớn trong các tài liệu SQLAlchemy về không bao giờ thay đổi phiên trong after_delete sự kiện: http://docs.sqlalchemy.org/en/latest/orm/events.html#sqlalchemy.orm.events.MapperEvents.after_delete

Vì vậy, tôi nghi ngờ with app.app_context(): đang được gọi trong quá trình xóa, cố gắng đính kèm và/hoặc sửa đổi phiên mà các cửa hàng Flask-SQLAlchemy trong đối tượng app, và do đó toàn bộ điều là vụ đánh bom.

Flask-SQLAlchemy hiện rất nhiều kỳ diệu đằng sau hậu trường cho bạn, nhưng bạn có thể bỏ qua điều này và sử dụng SQLAlchemy trực tiếp. Nếu bạn cần nói chuyện với cơ sở dữ liệu trong khi xóa sự kiện, bạn có thể tạo phiên mới cho db:

@celery.task() 
def my_task(): 
    # obviously here I create a new object 
    session = db.create_scoped_session() 
    session.add(User(id=13, value="random string")) 
    session.commit() 
    return 

Nhưng có vẻ như bạn không cần điều này, bạn chỉ đang cố gắng xóa hình ảnh con đường. Trong trường hợp đó, tôi sẽ chỉ thay đổi nhiệm vụ của bạn để có đường dẫn:

# instance will call the task 
@event.listens_for(User, "after_delete") 
def after_delete(mapper, connection, target): 
    my_task.delay(target.value) 

@celery.task() 
def my_task(image_path): 
    os.remove(image_path) 

Hy vọng điều đó hữu ích - hãy cho tôi biết nếu bất kỳ điều nào không hiệu quả với bạn. Cảm ơn bạn đã thiết lập rất chi tiết, nó thực sự giúp gỡ lỗi.

+0

Bạn đóng đinh nó, xác định lại nhiệm vụ được chạy trong bối cảnh ứng dụng là vấn đề. Tôi sẽ xem liệu có thể định nghĩa một trang trí 'celery.task_with_context' ngoài' celery.task' hiện có hay không. Cảm ơn bạn rất nhiều! – deBrice

+0

Không sao cả! Vui mừng nó đã giúp. –

+1

Oh hai @RachelSanders, ưa thích chạy vào bạn ở đây. – ACV

0

Ask, tác giả của cần tây, cho rằng giải pháp on github

from celery import signals 

def make_celery(app): 
    ... 

    @signals.task_prerun.connect 
    def add_task_flask_context(sender, **kwargs): 
     if not sender.request.is_eager: 
      sender.request.flask_context = app.app_context().__enter__() 

    @signals.task_postrun.connect 
    def cleanup_task_flask_context(sender, **kwargs): 
     flask_context = getattr(sender.request, 'flask_context', None) 
     if flask_context is not None: 
      flask_context.__exit__(None, None, None) 
1

Tương tự như câu trả lời gợi ý của deBrice, nhưng sử dụng phương pháp tương tự như Rachel.

class ContextTask(TaskBase): 
    abstract = True 

    def __call__(self, *args, **kwargs): 
     import flask 
     # tests will be run in unittest app context 
     if flask.current_app: 
      return TaskBase.__call__(self, *args, **kwargs) 
     else: 
      # actual workers need to enter worker app context 
      with app.app_context(): 
       return TaskBase.__call__(self, *args, **kwargs) 
Các vấn đề liên quan