implemented server side event stream for ids

- changed channel to brodcast
- added route /api/idevent for SSE stream
This commit is contained in:
Djeeberjr 2025-05-18 19:55:25 +02:00
parent 7f94362069
commit 3f7f209c91
3 changed files with 38 additions and 13 deletions

View File

@ -10,7 +10,7 @@ use std::{env, error::Error, sync::Arc};
use tally_id::TallyID; use tally_id::TallyID;
use tokio::{ use tokio::{
fs, join, fs, join,
sync::{Mutex, mpsc}, sync::{Mutex, broadcast, mpsc},
}; };
use webserver::start_webserver; use webserver::start_webserver;
@ -20,6 +20,7 @@ use mock::{MockBuzzer, MockHotspot, MockLed};
mod buzzer; mod buzzer;
mod color; mod color;
mod hotspot; mod hotspot;
mod id_mapping;
mod id_store; mod id_store;
mod led; mod led;
mod mock; mod mock;
@ -27,7 +28,6 @@ mod parser;
mod pm3; mod pm3;
mod tally_id; mod tally_id;
mod webserver; mod webserver;
mod id_mapping;
const STORE_PATH: &str = "./data.json"; const STORE_PATH: &str = "./data.json";
const PWM_CHANNEL_BUZZER: Channel = Channel::Pwm0; //PWM0 = GPIO18/Physical pin 12 const PWM_CHANNEL_BUZZER: Channel = Channel::Pwm0; //PWM0 = GPIO18/Physical pin 12
@ -139,7 +139,8 @@ async fn main() -> Result<(), Box<dyn Error>> {
info!("Starting application"); info!("Starting application");
let (tx, mut rx) = mpsc::channel::<String>(32); let (tx, mut rx) = broadcast::channel::<String>(32);
let sse_tx = tx.clone();
tokio::spawn(async move { tokio::spawn(async move {
match run_pm3(tx).await { match run_pm3(tx).await {
@ -179,7 +180,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
let channel_store = store.clone(); let channel_store = store.clone();
tokio::spawn(async move { tokio::spawn(async move {
while let Some(tally_id_string) = rx.recv().await { while let Ok(tally_id_string) = rx.recv().await {
let tally_id = TallyID(tally_id_string); let tally_id = TallyID(tally_id_string);
if hotspot_ids.contains(&tally_id) { if hotspot_ids.contains(&tally_id) {
@ -204,7 +205,7 @@ async fn main() -> Result<(), Box<dyn Error>> {
} }
}); });
match start_webserver(store.clone()).await { match start_webserver(store.clone(), sse_tx).await {
Ok(()) => {} Ok(()) => {}
Err(e) => { Err(e) => {
error!("Failed to start webserver: {e}"); error!("Failed to start webserver: {e}");

View File

@ -4,12 +4,12 @@ use std::error::Error;
use std::process::Stdio; use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::process::Command; use tokio::process::Command;
use tokio::sync::mpsc; use tokio::sync::broadcast;
/// Runs the pm3 binary and monitors it's output /// Runs the pm3 binary and monitors it's output
/// The pm3 binary is ether set in the env var PM3_BIN or found in the path /// The pm3 binary is ether set in the env var PM3_BIN or found in the path
/// The ouput is parsed and send via the `tx` channel /// The ouput is parsed and send via the `tx` channel
pub async fn run_pm3(tx: mpsc::Sender<String>) -> Result<(), Box<dyn Error>> { pub async fn run_pm3(tx: broadcast::Sender<String>) -> Result<(), Box<dyn Error>> {
let pm3_path = match env::var("PM3_BIN") { let pm3_path = match env::var("PM3_BIN") {
Ok(path) => path, Ok(path) => path,
Err(_) => { Err(_) => {
@ -35,7 +35,7 @@ pub async fn run_pm3(tx: mpsc::Sender<String>) -> Result<(), Box<dyn Error>> {
while let Some(line) = reader.next_line().await? { while let Some(line) = reader.next_line().await? {
trace!("PM3: {line}"); trace!("PM3: {line}");
if let Some(uid) = super::parser::parse_line(&line) { if let Some(uid) = super::parser::parse_line(&line) {
tx.send(uid).await?; tx.send(uid)?;
} }
} }

View File

@ -1,6 +1,6 @@
use log::{debug, error, info, warn}; use log::{error, info, warn};
use rocket::data::FromData;
use rocket::http::Status; use rocket::http::Status;
use rocket::response::stream::{Event, EventStream};
use rocket::serde::json::Json; use rocket::serde::json::Json;
use rocket::{Config, State, post}; use rocket::{Config, State, post};
use rocket::{get, http::ContentType, response::content::RawHtml, routes}; use rocket::{get, http::ContentType, response::content::RawHtml, routes};
@ -10,8 +10,8 @@ use std::borrow::Cow;
use std::env; use std::env;
use std::ffi::OsStr; use std::ffi::OsStr;
use std::sync::Arc; use std::sync::Arc;
use std::time::Instant;
use tokio::sync::Mutex; use tokio::sync::Mutex;
use tokio::sync::broadcast::Sender;
use crate::id_mapping::{IDMapping, Name}; use crate::id_mapping::{IDMapping, Name};
use crate::id_store::IDStore; use crate::id_store::IDStore;
@ -27,7 +27,10 @@ struct NewMapping {
name: Name, name: Name,
} }
pub async fn start_webserver(store: Arc<Mutex<IDStore>>) -> Result<(), rocket::Error> { pub async fn start_webserver(
store: Arc<Mutex<IDStore>>,
sse_broadcaster: Sender<String>,
) -> Result<(), rocket::Error> {
let port = match env::var("HTTP_PORT") { let port = match env::var("HTTP_PORT") {
Ok(port) => port.parse().unwrap_or_else(|_| { Ok(port) => port.parse().unwrap_or_else(|_| {
warn!("Failed to parse HTTP_PORT. Using default 80"); warn!("Failed to parse HTTP_PORT. Using default 80");
@ -45,9 +48,17 @@ pub async fn start_webserver(store: Arc<Mutex<IDStore>>) -> Result<(), rocket::E
rocket::custom(config) rocket::custom(config)
.mount( .mount(
"/", "/",
routes![static_files, index, export_csv, get_mapping, add_mapping], routes![
static_files,
index,
export_csv,
id_event,
get_mapping,
add_mapping
],
) )
.manage(store) .manage(store)
.manage(sse_broadcaster)
.launch() .launch()
.await?; .await?;
Ok(()) Ok(())
@ -72,6 +83,19 @@ fn static_files(file: std::path::PathBuf) -> Option<(ContentType, Vec<u8>)> {
Some((content_type, asset.data.into_owned())) Some((content_type, asset.data.into_owned()))
} }
#[get("/api/idevent")]
fn id_event(sse_broadcaster: &State<Sender<String>>) -> EventStream![] {
let mut rx = sse_broadcaster.subscribe();
EventStream! {
loop {
let msg = rx.recv().await;
if let Ok(id) = msg {
yield Event::data(id);
}
}
}
}
#[get("/api/csv")] #[get("/api/csv")]
async fn export_csv(manager: &State<Arc<Mutex<IDStore>>>) -> Result<String, Status> { async fn export_csv(manager: &State<Arc<Mutex<IDStore>>>) -> Result<String, Status> {
info!("Exporting CSV"); info!("Exporting CSV");