Skip to content

Commit dfacaa9

Browse files
committed
Workflow versioning
Signed-off-by: Albert Callarisa <[email protected]>
1 parent de3eef6 commit dfacaa9

File tree

7 files changed

+415
-2
lines changed

7 files changed

+415
-2
lines changed

dev-requirements.txt

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -14,7 +14,7 @@ Flask>=1.1
1414
# needed for auto fix
1515
ruff===0.14.1
1616
# needed for dapr-ext-workflow
17-
durabletask-dapr >= 0.2.0a15
17+
durabletask-dapr >= 0.2.0a16
1818
# needed for .env file loading in examples
1919
python-dotenv>=1.0.0
2020
# needed for enhanced schema generation from function features

examples/workflow/README.md

Lines changed: 47 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -461,3 +461,50 @@ app1 - received workflow error from app2
461461
```
462462
among others. This shows that the workflow calls are failing as expected, and they are being handled as expected too.
463463

464+
465+
### Versioning
466+
467+
This example demonstrates how to version a workflow.
468+
The test consists of two parts:
469+
1. Uses most of the common features of the workflow versioning. It also leaves some workflows stalled to demonstrate the stalled workflow feature.
470+
2. Fixes the stalled workflows to get them to completion.
471+
472+
It had to be done in two parts because the runtime needs to be restarted in order to rerun stalled workflows.
473+
474+
The Dapr CLI can be started using the following command:
475+
476+
<!--STEP
477+
name: Run the versioning example
478+
match_order: none
479+
expected_stdout_lines:
480+
- "== APP == test1: triggering workflow"
481+
- "== APP == test1: Received workflow call for version1"
482+
- "== APP == test1: Finished workflow for version1"
483+
- "== APP == test2: triggering workflow"
484+
- "== APP == test2: Received workflow call for version1"
485+
- "== APP == test2: Finished workflow for version1"
486+
- "== APP == test3: triggering workflow"
487+
- "== APP == test3: Received workflow call for version2"
488+
- "== APP == test3: Finished workflow for version2"
489+
- "== APP == test4: start"
490+
- "== APP == test4: patch1 is patched"
491+
- "== APP == test5: start"
492+
- "== APP == test5: patch1 is not patched"
493+
- "== APP == test5: patch2 is patched"
494+
- "== APP == test6: start"
495+
- "== APP == test6: patch1 is patched"
496+
- "== APP == test6: patch2 is patched"
497+
- "== APP == test7: Received workflow call for version1"
498+
- "== APP == test7: Workflow is stalled"
499+
- "== APP == test8: Workflow is stalled"
500+
- "== APP == test100: part2"
501+
- "== APP == test100: Finished stalled version1 workflow"
502+
- "== APP == test100: Finished stalled patching workflow"
503+
timeout_seconds: 60
504+
-->
505+
506+
```sh
507+
dapr run --app-id wf-versioning-example -- python3 versioning.py part1
508+
dapr run --app-id wf-versioning-example --log-level debug -- python3 versioning.py part2
509+
```
510+
<!--END_STEP-->

examples/workflow/versioning.py

Lines changed: 291 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,291 @@
1+
# -*- coding: utf-8 -*-
2+
# Copyright 2026 The Dapr Authors
3+
# Licensed under the Apache License, Version 2.0 (the "License");
4+
# you may not use this file except in compliance with the License.
5+
# You may obtain a copy of the License at
6+
# http://www.apache.org/licenses/LICENSE-2.0
7+
# Unless required by applicable law or agreed to in writing, software
8+
# distributed under the License is distributed on an "AS IS" BASIS,
9+
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
# See the License for the specific language governing permissions and
11+
# limitations under the License.
12+
13+
import sys
14+
import time
15+
16+
import dapr.ext.workflow as wf
17+
18+
current_test = 0
19+
20+
21+
def print_test(message):
22+
print(f'test{current_test}: {message}', flush=True)
23+
24+
25+
print_activity = None
26+
27+
wfr = None
28+
29+
30+
def new_wfr():
31+
global wfr
32+
global print_activity
33+
34+
if wfr is not None:
35+
wfr.shutdown()
36+
wfr = wf.WorkflowRuntime()
37+
38+
def print_activity(ctx, input):
39+
return print_test(input)
40+
41+
wfr.register_activity(print_activity, name='print_activity')
42+
43+
44+
new_wfr()
45+
46+
47+
def test_full_versioning(client: wf.DaprWorkflowClient):
48+
global current_test
49+
50+
# Start with only one version defined. Runnig the workflow should run this version as it normally would.
51+
current_test = 1
52+
53+
@wfr.versioned_workflow(name='workflow', is_latest=True)
54+
def version1_workflow(ctx: wf.DaprWorkflowContext):
55+
yield ctx.call_activity(print_activity, input='Received workflow call for version1')
56+
yield ctx.wait_for_external_event(name='event')
57+
yield ctx.call_activity(print_activity, input='Finished workflow for version1')
58+
return 1
59+
60+
print_test('triggering workflow')
61+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
62+
client.raise_workflow_event(instance_id, event_name='event')
63+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
64+
65+
# Now we start a workflow, but introduce a latest version half way. It should resume the execution in the old version.
66+
current_test = 2
67+
print_test('triggering workflow')
68+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
69+
time.sleep(2) # wait for the workflow to start and wait for the event
70+
71+
@wfr.versioned_workflow(name='workflow', is_latest=True)
72+
def version2_workflow(ctx: wf.DaprWorkflowContext):
73+
yield ctx.call_activity(print_activity, input='Received workflow call for version2')
74+
yield ctx.wait_for_external_event(name='event')
75+
yield ctx.call_activity(print_activity, input='Finished workflow for version2')
76+
return 1
77+
78+
client.raise_workflow_event(instance_id, event_name='event')
79+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
80+
81+
# Now we have the two versions defined, running the workflow now should run v2 as it's the latest version.
82+
current_test = 3
83+
print_test('triggering workflow')
84+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
85+
client.raise_workflow_event(instance_id, event_name='event')
86+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
87+
88+
89+
def test_patching(client: wf.DaprWorkflowClient):
90+
global current_test
91+
92+
@wfr.workflow
93+
def patching_workflow(ctx: wf.DaprWorkflowContext):
94+
# This function will be changed throughout the test, to simulate different scenarios
95+
return workflow_code(ctx)
96+
97+
# Runs the patched branch by default
98+
current_test = 4
99+
100+
def workflow_code_v1_patch1_only(ctx: wf.DaprWorkflowContext):
101+
yield ctx.call_activity(print_activity, input='start')
102+
if ctx.is_patched('patch1'):
103+
yield ctx.call_activity(print_activity, input='patch1 is patched')
104+
else:
105+
yield ctx.call_activity(print_activity, input='patch1 is not patched')
106+
return 1
107+
108+
workflow_code = workflow_code_v1_patch1_only
109+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
110+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
111+
112+
# When the execution passed the place where a patch is introduced, it should be not patched.
113+
def workflow_code_v2_patch2_after_event(ctx: wf.DaprWorkflowContext):
114+
yield ctx.call_activity(print_activity, input='start')
115+
yield ctx.wait_for_external_event(name='event')
116+
if ctx.is_patched('patch2'):
117+
yield ctx.call_activity(print_activity, input='patch2 is patched')
118+
else:
119+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
120+
return 1
121+
122+
workflow_code = workflow_code_v2_patch2_after_event
123+
current_test = 5
124+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
125+
time.sleep(2)
126+
127+
def workflow_code_v3_patch1_and_patch2_with_event(ctx: wf.DaprWorkflowContext):
128+
yield ctx.call_activity(print_activity, input='start')
129+
if ctx.is_patched('patch1'):
130+
yield ctx.call_activity(print_activity, input='patch1 is patched')
131+
else:
132+
yield ctx.call_activity(print_activity, input='patch1 is not patched')
133+
yield ctx.wait_for_external_event(name='event')
134+
if ctx.is_patched('patch2'):
135+
yield ctx.call_activity(print_activity, input='patch2 is patched')
136+
else:
137+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
138+
return 1
139+
140+
workflow_code = workflow_code_v3_patch1_and_patch2_with_event
141+
client.raise_workflow_event(instance_id, event_name='event')
142+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
143+
144+
# It remembers previous patches.
145+
def workflow_code_v4_silence_patch1(ctx: wf.DaprWorkflowContext):
146+
yield ctx.call_activity(print_activity, input='start')
147+
if ctx.is_patched('patch1'):
148+
pass # keep it silenced for now, we'll add logs later and this ones would confuse the test
149+
else:
150+
pass
151+
yield ctx.wait_for_external_event(name='event')
152+
if ctx.is_patched('patch2'):
153+
yield ctx.call_activity(print_activity, input='patch2 is patched')
154+
else:
155+
yield ctx.call_activity(print_activity, input='patch2 is not patched')
156+
return 1
157+
158+
workflow_code = workflow_code_v4_silence_patch1
159+
current_test = 6
160+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
161+
time.sleep(2)
162+
163+
workflow_code = workflow_code_v3_patch1_and_patch2_with_event
164+
client.raise_workflow_event(instance_id, event_name='event')
165+
client.wait_for_workflow_completion(instance_id, timeout_in_seconds=30)
166+
167+
168+
def test_full_versioning_stall(client: wf.DaprWorkflowClient):
169+
global current_test
170+
171+
new_wfr()
172+
173+
@wfr.versioned_workflow(name='stall_workflow', is_latest=True)
174+
def version1_workflow(ctx: wf.DaprWorkflowContext):
175+
yield ctx.call_activity(print_activity, input='Received workflow call for version1')
176+
yield ctx.wait_for_external_event(name='event')
177+
yield ctx.call_activity(print_activity, input='Finished workflow for version1')
178+
return 1
179+
180+
wfr.start()
181+
current_test = 7
182+
instance_id = client.schedule_new_workflow(workflow=version1_workflow)
183+
time.sleep(3)
184+
new_wfr()
185+
186+
@wfr.versioned_workflow(name='stall_workflow', is_latest=True)
187+
def version2_workflow(ctx: wf.DaprWorkflowContext):
188+
yield ctx.call_activity(print_activity, input='Received workflow call for version2')
189+
yield ctx.wait_for_external_event(name='event')
190+
yield ctx.call_activity(print_activity, input='Finished workflow for version2')
191+
return 1
192+
193+
wfr.start()
194+
client.raise_workflow_event(instance_id, event_name='event')
195+
time.sleep(2)
196+
md = client.get_workflow_state(instance_id)
197+
if md.runtime_status == wf.WorkflowStatus.STALLED:
198+
print_test('Workflow is stalled')
199+
else:
200+
print_test('Workflow is not stalled')
201+
202+
203+
def test_patching_stall(client: wf.DaprWorkflowClient):
204+
global current_test
205+
206+
current_test = 8
207+
208+
@wfr.workflow
209+
def patching_workflow(ctx: wf.DaprWorkflowContext):
210+
# This function will be changed throughout the test, to simulate different scenarios
211+
return workflow_code(ctx)
212+
213+
def workflow_code_v1_with_patch1_check(ctx: wf.DaprWorkflowContext):
214+
if ctx.is_patched('patch1'):
215+
pass
216+
else:
217+
pass
218+
yield ctx.wait_for_external_event(name='event')
219+
return 1
220+
221+
workflow_code = workflow_code_v1_with_patch1_check
222+
instance_id = client.schedule_new_workflow(workflow=patching_workflow)
223+
time.sleep(2)
224+
225+
def workflow_code_v2_without_patch1_check(ctx: wf.DaprWorkflowContext):
226+
# Removed patch1 check
227+
yield ctx.wait_for_external_event(name='event')
228+
return 1
229+
230+
workflow_code = workflow_code_v2_without_patch1_check
231+
client.raise_workflow_event(instance_id, event_name='event')
232+
time.sleep(2)
233+
md = client.get_workflow_state(instance_id)
234+
if md.runtime_status == wf.WorkflowStatus.STALLED:
235+
print_test('Workflow is stalled')
236+
else:
237+
print_test('Workflow is not stalled')
238+
239+
240+
def main():
241+
args = sys.argv[1:]
242+
if len(args) == 0:
243+
print('Usage: python versioning.py <part1|part2>')
244+
return
245+
if args[0] == 'part1':
246+
wfr.start()
247+
time.sleep(2) # wait for workflow runtime to start
248+
client = wf.DaprWorkflowClient()
249+
250+
test_full_versioning(client)
251+
test_patching(client)
252+
253+
test_full_versioning_stall(client)
254+
test_patching_stall(client)
255+
wfr.shutdown()
256+
elif args[0] == 'part2':
257+
global current_test
258+
current_test = 100
259+
print_test('part2')
260+
261+
@wfr.versioned_workflow(name='stall_workflow', is_latest=False)
262+
def version1_workflow(ctx: wf.DaprWorkflowContext):
263+
yield ctx.call_activity(print_activity, input='Received workflow call for version1')
264+
yield ctx.wait_for_external_event(name='event')
265+
yield ctx.call_activity(print_activity, input='Finished stalled version1 workflow')
266+
return 1
267+
268+
@wfr.versioned_workflow(name='stall_workflow', is_latest=True)
269+
def version2_workflow(ctx: wf.DaprWorkflowContext):
270+
yield ctx.call_activity(print_activity, input='Received workflow call for version2')
271+
yield ctx.wait_for_external_event(name='event')
272+
yield ctx.call_activity(print_activity, input='Finished stalled version2 workflow')
273+
return 1
274+
275+
@wfr.workflow
276+
def patching_workflow(ctx: wf.DaprWorkflowContext):
277+
if ctx.is_patched('patch1'):
278+
pass
279+
else:
280+
pass
281+
yield ctx.wait_for_external_event(name='event')
282+
yield ctx.call_activity(print_activity, input='Finished stalled patching workflow')
283+
return 1
284+
285+
wfr.start()
286+
time.sleep(10)
287+
wfr.shutdown()
288+
289+
290+
if __name__ == '__main__':
291+
main()

ext/dapr-ext-workflow/dapr/ext/workflow/dapr_workflow_context.py

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -153,6 +153,10 @@ def continue_as_new(self, new_input: Any, *, save_events: bool = False) -> None:
153153
self._logger.debug(f'{self.instance_id}: Continuing as new')
154154
self.__obj.continue_as_new(new_input, save_events=save_events)
155155

156+
def is_patched(self, patch_name: str) -> bool:
157+
self._logger.debug(f'{self.instance_id}: Checking if {patch_name} is patched')
158+
return self.__obj.is_patched(patch_name)
159+
156160

157161
def when_all(tasks: List[task.Task[T]]) -> task.WhenAllTask[T]:
158162
"""Returns a task that completes when all of the provided tasks complete or when one of the

0 commit comments

Comments
 (0)