refactor: use github.com/go-logr/logr for logging (#162)

This commit is contained in:
Mikhail Mazurskiy 2020-10-27 11:14:56 +11:00 committed by GitHub
parent 4eb3ca3fee
commit 31311943a5
No known key found for this signature in database
GPG key ID: 4AEE18F83AFDEB23
29 changed files with 723 additions and 554 deletions

View file

@ -25,4 +25,8 @@ agent-image:
.PHONY: agent-manifests
agent-manifests:
kustomize build ./agent/manifests/cluster-install > ./agent/manifests/install.yaml
kustomize build ./agent/manifests/namespace-install > ./agent/manifests/install-namespaced.yaml
kustomize build ./agent/manifests/namespace-install > ./agent/manifests/install-namespaced.yaml
.PHONY: generate-mocks
generate-mocks:
go generate -x -v "github.com/argoproj/gitops-engine/pkg/utils/tracing/tracer_testing"

View file

@ -14,16 +14,15 @@ import (
"text/tabwriter"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
"github.com/spf13/cobra"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/tools/clientcmd"
"k8s.io/klog/v2/klogr"
"github.com/argoproj/gitops-engine/pkg/cache"
"github.com/argoproj/gitops-engine/pkg/engine"
"github.com/argoproj/gitops-engine/pkg/sync"
"github.com/argoproj/gitops-engine/pkg/utils/errors"
"github.com/argoproj/gitops-engine/pkg/utils/io"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
)
@ -32,10 +31,9 @@ const (
)
func main() {
if err := newCmd().Execute(); err != nil {
fmt.Fprintln(os.Stderr, err)
os.Exit(1)
}
log := klogr.New() // Delegates to klog
err := newCmd(log).Execute()
checkError(err, log)
}
type resourceInfo struct {
@ -98,7 +96,7 @@ func (s *settings) parseManifests() ([]*unstructured.Unstructured, string, error
return res, string(revision), nil
}
func newCmd() *cobra.Command {
func newCmd(log logr.Logger) *cobra.Command {
var (
clientConfig clientcmd.ClientConfig
paths []string
@ -117,10 +115,10 @@ func newCmd() *cobra.Command {
}
s := settings{args[0], paths}
config, err := clientConfig.ClientConfig()
errors.CheckErrorWithCode(err, errors.ErrorCommandSpecific)
checkError(err, log)
if namespace == "" {
namespace, _, err = clientConfig.Namespace()
errors.CheckErrorWithCode(err, errors.ErrorCommandSpecific)
checkError(err, log)
}
var namespaces []string
@ -129,6 +127,7 @@ func newCmd() *cobra.Command {
}
clusterCache := cache.NewClusterCache(config,
cache.SetNamespaces(namespaces),
cache.SetLogr(log),
cache.SetPopulateResourceInfoHandler(func(un *unstructured.Unstructured, isRoot bool) (info interface{}, cacheManifest bool) {
// store gc mark of every resource
gcMark := un.GetAnnotations()[annotationGCMark]
@ -138,43 +137,42 @@ func newCmd() *cobra.Command {
return
}),
)
gitOpsEngine := engine.NewEngine(config, clusterCache)
errors.CheckErrorWithCode(err, errors.ErrorCommandSpecific)
gitOpsEngine := engine.NewEngine(config, clusterCache, engine.WithLogr(log))
checkError(err, log)
closer, err := gitOpsEngine.Run()
errors.CheckErrorWithCode(err, errors.ErrorCommandSpecific)
defer io.Close(closer)
cleanup, err := gitOpsEngine.Run()
checkError(err, log)
defer cleanup()
resync := make(chan bool)
go func() {
ticker := time.NewTicker(time.Second * time.Duration(resyncSeconds))
for {
<-ticker.C
log.Infof("Synchronization triggered by timer")
log.Info("Synchronization triggered by timer")
resync <- true
}
}()
http.HandleFunc("/api/v1/sync", func(writer http.ResponseWriter, request *http.Request) {
log.Infof("Synchronization triggered by API call")
log.Info("Synchronization triggered by API call")
resync <- true
})
go func() {
errors.CheckErrorWithCode(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", port), nil), errors.ErrorCommandSpecific)
checkError(http.ListenAndServe(fmt.Sprintf("0.0.0.0:%d", port), nil), log)
}()
for ; true; <-resync {
target, revision, err := s.parseManifests()
if err != nil {
log.Warnf("failed to parse target state: %v", err)
log.Error(err, "Failed to parse target state")
continue
}
result, err := gitOpsEngine.Sync(context.Background(), target, func(r *cache.Resource) bool {
return r.Info.(*resourceInfo).gcMark == s.getGCMark(r.ResourceKey())
}, revision, namespace, sync.WithPrune(prune))
}, revision, namespace, sync.WithPrune(prune), sync.WithLogr(log))
if err != nil {
log.Warnf("failed to synchronize cluster state: %v", err)
log.Error(err, "Failed to synchronize cluster state")
continue
}
w := tabwriter.NewWriter(os.Stdout, 0, 0, 2, ' ', 0)
@ -209,3 +207,11 @@ func addKubectlFlagsToCmd(cmd *cobra.Command) clientcmd.ClientConfig {
clientcmd.BindOverrideFlags(&overrides, cmd.PersistentFlags(), kflags)
return clientcmd.NewInteractiveDeferredLoadingClientConfig(loadingRules, &overrides, os.Stdin)
}
// checkError terminates the process after logging err as a fatal error.
// It is a no-op when err is nil, which makes it convenient to call
// unconditionally after fallible operations.
func checkError(err error, log logr.Logger) {
	if err == nil {
		return
	}
	log.Error(err, "Fatal error")
	os.Exit(1)
}

4
go.mod
View file

@ -4,7 +4,8 @@ go 1.14
require (
github.com/evanphx/json-patch v4.9.0+incompatible
github.com/sirupsen/logrus v1.6.0
github.com/go-logr/logr v0.2.0
github.com/golang/mock v1.4.4
github.com/spf13/cobra v1.0.0
github.com/stretchr/testify v1.6.1
golang.org/x/sync v0.0.0-20200625203802-6e8e738ad208
@ -13,6 +14,7 @@ require (
k8s.io/apimachinery v0.19.2
k8s.io/cli-runtime v0.19.2
k8s.io/client-go v0.19.2
k8s.io/klog/v2 v2.2.0
k8s.io/kube-aggregator v0.19.2
k8s.io/kubectl v0.19.2
k8s.io/kubernetes v1.19.2

5
go.sum
View file

@ -236,7 +236,10 @@ github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7 h1:5ZkaAPbicIKTF
github.com/golang/groupcache v0.0.0-20191227052852-215e87163ea7/go.mod h1:cIg4eruTrX1D+g88fzRXU5OdNfaM+9IcxsU14FzY7Hc=
github.com/golang/mock v1.1.1/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.2.0/go.mod h1:oTYuIxOrZwtPieC+H1uAHpcLFnEyAGVDL/k47Jfbm0A=
github.com/golang/mock v1.3.1 h1:qGJ6qTW+x6xX/my+8YUVl4WNpX9B7+/l2tRsHGZ7f2s=
github.com/golang/mock v1.3.1/go.mod h1:sBzyDLLjw3U8JLTeZvSv8jJB+tU5PVekmnlKIyFUx0Y=
github.com/golang/mock v1.4.4 h1:l75CXGRSwbaYNpl/Z2X1XIIAMSCquvXgpVZDhwEIJsc=
github.com/golang/mock v1.4.4/go.mod h1:l3mdAwkq5BuhzHwde/uurv3sEJeZMXNpwsxVWU71h+4=
github.com/golang/protobuf v1.2.0/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.1/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
github.com/golang/protobuf v1.3.2/go.mod h1:6lQm79b+lXiMfvg/cZm0SGofjICqVBUtrP5yJMmIC1U=
@ -562,6 +565,7 @@ golang.org/x/mod v0.0.0-20190513183733-4bf6d317e70e/go.mod h1:mXi4GBBbnImb6dmsKG
golang.org/x/mod v0.1.0/go.mod h1:0QHyrYULN0/3qlju5TqG8bIK38QM8yzMo5ekMj3DlcY=
golang.org/x/mod v0.1.1-0.20191105210325-c90efee705ee/go.mod h1:QqPTAvyqsEbceGzBzNggFXnrqF1CaUcvgkdR5Ot7KZg=
golang.org/x/mod v0.2.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/mod v0.3.0 h1:RM4zey1++hCTbCVQfnWeKs9/IEsaBLA8vTkd0WVtmH4=
golang.org/x/mod v0.3.0/go.mod h1:s0Qsj1ACt9ePp/hMypM3fl4fZqREWJwdYDEqhRiZZUA=
golang.org/x/net v0.0.0-20180724234803-3673e40ba225/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
golang.org/x/net v0.0.0-20180826012351-8a410e7b638d/go.mod h1:mL1N/T3taQHkDXs73rZJwtUhF3w3ftmwwsq0BUmARs4=
@ -681,6 +685,7 @@ golang.org/x/tools v0.0.0-20191012152004-8de300cfc20a/go.mod h1:b+2E5dAYhXwXZwtn
golang.org/x/tools v0.0.0-20191119224855-298f0cb1881e/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191125144606-a911d9008d1f/go.mod h1:b+2E5dAYhXwXZwtnZ6UAqBI28+e2cm9otk0dWdXHAEo=
golang.org/x/tools v0.0.0-20191227053925-7b8e75db28f4/go.mod h1:TB2adYChydJhpapKDTa4BR/hXlZSLoq2Wpct/0txZ28=
golang.org/x/tools v0.0.0-20200616133436-c1934b75d054 h1:HHeAlu5H9b71C+Fx0K+1dGgVFN1DM1/wz4aoGOA5qS8=
golang.org/x/tools v0.0.0-20200616133436-c1934b75d054/go.mod h1:EkVYQZoAsY45+roYkvgYkIh4xh/qjgUK9TdY2XT94GE=
golang.org/x/xerrors v0.0.0-20190717185122-a985d3407aa7/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=
golang.org/x/xerrors v0.0.0-20191011141410-1b5146add898/go.mod h1:I/5z698sn9Ka8TeJc9MKroUUfqBBauWjQqLJ2OPfmY0=

53
pkg/cache/cluster.go vendored
View file

@ -9,7 +9,7 @@ import (
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
"golang.org/x/sync/semaphore"
"k8s.io/apimachinery/pkg/api/errors"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -23,8 +23,10 @@ import (
"k8s.io/client-go/tools/cache"
"k8s.io/client-go/tools/pager"
watchutil "k8s.io/client-go/tools/watch"
"k8s.io/klog/v2/klogr"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)
const (
@ -110,21 +112,25 @@ type WeightedSemaphore interface {
// NewClusterCache creates new instance of cluster cache
func NewClusterCache(config *rest.Config, opts ...UpdateSettingsFunc) *clusterCache {
log := klogr.New()
cache := &clusterCache{
resyncTimeout: clusterResyncTimeout,
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
resources: make(map[kube.ResourceKey]*Resource),
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
config: config,
kubectl: &kube.KubectlCmd{},
resyncTimeout: clusterResyncTimeout,
settings: Settings{ResourceHealthOverride: &noopSettings{}, ResourcesFilter: &noopSettings{}},
apisMeta: make(map[schema.GroupKind]*apiMeta),
listPageSize: defaultListPageSize,
listPageBufferSize: defaultListPageBufferSize,
listSemaphore: semaphore.NewWeighted(defaultListSemaphoreWeight),
resources: make(map[kube.ResourceKey]*Resource),
nsIndex: make(map[string]map[kube.ResourceKey]*Resource),
config: config,
kubectl: &kube.KubectlCmd{
Log: log,
Tracer: tracing.NopTracer{},
},
syncTime: nil,
resourceUpdatedHandlers: map[uint64]OnResourceUpdatedHandler{},
eventHandlers: map[uint64]OnEventHandler{},
log: log.WithField("server", config.Host),
log: log,
}
for i := range opts {
opts[i](cache)
@ -154,7 +160,7 @@ type clusterCache struct {
nsIndex map[string]map[kube.ResourceKey]*Resource
kubectl kube.Kubectl
log *log.Entry
log logr.Logger
config *rest.Config
namespaces []string
settings Settings
@ -315,7 +321,7 @@ func (c *clusterCache) Invalidate(opts ...UpdateSettingsFunc) {
}
c.apisMeta = nil
c.namespacedResources = nil
c.log.Warnf("invalidated cluster")
c.log.Info("Invalidated cluster")
}
func (c *clusterCache) synced() bool {
@ -336,7 +342,7 @@ func (c *clusterCache) stopWatching(gk schema.GroupKind, ns string) {
info.watchCancel()
delete(c.apisMeta, gk)
c.replaceResourceCache(gk, nil, ns)
c.log.Warnf("Stop watching: %s not found", gk)
c.log.Info(fmt.Sprintf("Stop watching: %s not found", gk))
}
}
@ -398,7 +404,7 @@ func (c *clusterCache) listResources(ctx context.Context, resClient dynamic.Reso
}
func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo, resClient dynamic.ResourceInterface, ns string, resourceVersion string) {
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), func() (err error) {
kube.RetryUntilSucceed(ctx, watchResourcesRetryTimeout, fmt.Sprintf("watch %s on %s", api.GroupKind, c.config.Host), c.log, func() (err error) {
defer func() {
if r := recover(); r != nil {
err = fmt.Errorf("Recovered from panic: %+v\n%s", r, debug.Stack())
@ -493,7 +499,7 @@ func (c *clusterCache) watchEvents(ctx context.Context, api kube.APIResourceInfo
return c.startMissingWatches()
})
if err != nil {
c.log.Warnf("Failed to start missing watch: %v", err)
c.log.Error(err, "Failed to start missing watch")
}
}
}
@ -586,8 +592,7 @@ func (c *clusterCache) sync() error {
})
if err != nil {
log.Errorf("Failed to sync cluster %s: %v", c.config.Host, err)
return err
return fmt.Errorf("failed to sync cluster %s: %v", c.config.Host, err)
}
c.log.Info("Cluster successfully synced")
@ -653,7 +658,13 @@ func (c *clusterCache) IterateHierarchy(key kube.ResourceKey, action func(resour
})
child := children[0]
action(child, nsNodes)
child.iterateChildren(nsNodes, map[kube.ResourceKey]bool{res.ResourceKey(): true}, action)
child.iterateChildren(nsNodes, map[kube.ResourceKey]bool{res.ResourceKey(): true}, func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource) {
if err != nil {
c.log.V(2).Info(err.Error())
return
}
action(child, namespaceResources)
})
}
}
}
@ -744,7 +755,7 @@ func (c *clusterCache) GetManagedLiveObjs(targetObjs []*unstructured.Unstructure
converted, err := c.kubectl.ConvertToVersion(managedObj, targetObj.GroupVersionKind().Group, targetObj.GroupVersionKind().Version)
if err != nil {
// fallback to loading resource from kubernetes if conversion fails
log.Debugf("Failed to convert resource: %v", err)
c.log.V(1).Info(fmt.Sprintf("Failed to convert resource: %v", err))
managedObj, err = c.kubectl.GetResource(context.TODO(), c.config, targetObj.GroupVersionKind(), managedObj.GetName(), managedObj.GetNamespace())
if err != nil {
if errors.IsNotFound(err) {

View file

@ -5,7 +5,6 @@ import (
"fmt"
"strings"
log "github.com/sirupsen/logrus"
v1 "k8s.io/api/apps/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -52,7 +51,7 @@ func (c *clusterCache) resolveResourceReferences(un *unstructured.Unstructured)
case (un.GroupVersionKind().Group == "apps" || un.GroupVersionKind().Group == "extensions") && un.GetKind() == kube.StatefulSetKind:
if refs, err := isStatefulSetChild(un); err != nil {
log.Errorf("Failed to extract StatefulSet %s/%s PVC references: %v", un.GetNamespace(), un.GetName(), err)
c.log.Error(err, fmt.Sprintf("Failed to extract StatefulSet %s/%s PVC references", un.GetNamespace(), un.GetName()))
} else {
isInferredParentOf = refs
}

View file

@ -1,7 +1,8 @@
package cache
import (
log "github.com/sirupsen/logrus"
"fmt"
v1 "k8s.io/api/core/v1"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -84,14 +85,14 @@ func newResourceKeySet(set map[kube.ResourceKey]bool, keys ...kube.ResourceKey)
return newSet
}
func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents map[kube.ResourceKey]bool, action func(child *Resource, namespaceResources map[kube.ResourceKey]*Resource)) {
func (r *Resource) iterateChildren(ns map[kube.ResourceKey]*Resource, parents map[kube.ResourceKey]bool, action func(err error, child *Resource, namespaceResources map[kube.ResourceKey]*Resource)) {
for childKey, child := range ns {
if r.isParentOf(ns[childKey]) {
if parents[childKey] {
key := r.ResourceKey()
log.Warnf("Circular dependency detected. %s is child and parent of %s", childKey.String(), key.String())
action(fmt.Errorf("circular dependency detected. %s is child and parent of %s", childKey.String(), key.String()), child, ns)
} else {
action(child, ns)
action(nil, child, ns)
child.iterateChildren(ns, newResourceKeySet(parents, r.ResourceKey()), action)
}
}

38
pkg/cache/settings.go vendored
View file

@ -1,15 +1,15 @@
package cache
import (
"reflect"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/rest"
"github.com/argoproj/gitops-engine/pkg/health"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)
type noopSettings struct {
@ -50,30 +50,21 @@ func SetPopulateResourceInfoHandler(handler OnPopulateResourceInfoHandler) Updat
// SetSettings updates caching settings
func SetSettings(settings Settings) UpdateSettingsFunc {
return func(cache *clusterCache) {
if !reflect.DeepEqual(cache.settings, settings) {
log.WithField("server", cache.config.Host).Infof("Changing cluster cache settings to: %v", settings)
cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourcesFilter}
}
cache.settings = Settings{settings.ResourceHealthOverride, settings.ResourcesFilter}
}
}
// SetNamespaces updates list of monitored namespaces
func SetNamespaces(namespaces []string) UpdateSettingsFunc {
return func(cache *clusterCache) {
if !reflect.DeepEqual(cache.namespaces, namespaces) {
log.WithField("server", cache.config.Host).Infof("Changing cluster namespaces to: %v", namespaces)
cache.namespaces = namespaces
}
cache.namespaces = namespaces
}
}
// SetConfig updates cluster rest config
func SetConfig(config *rest.Config) UpdateSettingsFunc {
return func(cache *clusterCache) {
if !reflect.DeepEqual(cache.config, config) {
log.WithField("server", cache.config.Host).Infof("Changing cluster config to: %v", config)
cache.config = config
}
cache.config = config
}
}
@ -105,3 +96,22 @@ func SetResyncTimeout(timeout time.Duration) UpdateSettingsFunc {
cache.resyncTimeout = timeout
}
}
// SetLogr sets the logger to use.
func SetLogr(log logr.Logger) UpdateSettingsFunc {
return func(cache *clusterCache) {
cache.log = log
if kcmd, ok := cache.kubectl.(*kube.KubectlCmd); ok {
kcmd.Log = log
}
}
}
// SetTracer sets the tracer to use.
func SetTracer(tracer tracing.Tracer) UpdateSettingsFunc {
return func(cache *clusterCache) {
if kcmd, ok := cache.kubectl.(*kube.KubectlCmd); ok {
kcmd.Tracer = tracer
}
}
}

View file

@ -5,14 +5,13 @@ The package provide functions that allows to compare set of Kubernetes resources
package diff
import (
"bytes"
"encoding/json"
"errors"
"fmt"
"reflect"
"strings"
jsonpatch "github.com/evanphx/json-patch"
log "github.com/sirupsen/logrus"
corev1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
@ -29,12 +28,6 @@ import (
const couldNotMarshalErrMsg = "Could not unmarshal to object of type %s: %v"
// Holds diffing settings
type DiffOptions struct {
// If set to true then differences caused by aggregated roles in RBAC resources are ignored.
IgnoreAggregatedRoles bool `json:"ignoreAggregatedRoles,omitempty"`
}
// Holds diffing result of two resources
type DiffResult struct {
// Modified is set to true if resources are not matching
@ -68,32 +61,30 @@ func GetNoopNormalizer() Normalizer {
return &noopNormalizer{}
}
// Returns the default diff options
func GetDefaultDiffOptions() DiffOptions {
return DiffOptions{
IgnoreAggregatedRoles: false,
}
}
// Diff performs a diff on two unstructured objects. If the live object happens to have a
// "kubectl.kubernetes.io/last-applied-configuration", then perform a three way diff.
func Diff(config, live *unstructured.Unstructured, normalizer Normalizer, options DiffOptions) (*DiffResult, error) {
func Diff(config, live *unstructured.Unstructured, opts ...Option) (*DiffResult, error) {
o := applyOptions(opts)
if config != nil {
config = remarshal(config)
Normalize(config, normalizer, options)
config = remarshal(config, o)
Normalize(config, opts...)
}
if live != nil {
live = remarshal(live)
Normalize(live, normalizer, options)
live = remarshal(live, o)
Normalize(live, opts...)
}
orig := GetLastAppliedConfigAnnotation(live)
if orig != nil && config != nil {
Normalize(orig, normalizer, options)
dr, err := ThreeWayDiff(orig, config, live)
if err == nil {
return dr, nil
orig, err := GetLastAppliedConfigAnnotation(live)
if err != nil {
o.log.V(1).Info(fmt.Sprintf("Failed to get last applied configuration: %v", err))
} else {
if orig != nil && config != nil {
Normalize(orig, opts...)
dr, err := ThreeWayDiff(orig, config, live)
if err == nil {
return dr, nil
}
o.log.V(1).Info("three-way diff calculation failed: %v. Falling back to two-way diff", err)
}
log.Debugf("three-way diff calculation failed: %v. Falling back to two-way diff", err)
}
return TwoWayDiff(config, live)
}
@ -398,33 +389,29 @@ func threeWayMergePatch(orig, config, live *unstructured.Unstructured) ([]byte,
}
}
func GetLastAppliedConfigAnnotation(live *unstructured.Unstructured) *unstructured.Unstructured {
func GetLastAppliedConfigAnnotation(live *unstructured.Unstructured) (*unstructured.Unstructured, error) {
if live == nil {
return nil
return nil, nil
}
annots := live.GetAnnotations()
if annots == nil {
return nil
}
lastAppliedStr, ok := annots[corev1.LastAppliedConfigAnnotation]
if !ok {
return nil
return nil, nil
}
var obj unstructured.Unstructured
err := json.Unmarshal([]byte(lastAppliedStr), &obj)
if err != nil {
log.Warnf("Failed to unmarshal %s in %s", corev1.LastAppliedConfigAnnotation, live.GetName())
return nil
return nil, fmt.Errorf("failed to unmarshal %s in %s: %v", corev1.LastAppliedConfigAnnotation, live.GetName(), err)
}
return &obj
return &obj, nil
}
// DiffArray performs a diff on a list of unstructured objects. Objects are expected to match
// environments
func DiffArray(configArray, liveArray []*unstructured.Unstructured, normalizer Normalizer, options DiffOptions) (*DiffResultList, error) {
func DiffArray(configArray, liveArray []*unstructured.Unstructured, opts ...Option) (*DiffResultList, error) {
numItems := len(configArray)
if len(liveArray) != numItems {
return nil, fmt.Errorf("left and right arrays have mismatched lengths")
return nil, errors.New("left and right arrays have mismatched lengths")
}
diffResultList := DiffResultList{
@ -433,7 +420,7 @@ func DiffArray(configArray, liveArray []*unstructured.Unstructured, normalizer N
for i := 0; i < numItems; i++ {
config := configArray[i]
live := liveArray[i]
diffRes, err := Diff(config, live, normalizer, options)
diffRes, err := Diff(config, live, opts...)
if err != nil {
return nil, err
}
@ -445,10 +432,11 @@ func DiffArray(configArray, liveArray []*unstructured.Unstructured, normalizer N
return &diffResultList, nil
}
func Normalize(un *unstructured.Unstructured, normalizer Normalizer, options DiffOptions) {
func Normalize(un *unstructured.Unstructured, opts ...Option) {
if un == nil {
return
}
o := applyOptions(opts)
// creationTimestamp is sometimes set to null in the config when exported (e.g. SealedSecrets)
// Removing the field allows a cleaner diff.
@ -456,24 +444,22 @@ func Normalize(un *unstructured.Unstructured, normalizer Normalizer, options Dif
gvk := un.GroupVersionKind()
if gvk.Group == "" && gvk.Kind == "Secret" {
NormalizeSecret(un)
NormalizeSecret(un, opts...)
} else if gvk.Group == "rbac.authorization.k8s.io" && (gvk.Kind == "ClusterRole" || gvk.Kind == "Role") {
normalizeRole(un, options)
normalizeRole(un, o)
} else if gvk.Group == "" && gvk.Kind == "Endpoints" {
normalizeEndpoint(un)
normalizeEndpoint(un, o)
}
if normalizer != nil {
err := normalizer.Normalize(un)
if err != nil {
log.Warnf("Failed to normalize %s/%s/%s: %v", un.GroupVersionKind(), un.GetNamespace(), un.GetName(), err)
}
err := o.normalizer.Normalize(un)
if err != nil {
o.log.Error(err, fmt.Sprintf("Failed to normalize %s/%s/%s", un.GroupVersionKind(), un.GetNamespace(), un.GetName()))
}
}
// NormalizeSecret mutates the supplied object and encodes stringData to data, and converts nils to
// empty strings. If the object is not a secret, or is an invalid secret, then returns the same object.
func NormalizeSecret(un *unstructured.Unstructured) {
func NormalizeSecret(un *unstructured.Unstructured, opts ...Option) {
if un == nil {
return
}
@ -481,9 +467,11 @@ func NormalizeSecret(un *unstructured.Unstructured) {
if gvk.Group != "" || gvk.Kind != "Secret" {
return
}
o := applyOptions(opts)
var secret corev1.Secret
err := runtime.DefaultUnstructuredConverter.FromUnstructured(un.Object, &secret)
if err != nil {
o.log.Error(err, "Failed to convert from unstructured into Secret")
return
}
// We normalize nils to empty string to handle: https://github.com/argoproj/argo-cd/issues/943
@ -503,20 +491,20 @@ func NormalizeSecret(un *unstructured.Unstructured) {
}
newObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&secret)
if err != nil {
log.Warnf("object unable to convert from secret: %v", err)
o.log.Error(err, "object unable to convert from secret")
return
}
if secret.Data != nil {
err = unstructured.SetNestedField(un.Object, newObj["data"], "data")
if err != nil {
log.Warnf("failed to set secret.data: %v", err)
o.log.Error(err, "failed to set secret.data")
return
}
}
}
// normalizeEndpoint normalizes endpoint meaning that EndpointSubsets are sorted lexicographically
func normalizeEndpoint(un *unstructured.Unstructured) {
func normalizeEndpoint(un *unstructured.Unstructured, o options) {
if un == nil {
return
}
@ -527,12 +515,13 @@ func normalizeEndpoint(un *unstructured.Unstructured) {
var ep corev1.Endpoints
err := runtime.DefaultUnstructuredConverter.FromUnstructured(un.Object, &ep)
if err != nil {
o.log.Error(err, "Failed to convert from unstructured into Endpoints")
return
}
var coreEp core.Endpoints
err = v1.Convert_v1_Endpoints_To_core_Endpoints(&ep, &coreEp, nil)
if err != nil {
log.Warnf("Could not convert from v1 to core endpoint type %s: %v", gvk, err)
o.log.Error(err, "Could not convert from v1 to core endpoint type %s", gvk)
return
}
@ -540,18 +529,19 @@ func normalizeEndpoint(un *unstructured.Unstructured) {
err = v1.Convert_core_Endpoints_To_v1_Endpoints(&coreEp, &ep, nil)
if err != nil {
log.Warnf("Could not convert from core to vi endpoint type %s: %v", gvk, err)
o.log.Error(err, "Could not convert from core to vi endpoint type %s", gvk)
return
}
un.Object, err = runtime.DefaultUnstructuredConverter.ToUnstructured(&ep)
newObj, err := runtime.DefaultUnstructuredConverter.ToUnstructured(&ep)
if err != nil {
log.Warnf(couldNotMarshalErrMsg, gvk, err)
o.log.Info(fmt.Sprintf(couldNotMarshalErrMsg, gvk, err))
return
}
un.Object = newObj
}
// normalizeRole mutates the supplied Role/ClusterRole and sets rules to null if it is an empty list or an aggregated role
func normalizeRole(un *unstructured.Unstructured, options DiffOptions) {
func normalizeRole(un *unstructured.Unstructured, o options) {
if un == nil {
return
}
@ -561,12 +551,12 @@ func normalizeRole(un *unstructured.Unstructured, options DiffOptions) {
}
// Check whether the role we're checking is an aggregation role. If it is, we ignore any differences in rules.
if options.IgnoreAggregatedRoles {
if o.ignoreAggregatedRoles {
aggrIf, ok := un.Object["aggregationRule"]
if ok {
_, ok = aggrIf.(map[string]interface{})
if !ok {
log.Infof("Malformed aggregrationRule in resource '%s', won't modify.", un.GetName())
o.log.Info(fmt.Sprintf("Malformed aggregrationRule in resource '%s', won't modify.", un.GetName()))
} else {
un.Object["rules"] = nil
}
@ -610,7 +600,7 @@ func CreateTwoWayMergePatch(orig, new, dataStruct interface{}) ([]byte, bool, er
func HideSecretData(target *unstructured.Unstructured, live *unstructured.Unstructured) (*unstructured.Unstructured, *unstructured.Unstructured, error) {
var orig *unstructured.Unstructured
if live != nil {
orig = GetLastAppliedConfigAnnotation(live)
orig, _ = GetLastAppliedConfigAnnotation(live)
live = live.DeepCopy()
}
if target != nil {
@ -691,7 +681,7 @@ func toString(val interface{}) string {
// and allows to find differences between actual and target states more accurately.
// Remarshalling also strips any type information (e.g. float64 vs. int) from the unstructured
// object. This is important for diffing since it will cause godiff to report a false difference.
func remarshal(obj *unstructured.Unstructured) *unstructured.Unstructured {
func remarshal(obj *unstructured.Unstructured, o options) *unstructured.Unstructured {
obj = stripTypeInformation(obj)
data, err := json.Marshal(obj)
if err != nil {
@ -701,24 +691,24 @@ func remarshal(obj *unstructured.Unstructured) *unstructured.Unstructured {
item, err := scheme.Scheme.New(obj.GroupVersionKind())
if err != nil {
// This is common. the scheme is not registered
log.Debugf("Could not create new object of type %s: %v", gvk, err)
o.log.V(1).Info(fmt.Sprintf("Could not create new object of type %s: %v", gvk, err))
return obj
}
// This will drop any omitempty fields, perform resource conversion etc...
unmarshalledObj := reflect.New(reflect.TypeOf(item).Elem()).Interface()
// Unmarshal data into unmarshalledObj, but detect if there are any unknown fields that are not
// found in the target GVK object.
decoder := json.NewDecoder(strings.NewReader(string(data)))
decoder := json.NewDecoder(bytes.NewReader(data))
decoder.DisallowUnknownFields()
if err := decoder.Decode(&unmarshalledObj); err != nil {
// Likely a field present in obj that is not present in the GVK type, or user
// may have specified an invalid spec in git, so return original object
log.Debugf(couldNotMarshalErrMsg, gvk, err)
o.log.V(1).Info(fmt.Sprintf(couldNotMarshalErrMsg, gvk, err))
return obj
}
unstrBody, err := runtime.DefaultUnstructuredConverter.ToUnstructured(unmarshalledObj)
if err != nil {
log.Warnf(couldNotMarshalErrMsg, gvk, err)
o.log.V(1).Info(fmt.Sprintf(couldNotMarshalErrMsg, gvk, err))
return obj
}
// Remove all default values specified by custom formatter (e.g. creationTimestamp)

46
pkg/diff/diff_options.go Normal file
View file

@ -0,0 +1,46 @@
package diff
import (
"github.com/go-logr/logr"
"k8s.io/klog/v2/klogr"
)
// Option is a functional option that mutates the diff settings.
type Option func(*options)

// Holds diffing settings
type options struct {
	// If set to true then differences caused by aggregated roles in RBAC resources are ignored.
	ignoreAggregatedRoles bool
	// normalizer is applied to objects before they are compared.
	normalizer Normalizer
	// log receives diagnostic messages emitted while diffing.
	log logr.Logger
}
// applyOptions builds the effective diff settings: it starts from the
// defaults (aggregated roles honored, no-op normalizer, klog-backed logger)
// and then lets each supplied Option mutate them in order.
func applyOptions(opts []Option) options {
	res := options{
		ignoreAggregatedRoles: false,
		normalizer:            GetNoopNormalizer(),
		log:                   klogr.New(),
	}
	for i := range opts {
		opts[i](&res)
	}
	return res
}
// IgnoreAggregatedRoles returns an Option controlling whether differences
// caused by aggregated roles in RBAC resources are ignored during diffing.
func IgnoreAggregatedRoles(ignore bool) Option {
	return func(opt *options) {
		opt.ignoreAggregatedRoles = ignore
	}
}
// WithNormalizer returns an Option that replaces the default no-op
// normalizer with the supplied one.
func WithNormalizer(normalizer Normalizer) Option {
	return func(opt *options) {
		opt.normalizer = normalizer
	}
}
// WithLogr returns an Option that replaces the default klogr-backed
// logger with the supplied logr.Logger.
func WithLogr(log logr.Logger) Option {
	return func(opt *options) {
		opt.log = log
	}
}

View file

@ -4,15 +4,13 @@ import (
"encoding/json"
"fmt"
"io/ioutil"
"os"
"os/exec"
"path/filepath"
"strings"
"testing"
"github.com/argoproj/gitops-engine/pkg/utils/io"
"github.com/stretchr/testify/assert"
"github.com/stretchr/testify/require"
appsv1 "k8s.io/api/apps/v1"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/api/core/v1"
@ -20,33 +18,10 @@ import (
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/apimachinery/pkg/runtime"
"k8s.io/klog/v2/klogr"
"sigs.k8s.io/yaml"
)
func captureStdout(callback func()) (string, error) {
oldStdout := os.Stdout
oldStderr := os.Stderr
r, w, err := os.Pipe()
if err != nil {
return "", err
}
os.Stdout = w
defer func() {
os.Stdout = oldStdout
os.Stderr = oldStderr
}()
callback()
io.Close(w)
data, err := ioutil.ReadAll(r)
if err != nil {
return "", err
}
return string(data), err
}
func printDiff(result *DiffResult) (string, error) {
var live unstructured.Unstructured
if err := json.Unmarshal(result.NormalizedLive, &live); err != nil {
@ -56,45 +31,42 @@ func printDiff(result *DiffResult) (string, error) {
if err := json.Unmarshal(result.PredictedLive, &target); err != nil {
return "", err
}
return captureStdout(func() {
_ = printDiffInternal("diff", &live, &target)
})
out, _ := printDiffInternal("diff", &live, &target)
return string(out), nil
}
// printDiffInternal prints a diff between two unstructured objects to stdout using an external diff utility
func printDiffInternal(name string, live *unstructured.Unstructured, target *unstructured.Unstructured) error {
// printDiffInternal prints a diff between two unstructured objects using an external diff utility and returns the output.
func printDiffInternal(name string, live *unstructured.Unstructured, target *unstructured.Unstructured) ([]byte, error) {
tempDir, err := ioutil.TempDir("", "argocd-diff")
if err != nil {
return err
return nil, err
}
targetFile := filepath.Join(tempDir, name)
targetData := []byte("")
var targetData []byte
if target != nil {
targetData, err = yaml.Marshal(target)
if err != nil {
return err
return nil, err
}
}
err = ioutil.WriteFile(targetFile, targetData, 0644)
if err != nil {
return err
return nil, err
}
liveFile := filepath.Join(tempDir, fmt.Sprintf("%s-live.yaml", name))
liveData := []byte("")
if live != nil {
liveData, err = yaml.Marshal(live)
if err != nil {
return err
return nil, err
}
}
err = ioutil.WriteFile(liveFile, liveData, 0644)
if err != nil {
return err
return nil, err
}
cmd := exec.Command("diff", liveFile, targetFile)
cmd.Stderr = os.Stderr
cmd.Stdout = os.Stdout
return cmd.Run()
return cmd.Output()
}
func toUnstructured(obj interface{}) (*unstructured.Unstructured, error) {
@ -168,8 +140,8 @@ func newDeployment() *appsv1.Deployment {
}
}
func diff(t *testing.T, config, live *unstructured.Unstructured, options DiffOptions) *DiffResult {
res, err := Diff(config, live, nil, options)
func diff(t *testing.T, config, live *unstructured.Unstructured, options ...Option) *DiffResult {
res, err := Diff(config, live, options...)
assert.NoError(t, err)
return res
}
@ -178,10 +150,10 @@ func TestDiff(t *testing.T) {
leftDep := newDeployment()
leftUn := mustToUnstructured(leftDep)
diffRes := diff(t, leftUn, leftUn, GetDefaultDiffOptions())
diffRes := diff(t, leftUn, leftUn, diffOptionsForTest()...)
assert.False(t, diffRes.Modified)
ascii, err := printDiff(diffRes)
assert.Nil(t, err)
require.NoError(t, err)
if ascii != "" {
t.Log(ascii)
}
@ -195,10 +167,10 @@ func TestDiff_KnownTypeInvalidValue(t *testing.T) {
}
t.Run("NoDifference", func(t *testing.T) {
diffRes := diff(t, leftUn, leftUn, GetDefaultDiffOptions())
diffRes := diff(t, leftUn, leftUn, diffOptionsForTest()...)
assert.False(t, diffRes.Modified)
ascii, err := printDiff(diffRes)
assert.Nil(t, err)
require.NoError(t, err)
if ascii != "" {
t.Log(ascii)
}
@ -210,7 +182,7 @@ func TestDiff_KnownTypeInvalidValue(t *testing.T) {
return
}
diffRes := diff(t, leftUn, rightUn, GetDefaultDiffOptions())
diffRes := diff(t, leftUn, rightUn, diffOptionsForTest()...)
assert.True(t, diffRes.Modified)
})
}
@ -219,7 +191,7 @@ func TestDiffWithNils(t *testing.T) {
dep := newDeployment()
resource := mustToUnstructured(dep)
diffRes := diff(t, nil, resource, GetDefaultDiffOptions())
diffRes := diff(t, nil, resource, diffOptionsForTest()...)
// NOTE: if live is non-nil, and config is nil, this is not considered difference
// This "difference" is checked at the comparator.
assert.False(t, diffRes.Modified)
@ -227,7 +199,7 @@ func TestDiffWithNils(t *testing.T) {
assert.NoError(t, err)
assert.False(t, diffRes.Modified)
diffRes = diff(t, resource, nil, GetDefaultDiffOptions())
diffRes = diff(t, resource, nil, diffOptionsForTest()...)
assert.True(t, diffRes.Modified)
diffRes, err = TwoWayDiff(resource, nil)
assert.NoError(t, err)
@ -241,9 +213,9 @@ func TestDiffNilFieldInLive(t *testing.T) {
leftUn := mustToUnstructured(leftDep)
rightUn := mustToUnstructured(rightDep)
err := unstructured.SetNestedField(rightUn.Object, nil, "spec")
assert.Nil(t, err)
require.NoError(t, err)
diffRes := diff(t, leftUn, rightUn, GetDefaultDiffOptions())
diffRes := diff(t, leftUn, rightUn, diffOptionsForTest()...)
assert.True(t, diffRes.Modified)
}
@ -256,8 +228,8 @@ func TestDiffArraySame(t *testing.T) {
left := []*unstructured.Unstructured{leftUn}
right := []*unstructured.Unstructured{rightUn}
diffResList, err := DiffArray(left, right, nil, GetDefaultDiffOptions())
assert.Nil(t, err)
diffResList, err := DiffArray(left, right, diffOptionsForTest()...)
require.NoError(t, err)
assert.False(t, diffResList.Modified)
}
@ -271,8 +243,8 @@ func TestDiffArrayAdditions(t *testing.T) {
left := []*unstructured.Unstructured{leftUn}
right := []*unstructured.Unstructured{rightUn}
diffResList, err := DiffArray(left, right, nil, GetDefaultDiffOptions())
assert.Nil(t, err)
diffResList, err := DiffArray(left, right, diffOptionsForTest()...)
require.NoError(t, err)
assert.False(t, diffResList.Modified)
}
@ -287,8 +259,8 @@ func TestDiffArrayModification(t *testing.T) {
left := []*unstructured.Unstructured{leftUn}
right := []*unstructured.Unstructured{rightUn}
diffResList, err := DiffArray(left, right, nil, GetDefaultDiffOptions())
assert.Nil(t, err)
diffResList, err := DiffArray(left, right, diffOptionsForTest()...)
require.NoError(t, err)
assert.True(t, diffResList.Modified)
}
@ -308,24 +280,24 @@ func TestThreeWayDiff(t *testing.T) {
liveDep.SetNamespace("default")
configUn := mustToUnstructured(configDep)
liveUn := mustToUnstructured(liveDep)
res := diff(t, configUn, liveUn, GetDefaultDiffOptions())
res := diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.False(t, res.Modified) {
ascii, err := printDiff(res)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
// 3. Add a last-applied-configuration annotation in the live. There should still not be any
// difference
configBytes, err := json.Marshal(configDep)
assert.Nil(t, err)
require.NoError(t, err)
liveDep.Annotations[v1.LastAppliedConfigAnnotation] = string(configBytes)
configUn = mustToUnstructured(configDep)
liveUn = mustToUnstructured(liveDep)
res = diff(t, configUn, liveUn, GetDefaultDiffOptions())
res = diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.False(t, res.Modified) {
ascii, err := printDiff(res)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
@ -334,7 +306,7 @@ func TestThreeWayDiff(t *testing.T) {
delete(configDep.Annotations, "foo")
configUn = mustToUnstructured(configDep)
liveUn = mustToUnstructured(liveDep)
res = diff(t, configUn, liveUn, GetDefaultDiffOptions())
res = diff(t, configUn, liveUn, diffOptionsForTest()...)
assert.True(t, res.Modified)
// 5. Just to prove three way diff incorporates last-applied-configuration, remove the
@ -344,9 +316,9 @@ func TestThreeWayDiff(t *testing.T) {
delete(liveDep.Annotations, v1.LastAppliedConfigAnnotation)
configUn = mustToUnstructured(configDep)
liveUn = mustToUnstructured(liveDep)
res = diff(t, configUn, liveUn, GetDefaultDiffOptions())
res = diff(t, configUn, liveUn, diffOptionsForTest()...)
ascii, err := printDiff(res)
assert.Nil(t, err)
require.NoError(t, err)
if ascii != "" {
t.Log(ascii)
}
@ -399,13 +371,13 @@ func TestThreeWayDiffExample1(t *testing.T) {
// since it catches a case when we comparison fails due to subtle differences in types
// (e.g. float vs. int)
err := json.Unmarshal([]byte(demoConfig), &configUn.Object)
assert.Nil(t, err)
require.NoError(t, err)
err = json.Unmarshal([]byte(demoLive), &liveUn.Object)
assert.Nil(t, err)
dr := diff(t, &configUn, &liveUn, GetDefaultDiffOptions())
require.NoError(t, err)
dr := diff(t, &configUn, &liveUn, diffOptionsForTest()...)
assert.False(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
if ascii != "" {
t.Log(ascii)
}
@ -418,20 +390,20 @@ func TestDiffOptionIgnoreAggregateRoles(t *testing.T) {
{
configUn := unmarshalFile("testdata/aggr-clusterrole-config.json")
liveUn := unmarshalFile("testdata/aggr-clusterrole-live.json")
dr := diff(t, configUn, liveUn, DiffOptions{IgnoreAggregatedRoles: true})
dr := diff(t, configUn, liveUn, IgnoreAggregatedRoles(true))
assert.False(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
// Test case 2: Ignore option is false, the aggregation should produce a diff
{
configUn := unmarshalFile("testdata/aggr-clusterrole-config.json")
liveUn := unmarshalFile("testdata/aggr-clusterrole-live.json")
dr := diff(t, configUn, liveUn, DiffOptions{IgnoreAggregatedRoles: false})
dr := diff(t, configUn, liveUn, IgnoreAggregatedRoles(false))
assert.True(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -439,10 +411,10 @@ func TestDiffOptionIgnoreAggregateRoles(t *testing.T) {
func TestThreeWayDiffExample2(t *testing.T) {
configUn := unmarshalFile("testdata/elasticsearch-config.json")
liveUn := unmarshalFile("testdata/elasticsearch-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
assert.False(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
@ -451,10 +423,10 @@ func TestThreeWayDiffExample3(t *testing.T) {
configUn := unmarshalFile("testdata/deployment-config.json")
liveUn := unmarshalFile("testdata/deployment-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
assert.False(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
if ascii != "" {
t.Log(ascii)
}
@ -464,10 +436,10 @@ func TestThreeWayDiffExample4(t *testing.T) {
configUn := unmarshalFile("testdata/mutatingwebhookconfig-config.json")
liveUn := unmarshalFile("testdata/mutatingwebhookconfig-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
assert.False(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
if ascii != "" {
t.Log(ascii)
}
@ -486,10 +458,10 @@ func TestThreeWayDiffExample2WithDifference(t *testing.T) {
delete(labels, "release")
configUn.SetLabels(labels)
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
assert.True(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err, ascii)
t.Log(ascii)
// Check that we indicate missing/extra/changed correctly
@ -518,10 +490,10 @@ func TestThreeWayDiffExample2WithDifference(t *testing.T) {
func TestThreeWayDiffExplicitNamespace(t *testing.T) {
configUn := unmarshalFile("testdata/spinnaker-sa-config.json")
liveUn := unmarshalFile("testdata/spinnaker-sa-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
assert.False(t, dr.Modified)
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
@ -546,7 +518,7 @@ func TestDiffResourceWithInvalidField(t *testing.T) {
rightUn := leftUn.DeepCopy()
unstructured.RemoveNestedField(rightUn.Object, "invalidKey")
diffRes := diff(t, &leftUn, rightUn, GetDefaultDiffOptions())
diffRes := diff(t, &leftUn, rightUn, diffOptionsForTest()...)
assert.True(t, diffRes.Modified)
ascii, err := printDiff(diffRes)
assert.Nil(t, err)
@ -604,10 +576,10 @@ func TestIgnoreNamespaceForClusterScopedResources(t *testing.T) {
var configUn unstructured.Unstructured
var liveUn unstructured.Unstructured
err := yaml.Unmarshal([]byte(customObjLive), &liveUn)
assert.Nil(t, err)
require.NoError(t, err)
err = yaml.Unmarshal([]byte(customObjConfig), &configUn)
assert.Nil(t, err)
dr := diff(t, &configUn, &liveUn, GetDefaultDiffOptions())
require.NoError(t, err)
dr := diff(t, &configUn, &liveUn, diffOptionsForTest()...)
assert.False(t, dr.Modified)
}
@ -645,16 +617,16 @@ func TestSecretStringData(t *testing.T) {
var err error
var configUn unstructured.Unstructured
err = yaml.Unmarshal([]byte(secretConfig), &configUn)
assert.Nil(t, err)
require.NoError(t, err)
var liveUn unstructured.Unstructured
err = yaml.Unmarshal([]byte(secretLive), &liveUn)
assert.Nil(t, err)
require.NoError(t, err)
dr := diff(t, &configUn, &liveUn, GetDefaultDiffOptions())
dr := diff(t, &configUn, &liveUn, diffOptionsForTest()...)
if !assert.False(t, dr.Modified) {
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -689,23 +661,23 @@ func TestInvalidSecretStringData(t *testing.T) {
var err error
var configUn unstructured.Unstructured
err = yaml.Unmarshal([]byte(secretInvalidConfig), &configUn)
assert.Nil(t, err)
require.NoError(t, err)
var liveUn unstructured.Unstructured
err = yaml.Unmarshal([]byte(secretInvalidLive), &liveUn)
assert.Nil(t, err)
require.NoError(t, err)
dr := diff(t, &configUn, nil, GetDefaultDiffOptions())
dr := diff(t, &configUn, nil, diffOptionsForTest()...)
assert.True(t, dr.Modified)
}
func TestNullSecretData(t *testing.T) {
configUn := unmarshalFile("testdata/wordpress-config.json")
liveUn := unmarshalFile("testdata/wordpress-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.False(t, dr.Modified) {
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -721,10 +693,10 @@ func TestRedactedSecretData(t *testing.T) {
configData["smtp-password"] = "++++++++"
liveData["wordpress-password"] = "++++++++++++"
liveData["smtp-password"] = "++++++++++++"
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.True(t, dr.Modified) {
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -732,10 +704,10 @@ func TestRedactedSecretData(t *testing.T) {
func TestNullRoleRule(t *testing.T) {
configUn := unmarshalFile("testdata/grafana-clusterrole-config.json")
liveUn := unmarshalFile("testdata/grafana-clusterrole-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.False(t, dr.Modified) {
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -743,10 +715,10 @@ func TestNullRoleRule(t *testing.T) {
func TestNullCreationTimestamp(t *testing.T) {
configUn := unmarshalFile("testdata/sealedsecret-config.json")
liveUn := unmarshalFile("testdata/sealedsecret-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.False(t, dr.Modified) {
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -754,10 +726,10 @@ func TestNullCreationTimestamp(t *testing.T) {
func TestUnsortedEndpoints(t *testing.T) {
configUn := unmarshalFile("testdata/endpoints-config.json")
liveUn := unmarshalFile("testdata/endpoints-live.json")
dr := diff(t, configUn, liveUn, GetDefaultDiffOptions())
dr := diff(t, configUn, liveUn, diffOptionsForTest()...)
if !assert.False(t, dr.Modified) {
ascii, err := printDiff(dr)
assert.Nil(t, err)
require.NoError(t, err)
t.Log(ascii)
}
}
@ -789,7 +761,7 @@ func TestHideSecretDataSameKeysDifferentValues(t *testing.T) {
target, live, err := HideSecretData(
createSecret(map[string]string{"key1": "test", "key2": "test"}),
createSecret(map[string]string{"key1": "test-1", "key2": "test-1"}))
assert.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{"key1": replacement1, "key2": replacement1}, secretData(target))
assert.Equal(t, map[string]interface{}{"key1": replacement2, "key2": replacement2}, secretData(live))
@ -799,7 +771,7 @@ func TestHideSecretDataSameKeysSameValues(t *testing.T) {
target, live, err := HideSecretData(
createSecret(map[string]string{"key1": "test", "key2": "test"}),
createSecret(map[string]string{"key1": "test", "key2": "test"}))
assert.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{"key1": replacement1, "key2": replacement1}, secretData(target))
assert.Equal(t, map[string]interface{}{"key1": replacement1, "key2": replacement1}, secretData(live))
@ -809,7 +781,7 @@ func TestHideSecretDataDifferentKeysDifferentValues(t *testing.T) {
target, live, err := HideSecretData(
createSecret(map[string]string{"key1": "test", "key2": "test"}),
createSecret(map[string]string{"key2": "test-1", "key3": "test-1"}))
assert.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{"key1": replacement1, "key2": replacement1}, secretData(target))
assert.Equal(t, map[string]interface{}{"key2": replacement2, "key3": replacement1}, secretData(live))
@ -820,13 +792,13 @@ func TestHideSecretDataLastAppliedConfig(t *testing.T) {
targetSecret := createSecret(map[string]string{"key1": "test2"})
liveSecret := createSecret(map[string]string{"key1": "test3"})
lastAppliedStr, err := json.Marshal(lastAppliedSecret)
assert.Nil(t, err)
require.NoError(t, err)
liveSecret.SetAnnotations(map[string]string{corev1.LastAppliedConfigAnnotation: string(lastAppliedStr)})
target, live, err := HideSecretData(targetSecret, liveSecret)
assert.Nil(t, err)
require.NoError(t, err)
err = json.Unmarshal([]byte(live.GetAnnotations()[corev1.LastAppliedConfigAnnotation]), &lastAppliedSecret)
assert.Nil(t, err)
require.NoError(t, err)
assert.Equal(t, map[string]interface{}{"key1": replacement1}, secretData(target))
assert.Equal(t, map[string]interface{}{"key1": replacement2}, secretData(live))
@ -845,7 +817,7 @@ metadata:
var un unstructured.Unstructured
err := yaml.Unmarshal(manifest, &un)
assert.NoError(t, err)
newUn := remarshal(&un)
newUn := remarshal(&un, applyOptions(diffOptionsForTest()))
_, ok := newUn.Object["imagePullSecrets"]
assert.False(t, ok)
metadata := newUn.Object["metadata"].(map[string]interface{})
@ -872,7 +844,7 @@ spec:
assert.NoError(t, err)
requestsBefore := un.Object["spec"].(map[string]interface{})["containers"].([]interface{})[0].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{})
t.Log(requestsBefore)
newUn := remarshal(&un)
newUn := remarshal(&un, applyOptions(diffOptionsForTest()))
requestsAfter := newUn.Object["spec"].(map[string]interface{})["containers"].([]interface{})[0].(map[string]interface{})["resources"].(map[string]interface{})["requests"].(map[string]interface{})
t.Log(requestsAfter)
assert.Equal(t, float64(0.2), requestsBefore["cpu"])
@ -919,7 +891,7 @@ spec:
`), &liveResource); err != nil {
panic(err)
}
diff, err := Diff(&expectedResource, &liveResource, GetNoopNormalizer(), GetDefaultDiffOptions())
diff, err := Diff(&expectedResource, &liveResource, diffOptionsForTest()...)
if err != nil {
panic(err)
}
@ -927,3 +899,10 @@ spec:
fmt.Println("Resources are different")
}
}
// diffOptionsForTest returns the diff options shared by the tests in this
// file: a klogr-backed logger and aggregated-role handling disabled.
func diffOptionsForTest() []Option {
	opts := make([]Option, 0, 2)
	opts = append(opts, WithLogr(klogr.New()))
	opts = append(opts, IgnoreAggregatedRoles(false))
	return opts
}

View file

@ -12,10 +12,9 @@ package engine
import (
"context"
"fmt"
"io"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
"k8s.io/client-go/rest"
@ -23,7 +22,6 @@ import (
"github.com/argoproj/gitops-engine/pkg/diff"
"github.com/argoproj/gitops-engine/pkg/sync"
"github.com/argoproj/gitops-engine/pkg/sync/common"
ioutil "github.com/argoproj/gitops-engine/pkg/utils/io"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
)
@ -31,9 +29,11 @@ const (
operationRefreshTimeout = time.Second * 1
)
type StopFunc func()
type GitOpsEngine interface {
// Run initializes engine
Run() (io.Closer, error)
Run() (StopFunc, error)
// Synchronizes resources in the cluster
Sync(ctx context.Context, resources []*unstructured.Unstructured, isManaged func(r *cache.Resource) bool, revision string, namespace string, opts ...sync.SyncOpt) ([]common.ResourceSyncResult, error)
}
@ -42,27 +42,29 @@ type gitOpsEngine struct {
config *rest.Config
cache cache.ClusterCache
kubectl kube.Kubectl
log logr.Logger
}
// NewEngine creates new instances of the GitOps engine
func NewEngine(config *rest.Config, clusterCache cache.ClusterCache) GitOpsEngine {
func NewEngine(config *rest.Config, clusterCache cache.ClusterCache, opts ...Option) GitOpsEngine {
o := applyOptions(opts)
return &gitOpsEngine{
config: config,
kubectl: &kube.KubectlCmd{},
cache: clusterCache,
kubectl: o.kubectl,
log: o.log,
}
}
func (e *gitOpsEngine) Run() (io.Closer, error) {
func (e *gitOpsEngine) Run() (StopFunc, error) {
err := e.cache.EnsureSynced()
if err != nil {
return nil, err
}
return ioutil.NewCloser(func() error {
return func() {
e.cache.Invalidate()
return nil
}), nil
}, nil
}
func (e *gitOpsEngine) Sync(ctx context.Context,
@ -77,12 +79,12 @@ func (e *gitOpsEngine) Sync(ctx context.Context,
return nil, err
}
result := sync.Reconcile(resources, managedResources, namespace, e.cache)
diffRes, err := diff.DiffArray(result.Target, result.Live, diff.GetNoopNormalizer(), diff.GetDefaultDiffOptions())
diffRes, err := diff.DiffArray(result.Target, result.Live, diff.WithLogr(e.log))
if err != nil {
return nil, err
}
opts = append(opts, sync.WithSkipHooks(!diffRes.Modified))
syncCtx, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, log.NewEntry(log.New()), opts...)
syncCtx, err := sync.NewSyncContext(revision, result, e.config, e.config, e.kubectl, namespace, opts...)
if err != nil {
return nil, err
}

View file

@ -0,0 +1,56 @@
package engine
import (
"github.com/go-logr/logr"
"k8s.io/klog/v2/klogr"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)
// Option is a functional option that customizes the engine configuration.
type Option func(*options)

// options holds the configurable engine dependencies; defaults are
// populated by applyOptions before the user-supplied options run.
type options struct {
	// log is the logger used by the engine and the default kubectl wrapper.
	log logr.Logger
	// kubectl is the wrapper used to interact with the cluster.
	kubectl kube.Kubectl
}
// applyOptions builds the effective engine configuration: it starts from
// the defaults (a klogr-backed logger and the stock kubectl wrapper with a
// no-op tracer) and then applies each supplied Option in order.
func applyOptions(opts []Option) options {
	defaultLog := klogr.New()
	cfg := options{
		log: defaultLog,
		kubectl: &kube.KubectlCmd{
			Log:    defaultLog,
			Tracer: tracing.NopTracer{},
		},
	}
	for _, apply := range opts {
		apply(&cfg)
	}
	return cfg
}
// WithLogr sets the logger used by the engine. When the configured kubectl
// wrapper is the default *kube.KubectlCmd, the logger is propagated to it
// as well.
// NOTE(review): propagation is order-dependent — a WithKubectl applied
// after this option will not pick up the logger; confirm callers' ordering.
func WithLogr(log logr.Logger) Option {
	return func(o *options) {
		o.log = log
		kcmd, isDefaultKubectl := o.kubectl.(*kube.KubectlCmd)
		if isDefaultKubectl {
			kcmd.Log = log
		}
	}
}
// SetTracer sets the tracer to use. It only takes effect when the
// configured kubectl wrapper is the default *kube.KubectlCmd; custom
// kubectl implementations are left untouched.
// NOTE(review): the name diverges from the WithXxx convention used by the
// other options in this file.
func SetTracer(tracer tracing.Tracer) Option {
	return func(o *options) {
		kcmd, isDefaultKubectl := o.kubectl.(*kube.KubectlCmd)
		if !isDefaultKubectl {
			return
		}
		kcmd.Tracer = tracer
	}
}
// WithKubectl allows to override kubectl wrapper implementation.
func WithKubectl(kubectl kube.Kubectl) Option {
	return func(o *options) { o.kubectl = kubectl }
}

View file

@ -8,7 +8,7 @@ import (
"sync"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
"k8s.io/apiextensions-apiserver/pkg/apis/apiextensions/v1beta1"
"k8s.io/apiextensions-apiserver/pkg/client/clientset/clientset"
@ -19,6 +19,7 @@ import (
"k8s.io/client-go/discovery"
"k8s.io/client-go/dynamic"
"k8s.io/client-go/rest"
"k8s.io/klog/v2/klogr"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/argoproj/gitops-engine/pkg/health"
@ -127,6 +128,13 @@ func WithNamespaceCreation(createNamespace bool, namespaceModifier func(*unstruc
}
}
// WithLogr sets the logger to use for the sync operation.
func WithLogr(log logr.Logger) SyncOpt {
	return func(ctx *syncContext) { ctx.log = log }
}
// NewSyncContext creates new instance of a SyncContext
func NewSyncContext(
revision string,
@ -135,7 +143,6 @@ func NewSyncContext(
rawConfig *rest.Config,
kubectl kubeutil.Kubectl,
namespace string,
log *log.Entry,
opts ...SyncOpt,
) (SyncContext, error) {
dynamicIf, err := dynamic.NewForConfig(restConfig)
@ -161,7 +168,7 @@ func NewSyncContext(
extensionsclientset: extensionsclientset,
kubectl: kubectl,
namespace: namespace,
log: log,
log: klogr.New(),
validate: true,
syncRes: map[string]common.ResourceSyncResult{},
permissionValidator: func(_ *unstructured.Unstructured, _ *metav1.APIResource) error {
@ -248,7 +255,7 @@ type syncContext struct {
phase common.OperationPhase
message string
log *log.Entry
log logr.Logger
// lock to protect concurrent updates of the result list
lock sync.Mutex
@ -279,23 +286,23 @@ func (sc *syncContext) setRunningPhase(tasks []*syncTask, isPendingDeletion bool
// sync has performs the actual apply or hook based sync
func (sc *syncContext) Sync() {
sc.log.WithFields(log.Fields{"skipHooks": sc.skipHooks, "started": sc.started()}).Info("syncing")
sc.log.WithValues("skipHooks", sc.skipHooks, "started", sc.started()).Info("Syncing")
tasks, ok := sc.getSyncTasks()
if !ok {
sc.setOperationPhase(common.OperationFailed, "one or more synchronization tasks are not valid")
return
}
sc.log.WithFields(log.Fields{"tasks": tasks}).Info("tasks")
// Perform a `kubectl apply --dry-run` against all the manifests. This will detect most (but
// not all) validation issues with the user's manifests (e.g. will detect syntax issues, but
// will not not detect if they are mutating immutable fields). If anything fails, we will refuse
// to perform the sync. we only wish to do this once per operation, performing additional dry-runs
// is harmless, but redundant. The indicator we use to detect if we have already performed
// the dry-run for this operation, is if the resource or hook list is empty.
if !sc.started() {
sc.log.Debug("dry-run")
if sc.started() {
sc.log.WithValues("tasks", tasks).Info("Tasks")
} else {
// Perform a `kubectl apply --dry-run` against all the manifests. This will detect most (but
// not all) validation issues with the user's manifests (e.g. will detect syntax issues, but
// will not not detect if they are mutating immutable fields). If anything fails, we will refuse
// to perform the sync. we only wish to do this once per operation, performing additional dry-runs
// is harmless, but redundant. The indicator we use to detect if we have already performed
// the dry-run for this operation, is if the resource or hook list is empty.
sc.log.WithValues("tasks", tasks).Info("Tasks (dry-run)")
if sc.runTasks(tasks, true) == failed {
sc.setOperationPhase(common.OperationFailed, "one or more objects failed to apply (dry run)")
return
@ -319,7 +326,7 @@ func (sc *syncContext) Sync() {
// this must be calculated on the live object
healthStatus, err := health.GetResourceHealth(task.liveObj, sc.healthOverride)
if err == nil {
log.WithFields(log.Fields{"task": task, "healthStatus": healthStatus}).Debug("attempting to update health of running task")
sc.log.WithValues("task", task, "healthStatus", healthStatus).V(1).Info("attempting to update health of running task")
if healthStatus == nil {
// some objects (e.g. secret) do not have health, and they automatically success
sc.setResourceResult(task, task.syncStatus, common.OperationSucceeded, task.message)
@ -364,7 +371,7 @@ func (sc *syncContext) Sync() {
return
}
sc.log.WithFields(log.Fields{"tasks": tasks}).Debug("filtering out non-pending tasks")
sc.log.WithValues("tasks", tasks).V(1).Info("Filtering out non-pending tasks")
// remove tasks that are completed, we can assume that there are no running tasks
tasks = tasks.Filter(func(t *syncTask) bool { return t.pending() })
@ -386,12 +393,12 @@ func (sc *syncContext) Sync() {
// This handles the common case where neither hooks or waves are used and a sync equates to simply an (asynchronous) kubectl apply of manifests, which succeeds immediately.
remainingTasks := tasks.Filter(func(t *syncTask) bool { return t.phase != phase || wave != t.wave() || t.isHook() })
sc.log.WithFields(log.Fields{"phase": phase, "wave": wave, "tasks": tasks, "syncFailTasks": syncFailTasks}).Debug("filtering tasks in correct phase and wave")
sc.log.WithValues("phase", phase, "wave", wave, "tasks", tasks, "syncFailTasks", syncFailTasks).V(1).Info("Filtering tasks in correct phase and wave")
tasks = tasks.Filter(func(t *syncTask) bool { return t.phase == phase && t.wave() == wave })
sc.setOperationPhase(common.OperationRunning, "one or more tasks are running")
sc.log.WithFields(log.Fields{"tasks": tasks}).Debug("wet-run")
sc.log.WithValues("tasks", tasks).V(1).Info("Wet-run")
runState := sc.runTasks(tasks, false)
switch runState {
case failed:
@ -441,7 +448,7 @@ func (sc *syncContext) setOperationFailed(syncFailTasks syncTasks, message strin
}
// otherwise, we need to start the failure hooks, and then return without setting
// the phase, so we make sure we have at least one more sync
sc.log.WithFields(log.Fields{"syncFailTasks": syncFailTasks}).Debug("running sync fail tasks")
sc.log.WithValues("syncFailTasks", syncFailTasks).V(1).Info("Running sync fail tasks")
if sc.runTasks(syncFailTasks, false) == failed {
sc.setOperationPhase(common.OperationFailed, message)
}
@ -465,8 +472,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
for k, resource := range sc.resources {
if !sc.containsResource(resource) {
sc.log.WithFields(log.Fields{"group": k.Group, "kind": k.Kind, "name": k.Name}).
Debug("skipping")
sc.log.WithValues("group", k.Group, "kind", k.Kind, "name", k.Name).V(1).Info("Skipping")
continue
}
@ -474,8 +480,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
// this creates garbage tasks
if hook.IsHook(obj) {
sc.log.WithFields(log.Fields{"group": obj.GroupVersionKind().Group, "kind": obj.GetKind(), "namespace": obj.GetNamespace(), "name": obj.GetName()}).
Debug("skipping hook")
sc.log.WithValues("group", obj.GroupVersionKind().Group, "kind", obj.GetKind(), "namespace", obj.GetNamespace(), "name", obj.GetName()).V(1).Info("Skipping hook")
continue
}
@ -484,7 +489,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}
}
sc.log.WithFields(log.Fields{"resourceTasks": resourceTasks}).Debug("tasks from managed resources")
sc.log.WithValues("resourceTasks", resourceTasks).V(1).Info("Tasks from managed resources")
hookTasks := syncTasks{}
if !sc.skipHooks {
@ -511,7 +516,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
}
}
sc.log.WithFields(log.Fields{"hookTasks": hookTasks}).Debug("tasks from hooks")
sc.log.WithValues("hookTasks", hookTasks).V(1).Info("tasks from hooks")
tasks := resourceTasks
tasks = append(tasks, hookTasks...)
@ -565,7 +570,7 @@ func (sc *syncContext) getSyncTasks() (_ syncTasks, successful bool) {
if apierr.IsNotFound(err) &&
((task.targetObj != nil && resourceutil.HasAnnotationOption(task.targetObj, common.AnnotationSyncOptions, common.SyncOptionSkipDryRunOnMissingResource)) ||
sc.hasCRDOfGroupKind(task.group(), task.kind())) {
sc.log.WithFields(log.Fields{"task": task}).Debug("skip dry-run for custom resource")
sc.log.WithValues("task", task).V(1).Info("Skip dry-run for custom resource")
task.skipDryRun = true
} else {
sc.setResourceResult(task, common.ResultCodeSyncFailed, "", err.Error())
@ -604,14 +609,14 @@ func (sc *syncContext) autoCreateNamespace(tasks syncTasks) syncTasks {
nsSpec := &v1.Namespace{TypeMeta: metav1.TypeMeta{APIVersion: "v1", Kind: kube.NamespaceKind}, ObjectMeta: metav1.ObjectMeta{Name: sc.namespace}}
unstructuredObj, err := kube.ToUnstructured(nsSpec)
if err == nil {
liveObj, err := sc.kubectl.GetResource(context.TODO(), sc.config, unstructuredObj.GroupVersionKind(), unstructuredObj.GetName(), "")
liveObj, err := sc.kubectl.GetResource(context.TODO(), sc.config, unstructuredObj.GroupVersionKind(), unstructuredObj.GetName(), metav1.NamespaceNone)
if err == nil {
nsTask := &syncTask{phase: common.SyncPhasePreSync, targetObj: unstructuredObj, liveObj: liveObj}
_, ok := sc.syncRes[nsTask.resultKey()]
if ok {
tasks = append(tasks, nsTask)
} else {
sc.log.Infof("Namespace %s is already existed.", sc.namespace)
sc.log.WithValues("namespace", sc.namespace).Info("Namespace already exists")
liveObjCopy := liveObj.DeepCopy()
if sc.namespaceModifier(liveObjCopy) {
tasks = append(tasks, &syncTask{phase: common.SyncPhasePreSync, targetObj: liveObjCopy, liveObj: liveObj})
@ -666,7 +671,7 @@ func (sc *syncContext) liveObj(obj *unstructured.Unstructured) *unstructured.Uns
func (sc *syncContext) setOperationPhase(phase common.OperationPhase, message string) {
if sc.phase != phase || sc.message != message {
sc.log.Infof("Updating operation state. phase: %s -> %s, message: '%s' -> '%s'", sc.phase, phase, sc.message, message)
sc.log.Info(fmt.Sprintf("Updating operation state. phase: %s -> %s, message: '%s' -> '%s'", sc.phase, phase, sc.message, message))
}
sc.phase = phase
sc.message = message
@ -765,7 +770,7 @@ func (sc *syncContext) hasCRDOfGroupKind(group string, kind string) bool {
// terminate looks for any running jobs/workflow hooks and deletes the resource
func (sc *syncContext) Terminate() {
terminateSuccessful := true
sc.log.Debug("terminating")
sc.log.V(1).Info("terminating")
tasks, _ := sc.getSyncTasks()
for _, task := range tasks {
if !task.isHook() || task.liveObj == nil {
@ -796,7 +801,7 @@ func (sc *syncContext) Terminate() {
}
func (sc *syncContext) deleteResource(task *syncTask) error {
sc.log.WithFields(log.Fields{"task": task}).Debug("deleting resource")
sc.log.WithValues("task", task).V(1).Info("Deleting resource")
resIf, err := sc.getResourceIf(task)
if err != nil {
return err
@ -834,7 +839,7 @@ const (
func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
dryRun = dryRun || sc.dryRun
sc.log.WithFields(log.Fields{"numTasks": len(tasks), "dryRun": dryRun}).Debug("running tasks")
sc.log.WithValues("numTasks", len(tasks), "dryRun", dryRun).V(1).Info("Running tasks")
state := successful
var createTasks syncTasks
@ -853,12 +858,12 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
for _, task := range pruneTasks {
t := task
ss.Go(func(state runState) runState {
logCtx := sc.log.WithFields(log.Fields{"dryRun": dryRun, "task": t})
logCtx.Debug("pruning")
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
logCtx.V(1).Info("Pruning")
result, message := sc.pruneObject(t.liveObj, sc.prune, dryRun)
if result == common.ResultCodeSyncFailed {
state = failed
logCtx.WithField("message", message).Info("pruning failed")
logCtx.WithValues("message", message).Info("Pruning failed")
}
if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed {
sc.setResourceResult(t, result, operationPhases[result], message)
@ -880,7 +885,7 @@ func (sc *syncContext) runTasks(tasks syncTasks, dryRun bool) runState {
for _, task := range hooksPendingDeletion {
t := task
ss.Go(func(state runState) runState {
sc.log.WithFields(log.Fields{"dryRun": dryRun, "task": t}).Debug("deleting")
sc.log.WithValues("dryRun", dryRun, "task", t).V(1).Info("Deleting")
if !dryRun {
err := sc.deleteResource(t)
if err != nil {
@ -931,12 +936,12 @@ func (sc *syncContext) processCreateTasks(state runState, tasks syncTasks, dryRu
}
t := task
ss.Go(func(state runState) runState {
logCtx := sc.log.WithFields(log.Fields{"dryRun": dryRun, "task": t})
logCtx.Debug("applying")
logCtx := sc.log.WithValues("dryRun", dryRun, "task", t)
logCtx.V(1).Info("Applying")
validate := sc.validate && !resourceutil.HasAnnotationOption(t.targetObj, common.AnnotationSyncOptions, common.SyncOptionsDisableValidation)
result, message := sc.applyObject(t.targetObj, dryRun, sc.force, validate)
if result == common.ResultCodeSyncFailed {
logCtx.WithField("message", message).Info("apply failed")
logCtx.WithValues("message", message).Info("Apply failed")
state = failed
}
if !dryRun || sc.dryRun || result == common.ResultCodeSyncFailed {
@ -971,22 +976,22 @@ func (sc *syncContext) setResourceResult(task *syncTask, syncStatus common.Resul
SyncPhase: task.phase,
}
logCtx := sc.log.WithFields(log.Fields{"namespace": task.namespace(), "kind": task.kind(), "name": task.name(), "phase": task.phase})
logCtx := sc.log.WithValues("namespace", task.namespace(), "kind", task.kind(), "name", task.name(), "phase", task.phase)
if ok {
// update existing value
if res.Status != existing.Status || res.HookPhase != existing.HookPhase || res.Message != existing.Message {
logCtx.Infof("updating resource result, status: '%s' -> '%s', phase '%s' -> '%s', message '%s' -> '%s'",
logCtx.Info(fmt.Sprintf("Updating resource result, status: '%s' -> '%s', phase '%s' -> '%s', message '%s' -> '%s'",
existing.Status, res.Status,
existing.HookPhase, res.HookPhase,
existing.Message, res.Message)
existing.Message, res.Message))
existing.Status = res.Status
existing.HookPhase = res.HookPhase
existing.Message = res.Message
}
sc.syncRes[task.resultKey()] = existing
} else {
logCtx.Infof("adding resource result, status: '%s', phase: '%s', message: '%s'", res.Status, res.HookPhase, res.Message)
logCtx.Info(fmt.Sprintf("Adding resource result, status: '%s', phase: '%s', message: '%s'", res.Status, res.HookPhase, res.Message))
res.Order = len(sc.syncRes) + 1
sc.syncRes[task.resultKey()] = res
}

View file

@ -11,7 +11,6 @@ import (
"github.com/argoproj/gitops-engine/pkg/utils/kube/kubetest"
. "github.com/argoproj/gitops-engine/pkg/utils/testing"
testingutils "github.com/argoproj/gitops-engine/pkg/utils/testing"
log "github.com/sirupsen/logrus"
"github.com/stretchr/testify/assert"
corev1 "k8s.io/api/core/v1"
v1 "k8s.io/apimachinery/pkg/apis/meta/v1"
@ -21,6 +20,7 @@ import (
"k8s.io/client-go/dynamic/fake"
"k8s.io/client-go/rest"
testcore "k8s.io/client-go/testing"
"k8s.io/klog/v2/klogr"
)
func newTestSyncCtx(opts ...SyncOpt) *syncContext {
@ -46,7 +46,7 @@ func newTestSyncCtx(opts ...SyncOpt) *syncContext {
namespace: FakeArgoCDNamespace,
revision: "FooBarBaz",
disco: fakeDisco,
log: log.WithFields(log.Fields{"application": "fake-app"}),
log: klogr.New().WithValues("application", "fake-app"),
resources: map[kube.ResourceKey]reconciledResource{},
syncRes: map[string]synccommon.ResourceSyncResult{},
validate: true,
@ -872,7 +872,7 @@ func Test_syncContext_hasCRDOfGroupKind(t *testing.T) {
}
func Test_setRunningPhase_healthyState(t *testing.T) {
sc := syncContext{log: log.WithFields(log.Fields{"application": "fake-app"})}
sc := syncContext{log: klogr.New().WithValues("application", "fake-app")}
sc.setRunningPhase([]*syncTask{{targetObj: NewPod()}, {targetObj: NewPod()}, {targetObj: NewPod()}}, false)
@ -880,7 +880,7 @@ func Test_setRunningPhase_healthyState(t *testing.T) {
}
func Test_setRunningPhase_runningHooks(t *testing.T) {
sc := syncContext{log: log.WithFields(log.Fields{"application": "fake-app"})}
sc := syncContext{log: klogr.New().WithValues("application", "fake-app")}
sc.setRunningPhase([]*syncTask{{targetObj: newHook(synccommon.HookTypeSyncFail)}}, false)
@ -888,7 +888,7 @@ func Test_setRunningPhase_runningHooks(t *testing.T) {
}
func Test_setRunningPhase_pendingDeletion(t *testing.T) {
sc := syncContext{log: log.WithFields(log.Fields{"application": "fake-app"})}
sc := syncContext{log: klogr.New().WithValues("application", "fake-app")}
sc.setRunningPhase([]*syncTask{{targetObj: NewPod()}, {targetObj: NewPod()}, {targetObj: NewPod()}}, true)

View file

@ -1,59 +0,0 @@
package errors
import (
"os"
log "github.com/sirupsen/logrus"
)
const (
// ErrorCommandSpecific is reserved for command specific indications
ErrorCommandSpecific = 1
// ErrorConnectionFailure is returned on connection failure to API endpoint
ErrorConnectionFailure = 11
// ErrorAPIResponse is returned on unexpected API response, i.e. authorization failure
ErrorAPIResponse = 12
// ErrorResourceDoesNotExist is returned when the requested resource does not exist
ErrorResourceDoesNotExist = 13
// ErrorGeneric is returned for generic error
ErrorGeneric = 20
)
// CheckError logs a fatal message and exits with ErrorGeneric if err is not nil
func CheckError(err error) {
if err != nil {
Fatal(ErrorGeneric, err)
}
}
// CheckErrorWithCode is a convenience function to exit if an error is non-nil and exit if it was
func CheckErrorWithCode(err error, exitcode int) {
if err != nil {
Fatal(exitcode, err)
}
}
// FailOnErr panics if there is an error. It returns the first value so you can use it if you cast it:
// text := FailOrErr(Foo)).(string)
func FailOnErr(v interface{}, err error) interface{} {
CheckError(err)
return v
}
// Fatal is a wrapper for logrus.Fatal() to exit with custom code
func Fatal(exitcode int, args ...interface{}) {
exitfunc := func() {
os.Exit(exitcode)
}
log.RegisterExitHandler(exitfunc)
log.Fatal(args...)
}
// Fatalf is a wrapper for logrus.Fatalf() to exit with custom code
func Fatalf(exitcode int, format string, args ...interface{}) {
exitfunc := func() {
os.Exit(exitcode)
}
log.RegisterExitHandler(exitfunc)
log.Fatalf(format, args...)
}

View file

@ -2,15 +2,10 @@ package io
import (
"os"
log "github.com/sirupsen/logrus"
)
var (
TempDir string
NopCloser = NewCloser(func() error {
return nil
})
TempDir string
)
func init() {
@ -27,27 +22,3 @@ func DeleteFile(path string) {
}
_ = os.Remove(path)
}
type Closer interface {
Close() error
}
type inlineCloser struct {
close func() error
}
func (c *inlineCloser) Close() error {
return c.close()
}
func NewCloser(close func() error) Closer {
return &inlineCloser{close: close}
}
// Close is a convenience function to close a object that has a Close() method, ignoring any errors
// Used to satisfy errcheck lint
func Close(c Closer) {
if err := c.Close(); err != nil {
log.Warnf("failed to close %v: %v", c, err)
}
}

View file

@ -7,11 +7,9 @@ import (
"errors"
"fmt"
"io/ioutil"
"os/exec"
"regexp"
"strings"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
"golang.org/x/sync/errgroup"
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/apimachinery/pkg/apis/meta/v1/unstructured"
@ -34,6 +32,10 @@ import (
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)
type CleanupFunc func()
type OnKubectlRunFunc func(command string) (CleanupFunc, error)
type Kubectl interface {
ApplyResource(ctx context.Context, config *rest.Config, obj *unstructured.Unstructured, namespace string, dryRunStrategy cmdutil.DryRunStrategy, force, validate bool) (string, error)
ConvertToVersion(obj *unstructured.Unstructured, group, version string) (*unstructured.Unstructured, error)
@ -44,11 +46,13 @@ type Kubectl interface {
GetAPIGroups(config *rest.Config) ([]metav1.APIGroup, error)
GetServerVersion(config *rest.Config) (string, error)
NewDynamicClient(config *rest.Config) (dynamic.Interface, error)
SetOnKubectlRun(onKubectlRun func(command string) (io.Closer, error))
SetOnKubectlRun(onKubectlRun OnKubectlRunFunc)
}
type KubectlCmd struct {
OnKubectlRun func(command string) (io.Closer, error)
Log logr.Logger
Tracer tracing.Tracer
OnKubectlRun OnKubectlRunFunc
}
type APIResourceInfo struct {
@ -59,7 +63,7 @@ type APIResourceInfo struct {
type filterFunc func(apiResource *metav1.APIResource) bool
func filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filter filterFunc) ([]APIResourceInfo, error) {
func (k *KubectlCmd) filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filter filterFunc) ([]APIResourceInfo, error) {
disco, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
return nil, err
@ -70,7 +74,7 @@ func filterAPIResources(config *rest.Config, resourceFilter ResourceFilter, filt
if len(serverResources) == 0 {
return nil, err
}
log.Warnf("Partial success when performing preferred resource discovery: %v", err)
k.Log.Error(err, "Partial success when performing preferred resource discovery")
}
apiResIfs := make([]APIResourceInfo, 0)
for _, apiResourcesList := range serverResources {
@ -125,9 +129,9 @@ func (k *KubectlCmd) GetAPIGroups(config *rest.Config) ([]metav1.APIGroup, error
}
func (k *KubectlCmd) GetAPIResources(config *rest.Config, resourceFilter ResourceFilter) ([]APIResourceInfo, error) {
span := tracing.StartSpan("GetAPIResources")
span := k.Tracer.StartSpan("GetAPIResources")
defer span.Finish()
apiResIfs, err := filterAPIResources(config, resourceFilter, func(apiResource *metav1.APIResource) bool {
apiResIfs, err := k.filterAPIResources(config, resourceFilter, func(apiResource *metav1.APIResource) bool {
return isSupportedVerb(apiResource, listVerb) && isSupportedVerb(apiResource, watchVerb)
})
if err != nil {
@ -138,7 +142,7 @@ func (k *KubectlCmd) GetAPIResources(config *rest.Config, resourceFilter Resourc
// GetResource returns resource
func (k *KubectlCmd) GetResource(ctx context.Context, config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string) (*unstructured.Unstructured, error) {
span := tracing.StartSpan("GetResource")
span := k.Tracer.StartSpan("GetResource")
span.SetBaggageItem("kind", gvk.Kind)
span.SetBaggageItem("name", name)
defer span.Finish()
@ -161,7 +165,7 @@ func (k *KubectlCmd) GetResource(ctx context.Context, config *rest.Config, gvk s
// PatchResource patches resource
func (k *KubectlCmd) PatchResource(ctx context.Context, config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, patchType types.PatchType, patchBytes []byte) (*unstructured.Unstructured, error) {
span := tracing.StartSpan("PatchResource")
span := k.Tracer.StartSpan("PatchResource")
span.SetBaggageItem("kind", gvk.Kind)
span.SetBaggageItem("name", name)
defer span.Finish()
@ -184,7 +188,7 @@ func (k *KubectlCmd) PatchResource(ctx context.Context, config *rest.Config, gvk
// DeleteResource deletes resource
func (k *KubectlCmd) DeleteResource(ctx context.Context, config *rest.Config, gvk schema.GroupVersionKind, name string, namespace string, forceDelete bool) error {
span := tracing.StartSpan("DeleteResource")
span := k.Tracer.StartSpan("DeleteResource")
span.SetBaggageItem("kind", gvk.Kind)
span.SetBaggageItem("name", name)
defer span.Finish()
@ -215,11 +219,11 @@ func (k *KubectlCmd) DeleteResource(ctx context.Context, config *rest.Config, gv
// ApplyResource performs an apply of a unstructured resource
func (k *KubectlCmd) ApplyResource(ctx context.Context, config *rest.Config, obj *unstructured.Unstructured, namespace string, dryRunStrategy cmdutil.DryRunStrategy, force, validate bool) (string, error) {
span := tracing.StartSpan("ApplyResource")
span := k.Tracer.StartSpan("ApplyResource")
span.SetBaggageItem("kind", obj.GetKind())
span.SetBaggageItem("name", obj.GetName())
defer span.Finish()
log.Infof("Applying resource %s/%s in cluster: %s, namespace: %s", obj.GetKind(), obj.GetName(), config.Host, namespace)
k.Log.Info(fmt.Sprintf("Applying resource %s/%s in cluster: %s, namespace: %s", obj.GetKind(), obj.GetName(), config.Host, namespace))
f, err := ioutil.TempFile(io.TempDir, "")
if err != nil {
return "", fmt.Errorf("Failed to generate temp file for kubeconfig: %v", err)
@ -247,7 +251,7 @@ func (k *KubectlCmd) ApplyResource(ctx context.Context, config *rest.Config, obj
defer io.DeleteFile(manifestFile.Name())
// log manifest
if log.IsLevelEnabled(log.DebugLevel) {
if k.Log.V(1).Enabled() {
var obj unstructured.Unstructured
err := json.Unmarshal(manifestBytes, &obj)
if err != nil {
@ -261,7 +265,7 @@ func (k *KubectlCmd) ApplyResource(ctx context.Context, config *rest.Config, obj
if err != nil {
return "", err
}
log.Debug(string(redactedBytes))
k.Log.V(1).Info(string(redactedBytes))
}
var out []string
@ -270,12 +274,14 @@ func (k *KubectlCmd) ApplyResource(ctx context.Context, config *rest.Config, obj
// `kubectl apply`, which cannot tolerate changes in roleRef, which is an immutable field.
// See: https://github.com/kubernetes/kubernetes/issues/66353
// `auth reconcile` will delete and recreate the resource if necessary
closer, err := k.processKubectlRun("auth")
if err != nil {
return "", err
}
outReconcile, err := k.authReconcile(ctx, config, f.Name(), manifestFile.Name(), namespace, dryRunStrategy)
io.Close(closer)
outReconcile, err := func() (string, error) {
cleanup, err := k.processKubectlRun("auth")
if err != nil {
return "", err
}
defer cleanup()
return k.authReconcile(ctx, config, f.Name(), manifestFile.Name(), namespace, dryRunStrategy)
}()
if err != nil {
return "", err
}
@ -284,11 +290,11 @@ func (k *KubectlCmd) ApplyResource(ctx context.Context, config *rest.Config, obj
// last-applied-configuration annotation in the object.
}
closer, err := k.processKubectlRun("apply")
cleanup, err := k.processKubectlRun("apply")
if err != nil {
return "", err
}
defer io.Close(closer)
defer cleanup()
// Run kubectl apply
fact, ioStreams := kubeCmdFactory(f.Name(), namespace)
@ -441,30 +447,9 @@ func (k *KubectlCmd) authReconcile(ctx context.Context, config *rest.Config, kub
return strings.Join(out, ". "), nil
}
func Version() (string, error) {
span := tracing.StartSpan("Version")
defer span.Finish()
cmd := exec.Command("kubectl", "version", "--client")
out, err := cmd.CombinedOutput()
if err != nil {
return "", fmt.Errorf("could not get kubectl version: %s", err)
}
re := regexp.MustCompile(`GitVersion:"([a-zA-Z0-9\.\-]+)"`)
matches := re.FindStringSubmatch(string(out))
if len(matches) != 2 {
return "", errors.New("could not get kubectl version")
}
version := matches[1]
if version[0] != 'v' {
version = "v" + version
}
return strings.TrimSpace(version), nil
}
// ConvertToVersion converts an unstructured object into the specified group/version
func (k *KubectlCmd) ConvertToVersion(obj *unstructured.Unstructured, group string, version string) (*unstructured.Unstructured, error) {
span := tracing.StartSpan("ConvertToVersion")
span := k.Tracer.StartSpan("ConvertToVersion")
from := obj.GroupVersionKind().GroupVersion()
span.SetBaggageItem("from", from.String())
span.SetBaggageItem("to", schema.GroupVersion{Group: group, Version: version}.String())
@ -476,7 +461,7 @@ func (k *KubectlCmd) ConvertToVersion(obj *unstructured.Unstructured, group stri
}
func (k *KubectlCmd) GetServerVersion(config *rest.Config) (string, error) {
span := tracing.StartSpan("GetServerVersion")
span := k.Tracer.StartSpan("GetServerVersion")
defer span.Finish()
client, err := discovery.NewDiscoveryClientForConfig(config)
if err != nil {
@ -493,17 +478,14 @@ func (k *KubectlCmd) NewDynamicClient(config *rest.Config) (dynamic.Interface, e
return dynamic.NewForConfig(config)
}
func (k *KubectlCmd) processKubectlRun(cmd string) (io.Closer, error) {
func (k *KubectlCmd) processKubectlRun(cmd string) (CleanupFunc, error) {
if k.OnKubectlRun != nil {
return k.OnKubectlRun(cmd)
}
return io.NewCloser(func() error {
return nil
// do nothing
}), nil
return func() {}, nil
}
func (k *KubectlCmd) SetOnKubectlRun(onKubectlRun func(command string) (io.Closer, error)) {
func (k *KubectlCmd) SetOnKubectlRun(onKubectlRun OnKubectlRunFunc) {
k.OnKubectlRun = onKubectlRun
}

View file

@ -1,12 +1,13 @@
package kube
import (
"regexp"
"testing"
testingutils "github.com/argoproj/gitops-engine/pkg/utils/testing"
"github.com/stretchr/testify/assert"
"k8s.io/klog/v2/klogr"
testingutils "github.com/argoproj/gitops-engine/pkg/utils/testing"
"github.com/argoproj/gitops-engine/pkg/utils/tracing"
)
var (
@ -14,7 +15,10 @@ var (
)
func TestConvertToVersion(t *testing.T) {
kubectl := KubectlCmd{}
kubectl := KubectlCmd{
Log: klogr.New(),
Tracer: tracing.NopTracer{},
}
t.Run("AppsDeployment", func(t *testing.T) {
newObj, err := kubectl.ConvertToVersion(testingutils.UnstructuredFromFile("testdata/appsdeployment.yaml"), "extensions", "v1beta1")
if assert.NoError(t, err) {
@ -55,11 +59,3 @@ func TestConvertToVersion(t *testing.T) {
}
})
}
func TestVersion(t *testing.T) {
ver, err := Version()
assert.NoError(t, err)
SemverRegexValidation := `^v(0|[1-9]\d*)\.(0|[1-9]\d*)\.(0|[1-9]\d*)(-(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*)(\.(0|[1-9]\d*|\d*[a-zA-Z-][0-9a-zA-Z-]*))*)?(\+[0-9a-zA-Z-]+(\.[0-9a-zA-Z-]+)*)?$`
re := regexp.MustCompile(SemverRegexValidation)
assert.True(t, re.MatchString(ver))
}

View file

@ -10,7 +10,8 @@ import (
"strings"
"time"
log "github.com/sirupsen/logrus"
"github.com/go-logr/logr"
v1 "k8s.io/api/core/v1"
"k8s.io/apimachinery/pkg/api/equality"
apierr "k8s.io/apimachinery/pkg/api/errors"
@ -178,7 +179,6 @@ func ServerResourceForGroupVersionKind(disco discovery.DiscoveryInterface, gvk s
}
for _, r := range resources.APIResources {
if r.Kind == gvk.Kind {
log.Debugf("Chose API '%s' for %s", r.Name, gvk)
return &r, nil
}
}
@ -370,19 +370,19 @@ func GetDeploymentReplicas(u *unstructured.Unstructured) *int64 {
}
// RetryUntilSucceed keep retrying given action with specified interval until action succeed or specified context is done.
func RetryUntilSucceed(ctx context.Context, interval time.Duration, desc string, action func() error) {
func RetryUntilSucceed(ctx context.Context, interval time.Duration, desc string, log logr.Logger, action func() error) {
pollErr := wait.PollImmediateUntil(interval, func() (bool /*done*/, error) {
log.Debugf("Start %s", desc)
log.V(1).Info("Start %s", desc)
err := action()
if err == nil {
log.Debugf("Completed %s", desc)
log.V(1).Info("Completed %s", desc)
return true, nil
}
log.Debugf("Failed to %s: %+v, retrying in %v", desc, err, interval)
log.V(1).Info("Failed to %s: %+v, retrying in %v", desc, err, interval)
return false, nil
}, ctx.Done())
if pollErr != nil {
// The only error that can happen here is wait.ErrWaitTimeout if ctx is done.
log.Debugf("Stop retrying %s", desc)
log.V(1).Info("Stop retrying %s", desc)
}
}

View file

@ -13,7 +13,6 @@ import (
"k8s.io/client-go/rest"
cmdutil "k8s.io/kubectl/pkg/cmd/util"
"github.com/argoproj/gitops-engine/pkg/utils/io"
"github.com/argoproj/gitops-engine/pkg/utils/kube"
)
@ -92,5 +91,5 @@ func (k *MockKubectlCmd) GetAPIGroups(config *rest.Config) ([]metav1.APIGroup, e
return k.APIGroups, nil
}
func (k *MockKubectlCmd) SetOnKubectlRun(onKubectlRun func(command string) (io.Closer, error)) {
func (k *MockKubectlCmd) SetOnKubectlRun(onKubectlRun kube.OnKubectlRunFunc) {
}

16
pkg/utils/tracing/api.go Normal file
View file

@ -0,0 +1,16 @@
package tracing
/*
Poor Man's OpenTracing.
Standardizes logging of operation duration.
*/
// Tracer creates spans that measure and record the duration of named
// operations. This package provides two implementations: LoggingTracer,
// which reports finished spans to a logr.Logger, and NopTracer, which
// discards them.
type Tracer interface {
	// StartSpan begins timing the operation with the given name.
	StartSpan(operationName string) Span
}

// Span is a single timed operation produced by Tracer.StartSpan.
type Span interface {
	// SetBaggageItem attaches a key/value pair to the span; the pair is
	// reported when the span finishes.
	SetBaggageItem(key string, value interface{})
	// Finish ends the span and records its duration.
	Finish()
}

View file

@ -0,0 +1,56 @@
package tracing
import (
"time"
"github.com/go-logr/logr"
)
var (
	_ Tracer = LoggingTracer{}
	_ Span   = loggingSpan{}
)

// LoggingTracer is a Tracer that records every finished span as a single
// "Trace" entry on the supplied logr.Logger.
type LoggingTracer struct {
	logger logr.Logger
}

// NewLoggingTracer returns a LoggingTracer that writes spans to logger.
func NewLoggingTracer(logger logr.Logger) *LoggingTracer {
	return &LoggingTracer{logger: logger}
}

// StartSpan begins timing an operation; call Finish on the returned Span
// to emit the corresponding log entry.
func (l LoggingTracer) StartSpan(operationName string) Span {
	return loggingSpan{
		logger:        l.logger,
		operationName: operationName,
		baggage:       map[string]interface{}{},
		start:         time.Now(),
	}
}

// loggingSpan is the Span implementation backing LoggingTracer.
type loggingSpan struct {
	logger        logr.Logger
	operationName string
	baggage       map[string]interface{}
	start         time.Time
}

// Finish emits the span as one "Trace" log entry: baggage values first,
// then the operation name and the elapsed wall time in milliseconds.
func (s loggingSpan) Finish() {
	elapsedMs := time.Since(s.start).Seconds() * 1e3
	withBaggage := s.logger.WithValues(baggageToVals(s.baggage)...)
	withBaggage.WithValues("operation_name", s.operationName, "time_ms", elapsedMs).Info("Trace")
}

// SetBaggageItem attaches a key/value pair that will be logged when the
// span finishes.
func (s loggingSpan) SetBaggageItem(key string, value interface{}) {
	s.baggage[key] = value
}

// baggageToVals flattens the baggage map into alternating key/value
// arguments for logr's WithValues. Map iteration order is unspecified,
// matching the original behavior.
func baggageToVals(baggage map[string]interface{}) []interface{} {
	vals := make([]interface{}, 0, 2*len(baggage))
	for key, value := range baggage {
		vals = append(vals, key, value)
	}
	return vals
}

View file

@ -0,0 +1,25 @@
package tracing
import (
"testing"
"github.com/golang/mock/gomock"
"github.com/argoproj/gitops-engine/pkg/utils/tracing/tracer_testing"
)
// TestLoggingTracer verifies the exact logging contract of LoggingTracer:
// baggage key/values are attached first, then operation_name and time_ms,
// and the final entry is emitted with the constant message "Trace".
func TestLoggingTracer(t *testing.T) {
	c := gomock.NewController(t)
	l := tracer_testing.NewMockLogger(c)
	gomock.InOrder(
		// Baggage is added before the standard span fields.
		l.EXPECT().WithValues("my-key", "my-value").Return(l),
		// time_ms is elapsed wall time, so only its presence is asserted.
		l.EXPECT().WithValues("operation_name", "my-operation", "time_ms", gomock.Any()).Return(l),
		l.EXPECT().Info("Trace"),
	)
	tr := NewLoggingTracer(l)
	span := tr.StartSpan("my-operation")
	span.SetBaggageItem("my-key", "my-value")
	span.Finish()
}

22
pkg/utils/tracing/nop.go Normal file
View file

@ -0,0 +1,22 @@
package tracing
var (
	_ Tracer = NopTracer{}
	_ Span   = nopSpan{}
)

// NopTracer is a no-op Tracer: the spans it creates record nothing and
// log nothing. Use it where tracing is not wanted.
type NopTracer struct {
}

// StartSpan returns a span that does nothing.
func (n NopTracer) StartSpan(operationName string) Span {
	return nopSpan{}
}

// nopSpan is the do-nothing Span returned by NopTracer.
type nopSpan struct {
}

// SetBaggageItem discards the key/value pair.
func (n nopSpan) SetBaggageItem(key string, value interface{}) {
}

// Finish is a no-op.
func (n nopSpan) Finish() {
}

View file

@ -1,44 +0,0 @@
package tracing
import (
"os"
"time"
log "github.com/sirupsen/logrus"
)
/*
Poor Mans OpenTracing.
Standardizes logging of operation duration.
*/
var enabled = false
var logger = log.New()
func init() {
enabled = os.Getenv("ARGOCD_TRACING_ENABLED") == "1"
}
type Span struct {
operationName string
baggage map[string]interface{}
start time.Time
}
func (s Span) Finish() {
if enabled {
logger.WithFields(s.baggage).
WithField("operation_name", s.operationName).
WithField("time_ms", time.Since(s.start).Seconds()*1e3).
Info()
}
}
func (s Span) SetBaggageItem(key string, value interface{}) {
s.baggage[key] = value
}
func StartSpan(operationName string) Span {
return Span{operationName, make(map[string]interface{}), time.Now()}
}

View file

@ -1,42 +0,0 @@
package tracing
import (
"testing"
log "github.com/sirupsen/logrus"
"github.com/sirupsen/logrus/hooks/test"
"github.com/stretchr/testify/assert"
)
func TestStartSpan(t *testing.T) {
testLogger, hook := test.NewNullLogger()
defer hook.Reset()
logger = testLogger
defer func() { logger = log.New() }()
t.Run("Disabled", func(t *testing.T) {
span := StartSpan("my-operation")
span.SetBaggageItem("my-key", "my-value")
span.Finish()
assert.Empty(t, hook.Entries)
})
hook.Reset()
t.Run("Enabled", func(t *testing.T) {
enabled = true
defer func() { enabled = false }()
span := StartSpan("my-operation")
span.SetBaggageItem("my-key", "my-value")
span.Finish()
e := hook.LastEntry()
if assert.NotNil(t, e) {
assert.Empty(t, e.Message)
assert.Equal(t, "my-operation", e.Data["operation_name"])
assert.Equal(t, "my-value", e.Data["my-key"])
assert.Contains(t, e.Data, "time_ms")
}
})
}

View file

@ -0,0 +1,3 @@
package tracer_testing
//go:generate go run github.com/golang/mock/mockgen -destination "logger.go" -package "tracer_testing" "github.com/go-logr/logr" "Logger"

View file

@ -0,0 +1,128 @@
// Code generated by MockGen. DO NOT EDIT.
// Source: github.com/go-logr/logr (interfaces: Logger)
// Package tracer_testing is a generated GoMock package.
package tracer_testing
import (
logr "github.com/go-logr/logr"
gomock "github.com/golang/mock/gomock"
reflect "reflect"
)
// MockLogger is a mock of Logger interface
type MockLogger struct {
ctrl *gomock.Controller
recorder *MockLoggerMockRecorder
}
// MockLoggerMockRecorder is the mock recorder for MockLogger
type MockLoggerMockRecorder struct {
mock *MockLogger
}
// NewMockLogger creates a new mock instance
func NewMockLogger(ctrl *gomock.Controller) *MockLogger {
mock := &MockLogger{ctrl: ctrl}
mock.recorder = &MockLoggerMockRecorder{mock}
return mock
}
// EXPECT returns an object that allows the caller to indicate expected use
func (m *MockLogger) EXPECT() *MockLoggerMockRecorder {
return m.recorder
}
// Enabled mocks base method
func (m *MockLogger) Enabled() bool {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "Enabled")
ret0, _ := ret[0].(bool)
return ret0
}
// Enabled indicates an expected call of Enabled
func (mr *MockLoggerMockRecorder) Enabled() *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Enabled", reflect.TypeOf((*MockLogger)(nil).Enabled))
}
// Error mocks base method
func (m *MockLogger) Error(arg0 error, arg1 string, arg2 ...interface{}) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0, arg1}
for _, a := range arg2 {
varargs = append(varargs, a)
}
m.ctrl.Call(m, "Error", varargs...)
}
// Error indicates an expected call of Error
func (mr *MockLoggerMockRecorder) Error(arg0, arg1 interface{}, arg2 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0, arg1}, arg2...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Error", reflect.TypeOf((*MockLogger)(nil).Error), varargs...)
}
// Info mocks base method
func (m *MockLogger) Info(arg0 string, arg1 ...interface{}) {
m.ctrl.T.Helper()
varargs := []interface{}{arg0}
for _, a := range arg1 {
varargs = append(varargs, a)
}
m.ctrl.Call(m, "Info", varargs...)
}
// Info indicates an expected call of Info
func (mr *MockLoggerMockRecorder) Info(arg0 interface{}, arg1 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
varargs := append([]interface{}{arg0}, arg1...)
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "Info", reflect.TypeOf((*MockLogger)(nil).Info), varargs...)
}
// V mocks base method
func (m *MockLogger) V(arg0 int) logr.Logger {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "V", arg0)
ret0, _ := ret[0].(logr.Logger)
return ret0
}
// V indicates an expected call of V
func (mr *MockLoggerMockRecorder) V(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "V", reflect.TypeOf((*MockLogger)(nil).V), arg0)
}
// WithName mocks base method
func (m *MockLogger) WithName(arg0 string) logr.Logger {
m.ctrl.T.Helper()
ret := m.ctrl.Call(m, "WithName", arg0)
ret0, _ := ret[0].(logr.Logger)
return ret0
}
// WithName indicates an expected call of WithName
func (mr *MockLoggerMockRecorder) WithName(arg0 interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithName", reflect.TypeOf((*MockLogger)(nil).WithName), arg0)
}
// WithValues mocks base method
func (m *MockLogger) WithValues(arg0 ...interface{}) logr.Logger {
m.ctrl.T.Helper()
varargs := []interface{}{}
for _, a := range arg0 {
varargs = append(varargs, a)
}
ret := m.ctrl.Call(m, "WithValues", varargs...)
ret0, _ := ret[0].(logr.Logger)
return ret0
}
// WithValues indicates an expected call of WithValues
func (mr *MockLoggerMockRecorder) WithValues(arg0 ...interface{}) *gomock.Call {
mr.mock.ctrl.T.Helper()
return mr.mock.ctrl.RecordCallWithMethodType(mr.mock, "WithValues", reflect.TypeOf((*MockLogger)(nil).WithValues), arg0...)
}