This commit is contained in:
Miguel Casqueira 2025-04-02 17:53:27 -04:00
parent 4eeae97e4f
commit c9c10e3e20
5 changed files with 741 additions and 87 deletions

View file

@ -6,9 +6,12 @@ use crate::config::Configs;
pub const SSH_CONNECTION_TIMEOUT_SECS: u64 = 30;
pub const SSH_MESSAGE_TIMEOUT_SECS: u64 = 10;
pub const SSH_MAX_RECONNECT_ATTEMPTS: usize = 3;
pub const SSH_RECONNECT_DELAY_SECS: u64 = 5;
pub const SSH_MAX_CONNECT_ATTEMPTS: usize = 3;
pub const SSH_CONNECT_DELAY_SECS: u64 = 5;
pub const SSH_MAX_EMPTY_MESSAGES: usize = 100;
pub const SSH_IDLE_THRESHOLD_SECS: u64 = 5;
pub const SSH_MAX_RECONNECT_ATTEMPTS: u8 = 10;
pub const SSH_RECONNECT_DELAY_MS: u64 = 1000;
mod common;
mod platform;

View file

@ -1,5 +1,6 @@
use anyhow::Result;
use crossterm::terminal;
use std::io::Write;
use tokio::io::AsyncReadExt;
use tokio::select;
@ -23,38 +24,112 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()>
let mut stdin = tokio::io::stdin();
let mut stdin_buf = [0u8; 1024];
let mut exit_code = None;
let mut needs_init = false;
let (mut sigint, mut sigterm, mut sigwinch) = setup_signal_handlers().await?;
// Main event loop
loop {
// If reconnection happened and needs re-initialization, do it first
if needs_init {
if let Err(e) = client.init_shell(None).await {
eprintln!("Failed to re-initialize shell: {}", e);
exit_code = Some(1);
break;
}
needs_init = false;
// Reset terminal state
// Clear line and move cursor to beginning of line
print!("\r\x1B[K");
std::io::stdout().flush()?;
// After re-initialization, send window size only if shell is ready
if client.is_ready() {
if let Ok((cols, rows)) = terminal::size() {
if let Err(e) = client.send_window_size(cols, rows).await {
if !e.to_string().contains("Shell not ready yet") {
eprintln!("Failed to update window size: {}", e);
}
}
}
}
}
// Check if shell is ready for input
let is_ready = client.is_ready();
select! {
// Handle window resizes
_ = sigwinch.recv() => {
if let Ok((cols, rows)) = terminal::size() {
client.send_window_size(cols, rows).await?;
if is_ready {
match client.send_window_size(cols, rows).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
eprintln!("Failed to update window size: {}", e);
}
}
}
}
}
continue;
}
// Handle signals
_ = sigint.recv() => {
client.send_signal(2).await?; // SIGINT
if is_ready {
match client.send_signal(2).await { // SIGINT
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
eprintln!("Failed to send SIGINT: {}", e);
}
}
}
}
continue;
}
_ = sigterm.recv() => {
client.send_signal(15).await?; // SIGTERM
if is_ready {
match client.send_signal(15).await { // SIGTERM
Ok(_) => {},
Err(e) => {
if !e.to_string().contains("reconnected but needs re-initialization")
&& !e.to_string().contains("Shell not ready yet") {
eprintln!("Failed to send SIGTERM: {}", e);
}
}
}
}
break;
}
// Handle input from terminal
result = stdin.read(&mut stdin_buf) => {
// Handle input from terminal only if shell is ready
result = stdin.read(&mut stdin_buf), if is_ready => {
match result {
Ok(0) => break, // EOF
Ok(n) => {
let data = String::from_utf8_lossy(&stdin_buf[..n]);
client.send_data(&data).await?;
match client.send_data(&data).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
eprintln!("Error sending data: {}", e);
exit_code = Some(1);
break;
}
}
}
}
Err(e) => {
eprintln!("Error reading from stdin: {}", e);
exit_code = Some(1);
break;
}
}
@ -62,14 +137,19 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()>
// Handle messages from server
result = client.handle_server_messages() => {
match result {
Ok(()) => {
Ok(()) => {
exit_code = Some(0);
break;
}
Err(e) => {
eprintln!("Error: {}", e);
exit_code = Some(1);
break;
if e.to_string().contains("reconnected but needs re-initialization") {
needs_init = true;
continue;
} else {
eprintln!("Error: {}", e);
exit_code = Some(1);
break;
}
}
}
}
@ -79,6 +159,10 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()>
// Clean up terminal when done
let _ = terminal::disable_raw_mode();
// Ensure cursor is visible with ANSI escape sequence
print!("\x1b[?25h");
std::io::stdout().flush()?;
if let Some(code) = exit_code {
std::process::exit(code);
}

View file

@ -2,6 +2,7 @@ use anyhow::Result;
use crossterm::event::{self, Event, KeyCode, KeyEvent, KeyModifiers};
use crossterm::terminal;
use futures_util::stream::StreamExt;
use std::io::Write;
use tokio::io::AsyncReadExt;
use tokio::select;
use tokio::time::Duration;
@ -18,19 +19,38 @@ pub async fn run_interactive_session(client: &mut TerminalClient) -> Result<()>
let mut stdin = tokio::io::stdin();
let mut stdin_buf = [0u8; 1024];
let mut exit_code = None;
let mut needs_init = false;
let _ = setup_signal_handlers().await?;
// Event handling differs based on available features
#[cfg(feature = "event-stream")]
let run_result = run_with_event_stream(client, &mut stdin, &mut stdin_buf).await;
let run_result = run_with_event_stream(
client,
&mut stdin,
&mut stdin_buf,
&mut needs_init,
&mut exit_code,
)
.await;
#[cfg(not(feature = "event-stream"))]
let run_result = run_with_polling(client, &mut stdin, &mut stdin_buf).await;
let run_result = run_with_polling(
client,
&mut stdin,
&mut stdin_buf,
&mut needs_init,
&mut exit_code,
)
.await;
// Clean up terminal
let _ = terminal::disable_raw_mode();
// Ensure cursor is visible with ANSI escape sequence
print!("\x1b[?25h");
std::io::stdout().flush()?;
if let Some(code) = exit_code {
std::process::exit(code);
}
@ -43,29 +63,84 @@ async fn run_with_event_stream(
client: &mut TerminalClient,
stdin: &mut tokio::io::Stdin,
stdin_buf: &mut [u8; 1024],
needs_init: &mut bool,
exit_code: &mut Option<i32>,
) -> Result<()> {
let mut event_stream = crossterm::event::EventStream::new();
let mut exit_code = None;
loop {
// If reconnection happened and needs re-initialization, do it first
if *needs_init {
if let Err(e) = client.init_shell(None).await {
eprintln!("Failed to re-initialize shell: {}", e);
*exit_code = Some(1);
break;
}
*needs_init = false;
// Reset terminal state
// Clear line and move cursor to beginning of line
print!("\r\x1B[K");
std::io::stdout().flush()?;
// After successful initialization and ready, send window size
if let Ok((cols, rows)) = terminal::size() {
if let Err(e) = client.send_window_size(cols, rows).await {
if !e.to_string().contains("Shell not ready yet") {
eprintln!("Failed to send window size: {}", e);
}
}
}
}
// Check if the shell is ready for input
let is_ready = client.is_ready();
select! {
// Handle crossterm events for Windows with event-stream
maybe_event = event_stream.next().fuse() => {
maybe_event = event_stream.next().fuse(), if is_ready => {
match maybe_event {
Some(Ok(Event::Key(KeyEvent { code: KeyCode::Char('c'), modifiers, .. }))) if modifiers.contains(KeyModifiers::CONTROL) => {
// Handle Ctrl+C like SIGINT
client.send_signal(2).await?;
match client.send_signal(2).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
continue;
},
Some(Ok(Event::Resize(width, height))) => {
// Handle terminal resize
client.send_window_size(width, height).await?;
match client.send_window_size(width, height).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
continue;
},
Some(Ok(Event::Key(key))) => {
// Handle key input
if let Some(input) = key_event_to_string(key) {
client.send_data(&input).await?;
match client.send_data(&input).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
}
},
Some(Err(e)) => {
@ -76,12 +151,21 @@ async fn run_with_event_stream(
}
},
result = stdin.read(stdin_buf) => {
result = stdin.read(stdin_buf), if is_ready => {
match result {
Ok(0) => break, // EOF
Ok(n) => {
let data = String::from_utf8_lossy(&stdin_buf[..n]);
client.send_data(&data).await?;
match client.send_data(&data).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
}
Err(e) => {
eprintln!("Error reading from stdin: {}", e);
@ -94,23 +178,24 @@ async fn run_with_event_stream(
result = client.handle_server_messages() => {
match result {
Ok(()) => {
exit_code = Some(0);
*exit_code = Some(0);
break;
}
Err(e) => {
eprintln!("Error: {}", e);
exit_code = Some(1);
break;
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
continue;
} else {
eprintln!("Error: {}", e);
*exit_code = Some(1);
break;
}
}
}
}
}
}
if let Some(code) = exit_code {
std::process::exit(code);
}
Ok(())
}
@ -119,18 +204,55 @@ async fn run_with_polling(
client: &mut TerminalClient,
stdin: &mut tokio::io::Stdin,
stdin_buf: &mut [u8; 1024],
needs_init: &mut bool,
exit_code: &mut Option<i32>,
) -> Result<()> {
let event_poll_timeout = Duration::from_millis(100);
let mut exit_code = None;
loop {
// If reconnection happened and needs re-initialization, do it first
if *needs_init {
if let Err(e) = client.init_shell(None).await {
eprintln!("Failed to re-initialize shell: {}", e);
*exit_code = Some(1);
break;
}
*needs_init = false;
// Reset terminal state
// Clear line and move cursor to beginning of line
print!("\r\x1B[K");
std::io::stdout().flush()?;
// After successful initialization and ready, send window size
if let Ok((cols, rows)) = terminal::size() {
if let Err(e) = client.send_window_size(cols, rows).await {
if !e.to_string().contains("Shell not ready yet") {
eprintln!("Failed to send window size: {}", e);
}
}
}
}
// Check if the shell is ready for input
let is_ready = client.is_ready();
select! {
result = stdin.read(stdin_buf) => {
result = stdin.read(stdin_buf), if is_ready => {
match result {
Ok(0) => break, // EOF
Ok(n) => {
let data = String::from_utf8_lossy(&stdin_buf[..n]);
client.send_data(&data).await?;
match client.send_data(&data).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
}
Err(e) => {
eprintln!("Error reading from stdin: {}", e);
@ -143,33 +265,65 @@ async fn run_with_polling(
result = client.handle_server_messages() => {
match result {
Ok(()) => {
exit_code = Some(0);
*exit_code = Some(0);
break;
}
Err(e) => {
eprintln!("Error: {}", e);
exit_code = Some(1);
break;
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
continue;
} else {
eprintln!("Error: {}", e);
*exit_code = Some(1);
break;
}
}
}
}
// Poll for crossterm events
_ = tokio::time::sleep(event_poll_timeout) => {
if event::poll(Duration::from_millis(0))? {
if is_ready && event::poll(Duration::from_millis(0))? {
match event::read()? {
Event::Key(KeyEvent { code: KeyCode::Char('c'), modifiers, .. }) if modifiers.contains(KeyModifiers::CONTROL) => {
// Handle Ctrl+C like SIGINT
client.send_signal(2).await?;
match client.send_signal(2).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
},
Event::Resize(width, height) => {
// Handle terminal resize
client.send_window_size(width, height).await?;
match client.send_window_size(width, height).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
},
Event::Key(key) => {
// Handle key input
if let Some(input) = key_event_to_string(key) {
client.send_data(&input).await?;
match client.send_data(&input).await {
Ok(_) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
*needs_init = true;
} else if !e.to_string().contains("Shell not ready yet") {
return Err(e);
}
}
}
}
},
_ => {}
@ -179,10 +333,6 @@ async fn run_with_polling(
}
}
if let Some(code) = exit_code {
std::process::exit(code);
}
Ok(())
}

View file

@ -1,19 +1,40 @@
use anyhow::{bail, Result};
use async_tungstenite::tungstenite::Message;
use async_tungstenite::tungstenite::{Error as WsError, Message};
use async_tungstenite::WebSocketStream;
use futures_util::stream::StreamExt;
use std::io::Write;
use tokio::time::{interval, timeout, Duration};
use std::time::{Duration as StdDuration, Instant};
use tokio::time::{interval, sleep, timeout, Duration};
use crate::commands::ssh::{SSH_MAX_EMPTY_MESSAGES, SSH_MESSAGE_TIMEOUT_SECS};
use crate::commands::ssh::{
SSH_IDLE_THRESHOLD_SECS, SSH_MAX_EMPTY_MESSAGES, SSH_MAX_RECONNECT_ATTEMPTS,
SSH_MESSAGE_TIMEOUT_SECS, SSH_RECONNECT_DELAY_MS,
};
use super::connection::{establish_connection, SSHConnectParams};
use super::messages::{ClientMessage, ClientPayload, DataPayload, ServerMessage};
use super::SSH_PING_INTERVAL_SECS;
struct ReconnectGuard<'a>(&'a std::sync::atomic::AtomicBool);
impl<'a> Drop for ReconnectGuard<'a> {
fn drop(&mut self) {
self.0.store(false, std::sync::atomic::Ordering::Release);
}
}
pub struct TerminalClient {
ws_stream: WebSocketStream<async_tungstenite::tokio::ConnectStream>,
initialized: bool,
url: String,
token: String,
params: SSHConnectParams,
last_activity: Instant,
reconnecting: std::sync::atomic::AtomicBool,
reconnect_attempts: u8,
last_reconnect_time: Option<Instant>,
ready: bool,
exec_command_mode: bool,
}
impl TerminalClient {
@ -23,6 +44,15 @@ impl TerminalClient {
let mut client = Self {
ws_stream,
initialized: false,
url: url.to_string(),
token: token.to_string(),
params: params.clone(),
last_activity: Instant::now(),
reconnecting: std::sync::atomic::AtomicBool::new(false),
reconnect_attempts: 0,
last_reconnect_time: None,
ready: false,
exec_command_mode: false,
};
// Wait for the initial welcome message from the server
@ -46,26 +76,195 @@ impl TerminalClient {
}
}
/// Check if the connection has been idle for the threshold period
fn is_idle(&self) -> bool {
Instant::now().duration_since(self.last_activity)
>= StdDuration::from_secs(SSH_IDLE_THRESHOLD_SECS)
}
/// Check if we've recently attempted a reconnect
/// This helps us handle quick failures after reconnecting
fn is_recent_reconnect(&self) -> bool {
if let Some(last_time) = self.last_reconnect_time {
Instant::now().duration_since(last_time) < StdDuration::from_secs(5)
} else {
false
}
}
/// Check if reconnection is allowed in the current mode
fn can_reconnect(&self) -> bool {
// Only allow reconnection in interactive shell mode, not in exec_command mode
!self.exec_command_mode
}
/// Update the last activity timestamp
fn update_activity(&mut self) {
self.last_activity = Instant::now();
}
/// Attempts to reconnect to the server
async fn reconnect(&mut self) -> Result<()> {
// Skip reconnection if we're in exec_command mode
if !self.can_reconnect() {
bail!("Reconnection not allowed in exec_command mode");
}
// Don't allow concurrent reconnections
if self
.reconnecting
.swap(true, std::sync::atomic::Ordering::Acquire)
{
return Err(anyhow::anyhow!("Reconnection already in progress"));
}
// Create a guard that will reset the flag when this function scope ends
let _guard = ReconnectGuard(&self.reconnecting);
// Only try to reconnect if idle OR if we recently reconnected (handles quick failures)
if !self.is_idle() && !self.is_recent_reconnect() {
bail!("Connection issue, but not idle enough for reconnection");
}
// Check if we've exceeded the maximum reconnection attempts
if self.reconnect_attempts >= SSH_MAX_RECONNECT_ATTEMPTS {
bail!(
"Maximum reconnection attempts ({}) reached",
SSH_MAX_RECONNECT_ATTEMPTS
);
}
// Increment the reconnection attempt counter
self.reconnect_attempts += 1;
// Update the last reconnect time
self.last_reconnect_time = Some(Instant::now());
// Add a small delay between reconnection attempts
if self.reconnect_attempts > 1 {
sleep(Duration::from_millis(SSH_RECONNECT_DELAY_MS)).await;
}
match establish_connection(&self.url, &self.token, &self.params).await {
Ok(new_ws_stream) => {
self.ws_stream = new_ws_stream;
self.ready = false;
// Wait for welcome message
if let Some(msg_result) = self.ws_stream.next().await {
if let Ok(Message::Text(text)) = msg_result {
if let Ok(server_msg) = serde_json::from_str::<ServerMessage>(&text) {
if server_msg.r#type == "welcome" {
// Reset initialized state
self.initialized = false;
// Clear the current line completely and move to beginning of line
print!("\r\x1B[K");
std::io::stdout().flush().ok();
// Ensure cursor is visible
print!("\x1b[?25h"); // Show cursor escape sequence
std::io::stdout().flush().ok();
// Signal that we need re-initialization
bail!("reconnected but needs re-initialization");
}
}
}
}
bail!("Didn't receive proper welcome message after reconnection");
}
Err(e) => {
bail!(
"Failed to reconnect (attempt {}/{}): {}",
self.reconnect_attempts,
SSH_MAX_RECONNECT_ATTEMPTS,
e
);
}
}
}
/// Sends a WebSocket message
async fn send_message(&mut self, msg: Message) -> Result<()> {
timeout(
// Check if the message is a ping for special handling afterwards
let is_ping = matches!(msg, Message::Ping(_));
if !self.initialized && !is_ping && !matches!(msg, Message::Pong(_)) {
if let Message::Text(text) = &msg {
if let Ok(value) = serde_json::from_str::<serde_json::Value>(text) {
if let Some(msg_type) = value.get("type").and_then(|v| v.as_str()) {
if msg_type != "window_resize"
&& msg_type != "init_shell"
&& msg_type != "exec_command"
{
bail!("Session not initialized");
}
} else {
bail!("Session not initialized");
}
} else {
bail!("Session not initialized");
}
} else {
bail!("Session not initialized");
}
}
// Don't update activity time for pings to avoid interfering with idle detection
if !is_ping {
self.update_activity();
}
match timeout(
Duration::from_secs(SSH_MESSAGE_TIMEOUT_SECS),
self.ws_stream.send(msg),
)
.await
.map_err(|_| {
anyhow::anyhow!(
{
Ok(Ok(_)) => {
// On successful message, reset reconnect attempts if it's not a ping
// We want to keep the counter for recent quick failures
if !is_ping && !self.is_recent_reconnect() {
self.reconnect_attempts = 0;
}
Ok(())
}
Ok(Err(e)) => {
// If connection error, try to reconnect
match &e {
WsError::ConnectionClosed
| WsError::AlreadyClosed
| WsError::Protocol(_)
| WsError::Io(_) => {
// Only try to reconnect if we're in interactive mode
if self.can_reconnect() {
self.reconnect().await
} else {
Err(anyhow::anyhow!(
"Connection error in exec_command mode: {}",
e
))
}
}
_ => Err(anyhow::anyhow!("Failed to send message: {}", e)),
}
}
Err(_) => Err(anyhow::anyhow!(
"Message send timed out after {} seconds",
SSH_MESSAGE_TIMEOUT_SECS
)
})??;
Ok(())
)),
}
}
/// Initializes an interactive shell session
/// Initializes an interactive shell session and waits for the ready response
pub async fn init_shell(&mut self, shell: Option<String>) -> Result<()> {
if self.initialized {
bail!("Session already initialized");
// Set to interactive shell mode
self.exec_command_mode = false;
// Allow re-initialization
if self.initialized && shell.is_none() {
return Ok(());
}
let message = ClientMessage {
@ -79,11 +278,63 @@ impl TerminalClient {
.map_err(|e| anyhow::anyhow!("Failed to initialize shell: {}", e))?;
self.initialized = true;
self.ready = false;
// Wait for the ready response
let timeout_duration = Duration::from_secs(10); // 10 seconds timeout
let start_time = Instant::now();
while !self.ready {
if start_time.elapsed() > StdDuration::from_secs(10) {
bail!("Timed out waiting for ready response from server");
}
if let Some(msg_result) = timeout(timeout_duration, self.ws_stream.next()).await? {
let msg = msg_result.map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))?;
if let Message::Text(text) = msg {
let server_msg: ServerMessage = serde_json::from_str(&text)
.map_err(|e| anyhow::anyhow!("Failed to parse server message: {}", e))?;
match server_msg.r#type.as_str() {
"ready" => {
self.ready = true;
break;
}
"session_data" => {
// Echo any data received while waiting for ready
match server_msg.payload.data {
DataPayload::String(text) => {
print!("{}", text);
std::io::stdout().flush()?;
}
DataPayload::Buffer { data } => {
std::io::stdout().write_all(&data)?;
std::io::stdout().flush()?;
}
DataPayload::Empty {} => {}
}
}
"error" => {
bail!("Error initializing shell: {}", server_msg.payload.message);
}
_ => {
// Ignore other message types while waiting for ready
}
}
}
} else {
bail!("Connection closed while waiting for ready response");
}
}
Ok(())
}
/// Executes a single command
pub async fn send_command(&mut self, command: &str, args: Vec<String>) -> Result<()> {
self.exec_command_mode = true;
if self.initialized {
bail!("Session already initialized");
}
@ -103,6 +354,8 @@ impl TerminalClient {
.map_err(|e| anyhow::anyhow!("Failed to send command: {}", e))?;
self.initialized = true;
self.ready = true;
Ok(())
}
@ -112,6 +365,10 @@ impl TerminalClient {
bail!("Session not initialized");
}
if !self.ready {
bail!("Shell not ready yet");
}
let message = ClientMessage {
r#type: "session_data".to_string(),
payload: ClientPayload::Data {
@ -128,6 +385,10 @@ impl TerminalClient {
/// Updates the terminal window size
pub async fn send_window_size(&mut self, cols: u16, rows: u16) -> Result<()> {
if self.initialized && !self.ready {
bail!("Shell not ready yet");
}
let message = ClientMessage {
r#type: "window_resize".to_string(),
payload: ClientPayload::WindowSize { cols, rows },
@ -146,6 +407,10 @@ impl TerminalClient {
bail!("Session not initialized");
}
if !self.ready {
bail!("Shell not ready yet");
}
let message = ClientMessage {
r#type: "signal".to_string(),
payload: ClientPayload::Signal { signal },
@ -169,15 +434,13 @@ impl TerminalClient {
/// Process incoming messages from the server
pub async fn handle_server_messages(&mut self) -> Result<()> {
let mut consecutive_empty_messages = 0;
let mut ping_interval = interval(Duration::from_secs(SSH_PING_INTERVAL_SECS));
loop {
tokio::select! {
msg_option = self.ws_stream.next() => {
match msg_option {
Some(msg_result) => {
let msg = msg_result.map_err(|e| anyhow::anyhow!("WebSocket error: {}", e))?;
Some(Ok(msg)) => {
match msg {
Message::Text(text) => {
let server_msg: ServerMessage = serde_json::from_str(&text)
@ -189,11 +452,24 @@ impl TerminalClient {
consecutive_empty_messages = 0;
print!("{}", text);
std::io::stdout().flush()?;
// Reset reconnect attempts on successful data
// only if not a recent reconnect
if !self.is_recent_reconnect() {
self.reconnect_attempts = 0;
}
self.update_activity();
}
DataPayload::Buffer { data } => {
consecutive_empty_messages = 0;
std::io::stdout().write_all(&data)?;
std::io::stdout().flush()?;
// Update activity when receiving data
self.update_activity();
// Reset reconnect attempts on successful data
// only if not a recent reconnect
if !self.is_recent_reconnect() {
self.reconnect_attempts = 0;
}
}
DataPayload::Empty {} => {
consecutive_empty_messages += 1;
@ -202,6 +478,16 @@ impl TerminalClient {
}
}
},
"ready" => {
// Client can start sending data/events
self.ready = true;
self.update_activity();
},
"stand_by" => {
// This indicates command is in progress
self.ready = true;
self.update_activity();
},
"command_exit" => {
if let Some(code) = server_msg.payload.code {
std::io::stdout().flush()?;
@ -213,10 +499,46 @@ impl TerminalClient {
}
},
"error" => {
bail!(server_msg.payload.message);
}
// Check if this is the specific connection closed error
if server_msg.payload.message.contains("Connection to application unexpectedly closed") {
// Only try to reconnect if we're in interactive mode and either idle or recently reconnected
if self.can_reconnect() && (self.is_idle() || self.is_recent_reconnect()) {
// Try to reconnect
match self.reconnect().await {
Ok(()) => {}, // Successfully reconnected
Err(reconnect_err) => {
if reconnect_err.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", reconnect_err);
}
if reconnect_err.to_string().contains("Maximum reconnection attempts") {
bail!("Connection to application closed. (Max reconnects reached)");
}
bail!("Connection to application closed. (Reconnect failed: {})", reconnect_err);
}
}
} else {
// Not in interactive mode or not idle, so just report the error
bail!(server_msg.payload.message);
}
} else {
// This is some other error, bail directly
bail!(server_msg.payload.message);
}
},
"welcome" => {
// Ignore welcome messages after initialization
// If we get a welcome message and we're already initialized,
// it could mean the server restarted our session.
if self.initialized {
self.initialized = false;
self.ready = false;
bail!("reconnected but needs re-initialization");
}
// Reset reconnect attempts on welcome message
// only if it's been some time since the last reconnect
if !self.is_recent_reconnect() {
self.reconnect_attempts = 0;
}
self.update_activity();
}
"pty_closed" => {
return Ok(());
@ -227,40 +549,139 @@ impl TerminalClient {
}
}
Message::Close(frame) => {
if let Some(frame) = frame {
bail!(
"WebSocket closed with code {}: {}",
frame.code,
frame.reason
);
// Only try to reconnect if we're in interactive mode
if self.can_reconnect() {
// Try to reconnect on close
match self.reconnect().await {
Ok(()) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", e);
}
if e.to_string().contains("Maximum reconnection attempts") {
if let Some(frame) = frame {
bail!("WebSocket closed with code {}: {} (Max reconnects reached)",
frame.code, frame.reason);
} else {
bail!("WebSocket closed unexpectedly (Max reconnects reached)");
}
}
if let Some(frame) = frame {
bail!("WebSocket closed with code {}: {} (Reconnect failed: {})",
frame.code, frame.reason, e);
} else {
bail!("WebSocket closed unexpectedly and reconnect failed: {}", e);
}
}
}
} else {
bail!("WebSocket closed unexpectedly");
// In exec_command mode, just report the close
if let Some(frame) = frame {
bail!("WebSocket closed with code {}: {} (exec_command mode)",
frame.code, frame.reason);
} else {
bail!("WebSocket closed unexpectedly (exec_command mode)");
}
}
}
Message::Ping(data) => {
self.send_message(Message::Pong(data)).await?;
self.update_activity();
if let Err(e) = self.send_message(Message::Pong(data)).await {
if e.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", e);
}
if e.to_string().contains("Maximum reconnection attempts") {
bail!("Max reconnects reached: {}", e);
}
return Err(e);
}
}
Message::Pong(_) => {
// Pong received, connection is still alive
// Don't update activity time for pongs
}
Message::Binary(_) => {
eprintln!("Warning: Unexpected binary message received");
}
Message::Frame(_) => {
eprintln!("Warning: Unexpected raw frame received");
_ => {}
}
},
Some(Err(e)) => {
// Only try to reconnect if we're in interactive mode
if self.can_reconnect() {
// Try to reconnect on error
match self.reconnect().await {
Ok(()) => {},
Err(reconnect_err) => {
if reconnect_err.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", reconnect_err);
}
if reconnect_err.to_string().contains("Maximum reconnection attempts") {
bail!("WebSocket error: {} (Max reconnects reached)", e);
}
bail!("WebSocket error: {} (Reconnect failed: {})", e, reconnect_err);
}
}
} else {
// In exec_command mode, just report the error
bail!("WebSocket error in exec_command mode: {}", e);
}
},
None => {
bail!("WebSocket connection closed unexpectedly");
// Only try to reconnect if we're in interactive mode
if self.can_reconnect() {
// Try to reconnect on connection close
match self.reconnect().await {
Ok(()) => {},
Err(e) => {
if e.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", e);
}
if e.to_string().contains("Maximum reconnection attempts") {
bail!("WebSocket connection closed unexpectedly (Max reconnects reached)");
}
bail!("WebSocket connection closed unexpectedly and reconnect failed: {}", e);
}
}
} else {
// In exec_command mode, just report the close
bail!("WebSocket connection closed unexpectedly (exec_command mode)");
}
}
}
},
_ = ping_interval.tick() => {
self.send_ping().await?;
if let Err(e) = self.send_ping().await {
if e.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", e);
}
if e.to_string().contains("Maximum reconnection attempts") {
bail!("Max reconnects reached: {}", e);
}
// Only try to reconnect if we're in interactive mode
if self.can_reconnect() {
// If ping fails, try to reconnect
match self.reconnect().await {
Ok(()) => {},
Err(reconnect_err) => {
if reconnect_err.to_string().contains("reconnected but needs re-initialization") {
bail!("{}", reconnect_err);
}
if reconnect_err.to_string().contains("Maximum reconnection attempts") {
bail!("Ping failed (Max reconnects reached)");
}
bail!("Ping failed: {} (Reconnect failed: {})", e, reconnect_err);
}
}
} else {
// In exec_command mode, just report the ping failure
bail!("Ping failed in exec_command mode: {}", e);
}
}
}
}
}
}
/// Check if the shell is ready for input
pub fn is_ready(&self) -> bool {
self.ready
}
}

View file

@ -6,7 +6,7 @@ use tokio::time::{sleep, timeout, Duration};
use url::Url;
use crate::commands::ssh::{
SSH_CONNECTION_TIMEOUT_SECS, SSH_MAX_RECONNECT_ATTEMPTS, SSH_RECONNECT_DELAY_SECS,
SSH_CONNECTION_TIMEOUT_SECS, SSH_CONNECT_DELAY_SECS, SSH_MAX_CONNECT_ATTEMPTS,
};
use crate::consts::get_user_agent;
@ -26,24 +26,20 @@ pub async fn establish_connection(
) -> Result<WebSocketStream<async_tungstenite::tokio::ConnectStream>> {
let url = Url::parse(url)?;
for attempt in 1..=SSH_MAX_RECONNECT_ATTEMPTS {
for attempt in 1..=SSH_MAX_CONNECT_ATTEMPTS {
match attempt_connection(&url, token, params).await {
Ok(ws_stream) => {
return Ok(ws_stream);
}
Err(e) => {
if attempt == SSH_MAX_RECONNECT_ATTEMPTS {
if attempt == SSH_MAX_CONNECT_ATTEMPTS {
bail!(
"Failed to establish connection after {} attempts: {}",
SSH_MAX_RECONNECT_ATTEMPTS,
SSH_MAX_CONNECT_ATTEMPTS,
e
);
}
eprintln!(
"Connection attempt {} failed: {}. Retrying in {} seconds...",
attempt, e, SSH_RECONNECT_DELAY_SECS
);
sleep(Duration::from_secs(SSH_RECONNECT_DELAY_SECS)).await;
sleep(Duration::from_secs(SSH_CONNECT_DELAY_SECS)).await;
}
}
}