diff --git a/src/commands/ssh/mod.rs b/src/commands/ssh/mod.rs index fa89ac7..2a00b81 100644 --- a/src/commands/ssh/mod.rs +++ b/src/commands/ssh/mod.rs @@ -6,9 +6,12 @@ use crate::config::Configs; pub const SSH_CONNECTION_TIMEOUT_SECS: u64 = 30; pub const SSH_MESSAGE_TIMEOUT_SECS: u64 = 10; -pub const SSH_MAX_RECONNECT_ATTEMPTS: usize = 3; -pub const SSH_RECONNECT_DELAY_SECS: u64 = 5; +pub const SSH_MAX_CONNECT_ATTEMPTS: usize = 3; +pub const SSH_CONNECT_DELAY_SECS: u64 = 5; pub const SSH_MAX_EMPTY_MESSAGES: usize = 100; +pub const SSH_IDLE_THRESHOLD_SECS: u64 = 5; +pub const SSH_MAX_RECONNECT_ATTEMPTS: u8 = 10; +pub const SSH_RECONNECT_DELAY_MS: u64 = 1000; mod common; mod platform; diff --git a/src/commands/ssh/platform/unix.rs b/src/commands/ssh/platform/unix.rs index 7d34f3a..d17607d 100644 --- a/src/commands/ssh/platform/unix.rs +++ b/src/commands/ssh/platform/unix.rs @@ -1,5 +1,6 @@ use anyhow::Result; use crossterm::terminal; +use std::io::Write; use tokio::io::AsyncReadExt; use tokio::select; @@ -23,38 +24,112 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()> let mut stdin = tokio::io::stdin(); let mut stdin_buf = [0u8; 1024]; let mut exit_code = None; + let mut needs_init = false; let (mut sigint, mut sigterm, mut sigwinch) = setup_signal_handlers().await?; // Main event loop loop { + // If reconnection happened and needs re-initialization, do it first + if needs_init { + if let Err(e) = client.init_shell(None).await { + eprintln!("Failed to re-initialize shell: {}", e); + exit_code = Some(1); + break; + } + needs_init = false; + + // Reset terminal state + // Clear line and move cursor to beginning of line + print!("\r\x1B[K"); + std::io::stdout().flush()?; + + // After re-initialization, send window size only if shell is ready + if client.is_ready() { + if let Ok((cols, rows)) = terminal::size() { + if let Err(e) = client.send_window_size(cols, rows).await { + if !e.to_string().contains("Shell not ready yet") { + eprintln!("Failed to update window size: {}", e); + } + } + } + } + } + + // Check if shell is ready for input + let is_ready = client.is_ready(); + select! { // Handle window resizes _ = sigwinch.recv() => { if let Ok((cols, rows)) = terminal::size() { - client.send_window_size(cols, rows).await?; + if is_ready { + match client.send_window_size(cols, rows).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + eprintln!("Failed to update window size: {}", e); + } + } + } + } } continue; } // Handle signals _ = sigint.recv() => { - client.send_signal(2).await?; // SIGINT + if is_ready { + match client.send_signal(2).await { // SIGINT + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + eprintln!("Failed to send SIGINT: {}", e); + } + } + } + } continue; } _ = sigterm.recv() => { - client.send_signal(15).await?; // SIGTERM + if is_ready { + match client.send_signal(15).await { // SIGTERM + Ok(_) => {}, + Err(e) => { + if !e.to_string().contains("reconnected but needs re-initialization") + && !e.to_string().contains("Shell not ready yet") { + eprintln!("Failed to send SIGTERM: {}", e); + } + } + } + } break; } - // Handle input from terminal - result = stdin.read(&mut stdin_buf) => { + // Handle input from terminal only if shell is ready + result = stdin.read(&mut stdin_buf), if is_ready => { match result { Ok(0) => break, // EOF Ok(n) => { let data = String::from_utf8_lossy(&stdin_buf[..n]); - client.send_data(&data).await?; + match client.send_data(&data).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + eprintln!("Error sending data: {}", e); + exit_code = Some(1); + break; + } + } + } } Err(e) => { eprintln!("Error reading from stdin: {}", e); + exit_code = Some(1); break; } } @@ -62,14 +137,19 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()> // Handle messages from server result = client.handle_server_messages() => { match result { - Ok(()) => { + Ok(()) => { exit_code = Some(0); break; } Err(e) => { - eprintln!("Error: {}", e); - exit_code = Some(1); - break; + if e.to_string().contains("reconnected but needs re-initialization") { + needs_init = true; + continue; + } else { + eprintln!("Error: {}", e); + exit_code = Some(1); + break; + } } } } @@ -79,6 +159,10 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()> // Clean up terminal when done let _ = terminal::disable_raw_mode(); + // Ensure cursor is visible with ANSI escape sequence + print!("\x1b[?25h"); + std::io::stdout().flush()?; + if let Some(code) = exit_code { std::process::exit(code); } diff --git a/src/commands/ssh/platform/windows.rs b/src/commands/ssh/platform/windows.rs index dafb59b..2315c73 100644 --- a/src/commands/ssh/platform/windows.rs +++ b/src/commands/ssh/platform/windows.rs @@ -2,6 +2,7 @@ use anyhow::Result; use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyModifiers}; use crossterm::terminal; use futures_util::stream::StreamExt; +use std::io::Write; use tokio::io::AsyncReadExt; use tokio::select; use tokio::time::Duration; @@ -18,19 +19,38 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()> let mut stdin = tokio::io::stdin(); let mut stdin_buf = [0u8; 1024]; let mut exit_code = None; + let mut needs_init = false; let _ = setup_signal_handlers().await?; // Event handling differs based on available features #[cfg(feature = "event-stream")] - let run_result = run_with_event_stream(client, &mut stdin, &mut stdin_buf).await; + let run_result = run_with_event_stream( + client, + &mut stdin, + &mut stdin_buf, + &mut needs_init, + &mut exit_code, + ) + .await; #[cfg(not(feature = "event-stream"))] - let run_result = run_with_polling(client, &mut stdin, &mut stdin_buf).await; + let run_result = run_with_polling( + client, + &mut stdin, + &mut stdin_buf, + &mut needs_init, + &mut exit_code, + ) + .await; // Clean up terminal let _ = terminal::disable_raw_mode(); + // Ensure cursor is visible with ANSI escape sequence + print!("\x1b[?25h"); + std::io::stdout().flush()?; + if let Some(code) = exit_code { std::process::exit(code); } @@ -43,29 +63,84 @@ async fn run_with_event_stream( client: &mut TerminalClient, stdin: &mut tokio::io::Stdin, stdin_buf: &mut [u8; 1024], + needs_init: &mut bool, + exit_code: &mut Option, ) -> Result<()> { let mut event_stream = crossterm::event::EventStream::new(); - let mut exit_code = None; loop { + // If reconnection happened and needs re-initialization, do it first + if *needs_init { + if let Err(e) = client.init_shell(None).await { + eprintln!("Failed to re-initialize shell: {}", e); + *exit_code = Some(1); + break; + } + *needs_init = false; + + // Reset terminal state + // Clear line and move cursor to beginning of line + print!("\r\x1B[K"); + std::io::stdout().flush()?; + + // After successful initialization and ready, send window size + if let Ok((cols, rows)) = terminal::size() { + if let Err(e) = client.send_window_size(cols, rows).await { + if !e.to_string().contains("Shell not ready yet") { + eprintln!("Failed to send window size: {}", e); + } + } + } + } + + // Check if the shell is ready for input + let is_ready = client.is_ready(); + select! { // Handle crossterm events for Windows with event-stream - maybe_event = event_stream.next().fuse() => { + maybe_event = event_stream.next().fuse(), if is_ready => { match maybe_event { Some(Ok(Event::Key(KeyEvent { code: KeyCode::Char('c'), modifiers, .. }))) if modifiers.contains(KeyModifiers::CONTROL) => { // Handle Ctrl+C like SIGINT - client.send_signal(2).await?; + match client.send_signal(2).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } continue; }, Some(Ok(Event::Resize(width, height))) => { // Handle terminal resize - client.send_window_size(width, height).await?; + match client.send_window_size(width, height).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } continue; }, Some(Ok(Event::Key(key))) => { // Handle key input if let Some(input) = key_event_to_string(key) { - client.send_data(&input).await?; + match client.send_data(&input).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } } }, Some(Err(e)) => { @@ -76,12 +151,21 @@ async fn run_with_event_stream( } }, - result = stdin.read(stdin_buf) => { + result = stdin.read(stdin_buf), if is_ready => { match result { Ok(0) => break, // EOF Ok(n) => { let data = String::from_utf8_lossy(&stdin_buf[..n]); - client.send_data(&data).await?; + match client.send_data(&data).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } } Err(e) => { eprintln!("Error reading from stdin: {}", e); @@ -94,23 +178,24 @@ async fn run_with_event_stream( result = client.handle_server_messages() => { match result { Ok(()) => { - exit_code = Some(0); + *exit_code = Some(0); break; } Err(e) => { - eprintln!("Error: {}", e); - exit_code = Some(1); - break; + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + continue; + } else { + eprintln!("Error: {}", e); + *exit_code = Some(1); + break; + } } } } } } - if let Some(code) = exit_code { - std::process::exit(code); - } - Ok(()) } @@ -119,18 +204,55 @@ async fn run_with_polling( client: &mut TerminalClient, stdin: &mut tokio::io::Stdin, stdin_buf: &mut [u8; 1024], + needs_init: &mut bool, + exit_code: &mut Option, ) -> Result<()> { let event_poll_timeout = Duration::from_millis(100); - let mut exit_code = None; loop { + // If reconnection happened and needs re-initialization, do it first + if *needs_init { + if let Err(e) = client.init_shell(None).await { + eprintln!("Failed to re-initialize shell: {}", e); + *exit_code = Some(1); + break; + } + *needs_init = false; + + // Reset terminal state + // Clear line and move cursor to beginning of line + print!("\r\x1B[K"); + std::io::stdout().flush()?; + + // After successful initialization and ready, send window size + if let Ok((cols, rows)) = terminal::size() { + if let Err(e) = client.send_window_size(cols, rows).await { + if !e.to_string().contains("Shell not ready yet") { + eprintln!("Failed to send window size: {}", e); + } + } + } + } + + // Check if the shell is ready for input + let is_ready = client.is_ready(); + select! { - result = stdin.read(stdin_buf) => { + result = stdin.read(stdin_buf), if is_ready => { match result { Ok(0) => break, // EOF Ok(n) => { let data = String::from_utf8_lossy(&stdin_buf[..n]); - client.send_data(&data).await?; + match client.send_data(&data).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } } Err(e) => { eprintln!("Error reading from stdin: {}", e); @@ -143,33 +265,65 @@ async fn run_with_polling( result = client.handle_server_messages() => { match result { Ok(()) => { - exit_code = Some(0); + *exit_code = Some(0); break; } Err(e) => { - eprintln!("Error: {}", e); - exit_code = Some(1); - break; + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + continue; + } else { + eprintln!("Error: {}", e); + *exit_code = Some(1); + break; + } } } } // Poll for crossterm events _ = tokio::time::sleep(event_poll_timeout) => { - if event::poll(Duration::from_millis(0))? { + if is_ready && event::poll(Duration::from_millis(0))? { match event::read()? { Event::Key(KeyEvent { code: KeyCode::Char('c'), modifiers, .. }) if modifiers.contains(KeyModifiers::CONTROL) => { // Handle Ctrl+C like SIGINT - client.send_signal(2).await?; + match client.send_signal(2).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } }, Event::Resize(width, height) => { // Handle terminal resize - client.send_window_size(width, height).await?; + match client.send_window_size(width, height).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } }, Event::Key(key) => { // Handle key input if let Some(input) = key_event_to_string(key) { - client.send_data(&input).await?; + match client.send_data(&input).await { + Ok(_) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + *needs_init = true; + } else if !e.to_string().contains("Shell not ready yet") { + return Err(e); + } + } + } } }, _ => {} @@ -179,10 +333,6 @@ async fn run_with_polling( } } - if let Some(code) = exit_code { - std::process::exit(code); - } - Ok(()) } diff --git a/src/controllers/terminal/client.rs b/src/controllers/terminal/client.rs index d79f240..cfcb8d6 100644 --- a/src/controllers/terminal/client.rs +++ b/src/controllers/terminal/client.rs @@ -1,19 +1,40 @@ use anyhow::{bail, Result}; -use async_tungstenite::tungstenite::Message; +use async_tungstenite::tungstenite::{Error as WsError, Message}; use async_tungstenite::WebSocketStream; use futures_util::stream::StreamExt; use std::io::Write; -use tokio::time::{interval, timeout, Duration}; +use std::time::{Duration as StdDuration, Instant}; +use tokio::time::{interval, sleep, timeout, Duration}; -use crate::commands::ssh::{SSH_MAX_EMPTY_MESSAGES, SSH_MESSAGE_TIMEOUT_SECS}; +use crate::commands::ssh::{ + SSH_IDLE_THRESHOLD_SECS, SSH_MAX_EMPTY_MESSAGES, SSH_MAX_RECONNECT_ATTEMPTS, + SSH_MESSAGE_TIMEOUT_SECS, SSH_RECONNECT_DELAY_MS, +}; use super::connection::{establish_connection, SSHConnectParams}; use super::messages::{ClientMessage, ClientPayload, DataPayload, ServerMessage}; use super::SSH_PING_INTERVAL_SECS; +struct ReconnectGuard<'a>(&'a std::sync::atomic::AtomicBool); + +impl<'a> Drop for ReconnectGuard<'a> { + fn drop(&mut self) { + self.0.store(false, std::sync::atomic::Ordering::Release); + } +} + pub struct TerminalClient { ws_stream: WebSocketStream, initialized: bool, + url: String, + token: String, + params: SSHConnectParams, + last_activity: Instant, + reconnecting: std::sync::atomic::AtomicBool, + reconnect_attempts: u8, + last_reconnect_time: Option, + ready: bool, + exec_command_mode: bool, } impl TerminalClient { @@ -23,6 +44,15 @@ impl TerminalClient { let mut client = Self { ws_stream, initialized: false, + url: url.to_string(), + token: token.to_string(), + params: params.clone(), + last_activity: Instant::now(), + reconnecting: std::sync::atomic::AtomicBool::new(false), + reconnect_attempts: 0, + last_reconnect_time: None, + ready: false, + exec_command_mode: false, }; // Wait for the initial welcome message from the server @@ -46,26 +76,195 @@ impl TerminalClient { } } + /// Check if the connection has been idle for the threshold period + fn is_idle(&self) -> bool { + Instant::now().duration_since(self.last_activity) + >= StdDuration::from_secs(SSH_IDLE_THRESHOLD_SECS) + } + + /// Check if we've recently attempted a reconnect + /// This helps us handle quick failures after reconnecting + fn is_recent_reconnect(&self) -> bool { + if let Some(last_time) = self.last_reconnect_time { + Instant::now().duration_since(last_time) < StdDuration::from_secs(5) + } else { + false + } + } + + /// Check if reconnection is allowed in the current mode + fn can_reconnect(&self) -> bool { + // Only allow reconnection in interactive shell mode, not in exec_command mode + !self.exec_command_mode + } + + /// Update the last activity timestamp + fn update_activity(&mut self) { + self.last_activity = Instant::now(); + } + + /// Attempts to reconnect to the server + async fn reconnect(&mut self) -> Result<()> { + // Skip reconnection if we're in exec_command mode + if !self.can_reconnect() { + bail!("Reconnection not allowed in exec_command mode"); + } + + // Don't allow concurrent reconnections + if self + .reconnecting + .swap(true, std::sync::atomic::Ordering::Acquire) + { + return Err(anyhow::anyhow!("Reconnection already in progress")); + } + + // Create a guard that will reset the flag when this function scope ends + let _guard = ReconnectGuard(&self.reconnecting); + + // Only try to reconnect if idle OR if we recently reconnected (handles quick failures) + if !self.is_idle() && !self.is_recent_reconnect() { + bail!("Connection issue, but not idle enough for reconnection"); + } + + // Check if we've exceeded the maximum reconnection attempts + if self.reconnect_attempts >= SSH_MAX_RECONNECT_ATTEMPTS { + bail!( + "Maximum reconnection attempts ({}) reached", + SSH_MAX_RECONNECT_ATTEMPTS + ); + } + + // Increment the reconnection attempt counter + self.reconnect_attempts += 1; + + // Update the last reconnect time + self.last_reconnect_time = Some(Instant::now()); + + // Add a small delay between reconnection attempts + if self.reconnect_attempts > 1 { + sleep(Duration::from_millis(SSH_RECONNECT_DELAY_MS)).await; + } + + match establish_connection(&self.url, &self.token, &self.params).await { + Ok(new_ws_stream) => { + self.ws_stream = new_ws_stream; + self.ready = false; + + // Wait for welcome message + if let Some(msg_result) = self.ws_stream.next().await { + if let Ok(Message::Text(text)) = msg_result { + if let Ok(server_msg) = serde_json::from_str::(&text) { + if server_msg.r#type == "welcome" { + // Reset initialized state + self.initialized = false; + + // Clear the current line completely and move to beginning of line + print!("\r\x1B[K"); + std::io::stdout().flush().ok(); + + // Ensure cursor is visible + print!("\x1b[?25h"); // Show cursor escape sequence + std::io::stdout().flush().ok(); + + // Signal that we need re-initialization + bail!("reconnected but needs re-initialization"); + } + } + } + } + bail!("Didn't receive proper welcome message after reconnection"); + } + Err(e) => { + bail!( + "Failed to reconnect (attempt {}/{}): {}", + self.reconnect_attempts, + SSH_MAX_RECONNECT_ATTEMPTS, + e + ); + } + } + } + /// Sends a WebSocket message async fn send_message(&mut self, msg: Message) -> Result<()> { - timeout( + // Check if the message is a ping for special handling afterwards + let is_ping = matches!(msg, Message::Ping(_)); + + if !self.initialized && !is_ping && !matches!(msg, Message::Pong(_)) { + if let Message::Text(text) = &msg { + if let Ok(value) = serde_json::from_str::(text) { + if let Some(msg_type) = value.get("type").and_then(|v| v.as_str()) { + if msg_type != "window_resize" + && msg_type != "init_shell" + && msg_type != "exec_command" + { + bail!("Session not initialized"); + } + } else { + bail!("Session not initialized"); + } + } else { + bail!("Session not initialized"); + } + } else { + bail!("Session not initialized"); + } + } + + // Don't update activity time for pings to avoid interfering with idle detection + if !is_ping { + self.update_activity(); + } + + match timeout( Duration::from_secs(SSH_MESSAGE_TIMEOUT_SECS), self.ws_stream.send(msg), ) .await - .map_err(|_| { - anyhow::anyhow!( + { + Ok(Ok(_)) => { + // On successful message, reset reconnect attempts if it's not a ping + // We want to keep the counter for recent quick failures + if !is_ping && !self.is_recent_reconnect() { + self.reconnect_attempts = 0; + } + Ok(()) + } + Ok(Err(e)) => { + // If connection error, try to reconnect + match &e { + WsError::ConnectionClosed + | WsError::AlreadyClosed + | WsError::Protocol(_) + | WsError::Io(_) => { + // Only try to reconnect if we're in interactive mode + if self.can_reconnect() { + self.reconnect().await + } else { + Err(anyhow::anyhow!( + "Connection error in exec_command mode: {}", + e + )) + } + } + _ => Err(anyhow::anyhow!("Failed to send message: {}", e)), + } + } + Err(_) => Err(anyhow::anyhow!( "Message send timed out after {} seconds", SSH_MESSAGE_TIMEOUT_SECS - ) - })??; - Ok(()) + )), + } } - /// Initializes an interactive shell session + /// Initializes an interactive shell session and waits for the ready response pub async fn init_shell(&mut self, shell: Option) -> Result<()> { - if self.initialized { - bail!("Session already initialized"); + // Set to interactive shell mode + self.exec_command_mode = false; + + // Allow re-initialization + if self.initialized && shell.is_none() { + return Ok(()); } let message = ClientMessage { @@ -79,11 +278,63 @@ impl TerminalClient { .map_err(|e| anyhow::anyhow!("Failed to initialize shell: {}", e))?; self.initialized = true; + self.ready = false; + + // Wait for the ready response + let timeout_duration = Duration::from_secs(10); // 10 seconds timeout + let start_time = Instant::now(); + + while !self.ready { + if start_time.elapsed() > StdDuration::from_secs(10) { + bail!("Timed out waiting for ready response from server"); + } + + if let Some(msg_result) = timeout(timeout_duration, self.ws_stream.next()).await? { + let msg = msg_result.map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))?; + + if let Message::Text(text) = msg { + let server_msg: ServerMessage = serde_json::from_str(&text) + .map_err(|e| anyhow::anyhow!("Failed to parse server message: {}", e))?; + + match server_msg.r#type.as_str() { + "ready" => { + self.ready = true; + break; + } + "session_data" => { + // Echo any data received while waiting for ready + match server_msg.payload.data { + DataPayload::String(text) => { + print!("{}", text); + std::io::stdout().flush()?; + } + DataPayload::Buffer { data } => { + std::io::stdout().write_all(&data)?; + std::io::stdout().flush()?; + } + DataPayload::Empty {} => {} + } + } + "error" => { + bail!("Error initializing shell: {}", server_msg.payload.message); + } + _ => { + // Ignore other message types while waiting for ready + } + } + } + } else { + bail!("Connection closed while waiting for ready response"); + } + } + Ok(()) } /// Executes a single command pub async fn send_command(&mut self, command: &str, args: Vec) -> Result<()> { + self.exec_command_mode = true; + if self.initialized { bail!("Session already initialized"); } @@ -103,6 +354,8 @@ impl TerminalClient { .map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?; self.initialized = true; + self.ready = true; + Ok(()) } @@ -112,6 +365,10 @@ impl TerminalClient { bail!("Session not initialized"); } + if !self.ready { + bail!("Shell not ready yet"); + } + let message = ClientMessage { r#type: "session_data".to_string(), payload: ClientPayload::Data { @@ -128,6 +385,10 @@ impl TerminalClient { /// Updates the terminal window size pub async fn send_window_size(&mut self, cols: u16, rows: u16) -> Result<()> { + if self.initialized && !self.ready { + bail!("Shell not ready yet"); + } + let message = ClientMessage { r#type: "window_resize".to_string(), payload: ClientPayload::WindowSize { cols, rows }, @@ -146,6 +407,10 @@ impl TerminalClient { bail!("Session not initialized"); } + if !self.ready { + bail!("Shell not ready yet"); + } + let message = ClientMessage { r#type: "signal".to_string(), payload: ClientPayload::Signal { signal }, @@ -169,15 +434,13 @@ impl TerminalClient { /// Process incoming messages from the server pub async fn handle_server_messages(&mut self) -> Result<()> { let mut consecutive_empty_messages = 0; - let mut ping_interval = interval(Duration::from_secs(SSH_PING_INTERVAL_SECS)); loop { tokio::select! { msg_option = self.ws_stream.next() => { match msg_option { - Some(msg_result) => { - let msg = msg_result.map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))?; + Some(Ok(msg)) => { match msg { Message::Text(text) => { let server_msg: ServerMessage = serde_json::from_str(&text) @@ -189,11 +452,24 @@ impl TerminalClient { consecutive_empty_messages = 0; print!("{}", text); std::io::stdout().flush()?; + // Reset reconnect attempts on successful data + // only if not a recent reconnect + if !self.is_recent_reconnect() { + self.reconnect_attempts = 0; + } + self.update_activity(); } DataPayload::Buffer { data } => { consecutive_empty_messages = 0; std::io::stdout().write_all(&data)?; std::io::stdout().flush()?; + // Update activity when receiving data + self.update_activity(); + // Reset reconnect attempts on successful data + // only if not a recent reconnect + if !self.is_recent_reconnect() { + self.reconnect_attempts = 0; + } } DataPayload::Empty {} => { consecutive_empty_messages += 1; @@ -202,6 +478,16 @@ impl TerminalClient { } } }, + "ready" => { + // Client can start sending data/events + self.ready = true; + self.update_activity(); + }, + "stand_by" => { + // This indicates command is in progress + self.ready = true; + self.update_activity(); + }, "command_exit" => { if let Some(code) = server_msg.payload.code { std::io::stdout().flush()?; @@ -213,10 +499,46 @@ impl TerminalClient { } }, "error" => { - bail!(server_msg.payload.message); - } + // Check if this is the specific connection closed error + if server_msg.payload.message.contains("Connection to application unexpectedly closed") { + // Only try to reconnect if we're in interactive mode and either idle or recently reconnected + if self.can_reconnect() && (self.is_idle() || self.is_recent_reconnect()) { + // Try to reconnect + match self.reconnect().await { + Ok(()) => {}, // Successfully reconnected + Err(reconnect_err) => { + if reconnect_err.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", reconnect_err); + } + if reconnect_err.to_string().contains("Maximum reconnection attempts") { + bail!("Connection to application closed. (Max reconnects reached)"); + } + bail!("Connection to application closed. (Reconnect failed: {})", reconnect_err); + } + } + } else { + // Not in interactive mode or not idle, so just report the error + bail!(server_msg.payload.message); + } + } else { + // This is some other error, bail directly + bail!(server_msg.payload.message); + } + }, "welcome" => { - // Ignore welcome messages after initialization + // If we get a welcome message and we're already initialized, + // it could mean the server restarted our session. + if self.initialized { + self.initialized = false; + self.ready = false; + bail!("reconnected but needs re-initialization"); + } + // Reset reconnect attempts on welcome message + // only if it's been some time since the last reconnect + if !self.is_recent_reconnect() { + self.reconnect_attempts = 0; + } + self.update_activity(); } "pty_closed" => { return Ok(()); @@ -227,40 +549,139 @@ impl TerminalClient { } } Message::Close(frame) => { - if let Some(frame) = frame { - bail!( - "WebSocket closed with code {}: {}", - frame.code, - frame.reason - ); + // Only try to reconnect if we're in interactive mode + if self.can_reconnect() { + // Try to reconnect on close + match self.reconnect().await { + Ok(()) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", e); + } + if e.to_string().contains("Maximum reconnection attempts") { + if let Some(frame) = frame { + bail!("WebSocket closed with code {}: {} (Max reconnects reached)", + frame.code, frame.reason); + } else { + bail!("WebSocket closed unexpectedly (Max reconnects reached)"); + } + } + if let Some(frame) = frame { + bail!("WebSocket closed with code {}: {} (Reconnect failed: {})", + frame.code, frame.reason, e); + } else { + bail!("WebSocket closed unexpectedly and reconnect failed: {}", e); + } + } + } } else { - bail!("WebSocket closed unexpectedly"); + // In exec_command mode, just report the close + if let Some(frame) = frame { + bail!("WebSocket closed with code {}: {} (exec_command mode)", + frame.code, frame.reason); + } else { + bail!("WebSocket closed unexpectedly (exec_command mode)"); + } } } Message::Ping(data) => { - self.send_message(Message::Pong(data)).await?; + self.update_activity(); + if let Err(e) = self.send_message(Message::Pong(data)).await { + if e.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", e); + } + if e.to_string().contains("Maximum reconnection attempts") { + bail!("Max reconnects reached: {}", e); + } + return Err(e); + } } Message::Pong(_) => { // Pong received, connection is still alive + // Don't update activity time for pongs } - Message::Binary(_) => { - eprintln!("Warning: Unexpected binary message received"); - } - Message::Frame(_) => { - eprintln!("Warning: Unexpected raw frame received"); + _ => {} + } + }, + Some(Err(e)) => { + // Only try to reconnect if we're in interactive mode + if self.can_reconnect() { + // Try to reconnect on error + match self.reconnect().await { + Ok(()) => {}, + Err(reconnect_err) => { + if reconnect_err.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", reconnect_err); + } + if reconnect_err.to_string().contains("Maximum reconnection attempts") { + bail!("WebSocket error: {} (Max reconnects reached)", e); + } + bail!("WebSocket error: {} (Reconnect failed: {})", e, reconnect_err); + } } + } else { + // In exec_command mode, just report the error + bail!("WebSocket error in exec_command mode: {}", e); } }, None => { - bail!("WebSocket connection closed unexpectedly"); + // Only try to reconnect if we're in interactive mode + if self.can_reconnect() { + // Try to reconnect on connection close + match self.reconnect().await { + Ok(()) => {}, + Err(e) => { + if e.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", e); + } + if e.to_string().contains("Maximum reconnection attempts") { + bail!("WebSocket connection closed unexpectedly (Max reconnects reached)"); + } + bail!("WebSocket connection closed unexpectedly and reconnect failed: {}", e); + } + } + } else { + // In exec_command mode, just report the close + bail!("WebSocket connection closed unexpectedly (exec_command mode)"); + } } } }, - _ = ping_interval.tick() => { - self.send_ping().await?; + if let Err(e) = self.send_ping().await { + if e.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", e); + } + if e.to_string().contains("Maximum reconnection attempts") { + bail!("Max reconnects reached: {}", e); + } + // Only try to reconnect if we're in interactive mode + if self.can_reconnect() { + // If ping fails, try to reconnect + match self.reconnect().await { + Ok(()) => {}, + Err(reconnect_err) => { + if reconnect_err.to_string().contains("reconnected but needs re-initialization") { + bail!("{}", reconnect_err); + } + if reconnect_err.to_string().contains("Maximum reconnection attempts") { + bail!("Ping failed (Max reconnects reached)"); + } + bail!("Ping failed: {} (Reconnect failed: {})", e, reconnect_err); + } + } + } else { + // In exec_command mode, just report the ping failure + bail!("Ping failed in exec_command mode: {}", e); + } + } } } } } + + /// Check if the shell is ready for input + pub fn is_ready(&self) -> bool { + self.ready + } } diff --git a/src/controllers/terminal/connection.rs b/src/controllers/terminal/connection.rs index 802572d..18a2ccc 100644 --- a/src/controllers/terminal/connection.rs +++ b/src/controllers/terminal/connection.rs @@ -6,7 +6,7 @@ use tokio::time::{sleep, timeout, Duration}; use url::Url; use crate::commands::ssh::{ - SSH_CONNECTION_TIMEOUT_SECS, SSH_MAX_RECONNECT_ATTEMPTS, SSH_RECONNECT_DELAY_SECS, + SSH_CONNECTION_TIMEOUT_SECS, SSH_CONNECT_DELAY_SECS, SSH_MAX_CONNECT_ATTEMPTS, }; use crate::consts::get_user_agent; @@ -26,24 +26,20 @@ pub async fn establish_connection( ) -> Result> { let url = Url::parse(url)?; - for attempt in 1..=SSH_MAX_RECONNECT_ATTEMPTS { + for attempt in 1..=SSH_MAX_CONNECT_ATTEMPTS { match attempt_connection(&url, token, params).await { Ok(ws_stream) => { return Ok(ws_stream); } Err(e) => { - if attempt == SSH_MAX_RECONNECT_ATTEMPTS { + if attempt == SSH_MAX_CONNECT_ATTEMPTS { bail!( "Failed to establish connection after {} attempts: {}", - SSH_MAX_RECONNECT_ATTEMPTS, + SSH_MAX_CONNECT_ATTEMPTS, e ); } - eprintln!( - "Connection attempt {} failed: {}. Retrying in {} seconds...", - attempt, e, SSH_RECONNECT_DELAY_SECS - ); - sleep(Duration::from_secs(SSH_RECONNECT_DELAY_SECS)).await; + sleep(Duration::from_secs(SSH_CONNECT_DELAY_SECS)).await; } } }