From 44e7d9806b717bbef9cfdf213b0e96db9e4d7a07 Mon Sep 17 00:00:00 2001 From: Ettore Di Giacinto Date: Sun, 19 Apr 2026 21:27:05 +0000 Subject: [PATCH] fix(distributed): stop queue loops on agent nodes + dead-letter cap MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit pending_backend_ops rows targeting agent-type workers looped forever: the reconciler fan-out hit a NATS subject the worker doesn't subscribe to, returned ErrNoResponders, we marked the node unhealthy, and the health monitor flipped it back to healthy on the next heartbeat. Next tick, same row, same failure. Three related fixes: 1. enqueueAndDrainBackendOp skips nodes whose NodeType != backend. Agent workers handle agent NATS subjects, not backend.install / delete / list, so enqueueing for them guarantees an infinite retry loop. Silent skip is correct — they aren't consumers of these ops. 2. Reconciler drain mirrors enqueueAndDrainBackendOp's behavior on nats.ErrNoResponders: mark the node unhealthy before recording the failure, so subsequent ListDuePendingBackendOps (filters by status=healthy) stops picking the row until the node actually recovers. Matches the synchronous fan-out path. 3. Dead-letter cap at maxPendingBackendOpAttempts (10). After ~1h of exponential backoff the row is a poison message; further retries just thrash NATS. Row is deleted and logged at ERROR so the abandonment stays visible without the retries continuing forever. Plus a one-shot startup cleanup in NewNodeRegistry: drop queue rows that target agent-type nodes, non-existent nodes, or carry an empty backend name. Guarded by the same schema-migration advisory lock so only one instance performs it. The guards above prevent new rows of this shape; this closes the migration gap for existing ones. Tests: the prune migration (valid row stays, agent + empty-name rows drop) on top of existing upsert / backoff coverage. 
--- core/services/nodes/managers_distributed.go | 7 ++++ core/services/nodes/reconciler.go | 37 +++++++++++++++++++++ core/services/nodes/reconciler_test.go | 26 +++++++++++++++ core/services/nodes/registry.go | 24 +++++++++++++ 4 files changed, 94 insertions(+) diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index e524756da..373f00b6d 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -106,6 +106,13 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context if node.Status == StatusPending { continue } + // Backend lifecycle ops only make sense on backend-type workers. + // Agent workers don't subscribe to backend.install/delete/list, so + // enqueueing for them guarantees a forever-retrying row that the + // reconciler can never drain. Silently skip — they aren't consumers. + if node.NodeType != "" && node.NodeType != NodeTypeBackend { + continue + } if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) result.Nodes = append(result.Nodes, NodeOpStatus{ diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index 6d87165b4..063a68d39 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -3,12 +3,14 @@ package nodes import ( "context" "encoding/json" + "errors" "fmt" "time" "github.com/mudler/LocalAI/core/services/advisorylock" grpcclient "github.com/mudler/LocalAI/pkg/grpc" "github.com/mudler/xlog" + "github.com/nats-io/nats.go" "gorm.io/gorm" ) @@ -206,12 +208,47 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { } continue } + + // ErrNoResponders means the node has no active NATS subscription for + // this subject. Either its connection dropped, or it's the wrong + // node type entirely. 
Mark unhealthy so the health monitor's + // heartbeat-only pass doesn't immediately flip it back — and so + // ListDuePendingBackendOps (which filters by status=healthy) stops + // picking the row until the node genuinely recovers. + if errors.Is(applyErr, nats.ErrNoResponders) { + xlog.Warn("Reconciler: no NATS responders — marking node unhealthy", + "op", op.Op, "backend", op.Backend, "node", op.NodeID) + _ = rc.registry.MarkUnhealthy(ctx, op.NodeID) + } + + // Dead-letter cap: after maxAttempts the row is the reconciler + // equivalent of a poison message. Delete it loudly so the queue + // doesn't churn NATS every tick forever — operators can re-issue + // the op from the UI if they still want it applied. + if op.Attempts+1 >= maxPendingBackendOpAttempts { + xlog.Error("Reconciler: abandoning pending backend op after max attempts", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, + "attempts", op.Attempts+1, "last_error", applyErr) + if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil { + xlog.Warn("Reconciler: failed to delete abandoned op row", "id", op.ID, "error", err) + } + continue + } + _ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error()) xlog.Warn("Reconciler: pending backend op retry failed", "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr) } } +// maxPendingBackendOpAttempts caps how many times the reconciler retries a +// failing row before dead-lettering it. Ten attempts at exponential backoff +// (30s → 15m cap) is >1h of wall-clock patience — well past any transient +// worker restart or network blip. Poisoned rows beyond that are almost +// certainly structural (wrong node type, non-existent gallery entry) and no +// amount of further retrying will help. +const maxPendingBackendOpAttempts = 10 + // probeLoadedModels gRPC-health-checks model addresses that the DB says are // loaded. 
If a model's backend process is gone (OOM, crash, manual restart) // we remove the row so ghosts don't linger. Only probes rows older than diff --git a/core/services/nodes/reconciler_test.go b/core/services/nodes/reconciler_test.go index 52a488a2a..50175db35 100644 --- a/core/services/nodes/reconciler_test.go +++ b/core/services/nodes/reconciler_test.go @@ -373,4 +373,30 @@ var _ = Describe("ReplicaReconciler — state reconciliation", func() { Expect(row.NextRetryAt).To(BeTemporally(">", before)) }) }) + + Describe("NewNodeRegistry malformed-row pruning", func() { + It("drops queue rows for agent nodes and non-existent nodes on startup", func() { + agent := &BackendNode{Name: "agent-1", NodeType: NodeTypeAgent, Address: "x"} + Expect(registry.Register(context.Background(), agent, true)).To(Succeed()) + backend := &BackendNode{Name: "backend-1", NodeType: NodeTypeBackend, Address: "y"} + Expect(registry.Register(context.Background(), backend, true)).To(Succeed()) + + // Three rows: one for a valid backend node (should survive), + // one for an agent node (pruned), one for an empty backend name + // on the valid node (pruned). + Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "foo", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), agent.ID, "foo", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "", OpBackendInstall, nil)).To(Succeed()) + + // Re-instantiating the registry runs the cleanup migration. 
+ _, err := NewNodeRegistry(db) + Expect(err).ToNot(HaveOccurred()) + + var rows []PendingBackendOp + Expect(db.Find(&rows).Error).To(Succeed()) + Expect(rows).To(HaveLen(1)) + Expect(rows[0].NodeID).To(Equal(backend.ID)) + Expect(rows[0].Backend).To(Equal("foo")) + }) + }) }) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 3894b41c5..76403b36b 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -148,6 +148,30 @@ func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) { }); err != nil { return nil, fmt.Errorf("migrating node tables: %w", err) } + + // One-shot cleanup of queue rows that can never drain: ops targeted at + // agent workers (wrong subscription set), at non-existent nodes, or with + // an empty backend name. The guard in enqueueAndDrainBackendOp prevents + // new ones from being written, but rows persisted by earlier versions + // keep the reconciler busy retrying a permanently-failing NATS request + // every 30s. Guarded by the same migration advisory lock so only one + // frontend runs it. + _ = advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { + res := db.Exec(` + DELETE FROM pending_backend_ops + WHERE backend = '' + OR node_id NOT IN (SELECT id FROM backend_nodes WHERE node_type = ? OR node_type = '') + `, NodeTypeBackend) + if res.Error != nil { + xlog.Warn("Failed to prune malformed pending_backend_ops rows", "error", res.Error) + return res.Error + } + if res.RowsAffected > 0 { + xlog.Info("Pruned pending_backend_ops rows (wrong node type or empty backend)", "count", res.RowsAffected) + } + return nil + }) + return &NodeRegistry{db: db}, nil }