Add retry when applying queries (#28951)

for #28642

> Note: this PR diff is easier to view [with whitespace
off](https://github.com/fleetdm/fleet/pull/28951/files?w=1).

## Details

This PR adds retry to the "Apply Queries" logic, in an attempt to
alleviate deadlock issues when applying queries via GitOps. The
`applyQueriesInTx` now uses `withRetryTxx` instead of starting a
transaction with `BeginTxx`. This requires some downstream updates to
`updateQueryLabels` and `updateQueryLabelsInTx`, see PR comments for
details.

This is a first (and hopefully only necessary) step to fixing the
deadlock issues. If needed, we have other steps we can take like
batching the query inserts and splitting the read/write in
saveHostPackStatsDB
(see
https://github.com/fleetdm/fleet/issues/28642#issuecomment-2845804689)

## Testing

I tested this manually using `fleetctl gitops` to apply queries with and
without labels. Existing automated tests for Apply Queries still pass.
This commit is contained in:
Scott Gress 2025-05-13 12:06:20 -05:00 committed by GitHub
parent 0e6149355f
commit ecb1a1caca
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 114 additions and 149 deletions

View file

@ -0,0 +1 @@
- Fixed issue where GitOps may fail to apply new queries due to deadlocks.

View file

@ -41,131 +41,115 @@ func (ds *Datastore) ApplyQueries(ctx context.Context, authorID uint, queries []
}
func (ds *Datastore) applyQueriesInTx(ctx context.Context, authorID uint, queries []*fleet.Query) (err error) {
tx, err := ds.writer(ctx).BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "begin applyQueriesInTx")
}
defer func() {
if err != nil {
rbErr := tx.Rollback()
// It seems possible that there might be a case in
// which the error we are dealing with here was thrown
// by the call to tx.Commit(), and the docs suggest
// this call would then result in sql.ErrTxDone.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
err = ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error {
insertSql := `
INSERT INTO queries (
name,
description,
query,
author_id,
saved,
observer_can_run,
team_id,
team_id_char,
platform,
min_osquery_version,
schedule_interval,
automations_enabled,
logging_type,
discard_data
) VALUES ( ?, ?, ?, ?, true, ?, ?, ?, ?, ?, ?, ?, ?, ? )
ON DUPLICATE KEY UPDATE
name = VALUES(name),
description = VALUES(description),
query = VALUES(query),
author_id = VALUES(author_id),
saved = VALUES(saved),
observer_can_run = VALUES(observer_can_run),
team_id = VALUES(team_id),
team_id_char = VALUES(team_id_char),
platform = VALUES(platform),
min_osquery_version = VALUES(min_osquery_version),
schedule_interval = VALUES(schedule_interval),
automations_enabled = VALUES(automations_enabled),
logging_type = VALUES(logging_type),
discard_data = VALUES(discard_data)
`
for _, q := range queries {
if err := q.Verify(); err != nil {
return ctxerr.Wrap(ctx, err)
}
}
}()
insertSql := `
INSERT INTO queries (
name,
description,
query,
author_id,
saved,
observer_can_run,
team_id,
team_id_char,
platform,
min_osquery_version,
schedule_interval,
automations_enabled,
logging_type,
discard_data
) VALUES ( ?, ?, ?, ?, true, ?, ?, ?, ?, ?, ?, ?, ?, ? )
ON DUPLICATE KEY UPDATE
name = VALUES(name),
description = VALUES(description),
query = VALUES(query),
author_id = VALUES(author_id),
saved = VALUES(saved),
observer_can_run = VALUES(observer_can_run),
team_id = VALUES(team_id),
team_id_char = VALUES(team_id_char),
platform = VALUES(platform),
min_osquery_version = VALUES(min_osquery_version),
schedule_interval = VALUES(schedule_interval),
automations_enabled = VALUES(automations_enabled),
logging_type = VALUES(logging_type),
discard_data = VALUES(discard_data)
`
stmt, err := tx.PrepareContext(ctx, insertSql)
if err != nil {
return ctxerr.Wrap(ctx, err, "prepare queries insert")
}
defer stmt.Close()
for _, q := range queries {
if err := q.Verify(); err != nil {
return ctxerr.Wrap(ctx, err)
}
result, err := stmt.ExecContext(
ctx,
q.Name,
q.Description,
q.Query,
authorID,
q.ObserverCanRun,
q.TeamID,
q.TeamIDStr(),
q.Platform,
q.MinOsqueryVersion,
q.Interval,
q.AutomationsEnabled,
q.Logging,
q.DiscardData,
)
if err != nil {
return ctxerr.Wrap(ctx, err, "exec queries insert")
}
// Get the ID of the row, if it was a new query.
id, _ := result.LastInsertId()
// If the ID is 0, it was an update, so we need to get the ID.
if id == 0 {
var (
rows *sql.Rows
err error
stmt, args, err := sqlx.In(insertSql,
q.Name,
q.Description,
q.Query,
authorID,
q.ObserverCanRun,
q.TeamID,
q.TeamIDStr(),
q.Platform,
q.MinOsqueryVersion,
q.Interval,
q.AutomationsEnabled,
q.Logging,
q.DiscardData,
)
// Get the query that was updated.
if q.TeamID == nil {
rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id is NULL", q.Name)
} else {
rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id = ?", q.Name, q.TeamID)
}
if err != nil {
return ctxerr.Wrap(ctx, err, "select queries id")
return ctxerr.Wrap(ctx, err, "exec queries prepare")
}
// Get the ID from the rows
if rows.Next() {
if err := rows.Scan(&id); err != nil {
return ctxerr.Wrap(ctx, err, "scan queries id")
var result sql.Result
if result, err = tx.ExecContext(ctx, stmt, args...); err != nil {
return ctxerr.Wrap(ctx, err, "exec queries insert")
}
// Get the ID of the row, if it was a new query.
id, _ := result.LastInsertId()
// If the ID is 0, it was an update, so we need to get the ID.
if id == 0 {
var (
rows *sql.Rows
err error
)
// Get the query that was updated.
if q.TeamID == nil {
rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id is NULL", q.Name)
} else {
rows, err = tx.QueryContext(ctx, "SELECT id FROM queries WHERE name = ? AND team_id = ?", q.Name, q.TeamID)
}
if err != nil {
return ctxerr.Wrap(ctx, err, "select queries id")
}
// Get the ID from the rows
if rows.Next() {
if err := rows.Scan(&id); err != nil {
return ctxerr.Wrap(ctx, err, "scan queries id")
}
} else {
return ctxerr.Wrap(ctx, err, "could not find query after update")
}
if err = rows.Err(); err != nil {
return ctxerr.Wrap(ctx, err, "err queries id")
}
if err := rows.Close(); err != nil {
return ctxerr.Wrap(ctx, err, "close queries id")
}
} else {
return ctxerr.Wrap(ctx, err, "could not find query after update")
}
if err = rows.Err(); err != nil {
return ctxerr.Wrap(ctx, err, "err queries id")
}
if err := rows.Close(); err != nil {
return ctxerr.Wrap(ctx, err, "close queries id")
}
}
//nolint:gosec // dismiss G115
q.ID = uint(id)
}
//nolint:gosec // dismiss G115
q.ID = uint(id)
err = ds.updateQueryLabelsInTx(ctx, q, tx)
if err != nil {
return ctxerr.Wrap(ctx, err, "exec queries update labels")
err = ds.updateQueryLabelsInTx(ctx, q, tx)
if err != nil {
return ctxerr.Wrap(ctx, err, "exec queries update labels")
}
}
return nil
})
if err != nil {
return ctxerr.Wrap(ctx, err, "apply queries in tx")
}
err = tx.Commit()
return ctxerr.Wrap(ctx, err, "commit queries tx")
return nil
}
func (ds *Datastore) deleteMultipleQueryResults(ctx context.Context, queryIDs []uint) (err error) {
@ -298,16 +282,23 @@ func (ds *Datastore) NewQuery(
}
func (ds *Datastore) updateQueryLabels(ctx context.Context, query *fleet.Query) error {
return ds.updateQueryLabelsInTx(ctx, query, nil)
err := ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error {
return ds.updateQueryLabelsInTx(ctx, query, tx)
})
if err != nil {
return ctxerr.Wrap(ctx, err, "updating query labels")
}
return nil
}
// updates the LabelsIncludeAny for a query, using the string value of
// the label. Labels IDs are populated
func (ds *Datastore) updateQueryLabelsInTx(ctx context.Context, query *fleet.Query, txToUse *sqlx.Tx) error {
var (
tx *sqlx.Tx
err error
)
func (ds *Datastore) updateQueryLabelsInTx(ctx context.Context, query *fleet.Query, tx sqlx.ExtContext) error {
if tx == nil {
return ctxerr.New(ctx, "updateQueryLabelsInTx called with nil tx")
}
var err error
insertLabelSql := `
INSERT INTO query_labels (
@ -324,33 +315,6 @@ func (ds *Datastore) updateQueryLabelsInTx(ctx context.Context, query *fleet.Que
WHERE query_id = ?
`
// If we aren't given a transaction, start our own.
if txToUse == nil {
tx, err = ds.writer(ctx).BeginTxx(ctx, nil)
if err != nil {
return ctxerr.Wrap(ctx, err, "begin updateQueryLabelsInTx")
}
// Handle errors by attempting to roll back.
defer func() {
if err != nil {
rbErr := tx.Rollback()
// Handle error in rollback.
if rbErr != nil && rbErr != sql.ErrTxDone {
panic(fmt.Sprintf("got err '%s' rolling back after err '%s'", rbErr, err))
}
}
}()
// When we're done updating labels, commit our transaction.
defer func() {
if err != nil {
return
}
err = tx.Commit()
}()
} else {
tx = txToUse
}
_, err = tx.ExecContext(ctx, deleteLabelStmt, query.ID)
if err != nil {
return ctxerr.Wrap(ctx, err, "removing old query labels")