TDengine/source/dnode/mnode/impl/src/mndFunc.c

765 lines
25 KiB
C
Raw Normal View History

2021-09-22 08:15:20 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
2021-10-16 07:16:05 +00:00
#define _DEFAULT_SOURCE
2021-12-08 05:48:18 +00:00
#include "mndFunc.h"
2022-06-25 01:09:33 +00:00
#include "mndPrivilege.h"
2021-12-08 05:48:18 +00:00
#include "mndShow.h"
#include "mndSync.h"
#include "mndTrans.h"
2022-02-12 08:28:50 +00:00
#include "mndUser.h"
2021-10-17 03:42:05 +00:00
2023-04-04 11:53:08 +00:00
#define SDB_FUNC_VER 2
2022-01-26 01:56:24 +00:00
#define SDB_FUNC_RESERVE_SIZE 64
2021-12-08 05:48:18 +00:00
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc);
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw);
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc);
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc);
2022-01-24 05:59:09 +00:00
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew);
2022-05-16 06:55:31 +00:00
static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCreate);
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc);
static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq);
static int32_t mndProcessDropFuncReq(SRpcMsg *pReq);
static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq);
static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows);
2021-12-08 05:48:18 +00:00
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter);
/*
 * Wires the function module into an mnode: registers the create/drop/retrieve
 * message handlers, the show-table retrieve and free-iterator handlers, and
 * finally the SDB_FUNC table callbacks.
 *
 * Returns whatever sdbSetTable() returns.
 */
int32_t mndInitFunc(SMnode *pMnode) {
  // Every field that is not assigned below stays zero, as with the designated initializer form.
  SSdbTable funcTable = {0};
  funcTable.sdbType = SDB_FUNC;
  funcTable.keyType = SDB_KEY_BINARY;
  funcTable.encodeFp = (SdbEncodeFp)mndFuncActionEncode;
  funcTable.decodeFp = (SdbDecodeFp)mndFuncActionDecode;
  funcTable.insertFp = (SdbInsertFp)mndFuncActionInsert;
  funcTable.updateFp = (SdbUpdateFp)mndFuncActionUpdate;
  funcTable.deleteFp = (SdbDeleteFp)mndFuncActionDelete;

  mndSetMsgHandle(pMnode, TDMT_MND_CREATE_FUNC, mndProcessCreateFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_DROP_FUNC, mndProcessDropFuncReq);
  mndSetMsgHandle(pMnode, TDMT_MND_RETRIEVE_FUNC, mndProcessRetrieveFuncReq);

  mndAddShowRetrieveHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndRetrieveFuncs);
  mndAddShowFreeIterHandle(pMnode, TSDB_MGMT_TABLE_FUNC, mndCancelGetNextFunc);

  return sdbSetTable(pMnode->pSdb, funcTable);
}
/* Counterpart of mndInitFunc(); the function module currently has nothing to tear down. */
void mndCleanupFunc(SMnode *pMnode) {
  (void)pMnode;
}
/*
 * Serializes a function object into a freshly allocated SDB raw record of
 * type SDB_FUNC / version SDB_FUNC_VER.
 *
 * Layout, in write order: name, createdTime, funcType, scriptType, align,
 * outputType, outputLen, bufSize, signature, commentSize, codeSize,
 * comment bytes (only when commentSize > 0), code bytes, funcVersion,
 * then SDB_FUNC_RESERVE_SIZE reserve bytes.
 *
 * Returns the raw record, or NULL on failure (terrno is left non-zero and
 * the partially built record is freed).
 */
static SSdbRaw *mndFuncActionEncode(SFuncObj *pFunc) {
  // `code` and `lino` are kept exactly as in the original; the SDB_SET_* macros may reference them.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default: any jump to _OVER before the final reset reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  int32_t  rawSize = pFunc->commentSize + pFunc->codeSize + sizeof(SFuncObj) + SDB_FUNC_RESERVE_SIZE;
  SSdbRaw *pRaw = sdbAllocRaw(SDB_FUNC, SDB_FUNC_VER, rawSize);
  if (pRaw == NULL) goto _OVER;

  int32_t dataPos = 0;

  // Fixed-size header fields.
  SDB_SET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pFunc->createdTime, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->funcType, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->scriptType, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->align, _OVER)
  SDB_SET_INT8(pRaw, dataPos, pFunc->outputType, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->outputLen, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->bufSize, _OVER)
  SDB_SET_INT64(pRaw, dataPos, pFunc->signature, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->commentSize, _OVER)
  SDB_SET_INT32(pRaw, dataPos, pFunc->codeSize, _OVER)

  // Variable-size payloads; the comment is optional, the code is always written.
  if (pFunc->commentSize > 0) {
    SDB_SET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
  }
  SDB_SET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)

  // Trailer: funcVersion (read back only for soft version >= 2) and the reserve area.
  SDB_SET_INT32(pRaw, dataPos, pFunc->funcVersion, _OVER)
  SDB_SET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)
  SDB_SET_DATALEN(pRaw, dataPos, _OVER);

  terrno = 0;

_OVER:
  if (terrno == 0) {
    mTrace("func:%s, encode to raw:%p, row:%p", pFunc->name, pRaw, pFunc);
    return pRaw;
  }

  mError("func:%s, failed to encode to raw:%p since %s", pFunc->name, pRaw, terrstr());
  sdbFreeRaw(pRaw);
  return NULL;
}
/*
 * Rebuilds a function row from an SDB raw record written by
 * mndFuncActionEncode().
 *
 * Soft versions 1 and 2 are accepted; any other version fails with
 * TSDB_CODE_SDB_INVALID_DATA_VER. funcVersion is only present in the record
 * (and only read) for soft version >= 2.
 *
 * Returns the new row, or NULL on failure with terrno left non-zero. On
 * failure every buffer allocated here (comment, code, row) is released.
 */
static SSdbRow *mndFuncActionDecode(SSdbRaw *pRaw) {
  // `code` and `lino` are kept exactly as in the original; the SDB_GET_* macros may reference them.
  int32_t code = 0;
  int32_t lino = 0;

  // Pessimistic default: any jump to _OVER before the final reset reports a failure.
  terrno = TSDB_CODE_OUT_OF_MEMORY;

  SSdbRow  *pRow = NULL;
  SFuncObj *pFunc = NULL;

  int8_t sver = 0;
  if (sdbGetRawSoftVer(pRaw, &sver) != 0) goto _OVER;

  if (sver != 1 && sver != 2) {
    terrno = TSDB_CODE_SDB_INVALID_DATA_VER;
    goto _OVER;
  }

  pRow = sdbAllocRow(sizeof(SFuncObj));
  if (pRow == NULL) goto _OVER;

  pFunc = sdbGetRowObj(pRow);
  if (pFunc == NULL) goto _OVER;

  // Start from NULL so the failure path below can free both buffers
  // unconditionally, whichever step failed.
  pFunc->pComment = NULL;
  pFunc->pCode = NULL;

  int32_t dataPos = 0;
  SDB_GET_BINARY(pRaw, dataPos, pFunc->name, TSDB_FUNC_NAME_LEN, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pFunc->createdTime, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->funcType, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->scriptType, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->align, _OVER)
  SDB_GET_INT8(pRaw, dataPos, &pFunc->outputType, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->outputLen, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->bufSize, _OVER)
  SDB_GET_INT64(pRaw, dataPos, &pFunc->signature, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->commentSize, _OVER)
  SDB_GET_INT32(pRaw, dataPos, &pFunc->codeSize, _OVER)

  // The comment is optional; the encoder only writes it when commentSize > 0.
  if (pFunc->commentSize > 0) {
    pFunc->pComment = taosMemoryCalloc(1, pFunc->commentSize);
    if (pFunc->pComment == NULL) {
      goto _OVER;
    }
    SDB_GET_BINARY(pRaw, dataPos, pFunc->pComment, pFunc->commentSize, _OVER)
  }

  pFunc->pCode = taosMemoryCalloc(1, pFunc->codeSize);
  if (pFunc->pCode == NULL) {
    goto _OVER;
  }
  SDB_GET_BINARY(pRaw, dataPos, pFunc->pCode, pFunc->codeSize, _OVER)

  // NOTE(review): for sver == 1 funcVersion keeps whatever sdbAllocRow() left
  // in the row (presumably zero) - confirm.
  if (sver >= 2) {
    SDB_GET_INT32(pRaw, dataPos, &pFunc->funcVersion, _OVER)
  }
  SDB_GET_RESERVE(pRaw, dataPos, SDB_FUNC_RESERVE_SIZE, _OVER)

  taosInitRWLatch(&pFunc->lock);
  terrno = 0;

_OVER:
  if (terrno != 0) {
    mError("func:%s, failed to decode from raw:%p since %s", pFunc == NULL ? "null" : pFunc->name, pRaw, terrstr());
    // Release the payload buffers before the row that owns the pointers;
    // previously they leaked when a later read or allocation failed.
    if (pFunc != NULL) {
      taosMemoryFreeClear(pFunc->pComment);
      taosMemoryFreeClear(pFunc->pCode);
    }
    taosMemoryFreeClear(pRow);
    return NULL;
  }

  mTrace("func:%s, decode from raw:%p, row:%p", pFunc->name, pRaw, pFunc);
  return pRow;
}
/* SDB insert callback for function rows: only traces the event and always returns 0. */
static int32_t mndFuncActionInsert(SSdb *pSdb, SFuncObj *pFunc) {
  (void)pSdb;
  mTrace("func:%s, perform insert action, row:%p", pFunc->name, pFunc);
  return 0;
}
/*
 * SDB delete callback for function rows: traces the event and releases the
 * comment and code buffers owned by the row. Always returns 0.
 */
static int32_t mndFuncActionDelete(SSdb *pSdb, SFuncObj *pFunc) {
  (void)pSdb;
  mTrace("func:%s, perform delete action, row:%p", pFunc->name, pFunc);

  // The two buffers are independent; each pointer is cleared after being freed.
  taosMemoryFreeClear(pFunc->pComment);
  taosMemoryFreeClear(pFunc->pCode);
  return 0;
}
2022-01-24 05:59:09 +00:00
/*
 * SDB update callback for function rows: copies every mutable field of pNew
 * into the live row pOld, replacing pOld's comment and code buffers with
 * private copies of pNew's.
 *
 * The copies are allocated BEFORE pOld is touched. If an allocation fails,
 * pOld is left exactly as it was and terrno is returned. The previous
 * implementation freed the old buffer and updated the size first, so a failed
 * allocation left a row with size > 0 and a NULL pointer.
 *
 * A buffer is only copied when pNew's size is > 0 and its pointer is non-NULL;
 * otherwise pOld's pointer becomes NULL (same as before).
 *
 * Returns 0 on success, terrno when an allocation fails.
 */
static int32_t mndFuncActionUpdate(SSdb *pSdb, SFuncObj *pOld, SFuncObj *pNew) {
  (void)pSdb;
  mTrace("func:%s, perform update action, old row:%p new row:%p", pOld->name, pOld, pNew);

  void *pCommentCopy = NULL;
  void *pCodeCopy = NULL;

  if (pNew->commentSize > 0 && pNew->pComment != NULL) {
    pCommentCopy = taosMemoryMalloc(pNew->commentSize);
    if (pCommentCopy == NULL) {
      return terrno;
    }
    (void)memcpy(pCommentCopy, pNew->pComment, pNew->commentSize);
  }

  if (pNew->codeSize > 0 && pNew->pCode != NULL) {
    pCodeCopy = taosMemoryMalloc(pNew->codeSize);
    if (pCodeCopy == NULL) {
      int32_t code = terrno;  // capture before the free below can disturb it
      taosMemoryFree(pCommentCopy);
      return code;
    }
    (void)memcpy(pCodeCopy, pNew->pCode, pNew->codeSize);
  }

  // Nothing below can fail, so the row is switched over in one locked step.
  taosWLockLatch(&pOld->lock);

  pOld->align = pNew->align;
  pOld->bufSize = pNew->bufSize;
  pOld->codeSize = pNew->codeSize;
  pOld->commentSize = pNew->commentSize;
  pOld->createdTime = pNew->createdTime;
  pOld->funcType = pNew->funcType;
  pOld->funcVersion = pNew->funcVersion;
  pOld->outputLen = pNew->outputLen;
  pOld->outputType = pNew->outputType;
  pOld->scriptType = pNew->scriptType;
  pOld->signature = pNew->signature;

  if (pOld->pComment != NULL) {
    taosMemoryFree(pOld->pComment);
  }
  pOld->pComment = pCommentCopy;

  if (pOld->pCode != NULL) {
    taosMemoryFree(pOld->pCode);
  }
  pOld->pCode = pCodeCopy;

  taosWUnLockLatch(&pOld->lock);
  return 0;
}
2022-01-24 05:59:09 +00:00
/*
 * Looks up a function row by name via sdbAcquire().
 *
 * terrno is reset to 0 before the lookup. When the lookup fails with
 * TSDB_CODE_SDB_OBJ_NOT_THERE, terrno is rewritten to
 * TSDB_CODE_MND_FUNC_NOT_EXIST; any other terrno is passed through.
 *
 * Returns the row (to be handed back with mndReleaseFunc()) or NULL.
 */
static SFuncObj *mndAcquireFunc(SMnode *pMnode, char *funcName) {
  terrno = 0;

  SFuncObj *pFound = sdbAcquire(pMnode->pSdb, SDB_FUNC, funcName);
  if (pFound != NULL) {
    return pFound;
  }

  if (terrno == TSDB_CODE_SDB_OBJ_NOT_THERE) {
    terrno = TSDB_CODE_MND_FUNC_NOT_EXIST;
  }
  return NULL;
}
2021-12-08 05:48:18 +00:00
2022-01-24 05:59:09 +00:00
/* Hands a row obtained from mndAcquireFunc() back to the mnode's SDB via sdbRelease(). */
static void mndReleaseFunc(SMnode *pMnode, SFuncObj *pFunc) {
  sdbRelease(pMnode->pSdb, pFunc);
}
2022-05-16 06:55:31 +00:00
/*
 * Builds and prepares the "create-func" transaction for a create (or
 * create-or-replace) request.
 *
 * A temporary SFuncObj is filled from the request (with private copies of the
 * comment and code) and encoded into the transaction logs:
 *   - replace path (orReplace == 1 and the function already exists):
 *       redo = old function (READY), undo = old function (READY),
 *       commit = new function (READY); funcVersion is old + 1 and the old
 *       createdTime is preserved.
 *   - create path:
 *       redo = new function (CREATING), undo = new function (DROPPED),
 *       commit = new function (READY).
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 * The temporary buffers, the acquired old row and the transaction handle are
 * released on every exit path that reaches _OVER.
 */
static int32_t mndCreateFunc(SMnode *pMnode, SRpcMsg *pReq, SCreateFuncReq *pCreate) {
  int32_t code = -1;
  STrans *pTrans = NULL;
  // Must be initialized before the first `goto _OVER`: the cleanup code tests
  // it, and the early allocation failures jump there before the lookup runs.
  SFuncObj *oldFunc = NULL;
  SFuncObj  func = {0};

  if ((code = grantCheck(TSDB_GRANT_USER)) < 0) {
    return code;
  }

  (void)memcpy(func.name, pCreate->name, TSDB_FUNC_NAME_LEN);
  func.createdTime = taosGetTimestampMs();
  func.funcType = pCreate->funcType;
  func.scriptType = pCreate->scriptType;
  func.outputType = pCreate->outputType;
  func.outputLen = pCreate->outputLen;
  func.bufSize = pCreate->bufSize;
  func.signature = pCreate->signature;

  if (NULL != pCreate->pComment) {
    // Stored size includes the terminating NUL.
    func.commentSize = strlen(pCreate->pComment) + 1;
    func.pComment = taosMemoryMalloc(func.commentSize);
    if (func.pComment == NULL) {
      code = terrno;
      goto _OVER;
    }
  }

  func.codeSize = pCreate->codeLen;
  func.pCode = taosMemoryMalloc(func.codeSize);
  if (func.pCode == NULL) {
    code = terrno;
    goto _OVER;
  }

  if (func.commentSize > 0) {
    (void)memcpy(func.pComment, pCreate->pComment, func.commentSize);
  }
  (void)memcpy(func.pCode, pCreate->pCode, func.codeSize);

  pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "create-func");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  mInfo("trans:%d, used to create func:%s", pTrans->id, pCreate->name);

  oldFunc = mndAcquireFunc(pMnode, pCreate->name);
  if (pCreate->orReplace == 1 && oldFunc != NULL) {
    // Replace: bump the version, keep the original creation time.
    func.funcVersion = oldFunc->funcVersion + 1;
    func.createdTime = oldFunc->createdTime;

    SSdbRaw *pRedoRaw = mndFuncActionEncode(oldFunc);
    if (pRedoRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
    TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_READY), NULL, _OVER);

    SSdbRaw *pUndoRaw = mndFuncActionEncode(oldFunc);
    if (pUndoRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
    TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY), NULL, _OVER);

    SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
    if (pCommitRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
    TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  } else {
    SSdbRaw *pRedoRaw = mndFuncActionEncode(&func);
    if (pRedoRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
    TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_CREATING), NULL, _OVER);

    SSdbRaw *pUndoRaw = mndFuncActionEncode(&func);
    if (pUndoRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
    TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_DROPPED), NULL, _OVER);

    SSdbRaw *pCommitRaw = mndFuncActionEncode(&func);
    if (pCommitRaw == NULL) {
      code = TSDB_CODE_MND_RETURN_VALUE_NULL;
      if (terrno != 0) code = terrno;
      goto _OVER;
    }
    TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
    TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_READY), NULL, _OVER);
  }

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  if (oldFunc != NULL) {
    mndReleaseFunc(pMnode, oldFunc);
  }

  // The encoded raws hold their own copies; the temporaries are always dropped here.
  taosMemoryFree(func.pCode);
  taosMemoryFree(func.pComment);
  mndTransDrop(pTrans);
  TAOS_RETURN(code);
}
2022-05-16 06:55:31 +00:00
/*
 * Builds and prepares the "drop-func" transaction for an existing function.
 *
 * The function is encoded three times into the transaction:
 * redo (DROPPING), undo (READY) and commit (DROPPED).
 *
 * Returns 0 once the transaction has been prepared, otherwise an error code.
 * The transaction handle is dropped on every exit path.
 */
static int32_t mndDropFunc(SMnode *pMnode, SRpcMsg *pReq, SFuncObj *pFunc) {
  int32_t code = -1;

  STrans *pTrans = mndTransCreate(pMnode, TRN_POLICY_ROLLBACK, TRN_CONFLICT_NOTHING, pReq, "drop-func");
  if (pTrans == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }

  // The message used to say "drop user", a copy/paste slip; this transaction drops a function.
  mInfo("trans:%d, used to drop func:%s", pTrans->id, pFunc->name);

  SSdbRaw *pRedoRaw = mndFuncActionEncode(pFunc);
  if (pRedoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendRedolog(pTrans, pRedoRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pRedoRaw, SDB_STATUS_DROPPING), NULL, _OVER);

  SSdbRaw *pUndoRaw = mndFuncActionEncode(pFunc);
  if (pUndoRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendUndolog(pTrans, pUndoRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pUndoRaw, SDB_STATUS_READY), NULL, _OVER);

  SSdbRaw *pCommitRaw = mndFuncActionEncode(pFunc);
  if (pCommitRaw == NULL) {
    code = TSDB_CODE_MND_RETURN_VALUE_NULL;
    if (terrno != 0) code = terrno;
    goto _OVER;
  }
  TAOS_CHECK_GOTO(mndTransAppendCommitlog(pTrans, pCommitRaw), NULL, _OVER);
  TAOS_CHECK_GOTO(sdbSetRawStatus(pCommitRaw, SDB_STATUS_DROPPED), NULL, _OVER);

  TAOS_CHECK_GOTO(mndTransPrepare(pMnode, pTrans), NULL, _OVER);

  code = 0;

_OVER:
  mndTransDrop(pTrans);
  return code;
}
2022-05-16 06:55:31 +00:00
/*
 * Handler for TDMT_MND_CREATE_FUNC.
 *
 * Deserializes the request, checks the MND_OPER_CREATE_FUNC privilege, then:
 *   - function exists and igExists is set  -> success, nothing done;
 *   - function exists and orReplace is set -> continue and replace it;
 *   - function exists otherwise            -> TSDB_CODE_MND_FUNC_ALREADY_EXIST.
 * After that the name, code pointer/length and bufSize are validated and the
 * work is delegated to mndCreateFunc(). A successful start is reported as
 * TSDB_CODE_ACTION_IN_PROGRESS.
 *
 * When built with WINDOWS defined the request is rejected with
 * TSDB_CODE_MND_INVALID_PLATFORM right after deserialization.
 */
static int32_t mndProcessCreateFuncReq(SRpcMsg *pReq) {
  SMnode        *pMnode = pReq->info.node;
  int32_t        code = -1;
  SFuncObj      *pFunc = NULL;
  SCreateFuncReq createReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSCreateFuncReq(pReq->pCont, pReq->contLen, &createReq), NULL, _OVER);

#ifdef WINDOWS
  code = TSDB_CODE_MND_INVALID_PLATFORM;
  goto _OVER;
#endif

  mInfo("func:%s, start to create, size:%d", createReq.name, createReq.codeLen);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_CREATE_FUNC), NULL, _OVER);

  pFunc = mndAcquireFunc(pMnode, createReq.name);
  if (pFunc == NULL) {
    // NOTE(review): a failed lookup is only short-circuited for
    // TSDB_CODE_MND_FUNC_ALREADY_EXIST, and `code` is not updated on this
    // path. Sibling handlers may test "!= ..._NOT_EXIST" here - confirm intent.
    if (terrno == TSDB_CODE_MND_FUNC_ALREADY_EXIST) {
      goto _OVER;
    }
  } else if (createReq.igExists) {
    mInfo("func:%s, already exist, ignore exist is set", createReq.name);
    code = 0;
    goto _OVER;
  } else if (createReq.orReplace) {
    mInfo("func:%s, replace function is set", createReq.name);
    code = 0;
  } else {
    code = TSDB_CODE_MND_FUNC_ALREADY_EXIST;
    goto _OVER;
  }

  // Request validation.
  if (createReq.name[0] == 0) {
    code = TSDB_CODE_MND_INVALID_FUNC_NAME;
    goto _OVER;
  }

  if (createReq.pCode == NULL || createReq.codeLen <= 1) {
    code = TSDB_CODE_MND_INVALID_FUNC_CODE;
    goto _OVER;
  }

  if (createReq.bufSize < 0 || createReq.bufSize > TSDB_FUNC_BUF_SIZE) {
    code = TSDB_CODE_MND_INVALID_FUNC_BUFSIZE;
    goto _OVER;
  }

  code = mndCreateFunc(pMnode, pReq, &createReq);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("func:%s, failed to create since %s", createReq.name, tstrerror(code));
  }

  mndReleaseFunc(pMnode, pFunc);
  tFreeSCreateFuncReq(&createReq);
  TAOS_RETURN(code);
}
2022-05-16 06:55:31 +00:00
/*
 * Handler for TDMT_MND_DROP_FUNC.
 *
 * Deserializes the request, checks the MND_OPER_DROP_FUNC privilege and
 * rejects an empty name. A missing function is a success when igNotExists is
 * set and TSDB_CODE_MND_FUNC_NOT_EXIST otherwise. An existing function is
 * handed to mndDropFunc(); a successful start is reported as
 * TSDB_CODE_ACTION_IN_PROGRESS.
 */
static int32_t mndProcessDropFuncReq(SRpcMsg *pReq) {
  SMnode      *pMnode = pReq->info.node;
  int32_t      code = -1;
  SFuncObj    *pFunc = NULL;
  SDropFuncReq dropReq = {0};

  TAOS_CHECK_GOTO(tDeserializeSDropFuncReq(pReq->pCont, pReq->contLen, &dropReq), NULL, _OVER);

  mInfo("func:%s, start to drop", dropReq.name);
  TAOS_CHECK_GOTO(mndCheckOperPrivilege(pMnode, pReq->info.conn.user, MND_OPER_DROP_FUNC), NULL, _OVER);

  if (dropReq.name[0] == 0) {
    code = TSDB_CODE_MND_INVALID_FUNC_NAME;
    goto _OVER;
  }

  pFunc = mndAcquireFunc(pMnode, dropReq.name);
  if (pFunc == NULL) {
    // Both outcomes leave through _OVER; only the reported code differs.
    if (dropReq.igNotExists) {
      mInfo("func:%s, not exist, ignore not exist is set", dropReq.name);
      code = 0;
    } else {
      code = TSDB_CODE_MND_FUNC_NOT_EXIST;
    }
    goto _OVER;
  }

  code = mndDropFunc(pMnode, pReq, pFunc);
  if (code == 0) {
    code = TSDB_CODE_ACTION_IN_PROGRESS;
  }

_OVER:
  if (code != 0 && code != TSDB_CODE_ACTION_IN_PROGRESS) {
    mError("func:%s, failed to drop since %s", dropReq.name, tstrerror(code));
  }

  mndReleaseFunc(pMnode, pFunc);
  TAOS_RETURN(code);
}
2022-05-16 06:55:31 +00:00
static int32_t mndProcessRetrieveFuncReq(SRpcMsg *pReq) {
SMnode *pMnode = pReq->info.node;
2022-02-12 08:28:50 +00:00
int32_t code = -1;
SRetrieveFuncReq retrieveReq = {0};
SRetrieveFuncRsp retrieveRsp = {0};
2022-05-16 06:55:31 +00:00
if (tDeserializeSRetrieveFuncReq(pReq->pCont, pReq->contLen, &retrieveReq) != 0) {
2024-07-20 01:25:18 +00:00
code = TSDB_CODE_INVALID_MSG;
2022-02-12 08:28:50 +00:00
goto RETRIEVE_FUNC_OVER;
}
2021-12-08 05:48:18 +00:00
2022-02-12 08:28:50 +00:00
if (retrieveReq.numOfFuncs <= 0 || retrieveReq.numOfFuncs > TSDB_FUNC_MAX_RETRIEVE) {
2024-07-20 01:25:18 +00:00
code = TSDB_CODE_MND_INVALID_FUNC_RETRIEVE;
2022-02-12 08:28:50 +00:00
goto RETRIEVE_FUNC_OVER;
2022-01-24 09:14:31 +00:00
}
2021-12-08 05:48:18 +00:00
2022-02-12 08:28:50 +00:00
retrieveRsp.numOfFuncs = retrieveReq.numOfFuncs;
retrieveRsp.pFuncInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncInfo));
if (retrieveRsp.pFuncInfos == NULL) {
2024-09-12 07:46:30 +00:00
code = terrno;
2022-02-12 08:28:50 +00:00
goto RETRIEVE_FUNC_OVER;
2022-01-24 07:00:27 +00:00
}
retrieveRsp.pFuncExtraInfos = taosArrayInit(retrieveReq.numOfFuncs, sizeof(SFuncExtraInfo));
if (retrieveRsp.pFuncExtraInfos == NULL) {
2024-09-12 07:46:30 +00:00
code = terrno;
2023-04-04 11:53:08 +00:00
goto RETRIEVE_FUNC_OVER;
}
2022-02-12 08:28:50 +00:00
for (int32_t i = 0; i < retrieveReq.numOfFuncs; ++i) {
char *funcName = taosArrayGet(retrieveReq.pFuncNames, i);
2021-12-08 05:48:18 +00:00
2022-01-24 07:00:27 +00:00
SFuncObj *pFunc = mndAcquireFunc(pMnode, funcName);
2021-12-08 05:48:18 +00:00
if (pFunc == NULL) {
2024-07-23 01:58:17 +00:00
if (terrno != 0) code = terrno;
2022-02-12 08:28:50 +00:00
goto RETRIEVE_FUNC_OVER;
2021-12-08 05:48:18 +00:00
}
2022-02-12 08:28:50 +00:00
SFuncInfo funcInfo = {0};
2024-07-25 09:49:58 +00:00
(void)memcpy(funcInfo.name, pFunc->name, TSDB_FUNC_NAME_LEN);
2022-02-12 08:28:50 +00:00
funcInfo.funcType = pFunc->funcType;
funcInfo.scriptType = pFunc->scriptType;
funcInfo.outputType = pFunc->outputType;
funcInfo.outputLen = pFunc->outputLen;
funcInfo.bufSize = pFunc->bufSize;
funcInfo.signature = pFunc->signature;
2022-04-21 07:33:07 +00:00
if (retrieveReq.ignoreCodeComment) {
funcInfo.commentSize = 0;
funcInfo.codeSize = 0;
} else {
funcInfo.commentSize = pFunc->commentSize;
funcInfo.codeSize = pFunc->codeSize;
funcInfo.pCode = taosMemoryCalloc(1, funcInfo.codeSize);
if (funcInfo.pCode == NULL) {
2024-09-12 07:46:30 +00:00
terrno = terrno;
2022-04-20 09:43:02 +00:00
goto RETRIEVE_FUNC_OVER;
}
2024-07-25 09:49:58 +00:00
(void)memcpy(funcInfo.pCode, pFunc->pCode, pFunc->codeSize);
2022-04-21 07:33:07 +00:00
if (funcInfo.commentSize > 0) {
funcInfo.pComment = taosMemoryCalloc(1, funcInfo.commentSize);
if (funcInfo.pComment == NULL) {
2024-09-12 07:46:30 +00:00
terrno = terrno;
2022-04-21 07:33:07 +00:00
goto RETRIEVE_FUNC_OVER;
}
2024-07-25 09:49:58 +00:00
(void)memcpy(funcInfo.pComment, pFunc->pComment, pFunc->commentSize);
2022-04-21 07:33:07 +00:00
}
2022-04-20 09:43:02 +00:00
}
2024-07-25 09:49:58 +00:00
if (taosArrayPush(retrieveRsp.pFuncInfos, &funcInfo) == NULL) {
2024-09-12 07:46:30 +00:00
terrno = terrno;
2024-07-25 09:49:58 +00:00
goto RETRIEVE_FUNC_OVER;
}
SFuncExtraInfo extraInfo = {0};
extraInfo.funcVersion = pFunc->funcVersion;
extraInfo.funcCreatedTime = pFunc->createdTime;
2024-07-25 09:49:58 +00:00
if (taosArrayPush(retrieveRsp.pFuncExtraInfos, &extraInfo) == NULL) {
2024-09-12 07:46:30 +00:00
terrno = terrno;
2024-07-25 09:49:58 +00:00
goto RETRIEVE_FUNC_OVER;
}
2023-04-04 11:53:08 +00:00
2022-01-24 07:00:27 +00:00
mndReleaseFunc(pMnode, pFunc);
2021-12-08 05:48:18 +00:00
}
2022-02-12 08:28:50 +00:00
int32_t contLen = tSerializeSRetrieveFuncRsp(NULL, 0, &retrieveRsp);
void *pRsp = rpcMallocCont(contLen);
if (pRsp == NULL) {
2024-09-12 07:46:30 +00:00
code = terrno;
2022-02-12 08:28:50 +00:00
goto RETRIEVE_FUNC_OVER;
}
2024-09-05 11:39:49 +00:00
if ((contLen = tSerializeSRetrieveFuncRsp(pRsp, contLen, &retrieveRsp)) <= 0) {
code = contLen;
goto RETRIEVE_FUNC_OVER;
}
2022-02-12 08:28:50 +00:00
2022-05-16 06:55:31 +00:00
pReq->info.rsp = pRsp;
pReq->info.rspLen = contLen;
2021-12-08 05:48:18 +00:00
2022-01-24 07:00:27 +00:00
code = 0;
2022-02-12 08:28:50 +00:00
RETRIEVE_FUNC_OVER:
tFreeSRetrieveFuncReq(&retrieveReq);
tFreeSRetrieveFuncRsp(&retrieveRsp);
2022-01-24 07:00:27 +00:00
2024-07-20 01:25:18 +00:00
TAOS_RETURN(code);
2021-12-08 05:48:18 +00:00
}
static void *mnodeGenTypeStr(char *buf, int32_t buflen, uint8_t type, int32_t len) {
2021-12-08 05:48:18 +00:00
char *msg = "unknown";
if (type >= sizeof(tDataTypes) / sizeof(tDataTypes[0])) {
return msg;
}
2024-09-20 08:37:27 +00:00
if (type == TSDB_DATA_TYPE_NCHAR || type == TSDB_DATA_TYPE_VARBINARY || type == TSDB_DATA_TYPE_BINARY ||
type == TSDB_DATA_TYPE_GEOMETRY) {
2021-12-25 05:27:37 +00:00
int32_t bytes = len > 0 ? (int32_t)(len - VARSTR_HEADER_SIZE) : len;
2021-12-08 05:48:18 +00:00
2024-07-25 09:49:58 +00:00
(void)snprintf(buf, buflen - 1, "%s(%d)", tDataTypes[type].name, type == TSDB_DATA_TYPE_NCHAR ? bytes / 4 : bytes);
2021-12-08 05:48:18 +00:00
buf[buflen - 1] = 0;
return buf;
}
return tDataTypes[type].name;
}
2022-05-16 06:55:31 +00:00
/*
 * Show-table retrieve handler for TSDB_MGMT_TABLE_FUNC.
 *
 * Iterates the SDB_FUNC table starting at pShow->pIter and fills up to `rows`
 * rows of pBlock. Columns are written in this order: name, comment (NULL when
 * the function has none), aggregate flag, output type string, created time,
 * code size, buffer size, language ("C", "Python" or ""), code (truncated so
 * that payload plus var-string header fits TSDB_MAX_BINARY_LEN), and function
 * version.
 *
 * Returns the number of rows written and adds it to pShow->numOfRows. If a
 * column write or an allocation fails, the current function is released and
 * the error code is returned instead.
 */
static int32_t mndRetrieveFuncs(SRpcMsg *pReq, SShowObj *pShow, SSDataBlock *pBlock, int32_t rows) {
  SMnode   *pMnode = pReq->info.node;
  SSdb     *pSdb = pMnode->pSdb;
  int32_t   numOfRows = 0;
  SFuncObj *pFunc = NULL;
  int32_t   cols = 0;
  int32_t   code = 0;
  char      buf[TSDB_TYPE_STR_MAX_LEN];

  while (numOfRows < rows) {
    pShow->pIter = sdbFetch(pSdb, SDB_FUNC, pShow->pIter, (void **)&pFunc);
    if (pShow->pIter == NULL) break;

    cols = 0;

    // name
    char b1[tListLen(pFunc->name) + VARSTR_HEADER_SIZE] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b1, pFunc->name, pShow->pMeta->pSchemas[cols].bytes);
    SColumnInfoData *pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)b1, false), pSdb, pFunc);

    // comment
    if (pFunc->pComment) {
      char *b2 = taosMemoryCalloc(1, pShow->pMeta->pSchemas[cols].bytes);
      if (b2 == NULL) {
        // Previously unchecked: the var-string conversion below would write through NULL.
        code = terrno;
        sdbRelease(pSdb, pFunc);
        TAOS_RETURN(code);
      }
      STR_WITH_MAXSIZE_TO_VARSTR(b2, pFunc->pComment, pShow->pMeta->pSchemas[cols].bytes);

      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      code = colDataSetVal(pColInfo, numOfRows, (const char *)b2, false);
      taosMemoryFree(b2);
      if (code != 0) {
        sdbRelease(pSdb, pFunc);
        TAOS_RETURN(code);
      }
    } else {
      pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
      // The macro already releases and returns on failure; the extra
      // `if (code != 0)` that used to follow it was dead code.
      TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, NULL, true), pSdb, pFunc);
    }

    // aggregate flag
    int32_t isAgg = (pFunc->funcType == TSDB_FUNC_TYPE_AGGREGATE) ? 1 : 0;
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&isAgg, false), pSdb, pFunc);

    // output type string
    char b3[TSDB_TYPE_STR_MAX_LEN + 1] = {0};
    STR_WITH_MAXSIZE_TO_VARSTR(b3, mnodeGenTypeStr(buf, TSDB_TYPE_STR_MAX_LEN, pFunc->outputType, pFunc->outputLen),
                               pShow->pMeta->pSchemas[cols].bytes);
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)b3, false), pSdb, pFunc);

    // created time
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->createdTime, false), pSdb,
                                   pFunc);

    // code size
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->codeSize, false), pSdb,
                                   pFunc);

    // buffer size
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->bufSize, false), pSdb,
                                   pFunc);

    // language
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    char *language = "";
    if (pFunc->scriptType == TSDB_FUNC_SCRIPT_BIN_LIB) {
      language = "C";
    } else if (pFunc->scriptType == TSDB_FUNC_SCRIPT_PYTHON) {
      language = "Python";
    }
    char varLang[TSDB_TYPE_STR_MAX_LEN + 1] = {0};
    varDataSetLen(varLang, strlen(language));
    tstrncpy(varDataVal(varLang), language, sizeof(varLang) - VARSTR_HEADER_SIZE);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)varLang, false), pSdb, pFunc);

    // code body, capped at TSDB_MAX_BINARY_LEN including the var-string header
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    int32_t varCodeLen = (pFunc->codeSize + VARSTR_HEADER_SIZE) > TSDB_MAX_BINARY_LEN
                             ? TSDB_MAX_BINARY_LEN
                             : pFunc->codeSize + VARSTR_HEADER_SIZE;
    char   *b4 = taosMemoryMalloc(varCodeLen);
    if (b4 == NULL) {
      code = terrno;
      sdbRelease(pSdb, pFunc);
      TAOS_RETURN(code);
    }
    (void)memcpy(varDataVal(b4), pFunc->pCode, varCodeLen - VARSTR_HEADER_SIZE);
    varDataSetLen(b4, varCodeLen - VARSTR_HEADER_SIZE);
    code = colDataSetVal(pColInfo, numOfRows, (const char *)b4, false);
    if (code < 0) {
      sdbRelease(pSdb, pFunc);
      taosMemoryFree(b4);
      TAOS_RETURN(code);
    }
    taosMemoryFree(b4);

    // function version
    pColInfo = taosArrayGet(pBlock->pDataBlock, cols++);
    TAOS_CHECK_RETURN_WITH_RELEASE(colDataSetVal(pColInfo, numOfRows, (const char *)&pFunc->funcVersion, false), pSdb,
                                   pFunc);

    numOfRows++;
    sdbRelease(pSdb, pFunc);
  }

  pShow->numOfRows += numOfRows;
  return numOfRows;
}
/* Show-table free-iterator handler: cancels an in-progress SDB_FUNC fetch identified by pIter. */
static void mndCancelGetNextFunc(SMnode *pMnode, void *pIter) {
  sdbCancelFetchByType(pMnode->pSdb, pIter, SDB_FUNC);
}