mirror of
https://github.com/fleetdm/fleet
synced 2026-05-22 16:39:01 +00:00
Missing locking caused a data race, detected by running the tests with the -race flag:
```
==================
WARNING: DATA RACE
Read at 0x00c0004b2cf0 by goroutine 67:
runtime.mapaccess2_fast64()
/usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84
github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
Previous write at 0x00c0004b2cf0 by goroutine 104:
runtime.mapassign_fast64()
/usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117
Goroutine 67 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
testing.runTests.func1()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
testing.runTests()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
testing.(*M).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
main.main()
_testmain.go:303 +0x236
Goroutine 104 (running) created at:
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
==================
WARNING: DATA RACE
Read at 0x00c0000ff2d8 by goroutine 67:
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4
github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1499 +0x61c
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
Previous write at 0x00c0000ff2d8 by goroutine 104:
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError.func1()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1493 +0x117
Goroutine 67 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
testing.runTests.func1()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
testing.runTests()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
testing.(*M).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
main.main()
_testmain.go:303 +0x236
Goroutine 104 (running) created at:
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQueryRecordCompletionError()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1492 +0x558
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
--- FAIL: TestIngestDistributedQueryRecordCompletionError (0.01s)
service_osquery_test.go:1502: PASS: QueryCompletedByHost(string,uint)
testing.go:1092: race detected during execution of test
==================
WARNING: DATA RACE
Read at 0x00c0000f8570 by goroutine 70:
runtime.mapaccess2_fast64()
/usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:52 +0x0
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0x84
github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
Previous write at 0x00c0000f8570 by goroutine 71:
runtime.mapassign_fast64()
/usr/local/Cellar/go/1.16.5/libexec/src/runtime/map_fast64.go:92 +0x0
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1a4
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117
Goroutine 70 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
testing.runTests.func1()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
testing.runTests()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
testing.(*M).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
main.main()
_testmain.go:303 +0x236
Goroutine 71 (running) created at:
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
==================
WARNING: DATA RACE
Read at 0x00c000c480d8 by goroutine 70:
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).WriteResult()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:37 +0xa4
github.com/fleetdm/fleet/v4/server/service.(*Service).ingestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery.go:1020 +0x258
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1530 +0x5bc
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
Previous write at 0x00c000c480d8 by goroutine 71:
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).getChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:31 +0x1bc
github.com/fleetdm/fleet/v4/server/pubsub.(*inmemQueryResults).ReadChannel()
/Users/zwass/dev/fleet/server/pubsub/inmem_query_results.go:53 +0x64
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery.func1()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1524 +0x117
Goroutine 70 (running) created at:
testing.(*T).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1238 +0x5d7
testing.runTests.func1()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1511 +0xa6
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
testing.runTests()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1509 +0x612
testing.(*M).Run()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1417 +0x3b3
main.main()
_testmain.go:303 +0x236
Goroutine 71 (running) created at:
github.com/fleetdm/fleet/v4/server/service.TestIngestDistributedQuery()
/Users/zwass/dev/fleet/server/service/service_osquery_test.go:1523 +0x4f4
testing.tRunner()
/usr/local/Cellar/go/1.16.5/libexec/src/testing/testing.go:1193 +0x202
==================
--- FAIL: TestIngestDistributedQuery (0.01s)
service_osquery_test.go:1532: PASS: QueryCompletedByHost(string,uint)
testing.go:1092: race detected during execution of test
FAIL
FAIL github.com/fleetdm/fleet/v4/server/service 42.743s
```
63 lines
1.4 KiB
Go
63 lines
1.4 KiB
Go
package pubsub
|
|
|
|
import (
|
|
"context"
|
|
"strconv"
|
|
"sync"
|
|
|
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
|
)
|
|
|
|
type inmemQueryResults struct {
|
|
resultChannels map[uint]chan interface{}
|
|
channelMutex sync.Mutex
|
|
}
|
|
|
|
var _ fleet.QueryResultStore = &inmemQueryResults{}
|
|
|
|
// NewInmemQueryResults initializes a new in-memory implementation of the
|
|
// QueryResultStore interface.
|
|
func NewInmemQueryResults() *inmemQueryResults {
|
|
return &inmemQueryResults{resultChannels: map[uint]chan interface{}{}}
|
|
}
|
|
|
|
func (im *inmemQueryResults) getChannel(id uint) chan interface{} {
|
|
im.channelMutex.Lock()
|
|
defer im.channelMutex.Unlock()
|
|
|
|
channel, ok := im.resultChannels[id]
|
|
if !ok {
|
|
channel = make(chan interface{})
|
|
im.resultChannels[id] = channel
|
|
}
|
|
return channel
|
|
}
|
|
|
|
func (im *inmemQueryResults) WriteResult(result fleet.DistributedQueryResult) error {
|
|
channel := im.getChannel(result.DistributedQueryCampaignID)
|
|
|
|
select {
|
|
case channel <- result:
|
|
// intentionally do nothing
|
|
default:
|
|
return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
|
|
}
|
|
|
|
return nil
|
|
}
|
|
|
|
func (im *inmemQueryResults) ReadChannel(ctx context.Context, campaign fleet.DistributedQueryCampaign) (<-chan interface{}, error) {
|
|
channel := im.getChannel(campaign.ID)
|
|
go func() {
|
|
<-ctx.Done()
|
|
close(channel)
|
|
im.channelMutex.Lock()
|
|
delete(im.resultChannels, campaign.ID)
|
|
im.channelMutex.Unlock()
|
|
}()
|
|
return channel, nil
|
|
}
|
|
|
|
func (im *inmemQueryResults) HealthCheck() error {
|
|
return nil
|
|
}
|