2013-11-04 15 views
6

Dự án tôi đang thực hiện yêu cầu đọc thư từ SQS và tôi quyết định sử dụng Akka để phân phối việc xử lý các thư này.Tỷ lệ thăm dò của người tiêu dùng với Akka, SQS và Camel

Vì SQS được hỗ trợ bởi Camel và có chức năng được tích hợp để sử dụng trong Akka trong lớp Người tiêu dùng, tôi tưởng tượng tốt nhất là triển khai điểm cuối và đọc thư theo cách này, mặc dù tôi không thấy nhiều ví dụ mọi người làm như vậy.

Vấn đề của tôi là tôi không thể thăm dò ý kiến ​​xếp hàng của mình đủ nhanh để giữ cho hàng đợi trống hoặc gần trống. Điều tôi nghĩ ban đầu là tôi có thể nhờ Người tiêu dùng nhận tin nhắn qua Camel từ SQS với tốc độ X/s. Từ đó, tôi có thể đơn giản tạo ra nhiều Người tiêu dùng hơn để đạt được tốc độ mà tại đó tôi cần thông báo được xử lý.

tiêu dùng của tôi:

import akka.camel.{CamelMessage, Consumer} 
import akka.actor.{ActorRef, ActorPath} 

class MyConsumer() extends Consumer { 
    def endpointUri = "aws-sqs://my_queue?delay=1&maxMessagesPerPoll=10&accessKey=myKey&secretKey=RAW(mySecret)" 
    var count = 0 

    def receive = { 
    case msg: CamelMessage => { 
     count += 1 
    } 
    case _ => { 
     println("Got something else") 
    } 
    } 

    override def postStop(){ 
    println("Count for actor: " + count) 
    } 
} 

Như đã trình bày, tôi đã thiết delay=1 cũng như &maxMessagesPerPoll=10 để cải thiện tốc độ của bài viết, nhưng tôi không thể đẻ trứng nhiều người tiêu dùng với các thiết bị đầu cuối cùng.

Tôi đọc trong tài liệu By default endpoints are assumed not to support multiple consumers. và tôi tin rằng điều này cũng đúng với điểm cuối SQS, vì sinh sản nhiều người tiêu dùng sẽ chỉ cho tôi một người tiêu dùng sau khi chạy hệ thống trong một phút, thông báo đầu ra là Count for actor: x thay vì những sản phẩm khác xuất ra Count for actor: 0.

Nếu điều này hữu ích; Tôi có thể đọc khoảng 33 tin nhắn/giây với việc triển khai hiện tại này trên một người tiêu dùng duy nhất.

Đây có phải là cách thích hợp để đọc thư từ hàng đợi SQS trong Akka không? Nếu vậy, có cách nào tôi có thể nhận được điều này để mở rộng quy mô ra ngoài để tôi có thể tăng tỷ lệ tiêu thụ tin nhắn của tôi gần hơn với 900 tin nhắn/giây?

Trả lời

5

Xu hướng đáng tiếc hiện không hỗ trợ việc tiêu thụ song song thư trên SQS.

http://camel.465427.n5.nabble.com/Amazon-SQS-listener-as-multi-threaded-td5741541.html

Để giải quyết này, tôi đã viết diễn viên của riêng tôi để thăm dò ý kiến ​​thông điệp hàng loạt SQS sử dụng AWS-java-sdk.

def receive = { 
    case BeginPolling => { 
     // re-queue sending asynchronously 
     self ! BeginPolling 
     // traverse the response 
     val deleteMessageList = new ArrayList[DeleteMessageBatchRequestEntry] 
     val messages = sqs.receiveMessage(receiveMessageRequest).getMessages 
     messages.toList.foreach { 
     node => { 
      deleteMessageList.add(new DeleteMessageBatchRequestEntry(node.getMessageId, node.getReceiptHandle)) 
      //log.info("Node body: {}", node.getBody) 
      filterSupervisor ! node.getBody 
     } 
     } 
     if(deleteEntryList.size() > 0){ 
     val deleteMessageBatchRequest = new DeleteMessageBatchRequest(queueName, deleteMessageList) 
     sqs.deleteMessageBatch(deleteMessageBatchRequest) 
     } 
    } 

    case _ => { 
     log.warning("Unknown message") 
    } 
    } 

Mặc dù tôi không chắc chắn nếu điều này là việc thực hiện tốt nhất, và nó có thể đương nhiên được cải thiện khi để yêu cầu không liên tục đánh một hàng đợi rỗng, nó không phù hợp với nhu cầu hiện tại của tôi là có thể thăm dò ý kiến tin nhắn từ cùng một hàng đợi.

Nhận khoảng 133 (thư/giây)/diễn viên từ SQS với điều này.

1

Lạc đà 2,15 hỗ trợ đồng thờiConsumers, mặc dù không chắc chắn cách hữu ích này là ở chỗ tôi không biết liệu hỗ trợ akka lạc đà 2.15 và tôi không biết nếu có một diễn viên tiêu dùng tạo sự khác biệt ngay cả khi có nhiều người tiêu dùng.

Các vấn đề liên quan