2016-03-10 12 views
13

Môi trường: NodeJS, Express, DynamoDB (nhưng có thể là bất kỳ cơ sở dữ liệu thực sự)Làm cách nào để tạo luồng có thể đọc được bằng nguồn dữ liệu không đồng bộ trong NodeJ?

Kịch bản: Cần phải đọc một số lượng lớn các hồ sơ và trả lại cho người sử dụng như một tập tin tải về. Điều này có nghĩa rằng tôi không thể đệm tất cả nội dung cùng một lúc và sau đó gửi nó trong một phản ứng từ Express. Ngoài ra, tôi có thể cần thực hiện truy vấn nhiều lần vì tất cả dữ liệu có thể không được trả lại trong một truy vấn.

Giải pháp đề xuất: Sử dụng luồng có thể đọc có thể được dẫn tới luồng phản hồi trong Express.

Tôi bắt đầu bằng cách tạo một đối tượng kế thừa từ luồng.Đọc và thực hiện phương thức _read() để đẩy kết quả truy vấn. Vấn đề là truy vấn cơ sở dữ liệu được gọi trong _read() là không đồng bộ nhưng stream.read() là một phương thức đồng bộ.

Khi luồng được truyền đến phản hồi của máy chủ, đọc được gọi nhiều lần trước khi truy vấn db thậm chí có cơ hội thực thi. Vì vậy, truy vấn được gọi nhiều lần và ngay cả khi trường hợp đầu tiên của truy vấn kết thúc và thực hiện một cú đẩy (null), các truy vấn khác hoàn tất và tôi nhận được lỗi "push() sau EOF".

  1. Có cách nào để thực hiện việc này đúng với _read() không?
  2. Tôi có nên quên _read() và chỉ thực hiện truy vấn và đẩy() kết quả trong hàm tạo không?
  3. Tôi có nên thực hiện truy vấn và phát ra các sự kiện dữ liệu thay vì push() không?

Cảm ơn bạn

function DynamoDbResultStream(query, options){ 
    if(!(this instanceof DynamoDbResultStream)){ 
     return new DynamoDbResultStream(query, options); 
    } 

    Readable.call(this, options); 

    this.dbQuery = query; 
    this.done = false; 
} 
util.inherits(DynamoDbResultStream, Readable); 

DynamoDbResultStream.prototype._read = function(){ 
    var self = this; 
    if(!this.done){ 
     dynamoDB.query(this.dbQuery, function(err, data) { 
      if (!err) { 
       try{ 
        for(i=0;i<data.Items.length;i++){ 
         self.push(data.Items[i]); 
        } 
       }catch(err){ 
        console.log(err); 
       } 
       if (data.LastEvaluatedKey) { 
        //Next read() should invoke the query with a new start key 
        self.dbQuery.ExclusiveStartKey = data.LastEvaluatedKey; 
       }else{ 
        self.done=true; 
        self.push(null); 
       } 
      }else{ 
       console.log(err); 
       self.emit('error',err); 
      } 
     }); 
    }else{ 
     self.push(null); 
    } 
}; 

EDIT: Sau khi đăng câu hỏi này, tôi đã tìm thấy bài đăng này với một câu trả lời cho thấy làm thế nào để làm điều đó mà không sử dụng thừa kế: How to call an asynchronous function inside a node.js readable stream

Một bình luận đã được thực hiện ở đó mà bên trong _read() chỉ nên có một push(). Và mỗi push() thường sẽ tạo ra một lời gọi read() khác.

+0

bạn có thể cung cấp một ví dụ về mã bạn đang viết? – mikefrey

+0

Tôi đã thêm mã tôi có cho đến nay – swbandit

+0

Có thể có liên quan: http://stackoverflow.com/questions/20058614/stream-from-a-mongodb-cursor-to-express-response-in-node-js – Tomalak

Trả lời

2

Hãy nhận biết trong các phương thức khác nhau của Stream: https://nodejs.org/api/stream.html#stream_two_modes

const Readable = require('stream').Readable; 

// starts in paused mode 
const readable = new Readable(); 

let i = 0; 
fetchMyAsyncData() { 
    setTimeout(() => { 
    // still remains in paused mode 
    readable.push(++i); 

    if (i === 5) { 
     return readable.emit('end'); 
    } 

    fetchMyAsyncData(); 
    }, 500);  
} 

// "The res object is an enhanced version of Node’s own response object and supports all built-in fields and methods." 
app.get('/mystreamingresponse', (req, res) => { 

    // remains in paused mode 
    readable.on('readable',() => res.write(readable.read())); 

    fetchMyAsyncData(); 

    // closes the response stream once all external data arrived 
    readable.on('end',() => res.end()); 
}) 
+0

Cố gắng chạy điều này mang lại cho tôi _read không được triển khai – atlanteh

Các vấn đề liên quan