2010-04-08 28 views
26

Tôi đang cố gắng tìm ra cách tốt nhất để sử dụng các tác nhân để tiêu thụ các mục từ Hàng đợi Tin nhắn (SQS của Amazon). Ngay bây giờ tôi có một hàm (process-queue-item) lấy một mục từ hàng đợi và xử lý nó.Các tác nhân clojure tiêu thụ từ hàng đợi

Tôi muốn xử lý đồng thời các mục này, nhưng tôi không thể quấn đầu xung quanh cách kiểm soát các đại lý. Về cơ bản, tôi muốn giữ tất cả các đại lý bận rộn càng nhiều càng tốt mà không cần kéo nhiều vật phẩm từ Hàng đợi và phát triển một backlog (tôi sẽ có điều này chạy trên một vài máy móc, vì vậy các mục cần được để lại trong hàng đợi cho đến khi chúng thực sự cần thiết).

Có ai có thể cho tôi một số gợi ý về cải thiện việc triển khai của tôi không?

(def active-agents (ref 0)) 

(defn process-queue-item [_] 
    (dosync (alter active-agents inc)) 
    ;retrieve item from Message Queue (Amazon SQS) and process 
    (dosync (alter active-agents dec))) 

(defn -main [] 
    (def agents (for [x (range 20)] (agent x))) 

    (loop [loop-count 0] 

    (if (< @active-agents 20) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent)) 
     ;should skip this agent until later if it is still busy processing (not sure how) 
     (send-off agent process-queue-item))) 

    ;(apply await-for (* 10 1000) agents) 
    (Thread/sleep 10000) 
    (logging/info (str "ACTIVE AGENTS " @active-agents)) 
    (if (> 10 loop-count) 
     (do (logging/info (str "done, let's cleanup " count)) 
     (doseq [agent agents] 
     (if (agent-errors agent) 
      (clear-agent-errors agent))) 
     (apply await agents) 
     (shutdown-agents)) 
     (recur (inc count))))) 
+0

Có một số cách mà bạn có thể đối xử với Queue nhắn như một seq và sau đó chỉ cần sử dụng pmap để có được sự song song? –

+0

@Alex Stoddard: Trong trường hợp của tôi, process-queue-item thực sự chặn trên mạng IO, vì vậy tôi không nghĩ rằng pmap là sự lựa chọn đúng vì nó chỉ sử dụng nhiều chủ đề như máy có lõi. – erikcw

+0

@erikw: Chắc chắn, nhưng đó chỉ là một chi tiết triển khai pmap (threads = #cores + 2). Không có lý do bạn không thể viết một phiên bản của pmap với một số tham số của chủ đề.Xem dòng đầu tiên của nguồn pmap: (let [n (+ 2 (.. Runtime getRuntime availableProcessors)) –

Trả lời

6

Điều bạn đang yêu cầu là cách để tiếp tục giao nhiệm vụ nhưng với một số giới hạn trên. Một cách tiếp cận đơn giản là sử dụng semaphore để điều chỉnh giới hạn. Đây là cách tôi sẽ tiếp cận nó:

(let [limit (.availableProcessors (Runtime/getRuntime)) 
     ; note: you might choose limit 20 based upon your problem description 
     sem (java.util.concurrent.Semaphore. limit)] 
    (defn submit-future-call 
    "Takes a function of no args and yields a future object that will 
    invoke the function in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [#^Callable task] 
    ; take a slot (or block until a slot is free) 
    (.acquire sem) 
    (try 
     ; create a future that will free a slot on completion 
     (future (try (task) (finally (.release sem)))) 
     (catch java.util.concurrent.RejectedExecutionException e 
     ; no task was actually submitted 
     (.release sem) 
     (throw e))))) 

(defmacro submit-future 
    "Takes a body of expressions and yields a future object that will 
    invoke the body in another thread, and will cache the result and 
    return it on all subsequent calls to deref/@. If the computation has 
    not yet finished, calls to deref/@ will block. 
    If n futures have already been submitted, then submit-future blocks 
    until the completion of another future, where n is the number of 
    available processors." 
    [& body] `(submit-future-call (fn [] [email protected]))) 

#_(example 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    #<[email protected]: :pending> 
    user=> (submit-future (reduce + (range 100000000))) 
    ;; blocks at this point for a 2 processor PC until the previous 
    ;; two futures complete 
    #<[email protected]: :pending> 
    ;; then submits the job 

Với điều đó tại chỗ bây giờ bạn chỉ cần phối hợp cách tự thực hiện các nhiệm vụ. Có vẻ như bạn đã có cơ chế để thực hiện điều đó. Loop (submit-future (process-queue-item))

4

Có lẽ bạn có thể sử dụng chức năng seque? Trích dẫn (doc seque):

clojure.core/seque 
([s] [n-or-q s]) 
    Creates a queued seq on another (presumably lazy) seq s. The queued 
    seq will produce a concrete seq in the background, and can get up to 
    n items ahead of the consumer. n-or-q can be an integer n buffer 
    size, or an instance of java.util.concurrent BlockingQueue. Note 
    that reading from a seque can block if the reader gets ahead of the 
    producer. 

Điều tôi lưu ý là một chuỗi lười nhận các mục hàng qua mạng; bạn muốn bọc nó trong seque, đặt trong Ref và có các nhân viên của công nhân tiêu thụ các mặt hàng của số seque này. seque trả về một cái gì đó trông giống như một seq thường xuyên từ quan điểm của mã của bạn, với ma thuật hàng đợi xảy ra một cách minh bạch. Lưu ý rằng nếu trình tự bạn đặt bên trong được chunked, sau đó nó vẫn sẽ bị buộc một đoạn tại một thời điểm. Cũng lưu ý rằng các cuộc gọi ban đầu để seque chính nó dường như khối cho đến khi một mục ban đầu hoặc hai là thu được (hoặc một đoạn, như trường hợp có thể được; Tôi nghĩ rằng đó là nhiều hơn để làm với cách lười biếng trình tự làm việc hơn seque chính nó, mặc dù).

Một phác thảo của mã (một thực sự sơ sài một, không được kiểm tra ở tất cả):

(defn get-queue-items-seq [] 
    (lazy-seq 
    (cons (get-queue-item) 
     (get-queue-items-seq)))) 

(def task-source (ref (seque (get-queue-items-seq)))) 

(defn do-stuff [] 
    (let [worker (agent nil)] 
    (if-let [result 
      (dosync 
       (when-let [task (first @task-source)] 
       (send worker (fn [_] (do-stuff-with task)))))] 
     (do (await worker) 
      ;; maybe do something with worker's state 
      (do-stuff))))) ;; continue working 

(defn do-lots-of-stuff [] 
    (let [fs (doall (repeatedly 20 #(future (do-stuff))))] 
    fs))) 

Trên thực tế bạn có thể muốn một nhà sản xuất phức tạp hơn của seq mục hàng đợi để bạn có thể yêu cầu nó ngừng sản xuất các mặt hàng mới (một điều cần thiết nếu toàn bộ điều có thể được tắt một cách duyên dáng; tương lai sẽ chết khi nguồn nhiệm vụ cạn kiệt, sử dụng future-done? để xem liệu chúng đã làm như thế chưa). Và đó chỉ là một cái gì đó tôi có thể nhìn thấy trong nháy mắt đầu tiên ... Tôi chắc chắn có nhiều thứ để đánh bóng ở đây. Tôi nghĩ rằng cách tiếp cận chung sẽ làm việc, mặc dù.

+0

Tôi đã thêm bản sửa lỗi vào dòng cuối cùng nhưng một của bản phác thảo mã theo đó tương lai sẽ thực sự được tạo. (Loại quan trọng đối với toàn bộ ý tưởng, thực sự ... :-)) –

+0

Tôi đang cố gắng hiểu mã này. Tại sao nhiệm vụ nguồn là ref? Bạn dường như không thay đổi nó bất cứ lúc nào cả. –

+0

@Siddhartha Reddy: Thoạt nhìn, tôi muốn nói rằng đây là lý do tại sao tôi gọi mã "* thực sự * sơ sài". ;-) Tôi đoán nó sẽ cần một '(thay đổi phần còn lại nguồn nhiệm vụ)' (hoặc 'next') trong' khi-let' bên trong 'dosync' là hữu ích. Trên thực tế, suy nghĩ về điều này một lần nữa, tôi tự hỏi nếu sử dụng 'seque' đây là một ý tưởng tốt sau khi tất cả; có vẻ như với tôi bây giờ nó làm tăng số lượng các mặt hàng từ hàng đợi sẽ bị mất trong trường hợp có sự cố của máy địa phương (vì 'seque' kéo các vật phẩm vào trước khi chúng được yêu cầu bởi các công nhân). Sau đó, một lần nữa, trong một số kịch bản nó có thể là hiệu suất tốt-khôn ngoan; đó là –

23
(let [switch (atom true) ; a switch to stop workers 
     workers (doall 
       (repeatedly 20 ; 20 workers pulling and processing items from SQS 
        #(future (while @switch 
          (retrieve item from Amazon SQS and process)))))] 
    (Thread/sleep 100000) ; arbitrary rule to decide when to stop ;-) 
    (reset! switch false) ; stop ! 
    (doseq [worker workers] @worker)) ; waiting for all workers to be done 
+2

Điều này không hoạt động với 1,4 nữa ('tương lai' và' tương lai-gọi' không trả về 'IFn', mà' liên tục' yêu cầu). Tuy nhiên, bạn có thể dễ dàng bọc một tương lai trong một hàm bằng cách thêm tiền tố '(tương lai' với' # ', –

+3

@AlexB bắt tốt, nó thậm chí không phải là một vấn đề 1.4: # nên có ở đó. cảm ơn! – cgrand

0

Không chắc thế nào thành ngữ này là, như tôi vẫn là một newbie với ngôn ngữ, nhưng các giải pháp sau đây làm việc cho tôi:

(let [number-of-messages-per-time 2 
     await-timeout 1000] 
    (doseq [p-messages (partition number-of-messages-per-time messages)] 
    (let [agents (map agent p-messages)] 
     (doseq [a agents] (send-off a process)) 
     (apply await-for await-timeout agents) 
     (map deref agents)))) 
Các vấn đề liên quan