Skip to content

Commit e575c28

Browse files
committed
chore: Refactor durable task and introduce telemetry
Signed-off-by: Javier Aliaga <javier@diagrid.io>
1 parent 7d46cfa commit e575c28

File tree

5 files changed

+228
-60
lines changed

5 files changed

+228
-60
lines changed

durabletask-client/src/main/java/io/dapr/durabletask/DurableTaskGrpcWorker.java

Lines changed: 9 additions & 53 deletions
Original file line numberDiff line numberDiff line change
@@ -14,14 +14,16 @@
1414
package io.dapr.durabletask;
1515

1616
import com.google.protobuf.StringValue;
17+
import io.dapr.durabletask.activity.ActivityRunner;
1718
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
18-
import io.dapr.durabletask.implementation.protobuf.OrchestratorService.TaskFailureDetails;
1919
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
2020
import io.grpc.Channel;
2121
import io.grpc.ManagedChannel;
2222
import io.grpc.ManagedChannelBuilder;
2323
import io.grpc.Status;
2424
import io.grpc.StatusRuntimeException;
25+
import io.opentelemetry.api.GlobalOpenTelemetry;
26+
import io.opentelemetry.api.trace.Tracer;
2527
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
2628
import io.opentelemetry.context.Context;
2729
import io.opentelemetry.context.propagation.TextMapGetter;
@@ -54,6 +56,7 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
5456
private final Duration maximumTimerInterval;
5557
private final ExecutorService workerPool;
5658
private final String appId; // App ID for cross-app routing
59+
private final Tracer tracer;
5760

5861
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
5962
private final boolean isExecutorServiceManaged;
@@ -85,6 +88,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
8588
sidecarGrpcChannel = this.managedSidecarChannel;
8689
}
8790

91+
this.tracer = GlobalOpenTelemetry.getTracer("dapr-workflow");
92+
8893
this.sidecarClient = TaskHubSidecarServiceGrpc.newBlockingStub(sidecarGrpcChannel);
8994
this.dataConverter = builder.dataConverter != null ? builder.dataConverter : new JacksonDataConverter();
9095
this.maximumTimerInterval = builder.maximumTimerInterval != null ? builder.maximumTimerInterval
@@ -95,6 +100,8 @@ public final class DurableTaskGrpcWorker implements AutoCloseable {
95100
this.workerPool = Context.taskWrapping(rawExecutor);
96101

97102
this.isExecutorServiceManaged = builder.executorService == null;
103+
104+
98105
}
99106

100107
/**
@@ -217,59 +224,8 @@ public void startAndBlock() {
217224
activityRequest.getOrchestrationInstance().getInstanceId(),
218225
Context.current()));
219226

220-
// Extract trace context from the ActivityRequest and set it as current
221-
Context traceContext = extractTraceContext(activityRequest);
222-
223-
// TODO: Error handling
224-
this.workerPool.submit(() -> {
225-
String output = null;
226-
TaskFailureDetails failureDetails = null;
227-
try {
228-
output = taskActivityExecutor.execute(
229-
activityRequest.getName(),
230-
activityRequest.getInput().getValue(),
231-
activityRequest.getTaskExecutionId(),
232-
activityRequest.getTaskId(),
233-
activityRequest.getParentTraceContext().getTraceParent());
234-
} catch (Throwable e) {
235-
failureDetails = TaskFailureDetails.newBuilder()
236-
.setErrorType(e.getClass().getName())
237-
.setErrorMessage(e.getMessage())
238-
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
239-
.build();
240-
}
241-
242-
OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
243-
.newBuilder()
244-
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
245-
.setTaskId(activityRequest.getTaskId())
246-
.setCompletionToken(workItem.getCompletionToken());
247-
248-
if (output != null) {
249-
responseBuilder.setResult(StringValue.of(output));
250-
}
251-
252-
if (failureDetails != null) {
253-
responseBuilder.setFailureDetails(failureDetails);
254-
}
227+
this.workerPool.submit(new ActivityRunner(workItem, taskActivityExecutor, sidecarClient, tracer));
255228

256-
try {
257-
this.sidecarClient.completeActivityTask(responseBuilder.build());
258-
} catch (StatusRuntimeException e) {
259-
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
260-
logger.log(Level.WARNING,
261-
"The sidecar at address {0} is unavailable while completing the activity task.",
262-
this.getSidecarAddress());
263-
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
264-
logger.log(Level.WARNING,
265-
"Durable Task worker has disconnected from {0} while completing the activity task.",
266-
this.getSidecarAddress());
267-
} else {
268-
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
269-
this.getSidecarAddress());
270-
}
271-
}
272-
});
273229
} else if (requestType == OrchestratorService.WorkItem.RequestCase.HEALTHPING) {
274230
// No-op
275231
} else {

durabletask-client/src/main/java/io/dapr/durabletask/FailureDetails.java

Lines changed: 7 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -124,7 +124,13 @@ public boolean isCausedBy(Class<? extends Exception> exceptionClass) {
124124
}
125125
}
126126

127-
static String getFullStackTrace(Throwable e) {
127+
/**
128+
* Gets the full stack trace of the specified exception.
129+
*
130+
* @param e the exception
131+
* @return the full stack trace of the exception
132+
*/
133+
public static String getFullStackTrace(Throwable e) {
128134
StackTraceElement[] elements = e.getStackTrace();
129135

130136
// Plan for 256 characters per stack frame (which is likely on the high-end)

durabletask-client/src/main/java/io/dapr/durabletask/TaskActivityExecutor.java

Lines changed: 19 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,11 +16,18 @@
1616
import java.util.HashMap;
1717
import java.util.logging.Logger;
1818

19-
final class TaskActivityExecutor {
19+
public final class TaskActivityExecutor {
2020
private final HashMap<String, TaskActivityFactory> activityFactories;
2121
private final DataConverter dataConverter;
2222
private final Logger logger;
2323

24+
/**
25+
* Constructor.
26+
*
27+
* @param activityFactories the activity factories to use for creating activities
28+
* @param dataConverter the data converter to use for serializing and deserializing activity inputs and outputs
29+
* @param logger the logger to use for logging
30+
*/
2431
public TaskActivityExecutor(
2532
HashMap<String, TaskActivityFactory> activityFactories,
2633
DataConverter dataConverter,
@@ -30,6 +37,17 @@ public TaskActivityExecutor(
3037
this.logger = logger;
3138
}
3239

40+
/**
41+
* Executes an activity task.
42+
*
43+
* @param taskName the name of the activity task to execute
44+
* @param input the serialized input payload for the activity task
45+
* @param taskExecutionId Unique ID for the task execution.
46+
* @param taskId Auto-incrementing ID for the task.
47+
* @param traceParent The traceparent header value.
48+
* @return the serialized output payload for the activity task, or null if the activity task returned null.
49+
* @throws Throwable if an unhandled exception occurs during activity task execution.
50+
*/
3351
public String execute(String taskName, String input,
3452
String taskExecutionId, int taskId, String traceParent) throws Throwable {
3553
TaskActivityFactory factory = this.activityFactories.get(taskName);
Lines changed: 193 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,193 @@
1+
/*
2+
* Copyright 2026 The Dapr Authors
3+
* Licensed under the Apache License, Version 2.0 (the "License");
4+
* you may not use this file except in compliance with the License.
5+
* You may obtain a copy of the License at
6+
* http://www.apache.org/licenses/LICENSE-2.0
7+
* Unless required by applicable law or agreed to in writing, software
8+
* distributed under the License is distributed on an "AS IS" BASIS,
9+
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
10+
* See the License for the specific language governing permissions and
11+
limitations under the License.
12+
*/
13+
14+
package io.dapr.durabletask.activity;
15+
16+
import com.google.protobuf.StringValue;
17+
import io.dapr.durabletask.FailureDetails;
18+
import io.dapr.durabletask.TaskActivityExecutor;
19+
import io.dapr.durabletask.implementation.protobuf.OrchestratorService;
20+
import io.dapr.durabletask.implementation.protobuf.TaskHubSidecarServiceGrpc;
21+
import io.grpc.Status;
22+
import io.grpc.StatusRuntimeException;
23+
import io.opentelemetry.api.trace.Span;
24+
import io.opentelemetry.api.trace.SpanKind;
25+
import io.opentelemetry.api.trace.StatusCode;
26+
import io.opentelemetry.api.trace.Tracer;
27+
import io.opentelemetry.api.trace.propagation.W3CTraceContextPropagator;
28+
import io.opentelemetry.context.Context;
29+
import io.opentelemetry.context.Scope;
30+
import io.opentelemetry.context.propagation.TextMapGetter;
31+
32+
import javax.annotation.Nullable;
33+
import java.util.HashMap;
34+
import java.util.Map;
35+
import java.util.logging.Level;
36+
import java.util.logging.Logger;
37+
38+
public class ActivityRunner implements Runnable {
39+
private static final Logger logger = Logger.getLogger(ActivityRunner.class.getPackage().getName());
40+
41+
private final OrchestratorService.ActivityRequest activityRequest;
42+
private final TaskActivityExecutor taskActivityExecutor;
43+
private final TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient;
44+
private final OrchestratorService.WorkItem workItem;
45+
@Nullable
46+
private final Tracer tracer;
47+
48+
/**
49+
* Constructor.
50+
*
51+
* <p> This class executes the activity requests</p>
52+
*
53+
* @param workItem work item to be executed
54+
* @param taskActivityExecutor executor for the activity
55+
* @param sidecarClient sidecar client to communicate with the sidecar
56+
* @param tracer tracer to be used for tracing
57+
*/
58+
public ActivityRunner(
59+
OrchestratorService.WorkItem workItem,
60+
TaskActivityExecutor taskActivityExecutor,
61+
TaskHubSidecarServiceGrpc.TaskHubSidecarServiceBlockingStub sidecarClient,
62+
@Nullable Tracer tracer) {
63+
this.activityRequest = workItem.getActivityRequest();
64+
this.workItem = workItem;
65+
this.taskActivityExecutor = taskActivityExecutor;
66+
this.sidecarClient = sidecarClient;
67+
this.tracer = tracer;
68+
}
69+
70+
@Override
71+
public void run() {
72+
if (tracer != null) {
73+
runWithTracing();
74+
} else {
75+
runWithoutTracing();
76+
}
77+
}
78+
79+
private void runWithTracing() {
80+
Context parentContext = extractTraceContext();
81+
82+
Span span = tracer.spanBuilder("activity:" + activityRequest.getName())
83+
.setParent(parentContext)
84+
.setSpanKind(SpanKind.INTERNAL)
85+
.setAttribute("durabletask.task.instance_id",
86+
activityRequest.getOrchestrationInstance().getInstanceId())
87+
.setAttribute("durabletask.task.id", activityRequest.getTaskId())
88+
.setAttribute("durabletask.activity.name", activityRequest.getName())
89+
.startSpan();
90+
91+
try (Scope scope = span.makeCurrent()) {
92+
executeActivity(span);
93+
} finally {
94+
span.end();
95+
}
96+
}
97+
98+
private void runWithoutTracing() {
99+
executeActivity(null);
100+
}
101+
102+
private void executeActivity(@Nullable Span span) {
103+
String output = null;
104+
OrchestratorService.TaskFailureDetails failureDetails = null;
105+
try {
106+
output = taskActivityExecutor.execute(
107+
activityRequest.getName(),
108+
activityRequest.getInput().getValue(),
109+
activityRequest.getTaskExecutionId(),
110+
activityRequest.getTaskId(),
111+
activityRequest.getParentTraceContext().getTraceParent());
112+
} catch (Throwable e) {
113+
if (span != null) {
114+
span.setStatus(StatusCode.ERROR, e.getMessage());
115+
span.recordException(e);
116+
}
117+
failureDetails = OrchestratorService.TaskFailureDetails.newBuilder()
118+
.setErrorType(e.getClass().getName())
119+
.setErrorMessage(e.getMessage())
120+
.setStackTrace(StringValue.of(FailureDetails.getFullStackTrace(e)))
121+
.build();
122+
}
123+
124+
OrchestratorService.ActivityResponse.Builder responseBuilder = OrchestratorService.ActivityResponse
125+
.newBuilder()
126+
.setInstanceId(activityRequest.getOrchestrationInstance().getInstanceId())
127+
.setTaskId(activityRequest.getTaskId())
128+
.setCompletionToken(workItem.getCompletionToken());
129+
130+
if (output != null) {
131+
responseBuilder.setResult(StringValue.of(output));
132+
}
133+
134+
if (failureDetails != null) {
135+
responseBuilder.setFailureDetails(failureDetails);
136+
}
137+
138+
try {
139+
this.sidecarClient.completeActivityTask(responseBuilder.build());
140+
} catch (StatusRuntimeException e) {
141+
if (span != null) {
142+
span.setStatus(StatusCode.ERROR, "Failed to complete activity task");
143+
span.recordException(e);
144+
}
145+
if (e.getStatus().getCode() == Status.Code.UNAVAILABLE) {
146+
logger.log(Level.WARNING,
147+
"The sidecar at address {0} is unavailable while completing the activity task.",
148+
this.sidecarClient.getChannel().authority());
149+
} else if (e.getStatus().getCode() == Status.Code.CANCELLED) {
150+
logger.log(Level.WARNING,
151+
"Durable Task worker has disconnected from {0} while completing the activity task.",
152+
this.sidecarClient.getChannel().authority());
153+
} else {
154+
logger.log(Level.WARNING, "Unexpected failure completing the activity task at {0}.",
155+
this.sidecarClient.getChannel().authority());
156+
}
157+
}
158+
}
159+
160+
private Context extractTraceContext() {
161+
if (!activityRequest.hasParentTraceContext()) {
162+
return Context.current();
163+
}
164+
165+
OrchestratorService.TraceContext traceContext = activityRequest.getParentTraceContext();
166+
String traceParent = traceContext.getTraceParent();
167+
168+
if (traceParent.isEmpty()) {
169+
return Context.current();
170+
}
171+
172+
Map<String, String> carrier = new HashMap<>();
173+
carrier.put("traceparent", traceParent);
174+
if (traceContext.hasTraceState()) {
175+
carrier.put("tracestate", traceContext.getTraceState().getValue());
176+
}
177+
178+
TextMapGetter<Map<String, String>> getter = new TextMapGetter<>() {
179+
@Override
180+
public Iterable<String> keys(Map<String, String> carrier) {
181+
return carrier.keySet();
182+
}
183+
184+
@Override
185+
public String get(Map<String, String> carrier, String key) {
186+
return carrier.get(key);
187+
}
188+
};
189+
190+
return W3CTraceContextPropagator.getInstance()
191+
.extract(Context.current(), carrier, getter);
192+
}
193+
}

durabletask-client/src/main/java/io/dapr/durabletask/util/TraceUtils.java

Lines changed: 0 additions & 5 deletions
This file was deleted.

0 commit comments

Comments
 (0)