Được rồi, vì vậy đây là ví dụ của tôi như thế nào tôi sẽ làm điều đó với yêu cầu get.
tôi bổ sung thêm hai thành phần chính:
Đầu tiên là một người biết lắng nghe pubsub ren đơn giản mà gắn thêm tin nhắn mới vào một đối tượng danh sách địa phương. Tôi cũng đã thêm danh sách người truy cập vào lớp học, vì vậy bạn có thể đọc từ chuỗi người nghe như thể bạn đang đọc từ một danh sách thông thường. Theo như của bạn WebRequest
là có liên quan, bạn chỉ cần đọc dữ liệu từ một đối tượng danh sách địa phương. Điều này trả về ngay lập tức và không chặn yêu cầu hiện tại hoàn thành hoặc yêu cầu trong tương lai không được chấp nhận và xử lý.
class OpenChannel(threading.Thread):
def __init__(self, channel, host = None, port = None):
threading.Thread.__init__(self)
self.lock = threading.Lock()
self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channel)
self.output = []
# lets implement basic getter methods on self.output, so you can access it like a regular list
def __getitem__(self, item):
with self.lock:
return self.output[item]
def __getslice__(self, start, stop = None, step = None):
with self.lock:
return self.output[start:stop:step]
def __str__(self):
with self.lock:
return self.output.__str__()
# thread loop
def run(self):
for message in self.pubsub.listen():
with self.lock:
self.output.append(message['data'])
def stop(self):
self._Thread__stop()
Thứ hai là lớp ApplicationMixin. Đây là đối tượng phụ mà bạn có lớp yêu cầu web của bạn kế thừa để thêm chức năng và thuộc tính. Trong trường hợp này, nó sẽ kiểm tra xem một trình lắng nghe kênh đã tồn tại cho kênh được yêu cầu hay chưa, và tạo một cái nếu không tìm thấy, và trả về trình xử lý người nghe cho WebRequest.
# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
def GetChannel(self, channel, host = None, port = None):
if channel not in self.application.channels:
self.application.channels[channel] = OpenChannel(channel, host, port)
self.application.channels[channel].start()
return self.application.channels[channel]
Lớp WebRequest nay đối xử với người nghe như thể nó là một danh sách tĩnh (ghi nhớ rằng bạn cần phải cung cấp cho self.write
một chuỗi)
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
@tornado.web.asynchronous
def get(self, channel):
# get the channel
channel = self.GetChannel(channel)
# write out its entire contents as a list
self.write('{}'.format(channel[:]))
self.finish() # not necessary?
Cuối cùng, sau khi ứng dụng được tạo ra, tôi đã thêm một từ điển trống làm thuộc tính
# add a dictionary containing channels to your application
application.channels = {}
Cũng như một số dọn dẹp các chủ đề đang chạy, khi bạn thoát khỏi ứng dụng
# clean up the subscribed channels
for channel in application.channels:
application.channels[channel].stop()
application.channels[channel].join()
Các mã hoàn chỉnh:
import threading
import redis
import tornado.web
class OpenChannel(threading.Thread):
def __init__(self, channel, host = None, port = None):
threading.Thread.__init__(self)
self.lock = threading.Lock()
self.redis = redis.StrictRedis(host = host or 'localhost', port = port or 6379)
self.pubsub = self.redis.pubsub()
self.pubsub.subscribe(channel)
self.output = []
# lets implement basic getter methods on self.output, so you can access it like a regular list
def __getitem__(self, item):
with self.lock:
return self.output[item]
def __getslice__(self, start, stop = None, step = None):
with self.lock:
return self.output[start:stop:step]
def __str__(self):
with self.lock:
return self.output.__str__()
# thread loop
def run(self):
for message in self.pubsub.listen():
with self.lock:
self.output.append(message['data'])
def stop(self):
self._Thread__stop()
# add a method to the application that will return existing channels
# or create non-existing ones and then return them
class ApplicationMixin(object):
def GetChannel(self, channel, host = None, port = None):
if channel not in self.application.channels:
self.application.channels[channel] = OpenChannel(channel, host, port)
self.application.channels[channel].start()
return self.application.channels[channel]
class ReadChannel(tornado.web.RequestHandler, ApplicationMixin):
@tornado.web.asynchronous
def get(self, channel):
# get the channel
channel = self.GetChannel(channel)
# write out its entire contents as a list
self.write('{}'.format(channel[:]))
self.finish() # not necessary?
class GetHandler(tornado.web.RequestHandler):
def get(self):
self.write("Hello world")
application = tornado.web.Application([
(r"/", GetHandler),
(r"/channel/(?P<channel>\S+)", ReadChannel),
])
# add a dictionary containing channels to your application
application.channels = {}
if __name__ == '__main__':
application.listen(8888)
print 'running'
try:
tornado.ioloop.IOLoop.instance().start()
except KeyboardInterrupt:
pass
# clean up the subscribed channels
for channel in application.channels:
application.channels[channel].stop()
application.channels[channel].join()
Redis pub/sub không nên được sử dụng trong một 'web.RequestHandler', bởi vì nó sẽ chặn ioloop trong khi chờ đợi vào' pubsub .listen() '. Hãy xem http://tornadogists.org/532067/ để biết ví dụ về websocket đang hoạt động. –
Websocket là một lựa chọn tốt, tuy nhiên ứng dụng của tôi cần làm việc trong các trình duyệt không có hỗ trợ cho websockets. Tôi đang sử dụng phiếu thăm dò ý kiến dài. Đó là lý do tôi cần một 'async get'. –
@HelieelsonSantos trong trường hợp đó, đặt cược tốt nhất của bạn là duy trì trạng thái cục bộ của lịch sử kênh đã đăng ký (được đưa vào bởi một chuỗi riêng biệt), và sau đó viết trạng thái đó ngay lập tức cho phản hồi và hoàn tất thao tác 'get'. Khách hàng nên duy trì một số bản ghi của chỉ mục nhận được cuối cùng, hoặc cuối cùng có được thời gian, vv, cho phép bạn duy trì tính liên tục cho các khách hàng khác nhau. Tôi sẽ viết một câu trả lời với một ví dụ trong một vài giờ khi tôi nhận được thời gian. –