TDengine/source/libs/executor/src/executorMain.c

426 lines
13 KiB
C
Raw Normal View History

2022-01-08 14:59:24 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "dataSinkMgt.h"
2022-06-20 06:29:18 +00:00
#include "os.h"
2022-01-08 14:59:24 +00:00
#include "tmsg.h"
2022-06-20 06:29:18 +00:00
#include "tref.h"
2022-05-15 23:47:56 +00:00
#include "tudf.h"
2022-01-08 14:59:24 +00:00
#include "executor.h"
2022-05-15 23:47:56 +00:00
#include "executorimpl.h"
#include "query.h"
static TdThreadOnce initPoolOnce = PTHREAD_ONCE_INIT;
2022-06-20 06:29:18 +00:00
int32_t exchangeObjRefPool = -1;
2022-06-20 06:29:18 +00:00
static void initRefPool() { exchangeObjRefPool = taosOpenRef(1024, doDestroyExchangeOperatorInfo); }
static void cleanupRefPool() {
int32_t ref = atomic_val_compare_exchange_32(&exchangeObjRefPool, exchangeObjRefPool, 0);
taosCloseRef(ref);
}
2022-01-08 14:59:24 +00:00
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan,
qTaskInfo_t* pTaskInfo, DataSinkHandle* handle, const char* sql, EOPTR_EXEC_MODEL model) {
2022-06-20 06:29:18 +00:00
assert(pSubplan != NULL);
2022-01-11 02:51:23 +00:00
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
2022-01-08 14:59:24 +00:00
taosThreadOnce(&initPoolOnce, initRefPool);
atexit(cleanupRefPool);
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId, sql, model);
2022-01-19 08:20:13 +00:00
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
2022-01-08 14:59:24 +00:00
2022-01-11 10:15:37 +00:00
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
code = dsDataSinkMgtInit(&cfg);
2022-01-08 14:59:24 +00:00
if (code != TSDB_CODE_SUCCESS) {
goto _error;
2022-01-08 14:59:24 +00:00
}
2022-06-20 06:29:18 +00:00
2022-01-25 07:17:02 +00:00
if (handle) {
2022-06-06 12:59:36 +00:00
void* pSinkParam = NULL;
2022-07-06 08:29:51 +00:00
code = createDataSinkParam(pSubplan->pDataSink, &pSinkParam, pTaskInfo, readHandle);
2022-06-06 12:59:36 +00:00
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
2022-06-20 06:29:18 +00:00
2022-06-06 12:59:36 +00:00
code = dsCreateDataSinker(pSubplan->pDataSink, handle, pSinkParam);
2022-01-25 07:17:02 +00:00
}
2022-06-20 06:29:18 +00:00
_error:
2022-01-08 14:59:24 +00:00
// if failed to add ref for all tables in this query, abort current query
return code;
}
#ifdef TEST_IMPL
/*
 * Test-only helper: delay a query according to a marker embedded in its SQL.
 *
 * The SQL text is searched for " t_<number>[h|m|s]"; the number is taken as
 * milliseconds unless followed by 'h', 'm' or 's', which scale it to hours,
 * minutes or seconds. SQL containing " count(*)" is never delayed.
 *
 * Returns 0 when no delay applies (count(*) query, or a resulting delay of 0),
 * and 1 otherwise, including when the query has no SQL text at all.
 */
int waitMoment(SQInfo* pQInfo) {
  if (!pQInfo->sql) {
    return 1;
  }

  if (strstr(pQInfo->sql, " count(*)") != NULL) {
    return 0;
  }

  int   delayMs = 0;
  char* marker = strstr(pQInfo->sql, " t_");
  if (marker != NULL) {
    char* cursor = marker + 3;
    delayMs = atoi(cursor);

    // step over the digits to reach the optional unit character
    for (; *cursor >= '0' && *cursor <= '9'; ++cursor) {
    }

    switch (*cursor) {
      case 'h':
        delayMs *= 3600 * 1000;
        break;
      case 'm':
        delayMs *= 60 * 1000;
        break;
      case 's':
        delayMs *= 1000;
        break;
      default:
        break;
    }
  }

  if (delayMs == 0) {
    return 0;
  }

  printf("test wait sleep %dms. sql=%s ...\n", delayMs, pQInfo->sql);

  if (delayMs < 1000) {
    taosMsleep(delayMs);
    return 1;
  }

  // long delays are slept in one-second slices so a killed task can stop early
  for (int sleptMs = 0; sleptMs < delayMs;) {
    taosMsleep(1000);
    sleptMs += 1000;
    if (isTaskKilled(pQInfo)) {
      printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
      break;
    }
  }

  return 1;
}
#endif
2022-06-20 06:29:18 +00:00
/*
 * Run one execution step of a task and hand back the next result block.
 *
 * tinfo     task handle (an SExecTaskInfo*)
 * pRes      output: the next data block, or NULL when nothing was produced
 * useconds  output: written with the accumulated elapsed time only when the
 *           step produced no block
 *
 * Only one thread may execute a task at a time: ownership is claimed with a
 * compare-and-swap on pTaskInfo->owner and is released on every exit path
 * taken after a successful claim, so that qKillTask(), which waits for the
 * owner to become 0, cannot wait forever.
 *
 * Returns TSDB_CODE_QRY_IN_EXEC when another thread owns the task,
 * TSDB_CODE_SUCCESS when the task was already killed, otherwise pTaskInfo->code.
 */
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t* useconds) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  int64_t        threadId = taosGetSelfPthreadId();

  *pRes = NULL;

  int64_t curOwner = 0;
  if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
    // another thread owns the task: do not touch the owner field here
    qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo, (void*)curOwner);
    pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
    return pTaskInfo->code;
  }

  if (pTaskInfo->cost.start == 0) {
    pTaskInfo->cost.start = taosGetTimestampMs();
  }

  if (isTaskKilled(pTaskInfo)) {
    // release the ownership claimed above before bailing out
    atomic_store_64(&pTaskInfo->owner, 0);
    qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
    return TSDB_CODE_SUCCESS;
  }

  // error occurs, record the error code and return to client
  // NOTE(review): presumably operators longjmp() to pTaskInfo->env on error/cancel - confirm
  int32_t ret = setjmp(pTaskInfo->env);
  if (ret != TSDB_CODE_SUCCESS) {
    pTaskInfo->code = ret;
    cleanUpUdfs();
    qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo), tstrerror(pTaskInfo->code));
    // release the ownership, otherwise the task stays "in exec" forever
    atomic_store_64(&pTaskInfo->owner, 0);
    return pTaskInfo->code;
  }

  qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));

  int64_t st = taosGetTimestampUs();
  *pRes = pTaskInfo->pRoot->fpSet.getNextFn(pTaskInfo->pRoot);
  uint64_t el = (taosGetTimestampUs() - st);

  pTaskInfo->cost.elapsedTime += el;
  if (NULL == *pRes) {
    *useconds = pTaskInfo->cost.elapsedTime;
  }

  cleanUpUdfs();

  int32_t  current = (*pRes != NULL) ? (*pRes)->info.rows : 0;
  uint64_t total = pTaskInfo->pRoot->resultInfo.totalRows;

  qDebug("%s task suspended, %d rows returned, total:%" PRIu64 " rows, in sinkNode:%d, elapsed:%.2f ms",
         GET_TASKID(pTaskInfo), current, total, 0, el / 1000.0);

  atomic_store_64(&pTaskInfo->owner, 0);
  return pTaskInfo->code;
}
/*
 * Mark a task as killed and block until no thread is executing it.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  // Wait for the query executing thread to be stopped.
  // Once the query is stopped, the owner of qHandle will be cleared immediately.
  // NOTE(review): owner is read here without an atomic load - confirm this is intended.
  for (;;) {
    if (pTask->owner == 0) {
      break;
    }
    taosMsleep(100);
  }

  return TSDB_CODE_SUCCESS;
}
2022-01-20 11:34:46 +00:00
/*
 * Mark a task as killed without waiting for the executing thread to stop.
 *
 * Returns TSDB_CODE_QRY_INVALID_QHANDLE for a NULL handle, TSDB_CODE_SUCCESS otherwise.
 */
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)qinfo;
  if (NULL == pTask) {
    return TSDB_CODE_QRY_INVALID_QHANDLE;
  }

  qDebug("%s execTask async killed", GET_TASKID(pTask));
  setTaskKilled(pTask);

  return TSDB_CODE_SUCCESS;
}
2022-01-25 02:41:35 +00:00
/*
 * Log the completion of a task, print its cost summary and destroy it.
 * A NULL handle is ignored.
 */
void qDestroyTask(qTaskInfo_t qTaskHandle) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)qTaskHandle;
  if (pTaskInfo == NULL) {
    return;
  }

  // pRoot may be NULL (qSerializeTaskStatus() checks for that case as well),
  // so only read the row counter when an operator tree exists
  if (pTaskInfo->pRoot != NULL) {
    qDebug("%s execTask completed, numOfRows:%" PRId64, GET_TASKID(pTaskInfo), pTaskInfo->pRoot->resultInfo.totalRows);
  } else {
    qDebug("%s execTask completed", GET_TASKID(pTaskInfo));
  }

  queryCostStatis(pTaskInfo);  // print the query cost summary
  doDestroyTask(pTaskInfo);
}
2022-04-01 11:45:45 +00:00
2022-06-20 06:29:18 +00:00
/*
 * Collect the explain/execution information of the task's operator tree.
 *
 * tinfo   task handle (an SExecTaskInfo*)
 * resNum  passed through to getOperatorExplainExecInfo()
 * pRes    passed through to getOperatorExplainExecInfo()
 *
 * Returns the result of getOperatorExplainExecInfo() on the root operator.
 */
int32_t qGetExplainExecInfo(qTaskInfo_t tinfo, int32_t* resNum, SExplainExecInfo** pRes) {
  int32_t        allocated = 0;  // capacity tracker handed to the helper, starts at 0
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  int32_t code = getOperatorExplainExecInfo(pTask->pRoot, pRes, &allocated, resNum);
  return code;
}
/*
 * Serialize the status of the task's operator tree.
 *
 * tinfo    task handle (an SExecTaskInfo*)
 * pOutput  output: buffer produced by encodeOperator(); freed and cleared
 *          again when encoding succeeded but no operator contributed a value
 * len      output: length of the encoded data, 0 when the buffer was dropped
 *
 * Returns TSDB_CODE_INVALID_PARA when the task or its root operator is NULL,
 * otherwise the result of encodeOperator().
 */
int32_t qSerializeTaskStatus(qTaskInfo_t tinfo, char** pOutput, int32_t* len) {
  SExecTaskInfo* pTaskInfo = (struct SExecTaskInfo*)tinfo;
  if (pTaskInfo == NULL || pTaskInfo->pRoot == NULL) {
    return TSDB_CODE_INVALID_PARA;
  }

  int32_t nOptrWithVal = 0;
  int32_t code = encodeOperator(pTaskInfo->pRoot, pOutput, len, &nOptrWithVal);

  // comparison, not assignment: drop the output when nothing was encoded
  if ((code == TSDB_CODE_SUCCESS) && (nOptrWithVal == 0)) {
    taosMemoryFreeClear(*pOutput);
    *len = 0;
  }

  return code;
}
/*
 * Restore the status of the task's operator tree from a serialized buffer.
 *
 * Returns TSDB_CODE_INVALID_PARA when the task handle or the input is NULL or
 * the length is 0, otherwise the result of decodeOperator().
 */
int32_t qDeserializeTaskStatus(qTaskInfo_t tinfo, const char* pInput, int32_t len) {
  SExecTaskInfo* pTask = (struct SExecTaskInfo*)tinfo;

  const int32_t argsOk = (pTask != NULL) && (pInput != NULL) && (len != 0);
  if (!argsOk) {
    return TSDB_CODE_INVALID_PARA;
  }

  return decodeOperator(pTask->pRoot, pInput, len);
}
2022-07-07 09:16:12 +00:00
/*
 * Walk down the operator tree from the root, following the single downstream
 * operator at each level, until a stream-scan operator is found.
 *
 * scanner  output: the info pointer of the stream-scan operator
 *
 * Returns 0. Every operator above the stream scan is asserted to have exactly
 * one downstream operator.
 */
int32_t qExtractStreamScanner(qTaskInfo_t tinfo, void** scanner) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pCur = pTask->pRoot;

  for (;;) {
    const uint8_t opType = pCur->operatorType;
    if (opType != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      // not the scanner yet: descend into the only child
      ASSERT(pCur->numOfDownstream == 1);
      pCur = pCur->pDownstream[0];
      continue;
    }

    *scanner = pCur->info;
    return 0;
  }
}
2022-07-14 07:32:21 +00:00
#if 0
/* Disabled: push one item into the input queue of a stream-model task. */
int32_t qStreamInput(qTaskInfo_t tinfo, void* pItem) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);
  taosWriteQitem(pTask->streamInfo.inputQueue->queue, pItem);

  return 0;
}
#endif
2022-07-13 08:37:33 +00:00
2022-07-15 09:48:48 +00:00
/*
 * Record the version range to recover and move the task's stream state to the
 * "prepare" recover step. The task is asserted to use the stream execution model.
 *
 * Always returns 0.
 */
int32_t qStreamPrepareRecover(qTaskInfo_t tinfo, int64_t startVer, int64_t endVer) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_STREAM);

  pTask->streamInfo.recoverStartVer = startVer;
  pTask->streamInfo.recoverEndVer = endVer;
  pTask->streamInfo.recoverStep = STREAM_RECOVER_STEP__PREPARE;

  return 0;
}
2022-07-07 09:16:12 +00:00
void* qExtractReaderFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return (void*)pInfo->tqReader;
}
const SSchemaWrapper* qExtractSchemaFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return pInfo->tqReader->pSchemaWrapper;
}
const STqOffset* qExtractStatusFromStreamScanner(void* scanner) {
SStreamScanInfo* pInfo = scanner;
return &pInfo->offset;
}
2022-07-08 09:48:34 +00:00
/*
 * Return the meta block stored in the task's stream info.
 * The task is asserted to use the queue execution model.
 */
void* qStreamExtractMetaMsg(qTaskInfo_t tinfo) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  return pTask->streamInfo.metaBlk;
}
/*
 * Copy the task's last stream status into *pOffset.
 * The task is asserted to use the queue execution model.
 *
 * Always returns 0.
 */
int32_t qStreamExtractOffset(qTaskInfo_t tinfo, STqOffsetVal* pOffset) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  ASSERT(pTask->execModel == OPTR_EXEC_MODEL_QUEUE);

  // sizeof(*pOffset) is sizeof(STqOffsetVal) because pOffset is an STqOffsetVal*
  memcpy(pOffset, &pTask->streamInfo.lastStatus, sizeof(*pOffset));

  return 0;
}
2022-07-10 03:18:15 +00:00
/*
 * Position the stream scanner of a queue-model task at the given offset.
 *
 * The requested offset is stored as the task's prepare status. When it equals
 * the task's last status nothing else is done. Otherwise the operator tree is
 * walked from the root (each visited operator is set to OP_OPENED) down to the
 * stream-scan operator, which is then repositioned:
 *   - TMQ_OFFSET__LOG:           the table-scan data reader is closed and the
 *                                tq reader is moved to version + 1;
 *   - TMQ_OFFSET__SNAPSHOT_DATA: the table scan is moved to table `uid`
 *                                (the first table of the list when uid is 0)
 *                                and restarted right after timestamp `ts`.
 *
 * Returns 0 on success and -1 when seeking the log fails, when the table list
 * is empty for a snapshot offset without uid, or when the data reader cannot
 * be opened.
 */
int32_t qStreamPrepareScan(qTaskInfo_t tinfo, const STqOffsetVal* pOffset) {
  SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
  SOperatorInfo* pOperator = pTaskInfo->pRoot;
  ASSERT(pTaskInfo->execModel == OPTR_EXEC_MODEL_QUEUE);
  pTaskInfo->streamInfo.prepareStatus = *pOffset;

  if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus)) {
    // already positioned at the requested offset
    return 0;
  }

  while (1) {
    uint8_t type = pOperator->operatorType;
    pOperator->status = OP_OPENED;

    if (type != QUERY_NODE_PHYSICAL_PLAN_STREAM_SCAN) {
      // not the stream scan yet: descend into the only child
      ASSERT(pOperator->numOfDownstream == 1);
      pOperator = pOperator->pDownstream[0];
      continue;
    }

    SStreamScanInfo* pInfo = pOperator->info;
    if (pOffset->type == TMQ_OFFSET__LOG) {
      // switching to log consumption: the snapshot data reader is released
      STableScanInfo* pTSInfo = pInfo->pTableScanOp->info;
      tsdbReaderClose(pTSInfo->dataReader);
      pTSInfo->dataReader = NULL;
#if 0
      if (tOffsetEqual(pOffset, &pTaskInfo->streamInfo.lastStatus) &&
          pInfo->tqReader->pWalReader->curVersion != pOffset->version) {
        qError("prepare scan ver %ld actual ver %ld, last %ld", pOffset->version,
               pInfo->tqReader->pWalReader->curVersion, pTaskInfo->streamInfo.lastStatus.version);
        ASSERT(0);
      }
#endif
      if (tqSeekVer(pInfo->tqReader, pOffset->version + 1) < 0) {
        return -1;
      }
      ASSERT(pInfo->tqReader->pWalReader->curVersion == pOffset->version + 1);
    } else if (pOffset->type == TMQ_OFFSET__SNAPSHOT_DATA) {
      /*pInfo->blockType = STREAM_INPUT__TABLE_SCAN;*/
      int64_t uid = pOffset->uid;
      int64_t ts = pOffset->ts;

      if (uid == 0) {
        // no table given: start from the first table, before any timestamp
        if (taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList) != 0) {
          STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, 0);
          uid = pTableInfo->uid;
          ts = INT64_MIN;
        } else {
          return -1;
        }
      }

      /*if (pTaskInfo->streamInfo.lastStatus.type != TMQ_OFFSET__SNAPSHOT_DATA ||*/
      /*pTaskInfo->streamInfo.lastStatus.uid != uid || pTaskInfo->streamInfo.lastStatus.ts != ts) {*/
      STableScanInfo* pTableScanInfo = pInfo->pTableScanOp->info;
      int32_t         tableSz = taosArrayGetSize(pTaskInfo->tableqinfoList.pTableList);

#ifndef NDEBUG
      qDebug("switch to next table %" PRId64 " (cursor %d), %" PRId64 " rows returned", uid,
             pTableScanInfo->currentTable, pInfo->pTableScanOp->resultInfo.totalRows);
      pInfo->pTableScanOp->resultInfo.totalRows = 0;
#endif

      bool found = false;
      for (int32_t i = 0; i < tableSz; i++) {
        STableKeyInfo* pTableInfo = taosArrayGet(pTaskInfo->tableqinfoList.pTableList, i);
        if (pTableInfo->uid == uid) {
          found = true;
          pTableScanInfo->currentTable = i;
          break;
        }
      }

      // TODO after dropping table, table may be not found
      ASSERT(found);

      if (pTableScanInfo->dataReader == NULL) {
        if (tsdbReaderOpen(pTableScanInfo->readHandle.vnode, &pTableScanInfo->cond,
                           pTaskInfo->tableqinfoList.pTableList, &pTableScanInfo->dataReader, NULL) < 0 ||
            pTableScanInfo->dataReader == NULL) {
          ASSERT(0);
          // never continue with a NULL reader, even if the assertion is compiled out
          return -1;
        }
      }

      tsdbSetTableId(pTableScanInfo->dataReader, uid);

      // temporarily move the window start right after ts for the reset, then restore it
      int64_t oldSkey = pTableScanInfo->cond.twindows.skey;
      pTableScanInfo->cond.twindows.skey = ts + 1;
      tsdbReaderReset(pTableScanInfo->dataReader, &pTableScanInfo->cond);
      pTableScanInfo->cond.twindows.skey = oldSkey;
      pTableScanInfo->scanTimes = 0;

      qDebug("tsdb reader offset seek to uid %" PRId64 " ts %" PRId64 ", table cur set to %d , all table num %d",
             uid, ts, pTableScanInfo->currentTable, tableSz);
      /*}*/

    } else {
      ASSERT(0);
    }

    return 0;
  }

  return 0;
}
2022-07-10 06:48:16 +00:00
#if 0
/*
 * Disabled: prepare a tsdb scan at (uid, ts). A uid of 0 selects the first
 * table of the task's table list, if any, with ts set to INT64_MIN.
 */
int32_t qStreamPrepareTsdbScan(qTaskInfo_t tinfo, uint64_t uid, int64_t ts) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  if (uid == 0 && taosArrayGetSize(pTask->tableqinfoList.pTableList) != 0) {
    STableKeyInfo* pFirst = taosArrayGet(pTask->tableqinfoList.pTableList, 0);
    uid = pFirst->uid;
    ts = INT64_MIN;
  }

  return doPrepareScan(pTask->pRoot, uid, ts);
}

/* Disabled: fetch the current scan position of the task's root operator. */
int32_t qGetStreamScanStatus(qTaskInfo_t tinfo, uint64_t* uid, int64_t* ts) {
  SExecTaskInfo* pTask = (SExecTaskInfo*)tinfo;

  return doGetScanStatus(pTask->pRoot, uid, ts);
}
#endif