OpenMetadata/ingestion/tests/unit/test_workflow_parse.py
IceS2 5009059441
chore(ingestion): migrate to ruff for format + isort + unused-import (#27739)
* chore(ingestion): replace black/isort/pycln with ruff

- Swap formatter + import-sorter + unused-import tooling for ruff
  (line-length 120, target py3.10) in ingestion + openmetadata-airflow-apis
- Drop dead [tool.mypy] config; basedpyright is the active type checker
- Bump requires-python to >=3.10 to match noxfile and CLAUDE.md (3.9 is
  documented as broken on Mac in noxfile.py)
- Bump pre-commit-hooks v2.3 -> v5.0; the new check-json catches four
  pre-existing JSON issues now excluded with an inline TODO
- Update Makefile py_format / py_format_check targets to call ruff

* chore(ingestion): grandfather ruff lint violations and apply ruff format

- 253 noqa markers added via 'ruff check --add-noqa' across 128 files,
  freezing existing violations so this PR is a tooling-only swap. Per-rule
  cleanup tracked in the TODO comment in ingestion/pyproject.toml.
- Bulk reformat from black 22.3 -> ruff format @ line-length 120.
  Cosmetic only: imports balanced (-32/+32), structural keywords balanced
  (-2221/+2221), no logic changes.
- Star-import rules (F403/F405) globally ignored; refactoring wildcard
  imports across connectors is a separate effort.

* chore(ingestion): fix pylint findings surfaced after ruff format

- filters.py: drop redundant parens around re.match(...) in `if`
  (C0325 superfluous-parens) — exposed when ruff format unwrapped them
- nosql_adaptor.py: move `# pylint: disable=unused-argument` from the
  `column:` line to the `def` line so it covers `table` too (W0613) —
  scope was line-based, lost when ruff split params onto multiple lines
- action1xx.py: replace `arguments-differ` with `signature-differs` in
  the disable directive (was always wrong code) and drop the now-useless
  `unused-argument` suppression (I0021)
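
A minimal sketch of the directive-scope issue described in the nosql_adaptor.py bullet above. The function names are hypothetical stand-ins, not the real adaptor methods, and the comments only restate what the commit message reports about pylint's behaviour:

```python
# Hypothetical stand-ins for the adaptor method; all names are illustrative only.


def one_line_signature(table: str, column: str) -> list:  # pylint: disable=unused-argument
    """Both parameters share the line that carries the directive."""
    return []


def split_signature_param_line(
    table: str,
    column: str,  # pylint: disable=unused-argument
) -> list:
    """After the split, the directive trails only the `column` line,
    so `table` is no longer covered (W0613, per the commit message)."""
    return []


def split_signature_def_line(  # pylint: disable=unused-argument
    table: str,
    column: str,
) -> list:
    """Directive on the `def` line, which the commit reports covers `table` too."""
    return []
```

The three functions behave identically at runtime; only the placement of the pylint comment differs.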

* fix(ingestion): make ruff extend-exclude robust to multi-root invocations

CI's `make py_format_check` runs from the repo root and passes both
`ingestion/` and `./openmetadata-airflow-apis/` to ruff in a single
invocation. With multiple root paths, ruff's parallel file discovery
races on extend-exclude matching against the project root, so files
under `ingestion/src/metadata/generated/` were intermittently scanned
and produced ~830 I001 violations.

20-run repro: 10/20 fail without the fix, 20/20 pass with the fix.

Each excluded directory now appears twice in extend-exclude:
- the project-root-relative pattern (cwd = ingestion/)
- the prefixed pattern (cwd = repo root, multi-root invocation)

* chore(ingestion): address gitar-bot findings + cross-version pylint disable

- openmetadata-airflow-apis/pyproject.toml: switch coverage to module-name
  source + [tool.coverage.paths] glob remap (matches the ingestion pattern).
  Drops the hardcoded `env/lib/python3.9/site-packages/...` source path,
  which broke after the requires-python bump to 3.10. (Finding 1)
- ingestion/setup.py: remove dead python_version<'3.9' / >='3.9' guards on
  mysql-connector-python and testcontainers; promote locust to a regular
  test dep (was conditionally added under sys.version_info >= (3, 9)). Also
  remove the now-unused `import sys`. (Finding 3)
- ingestion/src/metadata/great_expectations/action1xx.py: cover both
  arguments-differ (great_expectations 0.18.x parent) and signature-differs
  (great_expectations 1.x parent) in the pylint disable comment, since
  CI installs 0.18.x and local often has 1.x. unused-argument covers the
  unused action_context. The opposite rule fires as I0021 useless-suppression
  on each environment, which is informational and does not affect pylint's
  exit code.
2026-04-27 10:05:28 +02:00


# Copyright 2025 Collate
# Licensed under the Collate Community License, Version 1.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# https://github.com/open-metadata/OpenMetadata/blob/main/ingestion/LICENSE
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Test Workflow pydantic parsing
"""

from unittest import TestCase

from pydantic import ValidationError

from metadata.generated.schema.entity.automations.workflow import (
    Workflow as AutomationWorkflow,
)
from metadata.generated.schema.entity.services.connections.api.restConnection import (
    RestConnection,
)
from metadata.generated.schema.entity.services.connections.dashboard.tableauConnection import (
    TableauConnection,
)
from metadata.generated.schema.entity.services.connections.database.glueConnection import (
    GlueConnection,
)
from metadata.generated.schema.entity.services.connections.messaging.kafkaConnection import (
    KafkaConnection,
)
from metadata.generated.schema.entity.services.connections.metadata.openMetadataConnection import (
    OpenMetadataConnection,
)
from metadata.generated.schema.entity.services.dashboardService import (
    DashboardConnection,
)
from metadata.generated.schema.entity.services.databaseService import DatabaseConnection
from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    IngestionPipeline,
)
from metadata.generated.schema.entity.services.messagingService import (
    MessagingConnection,
)
from metadata.generated.schema.entity.services.metadataService import MetadataConnection
from metadata.generated.schema.metadataIngestion.apiServiceMetadataPipeline import (
    ApiServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.dashboardServiceMetadataPipeline import (
    DashboardServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceMetadataPipeline import (
    DatabaseServiceMetadataPipeline,
)
from metadata.generated.schema.metadataIngestion.databaseServiceProfilerPipeline import (
    DatabaseServiceProfilerPipeline,
)
from metadata.generated.schema.metadataIngestion.pipelineServiceMetadataPipeline import (
    PipelineServiceMetadataPipeline,
)
from metadata.ingestion.api.parser import (
    ParsingConfigurationError,
    get_connection_class,
    get_service_type,
    get_source_config_class,
    parse_automation_workflow_gracefully,
    parse_ingestion_pipeline_config_gracefully,
    parse_workflow_config_gracefully,
)


class TestWorkflowParse(TestCase):
    """
    Test parsing scenarios of JSON Schemas
    """

    def test_get_service_type(self):
        """
        Test that we can get the service type of source
        """
        database_service = get_service_type("Mysql")
        self.assertEqual(database_service, DatabaseConnection)

        dashboard_service = get_service_type("Looker")
        self.assertEqual(dashboard_service, DashboardConnection)

        messaging_service = get_service_type("Kafka")
        self.assertEqual(messaging_service, MessagingConnection)

        metadata_service = get_service_type("Amundsen")
        self.assertEqual(metadata_service, MetadataConnection)

        with self.assertRaises(ValueError) as err:
            get_service_type("random")

        self.assertEqual("Cannot find the service type of random", str(err.exception))

    def test_get_connection_class(self):
        """
        Check that we can correctly build the connection module ingredients
        """
        source_type = "Glue"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, GlueConnection)

        source_type = "Tableau"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, TableauConnection)

        source_type = "OpenMetadata"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, OpenMetadataConnection)

        source_type = "Kafka"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, KafkaConnection)

        source_type = "Rest"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, RestConnection)

        # Test all-uppercase source types (SAS, SSAS)
        from metadata.generated.schema.entity.services.connections.database.sasConnection import (
            SASConnection,
        )
        from metadata.generated.schema.entity.services.connections.database.ssasConnection import (
            SSASConnection,
        )

        source_type = "SAS"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, SASConnection)

        source_type = "SSAS"
        connection = get_connection_class(source_type, get_service_type(source_type))
        self.assertEqual(connection, SSASConnection)

    def test_get_source_config_class(self):
        """
        Check that we can correctly get the source config class for each pipeline type
        """
        source_config_type = "Profiler"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, DatabaseServiceProfilerPipeline)

        source_config_type = "DatabaseMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, DatabaseServiceMetadataPipeline)

        source_config_type = "PipelineMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, PipelineServiceMetadataPipeline)

        source_config_type = "DashboardMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, DashboardServiceMetadataPipeline)

        source_config_type = "ApiMetadata"
        connection = get_source_config_class(source_config_type)
        self.assertEqual(connection, ApiServiceMetadataPipeline)

    def test_parsing_ok(self):
        """
        Test MSSQL JSON Config parsing OK
        """
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {
                    "config": {
                        "type": "Mssql",
                        "database": "master",
                        "username": "sa",
                        "password": "MY%password",
                        "hostPort": "random:1433",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {"jwtToken": "token"},
                },
            },
        }

        self.assertIsNotNone(parse_workflow_config_gracefully(config_dict))

    def test_parsing_ko_mssql(self):
        """
        Test MSSQL JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {
                    "config": {
                        "type": "Mssql",
                        "database": "master",
                        "username": "sa",
                        "password": "MY%password",
                        "hostPort": "localhost:1433",
                        "random": "extra",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                },
            },
        }

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn(
            "We encountered an error parsing the configuration of your MssqlConnection.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_ko_mssql_source_config(self):
        """
        Test MSSQL source config parsing KO
        """
        config_dict = {
            "source": {
                "type": "mssql",
                "serviceName": "test_mssql",
                "serviceConnection": {
                    "config": {
                        "type": "Mssql",
                        "database": "master",
                        "username": "sa",
                        "password": "MY%password",
                        "hostPort": "localhost:1433",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata", "random": "extra"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "WARN",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                },
            },
        }

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn(
            "We encountered an error parsing the configuration of your DatabaseServiceMetadataPipeline.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_ko_glue(self):
        """
        Test Glue JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "glue",
                "serviceName": "local_glue",
                "serviceConnection": {
                    "config": {
                        "type": "Glue",
                        "awsConfig": {
                            "awsSecretAccessKey": "aws secret access key",
                            "awsRegion": "us-east-1",
                            "endPointURL": "https://glue.<region_name>.amazonaws.com/",
                        },
                        "random": "extra",
                    }
                },
                "sourceConfig": {"config": {"type": "DatabaseMetadata"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                }
            },
        }

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn(
            "We encountered an error parsing the configuration of your GlueConnection.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_ko_airbyte(self):
        """
        Test Airbyte JSON Config parsing KO
        """
        config_dict = {
            "source": {
                "type": "airbyte",
                "serviceName": "local_airbyte",
                "serviceConnection": {"config": {"type": "Airbyte", "hostPort": "http://localhost:8000"}},
                "sourceConfig": {"config": {"type": "PipelineMetadata", "random": "extra"}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "no-auth",
                }
            },
        }

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn(
            "We encountered an error parsing the configuration of your PipelineServiceMetadataPipeline.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'random'",
            str(err.exception),
        )

    def test_parsing_matillion_pipeline(self):
        """
        Test Matillion JSON Config parsing OK
        """
        config_dict = {
            "source": {
                "type": "Matillion",
                "serviceName": "local_Matillion_123",
                "serviceConnection": {
                    "config": {
                        "type": "Matillion",
                        "connection": {
                            "type": "MatillionETL",
                            "hostPort": "hostport",
                            "username": "username",
                            "password": "password",
                            "sslConfig": {
                                "caCertificate": "-----BEGIN CERTIFICATE-----\nsample certificate\n-----END CERTIFICATE-----\n"
                            },
                        },
                    }
                },
                "sourceConfig": {"config": {"type": "PipelineMetadata", "includeLineage": True}},
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "DEBUG",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {
                        "jwtToken": "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"
                    },
                },
            },
        }

        self.assertTrue(parse_workflow_config_gracefully(config_dict))

        # The SSL config is optional, so parsing still succeeds without it
        del config_dict["source"]["serviceConnection"]["config"]["connection"]["sslConfig"]
        self.assertTrue(parse_workflow_config_gracefully(config_dict))

        # Dropping the required connection fields must fail
        del config_dict["source"]["serviceConnection"]["config"]["connection"]["username"]
        del config_dict["source"]["serviceConnection"]["config"]["connection"]["hostPort"]
        del config_dict["source"]["serviceConnection"]["config"]["connection"]["password"]

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict)

        self.assertIn(
            "We encountered an error parsing the configuration of your MatillionConnection.\n"
            "You might need to review your config based on the original cause of this failure:\n"
            "\t - Missing parameter in ('connection', 'function-after[parse_name(), MatillionEtlAuthConfig]', 'hostPort')\n"
            "\t - Missing parameter in ('connection', 'function-after[parse_name(), MatillionEtlAuthConfig]', 'username')\n"
            "\t - Missing parameter in ('connection', 'function-after[parse_name(), MatillionEtlAuthConfig]', 'password')\n"
            "\t - Invalid parameter value for ('connection', 'function-after[parse_name(), MatillionDpcAuthConfig]', 'type')",
            str(err.exception),
        )

    def test_parsing_ingestion_pipeline_mysql(self):
        """
        Test parsing of ingestion_pipeline for MYSQL
        """
        config_dict = {
            "id": "08868b3e-cd02-4257-a545-9080856371a0",
            "name": "qwfef_metadata_SPWHTqVO",
            "pipelineType": "metadata",
            "sourceConfig": {
                "config": {
                    "type": "DatabaseMetadata",
                    "includeTags": True,
                    "includeViews": True,
                    "includeTables": True,
                    "queryLogDuration": 1,
                    "markDeletedTables": True,
                    "tableFilterPattern": {"excludes": [], "includes": []},
                    "useFqnForFiltering": True,
                    "schemaFilterPattern": {"excludes": [], "includes": []},
                    "databaseFilterPattern": {"excludes": [], "includes": []},
                    "includeStoredProcedures": True,
                    "queryParsingTimeoutLimit": 300,
                    "markDeletedStoredProcedures": True,
                }
            },
            "airflowConfig": {
                "retries": 0,
                "startDate": "2023-12-19T00:00:00.000000Z",
                "retryDelay": 300,
                "concurrency": 1,
                "maxActiveRuns": 1,
                "pausePipeline": False,
                "pipelineCatchup": False,
                "pipelineTimezone": "UTC",
                "scheduleInterval": "0 * * * *",
                "workflowDefaultView": "tree",
                "workflowDefaultViewOrientation": "LR",
            },
        }

        self.assertIsInstance(
            parse_ingestion_pipeline_config_gracefully(config_dict),
            IngestionPipeline,
        )

        # Same payload with two misspelled keys: viewLogDuration and tFilterPattern
        config_dict_ko = {
            "id": "08868b3e-cd02-4257-a545-9080856371a0",
            "name": "qwfef_metadata_SPWHTqVO",
            "pipelineType": "metadata",
            "sourceConfig": {
                "config": {
                    "type": "DatabaseMetadata",
                    "includeTags": True,
                    "includeViews": True,
                    "includeTables": True,
                    "viewLogDuration": 1,
                    "markDeletedTables": True,
                    "tFilterPattern": {"excludes": [], "includes": []},
                    "useFqnForFiltering": True,
                    "schemaFilterPattern": {"excludes": [], "includes": []},
                    "databaseFilterPattern": {"excludes": [], "includes": []},
                    "includeStoredProcedures": True,
                    "queryParsingTimeoutLimit": 300,
                    "markDeletedStoredProcedures": True,
                }
            },
            "airflowConfig": {
                "retries": 0,
                "startDate": "2023-12-19T00:00:00.000000Z",
                "retryDelay": 300,
                "concurrency": 1,
                "maxActiveRuns": 1,
                "pausePipeline": False,
                "pipelineCatchup": False,
                "pipelineTimezone": "UTC",
                "scheduleInterval": "0 * * * *",
                "workflowDefaultView": "tree",
                "workflowDefaultViewOrientation": "LR",
            },
        }

        with self.assertRaises(ValidationError) as err:
            parse_ingestion_pipeline_config_gracefully(config_dict_ko)

        self.assertIn(
            "2 validation errors for DatabaseServiceMetadataPipeline\nviewLogDuration\n  Extra inputs are not permitted",
            str(err.exception),
        )

    def test_parsing_ingestion_pipeline_dagster(self):
        """
        Test parsing of ingestion_pipeline for Dagster
        """
        config_dict = {
            "id": "da50179a-02c8-42d1-a8bd-3002a49649a6",
            "name": "dagster_dev_metadata_G6pRkj7X",
            "pipelineType": "metadata",
            "sourceConfig": {
                "config": {
                    "type": "PipelineMetadata",
                    "includeTags": True,
                    "includeOwners": True,
                    "lineageInformation": {"dbServiceNames": ["dev"]},
                    "includeLineage": True,
                    "markDeletedPipelines": True,
                    "pipelineFilterPattern": {
                        "excludes": [],
                        "includes": ["test_pipeline"],
                    },
                }
            },
            "airflowConfig": {
                "retries": 0,
                "startDate": "2023-12-19T00:00:00.000000Z",
                "retryDelay": 300,
                "concurrency": 1,
                "maxActiveRuns": 1,
                "pausePipeline": False,
                "pipelineCatchup": False,
                "pipelineTimezone": "UTC",
                "scheduleInterval": "0 * * * *",
                "workflowDefaultView": "tree",
                "workflowDefaultViewOrientation": "LR",
            },
        }

        self.assertIsInstance(
            parse_ingestion_pipeline_config_gracefully(config_dict),
            IngestionPipeline,
        )

        # Same payload with three unknown keys: includeViewLineage, markDeletedDbs, pipelineFilterPatterns
        config_dict_ko = {
            "id": "da50179a-02c8-42d1-a8bd-3002a49649a6",
            "name": "dagster_dev_metadata_G6pRkj7X",
            "pipelineType": "metadata",
            "sourceConfig": {
                "config": {
                    "type": "PipelineMetadata",
                    "includeTags": True,
                    "includeOwners": True,
                    "lineageInformation": {"dbServiceNames": ["dev"]},
                    "includeViewLineage": True,
                    "markDeletedDbs": True,
                    "pipelineFilterPatterns": {
                        "excludes": [],
                        "includes": ["test_pipeline"],
                    },
                }
            },
            "airflowConfig": {
                "retries": 0,
                "startDate": "2023-12-19T00:00:00.000000Z",
                "retryDelay": 300,
                "concurrency": 1,
                "maxActiveRuns": 1,
                "pausePipeline": False,
                "pipelineCatchup": False,
                "pipelineTimezone": "UTC",
                "scheduleInterval": "0 * * * *",
                "workflowDefaultView": "tree",
                "workflowDefaultViewOrientation": "LR",
            },
        }

        with self.assertRaises(ValidationError) as err:
            parse_ingestion_pipeline_config_gracefully(config_dict_ko)

        self.assertIn(
            "3 validation errors for PipelineServiceMetadataPipeline\nincludeViewLineage\n  Extra inputs are not permitted",
            str(err.exception),
        )

    def test_parsing_automation_workflow_airflow(self):
        """
        Test parsing of automation workflow for airflow
        """
        config_dict = {
            "id": "8b735b2c-194e-41a4-b383-96253f936293",
            "name": "test-connection-Airflow-WhCTUSXJ",
            "deleted": False,
            "request": {
                "connection": {
                    "config": {
                        "type": "Airflow",
                        "hostPort": "https://localhost:8080",
                        "connection": {
                            "type": "Mysql",
                            "scheme": "mysql+pymysql",
                            "authType": {"password": "fernet:demo_password"},
                            "hostPort": "mysql:3306",
                            "username": "admin@openmetadata.org",
                            "databaseName": "airflow_db",
                            "supportsProfiler": True,
                            "supportsQueryComment": True,
                            "supportsDBTExtraction": True,
                            "supportsMetadataExtraction": True,
                        },
                        "numberOfStatus": 10,
                        "supportsMetadataExtraction": True,
                    }
                },
                "serviceName": "airflow_test_two",
                "serviceType": "Pipeline",
                "connectionType": "Airflow",
                "secretsManagerProvider": "db",
            },
            "version": 0.1,
            "updatedAt": 1703157653864,
            "updatedBy": "admin",
            "workflowType": "TEST_CONNECTION",
            "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ",
        }

        self.assertIsInstance(
            parse_automation_workflow_gracefully(config_dict),
            AutomationWorkflow,
        )

        # Malformed hostPort URL ("http:://")
        config_dict_ko = {
            "id": "8b735b2c-194e-41a4-b383-96253f936293",
            "name": "test-connection-Airflow-WhCTUSXJ",
            "deleted": False,
            "request": {
                "connection": {
                    "config": {
                        "type": "Airflow",
                        "hostPort": "http:://localhost:8080",
                        "connection": {
                            "type": "Mysql",
                            "scheme": "mysql+pymysql",
                            "authType": {"password": "fernet:demo_password"},
                            "hostPort": "mysql:3306",
                            "username": "admin@openmetadata.org",
                            "databaseName": "airflow_db",
                            "supportsProfiler": True,
                            "supportsQueryComment": True,
                            "supportsDBTExtraction": True,
                            "supportsMetadataExtraction": True,
                        },
                        "numberOfStatus": 10,
                        "supportsMetadataExtraction": True,
                    }
                },
                "serviceName": "airflow_test_two",
                "serviceType": "Pipeline",
                "connectionType": "Airflow",
                "secretsManagerProvider": "db",
            },
            "version": 0.1,
            "updatedAt": 1703157653864,
            "updatedBy": "admin",
            "workflowType": "TEST_CONNECTION",
            "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ",
        }

        with self.assertRaises(ValidationError) as err:
            parse_automation_workflow_gracefully(config_dict_ko)

        self.assertIn(
            "1 validation error for AirflowConnection\nhostPort\n  Input should be a valid URL",
            str(err.exception),
        )

        # Misspelled keys in the nested MySQL connection: usernam and supportsProfile
        config_dict_ko_2 = {
            "id": "8b735b2c-194e-41a4-b383-96253f936293",
            "name": "test-connection-Airflow-WhCTUSXJ",
            "deleted": False,
            "request": {
                "connection": {
                    "config": {
                        "type": "Airflow",
                        "hostPort": "https://localhost:8080",
                        "connection": {
                            "type": "Mysql",
                            "scheme": "mysql+pymysql",
                            "authType": {"password": "fernet:demo_password"},
                            "hostPort": "mysql:3306",
                            "usernam": "admin@openmetadata.org",
                            "databaseName": "airflow_db",
                            "supportsProfile": True,
                            "supportsQueryComment": True,
                            "supportsDBTExtraction": True,
                            "supportsMetadataExtraction": True,
                        },
                        "numberOfStatus": 10,
                        "supportsMetadataExtraction": True,
                    }
                },
                "serviceName": "airflow_test_two",
                "serviceType": "Pipeline",
                "connectionType": "Airflow",
                "secretsManagerProvider": "db",
            },
            "version": 0.1,
            "updatedAt": 1703157653864,
            "updatedBy": "admin",
            "workflowType": "TEST_CONNECTION",
            "fullyQualifiedName": "test-connection-Airflow-WhCTUSXJ",
        }

        with self.assertRaises(ValidationError) as err:
            parse_automation_workflow_gracefully(config_dict_ko_2)

        self.assertIn(
            "3 validation errors for MysqlConnection\nusername\n  Field required",
            str(err.exception),
        )

    def test_parsing_automation_workflow_athena(self):
        """
        Test parsing of automation workflow for athena
        """
        config_dict = {
            "id": "850b194c-3d1b-4f6f-95df-83e3df5ccb24",
            "name": "test-connection-Athena-EHnc3Ral",
            "deleted": False,
            "request": {
                "connection": {
                    "config": {
                        "type": "Athena",
                        "scheme": "awsathena+rest",
                        "awsConfig": {
                            "awsRegion": "us-east-2",
                            "assumeRoleSessionName": "OpenMetadataSession",
                        },
                        "workgroup": "primary",
                        "s3StagingDir": "s3://athena-postgres/output/",
                        "supportsProfiler": True,
                        "supportsQueryComment": True,
                        "supportsDBTExtraction": True,
                        "supportsUsageExtraction": True,
                        "supportsLineageExtraction": True,
                        "supportsMetadataExtraction": True,
                    }
                },
                "serviceType": "Database",
                "connectionType": "Athena",
                "secretsManagerProvider": "db",
            },
            "version": 0.1,
            "updatedAt": 1703173676044,
            "updatedBy": "admin",
            "workflowType": "TEST_CONNECTION",
            "fullyQualifiedName": "test-connection-Athena-EHnc3Ral",
        }

        self.assertIsInstance(
            parse_automation_workflow_gracefully(config_dict),
            AutomationWorkflow,
        )

        # s3StagingDir without the s3:// scheme is not a valid URL
        config_dict_ko = {
            "id": "850b194c-3d1b-4f6f-95df-83e3df5ccb24",
            "name": "test-connection-Athena-EHnc3Ral",
            "deleted": False,
            "request": {
                "connection": {
                    "config": {
                        "type": "Athena",
                        "scheme": "awsathena+rest",
                        "awsConfig": {
                            "awsRegion": "us-east-2",
                            "assumeRoleSessionName": "OpenMetadataSession",
                        },
                        "workgroup": "primary",
                        "s3StagingDir": "athena-postgres/output/",
                        "supportsProfiler": True,
                        "supportsQueryComment": True,
                        "supportsDBTExtraction": True,
                        "supportsUsageExtraction": True,
                        "supportsLineageExtraction": True,
                        "supportsMetadataExtraction": True,
                    }
                },
                "serviceType": "Database",
                "connectionType": "Athena",
                "secretsManagerProvider": "db",
            },
            "version": 0.1,
            "updatedAt": 1703173676044,
            "updatedBy": "admin",
            "workflowType": "TEST_CONNECTION",
            "fullyQualifiedName": "test-connection-Athena-EHnc3Ral",
        }

        with self.assertRaises(ValidationError) as err:
            parse_automation_workflow_gracefully(config_dict_ko)

        self.assertIn(
            "1 validation error for AthenaConnection\ns3StagingDir\n  Input should be a valid URL",
            str(err.exception),
        )

    def test_parsing_dbt_workflow_ok(self):
        """
        Test dbt workflow Config parsing OK
        """
        config_dict = {
            "source": {
                "type": "dbt",
                "serviceName": "dbt_prod",
                "sourceConfig": {
                    "config": {
                        "type": "DBT",
                        "dbtConfigSource": {
                            "dbtConfigType": "local",
                            "dbtCatalogFilePath": "/path/to/catalog.json",
                            "dbtManifestFilePath": "/path/to/manifest.json",
                            "dbtRunResultsFilePath": "/path/to/run_results.json",
                        },
                        "dbtUpdateDescriptions": True,
                        "includeTags": True,
                        "dbtClassificationName": "dbtTags",
                        "databaseFilterPattern": {"includes": ["test"]},
                        "schemaFilterPattern": {
                            "includes": ["test1"],
                            "excludes": [".*schema.*"],
                        },
                        "tableFilterPattern": {
                            "includes": ["test3"],
                            "excludes": [".*table_name.*"],
                        },
                    }
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "DEBUG",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {"jwtToken": "jwt_token"},
                },
            },
        }

        self.assertIsNotNone(parse_workflow_config_gracefully(config_dict))

    def test_parsing_dbt_workflow_ko(self):
        """
        Test dbt workflow Config parsing KO
        """
        config_dict_type_error_ko = {
            "source": {
                "type": "dbt",
                "serviceName": "dbt_prod",
                "sourceConfig": {
                    "config": {
                        "type": "DBT",
                        "dbtConfigSource": {
                            "dbtConfigType": "cloud",
                            "dbtCloudAuthToken": "token",
                            "dbtCloudAccountId": "ID",
                            "dbtCloudJobId": "JOB ID",
                        },
                        "dbtUpdateDescriptions": True,
                        "includeTags": True,
                        "dbtClassificationName": "dbtTags",
                        "databaseFilterPattern": {"includes": ["test"]},
                        "schemaFilterPattern": {
                            "includes": ["test1"],
                            "excludes": [".*schema.*"],
                        },
                        "tableFilterPattern": {
                            "includes": ["test3"],
                            "excludes": [".*table_name.*"],
                        },
                    }
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "DEBUG",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {"jwtToken": "jwt_token"},
                },
            },
        }

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict_type_error_ko)

        self.assertIn(
            "We encountered an error parsing the configuration of your DbtCloudConfig.\nYou might need to review your config based on the original cause of this failure:\n\t - Missing parameter 'dbtCloudUrl'",
            str(err.exception),
        )

    def test_parsing_dbt_pipeline_ko(self):
        """
        Test dbt pipeline Config parsing KO
        """
        config_dict_dbt_pipeline_ko = {
            "source": {
                "type": "dbt",
                "serviceName": "dbt_prod",
                "sourceConfig": {
                    "config": {
                        "type": "DBT",
                        "dbtConfigSource": {
                            "dbtConfigType": "cloud",
                            "dbtCloudAuthToken": "token",
                            "dbtCloudAccountId": "ID",
                            "dbtCloudJobId": "JOB ID",
                            "dbtCloudUrl": "https://clouddbt.com",
                        },
                        "extraParameter": True,
                        "includeTags": True,
                        "dbtClassificationName": "dbtTags",
                        "databaseFilterPattern": {"includes": ["test"]},
                        "schemaFilterPattern": {
                            "includes": ["test1"],
                            "excludes": [".*schema.*"],
                        },
                        "tableFilterPattern": {
                            "includes": ["test3"],
                            "excludes": [".*table_name.*"],
                        },
                    }
                },
            },
            "sink": {"type": "metadata-rest", "config": {}},
            "workflowConfig": {
                "loggerLevel": "DEBUG",
                "openMetadataServerConfig": {
                    "hostPort": "http://localhost:8585/api",
                    "authProvider": "openmetadata",
                    "securityConfig": {"jwtToken": "jwt_token"},
                },
            },
        }

        with self.assertRaises(ParsingConfigurationError) as err:
            parse_workflow_config_gracefully(config_dict_dbt_pipeline_ko)

        self.assertIn(
            "We encountered an error parsing the configuration of your DbtPipeline.\nYou might need to review your config based on the original cause of this failure:\n\t - Extra parameter 'extraParameter'",
            str(err.exception),
        )
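
The KO tests above all assert the same message layout: a two-line header naming the failing model, then one tab-indented bullet per problem. The sketch below shows one way such a message could be assembled from error records shaped like pydantic v2's `errors()` output (`type` and `loc` keys). It is an illustration only, not the actual `metadata.ingestion.api.parser` code; `describe_error` and `build_message` are hypothetical names, and the wording for extra parameters at nested locations is an assumption, since the tests only cover the top-level case.

```python
from typing import Any, Dict, List, Tuple


def _where(loc: Tuple[Any, ...]) -> str:
    """Quote a single field name, or show the whole path for nested locations."""
    return f"'{loc[0]}'" if len(loc) == 1 else f"in {loc}"


def describe_error(error: Dict[str, Any]) -> str:
    """Map one error record to a bullet line (hypothetical helper)."""
    loc = tuple(error["loc"])
    if error["type"] == "extra_forbidden":
        return f"Extra parameter {_where(loc)}"
    if error["type"] == "missing":
        return f"Missing parameter {_where(loc)}"
    # Any other error type is reported as an invalid value at its location.
    return f"Invalid parameter value for {loc}"


def build_message(model_name: str, errors: List[Dict[str, Any]]) -> str:
    """Assemble the header plus one tab-indented bullet per error."""
    header = (
        f"We encountered an error parsing the configuration of your {model_name}.\n"
        "You might need to review your config based on the original cause of this failure:\n"
    )
    return header + "\n".join(f"\t - {describe_error(err)}" for err in errors)


if __name__ == "__main__":
    print(build_message("MssqlConnection", [{"type": "extra_forbidden", "loc": ("random",)}]))
```

Because the tests use `assertIn` on the full message, any reordering of the bullets or change to the header wording in the real parser would surface as a test failure.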