2011-11-20 54 views
11

Vì không ai cung cấp giải pháp cho this post cộng với thực tế là tôi rất cần một giải pháp, đây là tình huống của tôi và một số giải pháp/ý tưởng trừu tượng để tranh luận.Cần tây tích hợp cần tây

chồng của tôi:

  1. Tornado
  2. Cần tây
  3. MongoDB
  4. Redis
  5. RabbitMQ

Vấn đề của tôi: Tìm một cách để Tornado để gửi một cần tây công việc (đã giải quyết) một d sau đó thu thập kết quả không đồng bộ (bất kỳ ý tưởng nào?).

Kịch bản 1: (request/response Hack cộng webhook)

  • Tornado nhận (người dùng) yêu cầu, sau đó lưu trong bộ nhớ địa phương (hoặc trong Redis) a {jobID: (người sử dụng) theo yêu cầu} để nhớ nơi truyền bá phản ứng và kích hoạt nhiệm vụ cần tây với jobID
  • Khi cần tây hoàn thành nhiệm vụ, nó thực hiện một webhook tại một số url và nói với lốc xoáy rằng jobID này đã kết thúc (cộng với kết quả)
  • Tornado truy xuất (người dùng) yêu cầu và gửi trả lời cho (người dùng)

Điều này có thể xảy ra không? Liệu nó có logic nào không?

Kịch bản 2: (cơn lốc xoáy cộng dài polling)

  • Tornado công văn công việc cần tây và trả về một số dữ liệu json chính cho khách hàng (jQuery)
  • jQuery hiện một số dài bỏ phiếu khi nhận của json chính, giả sử, mỗi x micro giây và phản hồi lốc xoáy theo một số cờ cơ sở dữ liệu. Khi nhiệm vụ cần tây hoàn thành, cờ cơ sở dữ liệu này được đặt thành True, sau đó jQuery "loop" được hoàn thành.

Điều này có hiệu quả không?

Bất kỳ ý tưởng/lược đồ nào khác?

Trả lời

4

Tôi tình cờ gặp câu hỏi này và nhấn vào kết quả phụ trợ nhiều lần không nhìn tối ưu với tôi. Vì vậy, tôi đã thực hiện một Mixin tương tự như kịch bản của bạn 1 bằng cách sử dụng Unix Sockets.

Nó thông báo cho Tornado ngay khi nhiệm vụ kết thúc (chính xác, ngay sau khi nhiệm vụ tiếp theo chạy trong chuỗi) và chỉ truy cập kết quả phụ trợ một lần. Đây là link.

+0

Công việc tuyệt vời Eren! – hymloth

9

Giải pháp của tôi liên quan đến việc bỏ phiếu từ cơn lốc xoáy để cần tây:

class CeleryHandler(tornado.web.RequestHandlerr): 

    @tornado.web.asynchronous 
    def get(self):  

     task = yourCeleryTask.delay(**kwargs) 

     def check_celery_task(): 
      if task.ready(): 
       self.write({'success':True}) 
       self.set_header("Content-Type", "application/json") 
       self.finish() 
      else: 
       tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

     tornado.ioloop.IOLoop.instance().add_timeout(datetime.timedelta(0.00001), check_celery_task) 

Đây là post về nó.

+0

bạn có thể xin đăng lại link blog của bạn, nó được đưa xuống! – vgoklani

+1

Đã được chỉnh sửa thành liên kết archive.org – rbu

8

Đây là giải pháp cho vấn đề của chúng tôi. Vì chúng tôi tìm kiếm kết quả trong một số trình xử lý trong ứng dụng của chúng tôi, chúng tôi đã thực hiện tra cứu cần tây một lớp mixin.

Điều này cũng làm cho mã dễ đọc hơn với mẫu tornado.gen.

from functools import partial 

class CeleryResultMixin(object): 
    """ 
    Adds a callback function which could wait for the result asynchronously 
    """ 
    def wait_for_result(self, task, callback): 
     if task.ready(): 
      callback(task.result) 
     else: 
      # TODO: Is this going to be too demanding on the result backend ? 
      # Probably there should be a timeout before each add_callback 
      tornado.ioloop.IOLoop.instance().add_callback(
       partial(self.wait_for_result, task, callback) 
      ) 


class ARemoteTaskHandler(CeleryResultMixin, tornado.web.RequestHandler): 
    """Execute a task asynchronously over a celery worker. 
    Wait for the result without blocking 
    When the result is available send it back 
    """ 
    @tornado.web.asynchronous 
    @tornado.web.authenticated 
    @tornado.gen.engine 
    def post(self): 
     """Test the provided Magento connection 
     """ 
     task = expensive_task.delay(
      self.get_argument('somearg'), 
     ) 

     result = yield tornado.gen.Task(self.wait_for_result, task) 

     self.write({ 
      'success': True, 
      'result': result.some_value 
     }) 
     self.finish() 
3

Bây giờ, https://github.com/mher/tornado-celery đến để giải thoát ...

class GenAsyncHandler(web.RequestHandler): 
    @asynchronous 
    @gen.coroutine 
    def get(self): 
     response = yield gen.Task(tasks.sleep.apply_async, args=[3]) 
     self.write(str(response.result)) 
     self.finish() 
Các vấn đề liên quan