2016-06-21 18 views
9

Tôi đã viết một số mã core.async trong Clojure và khi tôi chạy nó, nó tiêu thụ tất cả bộ nhớ có sẵn và không thành công với lỗi. Có vẻ như sử dụng mapcat trong đường ống core.async sẽ làm giảm áp lực ngược. (Đó là bất hạnh vì những lý do ngoài phạm vi của câu hỏi này.)Lỗ rò bộ nhớ ở đâu khi mapcat ngắt áp suất trong core.async?

Dưới đây là một số mã đó chứng tỏ vấn đề bằng cách đếm :x s trong và ngoài của một mapcat ing dò:

(ns mapcat.core 
    (:require [clojure.core.async :as async])) 

(defn test-backpressure [n length] 
    (let [message (repeat length :x) 
     input (async/chan) 
     transform (async/chan 1 (mapcat seq)) 
     output (async/chan) 
     sent (atom 0)] 
    (async/pipe input transform) 
    (async/pipe transform output) 
    (async/go 
     (dotimes [_ n] 
     (async/>! input message) 
     (swap! sent inc)) 
     (async/close! input)) 
    (async/go-loop [x 0] 
     (when (= 0 (mod x (/ (* n length) 10))) 
     (println "in:" (* @sent length) "out:" x)) 
     (when-let [_ (async/<! output)] 
     (recur (inc x)))))) 

=> (test-backpressure 1000 10) 
in: 10 out: 0 
in: 2680 out: 1000 
in: 7410 out: 2000 
in: 10000 out: 3000 ; Where are the other 7000 characters? 
in: 10000 out: 4000 
in: 10000 out: 5000 
in: 10000 out: 6000 
in: 10000 out: 7000 
in: 10000 out: 8000 
in: 10000 out: 9000 
in: 10000 out: 10000 

Các chủng tộc sản xuất xa trước người tiêu dùng.

Dường như tôi không phải là người đầu tiên khám phá điều này. Nhưng lời giải thích cho here dường như không bao gồm nó. (Mặc dù nó cung cấp một cách giải quyết thỏa đáng.) Về mặt khái niệm, tôi cho rằng nhà sản xuất sẽ đi trước, nhưng chỉ bởi độ dài của một vài thông điệp có thể được đệm trong các kênh.

Câu hỏi của tôi là, tất cả các thư khác ở đâu? Bởi dòng đầu ra thứ tư 7000 :x s không được tính.

+0

Trong liên kết mà bạn đưa ra, Alex đã đề cập đến đây là tình huống khó xử giữa kết quả sai và vi phạm giới hạn bộ đệm. Rõ ràng [ASYNC-124] (http://dev.clojure.org/jira/browse/ASYNC-124) thích câu trả lời đúng – Davyzhu

+0

Vì vậy, về câu hỏi của bạn, các tin nhắn khác có thể được giữ trong 'takers' được tham chiếu [tại đây ] (https://github.com/clojure/core.async/blob/master/src/main/clojure/clojure/core/async/impl/channels.clj#L86). Không chắc chắn về nó vì vậy chúng ta hãy chờ đợi một câu trả lời tự tin hơn. – Davyzhu

Trả lời

2

Có hai cách diễn giải có thể có của câu hỏi "Mất bộ nhớ ở đâu?"

Thứ nhất, dữ liệu được giữ ở đâu? Câu trả lời dường như nằm trong bộ đệm kênh ngay phía dưới của biến đổi mở rộng.

Các kênh theo mặc định sử dụng FixedBuffer (clojure.core.async.impl.buffers/FixedBuffer) có thể cho biết nếu kênh đầy nhưng không phản đối quá mức.

Thứ hai, đoạn mã nào khiến bộ đệm bị quá tải? Điều này (đúng với tôi nếu tôi sai) có vẻ như ở số the take! method của ManyToManyChannel (clojure.core.async.impl.channels/ManyToManyChannel) trong đó first call to add! trên bộ đệm xảy ra trước khi bất kỳ calls to full? nào diễn ra.

Dường như take! giả định rằng nó có thể thêm ít nhất một mục vào bộ đệm cho mọi mục mà nó xóa. Trong trường hợp các đầu dò mở rộng chạy dài như mapcat, điều này không phải lúc nào cũng là một giả định an toàn.

Bằng cách thay đổi this line thành (when (and (.hasNext iter) (not (impl/full? buf))) trong bản sao cục bộ của core.async Tôi có thể làm cho mã trong câu hỏi hoạt động như mong đợi. (NB sự hiểu biết của tôi về core.async là không đủ cho tôi để đảm bảo rằng đây là một giải pháp mạnh mẽ cho trường hợp bạn sử dụng.)

CẬP NHẬT 2016/09/17: hiện nay là một vấn đề cho việc này: http://dev.clojure.org/jira/browse/ASYNC-178