Tôi đang sử dụng Akka 2.4.4 và cố di chuyển từ Apache HttpAsyncClient (không thành công).Dòng Akka bị treo khi thực hiện các yêu cầu http qua hồ bơi kết nối
Dưới đây là phiên bản mã đơn giản mà tôi sử dụng trong dự án của mình.
Vấn đề là nó treo nếu tôi gửi nhiều hơn 1-3 yêu cầu cho luồng. Cho đến nay sau 6 giờ gỡ lỗi tôi thậm chí không thể xác định được vấn đề. Tôi không thấy ngoại lệ, nhật ký lỗi, sự kiện trong số Decider
. NOTHING :)
Tôi đã cố gắng giảm cài đặt connection-timeout
xuống 1 giây khi nghĩ rằng có thể nó đang chờ phản hồi từ máy chủ nhưng nó không giúp ích gì.
Tôi đang làm gì sai?
import akka.actor.ActorSystem
import akka.http.scaladsl.Http
import akka.http.scaladsl.model.headers.Referer
import akka.http.scaladsl.model.{HttpRequest, HttpResponse}
import akka.http.scaladsl.settings.ConnectionPoolSettings
import akka.stream.Supervision.Decider
import akka.stream.scaladsl.{Sink, Source}
import akka.stream.{ActorAttributes, Supervision}
import com.typesafe.config.ConfigFactory
import scala.collection.immutable.{Seq => imSeq}
import scala.concurrent.{Await, Future}
import scala.concurrent.duration.Duration
import scala.util.Try
object Main {
implicit val system = ActorSystem("root")
implicit val executor = system.dispatcher
val config = ConfigFactory.load()
private val baseDomain = "www.google.com"
private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config))
private val decider: Decider = {
case ex =>
ex.printStackTrace()
Supervision.Stop
}
private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] =
Source.fromIterator(() => items.toIterator)
.via(poolClientFlow)
.log("Logger")(log = myAdapter)
.recoverWith {
case ex =>
println(ex)
null
}
.withAttributes(ActorAttributes.supervisionStrategy(decider))
.runWith(Sink.seq)
.map { v =>
println(s"Got ${v.length} responses in Flow")
v.asInstanceOf[Seq[(Try[HttpResponse], T)]]
}
def main(args: Array[String]) {
val headers = imSeq(Referer("https://www.google.com/"))
val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID"
val requests = List.fill(10)(reqPair)
val qwe = sendMultipleRequests(requests).map { case responses =>
println(s"Got ${responses.length} responses")
system.terminate()
}
Await.ready(system.whenTerminated, Duration.Inf)
}
}
Còn điều gì với số proxy support? Dường như không làm việc cho tôi.
Tôi thực sự đã thử điều đó và nó không giúp ích gì. Có lẽ tôi đã đặt nó sai chỗ. Bạn có thể vui lòng xem dự án khép kín mà tôi đã tạo không? https://github.com/cppexpert/akka_flow_freezing – expert
Ya, đó là vấn đề. Bạn đang cố gắng sắp xếp các kết quả của 10 tương lai và sau đó đọc cơ thể. Vấn đề ở đây là để gọi bản đồ trên 'trình tự', tất cả 10 tương lai phải được hoàn thành và chỉ 4 người đầu tiên sẽ là 4 người đầu tiên đang chặn người kia 6. Đẩy mã đọc phản hồi lên cao hơn và điều đó sẽ sửa vấn đề của bạn. – cmbaxter
Bạn có thể minh họa cách bạn muốn chuyển mã đọc phản hồi không? Tôi đã thử vài thứ và nó vẫn chờ đợi cho tương lai được hoàn thành với số lượng lớn. Ngay cả trong ví dụ của tôi nên 'parseResponse' được gọi là không đồng bộ trước khi trên mỗi phản ứng TRƯỚC KHI nó được truyền cho Future.sequence? Có lẽ tôi có thể chuyển sang 'toMat' của hàng đợi nhưng sau đó tôi sẽ không thể sử dụng nó để phân tích các câu trả lời khác nhau. Đóng gói lamba cùng với '(Any, Promise [..])' cho mọi yêu cầu có vẻ quá xấu với tôi. – expert