Skip to content

Commit d07c841

Browse files
committed
pollinator: Added Receiver struct
1 parent 19d3355 commit d07c841

File tree

2 files changed

+48
-23
lines changed

2 files changed

+48
-23
lines changed

pollinator/src/bin/pollinator.rs

Lines changed: 10 additions & 10 deletions
Original file line numberDiff line numberDiff line change
@@ -65,15 +65,15 @@ struct CommLinkTask {
6565

6666
/// Poll comm links
6767
async fn poll_comm_links(db: Database) -> Result<()> {
68-
let (notifier, mut stream) = db
69-
.clone()
70-
.notifier(
71-
["comm_config", "comm_link", "controller", "detector"].into_iter(),
72-
)
73-
.await?;
68+
let (notifier, mut receiver) = db.clone().notifier().await?;
7469
let mut tasks: HashMap<String, CommLinkTask> = HashMap::new();
7570
let mut set = JoinSet::new();
7671
set.spawn(notifier.run());
72+
receiver
73+
.listen(
74+
["comm_config", "comm_link", "controller", "detector"].into_iter(),
75+
)
76+
.await?;
7777
loop {
7878
let cfgs = CommLinkCfg::lookup_all(db.clone()).await?;
7979
if cfgs.is_empty() {
@@ -112,17 +112,17 @@ async fn poll_comm_links(db: Database) -> Result<()> {
112112
tasks.insert(name, task);
113113
}
114114
}
115-
while let Some(not) = stream.recv().await {
115+
while let Some(not) = receiver.recv().await {
116116
if should_reload(&not, &cfgs) {
117117
log::info!(
118118
"{} {}: reloading configuration",
119119
not.channel(),
120120
not.payload()
121121
);
122122
tokio::time::sleep(Duration::from_secs(2)).await;
123-
// Empty the notification stream
124-
while !stream.is_empty() {
125-
stream.recv().await;
123+
// Empty the notification receiver
124+
while !receiver.is_empty() {
125+
receiver.recv().await;
126126
}
127127
break;
128128
}

resin/src/notify.rs

Lines changed: 38 additions & 13 deletions
Original file line numberDiff line numberDiff line change
@@ -24,15 +24,20 @@ use tokio_postgres::{AsyncMessage, Client, Connection, Notification, Socket};
2424

2525
/// Handler for Postgres NOTIFY
2626
pub struct Notifier {
27-
/// DB client (kept so it's not dropped)
28-
#[allow(unused)]
29-
client: Client,
3027
/// DB connection
3128
conn: Connection<Socket, NoTlsStream>,
3229
/// Notification sender
3330
tx: UnboundedSender<Notification>,
3431
}
3532

33+
/// Receiver for Postgres NOTIFY
34+
pub struct Receiver {
35+
/// DB client
36+
client: Client,
37+
/// Notification receiver
38+
rx: UnboundedReceiver<Notification>,
39+
}
40+
3641
impl Future for Notifier {
3742
type Output = bool;
3843

@@ -72,18 +77,38 @@ impl Notifier {
7277
}
7378
}
7479

75-
impl Database {
76-
/// Create a new notifier
77-
pub async fn notifier(
78-
self,
80+
impl Receiver {
81+
/// Listen for notifications on channels
82+
pub async fn listen(
83+
&self,
7984
channels: impl Iterator<Item = &str>,
80-
) -> Result<(Notifier, UnboundedReceiver<Notification>)> {
81-
let (client, conn) = self.dedicated_client().await?;
82-
let (tx, rx) = unbounded_channel();
85+
) -> Result<()> {
8386
for channel in channels {
84-
client.execute(&format!("LISTEN {channel}"), &[]).await?;
87+
self.client
88+
.execute(&format!("LISTEN {channel}"), &[])
89+
.await?;
8590
}
86-
let not = Notifier { client, conn, tx };
87-
Ok((not, rx))
91+
Ok(())
92+
}
93+
94+
/// Receive one notification
95+
pub async fn recv(&mut self) -> Option<Notification> {
96+
self.rx.recv().await
97+
}
98+
99+
/// Check if channel is empty
100+
pub fn is_empty(&self) -> bool {
101+
self.rx.is_empty()
102+
}
103+
}
104+
105+
impl Database {
106+
/// Create a new notifier / receiver
107+
pub async fn notifier(self) -> Result<(Notifier, Receiver)> {
108+
let (client, conn) = self.dedicated_client().await?;
109+
let (tx, rx) = unbounded_channel();
110+
let not = Notifier { conn, tx };
111+
let rcv = Receiver { client, rx };
112+
Ok((not, rcv))
88113
}
89114
}

0 commit comments

Comments
 (0)