diff --git a/server/pubsub/doc.go b/server/pubsub/doc.go index edbbfac0db..73974910da 100644 --- a/server/pubsub/doc.go +++ b/server/pubsub/doc.go @@ -1,2 +1,24 @@ // Package pubsub implements pub/sub interfaces defined in package kolide. package pubsub + +// Error defines the interface of errors specific to the pubsub package +type Error interface { + error + // NoSubscriber returns true if the error occurred because there are no + // subscribers on the channel + NoSubscriber() bool +} + +// NoSubscriberError can be returned when channel operations fail because there +// are no subscribers. Its NoSubscriber() method always returns true. +type noSubscriberError struct { + Channel string +} + +func (e noSubscriberError) Error() string { + return "no subscriber for channel " + e.Channel +} + +func (e noSubscriberError) NoSubscriber() bool { + return true +} diff --git a/server/pubsub/inmem_query_results.go b/server/pubsub/inmem_query_results.go index 68dc3bfff7..59af988e7c 100644 --- a/server/pubsub/inmem_query_results.go +++ b/server/pubsub/inmem_query_results.go @@ -1,7 +1,7 @@ package pubsub import ( - "errors" + "strconv" "sync" "golang.org/x/net/context" @@ -37,14 +37,14 @@ func (im *inmemQueryResults) getChannel(id uint) chan interface{} { func (im *inmemQueryResults) WriteResult(result kolide.DistributedQueryResult) error { channel, ok := im.resultChannels[result.DistributedQueryCampaignID] if !ok { - return errors.New("no subscribers for channel") + return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))} } select { case channel <- result: // intentionally do nothing default: - return errors.New("no subscribers for channel") + return noSubscriberError{strconv.Itoa(int(result.DistributedQueryCampaignID))} } return nil diff --git a/server/pubsub/query_results_test.go b/server/pubsub/query_results_test.go index 84ee60edea..c4c6794c78 100644 --- a/server/pubsub/query_results_test.go +++ b/server/pubsub/query_results_test.go @@ -91,10 +91,10 @@ func setupRedis(t *testing.T) (store *redisQueryResults, teardown func()) { } func testQueryResultsStoreErrors(t *testing.T, store kolide.QueryResultStore) { - // Test handling results for two campaigns in parallel + // Write with no subscriber err := store.WriteResult( kolide.DistributedQueryResult{ - DistributedQueryCampaignID: 1, + DistributedQueryCampaignID: 9999, Rows: []map[string]string{{"bing": "fds"}}, Host: kolide.Host{ ID: 4, @@ -108,6 +108,10 @@ func testQueryResultsStoreErrors(t *testing.T, store kolide.QueryResultStore) { }, ) assert.NotNil(t, err) + castErr, ok := err.(Error) + if assert.True(t, ok, "err should be pubsub.Error") { + assert.True(t, castErr.NoSubscriber(), "NoSubscriber() should be true") + } } func testQueryResultsStore(t *testing.T, store kolide.QueryResultStore) { diff --git a/server/pubsub/redis_query_results.go b/server/pubsub/redis_query_results.go index f5b0cccfdb..b3018bed90 100644 --- a/server/pubsub/redis_query_results.go +++ b/server/pubsub/redis_query_results.go @@ -2,7 +2,6 @@ package pubsub import ( "encoding/json" - "errors" "fmt" "time" @@ -10,6 +9,7 @@ import ( "github.com/garyburd/redigo/redis" "github.com/kolide/kolide-ose/server/kolide" + "github.com/pkg/errors" ) type redisQueryResults struct { @@ -66,15 +66,15 @@ func (r *redisQueryResults) WriteResult(result kolide.DistributedQueryResult) er jsonVal, err := json.Marshal(&result) if err != nil { - return errors.New("error marshalling JSON for writing result: " + err.Error()) + return errors.Wrap(err, "marshalling JSON for result") } n, err := redis.Int(conn.Do("PUBLISH", channelName, string(jsonVal))) if err != nil { - return fmt.Errorf("PUBLISH failed to channel %s: %s", channelName, err.Error()) + return errors.Wrap(err, "PUBLISH failed to channel "+channelName) } if n == 0 { - return fmt.Errorf("no subscribers for channel %s", channelName) + return noSubscriberError{channelName} } return nil @@ -142,7 +142,7 @@ func (r *redisQueryResults) ReadChannel(ctx context.Context, query kolide.Distri } outChannel <- res case error: - outChannel <- msg + outChannel <- errors.Wrap(msg, "reading from redis") } case <-ctx.Done(): diff --git a/server/service/service_osquery.go b/server/service/service_osquery.go index 9ff248566d..f5c8fb8a7e 100644 --- a/server/service/service_osquery.go +++ b/server/service/service_osquery.go @@ -11,6 +11,7 @@ import ( hostctx "github.com/kolide/kolide-ose/server/contexts/host" "github.com/kolide/kolide-ose/server/errors" "github.com/kolide/kolide-ose/server/kolide" + "github.com/kolide/kolide-ose/server/pubsub" "golang.org/x/net/context" ) @@ -420,7 +421,23 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows [] err = svc.resultStore.WriteResult(res) if err != nil { - return osqueryError{message: "writing results: " + err.Error()} + nErr, ok := err.(pubsub.Error) + if !ok || !nErr.NoSubscriber() { + return osqueryError{message: "writing results: " + err.Error()} + } + + // If there are no subscribers, the campaign is "orphaned" + // and should be closed so that we don't continue trying to + // execute that query when we can't write to any subscriber + campaign, err := svc.ds.DistributedQueryCampaign(uint(campaignID)) + if err != nil { + return osqueryError{message: "loading orphaned campaign: " + err.Error()} + } + + campaign.Status = kolide.QueryComplete + if err := svc.ds.SaveDistributedQueryCampaign(campaign); err != nil { + return osqueryError{message: "closing orphaned campaign: " + err.Error()} + } } // Record execution of the query diff --git a/server/service/service_osquery_test.go b/server/service/service_osquery_test.go index 1c7e00d7a6..41145f7973 100644 --- a/server/service/service_osquery_test.go +++ b/server/service/service_osquery_test.go @@ -665,10 +665,6 @@ func TestDistributedQueries(t *testing.T) { queryKey: expectedRows, } - // Submit results (should error because no one is listening) - err = svc.SubmitDistributedQueryResults(ctx, results) - assert.NotNil(t, err) - // TODO use service method readChan, err := rs.ReadChannel(ctx, *campaign) require.Nil(t, err) @@ -716,3 +712,60 @@ func TestDistributedQueries(t *testing.T) { waitComplete.Wait() } + +func TestOrphanedQueryCampaign(t *testing.T) { + ds, err := inmem.New(config.TestConfig()) + require.Nil(t, err) + + rs := pubsub.NewInmemQueryResults() + + svc, err := newTestService(ds, rs) + require.Nil(t, err) + + ctx := context.Background() + + nodeKey, err := svc.EnrollAgent(ctx, "", "host123") + require.Nil(t, err) + + host, err := ds.AuthenticateHost(nodeKey) + require.Nil(t, err) + + ctx = viewer.NewContext(context.Background(), viewer.Viewer{ + User: &kolide.User{ + ID: 0, + }, + }) + q := "select year, month, day, hour, minutes, seconds from time" + campaign, err := svc.NewDistributedQueryCampaign(ctx, q, []uint{}, []uint{}) + require.Nil(t, err) + + campaign.Status = kolide.QueryRunning + err = ds.SaveDistributedQueryCampaign(campaign) + require.Nil(t, err) + + queryKey := fmt.Sprintf("%s%d", hostDistributedQueryPrefix, campaign.ID) + + expectedRows := []map[string]string{ + { + "year": "2016", + "month": "11", + "day": "11", + "hour": "6", + "minutes": "12", + "seconds": "10", + }, + } + results := map[string][]map[string]string{ + queryKey: expectedRows, + } + + // Submit results + ctx = hostctx.NewContext(context.Background(), *host) + err = svc.SubmitDistributedQueryResults(ctx, results) + require.Nil(t, err) + + // The campaign should be set to completed because it is orphaned + campaign, err = ds.DistributedQueryCampaign(campaign.ID) + require.Nil(t, err) + assert.Equal(t, kolide.QueryComplete, campaign.Status) +}