TDengine/test/new_test_framework/utils/compatibilityUtil.py

799 lines
No EOL
37 KiB
Python
Raw Permalink Blame History

This file contains ambiguous Unicode characters

This file contains Unicode characters that might be confused with other characters. If you think that this is intentional, you can safely ignore this warning. Use the Escape button to reveal them.

###################################################################
# Copyright (c) 2016 by TAOS Technologies, Inc.
# All rights reserved.
#
# This file is proprietary and confidential to TAOS Technologies.
# No part of this file may be reproduced, stored, transmitted,
# disclosed or used in any form or by any means other than as
# expressly provided by the written permission from Jianhui Tao
#
###################################################################
# -*- coding: utf-8 -*-
import taos
import taosws
import sys
import os
import re
import time
import platform
import subprocess
from pathlib import Path
from .log import *
from .sql import *
from .server.dnodes import *
from .common import *
from taos.tmq import Consumer
from new_test_framework.utils import clusterComCheck
from taos.error import TmqError
def _create_so_symlinks(lib_dir: str) -> None:
"""为 lib_dir 内所有 libXXX.so.VER 创建不带版本号的符号链接 libXXX.so。
包里只提供 libtaos.so.3.3.7.9 这类带版本号的文件,但 taos 二进制链接的是
不带版本号的 libtaos.so 或 SONAME如 libtaos.so.1)。若没有对应符号链接,
LD_LIBRARY_PATH 指向该目录时动态链接器无法找到库,会回退到系统路径。
"""
import re as _re
import subprocess as _sp
p = Path(lib_dir)
for versioned in p.glob("lib*.so.*"):
if not versioned.is_file():
continue
name = versioned.name
dot_so = name.find(".so.")
if dot_so == -1:
continue
# 1. 不带版本号链接libtaos.so -> libtaos.so.3.3.7.9
unversioned = p / (name[: dot_so + 3])
if not unversioned.exists():
unversioned.symlink_to(versioned.name)
# 2. SONAME 链接libtaos.so.1 -> libtaos.so.3.3.5.0(旧版本 soname
try:
out = _sp.run(
["readelf", "-d", str(versioned)],
capture_output=True, text=True, timeout=10
).stdout
for line in out.splitlines():
if "SONAME" in line:
m = _re.search(r'\[([^\]]+)\]', line)
if m:
soname_link = p / m.group(1)
if not soname_link.exists():
soname_link.symlink_to(versioned.name)
break
except Exception:
pass
deletedDataSql = '''drop database if exists deldata;create database deldata duration 100 stt_trigger 1; ;use deldata;
create table deldata.stb1 (ts timestamp, c1 int, c2 bigint, c3 smallint, c4 tinyint, c5 float, c6 double, c7 bool, c8 binary(16),c9 nchar(32), c10 timestamp) tags (t1 int);
create table deldata.ct1 using deldata.stb1 tags ( 1 );
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
select avg(c1) from deldata.ct1;
delete from deldata.stb1;
flush database deldata;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );
delete from deldata.ct1;
insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a );
flush database deldata;'''
tableNumbers=100
recordNumbers1=1000
recordNumbers2=1000
first_consumer_rows=0
topic_select_sql = "select current,voltage,phase from test.meters where voltage >= 10;"
select_topic = "select_test_meters_topic"
db_topic = "db_test_topic"
stable_topic = "stable_test_meters_topic"
dbname = "test"
stb = f"{dbname}.meters"
class CompatibilityBase:
def checkProcessPid(self,processName):
tdLog.info(f"checkProcessPid {processName}")
i=0
while i<60:
tdLog.info(f"wait stop {processName}")
processPid = subprocess.getstatusoutput(f'ps aux|grep {processName} |grep -v "grep"|awk \'{{print $2}}\'')[1]
tdLog.info(f"times:{i},{processName}-pid:{processPid}")
if(processPid == ""):
break
i += 1
time.sleep(1)
else:
tdLog.info(f'this processName is not stopped in 60s')
def version_compare(self, version1, version2):
"""
Compare two version strings.
Returns 1 if version1 > version2, -1 if version1 < version2, 0 if equal
"""
v1_parts = [int(x) for x in version1.split('.')]
v2_parts = [int(x) for x in version2.split('.')]
# Pad shorter version with zeros
max_len = max(len(v1_parts), len(v2_parts))
v1_parts.extend([0] * (max_len - len(v1_parts)))
v2_parts.extend([0] * (max_len - len(v2_parts)))
for v1, v2 in zip(v1_parts, v2_parts):
if v1 > v2:
return 1
elif v1 < v2:
return -1
return 0
# Modified installTaosd to accept version parameter
def installTaosdForRollingUpgrade(self, dnodePaths, base_version):
packagePath = "/usr/local/src/"
# New download URL format
if platform.system() == "Linux" and platform.machine() == "aarch64":
packageName = f"tdengine-tsdb-oss-{base_version}-linux-arm64.tar.gz"
download_url = f"https://downloads.taosdata.com/tdengine-tsdb-oss/{base_version}/{packageName}"
else:
packageName = f"tdengine-tsdb-oss-{base_version}-linux-x64.tar.gz"
download_url = f"https://downloads.taosdata.com/tdengine-tsdb-oss/{base_version}/{packageName}"
tdLog.info(f"wget {download_url}")
# Compute extracted directory name (strip "-linux-{arch}.tar.gz")
packageTPath = re.sub(r"-linux-(x64|arm64)\.tar\.gz$", "", packageName, flags=re.IGNORECASE)
extract_dir = os.path.join(packagePath, packageTPath)
my_file = Path(f"{packagePath}/{packageName}")
if not my_file.exists():
print(f"{packageName} is not exists")
tdLog.info(f"cd {packagePath} && wget {download_url}")
ret_code = os.system(f"cd {packagePath} && wget {download_url}")
if ret_code != 0:
return False
# Check if file was actually downloaded
my_file = Path(f"{packagePath}/{packageName}")
if not my_file.exists():
return False
else:
print(f"{packageName} has been exists")
# Extract without running install.sh to avoid overwriting system binaries
if not Path(extract_dir).exists():
extract_ret = os.system(f"cd {packagePath} && tar xf {packageName} > /dev/null 2>&1")
if extract_ret != 0:
return False
# The outer archive contains package.tar.gz with the actual bin/ tree.
# Extract it if bin/ is not yet present.
bin_dir = os.path.join(extract_dir, "bin")
if not Path(bin_dir).exists():
inner_pkg = os.path.join(extract_dir, "package.tar.gz")
if not Path(inner_pkg).exists():
tdLog.error(f"Neither bin/ nor package.tar.gz found in: {extract_dir}")
return False
extract_ret = os.system(f"cd {extract_dir} && tar xf package.tar.gz > /dev/null 2>&1")
if extract_ret != 0:
return False
self.old_bin_dir = bin_dir
# Libraries are in driver/ (not lib/) for this package layout.
lib_dir = os.path.join(extract_dir, "driver")
if not Path(lib_dir).exists():
lib_dir = os.path.join(extract_dir, "lib")
self.old_lib_dir = lib_dir
_create_so_symlinks(lib_dir)
for dnodePath in dnodePaths:
tdLog.info(f"start taosd: rm -rf {dnodePath}data/* && LD_LIBRARY_PATH={self.old_lib_dir} nohup {self.old_bin_dir}/taosd -c {dnodePath}cfg/ &")
os.system(f"rm -rf {dnodePath}data/* && LD_LIBRARY_PATH={self.old_lib_dir} nohup {self.old_bin_dir}/taosd -c {dnodePath}cfg/ &")
os.system(f"killall taosadapter")
taosadapter_toml_src = "/etc/taos/taosadapter.toml"
if os.path.exists(taosadapter_toml_src):
os.system(f"cp {taosadapter_toml_src} {dnodePath}cfg/taosadapter.toml")
taosadapter_cfg = dnodePath + "cfg/taosadapter.toml"
taosadapter_log_path = dnodePath + "log/"
print(f"taosadapter_cfg:{taosadapter_cfg}, taosadapter_log_path:{taosadapter_log_path}")
self.alter_string_in_file(taosadapter_cfg,"#path = \"/var/log/taos\"",f"path = \"{taosadapter_log_path}\"")
self.alter_string_in_file(taosadapter_cfg,"taosConfigDir = \"\"",f"taosConfigDir = \"{dnodePath}cfg/\"")
print(f"{self.old_bin_dir}/taosadapter --version")
os.system(f"{self.old_bin_dir}/taosadapter --version")
print(f"LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taosadapter -c {taosadapter_cfg} 2>&1 &")
os.system(f"LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taosadapter -c {taosadapter_cfg} 2>&1 &")
time.sleep(5)
return True
def installTaosd(self, bPath, cPath, base_version):
packagePath = "/usr/local/src/"
dataPath = cPath + "/../data/"
packageType = "server"
# Check if version is 3.3.7.0 or later
if self.version_compare(base_version, "3.3.7.0") >= 0:
# Use new download URL format for 3.3.7.0 and later versions
if platform.system() == "Linux" and platform.machine() == "aarch64":
packageName = f"tdengine-tsdb-oss-{base_version}-linux-arm64.tar.gz"
download_url = f"https://downloads.taosdata.com/tdengine-tsdb-oss/{base_version}/{packageName}"
else:
packageName = f"tdengine-tsdb-oss-{base_version}-linux-x64.tar.gz"
download_url = f"https://downloads.taosdata.com/tdengine-tsdb-oss/{base_version}/{packageName}"
else:
# Use old download URL format for versions before 3.3.7.0
if platform.system() == "Linux" and platform.machine() == "aarch64":
packageName = "TDengine-"+ packageType + "-" + base_version + "-Linux-arm64.tar.gz"
else:
packageName = "TDengine-"+ packageType + "-" + base_version + "-Linux-x64.tar.gz"
# Determine download URL
download_url = f"https://www.taosdata.com/assets-download/3.0/{packageName}"
tdLog.info(f"wget {download_url}")
my_file = Path(f"{packagePath}/{packageName}")
if not my_file.exists():
print(f"{packageName} is not exists")
tdLog.info(f"cd {packagePath} && wget {download_url}")
os.system(f"cd {packagePath} && wget {download_url}")
else:
print(f"{packageName} has been exists")
# Compute extracted directory name and extract without running install.sh
packageTPath = re.sub(r"-linux-(x64|arm64)\.tar\.gz$", "", packageName, flags=re.IGNORECASE)
extract_dir = os.path.join(packagePath, packageTPath)
if not Path(extract_dir).exists():
os.system(f"cd {packagePath} && tar xf {packageName} > /dev/null 2>&1")
# The outer archive contains package.tar.gz with the actual bin/ tree.
# Extract it if bin/ is not yet present.
bin_dir = os.path.join(extract_dir, "bin")
if not Path(bin_dir).exists():
inner_pkg = os.path.join(extract_dir, "package.tar.gz")
if Path(inner_pkg).exists():
os.system(f"cd {extract_dir} && tar xf package.tar.gz > /dev/null 2>&1")
self.old_bin_dir = bin_dir
# Libraries are in driver/ (not lib/) for this package layout.
lib_dir = os.path.join(extract_dir, "driver")
if not Path(lib_dir).exists():
lib_dir = os.path.join(extract_dir, "lib")
self.old_lib_dir = lib_dir
_create_so_symlinks(lib_dir)
os.system(f"pkill -9 taosd")
self.checkProcessPid("taosd")
print(f"start taosd: rm -rf {dataPath}/* && LD_LIBRARY_PATH={self.old_lib_dir} nohup {self.old_bin_dir}/taosd -c {cPath} &")
os.system(f"rm -rf {dataPath}/* && LD_LIBRARY_PATH={self.old_lib_dir} nohup {self.old_bin_dir}/taosd -c {cPath} &")
os.system(f"killall taosadapter")
self.checkProcessPid("taosadapter")
taosadapter_toml_src = "/etc/taos/taosadapter.toml"
if os.path.exists(taosadapter_toml_src):
os.system(f"cp {taosadapter_toml_src} {cPath}/taosadapter.toml")
taosadapter_cfg = cPath + "/taosadapter.toml"
taosadapter_log_path = cPath + "/../log/"
print(f"taosadapter_cfg:{taosadapter_cfg}, taosadapter_log_path:{taosadapter_log_path}")
self.alter_string_in_file(taosadapter_cfg,"#path = \"/var/log/taos\"",f"path = \"{taosadapter_log_path}\"")
self.alter_string_in_file(taosadapter_cfg,"taosConfigDir = \"\"",f"taosConfigDir = \"{cPath}\"")
print(f"{self.old_bin_dir}/taosadapter --version")
os.system(f"{self.old_bin_dir}/taosadapter --version")
print(f"LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taosadapter -c {taosadapter_cfg} 2>&1 &")
os.system(f"LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taosadapter -c {taosadapter_cfg} 2>&1 &")
time.sleep(5)
def buildTaosd(self,bPath):
os.system(f"cd {bPath}")
def is_list_same_as_ordered_list(self,unordered_list, ordered_list):
sorted_list = sorted(unordered_list)
return sorted_list == ordered_list
def alter_string_in_file(self,file,old_str,new_str):
"""
replace str in file
:param file
:param old_str
:param new_str
:return:
"""
file_data = ""
with open(file, "r", encoding="utf-8") as f:
for line in f:
if old_str in line:
line = line.replace(old_str,new_str)
file_data += line
with open(file,"w",encoding="utf-8") as f:
f.write(file_data)
def killAllDnodes(self):
tdLog.info("kill all dnodes")
tdLog.info("kill taosd")
os.system(f"pkill -9 taosd")
tdLog.info("kill taos")
os.system(f"pkill -9 taos")
tdLog.info("check taosd")
self.checkProcessPid("taosd")
tdLog.info("kill taosadapter")
os.system(f"pkill -9 taosadapter")
tdLog.info("check taosadapter")
self.checkProcessPid("taosadapter")
def prepareDataOnOldVersion(self, base_version, bPath,corss_major_version):
time.sleep(5)
global dbname, stb, first_consumer_rows
# Use extracted old-version binaries to avoid polluting system paths
_old_taos = f"LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taos"
_old_bench = f"LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taosBenchmark"
_old_adapter_bin = f"{self.old_bin_dir}/taosadapter"
tdLog.printNoPrefix(f"==========step1:prepare and check data in old version-{base_version}")
tdLog.info(f" {_old_bench} -t {tableNumbers} -n {recordNumbers1} -v 1 -O 5 -y ")
os.system(f"{_old_bench} -t {tableNumbers} -n {recordNumbers1} -v 1 -O 5 -y ")
os.system(f"{_old_taos} -s 'alter database test keep 365000 '")
os.system(f"{_old_taos} -s 'alter database test cachemodel \"both\" '")
os.system(f"{_old_taos} -s 'select last(*) from test.meters '")
os.system(f"{_old_taos} -s 'flush database test '")
os.system(f"{_old_taos} -s \"insert into test.d1 values (now+1s, 11, 190, 0.21), (now+2s, 11, 190, 0.21), (now+3s, 11, 190, 0.21), ('2015-07-14 08:39:59.001', 11, 190, 0.21), ('2032-08-14 08:39:59.001 ', 11, 190, 0.21) test.d3 values (now+6s, 11, 190, 0.21), (now+7s, 11, 190, 0.21), (now+8s, 11, 190, 0.21), ('2033-07-14 08:39:59.000', 119, 191, 0.25) test.d3 (ts) values ('2033-07-14 08:39:58.000');\"")
os.system(f"{_old_taos} -s 'select last(*) from test.meters '")
os.system(f"{_old_taos} -s 'flush database test '")
os.system(f"{_old_taos} -s \"insert into test.d1 values (now+11s, 11, 190, 0.21), (now+12s, 11, 190, 0.21), (now+13s, 11, 190, 0.21), (now+14s, 11, 190, 0.21), (now+15s, 11, 190, 0.21) test.d3 values (now+16s, 11, 190, 0.21), (now+17s, 11, 190, 0.21), (now+18s, 11, 190, 0.21), (now+19s, 119, 191, 0.25) test.d3 (ts) values (now+20s);\"")
os.system(f"{_old_bench} -f cases/18-StreamProcessing/30-OldPyCases/json/com_alltypedata.json -y")
os.system(f"{_old_taos} -s 'flush database curdb '")
os.system(f"{_old_taos} -s 'alter database curdb cachemodel \"both\" '")
os.system(f"{_old_taos} -s 'select count(*) from curdb.meters '")
os.system(f"{_old_taos} -s 'select last(*) from curdb.meters '")
os.system(f"{_old_taos} -s 'select sum(fc) from curdb.meters '")
os.system(f"{_old_taos} -s 'select avg(ic) from curdb.meters '")
os.system(f"{_old_taos} -s 'select min(ui) from curdb.meters '")
os.system(f"{_old_taos} -s 'select max(bi) from curdb.meters '")
# Stream processing functionality removed - migrated to new stream computing framework
# create db/stb/select topic
os.system(f'{_old_taos} -s "create topic if not exists {db_topic} with meta as database test" ')
os.system(f'{_old_taos} -s "create topic if not exists {stable_topic} as stable test.meters where tbname like \\"d3\\";" ')
os.system(f'{_old_taos} -s "create topic if not exists {select_topic} as {topic_select_sql}" ')
os.system(f'{_old_taos} -s "use test;show topics;" ')
os.system(f" {_old_adapter_bin} --version " )
consumer_dict = {
"group.id": "g1",
"td.connect.websocket.scheme": "ws",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"auto.offset.reset": "earliest",
"enable.auto.commit": "false",
}
consumer = taosws.Consumer(consumer_dict)
try:
consumer.subscribe([select_topic])
except TmqError as e:
tdLog.exit(f"subscribe error: {e}")
while True:
message = consumer.poll(timeout=1.0)
if message:
for block in message:
first_consumer_rows += block.nrows()
else:
tdLog.notice("message is null and break")
break
consumer.commit(message)
tdLog.debug(f"topic:{select_topic} ,first consumer rows is {first_consumer_rows} in old version")
break
consumer.close()
tdLog.info(f" {_old_bench} -f cases/18-StreamProcessing/30-OldPyCases/json/compa4096.json -y ")
os.system(f"{_old_bench} -f cases/18-StreamProcessing/30-OldPyCases/json/compa4096.json -y")
os.system(f"{_old_bench} -f cases/18-StreamProcessing/30-OldPyCases/json/all_insertmode_alltypes.json -y")
# os.system(f"{_old_taos} -s 'flush database db4096 '")
os.system(f"{_old_taos} -f cases/18-StreamProcessing/30-OldPyCases/json/TS-3131.tsql")
# add deleted data
os.system(f'{_old_taos} -s "{deletedDataSql}" ')
if corss_major_version:
cmd = f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taos -h localhost ;"
tdLog.info(f"new client version connect to old version taosd, commad return value:{cmd}")
if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd)
def updateNewVersion(self, bPath, cPaths, upgrade):
tdLog.printNoPrefix("==========step2:update new version ")
# upgrade only one dnode
if upgrade == 0:
tdLog.info("upgrade all dnodes")
status, output = subprocess.getstatusoutput(f'ps aux|grep taosd |grep -v "grep"|awk \'{{print $2}}\'')
if status != 0:
tdLog.error(f"Command to get PIDs failed with status {status}: {output}")
return
found_pids = []
if output:
found_pids = [pid for pid in output.strip().split('\n') if pid]
tdLog.info(f"Found PIDs: {found_pids} for 'upgrade all dnodes' scenario.")
if len(found_pids) == 0:
tdLog.info("No taosd process found keep going")
else:
pid_to_kill_for_this_dnode = found_pids[0]
tdLog.info(f"Killing taosd process, pid:{pid_to_kill_for_this_dnode} (for cPaths[{0}])")
os.system(f"kill -9 {pid_to_kill_for_this_dnode}")
self.checkProcessPid(pid_to_kill_for_this_dnode)
tdLog.info(f"Starting taosd using cPath: {cPaths[0]}")
tdLog.info(f"{bPath}/build/bin/taosd -c {cPaths[0]}cfg/ > /dev/null 2>&1 &")
os.system(f"{bPath}/build/bin/taosd -c {cPaths[0]}cfg/ > /dev/null 2>&1 &")
# upgrade all dnodes
elif upgrade == 1:
tdLog.info("upgrade all dnodes")
status, output = subprocess.getstatusoutput(f'ps aux|grep taosd |grep -v "grep"|awk \'{{print $2}}\'')
if status != 0:
tdLog.error(f"Command to get PIDs failed with status {status}: {output}")
return
found_pids = []
if output:
found_pids = [pid for pid in output.strip().split('\n') if pid]
tdLog.info(f"Found PIDs: {found_pids} for 'upgrade all dnodes' scenario.")
# Determine the number of dnodes to manage, based on cPaths or a max like 3 (original implication)
# Let's use the length of cPaths as the primary guide for how many dnodes to manage.
num_dnodes_to_manage = len(cPaths) if cPaths else 0
if num_dnodes_to_manage == 0:
tdLog.warning("cPaths is empty or not provided. Cannot upgrade all dnodes.")
return
for i in range(num_dnodes_to_manage):
pid_to_kill_for_this_dnode = None
if i < len(found_pids):
pid_to_kill_for_this_dnode = found_pids[i]
if pid_to_kill_for_this_dnode:
tdLog.info(f"Killing taosd process, pid:{pid_to_kill_for_this_dnode} (for cPaths[{i}])")
os.system(f"kill -9 {pid_to_kill_for_this_dnode}")
else:
tdLog.info(f"No running taosd PID found to kill for cPaths[{i}] (or fewer PIDs found than cPaths entries).")
self.checkProcessPid(pid_to_kill_for_this_dnode)
tdLog.info(f"Starting taosd using cPath: {cPaths[i]}")
tdLog.info(f"{bPath}/build/bin/taosd -c {cPaths[i]}cfg/ > /dev/null 2>&1 &")
os.system(f"{bPath}/build/bin/taosd -c {cPaths[i]}cfg/ > /dev/null 2>&1 &")
# no rolling upgrade
elif upgrade == 2:
tdLog.info("no upgrade mode")
self.buildTaosd(bPath)
tdLog.info(f"nohup {bPath}/build/bin/taosd -c {cPaths[0]} > /dev/null 2>&1 &")
os.system(f"nohup {bPath}/build/bin/taosd -c {cPaths[0]} > /dev/null 2>&1 &")
self.checkstatus()
def checkTagSizeAndAlterStb(self,tdsql):
tdsql.query("select * from information_schema.ins_tags where db_name = 'db_all_insert_mode'")
for i in range(tdsql.queryRows):
tag_type = tdsql.queryResult[i][4]
if "NCHAR" not in tag_type:
continue
tag_size = int(tag_type.split('(')[1].split(')')[0])
tag_value = tdsql.queryResult[i][5]
if len(tag_value) > tag_size:
new_tag_size = tag_size
while new_tag_size < len(tag_value):
new_tag_size = new_tag_size * 2
db_name = tdsql.queryResult[i][1]
stable_name = tdsql.queryResult[i][2]
tag_name = tdsql.queryResult[i][3]
if new_tag_size <= tag_size:
continue
tdLog.info(f"ALTER STABLE {db_name}.{stable_name} MODIFY TAG {tag_name} nchar({new_tag_size})")
tdLog.info(f"current tag_value is {tag_value} and tag value len is {len(tag_value)} and tag_size is {tag_size}")
tdsql.execute(f"ALTER STABLE {db_name}.{stable_name} MODIFY TAG {tag_name} nchar({new_tag_size})")
#check tag size
max_try_times = 100
try_times = 0
while try_times < max_try_times:
tdLog.info(f"select * from information_schema.ins_tags where db_name = '{db_name}' and stable_name = '{stable_name}' and tag_name = '{tag_name}'")
tdsql.query(f"select * from information_schema.ins_tags where db_name = '{db_name}' and stable_name = '{stable_name}' and tag_name = '{tag_name}'")
real_tag_type = tdsql.queryResult[0][4]
real_tag_size = int(real_tag_type.split('(')[1].split(')')[0])
if real_tag_size == new_tag_size:
tdLog.info(f"success to alter tag size from {tag_size} to {new_tag_size}")
break
time.sleep(0.5)
try_times += 1
self.checkTagSizeAndAlterStb(tdsql)
def verifyData(self,corss_major_version):
tdLog.printNoPrefix(f"==========step3:prepare and check data in new version")
time.sleep(1)
tdsql=tdCom.newTdSql()
print(tdsql)
if corss_major_version:
cmd = f" LD_LIBRARY_PATH={self.old_lib_dir} {self.old_bin_dir}/taos -h localhost ;"
print(os.system(cmd))
if os.system(cmd) == 0:
raise Exception("failed to execute system command. cmd: %s" % cmd)
tdsql.query(f"SELECT SERVER_VERSION();")
nowServerVersion=tdsql.queryResult[0][0]
tdLog.info(f"New server version is {nowServerVersion}")
tdsql.query(f"SELECT CLIENT_VERSION();")
nowClientVersion=tdsql.queryResult[0][0]
tdLog.info(f"New client version is {nowClientVersion}")
tdsql.query(f"select last(*) from curdb.meters")
tdLog.info(tdsql.queryResult)
# deal table schema is too old issue
self.checkTagSizeAndAlterStb(tdsql)
tdsql.query(f"select * from db_all_insert_mode.sml_json")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_line")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_telnet")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.rest")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.stmt")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_rest_json")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_rest_line")
tdsql.checkRows(16)
tdsql.query(f"select * from db_all_insert_mode.sml_rest_telnet")
tdsql.checkRows(16)
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers1+20)
# Stream verification removed - migrated to new stream computing framework
# checkout db4096
tdsql.query("select count(*) from db4096.stb0")
tdsql.checkData(0,0,50000)
# checkout deleted data
tdsql.execute("insert into deldata.ct1 values ( now()-0s, 0, 0, 0, 0, 0.0, 0.0, 0, 'binary0', 'nchar0', now()+0a ) ( now()-10s, 1, 11111, 111, 11, 1.11, 11.11, 1, 'binary1', 'nchar1', now()+1a ) ( now()-20s, 2, 22222, 222, 22, 2.22, 22.22, 0, 'binary2', 'nchar2', now()+2a ) ( now()-30s, 3, 33333, 333, 33, 3.33, 33.33, 1, 'binary3', 'nchar3', now()+3a );")
tdsql.execute("flush database deldata;")
tdsql.query("select avg(c1) from deldata.ct1;")
def verifyBackticksInTaosSql(self,bPath,base_version):
tdsql=tdCom.newTdSql()
tdLog.printNoPrefix("==========step4:verify backticks in taos Sql-TD18542")
tdsql.execute("drop database if exists db")
tdsql.execute("create database db")
tdsql.execute("use db")
tdsql.execute("create stable db.stb1 (ts timestamp, c1 int) tags (t1 int);")
tdsql.execute("insert into db.ct1 using db.stb1 TAGS(1) values(now(),11);")
tdsql.error(" insert into `db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.error(" insert into db.`db.ct2` using db.stb1 TAGS(9) values(now(),11);")
tdsql.execute("insert into `db`.ct3 using db.stb1 TAGS(3) values(now(),13);")
tdsql.query("select * from db.ct3")
tdsql.checkData(0,1,13)
tdsql.execute("insert into db.`ct4` using db.stb1 TAGS(4) values(now(),14);")
tdsql.query("select * from db.ct4")
tdsql.checkData(0,1,14)
#check retentions
tdsql=tdCom.newTdSql()
tdsql.query("describe information_schema.ins_databases;")
qRows=tdsql.queryRows
comFlag=True
j=0
while comFlag:
for i in range(qRows) :
if tdsql.queryResult[i][0] == "retentions" :
print("parameters include retentions")
comFlag=False
break
else :
comFlag=True
j=j+1
if j == qRows:
print("parameters don't include retentions")
import inspect
caller = inspect.getframeinfo(inspect.stack()[0][0])
args = (caller.filename, caller.lineno)
tdLog.exit("%s(%d) failed" % args)
# Stream checking removed - migrated to new stream computing framework
#check TS-3131
tdsql.query("select *,tbname from d0.almlog where mcid='m0103';")
tdsql.checkRows(6)
expectList = [0,3003,20031,20032,20033,30031]
resultList = []
for i in range(6):
resultList.append(tdsql.queryResult[i][3])
print(resultList)
if self.is_list_same_as_ordered_list(resultList,expectList):
print("The unordered list is the same as the ordered list.")
else:
tdLog.exit("The unordered list is not the same as the ordered list.")
# check database test and last
# first check
tdsql.query(f"select last(*) from test.meters group by tbname")
tdLog.info(tdsql.queryResult)
# tdsql.checkRows(tableNumbers)
tdsql.query(f"select last_row(*) from test.meters group by tbname")
tdLog.info(tdsql.queryResult)
# tdsql.checkRows(tableNumbers)
tdsql.query(f"select last_row(*) from test.meters partition by tbname")
tdLog.info(tdsql.queryResult)
# tdsql.checkRows(tableNumbers)
if self.version_compare(base_version, "3.3.6.0") >= 0:
tdsql.query(f"select last(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:39:59.000")
tdsql.checkData(0,1,119)
tdsql.checkData(0,2,191)
tdsql.checkData(0,3,0.25)
tdsql.query(f"select last_row(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:39:59.000")
tdsql.checkData(0,1,119)
tdsql.checkData(0,2,191)
tdsql.checkData(0,3,0.25)
tdsql.query(f"select last(*) from test.d1")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2032-08-14 08:39:59.001")
tdsql.checkData(0,1,11)
tdsql.checkData(0,2,190)
tdsql.checkData(0,3,0.21)
# update data and check
tdsql.execute("insert into test.d2 values ('2033-07-14 08:39:59.002', 139, 182, 1.10) (now+2s, 12, 191, 0.22) test.d2 (ts) values ('2033-07-14 08:39:59.003');")
tdsql.execute("insert into test.d2 values (now+5s, 4.3, 104, 0.4);")
tdsql.query(f"select last(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:39:59.003")
tdsql.checkData(0,1,139)
tdsql.checkData(0,2,182)
tdsql.checkData(0,3,1.10)
# repeately insert data and check
tdsql.execute("insert into test.d1 values (now+1s, 11, 190, 0.21) (now+2s, 12, 191, 0.22) ('2033-07-14 08:40:01.001', 16, 180, 0.53);")
tdsql.query(f"select last(*) from test.d1")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:40:01.001")
tdsql.checkData(0,1,16)
tdsql.checkData(0,2,180)
tdsql.checkData(0,3,0.53)
tdsql.query(f"select last(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:40:01.001")
tdsql.checkData(0,1,16)
tdsql.checkData(0,2,180)
tdsql.checkData(0,3,0.53)
tdsql.query(f"select last_row(*) from test.meters")
tdLog.info(tdsql.queryResult)
tdsql.checkData(0,0,"2033-07-14 08:40:01.001")
tdsql.checkData(0,1,16)
tdsql.checkData(0,2,180)
tdsql.checkData(0,3,0.53)
# check alter config
tdsql.execute('alter all dnodes "debugFlag 131"')
tdsql.execute('alter dnode 1 "debugFlag 143"')
tdsql.execute('alter local "debugFlag 131"')
# check tmq
conn = taos.connect()
consumer = Consumer(
{
"group.id": "g1",
"td.connect.user": "root",
"td.connect.pass": "taosdata",
"enable.auto.commit": "true",
"experimental.snapshot.enable": "true",
}
)
consumer.subscribe([select_topic])
consumer_rows = 0
while True:
message = consumer.poll(timeout=1.0)
tdLog.info(f" null {message}")
if message:
for block in message:
consumer_rows += block.nrows()
tdLog.info(f"consumer rows is {consumer_rows}")
else:
print("consumer has completed and break")
break
consumer.close()
tdsql.query(f"{topic_select_sql}")
all_rows = tdsql.queryRows
if consumer_rows < all_rows - first_consumer_rows :
tdLog.exit(f"consumer rows is {consumer_rows}, less than {all_rows - first_consumer_rows}")
tdsql.query("show topics;")
tdsql.checkRows(3)
tdsql.execute(f"drop topic {select_topic};",queryTimes=10)
tdsql.execute(f"drop topic {db_topic};",queryTimes=10)
tdsql.execute(f"drop topic {stable_topic};",queryTimes=10)
os.system(f" LD_LIBRARY_PATH={bPath}/build/lib {bPath}/build/bin/taosBenchmark -t {tableNumbers} -n {recordNumbers2} -y ")
tdsql.query(f"select count(*) from {stb}")
tdsql.checkData(0,0,tableNumbers*recordNumbers2)
def checkstatus(self,retry_times=30):
# sleep before check status to avoid dnodes not ready issue
time.sleep(30)
tdsql=tdCom.newTdSql()
dnodes_ready = False
for i in range(retry_times):
tdsql.query("show dnodes;")
dnode_nums = tdsql.queryRows
ready_nums = 0
for j in range(tdsql.queryRows):
if tdsql.queryResult[j][4] == "ready":
ready_nums += 1
if ready_nums == dnode_nums:
dnodes_ready = True
break
time.sleep(1)
if not dnodes_ready:
tdLog.exit(f"dnodes are not ready in {retry_times}s")
tdLog.info(f"dnodes are ready in {retry_times}s")
modes_ready = False
for i in range(retry_times):
tdsql.query("show mnodes;")
mnode_nums = tdsql.queryRows
ready_nums = 0
for j in range(tdsql.queryRows):
if tdsql.queryResult[j][3] == "ready":
ready_nums += 1
if ready_nums == mnode_nums:
modes_ready = True
break
time.sleep(1)
if not modes_ready:
tdLog.exit(f"mnodes are not ready in {retry_times}s")
tdLog.info(f"mnodes are ready in {retry_times}s")
vnodes_ready = False
for i in range(retry_times):
tdsql.query("show vnodes;")
vnode_nums = tdsql.queryRows
ready_nums = 0
for j in range(tdsql.queryRows):
if str(tdsql.queryResult[j][6]).lower() == "true":
ready_nums += 1
if ready_nums == vnode_nums:
vnodes_ready = True
break
time.sleep(1)
if not vnodes_ready:
tdLog.exit(f"vnodes are not ready in {retry_times}s")
tdLog.info(f"vnodes are ready in {retry_times}s")
# Create instance for compatibility
tdCb = CompatibilityBase()