2015-09-19 22 views
5

tôi cấu hình dự án của tôi, đề cập đến câu trả lời này: How to use Flask-SQLAlchemy in a Celery tasknhiệm vụ cần tây Flask không làm việc

extension.py tập tin của tôi:

import flask 
from flask.ext.sqlalchemy import SQLAlchemy 
from config import BaseConfig 
from celery import Celery 
from flask_mail import Mail 
from celery.schedules import crontab 


class FlaskCelery(Celery): 

    def __init__(self, *args, **kwargs): 

     super(FlaskCelery, self).__init__(*args, **kwargs) 
     self.patch_task() 

     if 'app' in kwargs: 
      self.init_app(kwargs['app']) 

    def patch_task(self): 
     TaskBase = self.Task 
     _celery = self 

     class ContextTask(TaskBase): 
      abstract = True 

      def __call__(self, *args, **kwargs): 
       if flask.has_app_context(): 
        return TaskBase.__call__(self, *args, **kwargs) 
       else: 
        with _celery.app.app_context(): 
         return TaskBase.__call__(self, *args, **kwargs) 

     self.Task = ContextTask 

    def init_app(self, app): 
     self.app = app 
     self.config_from_object(app.config) 


mail = Mail() 
db = SQLAlchemy() 
settings = BaseConfig() 
celery = FlaskCelery() 

Sau đó, trong app_settings.py của tôi, tôi đã tạo ra ứng dụng này:

app = Flask('app', instance_relative_config=True) 

Và cần tây được cấu hình:

celery.init_app(app) 

Tôi chạy dự án bình với python manage.py run:

app.run(
     debug=settings.get('DEBUG', False), 
     host=settings.get('HOST', '127.0.0.1'), 
     port=settings.get('PORT', 5000) 
    ) 

Và chạy cần tây:

celery -A manage.celery worker --beat -l debug 

Cần tây log có vẻ tốt:

[tasks] 
    . app.api.tasks.spin_file 
    . app.main.tasks.send_async_email 
    . celery.backend_cleanup 
    . celery.chain 
    ... 

Sau đó, trong views.py, tôi gọi công việc này :

send_async_email.delay(*args, **kwargs) 

Nhưng tất cả các tác vụ đang bị Celery bỏ qua. Không có gì xảy ra, không có lỗi, không có cảnh báo. Không có gì. Tôi đang làm gì sai?

EDIT: Khi tôi bắt đầu cần tây với lệnh này: celery -A manage.celery worker --beat -l debug

tôi nhận được cảnh báo sau đây:

[2015-09-21 10:04:32,220: WARNING/MainProcess] /home/.virtualenvs/myproject/local/lib/python2.7/site-packages/celery/app/control.py:36: DuplicateNodenameWarning: Received multiple replies from node name: 'name'. 
Please make sure you give each node a unique nodename using the `-n` option. 
    pluralize(len(dupes), 'name'), ', '.join(sorted(dupes)), 
+1

lỗi Điều đó làm cho nó âm thanh như bạn đã chạy một công nhân. Giết bất kỳ công nhân cần tây nào trước khi bắt đầu lại công nhân. Công nhân sẽ không thấy thay đổi về mã cho đến khi bạn khởi động lại mã. – davidism

Trả lời

1

Tôi không chắc chắn rằng điều này sẽ giúp bạn nhưng tôi đang sử dụng mã này trên nhiều dự án của tôi khi nào tôi cần cần tây:

from flask import Flask, request, jsonify as jsn 
from celery import Celery 
app = Flask(__name__) 
app.config.update(dict(
    SECRET_KEY='blabla' 
    ) 
) 
# Celery configuration 
app.config['CELERY_BROKER_URL'] = 'redis://localhost:6379/0' 
app.config['CELERY_RESULT_BACKEND'] = 'database' 
app.config['CELERY_RESULT_DBURI'] = 'sqlite:///temp.db' 
app.config['CELERY_TRACK_STARTED'] = True 
app.config['CELERY_SEND_EVENTS'] = True 

# Initialize Celery 
celery = Celery(app.name, broker=app.config['CELERY_BROKER_URL']) 
celery.conf.update(app.config) 

@celery.task 
def do_something(data): 

    from celery import current_task 
    import os 
    import subprocess 
    with app.app_context(): 
     #run some bash script with some params in my case 

Và sau đó tôi đang chạy cần tây với người giám sát qua:

#!/bin/bash 
cd /project/location && . venv/bin/activate && celery worker -A appname.celery --loglevel=info --purge #appname is my main flask file 

Và dĩ nhiên trong lộ trình của tôi có somthing như

@app.route('/someroute', methods=["POST"]) 
def someroute(): 
    result = do_something.delay(data) 
    print result.id 
Các vấn đề liên quan