diff --git a/cmd/argocd-application-controller/commands/argocd_application_controller.go b/cmd/argocd-application-controller/commands/argocd_application_controller.go index a7c7f92fab..86c5721cae 100644 --- a/cmd/argocd-application-controller/commands/argocd_application_controller.go +++ b/cmd/argocd-application-controller/commands/argocd_application_controller.go @@ -220,7 +220,7 @@ func NewCommand() *cobra.Command { command.Flags().StringSliceVar(&otlpAttrs, "otlp-attrs", env.StringsFromEnv("ARGOCD_APPLICATION_CONTROLLER_OTLP_ATTRS", []string{}, ","), "List of OpenTelemetry collector extra attrs when send traces, each attribute is separated by a colon(e.g. key:value)") command.Flags().StringSliceVar(&applicationNamespaces, "application-namespaces", env.StringsFromEnv("ARGOCD_APPLICATION_NAMESPACES", []string{}, ","), "List of additional namespaces that applications are allowed to be reconciled from") command.Flags().BoolVar(&persistResourceHealth, "persist-resource-health", env.ParseBoolFromEnv("ARGOCD_APPLICATION_CONTROLLER_PERSIST_RESOURCE_HEALTH", true), "Enables storing the managed resources health in the Application CRD") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] ") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", env.StringFromEnv(common.EnvControllerShardingAlgorithm, common.DefaultShardingAlgorithm), "Enables choice of sharding method. 
Supported sharding methods are : [legacy, round-robin, consistent-hashing] ") // global queue rate limit config command.Flags().Int64Var(&workqueueRateLimit.BucketSize, "wq-bucket-size", env.ParseInt64FromEnv("WORKQUEUE_BUCKET_SIZE", 500, 1, math.MaxInt64), "Set Workqueue Rate Limiter Bucket Size, default 500") command.Flags().Float64Var(&workqueueRateLimit.BucketQPS, "wq-bucket-qps", env.ParseFloat64FromEnv("WORKQUEUE_BUCKET_QPS", math.MaxFloat64, 1, math.MaxFloat64), "Set Workqueue Rate Limiter Bucket QPS, default set to MaxFloat64 which disables the bucket limiter") diff --git a/cmd/argocd/commands/admin/cluster.go b/cmd/argocd/commands/admin/cluster.go index 9d70ac3f8c..8c17c8b7be 100644 --- a/cmd/argocd/commands/admin/cluster.go +++ b/cmd/argocd/commands/admin/cluster.go @@ -219,7 +219,7 @@ func NewClusterShardsCommand(clientOpts *argocdclient.ClientOptions) *cobra.Comm clientConfig = cli.AddKubectlFlagsToCmd(&command) command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter") command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. 
Supported sharding methods are : [legacy, round-robin, consistent-hashing] ") command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?") cacheSrc = appstatecache.AddCacheFlagsToCmd(&command) @@ -514,7 +514,7 @@ argocd admin cluster stats target-cluster`, clientConfig = cli.AddKubectlFlagsToCmd(&command) command.Flags().IntVar(&shard, "shard", -1, "Cluster shard filter") command.Flags().IntVar(&replicas, "replicas", 0, "Application controller replicas count. Inferred from number of running controller pods if not specified") - command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] ") + command.Flags().StringVar(&shardingAlgorithm, "sharding-method", common.DefaultShardingAlgorithm, "Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] ") command.Flags().BoolVar(&portForwardRedis, "port-forward-redis", true, "Automatically port-forward ha proxy redis from current namespace?") cacheSrc = appstatecache.AddCacheFlagsToCmd(&command) diff --git a/common/common.go b/common/common.go index 090cd33965..4e68391e1c 100644 --- a/common/common.go +++ b/common/common.go @@ -117,7 +117,13 @@ const ( RoundRobinShardingAlgorithm = "round-robin" // AppControllerHeartbeatUpdateRetryCount is the retry count for updating the Shard Mapping to the Shard Mapping ConfigMap used by Application Controller AppControllerHeartbeatUpdateRetryCount = 3 - DefaultShardingAlgorithm = LegacyShardingAlgorithm + + // ConsistentHashingWithBoundedLoadsAlgorithm uses an algorithm that tries to use an equal distribution across + // all shards but is optimised to handle sharding and/or cluster addition or removal. In case of sharding or + // cluster changes, this algorithm minimises the changes between shard and clusters assignments. 
+ ConsistentHashingWithBoundedLoadsAlgorithm = "consistent-hashing" + + DefaultShardingAlgorithm = LegacyShardingAlgorithm ) // Dex related constants diff --git a/controller/sharding/consistent/consistent.go b/controller/sharding/consistent/consistent.go new file mode 100644 index 0000000000..6d717b0391 --- /dev/null +++ b/controller/sharding/consistent/consistent.go @@ -0,0 +1,274 @@ +// An implementation of Consistent Hashing and +// Consistent Hashing With Bounded Loads. +// +// https://en.wikipedia.org/wiki/Consistent_hashing +// +// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html +package consistent + +import ( + "encoding/binary" + "errors" + "fmt" + "math" + "sync" + "sync/atomic" + + "github.com/google/btree" + + blake2b "github.com/minio/blake2b-simd" +) + +// OptimalExtraCapacityFactor extra factor capacity (1 + ε). The ideal balance +// between keeping the shards uniform while also keeping consistency when +// changing shard numbers. +const OptimalExtraCapacityFactor = 1.25 + +var ErrNoHosts = errors.New("no hosts added") + +type Host struct { + Name string + Load int64 +} + +type Consistent struct { + servers map[uint64]string + clients *btree.BTree + loadMap map[string]*Host + totalLoad int64 + replicationFactor int + + sync.RWMutex +} + +type item struct { + value uint64 +} + +func (i item) Less(than btree.Item) bool { + return i.value < than.(item).value +} + +func New() *Consistent { + return &Consistent{ + servers: map[uint64]string{}, + clients: btree.New(2), + loadMap: map[string]*Host{}, + replicationFactor: 1000, + } +} + +func NewWithReplicationFactor(replicationFactor int) *Consistent { + return &Consistent{ + servers: map[uint64]string{}, + clients: btree.New(2), + loadMap: map[string]*Host{}, + replicationFactor: replicationFactor, + } +} +func (c *Consistent) Add(server string) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; ok { + return + } + + c.loadMap[server] = &Host{Name: server, 
Load: 0} + for i := 0; i < c.replicationFactor; i++ { + h := c.hash(fmt.Sprintf("%s%d", server, i)) + c.servers[h] = server + c.clients.ReplaceOrInsert(item{h}) + } +} + +// Get returns the server that owns the given client. +// As described in https://en.wikipedia.org/wiki/Consistent_hashing +// It returns ErrNoHosts if the ring has no servers in it. +func (c *Consistent) Get(client string) (string, error) { + c.RLock() + defer c.RUnlock() + + if c.clients.Len() == 0 { + return "", ErrNoHosts + } + + h := c.hash(client) + var foundItem btree.Item + c.clients.AscendGreaterOrEqual(item{h}, func(i btree.Item) bool { + foundItem = i + return false // stop the iteration + }) + + if foundItem == nil { + // If no host found, wrap around to the first one. + foundItem = c.clients.Min() + } + + host := c.servers[foundItem.(item).value] + + return host, nil +} + +// GetLeast returns the least loaded host that can serve the key. +// It uses Consistent Hashing With Bounded loads. +// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html +// It returns ErrNoHosts if the ring has no hosts in it. +func (c *Consistent) GetLeast(client string) (string, error) { + c.RLock() + defer c.RUnlock() + + if c.clients.Len() == 0 { + return "", ErrNoHosts + } + h := c.hash(client) + for { + var foundItem btree.Item + c.clients.AscendGreaterOrEqual(item{h}, func(bItem btree.Item) bool { + if h != bItem.(item).value { + foundItem = bItem + return false // stop the iteration + } + return true + }) + + if foundItem == nil { + // If no host found, wrap around to the first one. 
+ foundItem = c.clients.Min() + } + key := c.clients.Get(foundItem) + if key != nil { + host := c.servers[key.(item).value] + if c.loadOK(host) { + return host, nil + } + h = key.(item).value + } else { + return client, nil + } + } +} + +// Sets the load of `server` to the given `load` +func (c *Consistent) UpdateLoad(server string, load int64) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; !ok { + return + } + c.totalLoad -= c.loadMap[server].Load + c.loadMap[server].Load = load + c.totalLoad += load +} + +// Increments the load of host by 1 +// +// should only be used with if you obtained a host with GetLeast +func (c *Consistent) Inc(server string) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; !ok { + return + } + atomic.AddInt64(&c.loadMap[server].Load, 1) + atomic.AddInt64(&c.totalLoad, 1) +} + +// Decrements the load of host by 1 +// +// should only be used with if you obtained a host with GetLeast +func (c *Consistent) Done(server string) { + c.Lock() + defer c.Unlock() + + if _, ok := c.loadMap[server]; !ok { + return + } + atomic.AddInt64(&c.loadMap[server].Load, -1) + atomic.AddInt64(&c.totalLoad, -1) +} + +// Deletes host from the ring +func (c *Consistent) Remove(server string) bool { + c.Lock() + defer c.Unlock() + + for i := 0; i < c.replicationFactor; i++ { + h := c.hash(fmt.Sprintf("%s%d", server, i)) + delete(c.servers, h) + c.delSlice(h) + } + delete(c.loadMap, server) + return true +} + +// Return the list of servers in the ring +func (c *Consistent) Servers() (servers []string) { + c.RLock() + defer c.RUnlock() + for k := range c.loadMap { + servers = append(servers, k) + } + return servers +} + +// Returns the loads of all the hosts +func (c *Consistent) GetLoads() map[string]int64 { + loads := map[string]int64{} + + for k, v := range c.loadMap { + loads[k] = v.Load + } + return loads +} + +// Returns the maximum load of the single host +// which is: +// (total_load/number_of_hosts)*1.25 +// total_load = 
is the total number of active requests served by hosts +// for more info: +// https://research.googleblog.com/2017/04/consistent-hashing-with-bounded-loads.html +func (c *Consistent) MaxLoad() int64 { + if c.totalLoad == 0 { + c.totalLoad = 1 + } + var avgLoadPerNode float64 + avgLoadPerNode = float64(c.totalLoad / int64(len(c.loadMap))) + if avgLoadPerNode == 0 { + avgLoadPerNode = 1 + } + avgLoadPerNode = math.Ceil(avgLoadPerNode * OptimalExtraCapacityFactor) + return int64(avgLoadPerNode) +} + +func (c *Consistent) loadOK(server string) bool { + // a safety check if someone performed c.Done more than needed + if c.totalLoad < 0 { + c.totalLoad = 0 + } + + var avgLoadPerNode float64 + avgLoadPerNode = float64((c.totalLoad + 1) / int64(len(c.loadMap))) + if avgLoadPerNode == 0 { + avgLoadPerNode = 1 + } + avgLoadPerNode = math.Ceil(avgLoadPerNode * 1.25) + + bserver, ok := c.loadMap[server] + if !ok { + panic(fmt.Sprintf("given host(%s) not in loadsMap", server)) + } + + return float64(bserver.Load)+1 <= avgLoadPerNode +} + +func (c *Consistent) delSlice(val uint64) { + c.clients.Delete(item{val}) +} + +func (c *Consistent) hash(key string) uint64 { + out := blake2b.Sum512([]byte(key)) + return binary.LittleEndian.Uint64(out[:]) +} diff --git a/controller/sharding/sharding.go b/controller/sharding/sharding.go index c415acf0b8..fc09c27cc4 100644 --- a/controller/sharding/sharding.go +++ b/controller/sharding/sharding.go @@ -14,7 +14,9 @@ import ( "encoding/json" "github.com/argoproj/argo-cd/v2/common" + "github.com/argoproj/argo-cd/v2/controller/sharding/consistent" "github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1" + slices "golang.org/x/exp/slices" v1 "k8s.io/api/core/v1" metav1 "k8s.io/apimachinery/pkg/apis/meta/v1" "k8s.io/client-go/kubernetes" @@ -84,6 +86,8 @@ func GetDistributionFunction(clusters clusterAccessor, apps appAccessor, shardin distributionFunction = RoundRobinDistributionFunction(clusters, replicasCount) case
common.LegacyShardingAlgorithm: distributionFunction = LegacyDistributionFunction(replicasCount) + case common.ConsistentHashingWithBoundedLoadsAlgorithm: + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(clusters, apps, replicasCount) default: log.Warnf("distribution type %s is not supported, defaulting to %s", shardingAlgorithm, common.DefaultShardingAlgorithm) } @@ -134,7 +138,7 @@ func LegacyDistributionFunction(replicas int) DistributionFunction { func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) DistributionFunction { return func(c *v1alpha1.Cluster) int { if replicas > 0 { - if c == nil { // in-cluster does not necessary have a secret assigned. So we are receiving a nil cluster here. + if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here. return 0 } // if Shard is manually set and the assigned value is lower than the number of replicas, @@ -158,6 +162,92 @@ func RoundRobinDistributionFunction(clusters clusterAccessor, replicas int) Dist } } +// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm: +// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm. +// This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of +// clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes. +func ConsistentHashingWithBoundedLoadsDistributionFunction(clusters clusterAccessor, apps appAccessor, replicas int) DistributionFunction { + return func(c *v1alpha1.Cluster) int { + if replicas > 0 { + if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here. 
+ return 0 + } + + // if Shard is manually set and the assigned value is lower than the number of replicas, + // then its value is returned otherwise it is the default calculated value + if c.Shard != nil && int(*c.Shard) < replicas { + return int(*c.Shard) + } else { + // if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we + // return the reserved value of -1 + if !slices.Contains(clusters(), c) { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + return -1 + } + shardIndexedByCluster := createConsistentHashingWithBoundLoads(replicas, clusters, apps) + shard, ok := shardIndexedByCluster[c.ID] + if !ok { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + return -1 + } + log.Debugf("Cluster with id=%s will be processed by shard %d", c.ID, shard) + return shard + } + } + log.Warnf("The number of replicas (%d) is lower than 1", replicas) + return -1 + } +} + +func createConsistentHashingWithBoundLoads(replicas int, getCluster clusterAccessor, getApp appAccessor) map[string]int { + clusters := getSortedClustersList(getCluster) + appDistribution := getAppDistribution(getCluster, getApp) + shardIndexedByCluster := make(map[string]int) + appsIndexedByShard := make(map[string]int64) + consistentHashing := consistent.New() + // Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard + // this happens for clusters that are removed for the clusters list + //consistentHashing.Add("-1") + for i := 0; i < replicas; i++ { + shard := strconv.Itoa(i) + consistentHashing.Add(shard) + appsIndexedByShard[shard] = 0 + } + + for _, c := range clusters { + clusterIndex, err := consistentHashing.GetLeast(c.ID) + if err != nil { + log.Warnf("Cluster with id=%s not found in cluster map.", c.ID) + } + shardIndexedByCluster[c.ID], err = strconv.Atoi(clusterIndex) + if err != nil { + log.Errorf("Consistent Hashing was supposed to return a shard index but it returned %v", err)
+ } + numApps, ok := appDistribution[c.Server] + if !ok { + numApps = 0 + } + appsIndexedByShard[clusterIndex] += numApps + consistentHashing.UpdateLoad(clusterIndex, appsIndexedByShard[clusterIndex]) + } + + return shardIndexedByCluster +} + +func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 { + apps := getApps() + clusters := getCluster() + appDistribution := make(map[string]int64, len(clusters)) + + for _, a := range apps { + if _, ok := appDistribution[a.Spec.Destination.Server]; !ok { + appDistribution[a.Spec.Destination.Server] = 0 + } + appDistribution[a.Spec.Destination.Server]++ + } + return appDistribution +} + // NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0 // the function is created for API compatibility purposes and is not supposed to be activated. func NoShardingDistributionFunction() DistributionFunction { diff --git a/controller/sharding/sharding_test.go b/controller/sharding/sharding_test.go index 1c338aac5f..c4eead67ee 100644 --- a/controller/sharding/sharding_test.go +++ b/controller/sharding/sharding_test.go @@ -275,6 +275,110 @@ func TestGetShardByIndexModuloReplicasCountDistributionFunctionWhenClusterIsAdde assert.Equal(t, -1, distributionFunction(&cluster6)) } +func TestConsistentHashingWhenClusterIsAddedAndRemoved(t *testing.T) { + db := dbmocks.ArgoDB{} + clusterCount := 133 + prefix := "cluster" + + clusters := []v1alpha1.Cluster{} + for i := 0; i < clusterCount; i++ { + id := fmt.Sprintf("%06d", i) + cluster := fmt.Sprintf("%s-%s", prefix, id) + clusters = append(clusters, createCluster(cluster, id)) + } + clusterAccessor := getClusterAccessor(clusters) + appAccessor, _, _, _, _, _ := createTestApps() + clusterList := &v1alpha1.ClusterList{Items: clusters} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + // Test with replicas set to 3 + replicasCount := 3 + db.On("GetApplicationControllerReplicas").Return(replicasCount) + 
distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) + assert.Equal(t, 0, distributionFunction(nil)) + distributionMap := map[int]int{} + assignmentMap := map[string]int{} + for i := 0; i < clusterCount; i++ { + assignedShard := distributionFunction(&clusters[i]) + assignmentMap[clusters[i].ID] = assignedShard + distributionMap[assignedShard]++ + + } + + // We check that the distribution does not differ for more than 20% + var sum float64 + sum = 0 + for shard, count := range distributionMap { + if shard != -1 { + sum = (sum + float64(count)) + } + } + average := sum / float64(replicasCount) + failedTests := false + for shard, count := range distributionMap { + if shard != -1 { + if float64(count) > average*float64(1.1) || float64(count) < average*float64(0.9) { + fmt.Printf("Cluster distribution differs for more than 20%%: %d for shard %d (average: %f)\n", count, shard, average) + failedTests = true + } + if failedTests { + t.Fail() + } + } + } + + // Now we will decrease the number of replicas to 2, and we should see only clusters that were attached to shard 2 to be reassigned + replicasCount = 2 + distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount) + removedCluster := clusterList.Items[len(clusterList.Items)-1] + for i := 0; i < clusterCount; i++ { + c := &clusters[i] + assignedShard := distributionFunction(c) + previouslyAssignedShard := assignmentMap[clusters[i].ID] + if previouslyAssignedShard != 2 && previouslyAssignedShard != assignedShard { + fmt.Printf("Previously assigned %s cluster has moved from replica %d to %d", c.ID, previouslyAssignedShard, assignedShard) + t.Fail() + } + } + // Now, we remove the last added cluster, it should be unassigned + removedCluster = clusterList.Items[len(clusterList.Items)-1] + clusterList.Items = clusterList.Items[:len(clusterList.Items)-1] +
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction(getClusterAccessor(clusterList.Items), appAccessor, replicasCount) + assert.Equal(t, -1, distributionFunction(&removedCluster)) +} + +func TestConsistentHashingWhenClusterWithZeroReplicas(t *testing.T) { + db := dbmocks.ArgoDB{} + clusters := []v1alpha1.Cluster{createCluster("cluster-01", "01")} + clusterAccessor := getClusterAccessor(clusters) + clusterList := &v1alpha1.ClusterList{Items: clusters} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + appAccessor, _, _, _, _, _ := createTestApps() + // Test with replicas set to 0 + replicasCount := 0 + db.On("GetApplicationControllerReplicas").Return(replicasCount) + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) + assert.Equal(t, -1, distributionFunction(nil)) +} + +func TestConsistentHashingWhenClusterWithFixedShard(t *testing.T) { + db := dbmocks.ArgoDB{} + var fixedShard int64 = 1 + cluster := &v1alpha1.Cluster{ID: "1", Shard: &fixedShard} + clusters := []v1alpha1.Cluster{*cluster} + + clusterAccessor := getClusterAccessor(clusters) + clusterList := &v1alpha1.ClusterList{Items: clusters} + db.On("ListClusters", mock.Anything).Return(clusterList, nil) + + // Test with replicas set to 5 + replicasCount := 5 + db.On("GetApplicationControllerReplicas").Return(replicasCount) + appAccessor, _, _, _, _, _ := createTestApps() + distributionFunction := ConsistentHashingWithBoundedLoadsDistributionFunction(clusterAccessor, appAccessor, replicasCount) + assert.Equal(t, fixedShard, int64(distributionFunction(cluster))) + +} + func TestGetShardByIndexModuloReplicasCountDistributionFunction(t *testing.T) { clusters, db, cluster1, cluster2, _, _, _ := createTestClusters() replicasCount := 2 diff --git a/docs/operator-manual/high_availability.md b/docs/operator-manual/high_availability.md index 60ea048ffc..632ac2fb12 100644 --- 
a/docs/operator-manual/high_availability.md +++ b/docs/operator-manual/high_availability.md @@ -82,10 +82,16 @@ spec: ``` * In order to manually set the cluster's shard number, specify the optional `shard` property when creating a cluster. If not specified, it will be calculated on the fly by the application controller. -* The shard distribution algorithm of the `argocd-application-controller` can be set by using the `--sharding-method` parameter. Supported sharding methods are : [legacy (default), round-robin]. `legacy` mode uses an `uid` based distribution (non-uniform). `round-robin` uses an equal distribution across all shards. The `--sharding-method` parameter can also be overridden by setting the key `controller.sharding.algorithm` in the `argocd-cmd-params-cm` `configMap` (preferably) or by setting the `ARGOCD_CONTROLLER_SHARDING_ALGORITHM` environment variable and by specifiying the same possible values. +* The shard distribution algorithm of the `argocd-application-controller` can be set by using the `--sharding-method` parameter. Supported sharding methods are : [legacy (default), round-robin, consistent-hashing]: +- `legacy` mode uses an `uid` based distribution (non-uniform). +- `round-robin` uses an equal distribution across all shards. +- `consistent-hashing` uses the consistent hashing with bounded loads algorithm which tends to equal distribution and also reduces cluster or application reshuffling in case of additions or removals of shards or clusters. -!!! warning "Alpha Feature" - The `round-robin` shard distribution algorithm is an experimental feature. Reshuffling is known to occur in certain scenarios with cluster removal. If the cluster at rank-0 is removed, reshuffling all clusters across shards will occur and may temporarily have negative performance impacts. 
+The `--sharding-method` parameter can also be overridden by setting the key `controller.sharding.algorithm` in the `argocd-cmd-params-cm` `configMap` (preferably) or by setting the `ARGOCD_CONTROLLER_SHARDING_ALGORITHM` environment variable and by specifying the same possible values. + +!!! warning "Alpha Features" + The `round-robin` shard distribution algorithm is an experimental feature. Reshuffling is known to occur in certain scenarios with cluster removal. If the cluster at rank-0 is removed, reshuffling all clusters across shards will occur and may temporarily have negative performance impacts. + The `consistent-hashing` shard distribution algorithm is an experimental feature. Extensive benchmarks have been documented on the [CNOE blog](https://cnoe.io/blog/argo-cd-application-scalability) with encouraging results. Community feedback is highly appreciated before moving this feature to a production ready state. * A cluster can be manually assigned and forced to a `shard` by patching the `shard` field in the cluster secret to contain the shard number, e.g. ```yaml diff --git a/docs/operator-manual/server-commands/argocd-application-controller.md b/docs/operator-manual/server-commands/argocd-application-controller.md index caab2770e0..930dfa4147 100644 --- a/docs/operator-manual/server-commands/argocd-application-controller.md +++ b/docs/operator-manual/server-commands/argocd-application-controller.md @@ -70,7 +70,7 @@ argocd-application-controller [flags] --sentinelmaster string Redis sentinel master group name. (default "master") --server string The address and port of the Kubernetes API server --server-side-diff-enabled Feature flag to enable ServerSide diff. Default ("false") - --sharding-method string Enables choice of sharding method. Supported sharding methods are : [legacy, round-robin] (default "legacy") + --sharding-method string Enables choice of sharding method.
Supported sharding methods are : [legacy, round-robin, consistent-hashing] (default "legacy") --status-processors int Number of application status processors (default 20) --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used. --token string Bearer token for authentication to the API server diff --git a/docs/user-guide/commands/argocd_admin_cluster_shards.md b/docs/user-guide/commands/argocd_admin_cluster_shards.md index 48f6138d47..44efa4392b 100644 --- a/docs/user-guide/commands/argocd_admin_cluster_shards.md +++ b/docs/user-guide/commands/argocd_admin_cluster_shards.md @@ -43,7 +43,7 @@ argocd admin cluster shards [flags] --sentinelmaster string Redis sentinel master group name. (default "master") --server string The address and port of the Kubernetes API server --shard int Cluster shard filter (default -1) - --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] (default "legacy") + --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] (default "legacy") --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used. --token string Bearer token for authentication to the API server --user string The name of the kubeconfig user to use diff --git a/docs/user-guide/commands/argocd_admin_cluster_stats.md b/docs/user-guide/commands/argocd_admin_cluster_stats.md index c5297ce7e3..18aa583f01 100644 --- a/docs/user-guide/commands/argocd_admin_cluster_stats.md +++ b/docs/user-guide/commands/argocd_admin_cluster_stats.md @@ -57,7 +57,7 @@ argocd admin cluster stats target-cluster --sentinelmaster string Redis sentinel master group name. 
(default "master") --server string The address and port of the Kubernetes API server --shard int Cluster shard filter (default -1) - --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin] (default "legacy") + --sharding-method string Sharding method. Defaults: legacy. Supported sharding methods are : [legacy, round-robin, consistent-hashing] (default "legacy") --tls-server-name string If provided, this name will be used to validate server certificate. If this is not provided, hostname used to contact the server is used. --token string Bearer token for authentication to the API server --user string The name of the kubeconfig user to use diff --git a/go.mod b/go.mod index c6e1bb004b..ad9422c347 100644 --- a/go.mod +++ b/go.mod @@ -1,8 +1,6 @@ module github.com/argoproj/argo-cd/v2 -go 1.21 - -toolchain go1.21.0 +go 1.21.0 require ( code.gitea.io/sdk/gitea v0.18.0 @@ -42,6 +40,7 @@ require ( github.com/gogo/protobuf v1.3.2 github.com/golang-jwt/jwt/v4 v4.5.0 github.com/golang/protobuf v1.5.4 + github.com/google/btree v1.1.2 github.com/google/go-cmp v0.6.0 github.com/google/go-github/v35 v35.3.0 github.com/google/go-jsonnet v0.20.0 @@ -63,6 +62,7 @@ require ( github.com/mattn/go-isatty v0.0.19 github.com/mattn/go-zglob v0.0.4 github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 + github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 github.com/olekukonko/tablewriter v0.0.5 github.com/patrickmn/go-cache v2.1.0+incompatible github.com/prometheus/client_golang v1.18.0 @@ -209,7 +209,6 @@ require ( github.com/go-telegram-bot-api/telegram-bot-api/v5 v5.5.1 // indirect github.com/golang/glog v1.1.2 // indirect github.com/golang/groupcache v0.0.0-20210331224755-41bb18bfe9da // indirect - github.com/google/btree v1.1.2 // indirect github.com/google/go-github/v41 v41.0.0 // indirect github.com/google/go-github/v53 v53.2.0 // indirect github.com/google/go-querystring v1.1.0 // indirect diff --git 
a/go.sum b/go.sum index c9209abedd..f14ed4cdd8 100644 --- a/go.sum +++ b/go.sum @@ -1404,6 +1404,8 @@ github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5 h1:YH424zrwLTlyHS github.com/microsoft/azure-devops-go-api/azuredevops v1.0.0-b5/go.mod h1:PoGiBqKSQK1vIfQ+yVaFcGjDySHvym6FM1cNYnwzbrY= github.com/miekg/dns v1.0.14/go.mod h1:W1PPwlIAgtquWBMBEV9nkV9Cazfe8ScdGz/Lj7v3Nrg= github.com/minio/asm2plan9s v0.0.0-20200509001527-cdd76441f9d8/go.mod h1:mC1jAcsrzbxHt8iiaC+zU4b1ylILSosueou12R++wfY= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1 h1:lYpkrQH5ajf0OXOcUbGjvZxxijuBwbbmlSxLiuofa+g= +github.com/minio/blake2b-simd v0.0.0-20160723061019-3f5f724cb5b1/go.mod h1:pD8RvIylQ358TN4wwqatJ8rNavkEINozVn9DtGI3dfQ= github.com/minio/c2goasm v0.0.0-20190812172519-36a3d3bbc4f3/go.mod h1:RagcQ7I8IeTMnF8JTXieKnO4Z6JCsikNEzj0DwauVzE= github.com/minio/md5-simd v1.1.2/go.mod h1:MzdKDxYpY2BT9XQFocsiZf/NKVtR7nkE4RoEpN+20RM= github.com/minio/minio-go/v7 v7.0.58/go.mod h1:NUDy4A4oXPq1l2yK6LTSvCEzAMeIcoz9lcj5dbzSrRE=