OpenMetadata/ingestion/tests/integration/postgres/test_data_quality.py
IceS2 5009059441
chore(ingestion): migrate to ruff for format + isort + unused-import (#27739)
* chore(ingestion): replace black/isort/pycln with ruff

- Swap formatter + import-sorter + unused-import tooling for ruff
  (line-length 120, target py3.10) in ingestion + openmetadata-airflow-apis
- Drop dead [tool.mypy] config; basedpyright is the active type checker
- Bump requires-python to >=3.10 to match noxfile and CLAUDE.md (3.9 is
  documented as broken on Mac in noxfile.py)
- Bump pre-commit-hooks v2.3 -> v5.0; the new check-json catches four
  pre-existing JSON issues now excluded with an inline TODO
- Update Makefile py_format / py_format_check targets to call ruff

* chore(ingestion): grandfather ruff lint violations and apply ruff format

- 253 noqa markers added via 'ruff check --add-noqa' across 128 files,
  freezing existing violations so this PR is a tooling-only swap. Per-rule
  cleanup tracked in the TODO comment in ingestion/pyproject.toml.
- Bulk reformat from black 22.3 -> ruff format @ line-length 120.
  Cosmetic only: imports balanced (-32/+32), structural keywords balanced
  (-2221/+2221), no logic changes.
- Star-import rules (F403/F405) globally ignored; refactoring wildcard
  imports across connectors is a separate effort.

* chore(ingestion): fix pylint findings surfaced after ruff format

- filters.py: drop redundant parens around re.match(...) in `if`
  (C0325 superfluous-parens) — exposed when ruff format unwrapped them
- nosql_adaptor.py: move `# pylint: disable=unused-argument` from the
  `column:` line to the `def` line so it covers `table` too (W0613) —
  scope was line-based, lost when ruff split params onto multiple lines
- action1xx.py: replace `arguments-differ` with `signature-differs` in
  the disable directive (was always wrong code) and drop the now-useless
  `unused-argument` suppression (I0021)

* fix(ingestion): make ruff extend-exclude robust to multi-root invocations

CI's `make py_format_check` runs from the repo root and passes both
`ingestion/` and `./openmetadata-airflow-apis/` to ruff in a single
invocation. With multiple root paths, ruff's parallel file discovery
races on extend-exclude matching against the project root, so files
under `ingestion/src/metadata/generated/` were intermittently scanned
and produced ~830 I001 violations.

20-run repro: 10/20 fail without the fix, 20/20 pass with the fix.

Each excluded directory now appears twice in extend-exclude:
- the project-root-relative pattern (cwd = ingestion/)
- the prefixed pattern (cwd = repo root, multi-root invocation)

* chore(ingestion): address gitar-bot findings + cross-version pylint disable

- openmetadata-airflow-apis/pyproject.toml: switch coverage to module-name
  source + [tool.coverage.paths] glob remap (matches the ingestion pattern).
  Drops the hardcoded `env/lib/python3.9/site-packages/...` source path,
  which broke after the requires-python bump to 3.10. (Finding 1)
- ingestion/setup.py: remove dead python_version<'3.9' / >='3.9' guards on
  mysql-connector-python and testcontainers; promote locust to a regular
  test dep (was conditionally added under sys.version_info >= (3, 9)). Also
  remove the now-unused `import sys`. (Finding 3)
- ingestion/src/metadata/great_expectations/action1xx.py: cover both
  arguments-differ (great_expectations 0.18.x parent) and signature-differs
  (great_expectations 1.x parent) in the pylint disable comment, since
  CI installs 0.18.x and local often has 1.x. unused-argument covers the
  unused action_context. The opposite rule fires as I0021 useless-suppression
  on each environment, which is informational and does not affect pylint's
  exit code.
2026-04-27 10:05:28 +02:00

437 lines
18 KiB
Python

# Test cases for data quality workflow
from dataclasses import dataclass
import pytest
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
ServiceConnections,
TestSuiteConfigType,
TestSuitePipeline,
)
from metadata.generated.schema.metadataIngestion.workflow import (
OpenMetadataWorkflowConfig,
Processor,
Sink,
Source,
SourceConfig,
WorkflowConfig,
)
from metadata.generated.schema.tests.basic import (
TestCaseResult,
TestCaseStatus,
TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase
from metadata.generated.schema.tests.testSuite import TestSuite
from metadata.generated.schema.type.basic import ComponentConfig
from metadata.ingestion.api.status import TruncatedStackTraceError
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow
from metadata.workflow.metadata import MetadataWorkflow
# Column-level test cases need the column name in the FQN: {table}.{column}.{test_case}
# Table-level test cases use: {table}.{test_case}
_COLUMN_TEST_CASES = {
"first_name_includes_tom_and_jerry_wo_enum": "first_name",
"first_name_includes_tom_and_jerry": "first_name",
"first_name_is_tom_or_jerry": "first_name",
"id_no_bounds": "customer_id",
"column_values_not_match_regex": "email",
}
@pytest.fixture(scope="module")
def run_data_quality_workflow(
run_workflow,
ingestion_config,
db_service: DatabaseService,
metadata: OpenMetadata,
sink_config,
workflow_config,
patch_passwords_for_db_services,
):
run_workflow(MetadataWorkflow, ingestion_config)
test_suite_name = f"MyTestSuite_{db_service.name.root}"
test_suite_config = OpenMetadataWorkflowConfig(
source=Source(
type="postgres",
serviceName=test_suite_name,
sourceConfig=SourceConfig(
config=TestSuitePipeline(
type=TestSuiteConfigType.TestSuite,
entityFullyQualifiedName=f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer",
serviceConnections=[
ServiceConnections(
serviceName=db_service.name.root,
serviceConnection=db_service.connection,
)
],
)
),
),
processor=Processor(
type="orm-test-runner",
config=ComponentConfig(
{
"testCases": [
{
"name": "first_name_includes_tom_and_jerry_wo_enum",
"testDefinitionName": "columnValuesToBeInSet",
"columnName": "first_name",
"parameterValues": [{"name": "allowedValues", "value": "['Tom', 'Jerry']"}],
"computePassedFailedRowCount": True,
},
{
"name": "first_name_includes_tom_and_jerry",
"testDefinitionName": "columnValuesToBeInSet",
"columnName": "first_name",
"parameterValues": [
{"name": "allowedValues", "value": "['Tom', 'Jerry']"},
{"name": "matchEnum", "value": "false"},
],
},
{
"name": "first_name_is_tom_or_jerry",
"testDefinitionName": "columnValuesToBeInSet",
"columnName": "first_name",
"parameterValues": [
{"name": "allowedValues", "value": "['Tom', 'Jerry']"},
{"name": "matchEnum", "value": "true"},
],
},
{
"name": "id_no_bounds",
"testDefinitionName": "columnValuesToBeBetween",
"columnName": "customer_id",
"parameterValues": [],
},
{
"name": "column_values_not_match_regex",
"testDefinitionName": "columnValuesToNotMatchRegex",
"columnName": "email",
"parameterValues": [{"name": "forbiddenRegex", "value": ".*@example\\.com$"}],
},
{
"name": "table_column_count_between",
"testDefinitionName": "tableColumnCountToBeBetween",
"parameterValues": [
{"name": "minColValue", "value": "8"},
{"name": "maxColValue", "value": "12"},
],
},
{
"name": "table_column_count_equal",
"testDefinitionName": "tableColumnCountToEqual",
"parameterValues": [{"name": "columnCount", "value": "11"}],
},
{
"name": "table_column_name_exists",
"testDefinitionName": "tableColumnNameToExist",
"parameterValues": [{"name": "columnName", "value": "customer_id"}],
},
{
"name": "table_column_names_match_set",
"testDefinitionName": "tableColumnToMatchSet",
"parameterValues": [
{
"name": "columnNames",
"value": "customer_id, store_id, first_name, last_name, email, address_id, activebool, create_date, last_update, active, json_field",
},
{"name": "ordered", "value": "false"},
],
},
{
"name": "custom_sql_query_count",
"testDefinitionName": "tableCustomSQLQuery",
"parameterValues": [
{
"name": "sqlExpression",
"value": "SELECT CASE WHEN COUNT(*) > 0 THEN 0 ELSE 1 END FROM customer WHERE active = 1",
},
{"name": "strategy", "value": "COUNT"},
{"name": "threshold", "value": "0"},
],
},
{
"name": "custom_sql_query_rows",
"testDefinitionName": "tableCustomSQLQuery",
"parameterValues": [
{
"name": "sqlExpression",
"value": "SELECT * FROM customer WHERE active = 1",
},
{"name": "strategy", "value": "ROWS"},
{"name": "threshold", "value": "10"},
],
},
{
"name": "table_row_count_between",
"testDefinitionName": "tableRowCountToBeBetween",
"parameterValues": [
{"name": "minValue", "value": "100"},
{"name": "maxValue", "value": "1000"},
],
},
{
"name": "table_row_count_equal",
"testDefinitionName": "tableRowCountToEqual",
"parameterValues": [{"name": "value", "value": "599"}],
},
{
"name": "table_row_inserted_count_between_fail",
"testDefinitionName": "tableRowInsertedCountToBeBetween",
"parameterValues": [
{"name": "min", "value": "10"},
{"name": "max", "value": "50"},
{"name": "columnName", "value": "create_date"},
{"name": "rangeType", "value": "DAY"},
{"name": "rangeInterval", "value": "1"},
],
},
{
"name": "table_row_inserted_count_between_success",
"testDefinitionName": "tableRowInsertedCountToBeBetween",
"parameterValues": [
{"name": "min", "value": "590"},
{"name": "max", "value": "600"},
{"name": "columnName", "value": "last_update"},
{"name": "rangeType", "value": "YEAR"},
{"name": "rangeInterval", "value": "50"},
],
},
],
}
),
),
sink=Sink.model_validate(sink_config),
workflowConfig=WorkflowConfig.model_validate(workflow_config),
)
test_suite_processor = TestSuiteWorkflow.create(test_suite_config)
test_suite_processor.execute()
test_suite_processor.raise_from_status()
yield
test_suite: TestSuite = metadata.get_by_name(TestSuite, test_suite_name, nullable=True)
if test_suite:
metadata.delete(TestSuite, test_suite.id, recursive=True, hard_delete=True)
@pytest.mark.parametrize(
"test_case_name,expected_status",
[
(
"first_name_includes_tom_and_jerry_wo_enum",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
passedRows=2,
failedRows=597,
),
),
(
"first_name_includes_tom_and_jerry",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"first_name_is_tom_or_jerry",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed),
),
(
"id_no_bounds",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"column_values_not_match_regex",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"table_column_count_between",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
testResultValue=[TestResultValue(name="columnCount", value="11")],
),
),
(
"table_column_count_equal",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
testResultValue=[TestResultValue(name="columnCount", value="11")],
),
),
(
"table_column_name_exists",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"table_column_names_match_set",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"custom_sql_query_count",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"custom_sql_query_rows",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Failed,
testResultValues=[{"name": "resultRowCount", "value": "599"}],
),
),
(
"table_row_count_between",
TestCaseResult(
timestamp=0,
testCaseStatus=TestCaseStatus.Success,
testResultValue=[TestResultValue(name="rowCount", value="599")],
),
),
(
"table_row_count_equal",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
(
"table_row_inserted_count_between_fail",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Failed),
),
(
"table_row_inserted_count_between_success",
TestCaseResult(timestamp=0, testCaseStatus=TestCaseStatus.Success),
),
],
ids=lambda *x: x[0],
)
def test_data_quality(
run_data_quality_workflow,
metadata: OpenMetadata,
test_case_name,
expected_status,
db_service,
):
table_fqn = f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer"
col = _COLUMN_TEST_CASES.get(test_case_name)
fqn = f"{table_fqn}.{col}.{test_case_name}" if col else f"{table_fqn}.{test_case_name}"
test_case: TestCase = metadata.get_by_name(TestCase, fqn, fields=["*"], nullable=False)
assert_equal_pydantic_objects(
expected_status.model_copy(update={"timestamp": test_case.testCaseResult.timestamp}),
test_case.testCaseResult,
)
@pytest.fixture()
def get_incompatible_column_type_config(workflow_config, sink_config, db_service):
def inner(entity_fqn: str, incompatible_test_case: TestCaseDefinition):
return {
"source": {
"type": "postgres",
"serviceName": f"MyTestSuite_{db_service.name.root}",
"sourceConfig": {
"config": {
"type": "TestSuite",
"entityFullyQualifiedName": entity_fqn,
}
},
},
"processor": {
"type": "orm-test-runner",
"config": {
"testCases": [
incompatible_test_case.model_dump(),
{
"name": "compatible_test",
"testDefinitionName": "columnValueMaxToBeBetween",
"columnName": "customer_id",
"parameterValues": [
{"name": "minValueForMaxInCol", "value": "0"},
{"name": "maxValueForMaxInCol", "value": "10"},
],
},
]
},
},
"sink": sink_config,
"workflowConfig": workflow_config,
}
return inner
@dataclass
class IncompatibleTypeParameter:
entity_fqn: str
test_case: TestCaseDefinition
expected_failure: TruncatedStackTraceError
@pytest.fixture(
params=[
IncompatibleTypeParameter(
entity_fqn="{database_service}.dvdrental.public.customer",
test_case=TestCaseDefinition(
name="string_max_between",
testDefinitionName="columnValueMaxToBeBetween",
columnName="first_name",
parameterValues=[
{"name": "minValueForMaxInCol", "value": "0"},
{"name": "maxValueForMaxInCol", "value": "10"},
],
),
expected_failure=TruncatedStackTraceError(
name="Incompatible Column for Test Case",
error="Test case string_max_between of type columnValueMaxToBeBetween "
"is not compatible with column first_name of type VARCHAR",
),
),
IncompatibleTypeParameter(
entity_fqn="{database_service}.dvdrental.public.customer",
test_case=TestCaseDefinition(
name="unique_json_column",
testDefinitionName="columnValuesToBeUnique",
columnName="json_field",
),
expected_failure=TruncatedStackTraceError(
name="Incompatible Column for Test Case",
error="Test case unique_json_column of type columnValuesToBeUnique "
"is not compatible with column json_field of type JSON",
),
),
],
ids=lambda x: x.test_case.name,
)
def parameters(request, db_service):
request.param.entity_fqn = request.param.entity_fqn.format(database_service=db_service.fullyQualifiedName.root)
return request.param
def test_incompatible_column_type(
parameters: IncompatibleTypeParameter,
patch_passwords_for_db_services,
run_workflow,
ingestion_config,
get_incompatible_column_type_config,
metadata: OpenMetadata,
db_service,
cleanup_fqns,
):
run_workflow(MetadataWorkflow, ingestion_config)
test_suite_processor = run_workflow(
TestSuiteWorkflow,
get_incompatible_column_type_config(parameters.entity_fqn, parameters.test_case),
raise_from_status=False,
)
cleanup_fqns(
TestCase,
f"{parameters.entity_fqn}.{parameters.test_case.columnName}.{parameters.test_case.name}",
)
assert_equal_pydantic_objects(
parameters.expected_failure,
test_suite_processor.steps[0].get_status().failures[0],
)
assert (
f"{db_service.fullyQualifiedName.root}.dvdrental.public.customer.customer_id.compatible_test"
in test_suite_processor.steps[1].get_status().records
), "Test case compatible_test should pass"