Skip to content

Commit 19d3355

Browse files
committed
pollinator: Reload configuration after notify
1 parent 9702e25 commit 19d3355

File tree

6 files changed

+63
-18
lines changed

6 files changed

+63
-18
lines changed

Cargo.lock

Lines changed: 0 additions & 1 deletion
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

pollinator/src/bin/pollinator.rs

Lines changed: 44 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -15,8 +15,8 @@ use pollinator::CommLinkCfg;
1515
use resin::{Database, Error, Result};
1616
use std::collections::HashMap;
1717
use tokio::task::{AbortHandle, JoinSet};
18-
use tokio::time::{Duration, interval};
19-
use tokio_stream::StreamExt;
18+
use tokio::time::Duration;
19+
use tokio_postgres::Notification;
2020

2121
/// Command-line arguments
2222
#[derive(FromArgs)]
@@ -84,34 +84,71 @@ async fn poll_comm_links(db: Database) -> Result<()> {
8484
Some(cfg) => {
8585
let changed = *cfg != task.cfg;
8686
if changed {
87+
log::info!("{name}: changed, aborting");
8788
// comm link changed: abort task
8889
task.handle.abort();
8990
}
9091
!changed
9192
}
9293
None => {
94+
log::info!("{name}: removed, aborting");
9395
// no comm link: abort task
9496
task.handle.abort();
9597
false
9698
}
9799
}
98100
});
99101
// spawn tasks for new comm links
100-
for cfg in cfgs {
102+
for cfg in &cfgs {
101103
if !tasks.contains_key(cfg.name()) {
102104
let name = cfg.name().to_string();
103105
let db = Some(db.clone());
106+
log::info!("{name}: spawning");
104107
let handle = set.spawn(cfg.clone().run(db));
105-
let task = CommLinkTask { cfg, handle };
108+
let task = CommLinkTask {
109+
cfg: cfg.clone(),
110+
handle,
111+
};
106112
tasks.insert(name, task);
107113
}
108114
}
109-
let _not = stream.next().await;
110-
// FIXME: check notification channel / payload
115+
while let Some(not) = stream.recv().await {
116+
if should_reload(&not, &cfgs) {
117+
log::info!(
118+
"{} {}: reloading configuration",
119+
not.channel(),
120+
not.payload()
121+
);
122+
tokio::time::sleep(Duration::from_secs(2)).await;
123+
// Empty the notification stream
124+
while !stream.is_empty() {
125+
stream.recv().await;
126+
}
127+
break;
128+
}
129+
}
111130
}
112131
Ok(())
113132
}
114133

134+
/// Check if a notification should trigger reloading the configuration
135+
fn should_reload(not: &Notification, cfgs: &[CommLinkCfg]) -> bool {
136+
// FIXME: comm_link "connected" / controller "fail_time" should not reload
137+
match (not.channel(), not.payload()) {
138+
("comm_config", _) => true,
139+
("comm_link", nm) => {
140+
nm.is_empty() || cfgs.iter().any(|c| c.name() == nm)
141+
}
142+
("controller", nm) => {
143+
nm.is_empty() || cfgs.iter().any(|c| c.controller() == nm)
144+
}
145+
("detector", nm) => {
146+
nm.is_empty() || cfgs.iter().any(|c| c.has_detector(nm))
147+
}
148+
_ => false,
149+
}
150+
}
151+
115152
/// Main entry point
116153
#[tokio::main]
117154
async fn main() -> Result<()> {
@@ -124,9 +161,6 @@ async fn main() -> Result<()> {
124161
let db = Database::new("tms").await?;
125162
loop {
126163
poll_comm_links(db.clone()).await?;
127-
let mut ticker = interval(Duration::from_secs(60));
128-
// apparently, the first tick completes immediately
129-
ticker.tick().await;
130-
ticker.tick().await;
164+
tokio::time::sleep(Duration::from_secs(60)).await;
131165
}
132166
}

pollinator/src/comm_link.rs

Lines changed: 10 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -158,6 +158,11 @@ impl CommLinkCfg {
158158
self.long_per_s
159159
}
160160

161+
/// Get controller
162+
pub fn controller(&self) -> &str {
163+
&self.controller
164+
}
165+
161166
/// Set user name
162167
pub fn with_user(mut self, user: &str) -> Self {
163168
self.user = Some(user.to_string());
@@ -180,6 +185,11 @@ impl CommLinkCfg {
180185
self.password.as_deref()
181186
}
182187

188+
/// Check if configuration has a detector
189+
pub fn has_detector(&self, det_id: &str) -> bool {
190+
self.detectors.iter().any(|d| d == det_id)
191+
}
192+
183193
/// Make detector hashmap
184194
pub fn make_detectors(&self) -> HashMap<usize, &str> {
185195
let mut detectors = HashMap::new();

resin/Cargo.toml

Lines changed: 0 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -8,7 +8,6 @@ edition.workspace = true
88
[dependencies]
99
bb8 = { workspace = true }
1010
bb8-postgres = { workspace = true }
11-
futures = { workspace = true }
1211
hyper = { version = "1.8", features = ["client", "http1"] }
1312
jiff = "0.2.16"
1413
log = { workspace = true }

resin/src/notify.rs

Lines changed: 5 additions & 6 deletions
Original file line numberDiff line numberDiff line change
@@ -14,13 +14,13 @@
1414
//
1515
use crate::database::Database;
1616
use crate::error::Result;
17-
use futures::Stream;
1817
use std::pin::Pin;
1918
use std::task::{Context, Poll};
20-
use tokio::sync::mpsc::{UnboundedSender, unbounded_channel};
19+
use tokio::sync::mpsc::{
20+
UnboundedReceiver, UnboundedSender, unbounded_channel,
21+
};
2122
use tokio_postgres::tls::NoTlsStream;
2223
use tokio_postgres::{AsyncMessage, Client, Connection, Notification, Socket};
23-
use tokio_stream::wrappers::UnboundedReceiverStream;
2424

2525
/// Handler for Postgres NOTIFY
2626
pub struct Notifier {
@@ -77,14 +77,13 @@ impl Database {
7777
pub async fn notifier(
7878
self,
7979
channels: impl Iterator<Item = &str>,
80-
) -> Result<(Notifier, impl Stream<Item = Notification> + Unpin)> {
80+
) -> Result<(Notifier, UnboundedReceiver<Notification>)> {
8181
let (client, conn) = self.dedicated_client().await?;
8282
let (tx, rx) = unbounded_channel();
8383
for channel in channels {
8484
client.execute(&format!("LISTEN {channel}"), &[]).await?;
8585
}
8686
let not = Notifier { client, conn, tx };
87-
let stream = Box::pin(UnboundedReceiverStream::new(rx));
88-
Ok((not, stream))
87+
Ok((not, rx))
8988
}
9089
}

sql/tms-template.sql

Lines changed: 4 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -1343,6 +1343,8 @@ CREATE TABLE iris.comm_link (
13431343
CREATE FUNCTION iris.comm_link_notify() RETURNS TRIGGER AS
13441344
$comm_link_notify$
13451345
BEGIN
1346+
-- FIXME: make connected secondary and
1347+
-- add another channel for connected changes?
13461348
-- all attributes are primary
13471349
NOTIFY comm_link;
13481350
RETURN NULL; -- AFTER trigger return is ignored
@@ -1436,6 +1438,8 @@ CREATE UNIQUE INDEX ctrl_link_drop_idx ON iris.controller
14361438
CREATE FUNCTION iris.controller_notify() RETURNS TRIGGER AS
14371439
$controller_notify$
14381440
BEGIN
1441+
-- FIXME: make fail_time secondary and
1442+
-- add another channel for fail_time changes?
14391443
IF (NEW.drop_id IS DISTINCT FROM OLD.drop_id) OR
14401444
(NEW.comm_link IS DISTINCT FROM OLD.comm_link) OR
14411445
(NEW.cabinet_style IS DISTINCT FROM OLD.cabinet_style) OR

0 commit comments

Comments
 (0)