8

Mục đích là xử lý luồng liên tục của các phần tử với sự trợ giúp của luồng Java 8. Do đó, các phần tử được thêm vào nguồn dữ liệu của luồng song song trong khi xử lý luồng đó.Thêm các phần tử vào Java 8 Dòng song song một cách nhanh chóng

Các Javadoc of Streams mô tả các thuộc tính sau trong phần "Không can thiệp":

Đối với hầu hết các nguồn dữ liệu, ngăn chặn sự can thiệp có nghĩa là đảm bảo rằng các nguồn dữ liệu không được sửa đổi ở tất cả trong việc thực hiện các đường ống dẫn dòng. Ngoại lệ đáng chú ý cho đây là các luồng có nguồn là các bộ sưu tập đồng thời, được thiết kế đặc biệt để xử lý sửa đổi đồng thời. Các nguồn luồng đồng thời là những nguồn có Trình tách nghĩa báo cáo đặc tính CONCURRENT.

Đó là lý do một ConcurrentLinkedQueue được sử dụng trong nỗ lực của chúng tôi, mà trả về true cho

new ConcurrentLinkedQueue<Integer>().spliterator().hasCharacteristics(Spliterator.CONCURRENT) 

Nó không phải là rõ ràng nói rằng nguồn dữ liệu phải không được sửa đổi khi được sử dụng trong dòng song song.

Trong ví dụ của chúng tôi cho mỗi phần tử trong luồng, giá trị bộ đếm gia tăng được thêm vào hàng đợi, là nguồn dữ liệu của luồng, cho đến khi bộ đếm lớn hơn N. Với hàng đợi gọi.stream() tất cả mọi thứ hoạt động tốt trong khi thực hiện tuần tự:

import static org.junit.Assert.assertEquals; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.Stream; 

public class StreamTest { 
    public static void main(String[] args) { 
     final int N = 10000; 
     assertEquals(N, testSequential(N)); 
    } 

    public static int testSequential(int N) { 
     final AtomicInteger counter = new AtomicInteger(0); 
     final AtomicInteger check = new AtomicInteger(0); 
     final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); 

     for (int i = 0; i < N/10; ++i) { 
      queue.add(counter.incrementAndGet()); 
     } 

     Stream<Integer> stream = queue.stream(); 
     stream.forEach(i -> { 
      System.out.println(i); 

      int j = counter.incrementAndGet(); 

      check.incrementAndGet(); 
      if (j <= N) { 
       queue.add(j); 
      } 
     }); 
     stream.close(); 
     return check.get(); 
    } 
} 

Như một nỗ lực thứ hai dòng là song song và ném một java.lang.AssertionError vì kiểm tra nhỏ hơn N và không phải mọi phần tử trong hàng đợi được xử lý. Luồng có thể đã thực hiện xong sớm vì hàng đợi có thể bị trống vào một thời điểm nào đó.

import static org.junit.Assert.assertEquals; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.stream.Stream; 

public class StreamTest { 
    public static void main(String[] args) { 
     final int N = 10000; 
     assertEquals(N, testParallel1(N)); 
    } 

    public static int testParallel1(int N) { 
     final AtomicInteger counter = new AtomicInteger(0); 
     final AtomicInteger check = new AtomicInteger(0); 
     final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); 

     for (int i = 0; i < N/10; ++i) { 
      queue.add(counter.incrementAndGet()); 
     } 

     Stream<Integer> stream = queue.parallelStream(); 
     stream.forEach(i -> { 
      System.out.println(i); 

      int j = counter.incrementAndGet(); 

      check.incrementAndGet(); 
      if (j <= N) { 
       queue.add(j); 
      } 
     }); 
     stream.close(); 
     return check.get(); 
    } 
} 

Cố gắng tiếp theo là báo hiệu luồng chính, khi luồng liên tục ‘thực sự’ kết thúc (hàng đợi trống) và đóng đối tượng luồng sau đó. Ở đây, vấn đề là đối tượng luồng xuất hiện để đọc các phần tử từ hàng đợi chỉ một lần hoặc ít nhất là không liên tục và không bao giờ đạt đến kết thúc ‘thực’ của luồng.

import static org.junit.Assert.assertEquals; 
import java.util.Queue; 
import java.util.concurrent.ConcurrentLinkedQueue; 
import java.util.concurrent.atomic.AtomicInteger; 
import java.util.concurrent.locks.Condition; 
import java.util.concurrent.locks.Lock; 
import java.util.concurrent.locks.ReentrantLock; 
import java.util.stream.Stream; 

public class StreamTest { 

    public static void main(String[] args) { 
     final int N = 10000; 
     try { 
      assertEquals(N, testParallel2(N)); 
     } catch (InterruptedException e) { 
      e.printStackTrace(); 
     } 
    } 

    public static int testParallel2(int N) throws InterruptedException { 
     final Lock lock = new ReentrantLock(); 
     final Condition cond = lock.newCondition(); 

     final AtomicInteger counter = new AtomicInteger(0); 
     final AtomicInteger check = new AtomicInteger(0); 
     final Queue<Integer> queue = new ConcurrentLinkedQueue<Integer>(); 

     for (int i = 0; i < N/10; ++i) { 
      queue.add(counter.incrementAndGet()); 
     } 

     Stream<Integer> stream = queue.parallelStream(); 
     stream.forEach(i -> { 
      System.out.println(i); 

      int j = counter.incrementAndGet(); 

      lock.lock(); 
      check.incrementAndGet(); 
      if (j <= N) { 
       queue.add(j); 
      } else { 
       cond.signal(); 
      } 
      lock.unlock(); 
     }); 

     lock.lock(); 
     while (check.get() < N) { 
      cond.await(); 
     } 
     lock.unlock(); 
     stream.close(); 
     return check.get(); 
    } 
} 

Các câu hỏi phát sinh từ đó là:

  • đã chúng tôi làm điều gì sai?
  • Việc sử dụng API luồng không xác định hoặc thậm chí không chính xác?
  • Làm cách nào để đạt được hành vi mong muốn?

Trả lời

0

Luồng có thể được tạo liên tục hoặc từ bộ sưu tập được sửa đổi hoặc cũng không được thiết kế để chạy liên tục. Nó được thiết kế để xử lý các phần tử có sẵn khi luồng được bắt đầu và trả lại khi chúng đã được xử lý. Ngay sau khi nó kết thúc, nó dừng lại.

Làm sao chúng ta có thể đạt được hành vi mong muốn nếu không?

Bạn cần sử dụng một cách tiếp cận khác. Tôi sẽ sử dụng một ExecutorService nơi bạn vượt qua nhiệm vụ gửi mà bạn muốn thực hiện.

Cách khác là sử dụng luồng liên tục chặn khi không có kết quả. Lưu ý: điều này sẽ khóa phổ biến ForkJoinPool được sử dụng bởi luồng song song và không có mã nào khác có thể sử dụng nó.

+0

Tôi không chắc chắn liệu tôi có đồng ý với phần "không có luồng liên tục" hay không. Điều gì về các công cụ như thế này: http://stackoverflow.com/questions/22483554/how-to-create-an-infinite-stream-with-java-8 –

+0

@DominikSandjaja điểm công bằng. –

2

Có sự khác biệt đáng kể giữa việc “sửa đổi nguồn của Stream không phá vỡ” và giả định của bạn “sửa đổi sẽ được phản ánh bởi hoạt động Stream đang diễn ra”.

Thuộc tính CONCURRENT ngụ ý rằng việc sửa đổi các nguồn là phép, ví dụ rằng nó sẽ không bao giờ ném một ConcurrentModificationException, nhưng nó không có nghĩa là bạn có thể dựa vào một hành vi cụ thể về việc liệu những thay đổi này được phản ánh hay không.

Các documentation of the CONCURRENT flag bản thân nói:

Hầu hết các bộ sưu tập đồng thời duy trì một chính sách nhất quán đảm bảo độ chính xác liên quan đến các yếu tố có mặt tại điểm Spliterator xây dựng với, nhưng có thể không phản ánh bổ sung hoặc gỡ bỏ sau đó.

hành vi

Suối này là phù hợp với hành vi đã được biết đến của ConcurrentLinkedQueue:

vòng lặp là yếu phù hợp, trở về yếu tố phản ánh tình trạng của hàng đợi tại một số điểm tại hoặc kể từ sự ra đời của trình lặp. Họ làm không phải ném ConcurrentModificationException và có thể tiến hành đồng thời với các hoạt động khác. Các phần tử chứa trong hàng đợi kể từ khi tạo trình lặp sẽ được trả về chính xác một lần.

Thật khó để nói như thế nào để “đạt được các hành vi mong muốn bằng cách khác”, như khi bạn không mô tả các “hành vi mong muốn” trong bất kỳ hình thức nào khác hơn so với mã, có thể được chỉ đơn giản là thay thế bằng

public static int testSequential(int N) { 
    return N; 
} 
public static int testParallel1(int N) { 
    return N; 
} 

vì đó là hiệu ứng quan sát duy nhất… Hãy xem xét redefining your problem

Các vấn đề liên quan