mirror of
https://github.com/wavetermdev/waveterm
synced 2026-05-24 09:18:27 +00:00
Got preview hooked up, log output hooked up, environment tab hooked up... the controller, restarting the apps. it is actually working! still lots to do and lots of hard coding to fix, but it is coming together...
448 lines
11 KiB
Go
448 lines
11 KiB
Go
// Copyright 2025, Command Line Inc.
|
|
// SPDX-License-Identifier: Apache-2.0
|
|
|
|
package blockcontroller
|
|
|
|
import (
|
|
"context"
|
|
"encoding/json"
|
|
"fmt"
|
|
"io"
|
|
"log"
|
|
"net/http"
|
|
"os"
|
|
"os/exec"
|
|
"path/filepath"
|
|
"runtime"
|
|
"sync"
|
|
"syscall"
|
|
"time"
|
|
|
|
"github.com/wavetermdev/waveterm/pkg/tsunamiutil"
|
|
"github.com/wavetermdev/waveterm/pkg/utilds"
|
|
"github.com/wavetermdev/waveterm/pkg/wavebase"
|
|
"github.com/wavetermdev/waveterm/pkg/waveobj"
|
|
"github.com/wavetermdev/waveterm/pkg/wps"
|
|
"github.com/wavetermdev/waveterm/pkg/wstore"
|
|
"github.com/wavetermdev/waveterm/tsunami/build"
|
|
)
|
|
|
|
type TsunamiAppProc struct {
|
|
Cmd *exec.Cmd
|
|
LineBuffer *utilds.MultiReaderLineBuffer
|
|
StdinWriter io.WriteCloser
|
|
Port int // Port the tsunami app is listening on
|
|
WaitCh chan struct{} // Channel that gets closed when cmd.Wait() returns
|
|
WaitRtn error // Error returned by cmd.Wait()
|
|
}
|
|
|
|
type TsunamiController struct {
|
|
blockId string
|
|
tabId string
|
|
runLock sync.Mutex
|
|
tsunamiProc *TsunamiAppProc
|
|
statusLock sync.Mutex
|
|
status string
|
|
statusVersion int
|
|
exitCode int
|
|
port int
|
|
}
|
|
|
|
|
|
func (c *TsunamiController) fetchAndSetSchemas(port int) {
|
|
url := fmt.Sprintf("http://localhost:%d/api/schemas", port)
|
|
client := &http.Client{
|
|
Timeout: 10 * time.Second,
|
|
}
|
|
|
|
resp, err := client.Get(url)
|
|
if err != nil {
|
|
log.Printf("TsunamiController: failed to fetch schemas from %s: %v", url, err)
|
|
return
|
|
}
|
|
defer resp.Body.Close()
|
|
|
|
if resp.StatusCode != http.StatusOK {
|
|
log.Printf("TsunamiController: received non-200 status %d from %s", resp.StatusCode, url)
|
|
return
|
|
}
|
|
|
|
var schemas any
|
|
if err := json.NewDecoder(resp.Body).Decode(&schemas); err != nil {
|
|
log.Printf("TsunamiController: failed to decode schemas response: %v", err)
|
|
return
|
|
}
|
|
|
|
blockRef := waveobj.MakeORef(waveobj.OType_Block, c.blockId)
|
|
wstore.SetRTInfo(blockRef, map[string]any{
|
|
"tsunami:schemas": schemas,
|
|
})
|
|
|
|
log.Printf("TsunamiController: successfully fetched and cached schemas for block %s", c.blockId)
|
|
}
|
|
|
|
func (c *TsunamiController) clearSchemas() {
|
|
blockRef := waveobj.MakeORef(waveobj.OType_Block, c.blockId)
|
|
wstore.SetRTInfo(blockRef, map[string]any{
|
|
"tsunami:schemas": nil,
|
|
})
|
|
log.Printf("TsunamiController: cleared schemas for block %s", c.blockId)
|
|
}
|
|
|
|
|
|
func isBuildCacheUpToDate(appPath string) (bool, error) {
|
|
appName := build.GetAppName(appPath)
|
|
|
|
osArch := runtime.GOOS + "-" + runtime.GOARCH
|
|
|
|
cachePath, err := tsunamiutil.GetTsunamiAppCachePath("local", appName, osArch)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
cacheInfo, err := os.Stat(cachePath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return false, nil
|
|
}
|
|
return false, err
|
|
}
|
|
|
|
appModTime, err := build.GetAppModTime(appPath)
|
|
if err != nil {
|
|
return false, err
|
|
}
|
|
|
|
cacheModTime := cacheInfo.ModTime()
|
|
return !cacheModTime.Before(appModTime), nil
|
|
}
|
|
|
|
func (c *TsunamiController) Start(ctx context.Context, blockMeta waveobj.MetaMapType, rtOpts *waveobj.RuntimeOpts, force bool) error {
|
|
log.Printf("TsunamiController.Start called for block %s", c.blockId)
|
|
c.runLock.Lock()
|
|
defer c.runLock.Unlock()
|
|
|
|
scaffoldPath := blockMeta.GetString(waveobj.MetaKey_TsunamiScaffoldPath, "")
|
|
if scaffoldPath == "" {
|
|
return fmt.Errorf("tsunami:scaffoldpath is required")
|
|
}
|
|
scaffoldPath, err := wavebase.ExpandHomeDir(scaffoldPath)
|
|
if err != nil {
|
|
return fmt.Errorf("tsunami:scaffoldpath invalid: %w", err)
|
|
}
|
|
if !filepath.IsAbs(scaffoldPath) {
|
|
return fmt.Errorf("tsunami:scaffoldpath must be absolute: %s", scaffoldPath)
|
|
}
|
|
|
|
sdkReplacePath := blockMeta.GetString(waveobj.MetaKey_TsunamiSdkReplacePath, "")
|
|
if sdkReplacePath == "" {
|
|
return fmt.Errorf("tsunami:sdkreplacepath is required")
|
|
}
|
|
sdkReplacePath, err = wavebase.ExpandHomeDir(sdkReplacePath)
|
|
if err != nil {
|
|
return fmt.Errorf("tsunami:sdkreplacepath invalid: %w", err)
|
|
}
|
|
if !filepath.IsAbs(sdkReplacePath) {
|
|
return fmt.Errorf("tsunami:sdkreplacepath must be absolute: %s", sdkReplacePath)
|
|
}
|
|
|
|
appPath := blockMeta.GetString(waveobj.MetaKey_TsunamiAppPath, "")
|
|
if appPath == "" {
|
|
return fmt.Errorf("tsunami:apppath is required")
|
|
}
|
|
appPath, err = wavebase.ExpandHomeDir(appPath)
|
|
if err != nil {
|
|
return fmt.Errorf("tsunami:apppath invalid: %w", err)
|
|
}
|
|
if !filepath.IsAbs(appPath) {
|
|
return fmt.Errorf("tsunami:apppath must be absolute: %s", appPath)
|
|
}
|
|
|
|
appName := build.GetAppName(appPath)
|
|
osArch := runtime.GOOS + "-" + runtime.GOARCH
|
|
|
|
cachePath, err := tsunamiutil.GetTsunamiAppCachePath("local", appName, osArch)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to get cache path: %w", err)
|
|
}
|
|
|
|
upToDate, err := isBuildCacheUpToDate(appPath)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to check build cache: %w", err)
|
|
}
|
|
|
|
if !upToDate || force {
|
|
nodePath := wavebase.GetWaveAppElectronExecPath()
|
|
if nodePath == "" {
|
|
return fmt.Errorf("electron executable path not set")
|
|
}
|
|
|
|
opts := build.BuildOpts{
|
|
AppPath: appPath,
|
|
Verbose: true,
|
|
Open: false,
|
|
KeepTemp: false,
|
|
OutputFile: cachePath,
|
|
ScaffoldPath: scaffoldPath,
|
|
SdkReplacePath: sdkReplacePath,
|
|
NodePath: nodePath,
|
|
}
|
|
|
|
err = build.TsunamiBuild(opts)
|
|
if err != nil {
|
|
log.Printf("TsunamiController build error for block %s: %v", c.blockId, err)
|
|
return fmt.Errorf("failed to build tsunami app: %w", err)
|
|
}
|
|
}
|
|
|
|
info, err := os.Stat(cachePath)
|
|
if err != nil {
|
|
if os.IsNotExist(err) {
|
|
return fmt.Errorf("app cache does not exist: %s", cachePath)
|
|
}
|
|
return fmt.Errorf("failed to stat app cache: %w", err)
|
|
}
|
|
|
|
if runtime.GOOS != "windows" && info.Mode()&0111 == 0 {
|
|
return fmt.Errorf("app cache is not executable: %s", cachePath)
|
|
}
|
|
|
|
tsunamiProc, err := runTsunamiAppBinary(ctx, cachePath, appPath, blockMeta)
|
|
if err != nil {
|
|
return fmt.Errorf("failed to run tsunami app: %w", err)
|
|
}
|
|
|
|
c.tsunamiProc = tsunamiProc
|
|
c.WithStatusLock(func() {
|
|
c.status = Status_Running
|
|
c.port = tsunamiProc.Port
|
|
})
|
|
go c.sendStatusUpdate()
|
|
|
|
// Asynchronously fetch schemas after port is detected
|
|
go func() {
|
|
c.fetchAndSetSchemas(tsunamiProc.Port)
|
|
}()
|
|
|
|
// Monitor process completion
|
|
go func() {
|
|
<-tsunamiProc.WaitCh
|
|
c.runLock.Lock()
|
|
if c.tsunamiProc == tsunamiProc {
|
|
c.tsunamiProc = nil
|
|
c.WithStatusLock(func() {
|
|
c.status = Status_Done
|
|
c.port = 0
|
|
c.exitCode = exitCodeFromWaitErr(tsunamiProc.WaitRtn)
|
|
})
|
|
c.clearSchemas()
|
|
go c.sendStatusUpdate()
|
|
}
|
|
c.runLock.Unlock()
|
|
}()
|
|
|
|
return nil
|
|
}
|
|
|
|
func (c *TsunamiController) Stop(graceful bool, newStatus string) error {
|
|
log.Printf("TsunamiController.Stop called for block %s (graceful: %t, newStatus: %s)", c.blockId, graceful, newStatus)
|
|
c.runLock.Lock()
|
|
defer c.runLock.Unlock()
|
|
|
|
if c.tsunamiProc == nil {
|
|
return nil
|
|
}
|
|
|
|
if c.tsunamiProc.Cmd.Process != nil {
|
|
c.tsunamiProc.Cmd.Process.Kill()
|
|
}
|
|
|
|
if c.tsunamiProc.StdinWriter != nil {
|
|
c.tsunamiProc.StdinWriter.Close()
|
|
}
|
|
|
|
c.tsunamiProc = nil
|
|
if newStatus == "" {
|
|
newStatus = Status_Done
|
|
}
|
|
c.WithStatusLock(func() {
|
|
c.status = newStatus
|
|
c.port = 0
|
|
})
|
|
c.clearSchemas()
|
|
go c.sendStatusUpdate()
|
|
return nil
|
|
}
|
|
|
|
func (c *TsunamiController) GetRuntimeStatus() *BlockControllerRuntimeStatus {
|
|
var rtn *BlockControllerRuntimeStatus
|
|
c.WithStatusLock(func() {
|
|
c.statusVersion++
|
|
rtn = &BlockControllerRuntimeStatus{
|
|
BlockId: c.blockId,
|
|
Version: c.statusVersion,
|
|
ShellProcStatus: c.status,
|
|
ShellProcExitCode: c.exitCode,
|
|
}
|
|
|
|
if c.status == Status_Running && c.port > 0 {
|
|
rtn.TsunamiPort = c.port
|
|
}
|
|
})
|
|
|
|
return rtn
|
|
}
|
|
|
|
func (c *TsunamiController) SendInput(input *BlockInputUnion) error {
|
|
return fmt.Errorf("tsunami controller send input not implemented")
|
|
}
|
|
|
|
func runTsunamiAppBinary(ctx context.Context, appBinPath string, appPath string, blockMeta waveobj.MetaMapType) (*TsunamiAppProc, error) {
|
|
cmd := exec.Command(appBinPath)
|
|
cmd.Env = append(os.Environ(), "TSUNAMI_CLOSEONSTDIN=1")
|
|
|
|
// Add TsunamiEnv variables if configured
|
|
tsunamiEnv := blockMeta.GetMap(waveobj.MetaKey_TsunamiEnv)
|
|
for key, value := range tsunamiEnv {
|
|
if strValue, ok := value.(string); ok {
|
|
cmd.Env = append(cmd.Env, key+"="+strValue)
|
|
}
|
|
}
|
|
|
|
stdoutPipe, err := cmd.StdoutPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create stdout pipe: %w", err)
|
|
}
|
|
|
|
stderrPipe, err := cmd.StderrPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create stderr pipe: %w", err)
|
|
}
|
|
|
|
stdinPipe, err := cmd.StdinPipe()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to create stdin pipe: %w", err)
|
|
}
|
|
|
|
appName := build.GetAppName(appPath)
|
|
|
|
lineBuffer := utilds.MakeMultiReaderLineBuffer(1000)
|
|
portChan := make(chan int, 1)
|
|
portFound := false
|
|
|
|
lineBuffer.SetLineCallback(func(line string) {
|
|
log.Printf("[tsunami:%s] %s\n", appName, line)
|
|
|
|
if !portFound {
|
|
if port := build.ParseTsunamiPort(line); port > 0 {
|
|
portFound = true
|
|
portChan <- port
|
|
}
|
|
}
|
|
})
|
|
|
|
err = cmd.Start()
|
|
if err != nil {
|
|
return nil, fmt.Errorf("failed to start tsunami app: %w", err)
|
|
}
|
|
|
|
// Create wait channel and tsunami proc first
|
|
waitCh := make(chan struct{})
|
|
tsunamiProc := &TsunamiAppProc{
|
|
Cmd: cmd,
|
|
LineBuffer: lineBuffer,
|
|
StdinWriter: stdinPipe,
|
|
WaitCh: waitCh,
|
|
}
|
|
|
|
// Start goroutine to handle cmd.Wait()
|
|
go func() {
|
|
tsunamiProc.WaitRtn = cmd.Wait()
|
|
log.Printf("WAIT RETURN: %v\n", tsunamiProc.WaitRtn)
|
|
if err := tsunamiProc.WaitRtn; err != nil {
|
|
if ee, ok := err.(*exec.ExitError); ok {
|
|
if ws, ok := ee.ProcessState.Sys().(syscall.WaitStatus); ok {
|
|
if ws.Signaled() {
|
|
sig := ws.Signal()
|
|
log.Printf("tsunami proc killed by signal: %s (%d)", sig, int(sig))
|
|
} else {
|
|
log.Printf("tsunami proc exited with code %d", ee.ExitCode())
|
|
}
|
|
}
|
|
} else {
|
|
log.Printf("tsunami proc error: %v", err)
|
|
}
|
|
}
|
|
|
|
close(waitCh)
|
|
}()
|
|
|
|
// Start reading both stdout and stderr
|
|
go lineBuffer.ReadAll(stdoutPipe)
|
|
go lineBuffer.ReadAll(stderrPipe)
|
|
|
|
// Wait for either port detection, process death, or context timeout
|
|
errChan := make(chan error, 1)
|
|
go func() {
|
|
<-tsunamiProc.WaitCh
|
|
select {
|
|
case <-portChan:
|
|
// Port already found, nothing to do
|
|
default:
|
|
errChan <- fmt.Errorf("tsunami process died before emitting listening message")
|
|
}
|
|
}()
|
|
|
|
select {
|
|
case port := <-portChan:
|
|
tsunamiProc.Port = port
|
|
return tsunamiProc, nil
|
|
case err := <-errChan:
|
|
cmd.Process.Kill()
|
|
return nil, err
|
|
case <-ctx.Done():
|
|
cmd.Process.Kill()
|
|
return nil, fmt.Errorf("timeout waiting for tsunami port: %w", ctx.Err())
|
|
}
|
|
}
|
|
|
|
func MakeTsunamiController(tabId string, blockId string) Controller {
|
|
log.Printf("make tsunami controller: %s %s\n", tabId, blockId)
|
|
return &TsunamiController{
|
|
blockId: blockId,
|
|
tabId: tabId,
|
|
status: Status_Init,
|
|
}
|
|
}
|
|
|
|
// requires the lock (so do not call while holding statusLock)
|
|
func (c *TsunamiController) sendStatusUpdate() {
|
|
rtStatus := c.GetRuntimeStatus()
|
|
log.Printf("sending blockcontroller update %#v\n", rtStatus)
|
|
wps.Broker.Publish(wps.WaveEvent{
|
|
Event: wps.Event_ControllerStatus,
|
|
Scopes: []string{
|
|
waveobj.MakeORef(waveobj.OType_Tab, c.tabId).String(),
|
|
waveobj.MakeORef(waveobj.OType_Block, c.blockId).String(),
|
|
},
|
|
Data: rtStatus,
|
|
})
|
|
}
|
|
|
|
func (c *TsunamiController) WithStatusLock(fn func()) {
|
|
c.statusLock.Lock()
|
|
defer c.statusLock.Unlock()
|
|
fn()
|
|
}
|
|
|
|
func exitCodeFromWaitErr(waitErr error) int {
|
|
if waitErr != nil {
|
|
if exitError, ok := waitErr.(*exec.ExitError); ok {
|
|
return exitError.ExitCode()
|
|
} else {
|
|
return 1
|
|
}
|
|
} else {
|
|
return 0
|
|
}
|
|
}
|