2015-10-28 16 views
14

Tôi đang viết một mô-đun, một luồng có thể ghi. Tôi muốn triển khai giao diện ống cho người dùng của mình.Cách chính xác để tạm dừng luồng có thể đọc được bằng đường ống từ luồng có thể ghi trong nútj là gì?

Nếu một số lỗi xảy ra, tôi cần tạm dừng luồng có thể đọc và phát ra sự kiện lỗi. Sau đó, người dùng sẽ quyết định - nếu anh ấy chấp nhận lỗi, anh ấy có thể tiếp tục xử lý dữ liệu.

var writeable = new BackPressureStream(); 
writeable.on('error', function(error){ 
    console.log(error); 
    writeable.resume(); 
}); 

var readable = require('fs').createReadStream('somefile.txt'); 
readable.pipe.(writeable); 

Tôi thấy nút đó cung cấp cho chúng tôi phương pháp readable.pause(), có thể được sử dụng để tạm dừng luồng có thể đọc được. Nhưng tôi không thể biết cách tôi có thể gọi nó từ mô-đun luồng có thể ghi của mình:

var Writable = require('stream').Writable; 

function BackPressureStream(options) { 
    Writable.call(this, options); 
} 
require('util').inherits(BackPressureStream, Writable); 

BackPressureStream.prototype._write = function(chunk, encoding, done) { 
    done(); 
}; 

BackPressureStream.prototype.resume = function() { 
    this.emit('drain'); 
} 

Áp suất ngược có thể được thực hiện trong luồng có thể ghi?

P.S. Có thể sử dụng các sự kiện pipe/unpipe, cung cấp luồng có thể đọc làm thông số. Nhưng nó cũng được nói, rằng đối với các luồng đường ống, cơ hội duy nhất để tạm dừng là để bỏ đọc luồng có thể đọc được từ ghi.

Tôi đã hiểu đúng chưa? Tôi phải bỏ qua luồng ghi có thể ghi của mình cho đến khi cuộc gọi của người dùng tiếp tục? Và sau khi cuộc gọi của người dùng tiếp tục, tôi có thể đọc lại luồng có thể đọc được không?

+1

quan tâm đến việc bắt đầu một bounty cho một này? –

+1

này, bạn có tìm thấy câu trả lời cho câu hỏi của mình không? –

Trả lời

0

Về cơ bản, như tôi đã hiểu, bạn đang tìm cách đặt áp lực lên luồng trong trường hợp xảy ra lỗi. Bạn có một cặp đôi tùy chọn.

Thứ nhất, như bạn đã xác định, hãy sử dụng pipe để lấy mẫu thể hiện của luồng đọc và thực hiện một số bước chân ưa thích.

Một lựa chọn khác là tạo ra một dòng ghi gói cung cấp chức năng này (tức là phải mất một WritableStream như một đầu vào, và khi thực hiện chức năng suối, vượt qua dữ liệu dọc theo bờ suối cung cấp.

Về cơ bản bạn kết thúc với một cái gì đó giống như

source stream -> wrapping writable -> writable

https://nodejs.org/api/stream.html#stream_implementing_a_writable_stream giao dịch với việc thực hiện một dòng ghi.

Mấu chốt fo r bạn là nếu một lỗi xảy ra trong cơ sở ghi, bạn sẽ thiết lập một lá cờ trên dòng, và các cuộc gọi tiếp theo để write xảy ra, bạn sẽ đệm đoạn, lưu trữ các cuộc gọi lại và chỉ gọi. Một cái gì đó như

// ... 
constructor(wrappedWritableStream) { 
    wrappedWritableStream.on('error', this.errorHandler); 
    this.wrappedWritableStream = wrappedWritableStream; 
} 
// ... 
write(chunk, encoding, callback) { 
    if (this.hadError) { 
     // Note: until callback is called, this function won't be called again, so we will have maximum one stored 
     // chunk. 
     this.bufferedChunk = [chunk, encoding, callback]; 
    } else { 
     wrappedWritableStream.write(chunk, encoding, callback); 
    } 
} 
// ... 
errorHandler(err) { 
    console.error(err); 
    this.hadError = err; 
    this.emit(err); 
} 
// ... 
recoverFromError() { 
    if (this.bufferedChunk) { 
     wrappedWritableStream.write(...this.bufferedChunk); 
     this.bufferedChunk = undefined; 
    } 
    this.hadError = false; 
} 

Lưu ý: Bạn chỉ nên cần phải thực hiện các write chức năng, nhưng tôi khuyến khích bạn đào bới xung quanh và chơi với các chức năng thực hiện khác. Cũng cần lưu ý rằng bạn có thể gặp khó khăn khi viết thư cho các luồng đã phát ra một sự kiện lỗi, nhưng tôi sẽ để điều đó cho bạn như một vấn đề riêng biệt cần giải quyết.

Đây là một nguồn lực tốt trên backpressuring https://nodejs.org/en/docs/guides/backpressuring-in-streams/

Các vấn đề liên quan