2014-11-23 13 views
5

Bối cảnh: Tôi có một chức năng:Sequencing Scala Futures với xử lý song song bị chặn (không rối tung xung quanh với ExecutorContexts)

def doWork(symbol: String): Future[Unit] 

mà khởi một số tác dụng phụ để lấy dữ liệu và lưu trữ nó, và hoàn thành một tương lai khi hoàn thành công . Tuy nhiên, cơ sở hạ tầng phía sau có giới hạn sử dụng, sao cho không quá 5 trong số các yêu cầu này có thể được thực hiện song song. Tôi có một danh sách các N biểu tượng mà tôi cần phải vượt qua:

var symbols = Array("MSFT",...) 

nhưng tôi muốn thiết lập trình tự họ như vậy mà không nhiều hơn 5 được thực hiện cùng một lúc. Đưa ra:

val allowableParallelism = 5 

giải pháp hiện tại của tôi là (giả sử tôi đang làm việc với async/chờ đợi):

val symbolChunks = symbols.toList.grouped(allowableParallelism).toList 
    def toThunk(x: List[String]) =() => Future.sequence(x.map(doWork)) 
    val symbolThunks = symbolChunks.map(toThunk) 
    val done = Promise[Unit]() 
    def procThunks(x: List[() => Future[List[Unit]]]): Unit = x match { 
    case Nil => done.success() 
    case x::xs => x().onComplete(_ => procThunks(xs)) 
    } 
    procThunks(symbolThunks) 
    await { done.future } 

nhưng, vì những lý do rõ ràng, tôi không khủng khiếp hài lòng với nó. Tôi cảm thấy như thế này nên có thể với nếp gấp, nhưng mỗi khi tôi cố gắng, tôi cuối cùng háo hức tạo ra tương lai. Tôi cũng đã thử một phiên bản với RxScala Observables, sử dụng concatMap, nhưng điều đó cũng có vẻ như quá mức cần thiết.

Có cách nào tốt hơn để thực hiện việc này không?

+0

tôi nên thêm rằng nó sẽ là tốt hơn nếu, mỗi lần một tương lai hoàn thành, một cái mới được bắt đầu, thay vì chờ đợi cho bệnh nhân điều trị toàn bộ/nhóm để hoàn thành. – experquisite

+0

là chặn IO của bạn sau đó được bao bọc trong Tương lai {} hoặc là IO không đồng bộ và không sử dụng luồng trong khi chờ trên máy chủ từ xa? Nếu nó chặn sau đó một hồ bơi cố định thread với 5 chủ đề có vẻ như giải pháp đơn giản nhất cho tôi. Nhưng sử dụng hồ bơi đó chỉ để chặn IO và không có gì khác tất nhiên. –

+0

IO sao lưu doWork() là không chặn, chạy trên các chủ đề tôi không có quyền kiểm soát, mà tôi đã bao bọc thành các Observables ở các mức trừu tượng khác nhau. – experquisite

Trả lời

5

Tôi có ví dụ về cách thực hiện điều đó với luồng trực tiếp. Nó khá nhiều mã vì nó được yêu cầu để chuyển đổi scala Future thành scalaz Task (trừu tượng cho tính toán trì hoãn). Tuy nhiên, nó được yêu cầu để thêm nó vào dự án một lần. Một tùy chọn khác là sử dụng Tác vụ để xác định 'doWork'. Cá nhân tôi thích công việc xây dựng các chương trình không đồng bộ.

import scala.concurrent.{Future => SFuture} 
    import scala.util.Random 
    import scala.concurrent.ExecutionContext.Implicits.global 


    import scalaz.stream._ 
    import scalaz.concurrent._ 

    val P = scalaz.stream.Process 

    val rnd = new Random() 

    def doWork(symbol: String): SFuture[Unit] = SFuture { 
    Thread.sleep(rnd.nextInt(1000)) 
    println(s"Symbol: $symbol. Thread: ${Thread.currentThread().getName}") 
    } 

    val symbols = Seq("AAPL", "MSFT", "GOOGL", "CVX"). 
    flatMap(s => Seq.fill(5)(s).zipWithIndex.map(t => s"${t._1}${t._2}")) 

    implicit class Transformer[+T](fut: => SFuture[T]) { 
    def toTask(implicit ec: scala.concurrent.ExecutionContext): Task[T] = { 
     import scala.util.{Failure, Success} 
     import scalaz.syntax.either._ 
     Task.async { 
     register => 
      fut.onComplete { 
      case Success(v) => register(v.right) 
      case Failure(ex) => register(ex.left) 
      } 
     } 
    } 
    } 

    implicit class ConcurrentProcess[O](val process: Process[Task, O]) { 
    def concurrently[O2](concurrencyLevel: Int)(f: Channel[Task, O, O2]): Process[Task, O2] = { 
     val actions = 
     process. 
      zipWith(f)((data, f) => f(data)) 

     val nestedActions = 
     actions.map(P.eval) 

     merge.mergeN(concurrencyLevel)(nestedActions) 
    } 
    } 

    val workChannel = io.channel((s: String) => doWork(s).toTask) 

    val process = Process.emitAll(symbols).concurrently(5)(workChannel) 

    process.run.run 

Khi bạn sẽ có tất cả chuyển đổi này trong phạm vi, về cơ bản tất cả các bạn cần là:

val workChannel = io.channel((s: String) => doWork(s).toTask) 

    val process = Process.emitAll(symbols).concurrently(5)(workChannel) 

Khá ngắn và tự decribing

+0

Cảm ơn, tôi đã cẩn thận tránh scalaz trong thời gian này, vì tôi là khá mới để scala đồng bằng vẫn còn, nhưng điều này có vẻ tốt ... – experquisite

3

Mặc dù bạn đã có một câu trả lời tuyệt vời , Tôi nghĩ tôi vẫn có thể đưa ra một hoặc hai ý kiến ​​về những vấn đề này.

Tôi nhớ đã thấy một nơi nào đó (trên blog của ai đó) "sử dụng diễn viên cho tiểu bang và sử dụng tương lai cho đồng thời".

Vì vậy, suy nghĩ đầu tiên của tôi sẽ là sử dụng diễn viên bằng cách nào đó. Để chính xác, tôi sẽ có một diễn viên chính với một bộ định tuyến khởi chạy nhiều nhân viên công nhân, với số lượng công nhân bị hạn chế theo số allowableParallelism. Vì vậy, giả sử tôi có

def doWorkInternal (symbol: String): Unit 

mà hiện các công việc từ bạn doWork lấy 'bên ngoài của tương lai', tôi sẽ có một cái gì đó dọc theo những dòng (rất thô sơ, không dùng nhiều chi tiết vào xem xét, và thực tế sao chép mã từ AKKA tài liệu):

import akka.actor._ 

case class WorkItem (symbol: String) 
case class WorkItemCompleted (symbol: String) 
case class WorkLoad (symbols: Array[String]) 
case class WorkLoadCompleted() 

class Worker extends Actor { 
    def receive = { 
     case WorkItem (symbol) => 
      doWorkInternal (symbol) 
      sender() ! WorkItemCompleted (symbol) 
    } 
} 

class Master extends Actor { 
    var pending = Set[String]() 
    var originator: Option[ActorRef] = None 

    var router = { 
     val routees = Vector.fill (allowableParallelism) { 
      val r = context.actorOf(Props[Worker]) 
      context watch r 
      ActorRefRoutee(r) 
     } 
     Router (RoundRobinRoutingLogic(), routees) 
    } 

    def receive = { 
     case WorkLoad (symbols) => 
      originator = Some (sender()) 
      context become processing 
      for (symbol <- symbols) { 
       router.route (WorkItem (symbol), self) 
       pending += symbol 
      } 
    } 

    def processing: Receive = { 
     case Terminated (a) => 
      router = router.removeRoutee(a) 
      val r = context.actorOf(Props[Worker]) 
      context watch r 
      router = router.addRoutee(r) 
     case WorkItemCompleted (symbol) => 
      pending -= symbol 
      if (pending.size == 0) { 
       context become receive 
       originator.get ! WorkLoadCompleted 
      } 
    } 
} 

Bạn có thể truy vấn các diễn viên bậc thầy với ask và nhận một WorkLoadCompleted trong một tương lai.Tuy nhiên, suy nghĩ thêm về 'trạng thái' (số lượng yêu cầu đồng thời đang xử lý) bị ẩn ở đâu đó, cùng với việc thực thi mã cần thiết để không vượt quá nó, đây là thứ 'trung gian cổng tương lai' sắp xếp, nếu bạn không ' t nhớ bắt buộc phong cách và có thể thay đổi (được sử dụng trong nội bộ chỉ mặc dù) cấu trúc:

object Guardian 
{ 
    private val incoming = new collection.mutable.HashMap[String, Promise[Unit]]() 
    private val outgoing = new collection.mutable.HashMap[String, Future[Unit]]() 
    private val pending = new collection.mutable.Queue[String] 

    def doWorkGuarded (symbol: String): Future[Unit] = { 
     synchronized { 
      val p = Promise[Unit]() 
      incoming(symbol) = p 
      if (incoming.size <= allowableParallelism) 
       launchWork (symbol) 
      else 
       pending.enqueue (symbol) 
      p.future 
     } 
    } 

    private def completionHandler (t: Try[Unit]): Unit = { 
     synchronized { 
      for (symbol <- outgoing.keySet) { 
       val f = outgoing (symbol) 
       if (f.isCompleted) { 
        incoming (symbol).completeWith (f) 
        incoming.remove (symbol) 
        outgoing.remove (symbol) 
       } 
      } 
      for (i <- outgoing.size to allowableParallelism) { 
       if (pending.nonEmpty) { 
        val symbol = pending.dequeue() 
        launchWork (symbol) 
       } 
      } 
     } 
    } 

    private def launchWork (symbol: String): Unit = { 
     val f = doWork(symbol) 
     outgoing(symbol) = f 
     f.onComplete(completionHandler) 
    } 
} 

doWork tại là chính xác như của bạn, trở Future[Unit], với ý tưởng rằng thay vì sử dụng một cái gì đó giống như

val futures = symbols.map (doWork (_)).toSeq 
val future = Future.sequence(futures) 

đó sẽ khởi động tương lai không liên quan đến allowableParallelism ở tất cả, tôi sẽ thay vì sử dụng

val futures = symbols.map (Guardian.doWorkGuarded (_)).toSeq 
val future = Future.sequence(futures) 

Hãy suy nghĩ về một số lái xe giả thuyết cơ sở dữ liệu truy cập với những người không chặn giao diện, tức là trở về tương lai trên yêu cầu, được giới hạn trong đồng thời bởi được xây dựng trên một số hồ bơi kết nối ví dụ - bạn sẽ không muốn nó quay trở lại tương lai không dùng mức độ song song trong tài khoản, và yêu cầu bạn phải sắp xếp với họ để giữ cho sự kiểm soát song song.

Ví dụ này minh họa hơn thực tế vì tôi thường không mong đợi giao diện 'gửi đi' sẽ sử dụng tương lai như thế này (đây là báo giá ok cho giao diện 'đến').

+0

Cảm ơn. Có ngủ trên đó, tôi nghĩ rằng những gì tôi thực sự cần làm là trừu tượng những gì tôi đang cố gắng để làm thành một combinator, và suy nghĩ về tên đúng. Nó thực sự rất giống với Observable.concatMap, vì vậy có lẽ tôi nên ngồi và suy nghĩ về cách thể hiện các loại tôi cần. Những gì tôi đang cố gắng làm là cơ bản "cho một dòng chảy chậm chạp của các chức năng mà sản xuất tương lai, chuỗi hoàn thành một với việc tạo ra tiếp theo, trở về một tương lai khi tất cả chúng được thực hiện". Cho một cái gì đó như thế, tôi có thể tổng quát hơn về trường hợp đồng thời ... – experquisite

1

Đầu tiên, rõ ràng một số wrapper hoàn toàn chức năng xung quanh Scala's Future là cần thiết, gây ra nó phụ hiệu quả và chạy càng sớm càng tốt. Hãy gọi nó là Deferred:

import scala.concurrent.Future 
import scala.util.control.Exception.nonFatalCatch 

class Deferred[+T](f:() => Future[T]) { 
    def run(): Future[T] = f() 
} 

object Deferred { 
    def apply[T](future: => Future[T]): Deferred[T] = 
    new Deferred(() => nonFatalCatch.either(future).fold(Future.failed, identity)) 
} 

Và đây là những thói quen:

import java.util.concurrent.CopyOnWriteArrayList 
import java.util.concurrent.atomic.AtomicInteger 

import scala.collection.immutable.Seq 
import scala.concurrent.{ExecutionContext, Future, Promise} 
import scala.util.control.Exception.nonFatalCatch 
import scala.util.{Failure, Success} 

trait ConcurrencyUtils {  
    def runWithBoundedParallelism[T](parallelism: Int = Runtime.getRuntime.availableProcessors()) 
            (operations: Seq[Deferred[T]]) 
            (implicit ec: ExecutionContext): Deferred[Seq[T]] = 
    if (parallelism > 0) Deferred { 
     val indexedOps = operations.toIndexedSeq // index for faster access 

     val promise = Promise[Seq[T]]() 

     val acc = new CopyOnWriteArrayList[(Int, T)] // concurrent acc 
     val nextIndex = new AtomicInteger(parallelism) // keep track of the next index atomically 

     def run(operation: Deferred[T], index: Int): Unit = { 
     operation.run().onComplete { 
      case Success(value) => 
      acc.add((index, value)) // accumulate result value 

      if (acc.size == indexedOps.size) { // we've done 
       import scala.collection.JavaConversions._ 
       // in concurrent setting next line may be called multiple times, that's why trySuccess instead of success 
       promise.trySuccess(acc.view.sortBy(_._1).map(_._2).toList) 
      } else { 
       val next = nextIndex.getAndIncrement() // get and inc atomically 
       if (next < indexedOps.size) { // run next operation if exists 
       run(indexedOps(next), next) 
       } 
      } 
      case Failure(t) => 
      promise.tryFailure(t) // same here (may be called multiple times, let's prevent stdout pollution) 
     } 
     } 

     if (operations.nonEmpty) { 
     indexedOps.view.take(parallelism).zipWithIndex.foreach((run _).tupled) // run as much as allowed 
     promise.future 
     } else { 
     Future.successful(Seq.empty) 
     } 
    } else { 
     throw new IllegalArgumentException("Parallelism must be positive") 
    } 
} 

Tóm lại, chúng tôi chạy càng nhiều hoạt động ban đầu là cho phép và sau đó vào mỗi hoàn hoạt động chúng tôi chạy hoạt động tiếp theo có sẵn, nếu bất kì. Vì vậy, khó khăn duy nhất ở đây là duy trì chỉ số hoạt động tiếp theo và kết quả tích lũy trong cài đặt đồng thời. Tôi không phải là một chuyên gia đồng thời tuyệt đối, vì vậy hãy cho tôi biết nếu có một số vấn đề tiềm ẩn trong mã ở trên. Lưu ý rằng giá trị trả lại cũng là một tính toán được hoãn lại phải là run.

Cách sử dụng và kiểm tra:

import org.scalatest.{Matchers, FlatSpec} 
import org.scalatest.concurrent.ScalaFutures 
import org.scalatest.time.{Seconds, Span} 

import scala.collection.immutable.Seq 
import scala.concurrent.ExecutionContext.Implicits.global 
import scala.concurrent.Future 
import scala.concurrent.duration._ 

class ConcurrencyUtilsSpec extends FlatSpec with Matchers with ScalaFutures with ConcurrencyUtils { 

    "runWithBoundedParallelism" should "return results in correct order" in { 
    val comp1 = mkDeferredComputation(1) 
    val comp2 = mkDeferredComputation(2) 
    val comp3 = mkDeferredComputation(3) 
    val comp4 = mkDeferredComputation(4) 
    val comp5 = mkDeferredComputation(5) 

    val compountComp = runWithBoundedParallelism(2)(Seq(comp1, comp2, comp3, comp4, comp5)) 

    whenReady(compountComp.run()) { result => 
     result should be (Seq(1, 2, 3, 4, 5)) 
    } 
    } 

    // increase default ScalaTest patience 
    implicit val defaultPatience = PatienceConfig(timeout = Span(10, Seconds)) 

    private def mkDeferredComputation[T](result: T, sleepDuration: FiniteDuration = 100.millis): Deferred[T] = 
    Deferred { 
     Future { 
     Thread.sleep(sleepDuration.toMillis) 
     result 
     } 
    } 

} 
+0

điều này không bao giờ kết thúc nếu các hoạt động Seq là trống rỗng, trong trường hợp đó runWithBoundedParallelism nên trả về một Future.successful (Seq.empty) – Somatik

+0

@Somatik cảm thấy tự do để cải thiện câu trả lời – Tvaroh

+0

Áp dụng sửa lỗi cho mã ví dụ – Somatik

Các vấn đề liên quan