fleet/server/pubsub/inmem_query_results.go
Zachary Wasserman e8e4bc9879
Fix cleanup of orphaned queries (#2316)
The expiration logic was incorrect, leading to queries not being cleaned
up properly. Tests added for the whole subroutine.

Fixes #2302
2020-10-06 09:30:24 -07:00

66 lines
1.5 KiB
Go

package pubsub
import (
"context"
"strconv"
"sync"
"github.com/kolide/fleet/server/kolide"
)
// inmemQueryResults keeps one result channel per distributed query campaign
// entirely in process memory.
type inmemQueryResults struct {
	// channelMutex serializes the modifications made to resultChannels.
	channelMutex sync.Mutex
	// resultChannels maps a campaign ID to the channel on which that
	// campaign's results are delivered.
	resultChannels map[uint]chan interface{}
}
// Compile-time check that *inmemQueryResults satisfies kolide.QueryResultStore.
var _ kolide.QueryResultStore = (*inmemQueryResults)(nil)
// NewInmemQueryResults returns a ready-to-use in-memory implementation of the
// QueryResultStore interface. The returned store starts with no registered
// result channels.
func NewInmemQueryResults() *inmemQueryResults {
	store := new(inmemQueryResults)
	store.resultChannels = make(map[uint]chan interface{})
	return store
}
// getChannel returns the channel registered for id, creating and registering
// a new unbuffered channel when none exists yet. The lookup and the optional
// insertion happen while channelMutex is held.
func (im *inmemQueryResults) getChannel(id uint) chan interface{} {
	im.channelMutex.Lock()
	defer im.channelMutex.Unlock()

	if existing, found := im.resultChannels[id]; found {
		return existing
	}

	created := make(chan interface{})
	im.resultChannels[id] = created
	return created
}
// WriteResult hands result to the channel registered for the result's
// campaign.
//
// Delivery never blocks. A noSubscriberError carrying the campaign ID is
// returned when no channel is registered for the campaign, or when the send
// cannot proceed immediately (no reader is ready to receive). nil is returned
// once the result has been handed over.
func (im *inmemQueryResults) WriteResult(result kolide.DistributedQueryResult) error {
	// The map is modified by other methods while they hold channelMutex, so
	// reading it without the mutex is a data race. The lock is kept across
	// the send as well so that lookup and send are atomic with respect to
	// map updates made under the same mutex; the send below never blocks, so
	// the lock is only held briefly.
	im.channelMutex.Lock()
	defer im.channelMutex.Unlock()

	channel, ok := im.resultChannels[result.DistributedQueryCampaignID]
	if !ok {
		return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
	}

	select {
	case channel <- result:
		// intentionally do nothing
	default:
		// Nobody is receiving right now; report it instead of blocking.
		return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
	}

	return nil
}
// ReadChannel returns the receive side of the channel on which results for
// campaign are delivered, registering a channel for campaign.ID if none
// exists yet. The returned error is always nil.
//
// A goroutine is started that waits for ctx to be done and then unregisters
// and closes the channel, so callers must eventually cancel ctx to release
// the channel and the goroutine.
func (im *inmemQueryResults) ReadChannel(ctx context.Context, campaign kolide.DistributedQueryCampaign) (<-chan interface{}, error) {
	channel := im.getChannel(campaign.ID)

	go func() {
		<-ctx.Done()

		im.channelMutex.Lock()
		defer im.channelMutex.Unlock()

		// Act only if this channel is still the one registered for the
		// campaign. When several readers shared the channel, an earlier
		// cleanup has already removed and closed it: closing again would
		// panic, and deleting could drop a newer channel registered under
		// the same ID in the meantime.
		if current, ok := im.resultChannels[campaign.ID]; !ok || current != channel {
			return
		}

		// Unregister before closing so that no lookup performed under the
		// mutex can ever return a closed channel.
		delete(im.resultChannels, campaign.ID)
		close(channel)
	}()

	return channel, nil
}
// HealthCheck always returns nil; this implementation performs no checks.
func (im *inmemQueryResults) HealthCheck() error {
return nil
}