diff --git a/src/main.rs b/src/main.rs index f3a2c56..69876d1 100644 --- a/src/main.rs +++ b/src/main.rs @@ -10,7 +10,7 @@ use std::{env, error::Error, sync::Arc}; use tally_id::TallyID; use tokio::{ fs, join, - sync::{Mutex, mpsc}, + sync::{Mutex, broadcast, mpsc}, }; use webserver::start_webserver; @@ -20,6 +20,7 @@ use mock::{MockBuzzer, MockHotspot, MockLed}; mod buzzer; mod color; mod hotspot; +mod id_mapping; mod id_store; mod led; mod mock; @@ -27,7 +28,6 @@ mod parser; mod pm3; mod tally_id; mod webserver; -mod id_mapping; const STORE_PATH: &str = "./data.json"; const PWM_CHANNEL_BUZZER: Channel = Channel::Pwm0; //PWM0 = GPIO18/Physical pin 12 @@ -139,7 +139,8 @@ async fn main() -> Result<(), Box> { info!("Starting application"); - let (tx, mut rx) = mpsc::channel::(32); + let (tx, mut rx) = broadcast::channel::(32); + let sse_tx = tx.clone(); tokio::spawn(async move { match run_pm3(tx).await { @@ -179,7 +180,7 @@ async fn main() -> Result<(), Box> { let channel_store = store.clone(); tokio::spawn(async move { - while let Some(tally_id_string) = rx.recv().await { + while let Ok(tally_id_string) = rx.recv().await { let tally_id = TallyID(tally_id_string); if hotspot_ids.contains(&tally_id) { @@ -204,7 +205,7 @@ async fn main() -> Result<(), Box> { } }); - match start_webserver(store.clone()).await { + match start_webserver(store.clone(), sse_tx).await { Ok(()) => {} Err(e) => { error!("Failed to start webserver: {e}"); diff --git a/src/pm3.rs b/src/pm3.rs index 5bf98c3..a1a8b4f 100644 --- a/src/pm3.rs +++ b/src/pm3.rs @@ -4,12 +4,12 @@ use std::error::Error; use std::process::Stdio; use tokio::io::{AsyncBufReadExt, BufReader}; use tokio::process::Command; -use tokio::sync::mpsc; +use tokio::sync::broadcast; /// Runs the pm3 binary and monitors it's output /// The pm3 binary is ether set in the env var PM3_BIN or found in the path /// The ouput is parsed and send via the `tx` channel -pub async fn run_pm3(tx: mpsc::Sender) -> Result<(), Box> { +pub async fn run_pm3(tx: broadcast::Sender) -> Result<(), Box> { let pm3_path = match env::var("PM3_BIN") { Ok(path) => path, Err(_) => { @@ -35,7 +35,7 @@ pub async fn run_pm3(tx: mpsc::Sender) -> Result<(), Box> { while let Some(line) = reader.next_line().await? { trace!("PM3: {line}"); if let Some(uid) = super::parser::parse_line(&line) { - tx.send(uid).await?; + tx.send(uid)?; } } diff --git a/src/webserver.rs b/src/webserver.rs index e18a7a7..f7b37c7 100644 --- a/src/webserver.rs +++ b/src/webserver.rs @@ -1,6 +1,6 @@ -use log::{debug, error, info, warn}; -use rocket::data::FromData; +use log::{error, info, warn}; use rocket::http::Status; +use rocket::response::stream::{Event, EventStream}; use rocket::serde::json::Json; use rocket::{Config, State, post}; use rocket::{get, http::ContentType, response::content::RawHtml, routes}; @@ -10,8 +10,8 @@ use std::borrow::Cow; use std::env; use std::ffi::OsStr; use std::sync::Arc; -use std::time::Instant; use tokio::sync::Mutex; +use tokio::sync::broadcast::Sender; use crate::id_mapping::{IDMapping, Name}; use crate::id_store::IDStore; @@ -27,7 +27,10 @@ struct NewMapping { name: Name, } -pub async fn start_webserver(store: Arc>) -> Result<(), rocket::Error> { +pub async fn start_webserver( + store: Arc>, + sse_broadcaster: Sender, +) -> Result<(), rocket::Error> { let port = match env::var("HTTP_PORT") { Ok(port) => port.parse().unwrap_or_else(|_| { warn!("Failed to parse HTTP_PORT. Using default 80"); @@ -45,9 +48,17 @@ pub async fn start_webserver(store: Arc>) -> Result<(), rocket::E rocket::custom(config) .mount( "/", - routes![static_files, index, export_csv, get_mapping, add_mapping], + routes![ + static_files, + index, + export_csv, + id_event, + get_mapping, + add_mapping + ], ) .manage(store) + .manage(sse_broadcaster) .launch() .await?; Ok(()) @@ -72,6 +83,19 @@ fn static_files(file: std::path::PathBuf) -> Option<(ContentType, Vec)> { Some((content_type, asset.data.into_owned())) } +#[get("/api/idevent")] +fn id_event(sse_broadcaster: &State>) -> EventStream![] { + let mut rx = sse_broadcaster.subscribe(); + EventStream! { + loop { + let msg = rx.recv().await; + if let Ok(id) = msg { + yield Event::data(id); + } + } + } +} + #[get("/api/csv")] async fn export_csv(manager: &State>>) -> Result { info!("Exporting CSV");