2014-11-06 16 views
11

Tiền đề

Tôi đang cố gắng để tìm ra cách chính xác để sớm chấm dứt một loạt các con suối bằng đường ống (đường ống) trong Node.js: Thỉnh thoảng tôi muốn xóa bỏ luồng một cách duyên dáng trước khi nó kết thúc. Cụ thể, tôi đang xử lý hầu hết các luồng song song không phải là bản địa, nhưng điều này không thực sự quan trọng.Đúng cách để unpipe một đường ống dẫn streams2 và sản phẩm nào đó (không chỉ tuôn ra)

Vấn đề

Vấn đề là khi tôi unpipe các đường ống dẫn, dữ liệu vẫn còn trong bộ đệm của mỗi dòng và là drain ed. có thể là không sao đối với hầu hết các luồng trung gian (ví dụ: Readable/Transform), nhưng cuối cùng Writable vẫn thoát vào mục tiêu ghi (ví dụ: tệp hoặc cơ sở dữ liệu hoặc ổ cắm hoặc w/e). Điều này có thể có vấn đề nếu bộ đệm chứa hàng trăm hoặc hàng nghìn khối mất một lượng thời gian đáng kể để thoát. Tôi muốn nó dừng ngay lập tức, tức là không thoát; lý do tại sao các chu kỳ chất thải và bộ nhớ trên dữ liệu không quan trọng?

Tùy thuộc vào tuyến đường tôi đi, tôi nhận được lỗi "ghi sau khi kết thúc" hoặc ngoại lệ khi luồng không thể tìm thấy các đường ống hiện có.

Câu hỏi

các cách thích hợp để gracefully giết chết một đường ống dẫn của suối dưới dạng a.pipe(b).pipe(c).pipe(z) là gì?

Giải pháp?

Các giải pháp tôi đã đưa ra là 3 bước:

  1. unpipe mỗi dòng trong các đường ống theo thứ tự ngược
  2. rỗng của mỗi dòng đệm mà thực hiện Writable
  3. end mỗi dòng mà thực hiện Writable

Một số mã giả minh họa toàn bộ quá trình:

var pipeline = [ // define the pipeline 
    readStream, 
    transformStream0, 
    transformStream1, 
    writeStream 
]; 

// build and start the pipeline 
var tmpBuildStream; 
pipeline.forEach(function(stream) { 
    if (!tmpBuildStream) { 
     tmpBuildStream = stream; 
     continue; 
    } 
    tmpBuildStream = lastStream.pipe(stream); 
}); 

// sleep, timeout, event, etc... 

// tear down the pipeline 
var tmpTearStream; 
pipeline.slice(0).reverse().forEach(function(stream) { 
    if (!tmpTearStream) { 
     tmpTearStream = stream; 
     continue; 
    } 
    tmpTearStream = stream.unpipe(tmpTearStream); 
}); 

// empty and end the pipeline 
pipeline.forEach(function(stream) { 
    if (typeof stream._writableState === 'object') { // empty 
    stream._writableState.length -= stream._writableState.buffer.length; 
    stream._writableState.buffer = []; 
    } 
    if (typeof stream.end === 'function') { // kill 
    stream.end(); 
    } 
}); 

Tôi thực sự lo lắng về việc sử dụng stream._writableState và sửa đổi nội bufferlength tài sản (các _ nghĩa một sở hữu tư nhân). Điều này có vẻ như một hack. Cũng lưu ý rằng kể từ khi tôi đang piping, những thứ như pauseresume của chúng tôi ra khỏi câu hỏi (dựa trên một gợi ý tôi nhận được từ IRC).

Tôi cũng đặt cùng một phiên bản Runnable (khá luộm thuộm), bạn có thể lấy từ github: https://github.com/zamnuts/multipipe-proto (git clone, NPM cài đặt, xem readme, NPM start)

+0

Tôi cũng thú vị trong làm thế nào để chặn trước và ngăn chặn một (đa gigabyte) dòng lớn cho hiệu quả. (ví dụ: bạn chỉ muốn đọc tiêu đề) – user949300

+2

Theo như tôi biết không có giải pháp chính thức nào để xóa luồng ghi. Giải pháp duy nhất tôi có thể nghĩ ra là viết luồng biến đổi tùy chỉnh mà bạn có thể chèn ngay trước khi viết luồng trong đường ống. Luồng này sẽ thực hiện hành vi đệm riêng của nó, đảm nhận trách nhiệm đó từ luồng ghi. Bởi vì chúng ta sở hữu cơ chế đệm của luồng này, chúng ta có thể xây dựng một phương thức để xóa nó mà không cần phải hack. Dòng ghi sau đó sẽ nhận được một highWaterMark rất thấp, để giảm thiểu dữ liệu cần ghi khi chúng ta chấm dứt. –

+0

@JasperWoudenberg Tôi nghĩ bạn đang ở một nơi nào đó ở đó. Ngoài ra, kể từ khi viết câu hỏi này, IIRC đã có các bản phát hành đã sửa lỗi quirk này. – zamnuts

Trả lời

1

Trong trường hợp đặc biệt này, tôi nghĩ chúng ta nên có được loại bỏ cấu trúc mà bạn có 4 luồng không được tùy chỉnh hoàn toàn khác nhau. Đường ống chúng lại với nhau sẽ tạo ra sự phụ thuộc dây chuyền sẽ khó kiểm soát nếu chúng ta không thực hiện cơ chế riêng của chúng ta.

Tôi muốn tập trung vào mục tiêu actuall của bạn ở đây:

INPUT >----[read] → [transform0] → [transform1] → [write]-----> OUTPUT 
       |   |    |   | 
KILL_ALL------o----------o--------------o------------o--------[nothing to drain] 

Tôi tin rằng cấu trúc trên có thể được thực hiện thông qua việc kết hợp tùy chỉnh:

  1. duplex stream - cho riêng _write(chunk, encoding, cb)_read(bytes) triển khai với

  2. transform stream - để thực hiện riêng _transform(chunk, encoding, cb).

Vì bạn đang sử dụng writable-stream-parallel gói bạn cũng có thể muốn đi qua libs của họ, như thực hiện duplex của họ có thể được tìm thấy ở đây: https://github.com/Clever/writable-stream-parallel/blob/master/lib/duplex.js. Và triển khai transform stream của chúng tôi ở đây: https://github.com/Clever/writable-stream-parallel/blob/master/lib/transform.js. Ở đây họ xử lý các highWaterMark.

giải pháp có thể

ghi dòng họ: https://github.com/Clever/writable-stream-parallel/blob/master/lib/writable.js#L189 có một chức năng thú vị writeOrBuffer, tôi nghĩ bạn có thể có thể tinh chỉnh nó một chút để làm gián đoạn ghi dữ liệu từ bộ đệm.

Lưu ý: Những 3 flags đang kiểm soát thanh toán bù trừ đệm:

(!finished && !state.bufferProcessing && state.buffer.length) 

Tài liệu tham khảo:

+0

1) Đường ống các luồng với nhau cho phép tôi có kiến ​​trúc có thể cắm được - đường ống có thể được tạo, sắp xếp và khác nhau về độ dài tùy thuộc vào các bộ lọc/biến đổi/etc của người dùng. 2) bạn không rõ ràng về phần "kết hợp tùy chỉnh", bạn có thể xây dựng hoặc viết lại không? 3) Các giải pháp có thể bạn đề cập là không tốt hơn cũng không tồi tệ hơn so với giải pháp đề xuất của tôi - Tôi vẫn phải vá mã mà tôi không duy trì. – zamnuts

+0

Tôi chưa thực hiện giải pháp này, nhưng tôi muốn chỉ ra rằng bạn có thể tận dụng lợi thế của việc tạo luồng của riêng bạn, cho phép bạn kiểm soát tốt những gì đang xảy ra "dưới mui xe", thay vì hack giải pháp đã tồn tại. Đây là một chủ đề rất thú vị, khi tôi đi qua các tài liệu “có thể ghi-song song' tôi có thể thấy ngày càng nhiều nơi để thực hiện kiểm soát. Và về đường ống: Tôi đồng ý rằng ổ cắm có thể cắm được. là tuyệt vời nhưng trong trường hợp của bạn tôi sẽ tư vấn để xây dựng "phích cắm" của riêng bạn. –

Các vấn đề liên quan