migrate old file streaming to new modern interface (w/ flow control) (#3096)

This commit is contained in:
Mike Sawka 2026-03-24 09:59:14 -07:00 committed by GitHub
parent 9ed86e9e18
commit 6a287e4b84
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
18 changed files with 152 additions and 310 deletions

View file

@ -384,7 +384,6 @@ func shutdownActivityUpdate() {
func createMainWshClient() {
rpc := wshserver.GetMainRpcClient()
wshfs.RpcClient = rpc
wshutil.DefaultRouter.RegisterTrustedLeaf(rpc, wshutil.DefaultRoute)
wps.Broker.SetClient(wshutil.DefaultRouter)
localInitialEnv := envutil.PruneInitialEnv(envutil.SliceToMap(os.Environ()))
@ -393,6 +392,8 @@ func createMainWshClient() {
localConnWsh := wshutil.MakeWshRpc(wshrpc.RpcContext{Conn: wshrpc.LocalConnName}, remoteImpl, "conn:local")
go wshremote.RunSysInfoLoop(localConnWsh, wshrpc.LocalConnName)
wshutil.DefaultRouter.RegisterTrustedLeaf(localConnWsh, wshutil.MakeConnectionRouteId(wshrpc.LocalConnName))
wshfs.RpcClient = localConnWsh
wshfs.RpcClientRouteId = wshutil.MakeConnectionRouteId(wshrpc.LocalConnName)
}
func grabAndRemoveEnvVars() error {

View file

@ -183,7 +183,7 @@ func runListener(listener net.Listener, router *wshutil.WshRouter) {
}
}
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, error) {
func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName string) (*wshutil.WshRpc, string, error) {
routeId := wshutil.MakeConnectionRouteId(connServerConnName)
rpcCtx := wshrpc.RpcContext{
RouteId: routeId,
@ -196,7 +196,7 @@ func setupConnServerRpcClientWithRouter(router *wshutil.WshRouter, sockName stri
connServerClient := wshutil.MakeWshRpc(rpcCtx, wshremote.MakeRemoteRpcServerImpl(os.Stdout, router, bareClient, false, connServerInitialEnv, sockName), routeId)
router.RegisterTrustedLeaf(connServerClient, routeId)
return connServerClient, nil
return connServerClient, routeId, nil
}
func serverRunRouter() error {
@ -236,11 +236,12 @@ func serverRunRouter() error {
sockName := getRemoteDomainSocketName()
// setup the connserver rpc client first
client, err := setupConnServerRpcClientWithRouter(router, sockName)
client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client
wshfs.RpcClientRouteId = bareRouteId
log.Printf("trying to get JWT public key")
@ -360,11 +361,12 @@ func serverRunRouterDomainSocket(jwtToken string) error {
log.Printf("got JWT public key")
// now setup the connserver rpc client
client, err := setupConnServerRpcClientWithRouter(router, sockName)
client, bareRouteId, err := setupConnServerRpcClientWithRouter(router, sockName)
if err != nil {
return fmt.Errorf("error setting up connserver rpc client: %v", err)
}
wshfs.RpcClient = client
wshfs.RpcClientRouteId = bareRouteId
// set up the local domain socket listener for local wsh commands
unixListener, err := MakeRemoteUnixListener()
@ -402,6 +404,7 @@ func serverRunNormal(jwtToken string) error {
return err
}
wshfs.RpcClient = RpcClient
wshfs.RpcClientRouteId = RpcClientRouteId
WriteStdout("running wsh connserver (%s)\n", RpcContext.Conn)
go func() {
defer func() {

View file

@ -1,4 +1,4 @@
// Copyright 2025, Command Line Inc.
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package cmd
@ -12,10 +12,10 @@ import (
"strings"
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
)
func convertNotFoundErr(err error) error {
@ -91,8 +91,38 @@ func streamWriteToFile(fileData wshrpc.FileData, reader io.Reader) error {
}
func streamReadFromFile(ctx context.Context, fileData wshrpc.FileData, writer io.Writer) error {
ch := wshclient.FileReadStreamCommand(RpcClient, fileData, &wshrpc.RpcOpts{Timeout: fileTimeout})
return fsutil.ReadFileStreamToWriter(ctx, ch, writer)
broker := RpcClient.StreamBroker
if broker == nil {
return fmt.Errorf("stream broker not available")
}
if fileData.Info == nil {
return fmt.Errorf("file info is required")
}
readerRouteId := RpcClientRouteId
if readerRouteId == "" {
return fmt.Errorf("no route id available")
}
conn, err := connparse.ParseURI(fileData.Info.Path)
if err != nil {
return fmt.Errorf("parsing file path: %w", err)
}
writerRouteId := wshutil.MakeConnectionRouteId(conn.Host)
reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024)
defer reader.Close()
go func() {
<-ctx.Done()
reader.Close()
}()
data := wshrpc.CommandFileStreamData{
Info: fileData.Info,
StreamMeta: *streamMeta,
}
_, err = wshclient.FileStreamCommand(RpcClient, data, nil)
if err != nil {
return fmt.Errorf("starting file stream: %w", err)
}
_, err = io.Copy(writer, reader)
return err
}
func fixRelativePaths(path string) (string, error) {

View file

@ -172,11 +172,6 @@ func fileCatRun(cmd *cobra.Command, args []string) error {
return err
}
_, err = checkFileSize(path, MaxFileSize)
if err != nil {
return err
}
fileData := wshrpc.FileData{
Info: &wshrpc.FileInfo{
Path: path}}

View file

@ -31,6 +31,7 @@ var WrappedStdout io.Writer = &WrappedWriter{dest: os.Stdout}
var WrappedStderr io.Writer = &WrappedWriter{dest: os.Stderr}
var RpcClient *wshutil.WshRpc
var RpcContext wshrpc.RpcContext
var RpcClientRouteId string
var UsingTermWshMode bool
var blockArg string
var WshExitCode int
@ -140,7 +141,12 @@ func setupRpcClientWithToken(swapTokenStr string) (wshrpc.CommandAuthenticateRtn
if err != nil {
return rtn, fmt.Errorf("error setting up domain socket rpc client: %w", err)
}
return wshclient.AuthenticateTokenCommand(RpcClient, wshrpc.CommandAuthenticateTokenData{Token: token.Token}, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
rtn, err = wshclient.AuthenticateTokenCommand(RpcClient, wshrpc.CommandAuthenticateTokenData{Token: token.Token}, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
if err != nil {
return rtn, err
}
RpcClientRouteId = rtn.RouteId
return rtn, nil
}
// returns the wrapped stdin and a new rpc client (that wraps the stdin input and stdout output)
@ -158,10 +164,11 @@ func setupRpcClient(serverImpl wshutil.ServerImpl, jwtToken string) error {
if err != nil {
return fmt.Errorf("error setting up domain socket rpc client: %v", err)
}
_, err = wshclient.AuthenticateCommand(RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
authRtn, err := wshclient.AuthenticateCommand(RpcClient, jwtToken, &wshrpc.RpcOpts{Route: wshutil.ControlRoute})
if err != nil {
return fmt.Errorf("error authenticating: %v", err)
}
RpcClientRouteId = authRtn.RouteId
blockId := os.Getenv("WAVETERM_BLOCKID")
if blockId != "" {
peerInfo := fmt.Sprintf("domain:block:%s", blockId)

View file

@ -366,12 +366,6 @@ export class RpcApiType {
return client.wshRpcCall("fileread", data, opts);
}
// command "filereadstream" [responsestream]
FileReadStreamCommand(client: WshClient, data: FileData, opts?: RpcOpts): AsyncGenerator<FileData, void, boolean> {
if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "filereadstream", data, opts);
return client.wshRpcStream("filereadstream", data, opts);
}
// command "filerestorebackup" [call]
FileRestoreBackupCommand(client: WshClient, data: CommandFileRestoreBackupData, opts?: RpcOpts): Promise<void> {
if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "filerestorebackup", data, opts);
@ -780,12 +774,6 @@ export class RpcApiType {
return client.wshRpcStream("remotestreamcpudata", null, opts);
}
// command "remotestreamfile" [responsestream]
RemoteStreamFileCommand(client: WshClient, data: CommandRemoteStreamFileData, opts?: RpcOpts): AsyncGenerator<FileData, void, boolean> {
if (this.mockClient) return this.mockClient.mockWshRpcStream(client, "remotestreamfile", data, opts);
return client.wshRpcStream("remotestreamfile", data, opts);
}
// command "remoteterminatejobmanager" [call]
RemoteTerminateJobManagerCommand(client: WshClient, data: CommandRemoteTerminateJobManagerData, opts?: RpcOpts): Promise<void> {
if (this.mockClient) return this.mockClient.mockWshRpcCall(client, "remoteterminatejobmanager", data, opts);

View file

@ -2,9 +2,9 @@
// SPDX-License-Identifier: Apache-2.0
import { ContextMenuModel } from "@/app/store/contextmenu";
import { useWaveEnv } from "@/app/waveenv/waveenv";
import { globalStore } from "@/app/store/jotaiStore";
import { TabRpcClient } from "@/app/store/wshrpcutil";
import { useWaveEnv } from "@/app/waveenv/waveenv";
import { checkKeyPressed, isCharacterKeyEvent } from "@/util/keyutil";
import { PLATFORM, PlatformMacOS } from "@/util/platformutil";
import { addOpenMenuItems } from "@/util/previewutil";
@ -112,7 +112,6 @@ function DirectoryTable({
newDirectory,
}: DirectoryTableProps) {
const env = useWaveEnv<PreviewEnv>();
const searchActive = useAtomValue(model.directorySearchActive);
const fullConfig = useAtomValue(env.atoms.fullConfigAtom);
const defaultSort = useAtomValue(env.getSettingsKeyAtom("preview:defaultsort")) ?? "name";
const setErrorMsg = useSetAtom(model.errorMsgAtom);
@ -587,28 +586,26 @@ function DirectoryPreview({ model }: DirectoryPreviewProps) {
useEffect(
() =>
fireAndForget(async () => {
let entries: FileInfo[];
const entries: FileInfo[] = [];
try {
const file = await env.rpc.FileReadCommand(
TabRpcClient,
{
info: {
path: await model.formatRemoteUri(dirPath, globalStore.get),
},
},
null
);
entries = file.entries ?? [];
if (file?.info && file.info.dir && file.info?.path !== file.info?.dir) {
const remotePath = await model.formatRemoteUri(dirPath, globalStore.get);
const stream = env.rpc.FileListStreamCommand(TabRpcClient, { path: remotePath }, null);
for await (const chunk of stream) {
if (chunk?.fileinfo) {
entries.push(...chunk.fileinfo);
}
}
if (finfo?.dir && finfo?.path !== finfo?.dir) {
entries.unshift({
name: "..",
path: file?.info?.dir,
path: finfo.dir,
isdir: true,
modtime: new Date().getTime(),
mimetype: "directory",
});
}
} catch (e) {
console.error("Directory Read Error", e);
setErrorMsg({
status: "Cannot Read Directory",
text: `${e}`,

View file

@ -11,6 +11,7 @@ export type PreviewEnv = WaveEnvSubset<{
ConnEnsureCommand: WaveEnv["rpc"]["ConnEnsureCommand"];
FileInfoCommand: WaveEnv["rpc"]["FileInfoCommand"];
FileReadCommand: WaveEnv["rpc"]["FileReadCommand"];
FileListStreamCommand: WaveEnv["rpc"]["FileListStreamCommand"];
FileWriteCommand: WaveEnv["rpc"]["FileWriteCommand"];
FileMoveCommand: WaveEnv["rpc"]["FileMoveCommand"];
FileDeleteCommand: WaveEnv["rpc"]["FileDeleteCommand"];

View file

@ -8,26 +8,23 @@ const MockDirMimeType = "directory";
const MockDirMode = 0o040755;
const MockFileMode = 0o100644;
const MockDirectoryChunkSize = 128;
const MockFileChunkSize = 64 * 1024;
const MockBaseModTime = Date.parse("2026-03-10T09:00:00.000Z");
const TinyPngBytes = Uint8Array.from([
0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52,
0x00, 0x00, 0x00, 0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x04, 0x00, 0x00, 0x00, 0xb5, 0x1c, 0x0c,
0x02, 0x00, 0x00, 0x00, 0x0b, 0x49, 0x44, 0x41, 0x54, 0x78, 0xda, 0x63, 0xfc, 0xff, 0x1f, 0x00,
0x03, 0x03, 0x01, 0xff, 0xa5, 0xf8, 0x8f, 0xb1, 0x00, 0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44,
0xae, 0x42, 0x60, 0x82,
0x89, 0x50, 0x4e, 0x47, 0x0d, 0x0a, 0x1a, 0x0a, 0x00, 0x00, 0x00, 0x0d, 0x49, 0x48, 0x44, 0x52, 0x00, 0x00, 0x00,
0x01, 0x00, 0x00, 0x00, 0x01, 0x08, 0x04, 0x00, 0x00, 0x00, 0xb5, 0x1c, 0x0c, 0x02, 0x00, 0x00, 0x00, 0x0b, 0x49,
0x44, 0x41, 0x54, 0x78, 0xda, 0x63, 0xfc, 0xff, 0x1f, 0x00, 0x03, 0x03, 0x01, 0xff, 0xa5, 0xf8, 0x8f, 0xb1, 0x00,
0x00, 0x00, 0x00, 0x49, 0x45, 0x4e, 0x44, 0xae, 0x42, 0x60, 0x82,
]);
const TinyJpegBytes = Uint8Array.from([
0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01,
0x00, 0x01, 0x00, 0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03,
0x03, 0x03, 0x03, 0x04, 0x03, 0x03, 0x04, 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07,
0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a, 0x0b, 0x0b, 0x0d, 0x0e, 0x12, 0x10, 0x0d,
0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15, 0x15, 0x15, 0x0c, 0x0f,
0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xc0, 0x00, 0x0b, 0x08, 0x00, 0x01,
0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0xff, 0xc4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xff, 0xc4, 0x00, 0x14,
0x10, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01, 0x00, 0x00, 0x3f, 0x00, 0xbf, 0xff, 0xd9,
0xff, 0xd8, 0xff, 0xe0, 0x00, 0x10, 0x4a, 0x46, 0x49, 0x46, 0x00, 0x01, 0x01, 0x00, 0x00, 0x01, 0x00, 0x01, 0x00,
0x00, 0xff, 0xdb, 0x00, 0x43, 0x00, 0x03, 0x02, 0x02, 0x03, 0x02, 0x02, 0x03, 0x03, 0x03, 0x03, 0x04, 0x03, 0x03,
0x04, 0x05, 0x08, 0x05, 0x05, 0x04, 0x04, 0x05, 0x0a, 0x07, 0x07, 0x06, 0x08, 0x0c, 0x0a, 0x0c, 0x0c, 0x0b, 0x0a,
0x0b, 0x0b, 0x0d, 0x0e, 0x12, 0x10, 0x0d, 0x0e, 0x11, 0x0e, 0x0b, 0x0b, 0x10, 0x16, 0x10, 0x11, 0x13, 0x14, 0x15,
0x15, 0x15, 0x0c, 0x0f, 0x17, 0x18, 0x16, 0x14, 0x18, 0x12, 0x14, 0x15, 0x14, 0xff, 0xc0, 0x00, 0x0b, 0x08, 0x00,
0x01, 0x00, 0x01, 0x01, 0x01, 0x11, 0x00, 0xff, 0xc4, 0x00, 0x14, 0x00, 0x01, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x08, 0xff, 0xc4, 0x00, 0x14, 0x10, 0x01, 0x00, 0x00, 0x00,
0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0x00, 0xff, 0xda, 0x00, 0x08, 0x01, 0x01,
0x00, 0x00, 0x3f, 0x00, 0xbf, 0xff, 0xd9,
]);
type MockFsEntry = {
@ -61,7 +58,6 @@ export type MockFilesystem = {
fileRead: (data: FileData) => Promise<FileData>;
fileList: (data: FileListData) => Promise<FileInfo[]>;
fileJoin: (paths: string[]) => Promise<FileInfo>;
fileReadStream: (data: FileData) => AsyncGenerator<FileData, void, boolean>;
fileListStream: (data: FileListData) => AsyncGenerator<CommandRemoteListEntriesRtnData, void, boolean>;
};
@ -492,33 +488,9 @@ export function makeMockFilesystem(): MockFilesystem {
}
return toFileInfo(entry);
};
const fileReadStream = async function* (data: FileData): AsyncGenerator<FileData, void, boolean> {
const info = await fileInfo(data);
yield { info };
if (info.notfound) {
return;
}
const entry = getEntry(info.path);
if (entry.isdir) {
const dirEntries = (childrenByDir.get(entry.path) ?? []).map((child) => toFileInfo(child));
for (let idx = 0; idx < dirEntries.length; idx += MockDirectoryChunkSize) {
yield { entries: dirEntries.slice(idx, idx + MockDirectoryChunkSize) };
}
return;
}
if (entry.content == null || entry.content.byteLength === 0) {
return;
}
const { offset, end } = getReadRange(data, entry.content.byteLength);
for (let currentOffset = offset; currentOffset < end; currentOffset += MockFileChunkSize) {
const chunkEnd = Math.min(currentOffset + MockFileChunkSize, end);
yield {
data64: arrayToBase64(entry.content.slice(currentOffset, chunkEnd)),
at: { offset: currentOffset, size: chunkEnd - currentOffset },
};
}
};
const fileListStream = async function* (data: FileListData): AsyncGenerator<CommandRemoteListEntriesRtnData, void, boolean> {
const fileListStream = async function* (
data: FileListData
): AsyncGenerator<CommandRemoteListEntriesRtnData, void, boolean> {
const fileInfos = await fileList(data);
for (let idx = 0; idx < fileInfos.length; idx += MockDirectoryChunkSize) {
yield { fileinfo: fileInfos.slice(idx, idx + MockDirectoryChunkSize) };
@ -535,7 +507,6 @@ export function makeMockFilesystem(): MockFilesystem {
fileRead,
fileList,
fileJoin,
fileReadStream,
fileListStream,
};
}

View file

@ -1,4 +1,4 @@
import { base64ToArray, base64ToString } from "@/util/util";
import { base64ToString } from "@/util/util";
import { describe, expect, it, vi } from "vitest";
import { DefaultMockFilesystem } from "./mockfilesystem";
@ -82,26 +82,19 @@ describe("makeMockWaveEnv", () => {
}
expect(listPackets).toHaveLength(1);
expect(listPackets[0].fileinfo).toHaveLength(4);
const readPackets: FileData[] = [];
for await (const packet of env.rpc.FileReadStreamCommand(null as any, {
info: { path: "/Users/mike/Pictures/beach-sunrise.png" },
})) {
readPackets.push(packet);
}
expect(readPackets[0].info?.path).toBe("/Users/mike/Pictures/beach-sunrise.png");
const imageBytes = base64ToArray(readPackets[1].data64);
expect(Array.from(imageBytes.slice(0, 4))).toEqual([0x89, 0x50, 0x4e, 0x47]);
});
it("implements secrets commands with in-memory storage", async () => {
const { makeMockWaveEnv } = await import("./mockwaveenv");
const env = makeMockWaveEnv({ platform: "linux" });
await env.rpc.SetSecretsCommand(null as any, {
OPENAI_API_KEY: "sk-test",
ANTHROPIC_API_KEY: "anthropic-test",
} as any);
await env.rpc.SetSecretsCommand(
null as any,
{
OPENAI_API_KEY: "sk-test",
ANTHROPIC_API_KEY: "anthropic-test",
} as any
);
expect(await env.rpc.GetSecretsLinuxStorageBackendCommand(null as any)).toBe("libsecret");
expect(await env.rpc.GetSecretsNamesCommand(null as any)).toEqual(["ANTHROPIC_API_KEY", "OPENAI_API_KEY"]);

View file

@ -311,9 +311,6 @@ export function makeMockRpc(
setCallHandler("fileread", async (_client, data: FileData) => DefaultMockFilesystem.fileRead(data));
setCallHandler("filelist", async (_client, data: FileListData) => DefaultMockFilesystem.fileList(data));
setCallHandler("filejoin", async (_client, data: string[]) => DefaultMockFilesystem.fileJoin(data));
setStreamHandler("filereadstream", async function* (_client, data: FileData) {
yield* DefaultMockFilesystem.fileReadStream(data);
});
setStreamHandler("fileliststream", async function* (_client, data: FileListData) {
yield* DefaultMockFilesystem.fileListStream(data);
});

View file

@ -587,12 +587,6 @@ declare global {
publickeybase64: string;
};
// wshrpc.CommandRemoteStreamFileData
type CommandRemoteStreamFileData = {
path: string;
byterange?: string;
};
// wshrpc.CommandRemoteTerminateJobManagerData
type CommandRemoteTerminateJobManagerData = {
jobid: string;

View file

@ -150,13 +150,3 @@ func ReadStreamToFileData(ctx context.Context, readCh <-chan wshrpc.RespOrErrorU
}
return fileData, nil
}
func ReadFileStreamToWriter(ctx context.Context, readCh <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData], writer io.Writer) error {
return ReadFileStream(ctx, readCh, func(finfo wshrpc.FileInfo) {
}, func(entries []*wshrpc.FileInfo) error {
return nil
}, func(data io.Reader) error {
_, err := io.Copy(writer, data)
return err
})
}

View file

@ -1,4 +1,4 @@
// Copyright 2025, Command Line Inc.
// Copyright 2026, Command Line Inc.
// SPDX-License-Identifier: Apache-2.0
package wshfs
@ -7,12 +7,12 @@ import (
"context"
"encoding/base64"
"fmt"
"io"
"log"
"os"
"time"
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
@ -30,6 +30,7 @@ const (
// This needs to be set by whoever initializes the client, either main-server or wshcmd-connserver
var RpcClient *wshutil.WshRpc
var RpcClientRouteId string
func parseConnection(ctx context.Context, path string) (*connparse.Connection, error) {
conn, err := connparse.ParseURIAndReplaceCurrentHost(ctx, path)
@ -40,31 +41,54 @@ func parseConnection(ctx context.Context, path string) (*connparse.Connection, e
}
func Read(ctx context.Context, data wshrpc.FileData) (*wshrpc.FileData, error) {
if data.Info == nil {
return nil, fmt.Errorf("file info is required")
}
log.Printf("Read: %v", data.Info.Path)
conn, err := parseConnection(ctx, data.Info.Path)
if err != nil {
return nil, err
}
rtnCh := readStream(conn, data)
return fsutil.ReadStreamToFileData(ctx, rtnCh)
}
func ReadStream(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
log.Printf("ReadStream: %v", data.Info.Path)
conn, err := parseConnection(ctx, data.Info.Path)
if err != nil {
return wshutil.SendErrCh[wshrpc.FileData](err)
broker := RpcClient.StreamBroker
if broker == nil {
return nil, fmt.Errorf("stream broker not available")
}
return readStream(conn, data)
}
func readStream(conn *connparse.Connection, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
if RpcClientRouteId == "" {
return nil, fmt.Errorf("no route id available")
}
readerRouteId := RpcClientRouteId
writerRouteId := wshutil.MakeConnectionRouteId(conn.Host)
reader, streamMeta := broker.CreateStreamReader(readerRouteId, writerRouteId, 256*1024)
defer reader.Close()
go func() {
<-ctx.Done()
reader.Close()
}()
byteRange := ""
if data.At != nil && data.At.Size > 0 {
byteRange = fmt.Sprintf("%d-%d", data.At.Offset, data.At.Offset+int64(data.At.Size)-1)
}
streamFileData := wshrpc.CommandRemoteStreamFileData{Path: conn.Path, ByteRange: byteRange}
return wshclient.RemoteStreamFileCommand(RpcClient, streamFileData, &wshrpc.RpcOpts{Route: wshutil.MakeConnectionRouteId(conn.Host)})
remoteData := wshrpc.CommandRemoteFileStreamData{
Path: conn.Path,
ByteRange: byteRange,
StreamMeta: *streamMeta,
}
fileInfo, err := wshclient.RemoteFileStreamCommand(RpcClient, remoteData, &wshrpc.RpcOpts{Route: writerRouteId})
if err != nil {
return nil, fmt.Errorf("starting remote file stream: %w", err)
}
var rawData []byte
if fileInfo != nil && !fileInfo.IsDir {
rawData, err = io.ReadAll(reader)
if err != nil {
return nil, fmt.Errorf("reading file stream: %w", err)
}
}
rtnData := &wshrpc.FileData{Info: fileInfo}
if len(rawData) > 0 {
rtnData.Data64 = base64.StdEncoding.EncodeToString(rawData)
}
return rtnData, nil
}
func GetConnectionRouteId(ctx context.Context, path string) (string, error) {

View file

@ -364,11 +364,6 @@ func FileReadCommand(w *wshutil.WshRpc, data wshrpc.FileData, opts *wshrpc.RpcOp
return resp, err
}
// command "filereadstream", wshserver.FileReadStreamCommand
func FileReadStreamCommand(w *wshutil.WshRpc, data wshrpc.FileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
return sendRpcRequestResponseStreamHelper[wshrpc.FileData](w, "filereadstream", data, opts)
}
// command "filerestorebackup", wshserver.FileRestoreBackupCommand
func FileRestoreBackupCommand(w *wshutil.WshRpc, data wshrpc.CommandFileRestoreBackupData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "filerestorebackup", data, opts)
@ -775,11 +770,6 @@ func RemoteStreamCpuDataCommand(w *wshutil.WshRpc, opts *wshrpc.RpcOpts) chan ws
return sendRpcRequestResponseStreamHelper[wshrpc.TimeSeriesData](w, "remotestreamcpudata", nil, opts)
}
// command "remotestreamfile", wshserver.RemoteStreamFileCommand
func RemoteStreamFileCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteStreamFileData, opts *wshrpc.RpcOpts) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
return sendRpcRequestResponseStreamHelper[wshrpc.FileData](w, "remotestreamfile", data, opts)
}
// command "remoteterminatejobmanager", wshserver.RemoteTerminateJobManagerCommand
func RemoteTerminateJobManagerCommand(w *wshutil.WshRpc, data wshrpc.CommandRemoteTerminateJobManagerData, opts *wshrpc.RpcOpts) error {
_, err := sendRpcRequestCallHelper[any](w, "remoteterminatejobmanager", data, opts)

View file

@ -18,7 +18,6 @@ import (
"github.com/wavetermdev/waveterm/pkg/panichandler"
"github.com/wavetermdev/waveterm/pkg/remote/connparse"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/fsutil"
"github.com/wavetermdev/waveterm/pkg/remote/fileshare/wshfs"
"github.com/wavetermdev/waveterm/pkg/util/fileutil"
"github.com/wavetermdev/waveterm/pkg/util/utilfn"
@ -32,153 +31,6 @@ const RemoteFileTransferSizeLimit = 32 * 1024 * 1024
var DisableRecursiveFileOpts = true
func (impl *ServerImpl) remoteStreamFileDir(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
innerFilesEntries, err := os.ReadDir(path)
if err != nil {
return fmt.Errorf("cannot open dir %q: %w", path, err)
}
if byteRange.All {
if len(innerFilesEntries) > wshrpc.MaxDirSize {
innerFilesEntries = innerFilesEntries[:wshrpc.MaxDirSize]
}
} else {
if byteRange.Start < int64(len(innerFilesEntries)) {
var realEnd int64
if byteRange.OpenEnd {
realEnd = int64(len(innerFilesEntries))
} else {
realEnd = byteRange.End + 1
if realEnd > int64(len(innerFilesEntries)) {
realEnd = int64(len(innerFilesEntries))
}
}
innerFilesEntries = innerFilesEntries[byteRange.Start:realEnd]
} else {
innerFilesEntries = []os.DirEntry{}
}
}
var fileInfoArr []*wshrpc.FileInfo
for _, innerFileEntry := range innerFilesEntries {
if ctx.Err() != nil {
return ctx.Err()
}
innerFileInfoInt, err := innerFileEntry.Info()
if err != nil {
continue
}
innerFileInfo := statToFileInfo(filepath.Join(path, innerFileInfoInt.Name()), innerFileInfoInt, false)
fileInfoArr = append(fileInfoArr, innerFileInfo)
if len(fileInfoArr) >= wshrpc.DirChunkSize {
dataCallback(fileInfoArr, nil, byteRange)
fileInfoArr = nil
}
}
if len(fileInfoArr) > 0 {
dataCallback(fileInfoArr, nil, byteRange)
}
return nil
}
func (impl *ServerImpl) remoteStreamFileRegular(ctx context.Context, path string, byteRange fileutil.ByteRangeType, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
fd, err := os.Open(path)
if err != nil {
return fmt.Errorf("cannot open file %q: %w", path, err)
}
defer utilfn.GracefulClose(fd, "remoteStreamFileRegular", path)
var filePos int64
if !byteRange.All && byteRange.Start > 0 {
_, err := fd.Seek(byteRange.Start, io.SeekStart)
if err != nil {
return fmt.Errorf("seeking file %q: %w", path, err)
}
filePos = byteRange.Start
}
buf := make([]byte, wshrpc.FileChunkSize)
for {
if ctx.Err() != nil {
return ctx.Err()
}
n, err := fd.Read(buf)
if n > 0 {
if !byteRange.All && !byteRange.OpenEnd && filePos+int64(n) > byteRange.End+1 {
n = int(byteRange.End + 1 - filePos)
}
filePos += int64(n)
dataCallback(nil, buf[:n], byteRange)
}
if !byteRange.All && !byteRange.OpenEnd && filePos >= byteRange.End+1 {
break
}
if errors.Is(err, io.EOF) {
break
}
if err != nil {
return fmt.Errorf("reading file %q: %w", path, err)
}
}
return nil
}
func (impl *ServerImpl) remoteStreamFileInternal(ctx context.Context, data wshrpc.CommandRemoteStreamFileData, dataCallback func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType)) error {
byteRange, err := fileutil.ParseByteRange(data.ByteRange)
if err != nil {
return err
}
path, err := wavebase.ExpandHomeDir(data.Path)
if err != nil {
return err
}
finfo, err := impl.fileInfoInternal(path, true)
if err != nil {
return fmt.Errorf("cannot stat file %q: %w", path, err)
}
dataCallback([]*wshrpc.FileInfo{finfo}, nil, byteRange)
if finfo.NotFound {
return nil
}
if finfo.IsDir {
return impl.remoteStreamFileDir(ctx, path, byteRange, dataCallback)
} else {
if finfo.Size > RemoteFileTransferSizeLimit {
return fmt.Errorf("file %q size %d exceeds transfer limit of %d bytes", path, finfo.Size, RemoteFileTransferSizeLimit)
}
return impl.remoteStreamFileRegular(ctx, path, byteRange, dataCallback)
}
}
func (impl *ServerImpl) RemoteStreamFileCommand(ctx context.Context, data wshrpc.CommandRemoteStreamFileData) chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
ch := make(chan wshrpc.RespOrErrorUnion[wshrpc.FileData], 16)
go func() {
defer func() {
panichandler.PanicHandler("RemoteStreamFileCommand", recover())
}()
defer close(ch)
firstPk := true
err := impl.remoteStreamFileInternal(ctx, data, func(fileInfo []*wshrpc.FileInfo, data []byte, byteRange fileutil.ByteRangeType) {
resp := wshrpc.FileData{}
fileInfoLen := len(fileInfo)
if fileInfoLen > 1 || !firstPk {
resp.Entries = fileInfo
} else if fileInfoLen == 1 {
resp.Info = fileInfo[0]
}
if firstPk {
firstPk = false
}
if len(data) > 0 {
resp.Data64 = base64.StdEncoding.EncodeToString(data)
resp.At = &wshrpc.FileDataAt{Offset: byteRange.Start, Size: len(data)}
}
ch <- wshrpc.RespOrErrorUnion[wshrpc.FileData]{Response: resp}
})
if err != nil {
ch <- wshutil.RespErr[wshrpc.FileData](err)
}
}()
return ch
}
// prepareDestForCopy resolves the final destination path and handles overwrite logic.
// destPath is the raw destination path (may be a directory or file path).
// srcBaseName is the basename of the source file (used when dest is a directory or ends with slash).
@ -315,8 +167,25 @@ func (impl *ServerImpl) RemoteFileCopyCommand(ctx context.Context, data wshrpc.C
}
defer destFile.Close()
streamChan := wshclient.RemoteStreamFileCommand(wshfs.RpcClient, wshrpc.CommandRemoteStreamFileData{Path: srcConn.Path}, &wshrpc.RpcOpts{Timeout: opts.Timeout, Route: wshutil.MakeConnectionRouteId(srcConn.Host)})
if err = fsutil.ReadFileStreamToWriter(readCtx, streamChan, destFile); err != nil {
if wshfs.RpcClientRouteId == "" {
return false, fmt.Errorf("stream broker route id not available for file copy")
}
writerRouteId := wshutil.MakeConnectionRouteId(srcConn.Host)
reader, streamMeta := wshfs.RpcClient.StreamBroker.CreateStreamReader(wshfs.RpcClientRouteId, writerRouteId, 256*1024)
log.Printf("RemoteFileCopyCommand: readroute=%s writeroute=%s", streamMeta.ReaderRouteId, streamMeta.WriterRouteId)
defer reader.Close()
go func() {
<-readCtx.Done()
reader.Close()
}()
streamData := wshrpc.CommandRemoteFileStreamData{
Path: srcConn.Path,
StreamMeta: *streamMeta,
}
if _, err = wshclient.RemoteFileStreamCommand(wshfs.RpcClient, streamData, &wshrpc.RpcOpts{Route: writerRouteId}); err != nil {
return false, fmt.Errorf("error starting file stream for %q: %w", data.SrcUri, err)
}
if _, err = io.Copy(destFile, reader); err != nil {
return false, fmt.Errorf("error copying file %q to %q: %w", data.SrcUri, data.DestUri, err)
}
@ -342,6 +211,9 @@ func (impl *ServerImpl) RemoteListEntriesCommand(ctx context.Context, data wshrp
ch <- wshutil.RespErr[wshrpc.CommandRemoteListEntriesRtnData](err)
return
}
if data.Opts == nil {
data.Opts = &wshrpc.FileListOpts{}
}
innerFilesEntries := []os.DirEntry{}
seen := 0
if data.Opts.Limit == 0 {

View file

@ -18,24 +18,17 @@ type WshRpcFileInterface interface {
FileAppendCommand(ctx context.Context, data FileData) error
FileWriteCommand(ctx context.Context, data FileData) error
FileReadCommand(ctx context.Context, data FileData) (*FileData, error)
FileReadStreamCommand(ctx context.Context, data FileData) <-chan RespOrErrorUnion[FileData]
FileMoveCommand(ctx context.Context, data CommandFileCopyData) error
FileCopyCommand(ctx context.Context, data CommandFileCopyData) error
FileInfoCommand(ctx context.Context, data FileData) (*FileInfo, error)
FileListCommand(ctx context.Context, data FileListData) ([]*FileInfo, error)
FileJoinCommand(ctx context.Context, paths []string) (*FileInfo, error)
FileListStreamCommand(ctx context.Context, data FileListData) <-chan RespOrErrorUnion[CommandRemoteListEntriesRtnData]
// modern streaming interface
FileStreamCommand(ctx context.Context, data CommandFileStreamData) (*FileInfo, error)
}
type WshRpcRemoteFileInterface interface {
// old streaming inferface
RemoteStreamFileCommand(ctx context.Context, data CommandRemoteStreamFileData) chan RespOrErrorUnion[FileData]
// modern streaming interface
RemoteFileStreamCommand(ctx context.Context, data CommandRemoteFileStreamData) (*FileInfo, error)
RemoteFileCopyCommand(ctx context.Context, data CommandFileCopyData) (bool, error)
RemoteListEntriesCommand(ctx context.Context, data CommandRemoteListEntriesData) chan RespOrErrorUnion[CommandRemoteListEntriesRtnData]
RemoteFileInfoCommand(ctx context.Context, path string) (*FileInfo, error)

View file

@ -388,10 +388,6 @@ func (ws *WshServer) FileReadCommand(ctx context.Context, data wshrpc.FileData)
return wshfs.Read(ctx, data)
}
func (ws *WshServer) FileReadStreamCommand(ctx context.Context, data wshrpc.FileData) <-chan wshrpc.RespOrErrorUnion[wshrpc.FileData] {
return wshfs.ReadStream(ctx, data)
}
func (ws *WshServer) FileStreamCommand(ctx context.Context, data wshrpc.CommandFileStreamData) (*wshrpc.FileInfo, error) {
return wshfs.FileStream(ctx, data)
}