2009-06-24 13 views
8

Tôi muốn viết một máy chủ mà khách hàng có thể kết nối và nhận các cập nhật định kỳ mà không cần phải thăm dò ý kiến. Vấn đề tôi đã gặp phải với asyncore là nếu bạn không trả về true khi dispatcher.writable() được gọi, bạn phải đợi cho đến sau khi asyncore.loop đã hết thời gian (mặc định là 30s).Asyncore của Python định kỳ gửi dữ liệu bằng cách sử dụng thời gian chờ biến đổi. Có cách nào tốt hơn?

Hai cách tôi đã cố gắng giải quyết vấn đề này là 1) giảm thời gian chờ xuống giá trị thấp hoặc 2) kết nối truy vấn khi cập nhật tiếp theo và tạo ra giá trị thời gian chờ thích hợp. Tuy nhiên, nếu bạn tham khảo 'Chọn Luật' trong 'man 2 select_tut', nó nói, "Bạn nên luôn luôn cố gắng sử dụng select() mà không có thời gian chờ."

Có cách nào tốt hơn để thực hiện việc này không? Xoắn có thể? Tôi muốn thử và tránh các chủ đề phụ. Tôi sẽ bao gồm thời gian chờ dụ biến ở đây:

#!/usr/bin/python 

import time 
import socket 
import asyncore 


# in seconds 
UPDATE_PERIOD = 4.0 

class Channel(asyncore.dispatcher): 

    def __init__(self, sock, sck_map): 
     asyncore.dispatcher.__init__(self, sock=sock, map=sck_map) 
     self.last_update = 0.0 # should update immediately 
     self.send_buf = '' 
     self.recv_buf = '' 

    def writable(self): 
     return len(self.send_buf) > 0 

    def handle_write(self): 
     nbytes = self.send(self.send_buf) 
     self.send_buf = self.send_buf[nbytes:] 

    def handle_read(self): 
     print 'read' 
     print 'recv:', self.recv(4096) 

    def handle_close(self): 
     print 'close' 
     self.close() 

    # added for variable timeout 
    def update(self): 
     if time.time() >= self.next_update(): 
      self.send_buf += 'hello %f\n'%(time.time()) 
      self.last_update = time.time() 

    def next_update(self): 
     return self.last_update + UPDATE_PERIOD 


class Server(asyncore.dispatcher): 

    def __init__(self, port, sck_map): 
     asyncore.dispatcher.__init__(self, map=sck_map) 
     self.port = port 
     self.sck_map = sck_map 
     self.create_socket(socket.AF_INET, socket.SOCK_STREAM) 
     self.bind(("", port)) 
     self.listen(16) 
     print "listening on port", self.port 

    def handle_accept(self): 
     (conn, addr) = self.accept() 
     Channel(sock=conn, sck_map=self.sck_map) 

    # added for variable timeout 
    def update(self): 
     pass 

    def next_update(self): 
     return None 


sck_map = {} 

server = Server(9090, sck_map) 
while True: 
    next_update = time.time() + 30.0 
    for c in sck_map.values(): 
     c.update() # <-- fill write buffers 
     n = c.next_update() 
     #print 'n:',n 
     if n is not None: 
      next_update = min(next_update, n) 
    _timeout = max(0.1, next_update - time.time()) 

    asyncore.loop(timeout=_timeout, count=1, map=sck_map) 
+0

Nick: Đó là thay đổi nhỏ để làm cho chức năng hoạt động? Bạn có thể đặt mã? Cảm ơn –

Trả lời

4

Các "chọn luật" không áp dụng cho trường hợp của bạn, như bạn có không chỉ hoạt động (server tinh khiết) client-kích hoạt, mà còn hoạt động thời gian kích hoạt - đây chính xác là thời gian chờ chọn. Những gì pháp luật thực sự nên nói là "nếu bạn chỉ định một thời gian chờ, hãy chắc chắn rằng bạn thực sự phải làm một cái gì đó hữu ích khi thời gian chờ đến". Luật pháp có nghĩa là để bảo vệ chống lại sự bận rộn chờ đợi; mã của bạn không bận.

Tôi sẽ không đặt _timeout tối đa là 0,1 và thời gian cập nhật tiếp theo, nhưng tối đa là 0.0 và thời gian chờ tiếp theo. IOW, nếu thời gian cập nhật hết hạn trong khi bạn đang thực hiện cập nhật, bạn nên thực hiện cập nhật cụ thể đó ngay lập tức.

Thay vì hỏi từng kênh cho dù bạn muốn cập nhật, bạn có thể lưu trữ tất cả các kênh trong hàng đợi ưu tiên (được sắp xếp theo thời gian cập nhật tiếp theo) và sau đó chỉ chạy cập nhật cho các kênh sớm nhất thời gian cập nhật chưa đến). Bạn có thể sử dụng mô-đun heapq cho điều đó.

Bạn cũng có thể lưu một số cuộc gọi hệ thống bằng cách không yêu cầu mỗi kênh yêu cầu thời gian hiện tại, nhưng chỉ thăm dò ý kiến ​​thời gian hiện tại một lần và chuyển nó đến .update.

1

Tôi sẽ sử dụng Twisted, thời gian dài kể từ khi tôi sử dụng asyncore nhưng tôi nghĩ rằng đây sẽ là tương đương xoắn (không dự thi, bằng văn bản từ bộ nhớ):

from twisted.internet import reactor, protocol 
import time 

UPDATE_PERIOD = 4.0 

class MyClient(protocol.Protocol): 

    def connectionMade(self): 
     self.updateCall = reactor.callLater(UPDATE_PERIOD, self.update) 

    def connectionLost(self, reason): 
     self.updateCall.cancel() 

    def update(self): 
     self.transport.write("hello %f\n" % (time.time(),)) 

    def dataReceived(self, data): 
     print "recv:", data 


f = protocol.ServerFactory() 
f.protocol = MyClient 

reactor.listenTCP(9090, f) 
reactor.run() 
+0

+1: với mã xoắn có thể đọc được và dễ bảo trì. – nosklo

+0

Mã hoạt động (khá tốt nếu từ bộ nhớ!) Nhưng cần một thay đổi nhỏ để gọi reactor.callLater() trong phương thức update() để gửi bản cập nhật tiếp theo. Nếu không, bạn chỉ nhận được một tin nhắn và updateCall.cancel() sẽ thất bại khi ngắt kết nối. Vấn đề duy nhất của tôi là xoắn thêm một phụ thuộc nhưng tôi sẽ phải cân nhắc điều này đối với năng suất thực tế và khả năng đọc. –

4

lẽ bạn có thể làm điều này với sched.scheduler, như thế này (nb không được thử nghiệm):

import sched, asyncore, time 

# Create a scheduler with a delay function that calls asyncore.loop 
scheduler = sched.scheduler(time.time, lambda t: _poll_loop(t, time.time())) 

# Add the update timeouts with scheduler.enter 
# ... 

def _poll_loop(timeout, start_time): 
    asyncore.loop(timeout, count=1) 
    finish_time = time.time() 
    timeleft = finish_time - start_time 
    if timeleft > timeout: # there was a message and the timeout delay is not finished 
    _poll_loop(timeleft, finish_time) # so wait some more polling the socket 

def main_loop(): 
    while True: 
    if scheduler.empty(): 
     asyncore.loop(30.0, count=1) # just default timeout, use what suits you 
     # add other work that might create scheduled events here 
    else: 
     scheduler.run() 
+0

Trong khi câu trả lời này là tốt đẹp, bạn sẽ sớm chạy trong một RuntimeError với độ sâu đệ quy trong _poll_loop. Tốt hơn hãy viết nó mà không cần đệ quy cho các ví dụ thế giới thực. ;-) –

2

Đây là giải pháp cơ bản của demiurgus với các cạnh thô được làm tròn. Nó vẫn giữ nguyên ý tưởng cơ bản của mình, nhưng ngăn chặn RuntimeErrors và các vòng lặp bận rộn và được kiểm tra. [Edit: giải quyết vấn đề với sửa đổi lịch trình trong _delay]

class asynschedcore(sched.scheduler): 
    """Combine sched.scheduler and asyncore.loop.""" 
    # On receiving a signal asyncore kindly restarts select. However the signal 
    # handler might change the scheduler instance. This tunable determines the 
    # maximum time in seconds to spend in asycore.loop before reexamining the 
    # scheduler. 
    maxloop = 30 
    def __init__(self, map=None): 
     sched.scheduler.__init__(self, time.time, self._delay) 
     if map is None: 
      self._asynmap = asyncore.socket_map 
     else: 
      self._asynmap = map 
     self._abort_delay = False 

    def _maybe_abort_delay(self): 
     if not self._abort_delay: 
      return False 
     # Returning from this function causes the next event to be executed, so 
     # it might be executed too early. This can be avoided by modifying the 
     # head of the queue. Also note that enterabs sets _abort_delay to True. 
     self.enterabs(0, 0, lambda:None,()) 
     self._abort_delay = False 
     return True 

    def _delay(self, timeout): 
     if self._maybe_abort_delay(): 
      return 
     if 0 == timeout: 
      # Should we support this hack, too? 
      # asyncore.loop(0, map=self._asynmap, count=1) 
      return 
     now = time.time() 
     finish = now + timeout 
     while now < finish and self._asynmap: 
      asyncore.loop(min(finish - now, self.maxloop), map=self._asynmap, 
          count=1) 
      if self._maybe_abort_delay(): 
       return 
      now = time.time() 
     if now < finish: 
      time.sleep(finish - now) 

    def enterabs(self, abstime, priority, action, argument): 
     # We might insert an event before the currently next event. 
     self._abort_delay = True 
     return sched.scheduler.enterabs(self, abstime, priority, action, 
             argument) 

    # Overwriting enter is not necessary, because it is implemented using enter. 

    def cancel(self, event): 
     # We might cancel the next event. 
     self._abort_delay = True 
     return sched.scheduler.cancel(self, event) 

    def run(self): 
     """Runs as long as either an event is scheduled or there are 
     sockets in the map.""" 
     while True: 
      if not self.empty(): 
       sched.scheduler.run(self) 
      elif self._asynmap: 
       asyncore.loop(self.maxloop, map=self._asynmap, count=1) 
      else: 
       break 
+0

Chỉ thực hiện thử nghiệm giới hạn, nhưng có vẻ hoạt động tốt! – konrad

0

Có lẽ tôi không hiểu những gì OP đã cố gắng để hoàn thành, nhưng tôi chỉ giải quyết vấn đề này bằng 1 chủ đề mà được một weakref của mỗi kênh (asyncore.dispatcher) đối tượng. Chuỗi này xác định thời gian của riêng nó và sẽ gửi Kênh cập nhật định kỳ bằng Hàng đợi trong kênh đó. Nó nhận được hàng đợi từ đối tượng Channel bằng cách gọi getQueue.

Lý do tôi sử dụng weakref là do khách hàng tạm thời. Nếu kênh chết thì hàm weakref sẽ trả về None. Bằng cách đó, chuỗi thời gian không giữ cho các đối tượng cũ tồn tại vì nó tham chiếu đến chúng.

Tôi biết OP muốn tránh đề tài, nhưng giải pháp này rất đơn giản.Nó chỉ bao giờ tạo ra một luồng và nó nói chuyện với bất kỳ Kênh nào được tạo ra khi đối tượng Máy chủ thêm chúng vào danh sách chủ đề của các đối tượng cần giám sát.

Các vấn đề liên quan