# File: TDengine/test/cases/13-StreamProcessing/07-SubQuery/test_create_stream_syntax.py
#
# Repository viewer metadata: 671 lines, no EOL, 28 KiB, Python

import random
from itertools import product
import string
# Common time units: each magnitude combined with each unit suffix, grouped by
# magnitude ("1b", "1u", ..., "1y", "2b", ...).
duration_lists = [
    f"{magnitude}{unit}"
    for magnitude in (1, 2, 5, 7, 12, 30, 365)
    for unit in "buasmhdwny"
]
# Column names used when building trigger windows and expressions.
columns = [
    "ts_col",
    "col1", "col2",
    "tag1", "tag2", "tag3",
]
partition_columns = [
    "ts_col",
    "tag1", "tag2", "tag3", "tag4",
    "tbname",
]
placeholders = [
    "_tcurrent_ts", "_twstart", "_twend", "_twduration",
    "_twrownum", "_tgrpid", "_tlocaltime",
    "%%1", "%%2", "%%3", "%%tbname", "%%trows",
]

# Names available for the output column list and the TAGS clause.
out_columns = ["ts_col"] + [f"col{index}" for index in range(1, 7)]
out_tags = [f"tag{index}" for index in range(1, 6)]

counts = [10, 100]
slidings = [1, 5]
event_types = ["WINDOW_OPEN", "WINDOW_CLOSE"]

# Comparison, arithmetic and logical operators used by the condition generators.
ops = "= <> != > < >= <=".split()
arith_ops = list("+-*/")
logic_ops = ["AND", "OR"]

timestamps = [
    " '2020-01-01T00:00:00Z' ",
    " '2020-01-02T00:00:00Z' ",
]
event_types_pool = ["WINDOW_OPEN", "WINDOW_CLOSE"]
urls = [
    " 'http://example.com/notify' ",
    " 'http://localhost:8000/callback' ",
    " 'https://api.test.com/hook' ",
]
notify_option_list = ["NOTIFY_HISTORY", "ON_FAILURE_PAUSE"]
# INTO targets: qualified by database, a non-existent database, unqualified,
# and finally "no INTO clause at all".
into_option_list = [
    *(
        f" INTO create_stream_db.{table}"
        for table in ("new_table", "exist_super_table", "exist_sub_table", "exist_normal_table")
    ),
    " INTO non_exists_db.new_table",
    " INTO create_stream_db.exist_super_table",
    *(
        f" INTO {table}"
        for table in ("new_table", "exist_super_table", "exist_sub_table", "exist_normal_table")
    ),
    "",
]
# Candidate "AS <subquery>" tails; the final empty string means "no subquery".
as_subquery_opts = [
    " AS SELECT * FROM query_table",
    # Aggregate query with progressively more clauses.
    " AS SELECT first(ts), avg(col1) from query_table",
    " AS SELECT first(ts), avg(col1) from query_table"
    " WHERE col1 > 0",
    " AS SELECT first(ts), avg(col1) from query_table"
    " WHERE col1 > 0 INTERVAL(1s)",
    " AS SELECT first(ts), avg(col1) from query_table"
    " WHERE col1 > 0 GROUP BY col2",
    " AS SELECT first(ts), avg(col1) from query_table"
    " WHERE col1 > 0 GROUP BY col2 HAVING avg(col1) > 0",
    " AS SELECT first(ts), avg(col1) from query_table"
    " WHERE col1 > 0 GROUP BY col2 HAVING avg(col1) > 0 ORDER BY col2",
    # Placeholder columns, first without and then with a WHERE on the placeholder.
    " AS SELECT _tcurrent_ts, avg(col1), sum(col2) from query_table",
    *(
        f" AS SELECT {name}, avg(col1), sum(col2) from query_table WHERE {name} > 1"
        for name in (
            "_tcurrent_ts", "_twstart", "_twend", "_twduration",
            "_twrownum", "_tgrpid", "_tlocaltime",
        )
    ),
    # %%-style placeholders.
    " AS SELECT %%1, avg(col1), sum(col2) from query_table",
    " AS SELECT %%1, %%4, avg(col1), sum(col2) from query_table",
    " AS SELECT %%tbname, avg(col1), sum(col2) from query_table",
    " AS SELECT %%1 from %%tbname",
    " AS SELECT %%1, %%2 from %%trows",
    " AS SELECT col1, col2 from %%trows",
    "",
]
# Optional "IF NOT EXISTS" modifier.
if_not_exists_opts = [
    "",
    " IF NOT EXISTS",
]

# Database prefixes: the empty prefix and create_stream_db are treated as
# valid, non_exists_db as invalid.
db_name_list_valid = [
    "",
    "create_stream_db.",
]
db_name_list_invalid = [
    "non_exists_db.",
]

# Trigger table names treated as valid / invalid by the generators.
trigger_table_list_valid = [
    "trigger_table",
    "trigger_stable",
    "trigger_ctable",
]
trigger_table_list_invalid = [
    "non_exists_table",
    "",
]

# INTO clauses treated as valid (the empty string means "no INTO clause") / invalid.
into_option_list_valid = [
    *(
        f" INTO create_stream_db.{table}"
        for table in ("new_table", "exist_super_table", "exist_sub_table", "exist_normal_table")
    ),
    *(
        f" INTO {table}"
        for table in ("new_table", "exist_super_table", "exist_sub_table", "exist_normal_table")
    ),
    "",
]
into_option_list_invalid = [
    " INTO non_exists_db.new_table",
]

# PARTITION BY columns treated as valid / invalid.
partition_columns_valid = [
    "tag1", "tag2", "tag3", "tag4",
    "tbname",
]
partition_columns_invalid = [
    "ts_col",
    "tag5", "tag6",
    "now",
]
# Durations treated as valid: every magnitude with every known unit suffix.
duration_lists_valid = [
    f"{magnitude}{unit}"
    for magnitude in (1, 2, 5, 7, 12, 30, 365)
    for unit in "buasmhdwny"
]

# Durations treated as invalid: the unknown unit "x", then bare letters.
duration_lists_invalid = [
    *(f"{magnitude}x" for magnitude in (1, 2, 5, 7, 12, 30, 365)),
    *"abcdefghij",
]
# Time spans treated as valid for EXPIRED_TIME / MAX_DELAY: units a, s, m, h, d,
# grouped by magnitude.
expired_time_list_valid = [
    f"{magnitude}{unit}"
    for magnitude in (1, 2, 5, 7, 12, 30, 365)
    for unit in "asmhd"
]

# Time spans treated as invalid: units x, b, u, w, n, y (grouped by unit),
# then bare letters.
expired_time_list_invalid = [
    *(
        f"{magnitude}{unit}"
        for unit in "xbuwny"
        for magnitude in (1, 2, 5, 7, 12, 30, 365)
    ),
    *"abcdefghij",
]
# Start times treated as valid for FILL_HISTORY / FILL_HISTORY_FIRST.
start_time_valid = [
    " '2025-05-27 14:29:42' ",
    " '1970-01-01 08:00:00' ",
    " 2025-05-27 14:29:42 ",
    " 1970-01-01 08:00:00 ",
    " 1748327382161 ",
    " 1 ",
]

# Start times treated as invalid: trailing fields, truncated dates, ISO "T/Z"
# forms and arbitrary words.
start_time_invalid = [
    " '2025-05-27 14:29:42:00' ",
    " '1970-01-01 08:00:00:00' ",
    " 2025-05-27 14:29:42:00 ",
    " 1970-01-01 08:00:00:00 ",
    " 1748327382161:00 ",
    " 1:00 ",
    " '2025-05-27 14:29' ",
    " '2025-05-27' ",
    " '2025-05' ",
    " '2025' ",
    " '2025-05-27T14:29:42Z' ",
    " '2025-05-27T14:29Z' ",
    " '2025-05-27T14Z' ",
    " '2025-05-27T' ",
    " '2025-05T14:29:42Z' ",
    " '2025-05T14Z' ",
    " '2025-05T' ",
    " '2025T14:29:42Z' ",
    " '2025T14Z' ",
    " '2025T' ",
    "'invalid_time'",
    "'invalid_date'",
    "'invalid_timestamp'",
    "'invalid_format'",
    "'another_invalid_format'",
    "'yet_another_invalid_format'",
]

# Notification event types, URLs and options, split into valid / invalid pools.
event_types_valid = [
    "WINDOW_OPEN",
    "WINDOW_CLOSE",
]
event_types_invalid = [
    "INVALID_EVENT",
    "ANOTHER_INVALID_EVENT",
    "",
]
urls_valid = [
    " 'http://example.com/notify' ",
    " 'http://localhost:8000/callback' ",
    " 'https://api.test.com/hook' ",
]
urls_invalid = [
    " 'invalid_url' ",
    " http://example.com/invalid ",
    " 12345678 ",
]
notify_option_valid = [
    "NOTIFY_HISTORY",
    "ON_FAILURE_PAUSE",
]
notify_option_invalid = [
    "NOTIFY_WHAT",
    "NOTIFY_INVALID",
]
def random_from_list(lst, n=1):
    """Pick randomly from ``lst``.

    With the default ``n == 1`` a single element is returned; for any other
    ``n`` the result is a list of ``n`` elements sampled without replacement.
    """
    return random.choice(lst) if n == 1 else random.sample(lst, n)
def random_bool(prob=0.5):
    """Draw a boolean that is True with probability ``prob``."""
    return prob > random.random()
def random_int(a, b):
    """Draw a random integer N with ``a <= N <= b`` (both ends included)."""
    return random.randrange(a, b + 1)
def generate_arithmetic_expr(column_list):
    """Build a parenthesised binary arithmetic expression.

    The left operand is a column from ``column_list``; the right operand is
    either another column or a random integer literal in 1..100; the operator
    comes from ``arith_ops``.
    """
    lhs = random_from_list(column_list)
    rhs_pool = column_list + [str(random_int(1, 100))]
    rhs = random_from_list(rhs_pool)
    symbol = random_from_list(arith_ops)
    return "(" + " ".join((lhs, symbol, rhs)) + ")"
def generate_atomic_condition(full_list=None, valid_list=None, valid=True):
    """Build one comparison of the form ``<left> <op> <right>``.

    Each operand is either a plain column name or (30% of the time) an
    arithmetic expression; the right-hand side may also be an integer literal.

    Args:
        full_list: All column names that may be referenced. Defaults to
            ``valid_list`` when omitted.
        valid_list: Column names considered valid. Defaults to the
            module-level ``columns`` when omitted, so callers that pass no
            lists no longer crash on ``None``.
        valid: When True, operands are drawn from ``valid_list``; when False,
            from the names in ``full_list`` that are not in ``valid_list``.

    Returns:
        The condition as a string.

    Raises:
        ValueError: If ``valid`` is True and ``valid_list`` is empty.
    """
    if valid_list is None:
        valid_list = columns
    if full_list is None:
        full_list = valid_list

    if valid:
        column_list = list(valid_list)
        if not column_list:
            raise ValueError("generate_atomic_condition: no valid columns to build a condition from")
    else:
        excluded = set(valid_list)
        # Keep the first-seen order (instead of set order) so output is
        # reproducible under a fixed random seed.
        column_list = [name for name in dict.fromkeys(full_list) if name not in excluded]
        if not column_list:
            # Nothing is known to be invalid: fall back to a column name that
            # does not exist, mirroring non_exists_db / non_exists_table.
            column_list = ["non_exists_col"]

    left_expr = (
        generate_arithmetic_expr(column_list) if random_bool(0.3) else random_from_list(column_list)
    )
    op = random_from_list(ops)
    right_expr = (
        generate_arithmetic_expr(column_list)
        if random_bool(0.3)
        else random_from_list(column_list + [str(random_int(1, 100))])
    )
    return f"{left_expr} {op} {right_expr}"
def generate_logical_condition(max_depth=2, current_depth=0, full_column_list=None, valid_column_list=None, valid=True):
    """Recursively build a boolean condition tree.

    A leaf (an atomic comparison) is produced once ``current_depth`` reaches
    ``max_depth``, or otherwise with 40% probability. Any other node joins two
    sub-conditions with an operator from ``logic_ops`` and wraps the result in
    parentheses. The column lists and ``valid`` flag are forwarded unchanged to
    ``generate_atomic_condition``.
    """
    is_leaf = current_depth >= max_depth or random_bool(0.4)
    if is_leaf:
        return generate_atomic_condition(full_column_list, valid_column_list, valid)

    next_depth = current_depth + 1
    branches = [
        generate_logical_condition(max_depth, next_depth, full_column_list, valid_column_list, valid)
        for _ in range(2)
    ]
    connective = random_from_list(logic_ops)
    return f"({branches[0]} {connective} {branches[1]})"
def generate_event_window_conditions(num_pairs=10):
    """Return ``num_pairs`` random ``EVENT_WINDOW(START WITH ... END WITH ...)`` clauses.

    The start and end conditions are built over the module-level ``columns``.
    The column list is passed explicitly because ``generate_logical_condition``
    defaults it to ``None``, which cannot be sampled from.
    """
    return [
        f"EVENT_WINDOW(START WITH {generate_logical_condition(valid_column_list=columns)} END WITH {generate_logical_condition(valid_column_list=columns)})"
        for _ in range(num_pairs)
    ]
def generate_trigger_section():
    """Build the pool of trigger-window clauses used by the stream generator.

    The pool contains, in this order: SESSION, STATE_WINDOW, SLIDING /
    INTERVAL+SLIDING, EVENT_WINDOW, COUNT_WINDOW and PERIOD clauses. Durations
    are drawn from ``duration_lists`` and column names from ``columns``.

    Returns:
        A list of clause strings, each padded with surrounding spaces.
    """
    triggers = []

    # SESSION: one quoted and one unquoted duration per column.
    for col in columns:
        dur = random_from_list(duration_lists)
        triggers.append(f" SESSION({col}, '{dur}') ")
        triggers.append(f" SESSION({col}, {dur}) ")

    # STATE_WINDOW: with and without TRUE_FOR.
    for col in columns:
        triggers.append(f" STATE_WINDOW({col}) ")
        dur = random_from_list(duration_lists)
        triggers.append(f" STATE_WINDOW({col}) TRUE_FOR('{dur}') ")

    # INTERVAL + SLIDING: every with/without-offset combination.
    # NOTE: the range runs max_sliding_count + 1 times.
    max_sliding_count = 20
    for _ in range(0, max_sliding_count + 1):
        interval = random_from_list(duration_lists)
        offset = random_from_list(duration_lists)
        slide = random_from_list(duration_lists)
        slide_offset = random_from_list(duration_lists)
        int_part = f" INTERVAL('{interval}') "
        int_part_with_offset = f" INTERVAL('{interval}', '{offset}') "
        slide_part = f" SLIDING('{slide}') "
        slide_part_with_offset = f" SLIDING('{slide}', '{slide_offset}') "
        triggers.extend([
            slide_part,
            slide_part_with_offset,
            f" {int_part} {slide_part} ",
            f" {int_part} {slide_part_with_offset} ",
            f" {int_part_with_offset} {slide_part} ",
            f" {int_part_with_offset} {slide_part_with_offset} ",
        ])

    # EVENT_WINDOW: random start/end conditions over ``columns``. The column
    # list is passed explicitly; the default of None cannot be sampled from.
    max_event_count = 20
    for _ in range(0, max_event_count + 1):
        start = generate_logical_condition(valid_column_list=columns)
        end = generate_logical_condition(valid_column_list=columns)
        ew = f" EVENT_WINDOW(START WITH {start} END WITH {end}) "
        triggers.append(ew)

    # COUNT_WINDOW: count, optional sliding value, then 0..max_col_len columns.
    max_col_len = 3
    max_samples_per_len = 10
    for count in [1, 10, 20]:
        for slide in [None, 10, 20]:
            for length in range(0, max_col_len + 1):
                all_combinations = list(product(columns, repeat=length))
                # Cap the combinations per length so the pool stays small.
                sampled_combinations = (
                    random.sample(all_combinations, min(len(all_combinations), max_samples_per_len))
                    if all_combinations else [()]
                )
                for cols in sampled_combinations:
                    parts = [str(count)]
                    if slide is not None:
                        parts.append(str(slide))
                    if cols:
                        parts.extend(cols)
                    triggers.append(f" COUNT_WINDOW({', '.join(parts)}) ")

    # PERIOD: with and without an offset.
    max_period_count = 20
    for _ in range(0, max_period_count + 1):
        period = random_from_list(duration_lists)
        offset = random_from_list(duration_lists)
        triggers.append(f" PERIOD('{period}', '{offset}') ")
        triggers.append(f" PERIOD('{period}') ")

    return triggers
def generate_random_event_types(valid=True):
    """Return one or two event types joined with ``|``.

    When ``valid`` is False, one entry from ``event_types_invalid`` is placed
    in front of the valid ones.
    """
    chosen = [] if valid else [random_from_list(event_types_invalid)]
    how_many = random_int(1, 2)
    chosen.extend(random_from_list(event_types_valid) for _ in range(how_many))
    return "|".join(chosen)
def random_option(valid=True, partition_list=None):
    """Return one randomly chosen entry for a stream ``OPTIONS(...)`` clause.

    Args:
        valid: When True, any option may be returned and its argument comes
            from the ``*_valid`` pools. When False, only options that take an
            argument are considered and that argument comes from the
            ``*_invalid`` pools. Bare flags such as ``IGNORE_DISORDER`` are
            never returned, because nothing about them is invalid.
        partition_list: Column names of the stream's PARTITION BY clause.
            ``None`` or an empty list means there are no partition columns.

    Returns:
        The option text, e.g. ``"WATERMARK(1s)"``.
    """
    partition_list = list(partition_list) if partition_list else []

    if valid:
        durations = duration_lists_valid
        time_spans = expired_time_list_valid
        start_times = start_time_valid
    else:
        durations = duration_lists_invalid
        time_spans = expired_time_list_invalid
        start_times = start_time_invalid

    # Columns outside the PARTITION BY list, in a stable order.
    known_columns = dict.fromkeys(partition_columns_valid + partition_columns_invalid)
    non_partition_columns = [name for name in known_columns if name not in partition_list]

    def pre_filter():
        # A valid filter references partition columns; an invalid one
        # references only non-partition columns.
        condition = generate_logical_condition(
            full_column_list=non_partition_columns,
            valid_column_list=partition_list,
            valid=valid,
        )
        return f"PRE_FILTER({condition})"

    # Every builder is lazy: only the chosen option draws random values.
    builders = [
        lambda: f"WATERMARK({random_from_list(durations)})",
        lambda: f"EXPIRED_TIME({random_from_list(time_spans)})",
        lambda: f"FILL_HISTORY({random_from_list(start_times)})",
        lambda: f"FILL_HISTORY_FIRST({random_from_list(start_times)})",
        lambda: f"MAX_DELAY({random_from_list(time_spans)})",
        lambda: f"EVENT_TYPE({generate_random_event_types(valid)})",
    ]

    # PRE_FILTER needs at least one column to reference; skip it otherwise
    # instead of failing while sampling from an empty list.
    if partition_list if valid else non_partition_columns:
        builders.append(pre_filter)

    if valid:
        for flag in (
            "IGNORE_DISORDER",
            "DELETE_RECALC",
            "DELETE_OUTPUT_TABLE",
            "CALC_NOTIFY_ONLY",
            "LOW_LATENCY_CALC",
            "FORCE_OUTPUT",
        ):
            builders.append(lambda flag=flag: flag)

    return random_from_list(builders)()
def generate_options_section(max_options=10, partition_list=None, trigger_null=False):
    """Generate a random ``OPTIONS(...)`` clause and its expected validity.

    Outcomes: 20% no clause at all, 10% a clause whose last option is invalid,
    70% a clause made only of valid options.

    Args:
        max_options: Upper bound for the number of valid options generated
            before the final (valid or invalid) option is appended.
        partition_list: PARTITION BY column names, forwarded to
            ``random_option``.
        trigger_null: True when the stream has no trigger table, in which case
            any non-empty OPTIONS clause is expected to be rejected.

    Returns:
        ``(clause, valid)``: the SQL fragment (possibly empty) and whether it
        is expected to be accepted.
    """
    rand_val = random.random()
    if rand_val < 0.2:
        # 20% chance to generate empty options clause
        return "", True

    # Generate each option independently. The previous code passed a single
    # option string to pick_random_combo, which sampled single characters
    # from that string.
    option_count = random_int(1, max_options) if max_options > 0 else 0
    options = [
        random_option(valid=True, partition_list=partition_list)
        for _ in range(option_count)
    ]

    if rand_val < 0.3:
        # 10% chance to generate invalid options clause
        options.append(random_option(valid=False, partition_list=partition_list))
        valid = False
    else:
        # 70% chance to generate valid options clause
        options.append(random_option(valid=True, partition_list=partition_list))
        valid = True

    combined = '|'.join(options)
    # FILL_HISTORY and FILL_HISTORY_FIRST cannot be used together
    if "FILL_HISTORY(" in combined and "FILL_HISTORY_FIRST(" in combined:
        valid = False
    # If no trigger table, options should not appear
    if trigger_null:
        valid = False
    return f" OPTIONS({combined}) ", valid
def pick_random_combo(source_list, max_len):
    """Return between 1 and ``max_len`` random picks (with repetition) from ``source_list``.

    A ``max_len`` of zero or less yields an empty list. Callers pass
    expressions such as ``max_urls - 1``, which may go negative; that used to
    raise from ``random.randint``.
    """
    if max_len <= 0:
        return []
    length = random_int(1, max_len)
    return [random_from_list(source_list) for _ in range(length)]
# Building blocks for the random tag / sub-table expressions.
string_literals = [f"'_{suffix}'" for suffix in ("v1", "2024", "tag", "out", "ts", "X")]
numeric_literals = list("0123456789")
arithmetic_ops = list("+-*/%")
numeric_func_names = "abs acos cos asin sin log floor ceil round".split()
string_func_names = "concat upper lower length substr replace ltrim rtrim trim".split()
def random_expr_atom():
    """Pick a column name or a string literal; each column is weighted 7, each literal 3."""
    pool = columns + string_literals
    odds = [7 for _ in columns] + [3 for _ in string_literals]
    (picked,) = random.choices(population=pool, weights=odds, k=1)
    return picked
def random_numeric_atom():
    """Pick a column name or a digit literal; each column is weighted 7, each digit 3."""
    pool = columns + numeric_literals
    odds = [7 for _ in columns] + [3 for _ in numeric_literals]
    (picked,) = random.choices(population=pool, weights=odds, k=1)
    return picked
def gen_string_func(func, expr=None):
    """Render a call to the string function ``func``.

    ``expr`` is used as the (first) argument when it is truthy; otherwise a
    random atom is generated. ``concat`` ignores ``expr`` and always takes 2-4
    random atoms.

    Raises:
        ValueError: If ``func`` is not one of the supported function names.
    """
    if func == 'concat':
        pieces = []
        for _ in range(random_int(2, 4)):
            pieces.append(random_expr_atom())
        return "concat(" + ", ".join(pieces) + ")"

    # Single-argument functions all share the same shape.
    if func in ('upper', 'lower', 'length', 'ltrim', 'rtrim', 'trim'):
        operand = expr or random_expr_atom()
        return f"{func}({operand})"

    if func == 'substr':
        subject = expr or random_expr_atom()
        offset = random_int(0, 3)
        span = random_int(1, 5)
        return f"substr({subject}, {offset}, {span})"

    if func == 'replace':
        subject = expr or random_expr_atom()
        needle = random_from_list(string_literals)
        substitute = random_from_list(string_literals)
        return f"replace({subject}, {needle}, {substitute})"

    raise ValueError(f"Unknown string func: {func}")
def gen_numeric_expr(depth=0, max_depth=3):
    """Recursively build a random numeric expression.

    Recursion stops with a numeric atom at ``max_depth`` or with 30%
    probability. Otherwise the node is a function call (40%) or a
    parenthesised binary operation (60%).
    """
    if depth >= max_depth or random_bool(0.3):
        return random_numeric_atom()

    deeper = depth + 1
    if not random_bool(0.4):
        # Binary operator node.
        lhs = gen_numeric_expr(deeper, max_depth)
        symbol = random_from_list(arithmetic_ops)
        rhs = gen_numeric_expr(deeper, max_depth)
        return f"({lhs} {symbol} {rhs})"

    # Function-call node.
    name = random_from_list(numeric_func_names)
    argument = gen_numeric_expr(deeper, max_depth)
    return gen_numeric_func(name, argument)
def gen_numeric_func(func, expr=None):
    """Render ``func(expr)``, generating a numeric argument when ``expr`` is falsy."""
    argument = expr if expr else gen_numeric_expr(depth=2)
    return "{}({})".format(func, argument)
def gen_string_expr(depth, max_depth):
    """Recursively build a random string expression.

    Recursion stops with an atom at ``max_depth`` or with 30% probability;
    otherwise a random string function wraps a deeper expression.
    """
    stop_here = depth >= max_depth or random_bool(0.3)
    if stop_here:
        return random_expr_atom()
    wrapper = random_from_list(string_func_names)
    nested = gen_string_expr(depth + 1, max_depth)
    return gen_string_func(wrapper, nested)
def generate_tag_expr(max_depth=3):
    """Return a random tag expression: a string or a numeric one, with equal probability."""
    wants_string = random_bool(0.5)
    if wants_string:
        return gen_string_expr(0, max_depth)
    return gen_numeric_expr(depth=0, max_depth=max_depth)
def generate_output_subtable(max_depth=3, include_probability=0.7):
    """Return an ``OUTPUT_SUBTABLE(<string expr>)`` clause, or ``""``.

    The clause is emitted with probability ``include_probability``.
    """
    if random_bool(include_probability):
        name_expr = gen_string_expr(0, max_depth)
        return f" OUTPUT_SUBTABLE({name_expr}) "
    return ""
def generate_column_section_base(out_col_list, include_probability=0.8, max_cols=6, with_primary_key_prob=0.6):
    """Return a parenthesised list of output columns, or ``""``.

    With probability ``include_probability`` between 1 and ``max_cols``
    distinct names are sampled from ``out_col_list``. With probability
    ``with_primary_key_prob`` one of them is suffixed with ``PRIMARY KEY``.
    """
    if not random_bool(include_probability):
        return ""

    upper_bound = min(max_cols, len(out_col_list))
    chosen = random.sample(out_col_list, random_int(1, upper_bound))
    pk_position = random_int(0, len(chosen) - 1) if random_bool(with_primary_key_prob) else None
    rendered = [
        f"{name} PRIMARY KEY" if position == pk_position else name
        for position, name in enumerate(chosen)
    ]
    return f" ({', '.join(rendered)}) "
def generate_column_list_section(include_probability=0.8, max_cols=6, with_primary_key_prob=0.6):
    """Return a random output column list drawn from ``out_columns``."""
    return generate_column_section_base(
        out_columns,
        include_probability=include_probability,
        max_cols=max_cols,
        with_primary_key_prob=with_primary_key_prob,
    )
# Type names used for generated tag definitions.
out_types = [
    "BIGINT",
    "SMALLINT",
]
def random_string(length=5):
    """Return ``length`` random ASCII letters and digits."""
    alphabet = string.ascii_letters + string.digits
    return "".join(random.choices(alphabet, k=length))
def generate_tags_clause(include_probability=0.7, max_tags=4, allow_comment=True):
    """Return a random ``TAGS (...)`` clause, or ``""``.

    With probability ``include_probability`` between 1 and ``max_tags``
    distinct names are sampled from ``out_tags``. Each gets a type from
    ``out_types``, optionally a random COMMENT (50% when ``allow_comment`` is
    set), and a generated expression.
    """
    if not random_bool(include_probability):
        return ""

    tag_count = random_int(1, min(max_tags, len(out_tags)))
    definitions = []
    for tag_name in random.sample(out_tags, tag_count):
        sql_type = random_from_list(out_types)
        comment = ""
        if allow_comment and random_bool(0.5):
            comment = f" COMMENT '{random_string(6)}'"
        value_expr = generate_tag_expr(max_depth=2)
        definitions.append(f"{tag_name} {sql_type}{comment} AS {value_expr}")
    return f" TAGS ({', '.join(definitions)}) "
def generate_random_stream_db_section():
    """Return ``(db_prefix, is_valid)`` for the stream name.

    A valid or an invalid database prefix is chosen with equal probability;
    the prefix is returned with one leading space.
    """
    is_valid = random_bool()
    pool = db_name_list_valid if is_valid else db_name_list_invalid
    dbname = random_from_list(pool)
    return f" {dbname}", is_valid
# Returns a tuple (trigger_table, is_valid, trigger_null, has_tag).
def generate_random_trigger_table_section():
    """Generate the ``FROM <trigger table>`` clause.

    20% of the time no trigger table is produced (``trigger_null`` is True).
    Otherwise the database prefix and the table name are each valid or invalid
    with equal probability. The clause is valid only when both are valid, and
    ``has_tag`` is True only for a valid clause whose table is not
    ``trigger_table``.
    """
    if random_bool(0.2):
        # Do not generate a trigger table
        return "", True, True, False

    db_ok = random_bool()
    dbname = random_from_list(db_name_list_valid if db_ok else db_name_list_invalid)
    table_ok = random_bool()
    trigger_table = random_from_list(
        trigger_table_list_valid if table_ok else trigger_table_list_invalid
    )

    clause = f" FROM {dbname}{trigger_table} "
    is_valid = db_ok and table_ok
    has_tag = is_valid and trigger_table != "trigger_table"
    return clause, is_valid, False, has_tag
def generate_random_into_table_section():
    """Return ``(into_clause, is_valid, into_null)``.

    A valid or an invalid INTO target is chosen with equal probability.
    ``into_null`` is True only when the valid choice was the empty
    "no INTO clause" entry.
    """
    if not random_bool():
        # Generate an invalid into table section
        rejected = random_from_list(into_option_list_invalid)
        return f" {rejected} ", False, False

    # Generate a valid into table section
    target = random_from_list(into_option_list_valid)
    return f" {target} ", True, target == ""
def generate_random_partition_section(max_partition_len = 5, trigger_null = False, trigger_has_tag = False):
    """Return ``(partition_clause, is_valid, selected_columns)``.

    Without a trigger table (``trigger_null``) any PARTITION BY is invalid: it
    is emitted 20% of the time and omitted otherwise. With a trigger table the
    outcome is 20% empty, 30% invalid (at least one invalid column appended)
    and 50% valid. Triggers without tags may only partition by ``tbname``.
    """
    def render(cols):
        return f" PARTITION BY {', '.join(cols)} "

    roll = random.random()

    if trigger_null:
        if not random_bool(0.2):
            return "", True, []
        # 20% chance to generate invalid partition clause
        picked = pick_random_combo(partition_columns_valid, max_partition_len)
        return render(picked), False, picked

    if trigger_has_tag:
        valid_columns = partition_columns_valid
        invalid_columns = partition_columns_invalid
    else:
        valid_columns = ["tbname"]
        invalid_columns = list(set(partition_columns_valid + partition_columns_invalid) - set(valid_columns))

    # 20% chance to generate empty partition clause
    if roll < 0.2:
        return "", True, []

    # 30% chance to generate invalid partition clause
    if roll < 0.5:
        picked = pick_random_combo(valid_columns, max_partition_len - 1) + pick_random_combo(invalid_columns, 1)
        return render(picked), False, picked

    # 50% chance to generate valid partition clause
    picked = pick_random_combo(valid_columns, max_partition_len)
    return render(picked), True, picked
def generate_random_notif_def_section(
    max_urls=2, max_events=2, max_options=2, max_condition_depth=2, trigger_null=False
):
    """Generate a random notification definition and its expected validity.

    The clause has four parts: ``NOTIFY(urls)``, ``ON (events)``,
    ``WHERE condition`` and ``NOTIFY_OPTIONS(options)``. At most one part is
    made invalid: each part has a 20% chance while all earlier parts are still
    valid. The last three parts may also be omitted (20% each).

    Args:
        max_urls: Upper bound on the number of URLs.
        max_events: Upper bound on the number of event types.
        max_options: Upper bound on the number of notify options.
        max_condition_depth: Maximum nesting depth of the WHERE condition.
        trigger_null: True when the stream has no trigger table. The clause is
            then usually omitted (80%), and any non-empty clause is invalid.

    Returns:
        ``(clause, valid)``: the SQL fragment (possibly empty) and whether it
        is expected to be accepted. An empty fragment is always valid.
    """
    if trigger_null and random_bool(0.8):
        # 80% chance to generate empty NOTIFY clause
        return "", True

    valid = True

    # NOTIFY(url [, ...])
    if random_bool(0.2):
        # 20% chance to generate an invalid NOTIFY part: either it is missing
        # or it contains an invalid URL.
        if random_bool(0.5):
            notify_urls = ""
        else:
            notify_url = pick_random_combo(urls_valid, max_urls - 1) + pick_random_combo(urls_invalid, 1)
            notify_urls = f" NOTIFY({', '.join(notify_url)}) "
        valid = False
    else:
        # 80% chance to generate a valid NOTIFY part with at least one URL
        notify_url = pick_random_combo(urls_valid, max_urls)
        notify_urls = f" NOTIFY({', '.join(notify_url)}) "

    # optional ON (event_types)
    if valid and random_bool(0.2):
        # 20% chance to generate an invalid ON part
        notify_event = pick_random_combo(event_types_valid, max_events - 1) + pick_random_combo(event_types_invalid, 1)
        notify_events = f" ON ({'|'.join(notify_event)}) "
        valid = False
    elif random_bool(0.2):
        # 20% chance to omit the ON part
        notify_events = ""
    else:
        selected_events = pick_random_combo(event_types_valid, max_events)
        notify_events = f" ON ({'|'.join(selected_events)}) "

    # optional WHERE condition
    # TODO(smj) : pass in the query's full column list and valid column list to generate_logical_condition
    # Until then the conditions are built over ``columns``. The lists are passed
    # explicitly because the generator's None defaults cannot be sampled from.
    if valid and random_bool(0.2):
        # 20% chance to generate an invalid WHERE part: it references a
        # column name that does not exist.
        condition = generate_logical_condition(
            max_depth=max_condition_depth,
            full_column_list=columns + ["non_exists_col"],
            valid_column_list=columns,
            valid=False,
        )
        notify_conditions = f" WHERE {condition} "
        valid = False
    elif random_bool(0.2):
        # 20% chance to omit the WHERE part
        notify_conditions = ""
    else:
        condition = generate_logical_condition(
            max_depth=max_condition_depth,
            valid_column_list=columns,
            valid=True,
        )
        notify_conditions = f" WHERE {condition} "

    # optional NOTIFY_OPTIONS(...)
    if valid and random_bool(0.2):
        # 20% chance to generate an invalid NOTIFY_OPTIONS part
        notify_option = pick_random_combo(notify_option_valid, max_options - 1) + pick_random_combo(notify_option_invalid, 1)
        notify_options = f" NOTIFY_OPTIONS({'|'.join(notify_option)}) "
        valid = False
    elif random_bool(0.2):
        # 20% chance to omit the NOTIFY_OPTIONS part
        notify_options = ""
    else:
        # pick at least one option
        notify_option = pick_random_combo(notify_option_valid, max_options)
        notify_options = f" NOTIFY_OPTIONS({'|'.join(notify_option)}) "

    clause = notify_urls + notify_events + notify_conditions + notify_options
    if not clause:
        # Nothing was emitted at all, which is a valid (absent) definition
        # even if the NOTIFY part was "invalidated" by omitting it.
        return "", True
    if trigger_null:
        # A notification definition without a trigger table is not allowed.
        valid = False
    return clause, valid
def gen_create_stream_variants():
    """Generate ``CREATE STREAM`` statements for every option combination.

    One statement is produced per (IF NOT EXISTS option, subquery, trigger
    clause) triple; all remaining clauses are randomised. Generation stops
    early once more than 100000 statements exist. The per-clause validity
    flags are computed but not returned.

    Returns:
        A list of SQL strings, each stripped of surrounding whitespace.
    """
    base_template = "CREATE STREAM{if_not_exists} {stream_name}{stream_options}{into_clause}{output_subtable}{columns}{tags}{as_subquery};"
    trigger_types = generate_trigger_section()
    sql_variants = []
    stream_index = 0

    for if_not_exists in if_not_exists_opts:
        for as_subquery in as_subquery_opts:
            for trigger_type in trigger_types:
                stream_db, v1 = generate_random_stream_db_section()
                trigger_table, v2, trigger_null, trigger_has_tag = generate_random_trigger_table_section()
                into_table, v3, into_null = generate_random_into_table_section()
                partition, v4, partition_cols = generate_random_partition_section(
                    trigger_null=trigger_null, trigger_has_tag=trigger_has_tag
                )
                stream_opt, v5 = generate_options_section(
                    partition_list=partition_cols, trigger_null=trigger_null
                )
                notify_opt, v6 = generate_random_notif_def_section(trigger_null=trigger_null)

                stream_name = stream_db + "stream_" + str(stream_index)
                stream_options = trigger_type + trigger_table + partition + stream_opt + notify_opt
                sql = base_template.format(
                    if_not_exists=if_not_exists,
                    stream_name=stream_name,
                    stream_options=stream_options,
                    into_clause=into_table,
                    output_subtable=generate_output_subtable(),
                    columns=generate_column_list_section(),
                    tags=generate_tags_clause() + " ",
                    as_subquery=as_subquery,
                )
                sql_variants.append(sql.strip())

                stream_index += 1
                if stream_index > 100000:
                    return sql_variants

    # NOTE(review): the original indentation was lost; this print is assumed to
    # sit at function level (reporting the total count) — confirm.
    print(stream_index)
    return sql_variants
# Manual smoke run: print 10000 random OPTIONS clauses. Guarded so that merely
# importing (or test-collecting) this module does not generate and print them.
if __name__ == "__main__":
    for i in range(10000):
        print("======================")
        sql, valid = generate_options_section(partition_list=partition_columns_valid)
        print(sql)