TDengine/source/libs/stream/src/streamTask.c

693 lines
25 KiB
C
Raw Normal View History

2022-06-06 08:34:12 +00:00
/*
* Copyright (c) 2019 TAOS Data, Inc. <jhtao@taosdata.com>
*
* This program is free software: you can use, redistribute, and/or modify
* it under the terms of the GNU Affero General Public License, version 3
* or later ("AGPL"), as published by the Free Software Foundation.
*
* This program is distributed in the hope that it will be useful, but WITHOUT
* ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or
* FITNESS FOR A PARTICULAR PURPOSE.
*
* You should have received a copy of the GNU Affero General Public License
* along with this program. If not, see <http://www.gnu.org/licenses/>.
*/
#include "executor.h"
#include "streamInt.h"
#include "tmisce.h"
2022-06-06 08:34:12 +00:00
#include "tstream.h"
#include "ttimer.h"
#include "wal.h"
2022-06-06 08:34:12 +00:00
// Append pTask to the task set and record its slot as the task's child id.
//
// The child id is the number of elements already held by pArray, i.e. the index
// the task occupies once the push succeeds.
//
// Returns 0 on success. Returns -1 with terrno set when the array could not
// store the new element (taosArrayPush reports that by returning NULL).
static int32_t addToTaskset(SArray* pArray, SStreamTask* pTask) {
  int32_t childId = taosArrayGetSize(pArray);
  pTask->info.selfChildId = childId;

  if (taosArrayPush(pArray, &pTask) == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  return 0;
}
2023-06-28 09:58:07 +00:00
// Allocate a zero-initialised stream task, fill in its identity and initial
// status, and append it to pTaskList.
//
// streamId      id of the stream the task belongs to
// taskLevel     stored in info.taskLevel
// fillHistory   stored in info.fillHistory
// triggerParam  stored in info.triggerParam
// pTaskList     task set that receives the new task; the task's selfChildId is
//               its position in this list
//
// The task starts with schedStatus INACTIVE, taskStatus SCAN_HISTORY and normal
// input/output status. id.idStr is a heap copy of "0x<streamId hex>-<taskId>".
//
// Returns the new task, or NULL with terrno set when an allocation fails; in the
// failure case nothing is leaked.
SStreamTask* tNewStreamTask(int64_t streamId, int8_t taskLevel, int8_t fillHistory, int64_t triggerParam,
                            SArray* pTaskList) {
  SStreamTask* pTask = (SStreamTask*)taosMemoryCalloc(1, sizeof(SStreamTask));
  if (pTask == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTask->ver = SSTREAM_TASK_VER;
  pTask->id.taskId = tGenIdPI32();
  pTask->id.streamId = streamId;
  pTask->info.taskLevel = taskLevel;
  pTask->info.fillHistory = fillHistory;
  pTask->info.triggerParam = triggerParam;

  // bounded formatting: never write past buf even if the format grows later
  char buf[128] = {0};
  snprintf(buf, sizeof(buf), "0x%" PRIx64 "-%d", pTask->id.streamId, pTask->id.taskId);

  pTask->id.idStr = taosStrdup(buf);
  if (pTask->id.idStr == NULL) {
    taosMemoryFree(pTask);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->status.taskStatus = TASK_STATUS__SCAN_HISTORY;
  pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;

  // do not hand out a task that is not tracked by the task set
  if (addToTaskset(pTaskList, pTask) != 0) {
    taosMemoryFree((void*)pTask->id.idStr);
    taosMemoryFree(pTask);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  return pTask;
}
2022-06-22 09:56:46 +00:00
// Serialize one upstream endpoint descriptor.
//
// Field order on the wire: taskId, nodeId, childId, epSet, stage.
// processedVer is not part of the encoded form (its encode call is disabled).
//
// Returns 0 on success, -1 as soon as any field fails to encode.
int32_t tEncodeStreamEpInfo(SEncoder* pEncoder, const SStreamChildEpInfo* pInfo) {
  // the || chain short-circuits, so fields are written strictly in order and
  // encoding stops at the first failure
  if (tEncodeI32(pEncoder, pInfo->taskId) < 0 ||
      tEncodeI32(pEncoder, pInfo->nodeId) < 0 ||
      tEncodeI32(pEncoder, pInfo->childId) < 0 ||
      tEncodeSEpSet(pEncoder, &pInfo->epSet) < 0 ||
      tEncodeI64(pEncoder, pInfo->stage) < 0) {
    return -1;
  }

  return 0;
}
// Deserialize one upstream endpoint descriptor written by tEncodeStreamEpInfo.
//
// Fields are read in the order taskId, nodeId, childId, epSet, stage.
// processedVer is not part of the encoded form (its decode call is disabled).
//
// Returns 0 on success, -1 as soon as any field fails to decode.
int32_t tDecodeStreamEpInfo(SDecoder* pDecoder, SStreamChildEpInfo* pInfo) {
  // the || chain short-circuits, so fields are consumed strictly in order and
  // decoding stops at the first failure
  if (tDecodeI32(pDecoder, &pInfo->taskId) < 0 ||
      tDecodeI32(pDecoder, &pInfo->nodeId) < 0 ||
      tDecodeI32(pDecoder, &pInfo->childId) < 0 ||
      tDecodeSEpSet(pDecoder, &pInfo->epSet) < 0 ||
      tDecodeI64(pDecoder, &pInfo->stage) < 0) {
    return -1;
  }

  return 0;
}
2023-04-11 07:42:24 +00:00
// Serialize a stream task.
//
// The leading fields (ver, streamId, taskId, ... , checkpointId, checkpointVer)
// are also read positionally by tDecodeStreamTaskId and tDecodeStreamTaskChkInfo,
// so their order and widths must stay in sync with those decoders.
//
// Layout after the fixed header:
//   - upstream endpoint list (count followed by the entries)
//   - exec.qmsg, only for tasks whose level is not TASK_LEVEL__SINK
//   - output-type specific section (table / sma / fetch / fixed / shuffle)
//   - triggerParam and the reserve area
//
// Returns pEncoder->pos on success and -1 on the first field that fails to
// encode (in which case tEndEncode is not called).
int32_t tEncodeStreamTask(SEncoder* pEncoder, const SStreamTask* pTask) {
  if (tStartEncode(pEncoder) < 0) return -1;

  // format version; the decoders in this file reject any other value
  if (tEncodeI64(pEncoder, pTask->ver) < 0) return -1;

  // identity, level, output type and status
  if (tEncodeI64(pEncoder, pTask->id.streamId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->id.taskId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->info.totalLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->info.taskLevel) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->outputInfo.type) < 0) return -1;
  if (tEncodeI16(pEncoder, pTask->msgInfo.msgType) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->status.taskStatus) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->status.schedStatus) < 0) return -1;

  // placement
  if (tEncodeI32(pEncoder, pTask->info.selfChildId) < 0) return -1;
  if (tEncodeI32(pEncoder, pTask->info.nodeId) < 0) return -1;
  if (tEncodeSEpSet(pEncoder, &pTask->info.epSet) < 0) return -1;
  if (tEncodeSEpSet(pEncoder, &pTask->info.mnodeEpset) < 0) return -1;

  // checkpoint info
  if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointId) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->chkInfo.checkpointVer) < 0) return -1;
  if (tEncodeI8(pEncoder, pTask->info.fillHistory) < 0) return -1;

  // related history / stream task ids; the task id part is written as int32
  if (tEncodeI64(pEncoder, pTask->historyTaskId.streamId) < 0) return -1;
  int32_t taskId = pTask->historyTaskId.taskId;
  if (tEncodeI32(pEncoder, taskId) < 0) return -1;

  if (tEncodeI64(pEncoder, pTask->streamTaskId.streamId) < 0) return -1;
  taskId = pTask->streamTaskId.taskId;
  if (tEncodeI32(pEncoder, taskId) < 0) return -1;

  // data range: version range followed by time window
  if (tEncodeU64(pEncoder, pTask->dataRange.range.minVer) < 0) return -1;
  if (tEncodeU64(pEncoder, pTask->dataRange.range.maxVer) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->dataRange.window.skey) < 0) return -1;
  if (tEncodeI64(pEncoder, pTask->dataRange.window.ekey) < 0) return -1;

  // upstream endpoints: count, then each entry
  int32_t epSz = taosArrayGetSize(pTask->pUpstreamInfoList);
  if (tEncodeI32(pEncoder, epSz) < 0) return -1;
  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosArrayGetP(pTask->pUpstreamInfoList, i);
    if (tEncodeStreamEpInfo(pEncoder, pInfo) < 0) return -1;
  }

  // only non-sink tasks carry a query plan message
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (tEncodeCStr(pEncoder, pTask->exec.qmsg) < 0) return -1;
  }

  // output-type specific section
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    if (tEncodeI64(pEncoder, pTask->tbSink.stbUid) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->tbSink.stbFullName) < 0) return -1;
    if (tEncodeSSchemaWrapper(pEncoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    if (tEncodeI64(pEncoder, pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    if (tEncodeI8(pEncoder, pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tEncodeI32(pEncoder, pTask->fixedDispatcher.taskId) < 0) return -1;
    if (tEncodeI32(pEncoder, pTask->fixedDispatcher.nodeId) < 0) return -1;
    if (tEncodeSEpSet(pEncoder, &pTask->fixedDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tSerializeSUseDbRspImp(pEncoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tEncodeCStr(pEncoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }

  // trailer
  if (tEncodeI64(pEncoder, pTask->info.triggerParam) < 0) return -1;
  if (tEncodeCStrWithLen(pEncoder, pTask->reserve, sizeof(pTask->reserve) - 1) < 0) return -1;

  tEndEncode(pEncoder);
  return pEncoder->pos;
}
2023-04-11 07:42:24 +00:00
// Deserialize a stream task written by tEncodeStreamTask into pTask.
//
// Decoding is rejected when the stored version differs from SSTREAM_TASK_VER.
// The function allocates pUpstreamInfoList (and its entries), exec.qmsg for
// non-sink tasks, and tbSink.pSchemaWrapper for table output.
//
// Returns 0 on success and -1 on the first failure. On failure pTask may be
// partially populated; memory already attached to pTask is left in place
// (presumably released by the caller through tFreeStreamTask -- confirm).
int32_t tDecodeStreamTask(SDecoder* pDecoder, SStreamTask* pTask) {
  int32_t taskId = 0;

  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &pTask->ver) < 0) return -1;
  if (pTask->ver != SSTREAM_TASK_VER) return -1;

  // identity, level, output type and status
  if (tDecodeI64(pDecoder, &pTask->id.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->id.taskId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->info.totalLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->info.taskLevel) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->outputInfo.type) < 0) return -1;
  if (tDecodeI16(pDecoder, &pTask->msgInfo.msgType) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->status.taskStatus) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->status.schedStatus) < 0) return -1;

  // placement
  if (tDecodeI32(pDecoder, &pTask->info.selfChildId) < 0) return -1;
  if (tDecodeI32(pDecoder, &pTask->info.nodeId) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->info.epSet) < 0) return -1;
  if (tDecodeSEpSet(pDecoder, &pTask->info.mnodeEpset) < 0) return -1;

  // checkpoint info
  if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointId) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->chkInfo.checkpointVer) < 0) return -1;
  if (tDecodeI8(pDecoder, &pTask->info.fillHistory) < 0) return -1;

  // related task ids are stored as int32 and read through a temporary
  if (tDecodeI64(pDecoder, &pTask->historyTaskId.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
  pTask->historyTaskId.taskId = taskId;

  if (tDecodeI64(pDecoder, &pTask->streamTaskId.streamId) < 0) return -1;
  if (tDecodeI32(pDecoder, &taskId) < 0) return -1;
  pTask->streamTaskId.taskId = taskId;

  // data range: version range followed by time window
  if (tDecodeU64(pDecoder, &pTask->dataRange.range.minVer) < 0) return -1;
  if (tDecodeU64(pDecoder, &pTask->dataRange.range.maxVer) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->dataRange.window.skey) < 0) return -1;
  if (tDecodeI64(pDecoder, &pTask->dataRange.window.ekey) < 0) return -1;

  // upstream endpoints: the count comes from the encoded buffer, so validate it
  // before using it as an allocation size
  int32_t epSz = -1;
  if (tDecodeI32(pDecoder, &epSz) < 0) return -1;
  if (epSz < 0) return -1;

  pTask->pUpstreamInfoList = taosArrayInit(epSz, POINTER_BYTES);
  if (pTask->pUpstreamInfoList == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  for (int32_t i = 0; i < epSz; i++) {
    SStreamChildEpInfo* pInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
    if (pInfo == NULL) {
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }

    if (tDecodeStreamEpInfo(pDecoder, pInfo) < 0) {
      taosMemoryFreeClear(pInfo);
      return -1;
    }

    // the list owns pInfo only after a successful push
    if (taosArrayPush(pTask->pUpstreamInfoList, &pInfo) == NULL) {
      taosMemoryFreeClear(pInfo);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return -1;
    }
  }

  // only non-sink tasks carry a query plan message
  if (pTask->info.taskLevel != TASK_LEVEL__SINK) {
    if (tDecodeCStrAlloc(pDecoder, &pTask->exec.qmsg) < 0) return -1;
  }

  // output-type specific section
  // NOTE(review): tDecodeCStrTo copies into fixed-size fields below; this assumes
  // the encoded strings fit the destination -- confirm against the field sizes.
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    if (tDecodeI64(pDecoder, &pTask->tbSink.stbUid) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->tbSink.stbFullName) < 0) return -1;
    pTask->tbSink.pSchemaWrapper = taosMemoryCalloc(1, sizeof(SSchemaWrapper));
    if (pTask->tbSink.pSchemaWrapper == NULL) return -1;
    if (tDecodeSSchemaWrapper(pDecoder, pTask->tbSink.pSchemaWrapper) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SMA) {
    if (tDecodeI64(pDecoder, &pTask->smaSink.smaId) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FETCH) {
    if (tDecodeI8(pDecoder, &pTask->fetchSink.reserved) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__FIXED_DISPATCH) {
    if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.taskId) < 0) return -1;
    if (tDecodeI32(pDecoder, &pTask->fixedDispatcher.nodeId) < 0) return -1;
    if (tDecodeSEpSet(pDecoder, &pTask->fixedDispatcher.epSet) < 0) return -1;
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    if (tDeserializeSUseDbRspImp(pDecoder, &pTask->shuffleDispatcher.dbInfo) < 0) return -1;
    if (tDecodeCStrTo(pDecoder, pTask->shuffleDispatcher.stbFullName) < 0) return -1;
  }

  // trailer
  if (tDecodeI64(pDecoder, &pTask->info.triggerParam) < 0) return -1;
  if (tDecodeCStrTo(pDecoder, pTask->reserve) < 0) return -1;

  tEndDecode(pDecoder);
  return 0;
}
2023-08-31 03:42:12 +00:00
// Extract only the checkpoint id/version from an encoded stream task.
//
// The buffer is walked positionally: every field that tEncodeStreamTask writes
// ahead of the checkpoint info is decoded into a throw-away variable. The skip
// sequence therefore has to mirror the field order of tEncodeStreamTask.
//
// Returns 0 on success, -1 on a decode error or a version mismatch.
int32_t tDecodeStreamTaskChkInfo(SDecoder* pDecoder, SCheckpointInfo* pChkpInfo) {
  int64_t version;
  int64_t ignoredI64;
  int32_t ignoredI32;
  int16_t ignoredI16;
  int8_t  ignoredI8;
  SEpSet  ignoredEpSet;

  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &version) < 0) return -1;
  if (version != SSTREAM_TASK_VER) return -1;

  // id.streamId, id.taskId, info.totalLevel
  if (tDecodeI64(pDecoder, &ignoredI64) < 0 ||
      tDecodeI32(pDecoder, &ignoredI32) < 0 ||
      tDecodeI32(pDecoder, &ignoredI32) < 0) {
    return -1;
  }

  // info.taskLevel, outputInfo.type, msgInfo.msgType
  if (tDecodeI8(pDecoder, &ignoredI8) < 0 ||
      tDecodeI8(pDecoder, &ignoredI8) < 0 ||
      tDecodeI16(pDecoder, &ignoredI16) < 0) {
    return -1;
  }

  // status.taskStatus, status.schedStatus
  if (tDecodeI8(pDecoder, &ignoredI8) < 0 ||
      tDecodeI8(pDecoder, &ignoredI8) < 0) {
    return -1;
  }

  // info.selfChildId, info.nodeId, info.epSet, info.mnodeEpset
  if (tDecodeI32(pDecoder, &ignoredI32) < 0 ||
      tDecodeI32(pDecoder, &ignoredI32) < 0 ||
      tDecodeSEpSet(pDecoder, &ignoredEpSet) < 0 ||
      tDecodeSEpSet(pDecoder, &ignoredEpSet) < 0) {
    return -1;
  }

  // the two fields the caller actually wants
  if (tDecodeI64(pDecoder, &pChkpInfo->checkpointId) < 0 ||
      tDecodeI64(pDecoder, &pChkpInfo->checkpointVer) < 0) {
    return -1;
  }

  tEndDecode(pDecoder);
  return 0;
}
2023-09-16 13:37:30 +00:00
// Extract only the task identity (streamId, taskId) from an encoded stream task.
//
// Reads the version and the first two fields written by tEncodeStreamTask. The
// task id is stored as an int32, so it is decoded into a 32-bit temporary before
// being assigned to pTaskId->taskId.
//
// Returns 0 on success, -1 on a decode error or a version mismatch.
int32_t tDecodeStreamTaskId(SDecoder* pDecoder, STaskId* pTaskId) {
  int64_t version;
  int32_t encodedTaskId = 0;

  if (tStartDecode(pDecoder) < 0) return -1;

  if (tDecodeI64(pDecoder, &version) < 0) return -1;
  if (version != SSTREAM_TASK_VER) return -1;

  if (tDecodeI64(pDecoder, &pTaskId->streamId) < 0) return -1;

  if (tDecodeI32(pDecoder, &encodedTaskId) < 0) return -1;
  pTaskId->taskId = encodedTaskId;

  tEndDecode(pDecoder);
  return 0;
}
2022-06-06 08:34:12 +00:00
2023-07-21 17:02:58 +00:00
static void freeItem(void* p) {
SStreamContinueExecInfo* pInfo = p;
rpcFreeCont(pInfo->msg.pCont);
}
2023-07-25 01:25:18 +00:00
static void freeUpstreamItem(void* p) {
SStreamChildEpInfo** pInfo = p;
taosMemoryFree(*pInfo);
}
2023-04-08 17:39:09 +00:00
// Release a stream task and everything it owns.
//
// Sequence: log a summary, wait until no timer callback is active, stop the
// timers, close the queues, tear down executor / wal reader / dispatch state,
// release the output-type specific resources, close the state store, and
// finally free the remaining members and the task itself.
//
// pTask must not be used after this call returns.
void tFreeStreamTask(SStreamTask* pTask) {
  // captured up front so the final log line does not touch freed memory
  const int32_t        taskId = pTask->id.taskId;
  STaskExecStatisInfo* pExec = &pTask->execInfo;

  stDebug("start to free s-task:0x%x, %p, state:%p, status:%s", taskId, pTask, pTask->pState,
          streamGetTaskStatusStr(pTask->status.taskStatus));

  stDebug("s-task:0x%x task exec summary: create:%" PRId64 ", init:%" PRId64 ", start:%" PRId64
          ", updateCount:%d latestUpdate:%" PRId64 ", latestCheckPoint:%" PRId64 ", ver:%" PRId64
          " nextProcessVer:%" PRId64 ", checkpointCount:%d",
          taskId, pExec->created, pExec->init, pExec->start, pExec->updateCount, pExec->latestUpdateTs,
          pTask->chkInfo.checkpointId, pTask->chkInfo.checkpointVer, pTask->chkInfo.nextProcessVer,
          pExec->checkpoint);

  // remove the ref by timer: poll until every timer activity has finished
  while (pTask->status.timerActive > 0) {
    stDebug("s-task:%s wait for task stop timer activities", pTask->id.idStr);
    taosMsleep(10);
  }

  if (pTask->schedInfo.pTimer != NULL) {
    taosTmrStop(pTask->schedInfo.pTimer);
    pTask->schedInfo.pTimer = NULL;
  }

  if (pTask->pTimer != NULL) {
    if (pTask->pTimer->hTaskLaunchTimer != NULL) {
      taosTmrStop(pTask->pTimer->hTaskLaunchTimer);
      pTask->pTimer->hTaskLaunchTimer = NULL;
    }

    if (pTask->pTimer->dispatchTimer != NULL) {
      taosTmrStop(pTask->pTimer->dispatchTimer);
      pTask->pTimer->dispatchTimer = NULL;
    }

    taosMemoryFreeClear(pTask->pTimer);
  }

  // snapshot of the task status; used below to tell the state store whether the
  // task is being dropped
  int32_t taskStatus = atomic_load_8((int8_t*)&(pTask->status.taskStatus));

  if (pTask->inputInfo.queue != NULL) {
    streamQueueClose(pTask->inputInfo.queue, pTask->id.taskId);
  }

  if (pTask->outputInfo.queue != NULL) {
    streamQueueClose(pTask->outputInfo.queue, pTask->id.taskId);
  }

  if (pTask->exec.qmsg != NULL) {
    taosMemoryFree(pTask->exec.qmsg);
  }

  if (pTask->exec.pExecutor != NULL) {
    qDestroyTask(pTask->exec.pExecutor);
    pTask->exec.pExecutor = NULL;
  }

  if (pTask->exec.pWalReader != NULL) {
    walCloseReader(pTask->exec.pWalReader);
  }

  pTask->pReadyMsgList = taosArrayDestroy(pTask->pReadyMsgList);

  if (pTask->msgInfo.pData != NULL) {
    destroyDispatchMsg(pTask->msgInfo.pData, getNumOfDispatchBranch(pTask));
    pTask->msgInfo.pData = NULL;
    pTask->msgInfo.dispatchMsgType = 0;
  }

  // output-type specific resources
  if (pTask->outputInfo.type == TASK_OUTPUT__TABLE) {
    tDeleteSchemaWrapper(pTask->tbSink.pSchemaWrapper);
    taosMemoryFree(pTask->tbSink.pTSchema);
    tSimpleHashCleanup(pTask->tbSink.pTblInfo);
  } else if (pTask->outputInfo.type == TASK_OUTPUT__SHUFFLE_DISPATCH) {
    taosArrayDestroy(pTask->shuffleDispatcher.dbInfo.pVgroupInfos);
    pTask->checkReqIds = taosArrayDestroy(pTask->checkReqIds);
  }

  if (pTask->pState != NULL) {
    stDebug("s-task:0x%x start to free task state", taskId);
    streamStateClose(pTask->pState, taskStatus == TASK_STATUS__DROPPING);
  }

  if (pTask->id.idStr != NULL) {
    taosMemoryFree((void*)pTask->id.idStr);
  }

  if (pTask->pNameMap != NULL) {
    tSimpleHashCleanup(pTask->pNameMap);
  }

  if (pTask->pRspMsgList != NULL) {
    taosArrayDestroyEx(pTask->pRspMsgList, freeItem);
    pTask->pRspMsgList = NULL;
  }

  if (pTask->pUpstreamInfoList != NULL) {
    taosArrayDestroyEx(pTask->pUpstreamInfoList, freeUpstreamItem);
    pTask->pUpstreamInfoList = NULL;
  }

  pTask->msgInfo.pRetryList = taosArrayDestroy(pTask->msgInfo.pRetryList);
  taosMemoryFree(pTask->pTokenBucket);
  taosThreadMutexDestroy(&pTask->lock);
  taosMemoryFree(pTask);

  stDebug("s-task:0x%x free task completed", taskId);
}
// Prepare the runtime state of a stream task.
//
// pTask   task to initialise; id.streamId / id.taskId must already be set
// pMeta   stored in pTask->pMeta
// pMsgCb  stored in pTask->pMsgCb
// ver     initial value for chkInfo.nextProcessVer and for both ends of
//         dataRange.range
//
// Creates the id string, the input/output queues, the retry list, the token
// bucket, the timer holder and a recursive mutex, then opens all upstream
// inputs.
//
// Returns TSDB_CODE_SUCCESS, TSDB_CODE_OUT_OF_MEMORY when an allocation fails,
// or the error code reported by the mutex setup calls. Resources already
// attached to pTask are not released here on failure (presumably the caller
// frees the task through tFreeStreamTask -- confirm).
int32_t streamTaskInit(SStreamTask* pTask, SStreamMeta* pMeta, SMsgCb* pMsgCb, int64_t ver) {
  pTask->id.idStr = createStreamTaskIdStr(pTask->id.streamId, pTask->id.taskId);
  pTask->refCnt = 1;
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;
  pTask->status.timerActive = 0;
  pTask->inputInfo.queue = streamQueueOpen(512 << 10);
  pTask->outputInfo.queue = streamQueueOpen(512 << 10);

  if (pTask->inputInfo.queue == NULL || pTask->outputInfo.queue == NULL) {
    stError("s-task:%s failed to prepare the input/output queue, initialize task failed", pTask->id.idStr);
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->execInfo.created = taosGetTimestampMs();
  pTask->inputInfo.status = TASK_INPUT_STATUS__NORMAL;
  pTask->outputInfo.status = TASK_OUTPUT_STATUS__NORMAL;
  pTask->pMeta = pMeta;
  pTask->chkInfo.nextProcessVer = ver;
  pTask->dataRange.range.maxVer = ver;
  pTask->dataRange.range.minVer = ver;
  pTask->pMsgCb = pMsgCb;

  pTask->msgInfo.pRetryList = taosArrayInit(4, sizeof(int32_t));
  if (pTask->msgInfo.pRetryList == NULL) {
    stError("s-task:%s failed to prepare the retry list, code:%s", pTask->id.idStr,
            tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->pTokenBucket = taosMemoryCalloc(1, sizeof(STokenBucket));
  if (pTask->pTokenBucket == NULL) {
    stError("s-task:%s failed to prepare the tokenBucket, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  pTask->pTimer = taosMemoryCalloc(1, sizeof(STaskTimer));
  if (pTask->pTimer == NULL) {
    stError("s-task:%s failed to prepare the timer, code:%s", pTask->id.idStr, tstrerror(TSDB_CODE_OUT_OF_MEMORY));
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  streamTaskInitTokenBucket(pTask->pTokenBucket, 50, 50);

  TdThreadMutexAttr attr = {0};
  int code = taosThreadMutexAttrInit(&attr);
  if (code != 0) {
    stError("s-task:%s initElapsed mutex attr failed, code:%s", pTask->id.idStr, tstrerror(code));
    return code;
  }

  // from here on the attribute object is initialised and must be destroyed on
  // every path
  code = taosThreadMutexAttrSetType(&attr, PTHREAD_MUTEX_RECURSIVE);
  if (code != 0) {
    stError("s-task:%s set mutex attr recursive, code:%s", pTask->id.idStr, tstrerror(code));
    taosThreadMutexAttrDestroy(&attr);
    return code;
  }

  code = taosThreadMutexInit(&pTask->lock, &attr);
  taosThreadMutexAttrDestroy(&attr);  // the mutex keeps its own copy of the settings
  if (code != 0) {
    stError("s-task:%s failed to init the task mutex, code:%d", pTask->id.idStr, code);
    return code;
  }

  streamTaskOpenAllUpstreamInput(pTask);
  return TSDB_CODE_SUCCESS;
}
int32_t streamTaskGetNumOfDownstream(const SStreamTask* pTask) {
if (pTask->info.taskLevel == TASK_LEVEL__SINK) {
return 0;
} else {
2023-07-24 07:23:03 +00:00
int32_t type = pTask->outputInfo.type;
if (type == TASK_OUTPUT__FIXED_DISPATCH || type == TASK_OUTPUT__TABLE) {
return 1;
} else {
SArray* vgInfo = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
return taosArrayGetSize(vgInfo);
}
}
}
2023-07-31 06:07:18 +00:00
// Build a heap-allocated endpoint descriptor describing pTask, for use in
// another task's upstream list.
//
// The descriptor is zero-initialised (as the decode path in tDecodeStreamTask
// does) so that any member not assigned below starts from a defined value.
// stage is set to -1.
//
// Returns the descriptor, owned by the caller, or NULL with terrno set to
// TSDB_CODE_OUT_OF_MEMORY.
static SStreamChildEpInfo* createStreamTaskEpInfo(const SStreamTask* pTask) {
  SStreamChildEpInfo* pEpInfo = taosMemoryCalloc(1, sizeof(SStreamChildEpInfo));
  if (pEpInfo == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return NULL;
  }

  pEpInfo->childId = pTask->info.selfChildId;
  pEpInfo->epSet = pTask->info.epSet;
  pEpInfo->nodeId = pTask->info.nodeId;
  pEpInfo->taskId = pTask->id.taskId;
  pEpInfo->stage = -1;

  return pEpInfo;
}
// Register pUpstreamTask as an upstream of pTask.
//
// A new endpoint descriptor is created from pUpstreamTask and appended to
// pTask->pUpstreamInfoList; the list is created on first use.
//
// Returns TSDB_CODE_SUCCESS, or TSDB_CODE_OUT_OF_MEMORY when the descriptor or
// the list cannot be allocated or the descriptor cannot be appended. On failure
// the descriptor is released and the list is left without the new entry.
int32_t streamTaskSetUpstreamInfo(SStreamTask* pTask, const SStreamTask* pUpstreamTask) {
  SStreamChildEpInfo* pEpInfo = createStreamTaskEpInfo(pUpstreamTask);
  if (pEpInfo == NULL) {
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  if (pTask->pUpstreamInfoList == NULL) {
    pTask->pUpstreamInfoList = taosArrayInit(4, POINTER_BYTES);
    if (pTask->pUpstreamInfoList == NULL) {
      taosMemoryFree(pEpInfo);
      terrno = TSDB_CODE_OUT_OF_MEMORY;
      return TSDB_CODE_OUT_OF_MEMORY;
    }
  }

  // the list owns pEpInfo only after a successful push
  if (taosArrayPush(pTask->pUpstreamInfoList, &pEpInfo) == NULL) {
    taosMemoryFree(pEpInfo);
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return TSDB_CODE_OUT_OF_MEMORY;
  }

  return TSDB_CODE_SUCCESS;
}
// Replace the endpoint set of the first upstream entry located on nodeId.
//
// Only the first matching entry of pTask->pUpstreamInfoList is updated; nothing
// happens when no entry matches.
void streamTaskUpdateUpstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
  // printable form of the new endpoint set, used only for logging
  char epStr[512] = {0};
  EPSET_TO_STR(pEpSet, epStr);

  int32_t total = taosArrayGetSize(pTask->pUpstreamInfoList);
  for (int32_t idx = 0; idx < total; ++idx) {
    SStreamChildEpInfo* pUpstream = taosArrayGetP(pTask->pUpstreamInfoList, idx);
    if (pUpstream->nodeId != nodeId) {
      continue;
    }

    epsetAssign(&pUpstream->epSet, pEpSet);
    stDebug("s-task:0x%x update the upstreamInfo taskId:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
            pUpstream->taskId, nodeId, epStr);
    break;
  }
}
// Point pTask at a single downstream task.
//
// Copies the downstream task's id, node id and endpoint set into the fixed
// dispatcher, and switches the task to fixed dispatch output with the
// TDMT_STREAM_TASK_DISPATCH message type.
void streamTaskSetFixedDownstreamInfo(SStreamTask* pTask, const SStreamTask* pDownstreamTask) {
  pTask->outputInfo.type = TASK_OUTPUT__FIXED_DISPATCH;
  pTask->msgInfo.msgType = TDMT_STREAM_TASK_DISPATCH;

  STaskDispatcherFixed* pFixed = &pTask->fixedDispatcher;
  pFixed->taskId = pDownstreamTask->id.taskId;
  pFixed->nodeId = pDownstreamTask->info.nodeId;
  pFixed->epSet = pDownstreamTask->info.epSet;
}
// Replace the endpoint set of the downstream target located on nodeId.
//
//   - shuffle dispatch: the first vgroup whose vgId equals nodeId is updated
//   - fixed dispatch:   the single dispatcher is updated when its nodeId matches
//   - other output types: nothing to update
void streamTaskUpdateDownstreamInfo(SStreamTask* pTask, int32_t nodeId, const SEpSet* pEpSet) {
  // printable form of the new endpoint set, used only for logging
  char epStr[512] = {0};
  EPSET_TO_STR(pEpSet, epStr);

  int8_t outputType = pTask->outputInfo.type;

  if (outputType == TASK_OUTPUT__FIXED_DISPATCH) {
    STaskDispatcherFixed* pFixed = &pTask->fixedDispatcher;
    if (pFixed->nodeId == nodeId) {
      epsetAssign(&pFixed->epSet, pEpSet);
      stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpSet:%s", pTask->id.taskId,
              pFixed->taskId, nodeId, epStr);
    }
    return;
  }

  if (outputType != TASK_OUTPUT__SHUFFLE_DISPATCH) {
    return;  // do nothing
  }

  SArray* pVgroups = pTask->shuffleDispatcher.dbInfo.pVgroupInfos;
  int32_t total = taosArrayGetSize(pVgroups);
  for (int32_t idx = 0; idx < total; idx++) {
    SVgroupInfo* pVg = taosArrayGet(pVgroups, idx);
    if (pVg->vgId != nodeId) {
      continue;
    }

    epsetAssign(&pVg->epSet, pEpSet);
    stDebug("s-task:0x%x update the dispatch info, task:0x%x(nodeId:%d) newEpset:%s", pTask->id.taskId,
            pVg->taskId, nodeId, epStr);
    break;
  }
}
// Stop a stream task and block until it is idle.
//
// The task status is switched to TASK_STATUS__STOP under the task lock, the
// executor is asked to stop via qKillTask, and the caller then polls
// streamTaskIsIdle every 100ms until it reports idle.
//
// Always returns 0.
int32_t streamTaskStop(SStreamTask* pTask) {
  int32_t     vgId = pTask->pMeta->vgId;
  int64_t     startTs = taosGetTimestampMs();
  const char* idStr = pTask->id.idStr;

  taosThreadMutexLock(&pTask->lock);
  if (pTask->status.taskStatus == TASK_STATUS__CK) {
    stDebug("s-task:%s in checkpoint will be discarded since task is stopped", idStr);
  }
  pTask->status.taskStatus = TASK_STATUS__STOP;
  taosThreadMutexUnlock(&pTask->lock);

  qKillTask(pTask->exec.pExecutor, TSDB_CODE_SUCCESS);

  // idle is decided by streamTaskIsIdle (a direct schedStatus comparison was
  // used here previously and is disabled)
  for (;;) {
    if (streamTaskIsIdle(pTask)) {
      break;
    }

    stDebug("s-task:%s level:%d wait for task to be idle and then close, check again in 100ms", idStr,
            pTask->info.taskLevel);
    taosMsleep(100);
  }

  int64_t elapsed = taosGetTimestampMs() - startTs;
  stDebug("vgId:%d s-task:%s is closed in %" PRId64 " ms", vgId, idStr, elapsed);
  return 0;
}
// Apply a node endpoint change to one task.
//
// When the task itself runs on nodeId its own endpoint set is replaced. After
// that the dispatch info and the upstream task info are refreshed according to
// the task level:
//   - source: downstream only
//   - agg:    upstream, then downstream
//   - sink (any other level): upstream only
//
// Always returns 0.
int32_t doUpdateTaskEpset(SStreamTask* pTask, int32_t nodeId, SEpSet* pEpSet) {
  char epStr[512] = {0};

  if (pTask->info.nodeId == nodeId) {  // execution task should be moved away
    epsetAssign(&pTask->info.epSet, pEpSet);
    EPSET_TO_STR(pEpSet, epStr);
    stDebug("s-task:0x%x (vgId:%d) self node epset is updated %s", pTask->id.taskId, nodeId, epStr);
  }

  // check for the dispatch info and the upstream task info
  int32_t taskLevel = pTask->info.taskLevel;

  // every level except source has upstream tasks
  if (taskLevel != TASK_LEVEL__SOURCE) {
    streamTaskUpdateUpstreamInfo(pTask, nodeId, pEpSet);
  }

  // source and agg tasks dispatch to downstream tasks
  if (taskLevel == TASK_LEVEL__SOURCE || taskLevel == TASK_LEVEL__AGG) {
    streamTaskUpdateDownstreamInfo(pTask, nodeId, pEpSet);
  }

  return 0;
}
2023-08-08 01:11:41 +00:00
// Apply a batch of node endpoint changes to a task.
//
// Records the update in the task's execution statistics (timestamp and counter),
// then applies every SNodeUpdateInfo in pNodeList through doUpdateTaskEpset.
//
// Always returns 0.
int32_t streamTaskUpdateEpsetInfo(SStreamTask* pTask, SArray* pNodeList) {
  STaskExecStatisInfo* pExec = &pTask->execInfo;

  int32_t numOfNodes = taosArrayGetSize(pNodeList);
  int64_t prevTs = pExec->latestUpdateTs;

  pExec->latestUpdateTs = taosGetTimestampMs();
  pExec->updateCount += 1;

  stDebug("s-task:%s update task nodeEp epset, updatedNodes:%d, updateCount:%d, prevTs:%" PRId64, pTask->id.idStr,
          numOfNodes, pExec->updateCount, prevTs);

  for (int32_t idx = 0; idx < numOfNodes; ++idx) {
    SNodeUpdateInfo* pUpdate = taosArrayGet(pNodeList, idx);
    doUpdateTaskEpset(pTask, pUpdate->nodeId, &pUpdate->newEp);
  }

  return 0;
}
// Reset the recorded stage of every upstream entry to -1.
//
// Source level tasks are skipped entirely (no reset, no log line).
void streamTaskResetUpstreamStageInfo(SStreamTask* pTask) {
  if (pTask->info.taskLevel == TASK_LEVEL__SOURCE) {
    return;
  }

  int32_t total = taosArrayGetSize(pTask->pUpstreamInfoList);
  int32_t idx = 0;
  while (idx < total) {
    SStreamChildEpInfo* pUpstream = taosArrayGetP(pTask->pUpstreamInfoList, idx);
    pUpstream->stage = -1;
    ++idx;
  }

  stDebug("s-task:%s reset all upstream tasks stage info", pTask->id.idStr);
}
// Move the schedule status from INACTIVE to WAITING under the task lock.
//
// The transition only happens when the current status is INACTIVE.
// Returns the status observed before the (possible) transition, so the caller
// can tell whether it was the one that performed it.
int8_t streamTaskSetSchedStatusWait(SStreamTask* pTask) {
  taosThreadMutexLock(&pTask->lock);

  const int8_t prev = pTask->status.schedStatus;
  if (prev == TASK_SCHED_STATUS__INACTIVE) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__WAITING;
  }

  taosThreadMutexUnlock(&pTask->lock);
  return prev;
}
// Move the schedule status from WAITING to ACTIVE under the task lock.
//
// The transition only happens when the current status is WAITING.
// Returns the status observed before the (possible) transition.
int8_t streamTaskSetSchedStatusActive(SStreamTask* pTask) {
  taosThreadMutexLock(&pTask->lock);

  const int8_t prev = pTask->status.schedStatus;
  if (prev == TASK_SCHED_STATUS__WAITING) {
    pTask->status.schedStatus = TASK_SCHED_STATUS__ACTIVE;
  }

  taosThreadMutexUnlock(&pTask->lock);
  return prev;
}
// Unconditionally set the schedule status to INACTIVE under the task lock.
//
// The previous status is asserted to be one of WAITING, ACTIVE or INACTIVE and
// is returned to the caller.
int8_t streamTaskSetSchedStatusInActive(SStreamTask* pTask) {
  taosThreadMutexLock(&pTask->lock);

  const int8_t prev = pTask->status.schedStatus;
  ASSERT(prev == TASK_SCHED_STATUS__WAITING || prev == TASK_SCHED_STATUS__ACTIVE ||
         prev == TASK_SCHED_STATUS__INACTIVE);
  pTask->status.schedStatus = TASK_SCHED_STATUS__INACTIVE;

  taosThreadMutexUnlock(&pTask->lock);
  return prev;
}
2023-09-18 07:14:51 +00:00
// Build a drop-task request for pTaskId and put it on the write queue.
//
// pMsgCb   message callbacks used to enqueue the request
// vgId     stored in the request head and used in log output
// pTaskId  stream id / task id of the task to drop
//
// Returns -1 with terrno set when the request buffer cannot be allocated,
// otherwise the code returned by tmsgPutToQueue.
//
// NOTE(review): the request buffer is not released here when tmsgPutToQueue
// fails -- presumably the queue layer takes ownership of pCont; confirm.
int32_t streamBuildAndSendDropTaskMsg(SMsgCb* pMsgCb, int32_t vgId, SStreamTaskId* pTaskId) {
  SVDropStreamTaskReq* pReq = rpcMallocCont(sizeof(SVDropStreamTaskReq));
  if (pReq == NULL) {
    terrno = TSDB_CODE_OUT_OF_MEMORY;
    return -1;
  }

  pReq->head.vgId = vgId;
  pReq->taskId = pTaskId->taskId;
  pReq->streamId = pTaskId->streamId;

  SRpcMsg msg = {.msgType = TDMT_STREAM_TASK_DROP, .pCont = pReq, .contLen = sizeof(SVDropStreamTaskReq)};
  int32_t code = tmsgPutToQueue(pMsgCb, WRITE_QUEUE, &msg);
  if (code != TSDB_CODE_SUCCESS) {
    stError("vgId:%d failed to send drop task:0x%x msg, code:%s", vgId, pTaskId->taskId, tstrerror(code));
    return code;
  }

  // the id logged here is a task id, so name it as such (was "drop table")
  stDebug("vgId:%d build and send drop task:0x%x msg", vgId, pTaskId->taskId);
  return code;
}
2023-09-16 13:37:30 +00:00
STaskId extractStreamTaskKey(const SStreamTask* pTask) {
STaskId id = {.streamId = pTask->id.streamId, .taskId = pTask->id.taskId};
return id;
}