2014-09-06 16 views
7

Tôi đang làm cho một trò chơi html5 www.titansoftime.comsử dụng pthreads PHP với Ratchet WebSocket

Tôi đang sử dụng ratchet như một giải pháp php máy chủ WebSocket. Nó hoạt động tuyệt vời! http://socketo.me/docs/push

Tôi đã thực hiện một số thử nghiệm độc lập bằng cách sử dụng phần mở rộng php pthreads và đã thấy một số kết quả rất thú vị. Nó thực sự hoạt động và hoạt động tốt .. miễn là các ổ cắm web không có trong hỗn hợp.

Pthread cung cấp khả năng đa luồng php (nó thực sự hoạt động và thật tuyệt vời). http://php.net/manual/en/book.pthreads.php

Đây là những gì tôi làm:

/src/server.php Đây là tập tin đó ra mắt daemon.

<?php 
    session_start(); 

    use Ratchet\Server\IoServer; 
    use Ratchet\WebSocket\WsServer; 
    use MyApp\Pusher; 

    require __DIR__ . '/../vendor/autoload.php'; 

    require_once __DIR__ . '/../mysql.cls.php'; 
    require_once __DIR__ . '/../game.cls.php'; 
    require_once __DIR__ . '/../model.cls.php'; 

    $mysql = new mysql; 
    $game = new game; 

    $loop = React\EventLoop\Factory::create(); 
    $pusher = new MyApp\Pusher(); 

    $loop->addPeriodicTimer(0.50, function() use($pusher){ 
     $pusher->load(); 
    }); 

    $webSock = new React\Socket\Server($loop); 

    if ($loop instanceof \React\EventLoop\LibEventLoop) { 
     echo "\n HAS LibEvent"; 
    } 

    $webSock->listen(8080, '0.0.0.0'); // Binding to 0.0.0.0 means remotes can connect 
    $webServer = new Ratchet\Server\IoServer(
      new Ratchet\Http\HttpServer(
        new Ratchet\WebSocket\WsServer($pusher) 
      ), 
      $webSock 
    ); 

    $loop->run(); 

Tất cả đều hoạt động tốt.

/src/MyApp/Pusher.php Lớp này đẩy dữ liệu đến tất cả người dùng được kết nối.

<?php 
namespace MyApp; 
use Ratchet\ConnectionInterface; 
use Ratchet\MessageComponentInterface; 

class AsyncThread extends \Thread{ 

    public $client; 

    public function __construct($client){ 
     $this->client = $client; 
    } 

    public function run(){ 

     // do work on $this->client 
     $user = mysql::assoc('SELECT * from users WHERE connection_id = "'.$this->client->resourceId.'"'); 
     // etc.. 
     $this->client->send(json_encode(array('foo'=>'bar'))); 

    } 

} 

class Pusher implements MessageComponentInterface{ 

    public static $clients = array(); 

    #load 
    public static function load(){ 

     $client_count = count(self::$clients); 

     echo "\n\n\n".'Serving to '.$client_count.' clients. '.time(); 

     $start = $istart = microtime(true); 

     if(!count(self::$clients)){ 
      if(!mysql_ping()){ 
       $game->connect(); 
      } 
     } 

     $threads = array(); 
     foreach(self::$clients as $key => $client){  

      // HANDLE CLIENT 

      // This works just fine, the only problem is that if I have lets say 50 simultaneous users, the people near the end of the clients array will have to wait till the other users have been processed. This is not desirable 
      $client->send(json_encode('foo'=>'bar')); 

      // So I tried this: 
      $threads[$key] = new AsyncThread($client); 
      $threads[$key]->start(); 

      // At this point the AsyncThread class will throw a fatal error complaining about not being able to serialize a closure. 
      // If I dont set "$this->data = $client;" in the thread constructor no error appears but now I cant use the data. 

      // Also regardless of whether or not I bind the data in the AsyncThread constructor, 
      // the connection disappears if I call "new AsyncThread($client)". I cannot explain this behavior. 

     } 

    } 

    public function onMessage(ConnectionInterface $from, $msg) { 
     global $game; 
     if($msg){ 
      $data = json_decode($msg); 
      if($data){  

       switch($data->task){ 

        #connect 
        case 'connect': 
         echo "\n".'New connection! ('.$from->resourceId.') '.$from->remoteAddress; 
         self::$clients[] = $from; 
         break; 

        default: 
         self::closeConnection($from); 
         echo "\nNO TASK CLOSING"; 
         break; 

       } 
      }else{ 
       echo "\n NO DATA"; 
       self::closeConnection($from); 
      } 
     }else{ 
      echo "\n NO MSG"; 
      self::closeConnection($from); 
     } 
    } 

    public function closeConnection($conn){ 
     global $game; 
     if($conn){ 
      if($conn->resourceId){ 
       $connid = $conn->resourceId; 
       $conn->close(); 
       $new = array(); 
       foreach(self::$clients as $client){ 
        if($client->resourceId != $connid){ 
         $new[] = $client; 
        } 
       } 
       self::$clients = $new; 
       $game->query('UPDATE users set connection_id = 0 WHERE connection_id = "'.intval($connid).'" LIMIT 1'); 
       echo "\n".'Connection '.$connid.' has disconnected'; 
      } 
     } 
    } 

    public function onClose(ConnectionInterface $conn) { 
     echo "\nCLIENT DROPPED"; 
     self::closeConnection($conn); 
    } 

    public function onOpen(ConnectionInterface $conn) { 
    } 
    public function onError(ConnectionInterface $conn, \Exception $e) { 
     echo "\nCLIENT ERRORED"; 
     self::closeConnection($conn); 
    } 
    public function onSubscribe(ConnectionInterface $conn, $topic) { 
    } 
    public function onUnSubscribe(ConnectionInterface $conn, $topic) { 
    } 
    public function onCall(ConnectionInterface $conn, $id, $topic, array $params) { 
    } 
    public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible) { 
    } 

} 

Tất cả đều hoạt động tốt miễn là tôi không tạo chuỗi trong vòng lặp sự kiện.

Tôi có làm theo cách này sai hay là đa luồng và websockets không tương thích?

+0

Hobbes làm bạn phải cập nhật cho câu hỏi này? –

+0

Tôi vẫn đang chờ câu trả lời lol. – Hobbes

+1

Tôi không nghĩ rằng nó là cần thiết để thực hiện đa luồng, nếu bạn đã đi qua nguồn của ratchet và phản ứng thì bạn sẽ hiểu rằng nó đang sử dụng các chức năng đọc không chặn socket. Ngoài ra, nếu bạn muốn có một lượng nhỏ tăng hiệu suất thì bạn có thể muốn xem xét đến sự bất tiện. –

Trả lời

1

kiểm tra gói https://github.com/huyanping/react-multi-process

này Cài đặt

nhà soạn nhạc yêu cầu jenner/phản ứng-đa quá trình Làm thế nào để sử dụng nó?

Vì vậy, đơn giản như:

$loop = React\EventLoop\Factory::create(); 
$server = stream_socket_server('tcp://127.0.0.1:4020'); 
stream_set_blocking($server, 0); 
$loop->addReadStream($server, function ($server) use ($loop) { 
    $conn = stream_socket_accept($server); 
    $data = "pid:" . getmypid() . PHP_EOL; 
    $loop->addWriteStream($conn, function ($conn) use (&$data, $loop) { 
     $written = fwrite($conn, $data); 
     if ($written === strlen($data)) { 
      fclose($conn); 
      $loop->removeStream($conn); 
     } else { 
      $data = substr($data, 0, $written); 
     } 
    }); 
}); 

// the second param is the sub process count 
$master = new \React\Multi\Master($loop, 20); 
$master->start(); 

Một ví dụ sử dụng jenner/simple_fork như:

class IoServer { 
    /** 
    * @param int $count worker process count 
    * Run the application by entering the event loop 
    * @throws \RuntimeException If a loop was not previously specified 
    */ 
    public function run($count = 1) { 
     if (null === $this->loop) { 
      throw new \RuntimeException("A React Loop was not provided during instantiation"); 
     } 

     if($count <= 1){ 
      $this->loop->run(); 
     }else{ 
      $loop = $this->loop; 
      $master = new \Jenner\SimpleFork\FixedPool(function() use($loop) { 
       $this->loop->run(); 
      }, $count); 
      $master->start(); 
      $master->keep(true); 
//   or just 
//   $master = new \React\Multi\Master($this->loop, $count); 
//   $master->start(); 
     } 
    } 
} 
Các vấn đề liên quan