2012-02-28 31 views
18

Tôi đang viết một tập tin lớn với Node.js sử dụng một writable stream:Viết file lớn với Node.js

var fs  = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', { flags : 'w' }); 

var lines; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     stream.write(lines[i]); 
    } 
} 

Tôi tự hỏi nếu chương trình này là an toàn mà không sử dụng drain sự kiện? Nếu nó không phải là (mà tôi nghĩ là trường hợp), các mô hình để viết một dữ liệu lớn tùy ý vào một tập tin là gì?

Trả lời

13

Đó là cách cuối cùng tôi đã làm. Ý tưởng đằng sau là tạo luồng có thể đọc được thực hiện giao diện ReadStream và sau đó sử dụng phương thức pipe() cho dữ liệu đường ống đến luồng có thể ghi.

var fs = require('fs'); 
var writeStream = fs.createWriteStream('someFile.txt', { flags : 'w' }); 
var readStream = new MyReadStream(); 

readStream.pipe(writeStream); 
writeStream.on('close', function() { 
    console.log('All done!'); 
}); 

Ví dụ về MyReadStream lớp học có thể được lấy từ mongoose QueryStream.

+12

Tại sao bạn cần một ReadStream() khi chúng tôi chỉ quan tâm đến việc viết những thứ vào một tập tin? – krjampani

+0

@nab cảm ơn bạn. Khi đường ống có vẻ như nó không thêm '\ r \ n' cho nguồn cấp dữ liệu dòng, vì vậy hãy concat mỗi dòng vào một ... – loretoparisi

9

Ý tưởng đằng sau cống là bạn sẽ sử dụng nó để kiểm tra ở đây:

var fs = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); 

var lines; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     stream.write(lines[i]); //<-- the place to test 
    } 
} 

mà bạn không. Vì vậy, bạn sẽ cần phải rearchitect để làm cho nó "reentrant".

var fs = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); 

var lines; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     var written = stream.write(lines[i]); //<-- the place to test 
     if (!written){ 
      //do something here to wait till you can safely write again 
      //this means prepare a buffer and wait till you can come back to finish 
      // lines[i] -> remainder 
     } 
    } 
} 

Tuy nhiên, điều này có nghĩa là bạn cũng cần phải giữ lại getLines khi bạn đợi?

var fs = require('fs'); 
var stream = fs.createWriteStream('someFile.txt', {flags: 'w'}); 

var lines, 
    buffer = { 
    remainingLines = [] 
    }; 
while (lines = getLines()) { 
    for (var i = 0; i < lines.length; i++) { 
     var written = stream.write(lines[i]); //<-- the place to test 
     if (!written){ 
      //do something here to wait till you can safely write again 
      //this means prepare a buffer and wait till you can come back to finish 
      // lines[i] -> remainder 
      buffer.remainingLines = lines.slice(i); 
      break; 
      //notice there's no way to re-run this once we leave here. 
     } 
    } 
} 

stream.on('drain',function(){ 
    if (buffer.remainingLines.length){ 
    for (var i = 0; i < buffer.remainingLines.length; i++) { 
     var written = stream.write(buffer.remainingLines[i]); //<-- the place to test 
     if (!written){ 
     //do something here to wait till you can safely write again 
     //this means prepare a buffer and wait till you can come back to finish 
     // lines[i] -> remainder 
     buffer.remainingLines = lines.slice(i); 
     } 
    } 
    } 
}); 
+3

Không cần thiết phải sử dụng bộ đệm của riêng bạn. Node.js đã làm cho bạn. Đọc tập tin nguồn nodejs-source/lib/fs.js # WriteStream.prototype.write – ayanamist

2

[Chỉnh sửa] Các Node.js cập nhật writable.write(...) API docs nói:

[The] trở lại giá trị là tư vấn chặt chẽ. Bạn CÓ THỂ tiếp tục viết, ngay cả khi nó trả về sai. Tuy nhiên, ghi sẽ được đệm trong bộ nhớ, vì vậy tốt nhất là không làm điều này quá mức. Thay vào đó, hãy đợi sự kiện thoát trước khi ghi thêm dữ liệu.

[Original] Từ (tôi nhấn mạnh) stream.write(...) documentation:

Returns true nếu chuỗi đã được đỏ mặt vào bộ đệm hạt nhân. Trả về false để cho biết bộ đệm hạt nhân đã đầy và dữ liệu sẽ được gửi đi trong tương lai.

tôi giải thích này có nghĩa là "ghi" hàm trả true nếu chuỗi cho trước đã ngay lập tức ghi vào bộ đệm OS tiềm ẩn hoặc false nếu nó đã được chưa viết nhưng sẽ được viết bởi các ghi chức năng (ví dụ như có lẽ được đệm cho bạn bởi WriteStream) để bạn không phải gọi lại "write" nữa.

+1

nhưng "Khi viết một bộ mô tả tập tin theo cách này, đóng bộ mô tả trước khi luồng thoát nguy cơ gửi một FD (không hợp lệ) bị đóng". làm cho tôi nghĩ rằng bộ đệm đầy đủ nghĩa là nó không thể chấp nhận thêm bất kỳ mã nào từ bạn. Tôi thành thật không biết, và chỉ đưa ra dự đoán tốt nhất của tôi như là một câu trả lời ở đây. – jcolebrand

+0

@jcolebrand: ya, tôi cũng không biết, nhưng tôi đoán sự kiện "thoát" chỉ là tín hiệu cho thấy hệ điều hành đã sẵn sàng để viết ngay lập tức, trong trường hợp bạn thực sự muốn tránh việc lưu vào bộ đệm, có thể là của riêng bạn hoặc từ phương thức "write" của WriteStream. Tuy nhiên, các tài liệu cho "cống" đề cập đến "* an toàn để viết lại *", đó là một sự lựa chọn nghèo của từ ngữ hoặc bằng chứng chống lại giải thích của tôi! – maerics

+0

dat Liên kết 404's. – Alan

2

Tôi tìm thấy luồng là cách hoạt động kém để xử lý các tệp lớn - điều này là do bạn không thể đặt kích thước bộ đệm đầu vào phù hợp (ít nhất tôi không biết cách tốt để thực hiện). Đây là những gì tôi làm:

var fs = require('fs'); 

var i = fs.openSync('input.txt', 'r'); 
var o = fs.openSync('output.txt', 'w'); 

var buf = new Buffer(1024 * 1024), len, prev = ''; 

while(len = fs.readSync(i, buf, 0, buf.length)) { 

    var a = (prev + buf.toString('ascii', 0, len)).split('\n'); 
    prev = len === buf.length ? '\n' + a.splice(a.length - 1)[0] : ''; 

    var out = ''; 
    a.forEach(function(line) { 

     if(!line) 
      return; 

     // do something with your line here 

     out += line + '\n'; 
    }); 

    var bout = new Buffer(out, 'ascii'); 
    fs.writeSync(o, bout, 0, bout.length); 
} 

fs.closeSync(o); 
fs.closeSync(i); 
+0

Bạn có bất kỳ điểm chuẩn nào giữa kiểm tra' readStream/writeStream' và 'readSync/writeSync' để xác nhận điều này không câu trả lời? Cảm ơn bạn. – loretoparisi

1

Cách sạch để xử lý này là làm cho máy phát điện dòng của bạn một readable stream - chúng ta hãy gọi nó lineReader.Sau đó, những điều sau đây sẽ tự động xử lý các bộ đệm và thoát độc đáo dành cho bạn:

lineReader.pipe(fs.createWriteStream('someFile.txt')); 

Nếu bạn không muốn thực hiện một dòng có thể đọc được, bạn có thể nghe write 's đầu ra cho đệm-viên mãn và đáp ứng như thế này:

var i = 0, n = lines.length; 
function write() { 
    if (i === n) return; // A callback could go here to know when it's done. 
    while (stream.write(lines[i++]) && i < n); 
    stream.once('drain', write); 
} 
write(); // Initial call. 

Ví dụ dài hơn về trường hợp này có thể được tìm thấy here.

1

Một số câu trả lời được đề xuất cho câu hỏi này đã bỏ lỡ tất cả các điểm về luồng.

Module này có thể giúp https://www.npmjs.org/package/JSONStream

Tuy nhiên, cho phép giả sử tình hình như mô tả và viết mã chính mình. Bạn đang đọc từ MongoDB dưới dạng luồng, với ObjectMode = true theo mặc định.

Điều này sẽ dẫn đến sự cố nếu bạn cố gắng phát trực tiếp vào tệp - chẳng hạn như "lỗi không phải chuỗi không hợp lệ/bộ đệm" không hợp lệ.

Giải pháp cho loại vấn đề này rất đơn giản.

Chỉ cần đặt một chuyển đổi khác ở giữa có thể đọc được và có thể ghi để thích ứng với đối tượng có thể đọc được với một chuỗi có thể ghi một cách thích hợp.

Mẫu Mã Giải pháp:

var fs = require('fs'), 
    writeStream = fs.createWriteStream('./out' + process.pid, {flags: 'w', encoding: 'utf-8' }), 
    stream = require('stream'), 
    stringifier = new stream.Transform(); 
stringifier._writableState.objectMode = true; 
stringifier._transform = function (data, encoding, done) { 
    this.push(JSON.stringify(data)); 
    this.push('\n'); 
    done(); 
} 
rowFeedDao.getRowFeedsStream(merchantId, jobId) 
.pipe(stringifier) 
.pipe(writeStream).on('error', function (err) { 
    // handle error condition 
} 
0

Nếu bạn không xảy ra để có thêm một input stream bạn không thể dễ dàng sử dụng ống. Không có điều nào ở trên phù hợp với tôi, sự kiện thoát nước không kích hoạt. Giải quyết như sau (dựa trên Tylers trả lời):

var lines[]; // some very large array 
var i = 0; 

function write() { 
    if (i < lines.length) { 
     wstream.write(lines[i]), function(err){ 
      if (err) { 
       console.log(err); 
      } else { 
       i++; 
       write(); 
      } 
     }); 
    } else { 
     wstream.end(); 
     console.log("done"); 
    } 
}; 
write(); 
Các vấn đề liên quan