diff --git a/psalm-baseline.xml b/psalm-baseline.xml
index 10e3ad5fd..61dd32203 100644
--- a/psalm-baseline.xml
+++ b/psalm-baseline.xml
@@ -160,15 +160,9 @@
-
-
-
-
-
-
@@ -1475,6 +1469,8 @@
$converter ?? DataConverter::createDefault(),
$rpc ?? Goridge::create(),
$credentials,
+ $pluginRegistry ?? new PluginRegistry(),
+ $client,
)]]>
diff --git a/src/Client/ScheduleClient.php b/src/Client/ScheduleClient.php
index d97a0002c..7f28cfd9b 100644
--- a/src/Client/ScheduleClient.php
+++ b/src/Client/ScheduleClient.php
@@ -32,6 +32,11 @@
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Internal\Mapper\ScheduleMapper;
+use Temporal\Plugin\ConnectionPluginContext;
+use Temporal\Plugin\ConnectionPluginInterface;
+use Temporal\Plugin\PluginRegistry;
+use Temporal\Plugin\ScheduleClientPluginContext;
+use Temporal\Plugin\ScheduleClientPluginInterface;
use Temporal\Internal\Marshaller\Mapper\AttributeMapperFactory;
use Temporal\Internal\Marshaller\Marshaller;
use Temporal\Internal\Marshaller\MarshallerInterface;
@@ -45,14 +50,38 @@ final class ScheduleClient implements ScheduleClientInterface
private DataConverterInterface $converter;
private MarshallerInterface $marshaller;
private ProtoToArrayConverter $protoConverter;
+ private PluginRegistry $pluginRegistry;
public function __construct(
ServiceClientInterface $serviceClient,
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
+ ?PluginRegistry $pluginRegistry = null,
) {
$this->clientOptions = $options ?? new ClientOptions();
$this->converter = $converter ?? DataConverter::createDefault();
+ $this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();
+
+ // Apply connection plugins (before client-level configuration)
+ $connectionContext = new ConnectionPluginContext($serviceClient);
+ foreach ($this->pluginRegistry->getPlugins(ConnectionPluginInterface::class) as $plugin) {
+ $plugin->configureServiceClient($connectionContext);
+ }
+ $serviceClient = $connectionContext->getServiceClient();
+
+ $pluginContext = new ScheduleClientPluginContext(
+ clientOptions: $this->clientOptions,
+ dataConverter: $this->converter,
+ );
+ foreach ($this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class) as $plugin) {
+ $plugin->configureScheduleClient($pluginContext);
+ }
+ $this->clientOptions = $pluginContext->getClientOptions();
+ $pluginConverter = $pluginContext->getDataConverter();
+ if ($pluginConverter !== null) {
+ $this->converter = $pluginConverter;
+ }
+
$this->marshaller = new Marshaller(
new AttributeMapperFactory(new AttributeReader()),
);
@@ -71,8 +100,9 @@ public static function create(
ServiceClientInterface $serviceClient,
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
+ ?PluginRegistry $pluginRegistry = null,
): ScheduleClientInterface {
- return new self($serviceClient, $options, $converter);
+ return new self($serviceClient, $options, $converter, $pluginRegistry);
}
public function createSchedule(
diff --git a/src/Client/WorkflowClient.php b/src/Client/WorkflowClient.php
index e982fa5e9..6524045e0 100644
--- a/src/Client/WorkflowClient.php
+++ b/src/Client/WorkflowClient.php
@@ -38,6 +38,14 @@
use Temporal\Interceptor\WorkflowClientCallsInterceptor;
use Temporal\Internal\Client\ActivityCompletionClient;
use Temporal\Internal\Client\WorkflowProxy;
+use Temporal\Plugin\ClientPluginContext;
+use Temporal\Plugin\ClientPluginInterface;
+use Temporal\Plugin\CompositePipelineProvider;
+use Temporal\Plugin\ConnectionPluginContext;
+use Temporal\Plugin\ConnectionPluginInterface;
+use Temporal\Plugin\PluginRegistry;
+use Temporal\Plugin\ScheduleClientPluginInterface;
+use Temporal\Plugin\WorkerPluginInterface;
use Temporal\Internal\Client\WorkflowRun;
use Temporal\Internal\Client\WorkflowStarter;
use Temporal\Internal\Client\WorkflowStub;
@@ -63,6 +71,7 @@ class WorkflowClient implements WorkflowClientInterface
private DataConverterInterface $converter;
private ?WorkflowStarter $starter = null;
private WorkflowReader $reader;
+ private PluginRegistry $pluginRegistry;
/** @var Pipeline */
private Pipeline $interceptorPipeline;
@@ -72,11 +81,40 @@ public function __construct(
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
?PipelineProvider $interceptorProvider = null,
+ ?PluginRegistry $pluginRegistry = null,
) {
- $this->interceptorPipeline = ($interceptorProvider ?? new SimplePipelineProvider())
- ->getPipeline(WorkflowClientCallsInterceptor::class);
+ $this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();
$this->clientOptions = $options ?? new ClientOptions();
$this->converter = $converter ?? DataConverter::createDefault();
+
+ // Apply connection plugins (before client-level configuration)
+ $connectionContext = new ConnectionPluginContext($serviceClient);
+ foreach ($this->pluginRegistry->getPlugins(ConnectionPluginInterface::class) as $plugin) {
+ $plugin->configureServiceClient($connectionContext);
+ }
+ $serviceClient = $connectionContext->getServiceClient();
+
+ $pluginContext = new ClientPluginContext(
+ clientOptions: $this->clientOptions,
+ dataConverter: $this->converter,
+ );
+ foreach ($this->pluginRegistry->getPlugins(ClientPluginInterface::class) as $plugin) {
+ $plugin->configureClient($pluginContext);
+ }
+
+ $this->clientOptions = $pluginContext->getClientOptions();
+ $pluginConverter = $pluginContext->getDataConverter();
+ if ($pluginConverter !== null) {
+ $this->converter = $pluginConverter;
+ }
+
+ // Build interceptor pipeline: merge plugin-contributed interceptors with user-provided ones
+ $provider = new CompositePipelineProvider(
+ $pluginContext->getInterceptors(),
+ $interceptorProvider ?? new SimplePipelineProvider(),
+ );
+
+ $this->interceptorPipeline = $provider->getPipeline(WorkflowClientCallsInterceptor::class);
$this->reader = new WorkflowReader($this->createReader());
// Set Temporal-Namespace metadata
@@ -88,16 +126,34 @@ public function __construct(
);
}
- /**
- * @return static
- */
public static function create(
ServiceClientInterface $serviceClient,
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
?PipelineProvider $interceptorProvider = null,
+ ?PluginRegistry $pluginRegistry = null,
): self {
- return new self($serviceClient, $options, $converter, $interceptorProvider);
+ return new self($serviceClient, $options, $converter, $interceptorProvider, $pluginRegistry);
+ }
+
+ /**
+ * Get plugins that also implement WorkerPluginInterface for propagation to workers.
+ *
+ * @return list
+ */
+ public function getWorkerPlugins(): array
+ {
+ return $this->pluginRegistry->getPlugins(WorkerPluginInterface::class);
+ }
+
+ /**
+ * Get plugins that also implement ScheduleClientPluginInterface for propagation to schedule clients.
+ *
+ * @return list
+ */
+ public function getScheduleClientPlugins(): array
+ {
+ return $this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class);
}
public function getServiceClient(): ServiceClientInterface
diff --git a/src/Interceptor/SimplePipelineProvider.php b/src/Interceptor/SimplePipelineProvider.php
index 56360f6a1..ba0f1ab1d 100644
--- a/src/Interceptor/SimplePipelineProvider.php
+++ b/src/Interceptor/SimplePipelineProvider.php
@@ -22,14 +22,28 @@ class SimplePipelineProvider implements PipelineProvider
* @param array $interceptors
*/
public function __construct(
- private iterable $interceptors = [],
+ private readonly iterable $interceptors = [],
) {}
+ /**
+ * Create a new provider with additional interceptors prepended.
+ *
+ * @param list $interceptors Interceptors to prepend before existing ones.
+ */
+ public function withPrependedInterceptors(array $interceptors): self
+ {
+ if ($interceptors === []) {
+ return $this;
+ }
+
+ return new self(\array_merge($interceptors, [...$this->interceptors]));
+ }
+
public function getPipeline(string $interceptorClass): Pipeline
{
return $this->cache[$interceptorClass] ??= Pipeline::prepare(
\array_filter(
- $this->interceptors,
+ [...$this->interceptors],
static fn(Interceptor $i): bool => $i instanceof $interceptorClass,
),
);
diff --git a/src/Internal/Interceptor/Pipeline.php b/src/Internal/Interceptor/Pipeline.php
index 484b777b6..1d7f9cc03 100644
--- a/src/Internal/Interceptor/Pipeline.php
+++ b/src/Internal/Interceptor/Pipeline.php
@@ -53,7 +53,7 @@ private function __construct(
/**
* Make sure that interceptors implement the same interface.
*
- * @template T of Interceptor
+ * @template T of object
*
* @param iterable $interceptors
*
diff --git a/src/Internal/Transport/Router/GetWorkerInfo.php b/src/Internal/Transport/Router/GetWorkerInfo.php
index 915141de7..596c214d4 100644
--- a/src/Internal/Transport/Router/GetWorkerInfo.php
+++ b/src/Internal/Transport/Router/GetWorkerInfo.php
@@ -18,6 +18,8 @@
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
use Temporal\Internal\Marshaller\MarshallerInterface;
use Temporal\Internal\Repository\RepositoryInterface;
+use Temporal\Plugin\PluginInterface;
+use Temporal\Plugin\PluginRegistry;
use Temporal\Worker\ServiceCredentials;
use Temporal\Worker\Transport\Command\ServerRequestInterface;
use Temporal\Worker\WorkerInterface;
@@ -28,6 +30,7 @@ public function __construct(
private readonly RepositoryInterface $queues,
private readonly MarshallerInterface $marshaller,
private readonly ServiceCredentials $credentials,
+ private readonly PluginRegistry $pluginRegistry,
) {}
public function handle(ServerRequestInterface $request, array $headers, Deferred $resolver): void
@@ -54,6 +57,10 @@ private function workerToArray(WorkerInterface $worker): array
'Name' => $activity->getID(),
];
+ $map = $this->map($this->pluginRegistry->getPlugins(PluginInterface::class), static fn(PluginInterface $plugin): array => [
+ 'Name' => $plugin->getName(),
+ 'Version' => null,
+ ]);
return [
'TaskQueue' => $worker->getID(),
'Options' => $this->marshaller->marshal($worker->getOptions()),
@@ -62,6 +69,7 @@ private function workerToArray(WorkerInterface $worker): array
// ActivityInfo[]
'Activities' => $this->map($worker->getActivities(), $activityMap),
'PhpSdkVersion' => SdkVersion::getSdkVersion(),
+ 'Plugins' => $map,
'Flags' => (object) $this->prepareFlags(),
];
}
diff --git a/src/Plugin/AbstractPlugin.php b/src/Plugin/AbstractPlugin.php
new file mode 100644
index 000000000..2b2e6d10a
--- /dev/null
+++ b/src/Plugin/AbstractPlugin.php
@@ -0,0 +1,34 @@
+name;
+ }
+}
diff --git a/src/Plugin/ClientPluginContext.php b/src/Plugin/ClientPluginContext.php
new file mode 100644
index 000000000..fad098ecd
--- /dev/null
+++ b/src/Plugin/ClientPluginContext.php
@@ -0,0 +1,81 @@
+ */
+ private array $interceptors = [];
+
+ public function __construct(
+ private ClientOptions $clientOptions,
+ private ?DataConverterInterface $dataConverter = null,
+ ) {}
+
+ public function getClientOptions(): ClientOptions
+ {
+ return $this->clientOptions;
+ }
+
+ public function setClientOptions(ClientOptions $clientOptions): self
+ {
+ $this->clientOptions = $clientOptions;
+ return $this;
+ }
+
+ public function getDataConverter(): ?DataConverterInterface
+ {
+ return $this->dataConverter;
+ }
+
+ public function setDataConverter(?DataConverterInterface $dataConverter): self
+ {
+ $this->dataConverter = $dataConverter;
+ return $this;
+ }
+
+ /**
+ * @return list
+ */
+ public function getInterceptors(): array
+ {
+ return $this->interceptors;
+ }
+
+ /**
+ * @param list $interceptors
+ */
+ public function setInterceptors(array $interceptors): self
+ {
+ $this->interceptors = $interceptors;
+ return $this;
+ }
+
+ /**
+ * Add an interceptor to the client pipeline.
+ */
+ public function addInterceptor(Interceptor $interceptor): self
+ {
+ $this->interceptors[] = $interceptor;
+ return $this;
+ }
+}
diff --git a/src/Plugin/ClientPluginInterface.php b/src/Plugin/ClientPluginInterface.php
new file mode 100644
index 000000000..c7eb36878
--- /dev/null
+++ b/src/Plugin/ClientPluginInterface.php
@@ -0,0 +1,30 @@
+ $pluginInterceptors Interceptors contributed by plugins.
+ * @param PipelineProvider $baseProvider The original user-provided pipeline provider.
+ */
+ public function __construct(
+ private readonly array $pluginInterceptors,
+ private readonly PipelineProvider $baseProvider,
+ ) {
+ $this->delegate = match (true) {
+ $pluginInterceptors === [] => $baseProvider,
+ $baseProvider instanceof SimplePipelineProvider => $baseProvider->withPrependedInterceptors($pluginInterceptors),
+ default => $this,
+ };
+ }
+
+ public function getPipeline(string $interceptorClass): Pipeline
+ {
+ if ($this->delegate !== $this) {
+ return $this->delegate->getPipeline($interceptorClass);
+ }
+
+ if (isset($this->cache[$interceptorClass])) {
+ return $this->cache[$interceptorClass];
+ }
+
+ $filtered = \array_filter(
+ $this->pluginInterceptors,
+ static fn(Interceptor $i): bool => $i instanceof $interceptorClass,
+ );
+
+ if ($filtered === []) {
+ return $this->cache[$interceptorClass] = $this->baseProvider->getPipeline($interceptorClass);
+ }
+
+ // Use only plugin interceptors - the base pipeline is lost in this edge case.
+ // Users should either use plugins OR a custom PipelineProvider, not both.
+ return $this->cache[$interceptorClass] = Pipeline::prepare($filtered);
+ }
+}
diff --git a/src/Plugin/ConnectionPluginContext.php b/src/Plugin/ConnectionPluginContext.php
new file mode 100644
index 000000000..0108e1581
--- /dev/null
+++ b/src/Plugin/ConnectionPluginContext.php
@@ -0,0 +1,39 @@
+serviceClient;
+ }
+
+ public function setServiceClient(ServiceClientInterface $serviceClient): self
+ {
+ $this->serviceClient = $serviceClient;
+ return $this;
+ }
+}
diff --git a/src/Plugin/ConnectionPluginInterface.php b/src/Plugin/ConnectionPluginInterface.php
new file mode 100644
index 000000000..4caa6f7fe
--- /dev/null
+++ b/src/Plugin/ConnectionPluginInterface.php
@@ -0,0 +1,36 @@
+ */
+ private array $plugins = [];
+
+ /**
+ * @param iterable $plugins
+ */
+ public function __construct(iterable $plugins = [])
+ {
+ foreach ($plugins as $plugin) {
+ $this->add($plugin);
+ }
+ }
+
+ public function add(ConnectionPluginInterface|ClientPluginInterface|ScheduleClientPluginInterface|WorkerPluginInterface $plugin): void
+ {
+ $name = $plugin->getName();
+ if (isset($this->plugins[$name])) {
+ throw new \RuntimeException(\sprintf(
+ 'Duplicate plugin "%s": a plugin with this name is already registered.',
+ $name,
+ ));
+ }
+ $this->plugins[$name] = $plugin;
+ }
+
+ /**
+ * Merge another set of plugins. Throws on duplicate names.
+ *
+ * @param iterable $plugins
+ */
+ public function merge(iterable $plugins): void
+ {
+ foreach ($plugins as $plugin) {
+ $this->add($plugin);
+ }
+ }
+
+ /**
+ * Get all plugins implementing a given interface.
+ *
+ * @template T of TPlugin
+ * @param class-string $interface
+ * @return list
+ */
+ public function getPlugins(string $interface): array
+ {
+ $result = [];
+ foreach ($this->plugins as $plugin) {
+ if ($plugin instanceof $interface) {
+ $result[] = $plugin;
+ }
+ }
+ return $result;
+ }
+}
diff --git a/src/Plugin/ScheduleClientPluginContext.php b/src/Plugin/ScheduleClientPluginContext.php
new file mode 100644
index 000000000..c38ce683e
--- /dev/null
+++ b/src/Plugin/ScheduleClientPluginContext.php
@@ -0,0 +1,51 @@
+clientOptions;
+ }
+
+ public function setClientOptions(ClientOptions $clientOptions): self
+ {
+ $this->clientOptions = $clientOptions;
+ return $this;
+ }
+
+ public function getDataConverter(): ?DataConverterInterface
+ {
+ return $this->dataConverter;
+ }
+
+ public function setDataConverter(?DataConverterInterface $dataConverter): self
+ {
+ $this->dataConverter = $dataConverter;
+ return $this;
+ }
+}
diff --git a/src/Plugin/ScheduleClientPluginInterface.php b/src/Plugin/ScheduleClientPluginInterface.php
new file mode 100644
index 000000000..c070d5665
--- /dev/null
+++ b/src/Plugin/ScheduleClientPluginInterface.php
@@ -0,0 +1,30 @@
+dataConverter;
+ }
+
+ public function setDataConverter(?DataConverterInterface $dataConverter): self
+ {
+ $this->dataConverter = $dataConverter;
+ return $this;
+ }
+}
diff --git a/src/Plugin/WorkerPluginContext.php b/src/Plugin/WorkerPluginContext.php
new file mode 100644
index 000000000..9c36311d2
--- /dev/null
+++ b/src/Plugin/WorkerPluginContext.php
@@ -0,0 +1,87 @@
+ */
+ private array $interceptors = [];
+
+ public function __construct(
+ private readonly string $taskQueue,
+ private WorkerOptions $workerOptions,
+ private ?ExceptionInterceptorInterface $exceptionInterceptor = null,
+ ) {}
+
+ public function getTaskQueue(): string
+ {
+ return $this->taskQueue;
+ }
+
+ public function getWorkerOptions(): WorkerOptions
+ {
+ return $this->workerOptions;
+ }
+
+ public function setWorkerOptions(WorkerOptions $workerOptions): self
+ {
+ $this->workerOptions = $workerOptions;
+ return $this;
+ }
+
+ public function getExceptionInterceptor(): ?ExceptionInterceptorInterface
+ {
+ return $this->exceptionInterceptor;
+ }
+
+ public function setExceptionInterceptor(?ExceptionInterceptorInterface $exceptionInterceptor): self
+ {
+ $this->exceptionInterceptor = $exceptionInterceptor;
+ return $this;
+ }
+
+ /**
+ * @return list
+ */
+ public function getInterceptors(): array
+ {
+ return $this->interceptors;
+ }
+
+ /**
+ * @param list $interceptors
+ */
+ public function setInterceptors(array $interceptors): self
+ {
+ $this->interceptors = $interceptors;
+ return $this;
+ }
+
+ /**
+ * Add an interceptor to the worker pipeline.
+ */
+ public function addInterceptor(Interceptor $interceptor): self
+ {
+ $this->interceptors[] = $interceptor;
+ return $this;
+ }
+}
diff --git a/src/Plugin/WorkerPluginInterface.php b/src/Plugin/WorkerPluginInterface.php
new file mode 100644
index 000000000..e496d6979
--- /dev/null
+++ b/src/Plugin/WorkerPluginInterface.php
@@ -0,0 +1,74 @@
+close();
+ * }
+ * }
+ * ```
+ *
+ * @param callable(WorkerFactoryInterface): int $next Calls the next plugin or the actual run loop.
+ */
+ public function run(WorkerFactoryInterface $factory, callable $next): int;
+}
diff --git a/src/Plugin/WorkerPluginTrait.php b/src/Plugin/WorkerPluginTrait.php
new file mode 100644
index 000000000..a2750f4f6
--- /dev/null
+++ b/src/Plugin/WorkerPluginTrait.php
@@ -0,0 +1,43 @@
+converter = $dataConverter;
+ $this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();
+ // Propagate worker plugins from the client
+ if ($client !== null) {
+ $this->pluginRegistry->merge($client->getWorkerPlugins());
+ }
+
+ // Apply worker factory plugins
+ $factoryContext = new WorkerFactoryPluginContext(
+ dataConverter: $dataConverter,
+ );
+ foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) {
+ $plugin->configureWorkerFactory($factoryContext);
+ }
+
+ $this->converter = $factoryContext->getDataConverter() ?? $dataConverter;
$this->boot($credentials ?? ServiceCredentials::create());
}
@@ -118,11 +142,15 @@ public static function create(
?DataConverterInterface $converter = null,
?RPCConnectionInterface $rpc = null,
?ServiceCredentials $credentials = null,
+ ?PluginRegistry $pluginRegistry = null,
+ ?WorkflowClient $client = null,
): static {
return new static(
$converter ?? DataConverter::createDefault(),
$rpc ?? Goridge::create(),
$credentials,
+ $pluginRegistry ?? new PluginRegistry(),
+ $client,
);
}
@@ -134,13 +162,32 @@ public function newWorker(
?LoggerInterface $logger = null,
): WorkerInterface {
$options ??= WorkerOptions::new();
+
+ // Apply worker plugins
+ $workerContext = new WorkerPluginContext(
+ taskQueue: $taskQueue,
+ workerOptions: $options,
+ exceptionInterceptor: $exceptionInterceptor,
+ );
+ foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) {
+ $plugin->configureWorker($workerContext);
+ }
+
+ $options = $workerContext->getWorkerOptions();
+
+ // Merge plugin-contributed interceptors with user-provided ones
+ $provider = new CompositePipelineProvider(
+ $workerContext->getInterceptors(),
+ $interceptorProvider ?? new SimplePipelineProvider(),
+ );
+
$worker = new Worker(
$taskQueue,
$options,
ServiceContainer::fromWorkerFactory(
$this,
- $exceptionInterceptor ?? ExceptionInterceptor::createDefault(),
- $interceptorProvider ?? new SimplePipelineProvider(),
+ $workerContext->getExceptionInterceptor() ?? ExceptionInterceptor::createDefault(),
+ $provider,
new Logger(
$logger ?? new StderrLogger(),
$options->enableLoggingInReplay,
@@ -149,11 +196,22 @@ public function newWorker(
),
$this->rpc,
);
+
+ // Call initializeWorker hooks (forward order)
+ foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) {
+ $plugin->initializeWorker($worker);
+ }
+
$this->queues->add($worker);
return $worker;
}
+ public function getPluginRegistry(): PluginRegistry
+ {
+ return $this->pluginRegistry;
+ }
+
public function getReader(): ReaderInterface
{
return $this->reader;
@@ -192,15 +250,21 @@ public function run(?HostConnectionInterface $host = null): int
$host ??= RoadRunner::create();
$this->codec = $this->createCodec();
- while ($msg = $host->waitBatch()) {
- try {
- $host->send($this->dispatch($msg->messages, $msg->context));
- } catch (\Throwable $e) {
- $host->error($e);
+ $plugins = $this->pluginRegistry->getPlugins(WorkerPluginInterface::class);
+ $pipeline = Pipeline::prepare($plugins);
+
+ return $pipeline->with(function () use ($host): int {
+ while ($msg = $host->waitBatch()) {
+ try {
+ $host->send($this->dispatch($msg->messages, $msg->context));
+ } catch (\Throwable $e) {
+ $host->error($e);
+ }
}
- }
- return 0;
+ return 0;
+ /** @see WorkerPluginInterface::run() */
+ }, 'run')($this);
}
public function tick(): void
@@ -232,7 +296,12 @@ protected function createTaskQueue(): RepositoryInterface
protected function createRouter(ServiceCredentials $credentials): RouterInterface
{
$router = new Router();
- $router->add(new Router\GetWorkerInfo($this->queues, $this->marshaller, $credentials));
+ $router->add(new Router\GetWorkerInfo(
+ $this->queues,
+ $this->marshaller,
+ $credentials,
+ $this->pluginRegistry,
+ ));
return $router;
}
diff --git a/testing/src/Environment.php b/testing/src/Environment.php
index 2fa8119ee..6260f473a 100644
--- a/testing/src/Environment.php
+++ b/testing/src/Environment.php
@@ -21,12 +21,14 @@ final class Environment
private ?Process $temporalTestServerProcess = null;
private ?Process $temporalServerProcess = null;
private ?Process $roadRunnerProcess = null;
+ private bool $externalTemporalProcessActive = false;
public function __construct(
OutputInterface $output,
private Downloader $downloader,
private SystemInfo $systemInfo,
?Command $command = null,
+ private bool $allowExternalTemporalProcess = false,
) {
$this->io = $output instanceof SymfonyStyle
? $output
@@ -37,6 +39,7 @@ public function __construct(
public static function create(?Command $command = null): self
{
$token = \getenv('GITHUB_TOKEN');
+ $allowExternalTemporalProcess = \getenv('ALLOW_EXTERNAL_TEMPORAL_PROCESS') === 'true';
$systemInfo = SystemInfo::detect();
\is_string(\getenv('ROADRUNNER_BINARY')) and $systemInfo->rrExecutable = \getenv('ROADRUNNER_BINARY');
@@ -50,6 +53,7 @@ public static function create(?Command $command = null): self
])),
$systemInfo,
$command,
+ $allowExternalTemporalProcess,
);
}
@@ -139,17 +143,23 @@ public function startTemporalServer(
}
if (!$temporalStarted || !$this->temporalServerProcess->isRunning()) {
- $this->io->error([
- \sprintf(
- 'Error starting Temporal server: %s.',
- !$temporalStarted ? "Health check failed" : $this->temporalServerProcess->getErrorOutput(),
- ),
- \sprintf(
- 'Command: `%s`.',
- $this->serializeProcess($this->temporalServerProcess),
- ),
- ]);
- exit(1);
+ $errorOutput = $this->temporalServerProcess->getErrorOutput();
+ if (!$this->allowExternalTemporalProcess || !\str_contains($errorOutput, 'address already in use')) {
+ $this->io->error([
+ \sprintf(
+ 'Error starting Temporal server: %s.',
+ !$temporalStarted ? "Health check failed" : $errorOutput,
+ ),
+ \sprintf(
+ 'Command: `%s`.',
+ $this->serializeProcess($this->temporalServerProcess),
+ ),
+ ]);
+ exit(1);
+ }
+ $this->io->warning('Using external Temporal Server');
+
+ $this->externalTemporalProcessActive = true;
}
$this->io->info('Temporal server started.');
}
@@ -174,17 +184,21 @@ public function startTemporalTestServer(int $commandTimeout = 10): void
\sleep(1);
if (!$this->temporalTestServerProcess->isRunning()) {
- $this->io->error([
- \sprintf(
- 'Error starting Temporal Test server: %s.',
- $this->temporalTestServerProcess->getErrorOutput(),
- ),
- \sprintf(
- 'Command: `%s`.',
- $this->serializeProcess($this->temporalTestServerProcess),
- ),
- ]);
- exit(1);
+ $errorOutput = $this->temporalTestServerProcess->getErrorOutput();
+ if (!$this->allowExternalTemporalProcess || !\str_contains($errorOutput, 'address already in use')) {
+ $this->io->error([
+ \sprintf(
+ 'Error starting Temporal Test server: %s.',
+ $errorOutput,
+ ),
+ \sprintf(
+ 'Command: `%s`.',
+ $this->serializeProcess($this->temporalTestServerProcess),
+ ),
+ ]);
+ exit(1);
+ }
+ $this->io->warning('Using external Temporal Test Server');
}
$this->io->info('Temporal Test server started.');
}
@@ -263,8 +277,7 @@ public function stopTemporalServer(): void
{
if ($this->isTemporalRunning()) {
$this->io->info('Stopping Temporal server... ');
- $this->temporalServerProcess->stop();
- $this->temporalServerProcess = null;
+ $this->stopTemporalServerProcess();
$this->io->info('Temporal server stopped.');
}
}
@@ -273,8 +286,7 @@ public function stopTemporalTestServer(): void
{
if ($this->isTemporalTestRunning()) {
$this->io->info('Stopping Temporal Test server... ');
- $this->temporalTestServerProcess->stop();
- $this->temporalTestServerProcess = null;
+ $this->stopTemporalTestServerProcess();
$this->io->info('Temporal Test server stopped.');
}
}
@@ -291,7 +303,8 @@ public function stopRoadRunner(): void
public function isTemporalRunning(): bool
{
- return $this->temporalServerProcess?->isRunning() === true;
+ return ($this->allowExternalTemporalProcess && $this->externalTemporalProcessActive) ||
+ $this->temporalServerProcess?->isRunning() === true;
}
public function isRoadRunnerRunning(): bool
@@ -301,7 +314,28 @@ public function isRoadRunnerRunning(): bool
public function isTemporalTestRunning(): bool
{
- return $this->temporalTestServerProcess?->isRunning() === true;
+ return ($this->allowExternalTemporalProcess && $this->externalTemporalProcessActive) ||
+ $this->temporalTestServerProcess?->isRunning() === true;
+ }
+
+ private function stopTemporalTestServerProcess(): void
+ {
+ if ($this->externalTemporalProcessActive) {
+ $this->externalTemporalProcessActive = false;
+ return;
+ }
+ $this->temporalTestServerProcess->stop();
+ $this->temporalTestServerProcess = null;
+ }
+
+ private function stopTemporalServerProcess(): void
+ {
+ if ($this->externalTemporalProcessActive) {
+ $this->externalTemporalProcessActive = false;
+ return;
+ }
+ $this->temporalServerProcess->stop();
+ $this->temporalServerProcess = null;
}
private function serializeProcess(?Process $temporalServerProcess): string|array
diff --git a/testing/src/WorkerFactory.php b/testing/src/WorkerFactory.php
index 181b3bca0..41691c980 100644
--- a/testing/src/WorkerFactory.php
+++ b/testing/src/WorkerFactory.php
@@ -6,6 +6,7 @@
use Psr\Log\LoggerInterface;
use Psr\Log\NullLogger;
+use Temporal\Client\WorkflowClient;
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Exception\ExceptionInterceptor;
@@ -14,6 +15,10 @@
use Temporal\Interceptor\SimplePipelineProvider;
use Temporal\Internal\ServiceContainer;
use Temporal\Internal\Workflow\Logger;
+use Temporal\Plugin\CompositePipelineProvider;
+use Temporal\Plugin\PluginRegistry;
+use Temporal\Plugin\WorkerPluginContext;
+use Temporal\Plugin\WorkerPluginInterface;
use Temporal\Worker\ActivityInvocationCache\ActivityInvocationCacheInterface;
use Temporal\Worker\ActivityInvocationCache\RoadRunnerActivityInvocationCache;
use Temporal\Worker\ServiceCredentials;
@@ -32,16 +37,20 @@ public function __construct(
RPCConnectionInterface $rpc,
ActivityInvocationCacheInterface $activityCache,
?ServiceCredentials $credentials = null,
+ ?PluginRegistry $pluginRegistry = null,
+ ?WorkflowClient $client = null,
) {
$this->activityCache = $activityCache;
- parent::__construct($dataConverter, $rpc, $credentials ?? ServiceCredentials::create());
+ parent::__construct($dataConverter, $rpc, $credentials ?? ServiceCredentials::create(), $pluginRegistry, $client);
}
public static function create(
?DataConverterInterface $converter = null,
?RPCConnectionInterface $rpc = null,
?ServiceCredentials $credentials = null,
+ ?PluginRegistry $pluginRegistry = null,
+ ?WorkflowClient $client = null,
?ActivityInvocationCacheInterface $activityCache = null,
): static {
return new static(
@@ -49,6 +58,8 @@ public static function create(
$rpc ?? Goridge::create(),
$activityCache ?? RoadRunnerActivityInvocationCache::create($converter),
$credentials,
+ $pluginRegistry ?? new PluginRegistry(),
+ $client,
);
}
@@ -60,14 +71,32 @@ public function newWorker(
?LoggerInterface $logger = null,
): WorkerInterface {
$options ??= WorkerOptions::new();
+
+ $workerContext = new WorkerPluginContext(
+ taskQueue: $taskQueue,
+ workerOptions: $options,
+ exceptionInterceptor: $exceptionInterceptor,
+ );
+ foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) {
+ $plugin->configureWorker($workerContext);
+ }
+
+ $options = $workerContext->getWorkerOptions();
+
+ // Merge plugin-contributed interceptors with user-provided ones
+ $provider = new CompositePipelineProvider(
+ $workerContext->getInterceptors(),
+ $interceptorProvider ?? new SimplePipelineProvider(),
+ );
+
$worker = new WorkerMock(
new Worker(
$taskQueue,
- $options ?? WorkerOptions::new(),
+ $options,
ServiceContainer::fromWorkerFactory(
$this,
- $exceptionInterceptor ?? ExceptionInterceptor::createDefault(),
- $interceptorProvider ?? new SimplePipelineProvider(),
+ $workerContext->getExceptionInterceptor() ?? ExceptionInterceptor::createDefault(),
+ $provider,
new Logger(
$logger ?? new NullLogger(),
$options->enableLoggingInReplay,
@@ -78,6 +107,12 @@ public function newWorker(
),
$this->activityCache,
);
+
+ // Call initializeWorker hooks (forward order)
+ foreach ($this->pluginRegistry->getPlugins(WorkerPluginInterface::class) as $plugin) {
+ $plugin->initializeWorker($worker);
+ }
+
$this->queues->add($worker);
return $worker;
diff --git a/tests/Acceptance/App/Attribute/Worker.php b/tests/Acceptance/App/Attribute/Worker.php
index 819806921..87e572313 100644
--- a/tests/Acceptance/App/Attribute/Worker.php
+++ b/tests/Acceptance/App/Attribute/Worker.php
@@ -6,6 +6,7 @@
use Psr\Log\LoggerInterface;
use Temporal\Interceptor\PipelineProvider;
+use Temporal\Plugin\PluginInterface;
use Temporal\Worker\WorkerOptions;
/**
@@ -22,10 +23,12 @@ final class Worker
* @param array|null $options Callable that returns {@see WorkerOptions}
* @param array|null $pipelineProvider Callable that returns {@see PipelineProvider}
* @param array|null $logger Callable that returns {@see LoggerInterface}
+ * @param array|null $plugins
*/
public function __construct(
public readonly ?array $options = null,
public readonly ?array $pipelineProvider = null,
public readonly ?array $logger = null,
+ public readonly ?array $plugins = null,
) {}
}
diff --git a/tests/Acceptance/App/Feature/WorkerFactory.php b/tests/Acceptance/App/Feature/WorkerFactory.php
index b6c007a71..c2c14f741 100644
--- a/tests/Acceptance/App/Feature/WorkerFactory.php
+++ b/tests/Acceptance/App/Feature/WorkerFactory.php
@@ -35,16 +35,19 @@ public function createWorker(
...$feature->workflows,
...$feature->activities,
);
- if ($attr !== null) {
- $attr->options === null or $options = $this->invoker->invoke($attr->options);
- $attr->pipelineProvider === null or $interceptorProvider = $this->invoker->invoke($attr->pipelineProvider);
- $attr->logger === null or $logger = $this->invoker->invoke($attr->logger);
+ $options = $attr?->options === null ? null : $this->invoker->invoke($attr->options);
+ $interceptorProvider = $attr?->pipelineProvider === null ? null : $this->invoker->invoke($attr->pipelineProvider);
+ $logger = $attr?->logger === null ? null : $this->invoker->invoke($attr->logger);
+
+ // Add plugins from the attribute to the factory's registry (already instantiated, no invoker needed)
+ if ($attr?->plugins !== null) {
+ $this->workerFactory->getPluginRegistry()->merge($attr->plugins);
}
return $this->workerFactory->newWorker(
$feature->taskQueue,
$options ?? WorkerOptions::new()->withMaxConcurrentActivityExecutionSize(10),
- interceptorProvider: $interceptorProvider ?? null,
+ interceptorProvider: $interceptorProvider,
logger: $logger ?? LoggerFactory::createServerLogger($feature->taskQueue),
);
}
@@ -53,7 +56,7 @@ public function createWorker(
* Find {@see Worker} attribute in the classes collection.
* If more than one attribute is found, an exception is thrown.
*/
- private static function findAttribute(string ...$classes): ?Worker
+ public static function findAttribute(string ...$classes): ?Worker
{
$classes = \array_unique($classes);
/** @var array $found */
diff --git a/tests/Acceptance/App/TestCase.php b/tests/Acceptance/App/TestCase.php
index 7f3c726ff..e6f4675a6 100644
--- a/tests/Acceptance/App/TestCase.php
+++ b/tests/Acceptance/App/TestCase.php
@@ -11,9 +11,15 @@
use Spiral\Core\Scope;
use Temporal\Api\Enums\V1\EventType;
use Temporal\Api\Failure\V1\Failure;
+use Temporal\Client\ClientOptions;
+use Temporal\Client\WorkflowClient;
use Temporal\Client\WorkflowClientInterface;
use Temporal\Client\WorkflowStubInterface;
use Temporal\Exception\TemporalException;
+use Temporal\Plugin\ClientPluginInterface;
+use Temporal\Plugin\PluginRegistry;
+use Temporal\Tests\Acceptance\App\Attribute\Worker;
+use Temporal\Tests\Acceptance\App\Feature\WorkerFactory;
use Temporal\Tests\Acceptance\App\Logger\ClientLogger;
use Temporal\Tests\Acceptance\App\Logger\LoggerFactory;
use Temporal\Tests\Acceptance\App\Runtime\ContainerFacade;
@@ -45,14 +51,33 @@ protected function runTest(): mixed
$logger = LoggerFactory::createClientLogger($feature->taskQueue);
$logger->clear();
+ // Build scope bindings
+ $bindings = [
+ Feature::class => $feature,
+ static::class => $this,
+ State::class => $runtime,
+ LoggerInterface::class => ClientLogger::class,
+ ClientLogger::class => $logger,
+ ];
+
+ // Auto-inject plugin-configured client from #[Worker(plugins: [...])] attribute
+ $workerAttr = WorkerFactory::findAttribute(static::class);
+ if ($workerAttr?->plugins !== null) {
+ $pluginRegistry = new PluginRegistry($workerAttr->plugins);
+ $clientPlugins = $pluginRegistry->getPlugins(ClientPluginInterface::class);
+ if ($clientPlugins !== []) {
+ $existingClient = $container->get(WorkflowClientInterface::class);
+ $pluginClient = WorkflowClient::create(
+ serviceClient: $existingClient->getServiceClient(),
+ options: (new ClientOptions())->withNamespace($runtime->namespace),
+ pluginRegistry: new PluginRegistry($workerAttr->plugins),
+ );
+ $bindings[WorkflowClientInterface::class] = $pluginClient;
+ }
+ }
+
return $container->runScope(
- new Scope(name: 'feature', bindings: [
- Feature::class => $feature,
- static::class => $this,
- State::class => $runtime,
- LoggerInterface::class => ClientLogger::class,
- ClientLogger::class => $logger,
- ]),
+ new Scope(name: 'feature', bindings: $bindings),
function (Container $container): mixed {
$reflection = new \ReflectionMethod($this, $this->name());
$args = $container->resolveArguments($reflection);
diff --git a/tests/Acceptance/Extra/Plugin/ClientPluginTest.php b/tests/Acceptance/Extra/Plugin/ClientPluginTest.php
new file mode 100644
index 000000000..53f460164
--- /dev/null
+++ b/tests/Acceptance/Extra/Plugin/ClientPluginTest.php
@@ -0,0 +1,180 @@
+getServiceClient(),
+ options: (new ClientOptions())->withNamespace($runtime->namespace),
+ pluginRegistry: new PluginRegistry([new PrefixPlugin()]),
+ )->withTimeout(5);
+
+ $stub = $pluginClient->newUntypedWorkflowStub(
+ 'Extra_Plugin_ClientPlugin',
+ WorkflowOptions::new()->withTaskQueue($feature->taskQueue),
+ );
+ $pluginClient->start($stub, 'hello');
+
+ $result = $stub->getResult('string');
+ self::assertSame('plugin:hello', $result);
+ }
+
+ /**
+ * Multiple plugins apply interceptors in registration order.
+ */
+ #[Test]
+ public function multiplePluginsApplyInOrder(
+ WorkflowClientInterface $client,
+ Feature $feature,
+ State $runtime,
+ ): void {
+ $pluginClient = WorkflowClient::create(
+ serviceClient: $client->getServiceClient(),
+ options: (new ClientOptions())->withNamespace($runtime->namespace),
+ pluginRegistry: new PluginRegistry([new PrefixPlugin('A:'), new PrefixPlugin2('B:')]),
+ )->withTimeout(5);
+
+ $stub = $pluginClient->newUntypedWorkflowStub(
+ 'Extra_Plugin_ClientPlugin',
+ WorkflowOptions::new()->withTaskQueue($feature->taskQueue),
+ );
+ $pluginClient->start($stub, 'test');
+
+ $result = $stub->getResult('string');
+ // Plugin interceptors prepend, so A runs first, then B
+ self::assertSame('B:A:test', $result);
+ }
+
+ /**
+ * Duplicate plugin names throw exception.
+ */
+ #[Test]
+ public function duplicatePluginThrowsException(
+ WorkflowClientInterface $client,
+ State $runtime,
+ ): void {
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "prefix-plugin"');
+
+ WorkflowClient::create(
+ serviceClient: $client->getServiceClient(),
+ options: (new ClientOptions())->withNamespace($runtime->namespace),
+ pluginRegistry: new PluginRegistry([new PrefixPlugin(), new PrefixPlugin()]),
+ );
+ }
+
+ /**
+ * Plugin from #[Worker(plugins: [...])] is also applied via #[Stub] attribute.
+ */
+ #[Test]
+ public function pluginAppliedViaWorkerAttribute(
+ #[Stub('Extra_Plugin_ClientPlugin', args: ['world'])]
+ WorkflowStubInterface $stub,
+ ): void {
+ self::assertSame('plugin:world', $stub->getResult('string'));
+ }
+}
+
+
+#[WorkflowInterface]
+class TestWorkflow
+{
+ #[WorkflowMethod(name: 'Extra_Plugin_ClientPlugin')]
+ public function handle(string $input)
+ {
+ return $input;
+ }
+}
+
+
+class PrefixPlugin implements ClientPluginInterface
+{
+ public function __construct(
+ private readonly string $prefix = 'plugin:',
+ ) {}
+
+ public function getName(): string
+ {
+ return 'prefix-plugin';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $context->addInterceptor(new PrefixInterceptor($this->prefix));
+ }
+}
+
+
+class PrefixPlugin2 implements ClientPluginInterface
+{
+ public function __construct(
+ private readonly string $prefix = 'plugin2:',
+ ) {}
+
+ public function getName(): string
+ {
+ return 'prefix-plugin-2';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $context->addInterceptor(new PrefixInterceptor($this->prefix));
+ }
+}
+
+
+class PrefixInterceptor implements WorkflowClientCallsInterceptor
+{
+ use WorkflowClientCallsInterceptorTrait;
+
+ public function __construct(
+ private readonly string $prefix,
+ ) {}
+
+ public function start(StartInput $input, callable $next): WorkflowExecution
+ {
+ $original = $input->arguments->getValue(0, 'string');
+
+ return $next($input->with(
+ arguments: EncodedValues::fromValues([$this->prefix . $original], DataConverter::createDefault()),
+ ));
+ }
+}
diff --git a/tests/Unit/Framework/WorkerFactoryMock.php b/tests/Unit/Framework/WorkerFactoryMock.php
index 07b1779e9..6006a486f 100644
--- a/tests/Unit/Framework/WorkerFactoryMock.php
+++ b/tests/Unit/Framework/WorkerFactoryMock.php
@@ -31,6 +31,7 @@
use Temporal\Internal\Transport\RouterInterface;
use Temporal\Internal\Transport\Server;
use Temporal\Internal\Transport\ServerInterface;
+use Temporal\Plugin\PluginRegistry;
use Temporal\Internal\Workflow\Logger;
use Temporal\Worker\Environment\Environment;
use Temporal\Worker\Environment\EnvironmentInterface;
@@ -69,6 +70,7 @@ class WorkerFactoryMock implements WorkerFactoryInterface, LoopInterface
private QueueInterface $responses;
private MarshallerInterface $marshaller;
private EnvironmentInterface $env;
+ private PluginRegistry $pluginRegistry;
public function __construct(DataConverterInterface $dataConverter)
{
@@ -112,6 +114,11 @@ public function newWorker(
return $worker;
}
+ public function getPluginRegistry(): PluginRegistry
+ {
+ return $this->pluginRegistry;
+ }
+
public function getReader(): ReaderInterface
{
return $this->reader;
@@ -168,6 +175,7 @@ public function tick(): void
private function boot(): void
{
+ $this->pluginRegistry = new PluginRegistry();
$this->reader = $this->createReader();
$this->marshaller = $this->createMarshaller($this->reader);
$this->queues = new ArrayRepository();
@@ -187,29 +195,24 @@ private function createReader(): ReaderInterface
return new AttributeReader();
}
- /**
- * @return RouterInterface
- */
private function createRouter(): RouterInterface
{
$router = new Router();
- $router->add(new Router\GetWorkerInfo($this->queues, $this->marshaller, ServiceCredentials::create()));
+ $router->add(new Router\GetWorkerInfo(
+ $this->queues,
+ $this->marshaller,
+ ServiceCredentials::create(),
+ new PluginRegistry(),
+ ));
return $router;
}
- /**
- * @return ServerInterface
- */
private function createServer(): ServerInterface
{
return new Server($this->responses, \Closure::fromCallable([$this, 'onRequest']));
}
- /**
- * @param ReaderInterface $reader
- * @return MarshallerInterface
- */
private function createMarshaller(ReaderInterface $reader): MarshallerInterface
{
return new Marshaller(new AttributeMapperFactory($reader));
diff --git a/tests/Unit/Plugin/AbstractPluginTestCase.php b/tests/Unit/Plugin/AbstractPluginTestCase.php
new file mode 100644
index 000000000..84f2dbac3
--- /dev/null
+++ b/tests/Unit/Plugin/AbstractPluginTestCase.php
@@ -0,0 +1,86 @@
+getName());
+ }
+
+ public function testConfigureClientPassthrough(): void
+ {
+ $plugin = new class('noop') extends AbstractPlugin {};
+ $context = new ClientPluginContext(new ClientOptions());
+
+ $clone = clone $context;
+ $plugin->configureClient($context);
+
+ self::assertSame($clone->getClientOptions(), $context->getClientOptions());
+ self::assertSame($clone->getDataConverter(), $context->getDataConverter());
+ }
+
+ public function testConfigureScheduleClientPassthrough(): void
+ {
+ $plugin = new class('noop') extends AbstractPlugin {};
+ $context = new ScheduleClientPluginContext(new ClientOptions());
+
+ $clone = clone $context;
+ $plugin->configureScheduleClient($context);
+
+ self::assertSame($clone->getClientOptions(), $context->getClientOptions());
+ self::assertSame($clone->getDataConverter(), $context->getDataConverter());
+ }
+
+ public function testConfigureWorkerFactoryPassthrough(): void
+ {
+ $plugin = new class('noop') extends AbstractPlugin {};
+ $context = new WorkerFactoryPluginContext();
+
+ $clone = clone $context;
+ $plugin->configureWorkerFactory($context);
+
+ self::assertSame($clone->getDataConverter(), $context->getDataConverter());
+ }
+
+ public function testConfigureWorkerPassthrough(): void
+ {
+ $plugin = new class('noop') extends AbstractPlugin {};
+ $context = new WorkerPluginContext('test-queue', WorkerOptions::new());
+
+ $clone = clone $context;
+ $plugin->configureWorker($context);
+
+ self::assertSame($clone->getWorkerOptions(), $context->getWorkerOptions());
+ self::assertSame($clone->getExceptionInterceptor(), $context->getExceptionInterceptor());
+ }
+
+ public function testInitializeWorkerNoop(): void
+ {
+ $plugin = new class('noop') extends AbstractPlugin {};
+ $worker = $this->createMock(WorkerInterface::class);
+
+ // Should not throw
+ $plugin->initializeWorker($worker);
+ self::assertTrue(true);
+ }
+}
diff --git a/tests/Unit/Plugin/ClientPluginContextTestCase.php b/tests/Unit/Plugin/ClientPluginContextTestCase.php
new file mode 100644
index 000000000..bd2bdf644
--- /dev/null
+++ b/tests/Unit/Plugin/ClientPluginContextTestCase.php
@@ -0,0 +1,70 @@
+getClientOptions());
+ self::assertNull($context->getDataConverter());
+ self::assertSame([], $context->getInterceptors());
+ }
+
+ public function testSetters(): void
+ {
+ $context = new ClientPluginContext(new ClientOptions());
+ $newOptions = new ClientOptions();
+ $converter = $this->createMock(DataConverterInterface::class);
+
+ $result = $context
+ ->setClientOptions($newOptions)
+ ->setDataConverter($converter);
+
+ self::assertSame($context, $result);
+ self::assertSame($newOptions, $context->getClientOptions());
+ self::assertSame($converter, $context->getDataConverter());
+ }
+
+ public function testAddInterceptor(): void
+ {
+ $context = new ClientPluginContext(new ClientOptions());
+
+ $interceptor = new class implements WorkflowClientCallsInterceptor {
+ use WorkflowClientCallsInterceptorTrait;
+ };
+ $result = $context->addInterceptor($interceptor);
+
+ self::assertSame($context, $result);
+ self::assertCount(1, $context->getInterceptors());
+ self::assertSame($interceptor, $context->getInterceptors()[0]);
+ }
+
+ public function testSetInterceptors(): void
+ {
+ $context = new ClientPluginContext(new ClientOptions());
+
+ $interceptor = new class implements WorkflowClientCallsInterceptor {
+ use WorkflowClientCallsInterceptorTrait;
+ };
+ $context->setInterceptors([$interceptor]);
+
+ self::assertCount(1, $context->getInterceptors());
+ }
+}
diff --git a/tests/Unit/Plugin/ClientPluginTestCase.php b/tests/Unit/Plugin/ClientPluginTestCase.php
new file mode 100644
index 000000000..8af441763
--- /dev/null
+++ b/tests/Unit/Plugin/ClientPluginTestCase.php
@@ -0,0 +1,252 @@
+called = true;
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertTrue($called);
+ }
+
+ public function testPluginModifiesClientOptions(): void
+ {
+ $plugin = new class implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function getName(): string
+ {
+ return 'test.namespace';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $context->setClientOptions(
+ (new ClientOptions())->withNamespace('plugin-namespace'),
+ );
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ // The namespace metadata is set from plugin-modified options
+ self::assertNotNull($client->getServiceClient());
+ }
+
+ public function testPluginModifiesDataConverter(): void
+ {
+ $customConverter = $this->createMock(DataConverterInterface::class);
+
+ $plugin = new class($customConverter) implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function __construct(private DataConverterInterface $converter) {}
+
+ public function getName(): string
+ {
+ return 'test.converter';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $context->setDataConverter($this->converter);
+ }
+ };
+
+ // Should not throw — converter is applied
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+ self::assertNotNull($client);
+ }
+
+ public function testPluginAddsInterceptor(): void
+ {
+ $interceptor = new class implements WorkflowClientCallsInterceptor {
+ use WorkflowClientCallsInterceptorTrait;
+ };
+
+ $plugin = new class($interceptor) implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function __construct(private WorkflowClientCallsInterceptor $interceptor) {}
+
+ public function getName(): string
+ {
+ return 'test.interceptor';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $context->addInterceptor($this->interceptor);
+ }
+ };
+
+ // Should not throw — interceptor pipeline is built with plugin interceptor
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+ self::assertNotNull($client);
+ }
+
+ public function testMultiplePluginsCalledInOrder(): void
+ {
+ $order = [];
+
+ $plugin1 = new class($order) implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.first';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $this->order[] = 'first';
+ }
+ };
+
+ $plugin2 = new class($order) implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.second';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $this->order[] = 'second';
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2]));
+
+ self::assertSame(['first', 'second'], $order);
+ }
+
+ public function testDuplicatePluginThrowsException(): void
+ {
+ $plugin1 = new class('dup') extends AbstractPlugin {};
+ $plugin2 = new class('dup') extends AbstractPlugin {};
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "dup"');
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2]));
+ }
+
+ public function testGetWorkerPluginsPropagation(): void
+ {
+ $plugin = new class('combo') extends AbstractPlugin {};
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ $workerPlugins = $client->getWorkerPlugins();
+ self::assertCount(1, $workerPlugins);
+ self::assertSame($plugin, $workerPlugins[0]);
+ }
+
+ public function testGetScheduleClientPluginsPropagation(): void
+ {
+ $plugin = new class('combo') extends AbstractPlugin {};
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ $schedulePlugins = $client->getScheduleClientPlugins();
+ self::assertCount(1, $schedulePlugins);
+ self::assertSame($plugin, $schedulePlugins[0]);
+ }
+
+ public function testClientOnlyPluginNotPropagatedToWorkers(): void
+ {
+ $plugin = new class implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function getName(): string
+ {
+ return 'client-only';
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertCount(0, $client->getWorkerPlugins());
+ self::assertCount(0, $client->getScheduleClientPlugins());
+ }
+
+ public function testWorkerOnlyPluginNotPropagatedToScheduleClient(): void
+ {
+ $plugin = new class implements ClientPluginInterface, WorkerPluginInterface {
+ use ClientPluginTrait;
+ use WorkerPluginTrait;
+
+ public function getName(): string
+ {
+ return 'client-worker';
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertCount(1, $client->getWorkerPlugins());
+ self::assertCount(0, $client->getScheduleClientPlugins());
+ }
+
+ private function mockServiceClient(): ServiceClientInterface
+ {
+ $context = $this->createMock(ContextInterface::class);
+ $context->method('getMetadata')->willReturn([]);
+ $context->method('withMetadata')->willReturn($context);
+
+ $client = $this->createMock(ServiceClientInterface::class);
+ $client->method('getContext')->willReturn($context);
+ $client->method('withContext')->willReturn($client);
+
+ return $client;
+ }
+}
diff --git a/tests/Unit/Plugin/CompositePipelineProviderTestCase.php b/tests/Unit/Plugin/CompositePipelineProviderTestCase.php
new file mode 100644
index 000000000..4a8bb3673
--- /dev/null
+++ b/tests/Unit/Plugin/CompositePipelineProviderTestCase.php
@@ -0,0 +1,124 @@
+getPipeline(TestInvokeInterceptor::class);
+ /** @see TestInvokeInterceptor::__invoke() */
+ $result = $pipeline->with(static fn(string $s) => $s, '__invoke')('_');
+
+ self::assertSame('_A', $result);
+ }
+
+ public function testPluginInterceptorsPrependedToSimpleProvider(): void
+ {
+ $first = new TestOrderInterceptor('1');
+ $second = new TestOrderInterceptor('2');
+
+ $baseProvider = new SimplePipelineProvider([$second]);
+ $composite = new CompositePipelineProvider([$first], $baseProvider);
+
+ $pipeline = $composite->getPipeline(TestOrderInterceptor::class);
+ /** @see TestOrderInterceptor::handle() */
+ $result = $pipeline->with(static fn(string $s) => $s, 'handle')('_');
+
+ // Plugin interceptor ($first) runs before base ($second)
+ self::assertSame('_12', $result);
+ }
+
+ public function testPipelineCaching(): void
+ {
+ $composite = new CompositePipelineProvider([], new SimplePipelineProvider());
+
+ $pipeline1 = $composite->getPipeline(TestOrderInterceptor::class);
+ $pipeline2 = $composite->getPipeline(TestOrderInterceptor::class);
+
+ self::assertSame($pipeline1, $pipeline2);
+ }
+
+ public function testCustomPipelineProviderWithPluginInterceptors(): void
+ {
+ // Custom provider that doesn't extend SimplePipelineProvider
+ $customProvider = new class implements PipelineProvider {
+ public function getPipeline(string $interceptorClass): Pipeline
+ {
+ return Pipeline::prepare([]);
+ }
+ };
+
+ $interceptor = new TestOrderInterceptor('P');
+ $composite = new CompositePipelineProvider([$interceptor], $customProvider);
+
+ $pipeline = $composite->getPipeline(TestOrderInterceptor::class);
+ /** @see TestOrderInterceptor::handle() */
+ $result = $pipeline->with(static fn(string $s) => $s, 'handle')('_');
+
+ self::assertSame('_P', $result);
+ }
+
+ public function testEmptyPluginInterceptorsWithCustomProvider(): void
+ {
+ $customProvider = new class implements PipelineProvider {
+ public function getPipeline(string $interceptorClass): Pipeline
+ {
+ return Pipeline::prepare([new TestOrderInterceptor('X')]);
+ }
+ };
+
+ $composite = new CompositePipelineProvider([], $customProvider);
+ $pipeline = $composite->getPipeline(TestOrderInterceptor::class);
+ /** @see TestOrderInterceptor::handle() */
+ $result = $pipeline->with(static fn(string $s) => $s, 'handle')('_');
+
+ self::assertSame('_X', $result);
+ }
+}
+
+/**
+ * Test interceptor that appends a tag to the input string.
+ * @internal
+ */
+class TestOrderInterceptor implements Interceptor
+{
+ public function __construct(private readonly string $tag) {}
+
+ public function handle(string $s, callable $next): string
+ {
+ return $next($s . $this->tag);
+ }
+}
+
+/**
+ * Test interceptor using __invoke.
+ * @internal
+ */
+class TestInvokeInterceptor implements Interceptor
+{
+ public function __construct(private readonly string $tag) {}
+
+ public function __invoke(string $s, callable $next): string
+ {
+ return $next($s . $this->tag);
+ }
+}
diff --git a/tests/Unit/Plugin/ConnectionPluginContextTestCase.php b/tests/Unit/Plugin/ConnectionPluginContextTestCase.php
new file mode 100644
index 000000000..f52c8443f
--- /dev/null
+++ b/tests/Unit/Plugin/ConnectionPluginContextTestCase.php
@@ -0,0 +1,48 @@
+createMock(ServiceClientInterface::class);
+ $context = new ConnectionPluginContext($serviceClient);
+
+ self::assertSame($serviceClient, $context->getServiceClient());
+ }
+
+ public function testSetServiceClientReplacesValue(): void
+ {
+ $original = $this->createMock(ServiceClientInterface::class);
+ $replacement = $this->createMock(ServiceClientInterface::class);
+
+ $context = new ConnectionPluginContext($original);
+ $context->setServiceClient($replacement);
+
+ self::assertSame($replacement, $context->getServiceClient());
+ }
+
+ public function testSetServiceClientReturnsSelf(): void
+ {
+ $context = new ConnectionPluginContext(
+ $this->createMock(ServiceClientInterface::class),
+ );
+
+ $result = $context->setServiceClient(
+ $this->createMock(ServiceClientInterface::class),
+ );
+
+ self::assertSame($context, $result);
+ }
+}
diff --git a/tests/Unit/Plugin/ConnectionPluginTestCase.php b/tests/Unit/Plugin/ConnectionPluginTestCase.php
new file mode 100644
index 000000000..a7e19d090
--- /dev/null
+++ b/tests/Unit/Plugin/ConnectionPluginTestCase.php
@@ -0,0 +1,307 @@
+called = true;
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertTrue($called);
+ }
+
+ public function testConfigureServiceClientCalledFromScheduleClient(): void
+ {
+ $called = false;
+ $plugin = new class($called) implements ConnectionPluginInterface, ScheduleClientPluginInterface {
+ use ConnectionPluginTrait;
+ use ScheduleClientPluginTrait;
+
+ public function __construct(private bool &$called) {}
+
+ public function getName(): string
+ {
+ return 'test.connection';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $this->called = true;
+ }
+ };
+
+ new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertTrue($called);
+ }
+
+ public function testPluginModifiesServiceClientViaWithAuthKey(): void
+ {
+ $authedClient = $this->mockServiceClient();
+
+ $originalClient = $this->createMock(ServiceClientInterface::class);
+ $originalClient->method('withAuthKey')->willReturn($authedClient);
+ // Allow context calls for the client constructor
+ $context = $this->createMock(ContextInterface::class);
+ $context->method('getMetadata')->willReturn([]);
+ $context->method('withMetadata')->willReturn($context);
+ $originalClient->method('getContext')->willReturn($context);
+ $originalClient->method('withContext')->willReturn($originalClient);
+
+ $plugin = new class implements ConnectionPluginInterface, ClientPluginInterface {
+ use ConnectionPluginTrait;
+ use ClientPluginTrait;
+
+ public function getName(): string
+ {
+ return 'test.auth';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $context->setServiceClient(
+ $context->getServiceClient()->withAuthKey('my-api-key'),
+ );
+ }
+ };
+
+ $client = new WorkflowClient($originalClient, pluginRegistry: new PluginRegistry([$plugin]));
+
+ // The service client should be the authed version
+ self::assertSame($authedClient, $client->getServiceClient());
+ }
+
+ public function testPluginAddsMetadataViaContext(): void
+ {
+ $metadataSet = null;
+
+ $context = $this->createMock(ContextInterface::class);
+ $context->method('getMetadata')->willReturn([]);
+ $context->method('withMetadata')->willReturnCallback(
+ static function (array $metadata) use ($context, &$metadataSet) {
+ $metadataSet = $metadata;
+ return $context;
+ },
+ );
+
+ $serviceClient = $this->createMock(ServiceClientInterface::class);
+ $serviceClient->method('getContext')->willReturn($context);
+ $serviceClient->method('withContext')->willReturn($serviceClient);
+
+ $plugin = new class implements ConnectionPluginInterface, ClientPluginInterface {
+ use ConnectionPluginTrait;
+ use ClientPluginTrait;
+
+ public function getName(): string
+ {
+ return 'test.metadata';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $client = $context->getServiceClient();
+ $ctx = $client->getContext();
+ $context->setServiceClient(
+ $client->withContext(
+ $ctx->withMetadata(['x-custom-header' => ['value']] + $ctx->getMetadata()),
+ ),
+ );
+ }
+ };
+
+ new WorkflowClient($serviceClient, pluginRegistry: new PluginRegistry([$plugin]));
+
+ // Metadata should have been set (by plugin and then by WorkflowClient for namespace)
+ self::assertNotNull($metadataSet);
+ }
+
+ public function testMultipleConnectionPluginsCalledInOrder(): void
+ {
+ $order = [];
+
+ $plugin1 = new class($order) implements ConnectionPluginInterface, ClientPluginInterface {
+ use ConnectionPluginTrait;
+ use ClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.first';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $this->order[] = 'first';
+ }
+ };
+
+ $plugin2 = new class($order) implements ConnectionPluginInterface, ClientPluginInterface {
+ use ConnectionPluginTrait;
+ use ClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.second';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $this->order[] = 'second';
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2]));
+
+ self::assertSame(['first', 'second'], $order);
+ }
+
+ public function testConnectionPluginRunsBeforeClientPlugin(): void
+ {
+ $order = [];
+
+ $plugin = new class($order) implements ConnectionPluginInterface, ClientPluginInterface {
+ use ConnectionPluginTrait;
+ use ClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.order';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $this->order[] = 'connection';
+ }
+
+ public function configureClient(ClientPluginContext $context): void
+ {
+ $this->order[] = 'client';
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertSame(['connection', 'client'], $order);
+ }
+
+ public function testDefaultTraitIsNoOp(): void
+ {
+ $plugin = new class('test.noop') extends AbstractPlugin {};
+
+ // Should not throw — all trait methods are no-ops
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+ self::assertNotNull($client);
+ }
+
+ public function testAbstractPluginWorksWithConnectionPlugin(): void
+ {
+ $called = false;
+
+ $plugin = new class($called) extends AbstractPlugin {
+ private bool $ref;
+
+ public function __construct(bool &$called)
+ {
+ parent::__construct('test.abstract');
+ $this->ref = &$called;
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $this->ref = true;
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertTrue($called);
+ }
+
+ public function testConnectionOnlyPluginNotRegisteredAsClientPlugin(): void
+ {
+ // A plugin implementing only ConnectionPluginInterface
+ // should still work when passed to WorkflowClient
+ $called = false;
+ $plugin = new class($called) implements ConnectionPluginInterface {
+ use ConnectionPluginTrait;
+
+ public function __construct(private bool &$called) {}
+
+ public function getName(): string
+ {
+ return 'test.conn-only';
+ }
+
+ public function configureServiceClient(ConnectionPluginContext $context): void
+ {
+ $this->called = true;
+ }
+ };
+
+ new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertTrue($called);
+ }
+
+ private function mockServiceClient(): ServiceClientInterface
+ {
+ $context = $this->createMock(ContextInterface::class);
+ $context->method('getMetadata')->willReturn([]);
+ $context->method('withMetadata')->willReturn($context);
+
+ $client = $this->createMock(ServiceClientInterface::class);
+ $client->method('getContext')->willReturn($context);
+ $client->method('withContext')->willReturn($client);
+ $client->method('withAuthKey')->willReturn($client);
+
+ return $client;
+ }
+}
diff --git a/tests/Unit/Plugin/PluginPropagationTestCase.php b/tests/Unit/Plugin/PluginPropagationTestCase.php
new file mode 100644
index 000000000..56581d862
--- /dev/null
+++ b/tests/Unit/Plugin/PluginPropagationTestCase.php
@@ -0,0 +1,239 @@
+order[] = 'configureClient';
+ }
+
+ public function configureWorkerFactory(WorkerFactoryPluginContext $context): void
+ {
+ $this->order[] = 'configureWorkerFactory';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'configureWorker';
+ }
+
+ public function initializeWorker(WorkerInterface $worker): void
+ {
+ $this->order[] = 'initializeWorker';
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertSame(['configureClient'], $order);
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ client: $client,
+ );
+
+ self::assertSame(['configureClient', 'configureWorkerFactory'], $order);
+
+ $factory->newWorker('test-queue');
+
+ self::assertSame([
+ 'configureClient',
+ 'configureWorkerFactory',
+ 'configureWorker',
+ 'initializeWorker',
+ ], $order);
+ }
+
+ public function testPluginFromClientMergesWithFactoryPlugins(): void
+ {
+ $order = [];
+
+ $clientPlugin = new class($order) extends AbstractPlugin {
+ public function __construct(private array &$order)
+ {
+ parent::__construct('test.from-client');
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'from-client';
+ }
+ };
+
+ $factoryPlugin = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.from-factory';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'from-factory';
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$clientPlugin]));
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$factoryPlugin]),
+ client: $client,
+ );
+ $factory->newWorker();
+
+ self::assertSame(['from-factory', 'from-client'], $order);
+ }
+
+ public function testDuplicateAcrossClientAndFactoryThrows(): void
+ {
+ $clientPlugin = new class('shared-name') extends AbstractPlugin {};
+
+ $factoryPlugin = new class implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function getName(): string
+ {
+ return 'shared-name';
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$clientPlugin]));
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "shared-name"');
+
+ new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$factoryPlugin]),
+ client: $client,
+ );
+ }
+
+ public function testClientOnlyPluginNotPropagatedToFactory(): void
+ {
+ $factoryConfigureCalled = false;
+
+ $plugin = new class($factoryConfigureCalled) implements ClientPluginInterface {
+ use ClientPluginTrait;
+
+ public function __construct(private bool &$called) {}
+
+ public function getName(): string
+ {
+ return 'test.client-only';
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ // Client-only plugin should NOT appear in getWorkerPlugins
+ self::assertCount(0, $client->getWorkerPlugins());
+
+ // Factory should work fine without this plugin
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ client: $client,
+ );
+ $factory->newWorker();
+
+ self::assertCount(0, $factory->getPluginRegistry()->getPlugins(PluginInterface::class));
+ }
+
+ public function testScheduleClientPluginPropagation(): void
+ {
+ $called = false;
+
+ $plugin = new class($called) implements ClientPluginInterface, ScheduleClientPluginInterface {
+ use ClientPluginTrait;
+ use ScheduleClientPluginTrait;
+
+ public function __construct(private bool &$called) {}
+
+ public function getName(): string
+ {
+ return 'test.schedule-combo';
+ }
+
+ public function configureScheduleClient(ScheduleClientPluginContext $context): void
+ {
+ $this->called = true;
+ }
+ };
+
+ $client = new WorkflowClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ $schedulePlugins = $client->getScheduleClientPlugins();
+ self::assertCount(1, $schedulePlugins);
+ self::assertSame($plugin, $schedulePlugins[0]);
+ }
+
+ private function mockServiceClient(): ServiceClientInterface
+ {
+ $context = $this->createMock(ContextInterface::class);
+ $context->method('getMetadata')->willReturn([]);
+ $context->method('withMetadata')->willReturn($context);
+
+ $client = $this->createMock(ServiceClientInterface::class);
+ $client->method('getContext')->willReturn($context);
+ $client->method('withContext')->willReturn($client);
+
+ return $client;
+ }
+
+ private function mockRpc(): RPCConnectionInterface
+ {
+ return $this->createMock(RPCConnectionInterface::class);
+ }
+}
diff --git a/tests/Unit/Plugin/PluginRegistryTestCase.php b/tests/Unit/Plugin/PluginRegistryTestCase.php
new file mode 100644
index 000000000..b8dde7e79
--- /dev/null
+++ b/tests/Unit/Plugin/PluginRegistryTestCase.php
@@ -0,0 +1,123 @@
+createPlugin('plugin-1');
+ $plugin2 = $this->createPlugin('plugin-2');
+
+ $registry = new PluginRegistry([$plugin1, $plugin2]);
+
+ // AbstractPlugin implements all three interfaces (via TemporalPluginInterface),
+ // so we can retrieve both via any of them
+ $plugins = $registry->getPlugins(ClientPluginInterface::class);
+ self::assertCount(2, $plugins);
+ self::assertSame($plugin1, $plugins[0]);
+ self::assertSame($plugin2, $plugins[1]);
+ }
+
+ public function testDuplicateThrowsException(): void
+ {
+ $plugin1 = $this->createPlugin('my-plugin');
+ $plugin2 = $this->createPlugin('my-plugin');
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "my-plugin"');
+
+ new PluginRegistry([$plugin1, $plugin2]);
+ }
+
+ public function testDuplicateViaAddThrowsException(): void
+ {
+ $registry = new PluginRegistry();
+ $registry->add($this->createPlugin('dup-plugin'));
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "dup-plugin"');
+
+ $registry->add($this->createPlugin('dup-plugin'));
+ }
+
+ public function testMergeThrowsOnDuplicates(): void
+ {
+ $plugin1 = $this->createPlugin('plugin-a');
+ $plugin2 = $this->createPlugin('plugin-b');
+ $plugin3 = $this->createPlugin('plugin-a'); // duplicate
+
+ $registry = new PluginRegistry([$plugin1]);
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "plugin-a"');
+
+ $registry->merge([$plugin2, $plugin3]);
+ }
+
+ public function testGetPluginsByInterface(): void
+ {
+ $clientPlugin = new class implements ClientPluginInterface {
+ public function getName(): string
+ {
+ return 'client-only';
+ }
+
+ public function configureClient(ClientPluginContext $context): void {}
+ };
+
+ $workerPlugin = new class implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function getName(): string
+ {
+ return 'worker-only';
+ }
+ };
+
+ $bothPlugin = $this->createPlugin('both');
+
+ $registry = new PluginRegistry([$clientPlugin, $workerPlugin, $bothPlugin]);
+
+ $clientPlugins = $registry->getPlugins(ClientPluginInterface::class);
+ self::assertCount(2, $clientPlugins);
+ self::assertSame($clientPlugin, $clientPlugins[0]);
+ self::assertSame($bothPlugin, $clientPlugins[1]);
+
+ $workerPlugins = $registry->getPlugins(WorkerPluginInterface::class);
+ self::assertCount(2, $workerPlugins);
+ self::assertSame($workerPlugin, $workerPlugins[0]);
+ self::assertSame($bothPlugin, $workerPlugins[1]);
+
+ $schedulePlugins = $registry->getPlugins(ScheduleClientPluginInterface::class);
+ self::assertCount(1, $schedulePlugins);
+ self::assertSame($bothPlugin, $schedulePlugins[0]);
+ }
+
+ public function testEmptyRegistry(): void
+ {
+ $registry = new PluginRegistry();
+
+ self::assertSame([], $registry->getPlugins(ClientPluginInterface::class));
+ }
+
+ private function createPlugin(string $name): AbstractPlugin
+ {
+ return new class($name) extends AbstractPlugin {};
+ }
+}
diff --git a/tests/Unit/Plugin/ScheduleClientPluginContextTestCase.php b/tests/Unit/Plugin/ScheduleClientPluginContextTestCase.php
new file mode 100644
index 000000000..32133950c
--- /dev/null
+++ b/tests/Unit/Plugin/ScheduleClientPluginContextTestCase.php
@@ -0,0 +1,41 @@
+getClientOptions());
+ self::assertNull($context->getDataConverter());
+ }
+
+ public function testSetters(): void
+ {
+ $context = new ScheduleClientPluginContext(new ClientOptions());
+ $newOptions = new ClientOptions();
+ $converter = $this->createMock(DataConverterInterface::class);
+
+ $result = $context
+ ->setClientOptions($newOptions)
+ ->setDataConverter($converter);
+
+ self::assertSame($context, $result);
+ self::assertSame($newOptions, $context->getClientOptions());
+ self::assertSame($converter, $context->getDataConverter());
+ }
+}
diff --git a/tests/Unit/Plugin/ScheduleClientPluginTestCase.php b/tests/Unit/Plugin/ScheduleClientPluginTestCase.php
new file mode 100644
index 000000000..8c220b9f0
--- /dev/null
+++ b/tests/Unit/Plugin/ScheduleClientPluginTestCase.php
@@ -0,0 +1,194 @@
+called = true;
+ }
+ };
+
+ new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+
+ self::assertTrue($called);
+ }
+
+ public function testPluginModifiesClientOptions(): void
+ {
+ $plugin = new class implements ScheduleClientPluginInterface {
+ use ScheduleClientPluginTrait;
+
+ public function getName(): string
+ {
+ return 'test.namespace';
+ }
+
+ public function configureScheduleClient(ScheduleClientPluginContext $context): void
+ {
+ $context->setClientOptions(
+ (new ClientOptions())->withNamespace('schedule-namespace'),
+ );
+ }
+ };
+
+ $client = new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+ self::assertNotNull($client);
+ }
+
+ public function testPluginModifiesDataConverter(): void
+ {
+ $customConverter = $this->createMock(DataConverterInterface::class);
+
+ $plugin = new class($customConverter) implements ScheduleClientPluginInterface {
+ use ScheduleClientPluginTrait;
+
+ public function __construct(private DataConverterInterface $converter) {}
+
+ public function getName(): string
+ {
+ return 'test.converter';
+ }
+
+ public function configureScheduleClient(ScheduleClientPluginContext $context): void
+ {
+ $context->setDataConverter($this->converter);
+ }
+ };
+
+ $client = new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin]));
+ self::assertNotNull($client);
+ }
+
+ public function testMultiplePluginsCalledInOrder(): void
+ {
+ $order = [];
+
+ $plugin1 = new class($order) implements ScheduleClientPluginInterface {
+ use ScheduleClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.first';
+ }
+
+ public function configureScheduleClient(ScheduleClientPluginContext $context): void
+ {
+ $this->order[] = 'first';
+ }
+ };
+
+ $plugin2 = new class($order) implements ScheduleClientPluginInterface {
+ use ScheduleClientPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.second';
+ }
+
+ public function configureScheduleClient(ScheduleClientPluginContext $context): void
+ {
+ $this->order[] = 'second';
+ }
+ };
+
+ new ScheduleClient($this->mockServiceClient(), pluginRegistry: new PluginRegistry([$plugin1, $plugin2]));
+
+ self::assertSame(['first', 'second'], $order);
+ }
+
+ public function testNoPluginsDoesNotBreak(): void
+ {
+ $client = new ScheduleClient($this->mockServiceClient());
+ self::assertNotNull($client);
+ }
+
+ public function testPluginReceivesCorrectInitialContext(): void
+ {
+ $initialOptions = (new ClientOptions())->withNamespace('initial-ns');
+ $initialConverter = $this->createMock(DataConverterInterface::class);
+
+ $receivedOptions = null;
+ $receivedConverter = null;
+
+ $plugin = new class($receivedOptions, $receivedConverter) implements ScheduleClientPluginInterface {
+ use ScheduleClientPluginTrait;
+
+ public function __construct(
+ private ?ClientOptions &$receivedOptions,
+ private ?DataConverterInterface &$receivedConverter,
+ ) {}
+
+ public function getName(): string
+ {
+ return 'test.inspector';
+ }
+
+ public function configureScheduleClient(ScheduleClientPluginContext $context): void
+ {
+ $this->receivedOptions = $context->getClientOptions();
+ $this->receivedConverter = $context->getDataConverter();
+ }
+ };
+
+ new ScheduleClient(
+ $this->mockServiceClient(),
+ options: $initialOptions,
+ converter: $initialConverter,
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ self::assertSame($initialOptions, $receivedOptions);
+ self::assertSame($initialConverter, $receivedConverter);
+ }
+
+ private function mockServiceClient(): ServiceClientInterface
+ {
+ $context = $this->createMock(ContextInterface::class);
+ $context->method('getMetadata')->willReturn([]);
+ $context->method('withMetadata')->willReturn($context);
+
+ $client = $this->createMock(ServiceClientInterface::class);
+ $client->method('getContext')->willReturn($context);
+ $client->method('withContext')->willReturn($client);
+
+ return $client;
+ }
+}
diff --git a/tests/Unit/Plugin/WorkerFactoryPluginTestCase.php b/tests/Unit/Plugin/WorkerFactoryPluginTestCase.php
new file mode 100644
index 000000000..7489dc95d
--- /dev/null
+++ b/tests/Unit/Plugin/WorkerFactoryPluginTestCase.php
@@ -0,0 +1,679 @@
+called = true;
+ }
+ };
+
+ new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ self::assertTrue($called);
+ }
+
+ public function testConfigureWorkerFactoryModifiesDataConverter(): void
+ {
+ $customConverter = $this->createMock(DataConverterInterface::class);
+
+ $plugin = new class($customConverter) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private DataConverterInterface $converter) {}
+
+ public function getName(): string
+ {
+ return 'test.converter';
+ }
+
+ public function configureWorkerFactory(WorkerFactoryPluginContext $context): void
+ {
+ $context->setDataConverter($this->converter);
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ self::assertSame($customConverter, $factory->getDataConverter());
+ }
+
+ public function testConfigureWorkerIsCalled(): void
+ {
+ $called = false;
+ $receivedTaskQueue = null;
+
+ $plugin = new class($called, $receivedTaskQueue) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(
+ private bool &$called,
+ private ?string &$receivedTaskQueue,
+ ) {}
+
+ public function getName(): string
+ {
+ return 'test.spy';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->called = true;
+ $this->receivedTaskQueue = $context->getTaskQueue();
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+ $factory->newWorker('my-queue');
+
+ self::assertTrue($called);
+ self::assertSame('my-queue', $receivedTaskQueue);
+ }
+
+ public function testConfigureWorkerModifiesWorkerOptions(): void
+ {
+ $customOptions = WorkerOptions::new()->withMaxConcurrentActivityExecutionSize(42);
+
+ $plugin = new class($customOptions) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private WorkerOptions $opts) {}
+
+ public function getName(): string
+ {
+ return 'test.options';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $context->setWorkerOptions($this->opts);
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+ $worker = $factory->newWorker('test-queue');
+
+ self::assertSame(42, $worker->getOptions()->maxConcurrentActivityExecutionSize);
+ }
+
+ public function testInitializeWorkerIsCalled(): void
+ {
+ $receivedWorker = null;
+
+ $plugin = new class($receivedWorker) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private ?WorkerInterface &$receivedWorker) {}
+
+ public function getName(): string
+ {
+ return 'test.init';
+ }
+
+ public function initializeWorker(WorkerInterface $worker): void
+ {
+ $this->receivedWorker = $worker;
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+ $worker = $factory->newWorker('test-queue');
+
+ self::assertSame($worker, $receivedWorker);
+ }
+
+ public function testInitializeWorkerReceivesCorrectTaskQueue(): void
+ {
+ $receivedTaskQueue = null;
+
+ $plugin = new class($receivedTaskQueue) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private ?string &$receivedTaskQueue) {}
+
+ public function getName(): string
+ {
+ return 'test.tq';
+ }
+
+ public function initializeWorker(WorkerInterface $worker): void
+ {
+ $this->receivedTaskQueue = $worker->getID();
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+ $factory->newWorker('my-task-queue');
+
+ self::assertSame('my-task-queue', $receivedTaskQueue);
+ }
+
+ public function testPluginHookOrder(): void
+ {
+ $order = [];
+
+ $plugin = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.order';
+ }
+
+ public function configureWorkerFactory(WorkerFactoryPluginContext $context): void
+ {
+ $this->order[] = 'configureWorkerFactory';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'configureWorker';
+ }
+
+ public function initializeWorker(WorkerInterface $worker): void
+ {
+ $this->order[] = 'initializeWorker';
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ self::assertSame(['configureWorkerFactory'], $order);
+
+ $factory->newWorker();
+
+ self::assertSame([
+ 'configureWorkerFactory',
+ 'configureWorker',
+ 'initializeWorker',
+ ], $order);
+ }
+
+ public function testMultiplePluginsCalledInOrder(): void
+ {
+ $order = [];
+
+ $plugin1 = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.first';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'first';
+ }
+ };
+
+ $plugin2 = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.second';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'second';
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin1, $plugin2]),
+ );
+ $factory->newWorker();
+
+ self::assertSame(['first', 'second'], $order);
+ }
+
+ public function testConfigureWorkerCalledPerWorker(): void
+ {
+ $taskQueues = [];
+
+ $plugin = new class($taskQueues) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$taskQueues) {}
+
+ public function getName(): string
+ {
+ return 'test.per-worker';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->taskQueues[] = $context->getTaskQueue();
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+ $factory->newWorker('queue-a');
+ $factory->newWorker('queue-b');
+
+ self::assertSame(['queue-a', 'queue-b'], $taskQueues);
+ }
+
+ public function testGetWorkerPluginsReturnsRegistered(): void
+ {
+ $plugin1 = new class('p1') extends AbstractPlugin {};
+ $plugin2 = new class('p2') extends AbstractPlugin {};
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin1, $plugin2]),
+ );
+
+ $plugins = $factory->getPluginRegistry()->getPlugins(PluginInterface::class);
+ self::assertCount(2, $plugins);
+ self::assertSame($plugin1, $plugins[0]);
+ self::assertSame($plugin2, $plugins[1]);
+ }
+
+ public function testDuplicatePluginThrowsException(): void
+ {
+ $plugin1 = new class('dup') extends AbstractPlugin {};
+ $plugin2 = new class('dup') extends AbstractPlugin {};
+
+ $this->expectException(\RuntimeException::class);
+ $this->expectExceptionMessage('Duplicate plugin "dup"');
+
+ new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin1, $plugin2]),
+ );
+ }
+
+ public function testRunHookIsCalled(): void
+ {
+ $called = false;
+ $plugin = new class($called) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private bool &$called) {}
+
+ public function getName(): string
+ {
+ return 'test.run';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ $this->called = true;
+ return $next($factory);
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ $factory->run($this->mockHost());
+
+ self::assertTrue($called);
+ }
+
+ public function testRunHookReceivesFactoryInstance(): void
+ {
+ $receivedFactory = null;
+
+ $plugin = new class($receivedFactory) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private ?WorkerFactoryInterface &$receivedFactory) {}
+
+ public function getName(): string
+ {
+ return 'test.factory-ref';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ $this->receivedFactory = $factory;
+ return $next($factory);
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ $factory->run($this->mockHost());
+
+ self::assertSame($factory, $receivedFactory);
+ }
+
+ public function testRunHookChainOrder(): void
+ {
+ $order = [];
+
+ $plugin1 = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.first';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ $this->order[] = 'first:before';
+ try {
+ return $next($factory);
+ } finally {
+ $this->order[] = 'first:after';
+ }
+ }
+ };
+
+ $plugin2 = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.second';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ $this->order[] = 'second:before';
+ try {
+ return $next($factory);
+ } finally {
+ $this->order[] = 'second:after';
+ }
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin1, $plugin2]),
+ );
+
+ $factory->run($this->mockHost());
+
+ // First plugin is outermost: before in forward order, after in reverse (LIFO)
+ self::assertSame([
+ 'first:before',
+ 'second:before',
+ 'second:after',
+ 'first:after',
+ ], $order);
+ }
+
+ public function testRunHookCanWrapWithTryFinally(): void
+ {
+ $cleanupCalled = false;
+
+ $plugin = new class($cleanupCalled) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private bool &$cleanupCalled) {}
+
+ public function getName(): string
+ {
+ return 'test.cleanup';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ try {
+ return $next($factory);
+ } finally {
+ $this->cleanupCalled = true;
+ }
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ $factory->run($this->mockHost());
+
+ self::assertTrue($cleanupCalled);
+ }
+
+ public function testRunHookCanSkipNext(): void
+ {
+ $innerCalled = false;
+
+ $outerPlugin = new class() implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function getName(): string
+ {
+ return 'test.outer';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ // Intentionally skip $next()
+ return 42;
+ }
+ };
+
+ $innerPlugin = new class($innerCalled) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private bool &$innerCalled) {}
+
+ public function getName(): string
+ {
+ return 'test.inner';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ $this->innerCalled = true;
+ return $next($factory);
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$outerPlugin, $innerPlugin]),
+ );
+
+ $result = $factory->run($this->mockHost());
+
+ self::assertSame(42, $result);
+ self::assertFalse($innerCalled);
+ }
+
+ public function testRunHookFullLifecycleOrder(): void
+ {
+ $order = [];
+
+ $plugin = new class($order) implements WorkerPluginInterface {
+ use WorkerPluginTrait;
+
+ public function __construct(private array &$order) {}
+
+ public function getName(): string
+ {
+ return 'test.lifecycle';
+ }
+
+ public function configureWorkerFactory(WorkerFactoryPluginContext $context): void
+ {
+ $this->order[] = 'configureWorkerFactory';
+ }
+
+ public function configureWorker(WorkerPluginContext $context): void
+ {
+ $this->order[] = 'configureWorker';
+ }
+
+ public function initializeWorker(WorkerInterface $worker): void
+ {
+ $this->order[] = 'initializeWorker';
+ }
+
+ public function run(WorkerFactoryInterface $factory, callable $next): int
+ {
+ $this->order[] = 'run:before';
+ try {
+ return $next($factory);
+ } finally {
+ $this->order[] = 'run:after';
+ }
+ }
+ };
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+ $factory->newWorker();
+ $factory->run($this->mockHost());
+
+ self::assertSame([
+ 'configureWorkerFactory',
+ 'configureWorker',
+ 'initializeWorker',
+ 'run:before',
+ 'run:after',
+ ], $order);
+ }
+
+ public function testRunHookReturnsValueFromRunLoop(): void
+ {
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ );
+
+ $result = $factory->run($this->mockHost());
+
+ self::assertSame(0, $result);
+ }
+
+ public function testDefaultTraitRunPassesThrough(): void
+ {
+ $plugin = new class('test.noop') extends AbstractPlugin {};
+
+ $factory = new WorkerFactory(
+ DataConverter::createDefault(),
+ $this->mockRpc(),
+ pluginRegistry: new PluginRegistry([$plugin]),
+ );
+
+ $result = $factory->run($this->mockHost());
+
+ self::assertSame(0, $result);
+ }
+
+ private function mockRpc(): RPCConnectionInterface
+ {
+ return $this->createMock(RPCConnectionInterface::class);
+ }
+
+ /**
+ * Create a mock host that immediately returns null (empty run loop).
+ */
+ private function mockHost(): HostConnectionInterface
+ {
+ $host = $this->createMock(HostConnectionInterface::class);
+ $host->method('waitBatch')->willReturn(null);
+
+ return $host;
+ }
+}
diff --git a/tests/Unit/Plugin/WorkerPluginContextTestCase.php b/tests/Unit/Plugin/WorkerPluginContextTestCase.php
new file mode 100644
index 000000000..22cd5709b
--- /dev/null
+++ b/tests/Unit/Plugin/WorkerPluginContextTestCase.php
@@ -0,0 +1,62 @@
+getDataConverter());
+ }
+
+ public function testFactoryContextSetters(): void
+ {
+ $context = new WorkerFactoryPluginContext();
+ $converter = $this->createMock(DataConverterInterface::class);
+
+ $result = $context->setDataConverter($converter);
+
+ self::assertSame($context, $result);
+ self::assertSame($converter, $context->getDataConverter());
+ }
+
+ // --- WorkerPluginContext ---
+
+ public function testWorkerContextBuilderPattern(): void
+ {
+ $options = WorkerOptions::new();
+ $context = new WorkerPluginContext('test-queue', $options);
+
+ self::assertSame('test-queue', $context->getTaskQueue());
+ self::assertSame($options, $context->getWorkerOptions());
+ self::assertNull($context->getExceptionInterceptor());
+ self::assertSame([], $context->getInterceptors());
+ }
+
+ public function testWorkerContextSetWorkerOptions(): void
+ {
+ $context = new WorkerPluginContext('test-queue', WorkerOptions::new());
+ $newOptions = WorkerOptions::new();
+
+ $result = $context->setWorkerOptions($newOptions);
+
+ self::assertSame($context, $result);
+ self::assertSame($newOptions, $context->getWorkerOptions());
+ }
+}