From ecb1a1caca1d6b09219f1816bc9453a3e5a0621f Mon Sep 17 00:00:00 2001 From: Scott Gress Date: Tue, 13 May 2025 12:06:20 -0500 Subject: [PATCH] Add retry when applying queries (#28951) for #28642 > Note: this PR diff is easier to view [with whitespace off](https://github.com/fleetdm/fleet/pull/28951/files?w=1). ## Details This PR adds retry to the "Apply Queries" logic, in an attempt to alleviate deadlock issues when applying queries via GitOps. The `applyQueriesInTx` now uses `withRetryTxx` instead of starting a transaction with `BeginTxx`. This requires some downstream updates to `updateQueryLabels` and `updateQueryLabelsInTx`, see PR comments for details. This is a first (and hopefully only necessary) step to fixing the deadlock issues. If needed, we have other steps we can take like batching the query inserts and splitting the read/write in saveHostPackStatsDB (see https://github.com/fleetdm/fleet/issues/28642#issuecomment-2845804689) ## Testing I tested this manually using `fleetctl gitops` to apply queries with and without labels. Existing automated tests for Apply Queries still pass. --- changes/48642-add-retry-to-apply-queries | 1 + server/datastore/mysql/queries.go | 262 ++++++++++------------- 2 files changed, 114 insertions(+), 149 deletions(-) create mode 100644 changes/48642-add-retry-to-apply-queries diff --git a/changes/48642-add-retry-to-apply-queries b/changes/48642-add-retry-to-apply-queries new file mode 100644 index 0000000000..9068137e49 --- /dev/null +++ b/changes/48642-add-retry-to-apply-queries @@ -0,0 +1 @@ +- Fixed issue where GitOps may fail to apply new queries due to deadlocks. diff --git a/server/datastore/mysql/queries.go b/server/datastore/mysql/queries.go index 50cd797256..8b52a07afa 100644 --- a/server/datastore/mysql/queries.go +++ b/server/datastore/mysql/queries.go @@ -41,131 +41,115 @@ func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries [] } func (ds *Datastore) applyQueriesInTx(ctx context.Context, authorID uint, queries []*fleet.Query) (err error) { - tx, err := ds.writer(ctx).BeginTxx(ctx, nil) - if err != nil { - return ctxerr.Wrap(ctx, err, "begin applyQueriesInTx") - } - - defer func() { - if err != nil { - rbErr := tx.Rollback() - // It seems possible that there might be a case in - // which the error we are dealing with here was thrown - // by the call to tx.Commit(), and the docs suggest - // this call would then result in sql.ErrTxDone. - if rbErr != nil && rbErr != sql.ErrTxDone { - panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) + err = ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error { + insertSql := ` + INSERT INTO queries ( + name, + description, + query, + author_id, + saved, + observer_can_run, + team_id, + team_id_char, + platform, + min_osquery_version, + schedule_interval, + automations_enabled, + logging_type, + discard_data + ) VALUES ( ?, ?, ?, ?, true, ?, ?, ?, ?, ?, ?, ?, ?, ? ) + ON DUPLICATE KEY UPDATE + name = VALUES(name), + description = VALUES(description), + query = VALUES(query), + author_id = VALUES(author_id), + saved = VALUES(saved), + observer_can_run = VALUES(observer_can_run), + team_id = VALUES(team_id), + team_id_char = VALUES(team_id_char), + platform = VALUES(platform), + min_osquery_version = VALUES(min_osquery_version), + schedule_interval = VALUES(schedule_interval), + automations_enabled = VALUES(automations_enabled), + logging_type = VALUES(logging_type), + discard_data = VALUES(discard_data) + ` + for _, q := range queries { + if err := q.Verify(); err != nil { + return ctxerr.Wrap(ctx, err) } - } - }() - - insertSql := ` - INSERT INTO queries ( - name, - description, - query, - author_id, - saved, - observer_can_run, - team_id, - team_id_char, - platform, - min_osquery_version, - schedule_interval, - automations_enabled, - logging_type, - discard_data - ) VALUES ( ?, ?, ?, ?, true, ?, ?, ?, ?, ?, ?, ?, ?, ? ) - ON DUPLICATE KEY UPDATE - name = VALUES(name), - description = VALUES(description), - query = VALUES(query), - author_id = VALUES(author_id), - saved = VALUES(saved), - observer_can_run = VALUES(observer_can_run), - team_id = VALUES(team_id), - team_id_char = VALUES(team_id_char), - platform = VALUES(platform), - min_osquery_version = VALUES(min_osquery_version), - schedule_interval = VALUES(schedule_interval), - automations_enabled = VALUES(automations_enabled), - logging_type = VALUES(logging_type), - discard_data = VALUES(discard_data) - ` - stmt, err := tx.PrepareContext(ctx, insertSql) - if err != nil { - return ctxerr.Wrap(ctx, err, "prepare queries insert") - } - defer stmt.Close() - - for _, q := range queries { - if err := q.Verify(); err != nil { - return ctxerr.Wrap(ctx, err) - } - result, err := stmt.ExecContext( - ctx, - q.Name, - q.Description, - q.Query, - authorID, - q.ObserverCanRun, - q.TeamID, - q.TeamIDStr(), - q.Platform, - q.MinOsqueryVersion, - q.Interval, - q.AutomationsEnabled, - q.Logging, - q.DiscardData, - ) - if err != nil { - return ctxerr.Wrap(ctx, err, "exec queries insert") - } - - // Get the ID of the row, if it was a new query. - id, _ := result.LastInsertId() - // If the ID is 0, it was an update, so we need to get the ID. - if id == 0 { - var ( - rows *sql.Rows - err error + stmt, args, err := sqlx.In(insertSql, + q.Name, + q.Description, + q.Query, + authorID, + q.ObserverCanRun, + q.TeamID, + q.TeamIDStr(), + q.Platform, + q.MinOsqueryVersion, + q.Interval, + q.AutomationsEnabled, + q.Logging, + q.DiscardData, ) - // Get the query that was updated. - if q.TeamID == nil { - rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id is NULL", q.Name) - } else { - rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id = ?", q.Name, q.TeamID) - } if err != nil { - return ctxerr.Wrap(ctx, err, "select queries id") + return ctxerr.Wrap(ctx, err, "exec queries prepare") } - // Get the ID from the rows - if rows.Next() { - if err := rows.Scan(&id); err != nil { - return ctxerr.Wrap(ctx, err, "scan queries id") + + var result sql.Result + if result, err = tx.ExecContext(ctx, stmt, args...); err != nil { + return ctxerr.Wrap(ctx, err, "exec queries insert") + } + + // Get the ID of the row, if it was a new query. + id, _ := result.LastInsertId() + // If the ID is 0, it was an update, so we need to get the ID. + if id == 0 { + var ( + rows *sql.Rows + err error + ) + // Get the query that was updated. + if q.TeamID == nil { + rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id is NULL", q.Name) + } else { + rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id = ?", q.Name, q.TeamID) + } + if err != nil { + return ctxerr.Wrap(ctx, err, "select queries id") + } + // Get the ID from the rows + if rows.Next() { + if err := rows.Scan(&id); err != nil { + return ctxerr.Wrap(ctx, err, "scan queries id") + } + } else { + return ctxerr.Wrap(ctx, err, "could not find query after update") + } + if err = rows.Err(); err != nil { + return ctxerr.Wrap(ctx, err, "err queries id") + } + if err := rows.Close(); err != nil { + return ctxerr.Wrap(ctx, err, "close queries id") } - } else { - return ctxerr.Wrap(ctx, err, "could not find query after update") - } - if err = rows.Err(); err != nil { - return ctxerr.Wrap(ctx, err, "err queries id") - } - if err := rows.Close(); err != nil { - return ctxerr.Wrap(ctx, err, "close queries id") - } - } - //nolint:gosec // dismiss G115 - q.ID = uint(id) + } + //nolint:gosec // dismiss G115 + q.ID = uint(id) - err = ds.updateQueryLabelsInTx(ctx, q, tx) - if err != nil { - return ctxerr.Wrap(ctx, err, "exec queries update labels") + err = ds.updateQueryLabelsInTx(ctx, q, tx) + if err != nil { + return ctxerr.Wrap(ctx, err, "exec queries update labels") + } } + return nil + }) + if err != nil { + return ctxerr.Wrap(ctx, err, "apply queries in tx") } - - err = tx.Commit() - return ctxerr.Wrap(ctx, err, "commit queries tx") + return nil } func (ds *Datastore) deleteMultipleQueryResults(ctx context.Context, queryIDs []uint) (err error) { @@ -298,16 +282,23 @@ func (ds *Datastore) NewQuery( } func (ds *Datastore) updateQueryLabels(ctx context.Context, query *fleet.Query) error { - return ds.updateQueryLabelsInTx(ctx, query, nil) + err := ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error { + return ds.updateQueryLabelsInTx(ctx, query, tx) + }) + if err != nil { + return ctxerr.Wrap(ctx, err, "updating query labels") + } + return nil } // updates the LabelsIncludeAny for a query, using the string value of // the label. Labels IDs are populated -func (ds *Datastore) updateQueryLabelsInTx(ctx context.Context, query *fleet.Query, txToUse *sqlx.Tx) error { - var ( - tx *sqlx.Tx - err error - ) +func (ds *Datastore) updateQueryLabelsInTx(ctx context.Context, query *fleet.Query, tx sqlx.ExtContext) error { + if tx == nil { + return ctxerr.New(ctx, "updateQueryLabelsInTx called with nil tx") + } + + var err error insertLabelSql := ` INSERT INTO query_labels ( @@ -324,33 +315,6 @@ func (ds *Datastore) updateQueryLabelsInTx(ctx context.Context, query *fleet.Que WHERE query_id = ? ` - // If we aren't given a transaction, start our own. - if txToUse == nil { - tx, err = ds.writer(ctx).BeginTxx(ctx, nil) - if err != nil { - return ctxerr.Wrap(ctx, err, "begin updateQueryLabelsInTx") - } - // Handle errors by attempting to roll back. - defer func() { - if err != nil { - rbErr := tx.Rollback() - // Handle error in rollback. - if rbErr != nil && rbErr != sql.ErrTxDone { - panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err)) - } - } - }() - // When we're done updating labels, commit our transaction. - defer func() { - if err != nil { - return - } - err = tx.Commit() - }() - } else { - tx = txToUse - } - _, err = tx.ExecContext(ctx, deleteLabelStmt, query.ID) if err != nil { return ctxerr.Wrap(ctx, err, "removing old query labels")