From 971eca9b2b5e86dc815326592a319a36753ed3c9 Mon Sep 17 00:00:00 2001 From: Zachary Wasserman Date: Tue, 10 Jan 2017 19:34:32 -0800 Subject: [PATCH] Push distributed query errors over results websocket (#878) As of recently, osquery will report when a distributed query fails. We now expose errors over the results websocket. When a query errored on the host, the `error` key in the result will be non-null. Note that osquery currently doesn't provide any details so the error string will always be "failed". I anticipate that we will fix this and the string is included for future-proofing. Successful result: ``` { "type": "result", "data": { "distributed_query_execution_id": 15, "host": { ... omitted ... }, "rows": [ { "hour": "1" } ], "error": null } } ``` Failed result: ``` { "type": "result", "data": { "distributed_query_execution_id": 14, "host": { ... omitted ... }, "rows": [ ], "error": "failed" } } ``` --- server/kolide/campaigns.go | 5 +++++ server/kolide/osquery.go | 2 +- server/service/endpoint_osquery.go | 7 +++--- server/service/logging_osquery.go | 4 ++-- server/service/service_osquery.go | 28 +++++++++++++++++------- server/service/service_osquery_test.go | 8 ++++--- server/service/transport_osquery.go | 10 +++++---- server/service/transport_osquery_test.go | 7 ++++-- 8 files changed, 48 insertions(+), 23 deletions(-) diff --git a/server/kolide/campaigns.go b/server/kolide/campaigns.go index 9991d5f4d5..7be241315d 100644 --- a/server/kolide/campaigns.go +++ b/server/kolide/campaigns.go @@ -103,6 +103,11 @@ type DistributedQueryResult struct { DistributedQueryCampaignID uint `json:"distributed_query_execution_id"` Host Host `json:"host"` Rows []map[string]string `json:"rows"` + // osquery currently doesn't return any helpful error information, + // but we use string here instead of bool for future-proofing. Note also + // that we can't use the error interface here because something + // implementing that interface may not (un)marshal properly + Error *string `json:"error"` } // DistributedQueryExecution is the metadata associated with a distributed diff --git a/server/kolide/osquery.go b/server/kolide/osquery.go index 897b3c728c..e26b2c25c0 100644 --- a/server/kolide/osquery.go +++ b/server/kolide/osquery.go @@ -7,7 +7,7 @@ type OsqueryService interface { AuthenticateHost(ctx context.Context, nodeKey string) (host *Host, err error) GetClientConfig(ctx context.Context) (config *OsqueryConfig, err error) GetDistributedQueries(ctx context.Context) (queries map[string]string, err error) - SubmitDistributedQueryResults(ctx context.Context, results OsqueryDistributedQueryResults) (err error) + SubmitDistributedQueryResults(ctx context.Context, results OsqueryDistributedQueryResults, statuses map[string]string) (err error) SubmitStatusLogs(ctx context.Context, logs []OsqueryStatusLog) (err error) SubmitResultLogs(ctx context.Context, logs []OsqueryResultLog) (err error) } diff --git a/server/service/endpoint_osquery.go b/server/service/endpoint_osquery.go index 48bf4fe2f8..ed1cebfe6f 100644 --- a/server/service/endpoint_osquery.go +++ b/server/service/endpoint_osquery.go @@ -90,8 +90,9 @@ func makeGetDistributedQueriesEndpoint(svc kolide.Service) endpoint.Endpoint { //////////////////////////////////////////////////////////////////////////////// type submitDistributedQueryResultsRequest struct { - NodeKey string `json:"node_key"` - Results kolide.OsqueryDistributedQueryResults `json:"queries"` + NodeKey string `json:"node_key"` + Results kolide.OsqueryDistributedQueryResults `json:"queries"` + Statuses map[string]string `json:"statuses"` } type submitDistributedQueryResultsResponse struct { @@ -103,7 +104,7 @@ func (r submitDistributedQueryResultsResponse) error() error { return r.Err } func makeSubmitDistributedQueryResultsEndpoint(svc kolide.Service) endpoint.Endpoint { return func(ctx context.Context, request interface{}) (interface{}, error) { req := request.(submitDistributedQueryResultsRequest) - err := svc.SubmitDistributedQueryResults(ctx, req.Results) + err := svc.SubmitDistributedQueryResults(ctx, req.Results, req.Statuses) if err != nil { return submitDistributedQueryResultsResponse{Err: err}, nil } diff --git a/server/service/logging_osquery.go b/server/service/logging_osquery.go index 67d069cdca..808af09c99 100644 --- a/server/service/logging_osquery.go +++ b/server/service/logging_osquery.go @@ -79,7 +79,7 @@ func (mw loggingMiddleware) GetDistributedQueries(ctx context.Context) (map[stri return queries, err } -func (mw loggingMiddleware) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults) error { +func (mw loggingMiddleware) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults, statuses map[string]string) error { var ( err error ) @@ -92,7 +92,7 @@ func (mw loggingMiddleware) SubmitDistributedQueryResults(ctx context.Context, r ) }(time.Now()) - err = mw.Service.SubmitDistributedQueryResults(ctx, results) + err = mw.Service.SubmitDistributedQueryResults(ctx, results, statuses) return err } diff --git a/server/service/service_osquery.go b/server/service/service_osquery.go index 998f6e8050..8c65ed64d4 100644 --- a/server/service/service_osquery.go +++ b/server/service/service_osquery.go @@ -3,13 +3,11 @@ package service import ( "encoding/json" "fmt" - "net/http" "strconv" "strings" "time" hostctx "github.com/kolide/kolide-ose/server/contexts/host" - "github.com/kolide/kolide-ose/server/errors" "github.com/kolide/kolide-ose/server/kolide" "github.com/kolide/kolide-ose/server/pubsub" "golang.org/x/net/context" @@ -132,7 +130,7 @@ func (svc service) SubmitStatusLogs(ctx context.Context, logs []kolide.OsquerySt for _, log := range logs { err := json.NewEncoder(svc.osqueryStatusLogWriter).Encode(log) if err != nil { - return errors.NewFromError(err, http.StatusInternalServerError, "error writing status log") + return osqueryError{message: "error writing status log: " + err.Error()} } } @@ -153,7 +151,7 @@ func (svc service) SubmitResultLogs(ctx context.Context, logs []kolide.OsqueryRe for _, log := range logs { err := json.NewEncoder(svc.osqueryResultLogWriter).Encode(log) if err != nil { - return errors.NewFromError(err, http.StatusInternalServerError, "error writing result log") + return osqueryError{message: "error writing result log: " + err.Error()} } } @@ -419,7 +417,7 @@ func (svc service) ingestLabelQuery(host kolide.Host, query string, rows []map[s // ingestDistributedQuery takes the results of a distributed query and modifies the // provided kolide.Host appropriately. -func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []map[string]string) error { +func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows []map[string]string, failed bool) error { trimmedQuery := strings.TrimPrefix(name, hostDistributedQueryPrefix) campaignID, err := strconv.Atoi(trimmedQuery) @@ -433,6 +431,12 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows [] Host: host, Rows: rows, } + if failed { + // osquery errors are not currently helpful, but we should fix + // them to be better in the future + errString := "failed" + res.Error = &errString + } err = svc.resultStore.WriteResult(res) if err != nil { @@ -456,10 +460,14 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows [] } // Record execution of the query + status := kolide.ExecutionSucceeded + if failed { + status = kolide.ExecutionFailed + } exec := &kolide.DistributedQueryExecution{ HostID: host.ID, DistributedQueryCampaignID: uint(campaignID), - Status: kolide.ExecutionSucceeded, + Status: status, } _, err = svc.ds.NewDistributedQueryExecution(exec) @@ -470,7 +478,7 @@ func (svc service) ingestDistributedQuery(host kolide.Host, name string, rows [] return nil } -func (svc service) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults) error { +func (svc service) SubmitDistributedQueryResults(ctx context.Context, results kolide.OsqueryDistributedQueryResults, statuses map[string]string) error { host, ok := hostctx.FromContext(ctx) if !ok { @@ -490,7 +498,11 @@ func (svc service) SubmitDistributedQueryResults(ctx context.Context, results ko case strings.HasPrefix(query, hostLabelQueryPrefix): err = svc.ingestLabelQuery(host, query, rows, labelResults) case strings.HasPrefix(query, hostDistributedQueryPrefix): - err = svc.ingestDistributedQuery(host, query, rows) + // osquery docs say any nonzero (string) value for + // status indicates a query error + status, ok := statuses[query] + failed := ok && status != "0" + err = svc.ingestDistributedQuery(host, query, rows, failed) default: err = osqueryError{message: "unknown query prefix: " + query} } diff --git a/server/service/service_osquery_test.go b/server/service/service_osquery_test.go index 04aca88319..ad1a1dfa7d 100644 --- a/server/service/service_osquery_test.go +++ b/server/service/service_osquery_test.go @@ -324,6 +324,7 @@ func TestLabelQueries(t *testing.T) { map[string][]map[string]string{ hostLabelQueryPrefix + "1": {{"col1": "val1"}}, }, + map[string]string{}, ) assert.Nil(t, err) @@ -358,6 +359,7 @@ func TestLabelQueries(t *testing.T) { hostLabelQueryPrefix + "2": {{"col1": "val1"}}, hostLabelQueryPrefix + "3": {}, }, + map[string]string{}, ) assert.Nil(t, err) @@ -567,7 +569,7 @@ func TestDetailQueries(t *testing.T) { require.Nil(t, err) // Verify that results are ingested properly - svc.SubmitDistributedQueryResults(ctx, results) + svc.SubmitDistributedQueryResults(ctx, results, map[string]string{}) // Make sure the result saved to the datastore host, err = ds.AuthenticateHost(nodeKey) @@ -715,7 +717,7 @@ func TestDistributedQueries(t *testing.T) { // this test. time.Sleep(10 * time.Millisecond) - err = svc.SubmitDistributedQueryResults(ctx, results) + err = svc.SubmitDistributedQueryResults(ctx, results, map[string]string{}) require.Nil(t, err) // Now the distributed query should be completed and not returned @@ -775,7 +777,7 @@ func TestOrphanedQueryCampaign(t *testing.T) { // Submit results ctx = hostctx.NewContext(context.Background(), *host) - err = svc.SubmitDistributedQueryResults(ctx, results) + err = svc.SubmitDistributedQueryResults(ctx, results, map[string]string{}) require.Nil(t, err) // The campaign should be set to completed because it is orphaned diff --git a/server/service/transport_osquery.go b/server/service/transport_osquery.go index f95124cf0a..73d5319609 100644 --- a/server/service/transport_osquery.go +++ b/server/service/transport_osquery.go @@ -48,8 +48,9 @@ func decodeSubmitDistributedQueryResultsRequest(ctx context.Context, r *http.Req // "node_key":"IGXCXknWQ1baTa8TZ6rF3kAPZ4\/aTsui" // } type distributedQueryResultsShim struct { - NodeKey string `json:"node_key"` - Results map[string]json.RawMessage `json:"queries"` + NodeKey string `json:"node_key"` + Results map[string]json.RawMessage `json:"queries"` + Statuses map[string]string `json:"statuses"` } var shim distributedQueryResultsShim @@ -68,8 +69,9 @@ func decodeSubmitDistributedQueryResultsRequest(ctx context.Context, r *http.Req } req := submitDistributedQueryResultsRequest{ - NodeKey: shim.NodeKey, - Results: results, + NodeKey: shim.NodeKey, + Results: results, + Statuses: shim.Statuses, } return req, nil diff --git a/server/service/transport_osquery_test.go b/server/service/transport_osquery_test.go index 0068002160..e9a55ba03f 100644 --- a/server/service/transport_osquery_test.go +++ b/server/service/transport_osquery_test.go @@ -2,11 +2,12 @@ package service import ( "bytes" - "golang.org/x/net/context" "net/http" "net/http/httptest" "testing" + "golang.org/x/net/context" + "github.com/gorilla/mux" "github.com/kolide/kolide-ose/server/kolide" "github.com/stretchr/testify/assert" @@ -95,6 +96,7 @@ func TestDecodeSubmitDistributedQueryResultsRequest(t *testing.T) { }, "id3": {}, }, params.Results) + assert.Equal(t, map[string]string{"id1": "0", "id3": "1"}, params.Statuses) }).Methods("POST") // Note we explicitly test the case that requires using the shim @@ -111,7 +113,8 @@ func TestDecodeSubmitDistributedQueryResultsRequest(t *testing.T) { {"col3": "val5", "col4": "val6"} ], "id3": "" - } + }, + "statuses": {"id1": "0", "id3": "1"} }`)) router.ServeHTTP(