OpenMetadata/ingestion/operators/docker/exit_handler.py
IceS2 e9c87c6adb
chore(ingestion): drop pylint, expand ruff (#27774)
* chore(ingestion): drop pylint, expand ruff to Stage 2c

Replace pylint with a coherent ruff-only stack (Stage 2c of the modernize
roadmap). Pylint is dropped from dev deps and CI workflows; ruff selected
ruleset expanded to ~22 families covering style, bug catchers, hygiene,
and the pylint port (PLE/PLC/PLW/PLR with the noisy "too-many-X"
complexity caps + magic-value disabled).

What's selected (with rationale in pyproject.toml):
  E, W, F, I, N         — style + correctness baseline + naming
  UP                    — pyupgrade (py>=3.10 modernizations)
  B, C4, C90, RET, SIM, TRY  — bug catchers
  PIE, ICN, T20, TC, TID, PTH, PERF  — hygiene
  PLE, PLC, PLW, PLR    — pylint port (PLR complexity caps ignored)
  RUF                   — ruff-native (incl. RUF100 unused-noqa)

What's removed:
  - .pylintrc (root) — duplicate of the ingestion pylint config
  - [tool.pylint.*] block in ingestion/pyproject.toml (~140 lines)
  - ingestion/plugins/{print_checker,import_checker}.py + tests + README
    (replaced by built-in T20 + TID251 banned-api respectively)
  - pylint dep from ingestion/setup.py and openmetadata-airflow-apis/pyproject.toml
  - `make lint` Makefile target + the pylint invocation in py_format_check
  - dead pylint TODO comment + ignored test entry in noxfile.py

Cwd-stable config: ruff is invoked both from the repo root (pre-commit,
CI) and from ingestion/ (`make py_format_check`). The `src`,
`extend-exclude`, and per-file-ignores entries are listed twice — once
relative to ingestion/ and once with the `ingestion/` prefix — so
first-party isort detection and exclusions match in both invocations.

Grandfathering: ran `ruff check --add-noqa` once + format-stable
iteration. ~12,130 noqa directives across ~1,400 files. Cleanup is
deferred to follow-up PRs that drop noqas one rule at a time.

Documentation sweep: replaced `make lint` references in CLAUDE.md,
AGENTS.md, DEVELOPER.md, copilot-instructions, and 6 SKILL files with
the apply+verify shape `make py_format && make py_format_check`.
`make py_format` is NOT a strict superset of pylint: it only fixes
auto-fixable violations; `make py_format_check` catches the rest.

Basedpyright baseline regenerated: ruff format reflowed multi-line
signatures in ~70 files, shifting type-error column positions. The
basedpyright baseline matches by (file path, error code, range), so
column shifts caused 19 entries to mis-align. Net diff is small
(154 lines in/out of the 13MB baseline.json) — purely positional.
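
As a rough illustration of why a pure reformat invalidates baseline entries,
the sketch below models the matching key described above as a tuple of
(file path, error code, column range). The `BaselineKey` type, the `unmatched`
helper, and the sample values are hypothetical; this does not reproduce
basedpyright's actual baseline format or matching code.

```python
from typing import NamedTuple


class BaselineKey(NamedTuple):
    """Hypothetical key shape: (file path, error code, column range)."""

    path: str
    code: str
    start_col: int
    end_col: int


def unmatched(baseline: set[BaselineKey], reported: list[BaselineKey]) -> list[BaselineKey]:
    """Return reported errors that have no exact match in the baseline."""
    return [err for err in reported if err not in baseline]


# One baselined error (illustrative values only).
baseline = {BaselineKey("exit_handler.py", "reportAttributeAccessIssue", 20, 28)}

# The same error after a formatter moved the expression 4 columns to the right.
after_reformat = [BaselineKey("exit_handler.py", "reportAttributeAccessIssue", 24, 32)]

# The shifted entry no longer matches, so it would surface as a "new" error.
new_errors = unmatched(baseline, after_reformat)
```

Under this model the error itself is unchanged; only its position differs, which is why the regenerated baseline diff is purely positional.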

Verified locally:
  - make py_format_check         → All checks passed
  - nox --no-venv -s static-checks → 0 errors, 0 warnings, 0 notes

* chore(ingestion): finish ruff swap — nox lint session + skill docs

Three remaining stale-tooling references after Stage 2c:

  - `ingestion/noxfile.py` `lint` session was still calling `black --check`,
    `isort --check-only`, `pycln --diff`. Those tools aren't installed
    anywhere (we dropped them from dev deps). Replace with the ruff
    equivalents that mirror `make py_format_check`.
  - `skills/standards/code_style.md`: stack listed as `black + isort +
    pycln`; line length claimed 88 (black default). Both wrong: stack is
    ruff, line length is 120.
  - `skills/connector-building/SKILL.md`: `make py_format` comment said
    `# black + isort + pycln`. Same swap.

* chore(ingestion): keep main's baseline + globally ignore TRY400

Per gitar-bot's review on PR #27774:

1. Main's PR #27728 promoted ~60 `logger.warning()` → `logger.error()`
   inside `except` blocks. Those changes landed on main with their own
   baseline updates. Our PR doesn't promote anything — the merge from
   origin/main brought those `error` calls along with their baseline
   entries.

   The bot interpreted the `# noqa: TRY400` we added next to those lines
   as us silencing the rule case-by-case. Cleaner: globally ignore
   TRY400 in pyproject.toml, with a comment explaining why the codebase's
   `logger.error(...)` + separate `logger.debug(traceback.format_exc())`
   pattern is intentional. Strip ~430 per-line `# noqa: TRY400` markers
   from source.

2. Document that `S101` in `per-file-ignores` is a forward-looking
   entry — flake8-bandit (`S`) is not yet selected, so the rule is
   no-op today; the entry stays so when `S` lands later, tests don't
   immediately error.
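
The `logger.error(...)` plus `logger.debug(traceback.format_exc())` pattern
from item 1 can be sketched roughly as follows. The logger name and the
`risky` / `run_step` functions are hypothetical stand-ins, not code from the
repository.

```python
import logging
import traceback

logger = logging.getLogger("exit_handler_example")  # hypothetical logger name


def risky() -> None:
    """Stand-in for any operation that may fail."""
    raise ValueError("boom")


def run_step() -> bool:
    """Return True on success; on failure log a short error and a debug-level traceback."""
    try:
        risky()
    except Exception as exc:
        # Short message at ERROR level. TRY400 would ask for logger.exception here.
        logger.error(f"Step failed: {exc}")
        # Full traceback only at DEBUG level, so default logs stay compact.
        logger.debug(traceback.format_exc())
        return False
    return True
```

With this split, the traceback is still available when debug logging is enabled, while `logger.exception` would attach it to every error record.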

Reverts the platform pin and Linux Docker–generated baseline. Keep
main's baseline intact and let CI surface the exact column-shifted
entries; the team will decide whether to fix in-place (revert format
on affected files) or add per-line `# pyright: ignore` markers.

* chore(ingestion): regen baseline for new connector type debt

Main's baseline was stale relative to recently-added connectors
(McpConnection, CustomDriveConnection) that lack common attributes
like `hostPort`, `database`, `catalog` etc. — all sites that access
those attributes via the union-typed `serviceConnection.root.config`
fire `reportAttributeAccessIssue` errors that aren't baselined.

71 errors + 58 warnings absorbed. Local macOS regen; pushing to see
CI's drift count. Per the basedpyright-baseline-and-ci PR experience,
macOS↔Linux column drift on this size of regen has historically been
1-7 residuals.
2026-04-28 07:21:59 +02:00

593 lines
23 KiB
Python

# Copyright 2021 Collate
# Licensed under the Apache License, Version 2.0 (the "License");
# you may not use this file except in compliance with the License.
# You may obtain a copy of the License at
# http://www.apache.org/licenses/LICENSE-2.0
# Unless required by applicable law or agreed to in writing, software
# distributed under the License is distributed on an "AS IS" BASIS,
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
# See the License for the specific language governing permissions and
# limitations under the License.
"""
Exit handler safety net for Kubernetes ingestion pipeline jobs
"""

import logging
import os
from datetime import datetime
from typing import Optional

import yaml
from kubernetes import client, config
from kubernetes.client import V1Pod
from pydantic import BaseModel

from metadata.generated.schema.entity.services.ingestionPipelines.ingestionPipeline import (
    PipelineState,
    PipelineStatus,
)
from metadata.generated.schema.entity.services.ingestionPipelines.status import (
    IngestionStatus,
    StackTraceError,
    StepSummary,
)
from metadata.generated.schema.metadataIngestion.application import (
    OpenMetadataApplicationConfig,
)
from metadata.generated.schema.metadataIngestion.workflow import (
    OpenMetadataWorkflowConfig,
)
from metadata.generated.schema.type.basic import Timestamp
from metadata.ingestion.ometa.ometa_api import OpenMetadata
from metadata.utils.logger import ometa_logger, set_loggers_level


class FailureDiagnostics(BaseModel):
    """
    Model to represent diagnostic information gathered from failed workflow pods.

    Attributes:
        pod_logs: Logs from the main container, or None if unavailable
        pod_description: Detailed pod status and event information, or None if unavailable
        has_diagnostics: True if any diagnostic information was successfully gathered
    """

    pod_logs: Optional[str] = None  # noqa: UP045
    pod_description: Optional[str] = None  # noqa: UP045

    @property
    def has_diagnostics(self) -> bool:
        """Check if any diagnostic information is available."""
        return self.pod_logs is not None or self.pod_description is not None

    @property
    def summary(self) -> str:
        """Get a brief summary of available diagnostics."""
        parts = []
        if self.pod_logs:
            lines = len(self.pod_logs.splitlines())
            parts.append(f"logs ({lines} lines)")
        if self.pod_description:
            parts.append("pod description")

        if parts:
            return f"Available diagnostics: {', '.join(parts)}"
        return "No diagnostics available"


SUCCESS_STATES = {"Succeeded", "success"}
TERMINAL_PIPELINE_STATES = {
    PipelineState.success,
    PipelineState.failed,
    PipelineState.partialSuccess,
}

logger = ometa_logger()


def get_kubernetes_client() -> Optional[client.CoreV1Api]:  # noqa: UP045
    """
    Initialize and return Kubernetes client.
    First tries in-cluster config, then falls back to local kubeconfig.
    This function is fault-tolerant and will not raise exceptions.
    """
    try:
        # Try in-cluster config first (when running inside a pod)
        config.load_incluster_config()
        return client.CoreV1Api()
    except Exception as in_cluster_error:
        try:
            # Fall back to local kubeconfig
            config.load_kube_config()
            return client.CoreV1Api()
        except Exception as kubeconfig_error:
            logger.warning(
                f"Failed to initialize Kubernetes client - in-cluster: {in_cluster_error}, kubeconfig: {kubeconfig_error}"
            )
            return None
    except Exception as unexpected_error:  # noqa: B025
        logger.error(f"Unexpected error initializing Kubernetes client: {unexpected_error}")
        return None


LABEL_JOB_NAME = "job-name"
LABEL_OMJOB_NAME = "omjob.pipelines.openmetadata.org/name"
LABEL_POD_TYPE = "omjob.pipelines.openmetadata.org/pod-type"
LABEL_APP_RUN_ID = "app.kubernetes.io/run-id"
POD_TYPE_MAIN = "main"


def find_main_pod(
    k8s_client: client.CoreV1Api,
    job_name: Optional[str],  # noqa: UP045
    namespace: str,
    pipeline_run_id: Optional[str] = None,  # noqa: UP045
) -> Optional[V1Pod]:  # noqa: UP045
    """
    Find the main ingestion pod for the given Kubernetes job.
    This function is fault-tolerant and will not raise exceptions.
    """
    try:
        if not namespace or (not job_name and not pipeline_run_id):
            logger.warning(
                "Invalid parameters: job_name=%s, pipeline_run_id=%s, namespace=%s",
                job_name,
                pipeline_run_id,
                namespace,
            )
            return None

        # Selectors are tried in order; the first one that yields a named pod wins
        label_selectors = []
        if job_name:
            label_selectors.extend(
                [
                    f"{LABEL_JOB_NAME}={job_name}",
                    f"{LABEL_OMJOB_NAME}={job_name},{LABEL_POD_TYPE}={POD_TYPE_MAIN}",
                    f"{LABEL_OMJOB_NAME}={job_name}",
                ]
            )
        if pipeline_run_id:
            label_selectors.extend(
                [
                    f"{LABEL_APP_RUN_ID}={pipeline_run_id},{LABEL_POD_TYPE}={POD_TYPE_MAIN}",
                    f"{LABEL_APP_RUN_ID}={pipeline_run_id}",
                ]
            )

        for label_selector in label_selectors:
            try:
                pods = k8s_client.list_namespaced_pod(namespace=namespace, label_selector=label_selector)
            except Exception as list_error:
                logger.warning(f"Failed to list pods with selector '{label_selector}': {list_error}")
                continue

            if not pods or not pods.items:
                continue

            for pod in pods.items:
                try:
                    if pod.metadata and pod.metadata.name:
                        logger.info(f"Found main pod: {pod.metadata.name} (selector: {label_selector})")
                        return pod
                except Exception as pod_error:
                    logger.warning(f"Error checking pod metadata: {pod_error}")
                    continue

        logger.warning(f"No main pod found for job {job_name}")
        return None  # noqa: TRY300
    except Exception as e:
        logger.error(f"Failed to find main pod for job {job_name}: {e}")
        return None


def get_main_pod_logs(k8s_client: client.CoreV1Api, main_pod: V1Pod, namespace: str) -> Optional[str]:  # noqa: UP045
    """
    Fetch logs from the main ingestion pod.
    This function is fault-tolerant and will not raise exceptions.

    Args:
        k8s_client: Kubernetes API client
        main_pod: The main pod object
        namespace: Kubernetes namespace

    Returns:
        Main pod logs as string, or None if unable to fetch
    """
    try:
        if not main_pod or not main_pod.metadata or not main_pod.metadata.name:
            logger.warning("Invalid pod object provided for log fetching")
            return None

        pod_name = main_pod.metadata.name
        logger.info(f"Fetching logs from pod '{pod_name}'")

        logs = k8s_client.read_namespaced_pod_log(name=pod_name, namespace=namespace, container="main", tail_lines=500)

        if logs:
            logger.info(f"Successfully fetched {len(logs.splitlines())} lines of logs")
            return logs
        else:  # noqa: RET505
            logger.info("No logs found for pod")
            return None
    except Exception as e:
        logger.warning(
            f"Failed to fetch pod logs from {main_pod.metadata.name if main_pod and main_pod.metadata else 'unknown'}: {e}"
        )
        return None


def get_main_pod_description(k8s_client: client.CoreV1Api, main_pod: V1Pod, namespace: str) -> Optional[str]:  # noqa: C901, UP045
    """
    Get detailed pod description for the main ingestion pod.
    This function is fault-tolerant and will not raise exceptions.

    Args:
        k8s_client: Kubernetes API client
        main_pod: The main pod object
        namespace: Kubernetes namespace

    Returns:
        Pod description string, or None if unable to fetch
    """
    try:
        if not main_pod or not main_pod.metadata or not main_pod.metadata.name:
            logger.warning("Invalid pod object provided for description")
            return None

        # Get detailed pod information
        pod_name = main_pod.metadata.name
        logger.info(f"Getting pod description for '{pod_name}'")

        # Build detailed pod description with safe access to attributes
        description_parts = []

        try:
            description_parts.append(f"Pod: {pod_name}")
            description_parts.append(f"Namespace: {namespace}")

            # Safely access pod status
            if main_pod.status:
                if hasattr(main_pod.status, "phase") and main_pod.status.phase:
                    description_parts.append(f"Status: {main_pod.status.phase}")
                if hasattr(main_pod.status, "reason") and main_pod.status.reason:
                    description_parts.append(f"Reason: {main_pod.status.reason}")
                if hasattr(main_pod.status, "message") and main_pod.status.message:
                    description_parts.append(f"Message: {main_pod.status.message}")

                # Safely log container statuses
                if hasattr(main_pod.status, "container_statuses") and main_pod.status.container_statuses:
                    description_parts.append("\nContainer Statuses:")
                    for container_status in main_pod.status.container_statuses:
                        try:
                            status_line = f" {container_status.name}: Ready={container_status.ready}, RestartCount={container_status.restart_count}"
                            description_parts.append(status_line)

                            if container_status.state:
                                if container_status.state.waiting:
                                    description_parts.append(
                                        f" State: Waiting - {container_status.state.waiting.reason}"
                                    )
                                elif container_status.state.running:
                                    description_parts.append(
                                        f" State: Running - Started: {container_status.state.running.started_at}"
                                    )
                                elif container_status.state.terminated:
                                    description_parts.append(
                                        f" State: Terminated - Reason: {container_status.state.terminated.reason}, ExitCode: {container_status.state.terminated.exit_code}"
                                    )
                        except Exception as container_error:
                            logger.warning(f"Error processing container status: {container_error}")
                            continue
        except Exception as status_error:
            logger.warning(f"Error processing pod status: {status_error}")

        # Try to get pod events but don't fail if unavailable
        try:
            events = k8s_client.list_namespaced_event(
                namespace=namespace, field_selector=f"involvedObject.name={pod_name}"
            )
            if events and events.items:
                description_parts.append(f"\nEvents ({len(events.items)} found):")
                for event in events.items[-10:]:  # Limit to last 10 events (most recent)
                    try:
                        event_time = event.last_timestamp or event.first_timestamp or "Unknown"
                        description_parts.append(f" {event_time} - {event.type}: {event.reason}")
                        if event.message:
                            description_parts.append(f" {event.message}")
                    except Exception as event_error:
                        logger.warning(f"Error processing event: {event_error}")
                        continue
            else:
                description_parts.append("\nEvents: No events found")
        except Exception as e:
            logger.warning(f"Could not fetch events for pod {pod_name}: {e}")
            description_parts.append(
                "\nEvents: Unable to fetch events. Make sure the ingestion service account has 'events' permissions."
            )

        description = "\n".join(description_parts)
        logger.info("Pod description created successfully")
        return description if description_parts else None  # noqa: TRY300
    except Exception as e:
        logger.error(f"Failed to get pod description: {e}")
        return None


def create_pod_diagnostics(main_pod_logs: Optional[str], pod_description: Optional[str]) -> StepSummary:  # noqa: UP045
    """
    Create a StepSummary with pod diagnostics for failed workflows.
    """
    # Combine logs and description for stackTrace
    summary_parts = []
    if pod_description:
        summary_parts.append("Pod Description:\n" + pod_description)
    if main_pod_logs:
        summary_parts.append("\nPod Logs: \n" + main_pod_logs)

    stack_trace = "\n".join(summary_parts) if summary_parts else "No diagnostics available"

    return StepSummary(
        name="Pod Diagnostics",
        records=0,
        errors=1,
        failures=[
            StackTraceError(
                name="Main Container Diagnostics",
                error="Kubernetes job failed - check logs for details",
                stackTrace=stack_trace,
            )
        ],
    )


def create_workflow_config(config: str, pipeline_run_id: str):
    """
    Create appropriate workflow configuration based on config type.

    Args:
        config: Raw configuration string from environment
        pipeline_run_id: Pipeline run identifier

    Returns:
        Union[OpenMetadataApplicationConfig, OpenMetadataWorkflowConfig]: Parsed workflow config
    """
    raw_workflow_config = yaml.safe_load(config)
    raw_workflow_config["pipelineRunId"] = pipeline_run_id

    # Application configs are distinguished by the presence of `sourcePythonClass`
    if raw_workflow_config.get("sourcePythonClass"):
        logger.info("Creating OpenMetadataApplicationConfig")
        return OpenMetadataApplicationConfig.model_validate(raw_workflow_config)
    else:  # noqa: RET505
        logger.info("Creating OpenMetadataWorkflowConfig")
        return OpenMetadataWorkflowConfig.model_validate(raw_workflow_config)


def get_or_create_pipeline_status(metadata: OpenMetadata, workflow_config) -> PipelineStatus:
    """
    Retrieve existing pipeline status or create a new one.

    Args:
        metadata: OpenMetadata API client
        workflow_config: Workflow configuration object; its `pipelineRunId`
            identifies the run to look up

    Returns:
        PipelineStatus: Existing or newly created pipeline status
    """
    pipeline_status = metadata.get_pipeline_status(
        workflow_config.ingestionPipelineFQN,
        str(workflow_config.pipelineRunId.root),
    )

    if not pipeline_status:
        logger.info("No existing pipeline status found, creating new one")
        now_timestamp = Timestamp(int(datetime.now().timestamp() * 1000))
        pipeline_status = PipelineStatus(
            runId=str(workflow_config.pipelineRunId.root),
            startDate=now_timestamp,
            timestamp=now_timestamp,
        )
    else:
        logger.info("Using existing pipeline status")

    return pipeline_status


def gather_failure_diagnostics(
    job_name: str | None,
    namespace: str,
    pipeline_run_id: Optional[str] = None,  # noqa: UP045
) -> FailureDiagnostics:
    """
    Gather diagnostic information from failed Kubernetes job pods.
    This function is fault-tolerant and will never raise exceptions.

    Args:
        job_name: Name of the failed Kubernetes job
        namespace: Kubernetes namespace
        pipeline_run_id: Optional pipeline run identifier, used as an
            additional label selector when looking up the main pod

    Returns:
        FailureDiagnostics: Object containing pod logs and description, or empty if errors occur
    """
    try:
        logger.info(f"Kubernetes job failed, gathering diagnostics for: {job_name}")

        # Try to get Kubernetes client - fail gracefully if unavailable
        try:
            k8s_client = get_kubernetes_client()
        except Exception as e:
            logger.warning(f"Failed to initialize Kubernetes client: {e}")
            return FailureDiagnostics()

        if not k8s_client:
            logger.warning("Kubernetes client is not available - skipping diagnostics")
            return FailureDiagnostics()

        # Try to find main pod - fail gracefully if not found
        try:
            main_pod = find_main_pod(k8s_client, job_name, namespace, pipeline_run_id)
        except Exception as e:
            logger.warning(f"Failed to find main pod for job {job_name}: {e}")
            return FailureDiagnostics()

        if not main_pod:
            logger.warning(f"Could not find main pod for job {job_name} - skipping diagnostics")
            return FailureDiagnostics()

        # Try to get pod logs - continue even if this fails
        pod_logs = None
        try:
            pod_logs = get_main_pod_logs(k8s_client, main_pod, namespace)
        except Exception as e:
            logger.warning(f"Failed to fetch pod logs: {e}")

        # Try to get pod description - continue even if this fails
        pod_description = None
        try:
            pod_description = get_main_pod_description(k8s_client, main_pod, namespace)
            if pod_description:
                logger.info("Successfully fetched pod description")
        except Exception as e:
            logger.warning(f"Failed to fetch pod description: {e}")

        # Create and return diagnostics object
        diagnostics = FailureDiagnostics(pod_logs=pod_logs, pod_description=pod_description)
        logger.info(diagnostics.summary)
        return diagnostics  # noqa: TRY300
    except Exception as e:
        # Catch-all for any unexpected errors - diagnostics should never break the exit handler
        logger.error(f"Unexpected error while gathering diagnostics: {e}")
        return FailureDiagnostics()


def update_pipeline_status_with_diagnostics(
    pipeline_status: PipelineStatus,
    diagnostics: FailureDiagnostics,
):
    """
    Add diagnostic information to pipeline status.
    This function is fault-tolerant and will not raise exceptions.

    Args:
        pipeline_status: Pipeline status object to update
        diagnostics: Failure diagnostics containing pod logs and description
    """
    try:
        if not diagnostics.has_diagnostics:
            logger.info("No diagnostics available to add to pipeline status")
            return

        error_step = create_pod_diagnostics(diagnostics.pod_logs, diagnostics.pod_description)

        try:
            if pipeline_status.status:
                existing_steps = pipeline_status.status.root if hasattr(pipeline_status.status, "root") else []
                existing_steps.append(error_step)
                pipeline_status.status = IngestionStatus(existing_steps)
            else:
                pipeline_status.status = IngestionStatus([error_step])

            logger.info(f"Successfully added diagnostics to pipeline status - {diagnostics.summary}")
        except Exception as e:
            logger.warning(f"Failed to update pipeline status with diagnostics: {e}")
    except Exception as e:
        logger.error(f"Failed to create pod diagnostics: {e}")


def main():
    """
    Exit Handler entrypoint for Kubernetes jobs

    The goal of this script is to be executed as a failure callback/exit handler
    when a Kubernetes job processing fails. There are situations where the failure
    cannot be directly controlled in the main ingestion process.

    We don't want to initialize the full workflow as it might be failing
    on the `__init__` call as well. We'll manually prepare the status sending
    logic.

    In this callback we just care about:
    - instantiating the ometa client
    - getting the IngestionPipeline FQN
    - if exists, update with `Failed` status
    """
    # Parse environment variables (adapted for K8s Job environment)
    config = os.getenv("config")  # noqa: SIM112
    if not config:
        error_msg = "Missing environment variable `config`. This is needed to configure the Workflow."
        raise RuntimeError(error_msg)

    pipeline_run_id = os.getenv("pipelineRunId")  # noqa: SIM112
    raw_pipeline_status = os.getenv("pipelineStatus")  # noqa: SIM112
    job_name = os.getenv("jobName")  # Changed from workflowName to jobName  # noqa: SIM112
    namespace = os.getenv("namespace")  # Changed from workflowNamespace to namespace  # noqa: SIM112

    logger.info(
        f"Environment variables - pipelineRunId: {pipeline_run_id}, pipelineStatus: {raw_pipeline_status}, jobName: {job_name}, namespace: {namespace}"
    )

    # Create workflow configuration
    workflow_config = create_workflow_config(config, pipeline_run_id)

    # Initialize OpenMetadata client
    metadata = OpenMetadata(config=workflow_config.workflowConfig.openMetadataServerConfig)

    # Update pipeline status if all required fields are present
    if workflow_config.ingestionPipelineFQN and pipeline_run_id and raw_pipeline_status:
        logger.info(
            f"Sending status to Ingestion Pipeline {workflow_config.ingestionPipelineFQN} for run ID {str(workflow_config.pipelineRunId.root)}"  # noqa: RUF010
        )

        # Get or create pipeline status
        pipeline_status = get_or_create_pipeline_status(metadata, workflow_config)

        # If the workflow already reported a terminal status, the exit handler has nothing to do.
        if pipeline_status.pipelineState in TERMINAL_PIPELINE_STATES:
            logger.info(
                f"Pipeline already in terminal state '{pipeline_status.pipelineState.value}', "
                f"skipping exit handler update"
            )
            return

        # Update pipeline status with final state
        pipeline_status.endDate = Timestamp(int(datetime.now().timestamp() * 1000))
        pipeline_status.pipelineState = (
            PipelineState.failed if raw_pipeline_status not in SUCCESS_STATES else PipelineState.success
        )

        # Try to gather diagnostics for failed jobs - but never let this block status reporting
        if raw_pipeline_status not in SUCCESS_STATES and job_name:
            try:
                logger.info("Attempting to gather failure diagnostics")
                diagnostics = gather_failure_diagnostics(job_name, namespace, pipeline_run_id)
                update_pipeline_status_with_diagnostics(pipeline_status, diagnostics)
            except Exception as e:
                # Log the error but continue - diagnostics should never prevent status updates
                logger.error(f"Failed to gather or add diagnostics, continuing with status update: {e}")

        # Send updated status to OpenMetadata - this is the critical operation that must succeed
        try:
            metadata.create_or_update_pipeline_status(workflow_config.ingestionPipelineFQN, pipeline_status)
            logger.info(f"Successfully updated pipeline status to {pipeline_status.pipelineState.value}")
        except Exception as e:
            logger.error(f"CRITICAL: Failed to send pipeline status update to OpenMetadata: {e}")
            raise
    else:
        logger.info("Missing required fields - not updating pipeline status")


if __name__ == "__main__":
    set_loggers_level(logging.INFO)
    try:
        main()
    except Exception as e:
        logger.error(f"Exit handler failed: {e}")
        raise