2013-08-27 22 views
14

Giả sử tôi có mô hình sản xuất/tiêu dùng đơn giản, nơi người tiêu dùng muốn trả lại một số trạng thái cho nhà sản xuất. Ví dụ, để cho các đối tượng lưu lượng hạ lưu là các đối tượng mà chúng ta muốn ghi vào một tệp và các đối tượng ngược dòng là một số mã đại diện cho nơi đối tượng được ghi trong tệp (ví dụ: một offset).Ống hai chiều thành ngữ với trạng thái hạ lưu mà không bị mất

Hai quá trình có thể giống như thế này (với pipes-4.0),

{-# LANGUAGE GeneralizedNewtypeDeriving #-} 

import Pipes 
import Pipes.Core 
import Control.Monad.Trans.State  
import Control.Monad 

newtype Object = Obj Int 
       deriving (Show) 

newtype ObjectId = ObjId Int 
       deriving (Show, Num) 

writeObjects :: Proxy ObjectId Object() X IO r 
writeObjects = evalStateT (forever go) (ObjId 0) 
    where go = do i <- get 
       obj <- lift $ request i 
       lift $ lift $ putStrLn $ "Wrote "++show obj 
       modify (+1) 

produceObjects :: [Object] -> Proxy X() ObjectId Object IO() 
produceObjects = go 
    where go [] = return() 
     go (obj:rest) = do 
      lift $ putStrLn $ "Producing "++show obj 
      objId <- respond obj 
      lift $ putStrLn $ "Object "++show obj++" has ID "++show objId 
      go rest 

objects = [ Obj i | i <- [0..10] ] 

Đơn giản vì điều này có thể được, tôi đã có một chút công bằng của khó khăn lý luận về cách soạn chúng. Lý tưởng nhất, chúng tôi muốn một dòng chảy đẩy dựa trên tầm kiểm soát như sau,

  1. writeObjects bắt đầu bằng cách chặn trên request, sau khi gửi ban đầu ObjId 0 thượng nguồn.
  2. produceObjects gửi đối tượng đầu tiên, Obj 0, hạ lưu
  3. writeObjects viết đối tượng và increments trạng thái của nó, và chờ đợi trên request, lần này gửi ObjId 1 thượng nguồn
  4. respond trong produceObjects lợi nhuận với ObjId 0
  5. produceObjects tiếp tục ở Bước (2) với đối tượng thứ hai, Obj 1

Nỗ lực ban đầu của tôi là với thành phần push-based như sau,

main = void $ run $ produceObjects objects >>~ const writeObjects 

Lưu ý việc sử dụng const để làm việc xung quanh các loại khác không tương thích (đây là khả năng mà vấn đề nằm). Trong trường hợp này, tuy nhiên, chúng tôi thấy rằng ObjId 0 được ăn,

Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 1 
Producing Obj 1 
... 

Một cách tiếp cận theo mô hình pull,

main = void $ run $ const (produceObjects objects) +>> writeObjects 

bị một vấn đề tương tự, lần này thả Obj 0.

Làm thế nào người ta có thể sắp xếp các phần này theo cách mong muốn?

Trả lời

14

Lựa chọn chế phẩm để sử dụng phụ thuộc vào thành phần nào nên bắt đầu toàn bộ quá trình. Nếu bạn muốn đường ống hạ lưu bắt đầu quá trình thì bạn muốn sử dụng thành phần dựa trên kéo (tức là (>+>)/(+>>)) nhưng nếu bạn muốn ống thượng nguồn bắt đầu quá trình thì bạn nên sử dụng bố cục dựa trên đẩy (tức là (>>~)/(>~>)) . Các lỗi loại bạn đã thực sự cảnh báo bạn rằng có một lỗi logic trong mã của bạn: bạn chưa xác định rõ ràng thành phần nào bắt đầu quá trình đầu tiên.

Từ mô tả của bạn, rõ ràng là bạn muốn luồng điều khiển bắt đầu từ produceObjects để bạn muốn sử dụng bố cục dựa trên đẩy. Khi bạn sử dụng thành phần dựa trên đẩy, loại toán tử thành phần sẽ cho bạn biết mọi thứ bạn cần biết về cách sửa mã của bạn.Tôi sẽ đưa loại của nó và chuyên nó để chuỗi sáng tác của bạn:

-- Here I'm using the `Server` and `Client` type synonyms to simplify the types 
(>>~) :: Server ObjectId Object IO() 
     -> (Object -> Client ObjectId Object IO()) 
     -> Effect IO() 

Như bạn đã thấy, sai số loại mà bạn nhận được khi bạn cố gắng sử dụng (>>~) nói với bạn rằng bạn bị mất tích một đối số kiểu Object để bạn Hàm writeObjects. Điều này thực thi tĩnh mà bạn không thể chạy bất kỳ mã nào trong writeObjects trước khi nhận được Object đầu tiên của bạn (thông qua đối số ban đầu).

Giải pháp là phải viết lại chức năng writeObjects của bạn như thế này:

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj0 = evalStateT (go obj0) (ObjId 0) 
    where go obj = do i <- get 
        lift $ lift $ putStrLn $ "Wrote "++ show obj 
        modify (+1) 
        obj' <- lift $ request i 
        go obj' 

này sau đó cung cấp cho các hành vi đúng:

>>> run $ produceObjects objects >>~ writeObjects 
Producing Obj 0 
Wrote Obj 0 
Object Obj 0 has ID ObjId 0 
Producing Obj 1 
Wrote Obj 1 
Object Obj 1 has ID ObjId 1 
Producing Obj 2 
Wrote Obj 2 
Object Obj 2 has ID ObjId 2 
Producing Obj 3 
Wrote Obj 3 
Object Obj 3 has ID ObjId 3 
Producing Obj 4 
Wrote Obj 4 
Object Obj 4 has ID ObjId 4 
Producing Obj 5 
Wrote Obj 5 
Object Obj 5 has ID ObjId 5 
Producing Obj 6 
Wrote Obj 6 
Object Obj 6 has ID ObjId 6 
Producing Obj 7 
Wrote Obj 7 
Object Obj 7 has ID ObjId 7 
Producing Obj 8 
Wrote Obj 8 
Object Obj 8 has ID ObjId 8 
Producing Obj 9 
Wrote Obj 9 
Object Obj 9 has ID ObjId 9 
Producing Obj 10 
Wrote Obj 10 
Object Obj 10 has ID ObjId 10 

Bạn có thể tự hỏi tại sao yêu cầu này mà một trong hai ống có một lập luận ban đầu có ý nghĩa, khác với sự biện minh trừu tượng rằng đây là điều luật pháp yêu cầu. Giải thích bằng tiếng Anh đơn giản là phương án thay thế là bạn sẽ cần bộ đệm đầu tiên được truyền Object "ở giữa" hai đường ống trước khi writeObjects đạt được tuyên bố request đầu tiên. Cách tiếp cận này tạo ra rất nhiều hành vi có vấn đề và các trường hợp góc lỗi, nhưng có lẽ vấn đề quan trọng nhất là thành phần ống sẽ không còn liên kết nữa và thứ tự của các hiệu ứng sẽ thay đổi dựa trên thứ tự mà bạn tạo ra.

Điều thú vị về các toán tử thành phần ống hai chiều là các loại hoạt động sao cho bạn luôn có thể suy đoán thành phần có hoạt động hay không (tức là chờ đầu vào) thuần túy bằng cách nghiên cứu loại. Nếu bố cục nói rằng một đường ống nhất định (như writeObjects) phải có một đối số, thì nó là thụ động. Nếu không có đối số (như produceObjects), thì nó hoạt động và bắt đầu kiểm soát. Vì vậy, thành phần buộc bạn phải có tối đa một đường ống hoạt động trong đường ống của bạn (đường ống không lấy đối số ban đầu) và đó là đường ống bắt đầu kiểm soát.

4

'const là nơi bạn đang xóa dữ liệu. Để có được tất cả các dữ liệu, bạn có thể muốn làm một công việc push-based như sau:

writeObjects :: Object -> Proxy ObjectId Object() X IO r 
writeObjects obj = go 0 obj 
    where 
    go objid obj = do 
     lift $ putStrLn $ "Wrote "++show obj 
     obj' <- request objid 
     go (objid + 1) obj' 

-- produceObjects as before 

main = void $ run $ produceObjects objects >>~ writeObjects 
2

Chúng tôi đã thảo luận này trên mailing list, nhưng tôi figured tôi muốn ném nó lên đây như tốt cho những ai quan tâm.

Vấn đề của bạn là bạn có hai coroutines, cả hai đều sẵn sàng để nhổ ra các giá trị với nhau. Không ai cần đầu vào của người khác để tạo ra một giá trị. Vậy ai sẽ đi trước? Vâng, bạn nói nó tự hỏi:

writeObjects bắt đầu bằng cách ngăn chặn theo yêu cầu, sau khi gửi ban đầu ObjId 0 thượng nguồn

rồi sau đó, có nghĩa là chúng ta cần phải trì hoãn produceObjects để nó chờ đợi một ObjId tín hiệu trước phun ra các đối tượng tương ứng (mặc dù nó dường như không cần ID nói).

Nhúng vào nội dung proxy, đây là câu thần chú mà tôi sẽ không bận tâm giải thích rất cẩn thận vào lúc này.Ý tưởng cơ bản là chỉ để lấy đầu vào trước khi bạn cần nó, sau đó áp dụng các đầu vào khi cần thiết, nhưng sau đó giả vờ như bạn cần một đầu vào mới (mặc dù bạn không cần rằng một chỉ được nêu ra):

delayD :: (Monad m) => Proxy a' a b' b m r -> b' -> Proxy a' a b' b m r 
delayD p0 b' = case p0 of 
    Request a' f -> Request a' (go . f) 
    Respond b g -> Respond b (delayD (g b')) 
    M m   -> M (liftM go m) 
    Pure r  -> Pure r 
    where 
    go p = delayD p b' 

Bây giờ, bạn có thể sử dụng trên produceObjects objects thay vì const, và nỗ lực thứ hai của bạn hoạt động như mong muốn:

delayD (produceObjects objects) +>> writeObjects 

Chúng tôi đang thảo luận delayD trên mailing list để xem nếu nó xứng đáng đưa vào ống tiêu chuẩn tiết mục.

Các vấn đề liên quan