2016-05-05 19 views
7

Tôi đang sử dụng Akka 2.4.4 và cố di chuyển từ Apache HttpAsyncClient (không thành công).Dòng Akka bị treo khi thực hiện các yêu cầu http qua hồ bơi kết nối

Dưới đây là phiên bản mã đơn giản mà tôi sử dụng trong dự án của mình.

Vấn đề là nó treo nếu tôi gửi nhiều hơn 1-3 yêu cầu cho luồng. Cho đến nay sau 6 giờ gỡ lỗi tôi thậm chí không thể xác định được vấn đề. Tôi không thấy ngoại lệ, nhật ký lỗi, sự kiện trong số Decider. NOTHING :)

Tôi đã cố gắng giảm cài đặt connection-timeout xuống 1 giây khi nghĩ rằng có thể nó đang chờ phản hồi từ máy chủ nhưng nó không giúp ích gì.

Tôi đang làm gì sai?

import akka.actor.ActorSystem 
import akka.http.scaladsl.Http 
import akka.http.scaladsl.model.headers.Referer 
import akka.http.scaladsl.model.{HttpRequest, HttpResponse} 
import akka.http.scaladsl.settings.ConnectionPoolSettings 
import akka.stream.Supervision.Decider 
import akka.stream.scaladsl.{Sink, Source} 
import akka.stream.{ActorAttributes, Supervision} 
import com.typesafe.config.ConfigFactory 

import scala.collection.immutable.{Seq => imSeq} 
import scala.concurrent.{Await, Future} 
import scala.concurrent.duration.Duration 
import scala.util.Try 

object Main { 

    implicit val system = ActorSystem("root") 
    implicit val executor = system.dispatcher 
    val config = ConfigFactory.load() 

    private val baseDomain = "www.google.com" 
    private val poolClientFlow = Http()(system).cachedHostConnectionPool[Any](baseDomain, 80, ConnectionPoolSettings(config)) 

    private val decider: Decider = { 
    case ex => 
     ex.printStackTrace() 
     Supervision.Stop 
    } 

    private def sendMultipleRequests[T](items: Seq[(HttpRequest, T)]): Future[Seq[(Try[HttpResponse], T)]] = 

    Source.fromIterator(() => items.toIterator) 
     .via(poolClientFlow) 
     .log("Logger")(log = myAdapter) 
     .recoverWith { 
     case ex => 
      println(ex) 
      null 
     } 
     .withAttributes(ActorAttributes.supervisionStrategy(decider)) 
     .runWith(Sink.seq) 
     .map { v => 
     println(s"Got ${v.length} responses in Flow") 
     v.asInstanceOf[Seq[(Try[HttpResponse], T)]] 
     } 

    def main(args: Array[String]) { 

    val headers = imSeq(Referer("https://www.google.com/")) 
    val reqPair = HttpRequest(uri = "/intl/en/policies/privacy").withHeaders(headers) -> "some req ID" 
    val requests = List.fill(10)(reqPair) 
    val qwe = sendMultipleRequests(requests).map { case responses => 
     println(s"Got ${responses.length} responses") 

     system.terminate() 
    } 

    Await.ready(system.whenTerminated, Duration.Inf) 
    } 
} 

Còn điều gì với số proxy support? Dường như không làm việc cho tôi.

Trả lời

7

Bạn cần tiêu thụ toàn bộ phần thân của phản hồi để kết nối được cung cấp cho các yêu cầu tiếp theo. Nếu bạn không quan tâm đến các đối tượng phản ứng gì cả, sau đó bạn chỉ có thể thoát nó vào một Sink.ignore, một cái gì đó như thế này:

resp.entity.dataBytes.runWith(Sink.ignore) 

Bằng cách cấu hình mặc định, khi sử dụng một hồ bơi kết nối máy chủ, các kết nối tối đa là thiết lập để 4. Mỗi hồ bơi có hàng đợi riêng của nó, nơi yêu cầu chờ đợi cho đến khi một trong các kết nối mở trở nên có sẵn. Nếu hàng đợi đó vượt quá 32 (cấu hình mặc định, có thể thay đổi, phải là lũy thừa 2) thì yo sẽ bắt đầu thấy lỗi. Trong trường hợp của bạn, bạn chỉ làm 10 yêu cầu, vì vậy bạn không đạt đến giới hạn đó. Nhưng bằng cách không tiêu thụ thực thể phản hồi, bạn không giải phóng kết nối và mọi thứ khác chỉ là hàng đợi ở phía sau, chờ kết nối miễn phí.

+0

Tôi thực sự đã thử điều đó và nó không giúp ích gì. Có lẽ tôi đã đặt nó sai chỗ. Bạn có thể vui lòng xem dự án khép kín mà tôi đã tạo không? https://github.com/cppexpert/akka_flow_freezing – expert

+2

Ya, đó là vấn đề. Bạn đang cố gắng sắp xếp các kết quả của 10 tương lai và sau đó đọc cơ thể. Vấn đề ở đây là để gọi bản đồ trên 'trình tự', tất cả 10 tương lai phải được hoàn thành và chỉ 4 người đầu tiên sẽ là 4 người đầu tiên đang chặn người kia 6. Đẩy mã đọc phản hồi lên cao hơn và điều đó sẽ sửa vấn đề của bạn. – cmbaxter

+0

Bạn có thể minh họa cách bạn muốn chuyển mã đọc phản hồi không? Tôi đã thử vài thứ và nó vẫn chờ đợi cho tương lai được hoàn thành với số lượng lớn. Ngay cả trong ví dụ của tôi nên 'parseResponse' được gọi là không đồng bộ trước khi trên mỗi phản ứng TRƯỚC KHI nó được truyền cho Future.sequence? Có lẽ tôi có thể chuyển sang 'toMat' của hàng đợi nhưng sau đó tôi sẽ không thể sử dụng nó để phân tích các câu trả lời khác nhau. Đóng gói lamba cùng với '(Any, Promise [..])' cho mọi yêu cầu có vẻ quá xấu với tôi. – expert

Các vấn đề liên quan