Skip to content

Commit 791f38c

Browse files
guidobreitoddbaert
andauthored
fix(flagd): improve stream observer, refine retry policy; don't use retry to avoid busy loop (#1590)
Signed-off-by: Guido Breitenhuber <[email protected]> Signed-off-by: Todd Baert <[email protected]> Co-authored-by: Todd Baert <[email protected]>
1 parent 8fe9fcc commit 791f38c

File tree

10 files changed

+259
-203
lines changed

10 files changed

+259
-203
lines changed

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/Config.java

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -14,6 +14,7 @@ public final class Config {
1414
static final String DEFAULT_HOST = "localhost";
1515

1616
static final int DEFAULT_DEADLINE = 500;
17+
static final int DEFAULT_MAX_RETRY_BACKOFF_MS = 12000;
1718
static final int DEFAULT_STREAM_DEADLINE_MS = 10 * 60 * 1000;
1819
static final int DEFAULT_STREAM_RETRY_GRACE_PERIOD = 5;
1920
static final int DEFAULT_MAX_CACHE_SIZE = 1000;
@@ -31,6 +32,7 @@ public final class Config {
3132
static final String MAX_CACHE_SIZE_ENV_VAR_NAME = "FLAGD_MAX_CACHE_SIZE";
3233
static final String BASE_EVENT_STREAM_RETRY_BACKOFF_MS_ENV_VAR_NAME = "FLAGD_RETRY_BACKOFF_MS";
3334
static final String DEADLINE_MS_ENV_VAR_NAME = "FLAGD_DEADLINE_MS";
35+
static final String FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME = "FLAGD_RETRY_BACKOFF_MAX_MS";
3436
static final String STREAM_DEADLINE_MS_ENV_VAR_NAME = "FLAGD_STREAM_DEADLINE_MS";
3537
static final String SOURCE_SELECTOR_ENV_VAR_NAME = "FLAGD_SOURCE_SELECTOR";
3638
/**

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/FlagdOptions.java

Lines changed: 9 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -98,10 +98,18 @@ public class FlagdOptions {
9898
@Builder.Default
9999
private int deadline = fallBackToEnvOrDefault(Config.DEADLINE_MS_ENV_VAR_NAME, Config.DEFAULT_DEADLINE);
100100

101+
/**
102+
* Max stream retry backoff in milliseconds.
103+
*/
104+
@Builder.Default
105+
private int retryBackoffMaxMs =
106+
fallBackToEnvOrDefault(Config.FLAGD_RETRY_BACKOFF_MAX_MS_VAR_NAME, Config.DEFAULT_MAX_RETRY_BACKOFF_MS);
107+
101108
/**
102109
* Streaming connection deadline in milliseconds.
103110
* Set to 0 to disable the deadline.
104-
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from killing idle connections.
111+
* Defaults to 600000 (10 minutes); recommended to prevent infrastructure from
112+
* killing idle connections.
105113
*/
106114
@Builder.Default
107115
private int streamDeadlineMs =

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelBuilder.java

Lines changed: 48 additions & 58 deletions
Original file line numberDiff line numberDiff line change
@@ -24,67 +24,57 @@
2424
import javax.net.ssl.SSLException;
2525

2626
/** gRPC channel builder helper. */
27+
@SuppressFBWarnings(value = "SE_BAD_FIELD", justification = "we don't care to serialize this")
2728
public class ChannelBuilder {
28-
2929
/**
3030
* Controls retry (not-reconnection) policy for failed RPCs.
3131
*/
3232
@SuppressWarnings({"unchecked", "rawtypes"})
33-
static final Map<String, ?> SERVICE_CONFIG_WITH_RETRY = new HashMap() {
34-
{
35-
put("methodConfig", Arrays.asList(new HashMap() {
36-
{
37-
put(
38-
"name",
39-
Arrays.asList(
40-
new HashMap() {
41-
{
42-
put("service", "flagd.sync.v1.FlagSyncService");
43-
}
44-
},
45-
new HashMap() {
46-
{
47-
put("service", "flagd.evaluation.v1.Service");
48-
}
49-
}));
50-
put("retryPolicy", new HashMap() {
51-
{
52-
// 1 + 2 + 4
53-
put("maxAttempts", 3.0); // types used here are important, need to be doubles
54-
put("initialBackoff", "1s");
55-
put("maxBackoff", "5s");
56-
put("backoffMultiplier", 2.0);
57-
// status codes to retry on:
58-
put(
59-
"retryableStatusCodes",
60-
Arrays.asList(
61-
/*
62-
* All codes are retryable except OK and DEADLINE_EXCEEDED since
63-
* any others not listed here cause a very tight loop of retries.
64-
* DEADLINE_EXCEEDED is typically a result of a client specified deadline,
65-
* and definitionally should not result in a tight loop (it's a timeout).
66-
*/
67-
Code.CANCELLED.toString(),
68-
Code.UNKNOWN.toString(),
69-
Code.INVALID_ARGUMENT.toString(),
70-
Code.NOT_FOUND.toString(),
71-
Code.ALREADY_EXISTS.toString(),
72-
Code.PERMISSION_DENIED.toString(),
73-
Code.RESOURCE_EXHAUSTED.toString(),
74-
Code.FAILED_PRECONDITION.toString(),
75-
Code.ABORTED.toString(),
76-
Code.OUT_OF_RANGE.toString(),
77-
Code.UNIMPLEMENTED.toString(),
78-
Code.INTERNAL.toString(),
79-
Code.UNAVAILABLE.toString(),
80-
Code.DATA_LOSS.toString(),
81-
Code.UNAUTHENTICATED.toString()));
82-
}
83-
});
84-
}
85-
}));
86-
}
87-
};
33+
static Map<String, ?> buildRetryPolicy(final FlagdOptions options) {
34+
return new HashMap() {
35+
{
36+
put("methodConfig", Arrays.asList(new HashMap() {
37+
{
38+
put(
39+
"name",
40+
Arrays.asList(
41+
new HashMap() {
42+
{
43+
put("service", "flagd.sync.v1.FlagSyncService");
44+
}
45+
},
46+
new HashMap() {
47+
{
48+
put("service", "flagd.evaluation.v1.Service");
49+
}
50+
}));
51+
put("retryPolicy", new HashMap() {
52+
{
53+
// 1 + 2 + 4
54+
put("maxAttempts", 3.0); // types used here are important, need to be doubles
55+
put("initialBackoff", "1s");
56+
put(
57+
"maxBackoff",
58+
options.getRetryBackoffMaxMs() >= 1000
59+
? String.format("%ds", options.getRetryBackoffMaxMs() / 1000)
60+
: "1s");
61+
put("backoffMultiplier", 2.0);
62+
// status codes to retry on:
63+
put(
64+
"retryableStatusCodes",
65+
Arrays.asList(
66+
/*
67+
* As per gRPC spec, the following status codes are safe to retry:
68+
* UNAVAILABLE, UNKNOWN,
69+
*/
70+
Code.UNKNOWN.toString(), Code.UNAVAILABLE.toString()));
71+
}
72+
});
73+
}
74+
}));
75+
}
76+
};
77+
}
8878

8979
private ChannelBuilder() {}
9080

@@ -109,7 +99,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
10999
.eventLoopGroup(new MultiThreadIoEventLoopGroup(EpollIoHandler.newFactory()))
110100
.channelType(EpollDomainSocketChannel.class)
111101
.usePlaintext()
112-
.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
102+
.defaultServiceConfig(buildRetryPolicy(options))
113103
.enableRetry()
114104
.build();
115105
}
@@ -155,7 +145,7 @@ public static ManagedChannel nettyChannel(final FlagdOptions options) {
155145
builder.intercept(new FlagdGrpcInterceptor(options.getOpenTelemetry()));
156146
}
157147

158-
return builder.defaultServiceConfig(SERVICE_CONFIG_WITH_RETRY)
148+
return builder.defaultServiceConfig(buildRetryPolicy(options))
159149
.enableRetry()
160150
.build();
161151
} catch (SSLException ssle) {

providers/flagd/src/main/java/dev/openfeature/contrib/providers/flagd/resolver/common/ChannelConnector.java

Lines changed: 1 addition & 17 deletions
Original file line numberDiff line numberDiff line change
@@ -5,8 +5,6 @@
55
import dev.openfeature.sdk.ProviderEvent;
66
import io.grpc.ConnectivityState;
77
import io.grpc.ManagedChannel;
8-
import io.grpc.stub.AbstractBlockingStub;
9-
import io.grpc.stub.AbstractStub;
108
import java.util.Collections;
119
import java.util.concurrent.TimeUnit;
1210
import java.util.function.Consumer;
@@ -16,12 +14,9 @@
1614
/**
1715
* A generic GRPC connector that manages connection states, reconnection logic, and event streaming for
1816
* GRPC services.
19-
*
20-
* @param <T> the type of the asynchronous stub for the GRPC service
21-
* @param <K> the type of the blocking stub for the GRPC service
2217
*/
2318
@Slf4j
24-
public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlockingStub<K>> {
19+
public class ChannelConnector {
2520

2621
/**
2722
* The GRPC managed channel for managing the underlying GRPC connection.
@@ -48,22 +43,11 @@ public class ChannelConnector<T extends AbstractStub<T>, K extends AbstractBlock
4843
*/
4944
public ChannelConnector(
5045
final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent, ManagedChannel channel) {
51-
5246
this.channel = channel;
5347
this.deadline = options.getDeadline();
5448
this.onConnectionEvent = onConnectionEvent;
5549
}
5650

57-
/**
58-
* Constructs a {@code ChannelConnector} instance for testing purposes.
59-
*
60-
* @param options the configuration options for the GRPC connection
61-
* @param onConnectionEvent a consumer to handle connection events
62-
*/
63-
public ChannelConnector(final FlagdOptions options, final Consumer<FlagdProviderEvent> onConnectionEvent) {
64-
this(options, onConnectionEvent, ChannelBuilder.nettyChannel(options));
65-
}
66-
6751
/**
6852
* Initializes the GRPC connection by waiting for the channel to be ready and monitoring its state.
6953
*

0 commit comments

Comments
 (0)