Skip to content

Commit 3464617

Browse files
committed
Implement handling of mesh_stats.
This message is generated by the ChirpStack Gateway Mesh component, when using this as a proxy to the ChirpStack Concentratord in case of a mesh setup.
1 parent 963cf74 commit 3464617

File tree

5 files changed

+54
-4
lines changed

5 files changed

+54
-4
lines changed

Cargo.lock

Lines changed: 2 additions & 2 deletions
Some generated files are not rendered by default. Learn more about customizing how changed files appear on GitHub.

Cargo.toml

Lines changed: 1 addition & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -16,7 +16,7 @@
1616
"usage",
1717
"derive",
1818
] }
19-
chirpstack_api = { version = "4.8", default-features = false, features = [
19+
chirpstack_api = { version = "4.9.0-test.1", default-features = false, features = [
2020
"json",
2121
] }
2222
lrwn_filters = { version = "4.7", features = ["serde"] }

src/backend/concentratord.rs

Lines changed: 6 additions & 1 deletion
Original file line numberDiff line numberDiff line change
@@ -13,7 +13,7 @@ use tokio::task;
1313
use super::Backend as BackendTrait;
1414
use crate::config::Configuration;
1515
use crate::metadata;
16-
use crate::mqtt::{send_gateway_stats, send_tx_ack, send_uplink_frame};
16+
use crate::mqtt::{send_gateway_stats, send_mesh_stats, send_tx_ack, send_uplink_frame};
1717

1818
pub struct Backend {
1919
gateway_id: String,
@@ -291,6 +291,11 @@ async fn handle_event_msg(
291291
pl.metadata.extend(metadata::get().await?);
292292
send_gateway_stats(&pl).await?;
293293
}
294+
"mesh_stats" => {
295+
let pl = gw::MeshStats::decode(pl)?;
296+
info!("Received mesh stats");
297+
send_mesh_stats(&pl).await?;
298+
}
294299
_ => {
295300
return Err(anyhow!("Unexpected event: {}", event));
296301
}

src/mqtt.rs

Lines changed: 15 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -286,6 +286,21 @@ pub async fn send_gateway_stats(pl: &gw::GatewayStats) -> Result<()> {
286286
Ok(())
287287
}
288288

289+
pub async fn send_mesh_stats(pl: &gw::MeshStats) -> Result<()> {
290+
let state = STATE.get().ok_or_else(|| anyhow!("STATE is not set"))?;
291+
292+
let b = match state.json {
293+
true => serde_json::to_vec(&pl)?,
294+
false => pl.encode_to_vec(),
295+
};
296+
let topic = get_event_topic(&state.topic_prefix, &state.gateway_id, "mesh-stats");
297+
info!("Sending mesh stats event, topic: {}", topic);
298+
state.client.publish(topic, state.qos, false, b).await?;
299+
trace!("Message published");
300+
301+
Ok(())
302+
}
303+
289304
pub async fn send_tx_ack(pl: &gw::DownlinkTxAck) -> Result<()> {
290305
let state = STATE.get().ok_or_else(|| anyhow!("STATE is not set"))?;
291306

tests/concentratord_test.rs

Lines changed: 30 additions & 0 deletions
Original file line numberDiff line numberDiff line change
@@ -187,6 +187,36 @@ async fn end_to_end() {
187187
pl
188188
);
189189

190+
// Mesh Stats
191+
let mesh_stats_pl = gw::MeshStats {
192+
gateway_id: "0102030405060708".into(),
193+
..Default::default()
194+
};
195+
thread::spawn({
196+
let zmq_pub = zmq_pub.clone();
197+
let mesh_stats_pl = mesh_stats_pl.encode_to_vec();
198+
199+
move || {
200+
let zmq_pub = zmq_pub.lock().unwrap();
201+
zmq_pub.send("mesh_stats", zmq::SNDMORE).unwrap();
202+
zmq_pub.send(mesh_stats_pl, 0).unwrap();
203+
}
204+
});
205+
206+
let mqtt_msg = mqtt_rx.recv().await.unwrap();
207+
assert_eq!(
208+
"eu868/gateway/0102030405060708/event/mesh-stats",
209+
String::from_utf8(mqtt_msg.topic.to_vec()).unwrap()
210+
);
211+
let pl = gw::MeshStats::decode(&mut Cursor::new(mqtt_msg.payload.to_vec())).unwrap();
212+
assert_eq!(
213+
gw::MeshStats {
214+
gateway_id: "0102030405060708".into(),
215+
..Default::default()
216+
},
217+
pl
218+
);
219+
190220
// Downlink
191221
let down_pl = gw::DownlinkFrame {
192222
gateway_id: "0102030405060708".into(),

0 commit comments

Comments
 (0)