2014-09-03 28 views
5

Tôi có một số tệp văn bản cỡ lớn mà tôi muốn xử lý bằng cách nhóm các dòng của nó.Có thể làm một nhóm người lười biếng, trả lại luồng, trong java 8 không?

Tôi cố gắng để sử dụng các tính năng trực tuyến mới, như

return FileUtils.readLines(...) 
      .parallelStream() 
      .map(...) 
      .collect(groupingBy(pair -> pair[0])); 

Vấn đề là, AFAIK, điều này tạo ra một bản đồ.

Có cách nào để có mã mức cao như mã ở trên tạo ra, ví dụ: Luồng các mục nhập không?

CẬP NHẬT: Những gì tôi đang tìm kiếm là một cái gì đó giống như python của itertools.groupby. Các tệp của tôi đã được sắp xếp (theo cặp [0]), tôi chỉ muốn tải từng nhóm một.

Tôi đã có giải pháp lặp lại. Tôi chỉ tự hỏi nếu có một cách khai báo hơn để làm điều đó. Btw, sử dụng ổi hoặc thư viện của bên thứ ba khác sẽ không phải là một vấn đề lớn.

+2

thế nào có thể bạn lười biếng nhóm theo? Để nhóm theo một số thuộc tính của đối tượng có trong Luồng, bạn phải lặp qua tất cả các phần tử trong Luồng. – Eran

+0

Bạn có ý gì khi "nhóm các dòng của nó?" bạn có nghĩa là binning như phương pháp Stream 'groupBy' hoặc bạn có nghĩa là đọc nhiều dòng tại một thời điểm với số lượng lớn? – dkatzel

+0

Cảm ơn nhận xét, đã thêm CẬP NHẬT vào câu hỏi. –

Trả lời

3

Tác vụ bạn muốn đạt được hoàn toàn khác với những gì nhóm làm. groupingBy không phụ thuộc vào thứ tự của các thành phần của Stream nhưng trên thuật toán của Map được áp dụng cho kết quả phân loại Function.

Điều bạn muốn là xếp các mục liền kề có giá trị thuộc tính chung vào một mục List. Thậm chí không cần thiết phải có số Stream được sắp xếp theo thuộc tính đó miễn là bạn có thể bảo đảm rằng tất cả các mục có cùng giá trị thuộc tính được nhóm lại.

Có thể bạn có thể xây dựng nhiệm vụ này như một sự giảm nhưng đối với tôi cấu trúc kết quả trông quá phức tạp.

Vì vậy, trừ khi hỗ trợ trực tiếp cho tính năng này được thêm vào Stream s, một cách tiếp cận lặp dựa trông thực dụng nhất đối với tôi:

class Folding<T,G> implements Spliterator<Map.Entry<G,List<T>>> { 
    static <T,G> Stream<Map.Entry<G,List<T>>> foldBy(
      Stream<? extends T> s, Function<? super T, ? extends G> f) { 
     return StreamSupport.stream(new Folding<>(s.spliterator(), f), false); 
    } 
    private final Spliterator<? extends T> source; 
    private final Function<? super T, ? extends G> pf; 
    private final Consumer<T> c=this::addItem; 
    private List<T> pending, result; 
    private G pendingGroup, resultGroup; 

    Folding(Spliterator<? extends T> s, Function<? super T, ? extends G> f) { 
     source=s; 
     pf=f; 
    } 
    private void addItem(T item) { 
     G group=pf.apply(item); 
     if(pending==null) pending=new ArrayList<>(); 
     else if(!pending.isEmpty()) { 
      if(!Objects.equals(group, pendingGroup)) { 
       if(pending.size()==1) 
        result=Collections.singletonList(pending.remove(0)); 
       else { 
        result=pending; 
        pending=new ArrayList<>(); 
       } 
       resultGroup=pendingGroup; 
      } 
     } 
     pendingGroup=group; 
     pending.add(item); 
    } 
    public boolean tryAdvance(Consumer<? super Map.Entry<G, List<T>>> action) { 
     while(source.tryAdvance(c)) { 
      if(result!=null) { 
       action.accept(entry(resultGroup, result)); 
       result=null; 
       return true; 
      } 
     } 
     if(pending!=null) { 
      action.accept(entry(pendingGroup, pending)); 
      pending=null; 
      return true; 
     } 
     return false; 
    } 
    private Map.Entry<G,List<T>> entry(G g, List<T> l) { 
     return new AbstractMap.SimpleImmutableEntry<>(g, l); 
    } 
    public int characteristics() { return 0; } 
    public long estimateSize() { return Long.MAX_VALUE; } 
    public Spliterator<Map.Entry<G, List<T>>> trySplit() { return null; } 
} 

Bản chất lười biếng của kết quả gấp Stream thể được chứng minh tốt nhất bằng cách áp dụng nó đến một dòng vô hạn:

Folding.foldBy(Stream.iterate(0, i->i+1), i->i>>4) 
     .filter(e -> e.getKey()>5) 
     .findFirst().ifPresent(e -> System.out.println(e.getValue())); 
+1

chỉnh sửa nhẹ: 'groupingBy' thực sự bảo toàn thứ tự của luồng ban đầu, nếu bộ thu hạ lưu hợp tác (hầu hết, trừ những người có đặc tính KHÔNG GIAN); tập hợp con của các phần tử trong bất kỳ nhóm đã cho nào được trình bày cho trình thu gom hạ lưu theo cùng thứ tự mà chúng có trong đầu vào. –

+1

@Brian Goetz: vâng, nó * duy trì * thứ tự, nhưng tất cả những gì tôi nói trong câu trả lời của tôi là nó không * dựa vào thứ tự hình thành các nhóm. Btw. đó là một trong những trường hợp thử nghiệm mà tôi đã thực hiện cho giải pháp của mình: thu thập một luồng được trả về bởi giải pháp của tôi vào một 'Bản đồ' phải tạo chính xác cùng một' Bản đồ' như 'groupingBy' bằng cùng một trình phân loại. – Holger

1

cyclops-react, tôi thư viện một đóng góp, cung cấp cả sharding và nhóm funcitonality mà có thể làm những gì bạn muốn.

ReactiveSeq<ListX<TYPE>> grouped = ReactiveSeq.fromCollection(FileUtils.readLines(...)) 
      .groupedStatefullyWhile((batch,next) -> batch.size()==0 ? true : next.equals(batch.get(0))); 

Toán tử groupedStatefullyWhile cho phép các thành phần được nhóm dựa trên trạng thái hiện tại của lô. ReactiveSeq là một luồng tuần tự luồng đơn.

Map<Key, Stream<Value> sharded = 
        new LazyReact() 
       .fromCollection(FileUtils.readLines(...)) 
       .map(..) 
       .shard(shards, pair -> pair[0]); 

Điều này sẽ tạo một LazyFutureStream (thực hiện java.util.stream.Stream), sẽ xử lý dữ liệu trong tệp không đồng bộ và song song. Nó lười và sẽ không bắt đầu xử lý cho đến khi dữ liệu được kéo qua.

Thông báo trước duy nhất là bạn cần xác định phân đoạn trước. I E. tham số 'shards' ở trên là Map của async.Queue được khóa bởi khóa tới phân đoạn (có thể là bất kỳ cặp nào [0] là?).

ví dụ:

Map<Integer,Queue<String>> shards; 

There is a sharding example with video heretest code here

0

Nó có thể được thực hiện bằng cách collapse với StreamEx

final int[][] aa = { { 1, 1 }, { 1, 2 }, { 2, 2 }, { 2, 3 }, { 3, 3 }, { 4, 4 } }; 

StreamEx.of(aa) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .forEach(System.out::println); 

Chúng ta có thể thêm peeklimit để xác minh nếu nó lười biếng tính:

StreamEx.of(aa) 
     .peek(System.out::println) 
     .collapse((a, b) -> a[0] == b[0], Collectors.groupingBy(a -> a[0])) 
     .limit(1) 
     .forEach(System.out::println); 
Các vấn đề liên quan