2011-07-02 24 views
25

Tôi quan tâm đến việc biết điều gì sẽ là cách tốt nhất để triển khai hàng đợi dựa trên chuỗi.Chủ đề và Hàng đợi

Ví dụ:

Tôi có 10 hành động mà tôi muốn thực thi chỉ với 4 chủ đề. Tôi muốn tạo một hàng đợi với tất cả 10 hành động được đặt tuyến tính và bắt đầu hành động 4 đầu tiên với 4 chủ đề, khi một trong các chủ đề được thực hiện xong, tiếp theo sẽ bắt đầu vv - Vì vậy, tại một thời điểm, số lượng chủ đề là hoặc 4 hoặc nhỏ hơn 4.

Trả lời

28

Có một lớp Queue trong thread trong thư viện chuẩn. Sử dụng mà bạn có thể làm điều gì đó như thế này:

require 'thread' 

queue = Queue.new 
threads = [] 

# add work to the queue 
queue << work_unit 

4.times do 
    threads << Thread.new do 
    # loop until there are no more things to do 
    until queue.empty? 
     # pop with the non-blocking flag set, this raises 
     # an exception if the queue is empty, in which case 
     # work_unit will be set to nil 
     work_unit = queue.pop(true) rescue nil 
     if work_unit 
     # do work 
     end 
    end 
    # when there is no more work, the thread will stop 
    end 
end 

# wait until all threads have completed processing 
threads.each { |t| t.join } 

Lý do tôi bật với cờ non-blocking là giữa và pop until queue.empty? thread khác có thể đã pop'ed hàng đợi, vì vậy trừ khi non-blocking cờ được thiết lập, chúng tôi có thể gặp khó khăn ở dòng đó mãi mãi.

Nếu bạn đang sử dụng MRI, trình thông dịch Ruby mặc định, hãy nhớ rằng các chuỗi sẽ không hoàn toàn đồng thời. Nếu công việc của bạn là CPU bị ràng buộc bạn cũng có thể chạy đơn luồng. Nếu bạn có một số hoạt động mà khối trên IO bạn có thể nhận được một số song song, nhưng YMMV. Ngoài ra, bạn có thể sử dụng một thông dịch viên cho phép đồng thời đầy đủ, chẳng hạn như jRuby hoặc Rubinius.

+1

Trong cuốc, nó gợi ý có 4 ': END_OF_WORK' 'work_unit's thay vì có một cửa sổ pop non-blocking. Ngoài ra, tuyên bố cuối cùng của bạn về các chủ đề không có CPU chạy đồng thời áp dụng cho YARV, nhưng không phải để JRuby. –

+0

@AndrewGrimm, tôi thích câu trả lời này bởi vì đôi khi bạn muốn có một hàng đợi công việc và các chủ đề kéo dài xung quanh để làm việc bất cứ khi nào một mục công việc mới được thêm vào. – akostadinov

7

Có khu vực một số đá quý triển khai mẫu này cho bạn; song song, đào và mỏ được gọi là threach (hoặc jruby_threach dưới jruby). Đó là một thay thế drop-in cho #each nhưng cho phép bạn chỉ định có bao nhiêu luồng để chạy với, bằng cách sử dụng một SizedQueue bên dưới để giữ cho mọi thứ khỏi bị vượt khỏi tầm kiểm soát.

Vậy ...

(1..10).threach(4) {|i| do_my_work(i) } 

Không đẩy công cụ của riêng tôi; có rất nhiều cách triển khai tốt để giúp mọi thứ trở nên dễ dàng hơn.

Nếu bạn đang sử dụng JRuby, jruby_threach là một triển khai tốt hơn nhiều - Java chỉ cung cấp một bộ phong phú hơn các luồng nguyên thủy và cấu trúc dữ liệu để sử dụng.

5

thực thi ví dụ mô tả:

require 'thread' 

p tasks = [ 
    {:file => 'task1'}, 
    {:file => 'task2'}, 
    {:file => 'task3'}, 
    {:file => 'task4'}, 
    {:file => 'task5'} 
] 

tasks_queue = Queue.new 
tasks.each {|task| tasks_queue << task} 

# run workers 
workers_count = 3 
workers = [] 
workers_count.times do |n| 
    workers << Thread.new(n+1) do |my_n| 
     while (task = tasks_queue.shift(true) rescue nil) do 
      delay = rand(0) 
      sleep delay 
      task[:result] = "done by worker ##{my_n} (in #{delay})" 
      p task 
     end 
    end 
end 

# wait for all threads 
workers.each(&:join) 

# output results 
puts "all done" 
p tasks 
1

Tôi sử dụng đá quý được gọi là work_queue. Nó thực sự thực tế.

Ví dụ:

require 'work_queue' 
wq = WorkQueue.new 4, 10 
(1..10).each do |number| 
    wq.enqueue_b("Thread#{number}") do |thread_name| 
     puts "Hello from the #{thread_name}" 
    end 
end 
wq.join 
Các vấn đề liên quan