OpenMetadata/ingestion/tests/integration/data_quality/test_data_diff.py
IceS2 e9c87c6adb
chore(ingestion): drop pylint, expand ruff (#27774)
* chore(ingestion): drop pylint, expand ruff to Stage 2c

Replace pylint with a coherent ruff-only stack (Stage 2c of the modernize
roadmap). Pylint is dropped from dev deps and CI workflows; the selected ruff
ruleset is expanded to ~22 families covering style, bug catchers, hygiene,
and the pylint port (PLE/PLC/PLW/PLR with the noisy "too-many-X"
complexity caps + magic-value disabled).

What's selected (with rationale in pyproject.toml):
  E, W, F, I, N         — style + correctness baseline + naming
  UP                    — pyupgrade (py>=3.10 modernizations)
  B, C4, C90, RET, SIM, TRY  — bug catchers
  PIE, ICN, T20, TC, TID, PTH, PERF  — hygiene
  PLE, PLC, PLW, PLR    — pylint port (PLR complexity caps ignored)
  RUF                   — ruff-native (incl. RUF100 unused-noqa)

What's removed:
  - .pylintrc (root) — duplicate of the ingestion pylint config
  - [tool.pylint.*] block in ingestion/pyproject.toml (~140 lines)
  - ingestion/plugins/{print_checker,import_checker}.py + tests + README
    (replaced by built-in T20 + TID251 banned-api respectively)
  - pylint dep from ingestion/setup.py and openmetadata-airflow-apis/pyproject.toml
  - `make lint` Makefile target + the pylint invocation in py_format_check
  - dead pylint TODO comment + ignored test entry in noxfile.py

Cwd-stable config: ruff is invoked both from the repo root (pre-commit,
CI) and from ingestion/ (`make py_format_check`). The `src`,
`extend-exclude`, and per-file-ignores entries are listed twice — once
relative to ingestion/ and once with the `ingestion/` prefix — so
first-party isort detection and exclusions match in both invocations.

Grandfathering: ran `ruff check --add-noqa` once + format-stable
iteration. ~12,130 noqa directives across ~1,400 files. Cleanup is
deferred to follow-up PRs that drop noqas one rule at a time.

Documentation sweep: replaced `make lint` references in CLAUDE.md,
AGENTS.md, DEVELOPER.md, copilot-instructions, and 6 SKILL files with
the apply+verify shape `make py_format && make py_format_check`.
`make py_format` is NOT a strict superset of pylint: it only applies
fixes for auto-fixable violations; `make py_format_check` catches the rest.

Basedpyright baseline regenerated: ruff format reflowed multi-line
signatures in ~70 files, shifting type-error column positions. The
basedpyright baseline matches by (file path, error code, range), so
column shifts caused 19 entries to mis-align. Net diff is small
(154 lines in/out of the 13MB baseline.json) — purely positional.
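
To illustrate why a pure column shift invalidates a baseline entry, here is
a minimal sketch of tuple-keyed matching. It assumes matching is an exact
comparison on (path, code, start column, end column). `BaselineEntry`,
`unmatched`, and the sample path are hypothetical names for illustration,
not basedpyright's actual data model or API.

```python
from typing import NamedTuple


class BaselineEntry(NamedTuple):
    """Hypothetical stand-in for one baselined diagnostic."""

    path: str
    code: str
    start_col: int
    end_col: int


def unmatched(baseline: set[BaselineEntry], current: list[BaselineEntry]) -> list[BaselineEntry]:
    """Return current diagnostics that have no exact match in the baseline."""
    return [entry for entry in current if entry not in baseline]


baseline = {BaselineEntry("metadata/example.py", "reportAttributeAccessIssue", 28, 36)}

# Same error, but a reflowed signature moved it to a different column.
after_format = [BaselineEntry("metadata/example.py", "reportAttributeAccessIssue", 8, 16)]

# The shifted entry no longer matches, so it would be reported as new.
print(unmatched(baseline, after_format))
```

Under this assumption the error itself is unchanged, but its key is not,
which is why the diff to the baseline is purely positional.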

Verified locally:
  - make py_format_check         → All checks passed
  - nox --no-venv -s static-checks → 0 errors, 0 warnings, 0 notes

* chore(ingestion): finish ruff swap — nox lint session + skill docs

Three remaining stale-tooling references after Stage 2c:

  - `ingestion/noxfile.py` `lint` session was still calling `black --check`,
    `isort --check-only`, `pycln --diff`. Those tools aren't installed
    anywhere (we dropped them from dev deps). Replace with the ruff
    equivalents that mirror `make py_format_check`.
  - `skills/standards/code_style.md`: stack listed as `black + isort +
    pycln`; line length claimed 88 (black default). Both wrong: stack is
    ruff, line length is 120.
  - `skills/connector-building/SKILL.md`: `make py_format` comment said
    `# black + isort + pycln`. Same swap.

* chore(ingestion): keep main's baseline + globally ignore TRY400

Per gitar-bot's review on PR #27774:

1. Main's PR #27728 promoted ~60 `logger.warning()` → `logger.error()`
   inside `except` blocks. Those changes landed on main with their own
   baseline updates. Our PR doesn't promote anything — the merge from
   origin/main brought those `error` calls along with their baseline
   entries.

   The bot interpreted the `# noqa: TRY400` we added next to those lines
   as us silencing the rule case-by-case. Cleaner: globally ignore
   TRY400 in pyproject.toml, with a comment explaining why the codebase's
   `logger.error(...)` + separate `logger.debug(traceback.format_exc())`
   pattern is intentional. Strip ~430 per-line `# noqa: TRY400` markers
   from source.

2. Document that `S101` in `per-file-ignores` is a forward-looking
   entry: flake8-bandit (`S`) is not yet selected, so the rule is a
   no-op today. The entry stays so that when `S` lands later, tests
   don't immediately error.
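
For reference, the logging pattern that item 1 describes looks roughly like
the sketch below. `parse_port` and the logger name are hypothetical; only
the `logger.error(...)` + `logger.debug(traceback.format_exc())` shape is
taken from the description above.

```python
import logging
import traceback
from typing import Optional

logger = logging.getLogger("example")


def parse_port(raw: str) -> Optional[int]:
    """Hypothetical helper: return the port, or None if it cannot be parsed."""
    try:
        return int(raw)
    except ValueError as exc:
        # Short message at ERROR level. TRY400 would ask for
        # logger.exception here, which also emits the traceback at ERROR.
        logger.error(f"Could not parse port {raw!r}: {exc}")
        # Full stack trace only at DEBUG level, kept out of normal logs.
        logger.debug(traceback.format_exc())
        return None
```

With the logger at its usual level only the one-line error is emitted; the
traceback appears once DEBUG logging is enabled.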

Reverts the platform pin and Linux Docker–generated baseline. Keep
main's baseline intact and let CI surface the exact column-shifted
entries; the team will decide whether to fix in-place (revert format
on affected files) or add per-line `# pyright: ignore` markers.

* chore(ingestion): regen baseline for new connector type debt

Main's baseline was stale relative to recently-added connectors
(McpConnection, CustomDriveConnection) that lack common attributes
like `hostPort`, `database`, `catalog` etc. — all sites that access
those attributes via the union-typed `serviceConnection.root.config`
fire `reportAttributeAccessIssue` errors that aren't baselined.

71 errors + 58 warnings absorbed. Local macOS regen; pushing to see
CI's drift count. Per the basedpyright-baseline-and-ci PR experience,
macOS↔Linux column drift on this size of regen has historically been
1-7 residuals.
2026-04-28 07:21:59 +02:00


from datetime import datetime  # noqa: I001
import pytest
from dirty_equals import IsApprox, IsPositiveInt
from pydantic import BaseModel
from sqlalchemy import VARBINARY
from sqlalchemy import Column as SQAColumn
from sqlalchemy import MetaData
from sqlalchemy import Table as SQATable
from sqlalchemy import create_engine, text
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine import Connection, make_url
from sqlalchemy.sql import sqltypes
from _openmetadata_testutils.postgres.conftest import postgres_container  # noqa: F401
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.data.table import Table, TableProfilerConfig
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
    TestSuiteConfigType,
)
from metadata.generated.schema.tests.basic import (
    TestCaseResult,
    TestCaseStatus,
    TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.type.samplingConfig import ProfileSampleConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow


class TestParameters(BaseModel):
    test_case_defintion: TestCaseDefinition
    table2_fqn: str
    expected: TestCaseResult
    table_profile_config: TableProfilerConfig = None

    def __init__(self, *args, **kwargs):
        if args:
            # Map positional arguments to fields
            field_names = list(self.__annotations__.keys())
            kwargs.update(dict(zip(field_names, args)))  # noqa: B905
        super().__init__(**kwargs)


@pytest.mark.parametrize(
"parameters",
[
pytest.param(TestParameters(*t), id=t[0].name)
for t in [
(
TestCaseDefinition(
name="compare_same_tables",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
],
),
"POSTGRES_SERVICE.dvdrental.public.customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
failedRows=0,
passedRows=599,
),
),
(
TestCaseDefinition(
name="compare_same_tables_with_percentage_sample",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
],
),
"POSTGRES_SERVICE.dvdrental.public.customer",
TestCaseResult.model_construct(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
failedRows=0,
# we use approximations because the sampling is not deterministic
passedRows=IsApprox(59, delta=60) & IsPositiveInt,
),
TableProfilerConfig(
profileSampleConfig=ProfileSampleConfig(
sampleConfigType="STATIC",
config={
"profileSample": 10,
"profileSampleType": "PERCENTAGE",
},
),
),
),
(
TestCaseDefinition(
name="compare_same_tables_with_row_sample",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
],
),
"POSTGRES_SERVICE.dvdrental.public.customer",
TestCaseResult.model_construct(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
failedRows=0,
# we use approximations around the 99.5 confidence interval since the
# sampling in data diff uses hash based partitioning
passedRows=IsApprox(10, delta=15) & IsPositiveInt,
),
TableProfilerConfig(
profileSampleConfig=ProfileSampleConfig(
sampleConfigType="STATIC",
config={
"profileSample": 10,
"profileSampleType": "ROWS",
},
),
),
),
(
TestCaseDefinition(
name="with_explicit_key_columns",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
],
),
"POSTGRES_SERVICE.dvdrental.public.changed_customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
failedRows=321,
passedRows=278,
),
),
(
TestCaseDefinition(
name="resolve_primary_keys",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[],
),
"POSTGRES_SERVICE.dvdrental.public.changed_customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
failedRows=321,
passedRows=278,
),
),
(
TestCaseDefinition(
name="with_passing_threshold",
testDefinitionName="tableDiff",
parameterValues=[
TestCaseParameterValue(name="threshold", value="322"),
],
),
"POSTGRES_SERVICE.dvdrental.public.changed_customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
failedRows=321,
),
),
(
TestCaseDefinition(
name="with_failing_threshold",
testDefinitionName="tableDiff",
parameterValues=[
TestCaseParameterValue(name="threshold", value="321"),
],
),
"POSTGRES_SERVICE.dvdrental.public.changed_customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
failedRows=321,
),
),
(
TestCaseDefinition(
name="with_where_clause",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(
name="where",
value="MOD(customer_id, 2) != 0 AND MOD(customer_id, 13) != 0",
),
],
),
"POSTGRES_SERVICE.dvdrental.public.changed_customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
),
),
(
TestCaseDefinition(
name="without_first_name",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[],
),
"POSTGRES_SERVICE.dvdrental.public.customer_without_first_name",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
failedRows=0,
passedRows=599,
testResultValue=[
TestResultValue(name="removedRows", value="0"),
TestResultValue(name="addedRows", value="0"),
TestResultValue(name="changedRows", value="0"),
TestResultValue(name="diffCount", value="0"),
TestResultValue(name="removedColumns", value="1"),
TestResultValue(name="addedColumns", value="0"),
TestResultValue(name="changedColumns", value="0"),
TestResultValue(
name="schemaTable1",
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
),
TestResultValue(
name="schemaTable2",
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_without_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
),
],
),
),
(
TestCaseDefinition(
name="without_first_name_with_extra_column",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[TestCaseParameterValue(name="useColumns", value="['last_name', 'email']")],
),
"POSTGRES_SERVICE.dvdrental.public.customer_without_first_name",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
),
),
(
TestCaseDefinition(
name="postgres_vs_mysql_success",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(
name="useColumns",
value=str(
[
"store_id",
"first_name",
"last_name",
"email",
"activebool",
"address_id",
"active",
# "create_date", # date types are incomparable for mysql and postgres
"last_update",
]
),
)
],
),
"MYSQL_SERVICE.default.test.customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
),
),
(
TestCaseDefinition(
name="postgres_vs_mysql_failure",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[],
),
"MYSQL_SERVICE.default.test.changed_customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
),
),
(
TestCaseDefinition(
name="postgres_different_case_columns_fail",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[TestCaseParameterValue(name="caseSensitiveColumns", value="true")],
),
"POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
failedRows=0,
passedRows=599,
testResultValue=[
TestResultValue(name="removedRows", value="0"),
TestResultValue(name="addedRows", value="0"),
TestResultValue(name="changedRows", value="0"),
TestResultValue(name="diffCount", value="0"),
TestResultValue(name="removedColumns", value="1"),
TestResultValue(name="addedColumns", value="1"),
TestResultValue(name="changedColumns", value="0"),
TestResultValue(
name="schemaTable1",
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
),
TestResultValue(
name="schemaTable2",
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'First_Name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
),
],
),
),
(
TestCaseDefinition(
name="postgres_different_case_columns_success",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[TestCaseParameterValue(name="caseSensitiveColumns", value="false")],
),
"POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Success,
),
TableProfilerConfig(
profileSampleConfig=ProfileSampleConfig(
sampleConfigType="STATIC",
config={
"profileSample": 10,
"profileSampleType": "PERCENTAGE",
},
),
),
),
(
TestCaseDefinition(
name="table_from_another_db",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[],
),
"POSTGRES_SERVICE.other_db.public.customer",
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
),
),
]
],
)
def test_happy_paths(
    postgres_service: DatabaseService,
    prepare_data,
    ingest_postgres_metadata,
    ingest_mysql_service,
    patched_metadata,
    parameters: TestParameters,
    sink_config,
    profiler_config,
    run_workflow,
    workflow_config,
    cleanup_fqns,
):
    # Replace service name placeholders in expected testResultValue
    if parameters.expected.testResultValue:
        for result_value in parameters.expected.testResultValue:
            if result_value.value:
                result_value.value = result_value.value.replace(
                    "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
                )

    metadata = patched_metadata
    table1: Table = metadata.get_by_name(
        Table,
        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    cleanup_fqns(
        TestCase,
        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
    )
    table2_service = {
        "POSTGRES_SERVICE": postgres_service,
        "MYSQL_SERVICE": ingest_mysql_service,
    }
    for k, v in table2_service.items():
        parameters.table2_fqn = parameters.table2_fqn.replace(k, v.fullyQualifiedName.root)
    parameters.test_case_defintion.parameterValues.extend(
        [
            TestCaseParameterValue(
                name="table2",
                value=parameters.table2_fqn,
            ),
        ]
    )
    if parameters.table_profile_config:
        metadata.create_or_update_table_profiler_config(table1.fullyQualifiedName.root, parameters.table_profile_config)
    workflow_config = {
        "source": {
            "type": "postgres",
            "serviceName": f"MyTestSuite_{postgres_service.name.root}",
            "sourceConfig": {
                "config": {
                    "type": TestSuiteConfigType.TestSuite.value,
                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
                }
            },
        },
        "processor": {
            "type": "orm-test-runner",
            "config": {"testCases": [parameters.test_case_defintion.model_dump()]},
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
    run_workflow(TestSuiteWorkflow, workflow_config)
    metadata.create_or_update_table_profiler_config(table1.fullyQualifiedName.root, TableProfilerConfig())
    test_case_entity = metadata.get_by_name(
        TestCase,
        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
        fields=["*"],
    )
    assert "ERROR: Unexpected error" not in test_case_entity.testCaseResult.result
    parameters.expected.timestamp = test_case_entity.testCaseResult.timestamp  # timestamp is not deterministic
    assert_equal_pydantic_objects(parameters.expected, test_case_entity.testCaseResult)


@pytest.mark.parametrize(
"parameters,expected",
[
pytest.param(
TestCaseDefinition(
name="unsupported_dialect",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(
name="service2Url",
value="mongodb://localhost:27017",
),
TestCaseParameterValue(
name="table2",
value="POSTGRES_SERVICE.dvdrental.public.customer",
),
],
),
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Aborted,
result="Unsupported dialect in param table2.serviceUrl: mongodb",
),
id="unsupported_dialect",
),
pytest.param(
TestCaseDefinition(
name="unsupported_data_types",
testDefinitionName="tableDiff",
computePassedFailedRowCount=True,
parameterValues=[
TestCaseParameterValue(
name="table2",
value="POSTGRES_SERVICE.dvdrental.public.customer_int_first_name",
),
],
),
TestCaseResult(
timestamp=int(datetime.now().timestamp() * 1000),
testCaseStatus=TestCaseStatus.Failed,
failedRows=0,
passedRows=599,
testResultValue=[
TestResultValue(name="removedRows", value="0"),
TestResultValue(name="addedRows", value="0"),
TestResultValue(name="changedRows", value="0"),
TestResultValue(name="diffCount", value="0"),
TestResultValue(name="removedColumns", value="0"),
TestResultValue(name="addedColumns", value="0"),
TestResultValue(name="changedColumns", value="1"),
TestResultValue(
name="schemaTable1",
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
),
TestResultValue(
name="schemaTable2",
value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_int_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}, 'first_name': {'type': 'integer', 'constraints': 'NULL'}}",
),
],
),
),
pytest.param(
None,
None,
marks=pytest.mark.skip(reason="TODO: implement test - table2 does not exist"),
),
pytest.param(
None,
None,
marks=pytest.mark.skip(reason="TODO: implement test - where clause is invalid"),
),
],
)
def test_error_paths(
    parameters: TestCaseDefinition,
    expected: TestCaseResult,
    prepare_data: None,
    ingest_postgres_metadata,
    ingest_mysql_service: DatabaseService,
    postgres_service: DatabaseService,
    patched_metadata: OpenMetadata,
    sink_config,
    workflow_config,
    run_workflow,
    cleanup_fqns,
):
    # Replace service name placeholders in expected testResultValue
    if expected.testResultValue:
        for result_value in expected.testResultValue:
            if result_value.value:
                result_value.value = result_value.value.replace(
                    "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
                )

    metadata = patched_metadata
    table1 = metadata.get_by_name(
        Table,
        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    cleanup_fqns(TestCase, f"{table1.fullyQualifiedName.root}.{parameters.name}")
    for parameter in parameters.parameterValues:
        if parameter.name == "table2":
            parameter.value = parameter.value.replace("POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root)
    workflow_config = {
        "source": {
            "type": "postgres",
            "serviceName": f"MyTestSuite_{postgres_service.name.root}",
            "sourceConfig": {
                "config": {
                    "type": TestSuiteConfigType.TestSuite.value,
                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
                }
            },
        },
        "processor": {
            "type": "orm-test-runner",
            # model_dump() matches test_happy_paths; .dict() is deprecated in pydantic v2
            "config": {"testCases": [parameters.model_dump()]},
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
    run_workflow(TestSuiteWorkflow, workflow_config)
    test_case_entity: TestCase = metadata.get_or_create_test_case(f"{table1.fullyQualifiedName.root}.{parameters.name}")
    expected.timestamp = test_case_entity.testCaseResult.timestamp  # timestamp is not deterministic
    assert_equal_pydantic_objects(expected, test_case_entity.testCaseResult)


def add_changed_tables(connection: Connection):
    connection.execute(text("CREATE TABLE customer_200 AS SELECT * FROM customer LIMIT 200;"))
    connection.execute(text("CREATE TABLE customer_different_case_columns AS SELECT * FROM customer;"))
    connection.execute(text('ALTER TABLE customer_different_case_columns RENAME COLUMN first_name TO "First_Name";'))
    # TODO: this appears to be unsupported by data diff. Cross data type comparison is flaky.
    # connection.execute(
    #     text("ALTER TABLE customer_different_case_columns ALTER COLUMN store_id TYPE decimal")
    # )
    connection.execute(text("CREATE TABLE changed_customer AS SELECT * FROM customer;"))
    connection.execute(text("UPDATE changed_customer SET first_name = 'John' WHERE MOD(customer_id, 2) = 0;"))
    connection.execute(text("DELETE FROM changed_customer WHERE MOD(customer_id, 13) = 0;"))
    connection.execute(text("CREATE TABLE customer_without_first_name AS SELECT * FROM customer;"))
    connection.execute(text("ALTER TABLE customer_without_first_name DROP COLUMN first_name;"))
    connection.execute(text("CREATE TABLE customer_int_first_name AS SELECT * FROM customer;"))
    connection.execute(text("ALTER TABLE customer_int_first_name DROP COLUMN first_name;"))
    connection.execute(text("ALTER TABLE customer_int_first_name ADD COLUMN first_name INT;"))
    connection.execute(text("UPDATE customer_int_first_name SET first_name = 1;"))


@pytest.fixture(scope="module")
def prepare_data(postgres_container, mysql_container):  # noqa: F811
    dvdrental = create_engine(
        make_url(postgres_container.get_connection_url()).set(database="dvdrental"),
        isolation_level="AUTOCOMMIT",
    )
    with dvdrental.connect() as conn:
        conn.execute(text("CREATE DATABASE other_db"))
    with dvdrental.connect() as conn:
        add_changed_tables(conn)
    other = create_engine(
        make_url(postgres_container.get_connection_url()).set(database="other_db"),
        isolation_level="AUTOCOMMIT",
    )
    copy_table_between_postgres(dvdrental, other, "customer", 10)
    mysql_container = create_engine(make_url(mysql_container.get_connection_url()).set(database=mysql_container.dbname))
    dvdrental = create_engine(make_url(postgres_container.get_connection_url()).set(database="dvdrental"))
    copy_table(dvdrental, mysql_container, "customer")
    copy_table(dvdrental, mysql_container, "changed_customer")


def copy_table(source_engine, destination_engine, table_name):
    source_metadata = MetaData()
    source_table = SQATable(table_name, source_metadata, autoload_with=source_engine)
    destination_metadata = MetaData()
    destination_table = SQATable(table_name, destination_metadata)
    for column in source_table.columns:
        # we copy all the columns without constraints, indexes or defaults
        # as we are only interested in the data
        if isinstance(column.type, postgresql.base.BYTEA) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, VARBINARY)
        elif isinstance(column.type, sqltypes.BOOLEAN) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, sqltypes.Boolean)
        elif isinstance(column.type, sqltypes.TIMESTAMP) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, sqltypes.DateTime)
        elif isinstance(column.type, sqltypes.DATE) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, sqltypes.DateTime)
        elif isinstance(column.type, postgresql.json.JSONB):
            column_copy = SQAColumn(column.name, sqltypes.JSON)
        else:
            column_copy = SQAColumn(column.name, column.type)
        destination_table.append_column(column_copy)
    destination_metadata.create_all(destination_engine)
    with source_engine.connect() as source_connection, destination_engine.connect() as destination_connection:
        data = source_connection.execute(source_table.select()).fetchall()
        batch_size = 1000
        for i in range(0, len(data), batch_size):
            batch = data[i : i + batch_size]
            destination_connection.execute(source_table.insert(), [dict(row._mapping) for row in batch])
        destination_connection.commit()


@pytest.fixture
def patched_metadata(metadata, postgres_service, ingest_mysql_service, monkeypatch):
    dbs_by_name = {service.fullyQualifiedName.root: service for service in [postgres_service, ingest_mysql_service]}

    def override_result_by_fqn(func):
        def inner(*args, **kwargs):
            result = func(*args, **kwargs)
            if result.fullyQualifiedName.root in dbs_by_name:
                return dbs_by_name[result.fullyQualifiedName.root]
            return result

        return inner

    monkeypatch.setattr(
        "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
        override_result_by_fqn(OpenMetadata.get_by_name),
    )
    monkeypatch.setattr(
        "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
        override_result_by_fqn(OpenMetadata.get_by_id),
    )
    return metadata


def copy_table_between_postgres(source_engine, dest_engine, table_name: str, limit: int):
    source_metadata = MetaData()
    source_table = SQATable(table_name, source_metadata, autoload_with=source_engine)
    dest_metadata = MetaData()
    dest_table = SQATable(table_name, dest_metadata)
    for column in source_table.columns:
        dest_table.append_column(column.copy())
    dest_metadata.create_all(dest_engine)
    with source_engine.connect() as source_conn, dest_engine.connect() as dest_conn:
        query = source_table.select().limit(limit)
        data = source_conn.execute(query).fetchall()
        if data:
            dest_conn.execute(dest_table.insert(), [dict(row._mapping) for row in data])
            dest_conn.commit()