2013-02-28 21 views
11

Tôi đang cố tìm cách sử dụng Redis và Tornado một cách không đồng bộ. Tôi tìm thấy số tornado-redis nhưng tôi cần nhiều hơn là chỉ thêm một mã số yield.Làm cách nào để sử dụng Tornado và Redis một cách không đồng bộ?

Tôi có đoạn mã sau:

import redis 
import tornado.web 

class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = redis.StrictRedis(port=6279) 
     pubsub = client.pubsub() 
     pubsub.subscribe('test_channel') 

     for item in pubsub.listen(): 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 

     self.write(item['data']) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    tornado.ioloop.IOLoop.instance().start() 

tôi cần nhận được quyền truy cập url / và nhận được "Hello World", trong khi có một yêu cầu đang chờ trong /wait. Tôi có thể làm như thế nào?

+1

Redis pub/sub không nên được sử dụng trong một 'web.RequestHandler', bởi vì nó sẽ chặn ioloop trong khi chờ đợi vào' pubsub .listen() '. Hãy xem http://tornadogists.org/532067/ để biết ví dụ về websocket đang hoạt động. –

+0

Websocket là một lựa chọn tốt, tuy nhiên ứng dụng của tôi cần làm việc trong các trình duyệt không có hỗ trợ cho websockets. Tôi đang sử dụng phiếu thăm dò ý kiến ​​dài. Đó là lý do tôi cần một 'async get'. –

+0

@HelieelsonSantos trong trường hợp đó, đặt cược tốt nhất của bạn là duy trì trạng thái cục bộ của lịch sử kênh đã đăng ký (được đưa vào bởi một chuỗi riêng biệt), và sau đó viết trạng thái đó ngay lập tức cho phản hồi và hoàn tất thao tác 'get'. Khách hàng nên duy trì một số bản ghi của chỉ mục nhận được cuối cùng, hoặc cuối cùng có được thời gian, vv, cho phép bạn duy trì tính liên tục cho các khách hàng khác nhau. Tôi sẽ viết một câu trả lời với một ví dụ trong một vài giờ khi tôi nhận được thời gian. –

Trả lời

5

Bạn không nên sử dụng Redis pub/sub trong chuỗi Tornado chính, vì nó sẽ chặn vòng lặp IO. Bạn có thể xử lý việc bỏ phiếu dài từ các máy khách web trong chuỗi chính, nhưng bạn nên tạo một chuỗi riêng biệt để nghe Redis. Sau đó, bạn có thể sử dụng ioloop.add_callback() và/hoặc threading.Queue để giao tiếp với chuỗi chính khi bạn nhận được tin nhắn.

1

Được rồi, vì vậy đây là ví dụ của tôi như thế nào tôi sẽ làm điều đó với yêu cầu get.

tôi bổ sung thêm hai thành phần chính:

Đầu tiên là một người biết lắng nghe pubsub ren đơn giản mà gắn thêm tin nhắn mới vào một đối tượng danh sách địa phương. Tôi cũng đã thêm danh sách người truy cập vào lớp học, vì vậy bạn có thể đọc từ chuỗi người nghe như thể bạn đang đọc từ một danh sách thông thường. Theo như của bạn WebRequest là có liên quan, bạn chỉ cần đọc dữ liệu từ một đối tượng danh sách địa phương. Điều này trả về ngay lập tức và không chặn yêu cầu hiện tại hoàn thành hoặc yêu cầu trong tương lai không được chấp nhận và xử lý.

class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 

Thứ hai là lớp ApplicationMixin. Đây là đối tượng phụ mà bạn có lớp yêu cầu web của bạn kế thừa để thêm chức năng và thuộc tính. Trong trường hợp này, nó sẽ kiểm tra xem một trình lắng nghe kênh đã tồn tại cho kênh được yêu cầu hay chưa, và tạo một cái nếu không tìm thấy, và trả về trình xử lý người nghe cho WebRequest.

# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

Lớp WebRequest nay đối xử với người nghe như thể nó là một danh sách tĩnh (ghi nhớ rằng bạn cần phải cung cấp cho self.write một chuỗi)

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 

Cuối cùng, sau khi ứng dụng được tạo ra, tôi đã thêm một từ điển trống làm thuộc tính

# add a dictionary containing channels to your application 
application.channels = {} 

Cũng như một số dọn dẹp các chủ đề đang chạy, khi bạn thoát khỏi ứng dụng

# clean up the subscribed channels 
for channel in application.channels: 
    application.channels[channel].stop() 
    application.channels[channel].join() 

Các mã hoàn chỉnh:

import threading 
import redis 
import tornado.web 



class OpenChannel(threading.Thread): 
    def __init__(self, channel, host = None, port = None): 
     threading.Thread.__init__(self) 
     self.lock = threading.Lock() 
     self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379) 
     self.pubsub = self.redis.pubsub() 
     self.pubsub.subscribe(channel) 

     self.output = [] 

    # lets implement basic getter methods on self.output, so you can access it like a regular list 
    def __getitem__(self, item): 
     with self.lock: 
      return self.output[item] 

    def __getslice__(self, start, stop = None, step = None): 
     with self.lock: 
      return self.output[start:stop:step] 

    def __str__(self): 
     with self.lock: 
      return self.output.__str__() 

    # thread loop 
    def run(self): 
     for message in self.pubsub.listen(): 
      with self.lock: 
       self.output.append(message['data']) 

    def stop(self): 
     self._Thread__stop() 


# add a method to the application that will return existing channels 
# or create non-existing ones and then return them 
class ApplicationMixin(object): 
    def GetChannel(self, channel, host = None, port = None): 
     if channel not in self.application.channels: 
      self.application.channels[channel] = OpenChannel(channel, host, port) 
      self.application.channels[channel].start() 
     return self.application.channels[channel] 

class ReadChannel(tornado.web.RequestHandler, ApplicationMixin): 
    @tornado.web.asynchronous 
    def get(self, channel): 
     # get the channel 
     channel = self.GetChannel(channel) 
     # write out its entire contents as a list 
     self.write('{}'.format(channel[:])) 
     self.finish() # not necessary? 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/channel/(?P<channel>\S+)", ReadChannel), 
]) 


# add a dictionary containing channels to your application 
application.channels = {} 


if __name__ == '__main__': 
    application.listen(8888) 
    print 'running' 
    try: 
     tornado.ioloop.IOLoop.instance().start() 
    except KeyboardInterrupt: 
     pass 

    # clean up the subscribed channels 
    for channel in application.channels: 
     application.channels[channel].stop() 
     application.channels[channel].join() 
+0

Bạn có thể dễ dàng thay thế danh sách bằng hàng đợi hoặc một số đối tượng khác hỗ trợ truy cập không chặn và chỉ trả lại các tin nhắn nhận được từ yêu cầu trước đó. Tuy nhiên, bạn sẽ phải duy trì một hàng đợi cho mỗi máy khách và đảm bảo sử dụng các khóa không chặn và xử lý đúng các trường hợp ngoại lệ 'Empty'. –

2

Đối với Python> = 3.3, tôi khuyên bạn nên sử dụng aioredis. tôi đã không kiểm tra mã dưới đây nhưng nó phải là một cái gì đó như thế:

import redis 
import tornado.web 
from tornado.web import RequestHandler 

import aioredis 
import asyncio 
from aioredis.pubsub import Receiver 


class WaiterHandler(tornado.web.RequestHandler): 

    @tornado.web.asynchronous 
    def get(self): 
     client = await aioredis.create_redis((host, 6279), encoding="utf-8", loop=IOLoop.instance().asyncio_loop) 

     ch = redis.channels['test_channel'] 
     result = None 
     while await ch.wait_message(): 
      item = await ch.get() 
      if item['type'] == 'message': 
       print item['channel'] 
       print item['data'] 
       result = item['data'] 

     self.write(result) 
     self.finish() 


class GetHandler(tornado.web.RequestHandler): 

    def get(self): 
     self.write("Hello world") 


application = tornado.web.Application([ 
    (r"/", GetHandler), 
    (r"/wait", WaiterHandler), 
]) 

if __name__ == '__main__': 
    print 'running' 
    tornado.ioloop.IOLoop.configure('tornado.platform.asyncio.AsyncIOLoop') 
    server = tornado.httpserver.HTTPServer(application) 
    server.bind(8888) 
    # zero means creating as many processes as there are cores. 
    server.start(0) 
    tornado.ioloop.IOLoop.instance().start() 
Các vấn đề liên quan