Tiền đề
Tôi đang cố gắng để tìm ra cách chính xác để sớm chấm dứt một loạt các con suối bằng đường ống (đường ống) trong Node.js: Thỉnh thoảng tôi muốn xóa bỏ luồng một cách duyên dáng trước khi nó kết thúc. Cụ thể, tôi đang xử lý hầu hết các luồng song song không phải là bản địa, nhưng điều này không thực sự quan trọng.Đúng cách để unpipe một đường ống dẫn streams2 và sản phẩm nào đó (không chỉ tuôn ra)
Vấn đề
Vấn đề là khi tôi unpipe
các đường ống dẫn, dữ liệu vẫn còn trong bộ đệm của mỗi dòng và là drain
ed. có thể là không sao đối với hầu hết các luồng trung gian (ví dụ: Readable
/Transform
), nhưng cuối cùng Writable
vẫn thoát vào mục tiêu ghi (ví dụ: tệp hoặc cơ sở dữ liệu hoặc ổ cắm hoặc w/e). Điều này có thể có vấn đề nếu bộ đệm chứa hàng trăm hoặc hàng nghìn khối mất một lượng thời gian đáng kể để thoát. Tôi muốn nó dừng ngay lập tức, tức là không thoát; lý do tại sao các chu kỳ chất thải và bộ nhớ trên dữ liệu không quan trọng?
Tùy thuộc vào tuyến đường tôi đi, tôi nhận được lỗi "ghi sau khi kết thúc" hoặc ngoại lệ khi luồng không thể tìm thấy các đường ống hiện có.
Câu hỏi
các cách thích hợp để gracefully giết chết một đường ống dẫn của suối dưới dạng a.pipe(b).pipe(c).pipe(z)
là gì?
Giải pháp?
Các giải pháp tôi đã đưa ra là 3 bước:
unpipe
mỗi dòng trong các đường ống theo thứ tự ngược- rỗng của mỗi dòng đệm mà thực hiện
Writable
end
mỗi dòng mà thực hiệnWritable
Một số mã giả minh họa toàn bộ quá trình:
var pipeline = [ // define the pipeline
readStream,
transformStream0,
transformStream1,
writeStream
];
// build and start the pipeline
var tmpBuildStream;
pipeline.forEach(function(stream) {
if (!tmpBuildStream) {
tmpBuildStream = stream;
continue;
}
tmpBuildStream = lastStream.pipe(stream);
});
// sleep, timeout, event, etc...
// tear down the pipeline
var tmpTearStream;
pipeline.slice(0).reverse().forEach(function(stream) {
if (!tmpTearStream) {
tmpTearStream = stream;
continue;
}
tmpTearStream = stream.unpipe(tmpTearStream);
});
// empty and end the pipeline
pipeline.forEach(function(stream) {
if (typeof stream._writableState === 'object') { // empty
stream._writableState.length -= stream._writableState.buffer.length;
stream._writableState.buffer = [];
}
if (typeof stream.end === 'function') { // kill
stream.end();
}
});
Tôi thực sự lo lắng về việc sử dụng stream._writableState
và sửa đổi nội buffer
và length
tài sản (các _
nghĩa một sở hữu tư nhân). Điều này có vẻ như một hack. Cũng lưu ý rằng kể từ khi tôi đang piping, những thứ như pause
và resume
của chúng tôi ra khỏi câu hỏi (dựa trên một gợi ý tôi nhận được từ IRC).
Tôi cũng đặt cùng một phiên bản Runnable (khá luộm thuộm), bạn có thể lấy từ github: https://github.com/zamnuts/multipipe-proto (git clone, NPM cài đặt, xem readme, NPM start)
Tôi cũng thú vị trong làm thế nào để chặn trước và ngăn chặn một (đa gigabyte) dòng lớn cho hiệu quả. (ví dụ: bạn chỉ muốn đọc tiêu đề) – user949300
Theo như tôi biết không có giải pháp chính thức nào để xóa luồng ghi. Giải pháp duy nhất tôi có thể nghĩ ra là viết luồng biến đổi tùy chỉnh mà bạn có thể chèn ngay trước khi viết luồng trong đường ống. Luồng này sẽ thực hiện hành vi đệm riêng của nó, đảm nhận trách nhiệm đó từ luồng ghi. Bởi vì chúng ta sở hữu cơ chế đệm của luồng này, chúng ta có thể xây dựng một phương thức để xóa nó mà không cần phải hack. Dòng ghi sau đó sẽ nhận được một highWaterMark rất thấp, để giảm thiểu dữ liệu cần ghi khi chúng ta chấm dứt. –
@JasperWoudenberg Tôi nghĩ bạn đang ở một nơi nào đó ở đó. Ngoài ra, kể từ khi viết câu hỏi này, IIRC đã có các bản phát hành đã sửa lỗi quirk này. – zamnuts