2013-05-08 27 views
13
  • 2 dòng:nối hai (hoặc n) suối

    Với thể đọc được streamsstream1stream2, những gì là một thành ngữ (ngắn gọn) cách để nhận được một dòng chứa stream1stream2 nối?

    Tôi không thể làm stream1.pipe(outStream); stream2.pipe(outStream), bởi vì sau đó nội dung luồng được xáo trộn với nhau.

  • n suối:

    Với một EventEmitter mà phát ra một số không xác định của suối, ví dụ

    eventEmitter.emit('stream', stream1) 
    eventEmitter.emit('stream', stream2) 
    eventEmitter.emit('stream', stream3) 
    ... 
    eventEmitter.emit('end') 
    

    gì là một thành ngữ (ngắn gọn) cách để nhận được một dòng với tất cả các con suối nối với nhau?

Trả lời

11

Gói combined-stream ghép các luồng. Ví dụ từ README:

var CombinedStream = require('combined-stream'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 

Tôi tin rằng bạn phải nối thêm tất cả các luồng cùng một lúc. Nếu hàng đợi trống, thì combinedStream sẽ tự động kết thúc. Xem issue #5.

Thư viện stream-stream là phương án thay thế có rõ ràng .end, nhưng ít phổ biến hơn và có lẽ không được kiểm tra kỹ lưỡng. Nó sử dụng API stream2 của Nút 0.10 (xem this discussion).

3

Bạn có thể có thể làm cho nó ngắn gọn hơn, nhưng đây là một trong những hoạt động:

var util = require('util'); 
var EventEmitter = require('events').EventEmitter; 

function ConcatStream(streamStream) { 
    EventEmitter.call(this); 
    var isStreaming = false, 
    streamsEnded = false, 
    that = this; 

    var streams = []; 
    streamStream.on('stream', function(stream){ 
    stream.pause(); 
    streams.push(stream); 
    ensureState(); 
    }); 

    streamStream.on('end', function() { 
    streamsEnded = true; 
    ensureState(); 
    }); 

    var ensureState = function() { 
    if(isStreaming) return; 
    if(streams.length == 0) { 
     if(streamsEnded) 
     that.emit('end'); 
     return; 
    } 
    isStreaming = true; 
    streams[0].on('data', onData); 
    streams[0].on('end', onEnd); 
    streams[0].resume(); 
    }; 

    var onData = function(data) { 
    that.emit('data', data); 
    }; 

    var onEnd = function() { 
    isStreaming = false; 
    streams[0].removeAllListeners('data'); 
    streams[0].removeAllListeners('end'); 
    streams.shift(); 
    ensureState(); 
    }; 
} 

util.inherits(ConcatStream, EventEmitter); 

Chúng tôi theo dõi các trạng thái với streams (hàng đợi của suối; push để lưng và shift từ mặt trước), isStreamingstreamsEnded. Khi chúng tôi có một luồng mới, chúng tôi sẽ phát trực tuyến và khi luồng kết thúc, chúng tôi sẽ ngừng nghe và thay đổi. Khi luồng luồng kết thúc, chúng tôi đặt streamsEnded.

Trên mỗi sự kiện này, chúng tôi kiểm tra trạng thái của chúng tôi. Nếu chúng tôi đã phát trực tuyến (đang truyền luồng), chúng tôi sẽ không làm gì cả. Nếu hàng đợi trống và streamsEnded được đặt, chúng tôi phát ra sự kiện end. Nếu có thứ gì đó trong hàng đợi, chúng tôi sẽ tiếp tục và lắng nghe các sự kiện của nó.

* Lưu ý rằng pauseresume là cố vấn, vì vậy, một số luồng có thể không hoạt động chính xác và sẽ yêu cầu lưu vào bộ đệm. Bài tập này được để lại cho người đọc.

Sau khi thực hiện tất cả những điều này, tôi sẽ làm các trường hợp n=2 bằng cách xây dựng một EventEmitter, tạo ra một ConcatStream với nó, và phát ra hai stream sự kiện tiếp theo là một sự kiện end. Tôi chắc rằng nó có thể được thực hiện chính xác hơn, nhưng chúng tôi cũng có thể sử dụng những gì chúng tôi có.

+0

Cảm ơn Aaron! Tôi đã hy vọng sẽ có một số thư viện hiện có để tôi có thể giải quyết nó trong ba dòng. Nếu không có, tôi nghĩ tôi có thể giải nén giải pháp của bạn thành một gói. Tôi có thể sử dụng mã của bạn theo giấy phép MIT không? –

+0

Ah, đã tìm thấy thư viện truyền trực tuyến. Xem câu trả lời của tôi. –

+0

@JoLiss Tôi cũng đã tìm kiếm thứ gì đó đầu tiên, nhưng tôi không thể tìm thấy tùy chọn đó. Bạn chắc chắn có thể sử dụng mã của tôi trong thư viện nếu bạn vẫn muốn. –

1

streamee.js là tập hợp các máy biến áp dòng và nhà soạn nhạc dựa trên nút 1.0+ suối và bao gồm một phương pháp tiếp nhau:

var stream1ThenStream2 = streamee.concatenate([stream1, stream2]); 
+0

Cảm ơn, tôi sẽ kiểm tra. Đó là Node 0.10 tôi giả sử? –

+0

Có nút 0.10, nhưng bạn có thể bao bọc các luồng kiểu cũ thành 0.10+ luồng như được viết trong README – atamborrino

2

https://github.com/joepie91/node-combined-stream2 là một sự thay thế thả trong Streams2 tương thích cho các mô-đun kết hợp dòng (được mô tả ở trên.) Nó tự động kết thúc tốt đẹp Streams1 suối.

Ví dụ mã cho hỗn hợp stream2:

var CombinedStream = require('combined-stream2'); 
var fs = require('fs'); 

var combinedStream = CombinedStream.create(); 
combinedStream.append(fs.createReadStream('file1.txt')); 
combinedStream.append(fs.createReadStream('file2.txt')); 

combinedStream.pipe(fs.createWriteStream('combined.txt')); 
3

này có thể được thực hiện với vani nodejs

import { PassThrough } from 'stream' 
const merge = (...streams) => { 
    let pass = new PassThrough() 
    let waiting = streams.length 
    for (let stream of streams) { 
     pass = stream.pipe(pass, {end: false}) 
     stream.once('end',() => --waiting === 0 && pass.emit('end')) 
    } 
    return pass 
} 
5

Một reduce thao tác đơn giản nên được tốt trong nodejs!

const {PassThrough} = require('stream') 

let joined = [s0, s1, s2, ...sN].reduce((pt, s, i, a) => { 
    s.pipe(pt, {end: false}) 
    s.once('end',() => a.every(s => s.ended) && pt.emit('end')) 
    return pt 
}, new PassThrough()) 

Cheers;)

+0

Bạn có nên trả lại thứ gì đó từ việc giảm bớt không? Điều này có vẻ như 'đã tham gia' sẽ không được xác định. –

+1

đã được sửa. thanks;) @Mark_M – Ivo

+0

CẢNH BÁO: Điều này sẽ làm cho tất cả các luồng được nối vào luồng PassThrough song song, mà không có liên quan đến thứ tự của dữ liệu, nhiều khả năng làm hỏng dữ liệu của bạn. –

Các vấn đề liên quan