2015-05-26 15 views
5

Tôi có một luồng mà tôi xử lý bằng cách lắng nghe các sự kiện data, errorend và tôi gọi hàm để xử lý từng sự kiện data trong luồng đầu tiên. Đương nhiên, hàm xử lý dữ liệu gọi các callback khác, làm cho nó không đồng bộ. Vậy làm cách nào để tôi bắt đầu thực thi thêm mã khi dữ liệu trong luồng được xử lý? Lắng nghe sự kiện end trong luồng KHÔNG có nghĩa là chức năng xử lý data không đồng bộ đã hoàn tất.Làm cách nào để đảm bảo mã không đồng bộ được thực thi sau khi luồng xử lý xong?

Làm cách nào để đảm bảo rằng các chức năng xử lý dữ liệu luồng được hoàn thành khi tôi thực hiện câu lệnh tiếp theo?

Dưới đây là một ví dụ:

function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { 
    var self = this; 
    var promises = []; 
    accountStream 
    .on('data', function (account) { 
     migrateAccount.bind(self)(account, finishMigration); 
    }) 
    .on('error', function (err) { 
     return console.log(err); 
    }) 
    .on('end', function() { 
     console.log("Finished updating account stream (but finishMigration is still running!!!)"); 
     callThisOnlyAfterAllAccountsAreMigrated() // finishMigration is still running! 
    }); 
} 

var migrateAccount = function (oldAccount, callback) { 
    executeSomeAction(oldAccount, function(err, newAccount) { 
    if (err) return console.log("error received:", err); 
    return callback(newAccount); 
    }); 
} 

var finishMigration = function (newAccount) { 
    // some code that is executed asynchronously... 
} 

Làm thế nào để đảm bảo rằng callThisOnlyAfterAllAccountsAreMigrated được gọi SAU suối đã được xử lý?

Điều này có thể được thực hiện với lời hứa không? Nó có thể được thực hiện thông qua các luồng không? Tôi đang làm việc với Nodej, vì vậy việc tham khảo các mô-đun npm khác có thể hữu ích.

Trả lời

2

Như bạn đã nói, hãy nghe sự kiện end trên luồng là vô dụng. Luồng không biết hoặc quan tâm đến những gì bạn đang làm với dữ liệu trong trình xử lý data của bạn, vì vậy, bạn cần phải viết một số mã để theo dõi trạng thái di chuyển tài khoản di chuyển của chính mình.

Nếu là tôi, tôi sẽ viết lại toàn bộ phần này. Nếu bạn sử dụng sự kiện readable với .read() trên luồng của mình, bạn có thể đọc nhiều mục cùng lúc mà bạn muốn xử lý. Nếu đó là một, không sao cả. Nếu đó là 30, tuyệt vời. Lý do bạn làm điều này là để bạn sẽ không overrun bất cứ điều gì đang làm việc với các dữ liệu đến từ các dòng. Như hiện tại, nếu accountStream nhanh, ứng dụng của bạn chắc chắn sẽ bị lỗi tại một số điểm.

Khi bạn đọc một mục từ một luồng và bắt đầu công việc, hãy thực hiện lời hứa bạn quay lại (sử dụng Bluebird hoặc tương tự) và ném nó vào một mảng. Khi lời hứa được giải quyết, hãy xóa nó khỏi mảng. Khi luồng kết thúc, hãy đính kèm một trình xử lý .done() tới .all() (về cơ bản là tạo ra một lời hứa lớn trong mọi lời hứa vẫn còn trong mảng).

Bạn cũng có thể sử dụng bộ đếm đơn giản cho công việc đang tiến hành.

+0

Đối với tôi, 'cái end' sự kiện có vẻ khá hữu ích, vì đó là điểm mà bạn cần phải bắt đầu tìm kiếm có bao nhiêu công việc vẫn đang chạy , để bạn có thể kích hoạt cuộc gọi lại cuối cùng khi không còn nữa. – Bergi

+0

@Bergi Điểm tốt, tôi đã làm rõ những gì tôi đã nhận được. – Brad

+1

@Brad Tôi nghĩ rằng giải pháp của bạn sử dụng lời hứa và sự kiện 'có thể đọc được sẽ hiệu quả hơn giải pháp của tôi ở trên. Bạn có nhớ thêm một ví dụ mã hóa không? Lời hứa là một chút khó khăn, vì vậy sẽ hữu ích khi thấy lời hứa trong hành động trên mã của câu hỏi ... – modulitos

1

Sử dụng một đường dòng (module NPM through2), tôi giải quyết vấn đề này bằng cách sử dụng đoạn mã sau để điều khiển hành vi không đồng bộ:

var through = require('through2').obj; 
function updateAccountStream (accountStream, callThisOnlyAfterAllAccountsAreMigrated) { 
    var self = this; 
    var promises = []; 
    accountStream.pipe(through(function(account, _, next) { 
    migrateAccount.bind(self)(account, finishMigration, next); 
    })) 
    .on('data', function (account) { 
    }) 
    .on('error', function (err) { 
     return console.log(err); 
    }) 
    .on('end', function() { 
     console.log("Finished updating account stream"); 
     callThisOnlyAfterAllAccountsAreMigrated(); 
    }); 
} 

var migrateAccount = function (oldAccount, callback, next) { 
    executeSomeAction(oldAccount, function(err, newAccount) { 
    if (err) return console.log("error received:", err); 
    return callback(newAccount, next); 
    }); 
} 

var finishMigration = function (newAccount, next) { 
    // some code that is executed asynchronously, but using 'next' callback when migration is finished... 
} 
1

Nó là dễ dàng hơn nhiều khi bạn xử lý các luồng thông qua lời hứa.

sao chép từ here, một ví dụ sử dụng spex thư viện:

var spex = require('spex')(Promise); 
var fs = require('fs'); 

var rs = fs.createReadStream('values.txt'); 

function receiver(index, data, delay) { 
    return new Promise(function (resolve) { 
     console.log("RECEIVED:", index, data, delay); 
     resolve(); // ok to read the next data; 
    }); 
} 

spex.stream.read(rs, receiver) 
    .then(function (data) { 
     // streaming successfully finished; 
     console.log("DATA:", data); 
    }, function (reason) { 
     // streaming has failed; 
     console.log("REASON:", reason); 
    }); 
Các vấn đề liên quan