diff --git a/ingestion/examples/sample_data/pipelines/pipelines.json b/ingestion/examples/sample_data/pipelines/pipelines.json index bc94dc5c347..1df17887310 100644 --- a/ingestion/examples/sample_data/pipelines/pipelines.json +++ b/ingestion/examples/sample_data/pipelines/pipelines.json @@ -10,7 +10,7 @@ "name": "presto_task", "displayName": "Presto Task", "description": "Airflow operator to perform ETL on presto tables", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -18,7 +18,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -34,7 +34,7 @@ "name": "dim_address_task", "displayName": "dim_address Task", "description": "Airflow operator to perform ETL and generate dim_address table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -42,7 +42,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -57,7 +57,7 @@ "name": "dim_user_task", "displayName": "dim_user Task", "description": "Airflow operator to perform ETL and generate 
dim_user table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_user_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_user_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -65,7 +65,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -80,7 +80,7 @@ "name": "dim_location_task", "displayName": "dim_location Task", "description": "Airflow operator to perform ETL and generate dim_location table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_location_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_location_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -88,7 +88,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -103,7 +103,7 @@ "name": "dim_product_task", "displayName": "dim_product Task", "description": "Airflow operator to perform ETL and generate dim_product table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_product_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_product_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -111,7 +111,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table 
exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -127,7 +127,7 @@ "name": "trino_task", "displayName": "Trino Task", "description": "Airflow operator to perform ETL on trino tables", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "TrinoOperator" }, @@ -135,7 +135,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -150,7 +150,7 @@ "name": "hive_create_table", "displayName": "Hive Create Table", "description": "Hive Create Table Task", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=hive_create_table", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=hive_create_table", "downstreamTasks": ["assert_table_exits"], "taskType": "HiveOperator" }, @@ -158,7 +158,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" } @@ -173,7 +173,7 @@ "name": "snowflake_task", "displayName": "Snowflake Task", "description": "Airflow operator to perform ETL on snowflake tables", - "sourceUrl": 
"http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "SnowflakeOperator" }, @@ -181,7 +181,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" }] @@ -196,7 +196,7 @@ "name": "metrics_aggregation_task", "displayName": "Metrics Aggregation Task", "description": "Airflow operator to aggregate streaming metrics", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=metrics_aggregation_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=metrics_aggregation_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PythonOperator" }, @@ -204,7 +204,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" }] @@ -219,7 +219,7 @@ "name": "feature_engineering_task", "displayName": "Feature Engineering Task", "description": "Airflow operator to generate ML features from order data", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=feature_engineering_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=feature_engineering_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PythonOperator" }, @@ -227,7 +227,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", 
"description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" }] diff --git a/ingestion/examples/sample_data/pipelines/tasks.json b/ingestion/examples/sample_data/pipelines/tasks.json index c30a3156609..9cf4648cee9 100644 --- a/ingestion/examples/sample_data/pipelines/tasks.json +++ b/ingestion/examples/sample_data/pipelines/tasks.json @@ -3,7 +3,7 @@ "name": "hive_create_table", "displayName": "Hive Create Table", "description": "Hive Create Table Task", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=hive_create_table", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=hive_create_table", "downstreamTasks": ["assert_table_exits"], "taskType": "HiveOperator" }, @@ -11,7 +11,7 @@ "name": "assert_table_exists", "displayName": "Assert Table Exists", "description": "Assert if a table exists", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": [], "taskType": "HiveOperator" }, @@ -19,7 +19,7 @@ "name": "snowflake_task", "displayName": "Snowflake Task", "description": "Airflow operator to perform ETL on snowflake tables", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "SnowflakeOperator" }, @@ -27,7 +27,7 @@ "name": "presto_task", "displayName": "Presto Task", "description": "Airflow operator to perform ETL on presto tables", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": 
"http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -35,7 +35,7 @@ "name": "dim_address_task", "displayName": "dim_address Task", "description": "Airflow operator to perform ETL and generate dim_address table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -43,7 +43,7 @@ "name": "dim_user_task", "displayName": "dim_user Task", "description": "Airflow operator to perform ETL and generate dim_user table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_user_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_user_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -51,7 +51,7 @@ "name": "dim_location_task", "displayName": "dim_location Task", "description": "Airflow operator to perform ETL and generate dim_location table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_location_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_location_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" }, @@ -59,7 +59,7 @@ "name": "trino_task", "displayName": "Trino Task", "description": "Airflow operator to perform ETL on trino tables", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "TrinoOperator" }, @@ -68,7 +68,7 @@ "displayName": "dim_location etl", "description": "dim_location ETL pipeline", "sourceUrl": "http://localhost:8080/tree?dag_id=dim_address_etl", - "sourceUrl": 
"http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists", "downstreamTasks": ["assert_table_exists"], "taskType": "TrinoOperator" }, @@ -76,7 +76,7 @@ "name": "metrics_aggregation_task", "displayName": "Metrics Aggregation Task", "description": "Airflow operator to aggregate streaming metrics in real-time", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=metrics_aggregation_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=metrics_aggregation_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PythonOperator" }, @@ -84,7 +84,7 @@ "name": "feature_engineering_task", "displayName": "Feature Engineering Task", "description": "Airflow operator to generate ML features from warehouse data", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=feature_engineering_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=feature_engineering_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PythonOperator" }, @@ -92,7 +92,7 @@ "name": "dim_product_task", "displayName": "dim_product Task", "description": "Airflow operator to perform ETL and generate dim_product table", - "sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_product_task", + "sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_product_task", "downstreamTasks": ["assert_table_exists"], "taskType": "PrestoOperator" } diff --git a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py index 0d760741bef..2f2e9ed474c 100644 --- a/ingestion/src/airflow_provider_openmetadata/lineage/runner.py +++ b/ingestion/src/airflow_provider_openmetadata/lineage/runner.py @@ -16,6 +16,7 @@ import logging import os from itertools import groupby from typing import List, Optional 
+from urllib.parse import quote from airflow.configuration import conf from pydantic import BaseModel @@ -164,9 +165,11 @@ class AirflowLineageRunner: return pipeline_service def get_task_url(self, task: "Operator"): + if IS_AIRFLOW_3_OR_HIGHER: + return f"{clean_uri(self.host_port)}/dags/{quote(self.dag.dag_id)}/tasks/{quote(task.task_id)}" return ( f"{clean_uri(self.host_port)}/taskinstance/list/" - f"?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}" + f"?_flt_3_dag_id={quote(self.dag.dag_id)}&_flt_3_task_id={quote(task.task_id)}" ) def get_om_tasks(self) -> List[Task]: diff --git a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py index 399ef9a73a6..357d3c2e962 100644 --- a/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py +++ b/ingestion/src/metadata/ingestion/source/pipeline/airflow/metadata.py @@ -16,6 +16,7 @@ from collections import Counter, defaultdict from datetime import datetime from enum import Enum from typing import Any, Dict, Iterable, List, Optional, Tuple, cast +from urllib.parse import quote from airflow.models import BaseOperator, DagRun, DagTag, TaskInstance from airflow.models.dag import DagModel @@ -131,6 +132,46 @@ class AirflowSource(PipelineServiceSource): self.observability_cache: Dict[Tuple[str, str], Dict[str, Any]] = {} self._execution_date_column = None + self._is_remote_airflow_3 = None + + @property + def is_remote_airflow_3(self): + """ + Dynamically check if the remote Airflow instance is version 3.x + by inspecting the database schema. 
+ """ + if self._is_remote_airflow_3 is not None: + return self._is_remote_airflow_3 + + try: + inspector = inspect(self.session.bind) + tables = inspector.get_table_names() + + # The version is inferred from the columns of the 'dag_run' table only; + # no other table (e.g. 'task_instance') is inspected here. + # Check for the presence of specific Airflow 3.x schema characteristics + if "dag_run" in tables: + columns = {col["name"] for col in inspector.get_columns("dag_run")} + # logical_date exists in both late 2.x and 3.x, but + # the absence of 'execution_date' is a strong indicator of 3.x + if "logical_date" in columns and "execution_date" not in columns: + self._is_remote_airflow_3 = True + else: + self._is_remote_airflow_3 = False + else: + # Fallback to False (assume Airflow 2.x) if we can't determine + self._is_remote_airflow_3 = False + except Exception as exc: + logger.debug(traceback.format_exc()) + logger.warning( + f"Failed to detect remote Airflow version - {exc}. Assuming Airflow 2.x" + ) + self._is_remote_airflow_3 = False + + logger.info( + f"Detected remote Airflow version: {'3.x' if self._is_remote_airflow_3 else '2.x'}" + ) + return self._is_remote_airflow_3 @property def execution_date_column(self): @@ -602,8 +643,12 @@ class AirflowSource(PipelineServiceSource): name=task.task_id, description=task.doc_md, sourceUrl=SourceUrl( - f"{clean_uri(host_port)}/taskinstance/list/" - f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={task.task_id}" + ( + f"{clean_uri(host_port)}/dags/{quote(dag.dag_id)}/tasks/{quote(task.task_id)}" + if self.is_remote_airflow_3 + else f"{clean_uri(host_port)}/taskinstance/list/" + f"?_flt_3_dag_id={quote(dag.dag_id)}&_flt_3_task_id={quote(task.task_id)}" + ) ), downstreamTasks=list(task.downstream_task_ids) if task.downstream_task_ids @@ -643,8 +688,11 @@ class AirflowSource(PipelineServiceSource): """ try: - # Airflow uses /dags/dag_id/grid to show pipeline / dag - source_url =
f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid" + # Airflow 2.x uses /dags/dag_id/grid, Airflow 3.x uses /dags/dag_id + if self.is_remote_airflow_3: + source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}" + else: + source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid" pipeline_state = self.get_pipeline_state(pipeline_details) pipeline_request = CreatePipelineRequest( diff --git a/ingestion/tests/unit/topology/pipeline/test_airflow.py b/ingestion/tests/unit/topology/pipeline/test_airflow.py index 146cb559e99..6c302aeb6d3 100644 --- a/ingestion/tests/unit/topology/pipeline/test_airflow.py +++ b/ingestion/tests/unit/topology/pipeline/test_airflow.py @@ -13,6 +13,7 @@ Test Airflow processing """ from unittest import TestCase from unittest.mock import patch +from urllib.parse import quote import pytest @@ -860,3 +861,140 @@ class TestAirflow(TestCase): # Verify fallback self.assertEqual(column, "execution_date") + + def test_task_source_url_format(self): + """Test that task source URLs use correct filter parameters (Airflow 2.x)""" + dag_id = "test_dag" + host_port = "http://localhost:8080" + + # Mock remote Airflow as version 2.x + self.airflow._is_remote_airflow_3 = False + + data = SERIALIZED_DAG["dag"] + dag = AirflowDagDetails( + dag_id=dag_id, + fileloc="/path/to/dag.py", + data=AirflowDag.model_validate(SERIALIZED_DAG), + max_active_runs=data.get("max_active_runs", None), + description=data.get("_description", None), + start_date=data.get("start_date", None), + tasks=data.get("tasks", []), + schedule_interval=None, + owner=None, + ) + + tasks = self.airflow.get_tasks_from_dag(dag, host_port) + + assert len(tasks) > 0 + for task in tasks: + url = task.sourceUrl.root + assert "_flt_3_dag_id=" in url + assert "_flt_3_task_id=" in url + assert "flt1_dag_id_equals" not in url + + def test_task_source_url_with_special_characters(self): + """Test URL 
encoding for DAG and task IDs with special characters (Airflow 2.x)""" + # Mock remote Airflow as version 2.x + self.airflow._is_remote_airflow_3 = False + + dag_id = "timescale_loader_v7" + task_id_with_dots = "loader_group.load_measurement" + host_port = "http://localhost:8080" + + serialized_dag_with_dots = { + "__version": 1, + "dag": { + "_dag_id": dag_id, + "fileloc": "/path/to/dag.py", + "tasks": [ + { + "task_id": task_id_with_dots, + "_task_type": "EmptyOperator", + "downstream_task_ids": [], + } + ], + }, + } + + data = serialized_dag_with_dots["dag"] + dag = AirflowDagDetails( + dag_id=dag_id, + fileloc="/path/to/dag.py", + data=AirflowDag.model_validate(serialized_dag_with_dots), + max_active_runs=data.get("max_active_runs", None), + description=data.get("_description", None), + start_date=data.get("start_date", None), + tasks=data.get("tasks", []), + schedule_interval=None, + owner=None, + ) + + tasks = self.airflow.get_tasks_from_dag(dag, host_port) + + assert len(tasks) == 1 + task_url = tasks[0].sourceUrl.root + assert f"_flt_3_dag_id={quote(dag_id)}" in task_url + assert f"_flt_3_task_id={quote(task_id_with_dots)}" in task_url + assert dag_id in task_url + assert task_id_with_dots in task_url + + def test_task_source_url_airflow_2x_format(self): + """Test that Airflow 2.x uses Flask-Admin URL format""" + dag_id = "test_dag_v2" + task_id = "test_task_v2" + host_port = "http://localhost:8080" + + # Mock remote Airflow as version 2.x + self.airflow._is_remote_airflow_3 = False + + data = SERIALIZED_DAG["dag"] + dag = AirflowDagDetails( + dag_id=dag_id, + fileloc="/path/to/dag.py", + data=AirflowDag.model_validate(SERIALIZED_DAG), + max_active_runs=data.get("max_active_runs", None), + description=data.get("_description", None), + start_date=data.get("start_date", None), + tasks=[{"task_id": task_id, "_task_type": "EmptyOperator"}], + schedule_interval=None, + owner=None, + ) + + tasks = self.airflow.get_tasks_from_dag(dag, host_port) + + assert 
len(tasks) > 0 + task_url = tasks[0].sourceUrl.root + assert "/taskinstance/list/" in task_url + assert f"_flt_3_dag_id={quote(dag_id)}" in task_url + assert f"_flt_3_task_id={quote(task_id)}" in task_url + assert "/dags/" not in task_url or "/tasks/" not in task_url + + def test_task_source_url_airflow_3x_format(self): + """Test that Airflow 3.x uses React UI URL format""" + dag_id = "test_dag_v3" + task_id = "test_task_v3" + host_port = "http://localhost:8080" + + # Mock remote Airflow as version 3.x + self.airflow._is_remote_airflow_3 = True + + data = SERIALIZED_DAG["dag"] + dag = AirflowDagDetails( + dag_id=dag_id, + fileloc="/path/to/dag.py", + data=AirflowDag.model_validate(SERIALIZED_DAG), + max_active_runs=data.get("max_active_runs", None), + description=data.get("_description", None), + start_date=data.get("start_date", None), + tasks=[{"task_id": task_id, "_task_type": "EmptyOperator"}], + schedule_interval=None, + owner=None, + ) + + tasks = self.airflow.get_tasks_from_dag(dag, host_port) + + assert len(tasks) > 0 + task_url = tasks[0].sourceUrl.root + assert f"/dags/{quote(dag_id)}/tasks/{quote(task_id)}" in task_url + assert "/taskinstance/list/" not in task_url + assert "_flt_3_dag_id=" not in task_url diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Explore/EntitySummaryPanel/mocks/PipelineSummary.mock.ts b/openmetadata-ui/src/main/resources/ui/src/components/Explore/EntitySummaryPanel/mocks/PipelineSummary.mock.ts index 065c007e955..2e61495dcd4 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Explore/EntitySummaryPanel/mocks/PipelineSummary.mock.ts +++ b/openmetadata-ui/src/main/resources/ui/src/components/Explore/EntitySummaryPanel/mocks/PipelineSummary.mock.ts @@ -30,7 +30,7 @@ export const mockPipelineEntityDetails: Pipeline = { description: 'Airflow operator to perform ETL and generate dim_address table', sourceUrl: - 'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task', + 
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task', downstreamTasks: ['assert_table_exists'], taskType: 'PrestoOperator', }, @@ -39,7 +39,7 @@ export const mockPipelineEntityDetails: Pipeline = { displayName: 'Assert Table Exists', description: 'Assert if a table exists', sourceUrl: - 'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists', + 'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists', downstreamTasks: [], taskType: 'HiveOperator', }, diff --git a/openmetadata-ui/src/main/resources/ui/src/components/Pipeline/PipelineDetails/PipelineDetails.test.tsx b/openmetadata-ui/src/main/resources/ui/src/components/Pipeline/PipelineDetails/PipelineDetails.test.tsx index b33c3da0540..66c1fb9cd12 100644 --- a/openmetadata-ui/src/main/resources/ui/src/components/Pipeline/PipelineDetails/PipelineDetails.test.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/components/Pipeline/PipelineDetails/PipelineDetails.test.tsx @@ -33,7 +33,7 @@ const mockTasks = [ displayName: 'Snowflake Task', description: 'Airflow operator to perform ETL on snowflake tables', sourceUrl: - 'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists', + 'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists', downstreamTasks: ['assert_table_exists'], taskType: 'SnowflakeOperator', }, @@ -42,7 +42,7 @@ const mockTasks = [ displayName: 'Assert Table Exists', description: 'Assert if a table exists', sourceUrl: - 'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists', + 'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists', downstreamTasks: [], taskType: 'HiveOperator', }, diff --git a/openmetadata-ui/src/main/resources/ui/src/mocks/PipelineVersion.mock.ts b/openmetadata-ui/src/main/resources/ui/src/mocks/PipelineVersion.mock.ts index 74511dbce00..7b4dc3e9051 100644 --- 
a/openmetadata-ui/src/main/resources/ui/src/mocks/PipelineVersion.mock.ts +++ b/openmetadata-ui/src/main/resources/ui/src/mocks/PipelineVersion.mock.ts @@ -67,7 +67,7 @@ export const mockPipelineData = { name: 'snowflake_task', fullyQualifiedName: 'sample_airflow.snowflake_etl.snowflake_task', sourceUrl: - 'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists', + 'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists', downstreamTasks: ['assert_table_exists'], taskType: 'SnowflakeOperator', tags: [], @@ -78,7 +78,7 @@ export const mockPipelineData = { fullyQualifiedName: 'sample_airflow.snowflake_etl.assert_table_exists', description: 'Assert if a table exists', sourceUrl: - 'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists', + 'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists', downstreamTasks: [], taskType: 'HiveOperator', tags: [], diff --git a/openmetadata-ui/src/main/resources/ui/src/utils/mocks/EntitySummaryPanelUtils.mock.tsx b/openmetadata-ui/src/main/resources/ui/src/utils/mocks/EntitySummaryPanelUtils.mock.tsx index 91fa7d4fd3c..cd11e708adf 100644 --- a/openmetadata-ui/src/main/resources/ui/src/utils/mocks/EntitySummaryPanelUtils.mock.tsx +++ b/openmetadata-ui/src/main/resources/ui/src/utils/mocks/EntitySummaryPanelUtils.mock.tsx @@ -45,7 +45,7 @@ export const mockTextBasedSummaryTitleResponse = ( export const mockLinkBasedSummaryTitleResponse = ( + to="http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task">
+ to="http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists">