Tôi đang cố gắng tìm ra cách tốt nhất để sử dụng các tác nhân để tiêu thụ các mục từ Hàng đợi Tin nhắn (SQS của Amazon). Ngay bây giờ tôi có một hàm (process-queue-item) lấy một mục từ hàng đợi và xử lý nó.Các tác nhân clojure tiêu thụ từ hàng đợi
Tôi muốn xử lý đồng thời các mục này, nhưng tôi không thể quấn đầu xung quanh cách kiểm soát các đại lý. Về cơ bản, tôi muốn giữ tất cả các đại lý bận rộn càng nhiều càng tốt mà không cần kéo nhiều vật phẩm từ Hàng đợi và phát triển một backlog (tôi sẽ có điều này chạy trên một vài máy móc, vì vậy các mục cần được để lại trong hàng đợi cho đến khi chúng thực sự cần thiết).
Có ai có thể cho tôi một số gợi ý về cải thiện việc triển khai của tôi không?
(def active-agents (ref 0))
(defn process-queue-item [_]
(dosync (alter active-agents inc))
;retrieve item from Message Queue (Amazon SQS) and process
(dosync (alter active-agents dec)))
(defn -main []
(def agents (for [x (range 20)] (agent x)))
(loop [loop-count 0]
(if (< @active-agents 20)
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent))
;should skip this agent until later if it is still busy processing (not sure how)
(send-off agent process-queue-item)))
;(apply await-for (* 10 1000) agents)
(Thread/sleep 10000)
(logging/info (str "ACTIVE AGENTS " @active-agents))
(if (> 10 loop-count)
(do (logging/info (str "done, let's cleanup " count))
(doseq [agent agents]
(if (agent-errors agent)
(clear-agent-errors agent)))
(apply await agents)
(shutdown-agents))
(recur (inc count)))))
Có một số cách mà bạn có thể đối xử với Queue nhắn như một seq và sau đó chỉ cần sử dụng pmap để có được sự song song? –
@Alex Stoddard: Trong trường hợp của tôi, process-queue-item thực sự chặn trên mạng IO, vì vậy tôi không nghĩ rằng pmap là sự lựa chọn đúng vì nó chỉ sử dụng nhiều chủ đề như máy có lõi. – erikcw
@erikw: Chắc chắn, nhưng đó chỉ là một chi tiết triển khai pmap (threads = #cores + 2). Không có lý do bạn không thể viết một phiên bản của pmap với một số tham số của chủ đề.Xem dòng đầu tiên của nguồn pmap: (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) –