2011-08-06 47 views
9

Tôi ngạc nhiên bởi một tràn ngăn xếp trong chương trình dựa trên async của tôi. Tôi nghi ngờ vấn đề chính là với các chức năng sau đây, mà là vụ phải soạn hai tính async để thực hiện song song và chờ cho cả đến khi kết thúc:F # tràn bộ đệm async

let (<|>) (a: Async<unit>) (b: Async<unit>) = 
    async { 
     let! x = Async.StartChild a 
     let! y = Async.StartChild b 
     do! x 
     do! y 
    } 

Với điều này được xác định, tôi đã sau chương trình mapReduce mà cố gắng để khai thác song song trong cả hai phần mapreduce. Một cách không chính thức, ý tưởng là để kích hoạt các trình giảm tốc NN-1 sử dụng kênh được chia sẻ, chờ cho đến khi kết thúc và đọc kết quả từ kênh. Tôi đã có riêng Channel thực hiện của tôi, ở đây thay thế bằng một ConcurrentBag cho mã ngắn hơn (vấn đề ảnh hưởng đến cả hai):

let mapReduce (map : 'T1 -> Async<'T2>) 
       (reduce : 'T2 -> 'T2 -> Async<'T2>) 
       (input : seq<'T1>) : Async<'T2> = 
    let bag = System.Collections.Concurrent.ConcurrentBag() 

    let rec read() = 
     async { 
      match bag.TryTake() with 
      | true, value -> return value 
      | _   -> do! Async.Sleep 100 
          return! read() 
     } 

    let write x = 
     bag.Add x 
     async.Return() 

    let reducer = 
     async { 
      let! x = read() 
      let! y = read() 
      let! r = reduce x y 
      return bag.Add r 
     } 

    let work = 
     input 
     |> Seq.map (fun x -> async.Bind(map x, write)) 
     |> Seq.reduce (fun m1 m2 -> m1 <|> m2 <|> reducer) 

    async { 
     do! work 
     return! read() 
    } 

Bây giờ kiểm tra cơ bản sau đây bắt đầu ném StackOverflowException trên n = 10000:

let test n = 
    let map x  = async.Return x 
    let reduce x y = async.Return (x + y) 
    mapReduce map reduce [0..n] 
    |> Async.RunSynchronously 

EDIT: An thực hiện thay thế của <|> combinator làm cho thử nghiệm thành công trên N = 10000:

let (<|>) (a: Async<unit>) (b: Async<unit>) = 
    Async.FromContinuations(fun (ok, _, _) -> 
    let count = ref 0 
    let ok() = 
     lock count (fun() -> 
      match !count with 
      | 0 -> incr count 
      | _ -> ok()) 
    Async.Start <| 
     async { 
      do! a 
      return ok() 
     } 
    Async.Start <| 
     async { 
      do! b 
      return ok() 
     }) 

Đây là r Tôi thực sự ngạc nhiên vì đây là điều tôi cho là Async.StartChild đang hoạt động. Bất kỳ suy nghĩ về giải pháp nào sẽ là tối ưu?

Trả lời

4

Tôi nghĩ rằng ngoại lệ tràn ngăn xếp xảy ra khi bắt đầu quy trình công việc không đồng bộ được tạo bằng toán tử <|>. Các cuộc gọi đến Async.StartChild bắt đầu công việc đầu tiên, được kết hợp sử dụng <|> và vì vậy nó làm cho một cuộc gọi đến Async.StartChild, vv

Một cách dễ dàng để sửa chữa nó là để sắp xếp các công việc trong một handler của một bộ đếm thời gian (để nó không được thêm vào ngăn xếp hiện tại). Một cái gì đó như:

let (<|>) (a: Async<unit>) (b: Async<unit>) = 
    async { 
     do! Async.Sleep 1 
     let! x = Async.StartChild a 
     let! y = Async.StartChild b 
     do! x 
     do! y } 

Cách tốt hơn để sửa chữa nó sẽ được tạo riêng Seq.reduce của bạn - việc thực hiện hiện nếp gấp nó trở thành một-by-một, do đó bạn sẽ nhận được một cây có chiều sâu 10000, mà chỉ chứa một đơn mục công việc ở bên phải và tất cả các mục công việc khác ở bên trái. Nếu bạn đã tạo một cây nhị phân có khối lượng của các mục công việc, thì nó sẽ không ngăn xếp luồng vì chiều cao sẽ chỉ là 15 hoặc hơn.

EDIT Hãy thử thay thế Seq.reduce với chức năng sau:

module Seq = 
    let reduceBallanced f input = 
    let arr = input |> Array.ofSeq 
    let rec reduce s t = 
     if s + 1 >= t then arr.[s] 
     else 
     let m = (s + t)/2 
     f (reduce s m) (reduce m t) 
    reduce 0 arr.Length 
+0

Sử dụng '1' Async.Sleep làm cho mã chậm hơn nhiều. Mặc dù điều đó có thể sẽ không hiển thị khi các hàm 'map' và' reduce' thực sự đã thực hiện một số công việc hữu ích, mất một thời gian. – svick

+0

Thực ra, tôi không quan tâm đến sự phức tạp về thời gian/không gian ngay bây giờ - Tôi thực sự ngạc nhiên khi mã sử dụng ngăn xếp! Nếu cấu trúc nằm trên heap thì điều này sẽ tốt cho n = 10K. – t0yv0

+0

@toyvo - Vâng, tràn ngăn xảy ra khi bắt đầu quy trình công việc, rõ ràng là sử dụng ngăn xếp. Chạy quy trình làm việc không cần ngăn xếp. Cách giải quyết 'Sleep 1' chỉ là để chứng minh rằng đây thực sự là vấn đề - bằng cách sử dụng' Seq.reduce' để xây dựng một cây nhị phân được giải mã nên giải quyết vấn đề mà không cần thêm phí. –

2

Tôi tin rằng Tomas có trực giác ngay trong câu trả lời, nhưng ở đây nó là theo cách của tôi và cụ thể hơn, sau khi trải qua khá nhiều thời gian để tìm ra điều này.

  1. Vấn đề ở chỗ mã trên không thực hiện thuật toán mapReduce dự định do đồng bộ hóa quá mức. Cụ thể, a <|> b <|> c không bắt đầu c trước khi cả hai ab đã hoàn tất, vì vậy trên thực tế, <|> vô dụng đối với tính song song với hơn hai phép tính.

  2. Vấn đề thứ hai là async.Return x là đẳng cấu cho Async.FromContinuations(fun (ok, _, _) -> ok x).Ví dụ sau đó trong thực tế thực hiện tuần tự, trên các sợi đơn, và đóng cửa được phân bổ thổi các ngăn xếp.

Đối với người đọc tò mò, dưới đây là nỗ lực thứ hai của tôi để thiết kế thuật toán này, mà dường như giá vé tốt hơn một chút (~ 1 giây trên n=100000 và ~ 21 giây trên n=100000 với bản đồ và giảm chức năng mở rộng với Async.Sleep 1000, Tôi có Core i3).

let mapReduce (map : 'T1 -> Async<'T2>) 
       (reduce : 'T2 -> 'T2 -> Async<'T2>) 
       (input : seq<'T1>) : Async<'T2> = 
    let run (a: Async<'T>) (k: 'T -> unit) = 
     Async.StartWithContinuations(a, k, ignore, ignore) 
    Async.FromContinuations <| fun (ok, _, _) -> 
     let k = ref 0 
     let agent = 
      new MailboxProcessor<_>(fun chan -> 
       async { 
        for i in 2 .. k.Value do 
         let! x = chan.Receive() 
         let! y = chan.Receive() 
         return run (reduce x y) chan.Post 
        let! r = chan.Receive() 
        return ok r 
       }) 
     k := 
      (0, input) 
      ||> Seq.fold (fun count x -> 
       run (map x) agent.Post 
       count + 1) 
     agent.Start() 
0

khác, thực hiện đơn giản có thể được cái gì đó như:

let mapReduce' (map : 'T1 -> Async<'T2>) 
       (reduce : 'T2 -> 'T2 -> Async<'T2>) 
       (input : seq<'T1>) : Async<'T2> = 
     async { 
      let! r = input |> Seq.map map |> Async.Parallel 
      return r |> Array.toSeq 
        |> Seq.reduce (fun a b -> reduce a b |> Async.RunSynchronously) 

     } 

Trong giai đoạn bản đồ được thực hiện trong Parallel và sau đó giảm giai đoạn là tuần tự vì nó có sự phụ thuộc dữ liệu về giá trị tính toán trước.

+0

Khi bạn chỉ ra, giảm ở đây không thể xen kẽ hoặc sắp xếp lại, do đó, nó không giống nhau. – t0yv0

2

Thảo luận rất thú vị! Tôi đã có một vấn đề tương tự với Async.Parallel

let (<||>) first second = async { let! results = Async.Parallel([|first; second|]) in return (results.[0], results.[1]) } 

let test = async { do! Async.Sleep 100 } 
(test, [1..10000]) 
||> List.fold (fun state value -> (test <||> state) |> Async.Ignore) 
|> Async.RunSynchronously // stackoverflow 

Tôi đã rất thất vọng ... vì vậy tôi giải quyết nó bằng cách tạo combinator Parallel của riêng tôi.

let parallel<'T>(computations : Async<'T> []) : Async<'T []> = 
    Async.FromContinuations (fun (cont, exnCont, _) -> 
    let count = ref computations.Length 
    let results : 'T [] = Array.zeroCreate computations.Length 
    computations 
     |> Array.iteri (fun i computation -> 
      Async.Start <| 
       async { 
        try 
         let! res = computation 
         results.[i] <- res 
        with ex -> exnCont ex 

        let n = System.Threading.Interlocked.Decrement(count) 
        if n = 0 then 
         results |> cont 
       })) 

Và cuối cùng lấy cảm hứng từ các cuộc thảo luận, tôi thực hiện các chức năng MapReduce sau

// (|f ,⊗|) 

let mapReduce (mapF : 'T -> Async<'R>) (reduceF : 'R -> 'R -> Async<'R>) (input : 'T []) : Async<'R> = 
let rec mapReduce' s e = 
    async { 
     if s + 1 >= e then return! mapF input.[s] 
     else 
      let m = (s + e)/2 
      let! (left, right) = mapReduce' s m <||> mapReduce' m e 
      return! reduceF left right 
    } 
mapReduce' 0 input.Length 
+1

Tuyệt vời! Tôi thích nó rất nhiều. Tôi tự hỏi nếu Tomas có giải pháp này trong tâm trí trong bình luận của mình. Tôi nghĩ rằng sự khác biệt giữa điều này và câu trả lời của riêng tôi là bạn đang sử dụng một chương trình giảm cứng nhắc, trong khi tôi đang sử dụng đầu tiên đến trước-phục vụ. Tôi sẽ tưởng tượng rằng đối với một số đầu vào giải pháp của tôi sẽ đưa ra một thứ tự giảm tốt hơn, nhưng đối với hầu hết các yếu tố đầu vào sẽ chậm hơn vì sự phối hợp trên không. Tôi có thể chơi người ủng hộ và xây dựng đầu vào của ma quỷ, nơi giải pháp của tôi thắng (sử dụng nhiều bản đồ nhanh, một bản đồ rất chậm, và giảm chậm): https://gist.github.com/1131917 – t0yv0

+0

Một ý tưởng quan trọng là giải pháp đại số của tôi yêu cầu chức năng giảm phải tuân theo luật liên kết. (kiểm tra danh sách đồng cấu) –

+0

Chắc chắn, giải pháp của tôi cũng đòi hỏi phải giảm được kết hợp. Tuy nhiên, bạn không khai thác triệt để sự tự do mà nó cung cấp, làm cho việc giảm bớt đôi khi chờ đợi một cách không cần thiết. – t0yv0

Các vấn đề liên quan