TDengine/test/cases/18-StreamProcessing/02-Stream/stream_syntax.py
2025-11-12 11:16:10 +08:00

1181 lines
50 KiB
Python

import random
import string
import re
from new_test_framework.utils import tdLog, tdSql, sc, clusterComCheck, tdStream
# Common time units
duration_lists = [
"1b", "1u", "1a", "1s", "1m", "1h", "1d", "1w", "1n", "1y",
"2b", "2u", "2a", "2s", "2m", "2h", "2d", "2w", "2n", "2y",
"5b", "5u", "5a", "5s", "5m", "5h", "5d", "5w", "5n", "5y",
"8b", "8u", "8a", "8s", "8m", "8h", "8d", "8w", "8n", "8y",
"12b", "12u", "12a", "12s", "12m", "12h", "12d", "12w", "12n", "12y",
"30b", "30u", "30a", "30s", "30m", "30h", "30d", "30w", "30n", "30y",
"1365b", "1365u", "1365a", "1365s", "1365m", "1365h", "1365d", "1365w", "1365n", "1365y"
]
out_columns = ["out_col1", "out_col2", "out_col3", "out_col4", "out_col5", "out_col6", "out_col7", "out_col8", "out_col9", "out_col10"]
out_tags = ["out_tag1", "out_tag2", "out_tag3", "out_tag4", "out_tag5", "out_tag6", "out_tag7", "out_tag8", "out_tag9", "out_tag10"]
counts = [10, 100]
slidings = [1, 5]
ops = ["=", "<>", "!=", ">", "<", ">=", "<="]
arith_ops = ["+", "-", "*", "/"]
logic_ops = ["AND", "OR"]
notify_option_list = ["NOTIFY_HISTORY", "ON_FAILURE_PAUSE"]
query_table_col_list = ["ts", "q_col1", "q_col2", "q_col3", "q_col4", "q_col5", "q_col6"]
as_subquery_opts = [(" AS SELECT * FROM query_table", True, 7, ["ts", "q_col1", "q_col2", "q_col3", "q_col4", "q_col5", "q_col6"], False),
(" AS SELECT first(ts), count(q_col1) from query_table", True, 2, ["first(ts)", "count(q_col1)"], False),
(" AS SELECT first(ts), count(q_col1) from query_table WHERE q_col1 > 0", True, 2, ["first(ts)", "count(q_col1)"], False),
(" AS SELECT first(ts), count(q_col1), count(q_col2) from query_table WHERE q_col1 > 0 INTERVAL(1s)", True, 3, ["first(ts)", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT first(ts), count(q_col1), count(q_col2) from query_table WHERE q_col1 > 0 GROUP BY q_col2", True, 3, ["first(ts)", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT first(ts), count(q_col1) from query_table WHERE q_col1 > 0 GROUP BY q_col2 HAVING count(q_col1) > 0", True, 2, ["first(ts)", "count(q_col1)"], False),
(" AS SELECT first(ts), count(q_col1) from query_table WHERE q_col1 > 0 GROUP BY q_col2 HAVING count(q_col1) > 0 ORDER BY q_col2", True, 2, ["first(ts)", "count(q_col1)"], False),
(" AS SELECT _tcurrent_ts, count(q_col1), count(q_col2) from query_table", True, 3, ["_tcurrent_ts", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _tcurrent_ts, count(q_col1), count(q_col2) from query_table WHERE _tcurrent_ts > 1", True, 3, ["_tcurrent_ts", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _twstart, count(q_col1), count(q_col2) from query_table WHERE _twstart > 1", True, 3, ["_twstart", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _twend, count(q_col1), count(q_col2) from query_table WHERE _twend > 1", True, 3, ["_twend", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _twduration, count(q_col1), count(q_col2) from query_table WHERE _twduration > 1", False, 3, ["_twduration", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _twrownum, count(q_col1), count(q_col2) from query_table WHERE _twrownum > 1", False, 3, ["_twrownum", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _tgrpid, count(q_col1), count(q_col2) from query_table WHERE _tgrpid > 1", False, 3, ["_tgrpid", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _tlocaltime, count(q_col1), count(q_col2) from query_table WHERE _tlocaltime > 1", True, 3, ["_tlocaltime", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT _twstart, %%1, count(q_col1), count(q_col2) from query_table", True, 4, ["_twstart", "%%1", "count(q_col1)", "count(q_col2)"], True),
(" AS SELECT %%tbname, count(q_col1), count(q_col2) from query_table", False, 3, ["%%tbname", "count(q_col1)", "count(q_col2)"], False),
(" AS SELECT 1 from %%tbname", False, 1, ["1"], False),
(" AS SELECT q_col1 from %%tbname", False, 1, ["q_col1"], False),
(" AS SELECT _twstart, %%1 from %%tbname", True, 2, ["_twstart", "%%1"], True),
(" AS SELECT q_col1, q_col2 from %%trows", False, 2, ["q_col1", "q_col2"], False),
(" AS SELECT col1, col2 from %%trows", True, 2, ["col1", "col2"], False),
(" AS SELECT _twstart, q_col1, q_col2 from %%trows", False, 3, ["_twstart", "q_col1", "q_col2"], False),
("", True, 0, None, False)]
if_not_exists_opts = ["", " IF NOT EXISTS"]
db_name_list_valid = ["", "create_stream_db."]
db_name_list_invalid = ["non_exists_db."]
trigger_table_list_valid = ["trigger_table", "trigger_stable", "trigger_ctable"]
trigger_table_list_invalid = ["non_exists_table", ""]
# TODO(smj) : add different column_num and tag_num's table
into_option_list_valid = [
" INTO create_stream_db.new_table",
" INTO create_stream_db.exist_super_table",
" INTO create_stream_db.exist_normal_table",
" INTO new_table",
" INTO exist_super_table",
" INTO exist_normal_table",
""
]
into_option_list_invalid = [
" INTO non_exists_db.new_table",
" INTO exist_sub_table",
" INTO create_stream_db.exist_sub_table",
]
# trigger_table(col1 timestamp, col2 int, col3 int, col4 int, col5 int, col6 int)
# trigger_stable(col1 timestamp, col2 int, col3 int, col4 int, col5 int, col6 int) tags(tag1 int, tag2 int, tag3 int, tag4 int)
# exist_super_table(out_col1 timestamp, out_col2 int, out_col3 int) tags(out_tag1 int, out_tag2 int)
# exist_normal_table(out_col1 timestamp, out_col2 int, out_col3 int)
# query_table(ts timestamp, col1 int, col2 int, col3 int, col4 int, col5 int, col6 int)
session_column_valid = ["col1"]
session_column_invalid = ["col2", "col3", "col4", "col5", "col6", "ts_col", "tag1", "tag2", "tag3", "tag4"]
state_column_valid = ["col2", "col3", "col4", "col5", "col6"]
state_column_invalid = ["col1", "col7", "col8", "col9", "col10", "ts_col", "tag5", "tag6", "tag7", "tag8"]
event_count_column_valid = ["col1", "col2", "col3", "col4", "col5", "col6", "tag1", "tag2", "tag3", "tag4"]
event_count_column_invalid = ["col7", "col8", "col9", "col10", "ts_col", "tag5", "tag6", "tag7", "tag8"]
trigger_column_valid = ["col1", "col2", "col3", "col4", "col5", "col6"]
trigger_column_invalid = ["col7", "col8", "col9", "col10", "ts_col", "tag1", "tag2", "tag3", "tag4"]
trigger_tag_valid = ["tag1", "tag2", "tag3", "tag4"]
trigger_tag_invalid = ["tag5", "tag6", "tag7", "tag8", "col1", "col2", "col3", "col4", "col5", "col6"]
partition_columns_valid = ["tag1", "tag2", "tag3", "tag4"]#, "tbname"]
partition_columns_invalid = ["ts_col", "tag5", "tag6", "now"]
duration_lists_valid = [
"1b", "1u", "1a", "1s", "1m", "1h", "1d", "1w", "1n", "1y",
"2b", "2u", "2a", "2s", "2m", "2h", "2d", "2w", "2n", "2y",
"5b", "5u", "5a", "5s", "5m", "5h", "5d", "5w", "5n", "5y",
"12b", "12u", "12a", "12s", "12m", "12h", "12d", "12w", "12n", "12y",
"30b", "30u", "30a", "30s", "30m", "30h", "30d", "30w", "30n", "30y",
"1365b", "1365u", "1365a", "1365s", "1365m", "1365h", "1365d", "1365w", "1365n", "1365y"
]
duration_lists_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x", "1365x",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
slide_lists_valid = [
"1a", "1s", "1m", "1h", "1d", "1w",
"2a", "2s", "2m", "2h", "2d", "2w",
"5a", "5s", "5m", "5h", "5d", "5w",
"12a", "12s", "12m", "12h", "12d", "12w",
"30a", "30s", "30m", "30h", "30d", "30w",
"1365a", "1365s", "1365m", "1365h", "1365d", "1365w",
]
slide_lists_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x", "1365x",
"1b", "2b", "5b", "8b", "12b", "30b", "1365b",
"1u", "2u", "5u", "8u", "12u", "30u", "1365u",
"1n", "2n", "5n", "8n", "12n", "30n", "1365n",
"1y", "2y", "5y", "8y", "12y", "30y", "1365y",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
slide_offset_lists_valid = [
"1a", "1s", "1m", "1h",
"2a", "2s", "2m", "2h",
"5a", "5s", "5m", "5h",
"8a", "8s", "8m", "8h",
"12a", "12s", "12m", "12h",
"30a", "30s", "30m", "30h",
"1365a", "1365s", "1365m", "1365h",
]
slide_offset_lists_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x", "1365x",
"1b", "2b", "5b", "8b", "12b", "30b", "1365b",
"1u", "2u", "5u", "8u", "12u", "30u", "1365u",
"1d", "2d", "5d", "8d", "12d", "30d", "1365d",
"1w", "2w", "5w", "8w", "12w", "30w", "1365w",
"1n", "2n", "5n", "8n", "12n", "30n", "1365n",
"1y", "2y", "5y", "8y", "12y", "30y", "1365y",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
session_lists_valid = [
"1a", "1s", "1m", "1h", "1d", "1w",
"2a", "2s", "2m", "2h", "2d", "2w",
"5a", "5s", "5m", "5h", "5d", "5w",
"8a", "8s", "8m", "8h", "8d", "8w",
"12a", "12s", "12m", "12h", "12d", "12w",
"30a", "30s", "30m", "30h", "30d", "30w",
"1365a", "1365s", "1365m", "1365h", "1365d", "1365w",
]
session_lists_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x", "1365x",
"1n", "2n", "5n", "8n", "12n", "30n", "1365n",
"1y", "2y", "5y", "8y", "12y", "30y", "1365y",
"1b", "2b", "5b", "8b", "12b", "30b", "1365b",
"1u", "2u", "5u", "8u", "12u", "30u", "1365u",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
true_for_lists_valid = [
"1b", "1u", "1a", "1s", "1m", "1h", "1d", "1w",
"2b", "2u", "2a", "2s", "2m", "2h", "2d", "2w",
"5b", "5u", "5a", "5s", "5m", "5h", "5d", "5w",
"8b", "8u", "8a", "8s", "8m", "8h", "8d", "8w",
"12b", "12u", "12a", "12s", "12m", "12h", "12d", "12w",
"30b", "30u", "30a", "30s", "30m", "30h", "30d", "30w",
"1365b", "1365u", "1365a", "1365s", "1365m", "1365h", "1365d", "1365w",
]
true_for_lists_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x", "1365x",
"1n", "2n", "5n", "8n", "12n", "30n", "1365n",
"1y", "2y", "5y", "8y", "12y", "30y", "1365y",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
expired_time_list_valid = [
"1a", "1s", "1m", "1h", "1d",
"2a", "2s", "2m", "2h", "2d",
"5a", "5s", "5m", "5h", "5d",
"8a", "8s", "8m", "8h", "8d",
"12a", "12s", "12m", "12h", "12d",
"30a", "30s", "30m", "30h", "30d",
"1365a", "1365s", "1365m", "1365h", "1365d",
]
expired_time_list_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x", "1365x",
"1b", "2b", "5b", "8b", "12b", "30b", "1365b",
"1u", "2u", "5u", "8u", "12u", "30u", "1365u",
"1w", "2w", "5w", "8w", "12w", "30w", "1365w",
"1n", "2n", "5n", "8n", "12n", "30n", "1365n",
"1y", "2y", "5y", "8y", "12y", "30y", "1365y",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
# 10a - 13650d
period_time_list_valid = [
"10a", "10s", "10m", "10h", "10d",
"20a", "20s", "20m", "20h", "20d",
"120a", "120s", "120m", "120h", "120d",
"360a", "360s", "360m", "360h", "360d",
"820a", "820s", "820m", "820h", "820d",
"1440a", "1440s", "1440m", "1440h", "1440d",
"13650a", "13650s", "13650m", "13650h", "13650d"
]
period_time_list_invalid = [
"10x", "20x", "120x", "360x", "820x", "1440x", "13650x",
"10b", "20b", "120b", "360b", "820b", "1440b", "13650b",
"10u", "20u", "120u", "360u", "820u", "1440u", "13650u",
"10w", "20w", "120w", "360w", "820w", "1440w", "13650w",
"10n", "20n", "120n", "360n", "820n", "1440n", "13650n",
"10y", "20y", "120y", "360y", "820y", "1440y", "13650y",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
period_offset_list_valid = [
"1a", "1s", "1m", "1h",
"2a", "2s", "2m", "2h",
"5a", "5s", "5m", "5h",
"8a", "8s", "8m", "8h",
"12a", "12s", "12m", "12h",
"30a", "30s", "30m", "30h",
]
period_offset_list_invalid = [
"1x", "2x", "5x", "8x", "12x", "30x",
"1b", "2b", "5b", "8b", "12b", "30b",
"1u", "2u", "5u", "8u", "12u", "30u",
"1w", "2w", "5w", "8w", "12w", "30w",
"1n", "2n", "5n", "8n", "12n", "30n",
"1y", "2y", "5y", "8y", "12y", "30y",
"a", "b", "c", "d", "e", "f", "g", "h", "i", "j",
]
start_time_valid = [" '2025-05-27 14:29:42' ", " '1970-01-01 08:00:00' ",
" 1748327382161 ", " 1 "]
start_time_invalid = [" '2025-05-27 14:29:42:00' ", " '1970-01-01 08:00:00:00' ",
" 2025-05-27 14:29:42 ", " 1970-01-01 08:00:00 ",
" 2025-05-27 14:29:42:00 ", " 1970-01-01 08:00:00:00 ",
" 1748327382161:00 ", " 1:00 ", " '2025-05-27 14:29' ",
" '2025-05-27' ", " '2025-05' ", " '2025' ",
" '2025-05-27T14:29:42Z' ", " '2025-05-27T14:29Z' ",
" '2025-05-27T14Z' ", " '2025-05-27T' ",
" '2025-05T14:29:42Z' ", " '2025-05T14Z' ",
" '2025-05T' ", " '2025T14:29:42Z' ", " '2025T14Z' ",
" '2025T' ", "'invalid_time'", "'invalid_date'",
"'invalid_timestamp'", "'invalid_format'",
"'another_invalid_format'", "'yet_another_invalid_format'"]
event_types_valid = ["WINDOW_OPEN", "WINDOW_CLOSE"]
event_types_invalid = ["INVALID_EVENT", "ANOTHER_INVALID_EVENT"]
urls_valid = [" 'http://example.com/notify' ", " 'http://localhost:8000/callback' ", " 'https://api.test.com/hook' "]
urls_invalid = [" 'invalid_url' ", " http://example.com/invalid ", " 12345678 "]
notify_option_valid = ["NOTIFY_HISTORY", "ON_FAILURE_PAUSE"]
notify_option_invalid = ["NOTIFY_WHAT", "NOTIFY_INVALID"]
out_tag_type_valid = ["VARCHAR(30)"]
out_tag_type_invalid = ["INVALID_TYPE", "BIGINT", "INT"]
string_literals_valid = ["'_v1'", "'_2024'", "'_tag'", "'_out'", "'_ts'", "'_X'"]
string_literals_invalid = ["1", "2", "3", "4", "5", "6", "7", "8", "9", "10"]
numeric_literals = [str(i) for i in range(0, 10)]
arithmetic_ops = ['+', '-', '*', '/', '%']
numeric_func_names = ['abs', 'acos', 'cos', 'asin', 'sin', 'log', 'floor', 'ceil', 'round']
string_func_names = [
'concat', 'upper', 'lower', 'substr',
'replace', 'ltrim', 'rtrim', 'trim'
]
class TestStreamSynatx:
def setup_class(cls):
tdLog.debug(f"start to execute {__file__}")
def test_create_stream_syntax(self):
"""Stream syntax
1. Test create stream sql parser and syntax
2. Test different valid and invalid combinations of create stream syntax options
3. Verify the create stream results
Since: v3.3.3.7
Labels: common,ci,skip
Jira: None
History:
- 2025-5-13 Jing Sima Create Case
"""
self.createSnode()
self.createDatabase()
self.prepareTables()
self.createStream()
tdSql.pause()
def createSnode(self):
tdLog.info("create snode")
tdStream.createSnode(1)
def createDatabase(self):
tdLog.info(f"create database")
tdSql.prepare(dbname="create_stream_db", vgroups=1)
clusterComCheck.checkDbReady("create_stream_db")
def prepareTables(self):
# trigger_table(col1 timestamp, col2 int, col3 int, col4 int, col5 int, col6 int)
# trigger_stable(col1 timestamp, col2 int, col3 int, col4 int, col5 int, col6 int) tags(tag1 int, tag2 int, tag3 int, tag4 int)
# exist_super_table(out_col1 timestamp, out_col2 int, out_col3 int) tags(out_tag1 int, out_tag2 int)
# exist_normal_table(out_col1 timestamp, out_col2 int, out_col3 int)
# query_table(ts timestamp, col1 int, col2 int, col3 int, col4 int, col5 int, col6 int)
tdSql.execute("create table create_stream_db.trigger_table (col1 timestamp, col2 int, col3 int, col4 int, col5 int, col6 int);")
tdSql.execute("create table create_stream_db.trigger_stable (col1 timestamp, col2 int, col3 int, col4 int, col5 int, col6 int) tags(tag1 varchar(30), tag2 varchar(30), tag3 varchar(30), tag4 varchar(30));")
tdSql.execute("create table create_stream_db.trigger_ctable using create_stream_db.trigger_stable tags('1', '2', '3', '4');")
tdSql.execute("create table create_stream_db.exist_super_table (out_col1 timestamp, out_col2 bigint, out_col3 bigint) tags(out_tag1 varchar(30), out_tag2 varchar(30));")
tdSql.execute("create table create_stream_db.exist_normal_table (out_col1 timestamp, out_col2 bigint, out_col3 bigint);")
tdSql.execute("create table create_stream_db.exist_sub_table using create_stream_db.exist_super_table tags(1,2);")
tdSql.execute("create table create_stream_db.query_table (ts timestamp, q_col1 bigint, q_col2 bigint, q_col3 bigint, q_col4 bigint, q_col5 bigint, q_col6 bigint);")
def createStream(self):
tdSql.execute("use create_stream_db")
sql_list = gen_create_stream_variants()
for sql, valid, index in sql_list:
tdLog.info(f"create stream sql:{sql}, valid:{valid}")
if valid:
tdSql.execute(sql)
tdSql.execute(f"drop stream if exists create_stream_db.stream_{index}")
tdSql.execute(f"drop table if exists create_stream_db.new_table_{index}")
else:
tdSql.error(sql)
def random_from_list(lst, n=1):
"""Return n random elements from a list."""
if n == 1:
return random.choice(lst)
return random.sample(lst, n)
def random_bool(prob=0.5):
"""Return True with the given probability."""
return random.random() < prob
def random_int(a, b):
"""Return a random integer between a and b, inclusive."""
return random.randint(a, b)
def generate_arithmetic_expr(column_list):
left = random_from_list(column_list)
right = random_from_list([str(random_int(1, 100))])
operator = random_from_list(arith_ops)
return f"({left} {operator} {right})"
def generate_atomic_condition(full_list=None, valid_list=None, valid=True):
if valid:
if valid_list is None or len(valid_list) == 0:
column_list = ["1", "2", "3"]
else:
column_list = valid_list
else:
column_list = list(set(full_list) - set(valid_list))
left_expr = (
generate_arithmetic_expr(column_list) if random_bool(0.3) else random_from_list(column_list)
)
op = random_from_list(ops)
right_expr = (
generate_arithmetic_expr(column_list) if random_bool(0.3) else random_from_list(column_list + [str(random_int(1, 100))])
)
return f"{left_expr} {op} {right_expr}"
def generate_logical_condition(max_depth=2, current_depth=0, full_column_list=None, valid_column_list=None, valid=True):
if current_depth >= max_depth or random_bool(0.4):
return generate_atomic_condition(full_column_list, valid_column_list, valid)
else:
left = generate_logical_condition(max_depth, current_depth + 1, full_column_list, valid_column_list, valid)
right = generate_logical_condition(max_depth, current_depth + 1, full_column_list, valid_column_list, valid)
op = random_from_list(logic_ops)
return f"({left} {op} {right})"
def random_from_combined(valid_list, invalid_list):
if random.random() < 0.2 and invalid_list:
return random.choice(invalid_list), False
return random.choice(valid_list), True
unit_to_ns = {
"b": 1,
"u": 1_000,
"a": 1_000_000,
"s": 1_000_000_000,
"m": 60 * 1_000_000_000,
"h": 3600 * 1_000_000_000,
"d": 86400 * 1_000_000_000,
"w": 7 * 86400 * 1_000_000_000,
"n": 31 * 86400 * 1_000_000_000,
"y": 365 * 86400 * 1_000_000_000
}
def duration_to_nanoseconds(duration_str):
match = re.fullmatch(r"(\d+)([a-zA-Z])", duration_str)
if not match:
return 0
value = int(match.group(1))
unit = match.group(2)
if unit not in unit_to_ns:
return 0
return value * unit_to_ns[unit]
def generate_trigger_section(each_window_count=200):
triggers = []
# SESSION
max_session_count = each_window_count
for _ in range(0, max_session_count + 1):
col, v1 = random_from_combined(session_column_valid, session_column_invalid)
dur, v2 = random_from_combined(session_lists_valid, session_lists_invalid)
triggers.extend([
(f" SESSION({col}) ", False),
(f" SESSION({col}, '{dur}') ", v1 and v2),
(f" SESSION({col}, {dur}) ", v1 and v2),
])
# STATE_WINDOW
max_state_count = each_window_count
for _ in range(0, max_state_count + 1):
col, v1 = random_from_combined(state_column_valid, state_column_invalid)
dur, v2 = random_from_combined(true_for_lists_valid, true_for_lists_invalid)
triggers.extend([
(f" STATE_WINDOW({col}) ", v1),
(f" STATE_WINDOW({col}) TRUE_FOR('{dur}') ", v1 and v2),
])
# INTERVAL + SLIDING
max_sliding_count = each_window_count
for _ in range(0, max_sliding_count + 1):
interval, v1 = random_from_combined(duration_lists_valid, duration_lists_invalid)
offset, v2 = random_from_combined(duration_lists_valid, duration_lists_invalid)
slide, v3 = random_from_combined(slide_lists_valid, slide_lists_invalid)
slide_offset, v4 = random_from_combined(slide_offset_lists_valid, slide_offset_lists_invalid)
interval_val = duration_to_nanoseconds(interval) if interval else None
offset_val = duration_to_nanoseconds(offset) if offset else None
slide_val = duration_to_nanoseconds(slide) if slide else None
slide_offset_val = duration_to_nanoseconds(slide_offset) if slide_offset else None
if interval_val and offset_val:
v5 = interval_val > offset_val
else:
v5 = True
if slide_val and slide_offset_val:
v6 = slide_val > slide_offset_val
else:
v6 = True
if interval_val and slide_val:
v7 = interval_val >= slide_val
else:
v7 = True
if 'y' in interval and 'n' in slide:
v8 = False
else:
v8 = True
if interval_val and slide_val and not (('y' in interval or 'n' in interval) and 'n' not in slide and 'y' not in slide):
v9 = interval_val / slide_val < 100
else:
v9 = True
if 'n' in interval and 'y' in offset:
v10 = False
else:
v10 = True
is_all_valid = v1 and v2 and v3 and v4 and v5 and v6 and v7 and v8 and v9 and v10
int_part = f" INTERVAL('{interval}') "
int_part_with_offset = f" INTERVAL('{interval}', '{offset}') "
slide_part = f" SLIDING('{slide}') "
slide_part_with_offset = f" SLIDING('{slide}', '{slide_offset}') "
triggers.extend([
(slide_part, v3),
(slide_part_with_offset, v3 and v4 and v6),
(f" {int_part} {slide_part} ", v1 and v3 and v7 and v8 and v9),
(f" {int_part} {slide_part_with_offset} ", v1 and v3 and v4 and v6 and v7 and v8 and v9),
(f" {int_part_with_offset} {slide_part} ", v1 and v2 and v3 and v5 and v7 and v8 and v9 and v10),
(f" {int_part_with_offset} {slide_part_with_offset} ", is_all_valid)
])
# EVENT_WINDOW
max_event_count = each_window_count
for _ in range(0, max_event_count + 1):
start_valid = random_bool(0.8)
end_valid = random_bool(0.8)
start = generate_logical_condition(full_column_list=event_count_column_valid + event_count_column_invalid, valid_column_list=event_count_column_valid, valid=start_valid)
end = generate_logical_condition(full_column_list=event_count_column_valid + event_count_column_invalid, valid_column_list=event_count_column_valid, valid=end_valid)
triggers.append((f" EVENT_WINDOW(START WITH {start} END WITH {end}) ", start_valid and end_valid))
# COUNT_WINDOW
max_count_count = each_window_count
for _ in range(0, max_count_count + 1):
count = random_from_list([5, 10, 20])
slide = random_from_list([None, 10, 20])
col_length = random_int(0, 5)
if random_bool(0.3):
valid = False
count_columns = random.choices(event_count_column_invalid, k=col_length + 1)
else:
valid = True
count_columns = random.choices(event_count_column_valid, k=col_length)
if slide is not None and slide > count:
valid1 = False
else:
valid1 = True
parts = [str(count)]
if slide is not None:
parts.append(str(slide))
for cols in count_columns:
parts.append(cols)
triggers.append((f" COUNT_WINDOW({', '.join(parts)}) ", valid and valid1))
# PERIOD
max_period_count = each_window_count
for _ in range(0, max_period_count + 1):
period, v1 = random_from_combined(period_time_list_valid, period_time_list_invalid)
offset, v2 = random_from_combined(period_offset_list_valid, period_offset_list_invalid)
period_val = duration_to_nanoseconds(period) if period else None
offset_val = duration_to_nanoseconds(offset) if offset else None
if period_val and offset_val:
v3 = period_val > offset_val
else:
v3 = True
triggers.extend([
(f" PERIOD('{period}') ", v1),
(f" PERIOD('{period}', '{offset}') ", v1 and v2 and v3)
])
return triggers
def generate_random_event_types(valid=True):
if valid:
types = [random_from_list(event_types_valid) for _ in range(random_int(1, 2))]
else:
types = [random_from_list(event_types_invalid)] + [random_from_list(event_types_valid) for _ in range(random_int(1, 2))]
return "|".join(types)
def random_option(valid=True, max_options=3, trigger_has_tag=False):
if max_options == 0:
return []
if valid:
watermark_duration = random_from_list(duration_lists_valid)
expired_time = random_from_list(expired_time_list_valid)
start_time = random_from_list(start_time_valid)
max_delay = random_from_list(expired_time_list_valid)
else:
watermark_duration = random_from_list(duration_lists_invalid)
expired_time = random_from_list(expired_time_list_invalid)
start_time = random_from_list(start_time_invalid)
max_delay = random_from_list(expired_time_list_invalid)
prev_filter = generate_logical_condition(
full_column_list = trigger_column_valid + trigger_column_invalid,
valid_column_list = trigger_column_valid + trigger_tag_valid if trigger_has_tag else trigger_column_valid,
valid = valid)
if valid:
option_type = random.sample([
lambda: f"WATERMARK({watermark_duration})",
lambda: f"EXPIRED_TIME({expired_time})",
lambda: "IGNORE_DISORDER",
lambda: "DELETE_RECALC",
lambda: "DELETE_OUTPUT_TABLE",
lambda: f"FILL_HISTORY({start_time})",
lambda: f"FILL_HISTORY_FIRST({start_time})",
lambda: "CALC_NOTIFY_ONLY",
lambda: "LOW_LATENCY_CALC",
lambda: f"PRE_FILTER({prev_filter})",
lambda: "FORCE_OUTPUT",
lambda: f"MAX_DELAY({max_delay})",
lambda: f"EVENT_TYPE({generate_random_event_types(valid)})"
],
max_options)
else:
option_type = random.sample([
lambda: f"WATERMARK({watermark_duration})",
lambda: f"EXPIRED_TIME({expired_time})",
lambda: f"FILL_HISTORY({start_time})",
lambda: f"FILL_HISTORY_FIRST({start_time})",
lambda: f"PRE_FILTER({prev_filter})",
lambda: f"MAX_DELAY({max_delay})",
lambda: f"EVENT_TYPE({generate_random_event_types(valid)})"
],
max_options)
return [f() for f in option_type]
def generate_options_section(max_options=10, trigger_has_tag = False):
stream_options = random_option(valid=True, trigger_has_tag=trigger_has_tag, max_options=max_options)
rand_val = random.random()
if rand_val < 0.2:
# 20% chance to generate empty stream_options clause
return "", True
elif rand_val < 0.3:
# 10% chance to generate invalid stream_options clause
stream_options = stream_options + random_option(valid=False, trigger_has_tag=trigger_has_tag, max_options=1)
valid = False
else:
# 70% chance to generate valid stream_options clause
valid = True
combined = '|'.join(stream_options)
# FILL_HISTORY and FILL_HISTORY_FIRST cannot be used together
if "FILL_HISTORY(" in combined and "FILL_HISTORY_FIRST(" in combined:
valid = False
return f" stream_options({combined}) ", valid
def pick_random_combo(source_list, max_len):
if max_len == 0:
return []
length = random_int(1, max_len)
return [random_from_list(source_list) for _ in range(length)] if length > 0 else []
def random_expr_atom(column_list=None, valid=True):
string_literals = string_literals_valid if valid else string_literals_invalid
return random.choices(
population=column_list + string_literals,
weights=[7] * len(column_list) + [3] * len(string_literals),
k=1
)[0]
def random_numeric_atom(column_list=None):
return random.choices(
population=column_list + numeric_literals,
weights=[7] * len(column_list) + [3] * len(numeric_literals),
k=1
)[0]
def gen_string_func(func, expr=None, invalid_col_list=None, valid_col_list=None, valid=True):
if valid:
atom_expr = random_expr_atom(valid_col_list, valid)
string_literals = string_literals_valid
else:
atom_expr = random_expr_atom(invalid_col_list, valid)
string_literals = string_literals_invalid
if func == 'concat':
args = [random_expr_atom(valid_col_list if valid else invalid_col_list, valid) for _ in range(random_int(2, 4))]
return f"concat({', '.join(args)})"
elif func == 'upper':
return f"upper({expr or atom_expr})"
elif func == 'lower':
return f"lower({expr or atom_expr})"
elif func == 'substr':
expr = expr or atom_expr
start = str(random_int(0, 3))
length = str(random_int(1, 5))
return f"substr({expr}, {start}, {length})"
elif func == 'replace':
expr = expr or atom_expr
search = random_from_list(string_literals)
repl = random_from_list(string_literals)
return f"replace({expr}, {search}, {repl})"
elif func == 'ltrim':
return f"ltrim({expr or atom_expr})"
elif func == 'rtrim':
return f"rtrim({expr or atom_expr})"
elif func == 'trim':
return f"trim({expr or atom_expr})"
else:
raise ValueError(f"Unknown string func: {func}")
def gen_numeric_expr(depth=0, max_depth=3, invalid_col_list=None, valid_col_list=None, valid=True):
if depth >= max_depth or random_bool(0.3):
return random_numeric_atom(valid_col_list if valid else invalid_col_list)
if random_bool(0.4):
# function
func = random_from_list(numeric_func_names)
return gen_numeric_func(func, gen_numeric_expr(depth + 1, max_depth, invalid_col_list, valid_col_list, valid))
else:
# operators
left = gen_numeric_expr(depth + 1, max_depth, invalid_col_list, valid_col_list, valid)
op = random_from_list(arithmetic_ops)
right = gen_numeric_expr(depth + 1, max_depth, invalid_col_list, valid_col_list, valid)
return f"({left} {op} {right})"
def gen_numeric_func(func, expr=None, invalid_col_list=None, valid_col_list=None, valid=True):
expr = expr or gen_numeric_expr(2, 3, invalid_col_list, valid_col_list, valid)
return f"{func}({expr})"
def gen_string_expr(depth=0, max_depth=3, invalid_col_list=None, valid_col_list=None, valid=True):
if depth >= max_depth or random_bool(0.3):
return random_expr_atom(valid_col_list if valid else invalid_col_list, valid)
func = random_from_list(string_func_names)
inner = gen_string_expr(depth + 1, max_depth, invalid_col_list, valid_col_list, valid)
return gen_string_func(func, inner, invalid_col_list, valid_col_list, valid)
def generate_tag_expr(max_depth=3, invalid_col_list=None, valid_col_list=None, valid=True):
if random_bool(1):
# generate string type expression
return gen_string_expr(0, max_depth, invalid_col_list, valid_col_list, valid)
else:
# generate numeric type expression
return gen_numeric_expr(0, max_depth, invalid_col_list, valid_col_list, valid)
def generate_column_section_base(out_col_list, out_col_num=6, into_exist=False, valid=True):
out_col_num = 6 if out_col_num == 0 else out_col_num
if valid:
pk_index = 1
if into_exist:
selected = out_col_list[:out_col_num]
with_primary = False
else:
selected = random.sample(out_col_list, out_col_num)
with_primary = random_bool(0.5)
else:
pk_index = random_from_list([i for i in range(0, out_col_num) if i != 1])
if into_exist:
col_num = random_int(1, out_col_num)
selected = random.choices(out_col_list, k=col_num)
with_primary = random_bool(0.5)
else:
col_num = random_from_list([i for i in range(1, out_col_num + 3) if i != out_col_num])
selected = random.choices(out_col_list, k=col_num)
with_primary = random_bool(0.5)
col_defs = []
for i, col in enumerate(selected):
if i == pk_index and with_primary:
col_defs.append(f"{col} PRIMARY KEY")
else:
col_defs.append(col)
return f" ({', '.join(col_defs)}) "
def random_string(length=5):
return ''.join(random.choices(string.ascii_letters + string.digits, k=length))
def generate_random_stream_db_section():
if random_bool(0.9):
# Generate a valid database name
dbname = random_from_list(db_name_list_valid)
return f" {dbname}", True
else:
# Generate an invalid database name
dbname = random_from_list(db_name_list_invalid)
return f" {dbname}", False
# return a tuple (trigger_table, is_valid, trigger_null, has_tag)
def generate_random_trigger_table_section():
if random_bool(0.2):
# Do not generate a trigger table
return "", True, True, False
else :
if random_bool(0.9):
# Generate a valid database name
dbname = random_from_list(db_name_list_valid)
if random_bool():
# Generate a valid trigger table name
trigger_table = random_from_list(trigger_table_list_valid)
if trigger_table == "trigger_table":
return f" FROM {dbname}{trigger_table} ", True, False, False
else :
return f" FROM {dbname}{trigger_table} ", True, False, True
else:
# Generate an invalid trigger table name
trigger_table = random_from_list(trigger_table_list_invalid)
return f" FROM {dbname}{trigger_table} ", False, False, False
else:
# Generate an invalid database name
dbname = random_from_list(db_name_list_invalid)
if random_bool():
# Generate a valid trigger table name
trigger_table = random_from_list(trigger_table_list_valid)
return f" FROM {dbname}{trigger_table} ", False, False, False
else:
# Generate an invalid trigger table name
trigger_table = random_from_list(trigger_table_list_invalid)
return f" FROM {dbname}{trigger_table} ", False, False, False
def generate_random_into_table_section(stream_index=0):
if random_bool(0.9):
# Generate a valid into table section
into_table = random_from_list(into_option_list_valid)
if into_table == "":
return "", True, True, False, False
elif "new_table" in into_table:
into_table += f"_{stream_index}"
return f" {into_table} ", True, False, False, False
else:
return f" {into_table} ", True, False, "exist" in into_table, "super_table" in into_table
else:
# Generate an invalid into table section
into_table = random_from_list(into_option_list_invalid)
return f" {into_table} ", False, False, False, False
def generate_random_partition_section(max_partition_len = 3, trigger_null = False, trigger_has_tag = False):
rand_val = random.random()
if trigger_null:
if random_bool(0.1):
# 20% chance to generate invalid partition clause
selected = pick_random_combo(partition_columns_valid, max_partition_len)
return f" PARTITION BY {', '.join(selected)} ", False, selected
else:
return "", True, None
if trigger_has_tag:
valid_columns = partition_columns_valid
invalid_columns = partition_columns_invalid
else:
valid_columns = ["tbname"]
invalid_columns = list(set(partition_columns_valid + partition_columns_invalid) - set(valid_columns))
# 20% chance to generate empty partition clause
if rand_val < 0.2:
return "", True, None
# 30% chance to generate invalid partition clause
elif rand_val < 0.4:
selected = random.sample(valid_columns, min(max_partition_len , len(valid_columns))) + random.sample(invalid_columns, 1)
return f" PARTITION BY {', '.join(selected)} ", False, selected
# 50% chance to generate valid partition clause
else:
selected = random.sample(valid_columns, min(max_partition_len, len(valid_columns)))
return f" PARTITION BY {', '.join(selected)} ", True, selected
def generate_random_notif_def_section(
max_urls=2, max_events=2, max_options=2, max_condition_depth=2, query_cols = None
):
# Each part has 20% chance to be invalid, and 80% chance to be valid. And total's invalid change is also 20%.
valid = True
# optional NOTIFY(url [, ...])
if random_bool(0.1):
# 20% chance to generate invalid NOTIFY clause
if random_bool(0.5):
notify_urls = ""
else:
# pick at least one URL, include invalid URLs
notify_url = pick_random_combo(urls_valid, max_urls - 1) + pick_random_combo(urls_invalid, 1)
notify_urls = f" NOTIFY({', '.join(notify_url)}) "
valid = False
else:
# 80% chance to generate valid NOTIFY clause
# pick at least one URL
notify_url = pick_random_combo(urls_valid, max_urls)
notify_urls = f" NOTIFY({', '.join(notify_url)}) "
# optional ON (event_types)
if valid == True and random_bool(0.1):
# 20% chance to generate invalid NOTIFY_OPTIONS clause
notify_event = pick_random_combo(event_types_valid, max_events - 1) + pick_random_combo(event_types_invalid, 1)
notify_events = f" ON ({'|'.join(notify_event)}) "
valid = False
else:
# 80% chance to generate valid NOTIFY_OPTIONS clause
if random_bool(0.2):
# 20% chance to generate empty ON clause
notify_events = ""
else:
selected_events = pick_random_combo(event_types_valid, max_events)
notify_events = f" ON ({'|'.join(selected_events)}) "
# optional WHERE condition (using generate_logical_condition)
if valid == True and random_bool(0.1):
# 20% chance to generate invalid WHERE clause
if query_cols is None:
query_cols = ["col1", "col2", "col3"]
condition = generate_logical_condition(max_depth=max_condition_depth, full_column_list=partition_columns_valid, valid_column_list=query_cols, valid=False)
notify_conditions = f" WHERE {condition} "
valid = False
else:
# 80% chance to generate valid WHERE clause
if random_bool(0.2) or query_cols is None:
# 20% chance to generate empty WHERE clause
notify_conditions = ""
else:
condition = generate_logical_condition(max_depth=max_condition_depth, full_column_list=partition_columns_valid, valid_column_list=query_cols, valid=True)
notify_conditions = f" WHERE {condition} "
# optional NOTIFY_OPTIONS(...)
if valid == True and random_bool(0.1):
# 20% chance to generate invalid NOTIFY_OPTIONS clause
notify_option = pick_random_combo(notify_option_valid, max_options - 1) + pick_random_combo(notify_option_invalid, 1)
notify_options = f" NOTIFY_OPTIONS({'|'.join(notify_option)}) "
valid = False
else:
# 80% chance to generate valid NOTIFY_OPTIONS clause
if random_bool(0.2):
# 20% chance to generate empty NOTIFY_OPTIONS clause
notify_options = ""
else:
# pick at least one option
notify_option = pick_random_combo(notify_option_list, max_options)
notify_options = f" NOTIFY_OPTIONS({'|'.join(notify_option)}) "
if notify_urls == "" and (notify_events != "" or notify_conditions != "" or notify_options != ""):
# If URLs are empty, but other parts are not, it's invalid
valid = False
return notify_urls + notify_events + notify_conditions + notify_options, valid
def generate_random_column_list_section(out_col_num=1, into_exist=False, into_null=False):
random_value = random.random()
if into_null:
if random_bool(0.8):
# 80% chance to generate empty column list
return "", True
else:
# 20% chance to generate invalid column list
return generate_column_section_base(out_columns, out_col_num=out_col_num, into_exist=into_exist, valid=True), False
if random_value < 0.2:
# 20% chance to generate empty column list
return "", True
elif random_value < 0.3:
# 20% chance to generate invalid column list
return generate_column_section_base(out_columns, out_col_num, into_exist=into_exist, valid=False), False
else:
return generate_column_section_base(out_columns, out_col_num, into_exist=into_exist, valid=True), True
def generate_random_output_subtable(max_depth=3, partition_list=None, into_null=False):
valid = True
if into_null:
if random_bool(0.8):
return "", True
else:
valid = False
if partition_list is None:
if random_bool(0.8):
return "", True
else:
valid = False
valid_list = partition_list if partition_list else partition_columns_valid
invalid_list = list(set(partition_columns_valid + partition_columns_invalid) - set(valid_list))
if random_bool(0.1):
# 20% chance to generate invalid OUTPUT_SUBTABLE
expr = gen_string_expr(0, max_depth, invalid_list, valid_list, False)
valid = False
else:
# 80% chance to generate valid OUTPUT_SUBTABLE
expr = gen_string_expr(0, max_depth, invalid_list, valid_list, True)
return f" OUTPUT_SUBTABLE({expr}) ", valid
def generate_random_tags_clause(partition_list=None, allow_comment=True, into_exist=False, into_stable=False, into_null=False):
valid = True
if partition_list == None or (into_exist and not into_stable) or into_null:
if random_bool(0.8):
return "", True
else:
if partition_list is None:
partition_list = partition_columns_invalid
valid = False
if random_bool(0.1):
return "", True
if random_bool(0.1) and valid and into_exist and into_stable:
num_tags = random_from_list([i for i in range(1, len(out_tags)) if i != 2])
valid = False
else:
# 80% chance to generate valid TAGS clause
num_tags = 2
if random_bool(0.1) and valid and into_exist:
selected_tags = random.sample(out_tags, num_tags - 1)
valid = False
else:
selected_tags = out_tags[:num_tags]
if random_bool(0.1) and valid:
# 20% chance to generate invalid tag type
out_type_list = out_tag_type_invalid
valid = False
else:
out_type_list = out_tag_type_valid
if random_bool(0.1) and valid:
# 20% chance to generate invalid tag type
gen_valid_expr = False
valid = False
else:
gen_valid_expr = True
tag_defs = []
for tag in selected_tags:
type_name = random_from_list(out_type_list)
comment_str = f" COMMENT '{random_string(6)}'" if allow_comment and random_bool(0.5) else ""
valid_list = partition_list
invalid_list = list(set(partition_columns_valid + partition_columns_invalid) - set(valid_list))
expr = generate_tag_expr(max_depth=2, invalid_col_list=invalid_list, valid_col_list=valid_list, valid=gen_valid_expr)
tag_defs.append(f"{tag} {type_name}{comment_str} AS {expr}")
return f" TAGS ({', '.join(tag_defs)}) ", valid
def gen_create_stream_variants():
base_template = "CREATE STREAM{if_not_exists} {stream_name}{stream_options}{into_clause}{output_subtable}{columns}{tags}{as_subquery};"
sql_variants = []
stream_index = 0
for trigger_type, v0 in generate_trigger_section(500):
for as_subquery, v1, out_col_num, query_col_list, with_ph_column in as_subquery_opts:
stream_db, v2 = generate_random_stream_db_section()
trigger_table, v3, trigger_null, trigger_has_tag = generate_random_trigger_table_section()
into_table, v4, into_null, into_exist, into_stable = generate_random_into_table_section(stream_index)
partition, v5, partition_cols = generate_random_partition_section(trigger_null = trigger_null, trigger_has_tag = trigger_has_tag)
stream_opt, v6 = generate_options_section(trigger_has_tag=trigger_has_tag)
notify_opt, v7 = generate_random_notif_def_section(query_cols=query_col_list)
output, v8 = generate_random_output_subtable(partition_list=partition_cols, into_null=into_null)
column, v9 = generate_random_column_list_section(out_col_num=out_col_num, into_exist=into_exist, into_null=into_null)
tag, v10 = generate_random_tags_clause(partition_list=partition_cols, into_exist=into_exist, into_stable=into_stable, into_null=into_null)
if trigger_null and "PERIOD" not in trigger_type:
v11 = False
else:
v11 = True
if out_col_num != 3 and into_exist:
v12 = False
else:
v12 = True
if notify_opt == "" and out_col_num == 0:
v13 = False
else:
v13 = True
if partition == "" and into_exist and into_stable:
v14 = False
elif partition == "" and into_exist and not into_stable:
v14 = True
elif partition != "" and into_exist and into_stable:
v14 = True
elif partition != "" and into_exist and not into_stable:
v14 = False
else:
v14 = True
if partition == "" and with_ph_column:
v15 = False
else:
v15 = True
if "%%tbname" in as_subquery:
if partition == "":
v16 = False
elif "tbname" not in partition:
v16 = False
else:
v16 = True
else:
v16 = True
if "_twstart" in as_subquery or "_twend" in as_subquery or "_twduration" in as_subquery or "_twrownum" in as_subquery:
if "PERIOD" in trigger_type or ("INTERVAL" not in trigger_type and "SLIDING" in trigger_type):
v17 = False
else:
v17 = True
else:
v17 = True
if "_tprev_ts" in as_subquery or "_tcurrent_ts" in as_subquery or "_tnext_ts" in as_subquery:
if "SLIDING" not in trigger_type:
v18 = False
else:
v18 = True
else:
v18 = True
if "_tprev_localtime" in as_subquery or "_tnext_localtime" in as_subquery:
if "PERIOD" not in trigger_type:
v19 = False
else:
v19 = True
else:
v19 = True
if into_null:
if as_subquery == "" and notify_opt != "":
v20 = True
elif as_subquery != "" and "CALC_NOTIFY_ONLY" in stream_opt:
v20 = True
else:
v20 = False
else:
v20 = True
if tag == "" and into_exist and into_stable and partition != "":
if len(partition_cols) != 2:
v21 = False
else:
v21 = True
else:
v21 = True
if not into_null and as_subquery == "":
v22 = False
else:
v22 = True
if column == "" and into_exist and as_subquery != "":
v23 = False
else:
v23 = True
if ('EVENT' in trigger_type or 'COUNT' in trigger_type) and 'tag' in trigger_type:
if 'trigger_stable' in trigger_table or 'trigger_ctable' in trigger_table:
v24 = True
else:
v24 = False
else:
v24 = True
if trigger_null and ('%%trows' in as_subquery or '%%tbname' in as_subquery):
v25 = False
else:
v25 = True
if trigger_null and 'PRE_FILTER' in stream_opt:
v26 = False
else:
v26 = True
valid = (v0 and v1 and v2 and v3 and v4 and v5 and v6 and v7 and v8 and v9 and
v10 and v11 and v12 and v13 and v14 and v15 and v16 and v17 and v18 and v19 and
v20 and v21 and v22 and v23 and v24 and v25 and v26)
print(f"stream_index: {stream_index}, v0: {v0}, v1: {v1}, v2: {v2}, v3: {v3}, v4: {v4}, "
f"v5: {v5}, v6: {v6}, v7: {v7}, v8: {v8}, v9: {v9}, v10: {v10}, v11: {v11}, "
f"v12: {v12}, v13: {v13}, v14: {v14}, v15: {v15}, v16: {v16}, v17: {v17}, "
f"v18: {v18}, v19: {v19}, v20: {v20}, v21: {v21}, v22: {v22}, v23: {v23}, "
f"v24: {v24}, v25: {v25}, v26: {v26}, valid: {valid}")
sql = base_template.format(
if_not_exists=random_from_list(if_not_exists_opts),
stream_name=stream_db + "stream_" + str(stream_index),
stream_options=trigger_type + trigger_table + partition + stream_opt + notify_opt,
into_clause=into_table,
output_subtable=output,
columns=column,
tags= tag,
as_subquery=as_subquery
)
sql_variants.append((sql.strip(), valid, stream_index))
stream_index += 1
return sql_variants