OpenMetadata/ingestion/tests/unit/test_query_parser.py
IceS2 e9c87c6adb
chore(ingestion): drop pylint, expand ruff (#27774)
* chore(ingestion): drop pylint, expand ruff to Stage 2c

Replace pylint with a coherent ruff-only stack (Stage 2c of the modernize
roadmap). Pylint is dropped from dev deps and CI workflows; ruff selected
ruleset expanded to ~22 families covering style, bug catchers, hygiene,
and the pylint port (PLE/PLC/PLW/PLR with the noisy "too-many-X"
complexity caps + magic-value disabled).

What's selected (with rationale in pyproject.toml):
  E, W, F, I, N         — style + correctness baseline + naming
  UP                    — pyupgrade (py>=3.10 modernizations)
  B, C4, C90, RET, SIM, TRY  — bug catchers
  PIE, ICN, T20, TC, TID, PTH, PERF  — hygiene
  PLE, PLC, PLW, PLR    — pylint port (PLR complexity caps ignored)
  RUF                   — ruff-native (incl. RUF100 unused-noqa)

What's removed:
  - .pylintrc (root) — duplicate of the ingestion pylint config
  - [tool.pylint.*] block in ingestion/pyproject.toml (~140 lines)
  - ingestion/plugins/{print_checker,import_checker}.py + tests + README
    (replaced by built-in T20 + TID251 banned-api respectively)
  - pylint dep from ingestion/setup.py and openmetadata-airflow-apis/pyproject.toml
  - `make lint` Makefile target + the pylint invocation in py_format_check
  - dead pylint TODO comment + ignored test entry in noxfile.py

Cwd-stable config: ruff is invoked both from the repo root (pre-commit,
CI) and from ingestion/ (`make py_format_check`). The `src`,
`extend-exclude`, and per-file-ignores entries are listed twice — once
relative to ingestion/ and once with the `ingestion/` prefix — so
first-party isort detection and exclusions match in both invocations.

Grandfathering: ran `ruff check --add-noqa` once + format-stable
iteration. ~12,130 noqa directives across ~1,400 files. Cleanup is
deferred to follow-up PRs that drop noqas one rule at a time.

Documentation sweep: replaced `make lint` references in CLAUDE.md,
AGENTS.md, DEVELOPER.md, copilot-instructions, and 6 SKILL files with
the apply+verify shape `make py_format && make py_format_check`.
`make py_format` is NOT a strict superset of pylint — it only applies
auto-fixable violations; `make py_format_check` catches the rest.

Basedpyright baseline regenerated: ruff format reflowed multi-line
signatures in ~70 files, shifting type-error column positions. The
basedpyright baseline matches by (file path, error code, range), so
column shifts caused 19 entries to mis-align. Net diff is small
(154 lines in/out of the 13MB baseline.json) — purely positional.

Verified locally:
  - make py_format_check         → All checks passed
  - nox --no-venv -s static-checks → 0 errors, 0 warnings, 0 notes

* chore(ingestion): finish ruff swap — nox lint session + skill docs

Three remaining stale-tooling references after Stage 2c:

  - `ingestion/noxfile.py` `lint` session was still calling `black --check`,
    `isort --check-only`, `pycln --diff`. Those tools aren't installed
    anywhere (we dropped them from dev deps). Replace with the ruff
    equivalents that mirror `make py_format_check`.
  - `skills/standards/code_style.md`: stack listed as `black + isort +
    pycln`; line length claimed 88 (black default). Both wrong: stack is
    ruff, line length is 120.
  - `skills/connector-building/SKILL.md`: `make py_format` comment said
    `# black + isort + pycln`. Same swap.

* chore(ingestion): keep main's baseline + globally ignore TRY400

Per gitar-bot's review on PR #27774:

1. Main's PR #27728 promoted ~60 `logger.warning()` → `logger.error()`
   inside `except` blocks. Those changes landed on main with their own
   baseline updates. Our PR doesn't promote anything — the merge from
   origin/main brought those `error` calls along with their baseline
   entries.

   The bot interpreted the `# noqa: TRY400` we added next to those lines
   as us silencing the rule case-by-case. Cleaner: globally ignore
   TRY400 in pyproject.toml, with a comment explaining why the codebase's
   `logger.error(...)` + separate `logger.debug(traceback.format_exc())`
   pattern is intentional. Strip ~430 per-line `# noqa: TRY400` markers
   from source.

2. Document that `S101` in `per-file-ignores` is a forward-looking
   entry — flake8-bandit (`S`) is not yet selected, so the rule is
   no-op today; the entry stays so when `S` lands later, tests don't
   immediately error.

Reverts the platform pin and Linux Docker–generated baseline. Keep
main's baseline intact and let CI surface the exact column-shifted
entries; the team will decide whether to fix in-place (revert format
on affected files) or add per-line `# pyright: ignore` markers.

* chore(ingestion): regen baseline for new connector type debt

Main's baseline was stale relative to recently-added connectors
(McpConnection, CustomDriveConnection) that lack common attributes
like `hostPort`, `database`, `catalog` etc. — all sites that access
those attributes via the union-typed `serviceConnection.root.config`
fire `reportAttributeAccessIssue` errors that aren't baselined.

71 errors + 58 warnings absorbed. Local macOS regen; pushing to see
CI's drift count. Per the basedpyright-baseline-and-ci PR experience,
macOS↔Linux column drift on this size of regen has historically been
1-7 residuals.
2026-04-28 07:21:59 +02:00

651 lines
23 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Validate query parser logic
"""
from unittest import TestCase
import pytest
from collate_sqllineage.core.models import Column, Location, Table
from metadata.generated.schema.type.tableUsageCount import TableColumn, TableColumnJoin
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.parser import LineageParser
class QueryParserTests(TestCase):
"""
Check methods from query_parser.py
"""
col_lineage = """
SELECT
a.col1,
a.col2 + b.col2 AS col2,
case
when col1 = 3 then 'hello'
else 'bye'
end as new_col
FROM foo a
JOIN db.grault b
ON a.col1 = b.col1
JOIN db.holis c
ON a.col1 = c.abc
JOIN db.random d
ON a.col2 = d.col2
WHERE a.col3 = 'abc'
""" # noqa: W291
parser = LineageParser(col_lineage)
parser_with_dialect = LineageParser(col_lineage, dialect=Dialect.TSQL)
def test_involved_tables(self):
expected_tables = {"db.grault", "db.holis", "<default>.foo", "db.random"}
tables = {str(table) for table in self.parser.involved_tables}
self.assertEqual(tables, expected_tables)
tables = {str(table) for table in self.parser_with_dialect.involved_tables}
self.assertEqual(tables, expected_tables)
def test_clean_parser_table_list(self):
expected_tables = {"db.grault", "db.holis", "foo", "db.random"}
clean_tables = set(self.parser.clean_table_list)
self.assertEqual(clean_tables, expected_tables)
clean_tables = set(self.parser_with_dialect.clean_table_list)
self.assertEqual(clean_tables, expected_tables)
def test_bracketed_parser_table_list(self):
expected_tables = {"test_schema.test_view", "test_table"}
parser = LineageParser("create view [test_schema].[test_view] as select * from [test_table];")
clean_tables = set(parser.clean_table_list)
self.assertEqual(clean_tables, expected_tables)
parser = LineageParser(
"create view [test_schema].[test_view] as select * from [test_table];",
dialect=Dialect.TSQL,
)
clean_tables = set(parser.clean_table_list)
self.assertEqual(clean_tables, expected_tables)
def test_parser_table_aliases(self):
expected_tables = {
"b": "db.grault",
"c": "db.holis",
"a": "foo",
"d": "db.random",
}
aliases = self.parser.table_aliases
self.assertEqual(aliases, expected_tables)
aliases = self.parser_with_dialect.table_aliases
self.assertEqual(aliases, expected_tables)
def test_get_table_joins(self):
"""
main logic point
"""
expected_joins = [
TableColumnJoin(
tableColumn=TableColumn(table="foo", column="col1"),
joinedWith=[
TableColumn(table="db.grault", column="col1"),
TableColumn(table="db.holis", column="abc"),
],
),
TableColumnJoin(
tableColumn=TableColumn(table="foo", column="col2"),
joinedWith=[
TableColumn(table="db.random", column="col2"),
],
),
]
joins = self.parser.table_joins
self.assertEqual(
joins["foo"],
expected_joins,
)
joins = self.parser_with_dialect.table_joins
self.assertEqual(
joins["foo"],
expected_joins,
)
def test_capitals(self):
"""
Example on how LineageRunner keeps capitals
for column names
"""
query = """
SELECT
USERS.ID,
li.id
FROM TESTDB.PUBLIC.USERS
JOIN testdb.PUBLIC."lowercase_users" li
ON USERS.id = li.ID
;
"""
expected_joins = [
TableColumnJoin(
tableColumn=TableColumn(table="testdb.public.users", column="id"), # lowercase col
joinedWith=[
TableColumn(table="testdb.public.lowercase_users", column="ID"), # uppercase col
],
),
]
parser = LineageParser(query)
joins = parser.table_joins
self.assertEqual(
joins["testdb.public.users"],
expected_joins,
)
parser = LineageParser(query, dialect=Dialect.MYSQL)
joins = parser.table_joins
self.assertEqual(
joins["testdb.public.users"],
expected_joins,
)
def test_clean_raw_query_copy_grants(self):
"""
Validate COPY GRANT query cleaning logic
"""
query = "create or replace view my_view copy grants as select * from my_table"
self.assertEqual(
LineageParser.clean_raw_query(query),
"create or replace view my_view as select * from my_table",
)
def test_clean_raw_query_merge_into(self):
"""
Validate MERGE INTO query cleaning logic
"""
query = """
/* comment */ merge into table_1 using (select a, b from table_2) when matched update set t.a = 'value'
when not matched then insert (table_1.a, table_2.b) values ('value1', 'value2')
""" # noqa: W291
self.assertEqual(
LineageParser.clean_raw_query(query),
"/* comment */ merge into table_1 using (select a, b from table_2)",
)
def test_clean_raw_query_copy_from(self):
"""
Validate COPY FROM query cleaning logic
"""
query = "COPY my_schema.my_table FROM 's3://bucket/path/object.csv';"
self.assertEqual(
LineageParser.clean_raw_query(query),
None,
)
# TODO: Fix this case at the earliest
@pytest.mark.skip(reason="Flaky with sqlglot parser, returns no column lineage or correct column lineage randomly.")
def test_ctes_column_lineage(self):
"""
Validate we obtain information from Common Table Expressions
"""
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
WITH cte_table AS (
SELECT
USERS.ID,
USERS.NAME
FROM TESTDB.PUBLIC.USERS
),
cte_table2 AS (
SELECT
ID,
NAME
FROM cte_table
)
SELECT
ID,
NAME
FROM cte_table2
;
""" # noqa: W291
expected_lineage = [
(
Column("testdb.public.users.id"),
Column("testdb.public.target.id"),
),
(
Column("testdb.public.users.name"),
Column("testdb.public.target.name"),
),
]
parser = LineageParser(query)
tables = {str(table) for table in parser.source_tables}
self.assertEqual(tables, {"testdb.public.users"})
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
parser = LineageParser(query, dialect=Dialect.MYSQL)
tables = {str(table) for table in parser.source_tables}
self.assertEqual(tables, {"testdb.public.users"})
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
def test_table_with_single_comment(self):
"""
Validate we obtain information from Comon Table Expressions
"""
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
SELECT
ID,
-- A comment here
NAME
FROM TESTDB.PUBLIC.USERS
;
"""
expected_tables = {"testdb.public.users", "testdb.public.target"}
expected_lineage = [
(Column("testdb.public.users.id"), Column("testdb.public.target.id")),
(
Column("testdb.public.users.name"),
Column("testdb.public.target.name"),
),
]
parser = LineageParser(query)
tables = {str(table) for table in parser.involved_tables}
self.assertEqual(tables, expected_tables)
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
parser = LineageParser(query, Dialect.MYSQL)
tables = {str(table) for table in parser.involved_tables}
self.assertEqual(tables, expected_tables)
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
def test_table_with_aliases(self):
"""
Validate we obtain information from Comon Table Expressions
"""
query = """CREATE TABLE TESTDB.PUBLIC.TARGET AS
SELECT
ID AS new_identifier,
NAME new_name
FROM TESTDB.PUBLIC.USERS
;
"""
expected_lineage = [
(
Column("testdb.public.users.id"),
Column("testdb.public.target.new_identifier"),
),
(
Column("testdb.public.users.name"),
Column("testdb.public.target.new_name"),
),
]
expected_tables = {"testdb.public.users", "testdb.public.target"}
parser = LineageParser(query)
tables = {str(table) for table in parser.involved_tables}
self.assertEqual(tables, expected_tables)
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
parser = LineageParser(query, Dialect.MYSQL)
tables = {str(table) for table in parser.involved_tables}
self.assertEqual(tables, expected_tables)
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
def test_copy_query(self):
"""
Validate Copy query is skipped appropriately without any errors
"""
query = """COPY MY_TABLE col1,col2,col3
FROM 's3://bucket/schema/table.csv'
WITH CREDENTIALS ''
REGION 'US-east-2'
"""
expected_lineage = []
expected_tables = set()
parser = LineageParser(query)
tables = {str(table) for table in parser.involved_tables}
self.assertEqual(tables, expected_tables)
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
parser = LineageParser(query, Dialect.MYSQL)
tables = {str(table) for table in parser.involved_tables}
self.assertEqual(tables, expected_tables)
self.assertEqual(
parser.column_lineage,
expected_lineage,
)
def test_clean_raw_query_create_trigger(self):
"""
Validate CREATE TRIGGER query cleaning logic - should return None
"""
query = "CREATE TRIGGER last_updated BEFORE UPDATE ON public.inventory FOR EACH ROW EXECUTE PROCEDURE public.last_updated()"
self.assertEqual(
LineageParser.clean_raw_query(query),
None,
)
# Test with OR REPLACE
query_or_replace = (
"CREATE OR REPLACE TRIGGER my_trigger AFTER INSERT ON my_table FOR EACH ROW EXECUTE FUNCTION my_func()"
)
self.assertEqual(
LineageParser.clean_raw_query(query_or_replace),
None,
)
def test_clean_raw_query_create_function(self):
"""
Validate CREATE FUNCTION query cleaning logic - should return None
"""
query = """CREATE FUNCTION public.last_updated() RETURNS trigger
LANGUAGE plpgsql
AS $$
BEGIN
NEW.last_update = CURRENT_TIMESTAMP;
RETURN NEW;
END $$"""
self.assertEqual(
LineageParser.clean_raw_query(query),
None,
)
# Test with OR REPLACE
query_or_replace = (
"CREATE OR REPLACE FUNCTION my_schema.my_func() RETURNS void AS $$ BEGIN NULL; END $$ LANGUAGE plpgsql"
)
self.assertEqual(
LineageParser.clean_raw_query(query_or_replace),
None,
)
def test_clean_raw_query_create_procedure(self):
"""
Validate CREATE PROCEDURE query cleaning logic - should return None
"""
query = "CREATE PROCEDURE my_procedure() LANGUAGE plpgsql AS $$ BEGIN NULL; END $$"
self.assertEqual(
LineageParser.clean_raw_query(query),
None,
)
# Test with OR REPLACE
query_or_replace = "CREATE OR REPLACE PROCEDURE my_schema.my_proc() AS $$ BEGIN NULL; END $$ LANGUAGE SQL"
self.assertEqual(
LineageParser.clean_raw_query(query_or_replace),
None,
)
# -------------------------------------------------------------------------
# Snowflake Stage Lineage Tests
# -------------------------------------------------------------------------
def test_clean_raw_query_copy_into_table_from_stage(self):
"""
Validate COPY INTO table FROM @stage queries are NOT filtered out.
These provide lineage from Snowflake stages to tables.
"""
queries = [
"COPY INTO wine_quality FROM @demo FILE_FORMAT = wine_csv_format;",
"COPY INTO my_table FROM @my_stage",
"COPY INTO db.schema.table FROM @external_stage FILE_FORMAT = (TYPE = CSV)",
"copy into target_table from @s3_stage/path/to/files",
"COPY INTO my_table FROM @my_db.my_schema.my_stage FILE_FORMAT=(TYPE=CSV)",
]
for query in queries:
result = LineageParser.clean_raw_query(query)
self.assertIsNotNone(
result,
f"COPY INTO table FROM @stage should NOT be filtered: {query}",
)
def test_clean_raw_query_copy_into_stage_from_table(self):
"""
Validate COPY INTO @stage FROM table (unload) queries are NOT filtered out.
These provide lineage from tables to Snowflake stages.
"""
queries = [
"COPY INTO @my_stage FROM my_table",
"COPY INTO @db.schema.stage FROM (SELECT * FROM t)",
"copy into @stage/path FROM table1",
"COPY INTO @~/ FROM my_table FILE_FORMAT = (TYPE = CSV COMPRESSION = GZIP)",
"COPY INTO @~/staged FROM sales_data",
"COPY INTO @my_stage/daily/2024/ FROM reporting.public.daily_metrics",
"COPY INTO @external_stage/path/ FROM (SELECT col1, col2 FROM db.schema.source_table WHERE id > 100)",
]
for query in queries:
result = LineageParser.clean_raw_query(query)
self.assertIsNotNone(
result,
f"COPY INTO @stage FROM table should NOT be filtered: {query}",
)
def test_clean_raw_query_generic_copy_filtered(self):
"""
Validate generic COPY statements (e.g. PostgreSQL) are still filtered out.
"""
queries = [
"COPY my_table FROM '/path/to/file.csv'",
"COPY users FROM '/tmp/data.csv' WITH CSV HEADER",
"COPY table_name FROM STDIN",
"COPY orders FROM 's3://bucket/file.csv'",
"COPY my_table TO '/path/to/file.csv'",
"COPY (SELECT * FROM users) TO '/tmp/output.csv'",
]
for query in queries:
result = LineageParser.clean_raw_query(query)
self.assertIsNone(
result,
f"Generic COPY statement should be filtered: {query}",
)
def test_copy_into_table_from_stage_lineage(self):
"""
Validate COPY INTO table FROM @stage produces correct lineage:
- Source should be a Location (the stage)
- Target should be a Table
"""
query = "COPY INTO wine_quality FROM @demo FILE_FORMAT = wine_csv_format;"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
self.assertEqual(len(parser.source_tables), 1)
self.assertEqual(len(parser.target_tables), 1)
self.assertIsInstance(parser.source_tables[0], Location)
self.assertIsInstance(parser.target_tables[0], Table)
self.assertEqual(str(parser.source_tables[0]), "<default>.demo")
self.assertEqual(str(parser.target_tables[0]), "<default>.wine_quality")
def test_copy_into_stage_from_table_lineage(self):
"""
Validate COPY INTO @stage FROM table produces correct lineage:
- Source should be a Table
- Target should be a Location (the stage)
"""
query = "COPY INTO @my_stage FROM my_table"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
self.assertEqual(len(parser.source_tables), 1)
self.assertEqual(len(parser.target_tables), 1)
self.assertIsInstance(parser.source_tables[0], Table)
self.assertIsInstance(parser.target_tables[0], Location)
self.assertEqual(str(parser.source_tables[0]), "<default>.my_table")
self.assertEqual(str(parser.target_tables[0]), "<default>.my_stage")
def test_copy_into_stage_from_select_lineage(self):
"""
Validate COPY INTO @stage FROM (SELECT ...) produces correct lineage:
- Source should be the underlying Table from the SELECT
- Target should be a Location (the stage)
"""
query = "COPY INTO @db.schema.my_stage FROM (SELECT col1, col2 FROM my_table)"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
self.assertEqual(len(parser.source_tables), 1)
self.assertEqual(len(parser.target_tables), 1)
self.assertIsInstance(parser.source_tables[0], Table)
self.assertIsInstance(parser.target_tables[0], Location)
self.assertEqual(str(parser.source_tables[0]), "<default>.my_table")
self.assertEqual(str(parser.target_tables[0]), "db.schema.my_stage")
def test_copy_into_fully_qualified_stage_lineage(self):
"""
Validate COPY INTO table FROM @db.schema.stage with fully qualified stage name.
"""
query = "COPY INTO my_table FROM @my_db.my_schema.my_stage FILE_FORMAT=(TYPE=CSV)"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
self.assertEqual(len(parser.source_tables), 1)
self.assertEqual(len(parser.target_tables), 1)
self.assertIsInstance(parser.source_tables[0], Location)
self.assertIsInstance(parser.target_tables[0], Table)
self.assertEqual(str(parser.source_tables[0]), "my_db.my_schema.my_stage")
self.assertEqual(str(parser.target_tables[0]), "<default>.my_table")
def test_copy_into_stage_with_path_lineage(self):
"""
Validate COPY INTO @stage/path/to/data queries produce Location targets.
"""
query = "COPY INTO @my_stage/daily/2024/ FROM reporting.public.daily_metrics"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
self.assertEqual(len(parser.source_tables), 1)
self.assertEqual(len(parser.target_tables), 1)
self.assertIsInstance(parser.source_tables[0], Table)
self.assertIsInstance(parser.target_tables[0], Location)
self.assertEqual(str(parser.source_tables[0]), "reporting.public.daily_metrics")
def test_copy_into_case_insensitivity(self):
"""
Validate that COPY INTO stage queries work with mixed case.
"""
query = "copy INTO @MY_STAGE from MY_TABLE"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
self.assertEqual(len(parser.source_tables), 1)
self.assertEqual(len(parser.target_tables), 1)
self.assertIsInstance(parser.source_tables[0], Table)
self.assertIsInstance(parser.target_tables[0], Location)
def test_clean_table_name_location_at_symbol_removal(self):
"""
Validate clean_table_name removes leading @ from Location raw_name.
Note: The Location class already strips @ internally, so this tests
the defensive logic in clean_table_name.
"""
loc = Location("@STAGE_01")
self.assertEqual(loc.raw_name, "STAGE_01")
cleaned = LineageParser.clean_table_name(loc)
self.assertIsInstance(cleaned, Location)
self.assertEqual(cleaned.raw_name, "STAGE_01")
def test_clean_table_name_preserves_location_schema(self):
"""
Validate clean_table_name preserves schema info on Location objects.
"""
loc = Location("@DB.SCHEMA.STAGE_01")
cleaned = LineageParser.clean_table_name(loc)
self.assertIsInstance(cleaned, Location)
self.assertEqual(cleaned.raw_name, "STAGE_01")
self.assertEqual(str(cleaned.schema), "db.schema")
def test_clean_table_name_table_unchanged(self):
"""
Validate clean_table_name still works for regular Table objects.
"""
table = Table("my_table")
cleaned = LineageParser.clean_table_name(table)
self.assertIsInstance(cleaned, Table)
self.assertEqual(cleaned.raw_name, "my_table")
def test_clean_table_name_bracketed_location(self):
"""
Validate clean_table_name handles bracketed Location names.
"""
loc = Location("[STAGE_01]")
cleaned = LineageParser.clean_table_name(loc)
self.assertIsInstance(cleaned, Location)
self.assertEqual(cleaned.raw_name, "STAGE_01")
def test_retrieve_tables_with_location(self):
"""
Validate retrieve_tables includes Location objects in the result.
"""
query = "COPY INTO my_table FROM @my_stage"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
locations = [t for t in parser.source_tables if isinstance(t, Location)]
tables = [t for t in parser.target_tables if isinstance(t, Table)]
self.assertEqual(len(locations), 1)
self.assertEqual(len(tables), 1)
def test_involved_tables_with_stage_lineage(self):
"""
Validate involved_tables includes both Table and Location for stage queries.
"""
query = "COPY INTO my_table FROM @my_stage"
parser = LineageParser(query, dialect=Dialect.SNOWFLAKE)
involved = parser.involved_tables
self.assertIsNotNone(involved)
self.assertEqual(len(involved), 2)
type_names = {type(t).__name__ for t in involved}
self.assertIn("Table", type_names)
self.assertIn("Location", type_names)
def test_standard_copy_query_no_lineage(self):
"""
Validate standard COPY query without stage produces no lineage.
"""
query = """COPY MY_TABLE col1,col2,col3
FROM 's3://bucket/schema/table.csv'
WITH CREDENTIALS ''
REGION 'US-east-2'
"""
parser = LineageParser(query)
self.assertEqual(parser.source_tables, [])
self.assertEqual(parser.target_tables, [])
self.assertEqual(parser.column_lineage, [])