2012-08-29 37 views
9

Tôi đang sử dụng ứng dụng Clojure để truy cập dữ liệu từ API web. Tôi sẽ thực hiện rất nhiều yêu cầu và nhiều yêu cầu sẽ dẫn đến nhiều yêu cầu hơn, vì vậy tôi muốn giữ URL yêu cầu trong hàng đợi sẽ để lại 60 giây giữa các lần tải xuống tiếp theo.Hàng đợi công việc trong Clojure

Sau this blog post tôi đặt này với nhau:

(def queue-delay (* 1000 60)) ; one minute 

(defn offer! 
    [q x] 
    (.offerLast q x) 
    q) 

(defn take! 
    [q] 
    (.takeFirst q)) 

(def my-queue (java.util.concurrent.LinkedBlockingDeque.)) 

(defn- process-queue-item 
    [item] 
    (println ">> " item) ; this would be replaced by downloading `item` 
    (Thread/sleep queue-delay)) 

Nếu tôi bao gồm (future (process-queue-item (take! my-queue))) trong mã của tôi ở đâu đó sau đó tại REPL tôi có thể (offer! my-queue "something") và tôi xem ">> cái gì đó" in ngay lập tức. Càng xa càng tốt! Nhưng tôi cần hàng đợi để tồn tại trong suốt thời gian chương trình của tôi hoạt động. Cuộc gọi (future ...) Tôi vừa đề cập đến công việc để kéo một mục ra khỏi hàng đợi, khi có sẵn, nhưng tôi muốn một thứ sẽ liên tục xem hàng đợi và gọi process-queue-item bất cứ khi nào có sẵn.

Ngoài ra, trái với tình yêu Clojure thông thường, tôi muốn đảm bảo rằng chỉ có một yêu cầu được thực hiện tại một thời điểm và chương trình của tôi chờ 60 giây để thực hiện từng yêu cầu tiếp theo.

Tôi nghĩ rằng this Stack Overflow question là có liên quan, nhưng tôi không chắc chắn cách điều chỉnh nó để thực hiện những gì tôi muốn. Làm cách nào để thăm dò ý kiến ​​hàng đợi của tôi liên tục và đảm bảo rằng chỉ có một yêu cầu đang được chạy cùng một lúc?

+0

Tại sao bạn muốn thăm dò ý kiến ​​liên tục nhưng chỉ gửi 60 giây một lần? Cuộc thăm dò ý kiến ​​chỉ một lần mỗi 60 giây sẽ đạt được điều tương tự? – mamboking

+0

@mamboking Hầu như, có. Nhược điểm duy nhất của phương pháp đó sẽ là thêm mục đầu tiên vào hàng đợi: nếu nó mất chương trình năm giây để tìm ra URL yêu cầu đầu tiên sẽ là gì thì nó sẽ chỉ ở đó trong 55 giây cho đến khi hàng đợi được kiểm tra. Tuy nhiên, chương trình sẽ hoạt động khá lâu vì vậy tôi đoán đó không phải là vấn đề quá nhiều. – bdesham

+0

bạn có đang tránh một công cụ lập lịch biểu tác vụ không? Ví dụ: https://github.com/zcaudate/cronj (cũng có danh sách các thư viện khác trong readme của repo đó) – georgek

Trả lời

0

Tôi đã kết thúc việc chuyển thư viện nhỏ của riêng mình, mà tôi gọi là simple-queue. Bạn có thể đọc toàn bộ tài liệu trên GitHub, nhưng đây là toàn bộ nguồn. Tôi sẽ không cập nhật câu trả lời này, vì vậy nếu bạn muốn sử dụng thư viện này, hãy lấy nguồn từ GitHub.

(ns com.github.bdesham.simple-queue) 

(defn new-queue 
    "Creates a new queue. Each trigger from the timer will cause the function f 
    to be invoked with the next item from the queue. The queue begins processing 
    immediately, which in practice means that the first item to be added to the 
    queue is processed immediately." 
    [f & opts] 
    (let [options (into {:delaytime 1} 
         (select-keys (apply hash-map opts) [:delaytime])), 
     delaytime (:delaytime options), 
     queue {:queue (java.util.concurrent.LinkedBlockingDeque.)}, 
     task (proxy [java.util.TimerTask] [] 
       (run [] 
       (let [item (.takeFirst (:queue queue)), 
         value (:value item), 
         prom (:promise item)] 
        (if prom 
        (deliver prom (f value)) 
        (f value))))), 
     timer (java.util.Timer.)] 
    (.schedule timer task 0 (int (* 1000 delaytime))) 
    (assoc queue :timer timer))) 

(defn cancel 
    "Permanently stops execution of the queue. If a task is already executing 
    then it proceeds unharmed." 
    [queue] 
    (.cancel (:timer queue))) 

(defn process 
    "Adds an item to the queue, blocking until it has been processed. Returns 
    (f item)." 
    [queue item] 
    (let [prom (promise)] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise prom}) 
    @prom)) 

(defn add 
    "Adds an item to the queue and returns immediately. The value of (f item) is 
    discarded, so presumably f has side effects if you're using this." 
    [queue item] 
    (.offerLast (:queue queue) 
       {:value item, 
       :promise nil})) 

Một ví dụ của việc sử dụng hàng đợi này để trả lại giá trị:

(def url-queue (q/new-queue slurp :delaytime 30)) 
(def github (q/process url-queue "https://github.com")) 
(def google (q/process url-queue "http://www.google.com")) 

Các cuộc gọi đến q/process sẽ chặn do đó sẽ có một sự chậm trễ 30 giây giữa hai def báo cáo.

Một ví dụ của việc sử dụng hàng đợi này hoàn toàn là tác dụng phụ:

(defn cache-url 
    [{url :url, filename :filename}] 
    (spit (java.io.File. filename) 
     (slurp url))) 

(def url-queue (q/new-queue cache-url :delaytime 30)) 
(q/add url-queue {:url "https://github.com", 
        :filename "github.html"}) ; returns immediately 
(q/add url-queue {:url "https://google.com", 
        :filename "google.html"}) ; returns immediately 

Bây giờ các cuộc gọi đến q/add trở lại ngay lập tức.

2

Đây là đoạn mã từ a project I did for fun. Nó không hoàn hảo, nhưng có thể cung cấp cho bạn một ý tưởng về cách tôi nhận được xung quanh "chờ đợi 55 giây cho mục đầu tiên" vấn đề. Về cơ bản, nó xoay quanh các lời hứa, sử dụng tương lai để xử lý mọi thứ ngay lập tức hoặc cho đến khi một lời hứa "trở thành" có sẵn.

(defn ^:private process 
    [queues] 
    (loop [[q & qs :as q+qs] queues p (atom true)] 
    (when-not (Thread/interrupted) 
     (if (or 
      (< (count (:promises @work-manager)) (:max-workers @work-manager)) 
      @p) ; blocks until a worker is available 
     (if-let [job (dequeue q)] 
      (let [f (future-call #(process-job job))] 
      (recur queues (request-promise-from-work-manager))) 
      (do 
      (Thread/sleep 5000) 
      (recur (if (nil? qs) queues qs) p))) 
     (recur q+qs (request-promise-from-work-manager)))))) 

Có thể bạn có thể làm điều gì đó tương tự? Mã không phải là tuyệt vời, và có lẽ có thể được viết lại để sử dụng lazy-seq, nhưng đó chỉ là một bài tập mà tôi chưa đạt được!

0

Đây là hoàn toàn có thể điên rồ nhưng bạn luôn có thể sử dụng một chức năng như thế này để tạo ra một chuỗi lười biếng chậm xuống:

(defn slow-seq [delay-ms coll] 
    "Creates a lazy sequence with delays between each element" 
    (lazy-seq 
    (if-let [s (seq coll)] 
     (do 
      (Thread/sleep delay-ms) 
      (cons (first s) 
       (slow-seq delay-ms (rest s))))))) 

này về cơ bản sẽ đảm bảo một sự chậm trễ giữa mỗi lời gọi hàm.

Bạn có thể sử dụng nó với một cái gì đó như sau, cung cấp một sự chậm trễ trong mili giây:

(doseq [i (slow-seq 500 (range 10))] 
    (println (rand-int 10)) 

Hoặc cách khác bạn có thể đặt cuộc gọi chức năng của bạn bên trong chuỗi với một cái gì đó như:

(take 10 (slow-seq 500 (repeatedly #(rand-int 10)))) 

Rõ ràng , trong cả hai bên trên, bạn có thể thay thế (rand-int 10) bằng bất kỳ mã nào bạn đang sử dụng để thực hiện/kích hoạt tải xuống.

+0

Nếu tôi đọc quyền này, tất cả các thành phần của 'coll' sẽ phải được biết trước khi bạn chạy' slow-seq', đúng không? Tôi thích thứ gì đó sẽ cho phép bạn thêm động các mục mà không gặp sự cố. Cụ thể, nếu kết quả của một cuộc gọi API là tôi cần thực hiện cuộc gọi API khác, chức năng này có cho phép lệnh thứ hai được đặt trên hàng đợi không? – bdesham

Các vấn đề liên quan