diff --git a/Classes/Job/ConfigurationFactory.php b/Classes/Job/ConfigurationFactory.php index 74187bb..6640b29 100644 --- a/Classes/Job/ConfigurationFactory.php +++ b/Classes/Job/ConfigurationFactory.php @@ -8,6 +8,8 @@ use Neos\Flow\Configuration\ConfigurationManager; use Neos\Flow\Core\Booting\Scripts; use Neos\Utility\ObjectAccess; +use Netlogix\JobQueue\Pool\ProcessFactory; + use function defined; use function preg_replace; use function rtrim; @@ -59,16 +61,7 @@ public function buildJobConfiguration(string $queueName): array ConfigurationManager::CONFIGURATION_TYPE_SETTINGS, 'Neos.Flow' ); - - $command = Scripts::buildPhpCommand( - $flowSettings - ); - $command .= sprintf( - ' %s %s --queue=%s', - escapeshellarg(\FLOW_PATH_FLOW . 'Scripts/flow.php'), - escapeshellarg('flowpack.jobqueue.common:job:execute'), - escapeshellarg($queueName) - ); + $command = (new ProcessFactory)->buildSubprocessCommand(); $workerPool = (array)$this->configurationManager->getConfiguration( ConfigurationManager::CONFIGURATION_TYPE_SETTINGS, diff --git a/Classes/Lock.php b/Classes/Lock.php index 16b5c42..75c31b0 100644 --- a/Classes/Lock.php +++ b/Classes/Lock.php @@ -30,6 +30,11 @@ public function __construct(int $numberOfWorkers, string $lockFileDirectory) } } + /** + * @template T + * @param callable(): T $run + * @return T + */ public function run(callable $run) { $this->findSlot(); diff --git a/Classes/Loop.php b/Classes/Loop.php index 89fd1fe..3509eab 100644 --- a/Classes/Loop.php +++ b/Classes/Loop.php @@ -1,57 +1,86 @@ queue = $queue; - $this->exitAfterTimestamp = $exitAfter > 0 ? time() + $exitAfter : null; - $this->timeout = $exitAfter > 0 ? $exitAfter : null; + public const int SIX_HOURS_IN_SECONDS = 21600; + + public function __construct( + /** + * The Queue to watch + */ + protected RabbitQueue $queue, + + protected readonly Pool $poolObject, + + /** + * Time in seconds after which the loop should exit + */ + protected readonly ?int $exitAfter + ) { } public function runMessagesOnWorker(Worker $worker) { - $worker->prepare(); - do { - try { - $message = $this->queue->waitAndReserve($this->timeout); - $worker->executeMessage($message); - } catch (AMQPTimeoutException $e) { - } + $this + ->poolObject + ->runLoop(function (Pool $pool) use ($worker) { + $worker->prepare(); + + $runDueJobs = $pool->eventLoop->addPeriodicTimer( + interval: 0.01, + callback: fn () => $this->runDueJob($pool, $worker) + ); - if ($this->exitAfterTimestamp !== null && time() >= $this->exitAfterTimestamp) { - break; + if ($this->exitAfter) { + $pool->eventLoop->addTimer( + interval: max($this->exitAfter, 1), + callback: function () use ($pool, $runDueJobs) { + $pool->eventLoop->cancelTimer($runDueJobs); + $checkForPoolToClear = $pool->eventLoop->addPeriodicTimer( + interval: 1, + callback: function () use ($pool, &$checkForPoolToClear) { + if (count($pool) === 0) { + $pool->eventLoop->cancelTimer($checkForPoolToClear); + $pool->eventLoop->stop(); + } + } + ); + } + ); + } + }); + } + + private function runDueJob(Pool $pool, Worker $worker): void + { + /** + * No parallel execution of multiple messages here, create multiple + * fast rabbit instances connected instead. + * Counting the running instances in the pool only prevents the + * pool from spawning too many workers. + */ + if (count($pool)) { + return; + } + try { + $message = $this->queue->waitAndReserve(10); + if ($message) { + $pool->eventLoop->futureTick(fn () => $worker->executeMessage($message)); } - } while (true); + } catch (AMQPTimeoutException $e) { + } } } diff --git a/Classes/Package.php b/Classes/Package.php new file mode 100644 index 0000000..e730447 --- /dev/null +++ b/Classes/Package.php @@ -0,0 +1,44 @@ +getSignalSlotDispatcher(); + assert($dispatcher instanceof Dispatcher); + + /** + * @see Compiler::compiledClasses() + * @see FileMonitor::emitFilesHaveChanged() + * @see AllSingletonsPreloader::flush() + */ + $dispatcher->connect( + signalClassName: FileMonitor::class, + signalName: 'filesHaveChanged', + slotClassNameOrObject: fn () => static::flushSingletonsPreloaderCache($bootstrap->getObjectManager()) + ); + } + + private function flushSingletonsPreloaderCache(ObjectManagerInterface $objectManager): void + { + if ($objectManager instanceof CompileTimeObjectManager) { + return; + } + $objectManager + ->get(AllSingletonsPreloader::CACHE) + ->flush(); + } +} diff --git a/Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php b/Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php new file mode 100644 index 0000000..c193d66 --- /dev/null +++ b/Classes/PreventLoggingOfMissingInput/JobCommandInitializationAspect.php @@ -0,0 +1,51 @@ +mapRequestArgumentsToControllerArguments())')] + public function preventLoggingOfMissingInputExceptions(JoinPointInterface $joinPoint): void + { + $jobCommandController = $joinPoint->getProxy(); + assert($jobCommandController instanceof JobCommandController); + + $reflection = new ClassReflection($jobCommandController); + + $commandMethodName = $reflection + ->getProperty('commandMethodName') + ->getValue($jobCommandController); + + if ($commandMethodName !== 'executeCommand') { + $joinPoint->getAdviceChain()->proceed($joinPoint); + return; + } + + try { + $joinPoint->getAdviceChain()->proceed($joinPoint); + } catch (MissingInputException $e) { + throw new StopCommandException(); + } + } +} diff --git a/Classes/SingletonPreloading/AllSingletonsPreloader.php b/Classes/SingletonPreloading/AllSingletonsPreloader.php new file mode 100644 index 0000000..7993000 --- /dev/null +++ b/Classes/SingletonPreloading/AllSingletonsPreloader.php @@ -0,0 +1,154 @@ +getClassList() as $className => $buildInstance) { + $this->preload(className: $className, buildInstance: $buildInstance); + } + $this->pauseExpiringObjects(); + } + + /** + * @return array + */ + protected function getClassList(): array + { + if ($this->cache->has('classList')) { + return $this->cache->get('classList'); + } else { + $list = [... $this->buildClassList()]; + $this->cache->set('classList', $list); + return $list; + } + } + + protected function preload(string $className, bool $buildInstance): void + { + try { + $buildInstance + ? $this->objectManager->get($className) + : class_exists(class: $className, autoload: true); + } catch (Throwable) { + // ignore + } + } + + protected function pauseExpiringObjects() + { + if ($this->objectManager->has(EntityManagerInterface::class)) { + $this->objectManager + ->get(EntityManagerInterface::class) + ->getConnection() + ->close(); + } + if ($this->objectManager->has(Connection::class)) { + $this->objectManager + ->get(Connection::class) + ->close(); + } + // TODO: There are other objects that might expire, for example ยด + } + + /** + * @return Traversable + */ + protected function buildClassList(): Traversable + { + foreach (self::getSingletonClassNamesFromReflection($this->objectManager) as $className => $buildInstance) { + yield $className => $buildInstance && !$this->ignoreClassName($className); + } + } + + protected function ignoreClassName(string $className): bool + { + foreach ($this->ignoreClassNames as $ignoredClassName) { + if (is_a($className, $ignoredClassName, true)) { + return true; + } + } + return false; + } + + /** + * @return array + */ + #[Flow\CompileStatic] + public static function getSingletonClassNamesFromReflection(ObjectManagerInterface $objectManager): array + { + $reflection = $objectManager->get(ReflectionService::class); + assert($reflection instanceof ReflectionService); + $classNames = []; + foreach ($reflection->getAllClassNames() as $className) { + try { + if ($objectManager->getScope($className) !== Configuration::SCOPE_SINGLETON) { + /** + * Only preload singletons + */ + $classNames[$className] = false; + continue; + } + } catch (\Exception $e) { + $classNames[$className] = false; + continue; + } + + $constructParameters = $reflection->getMethodParameters($className, '__construct'); + if (count($constructParameters)) { + /** + * Skip preloading for classes with constructor arguments because they are + * likely to depend on stateful objects that, in one way or other, expire, + * like database connections. + */ + $classNames[$className] = false; + } else { + $classNames[$className] = true; + } + } + + return $classNames; + } +} diff --git a/Classes/SingletonPreloading/JobCommandInitializationAspect.php b/Classes/SingletonPreloading/JobCommandInitializationAspect.php new file mode 100644 index 0000000..d4a6acd --- /dev/null +++ b/Classes/SingletonPreloading/JobCommandInitializationAspect.php @@ -0,0 +1,58 @@ +initializeCommandMethodArguments())')] + public function preloadSingletonsWhenJobCommandControllerGetsInitialized(JoinPointInterface $joinPoint): void + { + $jobCommandController = $joinPoint->getProxy(); + assert($jobCommandController instanceof JobCommandController); + + $reflection = new ClassReflection($jobCommandController); + + $commandMethodName = $reflection + ->getProperty('commandMethodName') + ->getValue($jobCommandController); + + if ($commandMethodName !== 'executeCommand') { + return; + } + + $request = $reflection + ->getProperty('request') + ->getValue($jobCommandController); + assert($request instanceof Request); + + $arguments = $reflection + ->getProperty('arguments') + ->getValue($jobCommandController); + assert($arguments instanceof Controller\Arguments); + + foreach ($arguments as $argument) { + assert($argument instanceof Controller\Argument); + if ($argument->isRequired() && !$request->hasArgument($argument->getName())) { + // only preload if the request is blocked by fetching an argument via stdin + $this->objectManager + ->get(SingletonsPreloader::class) + ->collect(); + return; + } + } + } +} diff --git a/Classes/SingletonPreloading/NoneSingletonsPreloader.php b/Classes/SingletonPreloading/NoneSingletonsPreloader.php new file mode 100644 index 0000000..8dea0be --- /dev/null +++ b/Classes/SingletonPreloading/NoneSingletonsPreloader.php @@ -0,0 +1,37 @@ +command = $command; - $this->queue = $queue; - $this->queueSettings = $queueSettings; - $this->messageCache = $messageCache; - $this->lock = $lock; } - public function prepare() + public function prepare(): void { $this->output = new ConsoleOutput(); - $this->outputLine('Watching queue "%s"', $this->queue->getName()); + $this->output->outputLine('Watching queue "%s"', [$this->queue->getName()]); } - public function executeMessage(Message $message) + public function executeMessage(Message $message): void { $messageCacheIdentifier = sha1(serialize($message)); $this->messageCache->set($messageCacheIdentifier, $message); - $this->lock->run(function() use (&$messageCacheIdentifier, &$commandOutput, &$result) { - exec( - $this->command . ' --messageCacheIdentifier=' . escapeshellarg($messageCacheIdentifier), - $commandOutput, - $result - ); - }); + $process = $this->lock->run( + fn () => $this->poolObject->runPayload(payload: $message->getPayload(), queueName: $this->queue->getName()), + ); + assert($process instanceof Process); - if ($result === 0) { + $process->on(Pool::EVENT_SUCCESS, function () use ($message) { $this->queue->finish($message->getIdentifier()); - $this->outputLine( - 'Successfully executed job "%s" (%s)', - $message->getIdentifier(), - join('', $commandOutput) + $this->output->outputLine( + 'Successfully executed job "%s"', + [$message->getIdentifier()] ); + }); - } else { + $process->on(Pool::EVENT_ERROR, function () use ($message) { $maximumNumberOfReleases = isset($this->queueSettings['maximumNumberOfReleases']) - ? (int)$this->queueSettings['maximumNumberOfReleases'] + ? (int) $this->queueSettings['maximumNumberOfReleases'] : JobManager::DEFAULT_MAXIMUM_NUMBER_RELEASES; if ($message->getNumberOfReleases() < $maximumNumberOfReleases) { $releaseOptions = isset($this->queueSettings['releaseOptions']) ? $this->queueSettings['releaseOptions'] : []; $this->queue->release($message->getIdentifier(), $releaseOptions); $this->queue->reQueueMessage($message, $releaseOptions); - $this->outputLine( - 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE', - $message->getIdentifier(), - $this->queue->getName(), - $message->getNumberOfReleases() + 1, - $maximumNumberOfReleases + 1 + $this->output->outputLine( + 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - RELEASE', + [ + $message->getIdentifier(), + $this->queue->getName(), + $message->getNumberOfReleases() + 1, + $maximumNumberOfReleases + 1, + ] ); - $this->outputLine('Message: %s', join('', $commandOutput)); - } else { $this->queue->abort($message->getIdentifier()); - $this->outputLine( - 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', - $message->getIdentifier(), - $this->queue->getName(), - $message->getNumberOfReleases() + 1, - $maximumNumberOfReleases + 1 + $this->output->outputLine( + 'Job execution for job (message: "%s", queue: "%s") failed (%d/%d trials) - ABORTING', + [ + $message->getIdentifier(), + $this->queue->getName(), + $message->getNumberOfReleases() + 1, + $maximumNumberOfReleases + 1, + ] ); - $this->outputLine('Message: %s', join('', $commandOutput)); } - } - - if ($messageCacheIdentifier !== null) { - $this->messageCache->remove($messageCacheIdentifier); - } - } - - protected function outputLine(string $text, ...$arguments) - { - $this->output->outputLine($text, $arguments); + }); } } diff --git a/Configuration/Caches.yaml b/Configuration/Caches.yaml new file mode 100644 index 0000000..1037265 --- /dev/null +++ b/Configuration/Caches.yaml @@ -0,0 +1,3 @@ +Netlogix_JobQueue_FastRabbit_SingletonPreloaderCache: + frontend: Neos\Cache\Frontend\VariableFrontend + backend: Neos\Cache\Backend\SimpleFileBackend diff --git a/Configuration/Objects.yaml b/Configuration/Objects.yaml new file mode 100644 index 0000000..9244cc8 --- /dev/null +++ b/Configuration/Objects.yaml @@ -0,0 +1,28 @@ +'Netlogix\JobQueue\FastRabbit\SingletonPreloading\SingletonsPreloader': + + # Don't create any singleton instances by default. This cautious setting completely + # avoids creating "expiring" singleton instances, like doctrine DBAL connections, + # during the preloading phase. + + className: Netlogix\JobQueue\FastRabbit\SingletonPreloading\NoneSingletonsPreloader + + + # Create "all" singletons. Those with constructor argument injection are excluded + # to avoid the before-mentioned Doctrine DBAL, which comes through the EntityManager. + # + # As soon as injected properties are "lazy: false", which needs to be the case + # for properly typed ones and which usually comes when switching from doc comments + # to PHP 8 attributes, the loader can't easily decide which singleton depends on + # an "expiring" object, so the "all" preloader might silently load doctrine DBAL. + # + # className: Netlogix\JobQueue\FastRabbit\SingletonPreloading\AllSingletonsPreloader + + +'Netlogix.JobQueue.FakeQueue:SingletonPreloaderCache': + className: Neos\Cache\Frontend\VariableFrontend + scope: singleton + factoryObjectName: Neos\Flow\Cache\CacheManager + factoryMethodName: getCache + arguments: + 1: + value: Netlogix_JobQueue_FastRabbit_SingletonPreloaderCache diff --git a/Configuration/Settings.SingletonsPreloader.yaml b/Configuration/Settings.SingletonsPreloader.yaml new file mode 100644 index 0000000..8bb4e9d --- /dev/null +++ b/Configuration/Settings.SingletonsPreloader.yaml @@ -0,0 +1,14 @@ +Netlogix: + JobQueue: + FastRabbit: + + AllSingletonsPreloader: + ignoreClassNames: + # cannot be preloaded due to missing constructor arguments + Neos\Flow\ObjectManagement\CompileTimeObjectManager: Neos\Flow\ObjectManagement\CompileTimeObjectManager + Neos\Flow\Property\TypeConverter\AbstractTypeConverter: Neos\Flow\Property\TypeConverter\AbstractTypeConverter + Neos\Flow\Security\Authentication\Controller\AbstractAuthenticationController: Neos\Flow\Security\Authentication\Controller\AbstractAuthenticationController + Neos\Flow\Session\Aspect\SessionObjectMethodsPointcutFilter: Neos\Flow\Session\Aspect\SessionObjectMethodsPointcutFilter + # must not be preloaded because they hold connections that can expire + Doctrine\DBAL\Connection: Doctrine\DBAL\Connection + Doctrine\ORM\EntityManagerInterface: Doctrine\ORM\EntityManagerInterface diff --git a/bin/fast-rabbit b/bin/fast-rabbit index cf96dd7..eae82f7 100755 --- a/bin/fast-rabbit +++ b/bin/fast-rabbit @@ -1,10 +1,12 @@ #!/usr/bin/env php injectMessageCache($messageCache); + +$worker = new Worker($command, $pool, $queue, $queueSettings, $messageCache, $lock); +$loop = new Loop($queue, $pool, Loop::SIX_HOURS_IN_SECONDS); $loop->runMessagesOnWorker($worker); diff --git a/composer.json b/composer.json index 4fce6c0..49259c8 100644 --- a/composer.json +++ b/composer.json @@ -1,7 +1,7 @@ { "name": "netlogix/jobqueue-fast-rabbit", "type": "neos-package", - "description": "Low memory footprint worker for RabbitMQ jobs", + "description": "Worker for RabbitMQ jobs", "license": "MIT", "autoload": { "psr-4": { @@ -11,9 +11,13 @@ "require": { "ext-json": "*", "flowpack/jobqueue-common": "^3.0.0", + "netlogix/jobqueue-pool": "^1.0", "netlogix/supervisor": "^1.0", "php-amqplib/php-amqplib": "^2.11", - "t3n/jobqueue-rabbitmq": "^2.3.0" + "react/child-process": "^0.6", + "react/event-loop": "^1.5", + "t3n/jobqueue-rabbitmq": "^2.3.0", + "php": "~8.2 || ~8.3 || ~8.4" }, "extra": { "neos": {