diff --git a/core/services/nodes/managers_distributed.go b/core/services/nodes/managers_distributed.go index e524756da..373f00b6d 100644 --- a/core/services/nodes/managers_distributed.go +++ b/core/services/nodes/managers_distributed.go @@ -106,6 +106,13 @@ func (d *DistributedBackendManager) enqueueAndDrainBackendOp(ctx context.Context if node.Status == StatusPending { continue } + // Backend lifecycle ops only make sense on backend-type workers. + // Agent workers don't subscribe to backend.install/delete/list, so + // enqueueing for them guarantees a forever-retrying row that the + // reconciler can never drain. Silently skip — they aren't consumers. + if node.NodeType != "" && node.NodeType != NodeTypeBackend { + continue + } if err := d.registry.UpsertPendingBackendOp(ctx, node.ID, backend, op, galleriesJSON); err != nil { xlog.Warn("Failed to enqueue backend op", "op", op, "node", node.Name, "backend", backend, "error", err) result.Nodes = append(result.Nodes, NodeOpStatus{ diff --git a/core/services/nodes/reconciler.go b/core/services/nodes/reconciler.go index 6d87165b4..063a68d39 100644 --- a/core/services/nodes/reconciler.go +++ b/core/services/nodes/reconciler.go @@ -3,12 +3,14 @@ package nodes import ( "context" "encoding/json" + "errors" "fmt" "time" "github.com/mudler/LocalAI/core/services/advisorylock" grpcclient "github.com/mudler/LocalAI/pkg/grpc" "github.com/mudler/xlog" + "github.com/nats-io/nats.go" "gorm.io/gorm" ) @@ -206,12 +208,47 @@ func (rc *ReplicaReconciler) drainPendingBackendOps(ctx context.Context) { } continue } + + // ErrNoResponders means the node has no active NATS subscription for + // this subject. Either its connection dropped, or it's the wrong + // node type entirely. Mark unhealthy so the health monitor's + // heartbeat-only pass doesn't immediately flip it back — and so + // ListDuePendingBackendOps (which filters by status=healthy) stops + // picking the row until the node genuinely recovers. + if errors.Is(applyErr, nats.ErrNoResponders) { + xlog.Warn("Reconciler: no NATS responders — marking node unhealthy", + "op", op.Op, "backend", op.Backend, "node", op.NodeID) + _ = rc.registry.MarkUnhealthy(ctx, op.NodeID) + } + + // Dead-letter cap: after maxAttempts the row is the reconciler + // equivalent of a poison message. Delete it loudly so the queue + // doesn't churn NATS every tick forever — operators can re-issue + // the op from the UI if they still want it applied. + if op.Attempts+1 >= maxPendingBackendOpAttempts { + xlog.Error("Reconciler: abandoning pending backend op after max attempts", + "op", op.Op, "backend", op.Backend, "node", op.NodeID, + "attempts", op.Attempts+1, "last_error", applyErr) + if err := rc.registry.DeletePendingBackendOp(ctx, op.ID); err != nil { + xlog.Warn("Reconciler: failed to delete abandoned op row", "id", op.ID, "error", err) + } + continue + } + _ = rc.registry.RecordPendingBackendOpFailure(ctx, op.ID, applyErr.Error()) xlog.Warn("Reconciler: pending backend op retry failed", "op", op.Op, "backend", op.Backend, "node", op.NodeID, "attempts", op.Attempts+1, "error", applyErr) } } +// maxPendingBackendOpAttempts caps how many times the reconciler retries a +// failing row before dead-lettering it. Ten attempts at exponential backoff +// (30s → 15m cap) is >1h of wall-clock patience — well past any transient +// worker restart or network blip. Poisoned rows beyond that are almost +// certainly structural (wrong node type, non-existent gallery entry) and no +// amount of further retrying will help. +const maxPendingBackendOpAttempts = 10 + // probeLoadedModels gRPC-health-checks model addresses that the DB says are // loaded. If a model's backend process is gone (OOM, crash, manual restart) // we remove the row so ghosts don't linger. Only probes rows older than diff --git a/core/services/nodes/reconciler_test.go b/core/services/nodes/reconciler_test.go index 52a488a2a..50175db35 100644 --- a/core/services/nodes/reconciler_test.go +++ b/core/services/nodes/reconciler_test.go @@ -373,4 +373,30 @@ var _ = Describe("ReplicaReconciler — state reconciliation", func() { Expect(row.NextRetryAt).To(BeTemporally(">", before)) }) }) + + Describe("NewNodeRegistry malformed-row pruning", func() { + It("drops queue rows for agent nodes and non-existent nodes on startup", func() { + agent := &BackendNode{Name: "agent-1", NodeType: NodeTypeAgent, Address: "x"} + Expect(registry.Register(context.Background(), agent, true)).To(Succeed()) + backend := &BackendNode{Name: "backend-1", NodeType: NodeTypeBackend, Address: "y"} + Expect(registry.Register(context.Background(), backend, true)).To(Succeed()) + + // Three rows: one for a valid backend node (should survive), + // one for an agent node (pruned), one for an empty backend name + // on the valid node (pruned). + Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "foo", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), agent.ID, "foo", OpBackendInstall, nil)).To(Succeed()) + Expect(registry.UpsertPendingBackendOp(context.Background(), backend.ID, "", OpBackendInstall, nil)).To(Succeed()) + + // Re-instantiating the registry runs the cleanup migration. + _, err := NewNodeRegistry(db) + Expect(err).ToNot(HaveOccurred()) + + var rows []PendingBackendOp + Expect(db.Find(&rows).Error).To(Succeed()) + Expect(rows).To(HaveLen(1)) + Expect(rows[0].NodeID).To(Equal(backend.ID)) + Expect(rows[0].Backend).To(Equal("foo")) + }) + }) }) diff --git a/core/services/nodes/registry.go b/core/services/nodes/registry.go index 3894b41c5..76403b36b 100644 --- a/core/services/nodes/registry.go +++ b/core/services/nodes/registry.go @@ -148,6 +148,30 @@ func NewNodeRegistry(db *gorm.DB) (*NodeRegistry, error) { }); err != nil { return nil, fmt.Errorf("migrating node tables: %w", err) } + + // One-shot cleanup of queue rows that can never drain: ops targeted at + // agent workers (wrong subscription set), at non-existent nodes, or with + // an empty backend name. The guard in enqueueAndDrainBackendOp prevents + // new ones from being written, but rows persisted by earlier versions + // keep the reconciler busy retrying a permanently-failing NATS request + // every 30s. Guarded by the same migration advisory lock so only one + // frontend runs it. + _ = advisorylock.WithLockCtx(context.Background(), db, advisorylock.KeySchemaMigrate, func() error { + res := db.Exec(` + DELETE FROM pending_backend_ops + WHERE backend = '' + OR node_id NOT IN (SELECT id FROM backend_nodes WHERE node_type = ? OR node_type = '') + `, NodeTypeBackend) + if res.Error != nil { + xlog.Warn("Failed to prune malformed pending_backend_ops rows", "error", res.Error) + return res.Error + } + if res.RowsAffected > 0 { + xlog.Info("Pruned pending_backend_ops rows (wrong node type or empty backend)", "count", res.RowsAffected) + } + return nil + }) + return &NodeRegistry{db: db}, nil }