mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
* chore(ingestion): drop pylint, expand ruff to Stage 2c
Replace pylint with a coherent ruff-only stack (Stage 2c of the modernize
roadmap). Pylint is dropped from dev deps and CI workflows; ruff selected
ruleset expanded to ~22 families covering style, bug catchers, hygiene,
and the pylint port (PLE/PLC/PLW/PLR with the noisy "too-many-X"
complexity caps + magic-value disabled).
What's selected (with rationale in pyproject.toml):
E, W, F, I, N — style + correctness baseline + naming
UP — pyupgrade (py>=3.10 modernizations)
B, C4, C90, RET, SIM, TRY — bug catchers
PIE, ICN, T20, TC, TID, PTH, PERF — hygiene
PLE, PLC, PLW, PLR — pylint port (PLR complexity caps ignored)
RUF — ruff-native (incl. RUF100 unused-noqa)
What's removed:
- .pylintrc (root) — duplicate of the ingestion pylint config
- [tool.pylint.*] block in ingestion/pyproject.toml (~140 lines)
- ingestion/plugins/{print_checker,import_checker}.py + tests + README
(replaced by built-in T20 + TID251 banned-api respectively)
- pylint dep from ingestion/setup.py and openmetadata-airflow-apis/pyproject.toml
- `make lint` Makefile target + the pylint invocation in py_format_check
- dead pylint TODO comment + ignored test entry in noxfile.py
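The removed plugins ran static checks that ruff now covers with T20 (print calls) and TID251 (banned APIs). As a rough illustration of what such a check does, here is a minimal AST-based sketch. `BANNED_MODULES` and `find_violations` are hypothetical. This is not the removed plugins' code or ruff's implementation, and it simplifies TID251 to whole-module bans.

```python
import ast

# Hypothetical banned-module list; a real TID251 setup is configured in
# pyproject.toml under ruff's flake8-tidy-imports settings.
BANNED_MODULES = {"pdb"}


def find_violations(source: str) -> list[tuple[int, str]]:
    """Return sorted (line, message) pairs for print() calls and banned imports."""
    violations: list[tuple[int, str]] = []
    for node in ast.walk(ast.parse(source)):
        # T20-style check: a direct call to the name `print`
        if isinstance(node, ast.Call) and isinstance(node.func, ast.Name) and node.func.id == "print":
            violations.append((node.lineno, "print found"))
        # TID251-style check (simplified): `import x` of a banned top-level module
        elif isinstance(node, ast.Import):
            for alias in node.names:
                if alias.name.split(".")[0] in BANNED_MODULES:
                    violations.append((node.lineno, f"banned import: {alias.name}"))
        # ...and `from x import y` of a banned top-level module
        elif isinstance(node, ast.ImportFrom) and node.module:
            if node.module.split(".")[0] in BANNED_MODULES:
                violations.append((node.lineno, f"banned import: {node.module}"))
    return sorted(violations)
```

For example, `find_violations("import pdb\nprint(1)\n")` reports both lines. Using ruff's built-in rules avoids maintaining a walker like this one.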
Cwd-stable config: ruff is invoked both from the repo root (pre-commit,
CI) and from ingestion/ (`make py_format_check`). The `src`,
`extend-exclude`, and per-file-ignores entries are listed twice — once
relative to ingestion/ and once with the `ingestion/` prefix — so
first-party isort detection and exclusions match in both invocations.
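Here is a simplified model of why listing each entry twice keeps matching stable across the two invocation directories. It takes the commit's premise at face value: an entry only matches when it is written relative to the directory ruff runs from. `EXTEND_EXCLUDE`, its example paths, and `is_excluded` are hypothetical. They do not reproduce ruff's own path resolution.

```python
from pathlib import PurePosixPath

# Hypothetical entries: each exclusion is listed twice, once relative to
# ingestion/ and once with the ingestion/ prefix.
EXTEND_EXCLUDE = [
    "src/metadata/generated",
    "ingestion/src/metadata/generated",
]


def is_excluded(path_from_cwd: str, patterns: list[str]) -> bool:
    """Simplified model: a pattern excludes a path if it equals the path or one of its parents.

    Assumes paths are compared relative to the invocation directory; this is
    an illustration of the commit's reasoning, not ruff's matching logic.
    """
    path = PurePosixPath(path_from_cwd)
    candidates = {path, *path.parents}
    return any(PurePosixPath(pattern) in candidates for pattern in patterns)
```

Under this model, the same file is caught from both `/` and `ingestion/`. If the prefixed entry is dropped, only the `ingestion/` invocation still matches.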
Grandfathering: ran `ruff check --add-noqa` once + format-stable
iteration. ~12,130 noqa directives across ~1,400 files. Cleanup is
deferred to follow-up PRs that drop noqas one rule at a time.
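Because the noqa cleanup is planned one rule at a time, a per-rule tally of the grandfathered directives can help choose the order. The sketch below assumes directives use the `# noqa: CODE[, CODE]` form. `count_noqa_codes` is a hypothetical helper and is not part of ruff.

```python
import re
from collections import Counter

# Matches "# noqa: B905" or "# noqa: E501, B905" and captures the code list.
NOQA_RE = re.compile(r"#\s*noqa:\s*(?P<codes>[A-Z]+[0-9]+(?:[,\s]+[A-Z]+[0-9]+)*)")


def count_noqa_codes(lines: list[str]) -> Counter[str]:
    """Count how often each rule code appears in `# noqa:` directives."""
    counts: Counter[str] = Counter()
    for line in lines:
        match = NOQA_RE.search(line)
        if match:
            # Split the captured list into individual codes such as "B905"
            counts.update(re.findall(r"[A-Z]+[0-9]+", match.group("codes")))
    return counts
```

To get a ranked list of candidate rules, feed it every line of every `.py` file and call `most_common()`. With RUF100 selected, ruff reports directives that no longer suppress anything.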
Documentation sweep: replaced `make lint` references in CLAUDE.md,
AGENTS.md, DEVELOPER.md, copilot-instructions, and 6 SKILL files with
the apply+verify shape `make py_format && make py_format_check`.
`make py_format` is NOT a strict superset of pylint — it only applies
auto-fixable violations; `make py_format_check` catches the rest.
Basedpyright baseline regenerated: ruff format reflowed multi-line
signatures in ~70 files, shifting type-error column positions. The
basedpyright baseline matches by (file path, error code, range), so
column shifts caused 19 entries to mis-align. Net diff is small
(154 lines in/out of the 13MB baseline.json) — purely positional.
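The sketch below shows why a purely positional change breaks matching. If an entry is keyed by file, rule, and range, any column shift produces a different key. It assumes the range is stored as start/end columns plus a line count and is compared exactly. `BaselineKey` and `unmatched` are hypothetical simplifications and are not basedpyright's actual baseline.json schema or matching code.

```python
from typing import NamedTuple


class BaselineKey(NamedTuple):
    """Hypothetical simplified baseline key: (file, rule, range)."""

    file: str
    code: str
    start_col: int
    end_col: int
    line_count: int


def unmatched(errors: list[BaselineKey], baseline: set[BaselineKey]) -> list[BaselineKey]:
    """Errors that no baseline entry absorbs, because the whole key must match."""
    return [error for error in errors if error not in baseline]
```

A formatter reflow that moves the same expression four columns left produces a new error that the old entry no longer absorbs. The underlying type error is unchanged.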
Verified locally:
- make py_format_check → All checks passed
- nox --no-venv -s static-checks → 0 errors, 0 warnings, 0 notes
* chore(ingestion): finish ruff swap — nox lint session + skill docs
Three remaining stale-tooling references after Stage 2c:
- `ingestion/noxfile.py` `lint` session was still calling `black --check`,
`isort --check-only`, `pycln --diff`. Those tools aren't installed
anywhere (we dropped them from dev deps). Replace with the ruff
equivalents that mirror `make py_format_check`.
- `skills/standards/code_style.md`: stack listed as `black + isort +
pycln`; line length claimed 88 (black default). Both wrong: stack is
ruff, line length is 120.
- `skills/connector-building/SKILL.md`: `make py_format` comment said
`# black + isort + pycln`. Same swap.
* chore(ingestion): keep main's baseline + globally ignore TRY400
Per gitar-bot's review on PR #27774:
1. Main's PR #27728 promoted ~60 `logger.warning()` → `logger.error()`
inside `except` blocks. Those changes landed on main with their own
baseline updates. Our PR doesn't promote anything — the merge from
origin/main brought those `error` calls along with their baseline
entries.
The bot interpreted the `# noqa: TRY400` we added next to those lines
as us silencing the rule case-by-case. Cleaner: globally ignore
TRY400 in pyproject.toml, with a comment explaining why the codebase's
`logger.error(...)` + separate `logger.debug(traceback.format_exc())`
pattern is intentional. Strip ~430 per-line `# noqa: TRY400` markers
from source.
2. Document that `S101` in `per-file-ignores` is a forward-looking
entry — flake8-bandit (`S`) is not yet selected, so the rule is
no-op today; the entry stays so when `S` lands later, tests don't
immediately error.
Reverts the platform pin and Linux Docker–generated baseline. Keep
main's baseline intact and let CI surface the exact column-shifted
entries; the team will decide whether to fix in-place (revert format
on affected files) or add per-line `# pyright: ignore` markers.
* chore(ingestion): regen baseline for new connector type debt
Main's baseline was stale relative to recently-added connectors
(McpConnection, CustomDriveConnection) that lack common attributes
like `hostPort`, `database`, `catalog` etc. — all sites that access
those attributes via the union-typed `serviceConnection.root.config`
fire `reportAttributeAccessIssue` errors that aren't baselined.
71 errors + 58 warnings absorbed. Local macOS regen; pushing to see
CI's drift count. Per the basedpyright-baseline-and-ci PR experience,
macOS↔Linux column drift on this size of regen has historically been
1-7 residuals.
686 lines
33 KiB
Python
from datetime import datetime  # noqa: I001

import pytest
from dirty_equals import IsApprox, IsPositiveInt
from pydantic import BaseModel
from sqlalchemy import VARBINARY
from sqlalchemy import Column as SQAColumn
from sqlalchemy import MetaData
from sqlalchemy import Table as SQATable
from sqlalchemy import create_engine, text
from sqlalchemy.dialects import postgresql
from sqlalchemy.engine import Connection, make_url
from sqlalchemy.sql import sqltypes

from _openmetadata_testutils.postgres.conftest import postgres_container  # noqa: F401
from _openmetadata_testutils.pydantic.test_utils import assert_equal_pydantic_objects
from metadata.data_quality.api.models import TestCaseDefinition
from metadata.generated.schema.entity.data.table import Table, TableProfilerConfig
from metadata.generated.schema.entity.services.databaseService import DatabaseService
from metadata.generated.schema.metadataIngestion.testSuitePipeline import (
    TestSuiteConfigType,
)
from metadata.generated.schema.tests.basic import (
    TestCaseResult,
    TestCaseStatus,
    TestResultValue,
)
from metadata.generated.schema.tests.testCase import TestCase, TestCaseParameterValue
from metadata.generated.schema.type.samplingConfig import ProfileSampleConfig
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.workflow.data_quality import TestSuiteWorkflow


class TestParameters(BaseModel):
    test_case_defintion: TestCaseDefinition
    table2_fqn: str
    expected: TestCaseResult
    table_profile_config: TableProfilerConfig = None

    def __init__(self, *args, **kwargs):
        if args:
            # Map positional arguments to fields
            field_names = list(self.__annotations__.keys())
            kwargs.update(dict(zip(field_names, args)))  # noqa: B905

        super().__init__(**kwargs)


@pytest.mark.parametrize(
    "parameters",
    [
        pytest.param(TestParameters(*t), id=t[0].name)
        for t in [
            (
                TestCaseDefinition(
                    name="compare_same_tables",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[
                        TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                    failedRows=0,
                    passedRows=599,
                ),
            ),
            (
                TestCaseDefinition(
                    name="compare_same_tables_with_percentage_sample",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[
                        TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer",
                TestCaseResult.model_construct(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                    failedRows=0,
                    # we use approximations because the sampling is not deterministic
                    passedRows=IsApprox(59, delta=60) & IsPositiveInt,
                ),
                TableProfilerConfig(
                    profileSampleConfig=ProfileSampleConfig(
                        sampleConfigType="STATIC",
                        config={
                            "profileSample": 10,
                            "profileSampleType": "PERCENTAGE",
                        },
                    ),
                ),
            ),
            (
                TestCaseDefinition(
                    name="compare_same_tables_with_row_sample",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[
                        TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer",
                TestCaseResult.model_construct(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                    failedRows=0,
                    # we use approximations around the 99.5% confidence interval since the
                    # sampling in data diff uses hash-based partitioning
                    passedRows=IsApprox(10, delta=15) & IsPositiveInt,
                ),
                TableProfilerConfig(
                    profileSampleConfig=ProfileSampleConfig(
                        sampleConfigType="STATIC",
                        config={
                            "profileSample": 10,
                            "profileSampleType": "ROWS",
                        },
                    ),
                ),
            ),
            (
                TestCaseDefinition(
                    name="with_explicit_key_columns",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[
                        TestCaseParameterValue(name="keyColumns", value="['customer_id']"),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                    failedRows=321,
                    passedRows=278,
                ),
            ),
            (
                TestCaseDefinition(
                    name="resolve_primary_keys",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[],
                ),
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                    failedRows=321,
                    passedRows=278,
                ),
            ),
            (
                TestCaseDefinition(
                    name="with_passing_threshold",
                    testDefinitionName="tableDiff",
                    parameterValues=[
                        TestCaseParameterValue(name="threshold", value="322"),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                    failedRows=321,
                ),
            ),
            (
                TestCaseDefinition(
                    name="with_failing_threshold",
                    testDefinitionName="tableDiff",
                    parameterValues=[
                        TestCaseParameterValue(name="threshold", value="321"),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                    failedRows=321,
                ),
            ),
            (
                TestCaseDefinition(
                    name="with_where_clause",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[
                        TestCaseParameterValue(
                            name="where",
                            value="MOD(customer_id, 2) != 0 AND MOD(customer_id, 13) != 0",
                        ),
                    ],
                ),
                "POSTGRES_SERVICE.dvdrental.public.changed_customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                ),
            ),
            (
                TestCaseDefinition(
                    name="without_first_name",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer_without_first_name",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                    failedRows=0,
                    passedRows=599,
                    testResultValue=[
                        TestResultValue(name="removedRows", value="0"),
                        TestResultValue(name="addedRows", value="0"),
                        TestResultValue(name="changedRows", value="0"),
                        TestResultValue(name="diffCount", value="0"),
                        TestResultValue(name="removedColumns", value="1"),
                        TestResultValue(name="addedColumns", value="0"),
                        TestResultValue(name="changedColumns", value="0"),
                        TestResultValue(
                            name="schemaTable1",
                            value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
                        ),
                        TestResultValue(
                            name="schemaTable2",
                            value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_without_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
                        ),
                    ],
                ),
            ),
            (
                TestCaseDefinition(
                    name="without_first_name_with_extra_column",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[TestCaseParameterValue(name="useColumns", value="['last_name', 'email']")],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer_without_first_name",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                ),
            ),
            (
                TestCaseDefinition(
                    name="postgres_vs_mysql_success",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[
                        TestCaseParameterValue(
                            name="useColumns",
                            value=str(
                                [
                                    "store_id",
                                    "first_name",
                                    "last_name",
                                    "email",
                                    "activebool",
                                    "address_id",
                                    "active",
                                    # "create_date",  # date types are incomparable for mysql and postgres
                                    "last_update",
                                ]
                            ),
                        )
                    ],
                ),
                "MYSQL_SERVICE.default.test.customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                ),
            ),
            (
                TestCaseDefinition(
                    name="postgres_vs_mysql_failure",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[],
                ),
                "MYSQL_SERVICE.default.test.changed_customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                ),
            ),
            (
                TestCaseDefinition(
                    name="postgres_different_case_columns_fail",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[TestCaseParameterValue(name="caseSensitiveColumns", value="true")],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                    failedRows=0,
                    passedRows=599,
                    testResultValue=[
                        TestResultValue(name="removedRows", value="0"),
                        TestResultValue(name="addedRows", value="0"),
                        TestResultValue(name="changedRows", value="0"),
                        TestResultValue(name="diffCount", value="0"),
                        TestResultValue(name="removedColumns", value="1"),
                        TestResultValue(name="addedColumns", value="1"),
                        TestResultValue(name="changedColumns", value="0"),
                        TestResultValue(
                            name="schemaTable1",
                            value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
                        ),
                        TestResultValue(
                            name="schemaTable2",
                            value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'First_Name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
                        ),
                    ],
                ),
            ),
            (
                TestCaseDefinition(
                    name="postgres_different_case_columns_success",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[TestCaseParameterValue(name="caseSensitiveColumns", value="false")],
                ),
                "POSTGRES_SERVICE.dvdrental.public.customer_different_case_columns",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Success,
                ),
                TableProfilerConfig(
                    profileSampleConfig=ProfileSampleConfig(
                        sampleConfigType="STATIC",
                        config={
                            "profileSample": 10,
                            "profileSampleType": "PERCENTAGE",
                        },
                    ),
                ),
            ),
            (
                TestCaseDefinition(
                    name="table_from_another_db",
                    testDefinitionName="tableDiff",
                    computePassedFailedRowCount=True,
                    parameterValues=[],
                ),
                "POSTGRES_SERVICE.other_db.public.customer",
                TestCaseResult(
                    timestamp=int(datetime.now().timestamp() * 1000),
                    testCaseStatus=TestCaseStatus.Failed,
                ),
            ),
        ]
    ],
)
def test_happy_paths(
    postgres_service: DatabaseService,
    prepare_data,
    ingest_postgres_metadata,
    ingest_mysql_service,
    patched_metadata,
    parameters: TestParameters,
    sink_config,
    profiler_config,
    run_workflow,
    workflow_config,
    cleanup_fqns,
):
    # Replace service name placeholders in expected testResultValue
    if parameters.expected.testResultValue:
        for result_value in parameters.expected.testResultValue:
            if result_value.value:
                result_value.value = result_value.value.replace(
                    "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
                )

    metadata = patched_metadata
    table1: Table = metadata.get_by_name(
        Table,
        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    cleanup_fqns(
        TestCase,
        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
    )
    table2_service = {
        "POSTGRES_SERVICE": postgres_service,
        "MYSQL_SERVICE": ingest_mysql_service,
    }
    for k, v in table2_service.items():
        parameters.table2_fqn = parameters.table2_fqn.replace(k, v.fullyQualifiedName.root)
    parameters.test_case_defintion.parameterValues.extend(
        [
            TestCaseParameterValue(
                name="table2",
                value=parameters.table2_fqn,
            ),
        ]
    )
    if parameters.table_profile_config:
        metadata.create_or_update_table_profiler_config(table1.fullyQualifiedName.root, parameters.table_profile_config)
    workflow_config = {
        "source": {
            "type": "postgres",
            "serviceName": f"MyTestSuite_{postgres_service.name.root}",
            "sourceConfig": {
                "config": {
                    "type": TestSuiteConfigType.TestSuite.value,
                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
                }
            },
        },
        "processor": {
            "type": "orm-test-runner",
            "config": {"testCases": [parameters.test_case_defintion.model_dump()]},
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
    run_workflow(TestSuiteWorkflow, workflow_config)
    metadata.create_or_update_table_profiler_config(table1.fullyQualifiedName.root, TableProfilerConfig())
    test_case_entity = metadata.get_by_name(
        TestCase,
        f"{table1.fullyQualifiedName.root}.{parameters.test_case_defintion.name}",
        fields=["*"],
    )
    assert "ERROR: Unexpected error" not in test_case_entity.testCaseResult.result
    parameters.expected.timestamp = test_case_entity.testCaseResult.timestamp  # timestamp is not deterministic
    assert_equal_pydantic_objects(parameters.expected, test_case_entity.testCaseResult)


@pytest.mark.parametrize(
    "parameters,expected",
    [
        pytest.param(
            TestCaseDefinition(
                name="unsupported_dialect",
                testDefinitionName="tableDiff",
                computePassedFailedRowCount=True,
                parameterValues=[
                    TestCaseParameterValue(
                        name="service2Url",
                        value="mongodb://localhost:27017",
                    ),
                    TestCaseParameterValue(
                        name="table2",
                        value="POSTGRES_SERVICE.dvdrental.public.customer",
                    ),
                ],
            ),
            TestCaseResult(
                timestamp=int(datetime.now().timestamp() * 1000),
                testCaseStatus=TestCaseStatus.Aborted,
                result="Unsupported dialect in param table2.serviceUrl: mongodb",
            ),
            id="unsupported_dialect",
        ),
        pytest.param(
            TestCaseDefinition(
                name="unsupported_data_types",
                testDefinitionName="tableDiff",
                computePassedFailedRowCount=True,
                parameterValues=[
                    TestCaseParameterValue(
                        name="table2",
                        value="POSTGRES_SERVICE.dvdrental.public.customer_int_first_name",
                    ),
                ],
            ),
            TestCaseResult(
                timestamp=int(datetime.now().timestamp() * 1000),
                testCaseStatus=TestCaseStatus.Failed,
                failedRows=0,
                passedRows=599,
                testResultValue=[
                    TestResultValue(name="removedRows", value="0"),
                    TestResultValue(name="addedRows", value="0"),
                    TestResultValue(name="changedRows", value="0"),
                    TestResultValue(name="diffCount", value="0"),
                    TestResultValue(name="removedColumns", value="0"),
                    TestResultValue(name="addedColumns", value="0"),
                    TestResultValue(name="changedColumns", value="1"),
                    TestResultValue(
                        name="schemaTable1",
                        value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer' schema={'customer_id': {'type': 'integer', 'constraints': 'PRIMARY_KEY'}, 'store_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'first_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NOT_NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NOT_NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NOT_NULL'}, 'create_date': {'type': 'date', 'constraints': 'NOT_NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}}",
                    ),
                    TestResultValue(
                        name="schemaTable2",
                        value="serviceType='Postgres' fullyQualifiedTableName='POSTGRES_SERVICE.dvdrental.public.customer_int_first_name' schema={'customer_id': {'type': 'integer', 'constraints': 'NULL'}, 'store_id': {'type': 'smallint', 'constraints': 'NULL'}, 'last_name': {'type': 'character varying(45)', 'constraints': 'NULL'}, 'email': {'type': 'character varying(50)', 'constraints': 'NULL'}, 'address_id': {'type': 'smallint', 'constraints': 'NULL'}, 'activebool': {'type': 'boolean', 'constraints': 'NULL'}, 'create_date': {'type': 'date', 'constraints': 'NULL'}, 'last_update': {'type': 'timestamp without time zone', 'constraints': 'NULL'}, 'active': {'type': 'integer', 'constraints': 'NULL'}, 'json_field': {'type': 'jsonb', 'constraints': 'NULL'}, 'first_name': {'type': 'integer', 'constraints': 'NULL'}}",
                    ),
                ],
            ),
        ),
        pytest.param(
            None,
            None,
            marks=pytest.mark.skip(reason="TODO: implement test - table2 does not exist"),
        ),
        pytest.param(
            None,
            None,
            marks=pytest.mark.skip(reason="TODO: implement test - where clause is invalid"),
        ),
    ],
)
def test_error_paths(
    parameters: TestCaseDefinition,
    expected: TestCaseResult,
    prepare_data: None,
    ingest_postgres_metadata,
    ingest_mysql_service: DatabaseService,
    postgres_service: DatabaseService,
    patched_metadata: OpenMetadata,
    sink_config,
    workflow_config,
    run_workflow,
    cleanup_fqns,
):
    # Replace service name placeholders in expected testResultValue
    if expected.testResultValue:
        for result_value in expected.testResultValue:
            if result_value.value:
                result_value.value = result_value.value.replace(
                    "POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root
                )

    metadata = patched_metadata
    table1 = metadata.get_by_name(
        Table,
        f"{postgres_service.fullyQualifiedName.root}.dvdrental.public.customer",
        nullable=False,
    )
    cleanup_fqns(TestCase, f"{table1.fullyQualifiedName.root}.{parameters.name}")
    for parameter in parameters.parameterValues:
        if parameter.name == "table2":
            parameter.value = parameter.value.replace("POSTGRES_SERVICE", postgres_service.fullyQualifiedName.root)
    workflow_config = {
        "source": {
            "type": "postgres",
            "serviceName": f"MyTestSuite_{postgres_service.name.root}",
            "sourceConfig": {
                "config": {
                    "type": TestSuiteConfigType.TestSuite.value,
                    "entityFullyQualifiedName": table1.fullyQualifiedName.root,
                }
            },
        },
        "processor": {
            "type": "orm-test-runner",
            # model_dump() is the pydantic v2 name for the deprecated .dict()
            "config": {"testCases": [parameters.model_dump()]},
        },
        "sink": sink_config,
        "workflowConfig": workflow_config,
    }
    run_workflow(TestSuiteWorkflow, workflow_config)
    test_case_entity: TestCase = metadata.get_or_create_test_case(f"{table1.fullyQualifiedName.root}.{parameters.name}")
    expected.timestamp = test_case_entity.testCaseResult.timestamp  # timestamp is not deterministic
    assert_equal_pydantic_objects(expected, test_case_entity.testCaseResult)


def add_changed_tables(connection: Connection):
    connection.execute(text("CREATE TABLE customer_200 AS SELECT * FROM customer LIMIT 200;"))
    connection.execute(text("CREATE TABLE customer_different_case_columns AS SELECT * FROM customer;"))
    connection.execute(text('ALTER TABLE customer_different_case_columns RENAME COLUMN first_name TO "First_Name";'))
    # TODO: this appears to be unsupported by data diff. Cross data type comparison is flaky.
    # connection.execute(
    #     text("ALTER TABLE customer_different_case_columns ALTER COLUMN store_id TYPE decimal")
    # )
    connection.execute(text("CREATE TABLE changed_customer AS SELECT * FROM customer;"))
    connection.execute(text("UPDATE changed_customer SET first_name = 'John' WHERE MOD(customer_id, 2) = 0;"))
    connection.execute(text("DELETE FROM changed_customer WHERE MOD(customer_id, 13) = 0;"))
    connection.execute(text("CREATE TABLE customer_without_first_name AS SELECT * FROM customer;"))
    connection.execute(text("ALTER TABLE customer_without_first_name DROP COLUMN first_name;"))
    connection.execute(text("CREATE TABLE customer_int_first_name AS SELECT * FROM customer;"))
    connection.execute(text("ALTER TABLE customer_int_first_name DROP COLUMN first_name;"))
    connection.execute(text("ALTER TABLE customer_int_first_name ADD COLUMN first_name INT;"))
    connection.execute(text("UPDATE customer_int_first_name SET first_name = 1;"))


@pytest.fixture(scope="module")
def prepare_data(postgres_container, mysql_container):  # noqa: F811
    dvdrental = create_engine(
        make_url(postgres_container.get_connection_url()).set(database="dvdrental"),
        isolation_level="AUTOCOMMIT",
    )
    with dvdrental.connect() as conn:
        conn.execute(text("CREATE DATABASE other_db"))
    with dvdrental.connect() as conn:
        add_changed_tables(conn)
    other = create_engine(
        make_url(postgres_container.get_connection_url()).set(database="other_db"),
        isolation_level="AUTOCOMMIT",
    )
    copy_table_between_postgres(dvdrental, other, "customer", 10)
    # use a separate name so the engine does not shadow the mysql_container fixture
    mysql_engine = create_engine(make_url(mysql_container.get_connection_url()).set(database=mysql_container.dbname))
    dvdrental = create_engine(make_url(postgres_container.get_connection_url()).set(database="dvdrental"))
    copy_table(dvdrental, mysql_engine, "customer")
    copy_table(dvdrental, mysql_engine, "changed_customer")


def copy_table(source_engine, destination_engine, table_name):
    source_metadata = MetaData()
    source_table = SQATable(table_name, source_metadata, autoload_with=source_engine)
    destination_metadata = MetaData()
    destination_table = SQATable(table_name, destination_metadata)

    for column in source_table.columns:
        # we copy all the columns without constraints, indexes or defaults
        # as we are only interested in the data
        if isinstance(column.type, postgresql.base.BYTEA) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, VARBINARY)
        elif isinstance(column.type, sqltypes.BOOLEAN) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, sqltypes.Boolean)
        elif isinstance(column.type, sqltypes.TIMESTAMP) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, sqltypes.DateTime)
        elif isinstance(column.type, sqltypes.DATE) and destination_engine.dialect.name == "mssql":
            column_copy = SQAColumn(column.name, sqltypes.DateTime)
        elif isinstance(column.type, postgresql.json.JSONB):
            column_copy = SQAColumn(column.name, sqltypes.JSON)
        else:
            column_copy = SQAColumn(column.name, column.type)
        destination_table.append_column(column_copy)
    destination_metadata.create_all(destination_engine)
    with source_engine.connect() as source_connection, destination_engine.connect() as destination_connection:
        data = source_connection.execute(source_table.select()).fetchall()
        batch_size = 1000
        for i in range(0, len(data), batch_size):
            batch = data[i : i + batch_size]
            destination_connection.execute(source_table.insert(), [dict(row._mapping) for row in batch])
        destination_connection.commit()


@pytest.fixture
def patched_metadata(metadata, postgres_service, ingest_mysql_service, monkeypatch):
    dbs_by_name = {service.fullyQualifiedName.root: service for service in [postgres_service, ingest_mysql_service]}

    def override_result_by_fqn(func):
        def inner(*args, **kwargs):
            result = func(*args, **kwargs)
            # get_by_name / get_by_id can return None when the entity is not found
            if result is not None and result.fullyQualifiedName.root in dbs_by_name:
                return dbs_by_name[result.fullyQualifiedName.root]
            return result

        return inner

    monkeypatch.setattr(
        "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_name",
        override_result_by_fqn(OpenMetadata.get_by_name),
    )

    monkeypatch.setattr(
        "metadata.ingestion.ometa.ometa_api.OpenMetadata.get_by_id",
        override_result_by_fqn(OpenMetadata.get_by_id),
    )

    return metadata


def copy_table_between_postgres(source_engine, dest_engine, table_name: str, limit: int):
    source_metadata = MetaData()
    source_table = SQATable(table_name, source_metadata, autoload_with=source_engine)

    dest_metadata = MetaData()
    dest_table = SQATable(table_name, dest_metadata)

    for column in source_table.columns:
        dest_table.append_column(column.copy())

    dest_metadata.create_all(dest_engine)

    with source_engine.connect() as source_conn, dest_engine.connect() as dest_conn:
        query = source_table.select().limit(limit)
        data = source_conn.execute(query).fetchall()

        if data:
            dest_conn.execute(dest_table.insert(), [dict(row._mapping) for row in data])
            dest_conn.commit()