update logging for jobmanager. (#2854)

one directory, prune files in connserver, write keepalives every hour.
remove senddata log line.
This commit is contained in:
Mike Sawka 2026-02-10 18:43:54 -08:00 committed by GitHub
parent 8abe3f009b
commit 9ea1d24d6c
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
7 changed files with 105 additions and 21 deletions

View file

@ -38,6 +38,12 @@ var serverCmd = &cobra.Command{
RunE: serverRun,
}
const (
JobLogRetentionTime = 48 * time.Hour
JobLogCleanupDelay = 10 * time.Second
JobLogCleanupInterval = 1 * time.Hour
)
var connServerRouter bool
var connServerRouterDomainSocket bool
var connServerConnName string
@ -53,6 +59,61 @@ func init() {
rootCmd.AddCommand(serverCmd)
}
func cleanupOldJobLogs() {
jobDir := wavebase.GetRemoteJobLogDir()
entries, err := os.ReadDir(jobDir)
if err != nil {
return
}
cutoffTime := time.Now().Add(-JobLogRetentionTime)
for _, entry := range entries {
if entry.IsDir() {
continue
}
name := entry.Name()
if !strings.HasSuffix(name, ".log") {
continue
}
info, err := entry.Info()
if err != nil {
continue
}
if info.ModTime().Before(cutoffTime) {
filePath := filepath.Join(jobDir, name)
err := os.Remove(filePath)
if err != nil {
log.Printf("error removing old job log file %s: %v", filePath, err)
} else {
log.Printf("removed old job log file: %s", filePath)
}
}
}
}
func startJobLogCleanup() {
go func() {
defer func() {
panichandler.PanicHandler("startJobLogCleanup", recover())
}()
time.Sleep(JobLogCleanupDelay)
cleanupOldJobLogs()
ticker := time.NewTicker(JobLogCleanupInterval)
defer ticker.Stop()
for range ticker.C {
cleanupOldJobLogs()
}
}()
}
func getRemoteDomainSocketName() string {
homeDir := wavebase.GetHomeDir()
return filepath.Join(homeDir, wavebase.RemoteWaveHomeDirName, wavebase.RemoteDomainSocketBaseName)
@ -218,6 +279,7 @@ func serverRunRouter() error {
}()
wshremote.RunSysInfoLoop(client, connServerConnName)
}()
startJobLogCleanup()
log.Printf("running server, successfully started")
select {}
}
@ -324,6 +386,7 @@ func serverRunRouterDomainSocket(jwtToken string) error {
}()
wshremote.RunSysInfoLoop(client, connServerConnName)
}()
startJobLogCleanup()
log.Printf("running server (router-domainsocket mode), successfully started")
select {}
@ -346,6 +409,7 @@ func serverRunNormal(jwtToken string) error {
}()
wshremote.RunSysInfoLoop(RpcClient, RpcContext.Conn)
}()
startJobLogCleanup()
select {} // run forever
}

View file

@ -79,6 +79,17 @@ func SetupJobManager(clientId string, jobId string, publicKeyBytes []byte, jobAu
return fmt.Errorf("failed to daemonize: %w", err)
}
go func() {
defer func() {
panichandler.PanicHandler("JobManager:keepalive", recover())
}()
ticker := time.NewTicker(1 * time.Hour)
defer ticker.Stop()
for range ticker.C {
log.Printf("keepalive: job manager active\n")
}
}()
return nil
}
@ -365,17 +376,6 @@ func (jm *JobManager) StartStream(msc *MainServerConn) error {
return nil
}
func GetJobSocketPath(jobId string) string {
socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid()))
return filepath.Join(socketDir, fmt.Sprintf("%s.sock", jobId))
}
func GetJobFilePath(clientId string, jobId string, extension string) string {
homeDir := wavebase.GetHomeDir()
jobDir := filepath.Join(homeDir, ".waveterm", "jobs", clientId)
return filepath.Join(jobDir, fmt.Sprintf("%s.%s", jobId, extension))
}
func MakeJobDomainSocket(clientId string, jobId string) error {
socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid()))
err := os.MkdirAll(socketDir, 0700)
@ -383,7 +383,7 @@ func MakeJobDomainSocket(clientId string, jobId string) error {
return fmt.Errorf("failed to create socket directory: %w", err)
}
socketPath := GetJobSocketPath(jobId)
socketPath := wavebase.GetRemoteJobSocketPath(jobId)
os.Remove(socketPath)

View file

@ -13,6 +13,7 @@ import (
"path/filepath"
"syscall"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"golang.org/x/sys/unix"
)
@ -32,7 +33,7 @@ func daemonize(clientId string, jobId string) error {
}
devNull.Close()
logPath := GetJobFilePath(clientId, jobId, "log")
logPath := wavebase.GetRemoteJobFilePath(jobId, "log")
logDir := filepath.Dir(logPath)
err = os.MkdirAll(logDir, 0700)
if err != nil {
@ -54,6 +55,7 @@ func daemonize(clientId string, jobId string) error {
log.SetOutput(logFile)
log.Printf("job manager daemonized, logging to %s\n", logPath)
log.Printf("job owner clientid: %s\n", clientId)
signal.Ignore(syscall.SIGHUP)

View file

@ -42,8 +42,8 @@ type routedDataSender struct {
}
func (rds *routedDataSender) SendData(dataPk wshrpc.CommandStreamData) {
log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s",
dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route)
// log.Printf("SendData: sending seq=%d, len=%d, eof=%t, error=%s, route=%s",
// dataPk.Seq, len(dataPk.Data64), dataPk.Eof, dataPk.Error, rds.route)
err := wshclient.StreamDataCommand(rds.wshRpc, dataPk, &wshrpc.RpcOpts{NoResponse: true, Route: rds.route})
if err != nil {
log.Printf("SendData: error sending stream data: %v\n", err)
@ -132,4 +132,3 @@ func (msc *MainServerConn) JobInputCommand(ctx context.Context, data wshrpc.Comm
WshCmdJobManager.InputQueue.QueueItem(data.InputSessionId, data.SeqNum, data)
return nil
}

View file

@ -162,8 +162,8 @@ func mergeActivity(curActivity *telemetrydata.TEventProps, newActivity telemetry
// ignores the timestamp in tevent, and uses the current time
func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
eventTs := time.Now()
// compute to 2-hour boundary, and round up to next 2-hour boundary
eventTs = eventTs.Truncate(2 * time.Hour).Add(2 * time.Hour)
// compute to 1-hour boundary, and round up to next 1-hour boundary
eventTs = eventTs.Truncate(time.Hour).Add(time.Hour)
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
// find event that matches this timestamp with event name "app:activity"
@ -195,7 +195,7 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err
func TruncateActivityTEventForShutdown(ctx context.Context) error {
nowTs := time.Now()
eventTs := nowTs.Truncate(2 * time.Hour).Add(2 * time.Hour)
eventTs := nowTs.Truncate(time.Hour).Add(time.Hour)
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
// find event that matches this timestamp with event name "app:activity"
uuidStr := tx.GetString(`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ?`, eventTs.UnixMilli(), ActivityEventName)

View file

@ -435,3 +435,22 @@ func getSystemSummary(ctx context.Context) string {
return fmt.Sprintf("%s (%s)", runtime.GOOS, runtime.GOARCH)
}
}
// job socket path on remote machine
func GetRemoteJobSocketPath(jobId string) string {
socketDir := filepath.Join("/tmp", fmt.Sprintf("waveterm-%d", os.Getuid()))
return filepath.Join(socketDir, fmt.Sprintf("%s.sock", jobId))
}
// job file path on remote machine
func GetRemoteJobFilePath(jobId string, extension string) string {
jobDir := GetRemoteJobLogDir()
return filepath.Join(jobDir, fmt.Sprintf("%s.%s", jobId, extension))
}
// job file dir on remote machines
func GetRemoteJobLogDir() string {
homeDir := GetHomeDir()
jobDir := filepath.Join(homeDir, ".waveterm", "jobs")
return jobDir
}

View file

@ -17,7 +17,7 @@ import (
"time"
"github.com/shirou/gopsutil/v4/process"
"github.com/wavetermdev/waveterm/pkg/jobmanager"
"github.com/wavetermdev/waveterm/pkg/wavebase"
"github.com/wavetermdev/waveterm/pkg/wshrpc"
"github.com/wavetermdev/waveterm/pkg/wshrpc/wshclient"
"github.com/wavetermdev/waveterm/pkg/wshutil"
@ -43,7 +43,7 @@ func isProcessRunning(pid int, pidStartTs int64) (*process.Process, error) {
// returns jobRouteId, cleanupFunc, error
func (impl *ServerImpl) connectToJobManager(ctx context.Context, jobId string, mainServerJwtToken string) (string, func(), error) {
socketPath := jobmanager.GetJobSocketPath(jobId)
socketPath := wavebase.GetRemoteJobSocketPath(jobId)
log.Printf("connectToJobManager: connecting to socket: %s\n", socketPath)
conn, err := net.Dial("unix", socketPath)
if err != nil {