fleet/server/logging/firehose.go
Lucas Manuel Rodriguez 2affb29381
Fix STS assume role in aws-sdk-go v2 (#30699)
Fix unreleased bug #30693.

<!-- This is an auto-generated comment: release notes by coderabbit.ai
-->

## Summary by CodeRabbit

* **Documentation**
* Updated testing documentation to include a missing command for
creating the Firehose delivery stream for "status" logs.
* **Refactor**
* Centralized AWS STS Assume Role credential configuration across
multiple AWS integrations (S3, Firehose, Kinesis, Lambda, SES) to use a
shared helper, improving maintainability and consistency.
* Removed deprecated inline credential configuration logic in favor of
the new centralized approach.

<!-- end of auto-generated comment: release notes by coderabbit.ai -->
2025-07-10 13:00:27 -03:00

216 lines
6.2 KiB
Go

package logging
import (
"context"
"encoding/json"
"errors"
"fmt"
"math"
"time"
"github.com/aws/aws-sdk-go-v2/aws"
aws_config "github.com/aws/aws-sdk-go-v2/config"
"github.com/aws/aws-sdk-go-v2/credentials"
"github.com/aws/aws-sdk-go-v2/service/firehose"
"github.com/aws/aws-sdk-go-v2/service/firehose/types"
"github.com/fleetdm/fleet/v4/server/aws_common"
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
"github.com/go-kit/log"
"github.com/go-kit/log/level"
)
const (
firehoseMaxRetries = 8
// See
// https://docs.aws.amazon.com/sdk-for-go/api/service/firehose/#Firehose.PutRecordBatch
// for documentation on limits.
firehoseMaxRecordsInBatch = 500
firehoseMaxSizeOfRecord = 1000 * 1000 // 1,000 KB
firehoseMaxSizeOfBatch = 4 * 1000 * 1000 // 4 MB
)
type FirehoseAPI interface {
PutRecordBatch(ctx context.Context, input *firehose.PutRecordBatchInput, optFns ...func(*firehose.Options)) (*firehose.PutRecordBatchOutput, error)
DescribeDeliveryStream(ctx context.Context, input *firehose.DescribeDeliveryStreamInput, optFns ...func(*firehose.Options)) (*firehose.DescribeDeliveryStreamOutput, error)
}
type firehoseLogWriter struct {
client FirehoseAPI
stream string
logger log.Logger
}
func NewFirehoseLogWriter(region, endpointURL, id, secret, stsAssumeRoleArn, stsExternalID, stream string, logger log.Logger) (*firehoseLogWriter, error) {
var opts []func(*aws_config.LoadOptions) error
// The service endpoint is deprecated, but we still set it
// in case users are using it.
if endpointURL != "" {
opts = append(opts, aws_config.WithEndpointResolver(aws.EndpointResolverFunc(
func(service, region string) (aws.Endpoint, error) {
return aws.Endpoint{
URL: endpointURL,
}, nil
})),
)
}
// Only provide static credentials if we have them
// otherwise use the default credentials provider chain.
if id != "" && secret != "" {
opts = append(opts,
aws_config.WithCredentialsProvider(credentials.NewStaticCredentialsProvider(id, secret, "")),
)
}
opts = append(opts, aws_config.WithRegion(region))
conf, err := aws_config.LoadDefaultConfig(context.Background(), opts...)
if err != nil {
return nil, fmt.Errorf("failed to create default config: %w", err)
}
if stsAssumeRoleArn != "" {
conf, err = aws_common.ConfigureAssumeRoleProvider(conf, opts, stsAssumeRoleArn, stsExternalID)
if err != nil {
return nil, fmt.Errorf("failed to configure assume role provider: %w", err)
}
}
firehoseClient := firehose.NewFromConfig(conf)
f := &firehoseLogWriter{
client: firehoseClient,
stream: stream,
logger: logger,
}
if err := f.validateStream(context.Background()); err != nil {
return nil, fmt.Errorf("validate firehose: %w", err)
}
return f, nil
}
func (f *firehoseLogWriter) validateStream(ctx context.Context) error {
out, err := f.client.DescribeDeliveryStream(ctx,
&firehose.DescribeDeliveryStreamInput{
DeliveryStreamName: &f.stream,
},
)
if err != nil {
return fmt.Errorf("describe stream %s: %w", f.stream, err)
}
if out.DeliveryStreamDescription.DeliveryStreamStatus != types.DeliveryStreamStatusActive {
return fmt.Errorf("delivery stream %s not active", f.stream)
}
return nil
}
func (f *firehoseLogWriter) Write(ctx context.Context, logs []json.RawMessage) error {
var records []types.Record
totalBytes := 0
for _, log := range logs {
// Add newline because Firehose does not output each record on
// a separate line.
log = append(log, '\n')
// We don't really have a good option for what to do with logs
// that are too big for Firehose. This behavior is consistent
// with osquery's behavior in the Firehose logger plugin, and
// the beginning bytes of the log should help the Fleet admin
// diagnose the query generating huge results.
if len(log) > firehoseMaxSizeOfRecord {
level.Info(f.logger).Log(
"msg", "dropping log over 1MB Firehose limit",
"size", len(log),
"log", string(log[:100])+"...",
)
continue
}
// If adding this log will exceed the limit on number of
// records in the batch, or the limit on total size of the
// records in the batch, we need to push this batch before
// adding any more.
if len(records) >= firehoseMaxRecordsInBatch ||
totalBytes+len(log) > firehoseMaxSizeOfBatch {
if err := f.putRecordBatch(ctx, 0, records); err != nil {
return ctxerr.Wrap(ctx, err, "put records")
}
totalBytes = 0
records = nil
}
records = append(records, types.Record{
Data: []byte(log),
})
totalBytes += len(log)
}
// Push the final batch
if len(records) > 0 {
if err := f.putRecordBatch(ctx, 0, records); err != nil {
return ctxerr.Wrap(ctx, err, "put records")
}
}
return nil
}
func (f *firehoseLogWriter) putRecordBatch(ctx context.Context, try int, records []types.Record) error {
if try > 0 {
time.Sleep(100 * time.Millisecond * time.Duration(math.Pow(2.0, float64(try))))
}
input := &firehose.PutRecordBatchInput{
DeliveryStreamName: &f.stream,
Records: records,
}
output, err := f.client.PutRecordBatch(ctx, input)
if err != nil {
var serviceUnavailableException *types.ServiceUnavailableException
if errors.As(err, &serviceUnavailableException) {
if try < firehoseMaxRetries {
// Retry with backoff
return f.putRecordBatch(ctx, try+1, records)
}
}
// Not retryable or retries expired
return err
}
// Check errors on individual records
if output.FailedPutCount != nil && *output.FailedPutCount > 0 {
if try >= firehoseMaxRetries {
// Retrieve first error message to provide to user.
// There could be up to firehoseMaxRecordsInBatch
// errors here and we don't want to flood that.
var errMsg string
for _, record := range output.RequestResponses {
if record.ErrorCode != nil && record.ErrorMessage != nil {
errMsg = *record.ErrorMessage
break
}
}
return fmt.Errorf(
"failed to put %d records, retries exhausted. First error: %s",
*output.FailedPutCount, errMsg,
)
}
var failedRecords []types.Record
// Collect failed records for retry
for i, record := range output.RequestResponses {
if record.ErrorCode != nil {
failedRecords = append(failedRecords, records[i])
}
}
return f.putRecordBatch(ctx, try+1, failedRecords)
}
return nil
}