2012-04-04 24 views
5

Tôi có một tình huống mà các chủ đề khác nhau điền vào hàng đợi (nhà sản xuất) và một người tiêu dùng truy xuất phần tử từ hàng đợi này. Vấn đề của tôi là khi một trong những phần tử này được lấy ra từ hàng đợi, một số được bỏ qua (thiếu tín hiệu?). Mã nhà sản xuất là:java đồng thời: đa người sản xuất một người tiêu dùng

class Producer implements Runnable { 

    private Consumer consumer; 

    Producer(Consumer consumer) { this.consumer = consumer; } 

    @Override 
public void run() { 
    consumer.send("message"); 
    } 
} 

và họ được tạo ra và chạy với:

ExecutorService executor = Executors.newSingleThreadExecutor(); 
for (int i = 0; i < 20; i++) { 
    executor.execute(new Producer(consumer)); 
} 

đang tiêu dùng là:

class Consumer implements Runnable { 

private Queue<String> queue = new ConcurrentLinkedQueue<String>(); 

void send(String message) { 
    synchronized (queue) { 
     queue.add(message); 
     System.out.println("SIZE: " + queue.size()); 
     queue.notify(); 
    } 
} 

@Override 
public void run() { 
    int counter = 0; 
    synchronized (queue) { 
    while(true) { 
     try { 
      System.out.println("SLEEP"); 
       queue.wait(10); 
     } catch (InterruptedException e) { 
       Thread.interrupted(); 
     } 
     System.out.println(counter); 
     if (!queue.isEmpty()) {    
      queue.poll(); 
      counter++; 
     } 
    } 
    } 
} 

} 

Khi đang chạy tôi nhận được đôi khi 20 yếu tố gia tăng và 20 được truy xuất, nhưng trong các trường hợp khác, các phần tử được truy xuất nhỏ hơn 20. Bất kỳ ý tưởng nào để khắc phục điều đó?

+0

Bạn đang sử dụng hỗn hợp lẻ của các cấu trúc đồng bộ hóa mức thấp ('wait',' notify') và các mức cao cấp ('ConcurrentLinkedQueue',' ExecutorService'). Sử dụng cái này hay cái kia! – artbristol

+0

Tôi đã làm điều đó nhưng trong cả hai trường hợp tôi có cùng một vấn đề – Randomize

+0

Tôi không thể thấy mã thực sự chạy Người tiêu dùng. – dhblah

Trả lời

10

Tôi khuyên bạn nên sử dụng BlockingQueue thay vì Hàng đợi. Một LinkedBlockingDeque có thể là một ứng cử viên tốt cho bạn.

Mã của bạn sẽ trông như thế này:

void send(String message) { 
    synchronized (queue) { 
     queue.put(message); 
     System.out.println("SIZE: " + queue.size()); 
    } 
} 

và sau đó bạn sẽ cần phải chỉ

queue.take() 

về chủ đề người tiêu dùng của bạn

Ý tưởng là .take rằng() sẽ chặn cho đến khi một mục có sẵn trong hàng đợi và sau đó trả về chính xác một (đó là nơi tôi nghĩ rằng việc triển khai của bạn bị: thiếu thông báo e bỏ phiếu). .put() có trách nhiệm thực hiện tất cả các thông báo cho bạn. Không chờ đợi/thông báo cần thiết.

+0

Đã thử LinkedBlockingDeque nhưng tôi vẫn gặp vấn đề tương tự – Randomize

+0

@Randomize, bạn có thể đăng một ví dụ về mã có vấn đề khi sử dụng BlockingQueue không? Mã người tiêu dùng là đủ. – charisis

+0

Tôi đang sử dụng lại chính xác cùng một mã ở trên tôi vừa thay thế ConcurrentLinkedQueue bằng LinkedBlockingDeque. – Randomize

2

Sự cố trong mã của bạn có thể do bạn đang sử dụng notify thay vì notifyAll. Cái cũ sẽ chỉ đánh thức một sợi đơn, nếu có một cái đang chờ trên khóa. Điều này cho phép điều kiện cuộc đua không có chủ đề nào đang chờ và tín hiệu bị mất. Một thông báoAll sẽ buộc tính chính xác với chi phí hiệu suất nhỏ bằng cách yêu cầu tất cả các luồng đánh thức để kiểm tra xem chúng có thể lấy khóa hay không.

Điều này được giải thích rõ nhất trong Effective Java 1st ed (xem tr.150). Phiên bản thứ hai đã loại bỏ mẹo này vì các lập trình viên dự kiến ​​sẽ sử dụng java.util.concurrent để cung cấp các đảm bảo đúng đắn hơn.

+0

Tôi đã sử dụng notifyAll nhưng không hoạt động – Randomize

+0

Có một người tiêu dùng duy nhất để thông báo/thông báoTất cả không tạo sự khác biệt –

1

Có vẻ như ý tưởng tồi khi sử dụng ConcurrentLinkedQueue và đồng bộ hóa cả hai cùng một lúc. Nó thách thức mục đích của cấu trúc dữ liệu đồng thời ngay từ đầu.

Không có vấn đề với cấu trúc dữ liệu ConcurrentLinkedQueue và thay thế bằng BlockingQueue sẽ giải quyết được vấn đề nhưng đây không phải là nguyên nhân gốc rễ.

Sự cố với queue.wait (10). Đây là phương thức chờ thời gian. Nó sẽ lấy lại khóa khi 10ms trôi qua.

  1. Thông báo (queue.notify()) sẽ bị mất vì không có chuỗi tiêu thụ nào chờ đợi nếu 10ms đã trôi qua.

  2. Nhà sản xuất sẽ không thể thêm vào hàng đợi vì họ không thể lấy khóa vì khóa được người tiêu dùng yêu cầu lại.

Di chuyển đến Chặn Queue đã giải quyết được sự cố của bạn vì bạn đã xóa mã chờ đợi (10) và đợi và thông báo bằng cách chặn cấu trúc dữ liệu Queue.

Các vấn đề liên quan