Close orphaned distributed query campaign after failed publish attempt (#707)

A distributed query campaign can be "orphaned" (left in the QueryRunning state)
if the Kolide server restarts while it is running, or other weirdness occurs.
When this happens, no subscribers are waiting to read results written by
osqueryd agents, but the agents continue to receive the query. Previously, this
would cause us to error on ingestion.

The new behavior will instead set the campaign to completed when it detects
that it is orphaned. This should prevent sending queries for which there is no
subscriber.

- New NoSubscriber error interface in pubsub
- Detect NoSubscriber errors and close campaigns
- Tests on pubsub and service methods

Fixes #695
This commit is contained in:
Zachary Wasserman 2016-12-27 10:35:19 -05:00 committed by GitHub
parent 6f90e51bc7
commit 39ebd81dc5
6 changed files with 111 additions and 15 deletions

View file

@ -1,2 +1,24 @@
// Package pubsub implements pub/sub interfaces defined in package kolide.
package pubsub
// Error defines the interface of errors specific to the pubsub package
type Error interface {
error
// NoSubscriber returns true if the error occurred because there are no
// subscribers on the channel
NoSubscriber() bool
}
// NoSubscriberError can be returned when channel operations fail because there
// are no subscribers. Its NoSubscriber() method always returns true.
type noSubscriberError struct {
Channel string
}
func (e noSubscriberError) Error() string {
return "no subscriber for channel " + e.Channel
}
func (e noSubscriberError) NoSubscriber() bool {
return true
}

View file

@ -1,7 +1,7 @@
package pubsub
import (
"errors"
"strconv"
"sync"
"golang.org/x/net/context"
@ -37,14 +37,14 @@ func (im *inmemQueryResults) getChannel(id uint) chan interface{} {
func (im *inmemQueryResults) WriteResult(result kolide.DistributedQueryResult) error {
channel, ok := im.resultChannels[result.DistributedQueryCampaignID]
if !ok {
return errors.New("no subscribers for channel")
return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
}
select {
case channel <- result:
// intentionally do nothing
default:
return errors.New("no subscribers for channel")
return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))}
}
return nil

View file

@ -91,10 +91,10 @@ func setupRedis(t *testing.T) (store *redisQueryResults, teardown func()) {
}
func testQueryResultsStoreErrors(t *testing.T, store kolide.QueryResultStore) {
// Test handling results for two campaigns in parallel
// Write with no subscriber
err := store.WriteResult(
kolide.DistributedQueryResult{
DistributedQueryCampaignID: 1,
DistributedQueryCampaignID: 9999,
Rows: []map[string]string{{"bing": "fds"}},
Host: kolide.Host{
ID: 4,
@ -108,6 +108,10 @@ func testQueryResultsStoreErrors(t *testing.T, store kolide.QueryResultStore) {
},
)
assert.NotNil(t, err)
castErr, ok := err.(Error)
if assert.True(t, ok, "err should be pubsub.Error") {
assert.True(t, castErr.NoSubscriber(), "NoSubscriber() should be true")
}
}
func testQueryResultsStore(t *testing.T, store kolide.QueryResultStore) {

View file

@ -2,7 +2,6 @@ package pubsub
import (
"encoding/json"
"errors"
"fmt"
"time"
@ -10,6 +9,7 @@ import (
"github.com/garyburd/redigo/redis"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/pkg/errors"
)
type redisQueryResults struct {
@ -66,15 +66,15 @@ func (r *redisQueryResults) WriteResult(result kolide.DistributedQueryResult) er
jsonVal, err := json.Marshal(&result)
if err != nil {
return errors.New("error marshalling JSON for writing result: " + err.Error())
return errors.Wrap(err, "marshalling JSON for result")
}
n, err := redis.Int(conn.Do("PUBLISH", channelName, string(jsonVal)))
if err != nil {
return fmt.Errorf("PUBLISH failed to channel %s: %s", channelName, err.Error())
return errors.Wrap(err, "PUBLISH failed to channel "+channelName)
}
if n == 0 {
return fmt.Errorf("no subscribers for channel %s", channelName)
return noSubscriberError{channelName}
}
return nil
@ -142,7 +142,7 @@ func (r *redisQueryResults) ReadChannel(ctx context.Context, query kolide.Distri
}
outChannel <- res
case error:
outChannel <- msg
outChannel <- errors.Wrap(msg, "reading from redis")
}
case <-ctx.Done():

View file

@ -11,6 +11,7 @@ import (
hostctx "github.com/kolide/kolide-ose/server/contexts/host"
"github.com/kolide/kolide-ose/server/errors"
"github.com/kolide/kolide-ose/server/kolide"
"github.com/kolide/kolide-ose/server/pubsub"
"golang.org/x/net/context"
)
@ -420,7 +421,23 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []
err = svc.resultStore.WriteResult(res)
if err != nil {
return osqueryError{message: "writing results: " + err.Error()}
nErr, ok := err.(pubsub.Error)
if !ok || !nErr.NoSubscriber() {
return osqueryError{message: "writing results: " + err.Error()}
}
// If there are no subscribers, the campaign is "orphaned"
// and should be closed so that we don't continue trying to
// execute that query when we can't write to any subscriber
campaign, err := svc.ds.DistributedQueryCampaign(uint(campaignID))
if err != nil {
return osqueryError{message: "loading orphaned campaign: " + err.Error()}
}
campaign.Status = kolide.QueryComplete
if err := svc.ds.SaveDistributedQueryCampaign(campaign); err != nil {
return osqueryError{message: "closing orphaned campaign: " + err.Error()}
}
}
// Record execution of the query

View file

@ -665,10 +665,6 @@ func TestDistributedQueries(t *testing.T) {
queryKey: expectedRows,
}
// Submit results (should error because no one is listening)
err = svc.SubmitDistributedQueryResults(ctx, results)
assert.NotNil(t, err)
// TODO use service method
readChan, err := rs.ReadChannel(ctx, *campaign)
require.Nil(t, err)
@ -716,3 +712,60 @@ func TestDistributedQueries(t *testing.T) {
waitComplete.Wait()
}
func TestOrphanedQueryCampaign(t *testing.T) {
ds, err := inmem.New(config.TestConfig())
require.Nil(t, err)
rs := pubsub.NewInmemQueryResults()
svc, err := newTestService(ds, rs)
require.Nil(t, err)
ctx := context.Background()
nodeKey, err := svc.EnrollAgent(ctx, "", "host123")
require.Nil(t, err)
host, err := ds.AuthenticateHost(nodeKey)
require.Nil(t, err)
ctx = viewer.NewContext(context.Background(), viewer.Viewer{
User: &kolide.User{
ID: 0,
},
})
q := "select year, month, day, hour, minutes, seconds from time"
campaign, err := svc.NewDistributedQueryCampaign(ctx, q, []uint{}, []uint{})
require.Nil(t, err)
campaign.Status = kolide.QueryRunning
err = ds.SaveDistributedQueryCampaign(campaign)
require.Nil(t, err)
queryKey := fmt.Sprintf("%s%d", hostDistributedQueryPrefix, campaign.ID)
expectedRows := []map[string]string{
{
"year": "2016",
"month": "11",
"day": "11",
"hour": "6",
"minutes": "12",
"seconds": "10",
},
}
results := map[string][]map[string]string{
queryKey: expectedRows,
}
// Submit results
ctx = hostctx.NewContext(context.Background(), *host)
err = svc.SubmitDistributedQueryResults(ctx, results)
require.Nil(t, err)
// The campaign should be set to completed because it is orphaned
campaign, err = ds.DistributedQueryCampaign(campaign.ID)
require.Nil(t, err)
assert.Equal(t, kolide.QueryComplete, campaign.Status)
}