vendor/gos/web-socket-bundle/src/Topic/TopicManager.php line 27

Open in your IDE?
  1. <?php declare(strict_types=1);
  2. namespace Gos\Bundle\WebSocketBundle\Topic;
  3. use Ratchet\ConnectionInterface;
  4. use Ratchet\Wamp\Topic;
  5. use Ratchet\Wamp\WampServerInterface;
  6. use Ratchet\WebSocket\WsServerInterface;
  /**
   * WAMP server decorator that keeps one shared Topic object per topic ID.
   *
   * Every callback resolves the incoming topic (a Topic instance or its string
   * ID) to the Topic stored in the lookup table, then forwards the call to the
   * wrapped WAMP application.
   *
   * The wrapped application is nullable and is not checked before use: it has
   * to be set (see setWampApplication()) before any callback is invoked.
   *
   * @author Edu Salguero <edusalguero@gmail.com>
   */
  class TopicManager implements WsServerInterface, WampServerInterface
  {
      /**
       * Application that receives every forwarded callback.
       */
      protected ?WampServerInterface $app = null;

      /**
       * Known topics, keyed by topic ID.
       *
       * @var array<string, Topic>
       */
      protected array $topicLookup = [];

      /**
       * Sets the application that callbacks are forwarded to.
       *
       * @deprecated to be removed in 4.0, the dependency will be injected through the constructor instead
       */
      public function setWampApplication(WampServerInterface $app): void
      {
          trigger_deprecation(
              'gos/web-socket-bundle',
              '3.7',
              '%s() is deprecated and will be removed in 4.0, the dependency will be injected through the constructor instead.',
              __METHOD__
          );

          $this->app = $app;
      }

      /**
       * Gives the connection an empty subscription set, then notifies the application.
       */
      public function onOpen(ConnectionInterface $conn): void
      {
          // NOTE(review): assumes $conn->WAMP was already populated by an outer layer - confirm.
          $conn->WAMP->subscriptions = new \SplObjectStorage();

          $this->app->onOpen($conn);
      }

      /**
       * Forwards an RPC call with the topic resolved to its shared Topic object.
       *
       * @param mixed        $id     forwarded to the application unchanged
       * @param Topic|string $topic
       * @param array        $params forwarded to the application unchanged
       *
       * @throws \InvalidArgumentException if the $topic argument is not a supported type
       */
      public function onCall(ConnectionInterface $conn, $id, $topic, array $params): void
      {
          $this->app->onCall($conn, $id, $this->getTopic($topic), $params);
      }

      /**
       * Subscribes the connection to a topic; does nothing if it is already subscribed.
       *
       * @param Topic|string $topic
       *
       * @throws \InvalidArgumentException if the $topic argument is not a supported type
       */
      public function onSubscribe(ConnectionInterface $conn, $topic): void
      {
          $topicObj = $this->getTopic($topic);

          if ($conn->WAMP->subscriptions->contains($topicObj)) {
              return;
          }

          // getTopic() has just registered this key, so the lookup entry exists.
          $this->topicLookup[(string) $topic]->add($conn);
          $conn->WAMP->subscriptions->attach($topicObj);

          $this->app->onSubscribe($conn, $topicObj);
      }

      /**
       * Unsubscribes the connection from a topic; does nothing if it is not subscribed.
       *
       * @param Topic|string $topic
       *
       * @throws \InvalidArgumentException if the $topic argument is not a supported type
       */
      public function onUnsubscribe(ConnectionInterface $conn, $topic): void
      {
          $topicObj = $this->getTopic($topic);

          if (!$conn->WAMP->subscriptions->contains($topicObj)) {
              return;
          }

          $this->cleanTopic($topicObj, $conn);

          $this->app->onUnsubscribe($conn, $topicObj);
      }

      /**
       * Forwards a publish request with the topic resolved to its shared Topic object.
       *
       * @param Topic|string $topic
       * @param mixed        $event    forwarded to the application unchanged
       * @param array        $exclude  forwarded to the application unchanged
       * @param array        $eligible forwarded to the application unchanged
       *
       * @throws \InvalidArgumentException if the $topic argument is not a supported type
       */
      public function onPublish(ConnectionInterface $conn, $topic, $event, array $exclude, array $eligible): void
      {
          $this->app->onPublish($conn, $this->getTopic($topic), $event, $exclude, $eligible);
      }

      /**
       * Notifies the application, then removes the connection from every known topic.
       */
      public function onClose(ConnectionInterface $conn): void
      {
          $this->app->onClose($conn);

          // By-value foreach: cleanTopic() may unset lookup entries without disturbing this loop.
          foreach ($this->topicLookup as $topic) {
              $this->cleanTopic($topic, $conn);
          }
      }

      /**
       * Forwards a connection error to the application.
       */
      public function onError(ConnectionInterface $conn, \Exception $e): void
      {
          $this->app->onError($conn, $e);
      }

      /**
       * Returns the application's sub-protocols, or an empty list when the
       * application does not implement WsServerInterface.
       */
      public function getSubProtocols(): array
      {
          if ($this->app instanceof WsServerInterface) {
              return $this->app->getSubProtocols();
          }

          return [];
      }

      /**
       * Resolves a topic to the shared Topic object stored in the lookup table.
       *
       * An unknown ID is registered on the fly: a given Topic instance is stored
       * as-is, a string gets a new Topic created for it.
       *
       * @param Topic|string $topic
       *
       * @throws \InvalidArgumentException if the $topic argument is not a supported type
       */
      public function getTopic($topic): Topic
      {
          if (!($topic instanceof Topic) && !\is_string($topic)) {
              throw new \InvalidArgumentException(sprintf('The $topic argument of %s() must be an instance of %s or a string, %s was given.', __METHOD__, Topic::class, ('object' === \gettype($topic) ? 'an instance of '.\get_class($topic) : 'a '.\gettype($topic))));
          }

          $key = $topic instanceof Topic ? $topic->getId() : $topic;

          if (!\array_key_exists($key, $this->topicLookup)) {
              if ($topic instanceof Topic) {
                  $this->topicLookup[$key] = $topic;
              } else {
                  $this->topicLookup[$key] = new Topic($topic);
              }
          }

          return $this->topicLookup[$key];
      }

      /**
       * Removes the connection from a topic and forgets the topic once it is empty.
       */
      protected function cleanTopic(Topic $topic, ConnectionInterface $conn): void
      {
          if ($conn->WAMP->subscriptions->contains($topic)) {
              $conn->WAMP->subscriptions->detach($topic);
          }

          $this->topicLookup[$topic->getId()]->remove($conn);

          // Drop topics with no subscribers left so the lookup table does not grow indefinitely.
          if (0 === $topic->count()) {
              unset($this->topicLookup[$topic->getId()]);
          }
      }
  }