Fix deadlock when replacing (upserting) host_batteries (#15447)

#14779

This PR fixes the deadlock when upserting to `host_batteries`.
Which probably happens because InnoDB uses row-locking.

I was able to reproduce in main with the new test
`TestHosts/ReplaceHostBatteriesDeadlock`.
I refactored `ds.ReplaceHostBatteries` to use the same upsert pattern as
`ds.ReplaceHostDeviceMapping` (given `battery` is assumed to return just
a few rows per host). With such pattern the tests does not fail with
deadlock errors anymore.

Here are some of the techniques MySQL recommends:
https://dev.mysql.com/doc/refman/5.7/en/innodb-deadlocks-handling.html
Basically by changing the upsert pattern the deadlock goes away (It's
hard to know exactly why the original code deadlocks).

Here's the deadlock trace from load test performed in October:
```
2023-10-26T17:19:17.244707Z 0 [Note] [MY-012468] [InnoDB] Transactions deadlock detected, dumping detailed information. (lock0lock.cc:6482)
2023-10-26T17:19:17.244756Z 0 [Note] [MY-012469] [InnoDB]  *** (1) TRANSACTION:  (lock0lock.cc:6496)
TRANSACTION 3069771944, ACTIVE 0 sec inserting
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1136, 5 row lock(s), undo log entries 1
MySQL thread id 75, OS thread handle 70369297350384, query id 658 10.12.3.201 fleet update
INSERT INTO
      host_batteries (
        host_id,
        serial_number,
        cycle_count,
        health
      )
    VALUES
      (27472, '0000', 505, 'Good'),(27472, '0001', 730, 'Good')
    ON DUPLICATE KEY UPDATE
      cycle_count = VALUES(cycle_count),
      health = VALUES(health),
      updated_at = CURRENT_TIMESTAMP
2023-10-26T17:19:17.244800Z 0 [Note] [MY-012469] [InnoDB]  *** (1) HOLDS THE LOCK(S):  (lock0lock.cc:6496)
RECORD LOCKS space id 867 page no 320 n bits 280 index PRIMARY of table `fleet`.`host_batteries` trx id 3069771944 lock_mode X locks gap before rec
Record lock, heap no 205 PHYSICAL RECORD: n_fields 9; compact format; info bits 0
 0: len 4; hex 00526996; asc  Ri ;;
 1: len 6; hex 0000b6f900d0; asc       ;;
 2: len 7; hex 82000033370110; asc    37  ;;
 3: len 4; hex 0000d829; asc    );;
 4: len 4; hex 30303030; asc 0000;;
 5: len 4; hex 8000065b; asc    [;;
 6: len 4; hex 506f6f72; asc Poor;;
 7: len 4; hex 653a9f95; asc e:  ;;
 8: len 4; hex 653a9f95; asc e:  ;;

2023-10-26T17:19:17.245027Z 0 [Note] [MY-012469] [InnoDB]  *** (1) WAITING FOR THIS LOCK TO BE GRANTED:  (lock0lock.cc:6496)
RECORD LOCKS space id 867 page no 320 n bits 280 index PRIMARY of table `fleet`.`host_batteries` trx id 3069771944 lock_mode X locks gap before rec insert intention waiting
Record lock, heap no 205 PHYSICAL RECORD: n_fields 9; compact format; info bits 0
 0: len 4; hex 00526996; asc  Ri ;;
 1: len 6; hex 0000b6f900d0; asc       ;;
 2: len 7; hex 82000033370110; asc    37  ;;
 3: len 4; hex 0000d829; asc    );;
 4: len 4; hex 30303030; asc 0000;;
 5: len 4; hex 8000065b; asc    [;;
 6: len 4; hex 506f6f72; asc Poor;;
 7: len 4; hex 653a9f95; asc e:  ;;

2023-10-26T17:19:17.245239Z 0 [Note] [MY-012469] [InnoDB]  *** (2) TRANSACTION:  (lock0lock.cc:6496)
TRANSACTION 3069771958, ACTIVE 0 sec inserting
mysql tables in use 1, locked 1
LOCK WAIT 7 lock struct(s), heap size 1136, 5 row lock(s), undo log entries 1
MySQL thread id 9, OS thread handle 70369296809712, query id 708 10.12.2.156 fleet update
INSERT INTO
      host_batteries (
        host_id,
        serial_number,
        cycle_count,
        health
      )
    VALUES
      (59161, '0000', 1384, 'Fair'),(59161, '0001', 396, 'Good')
    ON DUPLICATE KEY UPDATE
      cycle_count = VALUES(cycle_count),
      health = VALUES(health),
      updated_at = CURRENT_TIMESTAMP
2023-10-26T17:19:17.245272Z 0 [Note] [MY-012469] [InnoDB]  *** (2) HOLDS THE LOCK(S):  (lock0lock.cc:6496)
RECORD LOCKS space id 867 page no 320 n bits 280 index PRIMARY of table `fleet`.`host_batteries` trx id 3069771958 lock_mode X locks gap before rec
Record lock, heap no 205 PHYSICAL RECORD: n_fields 9; compact format; info bits 0
 0: len 4; hex 00526996; asc  Ri ;;
 1: len 6; hex 0000b6f900d0; asc       ;;
 2: len 7; hex 82000033370110; asc    37  ;;
 3: len 4; hex 0000d829; asc    );;
 4: len 4; hex 30303030; asc 0000;;
 5: len 4; hex 8000065b; asc    [;;
 6: len 4; hex 506f6f72; asc Poor;;
 7: len 4; hex 653a9f95; asc e:  ;;
 8: len 4; hex 653a9f95; asc e:  ;;

2023-10-26T17:19:17.245504Z 0 [Note] [MY-012469] [InnoDB]  *** (2) WAITING FOR THIS LOCK TO BE GRANTED:  (lock0lock.cc:6496)
RECORD LOCKS space id 867 page no 320 n bits 280 index PRIMARY of table `fleet`.`host_batteries` trx id 3069771958 lock_mode X locks gap before rec insert intention waiting
Record lock, heap no 205 PHYSICAL RECORD: n_fields 9; compact format; info bits 0
 0: len 4; hex 00526996; asc  Ri ;;
 1: len 6; hex 0000b6f900d0; asc       ;;
 2: len 7; hex 82000033370110; asc    37  ;;
 3: len 4; hex 0000d829; asc    );;
 4: len 4; hex 30303030; asc 0000;;
 5: len 4; hex 8000065b; asc    [;;
 6: len 4; hex 506f6f72; asc Poor;;
 7: len 4; hex 653a9f95; asc e:  ;;
 8: len 4; hex 653a9f95; asc e:  ;;

2023-10-26T17:19:17.245730Z 0 [Note] [MY-012469] [InnoDB] *** WE ROLL BACK TRANSACTION (2)  (lock0lock.cc:6496)
```

- [X] Changes file added for user-visible changes in `changes/` or
`orbit/changes/`.
See [Changes
files](https://fleetdm.com/docs/contributing/committing-changes#changes-files)
for more information.
- [X] Input data is properly validated, `SELECT *` is avoided, SQL
injection is prevented (using placeholders for values in statements)
- [X] Added/updated tests
- [X] Manual QA for all new/changed functionality
This commit is contained in:
Lucas Manuel Rodriguez 2023-12-05 18:24:58 -03:00 committed by GitHub
parent a59b609f6f
commit 57351011fa
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
4 changed files with 142 additions and 43 deletions

View file

@ -0,0 +1 @@
* Fix possible deadlocks when upserting to `host_batteries` (found during load test).

View file

@ -2912,65 +2912,95 @@ func (ds *Datastore) ReplaceHostDeviceMapping(ctx context.Context, hid uint, map
}
func (ds *Datastore) ReplaceHostBatteries(ctx context.Context, hid uint, mappings []*fleet.HostBattery) error {
for _, m := range mappings {
if hid != m.HostID {
return ctxerr.Errorf(ctx, "host batteries mapping are not all for the provided host id %d, found %d", hid, m.HostID)
}
}
// The following SQL statements assume a small number of batteries reported per host.
// This is using the same pattern as ReplaceHostDeviceMapping.
const (
replaceStmt = `
INSERT INTO
host_batteries (
selStmt = `
SELECT
id,
host_id,
serial_number,
cycle_count,
health
)
VALUES
%s
ON DUPLICATE KEY UPDATE
cycle_count = VALUES(cycle_count),
health = VALUES(health),
updated_at = CURRENT_TIMESTAMP
`
valuesPart = `(?, ?, ?, ?),`
cycle_count,
health
FROM
host_batteries
WHERE
host_id = ?`
deleteExceptStmt = `
DELETE FROM
host_batteries
WHERE
host_id = ? AND
serial_number NOT IN (?)
`
deleteAllStmt = `
DELETE FROM
host_batteries
WHERE
host_id = ?
`
delStmt = `
DELETE FROM
host_batteries
WHERE
id IN (?)`
insStmt = `
INSERT INTO
host_batteries (host_id, serial_number, cycle_count, health)
VALUES`
insPart = ` (?, ?, ?, ?),`
)
replaceArgs := make([]interface{}, 0, len(mappings)*4)
deleteNotIn := make([]string, 0, len(mappings))
for _, hb := range mappings {
deleteNotIn = append(deleteNotIn, hb.SerialNumber)
replaceArgs = append(replaceArgs, hid, hb.SerialNumber, hb.CycleCount, hb.Health)
keyFn := func(b *fleet.HostBattery) string {
return b.SerialNumber + ":" + fmt.Sprint(b.CycleCount) + ":" + b.Health
}
return ds.withRetryTxx(ctx, func(tx sqlx.ExtContext) error {
// first, insert the new batteries or update the existing ones
if len(replaceArgs) > 0 {
if _, err := tx.ExecContext(ctx, fmt.Sprintf(replaceStmt, strings.TrimSuffix(strings.Repeat(valuesPart, len(mappings)), ",")), replaceArgs...); err != nil {
return ctxerr.Wrap(ctx, err, "upsert host batteries")
// Index the mappings by serial_number, to quickly check which ones
// need to be deleted and inserted.
toIns := make(map[string]*fleet.HostBattery)
serials := make(map[string]struct{})
for _, m := range mappings {
if _, ok := serials[m.SerialNumber]; ok {
// Ignore multiple rows with the same serial number
// (e.g. in case of bugs in results reported by osquery).
continue
}
toIns[keyFn(m)] = m
serials[m.SerialNumber] = struct{}{}
}
var prevMappings []*fleet.HostBattery
if err := sqlx.SelectContext(ctx, tx, &prevMappings, selStmt, hid); err != nil {
return ctxerr.Wrap(ctx, err, "select previous host batteries")
}
var delIDs []uint
for _, pm := range prevMappings {
key := keyFn(pm)
if _, ok := toIns[key]; ok {
// already exists, no need to insert
delete(toIns, key)
} else {
// does not exist anymore, must be deleted
delIDs = append(delIDs, pm.ID)
}
}
// then, delete the old ones
if len(deleteNotIn) > 0 {
delStmt, args, err := sqlx.In(deleteExceptStmt, hid, deleteNotIn)
if len(delIDs) > 0 {
stmt, args, err := sqlx.In(delStmt, delIDs)
if err != nil {
return ctxerr.Wrap(ctx, err, "generating host batteries delete NOT IN statement")
return ctxerr.Wrap(ctx, err, "prepare delete statement")
}
if _, err := tx.ExecContext(ctx, delStmt, args...); err != nil {
if _, err := tx.ExecContext(ctx, stmt, args...); err != nil {
return ctxerr.Wrap(ctx, err, "delete host batteries")
}
} else if _, err := tx.ExecContext(ctx, deleteAllStmt, hid); err != nil {
return ctxerr.Wrap(ctx, err, "delete all host batteries")
}
if len(toIns) > 0 {
var args []interface{}
for _, m := range toIns {
args = append(args, hid, m.SerialNumber, m.CycleCount, m.Health)
}
stmt := insStmt + strings.TrimSuffix(strings.Repeat(insPart, len(toIns)), ",")
if _, err := tx.ExecContext(ctx, stmt, args...); err != nil {
return ctxerr.Wrap(ctx, err, "insert host batteries")
}
}
return nil
})

View file

@ -29,6 +29,7 @@ import (
"github.com/micromdm/nanodep/godep"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
"golang.org/x/sync/errgroup"
)
var expLastExec = func() time.Time {
@ -134,6 +135,7 @@ func TestHosts(t *testing.T) {
{"DeleteHosts", testHostsDeleteHosts},
{"HostIDsByOSVersion", testHostIDsByOSVersion},
{"ReplaceHostBatteries", testHostsReplaceHostBatteries},
{"ReplaceHostBatteriesDeadlock", testHostsReplaceHostBatteriesDeadlock},
{"CountHostsNotResponding", testCountHostsNotResponding},
{"FailingPoliciesCount", testFailingPoliciesCount},
{"HostRecordNoPolicies", testHostsRecordNoPolicies},
@ -6000,6 +6002,31 @@ func testHostsReplaceHostBatteries(t *testing.T, ds *Datastore) {
require.NoError(t, err)
require.ElementsMatch(t, h1Bat, bat1)
type timestamp struct {
CreatedAt time.Time `db:"created_at"`
UpdatedAt time.Time `db:"updated_at"`
}
var timestamps1 []timestamp
ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error {
return sqlx.SelectContext(ctx, q, &timestamps1, `SELECT created_at, updated_at FROM host_batteries WHERE host_id = ?`, h1.ID)
})
// Insert the same battery data again.
err = ds.ReplaceHostBatteries(ctx, h1.ID, h1Bat)
require.NoError(t, err)
var timestamps2 []timestamp
ExecAdhocSQL(t, ds, func(q sqlx.ExtContext) error {
return sqlx.SelectContext(ctx, q, &timestamps2, `SELECT created_at, updated_at FROM host_batteries WHERE host_id = ?`, h1.ID)
})
// Verify that there were no inserts/updates (because reported data hasn't changed).
require.ElementsMatch(t, timestamps1, timestamps2)
bat1, err = ds.ListHostBatteries(ctx, h1.ID)
require.NoError(t, err)
require.ElementsMatch(t, h1Bat, bat1)
bat2, err := ds.ListHostBatteries(ctx, h2.ID)
require.NoError(t, err)
require.Len(t, bat2, 0)
@ -6045,6 +6072,46 @@ func testHostsReplaceHostBatteries(t *testing.T, ds *Datastore) {
require.ElementsMatch(t, h2Bat, bat2)
}
func testHostsReplaceHostBatteriesDeadlock(t *testing.T, ds *Datastore) {
ctx := context.Background()
var hosts []*fleet.Host
for i := 1; i <= 100; i++ {
h, err := ds.NewHost(ctx, &fleet.Host{
ID: uint(i),
OsqueryHostID: ptr.String(fmt.Sprintf("id-%d", i)),
NodeKey: ptr.String(fmt.Sprintf("key-%d", i)),
Platform: "linux",
Hostname: fmt.Sprintf("host-%d", i),
DetailUpdatedAt: time.Now(),
LabelUpdatedAt: time.Now(),
PolicyUpdatedAt: time.Now(),
SeenTime: time.Now(),
})
require.NoError(t, err)
hosts = append(hosts, h)
}
var g errgroup.Group
for _, h := range hosts {
hostID := h.ID
g.Go(func() error {
for i := 0; i < 100; i++ {
if err := ds.ReplaceHostBatteries(ctx, hostID, []*fleet.HostBattery{
{HostID: hostID, SerialNumber: fmt.Sprintf("%d-0000", hostID), CycleCount: 1, Health: "Good"},
{HostID: hostID, SerialNumber: fmt.Sprintf("%d-0000", hostID), CycleCount: 2, Health: "Fair"},
}); err != nil {
return err
}
time.Sleep(10 * time.Millisecond)
}
return nil
})
}
err := g.Wait()
require.NoError(t, err)
}
func testCountHostsNotResponding(t *testing.T, ds *Datastore) {
ctx := context.Background()
config := config.FleetConfig{Osquery: config.OsqueryConfig{DetailUpdateInterval: 1 * time.Hour}}

View file

@ -982,6 +982,7 @@ func (h *HostMDM) UnmarshalJSON(b []byte) error {
// HostBattery represents a host's battery, as reported by the osquery battery
// table.
type HostBattery struct {
ID uint `json:"-" db:"id"`
HostID uint `json:"-" db:"host_id"`
SerialNumber string `json:"-" db:"serial_number"`
CycleCount int `json:"cycle_count" db:"cycle_count"`