2012-09-06 34 views
8

Tôi có một ứng dụng phản ứng với các tin nhắn được gửi bởi khách hàng. Một thông báo là reload_credentials, rằng ứng dụng sẽ nhận được bất kỳ lúc nào khách hàng mới đăng ký. Thông báo này sau đó sẽ kết nối với một cơ sở dữ liệu PostgreSQL, thực hiện truy vấn cho tất cả các thông tin đăng nhập, và sau đó lưu trữ chúng trong một băm Ruby thông thường (client_id => client_token).Tôi nên xử lý trường hợp sử dụng này bằng EventMachine như thế nào?

Một số thông báo khác mà ứng dụng có thể nhận được là start, stop, pause được sử dụng để theo dõi một số lần phiên. Quan điểm của tôi là tôi mường tượng các ứng dụng hoạt động theo cách sau:

  • client gửi một thông điệp
  • nhắn được xếp hàng đợi
  • hàng đợi đang được xử lý

Tuy nhiên, ví dụ, tôi don không muốn chặn lò phản ứng. Hơn nữa, hãy tưởng tượng tôi có một tin nhắn reload_credentials tiếp theo trong hàng đợi. Tôi không muốn bất kỳ thư nào khác từ hàng đợi được xử lý cho đến khi thông tin đăng nhập được tải lại từ DB. Ngoài ra, trong khi tôi đang xử lý một thông báo nhất định (như chờ truy vấn thông tin xác thực kết thúc), tôi muốn cho phép các thư khác được enqueued.

Bạn có thể hướng dẫn tôi giải quyết vấn đề như vậy không? Tôi nghĩ tôi có thể phải sử dụng em-synchrony, nhưng tôi không chắc chắn.

Trả lời

7

Sử dụng một trong các trình điều khiển EM Postgresql, hoặc EM.defer để bạn không chặn lò phản ứng.

Khi bạn nhận được thông báo 'reload_credentials' chỉ cần lật cờ làm cho tất cả các thư tiếp theo được enqueued. Khi 'reload_credentials' đã hoàn tất, xử lý tất cả thư từ hàng đợi. Sau khi hàng đợi rỗng, hãy lật cờ làm cho các thư được xử lý khi chúng được nhận.

trình điều khiển EM cho PostgreSQL được liệt kê ở đây: https://github.com/eventmachine/eventmachine/wiki/Protocol-Implementations

module Server 
    def post_init 
    @queue    = [] 
    @loading_credentials = false 
    end 

    def recieve_message(type, data) 
    return @queue << [type, data] if @loading_credentials || [email protected]? 
    return process_msg(type, data) unless :reload_credentials == type 
    @loading_credentials = true 
    reload_credentials do 
     @loading_credentials = false 
     process_queue 
    end 
    end 

    def reload_credentials(&when_done) 
    EM.defer(proc { query_and_load_credentials }, when_done) 
    end 


    def process_queue 
    while (type, data = @queue.shift) 
     process_msg(type, data) 
    end 
    end 

    # lots of other methods 
end 

EM.start_server(HOST, PORT, Server) 

Nếu bạn muốn tất cả các kết nối đến xếp hàng thông điệp bất cứ khi nào bất kỳ kết nối nhận được một thông báo 'reload_connections' bạn sẽ phải phối hợp thông qua eigenclass.

+0

Tuy nhiên, có thể nhận được thông báo reload_credentials nhiều lần. Không nên có 2 chủ đề? Một trong đó giữ xếp hàng và một trong đó là xử lý? – Geo

+0

Có, nếu reload_credentials được nhận trong khi một reload_credential khác đang được xử lý, nó sẽ được xếp hàng đợi như các tin nhắn khác. – simulacre

+0

Nhiều đoạn tải lại reload_credentials phải được xử lý như lần đầu tiên. Bằng cách đặt reload_credentials trong một khối EM.defer bạn đang thực thi nó trong một luồng khác. Miễn là mã 'đang xử lý' của bạn không bị chặn, bạn sẽ tiếp tục nhận được tin nhắn. Sử dụng thư viện tương thích EM để đảm bảo bạn không chặn. Hoặc sử dụng EM.defer để xử lý. – simulacre

4

Sau đây là tôi đoán, một cái gì đó giống như thực hiện hiện tại của bạn:

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 


    q = EM::Queue.new 

    workers = Array.new(10) { Worker.new q } 

Vấn đề trên, nếu tôi hiểu bạn một cách chính xác, là bạn không muốn nhân viên làm việc trên việc làm mới (công việc mà đã đến sớm hơn trong thời gian sản xuất), hơn bất kỳ công việc reload_credentials nào. Sau đây nên dịch vụ này (bổ sung lời cảnh cáo ở cuối).

class Worker 
     def initialize queue 
     @queue = queue 
     dequeue 
     end 

     def dequeue 
     @queue.pop do |item| 
      begin 
      work_on item 
      ensure 
      dequeue 
      end 
     end 
     end 

     def work_on item 
     case item.type 
     when :reload_credentials 
      # magic happens here 
     else 
      # more magic happens here 
     end 
     end 
    end 

    class LockingDispatcher 
     def initialize channel, queue 
     @channel = channel 
     @queue = queue 

     @backlog = [] 
     @channel.subscribe method(:dispatch_with_locking) 

     @locked = false 
     end 

     def dispatch_with_locking item 
     if locked? 
      @backlog << item 
     else 
      # You probably want to move the specialization here out into a method or 
      # block that's passed into the constructor, to make the lockingdispatcher 
      # more of a generic processor 
      case item.type 
      when :reload_credentials 
      lock 
      deferrable = CredentialReloader.new(item).start 
      deferrable.callback { unlock } 
      deferrable.errback { unlock } 
      else 
      dispatch_without_locking item 
      end 
     end 
     end 

     def dispatch_without_locking item 
     @queue << item 
     end 

     def locked? 
     @locked 
     end 

     def lock 
     @locked = true 
     end 

     def unlock 
     @locked = false 
     bl = @backlog.dup 
     @backlog.clear 
     bl.each { |item| dispatch_with_locking item } 
     end 

    end 

    channel = EM::Channel.new 
    queue = EM::Queue.new 

    dispatcher = LockingDispatcher.new channel, queue 

    workers = Array.new(10) { Worker.new queue } 

Vì vậy, đầu vào cho hệ thống đầu tiên do thỏa thuận trên q, nhưng trong hệ thống mới này nói ở trên channel. queue vẫn được sử dụng để phân phối công việc giữa các công nhân, nhưng queue không được điền trong khi hoạt động của thông tin xác thực làm mới đang diễn ra. Thật không may, vì tôi không mất nhiều thời gian hơn, tôi đã không khái quát hóa số LockingDispatcher sao cho nó không được kết hợp với loại mục và mã để gửi đi CredentialsReloader. Tôi sẽ để điều đó cho bạn.

Bạn nên lưu ý rằng mặc dù dịch vụ này là những gì tôi hiểu về yêu cầu ban đầu của bạn, thường tốt hơn là bạn nên thư giãn loại yêu cầu này. Có một số vấn đề còn tồn tại về cơ bản không thể loại trừ mà không cần thay đổi trong yêu cầu rằng:

  • Hệ thống không chờ đợi để thực hiện công việc để hoàn thành trước khi bắt đầu thông tin việc làm
  • Hệ thống sẽ xử lý các vụ nổ của thông tin công việc rất nặng nề - các mục khác có thể xử lý được, sẽ không được.
  • Trong trường hợp có lỗi trong mã thông tin xác thực, nhật ký có thể lấp đầy ram và gây ra lỗi. Một thời gian chờ đơn giản có thể là đủ để tránh các hiệu ứng thảm khốc, mã iff bị hủy bỏ, và các thông điệp tiếp theo đủ khả năng xử lý để tránh các deadlocks khác.

Có vẻ như bạn có một số khái niệm về userid trong hệ thống. Nếu bạn nghĩ rằng thông qua yêu cầu của bạn, nó có khả năng có thể là bạn chỉ cần backlog các mục liên quan đến một userid của những người thông tin đang ở trong trạng thái làm mới. Đây là một vấn đề khác, liên quan đến một loại điều phối khác. Hãy thử một băm của backlogs bị khóa cho những người dùng, với một cuộc gọi lại trên hoàn thành giấy chứng nhận để thoát những backlogs vào công nhân, hoặc một số sắp xếp tương tự.

Chúc may mắn!

Các vấn đề liên quan