TDengine/source/dnode/mnode/impl/inc/mndDef.h

1480 lines
37 KiB
C

/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#ifndef _TD_MND_DEF_H_
#define _TD_MND_DEF_H_
#include "os.h"
#include "cJSON.h"
#include "scheduler.h"
#include "sync.h"
#include "tarray.h"
#include "thash.h"
#include "tlist.h"
#include "tlog.h"
#include "tmsg.h"
#include "trpc.h"
#include "ttimer.h"
#include "tconfig.h"
#include "mnode.h"
#ifdef __cplusplus
extern "C" {
#endif
typedef enum {
MND_OPER_CONNECT = 1,
MND_OPER_CREATE_ACCT,
MND_OPER_DROP_ACCT,
MND_OPER_ALTER_ACCT,
MND_OPER_CREATE_USER,
MND_OPER_DROP_USER,
MND_OPER_ALTER_USER,
MND_OPER_CREATE_DNODE,
MND_OPER_DROP_DNODE,
MND_OPER_CONFIG_DNODE,
MND_OPER_CREATE_MNODE,
MND_OPER_DROP_MNODE,
MND_OPER_CREATE_QNODE,
MND_OPER_DROP_QNODE,
MND_OPER_CREATE_SNODE,
MND_OPER_DROP_SNODE,
MND_OPER_REDISTRIBUTE_VGROUP,
MND_OPER_MERGE_VGROUP,
MND_OPER_SPLIT_VGROUP,
MND_OPER_BALANCE_VGROUP,
MND_OPER_CREATE_FUNC,
MND_OPER_DROP_FUNC,
MND_OPER_KILL_TRANS,
MND_OPER_KILL_CONN,
MND_OPER_KILL_QUERY,
MND_OPER_CREATE_DB,
MND_OPER_ALTER_DB,
MND_OPER_DROP_DB,
MND_OPER_COMPACT_DB,
MND_OPER_TRIM_DB,
MND_OPER_USE_DB,
MND_OPER_WRITE_DB,
MND_OPER_READ_DB,
MND_OPER_READ_OR_WRITE_DB,
MND_OPER_SHOW_VARIABLES,
MND_OPER_SUBSCRIBE,
MND_OPER_CREATE_TOPIC,
MND_OPER_DROP_TOPIC,
MND_OPER_CREATE_VIEW,
MND_OPER_DROP_VIEW,
MND_OPER_CONFIG_CLUSTER,
MND_OPER_BALANCE_VGROUP_LEADER,
MND_OPER_CREATE_ANODE,
MND_OPER_UPDATE_ANODE,
MND_OPER_DROP_ANODE,
MND_OPER_CREATE_BNODE,
MND_OPER_DROP_BNODE,
MND_OPER_CREATE_MOUNT,
MND_OPER_DROP_MOUNT,
MND_OPER_SCAN_DB,
MND_OPER_CREATE_RSMA,
MND_OPER_DROP_RSMA,
MND_OPER_ROLLUP_DB,
MND_OPER_SHOW_STB,
MND_OPER_ALTER_RSMA,
MND_OPER_ALTER_DNODE_RELOAD_TLS,
MND_OPER_CREATE_TOKEN,
MND_OPER_ALTER_TOKEN,
MND_OPER_DROP_TOKEN,
MND_OPER_CREATE_TABLE,
MND_OPER_CREATE_ROLE,
MND_OPER_DROP_ROLE,
MND_OPER_ALTER_ROLE,
MND_OPER_SSMIGRATE_DB,
MND_OPER_SHOW_DATABASES,
MND_OPER_SHOW_VGROUPS,
MND_OPER_SHOW_VNODES,
MND_OPER_SHOW_COMPACTS,
MND_OPER_SHOW_RETENTIONS,
MND_OPER_SHOW_SCANS,
MND_OPER_SHOW_SSMIGRATES,
MND_OPER_CREATE_XNODE,
MND_OPER_UPDATE_XNODE,
MND_OPER_DROP_XNODE,
MND_OPER_DRAIN_XNODE,
MND_OPER_CREATE_XNODE_TASK,
MND_OPER_START_XNODE_TASK,
MND_OPER_STOP_XNODE_TASK,
MND_OPER_UPDATE_XNODE_TASK,
MND_OPER_DROP_XNODE_TASK,
MND_OPER_CREATE_XNODE_JOB,
MND_OPER_UPDATE_XNODE_JOB,
MND_OPER_REBALANCE_XNODE_JOB,
MND_OPER_DROP_XNODE_JOB,
MND_OPER_CREATE_XNODE_AGENT,
MND_OPER_UPDATE_XNODE_AGENT,
MND_OPER_DROP_XNODE_AGENT,
MND_OPER_CONFIG_SOD,
MND_OPER_CONFIG_MAC,
MND_OPER_MAX // the max operation type
} EOperType;
typedef enum {
MND_AUTH_ACCT_START = 0,
MND_AUTH_ACCT_USER,
MND_AUTH_ACCT_DNODE,
MND_AUTH_ACCT_MNODE,
MND_AUTH_ACCT_DB,
MND_AUTH_ACCT_TABLE,
MND_AUTH_ACCT_MAX
} EAuthAcct;
typedef enum {
MND_AUTH_OP_START = 0,
MND_AUTH_OP_CREATE_USER,
MND_AUTH_OP_ALTER_USER,
MND_AUTH_OP_DROP_USER,
MND_AUTH_MAX
} EAuthOp;
typedef enum {
TRN_CONFLICT_NOTHING = 0,
TRN_CONFLICT_GLOBAL = 1,
TRN_CONFLICT_DB = 2,
TRN_CONFLICT_DB_INSIDE = 3,
// TRN_CONFLICT_TOPIC = 4,
// TRN_CONFLICT_TOPIC_INSIDE = 5,
TRN_CONFLICT_ARBGROUP = 6,
TRN_CONFLICT_TSMA = 7,
TRN_CONFLICT_ROLE = 8,
} ETrnConflct;
typedef enum {
TRN_STAGE_PREPARE = 0,
TRN_STAGE_REDO_ACTION = 1,
TRN_STAGE_ROLLBACK = 2,
TRN_STAGE_UNDO_ACTION = 3,
TRN_STAGE_COMMIT = 4,
TRN_STAGE_COMMIT_ACTION = 5,
TRN_STAGE_FINISH = 6,
TRN_STAGE_PRE_FINISH = 7
} ETrnStage;
typedef enum {
TRN_POLICY_ROLLBACK = 0,
TRN_POLICY_RETRY = 1,
} ETrnPolicy;
typedef enum {
TRN_EXEC_PARALLEL = 0,
TRN_EXEC_SERIAL = 1,
TRN_EXEC_GROUP_PARALLEL = 2,
} ETrnExec;
typedef enum {
TRN_KILL_MODE_SKIP = 0,
TRN_KILL_MODE_INTERUPT = 1,
// TRN_KILL_MODE_ROLLBACK = 2,
} ETrnKillMode;
typedef enum {
DND_REASON_ONLINE = 0,
DND_REASON_STATUS_MSG_TIMEOUT,
DND_REASON_STATUS_NOT_RECEIVED,
DND_REASON_VERSION_NOT_MATCH,
DND_REASON_DNODE_ID_NOT_MATCH,
DND_REASON_CLUSTER_ID_NOT_MATCH,
DND_REASON_STATUS_INTERVAL_NOT_MATCH,
DND_REASON_TIME_ZONE_NOT_MATCH,
DND_REASON_LOCALE_NOT_MATCH,
DND_REASON_CHARSET_NOT_MATCH,
DND_REASON_TTL_CHANGE_ON_WRITE_NOT_MATCH,
DND_REASON_ENABLE_WHITELIST_NOT_MATCH,
DND_REASON_ENCRYPTION_KEY_NOT_MATCH,
DND_REASON_STATUS_MONITOR_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SWITCH_NOT_MATCH,
DND_REASON_STATUS_MONITOR_INTERVAL_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SLOW_LOG_THRESHOLD_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SLOW_LOG_SQL_MAX_LEN_NOT_MATCH,
DND_REASON_STATUS_MONITOR_SLOW_LOG_SCOPE_NOT_MATCH,
DND_REASON_OTHERS,
DND_REASON_TIME_UNSYNC
} EDndReason;
typedef enum {
CONSUMER_CLEAR = 0,
CONSUMER_UPDATE_REB, // update after rebalance
CONSUMER_ADD_REB, // add after rebalance
CONSUMER_REMOVE_REB, // remove after rebalance
CONSUMER_UPDATE_REC, // discarded
CONSUMER_UPDATE_SUB, // update after subscribe req
CONSUMER_INSERT_SUB,
} ECsmUpdateType;
typedef struct {
int32_t id;
ETrnStage stage;
ETrnPolicy policy;
ETrnConflct conflict;
ETrnExec exec;
EOperType oper;
bool changeless;
int32_t code;
int32_t failedTimes;
void* rpcRsp;
int32_t rpcRspLen;
int32_t actionPos;
SArray* prepareActions;
SArray* redoActions;
SArray* undoActions;
SArray* commitActions;
int64_t createdTime;
int64_t lastExecTime;
int32_t lastAction;
int32_t lastErrorNo;
SEpSet lastEpset;
tmsg_t lastMsgType;
tmsg_t originRpcType;
char dbname[TSDB_TABLE_FNAME_LEN];
char stbname[TSDB_TABLE_FNAME_LEN];
SHashObj* arbGroupIds;
int32_t startFunc;
int32_t stopFunc;
int32_t paramLen;
void* param;
char opername[TSDB_TRANS_OPER_LEN];
SArray* pRpcArray;
SRWLatch lockRpcArray;
int64_t mTraceId;
TdThreadMutex mutex;
bool ableToBeKilled;
ETrnKillMode killMode;
SHashObj* redoGroupActions;
SHashObj* groupActionPos;
// user data is for upper layer to store some additional infomation in the transaction,
// it is set and interpreted by upper layer, the transaction layer does not care about it.
// if user data is a complex structure, the upper layer should serialize/deserialize it.
// user data should always alloced from heap and can be freed by taosMemoryFree.
void* userData;
int32_t userDataLen;
} STrans;
typedef struct {
int64_t id;
char name[TSDB_CLUSTER_ID_LEN];
int64_t createdTime;
int64_t updateTime;
int32_t upTime;
} SClusterObj;
typedef enum {
TSDB_SECURITY_POLICY_SOD = 1, // Separation of Duties
TSDB_SECURITY_POLICY_MAC = 2, // Mandatory Access Control
} ESecurityPolicyType;
// status field semantics per type:
// SOD: 0 = enabled (default), 1 = mandatory (irreversible)
// MAC: 0 = disabled (default), 1 = mandatory (irreversible)
#define SEC_POLICY_STATUS_DEFAULT 0
#define SEC_POLICY_STATUS_ENFORCED 1
// Legacy aliases kept for readability at call sites
#define SOD_MODE_ENABLED SEC_POLICY_STATUS_DEFAULT
#define SOD_MODE_MANDATORY SEC_POLICY_STATUS_ENFORCED
#define MAC_MODE_DISABLED SEC_POLICY_STATUS_DEFAULT
#define MAC_MODE_MANDATORY SEC_POLICY_STATUS_ENFORCED
typedef struct {
int32_t type; // ESecurityPolicyType — SDB key (SDB_KEY_INT32)
int64_t createdTime;
int64_t updateTime;
int64_t activateTime;
uint8_t status; // SEC_POLICY_STATUS_DEFAULT or SEC_POLICY_STATUS_ENFORCED
char activator[TSDB_USER_LEN];
char reserve[48]; // private data space for future per-type extensions
} SSecurityPolicyObj;
typedef struct {
int32_t id;
int64_t createdTime;
int64_t updateTime;
int64_t rebootTime;
int64_t lastAccessTime;
int32_t accessTimes;
int32_t numOfVnodes;
int32_t numOfOtherNodes;
int32_t numOfSupportVnodes;
int32_t numOfDiskCfg;
float numOfCores;
int64_t memTotal;
int64_t memAvail;
int64_t memUsed;
EDndReason offlineReason;
uint32_t encryptionKeyChksum;
int8_t encryptionKeyStat;
uint16_t port;
char fqdn[TSDB_FQDN_LEN];
char ep[TSDB_EP_LEN];
char machineId[TSDB_MACHINE_ID_LEN + 1];
} SDnodeObj;
typedef struct {
char* name;
int32_t nameLen;
char* pStatus;
char* pNote;
} SAnodeAlgo;
typedef struct {
int32_t id;
int64_t createdTime;
int64_t updateTime;
int32_t version;
int32_t urlLen;
int32_t numOfAlgos;
int32_t status;
SRWLatch lock;
char* url;
SArray** algos;
} SAnodeObj;
/**
* @brief Stucture for XNode object.
*
* This structure represents an XNode in the system, which contains
* information about the node's ID, creation and update times, version,
* URL length, status, and a lock for synchronization.
*/
typedef struct {
int32_t id;
int32_t urlLen;
char* url;
int32_t statusLen;
char* status;
int64_t createTime;
int64_t updateTime;
SRWLatch lock;
} SXnodeObj;
typedef struct {
int32_t id;
int32_t via;
int32_t xnodeId;
int64_t createTime;
int64_t updateTime;
int32_t sourceType;
int32_t sinkType;
int32_t nameLen;
int32_t sourceDsnLen;
int32_t sinkDsnLen;
int32_t parserLen;
int32_t statusLen;
int32_t reasonLen;
int32_t createdByLen;
int32_t labelsLen;
char* name;
char* sourceDsn;
char* sinkDsn;
char* parser;
char* status;
char* reason;
char* createdBy;
char* labels;
SRWLatch lock;
// SArray** labels;
// int32_t numOfLabels;
} SXnodeTaskObj;
typedef struct {
int32_t id;
int32_t taskId;
int32_t configLen;
char* config;
int32_t via;
int32_t xnodeId;
int32_t statusLen;
char* status;
int32_t reasonLen;
char* reason;
int64_t createTime;
int64_t updateTime;
SRWLatch lock;
} SXnodeJobObj;
typedef struct {
int32_t id;
int64_t createTime;
int64_t updateTime;
int32_t nameLen;
int32_t tokenLen;
int32_t statusLen;
char* name;
char* token;
char* status;
SRWLatch lock;
} SXnodeAgentObj;
typedef struct {
int32_t id;
int32_t userLen;
char* user;
int32_t passLen;
char* pass;
int32_t tokenLen;
char* token;
int64_t createTime;
int64_t updateTime;
SRWLatch lock;
} SXnodeUserPassObj;
typedef struct {
int32_t id;
int64_t createdTime;
int64_t updateTime;
ESyncState syncState;
SyncTerm syncTerm;
bool syncRestore;
int64_t roleTimeMs;
SDnodeObj* pDnode;
int32_t role;
SyncIndex lastIndex;
} SMnodeObj;
typedef struct {
int32_t id;
int64_t createdTime;
int64_t updateTime;
SDnodeObj* pDnode;
SQnodeLoad load;
} SQnodeObj;
typedef struct {
int32_t id;
int32_t leadersId[2];
int32_t replicaId;
int64_t createdTime;
int64_t updateTime;
SDnodeObj* pDnode;
} SSnodeObj;
typedef struct {
SSnodeObj* target;
int32_t affNum;
SSnodeObj affSnode[2];
SSnodeObj affNewReplica[2];
} SSnodeDropTraversaCtx;
typedef struct {
int32_t id;
int32_t proto;
int64_t createdTime;
int64_t updateTime;
SDnodeObj* pDnode;
} SBnodeObj;
typedef struct {
char name[TSDB_TOKEN_NAME_LEN];
char token[TSDB_TOKEN_LEN];
char provider[TSDB_TOKEN_PROVIDER_LEN];
char user[TSDB_USER_LEN];
char extraInfo[TSDB_TOKEN_EXTRA_INFO_LEN];
int8_t enabled;
int32_t expireTime; // in seconds
int32_t createdTime; // in seconds
SRWLatch lock;
} STokenObj;
typedef struct {
int32_t assignedDnodeId;
char token[TSDB_ARB_TOKEN_SIZE];
int8_t assignAcked;
} SArbAssignedLeader;
typedef struct {
int32_t dnodeId;
} SArbMemberInfo;
typedef struct {
int32_t nextHbSeq;
int32_t responsedHbSeq;
char token[TSDB_ARB_TOKEN_SIZE];
int64_t lastHbMs;
} SArbMemberState;
typedef struct {
SArbMemberInfo info;
SArbMemberState state;
} SArbGroupMember;
typedef struct {
int32_t vgId;
int64_t dbUid;
SArbGroupMember members[TSDB_ARB_GROUP_MEMBER_NUM];
int8_t isSync;
int32_t code;
int64_t updateTimeMs;
SArbAssignedLeader assignedLeader;
int64_t version;
// following fields will not be duplicated
bool mutexInited;
TdThreadMutex mutex;
} SArbGroup;
typedef struct {
char name[CFG_NAME_MAX_LEN];
ECfgDataType dtype;
union {
bool bval;
float fval;
int32_t i32;
int64_t i64;
char* str;
};
} SConfigObj;
int32_t tEncodeSConfigObj(SEncoder* pEncoder, const SConfigObj* pObj);
int32_t tDecodeSConfigObj(SDecoder* pDecoder, SConfigObj* pObj);
int32_t mndInitConfigObj(SConfigItem* pItem, SConfigObj* pObj);
SConfigObj mndInitConfigVersion();
int32_t mndUpdateObj(SConfigObj* pObj, const char* name, char* value);
void tFreeSConfigObj(SConfigObj* obj);
typedef struct {
int32_t maxUsers;
int32_t maxDbs;
int32_t maxStbs;
int32_t maxTbs;
int32_t maxTimeSeries;
int32_t maxStreams;
int32_t maxFuncs;
int32_t maxConsumers;
int32_t maxConns;
int32_t maxTopics;
int64_t maxStorage;
int32_t accessState; // Configured only by command
} SAcctCfg;
typedef struct {
int32_t numOfUsers;
int32_t numOfDbs;
int32_t numOfTimeSeries;
int32_t numOfStreams;
int64_t totalStorage; // Total storage wrtten from this account
int64_t compStorage; // Compressed storage on disk
} SAcctInfo;
typedef struct {
int32_t lastLoginTime; // in seconds
int32_t lastFailedLoginTime; // in seconds
int32_t failedLoginCount;
} SLoginInfo;
typedef struct {
char acct[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int32_t acctId;
int32_t status;
SAcctCfg cfg;
SAcctInfo info;
} SAcctObj;
typedef struct {
char pass[TSDB_PASSWORD_LEN];
int32_t setTime; // password set time, in seconds
} SUserPassword;
typedef struct {
SHashObj* pReadDbs;
SHashObj* pWriteDbs;
SHashObj* pReadTbs;
SHashObj* pWriteTbs;
SHashObj* pTopics;
SHashObj* pAlterTbs;
SHashObj* pReadViews;
SHashObj* pWriteViews;
SHashObj* pAlterViews;
SHashObj* pUseDbs;
} SPrivHashObjSet;
typedef struct {
union {
char name[TSDB_USER_LEN];
char user[TSDB_USER_LEN];
};
// passwords history, from newest to oldest,
// the latest one is the current password
int32_t numOfPasswords;
SUserPassword* passwords;
char salt[TSDB_PASSWORD_SALT_LEN + 1];
char acct[TSDB_USER_LEN];
char totpsecret[TSDB_TOTP_SECRET_LEN];
int64_t createdTime; // in milliseconds
int64_t updateTime; // in milliseconds
int64_t uid;
int8_t superUser;
int8_t sysInfo;
int8_t enable;
int8_t changePass;
union {
uint8_t flag;
struct {
uint8_t createdb : 1;
uint8_t minSecLevel : 3; // TD: 6671585124
uint8_t maxSecLevel : 3; // TD: 6671585124
uint8_t reserve : 1;
};
};
int32_t sessionPerUser;
int32_t connectTime; // unit is second
int32_t connectIdleTime; // unit is second
int32_t callPerSession;
int32_t vnodePerCall;
int32_t failedLoginAttempts;
int32_t passwordLifeTime; // unit is second
int32_t passwordReuseTime; // unit is second
int32_t passwordReuseMax;
int32_t passwordLockTime; // unit is second
int32_t passwordGraceTime; // unit is second
int32_t inactiveAccountTime; // unit is second
int32_t allowTokenNum;
int32_t tokenNum;
int32_t acctId;
int32_t authVersion;
int32_t passVersion;
int64_t ipWhiteListVer;
SIpWhiteListDual* pIpWhiteListDual;
int64_t timeWhiteListVer;
SDateTimeWhiteList* pTimeWhiteList;
int64_t lastRoleRetrieve; // Last retrieve time of role, unit is ms, default value is 0. Memory only and no need to
// persist.
SHashObj* roles; // k: roleName, v: flag (int8_t: 0x01 enable(default), 0x00 disable)
SPrivSet sysPrivs;
/**
* N.B. The privileges of "select/insert/update/delete tables without row/col/tag conditions" are also
* stored in objPrivs.
*/
SHashObj* objPrivs; // k:EPrivObjType + "." + objName, v: SPrivObjPolicies.
// table level privileges
SHashObj* selectTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* insertTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* updateTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* deleteTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* ownedDbs; // k:dbFName, v: empty
SRWLatch lock;
int8_t passEncryptAlgorithm;
SPrivHashObjSet* legacyPrivs; // used to temporarily hold legacy privileges during upgrade
} SUserObj;
typedef struct {
char name[TSDB_ROLE_LEN];
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int64_t version;
union {
uint8_t flag;
struct {
uint8_t enable : 1;
uint8_t sys : 1; // system role
uint8_t reserve : 6;
};
};
SPrivSet sysPrivs;
/**
* N.B. The privileges of "select/insert/update/delete tables without row/col/tag conditions" are also
* stored in objPrivs.
*/
SHashObj* objPrivs; // k:EPrivObjType + "." + objName, v: SPrivObjPolicies.
// table level privileges combined with row/col/tag conditions
SHashObj* selectTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* insertTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* updateTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
SHashObj* deleteTbs; // k:tbFName 1.db.tbName, v: SPrivTblPolicies
// SHashObj* alterTbs; // k:tbFName, v: empty
// SHashObj* useDbs;
SHashObj* parentRoles; // not supported yet
SHashObj* subRoles; // not supported yet
SRWLatch lock;
} SRoleObj;
typedef struct {
int32_t numOfVgroups;
int32_t numOfStables;
int32_t buffer;
int32_t pageSize;
int32_t pages;
int32_t cacheLastSize;
int32_t cacheLastShardBits; // Number of shards for last cache LRU, -1 for auto
int32_t daysPerFile;
int32_t daysToKeep0;
int32_t daysToKeep1;
int32_t daysToKeep2;
int32_t keepTimeOffset;
int32_t minRows;
int32_t maxRows;
int32_t walFsyncPeriod;
int8_t walLevel;
int8_t precision;
int8_t compression;
int8_t replications;
int8_t strict;
int8_t hashMethod; // default is 1
int8_t cacheLast;
int8_t schemaless;
union {
uint8_t flags;
struct {
uint8_t isMount : 1; // TS-5868
uint8_t allowDrop : 1; // TS-7232
uint8_t securityLevel : 3; // TD: 6671585124
uint8_t padding : 3;
};
};
int16_t hashPrefix;
int16_t hashSuffix;
int16_t sstTrigger;
int32_t tsdbPageSize;
int32_t numOfRetensions;
SArray* pRetensions;
int32_t walRetentionPeriod;
int32_t walRollPeriod;
int64_t walRetentionSize;
int64_t walSegmentSize;
int32_t ssChunkSize;
int32_t ssKeepLocal;
int8_t ssCompact;
int8_t withArbitrator;
int8_t encryptAlgorithm;
int8_t compactTimeOffset; // hour
int32_t compactInterval; // minute
int32_t compactStartTime; // minute
int32_t compactEndTime; // minute
int8_t isAudit;
int8_t secureDelete;
} SDbCfg;
typedef struct {
char name[TSDB_DB_FNAME_LEN];
char acct[TSDB_USER_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int64_t ownerId;
int32_t cfgVersion;
int32_t vgVersion;
SDbCfg cfg;
SRWLatch lock;
int64_t stateTs;
int64_t compactStartTime;
int32_t tsmaVersion;
int64_t scanStartTime;
} SDbObj;
typedef struct {
int64_t uid;
char name[TSDB_DB_FNAME_LEN];
} SMountDbObj;
typedef struct {
char name[TSDB_MOUNT_NAME_LEN];
char acct[TSDB_USER_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int16_t nMounts;
int16_t nDbs;
int32_t* dnodeIds;
char** paths;
SMountDbObj* dbObj;
SRWLatch lock;
} SMountObj;
typedef struct {
int32_t id;
int64_t createdTime;
int64_t updateTime;
int64_t mountTimes;
int64_t umountTimes;
SRWLatch lock;
} SMountLogObj;
typedef struct {
int32_t dnodeId;
ESyncState syncState;
int64_t syncTerm;
int64_t syncAppliedIndex;
int64_t lastSyncAppliedIndexUpdateTime;
double appliedRate;
int64_t syncCommitIndex;
bool syncRestore;
bool syncCanRead;
int64_t roleTimeMs;
int64_t startTimeMs;
ESyncRole nodeRole;
int32_t learnerProgress;
int64_t bufferSegmentUsed;
int64_t bufferSegmentSize;
int32_t snapSeq;
int64_t syncTotalIndex;
} SVnodeGid;
typedef struct {
int32_t vgId;
int64_t createdTime;
int64_t updateTime;
int32_t version;
uint32_t hashBegin;
uint32_t hashEnd;
char dbName[TSDB_DB_FNAME_LEN];
int64_t dbUid;
int64_t cacheUsage;
int64_t numOfTables;
int64_t numOfTimeSeries;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int8_t compact;
int8_t isTsma;
int8_t replica;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
void* pTsma;
int32_t numOfCachedTables;
int32_t syncConfChangeVer;
int32_t mountVgId; // TS-5868
int64_t keepVersion; // WAL keep version, -1 for disabled
int64_t keepVersionTime; // WAL keep version time
} SVgObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char dstTbName[TSDB_TABLE_FNAME_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
int64_t dstTbUid;
int64_t ownerId;
int8_t intervalUnit;
int8_t slidingUnit;
int8_t timezone; // int8_t is not enough, timezone is unit of second
int32_t dstVgId; // for stream
int64_t interval;
int64_t offset;
int64_t sliding;
int32_t exprLen; // strlen + 1
int32_t tagsFilterLen;
int32_t sqlLen;
int32_t astLen;
int32_t version;
char* expr;
char* tagsFilter;
char* sql;
char* ast;
SSchemaWrapper schemaRow; // for dstVgroup
SSchemaWrapper schemaTag; // for dstVgroup
char baseSmaName[TSDB_TABLE_FNAME_LEN];
} SSmaObj;
typedef struct {
char name[TSDB_TABLE_NAME_LEN];
char tbName[TSDB_TABLE_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int64_t tbUid;
int64_t dbUid;
int64_t ownerId;
int64_t interval[2];
union {
uint64_t reserved;
};
int32_t version;
int8_t tbType;
int8_t intervalUnit;
int16_t nFuncs;
col_id_t* funcColIds;
func_id_t* funcIds;
SRWLatch lock;
} SRsmaObj;
typedef struct {
char name[TSDB_INDEX_FNAME_LEN];
char stb[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char dstTbName[TSDB_TABLE_FNAME_LEN];
char colName[TSDB_COL_NAME_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int64_t uid;
int64_t stbUid;
int64_t dbUid;
int64_t ownerId;
} SIdxObj;
typedef struct {
col_id_t colId;
int32_t cmprAlg;
} SCmprObj;
typedef struct {
char name[TSDB_TABLE_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int64_t ownerId;
int32_t tagVer;
int32_t colVer;
int32_t smaVer;
int32_t nextColId;
int64_t maxdelay[2];
int64_t watermark[2];
int32_t ttl;
int32_t numOfColumns;
int32_t numOfTags;
int32_t numOfFuncs;
int32_t commentLen;
int32_t ast1Len;
int32_t ast2Len;
SArray* pFuncs;
SSchema* pColumns;
SSchema* pTags;
char* comment;
char* pAst1;
char* pAst2;
SRWLatch lock;
int8_t source;
SColCmpr* pCmpr;
int64_t keep;
SExtSchema* pExtSchemas;
int8_t virtualStb;
int8_t secureDelete;
union {
uint32_t flags;
struct {
uint32_t securityLevel : 3; // TD: 6671585124
uint32_t padding : 5;
};
};
} SStbObj;
typedef struct {
char name[TSDB_FUNC_NAME_LEN];
char createUser[TSDB_USER_LEN];
int64_t createdTime;
int8_t funcType;
int8_t scriptType;
int8_t align;
int8_t outputType;
int32_t outputLen;
int32_t bufSize;
int64_t signature;
int32_t commentSize;
int32_t codeSize;
char* pComment;
char* pCode;
int32_t funcVersion;
SRWLatch lock;
} SFuncObj;
typedef struct {
char id[TSDB_INSTANCE_ID_LEN];
char type[TSDB_INSTANCE_TYPE_LEN];
char desc[TSDB_INSTANCE_DESC_LEN];
int64_t firstRegTime;
int64_t lastRegTime;
int32_t expire;
} SInstanceObj;
typedef struct {
int64_t id;
int8_t type;
int8_t replica;
int16_t numOfColumns;
int32_t numOfRows;
int32_t curIterPackedRows;
void* pIter;
SMnode* pMnode;
STableMetaRsp* pMeta;
bool restore;
bool sysDbRsp;
char db[TSDB_DB_FNAME_LEN];
char filterTb[TSDB_TABLE_NAME_LEN];
} SShowObj;
typedef struct {
char name[TSDB_TOPIC_FNAME_LEN];
char db[TSDB_DB_FNAME_LEN];
char createUser[TSDB_USER_LEN];
int64_t createTime;
int64_t updateTime;
int64_t uid;
int64_t dbUid;
int64_t ownerId;
int32_t version;
int8_t subType; // column, db or stable
int8_t withMeta; // TODO
int32_t sqlLen;
char* sql;
char* physicalPlan;
SSchemaWrapper schema;
int64_t stbUid;
char stbName[TSDB_TABLE_FNAME_LEN];
SRWLatch lock; // lock must be at the end for topic update
} SMqTopicObj;
typedef struct {
int64_t consumerId;
char cgroup[TSDB_CGROUP_LEN];
char clientId[TSDB_CLIENT_ID_LEN];
char user[TSDB_USER_LEN];
char fqdn[TSDB_FQDN_LEN];
int8_t updateType; // used only for update
int32_t epoch;
int32_t status;
int32_t hbStatus; // hbStatus is not applicable to serialization
int32_t pollStatus; // pollStatus is not applicable to serialization
SRWLatch lock; // lock is used for topics update
SArray* currentTopics; // SArray<char*>
SArray* rebNewTopics; // SArray<char*>
SArray* rebRemovedTopics; // SArray<char*>
// subscribed by user
SArray* assignedTopics; // SArray<char*>
// data for display
int32_t pid;
SEpSet ep;
int64_t createTime;
int64_t pollTime;
int64_t subscribeTime;
int64_t rebalanceTime;
int8_t withTbName;
int8_t autoCommit;
int32_t autoCommitInterval;
int32_t resetOffsetCfg;
int32_t sessionTimeoutMs;
int32_t maxPollIntervalMs;
} SMqConsumerObj;
int32_t tNewSMqConsumerObj(int64_t consumerId, char* cgroup, int8_t updateType, char* topic, SCMSubscribeReq* subscribe,
SMqConsumerObj** ppConsumer);
void tClearSMqConsumerObj(SMqConsumerObj* pConsumer);
void tDeleteSMqConsumerObj(SMqConsumerObj* pConsumer);
int32_t tEncodeSMqConsumerObj(void** buf, const SMqConsumerObj* pConsumer);
void* tDecodeSMqConsumerObj(const void* buf, SMqConsumerObj* pConsumer, int8_t sver);
typedef struct {
int32_t vgId;
SEpSet epSet;
} SMqVgEp;
int32_t tEncodeSMqVgEp(void** buf, const SMqVgEp* pVgEp);
void* tDecodeSMqVgEp(const void* buf, SMqVgEp* pVgEp, int8_t sver);
typedef struct {
int64_t consumerId; // -1 for unassigned
SArray* vgs; // SArray<SMqVgEp>
SArray* offsetRows; // SArray<OffsetRows>
} SMqConsumerEp;
void freeSMqConsumerEp(void* data);
int32_t tEncodeSMqConsumerEp(void** buf, const SMqConsumerEp* pEp);
void* tDecodeSMqConsumerEp(const void* buf, SMqConsumerEp* pEp, int8_t sver);
typedef struct {
char key[TSDB_SUBSCRIBE_KEY_LEN];
int64_t dbUid;
int32_t vgNum;
int8_t subType;
int8_t withMeta;
int64_t stbUid;
SHashObj* consumerHash; // consumerId -> SMqConsumerEp
SArray* unassignedVgs; // SArray<SMqVgEp>
SArray* offsetRows;
char dbName[TSDB_DB_FNAME_LEN];
SRWLatch lock;
} SMqSubscribeObj;
int32_t tNewSubscribeObj(const char* key, SMqSubscribeObj** ppSub);
int32_t tCloneSubscribeObj(SMqSubscribeObj* pSub, SMqSubscribeObj** ppSub);
void tDeleteSubscribeObj(SMqSubscribeObj* pSub);
int32_t tEncodeSubscribeObj(void** buf, const SMqSubscribeObj* pSub);
void* tDecodeSubscribeObj(const void* buf, SMqSubscribeObj* pSub, int8_t sver);
typedef struct {
int32_t oldConsumerNum;
const SMqRebInfo* pRebInfo;
} SMqRebInputObj;
typedef struct {
int64_t oldConsumerId;
int64_t newConsumerId;
SMqVgEp pVgEp;
} SMqRebOutputVg;
typedef struct {
SArray* rebVgs; // SArray<SMqRebOutputVg>
SArray* newConsumers; // SArray<int64_t>
SArray* removedConsumers; // SArray<int64_t>
SArray* modifyConsumers; // SArray<int64_t>
SMqSubscribeObj* pSub;
bool isReload;
} SMqRebOutputObj;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
char createUser[TSDB_USER_LEN];
SCMCreateStreamReq* pCreate;
SRWLatch lock;
// dynamic info
int32_t mainSnodeId;
int8_t userDropped; // no need to serialize
int8_t userStopped;
int64_t createTime;
int64_t updateTime;
int64_t ownerId;
} SStreamObj;
#if 0
typedef struct SStreamConf {
int8_t igExpired;
int8_t trigger;
int8_t fillHistory;
int64_t triggerParam;
int64_t watermark;
} SStreamConf;
typedef struct {
int32_t vgId;
int64_t createdTime;
int64_t updateTime;
int32_t version;
uint32_t hashBegin;
uint32_t hashEnd;
char dbName[TSDB_DB_FNAME_LEN];
int64_t dbUid;
int64_t cacheUsage;
int64_t numOfTables;
int64_t numOfTimeSeries;
int64_t totalStorage;
int64_t compStorage;
int64_t pointsWritten;
int8_t compact;
int8_t isTsma;
int8_t replica;
SVnodeGid vnodeGid[TSDB_MAX_REPLICA + TSDB_MAX_LEARNER_REPLICA];
void* pTsma;
int32_t numOfCachedTables;
int32_t syncConfChangeVer;
} SVgObj;
typedef struct {
char name[TSDB_STREAM_FNAME_LEN];
SRWLatch lock;
// create info
int64_t createTime;
int64_t updateTime;
int32_t version;
int32_t totalLevel;
int64_t smaId; // 0 for unused
// info
int64_t uid;
int8_t status;
SStreamConf conf;
// source and target
int64_t sourceDbUid;
int64_t targetDbUid;
char sourceDb[TSDB_DB_FNAME_LEN];
char targetDb[TSDB_DB_FNAME_LEN];
char targetSTbName[TSDB_TABLE_FNAME_LEN];
int64_t targetStbUid;
// fixedSinkVg is not applicable for encode and decode
SVgObj fixedSinkVg;
int32_t fixedSinkVgId; // 0 for shuffle
// transformation
char* sql;
char* ast;
char* physicalPlan;
SArray* pTaskList; // SArray<SArray<SStreamTask>>
SArray* pHTaskList; // generate the results for already stored ts data
int64_t hTaskUid; // stream task for history ts data
SSchemaWrapper outputSchema;
SSchemaWrapper tagSchema;
// 3.0.20
int64_t checkpointFreq; // ms
int64_t currentTick; // do not serialize
int64_t deleteMark;
int8_t igCheckUpdate;
// 3.0.5.
int64_t checkpointId;
int32_t indexForMultiAggBalance;
int8_t subTableWithoutMd5;
char reserve[TSDB_RESERVE_VALUE_LEN];
SSHashObj* pVTableMap; // do not serialize
SQueryPlan* pPlan; // do not serialize
} SStreamObj;
#endif
typedef struct SStreamSeq {
char name[24];
uint64_t seq;
SRWLatch lock;
} SStreamSeq;
int32_t tEncodeSStreamObj(SEncoder* pEncoder, const SStreamObj* pObj);
int32_t tDecodeSStreamObj(SDecoder* pDecoder, SStreamObj* pObj, int32_t sver);
void tFreeStreamObj(SStreamObj* pObj);
#define VIEW_TYPE_UPDATABLE (1 << 0)
#define VIEW_TYPE_MATERIALIZED (1 << 1)
typedef struct {
char fullname[TSDB_VIEW_FNAME_LEN];
char name[TSDB_VIEW_NAME_LEN];
char dbFName[TSDB_DB_FNAME_LEN];
char createUser[TSDB_USER_LEN];
char* querySql;
char* parameters;
void** defaultValues;
char* targetTable;
uint64_t viewId;
uint64_t dbId;
int64_t ownerId;
int64_t createdTime;
int32_t version;
int8_t precision;
int8_t type;
int32_t numOfCols;
SSchema* pSchema;
SRWLatch lock;
} SViewObj;
int32_t tEncodeSViewObj(SEncoder* pEncoder, const SViewObj* pObj);
int32_t tDecodeSViewObj(SDecoder* pDecoder, SViewObj* pObj, int32_t sver);
void tFreeSViewObj(SViewObj* pObj);
typedef struct {
union {
int32_t compactDetailId;
int32_t detailId;
};
union {
int32_t compactId;
int32_t id;
};
int32_t vgId;
int32_t dnodeId;
int32_t numberFileset;
int32_t finished;
int64_t startTime;
int32_t newNumberFileset;
int32_t newFinished;
int32_t progress;
int64_t remainingTime;
} SCompactDetailObj;
typedef struct {
int32_t scanDetailId;
int32_t scanId;
int32_t vgId;
int32_t dnodeId;
int32_t numberFileset;
int32_t finished;
int64_t startTime;
int32_t newNumberFileset;
int32_t newFinished;
int32_t progress;
int64_t remainingTime;
} SScanDetailObj;
typedef struct {
union {
int32_t compactId;
int32_t id;
};
char dbname[TSDB_TABLE_FNAME_LEN];
int64_t dbUid;
int64_t startTime;
SArray* compactDetail;
union {
uint32_t flags;
struct {
uint32_t optrType : 3; // ETsdbOpType
uint32_t triggerType : 1; // ETriggerType
uint32_t reserve : 28;
};
};
} SCompactObj;
typedef struct {
int32_t id;
char algorithm_id[TSDB_ENCRYPT_ALGR_NAME_LEN];
char name[TSDB_ENCRYPT_ALGR_NAME_LEN];
char desc[TSDB_ENCRYPT_ALGR_DESC_LEN];
int16_t type;
int8_t source;
char ossl_algr_name[TSDB_ENCRYPT_ALGR_NAME_LEN];
} SEncryptAlgrObj;
typedef struct {
int32_t scanId;
char dbname[TSDB_TABLE_FNAME_LEN];
int64_t dbUid;
int64_t startTime;
SArray* scanDetail;
} SScanObj;
typedef SCompactObj SRetentionObj; // reuse compact obj for retention
typedef SCompactDetailObj SRetentionDetailObj; // reuse compact detail obj for retention
typedef struct {
int32_t nodeId; // dnode id of the leader vnode
int32_t vgId;
int32_t fid; // file set id
int32_t state;
int64_t startTime; // migration start time of this file set in seconds
} SSsMigrateFileSet;
typedef enum {
SSMIGRATE_VGSTATE_INIT = 0, // initial state
SSMIGRATE_VGSTATE_WAITING_FSET_LIST = 1, // waiting for file set list
SSMIGRATE_VGSTATE_FSET_LIST_RECEIVED = 2, // file set list received
SSMIGRATE_VGSTATE_FSET_STARTING = 3, // fset ssmigrate request was sent, waiting for response
SSMIGRATE_VGSTATE_FSET_STARTED = 4, // fset ssmigrate response received
} ESsMigrateVgroupState;
typedef struct {
int32_t id; // migration id
int64_t dbUid;
char dbname[TSDB_TABLE_FNAME_LEN];
int64_t startTime; // migration start time in seconds
int64_t stateUpdateTime; // last state(vgState or currFest.state) update time in seconds
int32_t vgIdx; // index of current vgroup
int32_t vgState; // vgroup migration state
int32_t fsetIdx; // index of current file set
SSsMigrateFileSet currFset; // current file set being processed
SArray* vgroups; // SArray<int32_t>, vgroup ids of current migration
SArray* fileSets; // SArray<int32_t>, file set ids of current vgroup
} SSsMigrateObj;
// SGrantLogObj
typedef enum {
GRANT_STATE_INIT = 0,
GRANT_STATE_UNGRANTED = 1,
GRANT_STATE_GRANTED = 2,
GRANT_STATE_EXPIRED = 3,
GRANT_STATE_REVOKED = 4,
GRANT_STATE_MAX,
} EGrantState;
typedef enum {
GRANT_STATE_REASON_INIT = 0,
GRANT_STATE_REASON_ALTER = 1, // alter activeCode 'revoked' or 'xxx'
GRANT_STATE_REASON_MISMATCH = 2, // dnode machine mismatch
GRANT_STATE_REASON_EXPIRE = 3, // expire
GRANT_STATE_REASON_MAX,
} EGrantStateReason;
#define GRANT_STATE_NUM 30
#define GRANT_ACTIVE_NUM 10
#define GRANT_ACTIVE_HEAD_LEN 30
#define GRANT_ACTIVE_SIGN_LEN 30
typedef struct {
union {
int64_t u0;
struct {
int64_t ts : 40;
int64_t lastState : 4;
int64_t state : 4;
int64_t reason : 8;
int64_t reserve : 8;
};
};
} SGrantState;
typedef struct {
union {
int64_t u0;
struct {
int64_t ts : 40;
int64_t reserve : 24;
};
};
char active[GRANT_ACTIVE_HEAD_LEN + 1];
} SGrantActive;
typedef struct {
union {
int64_t u0;
struct {
int64_t ts : 40;
int64_t id : 24;
};
};
char machine[TSDB_MACHINE_ID_LEN + 1];
} SGrantMachine;
typedef struct {
int32_t id;
int8_t nStates;
int8_t nActives;
int64_t createTime;
int64_t updateTime;
int64_t upgradeTime;
SGrantState states[GRANT_STATE_NUM];
SGrantActive actives[GRANT_ACTIVE_NUM];
char* active;
SArray* pMachines; // SGrantMachine
SRWLatch lock;
} SGrantLogObj;
#ifdef __cplusplus
}
#endif
#endif /*_TD_MND_DEF_H_*/