Mục đích là xử lý luồng liên tục của các phần tử với sự trợ giúp của luồng Java 8. Do đó, các phần tử được thêm vào nguồn dữ liệu của luồng song song trong khi xử lý luồng đó.Thêm các phần tử vào Java 8 Dòng song song một cách nhanh chóng
Các Javadoc of Streams mô tả các thuộc tính sau trong phần "Không can thiệp":
Đối với hầu hết các nguồn dữ liệu, ngăn chặn sự can thiệp có nghĩa là đảm bảo rằng các nguồn dữ liệu không được sửa đổi ở tất cả trong việc thực hiện các đường ống dẫn dòng. Ngoại lệ đáng chú ý cho đây là các luồng có nguồn là các bộ sưu tập đồng thời, được thiết kế đặc biệt để xử lý sửa đổi đồng thời. Các nguồn luồng đồng thời là những nguồn có Trình tách nghĩa báo cáo đặc tính CONCURRENT.
Đó là lý do một ConcurrentLinkedQueue được sử dụng trong nỗ lực của chúng tôi, mà trả về true cho
new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT)
Nó không phải là rõ ràng nói rằng nguồn dữ liệu phải không được sửa đổi khi được sử dụng trong dòng song song.
Trong ví dụ của chúng tôi cho mỗi phần tử trong luồng, giá trị bộ đếm gia tăng được thêm vào hàng đợi, là nguồn dữ liệu của luồng, cho đến khi bộ đếm lớn hơn N. Với hàng đợi gọi.stream() tất cả mọi thứ hoạt động tốt trong khi thực hiện tuần tự:
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class StreamTest {
public static void main(String[] args) {
final int N = 10000;
assertEquals(N, testSequential(N));
}
public static int testSequential(int N) {
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger check = new AtomicInteger(0);
final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
for (int i = 0; i < N/10; ++i) {
queue.add(counter.incrementAndGet());
}
Stream<Integer> stream = queue.stream();
stream.forEach(i -> {
System.out.println(i);
int j = counter.incrementAndGet();
check.incrementAndGet();
if (j <= N) {
queue.add(j);
}
});
stream.close();
return check.get();
}
}
Như một nỗ lực thứ hai dòng là song song và ném một java.lang.AssertionError vì kiểm tra nhỏ hơn N và không phải mọi phần tử trong hàng đợi được xử lý. Luồng có thể đã thực hiện xong sớm vì hàng đợi có thể bị trống vào một thời điểm nào đó.
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.stream.Stream;
public class StreamTest {
public static void main(String[] args) {
final int N = 10000;
assertEquals(N, testParallel1(N));
}
public static int testParallel1(int N) {
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger check = new AtomicInteger(0);
final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
for (int i = 0; i < N/10; ++i) {
queue.add(counter.incrementAndGet());
}
Stream<Integer> stream = queue.parallelStream();
stream.forEach(i -> {
System.out.println(i);
int j = counter.incrementAndGet();
check.incrementAndGet();
if (j <= N) {
queue.add(j);
}
});
stream.close();
return check.get();
}
}
Cố gắng tiếp theo là báo hiệu luồng chính, khi luồng liên tục ‘thực sự’ kết thúc (hàng đợi trống) và đóng đối tượng luồng sau đó. Ở đây, vấn đề là đối tượng luồng xuất hiện để đọc các phần tử từ hàng đợi chỉ một lần hoặc ít nhất là không liên tục và không bao giờ đạt đến kết thúc ‘thực’ của luồng.
import static org.junit.Assert.assertEquals;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.locks.Condition;
import java.util.concurrent.locks.Lock;
import java.util.concurrent.locks.ReentrantLock;
import java.util.stream.Stream;
public class StreamTest {
public static void main(String[] args) {
final int N = 10000;
try {
assertEquals(N, testParallel2(N));
} catch (InterruptedException e) {
e.printStackTrace();
}
}
public static int testParallel2(int N) throws InterruptedException {
final Lock lock = new ReentrantLock();
final Condition cond = lock.newCondition();
final AtomicInteger counter = new AtomicInteger(0);
final AtomicInteger check = new AtomicInteger(0);
final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>();
for (int i = 0; i < N/10; ++i) {
queue.add(counter.incrementAndGet());
}
Stream<Integer> stream = queue.parallelStream();
stream.forEach(i -> {
System.out.println(i);
int j = counter.incrementAndGet();
lock.lock();
check.incrementAndGet();
if (j <= N) {
queue.add(j);
} else {
cond.signal();
}
lock.unlock();
});
lock.lock();
while (check.get() < N) {
cond.await();
}
lock.unlock();
stream.close();
return check.get();
}
}
Các câu hỏi phát sinh từ đó là:
- đã chúng tôi làm điều gì sai?
- Việc sử dụng API luồng không xác định hoặc thậm chí không chính xác?
- Làm cách nào để đạt được hành vi mong muốn?
Tôi không chắc chắn liệu tôi có đồng ý với phần "không có luồng liên tục" hay không. Điều gì về các công cụ như thế này: http://stackoverflow.com/questions/22483554/how-to-create-an-infinite-stream-with-java-8 –
@DominikSandjaja điểm công bằng. –