Skip to content

Commit c2a0b5c

Browse files
authored
feat: Add graceful shutdown (#366)
* feat: Add graceful shutdown * chore: Bump bytes crate to 1.11.1 Fixes RUSTSEC-2026-0007. * chore: Bump git2 crate to 0.20.4 Fixes RUSTSEC-2026-0008. * chore: Add changelog entry
1 parent 837881f commit c2a0b5c

File tree

7 files changed

+67
-53
lines changed

7 files changed

+67
-53
lines changed

CHANGELOG.md

Lines changed: 2 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -12,6 +12,7 @@ All notable changes to this project will be documented in this file.
1212

1313
### Changed
1414

15+
- Gracefully shutdown all concurrent tasks by forwarding the SIGTERM signal ([#366]).
1516
- OLM deployer doesn't add owner references to cluster scoped objects anymore ([#360]).
1617
Owner references ensure that objects are garbage collected by OpenShift upon operator removal but they cause problems when the operator is updated.
1718
This means that cluster wide objects are not removed anymore when the operator is uninstalled.
@@ -22,6 +23,7 @@ All notable changes to this project will be documented in this file.
2223
[#363]: https://github.com/stackabletech/listener-operator/pull/363
2324
[#364]: https://github.com/stackabletech/listener-operator/pull/364
2425
[#365]: https://github.com/stackabletech/listener-operator/pull/365
26+
[#366]: https://github.com/stackabletech/listener-operator/pull/366
2527

2628
## [25.11.0] - 2025-11-07
2729

Cargo.lock

Lines changed: 14 additions & 14 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.nix

Lines changed: 23 additions & 23 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 4 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -10,7 +10,10 @@ edition = "2021"
1010
repository = "https://github.com/stackabletech/listener-operator"
1111

1212
[workspace.dependencies]
13-
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.104.0", features = ["telemetry", "versioned"] }
13+
stackable-operator = { git = "https://github.com/stackabletech/operator-rs.git", tag = "stackable-operator-0.105.0", features = [
14+
"telemetry",
15+
"versioned",
16+
] }
1417

1518
anyhow = "1.0"
1619
built = { version = "0.8", features = ["chrono", "git2"] }

crate-hashes.json

Lines changed: 7 additions & 7 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

rust/operator-binary/src/listener_controller.rs

Lines changed: 6 additions & 2 deletions
Original file line numberDiff line numberDiff line change
@@ -1,5 +1,6 @@
11
use std::{
22
collections::{BTreeMap, BTreeSet},
3+
future::Future,
34
sync::Arc,
45
};
56

@@ -48,7 +49,10 @@ const OPERATOR_NAME: &str = "listeners.stackable.tech";
4849
const CONTROLLER_NAME: &str = "listener";
4950
pub const FULL_CONTROLLER_NAME: &str = concatcp!(CONTROLLER_NAME, '.', OPERATOR_NAME);
5051

51-
pub async fn run(client: stackable_operator::client::Client) {
52+
pub async fn run<F>(client: stackable_operator::client::Client, shutdown_signal: F)
53+
where
54+
F: Future<Output = ()> + Send + Sync + 'static,
55+
{
5256
let controller = controller::Controller::new(
5357
client.get_all_api::<DeserializeGuard<listener::v1alpha1::Listener>>(),
5458
watcher::Config::default(),
@@ -119,7 +123,7 @@ pub async fn run(client: stackable_operator::client::Client) {
119123
})
120124
},
121125
)
122-
.shutdown_on_signal()
126+
.graceful_shutdown_on(shutdown_signal)
123127
.run(
124128
reconcile,
125129
error_policy,

rust/operator-binary/src/main.rs

Lines changed: 11 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -23,8 +23,8 @@ use stackable_operator::{
2323
eos::EndOfSupportChecker,
2424
shared::yaml::SerializeOptions,
2525
telemetry::Tracing,
26+
utils::signal::SignalWatcher,
2627
};
27-
use tokio::signal::unix::{SignalKind, signal};
2828
use tokio_stream::wrappers::UnixListenerStream;
2929
use tonic::transport::Server;
3030
use utils::unix_stream::{TonicUnixStream, uds_bind_private};
@@ -116,9 +116,13 @@ async fn main() -> anyhow::Result<()> {
116116
description = built_info::PKG_DESCRIPTION
117117
);
118118

119+
// Watches for the SIGTERM signal and sends a signal to all receivers, which gracefully
120+
// shuts down all concurrent tasks below (EoS checker, controller).
121+
let sigterm_watcher = SignalWatcher::sigterm()?;
122+
119123
let eos_checker =
120124
EndOfSupportChecker::new(built_info::BUILT_TIME_UTC, maintenance.end_of_support)?
121-
.run()
125+
.run(sigterm_watcher.handle())
122126
.map(anyhow::Ok);
123127

124128
let client = stackable_operator::client::initialize_operator(
@@ -134,9 +138,9 @@ async fn main() -> anyhow::Result<()> {
134138
let _ = std::fs::remove_file(&csi_endpoint);
135139
}
136140

137-
let mut sigterm = signal(SignalKind::terminate())?;
138141
let csi_listener =
139142
UnixListenerStream::new(uds_bind_private(csi_endpoint)?).map_ok(TonicUnixStream);
143+
140144
let csi_server = Server::builder()
141145
.add_service(
142146
tonic_reflection::server::Builder::configure()
@@ -152,9 +156,10 @@ async fn main() -> anyhow::Result<()> {
152156
.add_service(ControllerServer::new(ListenerOperatorController {
153157
client: client.clone(),
154158
}))
155-
.serve_with_incoming_shutdown(csi_listener, sigterm.recv().map(|_| ()))
159+
.serve_with_incoming_shutdown(csi_listener, sigterm_watcher.handle())
156160
.map_err(|err| anyhow!(err).context("failed to run csi server"));
157-
let controller = listener_controller::run(client).map(anyhow::Ok);
161+
let controller =
162+
listener_controller::run(client, sigterm_watcher.handle()).map(anyhow::Ok);
158163

159164
futures::try_join!(csi_server, controller, eos_checker)?;
160165
}
@@ -165,7 +170,7 @@ async fn main() -> anyhow::Result<()> {
165170
client: client.clone(),
166171
node_name: node_name.to_owned(),
167172
}))
168-
.serve_with_incoming_shutdown(csi_listener, sigterm.recv().map(|_| ()))
173+
.serve_with_incoming_shutdown(csi_listener, sigterm_watcher.handle())
169174
.map_err(|err| anyhow!(err).context("failed to run csi server"));
170175

171176
futures::try_join!(csi_server, eos_checker)?;

0 commit comments

Comments
 (0)