From d5a5f014efcbf386d44bdfd4907490dcd2e0ec12 Mon Sep 17 00:00:00 2001 From: Tomas Touceda Date: Tue, 7 Sep 2021 10:39:17 -0300 Subject: [PATCH] Add test to check that two hosts can store stats concurrently (#1929) --- server/datastore/mysql/hosts_test.go | 174 +++++++++++++++++++++++++++ 1 file changed, 174 insertions(+) diff --git a/server/datastore/mysql/hosts_test.go b/server/datastore/mysql/hosts_test.go index bff1ecf66d..f7b292ff5c 100644 --- a/server/datastore/mysql/hosts_test.go +++ b/server/datastore/mysql/hosts_test.go @@ -4,6 +4,7 @@ import ( "context" "encoding/json" "fmt" + "math/rand" "sort" "strconv" "strings" @@ -1583,3 +1584,176 @@ func TestSaveTonsOfUsers(t *testing.T) { fmt.Println("Count1", atomic.LoadInt32(&count1)) fmt.Println("Count2", atomic.LoadInt32(&count2)) } + +func TestSaveHostPackStatsConcurrent(t *testing.T) { + ds := CreateMySQLDS(t) + defer ds.Close() + + host1, err := ds.NewHost(&fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + SeenTime: time.Now(), + NodeKey: "1", + UUID: "1", + Hostname: "foo.local", + PrimaryIP: "192.168.1.1", + PrimaryMac: "30-65-EC-6F-C4-58", + OsqueryHostID: "1", + }) + require.NoError(t, err) + require.NotNil(t, host1) + + host2, err := ds.NewHost(&fleet.Host{ + DetailUpdatedAt: time.Now(), + LabelUpdatedAt: time.Now(), + SeenTime: time.Now(), + NodeKey: "2", + UUID: "2", + Hostname: "foo.local2", + PrimaryIP: "192.168.1.2", + PrimaryMac: "30-65-EC-6F-C4-58", + OsqueryHostID: "2", + }) + require.NoError(t, err) + require.NotNil(t, host2) + + pack1 := test.NewPack(t, ds, "test1") + query1 := test.NewQuery(t, ds, "time", "select * from time", 0, true) + squery1 := test.NewScheduledQuery(t, ds, pack1.ID, query1.ID, 30, true, true, "time-scheduled") + + pack2 := test.NewPack(t, ds, "test2") + query2 := test.NewQuery(t, ds, "time2", "select * from time", 0, true) + squery2 := test.NewScheduledQuery(t, ds, pack2.ID, query2.ID, 30, true, true, "time-scheduled") + + ctx, cancelFunc := context.WithCancel(context.Background()) + defer cancelFunc() + + saveHostRandomStats := func(host *fleet.Host) error { + host.PackStats = []fleet.PackStats{ + { + PackName: pack1.Name, + QueryStats: []fleet.ScheduledQueryStats{ + { + ScheduledQueryName: squery1.Name, + ScheduledQueryID: squery1.ID, + QueryName: query1.Name, + PackName: pack1.Name, + PackID: pack1.ID, + AverageMemory: 8000, + Denylisted: false, + Executions: rand.Intn(1000), + Interval: 30, + LastExecuted: time.Now().UTC(), + OutputSize: 1337, + SystemTime: 150, + UserTime: 180, + WallTime: 0, + }, + }, + }, + { + PackName: pack2.Name, + QueryStats: []fleet.ScheduledQueryStats{ + { + ScheduledQueryName: squery2.Name, + ScheduledQueryID: squery2.ID, + QueryName: query2.Name, + PackName: pack2.Name, + PackID: pack2.ID, + AverageMemory: 8000, + Denylisted: false, + Executions: rand.Intn(1000), + Interval: 30, + LastExecuted: time.Now().UTC(), + OutputSize: 1337, + SystemTime: 150, + UserTime: 180, + WallTime: 0, + }, + }, + }, + } + return ds.SaveHost(host1) + } + + errCh := make(chan error) + var counter int32 + total := int32(1000) + + loopAndSaveHost := func(host *fleet.Host) { + for { + err := saveHostRandomStats(host) + if err != nil { + errCh <- err + return + } + atomic.AddInt32(&counter, 1) + select { + case <-ctx.Done(): + return + default: + if atomic.LoadInt32(&counter) > total { + cancelFunc() + return + } + } + } + } + + go loopAndSaveHost(host1) + go loopAndSaveHost(host2) + + go func() { + for { + specs := []*fleet.PackSpec{ + { + Name: "test1", + Queries: []fleet.PackSpecQuery{ + { + QueryName: "time", + Interval: uint(rand.Intn(1000)), + }, + { + QueryName: "time2", + Interval: uint(rand.Intn(1000)), + }, + }, + }, + { + Name: "test2", + Queries: []fleet.PackSpecQuery{ + { + QueryName: "time", + Interval: uint(rand.Intn(1000)), + }, + { + QueryName: "time2", + Interval: uint(rand.Intn(1000)), + }, + }, + }, + } + err := ds.ApplyPackSpecs(specs) + if err != nil { + errCh <- err + return + } + + select { + case <-ctx.Done(): + return + default: + } + } + }() + + ticker := time.NewTicker(1 * time.Minute) + select { + case err := <-errCh: + require.NoError(t, err) + case <-ctx.Done(): + return + case <-ticker.C: + t.Fail() + } +}