mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
* RDF, cleanup relations and remove unnecessary bindings, add distributed mode for RDF reindex * Update generated TypeScript types * Address comments from copilot * Update generated TypeScript types * fix test issues * Fix minor UI bugs * Add the missing filters * Fix RDF export API error * Add export functionality * Fix ui-checkstyle * Fix java checkstyle * Fix unit tests * Fix and increase the coverage for KnowledgeGraph.spec.ts * Fix tests * Remove rdf as default in playwright and local docker * fix ui-checkstyle * Address comments * Potential fix for pull request finding 'CodeQL / Artifact poisoning' Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> * Address copilot comments * Address copilot comments * FIx tests * FIx docker * Update openmetadata-service/src/main/java/org/openmetadata/service/apps/bundles/rdf/distributed/DistributedRdfIndexCoordinator.java Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> * Address copilot review comments: license headers, JSON escaping, type safety, border-color, stop semantics Agent-Logs-Url: https://github.com/open-metadata/OpenMetadata/sessions/c026e52e-162b-4c9a-9874-43791d4aaac1 Co-authored-by: harshach <38649+harshach@users.noreply.github.com> * Show error toast for unsupported export format in KnowledgeGraph Agent-Logs-Url: https://github.com/open-metadata/OpenMetadata/sessions/c026e52e-162b-4c9a-9874-43791d4aaac1 Co-authored-by: harshach <38649+harshach@users.noreply.github.com> * Fix docker * Fix docker for playwright * Fix docker for playwright * Fix tests * Fix tests * Fix docker * Fix docker * Fix glossary and pagination spec flakiness * update the missing translations * Fix docker * Fix docker * Fix integration test * Fix fuseki not starting * Fixed the run local docker script * worked on comments * Fix flakiness in knowledge graph tests * Fix checkstyle --------- Co-authored-by: github-actions[bot] 
<github-actions[bot]@users.noreply.github.com> Co-authored-by: Aniket Katkar <aniketkatkar97@gmail.com> Co-authored-by: Copilot Autofix powered by AI <62310815+github-advanced-security[bot]@users.noreply.github.com> Co-authored-by: Copilot <175728472+Copilot@users.noreply.github.com> Co-authored-by: copilot-swe-agent[bot] <198982749+Copilot@users.noreply.github.com> Co-authored-by: harshach <38649+harshach@users.noreply.github.com>
188 lines
5.8 KiB
Python
188 lines
5.8 KiB
Python
import os
|
|
import time
|
|
from pprint import pprint
|
|
from typing import Optional, Tuple
|
|
|
|
import requests
|
|
from metadata.utils.logger import log_ansi_encoded_string
|
|
|
|
|
|
# --- Airflow connection settings (hard-coded; presumably the local docker compose defaults) ---
AIRFLOW_URL = "http://localhost:8080"
USERNAME = "admin"
PASSWORD = "admin"

# --- HTTP request settings ---
HEADER_JSON = {"Content-Type": "application/json"}
REQUESTS_TIMEOUT = 300  # seconds, i.e. 5 minutes per request

# --- Process-wide mutable state ---
# Cached bearer token; None means a fresh one must be requested.
_access_token: Optional[str] = None
# True once the last_dag_logs route answered, False after a 404, None before any probe.
_last_dag_logs_supported: Optional[bool] = None
|
|
|
|
|
|
def get_env_int(name: str, default: int) -> int:
    """Read an integer setting from the environment variable *name*.

    Returns *default* when the variable is unset. When it is set but
    ``int()`` cannot parse it, a warning is logged and *default* is returned.
    """
    raw = os.getenv(name)
    if raw is not None:
        try:
            return int(raw)
        except ValueError:
            log_ansi_encoded_string(
                message=f"Invalid integer for {name}: {raw}. Falling back to {default}."
            )
    return default
|
|
|
|
|
|
def get_access_token() -> str:
    """Get OAuth access token for Airflow 3.x API.

    The token is cached in the module-level ``_access_token`` and reused
    until that variable is reset to ``None``.

    Returns:
        The bearer token string.

    Raises:
        Exception: if the token endpoint does not answer 200/201, or if its
            JSON body carries no ``access_token``.
    """
    global _access_token

    if _access_token:
        return _access_token

    response = requests.post(
        f"{AIRFLOW_URL}/auth/token",
        headers=HEADER_JSON,
        json={"username": USERNAME, "password": PASSWORD},
        timeout=10,
    )

    # The token endpoint answers 201 Created; a plain 200 is also a success.
    if response.status_code not in (200, 201):
        raise Exception(f"Failed to get access token: {response.status_code} - {response.text}")

    token = response.json().get("access_token")
    if not token:
        # Fail loudly here: returning None would only surface later as
        # "Bearer None" requests rejected with 401.
        raise Exception(
            f"Failed to get access token: no access_token in response - {response.text}"
        )

    _access_token = token
    return _access_token
|
|
|
|
|
|
def get_auth_headers() -> dict:
    """Build JSON request headers carrying the Airflow bearer token.

    Returns:
        A new dict with ``Authorization`` and ``Content-Type`` entries.
    """
    headers = {"Authorization": f"Bearer {get_access_token()}"}
    headers["Content-Type"] = "application/json"
    return headers
|
|
|
|
|
|
def get_last_run_info() -> Tuple[Optional[str], Optional[str]]:
    """
    Make sure we can pick up the latest run info.

    Polls the ``sample_data`` DAG runs endpoint up to
    ``VALIDATE_COMPOSE_DAG_RUN_RETRIES`` times (default 30), sleeping
    ``VALIDATE_COMPOSE_DAG_RUN_POLL_SECONDS`` (default 5) between attempts.

    Returns:
        ``(dag_run_id, lower-cased state)`` of the latest run, or
        ``(None, None)`` if no run was found within the retry budget.
    """
    global _access_token

    max_retries = get_env_int("VALIDATE_COMPOSE_DAG_RUN_RETRIES", 30)
    poll_interval_seconds = get_env_int("VALIDATE_COMPOSE_DAG_RUN_POLL_SECONDS", 5)

    for _ in range(max_retries):
        try:
            res = requests.get(
                f"{AIRFLOW_URL}/api/v2/dags/sample_data/dagRuns",
                headers=get_auth_headers(),
                timeout=REQUESTS_TIMEOUT,
            )
            res.raise_for_status()
            dag_runs = res.json().get("dag_runs") or []

            if dag_runs:
                # Airflow 3 allows logical_date to be null (e.g. manually
                # triggered runs). Fall back to run_after, then "", so the key
                # is always a string and never compares None against str.
                dag_run = max(
                    dag_runs,
                    key=lambda run: run.get("logical_date") or run.get("run_after") or "",
                )
                dag_run_id = dag_run.get("dag_run_id")
                state = (dag_run.get("state") or "").lower()

                if dag_run_id:
                    log_ansi_encoded_string(
                        message=f"Found DAG run: {dag_run_id} with state: {state}"
                    )
                    return dag_run_id, state
            else:
                log_ansi_encoded_string(message="No DAG runs found yet, waiting...")

        except requests.exceptions.RequestException as e:
            # Covers HTTP errors as well as connection refusals/timeouts while
            # Airflow is still starting, so they are retried instead of crashing.
            response = getattr(e, "response", None)
            # Compare against None explicitly: a requests.Response is falsy
            # for error status codes such as 401.
            if response is not None and response.status_code == 401:
                # Drop the cached token so the next attempt re-authenticates.
                _access_token = None
            log_ansi_encoded_string(message=f"Error getting DAG runs: {e}")

        time.sleep(poll_interval_seconds)

    return None, None
|
|
|
|
|
|
def print_last_run_logs() -> None:
    """
    Show the logs of the ``ingest_using_recipe`` task of ``sample_data``.

    Best-effort: any failure is logged and swallowed. A 404 from the
    OpenMetadata ``last_dag_logs`` route marks it unsupported; the notice is
    logged once until a later call succeeds again.
    """
    global _last_dag_logs_supported

    logs_url = f"{AIRFLOW_URL}/api/v2/openmetadata/last_dag_logs?dag_id=sample_data&task_id=ingest_using_recipe"

    try:
        resp = requests.get(logs_url, headers=get_auth_headers(), timeout=REQUESTS_TIMEOUT)

        if resp.status_code != 404:
            resp.raise_for_status()
            _last_dag_logs_supported = True
            pprint(resp.text)
            return

        # Route missing: announce it only when it was not already known to be missing.
        if _last_dag_logs_supported is not False:
            log_ansi_encoded_string(
                message="Airflow last_dag_logs route is unavailable. Skipping task log fetch."
            )
        _last_dag_logs_supported = False
    except Exception as err:
        log_ansi_encoded_string(message=f"Could not fetch logs: {err}")
|
|
|
|
|
|
def main():
    """Wait for the ``sample_data`` DAG run to finish and validate its outcome.

    Polls up to ``VALIDATE_COMPOSE_MAX_RETRIES`` times (default 15), sleeping
    ``VALIDATE_COMPOSE_RETRY_INTERVAL_SECONDS`` (default 10) between polls.

    Raises:
        Exception: if the DAG run ends in the ``failed`` state, or if no
            successful run is observed within the retry budget.
    """
    max_retries = get_env_int("VALIDATE_COMPOSE_MAX_RETRIES", 15)
    retry_interval_seconds = get_env_int("VALIDATE_COMPOSE_RETRY_INTERVAL_SECONDS", 10)

    for _ in range(max_retries):
        dag_run_id, state = get_last_run_info()

        if not dag_run_id:
            log_ansi_encoded_string(
                message="Waiting for DAG run to start...",
            )
            time.sleep(retry_interval_seconds)
            continue

        if state == "success":
            log_ansi_encoded_string(message=f"DAG run: [{dag_run_id}, {state}]")
            print_last_run_logs()
            log_ansi_encoded_string(message="Sample data ingestion completed successfully!")
            return

        if state == "failed":
            log_ansi_encoded_string(message=f"DAG run [{dag_run_id}] FAILED!")
            print_last_run_logs()
            raise Exception(f"Sample data ingestion failed. DAG run state: {state}")

        if state in ("running", "queued"):
            log_ansi_encoded_string(
                message=f"DAG run [{dag_run_id}] is {state}. Waiting for completion...",
            )
        else:
            log_ansi_encoded_string(
                message=f"Waiting for sample data ingestion. Current state: {state}",
            )
        print_last_run_logs()
        time.sleep(retry_interval_seconds)

    # Only reached when the retry budget is exhausted without success. This
    # includes a zero or negative budget, which previously passed silently
    # when negative.
    raise Exception("Max retries exceeded. Sample data ingestion was not successful.")
|
|
|
|
|
|
# Script entry point: only run the validation when executed directly.
if __name__ == "__main__":
    main()
|