/* * Copyright (c) 2019 TAOS Data, Inc. * * This program is free software: you can use, redistribute, and/or modify * it under the terms of the GNU Affero General Public License, version 3 * or later ("AGPL"), as published by the Free Software Foundation. * * This program is distributed in the hope that it will be useful, but WITHOUT * ANY WARRANTY; without even the implied warranty of MERCHANTABILITY or * FITNESS FOR A PARTICULAR PURPOSE. * * You should have received a copy of the GNU Affero General Public License * along with this program. If not, see . */ #include #pragma GCC diagnostic push #pragma GCC diagnostic ignored "-Wwrite-strings" #pragma GCC diagnostic ignored "-Wunused-function" #pragma GCC diagnostic ignored "-Wunused-variable" #pragma GCC diagnostic ignored "-Wsign-compare" #pragma GCC diagnostic ignored "-Wformat" #pragma GCC diagnostic ignored "-Wint-to-pointer-cast" #pragma GCC diagnostic ignored "-Wpointer-arith" #include #include "dataSink.h" #include "tdatablock.h" const int64_t baseTestTime = 1745142096000; SSDataBlock* createTestBlock(int64_t timeOffset) { SSDataBlock* b = NULL; int32_t code = createDataBlock(&b); int64_t timeStart = baseTestTime + timeOffset; SColumnInfoData infoData = createColumnInfoData(TSDB_DATA_TYPE_TIMESTAMP, 8, 1); blockDataAppendColInfo(b, &infoData); SColumnInfoData infoData1 = createColumnInfoData(TSDB_DATA_TYPE_BINARY, 40, 2); blockDataAppendColInfo(b, &infoData1); blockDataEnsureCapacity(b, 100); char* str = "the value of: %d"; char buf[128] = {0}; char varbuf[128] = {0}; int64_t ts = baseTestTime; for (int32_t i = 0; i < 100; ++i) { SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0); SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1); ts = timeStart + i; if (i & 0x01) { int32_t len = sprintf(buf, str, timeOffset + i); STR_TO_VARSTR(varbuf, buf) colDataSetVal(p0, i, (const char*)&ts, false); colDataSetVal(p1, i, (const char*)varbuf, false); memset(varbuf, 0, sizeof(varbuf)); memset(buf, 0, sizeof(buf)); } else { colDataSetVal(p0, i, (const char*)&ts, false); colDataSetVal(p1, i, (const char*)varbuf, true); } b->info.rows++; } SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 0); SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b->pDataBlock, 1); printf("binary column length:%d\n", *(int32_t*)p1->pData); char* pData = colDataGetData(p1, 2); printf("the second row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); pData = colDataGetData(p1, 3); printf("the third row: %s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); return b; } bool compareBlock(SSDataBlock* b1, SSDataBlock* b2) { if (b1->info.rows != b2->info.rows) { return false; } for (int32_t i = 0; i < b1->info.rows; ++i) { SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b1->pDataBlock, 0); SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b2->pDataBlock, 0); if (*(int32_t*)colDataGetData(p0, i) != *(int32_t*)colDataGetData(p1, i)) { return false; } } SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b1->pDataBlock, 1); SColumnInfoData* p2 = (SColumnInfoData*)taosArrayGet(b2->pDataBlock, 1); char* pData = colDataGetData(p1, 3); printf("b1 the third row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); pData = colDataGetData(p1, 3); printf("b2 the third row of binary:%s, length:%d\n", (char*)varDataVal(pData), varDataLen(pData)); return true; } bool compareBlockRow(SSDataBlock* b1, SSDataBlock* b2, int32_t row1, int32_t row2) { for (int32_t i = 0; i < b1->pDataBlock->size; ++i) { SColumnInfoData* p0 = (SColumnInfoData*)taosArrayGet(b1->pDataBlock, i); SColumnInfoData* p1 = (SColumnInfoData*)taosArrayGet(b2->pDataBlock, i); if (*(int32_t*)colDataGetData(p0, row1) != *(int32_t*)colDataGetData(p1, row2)) { return false; } } return true; } TEST(dataSinkTest, fileInit) { int32_t code = initStreamDataSinkOnce(); ASSERT_EQ(code, 0); code = initStreamDataSinkOnce(); ASSERT_EQ(code, 0); destroyDataSinkManager2(); } TEST(dataSinkTest, test_name) { SSDataBlock* pBlock = createTestBlock(0); ASSERT_NE(pBlock, nullptr); int64_t streamId = 1; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = 1; TSKEY wstart = 0; TSKEY wend = 100; void* pCache = NULL; int32_t code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 1); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); } TEST(dataSinkTest, putStreamDataCacheTest) { SSDataBlock* pBlock = createTestBlock(0); ASSERT_NE(pBlock, nullptr); int64_t streamId = 1; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = DATA_CLEAN_IMMEDIATE; TSKEY wstart = baseTestTime + 0; TSKEY wend = baseTestTime + 100; void* pCache = NULL; // Test invalid parameters int32_t code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 1); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 1, 0); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 1); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = moveStreamDataCache(pCache, groupID, wstart, wend, pBlock); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = moveStreamDataCache(NULL, groupID, wstart, wend, pBlock); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = getStreamDataCache(pCache, groupID, wend, wstart, NULL); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); // Test valid parameters code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 99); ASSERT_EQ(code, 0); void* pIter = NULL; // Test invalid parameters code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 100); ASSERT_NE(code, 0); code = getStreamDataCache(pCache, groupID, wend, wstart, &pIter); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = getStreamDataCache(NULL, groupID, wstart, wend, &pIter); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = getStreamDataCache(pCache, groupID, wstart, wend, NULL); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); // Test valid parameters code = getStreamDataCache(pCache, groupID, wstart, wend, &pIter); ASSERT_EQ(code, 0); code = getNextStreamDataCache(&pIter, NULL); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); SSDataBlock* pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_EQ(pIter, nullptr); bool equal = compareBlock(pBlock, pBlock1); ASSERT_EQ(equal, true); blockDataDestroy(pBlock1); blockDataDestroy(pBlock); pBlock = createTestBlock(100); streamId = 1; taskId = 1; groupID = 2; cleanMode = DATA_CLEAN_IMMEDIATE; wstart = baseTestTime + 100; wend = baseTestTime + 200; pCache = NULL; code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 99); ASSERT_EQ(code, 0); pIter = NULL; code = getStreamDataCache(pCache, groupID, wstart, wend, &pIter); ASSERT_EQ(code, 0); pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_EQ(pIter, nullptr); equal = compareBlock(pBlock, pBlock1); ASSERT_EQ(equal, true); blockDataDestroy(pBlock1); blockDataDestroy(pBlock); pBlock = createTestBlock(0); streamId = 2; taskId = 1; groupID = 2; cleanMode = DATA_CLEAN_IMMEDIATE; wstart = baseTestTime + 0; wend = baseTestTime + 100; pCache = NULL; code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 99); ASSERT_EQ(code, 0); pIter = NULL; code = getStreamDataCache(pCache, groupID, wstart, wend, &pIter); ASSERT_EQ(code, 0); pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_EQ(pIter, nullptr); equal = compareBlock(pBlock, pBlock1); ASSERT_EQ(equal, true); blockDataDestroy(pBlock1); destroyDataSinkManager2(); blockDataDestroy(pBlock); } TEST(dataSinkTest, getSlidingStreamData) { SSDataBlock* pBlock = createTestBlock(0); ASSERT_NE(pBlock, nullptr); int64_t streamId = 1; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = DATA_CLEAN_EXPIRED; TSKEY wstart = baseTestTime + 0; TSKEY wend = baseTestTime + 100; void* pCache = NULL; int32_t code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); // Test invalid parameters, cleanMode is DATA_CLEAN_EXPIRED, cannot call moveStreamDataCache code = moveStreamDataCache(pCache, groupID, wstart, wend, pBlock); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 99); ASSERT_EQ(code, 0); blockDataDestroy(pBlock); pBlock = createTestBlock(100); cleanMode = DATA_CLEAN_EXPIRED; wstart = baseTestTime + 100; wend = baseTestTime + 200; code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 99); ASSERT_EQ(code, 0); void* pIter = NULL; blockDataDestroy(pBlock); code = getStreamDataCache(pCache, groupID, baseTestTime + 50, baseTestTime + 150, &pIter); ASSERT_EQ(code, 0); SSDataBlock* pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); int rows = pBlock1->info.rows; ASSERT_EQ(rows, 50); blockDataDestroy(pBlock1); code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_EQ(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 51); blockDataDestroy(pBlock1); pBlock = createTestBlock(200); cleanMode = DATA_CLEAN_EXPIRED; wstart = baseTestTime + 200; wend = baseTestTime + 300; code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 99); ASSERT_EQ(code, 0); pIter = NULL; code = getStreamDataCache(pCache, groupID, baseTestTime + 150, baseTestTime + 249, &pIter); ASSERT_EQ(code, 0); pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 50); code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_EQ(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 50); ASSERT_EQ(compareBlockRow(pBlock, pBlock1, 0, 0), true); ASSERT_EQ(compareBlockRow(pBlock, pBlock1, 1, 1), true); blockDataDestroy(pBlock); blockDataDestroy(pBlock1); destroyDataSinkManager2(); } TEST(dataSinkTest, moveStreamData) { SSDataBlock* pBlock = createTestBlock(0); ASSERT_NE(pBlock, nullptr); int64_t streamId = 3; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = DATA_CLEAN_IMMEDIATE; TSKEY wstart = baseTestTime + 0; TSKEY wend = baseTestTime + 100; void* pCache = NULL; int32_t code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); code = moveStreamDataCache(pCache, groupID, wstart, wend, pBlock); ASSERT_EQ(code, 0); void* pIter = NULL; code = getStreamDataCache(pCache, groupID, baseTestTime, baseTestTime + 100, &pIter); ASSERT_EQ(code, 0); ASSERT_NE(pIter, nullptr); SSDataBlock* pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_EQ(pIter, nullptr); ASSERT_EQ(pBlock1, pBlock); blockDataDestroy(pBlock); code = getStreamDataCache(pCache, groupID, baseTestTime, baseTestTime + 100, &pIter); ASSERT_EQ(code, 0); ASSERT_NE(pIter, nullptr); pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, TSDB_CODE_STREAM_INTERNAL_ERROR); ASSERT_EQ(pBlock1, nullptr); destroyDataSinkManager2(); } TEST(dataSinkTest, cancelStreamDataCacheIterateTest) { int64_t streamId = 3; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = DATA_CLEAN_IMMEDIATE; void* pCache = NULL; int32_t code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); SSDataBlock* pBlock1 = createTestBlock(0); ASSERT_NE(pBlock1, nullptr); TSKEY wstart = baseTestTime + 0; TSKEY wend = baseTestTime + 100; code = moveStreamDataCache(pCache, groupID, wstart, wend, pBlock1); ASSERT_EQ(code, 0); SSDataBlock* pBlock2 = createTestBlock(100); ASSERT_NE(pBlock2, nullptr); wstart = baseTestTime + 100; wend = baseTestTime + 200; code = moveStreamDataCache(pCache, groupID, wstart, wend, pBlock2); ASSERT_EQ(code, 0); void* pIter = NULL; code = getStreamDataCache(pCache, groupID, baseTestTime, baseTestTime + 100, &pIter); ASSERT_EQ(code, 0); ASSERT_NE(pIter, nullptr); SSDataBlock* pBlock = NULL; code = getNextStreamDataCache(&pIter, &pBlock); ASSERT_EQ(code, 0); ASSERT_NE(pBlock, nullptr); ASSERT_NE(pIter, nullptr); ASSERT_EQ(pBlock1, pBlock); cancelStreamDataCacheIterate(&pIter); blockDataDestroy(pBlock1); // pBlock1 has moveout, can destroy // blockDataDestroy(pBlock2); // pBlock2 has not moveout, can not destroy destroyDataSinkManager2(); } TEST(dataSinkTest, putStreamDataRows) { SSDataBlock* pBlock = createTestBlock(0); ASSERT_NE(pBlock, nullptr); int64_t streamId = 1; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = DATA_CLEAN_EXPIRED; TSKEY wstart = baseTestTime + 0; TSKEY wend = baseTestTime + 100; void* pCache = NULL; int32_t code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 29); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 30, 79); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 80, 99); ASSERT_EQ(code, 0); blockDataDestroy(pBlock); pBlock = createTestBlock(100); cleanMode = DATA_CLEAN_EXPIRED; wstart = baseTestTime + 100; wend = baseTestTime + 200; code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 49); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 50, 99); ASSERT_EQ(code, 0); void* pIter = NULL; blockDataDestroy(pBlock); code = getStreamDataCache(pCache, groupID, baseTestTime + 50, baseTestTime + 150, &pIter); ASSERT_EQ(code, 0); SSDataBlock* pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); int rows = pBlock1->info.rows; ASSERT_EQ(rows, 30); blockDataDestroy(pBlock1); code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 20); blockDataDestroy(pBlock1); code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 50); blockDataDestroy(pBlock1); destroyDataSinkManager2(); } TEST(dataSinkTest, allWriteToFileTest) { //setDataSinkMaxMemSize(0); SSDataBlock* pBlock = createTestBlock(0); ASSERT_NE(pBlock, nullptr); int64_t streamId = 1; int64_t taskId = 1; int64_t groupID = 1; int32_t cleanMode = DATA_CLEAN_EXPIRED; TSKEY wstart = baseTestTime + 0; TSKEY wend = baseTestTime + 100; void* pCache = NULL; int32_t code = initStreamDataCache(streamId, taskId, cleanMode, &pCache); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 29); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 30, 79); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 80, 99); ASSERT_EQ(code, 0); blockDataDestroy(pBlock); pBlock = createTestBlock(100); cleanMode = DATA_CLEAN_EXPIRED; wstart = baseTestTime + 100; wend = baseTestTime + 200; code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 0, 49); ASSERT_EQ(code, 0); code = putStreamDataCache(pCache, groupID, wstart, wend, pBlock, 50, 99); ASSERT_EQ(code, 0); void* pIter = NULL; blockDataDestroy(pBlock); code = getStreamDataCache(pCache, groupID, baseTestTime + 50, baseTestTime + 150, &pIter); ASSERT_EQ(code, 0); SSDataBlock* pBlock1 = NULL; code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); int rows = pBlock1->info.rows; ASSERT_EQ(rows, 30); blockDataDestroy(pBlock1); code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 20); blockDataDestroy(pBlock1); code = getNextStreamDataCache(&pIter, &pBlock1); ASSERT_EQ(code, 0); ASSERT_NE(pBlock1, nullptr); ASSERT_NE(pIter, nullptr); rows = pBlock1->info.rows; ASSERT_EQ(rows, 50); blockDataDestroy(pBlock1); destroyDataSinkManager2(); } int main(int argc, char **argv) { taos_init(); ::testing::InitGoogleTest(&argc, argv); int ret = RUN_ALL_TESTS(); taos_cleanup(); return ret; }