OpenMetadata/ingestion/tests/unit/utils
IceS2 ef9017a896
fix(logging): synchronous shutdown captures full streamable log tail (#28273)
* fix(logging): synchronous shutdown captures full streamable log tail

V1's close() returned before the daemon sender delivered /close, leaving
the server-side multipart upload unfinalized. As a result, the workflow
tail (Execution Time Summary, Workflow Summary, "Workflow finished in
time") never reached S3; only partial.txt did.

Redesign:
- Single Queue + single daemon worker; threading.Event for control
  plane instead of a sentinel on the data queue.
- flush(timeout) / shutdown(timeout) are synchronous: the caller blocks
  until queued records are delivered or the deadline expires.
- atexit hook covers callers that forget to shut down.
- Shutdown ships a final metrics line + synchronous /close POST.
  On join timeout, force-stops the worker and retries on a fresh REST
  client so /close is never starved by an in-flight POST.
- log_server_version() decoupled from validate_versions() so the
  "client running" line is captured by the handler instead of being
  emitted before setup.

Counters (shipped, failed, dropped, timed_out) are logged at shutdown
so silent loss becomes loud.
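
A minimal sketch of the redesign described above, under stated
assumptions: SketchLogShipper, its send callable and its attribute names
are hypothetical stand-ins, not the real handler. It shows one Queue, one
daemon worker, a threading.Event as the control plane, synchronous
flush()/shutdown(), and an atexit hook. The force-stop path and the
dropped/timed_out counters are left out for brevity.

```python
import atexit
import queue
import threading
import time
from typing import Callable, List


class SketchLogShipper:
    """Hypothetical stand-in for the streamable handler (illustration only)."""

    def __init__(self, send: Callable[[List[str]], None]) -> None:
        self._send = send                      # assumed transport, e.g. an HTTP POST
        self._buffer: "queue.Queue[str]" = queue.Queue()
        self._stop = threading.Event()         # control plane, kept off the data queue
        self._in_flight = False
        self.shipped = 0
        self.failed = 0
        self.closed = False
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()
        atexit.register(self.shutdown)         # covers callers that forget to shut down

    def emit(self, line: str) -> None:
        self._buffer.put(line)

    def _run(self) -> None:
        # Keep draining until a stop was requested AND nothing is queued.
        while not (self._stop.is_set() and self._buffer.empty()):
            try:
                first = self._buffer.get(timeout=0.05)
            except queue.Empty:
                continue
            self._in_flight = True
            batch = [first]
            while True:
                try:
                    batch.append(self._buffer.get_nowait())
                except queue.Empty:
                    break
            try:
                self._send(batch)
                self.shipped += len(batch)
            except Exception:
                self.failed += len(batch)
            finally:
                self._in_flight = False

    def flush(self, timeout: float = 5.0) -> bool:
        """Block until the queue is drained or the deadline passes."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._buffer.empty() and not self._in_flight:
                return True
            time.sleep(0.05)
        return False

    def shutdown(self, timeout: float = 5.0) -> None:
        """Synchronous: drain, stop the worker, then ship a final metrics line."""
        if self.closed:
            return
        self.flush(timeout)
        self._stop.set()
        self._worker.join(timeout)
        # Sent from the calling thread so it cannot be skipped by the daemon worker.
        self._send([f"metrics shipped={self.shipped} failed={self.failed}"])
        self.closed = True
```

Because shutdown() joins the worker before sending the metrics line, a
caller that returns from it knows the queued records were attempted. In
this sketch a join timeout is simply ignored, whereas the commit
describes a force-stop and retry.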

* fix(logging): close flush() TOCTOU between dequeue and POST start

flush() polled (buffer.empty() AND not _post_in_flight). The worker
set _post_in_flight inside _post_batch, which left a microsecond gap
after buffer.get() returned (queue empty) but before _post_batch ran
(flag still clear). A flush() that landed in that gap could conclude
"drained" while the worker held a batch about to ship.

Not a data-loss bug in our caller (shutdown() always follows flush()
with worker.join()), but flush() is public and the contract should
hold standalone.

Set _post_in_flight inside _collect_batch immediately after the first
successful get(); the gap shrinks to one Python line. The flag stays
clear while the worker idles in buffer.get(timeout=...), so flush()
still returns fast when there's no work.
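
The fix above can be sketched without threads, assuming a hypothetical
SketchWorker whose collect_batch, post_batch and is_drained mirror the
roles of _collect_batch, _post_batch and the flush() check. The point is
only where the flag is set: right after the first successful get(), and
never while the worker is idle.

```python
import queue
from typing import Callable, List


class SketchWorker:
    """Hypothetical model of the worker state that flush() inspects."""

    def __init__(self) -> None:
        self.buffer: "queue.Queue[str]" = queue.Queue()
        self.post_in_flight = False

    def collect_batch(self, max_size: int = 100, timeout: float = 0.01) -> List[str]:
        try:
            first = self.buffer.get(timeout=timeout)
        except queue.Empty:
            return []                      # idle: the flag stays clear
        self.post_in_flight = True         # set before anything else can run
        batch = [first]
        while len(batch) < max_size:
            try:
                batch.append(self.buffer.get_nowait())
            except queue.Empty:
                break
        return batch

    def post_batch(self, batch: List[str], send: Callable[[List[str]], None]) -> None:
        try:
            send(batch)
        finally:
            self.post_in_flight = False    # cleared even if the send raises

    def is_drained(self) -> bool:
        # Both conditions must hold; either one alone can be briefly true mid-batch.
        return self.buffer.empty() and not self.post_in_flight
```

With the flag set in post_batch instead, is_drained() would report True
between collect_batch returning and post_batch starting, which is the gap
the commit describes.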

* refactor(logging): name magic timeouts in streamable handler

Hardcoded numbers in flush() and shutdown() were not self-documenting
and tied callers to undocumented invariants. Promoted to named class
constants:

- FLUSH_DEFAULT_SEC (5.0): flush() default deadline.
- FLUSH_POLL_SEC (0.05): how often flush() rechecks state.
- FORCE_STOP_JOIN_SEC (2.0): secondary worker join after force-stop.

flush(timeout) now takes an Optional timeout that defaults to None,
matching the sentinel pattern of shutdown(timeout). Added a comment to
the poll-sleep explaining the two-conditions-AND requirement that
motivates it.
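
A rough illustration of the named constants and the None sentinel. The
constant names and values come from the list above; the SketchFlusher
class and its is_drained callable are assumptions made for the example.

```python
import time
from typing import Callable, Optional


class SketchFlusher:
    """Hypothetical holder for the timeouts named in the commit message."""

    FLUSH_DEFAULT_SEC = 5.0     # flush() default deadline
    FLUSH_POLL_SEC = 0.05       # how often flush() rechecks state
    FORCE_STOP_JOIN_SEC = 2.0   # secondary worker join after force-stop (unused here)

    def __init__(self, is_drained: Callable[[], bool]) -> None:
        self._is_drained = is_drained

    def flush(self, timeout: Optional[float] = None) -> bool:
        # None means "use the class default", so subclasses can override the constant.
        if timeout is None:
            timeout = self.FLUSH_DEFAULT_SEC
        deadline = time.monotonic() + timeout
        while True:
            if self._is_drained():
                return True
            remaining = deadline - time.monotonic()
            if remaining <= 0:
                return False
            time.sleep(min(self.FLUSH_POLL_SEC, remaining))
```

Resolving the default inside the method, rather than in the signature,
means a subclass or test that overrides FLUSH_DEFAULT_SEC changes the
behavior of flush() without touching call sites.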

* fix(logging): close worker/main _client race + mock log_server_version

Two issues surfaced by the PR review:

1. Workflow.__init__ now calls metadata.log_server_version(), which hits
   GET /system/version when not mocked. conftest pre-mocks the sibling
   validate_versions; add log_server_version so unit tests don't
   blow up on collection.

2. After force-stop, shutdown() reassigned self._client = REST(...).
   The worker's finally block (which closes self._client) could then
   close the freshly-created client under the main thread's metrics
   and /close POSTs. Use a local 'post_close_client' instead so the
   worker's session and the main thread's never alias.

Also document on shutdown(timeout=...) that the post-stop metrics and
/close POSTs carry their own HTTP timeouts, so total wall-time can be
roughly timeout + 32s in pathological cases.
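
The aliasing problem in item 2 can be modeled deterministically. In this
sketch FakeClient and SketchHandler are hypothetical, and the worker's
finally block is called directly to simulate it running late; only the
name post_close_client is taken from the commit message.

```python
class FakeClient:
    """Hypothetical stand-in for a REST client with a closable session."""

    def __init__(self, name: str) -> None:
        self.name = name
        self.closed = False

    def post(self, path: str) -> str:
        if self.closed:
            raise RuntimeError(f"{self.name} client is closed")
        return f"{self.name}:{path}"

    def close(self) -> None:
        self.closed = True


class SketchHandler:
    def __init__(self) -> None:
        self._client = FakeClient("worker")

    def worker_finally(self) -> None:
        # The worker closes whatever self._client points at when it exits.
        self._client.close()

    def shutdown_aliasing(self) -> str:
        # Old behavior: reassigning the shared attribute lets the worker
        # close the fresh client out from under the main thread.
        self._client = FakeClient("fresh")
        self.worker_finally()              # simulated late worker exit
        return self._client.post("/close")

    def shutdown_local(self) -> str:
        # New behavior: the main thread keeps its own client in a local.
        post_close_client = FakeClient("fresh")
        self.worker_finally()              # closes only the worker's client
        return post_close_client.post("/close")
```

In the real handler the two steps run on different threads, so the
failure is intermittent rather than guaranteed as it is here.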
2026-05-20 11:28:52 +02:00
__init__.py Part of #12998 - Prep Stored Procedures Skeleton for Snowflake (#13121) 2023-09-12 14:25:42 +02:00
test_collaborative_super.py Fix 18434: feat(statistics-profiler): use statistics tables to profile trino tables (#18433) 2024-11-07 18:37:31 +01:00
test_credentials.py Fix Snowflake and BigQuery CLI e2e tests (#24217) 2025-11-10 10:16:20 +01:00
test_datalake.py fix(datalake): _resolve_col_type uses frequency-first majority vote (#28093) 2026-05-13 16:01:19 +00:00
test_deprecation.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00
test_fqn_special_chars.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00
test_helpers.py chore(ingestion): drop pylint, expand ruff (#27774) 2026-04-28 07:21:59 +02:00
test_logger.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00
test_memory_limit.py chore(ingestion): drop pylint, expand ruff (#27774) 2026-04-28 07:21:59 +02:00
test_service_spec.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00
test_source_hash.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00
test_status_warning_handler.py chore(ingestion): drop pylint, expand ruff (#27774) 2026-04-28 07:21:59 +02:00
test_stored_procedures.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00
test_streamable_logger.py fix(logging): synchronous shutdown captures full streamable log tail (#28273) 2026-05-20 11:28:52 +02:00
test_tag_utils.py chore(ingestion): migrate to ruff for format + isort + unused-import (#27739) 2026-04-27 10:05:28 +02:00