mirror of
https://github.com/fleetdm/fleet
synced 2026-05-22 16:39:01 +00:00
For #32571. Original PR from the community: https://github.com/fleetdm/fleet/pull/32573. Changes on this PR: - Only setting the checksum algorithm when using GCS as backend (to not break other S3 backends). - Changes for carves, bootstrap packages, and software icons which also use S3. ## Testing - [X] QA'd all new/changed functionality manually ```sh FLEET_S3_SOFTWARE_INSTALLERS_BUCKET=some-software-installers-bucket \ FLEET_S3_SOFTWARE_INSTALLERS_ACCESS_KEY_ID=... \ FLEET_S3_SOFTWARE_INSTALLERS_SECRET_ACCESS_KEY=... \ FLEET_S3_SOFTWARE_INSTALLERS_ENDPOINT_URL=https://storage.googleapis.com \ FLEET_S3_SOFTWARE_INSTALLERS_REGION=us \ FLEET_S3_SOFTWARE_INSTALLERS_FORCE_S3_PATH_STYLE=true \ FLEET_S3_CARVES_BUCKET=some-carves-bucket \ FLEET_S3_CARVES_ACCESS_KEY_ID=... \ FLEET_S3_CARVES_SECRET_ACCESS_KEY=... \ FLEET_S3_CARVES_ENDPOINT_URL=https://storage.googleapis.com \ FLEET_S3_CARVES_REGION=us \ FLEET_S3_CARVES_FORCE_S3_PATH_STYLE=true \ ./build/fleet serve --dev --dev_license --logging_debug 2>&1 | tee ~/fleet.txt ```
215 lines
6.6 KiB
Go
215 lines
6.6 KiB
Go
package s3
|
|
|
|
import (
|
|
"context"
|
|
"errors"
|
|
"io"
|
|
"net/url"
|
|
"path"
|
|
"runtime"
|
|
"strings"
|
|
"sync/atomic"
|
|
"time"
|
|
|
|
"github.com/aws/aws-sdk-go-v2/feature/cloudfront/sign"
|
|
"github.com/aws/aws-sdk-go-v2/feature/s3/manager"
|
|
"github.com/aws/aws-sdk-go-v2/service/s3"
|
|
types "github.com/aws/aws-sdk-go-v2/service/s3/types"
|
|
"github.com/fleetdm/fleet/v4/server/contexts/ctxerr"
|
|
"github.com/fleetdm/fleet/v4/server/fleet"
|
|
"golang.org/x/sync/errgroup"
|
|
)
|
|
|
|
// signedURLExpiresIn is how long a URL produced by Sign stays valid.
const signedURLExpiresIn time.Duration = 6 * time.Hour
|
|
|
|
// commonFileStore implements the common Get, Put, Exists, Sign and Cleanup
|
|
// operations typical for storage of files in the SoftwareInstallers S3 bucket
|
|
// configuration. It is used by the SoftwareInstallerStore and the
|
|
// BootstrapPackageStore. The only variable thing is the path prefix inside
|
|
// the configured bucket, e.g. for software installers it is:
|
|
//
|
|
// <bucket>/<prefix>/software-installers/<fileID>
|
|
//
|
|
// and for the bootstrap packages it is:
|
|
//
|
|
// <bucket>/<prefix>/bootstrap-packages/<fileID>
|
|
type commonFileStore struct {
|
|
*s3store
|
|
pathPrefix string
|
|
fileLabel string // how to call the file in error messages
|
|
|
|
gcs bool
|
|
}
|
|
|
|
// isGCS reports whether endpointURL points at Google Cloud Storage's
// S3-compatible API. The match ignores case because URL schemes and host
// names are case-insensitive, so "https://Storage.GoogleAPIs.com" must be
// detected just like "https://storage.googleapis.com".
func isGCS(endpointURL string) bool {
	return strings.Contains(strings.ToLower(endpointURL), "storage.googleapis.com")
}
|
|
|
|
// Get retrieves the requested file from S3.
|
|
// It is important that the caller closes the reader when done.
|
|
func (s *commonFileStore) Get(ctx context.Context, fileID string) (io.ReadCloser, int64, error) {
|
|
key := s.keyForFile(fileID)
|
|
|
|
req, err := s.s3Client.GetObject(ctx, &s3.GetObjectInput{
|
|
Bucket: &s.bucket,
|
|
Key: &key,
|
|
})
|
|
if err != nil {
|
|
var (
|
|
noSuchBucket *types.NoSuchBucket
|
|
noSuchKey *types.NoSuchKey
|
|
notFound *types.NotFound
|
|
)
|
|
if errors.As(err, &noSuchBucket) || errors.As(err, &noSuchKey) || errors.As(err, ¬Found) {
|
|
return nil, int64(0), installerNotFoundError{}
|
|
}
|
|
return nil, int64(0), ctxerr.Wrapf(ctx, err, "retrieving %s from S3 store", s.fileLabel)
|
|
}
|
|
return req.Body, *req.ContentLength, nil
|
|
}
|
|
|
|
// Put uploads a file to S3.
|
|
func (s *commonFileStore) Put(ctx context.Context, fileID string, content io.ReadSeeker) error {
|
|
if fileID == "" {
|
|
return errors.New("S3 file identifier is empty")
|
|
}
|
|
|
|
key := s.keyForFile(fileID)
|
|
|
|
// Init the uploader with the default upload part size (5MB) and concurrency
|
|
// equal to the host's logical processors.
|
|
uploader := manager.NewUploader(s.s3Client, func(u *manager.Uploader) {
|
|
u.PartSize = manager.DefaultUploadPartSize
|
|
u.Concurrency = runtime.NumCPU()
|
|
})
|
|
|
|
var checksumAlgorithm types.ChecksumAlgorithm
|
|
if s.gcs {
|
|
checksumAlgorithm = types.ChecksumAlgorithmCrc32c // Required for GCS
|
|
}
|
|
|
|
_, err := uploader.Upload(ctx, &s3.PutObjectInput{
|
|
Bucket: &s.bucket,
|
|
Body: content,
|
|
Key: &key,
|
|
|
|
ChecksumAlgorithm: checksumAlgorithm,
|
|
})
|
|
|
|
return err
|
|
}
|
|
|
|
// Exists checks if a file exists in the S3 bucket for the ID.
|
|
func (s *commonFileStore) Exists(ctx context.Context, fileID string) (bool, error) {
|
|
key := s.keyForFile(fileID)
|
|
|
|
_, err := s.s3Client.HeadObject(ctx, &s3.HeadObjectInput{
|
|
Bucket: &s.bucket,
|
|
Key: &key,
|
|
})
|
|
if err != nil {
|
|
var (
|
|
noSuchBucket *types.NoSuchBucket
|
|
noSuchKey *types.NoSuchKey
|
|
notFound *types.NotFound
|
|
)
|
|
if errors.As(err, &noSuchBucket) || errors.As(err, &noSuchKey) || errors.As(err, ¬Found) {
|
|
return false, nil
|
|
}
|
|
return false, ctxerr.Wrapf(ctx, err, "checking existence of %s in S3 store", s.fileLabel)
|
|
}
|
|
return true, nil
|
|
}
|
|
|
|
func (s *commonFileStore) Cleanup(ctx context.Context, usedFileIDs []string, removeCreatedBefore time.Time) (int, error) {
|
|
removeCreatedBefore = removeCreatedBefore.UTC()
|
|
|
|
usedSet := make(map[string]struct{}, len(usedFileIDs))
|
|
for _, id := range usedFileIDs {
|
|
usedSet[id] = struct{}{}
|
|
}
|
|
|
|
// ListObjectsV2 defaults to a max of 1000 keys, which is sufficient for the
|
|
// cleanup task - if more files are present, the next run will get another
|
|
// 1000 and will periodically complete the cleanups.
|
|
//
|
|
// Iterating over all pages would potentially take a long time and would make
|
|
// it more likely that a conflict arises, where an unused file becomes used
|
|
// again. This approach makes it only two API requests between the read of
|
|
// used files and the deletions.
|
|
prefix := path.Join(s.prefix, s.pathPrefix)
|
|
page, err := s.s3Client.ListObjectsV2(ctx, &s3.ListObjectsV2Input{
|
|
Bucket: &s.bucket,
|
|
Prefix: &prefix,
|
|
})
|
|
if err != nil {
|
|
return 0, ctxerr.Wrapf(ctx, err, "listing %s in S3 store", s.fileLabel)
|
|
}
|
|
|
|
// NOTE: there is an inherent risk that we could delete files that were added
|
|
// between the query to list used IDs and now. We minimize that risk by
|
|
// checking that the S3 file was created before removeCreatedBefore.
|
|
var toDeleteKeys []*types.ObjectIdentifier
|
|
for _, item := range page.Contents {
|
|
if item.Key == nil {
|
|
continue
|
|
}
|
|
if _, ok := usedSet[path.Base(*item.Key)]; ok {
|
|
continue
|
|
}
|
|
if item.LastModified == nil || !item.LastModified.UTC().After(removeCreatedBefore) {
|
|
// default to doing the cleanup if we don't have the timestamp information
|
|
toDeleteKeys = append(toDeleteKeys, &types.ObjectIdentifier{Key: item.Key})
|
|
}
|
|
}
|
|
|
|
if len(toDeleteKeys) == 0 {
|
|
return 0, nil
|
|
}
|
|
|
|
var deleted atomic.Int32
|
|
var g errgroup.Group
|
|
g.SetLimit(10)
|
|
|
|
for _, obj := range toDeleteKeys {
|
|
obj := obj
|
|
g.Go(func() error {
|
|
_, err := s.s3Client.DeleteObject(ctx, &s3.DeleteObjectInput{
|
|
Bucket: &s.bucket,
|
|
Key: obj.Key,
|
|
})
|
|
if err != nil {
|
|
return ctxerr.Wrapf(ctx, err, "deleting %s in S3 store", s.fileLabel)
|
|
}
|
|
deleted.Add(1)
|
|
return nil
|
|
})
|
|
}
|
|
|
|
if err := g.Wait(); err != nil {
|
|
return int(deleted.Load()), ctxerr.Wrap(ctx, err, "errors occurred during S3 deletion")
|
|
}
|
|
|
|
return int(deleted.Load()), ctxerr.Wrapf(ctx, err, "deleting %s in S3 store", s.fileLabel)
|
|
}
|
|
|
|
func (s *commonFileStore) Sign(ctx context.Context, fileID string) (string, error) {
|
|
if s.cloudFrontConfig == nil {
|
|
return "", ctxerr.Wrapf(ctx, fleet.ErrNotConfigured, "signing %s URL in S3 store", s.fileLabel)
|
|
}
|
|
urlToAccess, err := url.JoinPath(s.cloudFrontConfig.BaseURL, s.keyForFile(fileID))
|
|
if err != nil {
|
|
return "", ctxerr.Wrapf(ctx, err, "building URL for %s with ID %s in S3 store", s.fileLabel, fileID)
|
|
}
|
|
signer := sign.NewURLSigner(s.cloudFrontConfig.SigningPublicKeyID, s.cloudFrontConfig.Signer)
|
|
signedURL, err := signer.Sign(urlToAccess, time.Now().Add(signedURLExpiresIn))
|
|
if err != nil {
|
|
return "", ctxerr.Wrapf(ctx, err, "signing %s URL %s in S3 store", s.fileLabel, urlToAccess)
|
|
}
|
|
return signedURL, nil
|
|
}
|
|
|
|
// keyForFile builds an S3 key to identify the file.
|
|
func (s *commonFileStore) keyForFile(fileID string) string {
|
|
return path.Join(s.prefix, s.pathPrefix, fileID)
|
|
}
|