OpenMetadata/ingestion/tests/unit/utils/test_source_hash.py
IceS2 5009059441
chore(ingestion): migrate to ruff for format + isort + unused-import (#27739)
* chore(ingestion): replace black/isort/pycln with ruff

- Swap formatter + import-sorter + unused-import tooling for ruff
  (line-length 120, target py3.10) in ingestion + openmetadata-airflow-apis
- Drop dead [tool.mypy] config; basedpyright is the active type checker
- Bump requires-python to >=3.10 to match noxfile and CLAUDE.md (3.9 is
  documented as broken on Mac in noxfile.py)
- Bump pre-commit-hooks v2.3 -> v5.0; the new check-json catches four
  pre-existing JSON issues now excluded with an inline TODO
- Update Makefile py_format / py_format_check targets to call ruff

* chore(ingestion): grandfather ruff lint violations and apply ruff format

- 253 noqa markers added via 'ruff check --add-noqa' across 128 files,
  freezing existing violations so this PR is a tooling-only swap. Per-rule
  cleanup tracked in the TODO comment in ingestion/pyproject.toml.
- Bulk reformat from black 22.3 -> ruff format @ line-length 120.
  Cosmetic only: imports balanced (-32/+32), structural keywords balanced
  (-2221/+2221), no logic changes.
- Star-import rules (F403/F405) globally ignored; refactoring wildcard
  imports across connectors is a separate effort.

* chore(ingestion): fix pylint findings surfaced after ruff format

- filters.py: drop redundant parens around re.match(...) in `if`
  (C0325 superfluous-parens) — exposed when ruff format unwrapped them
- nosql_adaptor.py: move `# pylint: disable=unused-argument` from the
  `column:` line to the `def` line so it covers `table` too (W0613) —
  scope was line-based, lost when ruff split params onto multiple lines
- action1xx.py: replace `arguments-differ` with `signature-differs` in
  the disable directive (was always wrong code) and drop the now-useless
  `unused-argument` suppression (I0021)

* fix(ingestion): make ruff extend-exclude robust to multi-root invocations

CI's `make py_format_check` runs from the repo root and passes both
`ingestion/` and `./openmetadata-airflow-apis/` to ruff in a single
invocation. With multiple root paths, ruff's parallel file discovery
races on extend-exclude matching against the project root, so files
under `ingestion/src/metadata/generated/` were intermittently scanned
and produced ~830 I001 violations.

20-run repro: 10/20 fail without the fix, 20/20 pass with the fix.

Each excluded directory now appears twice in extend-exclude:
- the project-root-relative pattern (cwd = ingestion/)
- the prefixed pattern (cwd = repo root, multi-root invocation)

* chore(ingestion): address gitar-bot findings + cross-version pylint disable

- openmetadata-airflow-apis/pyproject.toml: switch coverage to module-name
  source + [tool.coverage.paths] glob remap (matches the ingestion pattern).
  Drops the hardcoded `env/lib/python3.9/site-packages/...` source path,
  which broke after the requires-python bump to 3.10. (Finding 1)
- ingestion/setup.py: remove dead python_version<'3.9' / >='3.9' guards on
  mysql-connector-python and testcontainers; promote locust to a regular
  test dep (was conditionally added under sys.version_info >= (3, 9)). Also
  remove the now-unused `import sys`. (Finding 3)
- ingestion/src/metadata/great_expectations/action1xx.py: cover both
  arguments-differ (great_expectations 0.18.x parent) and signature-differs
  (great_expectations 1.x parent) in the pylint disable comment, since
  CI installs 0.18.x and local often has 1.x. unused-argument covers the
  unused action_context. The opposite rule fires as I0021 useless-suppression
  on each environment, which is informational and does not affect pylint's
  exit code.
2026-04-27 10:05:28 +02:00

489 lines
18 KiB
Python

# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test source hash stability and normalization
"""
import uuid
from metadata.generated.schema.api.data.createTable import CreateTableRequest
from metadata.generated.schema.entity.data.table import (
Column,
ConstraintType,
DataType,
TableConstraint,
)
from metadata.generated.schema.type.entityReference import EntityReference
from metadata.generated.schema.type.tagLabel import (
LabelType,
State,
TagLabel,
TagSource,
)
from metadata.utils.source_hash import (
_get_column_sort_key,
_get_constraint_sort_key,
_get_entity_reference_sort_key,
_get_tag_sort_key,
_normalize_for_hash,
_normalize_whitespace,
_remove_volatile_fields,
_sort_columns,
generate_source_hash,
)
class TestNormalizeWhitespace:
def test_normalize_whitespace_none(self):
assert _normalize_whitespace(None) is None
def test_normalize_whitespace_simple(self):
assert _normalize_whitespace(" hello world ") == "hello world"
def test_normalize_whitespace_newlines(self):
text = """CREATE TABLE foo (
id INT,
name VARCHAR(100)
)"""
assert _normalize_whitespace(text) == "CREATE TABLE foo ( id INT, name VARCHAR(100) )"
def test_normalize_whitespace_tabs(self):
assert _normalize_whitespace("col1\t\tcol2\n\ncol3") == "col1 col2 col3"
class TestGetColumnSortKey:
def test_sort_key_with_ordinal(self):
col = {"name": "col_b", "ordinalPosition": 1}
assert _get_column_sort_key(col) == (1, "col_b")
def test_sort_key_without_ordinal(self):
col = {"name": "col_a"}
assert _get_column_sort_key(col) == (float("inf"), "col_a")
def test_sort_key_with_dict_name(self):
col = {"name": {"root": "col_c"}, "ordinalPosition": 2}
assert _get_column_sort_key(col) == (2, "col_c")
def test_sort_key_with_pydantic_v1_name(self):
col = {"name": {"__root__": "col_d"}, "ordinalPosition": 3}
assert _get_column_sort_key(col) == (3, "col_d")
class TestGetTagSortKey:
def test_tag_sort_key_string(self):
tag = {"tagFQN": "PII.Sensitive"}
assert _get_tag_sort_key(tag) == "PII.Sensitive"
def test_tag_sort_key_dict(self):
tag = {"tagFQN": {"root": "Classification.Tag"}}
assert _get_tag_sort_key(tag) == "Classification.Tag"
def test_tag_sort_key_missing(self):
tag = {}
assert _get_tag_sort_key(tag) == ""
class TestGetConstraintSortKey:
def test_constraint_sort_key(self):
constraint = {"constraintType": "PRIMARY_KEY", "columns": ["col_b", "col_a"]}
assert _get_constraint_sort_key(constraint) == ("PRIMARY_KEY", "col_a,col_b")
def test_constraint_sort_key_no_columns(self):
constraint = {"constraintType": "UNIQUE"}
assert _get_constraint_sort_key(constraint) == ("UNIQUE", "")
class TestGetEntityReferenceSortKey:
def test_entity_ref_sort_key_fqn(self):
ref = {"fullyQualifiedName": "team.user1", "name": "user1", "id": "123"}
assert _get_entity_reference_sort_key(ref) == "team.user1"
def test_entity_ref_sort_key_name(self):
ref = {"name": "user2", "id": "456"}
assert _get_entity_reference_sort_key(ref) == "user2"
def test_entity_ref_sort_key_id(self):
ref = {"id": "789"}
assert _get_entity_reference_sort_key(ref) == "789"
class TestRemoveVolatileFields:
def test_remove_href(self):
data = {"name": "test", "href": "http://example.com"}
result = _remove_volatile_fields(data)
assert result == {"name": "test"}
def test_remove_deleted(self):
data = {"id": "123", "deleted": True}
result = _remove_volatile_fields(data)
assert result == {"id": "123"}
def test_remove_inherited(self):
data = {"name": "owner", "inherited": False}
result = _remove_volatile_fields(data)
assert result == {"name": "owner"}
def test_remove_nested_volatile(self):
data = {"owners": [{"name": "user1", "href": "http://example.com/user1", "deleted": False}]}
result = _remove_volatile_fields(data)
assert result == {"owners": [{"name": "user1"}]}
def test_preserve_non_volatile(self):
data = {"name": "test", "description": "desc", "type": "user"}
result = _remove_volatile_fields(data)
assert result == data
class TestSortColumns:
def test_sort_by_ordinal(self):
columns = [
{"name": "col_c", "ordinalPosition": 3},
{"name": "col_a", "ordinalPosition": 1},
{"name": "col_b", "ordinalPosition": 2},
]
sorted_cols = _sort_columns(columns)
assert [c["name"] for c in sorted_cols] == ["col_a", "col_b", "col_c"]
def test_sort_by_name_when_no_ordinal(self):
columns = [
{"name": "zebra"},
{"name": "alpha"},
{"name": "beta"},
]
sorted_cols = _sort_columns(columns)
assert [c["name"] for c in sorted_cols] == ["alpha", "beta", "zebra"]
def test_sort_mixed_ordinal_and_name(self):
columns = [
{"name": "no_ordinal_b"},
{"name": "with_ordinal", "ordinalPosition": 1},
{"name": "no_ordinal_a"},
]
sorted_cols = _sort_columns(columns)
assert sorted_cols[0]["name"] == "with_ordinal"
assert sorted_cols[1]["name"] == "no_ordinal_a"
assert sorted_cols[2]["name"] == "no_ordinal_b"
def test_sort_nested_children(self):
columns = [
{
"name": "parent",
"ordinalPosition": 1,
"children": [
{"name": "child_b", "ordinalPosition": 2},
{"name": "child_a", "ordinalPosition": 1},
],
}
]
sorted_cols = _sort_columns(columns)
assert [c["name"] for c in sorted_cols[0]["children"]] == ["child_a", "child_b"]
def test_sort_column_tags(self):
columns = [
{
"name": "col1",
"ordinalPosition": 1,
"tags": [
{"tagFQN": "PII.Sensitive"},
{"tagFQN": "Classification.Email"},
],
}
]
sorted_cols = _sort_columns(columns)
assert sorted_cols[0]["tags"][0]["tagFQN"] == "Classification.Email"
assert sorted_cols[0]["tags"][1]["tagFQN"] == "PII.Sensitive"
def test_sort_string_columns(self):
"""Test backward compatibility with string column lists."""
columns = ["zebra", "alpha", "beta"]
sorted_cols = _sort_columns(columns)
assert sorted_cols == ["alpha", "beta", "zebra"]
def test_sort_empty_columns(self):
"""Test handling of empty column lists."""
assert _sort_columns([]) == []
assert _sort_columns(None) is None
class TestNormalizeForHash:
def test_normalize_columns(self):
data = {
"columns": [
{"name": "col_b", "ordinalPosition": 2},
{"name": "col_a", "ordinalPosition": 1},
]
}
result = _normalize_for_hash(data)
assert result["columns"][0]["name"] == "col_a"
assert result["columns"][1]["name"] == "col_b"
def test_normalize_tags(self):
data = {
"tags": [
{"tagFQN": "Tag.B"},
{"tagFQN": "Tag.A"},
]
}
result = _normalize_for_hash(data)
assert result["tags"][0]["tagFQN"] == "Tag.A"
assert result["tags"][1]["tagFQN"] == "Tag.B"
def test_normalize_constraints(self):
data = {
"tableConstraints": [
{"constraintType": "UNIQUE", "columns": ["col_a"]},
{"constraintType": "PRIMARY_KEY", "columns": ["id"]},
]
}
result = _normalize_for_hash(data)
assert result["tableConstraints"][0]["constraintType"] == "PRIMARY_KEY"
assert result["tableConstraints"][1]["constraintType"] == "UNIQUE"
def test_normalize_owners(self):
data = {
"owners": [
{"fullyQualifiedName": "team.user_b"},
{"fullyQualifiedName": "team.user_a"},
]
}
result = _normalize_for_hash(data)
assert result["owners"][0]["fullyQualifiedName"] == "team.user_a"
assert result["owners"][1]["fullyQualifiedName"] == "team.user_b"
def test_normalize_schema_definition(self):
data = {
"schemaDefinition": """
CREATE TABLE foo (
id INT
)
"""
}
result = _normalize_for_hash(data)
assert result["schemaDefinition"] == "CREATE TABLE foo ( id INT )"
def test_normalize_removes_volatile_fields(self):
data = {"owners": [{"name": "user1", "href": "http://example.com", "deleted": False}]}
result = _normalize_for_hash(data)
assert "href" not in result["owners"][0]
assert "deleted" not in result["owners"][0]
class TestGenerateSourceHash:
def test_hash_deterministic(self):
request = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[
Column(name="id", dataType=DataType.INT, ordinalPosition=1),
Column(name="name", dataType=DataType.VARCHAR, ordinalPosition=2),
],
)
hash1 = generate_source_hash(request)
hash2 = generate_source_hash(request)
assert hash1 == hash2
assert hash1 is not None
def test_hash_stable_with_column_order_variation(self):
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[
Column(name="id", dataType=DataType.INT, ordinalPosition=1),
Column(name="name", dataType=DataType.VARCHAR, ordinalPosition=2),
],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[
Column(name="name", dataType=DataType.VARCHAR, ordinalPosition=2),
Column(name="id", dataType=DataType.INT, ordinalPosition=1),
],
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_stable_with_tag_order_variation(self):
tag_a = TagLabel(
tagFQN="Classification.TagA",
source=TagSource.Classification,
labelType=LabelType.Manual,
state=State.Confirmed,
)
tag_b = TagLabel(
tagFQN="Classification.TagB",
source=TagSource.Classification,
labelType=LabelType.Manual,
state=State.Confirmed,
)
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
tags=[tag_a, tag_b],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
tags=[tag_b, tag_a],
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_stable_with_constraint_order_variation(self):
constraint_pk = TableConstraint(constraintType=ConstraintType.PRIMARY_KEY, columns=["id"])
constraint_unique = TableConstraint(constraintType=ConstraintType.UNIQUE, columns=["name"])
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.VARCHAR),
],
tableConstraints=[constraint_pk, constraint_unique],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.VARCHAR),
],
tableConstraints=[constraint_unique, constraint_pk],
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_stable_with_owner_order_variation(self):
owner1 = EntityReference(id=uuid.uuid4(), type="user", fullyQualifiedName="team.user_a")
owner2 = EntityReference(id=uuid.uuid4(), type="user", fullyQualifiedName="team.user_b")
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
owners=[owner1, owner2],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
owners=[owner2, owner1],
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_stable_with_volatile_owner_fields(self):
owner_id = uuid.uuid4()
owner1 = EntityReference(
id=owner_id,
type="user",
fullyQualifiedName="team.user1",
href="http://localhost:8585/api/v1/users/123",
deleted=False,
)
owner2 = EntityReference(
id=owner_id,
type="user",
fullyQualifiedName="team.user1",
href="http://production:8585/api/v1/users/123",
deleted=True,
)
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
owners=[owner1],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
owners=[owner2],
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_stable_with_schema_definition_whitespace(self):
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
schemaDefinition="CREATE TABLE test_table ( id INT )",
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
schemaDefinition="""
CREATE TABLE test_table (
id INT
)
""",
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_changes_with_actual_data_change(self):
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.BIGINT)],
)
assert generate_source_hash(request1) != generate_source_hash(request2)
def test_hash_changes_with_new_column(self):
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[
Column(name="id", dataType=DataType.INT),
Column(name="name", dataType=DataType.VARCHAR),
],
)
assert generate_source_hash(request1) != generate_source_hash(request2)
def test_hash_excludes_source_hash_field(self):
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
sourceHash="abc123",
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
sourceHash="xyz789",
)
assert generate_source_hash(request1) == generate_source_hash(request2)
def test_hash_with_custom_exclude_fields(self):
request1 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
description="Description 1",
)
request2 = CreateTableRequest(
name="test_table",
databaseSchema="service.db.schema",
columns=[Column(name="id", dataType=DataType.INT)],
description="Description 2",
)
hash_without_exclude = generate_source_hash(request1)
hash_with_exclude = generate_source_hash(request1, exclude_fields={"description": True})
assert hash_without_exclude != generate_source_hash(request2)
assert hash_with_exclude == generate_source_hash(request2, exclude_fields={"description": True})