TDengine/source/libs/executor/src/executorMain.c

268 lines
7.1 KiB
C
Raw Normal View History

2022-01-08 14:59:24 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include <tsdb.h>
2022-01-11 02:51:23 +00:00
#include "dataSinkMgt.h"
#include "exception.h"
#include "os.h"
#include "tarray.h"
2022-01-08 14:59:24 +00:00
#include "tcache.h"
#include "tglobal.h"
#include "tmsg.h"
#include "thash.h"
#include "executorimpl.h"
#include "executor.h"
#include "tlosertree.h"
#include "ttypes.h"
#include "query.h"
typedef struct STaskMgmt {
pthread_mutex_t lock;
SCacheObj *qinfoPool; // query handle pool
int32_t vgId;
bool closed;
} STaskMgmt;
static void taskMgmtKillTaskFn(void* handle, void* param1) {
void** fp = (void**)handle;
qKillTask(*fp);
}
static void freeqinfoFn(void *qhandle) {
void** handle = qhandle;
if (handle == NULL || *handle == NULL) {
return;
}
qKillTask(*handle);
qDestroyTask(*handle);
}
2022-01-28 02:44:02 +00:00
int32_t qCreateExecTask(SReadHandle* readHandle, int32_t vgId, uint64_t taskId, SSubplan* pSubplan, qTaskInfo_t* pTaskInfo, DataSinkHandle* handle) {
2022-01-21 03:19:24 +00:00
assert(readHandle != NULL && pSubplan != NULL);
2022-01-11 02:51:23 +00:00
SExecTaskInfo** pTask = (SExecTaskInfo**)pTaskInfo;
2022-01-08 14:59:24 +00:00
2022-01-25 05:42:33 +00:00
int32_t code = createExecTaskInfoImpl(pSubplan, pTask, readHandle, taskId);
2022-01-19 08:20:13 +00:00
if (code != TSDB_CODE_SUCCESS) {
goto _error;
}
2022-01-08 14:59:24 +00:00
2022-01-11 10:15:37 +00:00
SDataSinkMgtCfg cfg = {.maxDataBlockNum = 1000, .maxDataBlockNumPerQuery = 100};
code = dsDataSinkMgtInit(&cfg);
2022-01-08 14:59:24 +00:00
if (code != TSDB_CODE_SUCCESS) {
goto _error;
2022-01-08 14:59:24 +00:00
}
2022-01-25 07:17:02 +00:00
if (handle) {
code = dsCreateDataSinker(pSubplan->pDataSink, handle);
}
2022-01-21 03:19:24 +00:00
_error:
2022-01-08 14:59:24 +00:00
// if failed to add ref for all tables in this query, abort current query
return code;
}
#ifdef TEST_IMPL
// wait moment
int waitMoment(SQInfo* pQInfo){
if(pQInfo->sql) {
int ms = 0;
char* pcnt = strstr(pQInfo->sql, " count(*)");
if(pcnt) return 0;
char* pos = strstr(pQInfo->sql, " t_");
if(pos){
pos += 3;
ms = atoi(pos);
while(*pos >= '0' && *pos <= '9'){
pos ++;
}
char unit_char = *pos;
if(unit_char == 'h'){
ms *= 3600*1000;
} else if(unit_char == 'm'){
ms *= 60*1000;
} else if(unit_char == 's'){
ms *= 1000;
}
}
if(ms == 0) return 0;
printf("test wait sleep %dms. sql=%s ...\n", ms, pQInfo->sql);
if(ms < 1000) {
taosMsleep(ms);
} else {
int used_ms = 0;
while(used_ms < ms) {
taosMsleep(1000);
used_ms += 1000;
if(isTaskKilled(pQInfo)){
2022-01-08 14:59:24 +00:00
printf("test check query is canceled, sleep break.%s\n", pQInfo->sql);
break;
}
}
}
}
return 1;
}
#endif
2022-01-18 06:12:25 +00:00
int32_t qExecTask(qTaskInfo_t tinfo, SSDataBlock** pRes, uint64_t *useconds) {
2022-01-11 02:51:23 +00:00
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*)tinfo;
int64_t threadId = taosGetSelfPthreadId();
2022-01-08 14:59:24 +00:00
2022-01-18 06:12:25 +00:00
*pRes = NULL;
2022-01-08 14:59:24 +00:00
int64_t curOwner = 0;
if ((curOwner = atomic_val_compare_exchange_64(&pTaskInfo->owner, 0, threadId)) != 0) {
2022-01-25 05:42:33 +00:00
qError("%s-%p execTask is now executed by thread:%p", GET_TASKID(pTaskInfo), pTaskInfo,
2022-01-11 02:51:23 +00:00
(void*)curOwner);
pTaskInfo->code = TSDB_CODE_QRY_IN_EXEC;
2022-01-18 06:12:25 +00:00
return pTaskInfo->code;
2022-01-08 14:59:24 +00:00
}
2022-01-11 02:51:23 +00:00
if (pTaskInfo->cost.start == 0) {
pTaskInfo->cost.start = taosGetTimestampMs();
2022-01-08 14:59:24 +00:00
}
if (isTaskKilled(pTaskInfo)) {
2022-01-25 06:13:11 +00:00
qDebug("%s already killed, abort", GET_TASKID(pTaskInfo));
2022-01-18 06:12:25 +00:00
return TSDB_CODE_SUCCESS;
2022-01-08 14:59:24 +00:00
}
// error occurs, record the error code and return to client
int32_t ret = setjmp(pTaskInfo->env);
2022-01-08 14:59:24 +00:00
if (ret != TSDB_CODE_SUCCESS) {
publishQueryAbortEvent(pTaskInfo, ret);
pTaskInfo->code = ret;
2022-01-25 06:13:11 +00:00
qDebug("%s task abort due to error/cancel occurs, code:%s", GET_TASKID(pTaskInfo),
tstrerror(pTaskInfo->code));
2022-01-18 06:12:25 +00:00
return pTaskInfo->code;
2022-01-08 14:59:24 +00:00
}
2022-01-25 06:13:11 +00:00
qDebug("%s execTask is launched", GET_TASKID(pTaskInfo));
2022-01-08 14:59:24 +00:00
bool newgroup = false;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_BEFORE_OPERATOR_EXEC);
2022-01-11 02:51:23 +00:00
int64_t st = 0;
2022-01-08 14:59:24 +00:00
st = taosGetTimestampUs();
2022-01-18 06:12:25 +00:00
*pRes = pTaskInfo->pRoot->exec(pTaskInfo->pRoot, &newgroup);
2022-01-11 02:51:23 +00:00
2022-01-25 06:13:11 +00:00
uint64_t el = (taosGetTimestampUs() - st);
pTaskInfo->cost.elapsedTime += el;
publishOperatorProfEvent(pTaskInfo->pRoot, QUERY_PROF_AFTER_OPERATOR_EXEC);
2022-01-11 02:51:23 +00:00
2022-01-18 06:12:25 +00:00
if (NULL == *pRes) {
*useconds = pTaskInfo->cost.elapsedTime;
}
2022-01-25 02:41:35 +00:00
int32_t current = (*pRes != NULL)? (*pRes)->info.rows:0;
pTaskInfo->totalRows += current;
2022-01-25 06:13:11 +00:00
qDebug("%s task suspended, %d rows returned, total:%" PRId64 " rows, in sinkNode:%d, elapsed:%.2f ms",
GET_TASKID(pTaskInfo), current, pTaskInfo->totalRows, 0, el/1000.0);
2022-01-11 02:51:23 +00:00
atomic_store_64(&pTaskInfo->owner, 0);
2022-01-18 06:12:25 +00:00
return pTaskInfo->code;
2022-01-08 14:59:24 +00:00
}
void* qGetResultRetrieveMsg(qTaskInfo_t qinfo) {
SQInfo* pQInfo = (SQInfo*) qinfo;
assert(pQInfo != NULL);
return pQInfo->rspContext;
}
int32_t qKillTask(qTaskInfo_t qinfo) {
2022-01-25 02:41:35 +00:00
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
2022-01-08 14:59:24 +00:00
2022-01-25 02:41:35 +00:00
if (pTaskInfo == NULL) {
2022-01-08 14:59:24 +00:00
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
2022-01-25 05:42:33 +00:00
qDebug("%s execTask killed", GET_TASKID(pTaskInfo));
2022-01-25 02:41:35 +00:00
setTaskKilled(pTaskInfo);
2022-01-08 14:59:24 +00:00
// Wait for the query executing thread being stopped/
// Once the query is stopped, the owner of qHandle will be cleared immediately.
2022-01-25 02:41:35 +00:00
while (pTaskInfo->owner != 0) {
2022-01-08 14:59:24 +00:00
taosMsleep(100);
}
return TSDB_CODE_SUCCESS;
}
2022-01-20 11:34:46 +00:00
int32_t qAsyncKillTask(qTaskInfo_t qinfo) {
2022-01-25 02:41:35 +00:00
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
2022-01-20 11:34:46 +00:00
2022-01-25 02:41:35 +00:00
if (pTaskInfo == NULL) {
2022-01-20 11:34:46 +00:00
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
2022-01-25 05:42:33 +00:00
qDebug("%s execTask async killed", GET_TASKID(pTaskInfo));
2022-01-25 02:41:35 +00:00
setTaskKilled(pTaskInfo);
2022-01-20 11:34:46 +00:00
return TSDB_CODE_SUCCESS;
}
2022-01-08 14:59:24 +00:00
int32_t qIsTaskCompleted(qTaskInfo_t qinfo) {
SExecTaskInfo *pTaskInfo = (SExecTaskInfo *)qinfo;
2022-01-08 14:59:24 +00:00
if (pTaskInfo == NULL /*|| !isValidQInfo(pTaskInfo)*/) {
2022-01-08 14:59:24 +00:00
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
return isTaskKilled(pTaskInfo) || Q_STATUS_EQUAL(pTaskInfo->status, TASK_OVER);
2022-01-08 14:59:24 +00:00
}
2022-01-25 02:41:35 +00:00
void qDestroyTask(qTaskInfo_t qTaskHandle) {
SExecTaskInfo* pTaskInfo = (SExecTaskInfo*) qTaskHandle;
2022-01-25 05:42:33 +00:00
qDebug("%s execTask completed, numOfRows:%"PRId64, GET_TASKID(pTaskInfo), pTaskInfo->totalRows);
2022-01-08 14:59:24 +00:00
2022-01-25 02:41:35 +00:00
queryCostStatis(pTaskInfo); // print the query cost summary
doDestroyTask(pTaskInfo);
2022-01-08 14:59:24 +00:00
}
#if 0
//kill by qid
int32_t qKillQueryByQId(void* pMgmt, int64_t qId, int32_t waitMs, int32_t waitCount) {
int32_t error = TSDB_CODE_SUCCESS;
void** handle = qAcquireTask(pMgmt, qId);
if(handle == NULL) return terrno;
SQInfo* pQInfo = (SQInfo*)(*handle);
if (pQInfo == NULL || !isValidQInfo(pQInfo)) {
return TSDB_CODE_QRY_INVALID_QHANDLE;
}
2022-01-25 05:42:33 +00:00
qWarn("%s be killed(no memory commit).", pQInfo->qId);
2022-01-25 02:41:35 +00:00
setTaskKilled(pQInfo);
2022-01-08 14:59:24 +00:00
// wait query stop
int32_t loop = 0;
while (pQInfo->owner != 0) {
taosMsleep(waitMs);
if(loop++ > waitCount){
error = TSDB_CODE_FAILED;
break;
}
}
qReleaseTask(pMgmt, (void **)&handle, true);
return error;
}
2022-01-25 07:17:02 +00:00
#endif