time trigger executor

This commit is contained in:
booleanmaybe 2026-04-07 16:04:53 -04:00
parent b5921c68b4
commit 7d48d538c3
7 changed files with 401 additions and 19 deletions

View file

@ -119,7 +119,7 @@ func Bootstrap(tikiSkillContent, dokiSkillContent string) (*Result, error) {
// Phase 6.5: Trigger system
schema := rukiRuntime.NewSchema()
userName, _, _ := taskStore.GetCurrentUser()
triggerCount, err := service.LoadAndRegisterTriggers(gate, schema, func() string { return userName })
triggerEngine, triggerCount, err := service.LoadAndRegisterTriggers(gate, schema, func() string { return userName })
if err != nil {
return nil, fmt.Errorf("load triggers: %w", err)
}
@ -178,6 +178,7 @@ func Bootstrap(tikiSkillContent, dokiSkillContent string) (*Result, error) {
// Phase 11: Background tasks
ctx, cancel := context.WithCancel(context.Background()) //nolint:gosec // G118: cancel stored in Result.CancelFunc, called by app shutdown
background.StartBurndownHistoryBuilder(ctx, tikiStore, headerConfig, application)
triggerEngine.StartScheduler(ctx)
// Phase 12: Navigation and input wiring
wireNavigation(controllers.Nav, layoutModel, rootLayout)

View file

@ -98,7 +98,7 @@ func CreateTaskFromReader(r io.Reader) (string, error) {
// load triggers so piped creates fire them
schema := rukiRuntime.NewSchema()
userName, _, _ := taskStore.GetCurrentUser()
if _, loadErr := service.LoadAndRegisterTriggers(gate, schema, func() string { return userName }); loadErr != nil {
if _, _, loadErr := service.LoadAndRegisterTriggers(gate, schema, func() string { return userName }); loadErr != nil {
return "", fmt.Errorf("load triggers: %w", loadErr)
}

View file

@ -233,7 +233,7 @@ func runExec(args []string) int {
// load triggers so exec queries fire them
schema := rukiRuntime.NewSchema()
userName, _, _ := taskStore.GetCurrentUser()
if _, err := service.LoadAndRegisterTriggers(gate, schema, func() string { return userName }); err != nil {
if _, _, err := service.LoadAndRegisterTriggers(gate, schema, func() string { return userName }); err != nil {
_, _ = fmt.Fprintf(os.Stderr, "error: load triggers: %v\n", err)
return exitStartupFailure
}

View file

@ -43,6 +43,17 @@ func (te *TriggerExecutor) EvalGuard(trig *Trigger, tc *TriggerContext) (bool, e
return exec.evalCondition(trig.Where, sentinel, tc.AllTasks)
}
// ExecTimeTriggerAction executes a time trigger's action against all tasks.
// Uses a plain Executor (no old/new overrides) since time triggers have no
// mutation context — the parser forbids qualified refs in them.
func (te *TriggerExecutor) ExecTimeTriggerAction(tt *TimeTrigger, allTasks []*task.Task) (*Result, error) {
if tt.Action == nil {
return nil, fmt.Errorf("time trigger has no action")
}
exec := NewExecutor(te.schema, te.userFunc)
return exec.Execute(tt.Action, allTasks)
}
// ExecAction executes a trigger's CRUD action statement and returns the result.
// QualifiedRefs resolve against tc.Old/tc.New. Bare fields resolve against target tasks.
// Returns *Result for persistence by service/.

View file

@ -2420,3 +2420,100 @@ func TestEvalCountOverride_NonSubQueryArg(t *testing.T) {
t.Fatalf("expected 'count() argument must be a select subquery' error, got: %v", err)
}
}
// --- ExecTimeTriggerAction ---
func TestExecTimeTriggerAction_Update(t *testing.T) {
te := newTestTriggerExecutor()
p := newTestParser()
tt, err := p.ParseTimeTrigger(`every 1day update where status = "in_progress" set status="backlog"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
tasks := []*task.Task{
{ID: "TIKI-000001", Status: "in_progress", Title: "stale", Type: "story", Priority: 3},
{ID: "TIKI-000002", Status: "done", Title: "finished", Type: "story", Priority: 3},
}
result, err := te.ExecTimeTriggerAction(tt, tasks)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Update == nil {
t.Fatal("expected Update result")
}
if len(result.Update.Updated) != 1 {
t.Fatalf("expected 1 updated task, got %d", len(result.Update.Updated))
}
if result.Update.Updated[0].Status != "backlog" {
t.Fatalf("expected status=backlog, got %q", result.Update.Updated[0].Status)
}
}
func TestExecTimeTriggerAction_Create(t *testing.T) {
te := newTestTriggerExecutor()
p := newTestParser()
tt, err := p.ParseTimeTrigger(`every 1day create title="daily standup" status="ready" type="story" priority=3`)
if err != nil {
t.Fatalf("parse: %v", err)
}
result, err := te.ExecTimeTriggerAction(tt, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Create == nil {
t.Fatal("expected Create result")
}
if result.Create.Task.Title != "daily standup" {
t.Fatalf("expected title='daily standup', got %q", result.Create.Task.Title)
}
if result.Create.Task.Status != "ready" {
t.Fatalf("expected status=ready, got %q", result.Create.Task.Status)
}
}
func TestExecTimeTriggerAction_Delete(t *testing.T) {
te := newTestTriggerExecutor()
p := newTestParser()
tt, err := p.ParseTimeTrigger(`every 1day delete where status = "done"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
tasks := []*task.Task{
{ID: "TIKI-000001", Status: "done", Title: "finished", Type: "story", Priority: 3},
{ID: "TIKI-000002", Status: "ready", Title: "active", Type: "story", Priority: 3},
{ID: "TIKI-000003", Status: "done", Title: "also done", Type: "story", Priority: 3},
}
result, err := te.ExecTimeTriggerAction(tt, tasks)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if result.Delete == nil {
t.Fatal("expected Delete result")
}
if len(result.Delete.Deleted) != 2 {
t.Fatalf("expected 2 deleted tasks, got %d", len(result.Delete.Deleted))
}
}
func TestExecTimeTriggerAction_NilAction(t *testing.T) {
te := newTestTriggerExecutor()
tt := &TimeTrigger{
Interval: DurationLiteral{Value: 1, Unit: "day"},
Action: nil,
}
_, err := te.ExecTimeTriggerAction(tt, nil)
if err == nil {
t.Fatal("expected error for nil action")
}
if !strings.Contains(err.Error(), "time trigger has no action") {
t.Fatalf("expected 'time trigger has no action' error, got: %v", err)
}
}

View file

@ -11,6 +11,7 @@ import (
"github.com/boolean-maybe/tiki/config"
"github.com/boolean-maybe/tiki/ruki"
"github.com/boolean-maybe/tiki/task"
"github.com/boolean-maybe/tiki/util/duration"
)
// maxTriggerDepth is the maximum cascade depth for triggers.
@ -223,15 +224,21 @@ func (te *TriggerEngine) execRun(ctx context.Context, entry triggerEntry, tc *ru
}
// LoadAndRegisterTriggers loads trigger definitions from workflow.yaml, parses them,
// and registers them with the gate. Returns the number of triggers loaded.
// and registers them with the gate. Returns the engine (always non-nil), the number
// of triggers loaded, and any error. Callers can call StartScheduler on the engine
// without nil-checking — it early-returns on zero time triggers.
// Fails fast on parse errors — a bad trigger blocks startup.
func LoadAndRegisterTriggers(gate *TaskMutationGate, schema ruki.Schema, userFunc func() string) (int, error) {
func LoadAndRegisterTriggers(gate *TaskMutationGate, schema ruki.Schema, userFunc func() string) (*TriggerEngine, int, error) {
executor := ruki.NewTriggerExecutor(schema, userFunc)
empty := func() *TriggerEngine { return NewTriggerEngine(nil, nil, executor) }
defs, err := config.LoadTriggerDefs()
if err != nil {
return 0, fmt.Errorf("loading trigger definitions: %w", err)
return empty(), 0, fmt.Errorf("loading trigger definitions: %w", err)
}
if len(defs) == 0 {
return 0, nil
return empty(), 0, nil
}
parser := ruki.NewParser(schema)
@ -246,7 +253,7 @@ func LoadAndRegisterTriggers(gate *TaskMutationGate, schema ruki.Schema, userFun
rule, err := parser.ParseRule(def.Ruki)
if err != nil {
return 0, fmt.Errorf("trigger %q: %w", desc, err)
return empty(), 0, fmt.Errorf("trigger %q: %w", desc, err)
}
switch {
@ -263,12 +270,62 @@ func LoadAndRegisterTriggers(gate *TaskMutationGate, schema ruki.Schema, userFun
}
}
executor := ruki.NewTriggerExecutor(schema, userFunc)
engine := NewTriggerEngine(eventEntries, timeEntries, executor)
engine.RegisterWithGate(gate)
total := len(eventEntries) + len(timeEntries)
slog.Info("triggers loaded", "event", len(eventEntries), "time", len(timeEntries))
return total, nil
return engine, total, nil
}
// StartScheduler launches a background goroutine for each time trigger.
// Each goroutine fires on a time.Ticker interval. Context cancellation stops all goroutines.
// Safe to call even when there are no time triggers — returns immediately.
func (te *TriggerEngine) StartScheduler(ctx context.Context) {
if len(te.timeTriggers) == 0 {
return
}
for _, entry := range te.timeTriggers {
d, err := duration.ToDuration(entry.Trigger.Interval.Value, entry.Trigger.Interval.Unit)
if err != nil {
slog.Error("invalid time trigger interval, skipping",
"trigger", entry.Description, "error", err)
continue
}
slog.Info("starting time trigger scheduler",
"trigger", entry.Description, "interval", d)
go te.runTimeTrigger(ctx, entry, d)
}
}
// runTimeTrigger runs a single time trigger on a ticker loop until ctx is cancelled.
// All errors are logged and swallowed — the ticker keeps running (fail-open).
func (te *TriggerEngine) runTimeTrigger(ctx context.Context, entry TimeTriggerEntry, interval time.Duration) {
ticker := time.NewTicker(interval)
defer ticker.Stop()
for {
select {
case <-ctx.Done():
return
case <-ticker.C:
te.executeTimeTrigger(ctx, entry)
}
}
}
// executeTimeTrigger runs a single tick of a time trigger: snapshot tasks, execute, persist.
func (te *TriggerEngine) executeTimeTrigger(ctx context.Context, entry TimeTriggerEntry) {
allTasks := te.gate.ReadStore().GetAllTasks()
result, err := te.executor.ExecTimeTriggerAction(entry.Trigger, allTasks)
if err != nil {
slog.Error("time trigger action failed",
"trigger", entry.Description, "error", err)
return
}
if err := te.persistResult(ctx, result); err != nil {
slog.Error("time trigger persist failed",
"trigger", entry.Description, "error", err)
}
}

View file

@ -662,16 +662,19 @@ func TestTriggerEngine_BeforeDeleteUnconditionalDeny(t *testing.T) {
// --- LoadAndRegisterTriggers ---
func TestLoadAndRegisterTriggers_EmptyDefs(t *testing.T) {
// no workflow files → empty defs → 0, nil
// no workflow files → empty defs → engine, 0, nil
gate := NewTaskMutationGate()
schema := testTriggerSchema{}
count, err := LoadAndRegisterTriggers(gate, schema, nil)
engine, count, err := LoadAndRegisterTriggers(gate, schema, nil)
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if count != 0 {
t.Fatalf("expected 0 triggers loaded, got %d", count)
}
if engine == nil {
t.Fatal("expected non-nil engine even with zero triggers")
}
}
// --- coverage gap tests ---
@ -848,7 +851,7 @@ func TestLoadAndRegisterTriggers_WithValidTriggers(t *testing.T) {
s := store.NewInMemoryStore()
gate.SetStore(s)
count, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, func() string { return "test-user" })
_, count, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, func() string { return "test-user" })
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
@ -884,13 +887,16 @@ func TestLoadAndRegisterTriggers_ParseError(t *testing.T) {
gate := NewTaskMutationGate()
gate.SetStore(store.NewInMemoryStore())
_, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
engine, _, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
if err == nil {
t.Fatal("expected parse error")
}
if !strings.Contains(err.Error(), "broken") {
t.Fatalf("expected trigger description in error, got: %v", err)
}
if engine == nil {
t.Fatal("expected non-nil engine even on error")
}
}
func TestLoadAndRegisterTriggers_ParseErrorNoDescription(t *testing.T) {
@ -906,7 +912,7 @@ func TestLoadAndRegisterTriggers_ParseErrorNoDescription(t *testing.T) {
gate := NewTaskMutationGate()
gate.SetStore(store.NewInMemoryStore())
_, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
_, _, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
if err == nil {
t.Fatal("expected parse error")
}
@ -1017,13 +1023,16 @@ func TestLoadAndRegisterTriggers_LoadDefError(t *testing.T) {
gate := NewTaskMutationGate()
gate.SetStore(store.NewInMemoryStore())
_, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
engine, _, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
if err == nil {
t.Fatal("expected error for unreadable workflow.yaml")
}
if !strings.Contains(err.Error(), "loading trigger definitions") {
t.Fatalf("expected 'loading trigger definitions' error, got: %v", err)
}
if engine == nil {
t.Fatal("expected non-nil engine even on load error")
}
}
func TestTriggerEngine_ExecRunEvalError(t *testing.T) {
@ -1069,13 +1078,16 @@ func TestLoadAndRegisterTriggers_MixedEventAndTimeTriggers(t *testing.T) {
gate := NewTaskMutationGate()
gate.SetStore(store.NewInMemoryStore())
count, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, func() string { return "test-user" })
engine, count, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, func() string { return "test-user" })
if err != nil {
t.Fatalf("unexpected error: %v", err)
}
if count != 3 {
t.Fatalf("expected 3 triggers loaded, got %d", count)
}
if engine == nil {
t.Fatal("expected non-nil engine with mixed triggers")
}
}
func TestLoadAndRegisterTriggers_TimeTriggerAccessor(t *testing.T) {
@ -1115,7 +1127,7 @@ func TestLoadAndRegisterTriggers_InvalidTimeTrigger(t *testing.T) {
gate := NewTaskMutationGate()
gate.SetStore(store.NewInMemoryStore())
_, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
_, _, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
if err == nil {
t.Fatal("expected error for invalid time trigger")
}
@ -1138,7 +1150,7 @@ func TestLoadAndRegisterTriggers_RunRejectedInTimeTrigger(t *testing.T) {
gate := NewTaskMutationGate()
gate.SetStore(store.NewInMemoryStore())
_, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
_, _, err := LoadAndRegisterTriggers(gate, testTriggerSchema{}, nil)
if err == nil {
t.Fatal("expected error for run() in time trigger")
}
@ -1146,3 +1158,207 @@ func TestLoadAndRegisterTriggers_RunRejectedInTimeTrigger(t *testing.T) {
t.Fatalf("expected trigger description in error, got: %v", err)
}
}
// --- StartScheduler tests ---
func TestTriggerEngine_StartScheduler_TickExecutes(t *testing.T) {
// time trigger: delete all done tasks every 50ms
p := ruki.NewParser(testTriggerSchema{})
tt, err := p.ParseTimeTrigger(`every 1sec delete where status = "done"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
doneTk := &task.Task{ID: "TIKI-DONE01", Title: "done task", Status: "done", Type: "story", Priority: 3}
activeTk := &task.Task{ID: "TIKI-ACT001", Title: "active", Status: "ready", Type: "story", Priority: 3}
gate, s := newGateWithStoreAndTasks(doneTk, activeTk)
engine := NewTriggerEngine(nil, []TimeTriggerEntry{
{Description: "cleanup", Trigger: tt},
}, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
engine.RegisterWithGate(gate)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// override the interval to 50ms for fast test
go engine.runTimeTrigger(ctx, engine.timeTriggers[0], 50*time.Millisecond)
// wait long enough for at least one tick
time.Sleep(200 * time.Millisecond)
cancel()
// done task should have been deleted
if s.GetTask("TIKI-DONE01") != nil {
t.Fatal("expected done task to be deleted by time trigger")
}
// active task should remain
if s.GetTask("TIKI-ACT001") == nil {
t.Fatal("expected active task to remain")
}
}
func TestTriggerEngine_StartScheduler_NoTimeTriggers(t *testing.T) {
// StartScheduler with no time triggers should return immediately without error
engine := NewTriggerEngine(nil, nil, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// should not block or panic
engine.StartScheduler(ctx)
}
func TestTriggerEngine_StartScheduler_ContextCancellation(t *testing.T) {
p := ruki.NewParser(testTriggerSchema{})
tt, err := p.ParseTimeTrigger(`every 1day delete where status = "done"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
gate, _ := newGateWithStoreAndTasks()
engine := NewTriggerEngine(nil, []TimeTriggerEntry{
{Description: "daily cleanup", Trigger: tt},
}, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
engine.RegisterWithGate(gate)
ctx, cancel := context.WithCancel(context.Background())
done := make(chan struct{})
go func() {
engine.runTimeTrigger(ctx, engine.timeTriggers[0], 1*time.Hour)
close(done)
}()
// cancel immediately — goroutine should exit promptly
cancel()
select {
case <-done:
// success — goroutine exited
case <-time.After(2 * time.Second):
t.Fatal("runTimeTrigger did not exit after context cancellation")
}
}
func TestTriggerEngine_StartScheduler_ActionErrorContinues(t *testing.T) {
// time trigger with an action that will error on execution
// (update with assignment to immutable field)
p := ruki.NewParser(testTriggerSchema{})
tt, err := p.ParseTimeTrigger(`every 1sec update where status = "ready" set createdBy="hacker"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
tk := &task.Task{ID: "TIKI-ERR001", Title: "test", Status: "ready", Type: "story", Priority: 3}
gate, s := newGateWithStoreAndTasks(tk)
engine := NewTriggerEngine(nil, []TimeTriggerEntry{
{Description: "broken trigger", Trigger: tt},
}, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
engine.RegisterWithGate(gate)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// run with a short interval — the error should be swallowed and ticker continues
go engine.runTimeTrigger(ctx, engine.timeTriggers[0], 50*time.Millisecond)
time.Sleep(200 * time.Millisecond)
cancel()
// task should remain unchanged since the action errored
persisted := s.GetTask("TIKI-ERR001")
if persisted == nil {
t.Fatal("task should still exist")
}
if persisted.CreatedBy != "" {
t.Errorf("createdBy should be unchanged, got %q", persisted.CreatedBy)
}
}
func TestTriggerEngine_StartScheduler_ValidTriggerRuns(t *testing.T) {
// verify StartScheduler actually launches goroutines that execute the trigger
p := ruki.NewParser(testTriggerSchema{})
tt, err := p.ParseTimeTrigger(`every 1sec delete where status = "done"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
doneTk := &task.Task{ID: "TIKI-SCH001", Title: "done task", Status: "done", Type: "story", Priority: 3}
gate, s := newGateWithStoreAndTasks(doneTk)
engine := NewTriggerEngine(nil, []TimeTriggerEntry{
{Description: "scheduler-test", Trigger: tt},
}, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
engine.RegisterWithGate(gate)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
engine.StartScheduler(ctx)
// 1sec is the smallest parseable interval; wait long enough for one tick
time.Sleep(1500 * time.Millisecond)
cancel()
if s.GetTask("TIKI-SCH001") != nil {
t.Fatal("expected done task to be deleted by scheduler")
}
}
func TestTriggerEngine_StartScheduler_InvalidIntervalSkipped(t *testing.T) {
// construct a time trigger with an unrecognized unit — StartScheduler should
// log an error and skip it without panicking or launching a goroutine
tt := &ruki.TimeTrigger{
Interval: ruki.DurationLiteral{Value: 1, Unit: "fortnights"},
Action: nil, // won't be reached
}
gate, _ := newGateWithStoreAndTasks()
engine := NewTriggerEngine(nil, []TimeTriggerEntry{
{Description: "bad interval", Trigger: tt},
}, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
engine.RegisterWithGate(gate)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// should not panic or launch any goroutines
engine.StartScheduler(ctx)
}
func TestTriggerEngine_ExecuteTimeTrigger_PersistError(t *testing.T) {
// update time trigger where persistResult fails because a before-update
// validator denies the mutation
p := ruki.NewParser(testTriggerSchema{})
tt, err := p.ParseTimeTrigger(`every 1sec update where status = "ready" set status="in_progress"`)
if err != nil {
t.Fatalf("parse: %v", err)
}
tk := &task.Task{ID: "TIKI-PER001", Title: "target", Status: "ready", Type: "story", Priority: 3}
gate, s := newGateWithStoreAndTasks(tk)
// register a before-update validator that always denies
gate.OnUpdate(func(old, proposed *task.Task, all []*task.Task) *Rejection {
return &Rejection{Reason: "update blocked by validator"}
})
engine := NewTriggerEngine(nil, []TimeTriggerEntry{
{Description: "persist-fail", Trigger: tt},
}, ruki.NewTriggerExecutor(testTriggerSchema{}, nil))
engine.RegisterWithGate(gate)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
// run one tick — persist should fail (logged), ticker continues
go engine.runTimeTrigger(ctx, engine.timeTriggers[0], 50*time.Millisecond)
time.Sleep(200 * time.Millisecond)
cancel()
// task should remain unchanged since persist was rejected
persisted := s.GetTask("TIKI-PER001")
if persisted == nil {
t.Fatal("task should still exist")
}
if persisted.Status != "ready" {
t.Errorf("status should be unchanged, got %q", persisted.Status)
}
}