2011-07-20 29 views
12

Tôi có một số ScheduledExecutorService với các tác vụ được lên lịch để thực thi sau một giờ. Làm cách nào để tôi có được danh sách các tác vụ nổi bật để tôi có thể buộc họ chạy ngay lập tức?Làm thế nào để chạy các tác vụ xuất sắc ngay sau khi ExecutorService.shutdown()?

Tôi tin shutdown() sẽ đợi một tiếng đồng hồ và có vẻ như nếu shutdownNow() trả về một danh sách các Runnables rằng không thể chạy() bởi vì việc thực hiện Runnable kiểm tra tình trạng Executor và khi nó thông báo rằng nó đã tắt Runnable từ chối chạy . Xem ScheduledThreadPoolExecutor.ScheduledFutureTask.run() để thực hiện thực tế.

Bất kỳ ý tưởng nào?

Trả lời

3

Tôi đã lấy câu trả lời của Mark Peters, triển khai tất cả các phương pháp trừu tượng, thêm an toàn luồng và cố gắng tôn trọng cấu hình ScheduledThreadPoolExecutor bên dưới bất cứ khi nào có thể.

/** 
* Overrides shutdown() to run outstanding tasks immediately. 
* 
* @author Gili Tzabari 
*/ 
public class RunOnShutdownScheduledExecutorService extends AbstractExecutorService 
    implements ScheduledExecutorService 
{ 
    private final ScheduledExecutorService delegate; 
    private final ScheduledThreadPoolExecutor scheduledThreadPoolExecutor; 
    private final ExecutorService immediateService; 
    private final ConcurrentMap<Future<?>, Callable<?>> tasks = Maps.newConcurrentMap(); 

    /** 
    * Creates a new RunOnShutdownScheduledExecutorService. 
    * 
    * @param delegate the executor to delegate to 
    */ 
    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegate) 
    { 
     Preconditions.checkNotNull(delegate, "delegate may not be null"); 

     this.delegate = delegate; 
     if (delegate instanceof ScheduledThreadPoolExecutor) 
     { 
      this.scheduledThreadPoolExecutor = (ScheduledThreadPoolExecutor) delegate; 
      this.immediateService = Executors.newFixedThreadPool(scheduledThreadPoolExecutor. 
       getCorePoolSize(), scheduledThreadPoolExecutor.getThreadFactory()); 
     } 
     else 
     { 
      scheduledThreadPoolExecutor = null; 
      this.immediateService = Executors.newSingleThreadExecutor(new ThreadFactoryBuilder(). 
       setNameFormat(RunOnShutdownScheduledExecutorService.class.getName() + "-%d").build()); 
     } 
    } 

    @Override 
    public boolean isShutdown() 
    { 
     return delegate.isShutdown(); 
    } 

    @Override 
    public boolean isTerminated() 
    { 
     return delegate.isTerminated(); 
    } 

    @Override 
    public boolean awaitTermination(long timeout, TimeUnit unit) throws InterruptedException 
    { 
     long before = System.nanoTime(); 
     if (!delegate.awaitTermination(timeout, unit)) 
      return false; 
     long after = System.nanoTime(); 
     long timeLeft = timeout - unit.convert(after - before, TimeUnit.NANOSECONDS); 
     return immediateService.awaitTermination(timeLeft, unit); 
    } 

    @Override 
    public void execute(Runnable command) 
    { 
     delegate.execute(command); 
    } 

    @Override 
    public ScheduledFuture<?> schedule(final Runnable command, long delay, TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public <V> ScheduledFuture<V> schedule(Callable<V> callable, long delay, TimeUnit unit) 
    { 
     CallableWithFuture<V> decorated = new CallableWithFuture<>(callable); 
     ScheduledFuture<V> future = delegate.schedule(decorated, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, callable); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleAtFixedRate(Runnable command, long initialDelay, long period, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = delegate.scheduleAtFixedRate(decorated, initialDelay, period, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public ScheduledFuture<?> scheduleWithFixedDelay(Runnable command, long initialDelay, long delay, 
     TimeUnit unit) 
    { 
     CleaningRunnable decorated = new CleaningRunnable(command); 
     ScheduledFuture<?> future = 
      delegate.scheduleWithFixedDelay(decorated, initialDelay, delay, unit); 
     decorated.setFuture(future); 
     tasks.put(future, Executors.callable(command)); 
     return new CleaningScheduledFuture<>(future); 
    } 

    @Override 
    public synchronized void shutdown() 
    { 
     if (delegate.isShutdown()) 
      return; 
     if (scheduledThreadPoolExecutor != null) 
     { 
      // WORKAROUND: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 
      // 
      // Cancel waiting scheduled tasks, otherwise executor won't shut down 
      scheduledThreadPoolExecutor.setExecuteExistingDelayedTasksAfterShutdownPolicy(false); 
     } 
     delegate.shutdown(); 
     // Users will not be able to cancel() Futures past this point so we're guaranteed that 
     // "tasks" will not be modified. 

     final List<Callable<?>> outstandingTasks = Lists.newArrayList(); 
     for (Map.Entry<Future<?>, Callable<?>> entry: tasks.entrySet()) 
     { 
      Future<?> future = entry.getKey(); 
      Callable<?> task = entry.getValue(); 

      if (future.isDone() && future.isCancelled()) 
      { 
       // Task called by the underlying executor, not the user. See CleaningScheduledFuture. 
       outstandingTasks.add(task); 
      } 
     } 
     tasks.clear(); 
     if (outstandingTasks.isEmpty()) 
     { 
      immediateService.shutdown(); 
      return; 
     } 

     immediateService.submit(new Callable<Void>() 
     { 
      @Override 
      public Void call() throws Exception 
      { 
       delegate.awaitTermination(Long.MAX_VALUE, TimeUnit.DAYS); 

       // Execute outstanding tasks only after the delegate executor finishes shutting down 
       for (Callable<?> task: outstandingTasks) 
        immediateService.submit(task); 
       immediateService.shutdown(); 
       return null; 
      } 
     }); 
    } 

    @Override 
    public List<Runnable> shutdownNow() 
    { 
     return delegate.shutdownNow(); 
    } 

    /** 
    * A Runnable that removes its future when running. 
    */ 
    private class CleaningRunnable implements Runnable 
    { 
     private final Runnable delegate; 
     private Future<?> future; 

     /** 
     * Creates a new RunnableWithFuture. 
     * 
     * @param delegate the Runnable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningRunnable(Runnable delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<?> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public void run() 
     { 
      tasks.remove(future); 
      delegate.run(); 
     } 
    } 

    /** 
    * A Callable that removes its future when running. 
    */ 
    private class CallableWithFuture<V> implements Callable<V> 
    { 
     private final Callable<V> delegate; 
     private Future<V> future; 

     /** 
     * Creates a new CallableWithFuture. 
     * 
     * @param delegate the Callable to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CallableWithFuture(Callable<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     /** 
     * Associates a Future with the runnable. 
     * 
     * @param future a future 
     */ 
     public void setFuture(Future<V> future) 
     { 
      this.future = future; 
     } 

     @Override 
     public V call() throws Exception 
     { 
      tasks.remove(future); 
      return delegate.call(); 
     } 
    } 

    /** 
    * A ScheduledFuture that removes its future when canceling. 
    * 
    * This allows us to differentiate between tasks canceled by the user and the underlying 
    * executor. Tasks canceled by the user are removed from "tasks". 
    * 
    * @param <V> The result type returned by this Future 
    */ 
    private class CleaningScheduledFuture<V> implements ScheduledFuture<V> 
    { 
     private final ScheduledFuture<V> delegate; 

     /** 
     * Creates a new MyScheduledFuture. 
     * 
     * @param delegate the future to delegate to 
     * @throws NullPointerException if delegate is null 
     */ 
     public CleaningScheduledFuture(ScheduledFuture<V> delegate) 
     { 
      Preconditions.checkNotNull(delegate, "delegate may not be null"); 

      this.delegate = delegate; 
     } 

     @Override 
     public long getDelay(TimeUnit unit) 
     { 
      return delegate.getDelay(unit); 
     } 

     @Override 
     public int compareTo(Delayed o) 
     { 
      return delegate.compareTo(o); 
     } 

     @Override 
     public boolean cancel(boolean mayInterruptIfRunning) 
     { 
      boolean result = delegate.cancel(mayInterruptIfRunning); 

      if (result) 
      { 
       // Tasks canceled by users are removed from "tasks" 
       tasks.remove(delegate); 
      } 
      return result; 
     } 

     @Override 
     public boolean isCancelled() 
     { 
      return delegate.isCancelled(); 
     } 

     @Override 
     public boolean isDone() 
     { 
      return delegate.isDone(); 
     } 

     @Override 
     public V get() throws InterruptedException, ExecutionException 
     { 
      return delegate.get(); 
     } 

     @Override 
     public V get(long timeout, TimeUnit unit) throws InterruptedException, ExecutionException, 
      TimeoutException 
     { 
      return delegate.get(timeout, unit); 
     } 
    } 
} 
+1

Tôi vừa phát hiện ra một lỗi khó chịu trong ScheduledThreadPoolExecutor. Nếu một chuỗi công nhân đang chờ đợi một nhiệm vụ sẽ chỉ thực hiện trong một giờ và bạn hủy nhiệm vụ đó, nhân viên sẽ tiếp tục chờ và người điều hành sẽ không tắt. Tôi đã gửi báo cáo lỗi: http://bugs.sun.com/bugdatabase/view_bug.do?bug_id=7069418 – Gili

+0

Tôi đã cập nhật câu trả lời của mình với một giải pháp cho lỗi # 7069418 – Gili

0

Câu hỏi hay! Có vẻ như bạn có thể đang tự vá với nhau một giải pháp.

Một tùy chọn có thể là để bọc ScheduledThreadPoolExecutor với việc triển khai ScheduledExecutorService của riêng bạn. Khi đến lúc tắt dịch vụ, hãy hủy mọi tác vụ có thể bị hủy và thay vào đó gửi chúng đến một dịch vụ sẽ thực hiện chúng ngay lập tức. Sau đó, shutdown() dịch vụ đó.

Dưới đây là một số mã rất thô thể hiện ý tôi, mặc dù tôi cảnh báo bạn rằng có thể có những cạm bẫy ở đây vì nó đã được cập nhật sau vài phút. Đặc biệt, tôi đã không nỗ lực nhiều để đảm bảo đây là chủ đề an toàn.

class RunOnShutdownScheduledExecutorService extends AbstractExecutorService implements ScheduledExecutorService { 
    private final ScheduledExecutorService delegateService; 

    private Map<Future<?>, Runnable> scheduledFutures = 
      Collections.synchronizedMap(new IdentityHashMap<Future<?>, Runnable>()); 


    public RunOnShutdownScheduledExecutorService(ScheduledExecutorService delegateService) { 
     this.delegateService = delegateService; 
    } 

    public ScheduledFuture<?> schedule(Runnable command, long delay, TimeUnit unit) { 
     ScheduledFuture<?> future = delegateService.schedule(command, delay, unit); 
     scheduledFutures.put(future, command); 
     return future; 
    } 

    public void shutdown() { 
     delegateService.shutdown(); 
     ExecutorService immediateService = Executors.newFixedThreadPool(5); 
     for (Map.Entry<Future<?>, Runnable> entry : scheduledFutures.entrySet()) { 
      Future<?> future = entry.getKey(); 
      Runnable task = entry.getValue(); 
      if (!future.isDone()) { 
       if (future.cancel(false)) { 
        immediateService.submit(task); 
       } 
      } 
     } 
     immediateService.shutdown(); 
    } 

    //... 
} 
+0

Cách tiếp cận khác (được thảo luận bởi Javadoc của ScheduledThreadPoolExecutor) có vẻ là để ghi đè decorateTask(). Tôi tin rằng nó có thể dẫn đến việc thực hiện đơn giản hơn. – Gili

+0

@Gili: Có vẻ như đó là một khả năng. Tôi đề nghị bạn đăng một câu trả lời cho câu hỏi của riêng bạn nếu nó trông có thể thực hiện được. Bạn có đề nghị vẫn ủy thác cho một người thi hành ngay lập tức, hay bạn đang nghĩ bạn có thể trang trí nhiệm vụ theo cách như vậy khi tắt máy, những thay đổi lên lịch của nó? –

+0

Tôi không nghĩ rằng bạn có thể trộn 'decorateTrang()' với các công tố viên ủy nhiệm, nhưng tôi sẽ cho nó một số suy nghĩ. – Gili

Các vấn đề liên quan