2013-03-24 28 views
8

Tôi đang cố tạo ống dẫn có thể tiêu thụ nhiều luồng đầu vào. Tôi cần có khả năng chờ đợi một hoặc các luồng đầu vào khác không theo thứ tự cụ thể (ví dụ: không xen kẽ) làm cho zip vô dụng. Không có gì song song hoặc không xác định xảy ra ở đây: Tôi đang chờ trên một dòng này hay dòng kia. Tôi muốn để có thể viết mã tương tự như sau (nơi awaitAawaitB đang chờ đợi trên dòng nhập dữ liệu đầu tiên hoặc thứ hai tương ứng):Sấy ống dẫn với nhiều đầu vào

do 
    _ <- awaitA 
    x <- awaitA 
    y <- awaitB 
    yield (x,y) 
    _ <- awaitB 
    _ <- awaitB 
    y' <- awaitB 
    yield (x,y') 

Giải pháp tốt nhất mà tôi có là làm cho đơn nguyên bên trong ống dẫn khác, ví dụ

foo :: Sink i1 (ConduitM i2 o m)() 

Mà sau đó cho phép

awaitA = await 
awaitB = lift await 

Và điều này chủ yếu hoạt động. Thật không may, điều này dường như làm cho nó rất khó khăn để cầu chì vào ống dẫn bên trong trước khi ống dẫn bên ngoài được kết nối hoàn toàn. Điều đầu tiên tôi thử là:

fuseInner :: Monad m => 
       Conduit i2' m i2 -> 
       Sink i1 (ConduitM i2 o m)() -> 
       Sink i1 (ConduitM i2' o m)() 
fuseInner x = transPipe (x =$=) 

Nhưng điều này không làm việc, ít nhất là khi x là stateful từ (x =$=) được chạy nhiều lần, khởi động lại hiệu quả x mỗi lần.

Có cách nào để viết fuseInner, ngắn đột nhập vào nội bộ của ống dẫn (có vẻ như nó sẽ là khá lộn xộn)? Có cách nào tốt hơn để xử lý nhiều luồng đầu vào không? Tôi chỉ là cách để vượt xa những gì conduit được thiết kế cho?

Cảm ơn!

+3

Tôi giả sử rằng bạn muốn nói rằng bạn muốn nhận các phần tử khi chúng được tạo ra từ hai luồng đầu vào 'IO'. Sử dụng 'stm-conduit' để làm điều này. –

+0

Tôi đã đọc câu hỏi được cập nhật của bạn. Là [this] (http://stackoverflow.com/questions/12496654/is-there-an-iteratee-like-concept-which-pulls-data-from-multiple-sources/12497593#12497593) gần hơn với những gì bạn có trong tâm trí? Nếu vậy, tôi có thể sửa đổi nó thành phiên bản 'conduit' tương đương và gửi nó như là một câu trả lời. –

+0

Tôi nghĩ rằng liên kết mô tả chính xác những gì tôi đã thử ('foo' là một' Sink' trên một 'ConduitM' monad). Vấn đề là, tôi không thể tìm ra cách kết hợp với ống dẫn bên trong bằng chiến lược này. – Benson

Trả lời

3

Nếu bạn muốn kết hợp hai luồng IO -generated, thì nhận xét của Gabriel là giải pháp.

Nếu không, bạn không thể đợi cả hai luồng, luồng nào tạo ra giá trị trước tiên. Ống dẫn là đơn luồng và xác định - nó chỉ xử lý một đường ống tại một thời điểm. Nhưng bạn có thể tạo một hàm interleaves hai con suối, cho phép họ quyết định khi nào chuyển:

{-# OPTIONS_GHC -fwarn-incomplete-patterns #-} 
import Control.Monad (liftM) 
import Data.Conduit.Internal (
    Pipe (..), Source, Sink, 
    injectLeftovers, ConduitM (..), 
    mapOutput, mapOutputMaybe 
) 

-- | Alternate two given sources, running one until it yields `Nothing`, 
-- then switching to the other one. 
merge :: Monad m 
     => Source m (Maybe a) 
     -> Source m (Maybe b) 
     -> Source m (Either a b) 
merge (ConduitM l) (ConduitM r) = ConduitM $ goL l r 
    where 
    goL :: Monad m => Pipe()() (Maybe a)() m() 
        -> Pipe()() (Maybe b)() m() 
        -> Pipe()() (Either a b)() m() 
    goL (Leftover l()) r   = goL l r 
    goL (NeedInput _ c) r   = goL (c()) r 
    goL (PipeM mx) r    = PipeM $ liftM (`goL` r) mx 
    goL (Done _) r     = mapOutputMaybe (liftM Right) r 
    goL (HaveOutput c f (Just o)) r = HaveOutput (goL c r) f (Left o) 
    goL (HaveOutput c f Nothing) r = goR c r 
    -- This is just a mirror copy of goL. We should combine them together to 
    -- avoid code repetition. 
    goR :: Monad m => Pipe()() (Maybe a)() m() 
        -> Pipe()() (Maybe b)() m() 
        -> Pipe()() (Either a b)() m() 
    goR l (Leftover r())   = goR l r 
    goR l (NeedInput _ c)   = goR l (c()) 
    goR l (PipeM mx)    = PipeM $ liftM (goR l) mx 
    goR l (Done _)     = mapOutputMaybe (liftM Left) l 
    goR l (HaveOutput c f (Just o)) = HaveOutput (goR l c) f (Right o) 
    goR l (HaveOutput c f Nothing) = goL l c 

Nó xử lý một nguồn cho đến khi nó trả Nothing, sau đó chuyển sang khác, vv Nếu một kết thúc nguồn, một trong những khác là được xử lý đến cuối.

Như một ví dụ, chúng ta có thể kết hợp và interleave hai danh sách:

import Control.Monad.Trans 
import Data.Conduit (($$), awaitForever) 
import Data.Conduit.List (sourceList) 

main = (merge (sourceList $ concatMap (\x -> [Just x, Just x, Nothing]) [ 1..10]) 
       (sourceList $ concatMap (\x -> [Just x, Nothing]) [101..110])) 
     $$ awaitForever (\x -> lift $ print x) 

Nếu bạn cần nhiều nguồn khác nhau, merge có thể thích nghi với một cái gì đó giống như

mergeList :: Monad m => [Source m (Maybe a)] -> Source m a 

mà sẽ lướt qua danh sách các nguồn đã cho đến khi tất cả các nguồn được hoàn thành.

+0

Tôi nghĩ cả giải pháp của bạn lẫn nhận xét của Gabriel đều giả định rằng tôi muốn các luồng thượng nguồn quyết định điều gì sẽ xảy ra tiếp theo, đó không phải là điều tôi sẽ làm. Tôi đã cập nhật câu hỏi của mình để cố gắng làm rõ. – Benson

+0

@Benson Tôi không chắc chắn nếu điều này là có thể, bởi vì _conduit_ 'chờ đợi' (hoặc cụ thể hơn 'NeedInput') không vượt qua bất kỳ thông tin thượng nguồn có thể được sử dụng để quyết định những gì dòng để đọc. Thứ tự của các giá trị nhận được từ thượng lưu không thể bị ảnh hưởng bởi một ống dẫn. Tuy nhiên điều này có vẻ là có thể với [ống] (http://hackage.haskell.org/package/pipes).Chúng là hai chiều và ['request'] (http://hackage.haskell.org/packages/archive/pipes/3.2.0/doc/html/Control-Proxy-Class.html#v:request) cho phép thông tin được gửi ngược dòng, có thể được sử dụng để chọn một trong hai luồng. –

3

Điều này có thể được thực hiện bằng cách đi sâu vào bên trong ống dẫn. Tôi muốn tránh điều này vì nó trông cực kỳ lộn xộn. Dựa trên các phản ứng ở đây, có vẻ như không có cách nào xung quanh nó (nhưng tôi thực sự sẽ đánh giá cao một giải pháp sạch hơn).

Khó khăn chính là (x =$=) là một chức năng thuần túy, nhưng để làm cho transPipe cho câu trả lời đúng, nó cần một loại trạng thái, chức năng giống như điều:

data StatefulMorph m n = StatefulMorph 
    { stepStatefulMorph :: forall a. m a -> n (StatefulMorph m n, a) 
    , finalizeStatefulMorph :: n() } 

Bước StatefulMorph m n mất một giá trị trong m và trả về, trong n, cả giá trị đó và giá trị StatefulMorph tiếp theo, sẽ được sử dụng để chuyển đổi giá trị m tiếp theo. Cuối cùng StatefulMorph nên được hoàn thành (trong đó, trong trường hợp của "stateful (x =$=)", để hoàn tất x ống dẫn

Conduit hợp có thể được thực hiện như một StatefulMorph, bằng cách sử dụng mã cho pipeL với những thay đổi nhỏ Chữ ký là..:

fuseStateful :: Monad m 
      => Conduit a m b 
      -> StatefulMorph (ConduitM b c m) (ConduitM a c m) 

tôi cũng cần một sự thay thế cho transPipe (một trường hợp đặc biệt của hoist) có sử dụng StatefulMorph giá trị thay vì chức năng.

class StatefulHoist t where 
    statefulHoist :: (Monad m, Monad n) 
        => StatefulMorph m n 
        -> t m r -> t n r 

Ví dụ StatefulHoist cho ConduitM i o có thể được viết bằng mã cho transPipe với một số thay đổi nhỏ.

fuseInner sau đó dễ triển khai.

fuseInner :: Monad m 
      => Conduit a m b 
      -> ConduitM i o (ConduitM b c m) r 
      -> ConduitM i o (ConduitM a c m) r 
fuseInner left = statefulHoist (fuseStateful left) 

Tôi đã viết một lời giải thích chi tiết hơn here và đăng tải đầy đủ mã here. Nếu ai đó có thể đưa ra giải pháp sạch hơn hoặc một giải pháp sử dụng API công cộng ống dẫn, vui lòng đăng nó.

Cảm ơn tất cả các đề xuất và đầu vào!

Các vấn đề liên quan