mirror of
https://github.com/wavetermdev/waveterm
synced 2026-05-24 09:18:27 +00:00
This pull request introduces a new integration test tool for the StreamManager streaming system, adding a standalone test binary with supporting modules for simulating and verifying high-throughput data transfer. The changes include a test driver, a configurable in-memory delivery pipe for simulating network conditions, a data generator, a verifier for end-to-end integrity, and a metrics tracker. Additionally, several improvements are made to the circular buffer and StreamManager for better handling of blocking writes and out-of-order acknowledgments. **New StreamManager Integration Test Tool** * Added a new test binary `cmd/test-streammanager` with a main driver (`main-test-streammanager.go`) that orchestrates end-to-end streaming tests, including configuration for data size, delivery delay/skew, window size, slow reader simulation, and verbose logging. * Implemented a configurable `DeliveryPipe` (`deliverypipe.go`) for simulating network delivery with delay and skew, supporting separate data and ack channels, out-of-order delivery, and high water mark tracking. * Added `WriterBridge` and `ReaderBridge` modules for interfacing between brokers and the delivery pipe, enforcing correct directionality of data and acks. * Created a sequential test data generator (`generator.go`) and a verifier (`verifier.go`) for checking data integrity and reporting mismatches. [[1]](diffhunk://#diff-3f2d6e0349089e3748c001791a383687b33a2c2391fd3baccfceb83e76e6ee0dR1-R40) [[2]](diffhunk://#diff-cb3aab0bae9bec15ef0c06fe5d9e0e96094affcf4720680605a92054ab717575R1-R61) * Introduced a metrics module (`metrics.go`) for tracking throughput, packet counts, out-of-order events, and pipe usage, with a summary report at test completion. **StreamManager and CirBuf Improvements** * Refactored circular buffer (`pkg/jobmanager/cirbuf.go`) to replace blocking writes with a non-blocking `WriteAvailable` method, returning a wait channel for buffer-full scenarios, and removed context-based cancellation logic. * Updated StreamManager (`pkg/jobmanager/streammanager.go`) to track the maximum acknowledged sequence/rwnd tuple, ignoring stale or out-of-order ACKs, and resetting this state on disconnect. * Modified StreamManager's data handling to use the new non-blocking buffer write logic, ensuring correct signaling and waiting for space when needed. **Minor Cleanup** * Removed unused context import from `cirbuf.go`. * Minor whitespace cleanup in `streambroker.go`.
207 lines
3.8 KiB
Go
207 lines
3.8 KiB
Go
package streamclient
|
|
|
|
import (
|
|
"encoding/base64"
|
|
"fmt"
|
|
"io"
|
|
"sync"
|
|
|
|
"github.com/wavetermdev/waveterm/pkg/wshrpc"
|
|
)
|
|
|
|
type DataSender interface {
|
|
SendData(dataPk wshrpc.CommandStreamData)
|
|
}
|
|
|
|
type Writer struct {
|
|
lock sync.Mutex
|
|
cond *sync.Cond
|
|
id string
|
|
dataSender DataSender
|
|
readWindow int64
|
|
nextSeq int64
|
|
buffer []byte
|
|
sentNotAcked int64
|
|
maxAckedSeq int64
|
|
maxAckedRwnd int64
|
|
finAcked bool
|
|
canceled bool
|
|
canceledChan chan struct{}
|
|
eof bool
|
|
err error
|
|
closed bool
|
|
}
|
|
|
|
func NewWriter(id string, readWindow int64, dataSender DataSender) *Writer {
|
|
w := &Writer{
|
|
id: id,
|
|
readWindow: readWindow,
|
|
dataSender: dataSender,
|
|
nextSeq: 0,
|
|
sentNotAcked: 0,
|
|
maxAckedSeq: 0,
|
|
canceledChan: make(chan struct{}),
|
|
}
|
|
w.cond = sync.NewCond(&w.lock)
|
|
return w
|
|
}
|
|
|
|
func (w *Writer) RecvAck(ackPk wshrpc.CommandStreamAckData) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
|
|
if ackPk.Id != w.id {
|
|
return
|
|
}
|
|
|
|
ackedSeq := ackPk.Seq
|
|
rwnd := ackPk.RWnd
|
|
|
|
if ackPk.Fin {
|
|
w.finAcked = true
|
|
w.maxAckedSeq = ackedSeq
|
|
return
|
|
}
|
|
|
|
if ackPk.Cancel && !w.canceled {
|
|
w.canceled = true
|
|
close(w.canceledChan)
|
|
if !w.closed {
|
|
w.err = fmt.Errorf("stream cancelled")
|
|
w.cond.Broadcast()
|
|
}
|
|
return
|
|
}
|
|
|
|
// Ignore stale ACKs using tuple comparison (seq, rwnd)
|
|
if ackedSeq < w.maxAckedSeq || (ackedSeq == w.maxAckedSeq && rwnd <= w.maxAckedRwnd) {
|
|
return
|
|
}
|
|
|
|
// Update max acked tuple
|
|
w.maxAckedSeq = ackedSeq
|
|
w.maxAckedRwnd = rwnd
|
|
|
|
if !w.closed {
|
|
if ackedSeq > (w.nextSeq - w.sentNotAcked) {
|
|
ackedBytes := ackedSeq - (w.nextSeq - w.sentNotAcked)
|
|
w.sentNotAcked -= ackedBytes
|
|
if w.sentNotAcked < 0 {
|
|
w.sentNotAcked = 0
|
|
}
|
|
}
|
|
|
|
w.readWindow = rwnd
|
|
w.cond.Broadcast()
|
|
}
|
|
}
|
|
|
|
func (w *Writer) GetAckState() (maxAckedSeq int64, finAcked bool, canceled bool) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
|
|
return w.maxAckedSeq, w.finAcked, w.canceled
|
|
}
|
|
|
|
func (w *Writer) GetCanceledChan() <-chan struct{} {
|
|
return w.canceledChan
|
|
}
|
|
|
|
func (w *Writer) Write(p []byte) (int, error) {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
|
|
if w.closed {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
|
|
if w.err != nil {
|
|
return 0, w.err
|
|
}
|
|
|
|
w.buffer = append(w.buffer, p...)
|
|
n := len(p)
|
|
|
|
for len(w.buffer) > 0 {
|
|
if w.closed {
|
|
return 0, io.ErrClosedPipe
|
|
}
|
|
if w.err != nil {
|
|
return 0, w.err
|
|
}
|
|
|
|
sent := w.trySendDataLocked()
|
|
if !sent {
|
|
w.cond.Wait()
|
|
}
|
|
}
|
|
|
|
return n, nil
|
|
}
|
|
|
|
func (w *Writer) trySendDataLocked() bool {
|
|
availWindow := w.readWindow - w.sentNotAcked
|
|
if availWindow <= 0 {
|
|
return false
|
|
}
|
|
|
|
toSend := len(w.buffer)
|
|
if int64(toSend) > availWindow {
|
|
toSend = int(availWindow)
|
|
}
|
|
|
|
data := w.buffer[:toSend]
|
|
w.buffer = w.buffer[toSend:]
|
|
|
|
dataStr := base64.StdEncoding.EncodeToString(data)
|
|
dataPk := wshrpc.CommandStreamData{
|
|
Id: w.id,
|
|
Seq: w.nextSeq,
|
|
Data64: dataStr,
|
|
}
|
|
|
|
w.dataSender.SendData(dataPk)
|
|
w.nextSeq += int64(toSend)
|
|
w.sentNotAcked += int64(toSend)
|
|
|
|
return toSend > 0
|
|
}
|
|
|
|
// If Close() is called while a Write is blocked, the Write will return an error and buffered data may be discarded.
|
|
func (w *Writer) Close() error {
|
|
return w.CloseWithError(nil)
|
|
}
|
|
|
|
// If CloseWithError() is called while a Write is blocked, the Write will return an error and buffered data may be discarded.
|
|
func (w *Writer) CloseWithError(err error) error {
|
|
w.lock.Lock()
|
|
defer w.lock.Unlock()
|
|
|
|
if w.closed {
|
|
return nil
|
|
}
|
|
|
|
w.closed = true
|
|
if w.err == nil {
|
|
w.err = io.ErrClosedPipe
|
|
}
|
|
w.cond.Broadcast()
|
|
|
|
var dataPk wshrpc.CommandStreamData
|
|
if err == nil || err == io.EOF {
|
|
dataPk = wshrpc.CommandStreamData{
|
|
Id: w.id,
|
|
Seq: w.nextSeq,
|
|
Eof: true,
|
|
}
|
|
} else {
|
|
dataPk = wshrpc.CommandStreamData{
|
|
Id: w.id,
|
|
Seq: w.nextSeq,
|
|
Error: err.Error(),
|
|
}
|
|
}
|
|
w.dataSender.SendData(dataPk)
|
|
|
|
return nil
|
|
}
|