2013-02-03 28 views
7

Hiện tại tôi đang cố gắng xây dựng một dịch vụ web bằng API RESTful xử lý một số tác vụ dài (công việc).Hàng đợi công việc không đồng bộ cho dịch vụ web ở Clojure

Ý tưởng là người dùng gửi một công việc bằng cách thực hiện một POST trả về một số URL để kiểm tra trạng thái công việc cũng chứa url cho kết quả. Khi công việc hoàn tất (nghĩa là một số giá trị được ghi vào cơ sở dữ liệu), URL kết quả sẽ trả lại thông tin thích hợp (thay vì không có kết quả) và url công việc sẽ cho biết trạng thái đã hoàn thành.

Thật không may các tính toán khá chuyên sâu nên chỉ có thể chạy một lần, do đó các công việc cần được xếp hàng đợi.

Trong giả một cái gì đó như thế này sẽ là cần thiết

(def job-queue (atom queue)) ;; some queue 
(def jobs (atom {})) 

(defn schedule-job [params] 
    ;; schedules the job into the queue and 
    ;; adds the job to a jobs map for checking status via GET 
    ;; note that the job should not be evaluated until popped from the queue 
) 

(POST "/analyze" [{params :params}] 
(schedulde-job params)) 

(GET "job/:id" [:d] 
(get @jobs id)) 

;; Some function that pops the next item from the queue 
;; and evaluates it when the previous item is complete 
;; Note: should not terminate when queue is empty! 

Tôi đã nhìn vào Lamina cho phép xử lý không đồng bộ nhưng nó dường như không phù hợp với nhu cầu của tôi.

Câu hỏi của tôi là cách làm giảm hàng đợi công việc và thực hiện tác vụ của nó sau khi đã hoàn thành công việc trước đó, mà không chấm dứt khi hàng đợi trống rỗng, tức là xử lý vĩnh viễn các công việc sắp tới.

+0

Tôi không quá giỏi clojure, vì vậy không có trợ giúp ở đó, chỉ cần lưu ý: tại sao trả lại 204? Tôi cảm thấy có thể tốt hơn là trả lại một thông điệp thực sự (chỉ 200) với một cơ thể nói rằng nó chưa được thực hiện hay như thế? – Nanne

+0

Bạn nói đúng, điều đó có thể thực sự tốt hơn vì tôi bỏ lỡ "Nếu khách hàng là tác nhân người dùng, KHÔNG NÊN thay đổi chế độ xem tài liệu của nó từ đó khiến yêu cầu được gửi" trong thông số HTTP, nếu không "Không có nội dung "Tôi đoán là thích hợp hơn. – JoelKuiper

+0

Tôi luôn thích nếu REST API không cố gắng chơi ở http. Như trong, nếu tôi yêu cầu một số dữ liệu (ví dụ như tài khoản của ai đó nhưng với một id sai), tôi là một fan hâm mộ của nhận được một 200 với bên trong cơ thể đề cập đến không có người như vậy, thay vì ví dụ một 404. Điều này cảm thấy kinda giống nhau. Nhưng điều này là không hợp lý, xin lỗi vì điều đó;) – Nanne

Trả lời

9

Một java.util.concurrent.ExecutorService có thể là những gì bạn muốn. Điều này cho phép bạn gửi một công việc để thực hiện sau này, và trả về một tương lai mà bạn có thể truy vấn để khám phá xem nó đã hoàn thành chưa.

(import '[java.util.concurrent Callable Executors]) 

(def job-executor 
    (Executors/newSingleThreadExecutor)) 

(def jobs (atom {})) 

(defn submit-job [func] 
    (let [job-id (str (java.util.UUID/randomUUID)) 
     callable (reify Callable (call [_] (func))] 
    (swap! jobs assoc job-id (.submit job-executor callable)) 
    job-id)) 

(use 'compojure.core) 

(defroutes app 
    (POST "/jobs" [& params] 
    (let [id (submit-job #(analyze params))] 
     {:status 201 :headers {"Location" (str "/jobs/" id)}})) 
    (GET "/jobs/:id" [id] 
    (let [job-future (@jobs id)] 
     (if (.isDone job-future) 
     (.get job-future) 
     {:status 404})))) 
+2

Không thể 'ExecutorService' có thể được thay thế bằng' tương lai' tích hợp? – neoascetic

2

Điều này dường như đang làm những gì tôi mong đợi, nhưng nó có vẻ khá không thành ngữ. Bất cứ ai cũng có suy nghĩ về cách cải thiện điều này?

;; Create a unique identifier 
(defn uuid [] (str (java.util.UUID/randomUUID))) 

;; Create a job-queue and a map for keeping track of the status 
(def job-queue (ref clojure.lang.PersistentQueue/EMPTY)) 
(def jobs (atom {})) 

(defn dequeue! [queue-ref] 
    ;; Pops the first element off the queue-ref 
    (dosync 
    (let [item (peek @queue-ref)] 
     (alter queue-ref pop) 
     item))) 

(defn schedule-job! [task] 
    ;; Schedule a task to be executed, expects a function (task) to be evaluated 
    (let [uuid (uuid) 
     job (delay task)] 
    (dosync 
     (swap! jobs assoc uuid job) 
     (alter job-queue conj job)))) 

(defn run-jobs [] 
    ;; Runs the jobs 
    (while true 
    (Thread/sleep 10) 
    (let [curr (dequeue! job-queue)] 
     (if-not (nil? curr) (@curr))))) 

(.start (Thread. run-jobs)) 
0

Mô tả của bạn có vẻ giống như một nhà sản xuất nhiều và một kịch bản tiêu dùng duy nhất. Dưới đây là một ví dụ mã (mà bạn có thể treo lên với những thứ REST và có thể một số ngoại lệ xử lý để đại lý mà không được chết)

(def worker (agent {}))                                

(defn do-task [name func]                                
    (send worker                                  
     (fn [results]                                 
      (let [r (func)]                                
      (assoc results name r))))) 

;submit tasks                            
(do-task "uuid1" #(print 10))                               
(do-task "uuid2" #(+ 1 1)) 

;get all results 
(print @worker) 
Các vấn đề liên quan