mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
* Fix: Airflow Task Instance link does not correctly filter by DAG * fix airflow pipeline source url
This commit is contained in:
parent
317e1b7452
commit
dfd2bd5167
9 changed files with 236 additions and 47 deletions
|
|
@ -10,7 +10,7 @@
|
|||
"name": "presto_task",
|
||||
"displayName": "Presto Task",
|
||||
"description": "Airflow operator to perform ETL on presto tables",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -18,7 +18,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -34,7 +34,7 @@
|
|||
"name": "dim_address_task",
|
||||
"displayName": "dim_address Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_address table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -42,7 +42,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -57,7 +57,7 @@
|
|||
"name": "dim_user_task",
|
||||
"displayName": "dim_user Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_user table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_user_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_user_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -65,7 +65,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -80,7 +80,7 @@
|
|||
"name": "dim_location_task",
|
||||
"displayName": "dim_location Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_location table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_location_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_location_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -88,7 +88,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -103,7 +103,7 @@
|
|||
"name": "dim_product_task",
|
||||
"displayName": "dim_product Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_product table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_product_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_product_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -111,7 +111,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -127,7 +127,7 @@
|
|||
"name": "trino_task",
|
||||
"displayName": "Trino Task",
|
||||
"description": "Airflow operator to perform ETL on trino tables",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "TrinoOperator"
|
||||
},
|
||||
|
|
@ -135,7 +135,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -150,7 +150,7 @@
|
|||
"name": "hive_create_table",
|
||||
"displayName": "Hive Create Table",
|
||||
"description": "Hive Create Table Task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=hive_create_table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=hive_create_table",
|
||||
"downstreamTasks": ["assert_table_exits"],
|
||||
"taskType": "HiveOperator"
|
||||
},
|
||||
|
|
@ -158,7 +158,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}
|
||||
|
|
@ -173,7 +173,7 @@
|
|||
"name": "snowflake_task",
|
||||
"displayName": "Snowflake Task",
|
||||
"description": "Airflow operator to perform ETL on snowflake tables",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "SnowflakeOperator"
|
||||
},
|
||||
|
|
@ -181,7 +181,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}]
|
||||
|
|
@ -196,7 +196,7 @@
|
|||
"name": "metrics_aggregation_task",
|
||||
"displayName": "Metrics Aggregation Task",
|
||||
"description": "Airflow operator to aggregate streaming metrics",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=metrics_aggregation_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=metrics_aggregation_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PythonOperator"
|
||||
},
|
||||
|
|
@ -204,7 +204,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}]
|
||||
|
|
@ -219,7 +219,7 @@
|
|||
"name": "feature_engineering_task",
|
||||
"displayName": "Feature Engineering Task",
|
||||
"description": "Airflow operator to generate ML features from order data",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=feature_engineering_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=feature_engineering_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PythonOperator"
|
||||
},
|
||||
|
|
@ -227,7 +227,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
}]
|
||||
|
|
|
|||
|
|
@ -3,7 +3,7 @@
|
|||
"name": "hive_create_table",
|
||||
"displayName": "Hive Create Table",
|
||||
"description": "Hive Create Table Task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=hive_create_table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=hive_create_table",
|
||||
"downstreamTasks": ["assert_table_exits"],
|
||||
"taskType": "HiveOperator"
|
||||
},
|
||||
|
|
@ -11,7 +11,7 @@
|
|||
"name": "assert_table_exists",
|
||||
"displayName": "Assert Table Exists",
|
||||
"description": "Assert if a table exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": [],
|
||||
"taskType": "HiveOperator"
|
||||
},
|
||||
|
|
@ -19,7 +19,7 @@
|
|||
"name": "snowflake_task",
|
||||
"displayName": "Snowflake Task",
|
||||
"description": "Airflow operator to perform ETL on snowflake tables",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "SnowflakeOperator"
|
||||
},
|
||||
|
|
@ -27,7 +27,7 @@
|
|||
"name": "presto_task",
|
||||
"displayName": "Presto Task",
|
||||
"description": "Airflow operator to perform ETL on presto tables",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -35,7 +35,7 @@
|
|||
"name": "dim_address_task",
|
||||
"displayName": "dim_address Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_address table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -43,7 +43,7 @@
|
|||
"name": "dim_user_task",
|
||||
"displayName": "dim_user Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_user table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_user_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_user_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -51,7 +51,7 @@
|
|||
"name": "dim_location_task",
|
||||
"displayName": "dim_location Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_location table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_location_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_location_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
},
|
||||
|
|
@ -59,7 +59,7 @@
|
|||
"name": "trino_task",
|
||||
"displayName": "Trino Task",
|
||||
"description": "Airflow operator to perform ETL on trino tables",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "TrinoOperator"
|
||||
},
|
||||
|
|
@ -68,7 +68,7 @@
|
|||
"displayName": "dim_location etl",
|
||||
"description": "dim_location ETL pipeline",
|
||||
"sourceUrl": "http://localhost:8080/tree?dag_id=dim_address_etl",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "TrinoOperator"
|
||||
},
|
||||
|
|
@ -76,7 +76,7 @@
|
|||
"name": "metrics_aggregation_task",
|
||||
"displayName": "Metrics Aggregation Task",
|
||||
"description": "Airflow operator to aggregate streaming metrics in real-time",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=metrics_aggregation_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=metrics_aggregation_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PythonOperator"
|
||||
},
|
||||
|
|
@ -84,7 +84,7 @@
|
|||
"name": "feature_engineering_task",
|
||||
"displayName": "Feature Engineering Task",
|
||||
"description": "Airflow operator to generate ML features from warehouse data",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=feature_engineering_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=feature_engineering_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PythonOperator"
|
||||
},
|
||||
|
|
@ -92,7 +92,7 @@
|
|||
"name": "dim_product_task",
|
||||
"displayName": "dim_product Task",
|
||||
"description": "Airflow operator to perform ETL and generate dim_product table",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_product_task",
|
||||
"sourceUrl": "http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_product_task",
|
||||
"downstreamTasks": ["assert_table_exists"],
|
||||
"taskType": "PrestoOperator"
|
||||
}
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ import logging
|
|||
import os
|
||||
from itertools import groupby
|
||||
from typing import List, Optional
|
||||
from urllib.parse import quote
|
||||
|
||||
from airflow.configuration import conf
|
||||
from pydantic import BaseModel
|
||||
|
|
@ -164,9 +165,11 @@ class AirflowLineageRunner:
|
|||
return pipeline_service
|
||||
|
||||
def get_task_url(self, task: "Operator"):
|
||||
if IS_AIRFLOW_3_OR_HIGHER:
|
||||
return f"{clean_uri(self.host_port)}/dags/{quote(self.dag.dag_id)}/tasks/{quote(task.task_id)}"
|
||||
return (
|
||||
f"{clean_uri(self.host_port)}/taskinstance/list/"
|
||||
f"?flt1_dag_id_equals={self.dag.dag_id}&_flt_3_task_id={task.task_id}"
|
||||
f"?_flt_3_dag_id={quote(self.dag.dag_id)}&_flt_3_task_id={quote(task.task_id)}"
|
||||
)
|
||||
|
||||
def get_om_tasks(self) -> List[Task]:
|
||||
|
|
|
|||
|
|
@ -16,6 +16,7 @@ from collections import Counter, defaultdict
|
|||
from datetime import datetime
|
||||
from enum import Enum
|
||||
from typing import Any, Dict, Iterable, List, Optional, Tuple, cast
|
||||
from urllib.parse import quote
|
||||
|
||||
from airflow.models import BaseOperator, DagRun, DagTag, TaskInstance
|
||||
from airflow.models.dag import DagModel
|
||||
|
|
@ -131,6 +132,46 @@ class AirflowSource(PipelineServiceSource):
|
|||
self.observability_cache: Dict[Tuple[str, str], Dict[str, Any]] = {}
|
||||
|
||||
self._execution_date_column = None
|
||||
self._is_remote_airflow_3 = None
|
||||
|
||||
@property
|
||||
def is_remote_airflow_3(self):
|
||||
"""
|
||||
Dynamically check if the remote Airflow instance is version 3.x
|
||||
by inspecting the database schema.
|
||||
"""
|
||||
if self._is_remote_airflow_3 is not None:
|
||||
return self._is_remote_airflow_3
|
||||
|
||||
try:
|
||||
inspector = inspect(self.session.bind)
|
||||
tables = inspector.get_table_names()
|
||||
|
||||
# Airflow 3.x removed the 'task_instance' primary key column 'map_index'
|
||||
# and uses 'run_id' instead of 'execution_date' more consistently
|
||||
# Check for the presence of specific Airflow 3.x schema characteristics
|
||||
if "dag_run" in tables:
|
||||
columns = {col["name"] for col in inspector.get_columns("dag_run")}
|
||||
# logical_date exists in both late 2.x and 3.x, but
|
||||
# the absence of 'execution_date' is a strong indicator of 3.x
|
||||
if "logical_date" in columns and "execution_date" not in columns:
|
||||
self._is_remote_airflow_3 = True
|
||||
else:
|
||||
self._is_remote_airflow_3 = False
|
||||
else:
|
||||
# Fallback to False (assume Airflow 2.x) if we can't determine
|
||||
self._is_remote_airflow_3 = False
|
||||
except Exception as exc:
|
||||
logger.debug(traceback.format_exc())
|
||||
logger.warning(
|
||||
f"Failed to detect remote Airflow version - {exc}. Assuming Airflow 2.x"
|
||||
)
|
||||
self._is_remote_airflow_3 = False
|
||||
|
||||
logger.info(
|
||||
f"Detected remote Airflow version: {'3.x' if self._is_remote_airflow_3 else '2.x'}"
|
||||
)
|
||||
return self._is_remote_airflow_3
|
||||
|
||||
@property
|
||||
def execution_date_column(self):
|
||||
|
|
@ -602,8 +643,12 @@ class AirflowSource(PipelineServiceSource):
|
|||
name=task.task_id,
|
||||
description=task.doc_md,
|
||||
sourceUrl=SourceUrl(
|
||||
f"{clean_uri(host_port)}/taskinstance/list/"
|
||||
f"?flt1_dag_id_equals={dag.dag_id}&_flt_3_task_id={task.task_id}"
|
||||
(
|
||||
f"{clean_uri(host_port)}/dags/{quote(dag.dag_id)}/tasks/{quote(task.task_id)}"
|
||||
if self.is_remote_airflow_3
|
||||
else f"{clean_uri(host_port)}/taskinstance/list/"
|
||||
f"?_flt_3_dag_id={quote(dag.dag_id)}&_flt_3_task_id={quote(task.task_id)}"
|
||||
)
|
||||
),
|
||||
downstreamTasks=list(task.downstream_task_ids)
|
||||
if task.downstream_task_ids
|
||||
|
|
@ -643,8 +688,11 @@ class AirflowSource(PipelineServiceSource):
|
|||
"""
|
||||
|
||||
try:
|
||||
# Airflow uses /dags/dag_id/grid to show pipeline / dag
|
||||
source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid"
|
||||
# Airflow 2.x uses /dags/dag_id/grid, Airflow 3.x uses /dags/dag_id
|
||||
if self.is_remote_airflow_3:
|
||||
source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}"
|
||||
else:
|
||||
source_url = f"{clean_uri(self.service_connection.hostPort)}/dags/{pipeline_details.dag_id}/grid"
|
||||
pipeline_state = self.get_pipeline_state(pipeline_details)
|
||||
|
||||
pipeline_request = CreatePipelineRequest(
|
||||
|
|
|
|||
|
|
@ -13,6 +13,7 @@ Test Airflow processing
|
|||
"""
|
||||
from unittest import TestCase
|
||||
from unittest.mock import patch
|
||||
from urllib.parse import quote
|
||||
|
||||
import pytest
|
||||
|
||||
|
|
@ -860,3 +861,140 @@ class TestAirflow(TestCase):
|
|||
|
||||
# Verify fallback
|
||||
self.assertEqual(column, "execution_date")
|
||||
|
||||
def test_task_source_url_format(self):
|
||||
"""Test that task source URLs use correct filter parameters (Airflow 2.x)"""
|
||||
dag_id = "test_dag"
|
||||
host_port = "http://localhost:8080"
|
||||
|
||||
# Mock remote Airflow as version 2.x
|
||||
self.airflow._is_remote_airflow_3 = False
|
||||
|
||||
data = SERIALIZED_DAG["dag"]
|
||||
dag = AirflowDagDetails(
|
||||
dag_id=dag_id,
|
||||
fileloc="/path/to/dag.py",
|
||||
data=AirflowDag.model_validate(SERIALIZED_DAG),
|
||||
max_active_runs=data.get("max_active_runs", None),
|
||||
description=data.get("_description", None),
|
||||
start_date=data.get("start_date", None),
|
||||
tasks=data.get("tasks", []),
|
||||
schedule_interval=None,
|
||||
owner=None,
|
||||
)
|
||||
|
||||
tasks = self.airflow.get_tasks_from_dag(dag, host_port)
|
||||
|
||||
assert len(tasks) > 0
|
||||
for task in tasks:
|
||||
url = task.sourceUrl.root
|
||||
assert "_flt_3_dag_id=" in url
|
||||
assert "_flt_3_task_id=" in url
|
||||
assert "flt1_dag_id_equals" not in url
|
||||
|
||||
def test_task_source_url_with_special_characters(self):
|
||||
"""Test URL encoding for DAG and task IDs with special characters (Airflow 2.x)"""
|
||||
# Mock remote Airflow as version 2.x
|
||||
self.airflow._is_remote_airflow_3 = False
|
||||
|
||||
dag_id = "timescale_loader_v7"
|
||||
task_id_with_dots = "loader_group.load_measurement"
|
||||
host_port = "http://localhost:8080"
|
||||
|
||||
serialized_dag_with_dots = {
|
||||
"__version": 1,
|
||||
"dag": {
|
||||
"_dag_id": dag_id,
|
||||
"fileloc": "/path/to/dag.py",
|
||||
"tasks": [
|
||||
{
|
||||
"task_id": task_id_with_dots,
|
||||
"_task_type": "EmptyOperator",
|
||||
"downstream_task_ids": [],
|
||||
}
|
||||
],
|
||||
},
|
||||
}
|
||||
|
||||
data = serialized_dag_with_dots["dag"]
|
||||
dag = AirflowDagDetails(
|
||||
dag_id=dag_id,
|
||||
fileloc="/path/to/dag.py",
|
||||
data=AirflowDag.model_validate(serialized_dag_with_dots),
|
||||
max_active_runs=data.get("max_active_runs", None),
|
||||
description=data.get("_description", None),
|
||||
start_date=data.get("start_date", None),
|
||||
tasks=data.get("tasks", []),
|
||||
schedule_interval=None,
|
||||
owner=None,
|
||||
)
|
||||
|
||||
tasks = self.airflow.get_tasks_from_dag(dag, host_port)
|
||||
|
||||
assert len(tasks) == 1
|
||||
task_url = tasks[0].sourceUrl.root
|
||||
assert f"_flt_3_dag_id={quote(dag_id)}" in task_url
|
||||
assert f"_flt_3_task_id={quote(task_id_with_dots)}" in task_url
|
||||
assert dag_id in task_url
|
||||
assert task_id_with_dots in task_url
|
||||
|
||||
def test_task_source_url_airflow_2x_format(self):
|
||||
"""Test that Airflow 2.x uses Flask-Admin URL format"""
|
||||
dag_id = "test_dag_v2"
|
||||
task_id = "test_task_v2"
|
||||
host_port = "http://localhost:8080"
|
||||
|
||||
# Mock remote Airflow as version 2.x
|
||||
self.airflow._is_remote_airflow_3 = False
|
||||
|
||||
data = SERIALIZED_DAG["dag"]
|
||||
dag = AirflowDagDetails(
|
||||
dag_id=dag_id,
|
||||
fileloc="/path/to/dag.py",
|
||||
data=AirflowDag.model_validate(SERIALIZED_DAG),
|
||||
max_active_runs=data.get("max_active_runs", None),
|
||||
description=data.get("_description", None),
|
||||
start_date=data.get("start_date", None),
|
||||
tasks=[{"task_id": task_id, "_task_type": "EmptyOperator"}],
|
||||
schedule_interval=None,
|
||||
owner=None,
|
||||
)
|
||||
|
||||
tasks = self.airflow.get_tasks_from_dag(dag, host_port)
|
||||
|
||||
assert len(tasks) > 0
|
||||
task_url = tasks[0].sourceUrl.root
|
||||
assert "/taskinstance/list/" in task_url
|
||||
assert f"_flt_3_dag_id={quote(dag_id)}" in task_url
|
||||
assert f"_flt_3_task_id={quote(task_id)}" in task_url
|
||||
assert "/dags/" not in task_url or "/tasks/" not in task_url
|
||||
|
||||
def test_task_source_url_airflow_3x_format(self):
|
||||
"""Test that Airflow 3.x uses React UI URL format"""
|
||||
dag_id = "test_dag_v3"
|
||||
task_id = "test_task_v3"
|
||||
host_port = "http://localhost:8080"
|
||||
|
||||
# Mock remote Airflow as version 3.x
|
||||
self.airflow._is_remote_airflow_3 = True
|
||||
|
||||
data = SERIALIZED_DAG["dag"]
|
||||
dag = AirflowDagDetails(
|
||||
dag_id=dag_id,
|
||||
fileloc="/path/to/dag.py",
|
||||
data=AirflowDag.model_validate(SERIALIZED_DAG),
|
||||
max_active_runs=data.get("max_active_runs", None),
|
||||
description=data.get("_description", None),
|
||||
start_date=data.get("start_date", None),
|
||||
tasks=[{"task_id": task_id, "_task_type": "EmptyOperator"}],
|
||||
schedule_interval=None,
|
||||
owner=None,
|
||||
)
|
||||
|
||||
tasks = self.airflow.get_tasks_from_dag(dag, host_port)
|
||||
|
||||
assert len(tasks) > 0
|
||||
task_url = tasks[0].sourceUrl.root
|
||||
assert f"/dags/{quote(dag_id)}/tasks/{quote(task_id)}" in task_url
|
||||
assert "/taskinstance/list/" not in task_url
|
||||
assert "_flt_3_dag_id=" not in task_url
|
||||
|
|
|
|||
|
|
@ -30,7 +30,7 @@ export const mockPipelineEntityDetails: Pipeline = {
|
|||
description:
|
||||
'Airflow operator to perform ETL and generate dim_address table',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task',
|
||||
downstreamTasks: ['assert_table_exists'],
|
||||
taskType: 'PrestoOperator',
|
||||
},
|
||||
|
|
@ -39,7 +39,7 @@ export const mockPipelineEntityDetails: Pipeline = {
|
|||
displayName: 'Assert Table Exists',
|
||||
description: 'Assert if a table exists',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists',
|
||||
downstreamTasks: [],
|
||||
taskType: 'HiveOperator',
|
||||
},
|
||||
|
|
|
|||
|
|
@ -33,7 +33,7 @@ const mockTasks = [
|
|||
displayName: 'Snowflake Task',
|
||||
description: 'Airflow operator to perform ETL on snowflake tables',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists',
|
||||
downstreamTasks: ['assert_table_exists'],
|
||||
taskType: 'SnowflakeOperator',
|
||||
},
|
||||
|
|
@ -42,7 +42,7 @@ const mockTasks = [
|
|||
displayName: 'Assert Table Exists',
|
||||
description: 'Assert if a table exists',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists',
|
||||
downstreamTasks: [],
|
||||
taskType: 'HiveOperator',
|
||||
},
|
||||
|
|
|
|||
|
|
@ -67,7 +67,7 @@ export const mockPipelineData = {
|
|||
name: 'snowflake_task',
|
||||
fullyQualifiedName: 'sample_airflow.snowflake_etl.snowflake_task',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists',
|
||||
downstreamTasks: ['assert_table_exists'],
|
||||
taskType: 'SnowflakeOperator',
|
||||
tags: [],
|
||||
|
|
@ -78,7 +78,7 @@ export const mockPipelineData = {
|
|||
fullyQualifiedName: 'sample_airflow.snowflake_etl.assert_table_exists',
|
||||
description: 'Assert if a table exists',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists',
|
||||
downstreamTasks: [],
|
||||
taskType: 'HiveOperator',
|
||||
tags: [],
|
||||
|
|
|
|||
|
|
@ -45,7 +45,7 @@ export const mockTextBasedSummaryTitleResponse = (
|
|||
export const mockLinkBasedSummaryTitleResponse = (
|
||||
<Link
|
||||
target="_blank"
|
||||
to="http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task">
|
||||
to="http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task">
|
||||
<div className="d-flex items-center">
|
||||
<Text
|
||||
className="entity-title text-link-color font-medium m-r-xss"
|
||||
|
|
@ -127,7 +127,7 @@ export const mockEntityDataWithoutNesting: Task[] = [
|
|||
description:
|
||||
'Airflow operator to perform ETL and generate dim_address table',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=dim_address_task',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=dim_address_task',
|
||||
downstreamTasks: ['assert_table_exists'],
|
||||
taskType: 'PrestoOperator',
|
||||
tags: [],
|
||||
|
|
@ -138,7 +138,7 @@ export const mockEntityDataWithoutNesting: Task[] = [
|
|||
fullyQualifiedName: 'sample_airflow.dim_address_etl.assert_table_exists',
|
||||
description: 'Assert if a table exists',
|
||||
sourceUrl:
|
||||
'http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists',
|
||||
'http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists',
|
||||
downstreamTasks: [],
|
||||
taskType: 'HiveOperator',
|
||||
tags: [],
|
||||
|
|
@ -169,7 +169,7 @@ export const mockEntityDataWithoutNestingResponse: BasicEntityInfo[] = [
|
|||
title: (
|
||||
<Link
|
||||
target="_blank"
|
||||
to="http://localhost:8080/taskinstance/list/?flt1_dag_id_equals=assert_table_exists">
|
||||
to="http://localhost:8080/taskinstance/list/?_flt_3_dag_id=assert_table_exists">
|
||||
<div className="d-flex items-center">
|
||||
<Text
|
||||
className="entity-title text-link-color font-medium m-r-xss"
|
||||
|
|
|
|||
Loading…
Reference in a new issue