From 39ebd81dc56917eff9908d41112f688207d0f89b Mon Sep 17 00:00:00 2001 From: Zachary Wasserman Date: Tue, 27 Dec 2016 10:35:19 -0500 Subject: [PATCH] Close orphaned distributed query campaign after failed publish attempt (#707) A distributed query campaign can be "orphaned" (left in the QueryRunning state) if the Kolide server restarts while it is running, or some other unexpected failure occurs. When this happens, no subscribers are waiting to read results written by osqueryd agents, but the agents continue to receive the query. Previously, this would cause us to error on ingestion. The new behavior will instead set the campaign to completed when it detects that it is orphaned. This should prevent sending queries for which there is no subscriber. - New NoSubscriber error interface in pubsub - Detect NoSubscriber errors and close campaigns - Tests on pubsub and service methods Fixes #695 --- server/pubsub/doc.go | 22 ++++++++++ server/pubsub/inmem_query_results.go | 6 +-- server/pubsub/query_results_test.go | 8 +++- server/pubsub/redis_query_results.go | 10 ++--- server/service/service_osquery.go | 19 +++++++- server/service/service_osquery_test.go | 61 ++++++++++++++++++++++++-- 6 files changed, 111 insertions(+), 15 deletions(-) diff --git a/server/pubsub/doc.go b/server/pubsub/doc.go index edbbfac0db..73974910da 100644 --- a/server/pubsub/doc.go +++ b/server/pubsub/doc.go @@ -1,2 +1,24 @@ // Package pubsub implements pub/sub interfaces defined in package kolide. package pubsub + +// Error defines the interface of errors specific to the pubsub package +type Error interface { + error + // NoSubscriber returns true if the error occurred because there are no + // subscribers on the channel + NoSubscriber() bool +} + +// noSubscriberError can be returned when channel operations fail because there +// are no subscribers. Its NoSubscriber() method always returns true. 
+type noSubscriberError struct { + Channel string +} + +func (e noSubscriberError) Error() string { + return "no subscriber for channel " + e.Channel +} + +func (e noSubscriberError) NoSubscriber() bool { + return true +} diff --git a/server/pubsub/inmem_query_results.go b/server/pubsub/inmem_query_results.go index 68dc3bfff7..59af988e7c 100644 --- a/server/pubsub/inmem_query_results.go +++ b/server/pubsub/inmem_query_results.go @@ -1,7 +1,7 @@ package pubsub import ( - "errors" + "strconv" "sync" "golang.org/x/net/context" @@ -37,14 +37,14 @@ func (im *inmemQueryResults) getChannel(id uint) chan interface{} { func (im *inmemQueryResults) WriteResult(result kolide.DistributedQueryResult) error { channel, ok := im.resultChannels[result.DistributedQueryCampaignID] if !ok { - return errors.New("no subscribers for channel") + return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))} } select { case channel <- result: // intentionally do nothing default: - return errors.New("no subscribers for channel") + return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))} } return nil diff --git a/server/pubsub/query_results_test.go b/server/pubsub/query_results_test.go index 84ee60edea..c4c6794c78 100644 --- a/server/pubsub/query_results_test.go +++ b/server/pubsub/query_results_test.go @@ -91,10 +91,10 @@ func setupRedis(t *testing.T) (store *redisQueryResults, teardown func()) { } func testQueryResultsStoreErrors(t *testing.T, store kolide.QueryResultStore) { - // Test handling results for two campaigns in parallel + // Write with no subscriber err := store.WriteResult( kolide.DistributedQueryResult{ - DistributedQueryCampaignID: 1, + DistributedQueryCampaignID: 9999, Rows: []map[string]string{{"bing": "fds"}}, Host: kolide.Host{ ID: 4, @@ -108,6 +108,10 @@ func testQueryResultsStoreErrors(t *testing.T, store kolide.QueryResultStore) { }, ) assert.NotNil(t, err) + castErr, ok := err.(Error) + if assert.True(t, ok, "err should be 
pubsub.Error") { + assert.True(t, castErr.NoSubscriber(), "NoSubscriber() should be true") + } } func testQueryResultsStore(t *testing.T, store kolide.QueryResultStore) { diff --git a/server/pubsub/redis_query_results.go b/server/pubsub/redis_query_results.go index f5b0cccfdb..b3018bed90 100644 --- a/server/pubsub/redis_query_results.go +++ b/server/pubsub/redis_query_results.go @@ -2,7 +2,6 @@ package pubsub import ( "encoding/json" - "errors" "fmt" "time" @@ -10,6 +9,7 @@ import ( "github.com/garyburd/redigo/redis" "github.com/kolide/kolide-ose/server/kolide" + "github.com/pkg/errors" ) type redisQueryResults struct { @@ -66,15 +66,15 @@ func (r *redisQueryResults) WriteResult(result kolide.DistributedQueryResult) er jsonVal, err := json.Marshal(&result) if err != nil { - return errors.New("error marshalling JSON for writing result: " + err.Error()) + return errors.Wrap(err, "marshalling JSON for result") } n, err := redis.Int(conn.Do("PUBLISH", channelName, string(jsonVal))) if err != nil { - return fmt.Errorf("PUBLISH failed to channel %s: %s", channelName, err.Error()) + return errors.Wrap(err, "PUBLISH failed to channel "+channelName) } if n == 0 { - return fmt.Errorf("no subscribers for channel %s", channelName) + return noSubscriberError{channelName} } return nil @@ -142,7 +142,7 @@ func (r *redisQueryResults) ReadChannel(ctx context.Context, query kolide.Distri } outChannel <- res case error: - outChannel <- msg + outChannel <- errors.Wrap(msg, "reading from redis") } case <-ctx.Done(): diff --git a/server/service/service_osquery.go b/server/service/service_osquery.go index 9ff248566d..f5c8fb8a7e 100644 --- a/server/service/service_osquery.go +++ b/server/service/service_osquery.go @@ -11,6 +11,7 @@ import ( hostctx "github.com/kolide/kolide-ose/server/contexts/host" "github.com/kolide/kolide-ose/server/errors" "github.com/kolide/kolide-ose/server/kolide" + "github.com/kolide/kolide-ose/server/pubsub" "golang.org/x/net/context" ) @@ -420,7 +421,23 @@ func 
(svc service) ingestDistributedQuery(host kolide.Host, name string, rows [] err = svc.resultStore.WriteResult(res) if err != nil { - return osqueryError{message: "writing results: " + err.Error()} + nErr, ok := err.(pubsub.Error) + if !ok || !nErr.NoSubscriber() { + return osqueryError{message: "writing results: " + err.Error()} + } + + // If there are no subscribers, the campaign is "orphaned" + // and should be closed so that we don't continue trying to + // execute that query when we can't write to any subscriber + campaign, err := svc.ds.DistributedQueryCampaign(uint(campaignID)) + if err != nil { + return osqueryError{message: "loading orphaned campaign: " + err.Error()} + } + + campaign.Status = kolide.QueryComplete + if err := svc.ds.SaveDistributedQueryCampaign(campaign); err != nil { + return osqueryError{message: "closing orphaned campaign: " + err.Error()} + } } // Record execution of the query diff --git a/server/service/service_osquery_test.go b/server/service/service_osquery_test.go index 1c7e00d7a6..41145f7973 100644 --- a/server/service/service_osquery_test.go +++ b/server/service/service_osquery_test.go @@ -665,10 +665,6 @@ func TestDistributedQueries(t *testing.T) { queryKey: expectedRows, } - // Submit results (should error because no one is listening) - err = svc.SubmitDistributedQueryResults(ctx, results) - assert.NotNil(t, err) - // TODO use service method readChan, err := rs.ReadChannel(ctx, *campaign) require.Nil(t, err) @@ -716,3 +712,60 @@ func TestDistributedQueries(t *testing.T) { waitComplete.Wait() } + +func TestOrphanedQueryCampaign(t *testing.T) { + ds, err := inmem.New(config.TestConfig()) + require.Nil(t, err) + + rs := pubsub.NewInmemQueryResults() + + svc, err := newTestService(ds, rs) + require.Nil(t, err) + + ctx := context.Background() + + nodeKey, err := svc.EnrollAgent(ctx, "", "host123") + require.Nil(t, err) + + host, err := ds.AuthenticateHost(nodeKey) + require.Nil(t, err) + + ctx = 
viewer.NewContext(context.Background(), viewer.Viewer{ + User: &kolide.User{ + ID: 0, + }, + }) + q := "select year, month, day, hour, minutes, seconds from time" + campaign, err := svc.NewDistributedQueryCampaign(ctx, q, []uint{}, []uint{}) + require.Nil(t, err) + + campaign.Status = kolide.QueryRunning + err = ds.SaveDistributedQueryCampaign(campaign) + require.Nil(t, err) + + queryKey := fmt.Sprintf("%s%d", hostDistributedQueryPrefix, campaign.ID) + + expectedRows := []map[string]string{ + { + "year": "2016", + "month": "11", + "day": "11", + "hour": "6", + "minutes": "12", + "seconds": "10", + }, + } + results := map[string][]map[string]string{ + queryKey: expectedRows, + } + + // Submit results + ctx = hostctx.NewContext(context.Background(), *host) + err = svc.SubmitDistributedQueryResults(ctx, results) + require.Nil(t, err) + + // The campaign should be set to completed because it is orphaned + campaign, err = ds.DistributedQueryCampaign(campaign.ID) + require.Nil(t, err) + assert.Equal(t, kolide.QueryComplete, campaign.Status) +}