Tôi đang sử dụng Akka Streams trong Scala để thăm dò ý kiến từ hàng đợi AWS SQS bằng cách sử dụng AWS Java SDK. Tôi tạo ra một ActorPublisher mà dequeues tin nhắn trong một hai khoảng thời gian thứ hai:Các luồng Akka có thể được thực hiện liên tục như thế nào?
class SQSSubscriber(name: String) extends ActorPublisher[Message] {
implicit val materializer = ActorMaterializer()
val schedule = context.system.scheduler.schedule(0 seconds, 2 seconds, self, "dequeue")
val client = new AmazonSQSClient()
client.setRegion(RegionUtils.getRegion("us-east-1"))
val url = client.getQueueUrl(name).getQueueUrl
val MaxBufferSize = 100
var buf = Vector.empty[Message]
override def receive: Receive = {
case "dequeue" =>
val messages = iterableAsScalaIterable(client.receiveMessage(new ReceiveMessageRequest(url).getMessages).toList
messages.foreach(self ! _)
case message: Message if buf.size == MaxBufferSize =>
log.error("The buffer is full")
case message: Message =>
if (buf.isEmpty && totalDemand > 0)
onNext(message)
else {
buf :+= message
deliverBuf()
}
case Request(_) =>
deliverBuf()
case Cancel =>
context.stop(self)
}
@tailrec final def deliverBuf(): Unit =
if (totalDemand > 0) {
if (totalDemand <= Int.MaxValue) {
val (use, keep) = buf.splitAt(totalDemand.toInt)
buf = keep
use foreach onNext
} else {
val (use, keep) = buf.splitAt(Int.MaxValue)
buf = keep
use foreach onNext
deliverBuf()
}
}
}
Trong ứng dụng của tôi, tôi đang cố gắng để chạy các dòng chảy tại một khoảng thời gian 2 giây cũng như:
val system = ActorSystem("system")
val sqsSource = Source.actorPublisher[Message](SQSSubscriber.props("queue-name"))
val flow = Flow[Message]
.map { elem => system.log.debug(s"${elem.getBody} (${elem.getMessageId})"); elem }
.to(Sink.ignore)
system.scheduler.schedule(0 seconds, 2 seconds) {
flow.runWith(sqsSource)(ActorMaterializer()(system))
}
Tuy nhiên, khi tôi chạy đơn đăng ký của tôi tôi nhận được java.util.concurrent.TimeoutException: Futures timed out after [20000 milliseconds]
và thông báo thư chết sau đó do ActorMaterializer
gây ra.
Có phương pháp được đề xuất để liên tục thực hiện Luồng Akka không?
Tôi không thể kiểm tra ngay bây giờ, nhưng tôi không chắc chắn về việc sử dụng nhiều trường hợp của ActorMaterializer. Bạn đang sử dụng một cá thể bên trong ActorPublisher và một phiên bản khác cho toàn bộ luồng. –
Tôi đã kết thúc bằng cách sử dụng Akka-Camel vì nó có một hội nhập SQS tốt đẹp mà hoàn thành tất cả những gì tôi cần làm (https://github.com/fzakaria/Akka-Camel-SQS/). –
Có lý do nào khiến bạn phải liên tục sử dụng ActorPublisher khác nhau sau mỗi 2 giây không?Dựa trên mã mẫu cho nó sẽ dễ dàng hơn nhiều để chỉ tiếp tục sử dụng cùng một nhà xuất bản ... –