diff --git a/flutter/lib/models/terminal_model.dart b/flutter/lib/models/terminal_model.dart index a74241ccb..77198a4e4 100644 --- a/flutter/lib/models/terminal_model.dart +++ b/flutter/lib/models/terminal_model.dart @@ -31,6 +31,8 @@ class TerminalModel with ChangeNotifier { static const int _kMaxOutputBufferChars = 8 * 1024; // View ready state: true when terminal has valid dimensions, safe to write bool _terminalViewReady = false; + // Buffer for incomplete UTF-8 trailing bytes across data chunks. + final _utf8Remainder = []; bool get isPeerWindows => parent.ffiModel.pi.platform == kPeerPlatformWindows; @@ -333,31 +335,80 @@ class TerminalModel with ChangeNotifier { if (data != null) { try { - String text = ''; + List bytes; if (data is String) { // Try to decode as base64 first try { - final bytes = base64Decode(data); - text = utf8.decode(bytes, allowMalformed: true); + bytes = base64Decode(data); } catch (e) { // If base64 decode fails, treat as plain text - text = data; + _writeToTerminal(data); + return; } } else if (data is List) { - // Handle if data comes as byte array - text = utf8.decode(List.from(data), allowMalformed: true); + bytes = List.from(data); } else { debugPrint('[TerminalModel] Unknown data type: ${data.runtimeType}'); return; } - _writeToTerminal(text); + // Prepend any leftover bytes from previous chunk. + if (_utf8Remainder.isNotEmpty) { + bytes = [..._utf8Remainder, ...bytes]; + _utf8Remainder.clear(); + } + + // Find the split point so we only decode complete UTF-8 sequences. + final split = _findUtf8SplitPoint(bytes); + if (split < bytes.length) { + _utf8Remainder.addAll(bytes.sublist(split)); + } + + if (split > 0) { + final text = utf8.decode(bytes.sublist(0, split)); + _writeToTerminal(text); + } } catch (e) { debugPrint('[TerminalModel] Failed to process terminal data: $e'); } } } + /// Find the largest prefix length that ends on a complete UTF-8 character + /// boundary. Returns [bytes.length] if no trailing incomplete sequence. + static int _findUtf8SplitPoint(List bytes) { + if (bytes.isEmpty) return 0; + // Inspect at most the last 3 bytes (max incomplete tail of 4-byte seq). + final start = bytes.length >= 3 ? bytes.length - 3 : 0; + for (int i = bytes.length - 1; i >= start; i--) { + final b = bytes[i]; + if (b & 0x80 == 0) { + // ASCII – everything is complete. + return bytes.length; + } + if (b & 0xC0 == 0x80) { + // Continuation byte – keep scanning backwards. + continue; + } + // Leading byte – determine expected sequence length. + int seqLen; + if (b & 0xE0 == 0xC0) { + seqLen = 2; + } else if (b & 0xF0 == 0xE0) { + seqLen = 3; + } else if (b & 0xF8 == 0xF0) { + seqLen = 4; + } else { + // Invalid leading byte – treat as complete. + return bytes.length; + } + final available = bytes.length - i; + return available >= seqLen ? bytes.length : i; + } + // All inspected bytes are continuation bytes – treat as complete. + return bytes.length; + } + /// Write text to terminal, buffering if the view is not yet ready. /// All terminal output should go through this method to avoid NaN errors /// from writing before the terminal view has valid layout dimensions. @@ -427,6 +478,7 @@ class TerminalModel with ChangeNotifier { _inputBuffer.clear(); _pendingOutputChunks.clear(); _pendingOutputSize = 0; + _utf8Remainder.clear(); // Terminal cleanup is handled server-side when service closes super.dispose(); } diff --git a/src/server/terminal_service.rs b/src/server/terminal_service.rs index fb6b4fd29..74fddec7c 100644 --- a/src/server/terminal_service.rs +++ b/src/server/terminal_service.rs @@ -531,6 +531,54 @@ impl OutputBuffer { } } +/// Find the largest prefix of `buf` that is valid UTF-8, i.e. does not end +/// with an incomplete multi-byte sequence. Returns `buf.len()` when the +/// entire buffer is complete. The caller should send `buf[..split]` and +/// keep `buf[split..]` as a remainder for the next read. +/// +/// We only need to inspect the last 1-3 bytes because the longest UTF-8 +/// sequence is 4 bytes. +fn find_utf8_split_point(buf: &[u8]) -> usize { + if buf.is_empty() { + return 0; + } + // Walk backwards up to 3 bytes (max incomplete tail of a 4-byte sequence). + let start = if buf.len() >= 3 { buf.len() - 3 } else { 0 }; + for i in (start..buf.len()).rev() { + let b = buf[i]; + if b & 0x80 == 0 { + // ASCII – everything up to and including this byte is complete. + return buf.len(); + } + // Continuation byte (10xxxxxx) – keep scanning back. + if b & 0xC0 == 0x80 { + continue; + } + // Leading byte – determine expected sequence length. + let seq_len = if b & 0xE0 == 0xC0 { + 2 + } else if b & 0xF0 == 0xE0 { + 3 + } else if b & 0xF8 == 0xF0 { + 4 + } else { + // Invalid leading byte – treat as complete to avoid stalling. + return buf.len(); + }; + let available = buf.len() - i; + if available >= seq_len { + // Sequence is complete. + return buf.len(); + } else { + // Incomplete sequence starts at `i`. + return i; + } + } + // All inspected bytes are continuation bytes without a leading byte – + // treat as complete (probably binary / non-UTF-8 data). + buf.len() +} + /// Try to send data through the output channel with rate-limited drop logging. /// Returns `true` if the caller should break out of the read loop (channel disconnected). fn try_send_output( @@ -1086,22 +1134,52 @@ impl TerminalServiceProxy { let reader_thread = thread::spawn(move || { let mut reader = reader; let mut buf = vec![0u8; 4096]; + let mut utf8_remainder: Vec = Vec::new(); let mut drop_count: u64 = 0; // Initialize to > 5s ago so the first drop triggers a warning immediately. let mut last_drop_warn = Instant::now() - Duration::from_secs(6); loop { match reader.read(&mut buf) { Ok(0) => { - // EOF - // This branch can be reached when the child process exits on macOS. - // But not on Linux and Windows in my tests. + // EOF – flush any leftover remainder bytes. + if !utf8_remainder.is_empty() { + let _ = try_send_output( + &output_tx, + utf8_remainder.split_off(0), + terminal_id, + "", + &mut drop_count, + &mut last_drop_warn, + ); + } break; } Ok(n) => { if exiting.load(Ordering::SeqCst) { break; } - let data = buf[..n].to_vec(); + // Prepend any leftover bytes from the previous read. + let combined = if utf8_remainder.is_empty() { + buf[..n].to_vec() + } else { + let mut v = std::mem::take(&mut utf8_remainder); + v.extend_from_slice(&buf[..n]); + v + }; + // Split at a UTF-8 character boundary so we never send + // an incomplete multi-byte sequence. + let split = find_utf8_split_point(&combined); + if split < combined.len() { + utf8_remainder = combined[split..].to_vec(); + } + let data = if split == combined.len() { + combined + } else { + combined[..split].to_vec() + }; + if data.is_empty() { + continue; + } // Use try_send to avoid blocking the reader thread when channel is full. // During disconnect, the run loop (sp.ok()) stops and read_outputs() is // no longer called, so the channel won't be drained. Blocking send would @@ -1308,13 +1386,24 @@ impl TerminalServiceProxy { let terminal_id = open.terminal_id; let reader_thread = thread::spawn(move || { let mut buf = vec![0u8; 4096]; + let mut utf8_remainder: Vec = Vec::new(); let mut drop_count: u64 = 0; // Initialize to > 5s ago so the first drop triggers a warning immediately. let mut last_drop_warn = Instant::now() - Duration::from_secs(6); loop { match output_pipe.read(&mut buf) { Ok(0) => { - // EOF - helper process exited + // EOF – flush any leftover remainder bytes. + if !utf8_remainder.is_empty() { + let _ = try_send_output( + &output_tx, + utf8_remainder.split_off(0), + terminal_id, + " (helper)", + &mut drop_count, + &mut last_drop_warn, + ); + } log::debug!("Terminal {} helper output EOF", terminal_id); break; } @@ -1322,7 +1411,26 @@ impl TerminalServiceProxy { if exiting.load(Ordering::SeqCst) { break; } - let data = buf[..n].to_vec(); + // Prepend any leftover bytes from the previous read. + let combined = if utf8_remainder.is_empty() { + buf[..n].to_vec() + } else { + let mut v = std::mem::take(&mut utf8_remainder); + v.extend_from_slice(&buf[..n]); + v + }; + let split = find_utf8_split_point(&combined); + if split < combined.len() { + utf8_remainder = combined[split..].to_vec(); + } + let data = if split == combined.len() { + combined + } else { + combined[..split].to_vec() + }; + if data.is_empty() { + continue; + } // Use try_send to avoid blocking the reader thread (same as direct PTY mode) if try_send_output( &output_tx, @@ -1845,3 +1953,123 @@ impl TerminalServiceProxy { } } } + +#[cfg(test)] +mod tests { + use super::*; + + #[test] + fn test_utf8_split_empty() { + assert_eq!(find_utf8_split_point(&[]), 0); + } + + #[test] + fn test_utf8_split_ascii_only() { + assert_eq!(find_utf8_split_point(b"hello"), 5); + } + + #[test] + fn test_utf8_split_complete_2byte() { + // 'é' = 0xC3 0xA9 + let data = "café".as_bytes(); + assert_eq!(find_utf8_split_point(data), data.len()); + } + + #[test] + fn test_utf8_split_complete_3byte() { + // '中' = 0xE4 0xB8 0xAD + let data = "中文".as_bytes(); + assert_eq!(find_utf8_split_point(data), data.len()); + } + + #[test] + fn test_utf8_split_complete_4byte() { + // '😀' = 0xF0 0x9F 0x98 0x80 + let data = "😀".as_bytes(); + assert_eq!(find_utf8_split_point(data), data.len()); + } + + #[test] + fn test_utf8_split_incomplete_2byte() { + // Leading byte of 2-byte seq without continuation + let data = vec![b'a', 0xC3]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn test_utf8_split_incomplete_3byte_1of3() { + // Leading byte of 3-byte seq, missing 2 continuation bytes + let data = vec![b'a', 0xE4]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn test_utf8_split_incomplete_3byte_2of3() { + // Leading byte of 3-byte seq + 1 continuation, missing 1 + let data = vec![b'a', 0xE4, 0xB8]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn test_utf8_split_incomplete_4byte_1of4() { + let data = vec![b'a', 0xF0]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn test_utf8_split_incomplete_4byte_2of4() { + let data = vec![b'a', 0xF0, 0x9F]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn test_utf8_split_incomplete_4byte_3of4() { + let data = vec![b'a', 0xF0, 0x9F, 0x98]; + assert_eq!(find_utf8_split_point(&data), 1); + } + + #[test] + fn test_utf8_split_mixed_complete() { + // ASCII + 3-byte (中) + ASCII + let data = "a中b".as_bytes(); + assert_eq!(find_utf8_split_point(data), data.len()); + } + + #[test] + fn test_utf8_split_trailing_incomplete_after_valid() { + // "中" (complete) followed by incomplete start of another 3-byte char + let mut data = "中".as_bytes().to_vec(); + data.push(0xE4); // leading byte of next 3-byte char + assert_eq!(find_utf8_split_point(&data), 3); // split before incomplete + } + + #[test] + fn test_utf8_split_only_continuation_bytes() { + // Orphan continuation bytes (no leading byte) – treated as complete + let data = vec![0x80, 0x81, 0x82]; + assert_eq!(find_utf8_split_point(&data), data.len()); + } + + #[test] + fn test_utf8_split_simulated_chunked_read() { + // Simulate reading "你好世界" in awkward chunk boundaries + let full = "你好世界".as_bytes().to_vec(); // 12 bytes (3 * 4 chars) + assert_eq!(full.len(), 12); + + let mut decoded = Vec::new(); + let mut remainder: Vec = Vec::new(); + + // Read in chunks of 5 bytes (will split mid-character) + for chunk in full.chunks(5) { + let mut combined = remainder.clone(); + combined.extend_from_slice(chunk); + let split = find_utf8_split_point(&combined); + decoded.extend_from_slice(&combined[..split]); + remainder = combined[split..].to_vec(); + } + // Flush remainder + decoded.extend_from_slice(&remainder); + + assert_eq!(String::from_utf8(decoded).unwrap(), "你好世界"); + } +}