Skip to content

Commit 1fa9440

Browse files
authored
add retry possibilities to redap client and retry on connection establishment
### What Adding a simple and conservative retry logic to redap client. Using it on the connection establishment path for now, only of native, as this is the most common error we saw during concurrent readers testing. Code is mostly copy-paste from chunk scanner client in dataplatform. ``re_backoff`` is a clone of ``redap_backoff``. Can start using it on the dataplatform side after I merge this. Source-Ref: dc97f1937db58b3c84f10c4faa3b84bb8d1d4398
1 parent f050460 commit 1fa9440

File tree

11 files changed

+303
-18
lines changed

11 files changed

+303
-18
lines changed

ARCHITECTURE.md

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -225,6 +225,7 @@ Update instructions:
225225
| re_arrow_combinators | Type-safe, composable transformations for Arrow arrays |
226226
| re_arrow_util | Helpers for working with arrow |
227227
| re_auth | Authentication and authorization helpers |
228+
| re_backoff | Simple backoff logic used for retries. |
228229
| re_byte_size | Calculate the heap-allocated size of values at runtime |
229230
| re_capabilities | Capability tokens |
230231
| re_case | Case conversions, the way Rerun likes them |

Cargo.lock

Lines changed: 9 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -8429,6 +8429,14 @@ dependencies = [
84298429
"webbrowser",
84308430
]
84318431

8432+
[[package]]
8433+
name = "re_backoff"
8434+
version = "0.30.0-alpha.1+dev"
8435+
dependencies = [
8436+
"rand 0.9.2",
8437+
"tokio",
8438+
]
8439+
84328440
[[package]]
84338441
name = "re_blueprint_tree"
84348442
version = "0.30.0-alpha.1+dev"
@@ -9454,6 +9462,7 @@ dependencies = [
94549462
"jiff",
94559463
"re_arrow_util",
94569464
"re_auth",
9465+
"re_backoff",
94579466
"re_byte_size",
94589467
"re_chunk",
94599468
"re_format",

Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -100,6 +100,7 @@ rerun-cli = { path = "crates/top/rerun-cli", version = "=0.30.0-alpha.1", defaul
100100
re_analytics = { path = "crates/utils/re_analytics", version = "=0.30.0-alpha.1", default-features = false }
101101
re_arrow_util = { path = "crates/utils/re_arrow_util", version = "=0.30.0-alpha.1", default-features = false }
102102
re_auth = { path = "crates/utils/re_auth", version = "=0.30.0-alpha.1", default-features = false }
103+
re_backoff = { path = "crates/utils/re_backoff", version = "=0.30.0-alpha.1", default-features = false }
103104
re_byte_size = { path = "crates/utils/re_byte_size", version = "=0.30.0-alpha.1", default-features = false }
104105
re_capabilities = { path = "crates/utils/re_capabilities", version = "=0.30.0-alpha.1", default-features = false }
105106
re_case = { path = "crates/utils/re_case", version = "=0.30.0-alpha.1", default-features = false }

crates/store/re_redap_client/Cargo.toml

Lines changed: 1 addition & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -33,6 +33,7 @@ all-features = true
3333
[dependencies]
3434
re_arrow_util.workspace = true
3535
re_auth = { workspace = true, features = ["oauth"] }
36+
re_backoff.workspace = true
3637
re_byte_size.workspace = true
3738
re_chunk.workspace = true
3839
re_format.workspace = true

crates/store/re_redap_client/src/connection_registry.rs

Lines changed: 16 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -373,21 +373,24 @@ impl ConnectionRegistryHandle {
373373
// so if what we're trying to connect to is not a valid Rerun server, then cut out
374374
// a layer of noise:
375375
{
376-
let res = match ehttp::fetch_async(ehttp::Request::get(format!(
377-
"{}/version",
378-
origin.as_url()
379-
)))
380-
.await
381-
{
382-
Ok(res) => res,
383-
Err(err) => {
384-
let mut msg = format!("failed to connect to server '{origin}': {err}");
385-
if let Some(suggested) = suggest_api_prefix(&origin) {
386-
msg.push_str(&format!(". Did you mean '{suggested}'?"));
376+
let res = crate::with_retry("http_version_fetch", || async {
377+
match ehttp::fetch_async(ehttp::Request::get(format!(
378+
"{}/version",
379+
origin.as_url()
380+
)))
381+
.await
382+
{
383+
Ok(res) => Ok(res),
384+
Err(err) => {
385+
let mut msg = format!("failed to connect to server '{origin}': {err}");
386+
if let Some(suggested) = suggest_api_prefix(&origin) {
387+
msg.push_str(&format!(". Did you mean '{suggested}'?"));
388+
}
389+
Err(ApiError::connection(msg))
387390
}
388-
return Err(ApiError::connection(msg));
389391
}
390-
};
392+
})
393+
.await?;
391394

392395
if !res.ok {
393396
let hint = suggest_api_prefix(&origin).map(|suggested| {

crates/store/re_redap_client/src/grpc.rs

Lines changed: 8 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -130,7 +130,10 @@ pub(crate) async fn client(
130130
origin: Origin,
131131
credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
132132
) -> ApiResult<RedapClient> {
133-
let channel = channel(origin).await?;
133+
let channel = crate::with_retry("redap_connection", || async {
134+
channel(origin.clone()).await
135+
})
136+
.await?;
134137

135138
let middlewares = tower::ServiceBuilder::new()
136139
.layer(AuthDecorator::new(credentials))
@@ -182,7 +185,10 @@ pub(crate) async fn client(
182185
origin: Origin,
183186
credentials: Option<Arc<dyn re_auth::credentials::CredentialsProvider + Send + Sync + 'static>>,
184187
) -> ApiResult<RedapClient> {
185-
let channel = channel(origin).await?;
188+
let channel = crate::with_retry("redap_connection", || async {
189+
channel(origin.clone()).await
190+
})
191+
.await?;
186192

187193
let middlewares = tower::ServiceBuilder::new()
188194
.layer(AuthDecorator::new(credentials))

crates/store/re_redap_client/src/lib.rs

Lines changed: 74 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -104,6 +104,7 @@ pub enum ApiErrorKind {
104104
Timeout,
105105
Internal,
106106
InvalidArguments,
107+
ResourcesExhausted,
107108
Serialization,
108109
InvalidServer,
109110
}
@@ -114,6 +115,7 @@ impl From<tonic::Code> for ApiErrorKind {
114115
tonic::Code::NotFound => Self::NotFound,
115116
tonic::Code::AlreadyExists => Self::AlreadyExists,
116117
tonic::Code::PermissionDenied => Self::PermissionDenied,
118+
tonic::Code::ResourceExhausted => Self::ResourcesExhausted,
117119
tonic::Code::Unauthenticated => Self::Unauthenticated,
118120
tonic::Code::Unimplemented => Self::Unimplemented,
119121
tonic::Code::Unavailable => Self::Connection,
@@ -135,6 +137,7 @@ impl std::fmt::Display for ApiErrorKind {
135137
Self::Connection => write!(f, "Connection"),
136138
Self::Internal => write!(f, "Internal"),
137139
Self::InvalidArguments => write!(f, "InvalidArguments"),
140+
Self::ResourcesExhausted => write!(f, "ResourcesExhausted"),
138141
Self::Serialization => write!(f, "Serialization"),
139142
Self::Timeout => write!(f, "Timeout"),
140143
Self::InvalidServer => write!(f, "InvalidServer"),
@@ -282,3 +285,74 @@ impl std::error::Error for ApiError {
282285
.map(|e| e as &(dyn std::error::Error + 'static))
283286
}
284287
}
288+
289+
/// Helper function for executing requests or connection attempts with retries.
290+
#[tracing::instrument(skip(f), level = "debug")]
291+
pub async fn with_retry<T, F, Fut>(req_name: &str, f: F) -> ApiResult<T>
292+
where
293+
F: Fn() -> Fut,
294+
Fut: Future<Output = ApiResult<T>>,
295+
{
296+
// targeting to have all retries finish under ~5 seconds
297+
const MAX_ATTEMPTS: usize = 5;
298+
299+
// 100 200 400 800 1600
300+
let mut backoff_gen = re_backoff::BackoffGenerator::new(
301+
std::time::Duration::from_millis(100),
302+
std::time::Duration::from_secs(3),
303+
)
304+
.expect("base is less than max");
305+
306+
let mut attempts = 1;
307+
let mut last_retryable_err = None;
308+
309+
while attempts <= MAX_ATTEMPTS {
310+
let res = f().await;
311+
312+
match res {
313+
Err(err)
314+
if matches!(
315+
err.kind,
316+
ApiErrorKind::Connection
317+
| ApiErrorKind::Timeout
318+
| ApiErrorKind::Internal
319+
| ApiErrorKind::ResourcesExhausted
320+
) =>
321+
{
322+
last_retryable_err = Some(err);
323+
let backoff = backoff_gen.gen_next();
324+
325+
tracing::debug!(
326+
attempts,
327+
max_attempts = MAX_ATTEMPTS,
328+
?backoff,
329+
"{req_name} failed with retryable gRPC error, retrying after backoff"
330+
);
331+
332+
backoff.sleep().await;
333+
}
334+
Err(err) => {
335+
tracing::error!(
336+
attempts,
337+
"{req_name} failed with non-retryable error: {err}"
338+
);
339+
return Err(err);
340+
}
341+
342+
Ok(value) => {
343+
tracing::debug!(attempts, "{req_name} succeeded");
344+
return Ok(value);
345+
}
346+
}
347+
348+
attempts += 1;
349+
}
350+
351+
tracing::error!(
352+
attempts,
353+
max_attempts = MAX_ATTEMPTS,
354+
"{req_name} failed after max retries, giving up"
355+
);
356+
357+
Err(last_retryable_err.expect("bug: this should not be None if we reach here"))
358+
}

crates/utils/re_backoff/Cargo.toml

Lines changed: 28 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,28 @@
1+
[package]
2+
name = "re_backoff"
3+
authors.workspace = true
4+
description = "Utilities for retrying operations with backoff."
5+
edition.workspace = true
6+
homepage.workspace = true
7+
include.workspace = true
8+
license.workspace = true
9+
publish = true
10+
readme = "README.md"
11+
repository.workspace = true
12+
rust-version.workspace = true
13+
version.workspace = true
14+
15+
[lints]
16+
workspace = true
17+
18+
[package.metadata.docs.rs]
19+
all-features = true
20+
21+
22+
[dependencies]
23+
# External
24+
rand.workspace = true
25+
tokio = { workspace = true, features = ["time"] }
26+
27+
28+
[dev-dependencies]

crates/utils/re_backoff/README.md

Lines changed: 14 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,14 @@
1+
# re_backoff
2+
3+
Part of the [`rerun`](https://github.com/rerun-io/rerun) family of crates.
4+
5+
[![Latest version](https://img.shields.io/crates/v/re_arrow_combinators.svg)](https://crates.io/crates/re_arrow_combinators)
6+
[![Documentation](https://docs.rs/re_arrow_combinators/badge.svg)](https://docs.rs/re_arrow_combinators)
7+
![MIT](https://img.shields.io/badge/license-MIT-blue.svg)
8+
![Apache](https://img.shields.io/badge/license-Apache-blue.svg)
9+
10+
Implements utility code to help with backoff and retry logic.
11+
12+
### Why not use existing traits like backoff, backon, tokio-retry2, or tower(retry)?
13+
14+
The code is small and simple, that it feels unnecessary to add an external dependency for it. We should re-evaluate should this become ever-complicated.

0 commit comments

Comments
 (0)