2016-12-23 25 views
16

Tôi đã viết thư viện có tên amqp-worker cung cấp chức năng gọi là worker để thăm dò hàng đợi tin nhắn (như RabbitMQ) cho tin nhắn, gọi trình xử lý khi tìm thấy tin nhắn. Sau đó, nó quay trở lại để bỏ phiếu.Rò rỉ bộ nhớ trong chức năng IO đệ quy - PAP

Đó là bộ nhớ bị rò rỉ. Tôi đã lược tả nó và biểu đồ cho biết PAP (ứng dụng chức năng một phần) là thủ phạm. Sự rò rỉ trong mã của tôi ở đâu? Làm cách nào để tránh rò rỉ khi lặp lại trong IO với forever?

enter image description here

Dưới đây là một số chức năng có liên quan. The full source is here.

Example Program. Đây rò rỉ

main :: IO() 
main = do 
    -- connect 
    conn <- Worker.connect (fromURI "amqp://guest:[email protected]:5672") 

    -- initialize the queues 
    Worker.initQueue conn queue 
    Worker.initQueue conn results 

    -- publish a message 
    Worker.publish conn queue (TestMessage "hello world") 

    -- create a worker, the program loops here 
    Worker.worker def conn queue onError (onMessage conn) 

worker

worker :: (FromJSON a, MonadBaseControl IO m, MonadCatch m) => WorkerOptions -> Connection -> Queue key a -> (WorkerException SomeException -> m()) -> (Message a -> m()) -> m() 
worker opts conn queue onError action = 
    forever $ do 
    eres <- consumeNext (pollDelay opts) conn queue 
    case eres of 
     Error (ParseError reason bd) -> 
     onError (MessageParseError bd reason) 

     Parsed msg -> 
     catch 
      (action msg) 
      (onError . OtherException (body msg)) 
    liftBase $ threadDelay (loopDelay opts) 

consumeNext

consumeNext :: (FromJSON msg, MonadBaseControl IO m) => Microseconds -> Connection -> Queue key msg -> m (ConsumeResult msg) 
consumeNext pd conn queue = 
    poll pd $ consume conn queue 

poll

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a 
poll us action = do 
    ma <- action 
    case ma of 
     Just a -> return a 
     Nothing -> do 
     liftBase $ threadDelay us 
     poll us action 
+0

gì phiên bản ghc của bạn và cách bạn biên dịch? – jberryman

+1

Nó được thiết lập để lts-7.3 vì vậy đó là GHC 8.0.1. Tôi đang biên dịch với ngăn xếp cài đặt - hồ sơ. Nhưng tôi bị rò rỉ bộ nhớ với một cài đặt ngăn xếp bình thường. Sử dụng các tùy chọn ghc mặc định từ mẫu ngăn xếp: -treaded -rtsopts -with-rtsopts = -N –

+2

Ví dụ này rất xa tối thiểu - bạn đang nhập toàn bộ thư viện của mình ('Network.AMQP.Worker') trong chương trình ví dụ của bạn. Khi nó đứng, điều này là quá rộng. – user2407038

Trả lời

14

Dưới đây là một ví dụ rất đơn giản đó chứng tỏ vấn đề của bạn:

main :: IO() 
main = worker 

{-# NOINLINE worker #-} 
worker :: (Monad m) => m() 
worker = 
    let loop = poll >> loop 
    in loop 

poll :: (Monad m) => m a 
poll = return() >> poll 
If you remove the `NOINLINE`, or specialize `m` to 
`IO` (while compiling with `-O`), the leak goes away. 

Tôi đã viết một chi tiết blog post về việc tại sao chính xác mã này rò rỉ bộ nhớ. Tóm tắt nhanh chóng, như Reid chỉ ra trong câu trả lời của mình, rằng mã tạo và ghi nhớ một chuỗi các ứng dụng một phần của >> s.

Tôi cũng đã gửi ghc ticket về việc này.

3

Vụ rò rỉ bộ nhớ là trong poll. Sử dụng monad-loops, tôi đã thay đổi định nghĩa sau: Có vẻ như untilJust thực hiện điều tương tự như đệ quy của tôi, nhưng sửa lỗi bị rò rỉ.

Mọi người có thể nhận xét về lý do tại sao định nghĩa trước đây của tôi về poll bị rò rỉ bộ nhớ?

{-# LANGUAGE FlexibleContexts #-} 

module Network.AMQP.Worker.Poll where 

import Control.Concurrent (threadDelay) 
import Control.Monad.Trans.Control (MonadBaseControl) 
import Control.Monad.Base (liftBase) 
import Control.Monad.Loops (untilJust) 

poll :: (MonadBaseControl IO m) => Int -> m (Maybe a) -> m a 
poll us action = untilJust $ do 
    ma <- action 
    case ma of 
     Just a -> return $ Just a 
     Nothing -> do 
     liftBase $ threadDelay us 
     return Nothing 
4

Có lẽ một ví dụ dễ hiểu là này một

main :: IO() 
main = let c = count 0 
     in c >> c 

{-# NOINLINE count #-} 
count :: Monad m => Int -> m() 
count 1000000 = return() 
count n = return() >> count (n+1) 

Đánh giá f >> g cho hành động IO sản lượng một số loại đóng cửa mà có tài liệu tham khảo cho cả fg (đó là về cơ bản các thành phần của fg như các chức năng trên mã thông báo trạng thái). count 0 trả về một đoạn c sẽ đánh giá cấu trúc lớn của các bao đóng của biểu mẫu return() >> return() >> return() >> .... Khi chúng ta thực hiện c chúng ta xây dựng cấu trúc này, và vì chúng ta phải thực thi c lần thứ hai toàn bộ cấu trúc vẫn còn sống. Vì vậy, chương trình này bị rò rỉ bộ nhớ (bất kể cờ tối ưu hóa).

Khi count chuyên IO và tối ưu hóa được bật, GHC có nhiều thủ thuật để tránh tạo cấu trúc dữ liệu này; nhưng tất cả đều dựa vào việc biết rằng đơn nguyên là IO.

Quay trở lại bản gốc count :: Monad m => Int -> m(), chúng ta có thể cố gắng tránh việc xây dựng cấu trúc lớn này bằng cách thay đổi dòng cuối cùng để

count n = return() >>= (\_ -> count (n+1)) 

Bây giờ cuộc gọi đệ quy được giấu bên trong một lambda, vì vậy c chỉ là một cấu trúc nhỏ return() >>= (\_ -> BODY) . Điều này thực sự tránh rò rỉ không gian khi biên dịch mà không cần tối ưu hóa. Tuy nhiên khi tối ưu hóa được kích hoạt, GHC nổi ra count (n+1) ra khỏi cơ thể của lambda (vì nó không phụ thuộc vào đối số) sản xuất

count n = return() >>= (let body = count (n+1) in \_ -> body) 

và bây giờ c là một cấu trúc lớn một lần nữa ...

+0

Cách sử dụng 'NOINLINE' làm cho chương trình có thể so sánh với chương trình bị rò rỉ ban đầu như thế nào? – Michael

+0

GHC không nội tuyến hoặc chuyên biệt là trường hợp chung (khi hàm được định nghĩa trong một mô-đun khác, không nhỏ, vv) GHC biết nhiều thủ thuật và khi bạn giảm thiểu các thủ thuật này có thể khởi động. Sử dụng 'NOINLINE' dừng nhiều thủ thuật cho phép bạn giảm thiểu hơn nữa. –