2010-02-01 44 views
40

câu hỏi tương tự:Bọc một tính toán không đồng bộ thành một (chặn) tính toán đồng bộ

tôi có một đối tượng với một phương pháp tôi muốn tiếp xúc với khách hàng thư viện (đặc biệt là khách hàng tạo kịch bản) như một cái gì đó như:

interface MyNiceInterface 
{ 
    public Baz doSomethingAndBlock(Foo fooArg, Bar barArg); 
    public Future<Baz> doSomething(Foo fooArg, Bar barArg); 
    // doSomethingAndBlock is the straightforward way; 
    // doSomething has more control but deals with 
    // a Future and that might be too much hassle for 
    // scripting clients 
} 

nhưng "công cụ" nguyên thủy tôi có sẵn là một tập các lớp hướng sự kiện:

interface BazComputationSink 
{ 
    public void onBazResult(Baz result); 
} 

class ImplementingThing 
{ 
    public void doSomethingAsync(Foo fooArg, Bar barArg, BazComputationSink sink); 
} 

nơi ImplementingThing mất đầu vào, thực hiện một số nội dung phức tạp như enqueueing thứ trên một hàng đợi công việc, và sau đó khi một kết quả xảy ra , sink.onBazResult() được gọi trên một chuỗi có thể hoặc có thể không phải là cùng một luồng như ImplementingThing.doSomethingAsync() đã được gọi.

Có cách nào tôi có thể sử dụng các hàm theo hướng sự kiện mà tôi có, cùng với các nguyên thủy đồng thời, để triển khai MyNiceInterface để các trình khách script có thể vui vẻ chờ một luồng chặn không?

chỉnh sửa: tôi có thể sử dụng FutureTask cho việc này không?

Trả lời

41

Sử dụng implemenation tương lai của riêng bạn:

public class BazComputationFuture implements Future<Baz>, BazComputationSink { 

    private volatile Baz result = null; 
    private volatile boolean cancelled = false; 
    private final CountDownLatch countDownLatch; 

    public BazComputationFuture() { 
     countDownLatch = new CountDownLatch(1); 
    } 

    @Override 
    public boolean cancel(final boolean mayInterruptIfRunning) { 
     if (isDone()) { 
      return false; 
     } else { 
      countDownLatch.countDown(); 
      cancelled = true; 
      return !isDone(); 
     } 
    } 

    @Override 
    public Baz get() throws InterruptedException, ExecutionException { 
     countDownLatch.await(); 
     return result; 
    } 

    @Override 
    public Baz get(final long timeout, final TimeUnit unit) 
      throws InterruptedException, ExecutionException, TimeoutException { 
     countDownLatch.await(timeout, unit); 
     return result; 
    } 

    @Override 
    public boolean isCancelled() { 
     return cancelled; 
    } 

    @Override 
    public boolean isDone() { 
     return countDownLatch.getCount() == 0; 
    } 

    public void onBazResult(final Baz result) { 
     this.result = result; 
     countDownLatch.countDown(); 
    } 

} 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    BazComputationFuture future = new BazComputationFuture(); 
    doSomethingAsync(fooArg, barArg, future); 
    return future; 
} 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    return doSomething(fooArg, barArg).get(); 
} 

Giải pháp tạo ra một CountDownLatch trong nội bộ được xóa khi nhận được cuộc gọi lại. Nếu người dùng gọi, CountDownLatch được sử dụng để chặn chuỗi gọi cho đến khi quá trình tính toán hoàn thành và gọi lệnh gọi lại onBazResult. CountDownLatch sẽ đảm bảo rằng nếu callback xảy ra trước khi get() được gọi là phương thức get() sẽ trả về ngay lập tức với kết quả.

+0

(+1) giải pháp tốt đẹp – skaffman

+0

+1 bởi vì tôi nghĩ rằng tôi hiểu nó .... nhưng bạn có thể giải thích các khía cạnh đồng thời không? (việc sử dụng chốt đếm ngược) –

+0

bạn cũng đang thiếu "done = true" ở đâu đó trong onBazResult() .... –

14

Vâng, đó là giải pháp đơn giản để làm một cái gì đó như:

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    final AtomicReference<Baz> notifier = new AtomicReference(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
    public void onBazResult(Baz result) { 
     synchronized (notifier) { 
     notifier.set(result); 
     notifier.notify(); 
     } 
    } 
    }); 
    synchronized (notifier) { 
    while (notifier.get() == null) 
     notifier.wait(); 
    } 
    return notifier.get(); 
} 

Tất nhiên, điều này giả định rằng Baz kết quả của bạn sẽ không bao giờ được null ...

+0

công trình này và là một mẫu trang nhã đẹp mắt. cảm ơn. –

+0

Gọi lại bên trong (BazComputationSink() {}), nó sẽ khiếu nại - khởi tạo trình thông báo. Và nếu được khởi tạo thành null, tôi nhận được NullPointerException được ném cho lần truy cập đầu tiên, khiến kết quả của tôi chưa sẵn sàng trở lại trong tác vụ không đồng bộ. –

+1

Điều này không được khuyến nghị: nếu cuộc gọi của bạn trả về ngay lập tức, thông báo sẽ được gọi trước khi chờ và bạn nhận được bế tắc (điều này có thể xảy ra nếu bạn có logic bộ nhớ cache) – for3st

11

google guava library có dễ sử dụng SettableFuture khiến cho vấn đề này rất đơn giản (khoảng 10 dòng mã).

public class ImplementingThing { 

public Baz doSomethingAndBlock(Foo fooArg, Bar barArg) { 
    try { 
     return doSomething(fooArg, barArg).get(); 
    } catch (Exception e) { 
     throw new RuntimeException("Oh dear"); 
    } 
}; 

public Future<Baz> doSomething(Foo fooArg, Bar barArg) { 
    final SettableFuture<Baz> future = new SettableFuture<Baz>(); 
    doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
     @Override 
     public void onBazResult(Baz result) { 
      future.set(result); 
     } 
    }); 
    return future; 
}; 

// Everything below here is just mock stuff to make the example work, 
// so you can copy it into your IDE and see it run. 

public static class Baz {} 
public static class Foo {} 
public static class Bar {} 

public static interface BazComputationSink { 
    public void onBazResult(Baz result); 
} 

public void doSomethingAsync(Foo fooArg, Bar barArg, final BazComputationSink sink) { 
    new Thread(new Runnable() { 
     @Override 
     public void run() { 
      try { 
       Thread.sleep(4000); 
      } catch (InterruptedException e) { 
       e.printStackTrace(); 
      } 
      Baz baz = new Baz(); 
      sink.onBazResult(baz); 
     } 
    }).start(); 
}; 

public static void main(String[] args) { 
    System.err.println("Starting Main"); 
    System.err.println((new ImplementingThing()).doSomethingAndBlock(null, null)); 
    System.err.println("Ending Main"); 
} 
3

Một ví dụ rất đơn giản, chỉ để hiểu CountDownLatch mà không cần bất kỳ mã thêm .

A java.util.concurrent.CountDownLatch là một cấu trúc đồng thời cho phép một hoặc nhiều chủ đề chờ một tập hợp hoạt động nhất định hoàn thành.

A CountDownLatch được khởi tạo với một số đã cho. Số lượng này được giảm đi bằng các cuộc gọi đến phương thức countDown(). Các chủ đề chờ số đếm này đạt đến số không có thể gọi một trong các phương thức await(). Gọi số await() sẽ chặn luồng cho đến khi số đếm đạt đến 0.

Dưới đây là một ví dụ đơn giản.Sau khi Decrementer đã gọi countDown() 3 lần trên CountDownLatch, Waiter chờ đợi được phát hành từ cuộc gọi await().

Bạn cũng có thể đề cập đến một số TimeOut để chờ.

CountDownLatch latch = new CountDownLatch(3); 

Waiter  waiter  = new Waiter(latch); 
Decrementer decrementer = new Decrementer(latch); 

new Thread(waiter)  .start(); 
new Thread(decrementer).start(); 

Thread.sleep(4000); 
public class Waiter implements Runnable{ 

    CountDownLatch latch = null; 

    public Waiter(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 
     try { 
      latch.await(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 

     System.out.println("Waiter Released"); 
    } 
} 

// --------------

public class Decrementer implements Runnable { 

    CountDownLatch latch = null; 

    public Decrementer(CountDownLatch latch) { 
     this.latch = latch; 
    } 

    public void run() { 

     try { 
      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 

      Thread.sleep(1000); 
      this.latch.countDown(); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 
} 

Reference

Nếu bạn không muốn sử dụng một CountDownLatch hoặc yêu cầu của bạn là một cái gì đó giống như Facebook giống và không giống như chức năng. Có nghĩa là nếu một phương thức được gọi thì không gọi phương thức khác.

Trong trường hợp đó, bạn có thể khai báo một

private volatile Boolean isInprocessOfLikeOrUnLike = false; 

và sau đó bạn có thể kiểm tra trong phần đầu của lời gọi phương thức của bạn rằng nếu nó là false sau đó gọi phương thức khác trở lại .. tùy thuộc vào thực hiện của bạn.

+0

Boolean dễ bay hơi? Tôi nghĩ rằng bạn muốn một boolean dễ bay hơi (lowecase b), hoặc tốt hơn: một AtomicReference –

2

Dưới đây là một giải pháp chung chung hơn dựa trên Paul Wagland của câu trả lời:

public abstract class AsyncRunnable<T> { 
    protected abstract void run(AtomicReference<T> notifier); 

    protected final void finish(AtomicReference<T> notifier, T result) { 
     synchronized (notifier) { 
      notifier.set(result); 
      notifier.notify(); 
     } 
    } 

    public static <T> T wait(AsyncRunnable<T> runnable) { 
     final AtomicReference<T> notifier = new AtomicReference<>(); 

     // run the asynchronous code 
     runnable.run(notifier); 

     // wait for the asynchronous code to finish 
     synchronized (notifier) { 
      while (notifier.get() == null) { 
       try { 
        notifier.wait(); 
       } catch (InterruptedException ignore) {} 
      } 
     } 

     // return the result of the asynchronous code 
     return notifier.get(); 
    } 
} 

Dưới đây là một ví dụ làm thế nào để sử dụng nó ::

String result = AsyncRunnable.wait(new AsyncRunnable<String>() { 
     @Override 
     public void run(final AtomicReference<String> notifier) { 
      // here goes your async code, e.g.: 
      new Thread(new Runnable() { 
       @Override 
       public void run() { 
        finish(notifier, "This was a asynchronous call!"); 
       } 
      }).start(); 
     } 
    }); 

Một phiên bản tiết hơn về các mã có thể được tìm thấy ở đây: http://pastebin.com/hKHJUBqE

EDIT: Ví dụ liên quan đến câu hỏi sẽ là:

public Baz doSomethingAndBlock(final Foo fooArg, final Bar barArg) { 
    return AsyncRunnable.wait(new AsyncRunnable<Baz>() { 
     @Override 
     protected void run(final AtomicReference<Baz> notifier) { 
      doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
       public void onBazResult(Baz result) { 
        synchronized (notifier) { 
         notifier.set(result); 
         notifier.notify(); 
        } 
       } 
      }); 
     } 
    }); 
} 
+0

Tôi hiểu câu trả lời của bạn nhưng tôi không thể xem làm thế nào để ánh xạ nó đến câu hỏi của tôi hơn sáu năm trước đây. –

+0

Tôi đã cập nhật câu trả lời để bạn có thể ánh xạ câu hỏi cho câu hỏi –

+0

BTW nếu bạn phải thực hiện việc này ở những nơi có thể tôi xem xét sử dụng: https://github.com/ReactiveX/RxJava. ReactiveX là một kết hợp hoàn hảo cho vấn đề của bạn mặc dù nó có một đường cong học tập dốc (ít nhất là đối với tôi). Đây là phần giới thiệu tuyệt vời: https://www.youtube.com/watch?v=_t06LRX0DV0. –

2

Đây là chết đơn giản với RxJava 2.x:

try { 
    Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
      doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
      .toFuture().get(); 
} catch (InterruptedException e) { 
    e.printStackTrace(); 
} catch (ExecutionException e) { 
    e.printStackTrace(); 
} 

Hoặc mà không ký hiệu Lambda:

Baz baz = Single.create(new SingleOnSubscribe<Baz>() { 
       @Override 
       public void subscribe(SingleEmitter<Baz> emitter) { 
        doSomethingAsync(fooArg, barArg, new BazComputationSink() { 
         @Override 
         public void onBazResult(Baz result) { 
          emitter.onSuccess(result); 
         } 
        }); 
       } 
      }).toFuture().get(); 

Thậm chí đơn giản hơn:

Baz baz = Single.create((SingleEmitter<Baz> emitter) -> 
       doSomethingAsync(fooArg, barArg, result -> emitter.onSuccess(result))) 
       .blockingGet(); 
Các vấn đề liên quan