Moving context.WithoutCancel outside defer. (#17260)

#17197

Fixing orphaned live queries when context is canceled

Co-authored-by: Lucas Rodriguez <lucas@fleetdm.com>
This commit is contained in:
Victor Lyuboslavsky 2024-02-29 10:39:27 -06:00 committed by GitHub
parent f248701e0f
commit 1a679d0882
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
2 changed files with 30 additions and 19 deletions

View file

@ -156,17 +156,17 @@ func (svc *Service) NewDistributedQueryCampaign(ctx context.Context, queryString
}
}
err = svc.liveQueryStore.RunQuery(strconv.Itoa(int(campaign.ID)), queryString, hostIDs)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "run query")
}
// Metrics are used for total hosts targeted for the activity feed.
campaign.Metrics, err = svc.ds.CountHostsInTargets(ctx, filter, targets, time.Now())
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "counting hosts")
}
err = svc.liveQueryStore.RunQuery(strconv.Itoa(int(campaign.ID)), queryString, hostIDs)
if err != nil {
return nil, ctxerr.Wrap(ctx, err, "run query")
}
return campaign, nil
}

View file

@ -263,27 +263,30 @@ func (svc *Service) RunLiveQueryDeadline(
queryIDPtr = nil
queryString = query
}
campaign, err := svc.NewDistributedQueryCampaign(ctx, queryString, queryIDPtr, fleet.HostTargets{HostIDs: hostIDs})
if err != nil {
level.Error(svc.logger).Log(
"msg", "new distributed query campaign",
"queryString", queryString,
"queryID", queryID,
"err", err,
)
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
return
}
queryID = campaign.QueryID
readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign)
if err != nil {
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
return
}
defer cancelFunc()
// We do not want to use the outer `ctx` directly because we want to cleanup the campaign
// even if the outer `ctx` is canceled (e.g. a client terminating the connection).
// Also, we make sure stats and activity DB operations don't get killed after we return results.
ctxWithoutCancel := context.WithoutCancel(ctx)
defer func() {
// We do not want to use the outer `ctx` directly because we want to cleanup the campaign
// even if the outer `ctx` is canceled (e.g. a client terminating the connection).
ctx := context.WithoutCancel(ctx)
err := svc.CompleteCampaign(ctx, campaign)
err := svc.CompleteCampaign(ctxWithoutCancel, campaign)
if err != nil {
level.Error(svc.logger).Log("msg", "completing campaign (sync)", "query.id", campaign.QueryID, "err", err)
level.Error(svc.logger).Log(
"msg", "completing campaign (sync)", "query.id", campaign.QueryID, "campaign.id", campaign.ID, "err", err,
)
resultsCh <- fleet.QueryCampaignResult{
QueryID: queryID,
Error: ptr.String(err.Error()),
@ -292,6 +295,16 @@ func (svc *Service) RunLiveQueryDeadline(
}
}()
readChan, cancelFunc, err := svc.GetCampaignReader(ctx, campaign)
if err != nil {
level.Error(svc.logger).Log(
"msg", "get campaign reader", "query.id", campaign.QueryID, "campaign.id", campaign.ID, "err", err,
)
resultsCh <- fleet.QueryCampaignResult{QueryID: queryID, Error: ptr.String(err.Error()), Err: err}
return
}
defer cancelFunc()
var results []fleet.QueryResult
timeout := time.After(deadline)
@ -305,8 +318,6 @@ func (svc *Service) RunLiveQueryDeadline(
level.Error(svc.logger).Log("msg", "error checking saved query", "query.id", campaign.QueryID, "err", err)
perfStatsTracker.saveStats = false
}
// to make sure stats and activity DB operations don't get killed after we return results.
ctxWithoutCancel := context.WithoutCancel(ctx)
totalHosts := campaign.Metrics.TotalHosts
// We update aggregated stats and activity at the end asynchronously.
defer func() {