2020-10-09 20:16:54 +00:00
package sharding
import (
2023-06-05 13:19:14 +00:00
"context"
2024-06-11 15:41:55 +00:00
"encoding/json"
2024-12-30 08:56:41 +00:00
stderrors "errors"
2020-10-09 20:16:54 +00:00
"fmt"
"hash/fnv"
2024-02-13 16:51:41 +00:00
"math"
2020-10-09 20:16:54 +00:00
"os"
2025-01-06 16:30:42 +00:00
"slices"
2023-06-05 13:19:14 +00:00
"sort"
2020-10-09 20:16:54 +00:00
"strconv"
"strings"
2023-09-22 19:49:09 +00:00
"time"
2025-01-03 16:10:00 +00:00
corev1 "k8s.io/api/core/v1"
2023-09-22 19:49:09 +00:00
metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
"k8s.io/client-go/kubernetes"
2023-06-05 13:19:14 +00:00
2024-06-10 13:18:12 +00:00
"github.com/argoproj/argo-cd/v2/common"
"github.com/argoproj/argo-cd/v2/controller/sharding/consistent"
"github.com/argoproj/argo-cd/v2/pkg/apis/application/v1alpha1"
log "github.com/sirupsen/logrus"
2025-01-03 17:09:37 +00:00
apierrors "k8s.io/apimachinery/pkg/api/errors"
2024-06-10 13:18:12 +00:00
2023-06-05 13:19:14 +00:00
"github.com/argoproj/argo-cd/v2/util/db"
"github.com/argoproj/argo-cd/v2/util/env"
2024-02-13 16:51:41 +00:00
"github.com/argoproj/argo-cd/v2/util/errors"
2023-09-22 19:49:09 +00:00
"github.com/argoproj/argo-cd/v2/util/settings"
2020-10-09 20:16:54 +00:00
)
2023-06-05 13:19:14 +00:00
// osHostnameFunction indirects os.Hostname so tests can override it.
var osHostnameFunction = os.Hostname

// heartbeatCurrentTime indirects metav1.Now so tests can override the clock
// used when stamping heartbeat times.
var heartbeatCurrentTime = metav1.Now

var (
	// HeartbeatDuration is the controller heartbeat interval in seconds,
	// read from the environment and clamped to the [10, 60] range (default 10).
	HeartbeatDuration = env.ParseNumFromEnv(common.EnvControllerHeartbeatTime, 10, 10, 60)
	// HeartbeatTimeout is the number of seconds after which a controller's
	// shard assignment is considered stale (three missed heartbeats).
	HeartbeatTimeout = 3 * HeartbeatDuration
)

// ShardControllerMappingKey is the ConfigMap data key under which the
// shard-to-controller mapping is stored as JSON.
const ShardControllerMappingKey = "shardControllerMapping"
2024-06-11 15:41:55 +00:00
type (
	// DistributionFunction maps a cluster to the shard number that should
	// process it; -1 is the reserved value for "unassigned".
	DistributionFunction func(c *v1alpha1.Cluster) int
	// ClusterFilterFunction reports whether a cluster should be processed
	// by the current shard.
	ClusterFilterFunction func(c *v1alpha1.Cluster) bool
	// clusterAccessor returns the current list of clusters.
	clusterAccessor func() []*v1alpha1.Cluster
	// appAccessor returns the current list of applications.
	appAccessor func() []*v1alpha1.Application
)

// shardApplicationControllerMapping stores the mapping of Shard Number to Application Controller in ConfigMap.
// It also stores the heartbeat of last synced time of the application controller.
type shardApplicationControllerMapping struct {
	ShardNumber    int
	ControllerName string
	HeartbeatTime  metav1.Time
}
2023-06-05 13:19:14 +00:00
// GetClusterFilter returns a ClusterFilterFunction which is a function taking a cluster as a parameter
2024-05-28 09:51:04 +00:00
// and returns whether or not the cluster should be processed by a given shard. It calls the distributionFunction
2023-06-05 13:19:14 +00:00
// to determine which shard will process the cluster, and if the given shard is equal to the calculated shard
// the function will return true.
2025-01-07 15:12:56 +00:00
func GetClusterFilter ( _ db . ArgoDB , distributionFunction DistributionFunction , replicas , shard int ) ClusterFilterFunction {
2023-06-05 13:19:14 +00:00
return func ( c * v1alpha1 . Cluster ) bool {
clusterShard := 0
if c != nil && c . Shard != nil {
requestedShard := int ( * c . Shard )
if requestedShard < replicas {
clusterShard = requestedShard
} else {
log . Warnf ( "Specified cluster shard (%d) for cluster: %s is greater than the number of available shard. Assigning automatically." , requestedShard , c . Name )
}
} else {
clusterShard = distributionFunction ( c )
}
return clusterShard == shard
}
}
// GetDistributionFunction returns which DistributionFunction should be used based on the passed algorithm and
// the current datas.
2024-03-01 18:56:48 +00:00
func GetDistributionFunction ( clusters clusterAccessor , apps appAccessor , shardingAlgorithm string , replicasCount int ) DistributionFunction {
2024-01-11 06:32:11 +00:00
log . Debugf ( "Using filter function: %s" , shardingAlgorithm )
distributionFunction := LegacyDistributionFunction ( replicasCount )
2023-06-05 13:19:14 +00:00
switch shardingAlgorithm {
case common . RoundRobinShardingAlgorithm :
2024-01-11 06:32:11 +00:00
distributionFunction = RoundRobinDistributionFunction ( clusters , replicasCount )
2023-06-05 13:19:14 +00:00
case common . LegacyShardingAlgorithm :
2024-01-11 06:32:11 +00:00
distributionFunction = LegacyDistributionFunction ( replicasCount )
2024-06-05 18:28:19 +00:00
case common . ConsistentHashingWithBoundedLoadsAlgorithm :
distributionFunction = ConsistentHashingWithBoundedLoadsDistributionFunction ( clusters , apps , replicasCount )
2023-06-05 13:19:14 +00:00
default :
log . Warnf ( "distribution type %s is not supported, defaulting to %s" , shardingAlgorithm , common . DefaultShardingAlgorithm )
}
return distributionFunction
}
// LegacyDistributionFunction returns a DistributionFunction using a stable distribution algorithm:
// for a given cluster the function will return the shard number based on the cluster id. This function
// is lightweight and can be distributed easily, however, it does not ensure an homogenous distribution as
// some shards may get assigned more clusters than others. It is the legacy function distribution that is
// kept for compatibility reasons
2024-01-11 06:32:11 +00:00
func LegacyDistributionFunction ( replicas int ) DistributionFunction {
2023-06-05 13:19:14 +00:00
return func ( c * v1alpha1 . Cluster ) int {
if replicas == 0 {
2024-01-11 06:32:11 +00:00
log . Debugf ( "Replicas count is : %d, returning -1" , replicas )
2023-06-05 13:19:14 +00:00
return - 1
}
if c == nil {
2024-01-11 06:32:11 +00:00
log . Debug ( "In-cluster: returning 0" )
2023-06-05 13:19:14 +00:00
return 0
}
2024-01-11 06:32:11 +00:00
// if Shard is manually set and the assigned value is lower than the number of replicas,
// then its value is returned otherwise it is the default calculated value
if c . Shard != nil && int ( * c . Shard ) < replicas {
return int ( * c . Shard )
}
2023-06-05 13:19:14 +00:00
id := c . ID
log . Debugf ( "Calculating cluster shard for cluster id: %s" , id )
if id == "" {
return 0
}
2025-01-07 15:25:22 +00:00
h := fnv . New32a ( )
_ , _ = h . Write ( [ ] byte ( id ) )
shard := int32 ( h . Sum32 ( ) % uint32 ( replicas ) )
log . Debugf ( "Cluster with id=%s will be processed by shard %d" , id , shard )
return int ( shard )
2023-06-05 13:19:14 +00:00
}
}
// RoundRobinDistributionFunction returns a DistributionFunction using an homogeneous distribution algorithm:
// for a given cluster the function will return the shard number based on the modulo of the cluster rank in
// the cluster's list sorted by uid on the shard number.
// This function ensures an homogenous distribution: each shards got assigned the same number of
2024-05-28 09:51:04 +00:00
// clusters +/-1 , but with the drawback of a reshuffling of clusters across shards in case of some changes
2023-06-05 13:19:14 +00:00
// in the cluster list
2024-01-11 06:32:11 +00:00
func RoundRobinDistributionFunction ( clusters clusterAccessor , replicas int ) DistributionFunction {
2023-06-05 13:19:14 +00:00
return func ( c * v1alpha1 . Cluster ) int {
if replicas > 0 {
2024-06-05 18:28:19 +00:00
if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here.
2023-06-05 13:19:14 +00:00
return 0
2024-01-11 06:32:11 +00:00
}
// if Shard is manually set and the assigned value is lower than the number of replicas,
// then its value is returned otherwise it is the default calculated value
if c . Shard != nil && int ( * c . Shard ) < replicas {
return int ( * c . Shard )
2023-06-05 13:19:14 +00:00
}
2025-01-07 15:25:22 +00:00
clusterIndexdByClusterIdMap := createClusterIndexByClusterIdMap ( clusters )
clusterIndex , ok := clusterIndexdByClusterIdMap [ c . ID ]
if ! ok {
log . Warnf ( "Cluster with id=%s not found in cluster map." , c . ID )
return - 1
}
shard := int ( clusterIndex % replicas )
log . Debugf ( "Cluster with id=%s will be processed by shard %d" , c . ID , shard )
return shard
2023-06-05 13:19:14 +00:00
}
log . Warnf ( "The number of replicas (%d) is lower than 1" , replicas )
return - 1
}
}
2024-06-05 18:28:19 +00:00
// ConsistentHashingWithBoundedLoadsDistributionFunction returns a DistributionFunction using an almost homogeneous distribution algorithm:
// for a given cluster the function will return the shard number based on a consistent hashing with bounded loads algorithm.
// This function ensures an almost homogenous distribution: each shards got assigned the fairly similar number of
// clusters +/-10% , but with it is resilient to sharding and/or number of clusters changes.
func ConsistentHashingWithBoundedLoadsDistributionFunction ( clusters clusterAccessor , apps appAccessor , replicas int ) DistributionFunction {
return func ( c * v1alpha1 . Cluster ) int {
if replicas > 0 {
if c == nil { // in-cluster does not necessarily have a secret assigned. So we are receiving a nil cluster here.
return 0
}
// if Shard is manually set and the assigned value is lower than the number of replicas,
// then its value is returned otherwise it is the default calculated value
if c . Shard != nil && int ( * c . Shard ) < replicas {
return int ( * c . Shard )
}
2025-01-07 15:25:22 +00:00
// if the cluster is not in the clusters list anymore, we should unassign it from any shard, so we
// return the reserved value of -1
if ! slices . Contains ( clusters ( ) , c ) {
log . Warnf ( "Cluster with id=%s not found in cluster map." , c . ID )
return - 1
}
shardIndexedByCluster := createConsistentHashingWithBoundLoads ( replicas , clusters , apps )
shard , ok := shardIndexedByCluster [ c . ID ]
if ! ok {
log . Warnf ( "Cluster with id=%s not found in cluster map." , c . ID )
return - 1
}
log . Debugf ( "Cluster with id=%s will be processed by shard %d" , c . ID , shard )
return shard
2024-06-05 18:28:19 +00:00
}
log . Warnf ( "The number of replicas (%d) is lower than 1" , replicas )
return - 1
}
}
func createConsistentHashingWithBoundLoads ( replicas int , getCluster clusterAccessor , getApp appAccessor ) map [ string ] int {
clusters := getSortedClustersList ( getCluster )
appDistribution := getAppDistribution ( getCluster , getApp )
shardIndexedByCluster := make ( map [ string ] int )
appsIndexedByShard := make ( map [ string ] int64 )
consistentHashing := consistent . New ( )
// Adding a shard with id "-1" as a reserved value for clusters that does not have an assigned shard
// this happens for clusters that are removed for the clusters list
2024-06-11 15:41:55 +00:00
// consistentHashing.Add("-1")
2024-06-05 18:28:19 +00:00
for i := 0 ; i < replicas ; i ++ {
shard := strconv . Itoa ( i )
consistentHashing . Add ( shard )
appsIndexedByShard [ shard ] = 0
}
for _ , c := range clusters {
clusterIndex , err := consistentHashing . GetLeast ( c . ID )
if err != nil {
log . Warnf ( "Cluster with id=%s not found in cluster map." , c . ID )
}
shardIndexedByCluster [ c . ID ] , err = strconv . Atoi ( clusterIndex )
if err != nil {
log . Errorf ( "Consistent Hashing was supposed to return a shard index but it returned %d" , err )
}
numApps , ok := appDistribution [ c . Server ]
if ! ok {
numApps = 0
}
appsIndexedByShard [ clusterIndex ] += numApps
consistentHashing . UpdateLoad ( clusterIndex , appsIndexedByShard [ clusterIndex ] )
}
return shardIndexedByCluster
}
// getAppDistribution counts applications per destination server URL.
// The returned map is keyed by Application .Spec.Destination.Server; its size hint
// comes from the cluster count since most servers correspond to a cluster.
func getAppDistribution(getCluster clusterAccessor, getApps appAccessor) map[string]int64 {
	apps := getApps()
	clusters := getCluster()
	appDistribution := make(map[string]int64, len(clusters))
	for _, a := range apps {
		// Missing keys read as 0, so the prior explicit zero-initialization was redundant.
		appDistribution[a.Spec.Destination.Server]++
	}
	return appDistribution
}
2024-01-11 06:32:11 +00:00
// NoShardingDistributionFunction returns a DistributionFunction that will process all cluster by shard 0
// the function is created for API compatibility purposes and is not supposed to be activated.
func NoShardingDistributionFunction ( ) DistributionFunction {
2025-01-07 15:12:56 +00:00
return func ( _ * v1alpha1 . Cluster ) int { return 0 }
2024-01-11 06:32:11 +00:00
}
2023-06-05 13:19:14 +00:00
// InferShard extracts the shard index based on its hostname.
2020-10-09 20:16:54 +00:00
func InferShard ( ) ( int , error ) {
2023-06-05 13:19:14 +00:00
hostname , err := osHostnameFunction ( )
2020-10-09 20:16:54 +00:00
if err != nil {
2023-09-22 19:49:09 +00:00
return - 1 , err
2020-10-09 20:16:54 +00:00
}
parts := strings . Split ( hostname , "-" )
if len ( parts ) == 0 {
2024-01-11 06:32:11 +00:00
log . Warnf ( "hostname should end with shard number separated by '-' but got: %s" , hostname )
return 0 , nil
2020-10-09 20:16:54 +00:00
}
shard , err := strconv . Atoi ( parts [ len ( parts ) - 1 ] )
if err != nil {
2024-01-11 06:32:11 +00:00
log . Warnf ( "hostname should end with shard number separated by '-' but got: %s" , hostname )
return 0 , nil
2020-10-09 20:16:54 +00:00
}
2023-06-05 13:19:14 +00:00
return int ( shard ) , nil
2020-10-09 20:16:54 +00:00
}
2024-01-11 06:32:11 +00:00
func getSortedClustersList ( getCluster clusterAccessor ) [ ] * v1alpha1 . Cluster {
clusters := getCluster ( )
2023-06-05 13:19:14 +00:00
sort . Slice ( clusters , func ( i , j int ) bool {
return clusters [ i ] . ID < clusters [ j ] . ID
} )
return clusters
2020-10-09 20:16:54 +00:00
}
2024-01-11 06:32:11 +00:00
func createClusterIndexByClusterIdMap ( getCluster clusterAccessor ) map [ string ] int {
clusters := getSortedClustersList ( getCluster )
2023-06-05 13:19:14 +00:00
log . Debugf ( "ClustersList has %d items" , len ( clusters ) )
2024-01-11 06:32:11 +00:00
clusterById := make ( map [ string ] * v1alpha1 . Cluster )
2023-06-05 13:19:14 +00:00
clusterIndexedByClusterId := make ( map [ string ] int )
for i , cluster := range clusters {
log . Debugf ( "Adding cluster with id=%s and name=%s to cluster's map" , cluster . ID , cluster . Name )
clusterById [ cluster . ID ] = cluster
clusterIndexedByClusterId [ cluster . ID ] = i
2020-10-09 20:16:54 +00:00
}
2023-06-05 13:19:14 +00:00
return clusterIndexedByClusterId
2020-10-09 20:16:54 +00:00
}
2023-09-22 19:49:09 +00:00
// GetOrUpdateShardFromConfigMap finds the shard number from the shard mapping configmap. If the shard mapping configmap does not exist,
// the function creates the shard mapping configmap.
// The function takes the shard number from the environment variable (default value -1, if not set) and passes it to this function.
// If the shard value passed to this function is -1, that is, the shard was not set as an environment variable,
// we default the shard number to 0 for computing the default config map.
// Returns the shard assigned to this controller, or -1 with a non-nil error on failure
// (a conflict error from the final Update is returned as-is so callers can retry).
func GetOrUpdateShardFromConfigMap(kubeClient kubernetes.Interface, settingsMgr *settings.SettingsManager, replicas, shard int) (int, error) {
	hostname, err := osHostnameFunction()
	if err != nil {
		return -1, err
	}
	// fetch the shard mapping configMap
	shardMappingCM, err := kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Get(context.Background(), common.ArgoCDAppControllerShardConfigMapName, metav1.GetOptions{})
	if err != nil {
		if !apierrors.IsNotFound(err) {
			return -1, fmt.Errorf("error getting sharding config map: %w", err)
		}
		log.Infof("shard mapping configmap %s not found. Creating default shard mapping configmap.", common.ArgoCDAppControllerShardConfigMapName)
		// if the shard is not set as an environment variable, set the default value of shard to 0 for generating default CM
		if shard == -1 {
			shard = 0
		}
		shardMappingCM, err = generateDefaultShardMappingCM(settingsMgr.GetNamespace(), hostname, replicas, shard)
		if err != nil {
			return -1, fmt.Errorf("error generating default shard mapping configmap %w", err)
		}
		if _, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Create(context.Background(), shardMappingCM, metav1.CreateOptions{}); err != nil {
			return -1, fmt.Errorf("error creating shard mapping configmap %w", err)
		}
		// return the shard this controller claimed in the freshly generated ConfigMap
		// (0 unless a non-negative shard was passed in)
		return shard, nil
	}
	// Identify the available shard and update the ConfigMap
	data := shardMappingCM.Data[ShardControllerMappingKey]
	var shardMappingData []shardApplicationControllerMapping
	err = json.Unmarshal([]byte(data), &shardMappingData)
	if err != nil {
		return -1, fmt.Errorf("error unmarshalling shard config map data: %w", err)
	}

	// Claim a shard (matching hostname, preset, or first free/stale slot) and
	// write the updated mapping back.
	shard, shardMappingData = getOrUpdateShardNumberForController(shardMappingData, hostname, replicas, shard)
	updatedShardMappingData, err := json.Marshal(shardMappingData)
	if err != nil {
		return -1, fmt.Errorf("error marshalling data of shard mapping ConfigMap: %w", err)
	}
	shardMappingCM.Data[ShardControllerMappingKey] = string(updatedShardMappingData)

	_, err = kubeClient.CoreV1().ConfigMaps(settingsMgr.GetNamespace()).Update(context.Background(), shardMappingCM, metav1.UpdateOptions{})
	if err != nil {
		return -1, err
	}
	return shard, nil
}
// getOrUpdateShardNumberForController takes list of shardApplicationControllerMapping and performs computation to find the matching or empty shard number.
// Resolution order:
//  1. shard preset (!= -1) and in range: refresh the heartbeat on that entry;
//  2. otherwise, an entry already naming this hostname: reuse its shard;
//  3. otherwise, the first entry with no controller or a stale heartbeat.
//
// Returns the resolved shard (-1 if nothing could be claimed) and the updated mapping.
func getOrUpdateShardNumberForController(shardMappingData []shardApplicationControllerMapping, hostname string, replicas, shard int) (int, []shardApplicationControllerMapping) {
	// if current length of shardMappingData in shard mapping configMap is less than the number of replicas,
	// create additional empty entries for missing shard numbers in shardMappingDataconfigMap
	if len(shardMappingData) < replicas {
		// generate extra default mappings
		for currentShard := len(shardMappingData); currentShard < replicas; currentShard++ {
			shardMappingData = append(shardMappingData, shardApplicationControllerMapping{
				ShardNumber: currentShard,
			})
		}
	}
	// if current length of shardMappingData in shard mapping configMap is more than the number of replicas,
	// we replace the config map with default config map and let controllers self assign the new shard to itself
	if len(shardMappingData) > replicas {
		shardMappingData = getDefaultShardMappingData(replicas)
	}
	if shard != -1 && shard < replicas {
		log.Debugf("update heartbeat for shard %d", shard)
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			if shardMapping.ShardNumber == shard {
				log.Debugf("Shard found. Updating heartbeat!!")
				shardMapping.ControllerName = hostname
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	} else {
		// find the matching shard with assigned controllerName
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			if shardMapping.ControllerName == hostname {
				log.Debugf("Shard matched. Updating heartbeat!!")
				// ShardNumber is already an int; the previous int() conversion was redundant.
				shard = shardMapping.ShardNumber
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	}
	// at this point, we have still not found a shard with matching hostname.
	// So, find a shard with either no controller assigned or assigned controller
	// with heartbeat past threshold
	if shard == -1 {
		for i := range shardMappingData {
			shardMapping := shardMappingData[i]
			// NOTE(review): staleness uses metav1.Now() directly while heartbeats are stamped
			// via the overridable heartbeatCurrentTime — confirm this asymmetry is intended for tests.
			if shardMapping.ControllerName == "" || metav1.Now().After(shardMapping.HeartbeatTime.Add(time.Duration(HeartbeatTimeout)*time.Second)) {
				shard = shardMapping.ShardNumber
				log.Debugf("Empty shard found %d", shard)
				shardMapping.ControllerName = hostname
				shardMapping.HeartbeatTime = heartbeatCurrentTime()
				shardMappingData[i] = shardMapping
				break
			}
		}
	}
	return shard, shardMappingData
}
// generateDefaultShardMappingCM creates a default shard mapping configMap. Assigns current controller to shard 0.
2025-01-03 16:10:00 +00:00
func generateDefaultShardMappingCM ( namespace , hostname string , replicas , shard int ) ( * corev1 . ConfigMap , error ) {
shardingCM := & corev1 . ConfigMap {
2023-09-22 19:49:09 +00:00
ObjectMeta : metav1 . ObjectMeta {
Name : common . ArgoCDAppControllerShardConfigMapName ,
Namespace : namespace ,
} ,
Data : map [ string ] string { } ,
}
shardMappingData := getDefaultShardMappingData ( replicas )
// if shard is not assigned to a controller, we use shard 0
if shard == - 1 || shard > replicas {
shard = 0
}
shardMappingData [ shard ] . ControllerName = hostname
shardMappingData [ shard ] . HeartbeatTime = heartbeatCurrentTime ( )
data , err := json . Marshal ( shardMappingData )
if err != nil {
2024-06-11 19:33:22 +00:00
return nil , fmt . Errorf ( "error generating default ConfigMap: %w" , err )
2023-09-22 19:49:09 +00:00
}
shardingCM . Data [ ShardControllerMappingKey ] = string ( data )
return shardingCM , nil
}
// getDefaultShardMappingData builds one empty mapping entry per shard number
// (0..replicas-1), with no controller name or heartbeat set.
func getDefaultShardMappingData(replicas int) []shardApplicationControllerMapping {
	defaults := make([]shardApplicationControllerMapping, 0)
	for shardNumber := 0; shardNumber < replicas; shardNumber++ {
		defaults = append(defaults, shardApplicationControllerMapping{ShardNumber: shardNumber})
	}
	return defaults
}
2024-02-13 16:51:41 +00:00
func GetClusterSharding ( kubeClient kubernetes . Interface , settingsMgr * settings . SettingsManager , shardingAlgorithm string , enableDynamicClusterDistribution bool ) ( ClusterShardingCache , error ) {
var replicasCount int
if enableDynamicClusterDistribution {
applicationControllerName := env . StringFromEnv ( common . EnvAppControllerName , common . DefaultApplicationControllerName )
appControllerDeployment , err := kubeClient . AppsV1 ( ) . Deployments ( settingsMgr . GetNamespace ( ) ) . Get ( context . Background ( ) , applicationControllerName , metav1 . GetOptions { } )
// if app controller deployment is not found when dynamic cluster distribution is enabled error out
if err != nil {
2024-06-11 19:33:22 +00:00
return nil , fmt . Errorf ( "(dynamic cluster distribution) failed to get app controller deployment: %w" , err )
2024-02-13 16:51:41 +00:00
}
if appControllerDeployment != nil && appControllerDeployment . Spec . Replicas != nil {
replicasCount = int ( * appControllerDeployment . Spec . Replicas )
} else {
2024-12-30 08:56:41 +00:00
return nil , stderrors . New ( "(dynamic cluster distribution) failed to get app controller deployment replica count" )
2024-02-13 16:51:41 +00:00
}
} else {
replicasCount = env . ParseNumFromEnv ( common . EnvControllerReplicas , 0 , 0 , math . MaxInt32 )
}
shardNumber := env . ParseNumFromEnv ( common . EnvControllerShard , - 1 , - math . MaxInt32 , math . MaxInt32 )
if replicasCount > 1 {
// check for shard mapping using configmap if application-controller is a deployment
// else use existing logic to infer shard from pod name if application-controller is a statefulset
if enableDynamicClusterDistribution {
var err error
// retry 3 times if we find a conflict while updating shard mapping configMap.
// If we still see conflicts after the retries, wait for next iteration of heartbeat process.
for i := 0 ; i <= common . AppControllerHeartbeatUpdateRetryCount ; i ++ {
shardNumber , err = GetOrUpdateShardFromConfigMap ( kubeClient , settingsMgr , replicasCount , shardNumber )
2025-01-03 17:09:37 +00:00
if err != nil && ! apierrors . IsConflict ( err ) {
2024-06-11 19:33:22 +00:00
err = fmt . Errorf ( "unable to get shard due to error updating the sharding config map: %w" , err )
2024-02-13 16:51:41 +00:00
break
}
log . Warnf ( "conflict when getting shard from shard mapping configMap. Retrying (%d/3)" , i )
}
errors . CheckError ( err )
} else {
if shardNumber < 0 {
var err error
shardNumber , err = InferShard ( )
errors . CheckError ( err )
}
if shardNumber > replicasCount {
log . Warnf ( "Calculated shard number %d is greated than the number of replicas count. Defaulting to 0" , shardNumber )
shardNumber = 0
}
}
} else {
log . Info ( "Processing all cluster shards" )
shardNumber = 0
}
db := db . NewDB ( settingsMgr . GetNamespace ( ) , settingsMgr , kubeClient )
return NewClusterSharding ( db , shardNumber , replicasCount , shardingAlgorithm ) , nil
}