2009-05-01 17 views
75

Tôi có một lớp học lấy đối tượng từ một số BlockingQueue và xử lý chúng bằng cách gọi số take() trong vòng lặp liên tục. Tại một số điểm tôi biết rằng không có nhiều đối tượng sẽ được thêm vào hàng đợi. Tôi làm cách nào để gián đoạn phương thức take() để nó dừng chặn?Làm thế nào để ngắt một BlockingQueue đang chặn trên take()?

Dưới đây là lớp mà xử lý các đối tượng:

public class MyObjHandler implements Runnable { 

    private final BlockingQueue<MyObj> queue; 

    public class MyObjHandler(BlockingQueue queue) { 
    this.queue = queue; 
    } 

    public void run() { 
    try { 
     while (true) { 
     MyObj obj = queue.take(); 
     // process obj here 
     // ... 
     } 
    } catch (InterruptedException e) { 
     Thread.currentThread().interrupt(); 
    } 
    } 
} 

Và đây là phương pháp sử dụng lớp này để xử lý các đối tượng:

public void testHandler() { 

    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjectHandler handler = new MyObjectHandler(queue); 
    new Thread(handler).start(); 

    // get objects for handler to process 
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) { 
    queue.put(i.next()); 
    } 

    // what code should go here to tell the handler 
    // to stop waiting for more objects? 
} 

Trả lời

61

Nếu làm gián đoạn các chủ đề không phải là một lựa chọn, một là đặt một "dấu hiệu" hoặc "lệnh" đối tượng trên hàng đợi đó sẽ được công nhận như vậy bởi MyObjHandler và nghỉ ngơi ra khỏi vòng lặp.

+34

Phương pháp này còn được gọi là phương pháp 'Ngừng thuốc độc' và được thảo luận theo chiều dài trong "Thực hành đồng thời Java", cụ thể trên trang 155-156. –

12
BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 
MyObjectHandler handler = new MyObjectHandler(queue); 
Thread thread = new Thread(handler); 
thread.start(); 
for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) { 
    queue.put(i.next()); 
} 
thread.interrupt(); 

Tuy nhiên, nếu bạn làm điều này, thread có thể bị gián đoạn trong khi vẫn còn các mục trong hàng đợi, đang chờ xử lý. Bạn có thể muốn xem xét sử dụng poll thay vì take, điều này sẽ cho phép chuỗi xử lý hết thời gian chờ và chấm dứt khi quá trình chờ trong một thời gian không có đầu vào mới.

+0

Có, đó là vấn đề nếu chuỗi bị gián đoạn trong khi vẫn còn các mục trong hàng đợi. Để giải quyết vấn đề này, tôi đã thêm mã để đảm bảo hàng đợi trống trước khi ngắt chuỗi: while (queue.size()>0) Thread.currentThread().sleep(5000); MCS

+2

@MCS - lưu ý cho khách truy cập trong tương lai rằng cách tiếp cận của bạn ở đây là hack và không được sao chép trong mã sản xuất vì ba lý do. Nó luôn luôn được ưa thích để tìm một cách thực tế để móc tắt máy. Nó không bao giờ được chấp nhận để sử dụng 'Thread.sleep()' như một sự thay thế cho một cái móc thích hợp. Trong các triển khai khác, các luồng khác có thể đặt mọi thứ vào hàng đợi và vòng lặp while có thể không bao giờ kết thúc. –

+0

Rất may, không cần phải dựa vào các hack như vậy vì một cách dễ dàng xử lý nó * sau * ngắt nếu cần thiết. Ví dụ, một "thực thi' take() 'đầy đủ có thể trông giống như:' try {return take(); } catch (InterruptedException e) {E o = poll(); if (o == null) ném e; Thread.currentThread(). Interrupt(); return o; } 'Tuy nhiên, không có lý do gì cần phải được thực hiện ở lớp này, và thực hiện nó cao hơn một chút sẽ dẫn đến mã hiệu quả hơn (chẳng hạn như bằng cách tránh phần tử' InterruptedException' và/hoặc bằng cách sử dụng 'BlockingQueue.drainTo () '). – antak

0

Hoặc đừng làm gián đoạn, khó chịu của nó.

public class MyQueue<T> extends ArrayBlockingQueue<T> { 

     private static final long serialVersionUID = 1L; 
     private boolean done = false; 

     public ParserQueue(int capacity) { super(capacity); } 

     public void done() { done = true; } 

     public boolean isDone() { return done; } 

     /** 
     * May return null if producer ends the production after consumer 
     * has entered the element-await state. 
     */ 
     public T take() throws InterruptedException { 
      T el; 
      while ((el = super.poll()) == null && !done) { 
       synchronized (this) { 
        wait(); 
       } 
      } 

      return el; 
     } 
    } 
  1. khi puts sản xuất đối tượng vào hàng đợi, hãy gọi queue.notify(), nếu nó kết thúc, hãy gọi queue.done()
  2. vòng lặp while (! Queue.isDone() ||! Queue.isEmpty())
  3. nghiệm thu() giá trị trả for null
+0

Tôi sẽ nói rằng giải pháp trước đó là sạch hơn và đơn giản hơn – sakthisundar

+0

Cờ thực hiện này giống như một viên thuốc độc, nó chỉ được quản lý khác nhau :) –

+0

Cleaner? Tôi nghi ngờ điều đó. Bạn không biết khi nào thread bị gián đoạn chính xác. Hoặc để tôi hỏi, cái gì sạch hơn với điều đó? Ít mã của nó, đó là một thực tế. – tomasb

12

Rất muộn nhưng Hope this helps khác nữa như tôi phải đối mặt với vấn đề tương tự và sử dụngCách tiếp cận 0 được đề xuất bởi erickson above với một số thay đổi nhỏ,

class MyObjHandler implements Runnable 
{ 
    private final BlockingQueue<MyObj> queue; 
    public volatile boolean Finished; //VOLATILE GUARANTEES UPDATED VALUE VISIBLE TO ALL 
    public MyObjHandler(BlockingQueue queue) 
    { 
     this.queue = queue; 
     Finished = false; 
    } 
    @Override 
    public void run() 
    {   
     while (true) 
     { 
      try 
      { 
       MyObj obj = queue.poll(100, TimeUnit.MILLISECONDS); 
       if(obj!= null)//Checking if job is to be processed then processing it first and then checking for return 
       { 
        // process obj here 
        // ... 
       } 
       if(Finished && queue.isEmpty()) 
        return; 

      } 
      catch (InterruptedException e) 
      {     
       return; 
      } 
     } 
    } 
} 

public void testHandler() 
{ 
    BlockingQueue<MyObj> queue = new ArrayBlockingQueue<MyObj>(100); 

    MyObjHandler handler = new MyObjHandler(queue); 
    new Thread(handler).start(); 

    // get objects for handler to process 
    for (Iterator<MyObj> i = getMyObjIterator(); i.hasNext();) 
    { 
     queue.put(i.next()); 
    } 

    // what code should go here to tell the handler to stop waiting for more objects? 
    handler.Finished = true; //THIS TELLS HIM 
    //If you need you can wait for the termination otherwise remove join 
    myThread.join(); 
} 

này giải quyết tất cả những vấn đề

  1. Dán cờ hiệu các BlockingQueue để nó biết nó đã không phải chờ đợi nhiều hơn cho các yếu tố
  2. Không bị gián đoạn ở giữa để các khối xử lý chấm dứt chỉ khi tất cả các mục trong hàng đợi được xử lý và không còn mục nào được thêm vào
+2

Biến 'Finished' thành' volatile' để đảm bảo khả năng hiển thị giữa các luồng. Xem http://stackoverflow.com/a/106787 – lukk

+0

@lukk cảm ơn phản hồi của bạn, tôi đã chỉnh sửa câu trả lời để bao gồm điểm của bạn ..... – dbw

+2

Nếu tôi không nhầm, yếu tố cuối cùng trong hàng đợi sẽ không được xử lý. Khi bạn lấy phần tử cuối cùng từ hàng đợi, kết thúc là đúng và hàng đợi rỗng, vì vậy nó sẽ trả về trước khi xử lý phần tử cuối cùng đó. Thêm điều kiện thứ ba nếu (Đã hoàn thành && queue.isEmpty() && obj == null) –

Các vấn đề liên quan