Fix orphaned live queries in Redis when client terminates connection (#17202)

#17197

The following endpoints have the issue:
```
curl -v -k -X POST -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/queries/758/run -d '{"host_ids": [15858]}'
# Hit Ctrl + C before the API returns.

curl -v -k -X GET -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/queries/run -d '{"query_ids": [758], "host_ids": [15858]}'
# Hit Ctrl + C before the API returns.

curl -k -X POST -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/hosts/15858/query -d '{"query": "SELECT * FROM osquery_info;"}'
# Hit Ctrl + C before the API returns.

curl -k -X POST -H "Authorization: Bearer $TEST_TOKEN" https://localhost:8080/api/latest/fleet/hosts/identifier/2A249326-34B7-4B1D-BEB5-9B3A23BC30E6/query -d '{"query": "SELECT * FROM os_version;"}'
# Hit Ctrl + C before the API returns.

redis-cli -h 127.0.0.1
127.0.0.1:6379> SMEMBERS livequery:active
# The four live queries remain listed for 7 days, and Fleet stays in live query mode for those 7 days.
```

- [X] Changes file added for user-visible changes in `changes/` or
`orbit/changes/`.
See [Changes
files](https://fleetdm.com/docs/contributing/committing-changes#changes-files)
for more information.
- [ ] Added/updated tests
- [x] Manual QA for all new/changed functionality
This commit is contained in:
Lucas Manuel Rodriguez 2024-02-27 17:47:05 -03:00 committed by GitHub
parent b95e723275
commit 220e55269e
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
3 changed files with 19 additions and 3 deletions

View file

@ -0,0 +1 @@
* Fix orphaned live queries in Redis when client terminates connection (`POST /api/latest/fleet/queries/{id}/run`, `GET /api/latest/fleet/queries/run`, `POST /api/latest/fleet/hosts/identifier/{identifier}/query` and `POST /api/latest/fleet/hosts/{id}/query`).

View file

@ -190,7 +190,7 @@ func runLiveQueryOnHost(svc fleet.Service, ctx context.Context, host *fleet.Host
}
res.Rows = queryResult.Rows
res.HostID = queryResult.HostID
} else { // timeout waiting for results
} else {
err = errors.New("timeout waiting for results")
}
if err != nil {
@ -278,9 +278,17 @@ func (svc *Service) RunLiveQueryDeadline(
defer cancelFunc()
defer func() {
// We do not want to use the outer `ctx` directly because we want to clean up the campaign
// even if the outer `ctx` is canceled (e.g. a client terminating the connection).
ctx := context.WithoutCancel(ctx)
err := svc.CompleteCampaign(ctx, campaign)
if err != nil {
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
level.Error(svc.logger).Log("msg", "completing campaign (sync)", "query.id", campaign.QueryID, "err", err)
resultsCh <- fleet.QueryCampaignResult{
QueryID: queryID,
Error: ptr.String(err.Error()),
Err: err,
}
}
}()

View file

@ -97,7 +97,14 @@ func (svc Service) StreamCampaignResults(ctx context.Context, conn *websocket.Co
// Setting the status to completed stops the query from being sent to
// targets. If this fails, there is a background job that will clean up
// this campaign.
defer svc.CompleteCampaign(ctx, campaign) //nolint:errcheck
defer func() {
// We do not want to use the outer `ctx` because we want to make sure
// to clean up the campaign.
ctx := context.WithoutCancel(ctx)
if err := svc.CompleteCampaign(ctx, campaign); err != nil {
level.Error(logger).Log("msg", "complete campaign (async)", "err", err)
}
}()
status := campaignStatus{
Status: campaignStatusPending,