diff --git a/src/Process/Consumer.php b/src/Process/Consumer.php index 653d64e..ac13ecf 100644 --- a/src/Process/Consumer.php +++ b/src/Process/Consumer.php @@ -27,7 +27,7 @@ class Consumer /** * @var string */ - protected $_consumerDir = ''; + protected $_consumerDirs = []; /** * @var array @@ -36,11 +36,11 @@ class Consumer /** * StompConsumer constructor. - * @param string $consumer_dir + * @param array|string $consumer_dirs */ - public function __construct($consumer_dir = '') + public function __construct($consumer_dirs = []) { - $this->_consumerDir = $consumer_dir; + $this->_consumerDirs = (array)$consumer_dirs; } /** @@ -48,38 +48,41 @@ public function __construct($consumer_dir = '') */ public function onWorkerStart() { - if (!is_dir($this->_consumerDir)) { - echo "Consumer directory {$this->_consumerDir} not exists\r\n"; - return; - } - $dir_iterator = new \RecursiveDirectoryIterator($this->_consumerDir); - $iterator = new \RecursiveIteratorIterator($dir_iterator); - foreach ($iterator as $file) { - if (is_dir($file)) { + foreach ($this->_consumerDirs as $dir) { + + if (!is_dir($dir)) { + echo "Consumer directory {$dir} not exists\r\n"; continue; } - $fileinfo = new \SplFileInfo($file); - $ext = $fileinfo->getExtension(); - if ($ext === 'php') { - $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4)); - if (is_a($class, 'Webman\RedisQueue\Consumer', true)) { - $consumer = Container::get($class); - $connection_name = $consumer->connection ?? 'default'; - $queue = $consumer->queue; - if (!$queue) { - echo "Consumer {$class} queue not exists\r\n"; - continue; - } - $this->_consumers[$queue] = $consumer; - $connection = Client::connection($connection_name); - $connection->subscribe($queue, [$consumer, 'consume']); - if (method_exists($connection, 'onConsumeFailure')) { - $connection->onConsumeFailure(function ($exeption, $package) { - $consumer = $this->_consumers[$package['queue']] ?? null; - if ($consumer && method_exists($consumer, 'onConsumeFailure')) { - return call_user_func([$consumer, 'onConsumeFailure'], $exeption, $package); - } - }); + $dir_iterator = new \RecursiveDirectoryIterator($dir); + $iterator = new \RecursiveIteratorIterator($dir_iterator); + foreach ($iterator as $file) { + if (is_dir($file)) { + continue; + } + $fileinfo = new \SplFileInfo($file); + $ext = $fileinfo->getExtension(); + if ($ext === 'php') { + $class = str_replace('/', "\\", substr(substr($file, strlen(base_path())), 0, -4)); + if (is_a($class, 'Webman\RedisQueue\Consumer', true)) { + $consumer = Container::get($class); + $connection_name = $consumer->connection ?? 'default'; + $queue = $consumer->queue; + if (!$queue) { + echo "Consumer {$class} queue not exists\r\n"; + continue; + } + $this->_consumers[$queue] = $consumer; + $connection = Client::connection($connection_name); + $connection->subscribe($queue, [$consumer, 'consume']); + if (method_exists($connection, 'onConsumeFailure')) { + $connection->onConsumeFailure(function ($exeption, $package) { + $consumer = $this->_consumers[$package['queue']] ?? null; + if ($consumer && method_exists($consumer, 'onConsumeFailure')) { + return call_user_func([$consumer, 'onConsumeFailure'], $exeption, $package); + } + }); + } } } }