OpenMetadata/ingestion/tests/integration/postgres/test_profiler.py
IceS2 5009059441
chore(ingestion): migrate to ruff for format + isort + unused-import (#27739)
* chore(ingestion): replace black/isort/pycln with ruff

- Swap formatter + import-sorter + unused-import tooling for ruff
  (line-length 120, target py3.10) in ingestion + openmetadata-airflow-apis
- Drop dead [tool.mypy] config; basedpyright is the active type checker
- Bump requires-python to >=3.10 to match noxfile and CLAUDE.md (3.9 is
  documented as broken on Mac in noxfile.py)
- Bump pre-commit-hooks v2.3 -> v5.0; the new check-json catches four
  pre-existing JSON issues now excluded with an inline TODO
- Update Makefile py_format / py_format_check targets to call ruff

* chore(ingestion): grandfather ruff lint violations and apply ruff format

- 253 noqa markers added via 'ruff check --add-noqa' across 128 files,
  freezing existing violations so this PR is a tooling-only swap. Per-rule
  cleanup tracked in the TODO comment in ingestion/pyproject.toml.
- Bulk reformat from black 22.3 -> ruff format @ line-length 120.
  Cosmetic only: imports balanced (-32/+32), structural keywords balanced
  (-2221/+2221), no logic changes.
- Star-import rules (F403/F405) globally ignored; refactoring wildcard
  imports across connectors is a separate effort.

* chore(ingestion): fix pylint findings surfaced after ruff format

- filters.py: drop redundant parens around re.match(...) in `if`
  (C0325 superfluous-parens) — exposed when ruff format unwrapped them
- nosql_adaptor.py: move `# pylint: disable=unused-argument` from the
  `column:` line to the `def` line so it covers `table` too (W0613) —
  scope was line-based, lost when ruff split params onto multiple lines
- action1xx.py: replace `arguments-differ` with `signature-differs` in
  the disable directive (was always wrong code) and drop the now-useless
  `unused-argument` suppression (I0021)
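
A minimal sketch of the nosql_adaptor.py fix described above, assuming a hypothetical function `fetch_rows` (not the real adaptor signature). A trailing disable comment on a parameter line covers only that line, while the same comment on the `def` line covers the whole function block, so both unused parameters are suppressed:

```python
# Hypothetical function for illustration only; the real nosql_adaptor.py
# signature is not shown in this commit message.
def fetch_rows(  # pylint: disable=unused-argument
    table: str,
    column: str,
) -> list:
    """Return no rows; neither parameter is used in this stub."""
    # With the directive on the `def` line, pylint applies it to the whole
    # block, so W0613 is suppressed for both `table` and `column`.
    return []


rows = fetch_rows("financial_transactions", "amount")
```

Placing the directive on the `column:` line instead would leave `table` unsuppressed once the formatter splits the parameters onto separate lines.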

* fix(ingestion): make ruff extend-exclude robust to multi-root invocations

CI's `make py_format_check` runs from the repo root and passes both
`ingestion/` and `./openmetadata-airflow-apis/` to ruff in a single
invocation. With multiple root paths, ruff's parallel file discovery
races on extend-exclude matching against the project root, so files
under `ingestion/src/metadata/generated/` were intermittently scanned
and produced ~830 I001 violations.

20-run repro: 10/20 fail without the fix, 20/20 pass with the fix.

Each excluded directory now appears twice in extend-exclude:
- the project-root-relative pattern (cwd = ingestion/)
- the prefixed pattern (cwd = repo root, multi-root invocation)
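
The 20-run repro mentioned above could be driven by a small harness like the following. This is a sketch under assumptions: `count_failures` is a hypothetical helper, and the actual script used for the repro is not part of this commit message.

```python
import subprocess
import sys


def count_failures(command, runs=20):
    """Run `command` `runs` times and return how many runs exit non-zero."""
    failures = 0
    for _ in range(runs):
        # check=False: a non-zero exit is the signal being counted, not an error.
        result = subprocess.run(command, capture_output=True, check=False)
        if result.returncode != 0:
            failures += 1
    return failures


# Stand-in commands so the sketch runs anywhere; they use the current interpreter.
always_fail = [sys.executable, "-c", "raise SystemExit(1)"]
always_pass = [sys.executable, "-c", "pass"]

# Assumed real usage, from the repo root:
#   count_failures(["make", "py_format_check"], runs=20)
```

Counting exit codes over repeated runs is enough to tell an intermittent failure apart from a deterministic one, which is what the 10/20 versus 20/20 figures report.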

* chore(ingestion): address gitar-bot findings + cross-version pylint disable

- openmetadata-airflow-apis/pyproject.toml: switch coverage to module-name
  source + [tool.coverage.paths] glob remap (matches the ingestion pattern).
  Drops the hardcoded `env/lib/python3.9/site-packages/...` source path,
  which broke after the requires-python bump to 3.10. (Finding 1)
- ingestion/setup.py: remove dead python_version<'3.9' / >='3.9' guards on
  mysql-connector-python and testcontainers; promote locust to a regular
  test dep (was conditionally added under sys.version_info >= (3, 9)). Also
  remove the now-unused `import sys`. (Finding 3)
- ingestion/src/metadata/great_expectations/action1xx.py: cover both
  arguments-differ (great_expectations 0.18.x parent) and signature-differs
  (great_expectations 1.x parent) in the pylint disable comment, since
  CI installs 0.18.x and local environments often have 1.x. unused-argument
  covers the unused action_context. In each environment, whichever of the two
  rules does not apply fires as I0021 useless-suppression, which is
  informational and does not affect pylint's exit code.
2026-04-27 10:05:28 +02:00


from copy import deepcopy

import pytest
from sqlalchemy import create_engine, text

from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.generated.schema.entity.data.table import ColumnProfile
from metadata.ingestion.lineage.sql_lineage import search_cache
from metadata.workflow.metadata import MetadataWorkflow
from metadata.workflow.profiler import ProfilerWorkflow


@pytest.fixture(scope="module")
def prepare_postgres(postgres_container):
    engine = create_engine(postgres_container.get_connection_url())
    sql = [
        "CREATE TABLE financial_transactions (id SERIAL PRIMARY KEY, amount MONEY);",
        "INSERT INTO financial_transactions (amount) VALUES (100.00), (200.00), (300.00), (400.00), (500.00);",
    ]
    with engine.begin() as conn:
        for stmt in sql:
            conn.execute(text(stmt))


@pytest.fixture(scope="module")
def run_profiler(
    patch_passwords_for_db_services,
    prepare_postgres,
    run_workflow,
    ingestion_config,
    profiler_config,
):
    search_cache.clear()
    config = deepcopy(ingestion_config)
    config["source"]["sourceConfig"]["config"]["schemaFilterPattern"] = {"excludes": ["information_schema"]}
    run_workflow(MetadataWorkflow, config)
    run_workflow(ProfilerWorkflow, profiler_config)


@pytest.mark.parametrize(
    "table_fqn,expected_column_profiles",
    [
        [
            "{service}.dvdrental.public.financial_transactions",
            {
                "id": ColumnProfile.model_validate(
                    {
                        "name": "id",
                        "timestamp": 1724343985740,
                        "valuesCount": 5.0,
                        "nullCount": 0.0,
                        "nullProportion": 0.0,
                        "uniqueCount": 5.0,
                        "uniqueProportion": 1.0,
                        "distinctCount": 5.0,
                        "distinctProportion": 1.0,
                        "min": 1.0,
                        "max": 5.0,
                        "mean": 3.0,
                        "sum": 15.0,
                        "stddev": 1.414213562373095,
                        "median": 3.0,
                        "firstQuartile": 2.0,
                        "thirdQuartile": 4.0,
                        "interQuartileRange": 2.0,
                        "nonParametricSkew": 0.0,
                    }
                ),
                "amount": ColumnProfile.model_validate(
                    {
                        "name": "amount",
                        "timestamp": 1724343985743,
                        "valuesCount": 5.0,
                        "nullCount": 0.0,
                        "nullProportion": 0.0,
                        "uniqueCount": 5.0,
                        "uniqueProportion": 1.0,
                        "distinctCount": 5.0,
                        "distinctProportion": 1.0,
                        "min": "$100.00",
                        "max": "$500.00",
                        "mean": 300.0,
                        "sum": 1500.0,
                        "stddev": 141.4213562373095,
                    }
                ),
            },
        ]
    ],
    ids=lambda x: x.split(".")[-1] if isinstance(x, str) else "",
)
def test_profiler(
    table_fqn,
    expected_column_profiles,
    db_service,
    run_profiler,
    metadata,
):
    table = metadata.get_latest_table_profile(table_fqn.format(service=db_service.fullyQualifiedName.root))
    for name, expected_profile in expected_column_profiles.items():
        actual_column_profile = next(column for column in table.columns if column.name.root == name).profile
        # the timestamp always changes so we equalize them to avoid comparison
        actual_column_profile.timestamp = expected_profile.timestamp
        assert_equal_pydantic_objects(
            expected_profile,
            actual_column_profile,
        )
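
The expected values in the parametrized profiles can be cross-checked against the five inserted rows with the standard library. This is a sketch, not the profiler's implementation: `summarize` is a hypothetical helper, the population standard deviation is chosen because it is what the expected `stddev` values are consistent with, and the inclusive quartile method and the (mean - median) / stddev skew formula are assumptions that happen to reproduce the `id` column's figures.

```python
import math
import statistics

# Values produced by the fixture: SERIAL ids 1..5 and the five inserted amounts.
ids = [1, 2, 3, 4, 5]
amounts = [100.0, 200.0, 300.0, 400.0, 500.0]


def summarize(values):
    """Compute the summary statistics that appear in the expected profiles."""
    mean = statistics.fmean(values)
    median = statistics.median(values)
    stddev = statistics.pstdev(values)  # population standard deviation
    # Assumption: inclusive quartiles; the profiler's own method may differ.
    q1, _, q3 = statistics.quantiles(values, n=4, method="inclusive")
    return {
        "sum": math.fsum(values),
        "mean": mean,
        "median": median,
        "stddev": stddev,
        "firstQuartile": q1,
        "thirdQuartile": q3,
        "interQuartileRange": q3 - q1,
        "nonParametricSkew": (mean - median) / stddev,
    }


id_stats = summarize(ids)
amount_stats = summarize(amounts)
```

Only `mean`, `sum`, and `stddev` are relevant for `amount`, since the expected profile for the MONEY column carries no median or quartile fields.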