Skip to content
8 changes: 2 additions & 6 deletions psalm-baseline.xml
Original file line number Diff line number Diff line change
Expand Up @@ -160,15 +160,9 @@
<code><![CDATA[object]]></code>
<code><![CDATA[object]]></code>
</InvalidReturnType>
<LessSpecificReturnStatement>
<code><![CDATA[new self($serviceClient, $options, $converter, $interceptorProvider)]]></code>
</LessSpecificReturnStatement>
<MissingParamType>
<code><![CDATA[$workflow]]></code>
</MissingParamType>
<MoreSpecificReturnType>
<code><![CDATA[static]]></code>
</MoreSpecificReturnType>
<RedundantFunctionCall>
<code><![CDATA[\sprintf]]></code>
<code><![CDATA[\sprintf]]></code>
Expand Down Expand Up @@ -1475,6 +1469,8 @@
$converter ?? DataConverter::createDefault(),
$rpc ?? Goridge::create(),
$credentials,
$pluginRegistry ?? new PluginRegistry(),
$client,
)]]></code>
</UnsafeInstantiation>
</file>
Expand Down
32 changes: 31 additions & 1 deletion src/Client/ScheduleClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -32,6 +32,11 @@
use Temporal\DataConverter\DataConverter;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Internal\Mapper\ScheduleMapper;
use Temporal\Plugin\ConnectionPluginContext;
use Temporal\Plugin\ConnectionPluginInterface;
use Temporal\Plugin\PluginRegistry;
use Temporal\Plugin\ScheduleClientPluginContext;
use Temporal\Plugin\ScheduleClientPluginInterface;
use Temporal\Internal\Marshaller\Mapper\AttributeMapperFactory;
use Temporal\Internal\Marshaller\Marshaller;
use Temporal\Internal\Marshaller\MarshallerInterface;
Expand All @@ -45,14 +50,38 @@ final class ScheduleClient implements ScheduleClientInterface
private DataConverterInterface $converter;
private MarshallerInterface $marshaller;
private ProtoToArrayConverter $protoConverter;
private PluginRegistry $pluginRegistry;

public function __construct(
ServiceClientInterface $serviceClient,
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
?PluginRegistry $pluginRegistry = null,
) {
$this->clientOptions = $options ?? new ClientOptions();
$this->converter = $converter ?? DataConverter::createDefault();
$this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();

// Apply connection plugins (before client-level configuration)
$connectionContext = new ConnectionPluginContext($serviceClient);
foreach ($this->pluginRegistry->getPlugins(ConnectionPluginInterface::class) as $plugin) {
$plugin->configureServiceClient($connectionContext);
}
$serviceClient = $connectionContext->getServiceClient();

$pluginContext = new ScheduleClientPluginContext(
clientOptions: $this->clientOptions,
dataConverter: $this->converter,
);
foreach ($this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class) as $plugin) {
$plugin->configureScheduleClient($pluginContext);
}
$this->clientOptions = $pluginContext->getClientOptions();
$pluginConverter = $pluginContext->getDataConverter();
if ($pluginConverter !== null) {
$this->converter = $pluginConverter;
}

$this->marshaller = new Marshaller(
new AttributeMapperFactory(new AttributeReader()),
);
Expand All @@ -71,8 +100,9 @@ public static function create(
ServiceClientInterface $serviceClient,
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
?PluginRegistry $pluginRegistry = null,
): ScheduleClientInterface {
return new self($serviceClient, $options, $converter);
return new self($serviceClient, $options, $converter, $pluginRegistry);
}

public function createSchedule(
Expand Down
68 changes: 62 additions & 6 deletions src/Client/WorkflowClient.php
Original file line number Diff line number Diff line change
Expand Up @@ -38,6 +38,14 @@
use Temporal\Interceptor\WorkflowClientCallsInterceptor;
use Temporal\Internal\Client\ActivityCompletionClient;
use Temporal\Internal\Client\WorkflowProxy;
use Temporal\Plugin\ClientPluginContext;
use Temporal\Plugin\ClientPluginInterface;
use Temporal\Plugin\CompositePipelineProvider;
use Temporal\Plugin\ConnectionPluginContext;
use Temporal\Plugin\ConnectionPluginInterface;
use Temporal\Plugin\PluginRegistry;
use Temporal\Plugin\ScheduleClientPluginInterface;
use Temporal\Plugin\WorkerPluginInterface;
use Temporal\Internal\Client\WorkflowRun;
use Temporal\Internal\Client\WorkflowStarter;
use Temporal\Internal\Client\WorkflowStub;
Expand All @@ -63,6 +71,7 @@ class WorkflowClient implements WorkflowClientInterface
private DataConverterInterface $converter;
private ?WorkflowStarter $starter = null;
private WorkflowReader $reader;
private PluginRegistry $pluginRegistry;

/** @var Pipeline<WorkflowClientCallsInterceptor, mixed> */
private Pipeline $interceptorPipeline;
Expand All @@ -72,11 +81,40 @@ public function __construct(
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
?PipelineProvider $interceptorProvider = null,
?PluginRegistry $pluginRegistry = null,
) {
$this->interceptorPipeline = ($interceptorProvider ?? new SimplePipelineProvider())
->getPipeline(WorkflowClientCallsInterceptor::class);
$this->pluginRegistry = $pluginRegistry ?? new PluginRegistry();
$this->clientOptions = $options ?? new ClientOptions();
$this->converter = $converter ?? DataConverter::createDefault();

// Apply connection plugins (before client-level configuration)
$connectionContext = new ConnectionPluginContext($serviceClient);
foreach ($this->pluginRegistry->getPlugins(ConnectionPluginInterface::class) as $plugin) {
$plugin->configureServiceClient($connectionContext);
}
$serviceClient = $connectionContext->getServiceClient();

$pluginContext = new ClientPluginContext(
clientOptions: $this->clientOptions,
dataConverter: $this->converter,
);
foreach ($this->pluginRegistry->getPlugins(ClientPluginInterface::class) as $plugin) {
$plugin->configureClient($pluginContext);
}

$this->clientOptions = $pluginContext->getClientOptions();
$pluginConverter = $pluginContext->getDataConverter();
if ($pluginConverter !== null) {
$this->converter = $pluginConverter;
}

// Build interceptor pipeline: merge plugin-contributed interceptors with user-provided ones
$provider = new CompositePipelineProvider(
$pluginContext->getInterceptors(),
$interceptorProvider ?? new SimplePipelineProvider(),
);

$this->interceptorPipeline = $provider->getPipeline(WorkflowClientCallsInterceptor::class);
$this->reader = new WorkflowReader($this->createReader());

// Set Temporal-Namespace metadata
Expand All @@ -88,16 +126,34 @@ public function __construct(
);
}

/**
* @return static
*/
public static function create(
ServiceClientInterface $serviceClient,
?ClientOptions $options = null,
?DataConverterInterface $converter = null,
?PipelineProvider $interceptorProvider = null,
?PluginRegistry $pluginRegistry = null,
): self {
return new self($serviceClient, $options, $converter, $interceptorProvider);
return new self($serviceClient, $options, $converter, $interceptorProvider, $pluginRegistry);
}

/**
* Get plugins that also implement WorkerPluginInterface for propagation to workers.
*
* @return list<WorkerPluginInterface>
*/
public function getWorkerPlugins(): array
{
return $this->pluginRegistry->getPlugins(WorkerPluginInterface::class);
}

/**
* Get plugins that also implement ScheduleClientPluginInterface for propagation to schedule clients.
*
* @return list<ScheduleClientPluginInterface>
*/
public function getScheduleClientPlugins(): array
{
return $this->pluginRegistry->getPlugins(ScheduleClientPluginInterface::class);
}

public function getServiceClient(): ServiceClientInterface
Expand Down
18 changes: 16 additions & 2 deletions src/Interceptor/SimplePipelineProvider.php
Original file line number Diff line number Diff line change
Expand Up @@ -22,14 +22,28 @@ class SimplePipelineProvider implements PipelineProvider
* @param array<array-key, Interceptor> $interceptors
*/
public function __construct(
private iterable $interceptors = [],
private readonly iterable $interceptors = [],
) {}

/**
* Create a new provider with additional interceptors prepended.
*
* @param list<Interceptor> $interceptors Interceptors to prepend before existing ones.
*/
public function withPrependedInterceptors(array $interceptors): self
{
if ($interceptors === []) {
return $this;
}

return new self(\array_merge($interceptors, [...$this->interceptors]));
}

public function getPipeline(string $interceptorClass): Pipeline
{
return $this->cache[$interceptorClass] ??= Pipeline::prepare(
\array_filter(
$this->interceptors,
[...$this->interceptors],
static fn(Interceptor $i): bool => $i instanceof $interceptorClass,
),
);
Expand Down
2 changes: 1 addition & 1 deletion src/Internal/Interceptor/Pipeline.php
Original file line number Diff line number Diff line change
Expand Up @@ -53,7 +53,7 @@ private function __construct(
/**
* Make sure that interceptors implement the same interface.
*
* @template T of Interceptor
* @template T of object
*
* @param iterable<T> $interceptors
*
Expand Down
8 changes: 8 additions & 0 deletions src/Internal/Transport/Router/GetWorkerInfo.php
Original file line number Diff line number Diff line change
Expand Up @@ -18,6 +18,8 @@
use Temporal\Internal\Declaration\Prototype\WorkflowPrototype;
use Temporal\Internal\Marshaller\MarshallerInterface;
use Temporal\Internal\Repository\RepositoryInterface;
use Temporal\Plugin\PluginInterface;
use Temporal\Plugin\PluginRegistry;
use Temporal\Worker\ServiceCredentials;
use Temporal\Worker\Transport\Command\ServerRequestInterface;
use Temporal\Worker\WorkerInterface;
Expand All @@ -28,6 +30,7 @@ public function __construct(
private readonly RepositoryInterface $queues,
private readonly MarshallerInterface $marshaller,
private readonly ServiceCredentials $credentials,
private readonly PluginRegistry $pluginRegistry,
) {}

public function handle(ServerRequestInterface $request, array $headers, Deferred $resolver): void
Expand All @@ -54,6 +57,10 @@ private function workerToArray(WorkerInterface $worker): array
'Name' => $activity->getID(),
];

$map = $this->map($this->pluginRegistry->getPlugins(PluginInterface::class), static fn(PluginInterface $plugin): array => [
'Name' => $plugin->getName(),
'Version' => null,
]);
return [
'TaskQueue' => $worker->getID(),
'Options' => $this->marshaller->marshal($worker->getOptions()),
Expand All @@ -62,6 +69,7 @@ private function workerToArray(WorkerInterface $worker): array
// ActivityInfo[]
'Activities' => $this->map($worker->getActivities(), $activityMap),
'PhpSdkVersion' => SdkVersion::getSdkVersion(),
'Plugins' => $map,
'Flags' => (object) $this->prepareFlags(),
];
}
Expand Down
34 changes: 34 additions & 0 deletions src/Plugin/AbstractPlugin.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,34 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Plugin;

/**
* Abstract base class providing no-op defaults for all plugin methods.
*
* Plugin authors can extend this and override only what they need.
*/
abstract class AbstractPlugin implements TemporalPluginInterface
{
use ConnectionPluginTrait;
use ClientPluginTrait;
use ScheduleClientPluginTrait;
use WorkerPluginTrait;

public function __construct(
private readonly string $name,
) {}

public function getName(): string
{
return $this->name;
}
}
81 changes: 81 additions & 0 deletions src/Plugin/ClientPluginContext.php
Original file line number Diff line number Diff line change
@@ -0,0 +1,81 @@
<?php

/**
* This file is part of Temporal package.
*
* For the full copyright and license information, please view the LICENSE
* file that was distributed with this source code.
*/

declare(strict_types=1);

namespace Temporal\Plugin;

use Temporal\Client\ClientOptions;
use Temporal\DataConverter\DataConverterInterface;
use Temporal\Internal\Interceptor\Interceptor;

/**
* Builder-style configuration context for workflow client plugins.
*
* Plugins modify this builder in {@see ClientPluginInterface::configureClient()}.
* Uses a fluent API similar to Java SDK's Options.Builder pattern.
*/
final class ClientPluginContext
{
/** @var list<Interceptor> */
private array $interceptors = [];

public function __construct(
private ClientOptions $clientOptions,
private ?DataConverterInterface $dataConverter = null,
) {}

public function getClientOptions(): ClientOptions
{
return $this->clientOptions;
}

public function setClientOptions(ClientOptions $clientOptions): self
{
$this->clientOptions = $clientOptions;
return $this;
}

public function getDataConverter(): ?DataConverterInterface
{
return $this->dataConverter;
}

public function setDataConverter(?DataConverterInterface $dataConverter): self
{
$this->dataConverter = $dataConverter;
return $this;
}

/**
* @return list<Interceptor>
*/
public function getInterceptors(): array
{
return $this->interceptors;
}

/**
* @param list<Interceptor> $interceptors
*/
public function setInterceptors(array $interceptors): self
{
$this->interceptors = $interceptors;
return $this;
}

/**
* Add an interceptor to the client pipeline.
*/
public function addInterceptor(Interceptor $interceptor): self
{
$this->interceptors[] = $interceptor;
return $this;
}
}
Loading