2013-07-18 33 views
6

Tôi đang làm việc trên ứng dụng mạng haskell và tôi sử dụng mẫu diễn viên để quản lý đa luồng. Một điều tôi gặp phải là làm thế nào để lưu trữ ví dụ một tập hợp các ổ cắm/tay cầm của khách hàng. Tất nhiên là phải có thể truy cập cho tất cả các chủ đề và có thể thay đổi khi khách hàng đăng nhập/tắt.Haskell - Biến thể dựa trên diễn viên

Vì tôi là đến từ thế giới bắt buộc Tôi nghĩ về một số loại khóa cơ chế nhưng khi tôi nhận thấy như thế nào xấu xí này là tôi nghĩ về mutability "tinh khiết", cũng thực sự nó là loại tinh khiết:

import Control.Concurrent 
import Control.Monad 
import Network 
import System.IO 
import Data.List 
import Data.Maybe 
import System.Environment 
import Control.Exception 


newStorage :: (Eq a, Show a) => IO (Chan (String, Maybe (Chan [a]), Maybe a)) 
newStorage = do 
    q <- newChan 
    forkIO $ storage [] q 
    return q 


newHandleStorage :: IO (Chan (String, Maybe (Chan [Handle]), Maybe Handle)) 
newHandleStorage = newStorage 


storage :: (Eq a, Show a) => [a] -> Chan (String, Maybe (Chan [a]), Maybe a) -> IO() 
storage s q = do 
    let loop = (`storage` q) 
    (req, reply, d) <- readChan q 
    print ("processing " ++ show(d)) 
    case req of 
    "add" -> loop ((fromJust d) : s) 
    "remove" -> loop (delete (fromJust d) s) 
    "get" -> do 
     writeChan (fromJust reply) s 
     loop s 


store s d = writeChan s ("add", Nothing, Just d) 
unstore s d = writeChan s ("remove", Nothing, Just d) 
request s = do 
    chan <- newChan 
    writeChan s ("get", Just chan, Nothing) 
    readChan chan 

Vấn đề là một chủ đề (diễn viên) đang quản lý danh sách các mục và sửa đổi danh sách theo yêu cầu gửi đến. Kể từ khi thread thực sự rẻ tôi nghĩ rằng đây có thể là một thay thế chức năng thực sự tốt đẹp.

Tất nhiên đây chỉ là một mẫu thử nghiệm (một bằng chứng bẩn nhanh về khái niệm). Vì vậy, câu hỏi của tôi là:

  1. Đây có phải là cách "tốt" để quản lý các biến có thể thay đổi được chia sẻ (trong thế giới diễn viên) không?
  2. Đã có thư viện cho mẫu này chưa? (Tôi đã tìm kiếm nhưng tôi thấy không có gì)

Kính trọng, Chris

+3

Nếu bạn sẵn sàng khám phá các lựa chọn thay thế cho mô hình diễn viên, tôi khuyên bạn nên dùng thử [Software Transactional Memory] của Haskell (https://en.wikipedia.org/wiki/Software_transactional_memory). Đó là một cơ chế tuyệt đẹp tương tự như các giao dịch cơ sở dữ liệu. Xem [Chương 28] (http://book.realworldhaskell.org/read/software-transactional-memory.html) trong The Real World Haskell. –

+0

Về mặt kỹ thuật là một lựa chọn tuyệt vời nhưng tôi nghe nói rằng sử dụng STM với một số lượng lớn các chủ đề (một luồng cho mỗi khách hàng là tiêu chuẩn trong haskell) và các hoạt động tương đối dài (xóa một mục khỏi danh sách là O (n), tất nhiên là bộ băm/bản đồ có thể giúp đỡ ở đây) có thể làm giảm hiệu suất của STM bằng một số lượng lớn. Và tất nhiên kênh MVar có thể được thay thế bằng kênh STM có nghĩa là sử dụng tốt nhất của hai kỹ thuật. EDIT: Các mẫu diễn viên nói chung là thực sự tốt đẹp trong tình huống như vậy, bởi vì xóa/thêm một mục là O (1) (chỉ cần gửi một tin nhắn) Công việc thực tế được thực hiện trong một chủ đề ... – Kr0e

+0

Bạn nói đúng. Với STM nó có thể xảy ra rằng các giao dịch được khởi động lại nhiều lần, dẫn đến hiệu suất giảm. Nhưng nếu các hoạt động đồng bộ của bạn mất nhiều thời gian, bạn cũng có thể gặp các vấn đề tương tự với các diễn viên - nếu có nhiều thông điệp hơn nó có thể xử lý, trạng thái của nó sẽ tụt lại phía sau thực tế. Vì vậy, việc sử dụng các cây cân bằng ('Map' /' Set') hoặc các bộ băm dựa trên 'ST/IO' chắc chắn sẽ giúp ích. –

Trả lời

6

Dưới đây là một ví dụ nhanh chóng và bẩn sử dụng stmpipes-network. Điều này sẽ thiết lập một máy chủ đơn giản cho phép khách hàng kết nối và tăng hoặc giảm bộ đếm. Nó sẽ hiển thị một thanh trạng thái rất đơn giản hiển thị các mức cao hiện tại của tất cả các máy khách được kết nối và sẽ loại bỏ các mức cao của khách hàng khỏi thanh khi chúng ngắt kết nối.

Đầu tiên tôi sẽ bắt đầu với máy chủ, và tôi đã hào phóng comment mã để giải thích cách hoạt động:

import Control.Concurrent.STM (STM, atomically) 
import Control.Concurrent.STM.TVar 
import qualified Data.HashMap.Strict as H 
import Data.Foldable (forM_) 

import Control.Concurrent (forkIO, threadDelay) 
import Control.Monad (unless) 
import Control.Monad.Trans.State.Strict 
import qualified Data.ByteString.Char8 as B 
import Control.Proxy 
import Control.Proxy.TCP 
import System.IO 

main = do 
    hSetBuffering stdout NoBuffering 

    {- These are the internal data structures. They should be an implementation 
     detail and you should never expose these references to the 
     "business logic" part of the application. -} 
    -- I use nRef to keep track of creating fresh Ints (which identify users) 
    nRef <- newTVarIO 0  :: IO (TVar Int) 
    {- hMap associates every user (i.e. Int) with a counter 

     Notice how I've "striped" the hash map by storing STM references to the 
     values instead of storing the values directly. This means that I only 
     actually write the hashmap when adding or removing users, which reduces 
     contention for the hash map. 

     Since each user gets their own unique STM reference for their counter, 
     modifying counters does not cause contention with other counters or 
     contention with the hash map. -} 
    hMap <- newTVarIO H.empty :: IO (TVar (H.HashMap Int (TVar Int))) 

    {- The following code makes heavy use of Haskell's pure closures. Each 
     'let' binding closes over its current environment, which is safe since 
     Haskell is pure. -} 

    let {- 'getCounters' is the only server-facing command in our STM API. The 
      only permitted operation is retrieving the current set of user 
      counters. 

      'getCounters' closes over the 'hMap' reference currently in scope so 
      that the server never needs to be aware about our internal 
      implementation. -} 
     getCounters :: STM [Int] 
     getCounters = do 
      refs <- fmap H.elems (readTVar hMap) 
      mapM readTVar refs 

     {- 'init' is the only client-facing command in our STM API. It 
      initializes the client's entry in the hash map and returns two 
      commands: the first command is what the client calls to 'increment' 
      their counter and the second command is what the client calls to log 
      off and delete 
      'delete' command. 

      Notice that those two returned commands each close over the client's 
      unique STM reference so the client never needs to be aware of how 
      exactly 'init' is implemented under the hood. -} 
     init :: STM (STM(), STM()) 
     init = do 
      n <- readTVar nRef 
      writeTVar nRef $! n + 1 

      ref <- newTVar 0 
      modifyTVar' hMap (H.insert n ref) 

      let incrementRef :: STM() 
       incrementRef = do 
        mRef <- fmap (H.lookup n) (readTVar hMap) 
        forM_ mRef $ \ref -> modifyTVar' ref (+ 1) 

       deleteRef :: STM() 
       deleteRef = modifyTVar' hMap (H.delete n) 

      return (incrementRef, deleteRef) 

    {- Now for the actual program logic. Everything past this point only uses 
     the approved STM API (i.e. 'getCounters' and 'init'). If I wanted I 
     could factor the above approved STM API into a separate module to enforce 
     the encapsulation boundary, but I am lazy. -} 

    {- Fork a thread which polls the current state of the counters and displays 
     it to the console. There is a way to implement this without polling but 
     this gets the job done for now. 

     Most of what it is doing is just some simple tricks to reuse the same 
     console line instead of outputting a stream of lines. Otherwise it 
     would be just: 

     forkIO $ forever $ do 
      ns <- atomically getCounters 
      print ns 
    -} 
    forkIO $ (`evalStateT` 0) $ forever $ do 
     del <- get 
     lift $ do 
      putStr (replicate del '\b') 
      putStr (replicate del ' ') 
      putStr (replicate del '\b') 
     ns <- lift $ atomically getCounters 
     let str = show ns 
     lift $ putStr str 
     put $! length str 
     lift $ threadDelay 10000 

    {- Fork a thread for each incoming connection, which listens to the client's 
     commands and translates them into 'STM' actions -} 
    serve HostAny "8080" $ \(socket, _) -> do 
     (increment, delete) <- atomically init 

     {- Right now, just do the dumb thing and convert all keypresses into 
      increment commands, with the exception of the 'q' key, which will 
      quit -} 
     let handler :: (Proxy p) =>() -> Consumer p Char IO() 
      handler() = runIdentityP loop 
       where 
       loop = do 
        c <- request() 
        unless (c == 'q') $ do 
         lift $ atomically increment 
         loop 

     {- This uses my 'pipes' library. It basically is a high-level way to 
      say: 

      * Read binary packets from the socket no bigger than 4096 bytes 

      * Get the first character from each packet and discard the rest 

      * Handle the character using the above 'handler' function -} 
     runProxy $ socketReadS 4096 socket >-> mapD B.head >-> handler 

     {- The above pipeline finishes either when the socket closes or 
      'handler' stops looping because it received a 'q'. Either case means 
      that the client is done so we log them out using 'delete'. -} 
     atomically delete 

Tiếp theo là khách hàng, mà chỉ đơn giản mở ra một kết nối và chuyển tiếp tất cả các phím bấm như các gói dữ liệu duy nhất:

import Control.Monad 
import Control.Proxy 
import Control.Proxy.Safe 
import Control.Proxy.TCP.Safe 
import Data.ByteString.Char8 (pack) 
import System.IO 

main = do 
    hSetBuffering stdin NoBuffering 
    hSetEcho  stdin False 

    {- Again, this uses my 'pipes' library. It basically says: 

     * Read characters from the console using 'commands' 

     * Pack them into a binary format 

     * send them to a server running at 127.0.0.1:8080 

     This finishes looping when the user types a 'q' or the connection is 
     closed for whatever reason. 
    -} 
    runSafeIO $ runProxy $ runEitherK $ 
     try . commands 
    >-> mapD (\c -> pack [c]) 
    >-> connectWriteD Nothing "127.0.0.1" "8080" 

commands :: (Proxy p) =>() -> Producer p Char IO() 
commands() = runIdentityP loop 
    where 
    loop = do 
     c <- lift getChar 
     respond c 
     unless (c == 'q') loop 

Nó khá đơn giản: commands tạo ra một dòng Char s, sau đó được chuyển đổi để ByteString s và sau đó được gửi như các gói tin đến máy chủ.

Nếu bạn chạy các máy chủ và một vài khách hàng và họ đã từng gõ một vài phím, máy chủ của bạn hiển thị sẽ ra một danh sách hiển thị có bao nhiêu phím mỗi khách hàng gõ:

[1,6,4] 

... và nếu một số khách hàng ngắt kết nối họ sẽ bị xoá khỏi danh sách:

[1,4] 

Lưu ý rằng các thành phần pipes của các ví dụ này sẽ đơn giản hóa đáng kể trong việc phát hành pipes-4.0.0 sắp tới, nhưng hiện tại pipes hệ sinh thái vẫn g ets công việc thực hiện như là.

+0

Giải pháp tuyệt vời, tôi chắc chắn sẽ nghĩ về nó;) – Kr0e

+0

Chỉ cần cho sự hiểu biết của tôi: STM được coi là tinh khiết? Tôi đoán nó không phải vì nó là tất cả về sự biến đổi mà không sử dụng một cơ chế khóa, phải không? – Kr0e

+2

@ Kr0e Right. Hãy nghĩ về STM như là các tham chiếu bộ nhớ có thể thay đổi được, có thể kết hợp được. –

3

Trước tiên, tôi chắc chắn khuyên bạn nên sử dụng loại dữ liệu cụ thể của riêng bạn để biểu thị các lệnh. Khi sử dụng (String, Maybe (Chan [a]), Maybe a) một khách hàng bị lỗi có thể làm hỏng diễn viên của bạn chỉ bằng cách gửi một lệnh không xác định hoặc bằng cách gửi ("add", Nothing, Nothing), v.v.Tôi muốn đề xuất một cái gì đó như

data Command a = Add a | Remove a | Get (Chan [a]) 

Sau đó, bạn có thể khớp mẫu trên các lệnh trong storage một cách tiết kiệm.

Diễn viên có lợi thế của họ, nhưng tôi cũng cảm thấy rằng họ có một số nhược điểm. Ví dụ, nhận được một câu trả lời từ một diễn viên yêu cầu gửi một lệnh và sau đó chờ trả lời. Và khách hàng không thể hoàn toàn chắc chắn rằng nó nhận được câu trả lời và câu trả lời sẽ thuộc về một số loại cụ thể - bạn không thể nói tôi chỉ muốn câu trả lời thuộc loại này (và số lượng câu trả lời) cho lệnh cụ thể này.

Ví dụ, tôi sẽ đưa ra một giải pháp STM đơn giản. Sẽ tốt hơn nếu sử dụng bảng băm hoặc tập hợp (cây cân bằng), nhưng kể từ Handle không thực hiện Ord cũng không Hashable, chúng tôi không thể sử dụng các cấu trúc dữ liệu này, vì vậy tôi sẽ tiếp tục sử dụng danh sách.

module ThreadSet (
    TSet, add, remove, get 
) where 

import Control.Monad 
import Control.Monad.STM 
import Control.Concurrent.STM.TVar 
import Data.List (delete) 

newtype TSet a = TSet (TVar [a]) 

add :: (Eq a) => a -> TSet a -> STM() 
add x (TSet v) = readTVar v >>= writeTVar v . (x :) 

remove :: (Eq a) => a -> TSet a -> STM() 
remove x (TSet v) = readTVar v >>= writeTVar v . delete x 

get :: (Eq a) => TSet a -> STM [a] 
get (TSet v) = readTVar v 

Mô-đun này triển khai tập hợp các yếu tố tùy ý STM. Bạn có thể có nhiều bộ như vậy và sử dụng chúng cùng nhau trong một giao dịch STM duy nhất thành công hoặc không thành công cùng một lúc. Ví dụ:

-- | Ensures that there is exactly one element `x` in the set. 
add1 :: (Eq a) => a -> TSet a -> STM() 
add1 x v = remove x v >> add x v 

Điều này sẽ khó với diễn viên, bạn không thể thêm nó làm mệnh lệnh khác cho diễn viên, bạn không thể soạn thảo hành động hiện tại và vẫn có nguyên tử.

Cập nhật: Có một thú vị article giải thích lý do tại sao nhà thiết kế Clojure chọn không sử dụng diễn viên. Ví dụ, bằng cách sử dụng diễn viên, ngay cả khi bạn có nhiều lần đọc và chỉ viết rất ít vào một cấu trúc có thể thay đổi, chúng được sắp xếp theo thứ tự, điều này có thể ảnh hưởng rất lớn đến hiệu suất.

+0

Vâng, tuần tự hóa/deserializing chi phí rất nhiều, điều này là đúng sự thật. CloudHaskell không có cùng "serialization-overhead", chúng gọi nó là một tính năng. Nhưng gần đây họ đã thêm một chức năng gửi không an toàn để gửi thông điệp mà không có ser./deser. đó là một thứ tự cường độ nhanh hơn. Về mặt lý thuyết, việc truyền thông điệp phải rẻ như một lời gọi hàm đơn giản để làm cho mẫu diễn viên trở thành một sự thay thế thực sự, tất nhiên, không phải là trường hợp mà trong Erlang nó là như thế. Tôi nghĩ STM là một tính năng thực sự tuyệt vời, có thể sử dụng cả hai kỹ thuật là con đường để đi, vì STM thực sự là cấp thấp so với mô hình diễn viên. – Kr0e

Các vấn đề liên quan