2009-04-17 24 views
24

Có hai InputStream trong Java, có cách nào hợp nhất chúng để bạn kết thúc bằng một InputStream cung cấp cho bạn đầu ra của cả hai luồng không? Làm sao?Làm cách nào để hợp nhất hai luồng đầu vào trong Java?

+3

Hợp nhất theo cách nào? Liền mạch tiếp tục đọc từ luồng thứ hai sau khi đọc lần đầu tiên? Tôi không quen thuộc với Java, nhưng trong C# bạn có thể làm điều này một cách dễ dàng đủ bằng cách thực hiện một lớp kế thừa từ Stream chứa tham chiếu tới cả hai luồng cơ sở và sau đó ghi đè phương thức Read. – Noldorin

Trả lời

37

Như đã nhận xét, không rõ ý bạn là gì khi hợp nhất.

Sử dụng đầu vào có sẵn "ngẫu nhiên" hoặc là phức tạp bởi InputStream.available không nhất thiết phải cung cấp cho bạn một câu trả lời hữu ích và hoạt động chặn luồng. Bạn sẽ cần hai luồng để đọc từ các luồng và sau đó truyền lại dữ liệu qua, ví dụ: java.io.Piped(In|Out)putStream (mặc dù các lớp đó có vấn đề). Ngoài ra đối với một số loại luồng, có thể sử dụng một giao diện khác, ví dụ: java.nio các kênh không chặn.

Nếu bạn muốn toàn bộ nội dung của luồng đầu vào đầu tiên theo sau là dòng thứ hai: new java.io.SequenceInputStream(s1, s2).

+2

Ồ, rất hay, tôi mới học được điều gì đó mới mẻ. SequenceInputStream về cơ bản giống với CatInputStream của tôi, nhưng sử dụng các Liệt kê kế thừa thay vì, nói, một LinkedList. :-) –

+0

Khi hack phần đầu của câu trả lời, thật khó để giải quyết trong trường hợp chung nhưng đối với các trường hợp cụ thể của FileInputStream (và cũng có thể là ổ cắm?), Bạn có thể instanceof/cast và tạo ra một kênh. (Các luồng khác có thể sử dụng Channels.newChannel để tạo giao diện nhất quán, nhưng sẽ không có chất lượng không chặn.) –

+0

Bộ sưu tập.bộ đếm là bạn của bạn. Tôi quên một phần của phần đầu tiên của tôi - sẽ chỉnh sửa. –

0

Không phải là tôi có thể nghĩ đến. Bạn có lẽ sẽ phải đọc nội dung của hai dòng vào một byte [] và sau đó tạo ra một ByteArrayInputStream từ đó.

+0

Upvote cho một giải pháp đơn giản, dễ hiểu, khả thi. Có thể không có hành vi bắt buộc nếu chặn là quan trọng (hoặc chúng rất lớn). –

4

Bạn có thể viết triển khai InputStream tùy chỉnh thực hiện việc này. Ví dụ:

import java.io.IOException; 
import java.io.InputStream; 
import java.util.Collections; 
import java.util.Deque; 
import java.util.LinkedList; 

public class CatInputStream extends InputStream { 
    private final Deque<InputStream> streams; 

    public CatInputStream(InputStream... streams) { 
     this.streams = new LinkedList<InputStream>(); 
     Collections.addAll(this.streams, streams); 
    } 

    private void nextStream() throws IOException { 
     streams.removeFirst().close(); 
    } 

    @Override 
    public int read() throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read()) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public int read(byte b[], int off, int len) throws IOException { 
     int result = -1; 
     while (!streams.isEmpty() 
       && (result = streams.getFirst().read(b, off, len)) == -1) { 
      nextStream(); 
     } 
     return result; 
    } 

    @Override 
    public long skip(long n) throws IOException { 
     long skipped = 0L; 
     while (skipped < n && !streams.isEmpty()) { 
      int thisSkip = streams.getFirst().skip(n - skipped); 
      if (thisSkip > 0) 
       skipped += thisSkip; 
      else 
       nextStream(); 
     } 
     return skipped; 
    } 

    @Override 
    public int available() throws IOException { 
     return streams.isEmpty() ? 0 : streams.getFirst().available(); 
    } 

    @Override 
    public void close() throws IOException { 
     while (!streams.isEmpty()) 
      nextStream(); 
    } 
} 

Mã này không được kiểm tra, vì vậy số dặm của bạn có thể thay đổi.

+0

Điều này không giống với SequenceInputStream, theo gợi ý của Merzbow? –

+0

Xin lỗi, nhưng tackline đề xuất SequenceInputStream đầu tiên (và tôi đã +1 anh ấy cho điều đó). Trong SO, câu trả lời tốt nhất sớm nhất sẽ thắng; bạn không bao giờ biết nếu câu trả lời sau đó là đạo văn. Ngoài ra, đọc nhận xét của tôi về câu trả lời của tackline cho một so sánh giữa SequenceInputStream và CatInputStream (Tôi có quan điểm của mình về việc sử dụng Collections.enumeration). –

+0

Nếu người downvoted câu trả lời của tôi đã làm như vậy vì nhận xét cuối cùng của tôi, tôi xin lỗi; Tôi nên giải thích tốt hơn: nếu tôi viết một câu trả lời hóa ra là một bản sao (hoặc tập hợp con) của một câu trả lời được đăng trước đó, tôi thường xóa nó, biết rằng tôi sẽ không bao giờ nhận được bất kỳ điểm nào cho nó. Trên SO, nó thực sự là "Súng nhanh nhất ở phương Tây" (tìm kiếm tiêu đề câu hỏi đó). –

14

java.io.SequenceInputStream có thể là những gì bạn cần. Nó chấp nhận việc đếm các luồng và sẽ xuất nội dung của luồng đầu tiên, sau đó là luồng thứ hai, v.v. cho đến khi tất cả các luồng đều trống.

0

Dưới đây là triển khai MVar dành riêng cho mảng byte (đảm bảo thêm định nghĩa gói của riêng bạn). Từ đây, nó là tầm thường để viết một luồng đầu vào trên các dòng đã hợp nhất. Tôi cũng có thể đăng bài đó nếu được yêu cầu.

import java.nio.ByteBuffer; 

public final class MVar { 

    private static enum State { 
    EMPTY, ONE, MANY 
    } 

    private final Object lock; 

    private State state; 

    private byte b; 

    private ByteBuffer bytes; 
    private int length; 

    public MVar() { 
    lock = new Object(); 
    state = State.EMPTY; 
    } 

    public final void put(byte b) { 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.b = b; 
     state = State.ONE; 
     lock.notifyAll(); 
    } 
    } 

    public final void put(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return; 
    } 
    synchronized (lock) { 
     while (state != State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     this.bytes = ByteBuffer.allocateDirect(length); 
     this.bytes.put(bytes, offset, length); 
     this.bytes.position(0); 
     this.length = length; 
     state = State.MANY; 
     lock.notifyAll(); 
    } 
    } 

    public final byte take() { 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: { 
     state = State.EMPTY; 
     byte b = this.b; 
     lock.notifyAll(); 
     return b; 
     } 
     case MANY: { 
     byte b = bytes.get(); 
     state = --length <= 0 ? State.EMPTY : State.MANY; 
     lock.notifyAll(); 
     return b; 
     } 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 

    public final int take(byte[] bytes, int offset, int length) { 
    if (length == 0) { 
     return 0; 
    } 
    synchronized (lock) { 
     while (state == State.EMPTY) { 
     try { 
      lock.wait(); 
     } catch (InterruptedException e) {} 
     } 
     switch (state) { 
     case ONE: 
     bytes[offset] = b; 
     state = State.EMPTY; 
     lock.notifyAll(); 
     return 1; 
     case MANY: 
     if (this.length > length) { 
      this.bytes.get(bytes, offset, length); 
      this.length = this.length - length; 
      synchronized (lock) { 
      lock.notifyAll(); 
      } 
      return length; 
     } 
     this.bytes.get(bytes, offset, this.length); 
     this.bytes = null; 
     state = State.EMPTY; 
     length = this.length; 
     lock.notifyAll(); 
     return length; 
     default: 
     throw new AssertionError(); 
     } 
    } 
    } 
} 
Các vấn đề liên quan