OpenMetadata/ingestion/tests/unit/test_db_utils.py
IceS2 e9c87c6adb
chore(ingestion): drop pylint, expand ruff (#27774)
* chore(ingestion): drop pylint, expand ruff to Stage 2c

Replace pylint with a coherent ruff-only stack (Stage 2c of the modernize
roadmap). Pylint is dropped from dev deps and CI workflows; ruff selected
ruleset expanded to ~22 families covering style, bug catchers, hygiene,
and the pylint port (PLE/PLC/PLW/PLR with the noisy "too-many-X"
complexity caps + magic-value disabled).

What's selected (with rationale in pyproject.toml):
  E, W, F, I, N         — style + correctness baseline + naming
  UP                    — pyupgrade (py>=3.10 modernizations)
  B, C4, C90, RET, SIM, TRY  — bug catchers
  PIE, ICN, T20, TC, TID, PTH, PERF  — hygiene
  PLE, PLC, PLW, PLR    — pylint port (PLR complexity caps ignored)
  RUF                   — ruff-native (incl. RUF100 unused-noqa)

What's removed:
  - .pylintrc (root) — duplicate of the ingestion pylint config
  - [tool.pylint.*] block in ingestion/pyproject.toml (~140 lines)
  - ingestion/plugins/{print_checker,import_checker}.py + tests + README
    (replaced by built-in T20 + TID251 banned-api respectively)
  - pylint dep from ingestion/setup.py and openmetadata-airflow-apis/pyproject.toml
  - `make lint` Makefile target + the pylint invocation in py_format_check
  - dead pylint TODO comment + ignored test entry in noxfile.py

Cwd-stable config: ruff is invoked both from the repo root (pre-commit,
CI) and from ingestion/ (`make py_format_check`). The `src`,
`extend-exclude`, and per-file-ignores entries are listed twice — once
relative to ingestion/ and once with the `ingestion/` prefix — so
first-party isort detection and exclusions match in both invocations.

Grandfathering: ran `ruff check --add-noqa` once + format-stable
iteration. ~12,130 noqa directives across ~1,400 files. Cleanup is
deferred to follow-up PRs that drop noqas one rule at a time.

Documentation sweep: replaced `make lint` references in CLAUDE.md,
AGENTS.md, DEVELOPER.md, copilot-instructions, and 6 SKILL files with
the apply+verify shape `make py_format && make py_format_check`.
`make py_format` alone does NOT cover everything pylint did: it only
applies auto-fixes; `make py_format_check` reports the violations that remain.

Basedpyright baseline regenerated: ruff format reflowed multi-line
signatures in ~70 files, shifting type-error column positions. The
basedpyright baseline matches by (file path, error code, range), so
column shifts caused 19 entries to mis-align. Net diff is small
(154 lines in/out of the 13MB baseline.json) — purely positional.
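
To illustrate why a pure reformat can invalidate baseline entries, here is a
minimal sketch of matching by a (file path, error code, range) key. The
`Diagnostic` type, the `unmatched` helper, and the sample path are hypothetical;
this is not basedpyright's actual implementation, only the matching idea
described above.

```python
from typing import NamedTuple


class Diagnostic(NamedTuple):
    """Hypothetical baseline key: (file path, error code, column range)."""

    path: str
    code: str
    start_col: int
    end_col: int


def unmatched(baseline: set[Diagnostic], current: list[Diagnostic]) -> list[Diagnostic]:
    """Return the current diagnostics that have no exact match in the baseline."""
    return [d for d in current if d not in baseline]


# One baselined error, recorded before the formatter ran.
baseline = {Diagnostic("source/foo.py", "reportAttributeAccessIssue", 40, 48)}

# The same error after a signature was reflowed: only the columns moved.
after_format = [Diagnostic("source/foo.py", "reportAttributeAccessIssue", 12, 20)]

# The shifted entry no longer matches, so it surfaces as a "new" error.
print(unmatched(baseline, after_format))
```

Because the key includes the range, an unchanged error at a new column is
indistinguishable from a new error, which is why the baseline had to be
regenerated.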

Verified locally:
  - make py_format_check         → All checks passed
  - nox --no-venv -s static-checks → 0 errors, 0 warnings, 0 notes

* chore(ingestion): finish ruff swap — nox lint session + skill docs

Three remaining stale-tooling references after Stage 2c:

  - `ingestion/noxfile.py` `lint` session was still calling `black --check`,
    `isort --check-only`, `pycln --diff`. Those tools aren't installed
    anywhere (we dropped them from dev deps). Replace with the ruff
    equivalents that mirror `make py_format_check`.
  - `skills/standards/code_style.md`: stack listed as `black + isort +
    pycln`; line length claimed 88 (black default). Both wrong: stack is
    ruff, line length is 120.
  - `skills/connector-building/SKILL.md`: `make py_format` comment said
    `# black + isort + pycln`. Same swap.

* chore(ingestion): keep main's baseline + globally ignore TRY400

Per gitar-bot's review on PR #27774:

1. Main's PR #27728 promoted ~60 `logger.warning()` → `logger.error()`
   inside `except` blocks. Those changes landed on main with their own
   baseline updates. Our PR doesn't promote anything — the merge from
   origin/main brought those `error` calls along with their baseline
   entries.

   The bot interpreted the `# noqa: TRY400` we added next to those lines
   as us silencing the rule case-by-case. Cleaner: globally ignore
   TRY400 in pyproject.toml, with a comment explaining why the codebase's
   `logger.error(...)` + separate `logger.debug(traceback.format_exc())`
   pattern is intentional. Strip ~430 per-line `# noqa: TRY400` markers
   from source.

2. Document that `S101` in `per-file-ignores` is a forward-looking
   entry — flake8-bandit (`S`) is not yet selected, so the rule is
   no-op today; the entry stays so when `S` lands later, tests don't
   immediately error.
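
The logging pattern from item 1 can be sketched as follows. This is a minimal
illustration, assuming a hypothetical `fetch_tables` function, `client` object,
and logger name; none of them are taken from the codebase.

```python
import logging
import traceback

logger = logging.getLogger("example")  # hypothetical logger name


def fetch_tables(client) -> list:
    """Hypothetical caller: short error line, full traceback only at debug level."""
    try:
        return client.list_tables()
    except Exception as exc:
        # TRY400 would suggest logger.exception() here. The pattern described
        # above keeps the error message short and logs the traceback separately.
        logger.error(f"Failed to fetch tables: {exc}")
        logger.debug(traceback.format_exc())
        return []
```

With this shape the traceback only appears when debug logging is enabled,
which is the behavior a blanket `logger.exception()` rewrite would change.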

Reverts the platform pin and Linux Docker–generated baseline. Keep
main's baseline intact and let CI surface the exact column-shifted
entries; the team will decide whether to fix in-place (revert format
on affected files) or add per-line `# pyright: ignore` markers.

* chore(ingestion): regen baseline for new connector type debt

Main's baseline was stale relative to recently-added connectors
(McpConnection, CustomDriveConnection) that lack common attributes
like `hostPort`, `database`, `catalog` etc. — all sites that access
those attributes via the union-typed `serviceConnection.root.config`
fire `reportAttributeAccessIssue` errors that aren't baselined.

71 errors + 58 warnings absorbed. Local macOS regen; pushing to see
CI's drift count. Per the basedpyright-baseline-and-ci PR experience,
macOS↔Linux column drift on this size of regen has historically been
1-7 residuals.
2026-04-28 07:21:59 +02:00


# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Unit tests for db_utils module
"""

import uuid
from copy import deepcopy
from unittest import TestCase
from unittest.mock import MagicMock, patch

from metadata.generated.schema.api.lineage.addLineage import AddLineageRequest
from metadata.generated.schema.entity.data.table import Table
from metadata.generated.schema.entity.services.databaseService import (
    DatabaseServiceType,
)
from metadata.generated.schema.metadataIngestion.parserconfig.queryParserConfig import (
    QueryParserType,
)
from metadata.generated.schema.type.basic import (
    EntityName,
    FullyQualifiedEntityName,
    Uuid,
)
from metadata.generated.schema.type.entityLineage import EntitiesEdge
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.ingestion.api.models import Either
from metadata.ingestion.lineage.models import Dialect
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.ingestion.source.models import TableView
from metadata.utils.db_utils import get_host_from_host_port, get_view_lineage


# Mock LineageTable class to simulate collate_sqllineage.core.models.Table
class MockLineageTable:
    def __init__(self, name):
        self.raw_name = name
        self.schema = None
        self.catalog = None

    def __str__(self):
        return self.raw_name


class TestDbUtils(TestCase):
    """
    Test cases for db_utils module
    """

    def setUp(self):
        """Set up test fixtures"""
        # Clear the search cache to ensure test isolation
        search_cache.clear()
        self.metadata = MagicMock()
        self.service_name = "test_service"
        self.connection_type = "postgres"
        self.timeout_seconds = 30

        # Create a mock table entity
        self.table_entity = Table(
            id=Uuid(root=uuid.uuid4()),
            name=EntityName(root="test_view"),
            fullyQualifiedName=FullyQualifiedEntityName(root="test_service.test_db.test_schema.test_view"),
            serviceType=DatabaseServiceType.Postgres,
            columns=[],  # Add required columns field
        )
        self.table_entity_non_postgres = deepcopy(self.table_entity)
        self.table_entity_non_postgres.serviceType = DatabaseServiceType.Mysql

        # Create a mock source table entity for lineage tests
        self.source_table_entity = Table(
            id=Uuid(root=uuid.uuid4()),
            name=EntityName(root="source_table"),
            fullyQualifiedName=FullyQualifiedEntityName(root="test_service.test_db.test_schema.source_table"),
            serviceType=DatabaseServiceType.Postgres,
            columns=[],
        )

        # Non-postgres version of source table
        self.source_table_entity_non_postgres = deepcopy(self.source_table_entity)
        self.source_table_entity_non_postgres.serviceType = DatabaseServiceType.Mysql

        # Create a mock TableView
        self.table_view = TableView(
            table_name="test_view",
            schema_name="test_schema",
            db_name="test_db",
            view_definition="create view test_view as SELECT * FROM source_table",
        )

    def tearDown(self):
        """Clean up after each test"""
        # Reset any module-level state if needed
        pass  # noqa: PIE790

    def test_get_host_from_host_port(self):
        """Test get_host_from_host_port function"""
        # Test with host:port format
        self.assertEqual(get_host_from_host_port("localhost:9000"), "localhost")
        self.assertEqual(get_host_from_host_port("127.0.0.1:5432"), "127.0.0.1")
        self.assertEqual(get_host_from_host_port("example.com:8080"), "example.com")

        # Test with host only (no port)
        self.assertEqual(get_host_from_host_port("localhost"), "localhost")
        self.assertEqual(get_host_from_host_port("example.com"), "example.com")

    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_success_with_lineage_parser(self, mock_fqn, mock_dialect_mapper):
        """Test successful view lineage generation when lineage parser has source and target tables"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        # Mock the search methods that get_lineage_by_query uses
        # The es_search_from_fqn method will be called with a search string like "test_service.test_db.test_schema.source_table"
        def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
            if "source_table" in fqn_search_string:
                return [self.source_table_entity]
            if "test_view" in fqn_search_string:
                return [self.table_entity]
            return []

        self.metadata.es_search_from_fqn = mock_es_search_from_fqn

        # Also mock the get_by_name method that might be called
        def mock_get_by_name(entity, fqn=None, **kwargs):
            if "test_service.test_db.test_schema.test_view" in fqn:
                return self.table_entity
            return None

        self.metadata.get_by_name = mock_get_by_name

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions - check the actual lineage results
        self.assertGreater(len(result), 0)

        # Check that we have Either objects with Right values (successful results)
        successful_results = [r.right for r in result if r.right]
        self.assertGreater(len(successful_results), 0)

        # Verify the lineage has correct source and target
        for lineage_request in successful_results:
            # Check the from and to entities exist
            self.assertIsNotNone(lineage_request.edge.fromEntity)
            self.assertIsNotNone(lineage_request.edge.toEntity)
            # Check that the IDs match our expected entities
            self.assertEqual(
                lineage_request.edge.fromEntity.id.root,
                self.source_table_entity.id.root,
            )
            self.assertEqual(lineage_request.edge.toEntity.id.root, self.table_entity.id.root)

        # Verify mocks were called correctly
        mock_fqn.build.assert_called_once()
        mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)

    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_success_with_fallback(self, mock_fqn, mock_dialect_mapper):
        """Test successful view lineage generation when the `public.source_table` lookup returns no match"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        # Mock the search methods that get_lineage_by_query uses
        # Any search string containing "public.source_table" returns nothing, so only other lookups can resolve the source
        def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
            if "public.source_table" in fqn_search_string:
                return []
            if "source_table" in fqn_search_string:
                return [self.source_table_entity]
            if "test_view" in fqn_search_string:
                return [self.table_entity]
            return []

        self.metadata.es_search_from_fqn = mock_es_search_from_fqn

        # Also mock the get_by_name method that might be called
        def mock_get_by_name(entity, fqn=None, **kwargs):
            if "test_service.test_db.test_schema.test_view" in fqn:
                return self.table_entity
            return None

        self.metadata.get_by_name = mock_get_by_name

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions - should get lineage results since we have source and target tables
        self.assertGreater(len(result), 0)

        # Check that we have Either objects with Right values (successful results)
        successful_results = [r.right for r in result if r.right]
        self.assertGreater(len(successful_results), 0)

        # Verify the lineage has correct source and target
        for lineage_request in successful_results:
            # Check the from and to entities exist
            self.assertIsNotNone(lineage_request.edge.fromEntity)
            self.assertIsNotNone(lineage_request.edge.toEntity)
            # Check that the IDs match our expected entities
            self.assertEqual(
                lineage_request.edge.fromEntity.id.root,
                self.source_table_entity.id.root,
            )
            self.assertEqual(lineage_request.edge.toEntity.id.root, self.table_entity.id.root)

        # Verify mocks were called correctly
        mock_fqn.build.assert_called_once()
        mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)

    @patch("metadata.utils.db_utils.get_lineage_via_table_entity")
    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.LineageParser")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_success_with_table_entity(
        self,
        mock_fqn,
        mock_lineage_parser_class,
        mock_dialect_mapper,
        mock_get_lineage_via_table_entity,
    ):
        """Test successful view lineage generation when lineage parser has no source/target tables"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        mock_lineage_parser = MagicMock()
        mock_lineage_parser.source_tables = []  # No source tables
        mock_lineage_parser.target_tables = []  # No target tables
        mock_lineage_parser_class.return_value = mock_lineage_parser

        # Create a valid AddLineageRequest for the mock
        valid_lineage_request = AddLineageRequest(
            edge=EntitiesEdge(
                fromEntity=EntityReference(
                    id=self.source_table_entity.id.root,
                    type="table",
                ),
                toEntity=EntityReference(
                    id=self.table_entity.id.root,
                    type="table",
                ),
            )
        )
        mock_get_lineage_via_table_entity.return_value = [Either(right=valid_lineage_request)]

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions
        self.assertEqual(len(result), 1)
        self.assertIsInstance(result[0], Either)
        self.assertIsNotNone(result[0].right)

        # Verify mocks were called correctly
        mock_fqn.build.assert_called_once()
        self.metadata.get_by_name.assert_called_once()
        mock_dialect_mapper.dialect_of.assert_called_once_with(self.connection_type)
        mock_lineage_parser_class.assert_called_once()
        mock_get_lineage_via_table_entity.assert_called_once()

    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_postgres_schema_fallback(self, mock_fqn, mock_dialect_mapper):
        """Test that Postgres views use public schema fallback"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        # Reset metadata mocks to prevent interference from other tests
        self.metadata.reset_mock()
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        # Mock metadata search methods
        def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
            if "source_table" in fqn_search_string:
                return [self.source_table_entity]
            if "test_view" in fqn_search_string:
                return [self.table_entity]
            return []

        self.metadata.es_search_from_fqn = mock_es_search_from_fqn

        def mock_get_by_name(entity, fqn=None, **kwargs):
            if "test_service.test_db.test_schema.test_view" in fqn:
                return self.table_entity
            return None

        self.metadata.get_by_name = mock_get_by_name

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Test passes if we get lineage results back
        # The actual lineage processing happens internally with LineageParser
        self.assertGreater(len(result), 0)

        # Check that we have Either objects with Right values (successful results)
        successful_results = [r.right for r in result if r.right]
        self.assertGreater(len(successful_results), 0)

        # Verify the lineage has correct source and target
        for lineage_request in successful_results:
            # Check the from and to entities exist
            self.assertIsNotNone(lineage_request.edge.fromEntity)
            self.assertIsNotNone(lineage_request.edge.toEntity)
            # Check that the IDs match our expected entities
            self.assertEqual(
                lineage_request.edge.fromEntity.id.root,
                self.source_table_entity.id.root,
            )
            self.assertEqual(lineage_request.edge.toEntity.id.root, self.table_entity.id.root)

    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_no_view_definition(self, mock_fqn):
        """Test handling when view definition is not available"""
        # Setup mock
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity

        # Create TableView without view definition
        table_view_no_definition = TableView(
            table_name="test_view",
            schema_name="test_schema",
            db_name="test_db",
            view_definition=None,
        )

        # Execute function
        result = list(
            get_view_lineage(
                view=table_view_no_definition,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions
        self.assertEqual(len(result), 0)

    @patch("metadata.utils.db_utils.get_lineage_by_query")
    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.LineageParser")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_exception_handling(
        self,
        mock_fqn,
        mock_lineage_parser_class,
        mock_dialect_mapper,
        mock_get_lineage_by_query,
    ):
        """Test exception handling during lineage generation"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        # Make LineageParser raise an exception
        mock_lineage_parser_class.side_effect = Exception("Test exception")

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions
        self.assertEqual(len(result), 0)

    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_non_postgres_service(self, mock_fqn, mock_dialect_mapper):
        """Test view lineage for non-Postgres services"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        # Use a fresh metadata mock to prevent interference from other tests
        metadata = MagicMock()
        mock_dialect_mapper.dialect_of.return_value = Dialect.MYSQL

        # Create the TableView under test
        table_view = TableView(
            table_name="test_view",
            schema_name="test_schema",
            db_name="test_db",
            view_definition="create view test_view as SELECT * FROM source_table",
        )

        # Mock the search methods that get_lineage_by_query uses
        def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
            if "source_table" in fqn_search_string:
                return [self.source_table_entity_non_postgres]
            if "test_view" in fqn_search_string:
                return [self.table_entity_non_postgres]
            return []

        metadata.es_search_from_fqn = mock_es_search_from_fqn

        def mock_get_by_name(entity, fqn=None, **kwargs):
            if "test_service.test_db.test_schema.test_view" in fqn:
                return self.table_entity_non_postgres
            return None

        metadata.get_by_name = mock_get_by_name

        # Execute function with mysql connection type
        result = list(
            get_view_lineage(
                view=table_view,
                metadata=metadata,
                service_names="test_service",
                connection_type="mysql",
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions - should get lineage results since we have source and target tables
        self.assertGreater(len(result), 0)

        # Check that we have Either objects with Right values (successful results)
        successful_results = [r.right for r in result if r.right]
        self.assertGreater(len(successful_results), 0)

        # Verify the lineage has correct source and target
        for lineage_request in successful_results:
            # Check the from and to entities exist
            self.assertIsNotNone(lineage_request.edge.fromEntity)
            self.assertIsNotNone(lineage_request.edge.toEntity)
            # Check that the IDs match our expected entities
            self.assertEqual(
                lineage_request.edge.fromEntity.id.root,
                self.source_table_entity_non_postgres.id.root,
            )
            self.assertEqual(
                lineage_request.edge.toEntity.id.root,
                self.table_entity_non_postgres.id.root,
            )

    @patch("metadata.utils.db_utils.get_lineage_by_query")
    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.LineageParser")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_empty_lineage_results(
        self,
        mock_fqn,
        mock_lineage_parser_class,
        mock_dialect_mapper,
        mock_get_lineage_by_query,
    ):
        """Test handling when lineage functions return empty results"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        mock_lineage_parser = MagicMock()
        mock_lineage_parser.source_tables = ["source_table"]
        mock_lineage_parser.target_tables = ["test_view"]
        mock_lineage_parser_class.return_value = mock_lineage_parser

        # Return empty results
        mock_get_lineage_by_query.return_value = []

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions
        self.assertEqual(len(result), 0)

    @patch("metadata.utils.db_utils.get_lineage_by_query")
    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.LineageParser")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_none_lineage_results(
        self,
        mock_fqn,
        mock_lineage_parser_class,
        mock_dialect_mapper,
        mock_get_lineage_by_query,
    ):
        """Test handling when lineage functions return None"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        mock_lineage_parser = MagicMock()
        mock_lineage_parser.source_tables = ["source_table"]
        mock_lineage_parser.target_tables = ["test_view"]
        mock_lineage_parser_class.return_value = mock_lineage_parser

        # Return None
        mock_get_lineage_by_query.return_value = None

        # Execute function
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Assertions
        self.assertEqual(len(result), 0)

    def test_get_view_lineage_invalid_connection_type(self):
        """Test handling of invalid connection type"""
        # Execute function with invalid connection type
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=123,  # Invalid type
                timeout_seconds=self.timeout_seconds,
                parser_type=QueryParserType.Auto,
            )
        )

        # Should handle gracefully and return empty result
        self.assertEqual(len(result), 0)

    @patch("metadata.utils.db_utils.ConnectionTypeDialectMapper")
    @patch("metadata.utils.db_utils.fqn")
    def test_get_view_lineage_custom_timeout(self, mock_fqn, mock_dialect_mapper):
        """Test that custom timeout is passed correctly"""
        # Setup mocks
        mock_fqn.build.return_value = "test_service.test_db.test_schema.test_view"
        self.metadata.get_by_name.return_value = self.table_entity
        mock_dialect_mapper.dialect_of.return_value = Dialect.POSTGRES

        # Mock metadata search methods that get_lineage_by_query will call
        def mock_es_search_from_fqn(entity_type, fqn_search_string, **kwargs):
            if "source_table" in fqn_search_string:
                return [self.source_table_entity]
            return []

        self.metadata.es_search_from_fqn = mock_es_search_from_fqn
        custom_timeout = 60

        # Execute function with custom timeout
        result = list(
            get_view_lineage(
                view=self.table_view,
                metadata=self.metadata,
                service_names=self.service_name,
                connection_type=self.connection_type,
                timeout_seconds=custom_timeout,
                parser_type=QueryParserType.Auto,
            )
        )

        # Test passes if we get lineage results back
        # Custom timeout is used internally by LineageParser
        self.assertGreater(len(result), 0)

        # Check that we have Either objects with Right values (successful results)
        successful_results = [r.right for r in result if r.right]
        self.assertGreater(len(successful_results), 0)

        # Verify the lineage has correct source and target
        for lineage_request in successful_results:
            # Check the from and to entities exist
            self.assertIsNotNone(lineage_request.edge.fromEntity)
            self.assertIsNotNone(lineage_request.edge.toEntity)
            # Check that the IDs match our expected entities
            self.assertEqual(
                lineage_request.edge.fromEntity.id.root,
                self.source_table_entity.id.root,
            )
            self.assertEqual(
                lineage_request.edge.toEntity.id.root,
                self.table_entity.id.root,
            )