From eb0b25a9fb0376c1a05751f04f3d754033cdad17 Mon Sep 17 00:00:00 2001 From: Christopher Sardegna Date: Tue, 10 Jun 2025 20:40:44 -0700 Subject: [PATCH 1/4] Update error string --- src/util/error.rs | 2 +- 1 file changed, 1 insertion(+), 1 deletion(-) diff --git a/src/util/error.rs b/src/util/error.rs index 57898a7..45c0656 100644 --- a/src/util/error.rs +++ b/src/util/error.rs @@ -41,7 +41,7 @@ impl Display for LogriaError { write!(fmt, "Invalid format description: {msg}") } LogriaError::InvalidCommand(msg) => { - write!(fmt, "Invalid poll command: {msg}") + write!(fmt, "Invalid command: {msg}") } LogriaError::CannotParseMessage(msg) => { write!(fmt, "Unable to parse message: {msg}") From de91a5c7e10a51898911d48f78ec3cc29a52c88a Mon Sep 17 00:00:00 2001 From: Christopher Sardegna Date: Tue, 10 Jun 2025 20:40:49 -0700 Subject: [PATCH 2/4] Handle streams, fix assignment bug if saving fails --- src/communication/handlers/startup.rs | 14 ++++++++++---- 1 file changed, 10 insertions(+), 4 deletions(-) diff --git a/src/communication/handlers/startup.rs b/src/communication/handlers/startup.rs index 0dd6b61..f28d994 100644 --- a/src/communication/handlers/startup.rs +++ b/src/communication/handlers/startup.rs @@ -82,7 +82,13 @@ impl StartupHandler { Ok(streams) => streams, Err(why) => { window.write_to_command_line(&why.to_string())?; - build_streams_from_input(&[command.to_owned()], false).unwrap() + match build_streams_from_input(&[command.to_owned()], false) { + Ok(streams) => streams, + Err(why) => { + window.write_to_command_line(&why.to_string())?; + return Ok(()); + } + } } }; window.config.stream_type = StdErr; @@ -204,7 +210,7 @@ mod startup_tests { } #[test] - fn doesnt_crash_alpha() { + fn doesnt_crash_invalid_command_startup() { // Setup dummy window let mut window = MainWindow::_new_dummy(); window.config.stream_type = StreamType::Auxiliary; @@ -219,8 +225,8 @@ mod startup_tests { .process_command(&mut window, "zzzfake_file_name") .is_ok() ); - assert!(matches!(window.input_type, InputType::Normal)); - assert!(matches!(window.config.stream_type, StreamType::StdErr)); + assert!(matches!(window.input_type, InputType::Startup)); + assert!(matches!(window.config.stream_type, StreamType::Auxiliary)); Session::del(&[Session::list_full().len() - 1]).unwrap(); } } From 1620066e82afe8af77b0aac2153c101dcef61812 Mon Sep 17 00:00:00 2001 From: Christopher Sardegna Date: Tue, 10 Jun 2025 20:40:53 -0700 Subject: [PATCH 3/4] Use `AtomicBool,` store thread handle and child process --- src/communication/input.rs | 88 +++++++++++++++++++++----------------- 1 file changed, 48 insertions(+), 40 deletions(-) diff --git a/src/communication/input.rs b/src/communication/input.rs index 516ce18..bb96467 100644 --- a/src/communication/input.rs +++ b/src/communication/input.rs @@ -18,10 +18,11 @@ use std::{ fs::File, io::{BufRead, BufReader}, path::Path, - process::{Command, Stdio}, + process::{Child, Command, Stdio}, result::Result, sync::{ Arc, Mutex, + atomic::{AtomicBool, Ordering}, mpsc::{Receiver, channel}, }, thread, time, @@ -31,8 +32,10 @@ use std::{ pub struct InputStream { pub stdout: Receiver, pub stderr: Receiver, - pub should_die: Arc>, + pub should_die: Arc, pub _type: String, + pub handle: Option>, + pub child: Option, } pub trait Input { @@ -86,8 +89,10 @@ impl Input for FileInput { Ok(InputStream { stdout: out_rx, stderr: err_rx, - should_die: Arc::new(Mutex::new(false)), + should_die: Arc::new(AtomicBool::new(false)), _type: String::from("FileInput"), + handle: None, // No handle needed for file input + child: None, // No child process for file input }) } } @@ -105,45 +110,49 @@ impl CommandInput { impl Input for CommandInput { /// Create a command input fn build(name: String, command: String) -> Result { + let command_to_run = CommandInput::parse_command(&command); + let mut child = match Command::new(command_to_run[0]) + .args(&command_to_run[1..]) + .current_dir(current_dir().unwrap()) + .stdout(Stdio::piped()) + .stderr(Stdio::piped()) + .stdin(Stdio::null()) + .spawn() + { + Ok(child) => child, + Err(why) => { + return Err(LogriaError::InvalidCommand(format!( + "Unable to connect to process: {why}" + ))); + } + }; + + // Get stdout and stderr handles + let stdout = child.stdout.take().unwrap(); + let stderr = child.stderr.take().unwrap(); + // Setup multiprocessing queues let (err_tx, err_rx) = channel(); let (out_tx, out_rx) = channel(); // Provide check for termination outside of the thread - let should_die = Arc::new(Mutex::new(false)); - let die = should_die.clone(); + let should_die = Arc::new(AtomicBool::new(false)); + let should_die_clone = Arc::clone(&should_die); // Handle poll rate for each stream let poll_rate_stdout = Arc::new(Mutex::new(RollingMean::new(5))); let poll_rate_stderr = Arc::new(Mutex::new(RollingMean::new(5))); // Start reading from the queues - let _ = thread::Builder::new() + let handle = thread::Builder::new() .name(format!("CommandInput: {name}")) .spawn(move || { - let command_to_run = CommandInput::parse_command(&command); - let mut child = match Command::new(command_to_run[0]) - .args(&command_to_run[1..]) - .current_dir(current_dir().unwrap()) - .stdout(Stdio::piped()) - .stderr(Stdio::piped()) - .stdin(Stdio::null()) - .spawn() - { - Ok(child) => child, - Err(why) => panic!("Unable to connect to process: {why}"), - }; - - // Get stdout and stderr handles - let stdout = child.stdout.take().unwrap(); - let stderr = child.stderr.take().unwrap(); - // Create readers let mut stdout_reader = BufReader::new(stdout); let mut stderr_reader = BufReader::new(stderr); // Create threads to read stdout and stderr independently - let die_clone = die.clone(); + let die_clone = Arc::clone(&should_die_clone); let poll_stdout = poll_rate_stdout.clone(); let stdout_handle = thread::spawn(move || { loop { @@ -151,6 +160,11 @@ impl Input for CommandInput { poll_stdout.lock().unwrap().mean(), )); + // Exit if the process is requested to die + if die_clone.load(Ordering::Relaxed) { + break; + } + let mut buf_stdout = String::new(); let timestamp = time::Instant::now(); stdout_reader.read_line(&mut buf_stdout).unwrap(); @@ -171,14 +185,10 @@ impl Input for CommandInput { .lock() .unwrap() .update(ms_per_message(timestamp.elapsed(), 1)); - - if *die_clone.lock().unwrap() { - break; - } } }); - let die_clone = die.clone(); + let die_clone = Arc::clone(&should_die_clone); let poll_stderr = poll_rate_stderr.clone(); let stderr_handle = thread::spawn(move || { loop { @@ -186,6 +196,11 @@ impl Input for CommandInput { poll_stderr.lock().unwrap().mean(), )); + // Exit if the process is requested to die + if die_clone.load(Ordering::Relaxed) { + break; + } + let mut buf_stderr = String::new(); let timestamp = time::Instant::now(); stderr_reader.read_line(&mut buf_stderr).unwrap(); @@ -206,29 +221,22 @@ impl Input for CommandInput { .lock() .unwrap() .update(ms_per_message(timestamp.elapsed(), 1)); - - if *die_clone.lock().unwrap() { - break; - } } }); // Wait for both readers to complete stdout_handle.join().unwrap(); stderr_handle.join().unwrap(); - - // Kill the child process if requested - if *die.lock().unwrap() { - let _ = child.kill(); - } - let _ = child.wait(); - }); + }) + .unwrap(); Ok(InputStream { stdout: out_rx, stderr: err_rx, should_die, _type: String::from("CommandInput"), + handle: Some(handle), + child: Some(child), }) } } From 35369814376aedfe4d56f4e1af05e3b096612180 Mon Sep 17 00:00:00 2001 From: Christopher Sardegna Date: Tue, 10 Jun 2025 20:40:57 -0700 Subject: [PATCH 4/4] Fix #132 --- src/communication/reader.rs | 28 +++++++++++++++++++++++++--- 1 file changed, 25 insertions(+), 3 deletions(-) diff --git a/src/communication/reader.rs b/src/communication/reader.rs index a79bc07..7cd519b 100644 --- a/src/communication/reader.rs +++ b/src/communication/reader.rs @@ -3,6 +3,7 @@ use std::{ cmp::max, io::{Result, Write, stdout}, panic, + sync::atomic::Ordering, time::{Duration, Instant}, }; @@ -123,6 +124,8 @@ pub struct LogriaConfig { pub current_status: Option, /// Function that can generate messages for display pub generate_auxiliary_messages: Option Vec>, + /// False if the app should continue running, True if it should stop + pub should_exit: bool, } pub struct MainWindow { @@ -244,6 +247,7 @@ impl MainWindow { generate_auxiliary_messages: None, current_status: None, message_speed_tracker: RollingMean::new(5), + should_exit: false, }, } } @@ -805,7 +809,7 @@ impl MainWindow { Ok(streams) => self.config.streams = streams, Err(why) => { self.write_to_command_line(&why.to_string())?; - build_streams_from_input(&c, false).unwrap(); + self.config.streams = build_streams_from_input(&c, false).unwrap(); } } @@ -837,9 +841,22 @@ impl MainWindow { execute!(stdout(), cursor::Show, Clear(ClearType::All))?; disable_raw_mode()?; for stream in &self.config.streams { - *stream.should_die.lock().unwrap() = true; + stream.should_die.store(true, Ordering::Relaxed); } - std::process::exit(0); + + for mut stream in self.config.streams.drain(..) { + if let Some(mut child) = stream.child.take() { + // Exit the child process's blocking read + let _ = child.kill(); + } + if let Some(handle) = stream.handle.take() { + // Wait for the thread to finish + let _ = handle.join(); + } + } + + self.config.should_exit = true; + Ok(()) } /// Update stderr and stdout buffers from every stream's queue @@ -903,12 +920,17 @@ impl MainWindow { let num_new_messages = self.receive_streams(); self.handle_smart_poll_rate(self.config.loop_time.elapsed(), num_new_messages); + if self.config.should_exit { + break Ok(()); + } + if poll(Duration::from_millis(self.config.poll_rate))? { match read()? { Event::Key(input) => { // Die on Ctrl-C if input == exit_key { self.quit()?; + return Ok(()); } // Otherwise, match input to action