mirror of
https://github.com/wavetermdev/waveterm
synced 2026-04-21 14:37:16 +00:00
Lots of updates across all parts of the system to get this working. Big changes to routing, streaming, connection management, etc. * Persistent sessions behind a metadata flag for now * New backlog queue in the router to prevent hanging * Fix connection Close() issues that caused hangs when network was down * Fix issue with random routeids (need to be generated fresh each time the JWT is used and not fixed) so you can run multiple-wsh commands at once * Fix issue with domain sockets changing names across wave restarts (added a symlink mechanism to resolve new names) * ClientId caching in main server * Quick reorder queue for input to prevent out of order delivery across multiple hops * Fix out-of-order event delivery in router (remove unnecessary go routine creation) * Environment testing and fix environment variables for remote jobs (get from connserver, add to remote job starts) * Add new ConnServerInit() remote method to call before marking connection up * TODO -- remote file transfer needs to be fixed to not create OOM issues when transferring large files or directories
65 lines
1.5 KiB
Go
65 lines
1.5 KiB
Go
// Copyright 2025, Command Line Inc.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package wshutil
|
|
|
|
import (
|
|
"fmt"
|
|
"sync"
|
|
|
|
"github.com/wavetermdev/waveterm/pkg/baseds"
|
|
"github.com/wavetermdev/waveterm/pkg/panichandler"
|
|
"github.com/wavetermdev/waveterm/pkg/wshrpc"
|
|
)
|
|
|
|
type WshRpcProxy struct {
|
|
Lock *sync.Mutex
|
|
RpcContext *wshrpc.RpcContext
|
|
ToRemoteCh chan []byte
|
|
FromRemoteCh chan baseds.RpcInputChType
|
|
PeerInfo string
|
|
}
|
|
|
|
func MakeRpcProxy(peerInfo string) *WshRpcProxy {
|
|
return MakeRpcProxyWithSize(peerInfo, DefaultInputChSize, DefaultOutputChSize)
|
|
}
|
|
|
|
func MakeRpcProxyWithSize(peerInfo string, inputChSize int, outputChSize int) *WshRpcProxy {
|
|
return &WshRpcProxy{
|
|
Lock: &sync.Mutex{},
|
|
ToRemoteCh: make(chan []byte, inputChSize),
|
|
FromRemoteCh: make(chan baseds.RpcInputChType, outputChSize),
|
|
PeerInfo: peerInfo,
|
|
}
|
|
}
|
|
|
|
func (p *WshRpcProxy) GetPeerInfo() string {
|
|
return p.PeerInfo
|
|
}
|
|
|
|
func (p *WshRpcProxy) SetPeerInfo(peerInfo string) {
|
|
p.Lock.Lock()
|
|
defer p.Lock.Unlock()
|
|
p.PeerInfo = peerInfo
|
|
}
|
|
|
|
func (p *WshRpcProxy) SendRpcMessage(msg []byte, ingressLinkId baseds.LinkId, debugStr string) bool {
|
|
defer func() {
|
|
panicCtx := "WshRpcProxy.SendRpcMessage"
|
|
if debugStr != "" {
|
|
panicCtx = fmt.Sprintf("%s:%s", panicCtx, debugStr)
|
|
}
|
|
panichandler.PanicHandler(panicCtx, recover())
|
|
}()
|
|
select {
|
|
case p.ToRemoteCh <- msg:
|
|
return true
|
|
default:
|
|
return false
|
|
}
|
|
}
|
|
|
|
func (p *WshRpcProxy) RecvRpcMessage() ([]byte, bool) {
|
|
inputVal, more := <-p.FromRemoteCh
|
|
return inputVal.MsgBytes, more
|
|
}
|