TDengine/source/libs/sync/test/syncWriteTest.cpp

189 lines
5.3 KiB
C++
Raw Normal View History

2022-03-17 10:55:26 +00:00
#include <gtest/gtest.h>
#include "syncTest.h"
2022-03-17 10:55:26 +00:00
// Emits one message through each of the sync logging macros
// (sTrace, sDebug, sInfo, sWarn, sError, sFatal), in that order.
void logTest() {
sTrace("--- sync log test: trace");
sDebug("--- sync log test: debug");
sInfo("--- sync log test: info");
sWarn("--- sync log test: warn");
sError("--- sync log test: error");
sFatal("--- sync log test: fatal");
}
uint16_t ports[] = {7010, 7110, 7210, 7310, 7410};
int32_t replicaNum = 1;
int32_t myIndex = 0;
SRaftId ids[TSDB_MAX_REPLICA];
SSyncInfo syncInfo;
2022-10-13 06:06:27 +00:00
SSyncFSM *pFsm;
SWal *pWal;
2022-03-17 10:55:26 +00:00
SSyncNode *gSyncNode;
2022-04-18 13:50:56 +00:00
const char *pDir = "./syncWriteTest";
void CommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
2022-03-22 08:43:30 +00:00
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==CommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
2022-11-02 02:24:55 +00:00
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
2022-04-18 13:50:56 +00:00
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
2022-03-17 10:55:26 +00:00
}
2022-04-18 13:50:56 +00:00
void PreCommitCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
2022-03-22 08:43:30 +00:00
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==PreCommitCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
2022-11-02 02:24:55 +00:00
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
2022-04-18 13:50:56 +00:00
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
2022-03-17 10:55:26 +00:00
}
2022-04-18 13:50:56 +00:00
void RollBackCb(struct SSyncFSM *pFsm, const SRpcMsg *pMsg, SFsmCbMeta cbMeta) {
2022-03-22 08:43:30 +00:00
char logBuf[256];
snprintf(logBuf, sizeof(logBuf),
"==callback== ==RollBackCb== pFsm:%p, index:%" PRId64 ", isWeak:%d, code:%d, state:%d %s \n", pFsm,
2022-11-02 02:24:55 +00:00
cbMeta.index, cbMeta.isWeak, cbMeta.code, cbMeta.state, syncStr(cbMeta.state));
2022-04-18 13:50:56 +00:00
syncRpcMsgLog2(logBuf, (SRpcMsg *)pMsg);
2022-03-17 10:55:26 +00:00
}
void initFsm() {
2022-03-25 16:29:53 +00:00
pFsm = (SSyncFSM *)taosMemoryMalloc(sizeof(SSyncFSM));
2022-11-02 07:38:30 +00:00
#if 0
2022-03-17 10:55:26 +00:00
pFsm->FpCommitCb = CommitCb;
pFsm->FpPreCommitCb = PreCommitCb;
pFsm->FpRollBackCb = RollBackCb;
2022-11-02 07:38:30 +00:00
#endif
2022-03-17 10:55:26 +00:00
}
// Builds the global syncInfo, opens the WAL, opens a sync node from that
// configuration, attaches the node to gSyncIO and starts it.
// Every step that can fail is checked with TD_ALWAYS_ASSERT.
// Returns the opened (and started) sync node.
SSyncNode *syncNodeInit() {
  // ---- basic sync info: group id, message plumbing, FSM, data path ----
  syncInfo.vgId = 1234;
  syncInfo.pFsm = pFsm;
  syncInfo.msgcb = &gSyncIO->msgcb;
  syncInfo.syncSendMSg = syncIOSendMsg;
  syncInfo.syncEqMsg = syncIOEqMsg;
  snprintf(syncInfo.path, sizeof(syncInfo.path), "%s", pDir);

  // ---- WAL: initialize the module, then open the log used by this node ----
  int code = walInit();
  TD_ALWAYS_ASSERT(code == 0);

  SWalCfg walCfg;
  memset(&walCfg, 0, sizeof(walCfg));
  walCfg.vgId = syncInfo.vgId;
  walCfg.level = TAOS_WAL_FSYNC;
  walCfg.fsyncPeriod = 1000;
  walCfg.rollPeriod = 1000;
  walCfg.segSize = 1000;
  walCfg.retentionPeriod = 1000;
  walCfg.retentionSize = 1000;

  pWal = walOpen("./write_test_wal", &walCfg);
  TD_ALWAYS_ASSERT(pWal != NULL);
  syncInfo.pWal = pWal;

  // ---- replica configuration: every replica is on 127.0.0.1, port ports[n] ----
  SSyncCfg *pSyncCfg = &syncInfo.syncCfg;
  pSyncCfg->replicaNum = replicaNum;
  pSyncCfg->myIndex = myIndex;
  for (int replica = 0; replica < replicaNum; replica++) {
    pSyncCfg->nodeInfo[replica].nodePort = ports[replica];
    snprintf(pSyncCfg->nodeInfo[replica].nodeFqdn, sizeof(pSyncCfg->nodeInfo[replica].nodeFqdn), "%s", "127.0.0.1");
    // taosGetFqdn(pCfg->nodeInfo[0].nodeFqdn);
  }

  // ---- open the node ----
  SSyncNode *pSyncNode = syncNodeOpen(&syncInfo);
  TD_ALWAYS_ASSERT(pSyncNode != NULL);

  // Per-message handler wiring on gSyncIO is commented out in this test:
  // gSyncIO->FpOnSyncPing = pSyncNode->FpOnPing;
  // gSyncIO->FpOnSyncClientRequest = pSyncNode->FpOnClientRequest;
  // gSyncIO->FpOnSyncPingReply = pSyncNode->FpOnPingReply;
  // gSyncIO->FpOnSyncRequestVote = pSyncNode->FpOnRequestVote;
  // gSyncIO->FpOnSyncRequestVoteReply = pSyncNode->FpOnRequestVoteReply;
  // gSyncIO->FpOnSyncAppendEntries = pSyncNode->FpOnAppendEntries;
  // gSyncIO->FpOnSyncAppendEntriesReply = pSyncNode->FpOnAppendEntriesReply;
  // gSyncIO->FpOnSyncTimeout = pSyncNode->FpOnTimeout;

  // ---- attach to the IO layer and start ----
  gSyncIO->pSyncNode = pSyncNode;
  syncNodeStart(pSyncNode);

  return pSyncNode;
}
// Test entry point for node setup; simply forwards to syncNodeInit().
SSyncNode *syncInitTest() {
  SSyncNode *pNode = syncNodeInit();
  return pNode;
}
// Copies the first replicaNum raft ids of pSyncNode into the global ids[]
// array and prints each one. The string returned by syncUtilRaftId2Str()
// is released with taosMemoryFree() after printing.
void initRaftId(SSyncNode *pSyncNode) {
  int idx = 0;
  while (idx < replicaNum) {
    ids[idx] = pSyncNode->replicasId[idx];

    char *raftIdStr = syncUtilRaftId2Str(&ids[idx]);
    printf("raftId[%d] : %s\n", idx, raftIdStr);
    taosMemoryFree(raftIdStr);

    ++idx;
  }
}
// Builds the RPC message used as test input: a zeroed SRpcMsg with
// msgType 9999 and a 32-byte payload holding the string "hello, world".
//
// Both allocations are checked with TD_ALWAYS_ASSERT (the file's usual
// failure handling), and the payload is zero-filled first so the bytes after
// the string terminator are defined if the whole contLen range is read.
//
// Returns a heap-allocated message; this function does not free the message
// or its pCont buffer.
SRpcMsg *step0() {
  SRpcMsg *pMsg = (SRpcMsg *)taosMemoryMalloc(sizeof(SRpcMsg));
  TD_ALWAYS_ASSERT(pMsg != NULL);
  memset(pMsg, 0, sizeof(SRpcMsg));

  pMsg->msgType = 9999;
  pMsg->contLen = 32;

  pMsg->pCont = taosMemoryMalloc(pMsg->contLen);
  TD_ALWAYS_ASSERT(pMsg->pCont != NULL);
  memset(pMsg->pCont, 0, pMsg->contLen);
  snprintf((char *)(pMsg->pCont), pMsg->contLen, "hello, world");

  return pMsg;
}
// Placeholder for building a client request from pMsg. The build call is
// commented out, so this always returns NULL and pMsg is not used.
SyncClientRequest *step1(const SRpcMsg *pMsg) {
  // syncClientRequestBuild(pMsg, 123, true, 1000);
  (void)pMsg;
  return NULL;
}
int main(int argc, char **argv) {
// taosInitLog((char *)"syncTest.log", 100000, 10);
tsAsyncLog = 0;
sDebugFlag = 143 + 64;
void logTest();
myIndex = 0;
if (argc >= 2) {
myIndex = atoi(argv[1]);
}
int32_t ret = syncIOStart((char *)"127.0.0.1", ports[myIndex]);
TD_ALWAYS_ASSERT(ret == 0);
2022-03-17 10:55:26 +00:00
2022-10-31 15:40:43 +00:00
ret = syncInit();
TD_ALWAYS_ASSERT(ret == 0);
2022-03-17 10:55:26 +00:00
taosRemoveDir("./wal_test");
initFsm();
gSyncNode = syncInitTest();
TD_ALWAYS_ASSERT(gSyncNode != NULL);
2022-11-07 11:13:12 +00:00
sNTrace(gSyncNode, "");
2022-03-17 10:55:26 +00:00
initRaftId(gSyncNode);
// step0
SRpcMsg *pMsg0 = step0();
2022-04-18 13:50:56 +00:00
syncRpcMsgLog2((char *)"==step0==", pMsg0);
2022-03-17 10:55:26 +00:00
// step1
SyncClientRequest *pMsg1 = step1(pMsg0);
2022-04-18 13:50:56 +00:00
syncClientRequestLog2((char *)"==step1==", pMsg1);
2022-03-17 10:55:26 +00:00
// for (int i = 0; i < 10; ++i) {
// SyncClientRequest *pSyncClientRequest = pMsg1;
// SRpcMsg rpcMsg = {0};
// syncClientRequest2RpcMsg(pSyncClientRequest, &rpcMsg);
// gSyncNode->syncEqMsg(gSyncNode->msgcb, &rpcMsg);
2022-03-17 10:55:26 +00:00
// taosMsleep(1000);
// }
2022-03-17 10:55:26 +00:00
while (1) {
sTrace("while 1 sleep");
taosMsleep(1000);
}
return 0;
}