2010-06-25 41 views
32

Có lẽ đây là một câu hỏi ngớ ngẩn, nhưng tôi dường như không thể tìm thấy câu trả lời rõ ràng.Hàng đợi tập hợp đồng thời

Tôi cần một hàng đợi FIFO đồng thời chỉ chứa các giá trị duy nhất. Cố gắng thêm một giá trị đã tồn tại trong hàng đợi chỉ đơn giản là bỏ qua giá trị đó. Mà, nếu không cho sự an toàn thread sẽ là tầm thường. Có một cấu trúc dữ liệu trong Java hoặc có thể là một mã snipit trên interwebs thể hiện hành vi này?

+0

Đáng tiếc là thuật ngữ "hàng đợi" rất mơ hồ, như một số người đọc nó mặc nhiên có nghĩa là "hàng đợi FIFO", trong khi những người khác nó có ý nghĩa tổng quát hơn 'java.util.Queue', về cơ bản có nghĩa là bất kỳ bộ sưu tập nào có * một số * khái niệm về" phần tử đầu ", cho dù phần tử đó có phải là phần tử đầu tiên hay không. Vì thế! Đó là nó? –

+0

FIFO, xin lỗi về thiếu sót =) –

Trả lời

5

Nếu bạn muốn đồng thời tốt hơn đồng bộ hóa đầy đủ, có một cách tôi biết để làm điều đó, sử dụng ConcurrentHashMap làm bản đồ sao lưu. Sau đây chỉ là một bản phác thảo.

public final class ConcurrentHashSet<E> extends ForwardingSet<E> 
    implements Set<E>, Queue<E> { 
    private enum Dummy { VALUE } 

    private final ConcurrentMap<E, Dummy> map; 

    ConcurrentHashSet(ConcurrentMap<E, Dummy> map) { 
    super(map.keySet()); 
    this.map = Preconditions.checkNotNull(map); 
    } 

    @Override public boolean add(E element) { 
    return map.put(element, Dummy.VALUE) == null; 
    } 

    @Override public boolean addAll(Collection<? extends E> newElements) { 
    // just the standard implementation 
    boolean modified = false; 
    for (E element : newElements) { 
     modified |= add(element); 
    } 
    return modified; 
    } 

    @Override public boolean offer(E element) { 
    return add(element); 
    } 

    @Override public E remove() { 
    E polled = poll(); 
    if (polled == null) { 
     throw new NoSuchElementException(); 
    } 
    return polled; 
    } 

    @Override public E poll() { 
    for (E element : this) { 
     // Not convinced that removing via iterator is viable (check this?) 
     if (map.remove(element) != null) { 
     return element; 
     } 
    } 
    return null; 
    } 

    @Override public E element() { 
    return iterator().next(); 
    } 

    @Override public E peek() { 
    Iterator<E> iterator = iterator(); 
    return iterator.hasNext() ? iterator.next() : null; 
    } 
} 

Tất cả đều không có ánh nắng mặt trời với cách tiếp cận này. Chúng tôi không có cách nào tốt để chọn yếu tố đầu khác ngoài việc sử dụng bản đồ sao lưu entrySet().iterator().next(), kết quả là bản đồ càng ngày càng mất cân bằng khi thời gian trôi qua. Sự mất cân đối này là một vấn đề cả do va chạm xô lớn hơn và tranh chấp phân khúc lớn hơn.

Lưu ý: mã này sử dụng Guava ở một vài nơi.

+3

Làm cách nào để duy trì thứ tự của 'Queue'? Thứ tự lặp lại phụ thuộc vào việc thực hiện bản đồ sao lưu. – erickson

+1

Bảo tồn thứ tự nào? Tôi không thấy bất kỳ yêu cầu nào về thứ tự, trừ khi nó được giả định rằng anh ta muốn một hàng đợi * FIFO * ... Tôi đã thêm một bình luận để hỏi. –

+0

@NitsanWakart Câu hỏi tôi trả lời là làm thế nào để có được một 'Queue'. –

5

Không có bộ sưu tập tích hợp thực hiện việc này. Có một số triển khai Set đồng thời có thể được sử dụng cùng với Queue đồng thời.

Ví dụ: một mục chỉ được thêm vào hàng đợi sau khi nó được thêm thành công vào bộ này và mỗi mục bị xóa khỏi hàng đợi sẽ bị xóa khỏi tập hợp. Trong trường hợp này, nội dung của hàng đợi, lô-gic, thực sự là bất kỳ thứ gì trong tập hợp và hàng đợi chỉ được sử dụng để theo dõi thứ tự và cung cấp các hoạt động hiệu quả take()poll() chỉ tìm thấy trên BlockingQueue.

+1

Một trong những triển khai của tôi đã sử dụng LinkedHashSet để tôi chỉ có một cấu trúc dữ liệu và có thể phụ thuộc vào thứ tự. Tuy nhiên, thuật toán đằng sau ConcurrentQueue khá phức tạp hơn một chút sau đó sử dụng khóa đồng bộ hóa và tôi đã tự hỏi liệu có một bộ sưu tập biểu diễn có ràng buộc duy nhất bổ sung hay không. –

+0

@Ambience - Không có bộ sưu tập như vậy. Kỹ thuật của tôi cho phép bạn sử dụng đồng thời 'Queue' (hoặc' LinkedBlockingQueue' nếu bạn cần 'take()' hoặc 'ConcurrentLinkedQueue' nếu bạn chỉ cần' poll() '), giữ nguyên thứ tự FIFO, trong khi thêm' Set' giống như tính độc đáo. – erickson

4

A java.util.concurrent.ConcurrentLinkedQueue giúp bạn tận dụng tối đa.

Quấn ConcurrentLinkedQueue bằng lớp của riêng bạn để kiểm tra tính độc đáo của phần bổ sung. Mã của bạn phải là chủ đề an toàn.

+1

Sau khi được bọc, nó có thể không còn an toàn nữa, ngay cả khi nó dựa trên 'ConcurrentLinkedQueue'. – finnw

+0

@finnw: Lưu ý đúng. Tôi đã cập nhật câu trả lời của mình. –

+1

Việc thực thi pass đầu tiên của tôi khá chính xác nhưng tôi lo lắng về chi phí của việc gọi .contains() trên danh sách liên kết được sao lưu và cũng đồng bộ hóa các phương thức xếp hàng hoàn toàn phủ nhận lợi ích của thuật toán cơ bản của ConcurrentLinkdQueue. –

4

Tôi sẽ sử dụng LinkedHashSet được đồng bộ hóa cho đến khi có đủ lý do để cân nhắc các giải pháp thay thế. Lợi ích chính mà một giải pháp đồng thời hơn có thể cung cấp là tách khóa.

Cách tiếp cận đồng thời đơn giản nhất sẽ là ConcurrentHashMap (hoạt động như một tập hợp) và ConcurrentLinkedQueue. Thứ tự các hoạt động sẽ cung cấp ràng buộc mong muốn. Một lời mời() trước tiên sẽ thực hiện một hàm CHM # putIfAbsent() và nếu chèn thành công vào CLQ. Một poll() sẽ lấy từ CLQ và sau đó loại bỏ nó khỏi CHM. Điều này có nghĩa rằng chúng ta xem xét một mục trong hàng đợi của chúng ta nếu nó nằm trong bản đồ và CLQ cung cấp thứ tự. Hiệu suất sau đó có thể được điều chỉnh bằng cách tăng concurrencyLevel của bản đồ. Nếu bạn khoan dung với tính ưu việt bổ sung, thì giá trị CHM # get() rẻ có thể hoạt động như một điều kiện tiên quyết hợp lý (nhưng nó có thể bị ảnh hưởng bởi một cái nhìn hơi cũ).

2

Bạn có ý nghĩa gì với hàng đợi đồng thời với Đặt ngữ nghĩa? Nếu bạn có nghĩa là một cấu trúc thực sự đồng thời (như trái ngược với một cấu trúc thread-an toàn) thì tôi sẽ cho rằng bạn đang yêu cầu một con ngựa.

Điều gì sẽ xảy ra nếu bạn gọi put(element) và phát hiện thấy có điều gì đó đã có ngay lập tức bị xóa? Ví dụ: trường hợp của bạn có ý nghĩa gì nếu offer(element) || queue.contains(element) trả lại false?

Những loại sự việc này thường cần phải suy nghĩ về một chút khác biệt trong một thế giới đồng thời thường không có gì là như nó có vẻ, trừ khi bạn ngăn chặn thế giới (khóa nó xuống). Nếu không, bạn thường nhìn vào một cái gì đó trong quá khứ. Vì vậy, bạn đang thực sự cố gắng làm gì?

+1

Quan sát thú vị, nếu không phải là câu trả lời =) Bạn có dưới ngưỡng điểm nhận xét không? –

+0

Tôi đã :-) Xin lỗi về điều đó. –

0

Có thể mở rộng ArrayBlockingQueue. Để có được quyền truy cập vào khóa (truy cập gói), tôi phải đặt lớp con của mình trong cùng một gói. Caveat: Tôi chưa thử nghiệm điều này.

package java.util.concurrent; 

import java.util.Collection; 
import java.util.concurrent.locks.ReentrantLock; 

public class DeDupingBlockingQueue<E> extends ArrayBlockingQueue<E> { 

    public DeDupingBlockingQueue(int capacity) { 
     super(capacity); 
    } 

    public DeDupingBlockingQueue(int capacity, boolean fair) { 
     super(capacity, fair); 
    } 

    public DeDupingBlockingQueue(int capacity, boolean fair, Collection<? extends E> c) { 
     super(capacity, fair, c); 
    } 

    @Override 
    public boolean add(E e) { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return false; 
      return super.add(e); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean offer(E e) { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return true; 
      return super.offer(e); 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public void put(E e) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lockInterruptibly(); //Should this be lock.lock() instead? 
     try { 
      if (contains(e)) return; 
      super.put(e); //if it blocks, it does so without holding the lock. 
     } finally { 
      lock.unlock(); 
     } 
    } 

    @Override 
    public boolean offer(E e, long timeout, TimeUnit unit) throws InterruptedException { 
     final ReentrantLock lock = this.lock; 
     lock.lock(); 
     try { 
      if (contains(e)) return true; 
      return super.offer(e, timeout, unit); //if it blocks, it does so without holding the lock. 
     } finally { 
      lock.unlock(); 
     } 
    } 
} 
0

Một câu trả lời đơn giản cho một danh sách các đối tượng độc nhất có thể được như sau:

import java.util.concurrent.ConcurrentLinkedQueue; 

public class FinalQueue { 

    class Bin { 
     private int a; 
     private int b; 

     public Bin(int a, int b) { 
      this.a = a; 
      this.b = b; 
     } 

     @Override 
     public int hashCode() { 
      return a * b; 
     } 

     public String toString() { 
      return a + ":" + b; 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (this == obj) 
       return true; 
      if (obj == null) 
       return false; 
      if (getClass() != obj.getClass()) 
       return false; 
      Bin other = (Bin) obj; 
      if ((a != other.a) || (b != other.b)) 
       return false; 
      return true; 
     } 
    } 

    private ConcurrentLinkedQueue<Bin> queue; 

    public FinalQueue() { 
     queue = new ConcurrentLinkedQueue<Bin>(); 
    } 

    public synchronized void enqueue(Bin ipAddress) { 
     if (!queue.contains(ipAddress)) 
      queue.add(ipAddress); 
    } 

    public Bin dequeue() { 
     return queue.poll(); 
    } 

    public String toString() { 
     return "" + queue; 
    } 

    /** 
    * @param args 
    */ 
    public static void main(String[] args) { 
     FinalQueue queue = new FinalQueue(); 
     Bin a = queue.new Bin(2,6); 

     queue.enqueue(a); 
     queue.enqueue(queue.new Bin(13, 3)); 
     queue.enqueue(queue.new Bin(13, 3)); 
     queue.enqueue(queue.new Bin(14, 3)); 
     queue.enqueue(queue.new Bin(13, 9)); 
     queue.enqueue(queue.new Bin(18, 3)); 
     queue.enqueue(queue.new Bin(14, 7)); 
     Bin x= queue.dequeue(); 
     System.out.println(x.a); 
     System.out.println(queue.toString()); 
     System.out.println("Dequeue..." + queue.dequeue()); 
     System.out.println("Dequeue..." + queue.dequeue()); 
     System.out.println(queue.toString()); 
    } 
} 
Các vấn đề liên quan