diff --git a/cmd/wsh/main-wsh.go b/cmd/wsh/main-wsh.go index bcfe879d8..927af8c97 100644 --- a/cmd/wsh/main-wsh.go +++ b/cmd/wsh/main-wsh.go @@ -8,8 +8,6 @@ import ( "os" ) -const WaveOSC = "23198" - func main() { barr, err := os.ReadFile("/Users/mike/Downloads/2.png") if err != nil { diff --git a/frontend/app/block/block.tsx b/frontend/app/block/block.tsx index 9dd06fbd0..a3cc83ae0 100644 --- a/frontend/app/block/block.tsx +++ b/frontend/app/block/block.tsx @@ -1,9 +1,11 @@ // Copyright 2024, Command Line Inc. // SPDX-License-Identifier: Apache-2.0 +import { CodeEditView } from "@/app/view/codeedit"; import { PlotView } from "@/app/view/plotview"; import { PreviewView } from "@/app/view/preview"; import { TerminalView } from "@/app/view/term"; +import { ErrorBoundary } from "@/element/errorboundary"; import { CenteredDiv } from "@/element/quickelems"; import * as WOS from "@/store/wos"; import * as React from "react"; @@ -32,6 +34,7 @@ const Block = ({ tabId, blockId }: { tabId: string; blockId: string }) => { let blockElem: JSX.Element = null; const [blockData, blockDataLoading] = WOS.useWaveObjectValue(WOS.makeORef("block", blockId)); + console.log("blockData: ", blockData); if (blockDataLoading) { blockElem = Loading...; } else if (blockData.view === "term") { @@ -40,6 +43,8 @@ const Block = ({ tabId, blockId }: { tabId: string; blockId: string }) => { blockElem = ; } else if (blockData.view === "plot") { blockElem = ; + } else if (blockData.view === "codeedit") { + blockElem = ; } return (
@@ -53,7 +58,9 @@ const Block = ({ tabId, blockId }: { tabId: string; blockId: string }) => {
- Loading...}>{blockElem} + + Loading...}>{blockElem} +
); diff --git a/frontend/app/element/errorboundary.tsx b/frontend/app/element/errorboundary.tsx new file mode 100644 index 000000000..617306a41 --- /dev/null +++ b/frontend/app/element/errorboundary.tsx @@ -0,0 +1,25 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +import React, { ReactNode } from "react"; + +export class ErrorBoundary extends React.Component<{ children: ReactNode }, { error: Error }> { + constructor(props) { + super(props); + this.state = { error: null }; + } + + componentDidCatch(error: Error, errorInfo: React.ErrorInfo) { + this.setState({ error: error }); + } + + render() { + const { error } = this.state; + if (error) { + const errorMsg = `Error: ${error?.message}\n\n${error?.stack}`; + return
{errorMsg}
; + } else { + return <>{this.props.children}; + } + } +} diff --git a/frontend/app/view/codeedit.less b/frontend/app/view/codeedit.less new file mode 100644 index 000000000..fe8f87ba1 --- /dev/null +++ b/frontend/app/view/codeedit.less @@ -0,0 +1,9 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +.codeedit { + display: flex; + flex-direction: column; + height: 100%; + width: 100%; +} diff --git a/frontend/app/view/codeedit.tsx b/frontend/app/view/codeedit.tsx new file mode 100644 index 000000000..010a195c9 --- /dev/null +++ b/frontend/app/view/codeedit.tsx @@ -0,0 +1,112 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +import "./codeedit.less"; + +import { globalStore } from "@/store/global"; +import loader from "@monaco-editor/loader"; +import { Editor, Monaco } from "@monaco-editor/react"; +import * as jotai from "jotai"; +import type * as MonacoTypes from "monaco-editor/esm/vs/editor/editor.api"; +import * as React from "react"; + +// there is a global monaco variable (TODO get the correct TS type) +declare var monaco: Monaco; +let monacoLoadedAtom = jotai.atom(false); + +function loadMonaco() { + loader.config({ paths: { vs: "./monaco" } }); + loader + .init() + .then(() => { + monaco.editor.defineTheme("wave-theme-dark", { + base: "hc-black", + inherit: true, + rules: [], + colors: { + "editor.background": "#000000", + }, + }); + monaco.editor.defineTheme("wave-theme-light", { + base: "hc-light", + inherit: true, + rules: [], + colors: { + "editor.background": "#fefefe", + }, + }); + globalStore.set(monacoLoadedAtom, true); + console.log("monaco loaded", monaco); + }) + .catch((e) => { + console.error("error loading monaco", e); + }); +} + +// TODO: need to update these on theme change (pull from CSS vars) +document.addEventListener("DOMContentLoaded", () => { + setTimeout(loadMonaco, 30); +}); + +function defaultEditorOptions(): MonacoTypes.editor.IEditorOptions { + const opts: MonacoTypes.editor.IEditorOptions = { + scrollBeyondLastLine: false, + fontSize: 12, + fontFamily: "Hack", + }; + return opts; +} + +export function CodeEdit() { + const divRef = React.useRef(null); + const monacoRef = React.useRef(null); + const theme = "wave-theme-dark"; + const [divDims, setDivDims] = React.useState(null); + const monacoLoaded = jotai.useAtomValue(monacoLoadedAtom); + + React.useEffect(() => { + if (!divRef.current) { + return; + } + const height = divRef.current.clientHeight; + const width = divRef.current.clientWidth; + setDivDims({ height, width }); + }, [divRef.current]); + + function handleEditorMount(editor: MonacoTypes.editor.IStandaloneCodeEditor) { + monacoRef.current = editor; + const monacoModel = editor.getModel(); + monaco.editor.setModelLanguage(monacoModel, "text/markdown"); + } + + function handleEditorChange(newText: string, ev: MonacoTypes.editor.IModelContentChangedEvent) { + // TODO + } + + const text = "Hello, world!"; + const editorOpts = defaultEditorOptions(); + + return ( +
+ {divDims != null && monacoLoaded ? ( + + ) : null} +
+ ); +} + +export function CodeEditView() { + return ( +
+ +
+ ); +} diff --git a/frontend/app/view/directorypreview.tsx b/frontend/app/view/directorypreview.tsx index bdeeb757b..464ae20e0 100644 --- a/frontend/app/view/directorypreview.tsx +++ b/frontend/app/view/directorypreview.tsx @@ -4,7 +4,6 @@ import { FileInfo } from "@/bindings/fileservice"; import { Table, createColumnHelper, flexRender, getCoreRowModel, useReactTable } from "@tanstack/react-table"; import * as jotai from "jotai"; -import path from "path"; import React from "react"; import "./directorypreview.less"; @@ -108,7 +107,7 @@ function TableBody({ table, setFileName }: TableBodyProps) { key={cell.id} style={{ width: `calc(var(--col-${cell.column.id}-size) * 1px)` }} > - {path.basename(cell.renderValue())} + {cell.renderValue()} ))} diff --git a/frontend/app/view/view.less b/frontend/app/view/view.less index 3a1fcb2c5..d3e1ddc5f 100644 --- a/frontend/app/view/view.less +++ b/frontend/app/view/view.less @@ -26,6 +26,16 @@ } } +.view-codeedit { + display: flex; + flex-direction: column; + width: 100%; + height: 100%; + overflow: hidden; + align-items: center; + justify-content: center; +} + .view-preview { display: flex; flex-direction: row; diff --git a/frontend/app/workspace/workspace.tsx b/frontend/app/workspace/workspace.tsx index 31dc57f56..325c4e0f6 100644 --- a/frontend/app/workspace/workspace.tsx +++ b/frontend/app/workspace/workspace.tsx @@ -84,6 +84,13 @@ function Widgets() { createBlock(plotDef); } + async function clickEdit() { + const editDef: BlockDef = { + view: "codeedit", + }; + createBlock(editDef); + } + return (
clickTerminal()}> @@ -104,6 +111,9 @@ function Widgets() {
clickPlot()}>
+
clickEdit()}> + +
diff --git a/package.json b/package.json index 6ddd4853f..476668b66 100644 --- a/package.json +++ b/package.json @@ -37,10 +37,13 @@ "typescript": "^5.4.5", "typescript-eslint": "^7.8.0", "vite": "^5.0.0", + "vite-plugin-static-copy": "^1.0.5", "vite-tsconfig-paths": "^4.3.2", "vitest": "^1.6.0" }, "dependencies": { + "@monaco-editor/loader": "^1.4.0", + "@monaco-editor/react": "^4.6.0", "@observablehq/plot": "^0.6.14", "@tanstack/react-table": "^8.17.3", "@xterm/addon-fit": "^0.10.0", @@ -49,7 +52,7 @@ "clsx": "^2.1.1", "immer": "^10.1.1", "jotai": "^2.8.0", - "path": "^0.12.7", + "monaco-editor": "^0.49.0", "react": "^18.3.1", "react-dom": "^18.3.1", "react-markdown": "^9.0.1", diff --git a/pkg/wshrpc/rpc_client.go b/pkg/wshrpc/rpc_client.go new file mode 100644 index 000000000..425fae7c6 --- /dev/null +++ b/pkg/wshrpc/rpc_client.go @@ -0,0 +1,264 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshprc + +import ( + "context" + "errors" + "fmt" + "log" + "runtime/debug" + "sync" + "sync/atomic" + "time" + + "github.com/google/uuid" +) + +// there is a single go-routine that reads from RecvCh +type RpcClient struct { + CVar *sync.Cond + NextSeqNum *atomic.Int64 + ReqPacketsInFlight map[int64]string // seqnum -> rpcId + AckList []int64 + RpcReqs map[string]*RpcInfo + SendCh chan *RpcPacket + RecvCh chan *RpcPacket +} + +type RpcInfo struct { + CloseSync *sync.Once + RpcId string + PacketsInFlight map[int64]bool // seqnum -> bool (for clients this is for requests, for servers it is for responses) + PkCh chan *RpcPacket // for clients this is for responses, for servers it is for requests +} + +func MakeRpcClient(sendCh chan *RpcPacket, recvCh chan *RpcPacket) *RpcClient { + if cap(sendCh) < MaxInFlightPackets { + panic(fmt.Errorf("sendCh buffer size must be at least MaxInFlightPackets(%d)", MaxInFlightPackets)) + } + rtn := &RpcClient{ + CVar: sync.NewCond(&sync.Mutex{}), + NextSeqNum: &atomic.Int64{}, + ReqPacketsInFlight: make(map[int64]string), + AckList: nil, + RpcReqs: make(map[string]*RpcInfo), + SendCh: sendCh, + RecvCh: recvCh, + } + go rtn.runRecvLoop() + return rtn +} + +func (c *RpcClient) runRecvLoop() { + defer func() { + if r := recover(); r != nil { + log.Printf("RpcClient.runRecvLoop() panic: %v", r) + debug.PrintStack() + } + }() + for pk := range c.RecvCh { + if pk.RpcType == RpcType_Resp { + c.handleResp(pk) + continue + } + log.Printf("RpcClient.runRecvLoop() bad packet type: %v", pk) + } + log.Printf("RpcClient.runRecvLoop() normal exit") +} + +func (c *RpcClient) getRpcInfo(rpcId string) *RpcInfo { + c.CVar.L.Lock() + defer c.CVar.L.Unlock() + return c.RpcReqs[rpcId] +} + +func (c *RpcClient) handleResp(pk *RpcPacket) { + c.handleAcks(pk.Acks) + if pk.RpcId == "" { + c.ackResp(pk.SeqNum) + log.Printf("RpcClient.handleResp() missing rpcId: %v", pk) + return + } + rpcInfo := c.getRpcInfo(pk.RpcId) + if rpcInfo == nil { + c.ackResp(pk.SeqNum) + log.Printf("RpcClient.handleResp() unknown rpcId: %v", pk) + return + } + select { + case rpcInfo.PkCh <- pk: + default: + log.Printf("RpcClient.handleResp() respCh full, dropping packet") + } + if pk.RespDone { + c.removeReqInfo(pk.RpcId, false) + } +} + +func (c *RpcClient) grabAcks() []int64 { + c.CVar.L.Lock() + defer c.CVar.L.Unlock() + acks := c.AckList + c.AckList = nil + return acks +} + +func (c *RpcClient) ackResp(seqNum int64) { + if seqNum == 0 { + return + } + c.CVar.L.Lock() + defer c.CVar.L.Unlock() + c.AckList = append(c.AckList, seqNum) +} + +func (c *RpcClient) waitForReq(ctx context.Context, req *RpcPacket) (*RpcInfo, error) { + c.CVar.L.Lock() + defer c.CVar.L.Unlock() + // issue with ctx timeout sync -- we need the cvar to be signaled fairly regularly so we can check ctx.Err() + for { + if ctx.Err() != nil { + return nil, ctx.Err() + } + if len(c.RpcReqs) >= MaxOpenRpcs { + c.CVar.Wait() + continue + } + if len(c.ReqPacketsInFlight) >= MaxOpenRpcs { + c.CVar.Wait() + continue + } + if rpcInfo, ok := c.RpcReqs[req.RpcId]; ok { + if len(rpcInfo.PacketsInFlight) >= MaxUnackedPerRpc { + c.CVar.Wait() + continue + } + } + break + } + select { + case c.SendCh <- req: + default: + return nil, errors.New("SendCh Full") + } + c.ReqPacketsInFlight[req.SeqNum] = req.RpcId + rpcInfo := c.RpcReqs[req.RpcId] + if rpcInfo == nil { + rpcInfo = &RpcInfo{ + CloseSync: &sync.Once{}, + RpcId: req.RpcId, + PacketsInFlight: make(map[int64]bool), + PkCh: make(chan *RpcPacket, MaxUnackedPerRpc), + } + rpcInfo.PacketsInFlight[req.SeqNum] = true + c.RpcReqs[req.RpcId] = rpcInfo + } + return rpcInfo, nil +} + +func (c *RpcClient) handleAcks(acks []int64) { + if len(acks) == 0 { + return + } + c.CVar.L.Lock() + defer c.CVar.L.Unlock() + for _, ack := range acks { + rpcId, ok := c.ReqPacketsInFlight[ack] + if !ok { + continue + } + rpcInfo := c.RpcReqs[rpcId] + if rpcInfo != nil { + delete(rpcInfo.PacketsInFlight, ack) + } + delete(c.ReqPacketsInFlight, ack) + } + c.CVar.Broadcast() +} + +func (c *RpcClient) removeReqInfo(rpcId string, clearSend bool) { + c.CVar.L.Lock() + defer c.CVar.L.Unlock() + rpcInfo := c.RpcReqs[rpcId] + delete(c.RpcReqs, rpcId) + if rpcInfo != nil { + if clearSend { + // unblock the recv loop if it happens to be waiting + // because the delete has already happens, it will not be able to send again on the channel + select { + case <-rpcInfo.PkCh: + default: + } + } + rpcInfo.CloseSync.Do(func() { + close(rpcInfo.PkCh) + }) + } +} + +func (c *RpcClient) SimpleReq(ctx context.Context, command string, data any) (any, error) { + rpcId := uuid.New().String() + seqNum := c.NextSeqNum.Add(1) + var timeoutInfo *TimeoutInfo + deadline, ok := ctx.Deadline() + if ok { + timeoutInfo = &TimeoutInfo{Deadline: deadline.UnixMilli()} + } + req := &RpcPacket{ + Command: command, + RpcId: rpcId, + RpcType: RpcType_Req, + SeqNum: seqNum, + ReqDone: true, + Acks: c.grabAcks(), + Timeout: timeoutInfo, + Data: data, + } + rpcInfo, err := c.waitForReq(ctx, req) + if err != nil { + return nil, err + } + defer c.removeReqInfo(rpcId, true) + var rtnPacket *RpcPacket + select { + case <-ctx.Done(): + return nil, ctx.Err() + case rtnPacket = <-rpcInfo.PkCh: + // fallthrough + } + if rtnPacket.Error != "" { + return nil, errors.New(rtnPacket.Error) + } + return rtnPacket.Data, nil +} + +func (c *RpcClient) StreamReq(ctx context.Context, command string, data any, respTimeout time.Duration) (chan *RpcPacket, error) { + rpcId := uuid.New().String() + seqNum := c.NextSeqNum.Add(1) + var timeoutInfo *TimeoutInfo = &TimeoutInfo{RespPacketTimeout: respTimeout.Milliseconds()} + deadline, ok := ctx.Deadline() + if ok { + timeoutInfo.Deadline = deadline.UnixMilli() + } + req := &RpcPacket{ + Command: command, + RpcId: rpcId, + RpcType: RpcType_Req, + SeqNum: seqNum, + ReqDone: true, + Acks: c.grabAcks(), + Timeout: timeoutInfo, + Data: data, + } + rpcInfo, err := c.waitForReq(ctx, req) + if err != nil { + return nil, err + } + return rpcInfo.PkCh, nil +} + +func (c *RpcClient) EndStreamReq(rpcId string) { + c.removeReqInfo(rpcId, true) +} diff --git a/pkg/wshrpc/rpc_server.go b/pkg/wshrpc/rpc_server.go new file mode 100644 index 000000000..8ba3a1bb0 --- /dev/null +++ b/pkg/wshrpc/rpc_server.go @@ -0,0 +1,247 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshprc + +import ( + "context" + "fmt" + "log" + "runtime/debug" + "sync" + "sync/atomic" + "time" +) + +type SimpleCommandHandlerFn func(context.Context, *RpcServer, string, any) (any, error) +type StreamCommandHandlerFn func(context.Context, *RpcServer, string, any) error + +type RpcServer struct { + CVar *sync.Cond + NextSeqNum *atomic.Int64 + RespPacketsInFlight map[int64]string // seqnum -> rpcId + AckList []int64 + RpcReqs map[string]*RpcInfo + SendCh chan *RpcPacket + RecvCh chan *RpcPacket + SimpleCommandHandlers map[string]SimpleCommandHandlerFn + StreamCommandHandlers map[string]StreamCommandHandlerFn +} + +func MakeRpcServer(sendCh chan *RpcPacket, recvCh chan *RpcPacket) *RpcServer { + if cap(sendCh) < MaxInFlightPackets { + panic(fmt.Errorf("sendCh buffer size must be at least MaxInFlightPackets(%d)", MaxInFlightPackets)) + } + rtn := &RpcServer{ + CVar: sync.NewCond(&sync.Mutex{}), + NextSeqNum: &atomic.Int64{}, + RespPacketsInFlight: make(map[int64]string), + AckList: nil, + RpcReqs: make(map[string]*RpcInfo), + SendCh: sendCh, + RecvCh: recvCh, + } + go rtn.runRecvLoop() + return rtn +} + +func (s *RpcServer) RegisterSimpleCommandHandler(command string, handler SimpleCommandHandlerFn) { + s.CVar.L.Lock() + defer s.CVar.L.Unlock() + s.SimpleCommandHandlers[command] = handler +} + +func (s *RpcServer) RegisterStreamCommandHandler(command string, handler StreamCommandHandlerFn) { + s.CVar.L.Lock() + defer s.CVar.L.Unlock() + s.StreamCommandHandlers[command] = handler +} + +func (s *RpcServer) runRecvLoop() { + defer func() { + if r := recover(); r != nil { + log.Printf("RpcServer.runRecvLoop() panic: %v", r) + debug.PrintStack() + } + }() + for pk := range s.RecvCh { + s.handleAcks(pk.Acks) + if pk.RpcType == RpcType_Req { + if pk.ReqDone { + s.handleSimpleReq(pk) + } else { + s.handleStreamReq(pk) + } + continue + } + log.Printf("RpcClient.runRecvLoop() bad packet type: %v", pk) + } + log.Printf("RpcServer.runRecvLoop() normal exit") +} + +func (s *RpcServer) ackResp(seqNum int64) { + if seqNum == 0 { + return + } + s.CVar.L.Lock() + defer s.CVar.L.Unlock() + s.AckList = append(s.AckList, seqNum) +} + +func makeContextFromTimeout(timeout *TimeoutInfo) (context.Context, context.CancelFunc) { + if timeout == nil { + return context.Background(), func() {} + } + return context.WithDeadline(context.Background(), time.UnixMilli(timeout.Deadline)) +} + +func (s *RpcServer) SendResponse(ctx context.Context, pk *RpcPacket) error { + return s.waitForSend(ctx, pk) +} + +func (s *RpcServer) waitForSend(ctx context.Context, pk *RpcPacket) error { + s.CVar.L.Lock() + defer s.CVar.L.Unlock() + for { + if ctx.Err() != nil { + return ctx.Err() + } + if len(s.RespPacketsInFlight) >= MaxInFlightPackets { + s.CVar.Wait() + continue + } + rpcInfo := s.RpcReqs[pk.RpcId] + if rpcInfo != nil { + if len(rpcInfo.PacketsInFlight) >= MaxUnackedPerRpc { + s.CVar.Wait() + continue + } + } + break + } + s.RespPacketsInFlight[pk.SeqNum] = pk.RpcId + pk.Acks = s.grabAcks_nolock() + s.SendCh <- pk + rpcInfo := s.RpcReqs[pk.RpcId] + if !pk.RespDone && rpcInfo != nil { + rpcInfo = &RpcInfo{ + CloseSync: &sync.Once{}, + RpcId: pk.RpcId, + PkCh: make(chan *RpcPacket, MaxUnackedPerRpc), + PacketsInFlight: make(map[int64]bool), + } + s.RpcReqs[pk.RpcId] = rpcInfo + } + if rpcInfo != nil { + rpcInfo.PacketsInFlight[pk.SeqNum] = true + } + if pk.RespDone { + delete(s.RpcReqs, pk.RpcId) + } + return nil +} + +func (s *RpcServer) handleAcks(acks []int64) { + if len(acks) == 0 { + return + } + s.CVar.L.Lock() + defer s.CVar.L.Unlock() + for _, ack := range acks { + rpcId, ok := s.RespPacketsInFlight[ack] + if !ok { + continue + } + rpcInfo := s.RpcReqs[rpcId] + if rpcInfo != nil { + delete(rpcInfo.PacketsInFlight, ack) + } + delete(s.RespPacketsInFlight, ack) + } + s.CVar.Broadcast() +} + +func (s *RpcServer) handleSimpleReq(pk *RpcPacket) { + s.ackResp(pk.SeqNum) + handler, ok := s.SimpleCommandHandlers[pk.Command] + if !ok { + log.Printf("RpcServer.handleReq() unknown command: %s", pk.Command) + return + } + go func() { + defer func() { + if r := recover(); r != nil { + log.Printf("RpcServer.handleReq(%q) panic: %v", pk.Command, r) + debug.PrintStack() + } + }() + ctx, cancelFn := makeContextFromTimeout(pk.Timeout) + defer cancelFn() + data, err := handler(ctx, s, pk.Command, pk.Data) + seqNum := s.NextSeqNum.Add(1) + respPk := &RpcPacket{ + Command: pk.Command, + RpcId: pk.RpcId, + RpcType: RpcType_Resp, + SeqNum: seqNum, + RespDone: true, + } + if err != nil { + respPk.Error = err.Error() + } else { + respPk.Data = data + } + s.waitForSend(ctx, respPk) + }() +} + +func (s *RpcServer) grabAcks_nolock() []int64 { + acks := s.AckList + s.AckList = nil + return acks +} + +func (s *RpcServer) handleStreamReq(pk *RpcPacket) { + s.ackResp(pk.SeqNum) + handler, ok := s.StreamCommandHandlers[pk.Command] + if !ok { + s.ackResp(pk.SeqNum) + log.Printf("RpcServer.handleStreamReq() unknown command: %s", pk.Command) + return + } + go func() { + defer func() { + r := recover() + if r == nil { + return + } + log.Printf("RpcServer.handleStreamReq(%q) panic: %v", pk.Command, r) + debug.PrintStack() + respPk := &RpcPacket{ + Command: pk.Command, + RpcId: pk.RpcId, + RpcType: RpcType_Resp, + SeqNum: s.NextSeqNum.Add(1), + RespDone: true, + Error: fmt.Sprintf("panic: %v", r), + } + s.waitForSend(context.Background(), respPk) + }() + ctx, cancelFn := makeContextFromTimeout(pk.Timeout) + defer cancelFn() + err := handler(ctx, s, pk.Command, pk.Data) + if err != nil { + respPk := &RpcPacket{ + Command: pk.Command, + RpcId: pk.RpcId, + RpcType: RpcType_Resp, + SeqNum: s.NextSeqNum.Add(1), + RespDone: true, + Error: err.Error(), + } + s.waitForSend(ctx, respPk) + return + } + // check if RespDone has been set, if not, send it here + }() +} diff --git a/pkg/wshrpc/rpc_test.go b/pkg/wshrpc/rpc_test.go new file mode 100644 index 000000000..4f8ed783b --- /dev/null +++ b/pkg/wshrpc/rpc_test.go @@ -0,0 +1,115 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshprc + +import ( + "context" + "sync" + "testing" + "time" +) + +func TestSimple(t *testing.T) { + sendCh := make(chan *RpcPacket, MaxInFlightPackets) + recvCh := make(chan *RpcPacket, MaxInFlightPackets) + client := MakeRpcClient(sendCh, recvCh) + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + resp, err := client.SimpleReq(ctx, "test", "hello") + if err != nil { + t.Errorf("SimpleReq() failed: %v", err) + return + } + if resp != "world" { + t.Errorf("SimpleReq() failed: expected 'world', got '%s'", resp) + } + }() + go func() { + defer wg.Done() + req := <-sendCh + if req.Command != "test" { + t.Errorf("expected 'test', got '%s'", req.Command) + } + if req.Data != "hello" { + t.Errorf("expected 'hello', got '%s'", req.Data) + } + resp := &RpcPacket{ + Command: "test", + RpcId: req.RpcId, + RpcType: RpcType_Resp, + SeqNum: 1, + RespDone: true, + Acks: []int64{req.SeqNum}, + Data: "world", + } + recvCh <- resp + }() + wg.Wait() +} + +func makeRpcResp(req *RpcPacket, data any, seqNum int64, done bool) *RpcPacket { + return &RpcPacket{ + Command: req.Command, + RpcId: req.RpcId, + RpcType: RpcType_Resp, + SeqNum: seqNum, + RespDone: done, + Data: data, + } +} + +func TestStream(t *testing.T) { + sendCh := make(chan *RpcPacket, MaxInFlightPackets) + recvCh := make(chan *RpcPacket, MaxInFlightPackets) + client := MakeRpcClient(sendCh, recvCh) + ctx, cancelFn := context.WithTimeout(context.Background(), 2*time.Second) + defer cancelFn() + var wg sync.WaitGroup + wg.Add(2) + go func() { + defer wg.Done() + respCh, err := client.StreamReq(ctx, "test", "hello", 1000) + if err != nil { + t.Errorf("StreamReq() failed: %v", err) + return + } + var output []string + for resp := range respCh { + if resp.Error != "" { + t.Errorf("StreamReq() failed: %v", resp.Error) + return + } + output = append(output, resp.Data.(string)) + } + if len(output) != 3 { + t.Errorf("expected 3 responses, got %d (%v)", len(output), output) + return + } + if output[0] != "one" || output[1] != "two" || output[2] != "three" { + t.Errorf("expected 'one', 'two', 'three', got %v", output) + return + } + }() + go func() { + defer wg.Done() + req := <-sendCh + if req.Command != "test" { + t.Errorf("expected 'test', got '%s'", req.Command) + } + if req.Data != "hello" { + t.Errorf("expected 'hello', got '%s'", req.Data) + } + resp := makeRpcResp(req, "one", 1, false) + recvCh <- resp + resp = makeRpcResp(req, "two", 2, false) + recvCh <- resp + resp = makeRpcResp(req, "three", 3, true) + recvCh <- resp + }() + wg.Wait() +} diff --git a/pkg/wshrpc/wshrpc.go b/pkg/wshrpc/wshrpc.go new file mode 100644 index 000000000..fe9034fb8 --- /dev/null +++ b/pkg/wshrpc/wshrpc.go @@ -0,0 +1,58 @@ +// Copyright 2024, Command Line Inc. +// SPDX-License-Identifier: Apache-2.0 + +package wshprc + +import ( + "context" +) + +const ( + MaxOpenRpcs = 10 + MaxUnackedPerRpc = 10 + MaxInFlightPackets = MaxOpenRpcs * MaxUnackedPerRpc +) + +const ( + RpcType_Req = "req" + RpcType_Resp = "resp" +) + +const ( + CommandType_Ack = ":ack" + CommandType_Ping = ":ping" + CommandType_Cancel = ":cancel" + CommandType_Timeout = ":timeout" +) + +var rpcClientContextKey = struct{}{} + +type TimeoutInfo struct { + Deadline int64 `json:"deadline,omitempty"` + ReqPacketTimeout int64 `json:"reqpackettimeout,omitempty"` // for streaming requests + RespPacketTimeout int64 `json:"resppackettimeout,omitempty"` // for streaming responses +} + +type RpcPacket struct { + Command string `json:"command"` + RpcId string `json:"rpcid"` + RpcType string `json:"rpctype"` + SeqNum int64 `json:"seqnum"` + ReqDone bool `json:"reqdone"` + RespDone bool `json:"resdone"` + Acks []int64 `json:"acks,omitempty"` // seqnums acked + Timeout *TimeoutInfo `json:"timeout,omitempty"` // for initial request only + Data any `json:"data"` // json data for command + Error string `json:"error,omitempty"` +} + +func GetRpcClient(ctx context.Context) *RpcClient { + if ctx == nil { + return nil + } + val := ctx.Value(rpcClientContextKey) + if val == nil { + return nil + } + return val.(*RpcClient) +} diff --git a/pkg/wshutil/wshcommands.go b/pkg/wshutil/wshcommands.go index 5051b09ae..53fc7b8f7 100644 --- a/pkg/wshutil/wshcommands.go +++ b/pkg/wshutil/wshcommands.go @@ -9,6 +9,7 @@ const ( CommandSetView = "setview" CommandSetMeta = "setmeta" CommandBlockFileAppend = "blockfile:append" + CommandStreamFile = "streamfile" ) var CommandToTypeMap = map[string]reflect.Type{ @@ -23,6 +24,8 @@ type Command interface { // for unmarshalling type baseCommand struct { Command string `json:"command"` + RpcID string `json:"rpcid"` + RpcType string `json:"rpctype"` } type SetViewCommand struct { @@ -52,3 +55,12 @@ type BlockFileAppendCommand struct { func (bfac *BlockFileAppendCommand) GetCommand() string { return CommandBlockFileAppend } + +type StreamFileCommand struct { + Command string `json:"command"` + FileName string `json:"filename"` +} + +func (c *StreamFileCommand) GetCommand() string { + return CommandStreamFile +} diff --git a/public/style.less b/public/style.less index 766c4de14..4a0cbdfe4 100644 --- a/public/style.less +++ b/public/style.less @@ -36,3 +36,7 @@ body { border-bottom: 1px solid var(--border-color); flex-shrink: 0; } + +.error-boundary { + color: var(--error-color); +} diff --git a/vite.config.ts b/vite.config.ts index d709ef5c0..66fa0b4a8 100644 --- a/vite.config.ts +++ b/vite.config.ts @@ -1,13 +1,20 @@ import react from "@vitejs/plugin-react"; import { defineConfig } from "vite"; +import { viteStaticCopy } from "vite-plugin-static-copy"; import tsconfigPaths from "vite-tsconfig-paths"; export default defineConfig({ - plugins: [react({}), tsconfigPaths()], - define: { "process.env": process.env }, + plugins: [ + react({}), + tsconfigPaths(), + viteStaticCopy({ + targets: [{ src: "node_modules/monaco-editor/min/vs/*", dest: "monaco" }], + }), + ], publicDir: "public", build: { target: "es6", + sourcemap: true, rollupOptions: { input: { app: "public/index.html", diff --git a/yarn.lock b/yarn.lock index 157902f90..d8512509a 100644 --- a/yarn.lock +++ b/yarn.lock @@ -1913,6 +1913,30 @@ __metadata: languageName: node linkType: hard +"@monaco-editor/loader@npm:^1.4.0": + version: 1.4.0 + resolution: "@monaco-editor/loader@npm:1.4.0" + dependencies: + state-local: "npm:^1.0.6" + peerDependencies: + monaco-editor: ">= 0.21.0 < 1" + checksum: 10c0/68938350adf2f42363a801d87f5d00c87d397d4cba7041141af10a9216bd35c85209b4723a26d56cb32e68eef61471deda2a450f8892891118fbdce7fa1d987d + languageName: node + linkType: hard + +"@monaco-editor/react@npm:^4.6.0": + version: 4.6.0 + resolution: "@monaco-editor/react@npm:4.6.0" + dependencies: + "@monaco-editor/loader": "npm:^1.4.0" + peerDependencies: + monaco-editor: ">= 0.25.0 < 1" + react: ^16.8.0 || ^17.0.0 || ^18.0.0 + react-dom: ^16.8.0 || ^17.0.0 || ^18.0.0 + checksum: 10c0/231e9a9b66a530db326f6732de0ebffcce6b79dcfaf4948923d78b9a3d5e2a04b7a06e1f85bbbca45a5ae15c107a124e4c5c46cabadc20a498fb5f2d05f7f379 + languageName: node + linkType: hard + "@ndelangen/get-tarball@npm:^3.0.7": version: 3.0.9 resolution: "@ndelangen/get-tarball@npm:3.0.9" @@ -4650,7 +4674,7 @@ __metadata: languageName: node linkType: hard -"chokidar@npm:^3.6.0": +"chokidar@npm:^3.5.3, chokidar@npm:^3.6.0": version: 3.6.0 resolution: "chokidar@npm:3.6.0" dependencies: @@ -6161,7 +6185,7 @@ __metadata: languageName: node linkType: hard -"fast-glob@npm:^3.2.9, fast-glob@npm:^3.3.2": +"fast-glob@npm:^3.2.11, fast-glob@npm:^3.2.9, fast-glob@npm:^3.3.2": version: 3.3.2 resolution: "fast-glob@npm:3.3.2" dependencies: @@ -7001,13 +7025,6 @@ __metadata: languageName: node linkType: hard -"inherits@npm:2.0.3": - version: 2.0.3 - resolution: "inherits@npm:2.0.3" - checksum: 10c0/6e56402373149ea076a434072671f9982f5fad030c7662be0332122fe6c0fa490acb3cc1010d90b6eff8d640b1167d77674add52dfd1bb85d545cf29e80e73e7 - languageName: node - linkType: hard - "inline-style-parser@npm:0.2.3": version: 0.2.3 resolution: "inline-style-parser@npm:0.2.3" @@ -8816,6 +8833,13 @@ __metadata: languageName: node linkType: hard +"monaco-editor@npm:^0.49.0": + version: 0.49.0 + resolution: "monaco-editor@npm:0.49.0" + checksum: 10c0/c62d19e65f8ad441d0b29d8f43181c730bd373854ff0f9331c42ed1b97b729ab099e6f961c2e377acddda6923b8030e0453ae87edd3f7db3ace75c91f44431cd + languageName: node + linkType: hard + "ms@npm:2.0.0": version: 2.0.0 resolution: "ms@npm:2.0.0" @@ -9344,16 +9368,6 @@ __metadata: languageName: node linkType: hard -"path@npm:^0.12.7": - version: 0.12.7 - resolution: "path@npm:0.12.7" - dependencies: - process: "npm:^0.11.1" - util: "npm:^0.10.3" - checksum: 10c0/f795ce5438a988a590c7b6dfd450ec9baa1c391a8be4c2dea48baa6e0f5b199e56cd83b8c9ebf3991b81bea58236d2c32bdafe2c17a2e70c3a2e4c69891ade59 - languageName: node - linkType: hard - "pathe@npm:^1.1.1, pathe@npm:^1.1.2": version: 1.1.2 resolution: "pathe@npm:1.1.2" @@ -9568,7 +9582,7 @@ __metadata: languageName: node linkType: hard -"process@npm:^0.11.1, process@npm:^0.11.10": +"process@npm:^0.11.10": version: 0.11.10 resolution: "process@npm:0.11.10" checksum: 10c0/40c3ce4b7e6d4b8c3355479df77aeed46f81b279818ccdc500124e6a5ab882c0cc81ff7ea16384873a95a74c4570b01b120f287abbdd4c877931460eca6084b3 @@ -10643,6 +10657,13 @@ __metadata: languageName: node linkType: hard +"state-local@npm:^1.0.6": + version: 1.0.7 + resolution: "state-local@npm:1.0.7" + checksum: 10c0/8dc7daeac71844452fafb514a6d6b6f40d7e2b33df398309ea1c7b3948d6110c57f112b7196500a10c54fdde40291488c52c875575670fb5c819602deca48bd9 + languageName: node + linkType: hard + "statuses@npm:2.0.1": version: 2.0.1 resolution: "statuses@npm:2.0.1" @@ -10950,6 +10971,8 @@ __metadata: dependencies: "@chromatic-com/storybook": "npm:^1.3.3" "@eslint/js": "npm:^9.2.0" + "@monaco-editor/loader": "npm:^1.4.0" + "@monaco-editor/react": "npm:^4.6.0" "@observablehq/plot": "npm:^0.6.14" "@storybook/addon-essentials": "npm:^8.0.10" "@storybook/addon-interactions": "npm:^8.0.10" @@ -10974,7 +10997,7 @@ __metadata: immer: "npm:^10.1.1" jotai: "npm:^2.8.0" less: "npm:^4.2.0" - path: "npm:^0.12.7" + monaco-editor: "npm:^0.49.0" prettier: "npm:^3.2.5" prettier-plugin-jsdoc: "npm:^1.3.0" prettier-plugin-organize-imports: "npm:^3.2.4" @@ -10990,6 +11013,7 @@ __metadata: typescript-eslint: "npm:^7.8.0" uuid: "npm:^9.0.1" vite: "npm:^5.0.0" + vite-plugin-static-copy: "npm:^1.0.5" vite-tsconfig-paths: "npm:^4.3.2" vitest: "npm:^1.6.0" languageName: unknown @@ -11529,15 +11553,6 @@ __metadata: languageName: node linkType: hard -"util@npm:^0.10.3": - version: 0.10.4 - resolution: "util@npm:0.10.4" - dependencies: - inherits: "npm:2.0.3" - checksum: 10c0/d29f6893e406b63b088ce9924da03201df89b31490d4d011f1c07a386ea4b3dbe907464c274023c237da470258e1805d806c7e4009a5974cd6b1d474b675852a - languageName: node - linkType: hard - "util@npm:^0.12.4, util@npm:^0.12.5": version: 0.12.5 resolution: "util@npm:0.12.5" @@ -11627,6 +11642,20 @@ __metadata: languageName: node linkType: hard +"vite-plugin-static-copy@npm:^1.0.5": + version: 1.0.5 + resolution: "vite-plugin-static-copy@npm:1.0.5" + dependencies: + chokidar: "npm:^3.5.3" + fast-glob: "npm:^3.2.11" + fs-extra: "npm:^11.1.0" + picocolors: "npm:^1.0.0" + peerDependencies: + vite: ^5.0.0 + checksum: 10c0/b11c6a0bd31b8592af4099f75122b2073738659d1d06c6aeedd47d15b84758a57c17d56b6e79cefa3b7871c8f51980b423a9d524bbceb4b17363d4363e78a63b + languageName: node + linkType: hard + "vite-tsconfig-paths@npm:^4.3.2": version: 4.3.2 resolution: "vite-tsconfig-paths@npm:4.3.2"