Mặc dù bạn đã có một câu trả lời tuyệt vời , Tôi nghĩ tôi vẫn có thể đưa ra một hoặc hai ý kiến về những vấn đề này.
Tôi nhớ đã thấy một nơi nào đó (trên blog của ai đó) "sử dụng diễn viên cho tiểu bang và sử dụng tương lai cho đồng thời".
Vì vậy, suy nghĩ đầu tiên của tôi sẽ là sử dụng diễn viên bằng cách nào đó. Để chính xác, tôi sẽ có một diễn viên chính với một bộ định tuyến khởi chạy nhiều nhân viên công nhân, với số lượng công nhân bị hạn chế theo số allowableParallelism
. Vì vậy, giả sử tôi có
def doWorkInternal (symbol: String): Unit
mà hiện các công việc từ bạn doWork
lấy 'bên ngoài của tương lai', tôi sẽ có một cái gì đó dọc theo những dòng (rất thô sơ, không dùng nhiều chi tiết vào xem xét, và thực tế sao chép mã từ AKKA tài liệu):
import akka.actor._
case class WorkItem (symbol: String)
case class WorkItemCompleted (symbol: String)
case class WorkLoad (symbols: Array[String])
case class WorkLoadCompleted()
class Worker extends Actor {
def receive = {
case WorkItem (symbol) =>
doWorkInternal (symbol)
sender() ! WorkItemCompleted (symbol)
}
}
class Master extends Actor {
var pending = Set[String]()
var originator: Option[ActorRef] = None
var router = {
val routees = Vector.fill (allowableParallelism) {
val r = context.actorOf(Props[Worker])
context watch r
ActorRefRoutee(r)
}
Router (RoundRobinRoutingLogic(), routees)
}
def receive = {
case WorkLoad (symbols) =>
originator = Some (sender())
context become processing
for (symbol <- symbols) {
router.route (WorkItem (symbol), self)
pending += symbol
}
}
def processing: Receive = {
case Terminated (a) =>
router = router.removeRoutee(a)
val r = context.actorOf(Props[Worker])
context watch r
router = router.addRoutee(r)
case WorkItemCompleted (symbol) =>
pending -= symbol
if (pending.size == 0) {
context become receive
originator.get ! WorkLoadCompleted
}
}
}
Bạn có thể truy vấn các diễn viên bậc thầy với ask
và nhận một WorkLoadCompleted
trong một tương lai.Tuy nhiên, suy nghĩ thêm về 'trạng thái' (số lượng yêu cầu đồng thời đang xử lý) bị ẩn ở đâu đó, cùng với việc thực thi mã cần thiết để không vượt quá nó, đây là thứ 'trung gian cổng tương lai' sắp xếp, nếu bạn không ' t nhớ bắt buộc phong cách và có thể thay đổi (được sử dụng trong nội bộ chỉ mặc dù) cấu trúc:
object Guardian
{
private val incoming = new collection.mutable.HashMap[String, Promise[Unit]]()
private val outgoing = new collection.mutable.HashMap[String, Future[Unit]]()
private val pending = new collection.mutable.Queue[String]
def doWorkGuarded (symbol: String): Future[Unit] = {
synchronized {
val p = Promise[Unit]()
incoming(symbol) = p
if (incoming.size <= allowableParallelism)
launchWork (symbol)
else
pending.enqueue (symbol)
p.future
}
}
private def completionHandler (t: Try[Unit]): Unit = {
synchronized {
for (symbol <- outgoing.keySet) {
val f = outgoing (symbol)
if (f.isCompleted) {
incoming (symbol).completeWith (f)
incoming.remove (symbol)
outgoing.remove (symbol)
}
}
for (i <- outgoing.size to allowableParallelism) {
if (pending.nonEmpty) {
val symbol = pending.dequeue()
launchWork (symbol)
}
}
}
}
private def launchWork (symbol: String): Unit = {
val f = doWork(symbol)
outgoing(symbol) = f
f.onComplete(completionHandler)
}
}
doWork
tại là chính xác như của bạn, trở Future[Unit]
, với ý tưởng rằng thay vì sử dụng một cái gì đó giống như
val futures = symbols.map (doWork (_)).toSeq
val future = Future.sequence(futures)
đó sẽ khởi động tương lai không liên quan đến allowableParallelism
ở tất cả, tôi sẽ thay vì sử dụng
val futures = symbols.map (Guardian.doWorkGuarded (_)).toSeq
val future = Future.sequence(futures)
Hãy suy nghĩ về một số lái xe giả thuyết cơ sở dữ liệu truy cập với những người không chặn giao diện, tức là trở về tương lai trên yêu cầu, được giới hạn trong đồng thời bởi được xây dựng trên một số hồ bơi kết nối ví dụ - bạn sẽ không muốn nó quay trở lại tương lai không dùng mức độ song song trong tài khoản, và yêu cầu bạn phải sắp xếp với họ để giữ cho sự kiểm soát song song.
Ví dụ này minh họa hơn thực tế vì tôi thường không mong đợi giao diện 'gửi đi' sẽ sử dụng tương lai như thế này (đây là báo giá ok cho giao diện 'đến').
tôi nên thêm rằng nó sẽ là tốt hơn nếu, mỗi lần một tương lai hoàn thành, một cái mới được bắt đầu, thay vì chờ đợi cho bệnh nhân điều trị toàn bộ/nhóm để hoàn thành. – experquisite
là chặn IO của bạn sau đó được bao bọc trong Tương lai {} hoặc là IO không đồng bộ và không sử dụng luồng trong khi chờ trên máy chủ từ xa? Nếu nó chặn sau đó một hồ bơi cố định thread với 5 chủ đề có vẻ như giải pháp đơn giản nhất cho tôi. Nhưng sử dụng hồ bơi đó chỉ để chặn IO và không có gì khác tất nhiên. –
IO sao lưu doWork() là không chặn, chạy trên các chủ đề tôi không có quyền kiểm soát, mà tôi đã bao bọc thành các Observables ở các mức trừu tượng khác nhau. – experquisite