batch wsh:run tevents by hour (#3181)

This commit is contained in:
Mike Sawka 2026-04-15 11:06:26 -07:00 committed by GitHub
parent 1fe0fa236c
commit 5e3673c338
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
4 changed files with 50 additions and 5 deletions

View file

@ -1592,7 +1592,8 @@ declare global {
"ai:backendtype"?: string;
"ai:local"?: boolean;
"wsh:cmd"?: string;
"wsh:haderror"?: boolean;
"wsh:errorcount"?: number;
"wsh:count"?: number;
"conn:conntype"?: string;
"conn:wsherrorcode"?: string;
"conn:errorcode"?: string;

View file

@ -27,6 +27,7 @@ import (
const MaxTzNameLen = 50
const ActivityEventName = "app:activity"
const WshRunEventName = "wsh:run"
var cachedTosAgreedTs atomic.Int64
@ -196,6 +197,44 @@ func updateActivityTEvent(ctx context.Context, tevent *telemetrydata.TEvent) err
})
}
// aggregates wsh:run events per (cmd, haderror) key within the current 1-hour bucket
func updateWshRunTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
eventTs := time.Now().Truncate(time.Hour).Add(time.Hour)
incomingCount := tevent.Props.WshCount
if incomingCount <= 0 {
incomingCount = 1
}
return wstore.WithTx(ctx, func(tx *wstore.TxWrap) error {
uuidStr := tx.GetString(
`SELECT uuid FROM db_tevent WHERE ts = ? AND event = ? AND json_extract(props, '$."wsh:cmd"') IS ?`,
eventTs.UnixMilli(), WshRunEventName, tevent.Props.WshCmd,
)
if uuidStr != "" {
var curProps telemetrydata.TEventProps
rawProps := tx.GetString(`SELECT props FROM db_tevent WHERE uuid = ?`, uuidStr)
if rawProps != "" {
if err := json.Unmarshal([]byte(rawProps), &curProps); err != nil {
log.Printf("error unmarshalling wsh:run props: %v\n", err)
}
}
curCount := curProps.WshCount
if curCount <= 0 {
curCount = 1
}
curProps.WshCount = curCount + incomingCount
curProps.WshErrorCount += tevent.Props.WshErrorCount
tx.Exec(`UPDATE db_tevent SET props = ? WHERE uuid = ?`, dbutil.QuickJson(curProps), uuidStr)
} else {
newProps := tevent.Props
newProps.WshCount = incomingCount
tsLocal := utilfn.ConvertToWallClockPT(eventTs).Format(time.RFC3339)
tx.Exec(`INSERT INTO db_tevent (uuid, ts, tslocal, event, props) VALUES (?, ?, ?, ?, ?)`,
uuid.New().String(), eventTs.UnixMilli(), tsLocal, WshRunEventName, dbutil.QuickJson(newProps))
}
return nil
})
}
func TruncateActivityTEventForShutdown(ctx context.Context) error {
nowTs := time.Now()
eventTs := nowTs.Truncate(time.Hour).Add(time.Hour)
@ -259,6 +298,9 @@ func RecordTEvent(ctx context.Context, tevent *telemetrydata.TEvent) error {
if tevent.Event == ActivityEventName {
return updateActivityTEvent(ctx, tevent)
}
if tevent.Event == WshRunEventName {
return updateWshRunTEvent(ctx, tevent)
}
return insertTEvent(ctx, tevent)
}

View file

@ -126,8 +126,9 @@ type TEventProps struct {
AiBackendType string `json:"ai:backendtype,omitempty"`
AiLocal bool `json:"ai:local,omitempty"`
WshCmd string `json:"wsh:cmd,omitempty"`
WshHadError bool `json:"wsh:haderror,omitempty"`
WshCmd string `json:"wsh:cmd,omitempty"`
WshErrorCount int `json:"wsh:errorcount,omitempty"`
WshCount int `json:"wsh:count,omitempty"`
ConnType string `json:"conn:conntype,omitempty"`
ConnWshErrorCode string `json:"conn:wsherrorcode,omitempty"`

View file

@ -1329,7 +1329,8 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int
delete(data, key)
}
if strings.HasSuffix(key, "#error") {
props.WshHadError = true
props.WshCmd = strings.TrimSuffix(key, "#error")
props.WshErrorCount = 1
} else {
props.WshCmd = key
}
@ -1339,7 +1340,7 @@ func (ws *WshServer) WshActivityCommand(ctx context.Context, data map[string]int
}
telemetry.GoUpdateActivityWrap(activityUpdate, "wsh-activity")
telemetry.GoRecordTEventWrap(&telemetrydata.TEvent{
Event: "wsh:run",
Event: telemetry.WshRunEventName,
Props: props,
})
return nil