2015-04-23 39 views
12

Tôi muốn sử dụng RxJS bên trong của tôi socket.on('sense',function(data){});. Tôi bị mắc kẹt và bối rối với rất ít tài liệu có sẵn và sự thiếu hiểu biết của tôi RxJS. Đây là vấn đề của tôi.Làm thế nào để sử dụng RxJs với Socket.IO trên sự kiện

Tôi có một distSensor.js mà có một chức năng pingEnd()

function pingEnd(x){ 
socket.emit("sense", dist); //pingEnd is fired when an Interrupt is generated. 
} 

Bên App.js của tôi, tôi có

io.on('connection', function (socket) { 
    socket.on('sense', function (data) { 
     //console.log('sense from App4 was called ' + data); 
    }); 
}); 

Chức năng cảm giác được rất nhiều dữ liệu cảm biến mà tôi muốn lọc sử dụng RxJS và tôi không biết mình nên làm gì tiếp theo khi sử dụng RxJ ở đây. Bất kỳ con trỏ nào đến đúng tài liệu hoặc mẫu sẽ giúp ích.

Trả lời

12

Tôi nghĩ bạn có thể sử dụng Rx.Observable.fromEvent (https://github.com/Reactive-Extensions/RxJS/blob/master/doc/api/core/operators/fromevent.md).

Đây là cách tôi đã làm một điều tương tự như sử dụng Bacon.js, trong đó có một API rất giống nhau: https://github.com/raimohanska/bacon-minsk-2015/blob/gh-pages/server.js#L13

Vì vậy, trong Bacon.js nó sẽ đi như

io.on('connection', function(socket){ 
    Bacon.fromEvent(socket, "sense") 
    .filter(function(data) { return true }) 
    .forEach(function(data) { dealWith(data) }) 
}) 

Và trong RxJs bạn muốn thay thế Bacon.fromEvent bằng Rx.Observable.fromEvent.

+0

Vui lòng xóa" i think "vì nó hoạt động tốt. Cũng thay thế Bacon bằng 'Rx.Observable' khi bạn đã ghi rõ rồi. – Mick

8

Bạn có thể tạo một Quan sát như vậy:

var senses = Rx.Observable.fromEventPattern(
    function add (h) { 
     socket.on('sense',h); 
    } 
); 

Sau đó sử dụng senses giống như bất kỳ Quan sát khác.

+0

chức năng của phương thức add (h) là gì? Đó là một phần chức năng của RxJs? – Mitul

+1

Tôi chỉ đặt tên hàm, bạn chỉ cần làm: 'function (h) {...}'. – Xesued

+0

từEvent didnt làm việc cho tôi. Và tôi đã kết thúc với giải pháp này thay thế. Trong TS của nó: 'let obs = Observable.fromEventPattern (h => this.socket.on ('my_event', h));' – DarkNeuron

15

tôi đã trải qua một số vấn đề lạ bằng cách sử dụng phương pháp fromEvent, vì vậy tôi thích chỉ để tạo của riêng tôi Quan sát:

function RxfromIO (io, eventName) { 
    return Rx.Observable.create(observer => { 
    io.on(eventName, (data) => { 
     observer.onNext(data) 
    }); 
    return { 
     dispose : io.close 
    } 
}); 

sau đó tôi có thể sử dụng như thế này:

let $connection = RxfromIO(io, 'connection'); 
+0

Chỉ cần sử dụng từEvent không cần phải viết thêm mã này! const connection $ = Rx.Observable.fromEvent (io, 'kết nối'); – Mick

0

Bạn có thể sử dụng rxjs -dom,

Rx.DOM.fromWebSocket(url, protocol, [openObserver], [closeObserver]) 


// an observer for when the socket is open 
var openObserver = Rx.Observer.create(function(e) { 
    console.info('socket open'); 

    // Now it is safe to send a message 
    socket.onNext('test'); 
}); 

// an observer for when the socket is about to close 
var closingObserver = Rx.Observer.create(function() { 
    console.log('socket is about to close'); 
}); 

// create a web socket subject 
socket = Rx.DOM.fromWebSocket(
    'ws://echo.websocket.org', 
    null, // no protocol 
    openObserver, 
    closingObserver); 

// subscribing creates the underlying socket and will emit a stream of incoming 
// message events 
socket.subscribe(
    function(e) { 
    console.log('message: %s', e.data); 
    }, 
    function(e) { 
    // errors and "unclean" closes land here 
    console.error('error: %s', e); 
    }, 
    function() { 
    // the socket has been closed 
    console.info('socket closed'); 
    } 
); 
+0

Dường như Rx.DOM.fromWebSocket không thể sử dụng với socket.io-server. Rx.DOM.fromWebSocket sử dụng API websocket chuẩn w3c. – clark

1

ES6 một lớp lót mà tôi sử dụng, sử dụng cú pháp ES7 bind:
(đọc $ như stream)

import { Observable } from 'rxjs' 

// create socket 

const message$ = Observable.create($ => socket.on('message', ::$.next)) 
// translates to: Observable.create($ => socket.on('message', $.next.bind(this))) 

// filter example 
const subscription = message$ 
    .filter(message => message.text !== 'spam') 
    //or .filter(({ text }) => text !== 'spam') 
    .subscribe(::console.log) 
0

Đơn giản chỉ cần sử dụng fromEvent(). Dưới đây là một ví dụ đầy đủ trong Node.js nhưng hoạt động tương tự trong trình duyệt. Lưu ý rằng tôi sử dụng first()takeUntil() để ngăn chặn rò rỉ bộ nhớ: first() chỉ lắng nghe một sự kiện và sau đó hoàn tất. Bây giờ sử dụng takeUntil() trên tất cả các ổ cắm-sự kiện khác mà bạn nghe nên quan sát đầy đủ về ngắt kết nối:

const app = require('express')(); 
const server = require('http').createServer(app); 
const io = require('socket.io')(server); 
const Rx = require('rxjs/Rx'); 

connection$ = Rx.Observable.fromEvent(io, 'connection'); 

connection$.subscribe(socket => { 
    console.log(`Client connected`); 

    // Observables 
    const disconnect$ = Rx.Observable.fromEvent(socket, 'disconnect').first(); 
    const message$ = Rx.Observable.fromEvent(socket, 'message').takeUntil(disconnect$); 

    // Subscriptions 
    message$.subscribe(data => { 
     console.log(`Got message from client with data: ${data}`); 
     io.emit('message', data); // Emit to all clients 
    }); 

    disconnect$.subscribe(() => { 
     console.log(`Client disconnected`); 
    }) 
}); 

server.listen(3000); 
Các vấn đề liên quan