Skip to content

Commit 57e5e68

Browse files
committed
feat: Add workflow versioning
Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 08652db commit 57e5e68

File tree

11 files changed

+228
-28
lines changed

11 files changed

+228
-28
lines changed

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 4 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,6 +17,7 @@
1717
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
1919
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
20+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
2021
import io.grpc.Channel;
2122
import io.grpc.ManagedChannel;
2223
import io.grpc.ManagedChannelBuilder;
@@ -42,7 +43,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
4243
private static final Logger logger = Logger.getLogger(DurableTaskGrpcWorker.class.getPackage().getName());
4344
private static final Duration DEFAULT_MAXIMUM_TIMER_INTERVAL = Duration.ofDays(3);
4445

45-
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
46+
private final TaskOrchestrationFactories orchestrationFactories;
47+
4648
private final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
4749

4850
private final ManagedChannel managedSidecarChannel;
@@ -57,7 +59,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5759
private Thread workerThread;
5860

5961
DurableTaskGrpcWorker(DurableTaskGrpcWorkerBuilder builder) {
60-
this.orchestrationFactories.putAll(builder.orchestrationFactories);
62+
this.orchestrationFactories = builder.orchestrationFactories;
6163
this.activityFactories.putAll(builder.activityFactories);
6264
this.appId = builder.appId;
6365

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorkerBuilder.java

Lines changed: 7 additions & 12 deletions
Original file line numberDiff line numberDiff line change
@@ -13,6 +13,8 @@
1313

1414
package io.dapr.durabletask;
1515

16+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
17+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;
1618
import io.grpc.Channel;
1719

1820
import java.time.Duration;
@@ -24,7 +26,7 @@
2426
*
2527
*/
2628
public final class DurableTaskGrpcWorkerBuilder {
27-
final HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
29+
TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories();
2830
final HashMap<String, TaskActivityFactory> activityFactories = new HashMap<>();
2931
int port;
3032
Channel channel;
@@ -40,20 +42,11 @@ public final class DurableTaskGrpcWorkerBuilder {
4042
* @return this builder object
4143
*/
4244
public DurableTaskGrpcWorkerBuilder addOrchestration(TaskOrchestrationFactory factory) {
43-
String key = factory.getName();
44-
if (key == null || key.length() == 0) {
45-
throw new IllegalArgumentException("A non-empty task orchestration name is required.");
46-
}
47-
48-
if (this.orchestrationFactories.containsKey(key)) {
49-
throw new IllegalArgumentException(
50-
String.format("A task orchestration factory named %s is already registered.", key));
51-
}
52-
53-
this.orchestrationFactories.put(key, factory);
45+
this.orchestrationFactories.addOrchestration(factory);
5446
return this;
5547
}
5648

49+
5750
/**
5851
* Adds an activity factory to be used by the constructed {@link DurableTaskGrpcWorker}.
5952
*
@@ -161,4 +154,6 @@ public DurableTaskGrpcWorkerBuilder appId(String appId) {
161154
public DurableTaskGrpcWorker build() {
162155
return new DurableTaskGrpcWorker(this);
163156
}
157+
158+
164159
}

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRunner.java

Lines changed: 14 additions & 3 deletions
Original file line numberDiff line numberDiff line change
@@ -16,10 +16,11 @@
1616
import com.google.protobuf.InvalidProtocolBufferException;
1717
import com.google.protobuf.StringValue;
1818
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
19+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
20+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;
1921

2022
import java.time.Duration;
2123
import java.util.Base64;
22-
import java.util.HashMap;
2324
import java.util.logging.Logger;
2425

2526
/**
@@ -134,8 +135,8 @@ public static byte[] loadAndRun(byte[] orchestratorRequestBytes, TaskOrchestrati
134135
}
135136

136137
// Register the passed orchestration as the default ("*") orchestration
137-
HashMap<String, TaskOrchestrationFactory> orchestrationFactories = new HashMap<>();
138-
orchestrationFactories.put("*", new TaskOrchestrationFactory() {
138+
TaskOrchestrationFactories orchestrationFactories = new TaskOrchestrationFactories();
139+
orchestrationFactories.addOrchestration(new TaskOrchestrationFactory() {
139140
@Override
140141
public String getName() {
141142
return "*";
@@ -145,6 +146,16 @@ public String getName() {
145146
public TaskOrchestration create() {
146147
return orchestration;
147148
}
149+
150+
@Override
151+
public String getVersionName() {
152+
return "";
153+
}
154+
155+
@Override
156+
public Boolean isLatestVersion() {
157+
return false;
158+
}
148159
});
149160

150161
TaskOrchestrationExecutor taskOrchestrationExecutor = new TaskOrchestrationExecutor(

durabletask-client/src/main/java/io/dapr/durabletask/OrchestrationRuntimeStatus.java

Lines changed: 8 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -68,7 +68,12 @@ public enum OrchestrationRuntimeStatus {
6868
/**
6969
* The orchestration is in a suspended state.
7070
*/
71-
SUSPENDED;
71+
SUSPENDED,
72+
73+
/**
74+
* The orchestration is in a stalled state.
75+
*/
76+
STALLED;
7277

7378
static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.OrchestrationStatus status) {
7479
switch (status) {
@@ -88,6 +93,8 @@ static OrchestrationRuntimeStatus fromProtobuf(OrchestratorService.Orchestration
8893
return PENDING;
8994
case ORCHESTRATION_STATUS_SUSPENDED:
9095
return SUSPENDED;
96+
case ORCHESTRATION_STATUS_STALLED:
97+
return STALLED;
9198
default:
9299
throw new IllegalArgumentException(String.format("Unknown status value: %s", status));
93100
}

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationContext.java

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -326,6 +326,15 @@ default void continueAsNew(Object input) {
326326
this.continueAsNew(input, true);
327327
}
328328

329+
/**
330+
* Check if the given patch name can be applied to the orchestration.
331+
*
332+
* @param patchName The name of the patch to check.
333+
* @return True if the given patch name can be applied to the orchestration, False otherwise.
334+
*/
335+
336+
boolean isPatched(String patchName);
337+
329338
/**
330339
* Restarts the orchestration with a new input and clears its history.
331340
*

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestrationExecutor.java

Lines changed: 86 additions & 7 deletions
Original file line numberDiff line numberDiff line change
@@ -19,7 +19,11 @@
1919
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.ScheduleTaskAction.Builder;
2020
import io.dapr.durabletask.interruption.ContinueAsNewInterruption;
2121
import io.dapr.durabletask.interruption.OrchestratorBlockedException;
22+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactories;
23+
import io.dapr.durabletask.orchestration.TaskOrchestrationFactory;
24+
import io.dapr.durabletask.orchestration.exception.VersionNotRegisteredException;
2225
import io.dapr.durabletask.util.UuidGenerator;
26+
import io.opentelemetry.api.internal.StringUtils;
2327

2428
import javax.annotation.Nullable;
2529
import java.time.Duration;
@@ -47,14 +51,14 @@
4751
final class TaskOrchestrationExecutor {
4852

4953
private static final String EMPTY_STRING = "";
50-
private final HashMap<String, TaskOrchestrationFactory> orchestrationFactories;
54+
private final TaskOrchestrationFactories orchestrationFactories;
5155
private final DataConverter dataConverter;
5256
private final Logger logger;
5357
private final Duration maximumTimerInterval;
5458
private final String appId;
5559

5660
public TaskOrchestrationExecutor(
57-
HashMap<String, TaskOrchestrationFactory> orchestrationFactories,
61+
TaskOrchestrationFactories orchestrationFactories,
5862
DataConverter dataConverter,
5963
Duration maximumTimerInterval,
6064
Logger logger,
@@ -79,6 +83,9 @@ public TaskOrchestratorResult execute(List<OrchestratorService.HistoryEvent> pas
7983
}
8084
completed = true;
8185
logger.finest("The orchestrator execution completed normally");
86+
} catch (VersionNotRegisteredException versionNotRegisteredException) {
87+
logger.warning("The orchestrator version is not registered: " + versionNotRegisteredException.toString());
88+
context.setVersionNotRegistered();
8289
} catch (OrchestratorBlockedException orchestratorBlockedException) {
8390
logger.fine("The orchestrator has yielded and will await for new events.");
8491
} catch (ContinueAsNewInterruption continueAsNewInterruption) {
@@ -87,7 +94,7 @@ public TaskOrchestratorResult execute(List<OrchestratorService.HistoryEvent> pas
8794
} catch (Exception e) {
8895
// The orchestrator threw an unhandled exception - fail it
8996
// TODO: What's the right way to log this?
90-
logger.warning("The orchestrator failed with an unhandled exception: " + e.toString());
97+
logger.warning("The orchestrator failed with an unhandled exception: " + e);
9198
context.fail(new FailureDetails(e));
9299
}
93100

@@ -97,12 +104,16 @@ public TaskOrchestratorResult execute(List<OrchestratorService.HistoryEvent> pas
97104
context.complete(null);
98105
}
99106

100-
return new TaskOrchestratorResult(context.pendingActions.values(), context.getCustomStatus());
107+
return new TaskOrchestratorResult(context.pendingActions.values(),
108+
context.getCustomStatus(),
109+
context.version,
110+
context.encounteredPatches);
101111
}
102112

103113
private class ContextImplTask implements TaskOrchestrationContext {
104114

105115
private String orchestratorName;
116+
private final List<String> encounteredPatches = new ArrayList<>();
106117
private String rawInput;
107118
private String instanceId;
108119
private Instant currentInstant;
@@ -127,6 +138,11 @@ private class ContextImplTask implements TaskOrchestrationContext {
127138
private Object continuedAsNewInput;
128139
private boolean preserveUnprocessedEvents;
129140
private Object customStatus;
141+
private final Map<String, Boolean> appliedPatches = new HashMap<>();
142+
private final Map<String, Boolean> historyPatches = new HashMap<>();
143+
144+
private OrchestratorService.OrchestrationVersion orchestratorStartedVersion;
145+
private String version;
130146

131147
public ContextImplTask(List<OrchestratorService.HistoryEvent> pastEvents,
132148
List<OrchestratorService.HistoryEvent> newEvents) {
@@ -144,6 +160,7 @@ private void setName(String name) {
144160
this.orchestratorName = name;
145161
}
146162

163+
147164
private void setInput(String rawInput) {
148165
this.rawInput = rawInput;
149166
}
@@ -363,6 +380,34 @@ public <V> Task<V> callActivity(
363380
return this.createAppropriateTask(taskFactory, options);
364381
}
365382

383+
@Override
384+
public boolean isPatched(String patchName) {
385+
var isPatched = this.checkPatch(patchName);
386+
if (isPatched) {
387+
this.encounteredPatches.add(patchName);
388+
}
389+
390+
return isPatched;
391+
}
392+
393+
public boolean checkPatch(String patchName) {
394+
if (this.appliedPatches.containsKey(patchName)) {
395+
return this.appliedPatches.get(patchName);
396+
}
397+
398+
if (this.historyPatches.containsKey(patchName)) {
399+
this.appliedPatches.put(patchName, true);
400+
return true;
401+
}
402+
403+
if (this.isReplaying) {
404+
this.appliedPatches.put(patchName, false);
405+
return false;
406+
}
407+
this.appliedPatches.put(patchName, true);
408+
return true;
409+
}
410+
366411
@Override
367412
public void continueAsNew(Object input, boolean preserveUnprocessedEvents) {
368413
Helpers.throwIfOrchestratorComplete(this.isComplete);
@@ -924,6 +969,7 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
924969
case ORCHESTRATORSTARTED:
925970
Instant instant = DataConverter.getInstantFromTimestamp(e.getTimestamp());
926971
this.setCurrentInstant(instant);
972+
this.orchestratorStartedVersion = e.getOrchestratorStarted().getVersion();
927973
this.logger.fine(() -> this.instanceId + ": Workflow orchestrator started");
928974
break;
929975
case ORCHESTRATORCOMPLETED:
@@ -938,18 +984,32 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
938984
this.logger.fine(() -> this.instanceId + ": Workflow execution started");
939985
this.setAppId(e.getRouter().getSourceAppID());
940986

987+
if (this.orchestratorStartedVersion != null
988+
&& this.orchestratorStartedVersion.getPatchesCount() > 0) {
989+
for (var patch : this.orchestratorStartedVersion.getPatchesList()) {
990+
this.historyPatches.put(patch, true);
991+
}
992+
}
993+
994+
var versionName = "";
995+
if (this.orchestratorStartedVersion != null && !StringUtils.isNullOrEmpty(this.orchestratorStartedVersion.getName())) {
996+
versionName = this.orchestratorStartedVersion.getName();
997+
}
998+
941999
// Create and invoke the workflow orchestrator
942-
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories
943-
.get(executionStarted.getName());
1000+
TaskOrchestrationFactory factory = TaskOrchestrationExecutor.this.orchestrationFactories.getOrchestrationFactory(executionStarted.getName(), versionName);
1001+
9441002
if (factory == null) {
9451003
// Try getting the default orchestrator
946-
factory = TaskOrchestrationExecutor.this.orchestrationFactories.get("*");
1004+
factory = TaskOrchestrationExecutor.this.orchestrationFactories.getOrchestrationFactory("*");
9471005
}
9481006
// TODO: Throw if the factory is null (orchestration by that name doesn't exist)
9491007
if (factory == null) {
9501008
throw new IllegalStateException("No factory found for orchestrator: " + executionStarted.getName());
9511009
}
9521010

1011+
this.version = factory.getVersionName();
1012+
9531013
TaskOrchestration orchestrator = factory.create();
9541014
orchestrator.run(this);
9551015
break;
@@ -959,6 +1019,9 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
9591019
case EXECUTIONTERMINATED:
9601020
this.handleExecutionTerminated(e);
9611021
break;
1022+
case EXECUTIONSTALLED:
1023+
this.logger.fine(() -> this.instanceId + ": Workflow execution stalled");
1024+
break;
9621025
case TASKSCHEDULED:
9631026
this.handleTaskScheduled(e);
9641027
break;
@@ -998,6 +1061,22 @@ private void processEvent(OrchestratorService.HistoryEvent e) {
9981061
}
9991062
}
10001063

1064+
public void setVersionNotRegistered() {
1065+
this.pendingActions.clear();
1066+
1067+
OrchestratorService.CompleteOrchestrationAction.Builder builder = OrchestratorService.CompleteOrchestrationAction
1068+
.newBuilder();
1069+
builder.setOrchestrationStatus(OrchestratorService.OrchestrationStatus.ORCHESTRATION_STATUS_STALLED);
1070+
1071+
int id = this.sequenceNumber++;
1072+
OrchestratorService.OrchestratorAction action = OrchestratorService.OrchestratorAction.newBuilder()
1073+
.setId(id)
1074+
.setCompleteOrchestration(builder.build())
1075+
.build();
1076+
this.pendingActions.put(id, action);
1077+
1078+
}
1079+
10011080
private class TaskRecord<V> {
10021081
private final CompletableTask<V> task;
10031082
private final String taskName;

durabletask-client/src/main/java/io/dapr/durabletask/TaskOrchestratorResult.java

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -17,17 +17,23 @@
1717

1818
import java.util.Collection;
1919
import java.util.Collections;
20+
import java.util.List;
2021

2122
final class TaskOrchestratorResult {
2223

2324
private final Collection<OrchestratorService.OrchestratorAction> actions;
2425

2526
private final String customStatus;
2627

27-
public TaskOrchestratorResult(Collection<OrchestratorService.OrchestratorAction> actions, String customStatus) {
28+
private final String version;
29+
30+
private final List<String> patches;
31+
32+
public TaskOrchestratorResult(Collection<OrchestratorService.OrchestratorAction> actions, String customStatus, String version, List<String> patches) {
2833
this.actions = Collections.unmodifiableCollection(actions);
29-
;
3034
this.customStatus = customStatus;
35+
this.version = version;
36+
this.patches = patches;
3137
}
3238

3339
public Collection<OrchestratorService.OrchestratorAction> getActions() {

0 commit comments

Comments
 (0)