Compare commits

...

3 Commits

Author SHA1 Message Date
2150fff6a5 gracefull shutdown of SSE loop 2025-05-27 14:06:38 +02:00
e527d41cb7 fixed WorkingDirectory in systemd service 2025-05-27 13:55:29 +02:00
427153a614 gracefull shutdown of PM3 2025-05-27 13:55:06 +02:00
3 changed files with 40 additions and 18 deletions

View File

@@ -12,7 +12,7 @@ Restart=on-failure
RestartSec=5
User=root
Group=root
RuntimeDirectory=/var/lib/fwa
WorkingDirectory=/var/lib/fwa
Environment="PM3_BIN=/usr/local/bin/pm3/pm3"
#Environment="LOG_LEVEL=warn"

View File

@@ -2,8 +2,10 @@ use log::{debug, info, trace, warn};
use std::env;
use std::error::Error;
use std::process::Stdio;
use tokio::io::{AsyncBufReadExt, BufReader};
use tokio::io::{AsyncBufReadExt, AsyncWriteExt, BufReader};
use tokio::process::Command;
use tokio::select;
use tokio::signal::unix::{SignalKind, signal};
use tokio::sync::broadcast;
/// Runs the pm3 binary and monitors it's output
@@ -27,31 +29,43 @@ pub async fn run_pm3(tx: broadcast::Sender<String>) -> Result<(), Box<dyn Error>
.arg("lf hitag reader -@")
.stdout(Stdio::piped())
.stderr(Stdio::null())
.stdin(Stdio::null())
.stdin(Stdio::piped())
.spawn()?;
let stdout = cmd.stdout.take().ok_or("Failed to get stdout")?;
let mut stdin = cmd.stdin.take().ok_or("Failed to get stdin")?;
let mut reader = BufReader::new(stdout).lines();
let mut last_id: String = "".to_owned();
let mut sigterm = signal(SignalKind::terminate())?;
while let Some(line) = reader.next_line().await? {
trace!("PM3: {line}");
if let Some(uid) = super::parser::parse_line(&line) {
if last_id == uid {
tx.send(uid.clone())?;
let child_handle = tokio::spawn(async move {
let mut last_id: String = "".to_owned();
while let Some(line) = reader.next_line().await.unwrap_or(None) {
trace!("PM3: {line}");
if let Some(uid) = super::parser::parse_line(&line) {
if last_id == uid {
let _ = tx.send(uid.clone());
}
last_id = uid;
}
last_id = uid.to_owned();
}
}
});
select! {
_ = child_handle => {}
_ = sigterm.recv() => {
debug!("Graceful shutdown of PM3");
let _ = stdin.write_all(b"\n").await;
let _ = stdin.flush().await;
}
};
let status = cmd.wait().await?;
if status.success() {
Ok(())
} else {
Err("PM3 exited with a non zero exit code".into())
Err("PM3 exited with a non-zero exit code".into())
}
}

View File

@@ -2,7 +2,7 @@ use log::{error, info, warn};
use rocket::http::Status;
use rocket::response::stream::{Event, EventStream};
use rocket::serde::json::Json;
use rocket::{Config, State, post};
use rocket::{Config, Shutdown, State, post};
use rocket::{get, http::ContentType, response::content::RawHtml, routes};
use rust_embed::Embed;
use serde::Deserialize;
@@ -10,6 +10,7 @@ use std::borrow::Cow;
use std::env;
use std::ffi::OsStr;
use std::sync::Arc;
use tokio::select;
use tokio::sync::Mutex;
use tokio::sync::broadcast::Sender;
@@ -84,13 +85,20 @@ fn static_files(file: std::path::PathBuf) -> Option<(ContentType, Vec<u8>)> {
}
#[get("/api/idevent")]
fn id_event(sse_broadcaster: &State<Sender<String>>) -> EventStream![] {
fn id_event(sse_broadcaster: &State<Sender<String>>, shutdown: Shutdown) -> EventStream![] {
let mut rx = sse_broadcaster.subscribe();
EventStream! {
loop {
let msg = rx.recv().await;
if let Ok(id) = msg {
yield Event::data(id);
select! {
msg = rx.recv() => {
if let Ok(id) = msg {
yield Event::data(id);
}
}
_ = &mut shutdown.clone() => {
// Shutdown signal received, exit the loop
break;
}
}
}
}