waveterm/pkg/jobmanager/streammanager.go
Mike Sawka f36187f619
stress test for the new RPC streaming primitives (+ bug fixes) (#2828)
This pull request introduces a new integration test tool for the
StreamManager streaming system, adding a standalone test binary with
supporting modules for simulating and verifying high-throughput data
transfer. The changes include a test driver, a configurable in-memory
delivery pipe for simulating network conditions, a data generator, a
verifier for end-to-end integrity, and a metrics tracker. Additionally,
several improvements are made to the circular buffer and StreamManager
for better handling of blocking writes and out-of-order acknowledgments.

**New StreamManager Integration Test Tool**

* Added a new test binary `cmd/test-streammanager` with a main driver
(`main-test-streammanager.go`) that orchestrates end-to-end streaming
tests, including configuration for data size, delivery delay/skew,
window size, slow reader simulation, and verbose logging.
* Implemented a configurable `DeliveryPipe` (`deliverypipe.go`) for
simulating network delivery with delay and skew, supporting separate
data and ack channels, out-of-order delivery, and high water mark
tracking.
* Added `WriterBridge` and `ReaderBridge` modules for interfacing
between brokers and the delivery pipe, enforcing correct directionality
of data and acks.
* Created a sequential test data generator (`generator.go`) and a
verifier (`verifier.go`) for checking data integrity and reporting
mismatches.
[[1]](diffhunk://#diff-3f2d6e0349089e3748c001791a383687b33a2c2391fd3baccfceb83e76e6ee0dR1-R40)
[[2]](diffhunk://#diff-cb3aab0bae9bec15ef0c06fe5d9e0e96094affcf4720680605a92054ab717575R1-R61)
* Introduced a metrics module (`metrics.go`) for tracking throughput,
packet counts, out-of-order events, and pipe usage, with a summary
report at test completion.

**StreamManager and CirBuf Improvements**

* Refactored circular buffer (`pkg/jobmanager/cirbuf.go`) to replace
blocking writes with a non-blocking `WriteAvailable` method, returning a
wait channel for buffer-full scenarios, and removed context-based
cancellation logic.
* Updated StreamManager (`pkg/jobmanager/streammanager.go`) to track the
maximum acknowledged sequence/rwnd tuple, ignoring stale or out-of-order
ACKs, and resetting this state on disconnect.
* Modified StreamManager's data handling to use the new non-blocking
buffer write logic, ensuring correct signaling and waiting for space
when needed.

**Minor Cleanup**

* Removed unused context import from `cirbuf.go`.
* Minor whitespace cleanup in `streambroker.go`.
2026-02-05 14:48:12 -08:00

437 lines
9.4 KiB
Go

// Copyright 2025, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package jobmanager
import (
"encoding/base64"
"fmt"
"io"
"log"
"sync"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
)
// Default sizing parameters for the stream manager.
const (
CwndSize = 64 * 1024 // 64 KB window for connected mode; default cwnd (and initial rwnd) used by MakeStreamManager
CirBufSize = 2 * 1024 * 1024 // 2 MB max buffer size; also the window passed to the buffer on disconnect
DisconnReadSz = 4 * 1024 // 4 KB read chunks when disconnected
MaxPacketSize = 4 * 1024 // 4 KB max data per packet; also the size of the read loop's scratch buffer
)
// DataSender is the outbound side of a connected client. The sender loop calls
// SendData once for every data packet and for the terminal (EOF/error) packet,
// after the StreamManager lock has been released.
type DataSender interface {
// SendData hands one stream packet to the transport.
SendData(dataPk wshrpc.CommandStreamData)
}
// streamTerminalEvent records how the attached reader finished. It is created
// by handleEOF (clean end of stream) or handleError (read failure).
type streamTerminalEvent struct {
isEof bool // true when the reader returned io.EOF
err string // error text for a non-EOF read failure; empty for EOF
}
// StreamManager handles PTY output buffering with ACK-based flow control.
//
// Data read from the attached reader is written into buf. While a client is
// connected, senderLoop forwards buffered bytes in packets of at most
// MaxPacketSize bytes, keeping at most min(cwndSize, rwndSize) bytes sent but
// unacknowledged; accepted ACKs consume the acknowledged bytes from buf.
type StreamManager struct {
lock sync.Mutex
drainCond *sync.Cond // bound to lock; signalled to wake senderLoop when state changes
streamId string // id of the current client stream; stamped on packets and matched against ACKs
// this is the data read from the attached reader
buf *CirBuf
terminalEvent *streamTerminalEvent // nil until the reader returns EOF or an error
eofPos int64 // fixed position when EOF/error occurs (-1 if not yet)
reader io.Reader // source attached via AttachReader (nil until then)
cwndSize int // sender-side window limit, in bytes
rwndSize int // window most recently supplied by the client, in bytes
// invariant: if connected is true, dataSender is non-nil
connected bool
dataSender DataSender
// unacked state (reset on disconnect)
sentNotAcked int64 // bytes past the buffer head that were sent but not yet acknowledged
terminalEventSent bool // the terminal (EOF/error) packet was sent on the current connection
// track max acked to handle out-of-order ACKs (reset on disconnect)
maxAckedSeq int64
maxAckedRwnd int64
// terminal state - once true, stream is complete
terminalEventAcked bool // set when an ACK with Fin arrives
closed bool // set by Close
}
// MakeStreamManager creates a StreamManager with the package defaults:
// CwndSize as the send window and CirBufSize as the buffer capacity.
func MakeStreamManager() *StreamManager {
	mgr := MakeStreamManagerWithSizes(CwndSize, CirBufSize)
	return mgr
}
// MakeStreamManagerWithSizes creates a StreamManager with an explicit send
// window (cwndSize) and circular-buffer capacity (cirbufSize), both in bytes.
//
// The receive window starts out equal to cwndSize and eofPos starts at -1
// (no EOF/error seen yet). The sender loop is started in its own goroutine
// before the manager is returned; it runs until the stream is closed or the
// terminal event is acknowledged.
func MakeStreamManagerWithSizes(cwndSize, cirbufSize int) *StreamManager {
	sm := new(StreamManager)
	sm.buf = MakeCirBuf(cirbufSize, true)
	sm.eofPos = -1
	sm.cwndSize = cwndSize
	sm.rwndSize = cwndSize
	// The condition variable shares the manager's mutex so waiters and
	// signallers observe the same guarded state.
	sm.drainCond = sync.NewCond(&sm.lock)
	go sm.senderLoop()
	return sm
}
// AttachReader registers r as the stream's data source and starts the read
// loop in a new goroutine.
//
// It returns an error when r is nil (the read loop would otherwise panic on
// its first Read) or when a reader has already been attached; in both cases
// the manager is left unchanged and no goroutine is started.
func (sm *StreamManager) AttachReader(r io.Reader) error {
	if r == nil {
		return fmt.Errorf("reader cannot be nil")
	}
	sm.lock.Lock()
	defer sm.lock.Unlock()
	if sm.reader != nil {
		return fmt.Errorf("reader already attached")
	}
	sm.reader = r
	go sm.readLoop()
	return nil
}
// ClientConnected transitions to CONNECTED mode.
//
// clientSeq is the stream position the client wants to resume from. If it is
// ahead of the buffer head, the bytes in between are consumed (dropped) from
// the buffer; if it is beyond the end of the buffered data the call fails.
// On success the sender/stream id are recorded, the unacked counter is reset,
// the buffer window is set to min(cwndSize, rwndSize), the sender loop is
// woken, and the position the stream will continue from is returned (the
// larger of the buffer head and clientSeq).
//
// Errors: the stream is closed or already finished, a client is already
// connected, dataSender is nil, clientSeq is past the buffered data, or the
// buffer refuses to consume the skipped bytes.
func (sm *StreamManager) ClientConnected(streamId string, dataSender DataSender, rwndSize int, clientSeq int64) (int64, error) {
	sm.lock.Lock()
	defer sm.lock.Unlock()

	switch {
	case sm.closed || sm.terminalEventAcked:
		return 0, fmt.Errorf("stream is closed")
	case sm.connected:
		return 0, fmt.Errorf("client already connected")
	case dataSender == nil:
		return 0, fmt.Errorf("dataSender cannot be nil")
	}

	head := sm.buf.HeadPos()
	if clientSeq > head {
		// The client already has everything before clientSeq; drop it.
		skip := int(clientSeq - head)
		if buffered := sm.buf.Size(); skip > buffered {
			return 0, fmt.Errorf("client seq %d is beyond our stream end (head=%d, size=%d)", clientSeq, head, buffered)
		}
		if skip > 0 {
			if err := sm.buf.Consume(skip); err != nil {
				return 0, fmt.Errorf("failed to consume buffer: %w", err)
			}
			head = sm.buf.HeadPos()
		}
	}

	sm.streamId = streamId
	sm.dataSender = dataSender
	sm.connected = true
	sm.rwndSize = rwndSize
	sm.sentNotAcked = 0

	// Effective window is the smaller of our window and the client's.
	window := sm.rwndSize
	if sm.cwndSize <= window {
		window = sm.cwndSize
	}
	sm.buf.SetEffectiveWindow(true, window)
	sm.drainCond.Signal()

	if clientSeq > head {
		return clientSeq, nil
	}
	return head, nil
}
// GetStreamId returns the current stream ID.
//
// The method takes sm.lock itself, so it is safe to call concurrently with
// other StreamManager methods, but it must NOT be called by code that already
// holds sm.lock: sync.Mutex is not reentrant and the call would deadlock.
func (sm *StreamManager) GetStreamId() string {
	sm.lock.Lock()
	id := sm.streamId
	sm.lock.Unlock()
	return id
}
// GetStreamDoneInfo reports whether the stream has completed, i.e. whether the
// client has acknowledged the terminal event. streamError carries the reader's
// error text when the stream ended with a non-EOF error; it is empty for a
// clean EOF and always empty while done is false (until then the error is
// delivered to the client in-band as part of the stream).
func (sm *StreamManager) GetStreamDoneInfo() (done bool, streamError string) {
	sm.lock.Lock()
	defer sm.lock.Unlock()
	done = sm.terminalEventAcked
	if ev := sm.terminalEvent; done && ev != nil && !ev.isEof {
		streamError = ev.err
	}
	return done, streamError
}
// ClientDisconnected transitions to DISCONNECTED mode. It is a no-op when no
// client is connected.
//
// All per-connection state is dropped: the sender, the unacked byte count and
// the max-ACK tuple. If the terminal event has not been acknowledged, its
// "sent" flag is cleared so it is sent again on the next connection. The
// sender loop is woken so it notices the state change.
func (sm *StreamManager) ClientDisconnected() {
	sm.lock.Lock()
	defer sm.lock.Unlock()
	if sm.connected {
		sm.connected, sm.dataSender = false, nil
		sm.sentNotAcked, sm.maxAckedSeq, sm.maxAckedRwnd = 0, 0, 0
		if !sm.terminalEventAcked {
			sm.terminalEventSent = false
		}
		// NOTE(review): this passes the package default CirBufSize, not the
		// capacity given to MakeStreamManagerWithSizes - confirm that
		// SetEffectiveWindow tolerates a window larger than the buffer.
		sm.buf.SetEffectiveWindow(false, CirBufSize)
		sm.drainCond.Signal()
	}
}
// RecvAck processes an ACK from the client.
//
// The ACK is ignored unless a client is connected and ackPk.Id matches the
// current stream id. A Fin ACK marks the terminal event as acknowledged and
// wakes the sender loop so it can exit.
//
// For ordinary ACKs, (Seq, RWnd) is compared as a tuple against the highest
// tuple accepted so far, so that stale or re-ordered ACKs are dropped. An ACK
// is then rejected if it points before the buffer head or acknowledges more
// bytes than have been sent. The high-water tuple is recorded only AFTER the
// ACK has passed these checks: recording it earlier would let one bogus ACK
// (e.g. a Seq far beyond anything sent) poison the tuple and cause every
// later, valid ACK to be discarded as stale until the client reconnects.
//
// An accepted ACK consumes the acknowledged bytes from the buffer, adopts the
// advertised receive window, and wakes the sender loop when the window grew
// or bytes were freed.
func (sm *StreamManager) RecvAck(ackPk wshrpc.CommandStreamAckData) {
	sm.lock.Lock()
	defer sm.lock.Unlock()
	if !sm.connected || ackPk.Id != sm.streamId {
		return
	}
	if ackPk.Fin {
		sm.terminalEventAcked = true
		sm.drainCond.Signal()
		return
	}
	seq := ackPk.Seq
	rwnd := ackPk.RWnd
	// Ignore stale ACKs using tuple comparison (seq, rwnd)
	if seq < sm.maxAckedSeq || (seq == sm.maxAckedSeq && rwnd <= sm.maxAckedRwnd) {
		return
	}
	headPos := sm.buf.HeadPos()
	if seq < headPos {
		// Refers to data that has already been consumed.
		return
	}
	ackedBytes := seq - headPos
	if ackedBytes > sm.sentNotAcked {
		// Acknowledges bytes we never sent; do not let it advance any state.
		return
	}
	if ackedBytes > 0 {
		if err := sm.buf.Consume(int(ackedBytes)); err != nil {
			log.Printf("streammanager RecvAck: failed to consume %d acked bytes: %v", ackedBytes, err)
			return
		}
		sm.sentNotAcked -= ackedBytes
	}
	// The ACK is valid: only now does it become the new high-water mark.
	sm.maxAckedSeq = seq
	sm.maxAckedRwnd = rwnd
	prevRwnd := sm.rwndSize
	sm.rwndSize = int(rwnd)
	effectiveWindow := sm.cwndSize
	if sm.rwndSize < effectiveWindow {
		effectiveWindow = sm.rwndSize
	}
	sm.buf.SetEffectiveWindow(true, effectiveWindow)
	if sm.rwndSize > prevRwnd || ackedBytes > 0 {
		sm.drainCond.Signal()
	}
}
// SetRwndSize dynamically updates the receive window size (in bytes) for the
// connected client, re-derives the buffer's effective window as
// min(cwndSize, rwndSize) and wakes the sender loop.
//
// It returns an error if rwndSize is negative or no client is connected; the
// manager is left unchanged in that case.
func (sm *StreamManager) SetRwndSize(rwndSize int) error {
	sm.lock.Lock()
	defer sm.lock.Unlock()

	switch {
	case rwndSize < 0:
		return fmt.Errorf("rwndSize cannot be negative")
	case !sm.connected:
		return fmt.Errorf("not connected")
	}

	sm.rwndSize = rwndSize
	window := rwndSize
	if sm.cwndSize <= window {
		window = sm.cwndSize
	}
	sm.buf.SetEffectiveWindow(true, window)
	sm.drainCond.Signal()
	return nil
}
// Close marks the manager as closed and wakes the sender loop so it can exit.
// The reader loop checks the closed flag at the top of each iteration, so it
// stops on its next pass or once the underlying reader returns.
func (sm *StreamManager) Close() {
	sm.lock.Lock()
	sm.closed = true
	sm.drainCond.Signal()
	sm.lock.Unlock()
}
// readLoop is the main read goroutine. It repeatedly reads up to MaxPacketSize
// bytes from the attached reader and passes them to handleReadData. Any bytes
// returned together with an error are delivered before the error is handled.
// The loop ends when the manager is closed (checked before each read), on
// io.EOF (handleEOF) or on any other read error (handleError).
func (sm *StreamManager) readLoop() {
	chunk := make([]byte, MaxPacketSize)
	for {
		sm.lock.Lock()
		stop := sm.closed
		sm.lock.Unlock()
		if stop {
			return
		}

		nread, readErr := sm.reader.Read(chunk)
		if nread > 0 {
			sm.handleReadData(chunk[:nread])
		}
		switch readErr {
		case nil:
			continue
		case io.EOF:
			sm.handleEOF()
		default:
			sm.handleError(readErr)
		}
		return
	}
}
// handleReadData copies data into the circular buffer, possibly in several
// pieces. After every piece that made progress the sender loop is signalled.
// When the buffer returns a wait channel the call blocks on it before trying
// to write the remainder.
//
// NOTE(review): nothing visible in this file wakes that wait when Close is
// called - confirm that CirBuf releases waiters, otherwise the read goroutine
// can stay blocked here after shutdown.
func (sm *StreamManager) handleReadData(data []byte) {
	for rest := data; len(rest) > 0; {
		wrote, spaceCh := sm.buf.WriteAvailable(rest)
		rest = rest[wrote:]
		if wrote > 0 {
			sm.lock.Lock()
			sm.drainCond.Signal()
			sm.lock.Unlock()
		}
		if spaceCh != nil {
			<-spaceCh
		}
	}
}
// handleEOF records a clean end of the input stream: it logs the event, fixes
// eofPos at the buffer's total size, stores an EOF terminal event and wakes
// the sender loop so the terminal packet can be sent once the buffer drains.
func (sm *StreamManager) handleEOF() {
	sm.lock.Lock()
	defer sm.lock.Unlock()

	total := sm.buf.TotalSize()
	log.Printf("handleEOF: PTY reached EOF, totalSize=%d", total)

	ev := new(streamTerminalEvent)
	ev.isEof = true
	sm.eofPos = total
	sm.terminalEvent = ev
	sm.drainCond.Signal()
}
// handleError records a failed read from the input stream: it logs the error,
// fixes eofPos at the buffer's total size, stores a terminal event carrying
// the error text and wakes the sender loop so the terminal packet can be sent
// once the buffer drains. err must be non-nil.
func (sm *StreamManager) handleError(err error) {
	sm.lock.Lock()
	defer sm.lock.Unlock()

	total := sm.buf.TotalSize()
	log.Printf("handleError: PTY error=%v, totalSize=%d", err, total)

	ev := new(streamTerminalEvent)
	ev.err = err.Error()
	sm.eofPos = total
	sm.terminalEvent = ev
	sm.drainCond.Signal()
}
// senderLoop is the sending goroutine. Each iteration asks prepareNextPacket
// for work: it exits when told the stream is done, sends the packet when one
// was produced, and otherwise simply loops (prepareNextPacket has already
// waited for a state change). SendData is called after prepareNextPacket has
// returned, i.e. without the manager lock held.
func (sm *StreamManager) senderLoop() {
	for {
		finished, packet, out := sm.prepareNextPacket()
		switch {
		case finished:
			return
		case packet != nil:
			out.SendData(*packet)
		}
	}
}
// prepareNextPacket decides, under the manager lock, what the sender loop
// should do next.
//
// Returns:
//   - done=true when the manager is closed or the terminal event has been
//     acknowledged; the sender loop must exit.
//   - pkt=nil (done=false) when there was nothing to send; in that case the
//     call has already blocked on drainCond until some state changed, and the
//     caller should simply call again.
//   - a non-nil pkt plus the sender to deliver it with. This is either the
//     terminal (EOF/error) packet, produced only once the buffer is empty, or
//     a data packet holding the next unsent bytes.
//
// A data packet carries at most MaxPacketSize bytes and never exceeds the
// remaining window, min(cwndSize, rwndSize) - sentNotAcked. Bytes are peeked,
// not consumed: they stay in the buffer until acknowledged, and sentNotAcked
// is advanced by the number of bytes placed in the packet.
func (sm *StreamManager) prepareNextPacket() (done bool, pkt *wshrpc.CommandStreamData, sender DataSender) {
	sm.lock.Lock()
	defer sm.lock.Unlock()

	buffered := sm.buf.Size()
	switch {
	case sm.closed || sm.terminalEventAcked:
		return true, nil, nil
	case !sm.connected:
		sm.drainCond.Wait()
		return false, nil, nil
	case buffered == 0 && sm.terminalEvent != nil && !sm.terminalEventSent:
		// Everything has been acknowledged; the terminal event may go out now.
		return false, sm.prepareTerminalPacket(), sm.dataSender
	case buffered == 0:
		sm.drainCond.Wait()
		return false, nil, nil
	}

	window := sm.cwndSize
	if sm.rwndSize < window {
		window = sm.rwndSize
	}
	budget := int64(window) - sm.sentNotAcked
	if budget <= 0 {
		// Window is full; wait for an ACK or a window update.
		sm.drainCond.Wait()
		return false, nil, nil
	}

	chunkLen := MaxPacketSize
	if budget < int64(chunkLen) {
		chunkLen = int(budget)
	}
	if chunkLen > buffered {
		chunkLen = buffered
	}

	payload := make([]byte, chunkLen)
	got := sm.buf.PeekDataAt(int(sm.sentNotAcked), payload)
	if got == 0 {
		// All buffered bytes are already in flight.
		sm.drainCond.Wait()
		return false, nil, nil
	}

	seq := sm.buf.HeadPos() + sm.sentNotAcked
	sm.sentNotAcked += int64(got)
	return false, &wshrpc.CommandStreamData{
		Id:     sm.streamId,
		Seq:    seq,
		Data64: base64.StdEncoding.EncodeToString(payload[:got]),
	}, sm.dataSender
}
// prepareTerminalPacket builds the packet that tells the client how the
// stream ended and marks the terminal event as sent. The packet carries the
// stream id and eofPos as its sequence number, with Eof set for a clean end
// or Error set to the reader's error text. It returns nil when there is no
// terminal event yet or it has already been sent. The method does not lock;
// the caller must hold sm.lock.
func (sm *StreamManager) prepareTerminalPacket() *wshrpc.CommandStreamData {
	ev := sm.terminalEvent
	if ev == nil || sm.terminalEventSent {
		return nil
	}
	sm.terminalEventSent = true

	term := wshrpc.CommandStreamData{Id: sm.streamId, Seq: sm.eofPos}
	if ev.isEof {
		term.Eof = true
	} else {
		term.Error = ev.err
	}
	return &term
}