2015-05-24 23 views
54

Sau khi tìm hiểu về Observables, tôi thấy chúng tương tự như Node.js streams. Cả hai đều có một cơ chế thông báo cho người tiêu dùng bất cứ khi nào dữ liệu mới đến, một lỗi xảy ra hoặc không có thêm dữ liệu (EOF).Các luồng Node.js so với các Quan sát

Tôi rất muốn tìm hiểu về sự khác biệt về khái niệm/chức năng giữa hai yếu tố này. Cảm ơn!

+1

@BenjaminGruenbaum Tôi tự hỏi tại sao bạn gắn thẻ trang này bằng rxjs và thịt xông khói? OP dường như đề cập đến các quan sát từ [tag: harmony ecmascript] – Bergi

+0

@Bergi kiến ​​thức trước về OP và câu hỏi. Về cơ bản. –

+1

Lol congrats trên upvotes, nhưng tôi không có ý tưởng tại sao câu hỏi này đã không nhận được đóng cửa. Làm thế nào là một câu hỏi thực sự/thích hợp cho SO. –

Trả lời

70

Cả quan sát và Node.js của Streams cho phép bạn để giải quyết vấn đề cơ bản giống nhau: không đồng bộ xử lý một chuỗi các giá trị. Sự khác biệt chính giữa hai, tôi tin rằng, có liên quan đến bối cảnh thúc đẩy sự xuất hiện của nó. Ngữ cảnh đó được phản ánh trong thuật ngữ và API.

Trên Quan sát bên bạn có phần mở rộng cho EcmaScript giới thiệu mô hình lập trình phản ứng. Nó cố gắng lấp đầy khoảng cách giữa việc tạo giá trị và tính đồng bộ với các khái niệm tối giản và có thể tổng hợp của ObserverObservable.

Trên node.js và Luồng bên bạn muốn tạo giao diện để xử lý luồng không đồng bộ và hiệu suất luồng mạng và tệp cục bộ. Thuật ngữ bắt nguồn từ bối cảnh ban đầu đó và bạn nhận được pipe, chunk, encoding, flush, Duplex, Buffer, v.v. Bằng cách tiếp cận thực dụng cung cấp hỗ trợ rõ ràng cho các trường hợp sử dụng cụ thể, bạn mất khả năng soạn mọi thứ vì nó không đồng nhất. Ví dụ: bạn sử dụng push trên luồng Readablewrite trên Writable mặc dù, về khái niệm, bạn đang làm điều tương tự: xuất bản giá trị.

Vì vậy, trong thực tế, nếu bạn nhìn vào các khái niệm, và nếu bạn sử dụng tùy chọn { objectMode: true }, bạn có thể kết hợp Observable với Readable suối và Observer với dòng Writable. Bạn thậm chí có thể tạo ra một số bộ điều hợp đơn giản giữa hai mô hình.

var Readable = require('stream').Readable; 
var Writable = require('stream').Writable; 
var util = require('util'); 

var Observable = function(subscriber) { 
    this.subscribe = subscriber; 
} 

var Subscription = function(unsubscribe) { 
    this.unsubscribe = unsubscribe; 
} 

Observable.fromReadable = function(readable) { 
    return new Observable(function(observer) { 
     function nop() {}; 

     var nextFn = observer.next ? observer.next.bind(observer) : nop; 
     var returnFn = observer.return ? observer.return.bind(observer) : nop; 
     var throwFn = observer.throw ? observer.throw.bind(observer) : nop; 

     readable.on('data', nextFn); 
     readable.on('end', returnFn); 
     readable.on('error', throwFn); 

     return new Subscription(function() { 
      readable.removeListener('data', nextFn); 
      readable.removeListener('end', returnFn); 
      readable.removeListener('error', throwFn); 
     }); 
    }); 
} 

var Observer = function(handlers) { 
    function nop() {}; 

    this.next = handlers.next || nop; 
    this.return = handlers.return || nop; 
    this.throw = handlers.throw || nop; 
} 

Observer.fromWritable = function(writable, shouldEnd, throwFn) { 
    return new Observer({ 
     next: writable.write.bind(writable), 
     return: shouldEnd ? writable.end.bind(writable) : function() {}, 
     throw: throwFn 
    }); 
} 

Bạn có thể đã nhận thấy rằng tôi đã thay đổi một vài tên và sử dụng các khái niệm đơn giản của ObserverSubscription, được giới thiệu ở đây, để tránh sự quá tải của reponsibilities thực hiện bằng cách quan sát trong Generator. Về cơ bản, Subscription cho phép bạn hủy đăng ký khỏi số Observable. Dù sao, với mã trên, bạn có thể có pipe.

Observable.fromReadable(process.stdin).subscribe(Observer.fromWritable(process.stdout)); 

So với process.stdin.pipe(process.stdout), những gì bạn có là một cách để kết hợp, lọc và chuyển đổi dòng mà cũng làm việc cho bất kỳ chuỗi dữ liệu khác. Bạn có thể đạt được điều đó với các luồng Readable, TransformWritable nhưng API ưu tiên phân lớp thay vì chuỗi Readable và các hàm áp dụng. Trên mô hình Observable, Ví dụ, các giá trị chuyển đổi tương ứng với việc áp dụng một chức năng biến áp cho luồng. Nó không yêu cầu một kiểu con mới của Transform.

Observable.just = function(/*... arguments*/) { 
    var values = arguments; 
    return new Observable(function(observer) { 
     [].forEach.call(values, function(value) { 
      observer.next(value); 
     }); 
     observer.return(); 
     return new Subscription(function() {}); 
    }); 
}; 

Observable.prototype.transform = function(transformer) { 
    var source = this; 
    return new Observable(function(observer) { 
     return source.subscribe({ 
      next: function(v) { 
       observer.next(transformer(v)); 
      }, 
      return: observer.return.bind(observer), 
      throw: observer.throw.bind(observer) 
     }); 
    }); 
}; 

Observable.just(1, 2, 3, 4, 5).transform(JSON.stringify) 
    .subscribe(Observer.fromWritable(process.stdout)) 

Kết luận? Thật dễ dàng để giới thiệu mô hình phản ứng và khái niệm Observable ở bất cứ đâu. Khó thực hiện toàn bộ thư viện xung quanh khái niệm đó. Tất cả những chức năng nhỏ đó cần phải làm việc cùng nhau một cách nhất quán.Xét cho cùng, dự án ReactiveX vẫn đang diễn ra. Nhưng nếu bạn thực sự cần phải gửi nội dung tập tin cho khách hàng, đối phó với mã hóa, và nén nó sau đó hỗ trợ nó có, trong NodeJS, và nó hoạt động khá tốt.

+3

Tôi thực sự không chắc chắn về toàn bộ điều này "mở rộng để điều Ecmascript". RxJS chỉ là một thư viện, cùng với RxJava, vv Cuối cùng, trong ES7 hoặc ES8 có thể có một số từ khóa trong ES/JS liên quan đến Observables, nhưng chúng chắc chắn không phải là một phần của ngôn ngữ, và chắc chắn không phải khi bạn trả lời câu hỏi vào năm 2015. –

+2

ES7 Có thể quan sát được: https://github.com/tc39/proposal-observable – shannon

Các vấn đề liên quan