2010-01-28 34 views
25

Tôi có một quá trình ủy quyền nhiệm vụ asynch cho một nhóm chủ đề. Tôi cần đảm bảo rằng các tác vụ nhất định được thực hiện theo thứ tự. Vì vậy, ví dụKiểm soát thứ tự thực hiện Tác vụ với ExecutorService

Nhiệm vụ đến để

Nhiệm vụ a1, b1, c1, d1, e1, a2, a3, b2, f1

Công việc có thể được thực hiện theo thứ tự nào trừ trường hợp có một phụ thuộc tự nhiên, do đó, a1, a2, a3 phải được xử lý theo thứ tự đó bằng cách phân bổ cho cùng một luồng hoặc ngăn chặn chúng cho đến khi tôi biết nhiệm vụ # đã hoàn thành trước đó.

Hiện tại, nó không sử dụng gói Java Concurrency, nhưng tôi đang xem xét thay đổi để tận dụng khả năng quản lý luồng.

Có ai có một giải pháp tương tự hoặc gợi ý về cách để đạt được điều này

Trả lời

2

Khi bạn gửi một Runnable hoặc Callable một ExecutorService bạn nhận được một Future đáp lại. Có các chủ đề phụ thuộc vào a1 được chuyển qua số Future của a1 và gọi Future.get(). Điều này sẽ chặn cho đến khi chuỗi hoàn tất.

Vì vậy:

ExecutorService exec = Executor.newFixedThreadPool(5); 
Runnable a1 = ... 
final Future f1 = exec.submit(a1); 
Runnable a2 = new Runnable() { 
    @Override 
    public void run() { 
    f1.get(); 
    ... // do stuff 
    } 
} 
exec.submit(a2); 

và vân vân.

+4

Tôi không nghĩ rằng điều này sẽ làm việc với một hồ bơi thread cố định, như các chủ đề có thể tất cả các khối trên 'f1.get() 'cùng một lúc và được bế tắc. – finnw

+0

Chỉnh kích thước của hồ bơi nếu thích hợp. – cletus

+0

Hoặc sử dụng nhóm chủ đề được lưu trong bộ nhớ cache. – finnw

2

Một tùy chọn khác là tạo trình xử lý của riêng bạn, gọi nó là OrderedExecutor và tạo một mảng các đối tượng ThreadPoolExecutor được đóng gói, với 1 chuỗi cho mỗi trình thực thi nội bộ. Sau đó bạn cung cấp một cơ chế cho việc lựa chọn một trong các đối tượng nội bộ, ví dụ, bạn có thể làm điều này bằng cách cung cấp một giao diện người dùng của lớp học của bạn có thể thực hiện:

 
executor = new OrderedExecutor(10 /* pool size */, new OrderedExecutor.Chooser() { 
    public int choose(Runnable runnable) { 
    MyRunnable myRunnable = (MyRunnable)runnable; 
    return myRunnable.someId(); 
    }); 

executor.execute(new MyRunnable()); 

Việc thực hiện OrderedExecutor.execute() sau đó sẽ sử dụng Trình chọn để có được một int, bạn mod này với kích thước hồ bơi, và đó là chỉ số của bạn vào mảng nội bộ. Ý tưởng là "someId()" sẽ trả lại cùng một giá trị cho tất cả "a's", v.v.

12

Khi tôi đã thực hiện điều này trong quá khứ, tôi thường có đơn hàng được xử lý bởi một thành phần mà sau đó gửi callables/runnables cho một Executor.

Điều gì đó tương tự.

  • Got một danh sách các nhiệm vụ để chạy, một số phụ thuộc
  • Tạo một Executor và quấn với một ExecutorCompletionService
  • Tìm kiếm tất cả các nhiệm vụ, bất kỳ không phụ thuộc, lên lịch cho chúng thông qua dịch vụ hoàn
  • Thăm dò ý kiến dịch vụ hoàn
  • Khi mỗi nhiệm vụ hoàn thành
    • Thêm nó vào danh sách "hoàn thành"
    • Đánh giá lại bất kỳ nhiệm vụ chờ đợi nào trong "danh sách đã hoàn thành" để xem liệu chúng có "phụ thuộc hoàn thành" hay không.Nếu vậy lên lịch cho chúng
    • Rửa lặp lại cho đến khi tất cả các nhiệm vụ được nộp/hoàn

Dịch vụ hoàn là một cách tốt đẹp của việc có thể để có được những nhiệm vụ khi họ hoàn toàn chứ không phải là cố gắng để thăm dò ý kiến ​​một loạt các Tương lai. Tuy nhiên, bạn có thể muốn giữ Map<Future, TaskIdentifier> được điền khi công việc được lên lịch thông qua dịch vụ hoàn thành để khi dịch vụ hoàn thành cung cấp cho bạn một Tương lai hoàn chỉnh, bạn có thể tìm ra số TaskIdentifier đó là gì.

Nếu bạn đã từng thấy mình ở trạng thái nơi nhiệm vụ vẫn đang chờ chạy, nhưng không có gì đang chạy và không có gì có thể được lên lịch thì bạn có vấn đề phụ thuộc vòng tròn.

9

Tôi viết Trình xử lý riêng để đảm bảo thứ tự nhiệm vụ cho các tác vụ có cùng khóa. Nó sử dụng bản đồ các hàng đợi cho các nhiệm vụ đặt hàng với cùng một khóa. Mỗi tác vụ được khóa thực hiện tác vụ tiếp theo với cùng một khóa.

Giải pháp này không xử lý RejectedExecutionException hoặc các trường hợp ngoại lệ khác từ Người thực thi được ủy quyền! Vì vậy, Executor được ủy quyền nên "không giới hạn".

import java.util.HashMap; 
import java.util.LinkedList; 
import java.util.Map; 
import java.util.Queue; 
import java.util.concurrent.Executor; 

/** 
* This Executor warrants task ordering for tasks with same key (key have to implement hashCode and equal methods correctly). 
*/ 
public class OrderingExecutor implements Executor{ 

    private final Executor delegate; 
    private final Map<Object, Queue<Runnable>> keyedTasks = new HashMap<Object, Queue<Runnable>>(); 

    public OrderingExecutor(Executor delegate){ 
     this.delegate = delegate; 
    } 

    @Override 
    public void execute(Runnable task) { 
     // task without key can be executed immediately 
     delegate.execute(task); 
    } 

    public void execute(Runnable task, Object key) { 
     if (key == null){ // if key is null, execute without ordering 
      execute(task); 
      return; 
     } 

     boolean first; 
     Runnable wrappedTask; 
     synchronized (keyedTasks){ 
      Queue<Runnable> dependencyQueue = keyedTasks.get(key); 
      first = (dependencyQueue == null); 
      if (dependencyQueue == null){ 
       dependencyQueue = new LinkedList<Runnable>(); 
       keyedTasks.put(key, dependencyQueue); 
      } 

      wrappedTask = wrap(task, dependencyQueue, key); 
      if (!first) 
       dependencyQueue.add(wrappedTask); 
     } 

     // execute method can block, call it outside synchronize block 
     if (first) 
      delegate.execute(wrappedTask); 

    } 

    private Runnable wrap(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
     return new OrderedTask(task, dependencyQueue, key); 
    } 

    class OrderedTask implements Runnable{ 

     private final Queue<Runnable> dependencyQueue; 
     private final Runnable task; 
     private final Object key; 

     public OrderedTask(Runnable task, Queue<Runnable> dependencyQueue, Object key) { 
      this.task = task; 
      this.dependencyQueue = dependencyQueue; 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      try{ 
       task.run(); 
      } finally { 
       Runnable nextTask = null; 
       synchronized (keyedTasks){ 
        if (dependencyQueue.isEmpty()){ 
         keyedTasks.remove(key); 
        }else{ 
         nextTask = dependencyQueue.poll(); 
        } 
       } 
       if (nextTask!=null) 
        delegate.execute(nextTask); 
      } 
     } 
    } 
} 
+0

+1. Cảm ơn vì điều đó.Tôi sẽ sử dụng cấy ghép này, nhưng tôi thực sự không biết làm thế nào điều này không được đánh dấu là câu trả lời cuối cùng cho câu hỏi. –

0

Trong Habanero-Java library, có một khái niệm về nhiệm vụ dựa trên dữ liệu mà có thể được sử dụng để thể hiện sự phụ thuộc giữa các tác vụ và tránh hoạt động thread-blocking. Dưới thư viện bao gồm thư viện Habanero-Java sử dụng JDKs ForkJoinPool (tức là một ExecutorService).

Ví dụ, trường hợp sử dụng của bạn cho các nhiệm vụ A1, A2, A3, ... có thể được diễn tả như sau:

HjFuture a1 = future(() -> { doA1(); return true; }); 
HjFuture a2 = futureAwait(a1,() -> { doA2(); return true; }); 
HjFuture a3 = futureAwait(a2,() -> { doA3(); return true; }); 

Lưu ý rằng a1, a2, và a3 chỉ là tài liệu tham khảo để các đối tượng của loại HjFuture và có thể được duy trì trong các cấu trúc dữ liệu tùy chỉnh của bạn để chỉ định các phụ thuộc và khi các nhiệm vụ A2 và A3 đến vào lúc chạy.

Có một số tutorial slides available. Bạn có thể tìm thêm tài liệu dưới dạng javadoc, API summaryprimers.

0

Bạn có thể sử dụng Executors.newSingleThreadExecutor(), nhưng nó sẽ chỉ sử dụng một luồng để thực hiện các tác vụ của bạn. Một tùy chọn khác là sử dụng CountDownLatch. Dưới đây là một ví dụ đơn giản:

public class Main2 { 

public static void main(String[] args) throws InterruptedException { 

    final CountDownLatch cdl1 = new CountDownLatch(1); 
    final CountDownLatch cdl2 = new CountDownLatch(1); 
    final CountDownLatch cdl3 = new CountDownLatch(1); 

    List<Runnable> list = new ArrayList<Runnable>(); 
    list.add(new Runnable() { 
     public void run() { 
      System.out.println("Task 1"); 

      // inform that task 1 is finished 
      cdl1.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 1 is finished 
      try { 
       cdl1.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 2"); 

      // inform that task 2 is finished 
      cdl2.countDown(); 
     } 
    }); 

    list.add(new Runnable() { 
     public void run() { 
      // wait until task 2 is finished 
      try { 
       cdl2.await(); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 

      System.out.println("Task 3"); 

      // inform that task 3 is finished 
      cdl3.countDown(); 
     } 
    }); 

    ExecutorService es = Executors.newFixedThreadPool(200); 
    for (int i = 0; i < 3; i++) { 
     es.submit(list.get(i)); 
    } 

    es.shutdown(); 
    es.awaitTermination(1, TimeUnit.MINUTES); 
} 
} 
0

Tôi đã tạo một OrderingExecutor cho vấn đề này. Nếu bạn truyền cùng một khóa tới phương thức execute() với các runnables khác nhau, việc thực thi các runnables với cùng một khóa sẽ theo thứ tự thực thi() được gọi và sẽ không bao giờ chồng lên nhau.

import java.util.Arrays; 
import java.util.Collection; 
import java.util.Iterator; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentHashMap; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.ConcurrentMap; 
import java.util.concurrent.Executor; 

/** 
* Special executor which can order the tasks if a common key is given. 
* Runnables submitted with non-null key will guaranteed to run in order for the same key. 
* 
*/ 
public class OrderedExecutor { 

    private static final Queue<Runnable> EMPTY_QUEUE = new QueueWithHashCodeAndEquals<Runnable>(
      new ConcurrentLinkedQueue<Runnable>()); 

    private ConcurrentMap<Object, Queue<Runnable>> taskMap = new ConcurrentHashMap<Object, Queue<Runnable>>(); 
    private Executor delegate; 
    private volatile boolean stopped; 

    public OrderedExecutor(Executor delegate) { 
     this.delegate = delegate; 
    } 

    public void execute(Runnable runnable, Object key) { 
     if (stopped) { 
      return; 
     } 

     if (key == null) { 
      delegate.execute(runnable); 
      return; 
     } 

     Queue<Runnable> queueForKey = taskMap.computeIfPresent(key, (k, v) -> { 
      v.add(runnable); 
      return v; 
     }); 
     if (queueForKey == null) { 
      // There was no running task with this key 
      Queue<Runnable> newQ = new QueueWithHashCodeAndEquals<Runnable>(new ConcurrentLinkedQueue<Runnable>()); 
      newQ.add(runnable); 
      // Use putIfAbsent because this execute() method can be called concurrently as well 
      queueForKey = taskMap.putIfAbsent(key, newQ); 
      if (queueForKey != null) 
       queueForKey.add(runnable); 
      delegate.execute(new InternalRunnable(key)); 
     } 
    } 

    public void shutdown() { 
     stopped = true; 
     taskMap.clear(); 
    } 

    /** 
    * Own Runnable used by OrderedExecutor. 
    * The runnable is associated with a specific key - the Queue&lt;Runnable> for this 
    * key is polled. 
    * If the queue is empty, it tries to remove the queue from taskMap. 
    * 
    */ 
    private class InternalRunnable implements Runnable { 

     private Object key; 

     public InternalRunnable(Object key) { 
      this.key = key; 
     } 

     @Override 
     public void run() { 
      while (true) { 
       // There must be at least one task now 
       Runnable r = taskMap.get(key).poll(); 
       while (r != null) { 
        r.run(); 
        r = taskMap.get(key).poll(); 
       } 
       // The queue emptied 
       // Remove from the map if and only if the queue is really empty 
       boolean removed = taskMap.remove(key, EMPTY_QUEUE); 
       if (removed) { 
        // The queue has been removed from the map, 
        // if a new task arrives with the same key, a new InternalRunnable 
        // will be created 
        break; 
       } // If the queue has not been removed from the map it means that someone put a task into it 
        // so we can safely continue the loop 
      } 
     } 
    } 

    /** 
    * Special Queue implementation, with equals() and hashCode() methods. 
    * By default, Java SE queues use identity equals() and default hashCode() methods. 
    * This implementation uses Arrays.equals(Queue::toArray()) and Arrays.hashCode(Queue::toArray()). 
    * 
    * @param <E> The type of elements in the queue. 
    */ 
    private static class QueueWithHashCodeAndEquals<E> implements Queue<E> { 

     private Queue<E> delegate; 

     public QueueWithHashCodeAndEquals(Queue<E> delegate) { 
      this.delegate = delegate; 
     } 

     public boolean add(E e) { 
      return delegate.add(e); 
     } 

     public boolean offer(E e) { 
      return delegate.offer(e); 
     } 

     public int size() { 
      return delegate.size(); 
     } 

     public boolean isEmpty() { 
      return delegate.isEmpty(); 
     } 

     public boolean contains(Object o) { 
      return delegate.contains(o); 
     } 

     public E remove() { 
      return delegate.remove(); 
     } 

     public E poll() { 
      return delegate.poll(); 
     } 

     public E element() { 
      return delegate.element(); 
     } 

     public Iterator<E> iterator() { 
      return delegate.iterator(); 
     } 

     public E peek() { 
      return delegate.peek(); 
     } 

     public Object[] toArray() { 
      return delegate.toArray(); 
     } 

     public <T> T[] toArray(T[] a) { 
      return delegate.toArray(a); 
     } 

     public boolean remove(Object o) { 
      return delegate.remove(o); 
     } 

     public boolean containsAll(Collection<?> c) { 
      return delegate.containsAll(c); 
     } 

     public boolean addAll(Collection<? extends E> c) { 
      return delegate.addAll(c); 
     } 

     public boolean removeAll(Collection<?> c) { 
      return delegate.removeAll(c); 
     } 

     public boolean retainAll(Collection<?> c) { 
      return delegate.retainAll(c); 
     } 

     public void clear() { 
      delegate.clear(); 
     } 

     @Override 
     public boolean equals(Object obj) { 
      if (!(obj instanceof QueueWithHashCodeAndEquals)) { 
       return false; 
      } 
      QueueWithHashCodeAndEquals<?> other = (QueueWithHashCodeAndEquals<?>) obj; 
      return Arrays.equals(toArray(), other.toArray()); 
     } 

     @Override 
     public int hashCode() { 
      return Arrays.hashCode(toArray()); 
     } 

    } 

} 
0

Tôi đã viết dịch vụ chấp hành đã thắng của mình theo thứ tự nhận thức được. Nó trình tự các nhiệm vụ có chứa tham chiếu liên quan nhất định và hiện tại là ánh sáng.

Bạn có thể đi qua các thực tại https://github.com/nenapu/SequenceAwareExecutorService

Các vấn đề liên quan