2015-09-12 13 views
5

Tôi đang sử dụng Akka Streams trong Scala để thăm dò ý kiến ​​từ hàng đợi AWS SQS bằng cách sử dụng AWS Java SDK. Tôi tạo ra một ActorPublisher mà dequeues tin nhắn trong một hai khoảng thời gian thứ hai:Các luồng Akka có thể được thực hiện liên tục như thế nào?

class SQSSubscriber(name: String) extends ActorPublisher[Message] { 
    implicit val materializer = ActorMaterializer() 

    val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue") 

    val client = new AmazonSQSClient() 
    client.setRegion(RegionUtils.getRegion("us-east-1")) 
    val url = client.getQueueUrl(name).getQueueUrl 

    val MaxBufferSize = 100 
    var buf = Vector.empty[Message] 

    override def receive: Receive = { 
    case "dequeue" => 
     val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList 
     messages.foreach(self ! _) 
    case message: Message if buf.size == MaxBufferSize => 
     log.error("The buffer is full") 
    case message: Message => 
     if (buf.isEmpty && totalDemand > 0) 
     onNext(message) 
     else { 
     buf :+= message 
     deliverBuf() 
     } 
    case Request(_) => 
     deliverBuf() 
    case Cancel => 
     context.stop(self) 
    } 

    @tailrec final def deliverBuf(): Unit = 
    if (totalDemand > 0) { 
     if (totalDemand <= Int.MaxValue) { 
     val (use, keep) = buf.splitAt(totalDemand.toInt) 
     buf = keep 
     use foreach onNext 
     } else { 
     val (use, keep) = buf.splitAt(Int.MaxValue) 
     buf = keep 
     use foreach onNext 
     deliverBuf() 
     } 
    } 
} 

Trong ứng dụng của tôi, tôi đang cố gắng để chạy các dòng chảy tại một khoảng thời gian 2 giây cũng như:

val system = ActorSystem("system") 
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name")) 
val flow = Flow[Message] 
    .map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem } 
    .to(Sink.ignore) 

system.scheduler.schedule(0 seconds, 2 seconds) { 
    flow.runWith(sqsSource)(ActorMaterializer()(system)) 
} 

Tuy nhiên, khi tôi chạy đơn đăng ký của tôi tôi nhận được java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds] và thông báo thư chết sau đó do ActorMaterializer gây ra.

Có phương pháp được đề xuất để liên tục thực hiện Luồng Akka không?

+0

Tôi không thể kiểm tra ngay bây giờ, nhưng tôi không chắc chắn về việc sử dụng nhiều trường hợp của ActorMaterializer. Bạn đang sử dụng một cá thể bên trong ActorPublisher và một phiên bản khác cho toàn bộ luồng. –

+0

Tôi đã kết thúc bằng cách sử dụng Akka-Camel vì nó có một hội nhập SQS tốt đẹp mà hoàn thành tất cả những gì tôi cần làm (https://github.com/fzakaria/Akka-Camel-SQS/). –

+0

Có lý do nào khiến bạn phải liên tục sử dụng ActorPublisher khác nhau sau mỗi 2 giây không?Dựa trên mã mẫu cho nó sẽ dễ dàng hơn nhiều để chỉ tiếp tục sử dụng cùng một nhà xuất bản ... –

Trả lời

7

Tôi không nghĩ rằng bạn cần phải tạo một ActorPublisher mới cứ sau 2 giây. Điều này có vẻ dư thừa và lãng phí bộ nhớ. Ngoài ra, tôi không nghĩ rằng một ActorPublisher là cần thiết. Từ những gì tôi có thể nói về mã, việc triển khai của bạn sẽ có số lượng ngày càng tăng của tất cả các luồng truy vấn cùng một dữ liệu. Mỗi Message từ khách hàng sẽ được xử lý bởi N luồng akka khác nhau và, thậm chí tệ hơn, N sẽ phát triển theo thời gian.

Iterator Đối với Infinite Loop Truy vấn

Bạn có thể nhận hành vi tương tự từ ActorPublisher của bạn bằng cách sử dụng scala của Iterator. Nó có thể tạo ra một Iterator mà liên tục truy vấn khách hàng:

//setup the client 
val client = { 
    val sqsClient = new AmazonSQSClient() 
    sqsClient setRegion (RegionUtils getRegion "us-east-1") 
    sqsClient 
} 

val url = client.getQueueUrl(name).getQueueUrl 

//single query 
def queryClientForMessages : Iterable[Message] = iterableAsScalaIterable { 
    client receiveMessage (new ReceiveMessageRequest(url).getMessages) 
} 

def messageListIteartor : Iterator[Iterable[Message]] = 
    Iterator continually messageListStream 

//messages one-at-a-time "on demand", no timer pushing you around 
def messageIterator() : Iterator[Message] = messageListIterator flatMap identity 

thực hiện này chỉ truy vấn cho khách hàng khi tất cả các tin nhắn trước đó đã được tiêu thụ và do đó thực sự là reactive. Không cần phải theo dõi một bộ đệm với kích thước cố định. Giải pháp của bạn cần một bộ đệm vì việc tạo ra các thông điệp (thông qua một bộ đếm thời gian) là de-coupled từ việc tiêu thụ các thông điệp (thông qua println). Trong quá trình thực hiện của tôi, việc tạo ra tiêu thụ & là tightly coupled thông qua áp suất ngược.

Akka Suối Nguồn

Sau đó bạn có thể sử dụng Iterator này phát chức năng để nuôi một dòng AKKA Nguồn:

def messageSource : Source[Message, _] = Source fromIterator messageIterator 

luồng hình

Và cuối cùng bạn có thể sử dụng này Nguồn để thực hiện println (Lưu ý: giá trị flow của bạn thực sự là một Sink si nce Flow + Sink = Sink). Sử dụng giá trị flow của bạn từ câu hỏi:

messageSource runWith flow 

Một akka Xử lý luồng tất cả thư.

Các vấn đề liên quan