2016-04-12 40 views
5

Tôi đang đọc trang này về coroutines in Python và điều này Wikipedia page. Tôi thấy rằng có một vài libraries in Java thực hiện coroutines.Coroutines trong Java

Câu hỏi của tôi là: có lý do nào được biết tại sao các nhà thiết kế Java đã quyết định không triển khai coroutines cho đến nay và có kế hoạch đưa nó vào một phiên bản Java tương lai không?

Cảm ơn.

+0

Những gì bạn có thể làm với coroutines mà bạn không thể làm với 'Thread's trong java ? –

+0

Nếu có thư viện triển khai coroutines, các nhà thiết kế ngôn ngữ cần làm gì? –

+0

Như tôi đã hiểu từ liên kết đầu tiên, nó tiêu thụ tài nguyên ít hơn 'chuỗi'. Câu hỏi đặt ra là: tại sao nó không được bao gồm trong ngôn ngữ Java chuẩn. – joel314

Trả lời

6

Thực ra khái niệm về một đồng thường là thiết kế đầu tiên của hệ thống luồng Java. Cơ chế wait/notify là một hình thức đơn giản của đồng thói quen nơi notify tương đương với yield, vv

Kể từ đó nhiều đã được thực hiện, đặc biệt là để làm cho cấu trúc thread-safe chứ không phải là thuật toán. Điều này xuất phát từ việc nhận ra rằng nó không phải là mã phải đồng bộ hóa/năng suất nhưng cấu trúc dữ liệu được sử dụng để giao tiếp giữa các chủ đề phải an toàn chỉ.

+0

Những gì bạn gọi là "thực hiện" thực sự là một quyết định. Có nhiều cách tiếp cận cho vấn đề này, bạn không nên trình bày một Java được giải quyết như một cách duy nhất có thể. – back2dos

+0

@ back2dos - vui lòng thêm vào câu trả lời của tôi. Tôi quan tâm đến các tùy chọn thay thế. – OldCurmudgeon

3

Trên "là có bất kỳ kế hoạch ..." phần của câu hỏi, câu trả lời là:

Không ở giai đoạn này

Danh sách JEP (http://openjdk.java.net/jeps/0) không thực hiện bất kỳ đề cập đến coroutines. Danh sách này bao gồm các tính năng được thêm vào trong Java 8, được thêm vào hoặc được nhắm mục tiêu cho Java 9 hoặc được đề xuất cho các bản phát hành trong tương lai.

Điều thú vị là đã có một RFE được gửi vào tháng 3 năm 2013 (https://bugs.openjdk.java.net/browse/JDK-8029988). RFE chỉ có một phiếu bầu, và nó đã bị đóng cửa 9 tháng với đề xuất gửi một JEP. Không ai làm phiền để đưa ra ý tưởng thêm nữa, điều mà tôi đang nói.

+1

"Không ai làm phiền để đưa ra ý tưởng thêm nữa, mà tôi đang nói." - Chỉ tò mò thôi; nói về cái gì? Không ai trong cộng đồng Java muốn coroutines? Oracle không quan tâm đến việc triển khai chúng? Hoặc một cái gì đó khác – Abdul

+1

Nó nói với tôi rằng không ai trong tầm vóc nào trong thế giới Java nghĩ rằng coroutines là cần thiết. Bởi vì, nếu ai đó nghĩ rằng họ là cần thiết, ai đó sẽ khởi xướng một JEP. Tuy nhiên, rõ ràng là không tính đến một "bằng chứng". –

0

Có một lựa chọn khác là vào đây để Java6 +

Một coroutine pythonic thực hiện:

import java.lang.ref.WeakReference; 
import java.util.ArrayList; 
import java.util.List; 
import java.util.concurrent.*; 
import java.util.concurrent.atomic.AtomicBoolean; 
import java.util.concurrent.atomic.AtomicReference; 

class CorRunRAII { 
    private final List<WeakReference<? extends CorRun>> resources = new ArrayList<>(); 

    public CorRunRAII add(CorRun resource) { 
     if (resource == null) { 
      return this; 
     } 
     resources.add(new WeakReference<>(resource)); 

     return this; 
    } 

    public CorRunRAII addAll(List<? extends CorRun> arrayList) { 
     if (arrayList == null) { 
      return this; 
     } 
     for (CorRun corRun : arrayList) { 
      add(corRun); 
     } 

     return this; 
    } 

    @Override 
    protected void finalize() throws Throwable { 
     super.finalize(); 

     for (WeakReference<? extends CorRun> corRunWeakReference : resources) { 
      CorRun corRun = corRunWeakReference.get(); 
      if (corRun != null) { 
       corRun.stop(); 
      } 
     } 
    } 
} 

class CorRunYieldReturn<ReceiveType, YieldReturnType> { 
    public final AtomicReference<ReceiveType> receiveValue; 
    public final LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue; 

    CorRunYieldReturn(AtomicReference<ReceiveType> receiveValue, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { 
     this.receiveValue = receiveValue; 
     this.yieldReturnValue = yieldReturnValue; 
    } 
} 

interface CorRun<ReceiveType, YieldReturnType> extends Runnable, Callable<YieldReturnType> { 
    boolean start(); 
    void stop(); 
    void stop(final Throwable throwable); 
    boolean isStarted(); 
    boolean isEnded(); 
    Throwable getError(); 

    ReceiveType getReceiveValue(); 
    void setResultForOuter(YieldReturnType resultForOuter); 
    YieldReturnType getResultForOuter(); 

    YieldReturnType receive(ReceiveType value); 
    ReceiveType yield(); 
    ReceiveType yield(YieldReturnType value); 
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another); 
    <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value); 
} 

abstract class CorRunSync<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { 

    private ReceiveType receiveValue; 
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList = new ArrayList<>(); 

    // Outside 

    private AtomicBoolean isStarted = new AtomicBoolean(false); 
    private AtomicBoolean isEnded = new AtomicBoolean(false); 
    private Throwable error; 

    private YieldReturnType resultForOuter; 

    @Override 
    public boolean start() { 

     boolean isStarted = this.isStarted.getAndSet(true); 
     if ((! isStarted) 
       && (! isEnded())) { 
      receive(null); 
     } 

     return isStarted; 
    } 

    @Override 
    public void stop() { 
     stop(null); 
    } 

    @Override 
    public void stop(Throwable throwable) { 
     isEnded.set(true); 
     if (throwable != null) { 
      error = throwable; 
     } 

     for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { 
      CorRun child = weakReference.get(); 
      if (child != null) { 
       child.stop(); 
      } 
     } 
    } 

    @Override 
    public boolean isStarted() { 
     return isStarted.get(); 
    } 

    @Override 
    public boolean isEnded() { 
     return isEnded.get(); 
    } 

    @Override 
    public Throwable getError() { 
     return error; 
    } 

    @Override 
    public ReceiveType getReceiveValue() { 
     return receiveValue; 
    } 

    @Override 
    public void setResultForOuter(YieldReturnType resultForOuter) { 
     this.resultForOuter = resultForOuter; 
    } 

    @Override 
    public YieldReturnType getResultForOuter() { 
     return resultForOuter; 
    } 

    @Override 
    public synchronized YieldReturnType receive(ReceiveType value) { 
     receiveValue = value; 

     run(); 

     return getResultForOuter(); 
    } 

    @Override 
    public ReceiveType yield() { 
     return yield(null); 
    } 

    @Override 
    public ReceiveType yield(YieldReturnType value) { 
     resultForOuter = value; 
     return receiveValue; 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another) { 
     return yieldFrom(another, null); 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(CorRun<TargetReceiveType, TargetYieldReturnType> another, TargetReceiveType value) { 
     if (another == null || another.isEnded()) { 
      throw new RuntimeException("Call null or isEnded coroutine"); 
     } 

     potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); 

     synchronized (another) { 
      boolean isStarted = another.start(); 
      boolean isJustStarting = ! isStarted; 
      if (isJustStarting && another instanceof CorRunSync) { 
       return another.getResultForOuter(); 
      } 

      return another.receive(value); 
     } 
    } 

    @Override 
    public void run() { 
     try { 
      this.call(); 
     } 
     catch (Exception e) { 
      e.printStackTrace(); 

      stop(e); 
      return; 
     } 
    } 
} 

abstract class CorRunThread<ReceiveType, YieldReturnType> implements CorRun<ReceiveType, YieldReturnType> { 

    private final ExecutorService childExecutorService = newExecutorService(); 
    private ExecutorService executingOnExecutorService; 

    private static final CorRunYieldReturn DUMMY_COR_RUN_YIELD_RETURN = new CorRunYieldReturn(new AtomicReference<>(null), new LinkedBlockingDeque<AtomicReference>()); 

    private final CorRun<ReceiveType, YieldReturnType> self; 
    public final List<WeakReference<CorRun>> potentialChildrenCoroutineList; 
    private CorRunYieldReturn<ReceiveType, YieldReturnType> lastCorRunYieldReturn; 

    private final LinkedBlockingDeque<CorRunYieldReturn<ReceiveType, YieldReturnType>> receiveQueue; 

    // Outside 

    private AtomicBoolean isStarted = new AtomicBoolean(false); 
    private AtomicBoolean isEnded = new AtomicBoolean(false); 
    private Future<YieldReturnType> future; 
    private Throwable error; 

    private final AtomicReference<YieldReturnType> resultForOuter = new AtomicReference<>(); 

    CorRunThread() { 
     executingOnExecutorService = childExecutorService; 

     receiveQueue = new LinkedBlockingDeque<>(); 
     potentialChildrenCoroutineList = new ArrayList<>(); 

     self = this; 
    } 

    @Override 
    public void run() { 
     try { 
      self.call(); 
     } 
     catch (Exception e) { 
      stop(e); 
      return; 
     } 

     stop(); 
    } 

    @Override 
    public abstract YieldReturnType call(); 

    @Override 
    public boolean start() { 
     return start(childExecutorService); 
    } 

    protected boolean start(ExecutorService executorService) { 
     boolean isStarted = this.isStarted.getAndSet(true); 
     if (!isStarted) { 
      executingOnExecutorService = executorService; 
      future = (Future<YieldReturnType>) executingOnExecutorService.submit((Runnable) self); 
     } 
     return isStarted; 
    } 

    @Override 
    public void stop() { 
     stop(null); 
    } 

    @Override 
    public void stop(final Throwable throwable) { 
     if (throwable != null) { 
      error = throwable; 
     } 
     isEnded.set(true); 

     returnYieldValue(null); 
     // Do this for making sure the coroutine has checked isEnd() after getting a dummy value 
     receiveQueue.offer(DUMMY_COR_RUN_YIELD_RETURN); 

     for (WeakReference<CorRun> weakReference : potentialChildrenCoroutineList) { 
      CorRun child = weakReference.get(); 
      if (child != null) { 
       if (child instanceof CorRunThread) { 
        ((CorRunThread)child).tryStop(childExecutorService); 
       } 
      } 
     } 

     childExecutorService.shutdownNow(); 
    } 

    protected void tryStop(ExecutorService executorService) { 
     if (this.executingOnExecutorService == executorService) { 
      stop(); 
     } 
    } 

    @Override 
    public boolean isEnded() { 
     return isEnded.get() || (
       future != null && (future.isCancelled() || future.isDone()) 
       ); 
    } 

    @Override 
    public boolean isStarted() { 
     return isStarted.get(); 
    } 

    public Future<YieldReturnType> getFuture() { 
     return future; 
    } 

    @Override 
    public Throwable getError() { 
     return error; 
    } 

    @Override 
    public void setResultForOuter(YieldReturnType resultForOuter) { 
     this.resultForOuter.set(resultForOuter); 
    } 

    @Override 
    public YieldReturnType getResultForOuter() { 
     return this.resultForOuter.get(); 
    } 

    @Override 
    public YieldReturnType receive(ReceiveType value) { 

     LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue = new LinkedBlockingDeque<>(); 

     offerReceiveValue(value, yieldReturnValue); 

     try { 
      AtomicReference<YieldReturnType> takeValue = yieldReturnValue.take(); 
      return takeValue == null ? null : takeValue.get(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     return null; 
    } 

    @Override 
    public ReceiveType yield() { 
     return yield(null); 
    } 

    @Override 
    public ReceiveType yield(final YieldReturnType value) { 
     returnYieldValue(value); 

     return getReceiveValue(); 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another) { 
     return yieldFrom(another, null); 
    } 

    @Override 
    public <TargetReceiveType, TargetYieldReturnType> TargetYieldReturnType yieldFrom(final CorRun<TargetReceiveType, TargetYieldReturnType> another, final TargetReceiveType value) { 
     if (another == null || another.isEnded()) { 
      throw new RuntimeException("Call null or isEnded coroutine"); 
     } 

     boolean isStarted = false; 
     potentialChildrenCoroutineList.add(new WeakReference<CorRun>(another)); 

     synchronized (another) { 
      if (another instanceof CorRunThread) { 
       isStarted = ((CorRunThread)another).start(childExecutorService); 
      } 
      else { 
       isStarted = another.start(); 
      } 

      boolean isJustStarting = ! isStarted; 
      if (isJustStarting && another instanceof CorRunSync) { 
       return another.getResultForOuter(); 
      } 

      TargetYieldReturnType send = another.receive(value); 
      return send; 
     } 
    } 

    @Override 
    public ReceiveType getReceiveValue() { 

     setLastCorRunYieldReturn(takeLastCorRunYieldReturn()); 

     return lastCorRunYieldReturn.receiveValue.get(); 
    } 

    protected void returnYieldValue(final YieldReturnType value) { 
     CorRunYieldReturn<ReceiveType, YieldReturnType> corRunYieldReturn = lastCorRunYieldReturn; 
     if (corRunYieldReturn != null) { 
      corRunYieldReturn.yieldReturnValue.offer(new AtomicReference<>(value)); 
     } 
    } 

    protected void offerReceiveValue(final ReceiveType value, LinkedBlockingDeque<AtomicReference<YieldReturnType>> yieldReturnValue) { 
     receiveQueue.offer(new CorRunYieldReturn(new AtomicReference<>(value), yieldReturnValue)); 
    } 

    protected CorRunYieldReturn<ReceiveType, YieldReturnType> takeLastCorRunYieldReturn() { 
     try { 
      return receiveQueue.take(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     return null; 
    } 

    protected void setLastCorRunYieldReturn(CorRunYieldReturn<ReceiveType,YieldReturnType> lastCorRunYieldReturn) { 
     this.lastCorRunYieldReturn = lastCorRunYieldReturn; 
    } 

    protected ExecutorService newExecutorService() { 
     return Executors.newCachedThreadPool(getThreadFactory()); 
    } 

    protected ThreadFactory getThreadFactory() { 
     return new ThreadFactory() { 
      @Override 
      public Thread newThread(final Runnable runnable) { 
       Thread thread = new Thread(runnable); 
       thread.setUncaughtExceptionHandler(new Thread.UncaughtExceptionHandler() { 
        @Override 
        public void uncaughtException(Thread thread, Throwable throwable) { 
         throwable.printStackTrace(); 
         if (runnable instanceof CorRun) { 
          CorRun self = (CorRun) runnable; 
          self.stop(throwable); 
          thread.interrupt(); 
         } 
        } 
       }); 
       return thread; 
      } 
     }; 
    } 
} 

Bây giờ bạn có thể sử dụng coroutines pythonic theo cách này (ví dụ như số fibonacci)

Chủ đề Version:

class Fib extends CorRunThread<Integer, Integer> { 

    @Override 
    public Integer call() { 
     Integer times = getReceiveValue(); 
     do { 
      int a = 1, b = 1; 
      for (int i = 0; times != null && i < times; i++) { 
       int temp = a + b; 
       a = b; 
       b = temp; 
      } 
      // A pythonic "yield", i.e., it returns `a` to the caller and waits `times` value from the next caller 
      times = yield(a); 
     } while (! isEnded()); 

     setResultForOuter(Integer.MAX_VALUE); 
     return getResultForOuter(); 
    } 
} 

class MainRun extends CorRunThread<String, String> { 

    @Override 
    public String call() { 

     // The fib coroutine would be recycled by its parent 
     // (no requirement to call its start() and stop() manually) 
     // Otherwise, if you want to share its instance and start/stop it manually, 
     // please start it before being called by yieldFrom() and stop it in the end. 
     Fib fib = new Fib(); 
     String result = ""; 
     Integer current; 
     int times = 10; 
     for (int i = 0; i < times; i++) { 

      // A pythonic "yield from", i.e., it calls fib with `i` parameter and waits for returned value as `current` 
      current = yieldFrom(fib, i); 

      if (fib.getError() != null) { 
       throw new RuntimeException(fib.getError()); 
      } 

      if (current == null) { 
       continue; 
      } 

      if (i > 0) { 
       result += ","; 
      } 
      result += current; 

     } 

     setResultForOuter(result); 

     return result; 
    } 
} 

Sync (không đề) phiên bản:

class Fib extends CorRunSync<Integer, Integer> { 

    @Override 
    public Integer call() { 
     Integer times = getReceiveValue(); 

     int a = 1, b = 1; 
     for (int i = 0; times != null && i < times; i++) { 
      int temp = a + b; 
      a = b; 
      b = temp; 
     } 
     yield(a); 

     return getResultForOuter(); 
    } 
} 

class MainRun extends CorRunSync<String, String> { 

    @Override 
    public String call() { 

     CorRun<Integer, Integer> fib = null; 
     try { 
      fib = new Fib(); 
     } catch (Exception e) { 
      e.printStackTrace(); 
     } 

     String result = ""; 
     Integer current; 
     int times = 10; 
     for (int i = 0; i < times; i++) { 

      current = yieldFrom(fib, i); 

      if (fib.getError() != null) { 
       throw new RuntimeException(fib.getError()); 
      } 

      if (current == null) { 
       continue; 
      } 

      if (i > 0) { 
       result += ","; 
      } 
      result += current; 
     } 

     stop(); 
     setResultForOuter(result); 

     if (Utils.isEmpty(result)) { 
      throw new RuntimeException("Error"); 
     } 

     return result; 
    } 
} 

Execution (Cả hai phiên bản sẽ làm việc):

// Run the entry coroutine 
MainRun mainRun = new MainRun(); 
mainRun.start(); 

// Wait for mainRun ending for 5 seconds 
long startTimestamp = System.currentTimeMillis(); 
while(!mainRun.isEnded()) { 
    if (System.currentTimeMillis() - startTimestamp > TimeUnit.SECONDS.toMillis(5)) { 
     throw new RuntimeException("Wait too much time"); 
    } 
} 
// The result should be "1,1,2,3,5,8,13,21,34,55" 
System.out.println(mainRun.getResultForOuter()); 
Các vấn đề liên quan