From 220e55269e189132e1bcbf4e0cb3db5655e3ab1f Mon Sep 17 00:00:00 2001 From: Lucas Manuel Rodriguez Date: Tue, 27 Feb 2024 17:47:05 -0300 Subject: [PATCH] Fix orphaned live queries in Redis when client terminates connection (#17202) #17197 The following endpoints have the issue: ``` curl -v -k -X POST -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/queries/758/run -d '{"host_ids": [15858]}' # Hit Ctrl + C before the API returns. curl -v -k -X GET -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/queries/run -d '{"query_ids": [758], "host_ids": [15858]}' # Hit Ctrl + C before the API returns. curl -k -X POST -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/hosts/15858/query -d '{"query": "SELECT * FROM osquery_info;"}' # Hit Ctrl + C before the API returns. curl -k -X POST -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/hosts/identifier/2A249326-34B7-4B1D-BEB5-9B3A23BC30E6/query -d '{"query": "SELECT * FROM os_version;"}' # Hit Ctrl + C before the API returns. redis-cli -h 127.0.0.1 127.0.0.1:6379> SMEMBERS livequery:active # Will list the four live queries for 7 days... and Fleet will be in live query mode for such 7 days... ``` - [X] Changes file added for user-visible changes in `changes/` or `orbit/changes/`. See [Changes files](https://fleetdm.com/docs/contributing/committing-changes#changes-files) for more information. - [ ] Added/updated tests - [x] Manual QA for all new/changed functionality --- changes/17197-cleanup-campaign-when-ctx-canceled | 1 + server/service/live_queries.go | 12 ++++++++++-- server/service/service_campaigns.go | 9 ++++++++- 3 files changed, 19 insertions(+), 3 deletions(-) create mode 100644 changes/17197-cleanup-campaign-when-ctx-canceled diff --git a/changes/17197-cleanup-campaign-when-ctx-canceled b/changes/17197-cleanup-campaign-when-ctx-canceled new file mode 100644 index 0000000000..7ff75f5515 --- /dev/null +++ b/changes/17197-cleanup-campaign-when-ctx-canceled @@ -0,0 +1 @@ +* Fix orphaned live queries in Redis when client terminates connection (`POST /api/latest/fleet/queries/{id}/run`, `GET /api/latest/fleet/queries/run`, `POST /api/latest/fleet/hosts/identifier/{identifier}/query` and `POST /api/latest/fleet/hosts/{id}/query`). diff --git a/server/service/live_queries.go b/server/service/live_queries.go index 06b1cabf8b..7bbe6a79b1 100644 --- a/server/service/live_queries.go +++ b/server/service/live_queries.go @@ -190,7 +190,7 @@ func runLiveQueryOnHost(svc fleet.Service, ctx context.Context, host *fleet.Host } res.Rows = queryResult.Rows res.HostID = queryResult.HostID - } else { // timeout waiting for results + } else { err = errors.New("timeout waiting for results") } if err != nil { @@ -278,9 +278,17 @@ func (svc *Service) RunLiveQueryDeadline( defer cancelFunc() defer func() { + // We do not want to use the outer `ctx` directly because we want to cleanup the campaign + // even if the outer `ctx` is canceled (e.g. a client terminating the connection). + ctx := context.WithoutCancel(ctx) err := svc.CompleteCampaign(ctx, campaign) if err != nil { - resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err} + level.Error(svc.logger).Log("msg", "completing campaign (sync)", "query.id", campaign.QueryID, "err", err) + resultsCh <- fleet.QueryCampaignResult{ + QueryID: queryID, + Error: ptr.String(err.Error()), + Err: err, + } } }() diff --git a/server/service/service_campaigns.go b/server/service/service_campaigns.go index 4703947708..615f0543c0 100644 --- a/server/service/service_campaigns.go +++ b/server/service/service_campaigns.go @@ -97,7 +97,14 @@ func (svc Service) StreamCampaignResults(ctx context.Context, conn *websocket.Co // Setting the status to completed stops the query from being sent to // targets. If this fails, there is a background job that will clean up // this campaign. - defer svc.CompleteCampaign(ctx, campaign) //nolint:errcheck + defer func() { + // We do not want to use the outer `ctx` because we want to make sure + // to cleanup the campaign. + ctx := context.WithoutCancel(ctx) + if err := svc.CompleteCampaign(ctx, campaign); err != nil { + level.Error(logger).Log("msg", "complete campaign (async)", "err", err) + } + }() status := campaignStatus{ Status: campaignStatusPending,