Skip to content

Commit be7357c

Browse files
committed
Support websocket transport for MQTT.
This also includes a workaround for an issue in rumqttc where the `parse_url` function did not properly handle websocket URLs.
1 parent 205071d commit be7357c

File tree

4 files changed

+166
-5
lines changed

4 files changed

+166
-5
lines changed

Cargo.lock

Lines changed: 100 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -30,7 +30,7 @@ reqwest = { version = "0.12.5", default-features = false, features = [
3030
"stream",
3131
] }
3232
rhai = { version = "1.19.0", features = ["serde", "sync"] }
33-
rumqttc = { version = "0.24.0", features = ["url"] }
33+
rumqttc = { version = "0.24.0", features = ["url", "websocket"] }
3434
serde = { version = "1.0.204", features = ["derive", "rc"] }
3535
serde_json = "1.0.120"
3636
serde_with = "3.8.3"

src/connection/mod.rs

Lines changed: 22 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -50,13 +50,34 @@ pub struct WebSocketConnectionConfig {
5050

5151
#[derive(Debug, Deserialize)]
5252
pub struct MqttConnectionConfig {
53-
url: String,
53+
#[serde(flatten)]
54+
params: MqttConnectionParams,
5455
publish_topic: String,
5556
action_topic: Option<String>,
5657
#[serde(default)]
5758
allow_actions: bool,
5859
}
5960

61+
#[derive(Debug, Deserialize)]
62+
#[serde(untagged)]
63+
pub enum MqttConnectionParams {
64+
Url {
65+
url: String,
66+
},
67+
Options {
68+
client_id: String,
69+
host: String,
70+
port: Option<u16>,
71+
credentials: Option<MqttCredentials>,
72+
},
73+
}
74+
75+
#[derive(Debug, Deserialize)]
76+
pub struct MqttCredentials {
77+
username: String,
78+
password: String,
79+
}
80+
6081
#[derive(Debug, Deserialize)]
6182
#[serde(tag = "action", rename_all = "snake_case")]
6283
pub enum ConnectionAction {

src/connection/mqtt.rs

Lines changed: 43 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -1,6 +1,7 @@
11
use std::collections::{HashMap, HashSet};
22

33
use async_trait::async_trait;
4+
use eyre::{bail, OptionExt};
45
use tokio::{
56
select,
67
sync::mpsc::{channel, Sender},
@@ -23,7 +24,48 @@ impl MqttConnection {
2324
action_tx: Sender<super::ConnectionAction>,
2425
) -> eyre::Result<Self> {
2526
let (tx, mut rx) = channel::<(serde_json::Value, HashMap<String, String>)>(1);
26-
let options = rumqttc::MqttOptions::parse_url(config.url)?;
27+
28+
let options = match config.params {
29+
super::MqttConnectionParams::Url { url } => rumqttc::MqttOptions::parse_url(url)?,
30+
super::MqttConnectionParams::Options {
31+
client_id,
32+
host,
33+
port,
34+
credentials,
35+
} => {
36+
let mut options = if let Ok(url) = url::Url::parse(&host) {
37+
let (transport, host) = match url.scheme() {
38+
"mqtt" => (
39+
rumqttc::Transport::tcp(),
40+
url.host_str().ok_or_eyre("mqtt url scheme missing host")?,
41+
),
42+
"mqtts" => (
43+
rumqttc::Transport::tls_with_default_config(),
44+
url.host_str().ok_or_eyre("mqtts url scheme missing host")?,
45+
),
46+
"ws" => (rumqttc::Transport::ws(), url.as_str()),
47+
"wss" => (rumqttc::Transport::wss_with_default_config(), url.as_str()),
48+
scheme => bail!("unknown scheme: {scheme}"),
49+
};
50+
51+
let mut options =
52+
rumqttc::MqttOptions::new(client_id, host, url.port().unwrap_or(1883));
53+
54+
options.set_transport(transport);
55+
56+
options
57+
} else {
58+
rumqttc::MqttOptions::new(client_id, host, port.unwrap_or(1883))
59+
};
60+
61+
if let Some(super::MqttCredentials { username, password }) = credentials {
62+
options.set_credentials(username, password);
63+
}
64+
65+
options
66+
}
67+
};
68+
2769
let (client, mut event_loop) = rumqttc::AsyncClient::new(options, 1);
2870

2971
if let Some(action_topic) = config.action_topic.as_deref() {

0 commit comments

Comments
 (0)