Mirror of https://github.com/open-metadata/OpenMetadata, synced 2026-05-24 09:39:11 +00:00.
* fix(logging): synchronous shutdown captures full streamable log tail

  V1's close() returned before the daemon sender delivered /close, leaving the server-side multipart upload unfinalized: the workflow tail (Execution Time Summary, Workflow Summary, "Workflow finished in time") never reached S3; only partial.txt did.

  Redesign:
  - Single Queue + single daemon worker; threading.Event for the control plane instead of a sentinel on the data queue.
  - flush(timeout) / shutdown(timeout) are synchronous: the caller blocks until queued records are delivered or the deadline passes.
  - atexit hook covers callers that forget to shut down.
  - Shutdown ships a final metrics line + a synchronous /close POST. On join timeout, it force-stops the worker and retries on a fresh REST client so /close is never starved by an in-flight POST.
  - log_server_version() is decoupled from validate_versions() so the "client running" line is captured by the handler instead of being emitted before setup.

  Counters (shipped, failed, dropped, timed_out) are logged at shutdown so silent loss becomes loud.

* fix(logging): close flush() TOCTOU between dequeue and POST start

  flush() polled (buffer.empty() AND not _post_in_flight). The worker set _post_in_flight inside _post_batch, which left a microsecond gap after buffer.get() returned (queue empty) but before _post_batch ran (flag still clear). A flush() that landed in that gap could conclude "drained" while the worker held a batch about to ship.

  Not a data-loss bug in our caller (shutdown() always follows flush() with worker.join()), but flush() is public and the contract should hold standalone.

  Set _post_in_flight inside _collect_batch immediately after the first successful get(); the gap shrinks to one Python line. The flag stays clear while the worker idles in buffer.get(timeout=...), so flush() still returns fast when there's no work.
* refactor(logging): name magic timeouts in streamable handler

  Hardcoded numbers in flush() and shutdown() were not self-documenting and tied callers to undocumented invariants. Promoted to named class constants:
  - FLUSH_DEFAULT_SEC (5.0): flush() default deadline.
  - FLUSH_POLL_SEC (0.05): how often flush() rechecks state.
  - FORCE_STOP_JOIN_SEC (2.0): secondary worker join after force-stop.

  flush(timeout) now takes timeout=None as a sentinel (resolved to FLUSH_DEFAULT_SEC), matching shutdown(timeout). Added a comment to the poll-sleep explaining the two-conditions-AND requirement that motivates it.

* fix(logging): close worker/main _client race + mock log_server_version

  Two issues surfaced by the PR review:
  1. Workflow.__init__ now calls metadata.log_server_version(), which hits GET /system/version when not mocked. conftest pre-mocks the sibling validate_versions; add log_server_version so unit tests don't blow up on collection.
  2. After force-stop, shutdown() reassigned self._client = REST(...). The worker's finally block (which closes self._client) could then close the freshly created client under the main thread's metrics and /close POSTs. Use a local post_close_client instead so the worker's session and the main thread's never alias.

  Also document on shutdown(timeout=...) that the post-stop metrics and /close POSTs carry their own HTTP timeouts, so total wall-time can be roughly timeout + 32s in pathological cases.
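A minimal sketch of the first commit's redesign, assuming a plain `logging.Handler` subclass: one bounded `queue.Queue`, one daemon worker, a `threading.Event` as the control plane, an `atexit` safety net, and counters reported by a synchronous `shutdown()`. `SyncShutdownHandler`, `send` and `close_remote` are hypothetical names; the two callables stand in for the batch POST and the `/close` POST. This is an illustration, not the actual OpenMetadata handler.

```python
import atexit
import logging
import queue
import threading
from typing import Callable, List


class SyncShutdownHandler(logging.Handler):
    """Hypothetical sketch: one queue, one daemon worker, Event control plane."""

    def __init__(self, send: Callable[[List[str]], None],
                 close_remote: Callable[[str], None], maxsize: int = 1000):
        super().__init__()
        self._send = send                  # assumption: stands in for the batch POST
        self._close_remote = close_remote  # assumption: stands in for the /close POST
        self._buffer: "queue.Queue[str]" = queue.Queue(maxsize=maxsize)
        self._stop = threading.Event()     # control plane; no sentinel on the data queue
        self._shutdown_lock = threading.Lock()
        self.shipped = self.failed = self.dropped = self.timed_out = 0
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()
        atexit.register(self.shutdown)     # safety net for callers that forget

    def emit(self, record: logging.LogRecord) -> None:
        if self._stop.is_set():
            self.dropped += 1              # worker is draining or gone
            return
        try:
            self._buffer.put_nowait(self.format(record))
        except queue.Full:
            self.dropped += 1

    def _run(self) -> None:
        # Exit only once stop is requested AND everything queued was handled.
        while not (self._stop.is_set() and self._buffer.empty()):
            try:
                line = self._buffer.get(timeout=0.05)
            except queue.Empty:
                continue
            try:
                self._send([line])
                self.shipped += 1
            except Exception:
                self.failed += 1

    def shutdown(self, timeout: float = 5.0) -> None:
        with self._shutdown_lock:
            if self._stop.is_set():
                return                     # idempotent: atexit may call it again
            self.acquire()                 # emit() runs under this lock via handle()
            try:
                self._stop.set()           # no record can be queued after this point
            finally:
                self.release()
            self._worker.join(timeout)     # blocks until drained or deadline
            if self._worker.is_alive():
                self.timed_out += 1
            summary = (f"shipped={self.shipped} failed={self.failed} "
                       f"dropped={self.dropped} timed_out={self.timed_out}")
            self._close_remote(summary)    # synchronous final POST
```

One reason to prefer an Event over a sentinel record: the stop signal never competes with log records for space in the bounded queue, so a full buffer cannot delay it.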
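The second commit's fix is about where the in-flight flag gets set. The sketch below uses a hypothetical `BatchWorker` with a `post` callable standing in for the HTTP call; it sets `_post_in_flight` right after the first successful `get()` in `_collect_batch`, and `flush()` polls both conditions. As the commit notes, this narrows the window to a single line rather than closing it completely.

```python
import queue
import threading
import time
from typing import Callable, List


class BatchWorker:
    """Hypothetical sketch of the in-flight flag placement; not the real handler."""

    def __init__(self, post: Callable[[List[str]], None], max_batch: int = 10):
        self._post = post                # assumption: stands in for the HTTP POST
        self._max_batch = max_batch
        self.buffer: "queue.Queue[str]" = queue.Queue()
        self._post_in_flight = False
        self.failed = 0
        self._stop = threading.Event()
        self._thread = threading.Thread(target=self._run, daemon=True)
        self._thread.start()

    def _collect_batch(self) -> List[str]:
        try:
            first = self.buffer.get(timeout=0.05)  # flag stays clear while idle here
        except queue.Empty:
            return []
        # Set the flag immediately after the first successful get(), not later in
        # the POST path. The only remaining gap is between get() returning and here.
        self._post_in_flight = True
        batch = [first]
        while len(batch) < self._max_batch:
            try:
                batch.append(self.buffer.get_nowait())
            except queue.Empty:
                break
        return batch

    def _run(self) -> None:
        while not self._stop.is_set():
            batch = self._collect_batch()
            if not batch:
                continue
            try:
                self._post(batch)
            except Exception:
                self.failed += 1
            finally:
                self._post_in_flight = False

    def flush(self, timeout: float = 5.0) -> bool:
        """Return True once the queue is empty and no batch is held or posting."""
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self.buffer.empty() and not self._post_in_flight:
                return True
            time.sleep(0.05)
        return False

    def stop(self) -> None:
        self._stop.set()
        self._thread.join(timeout=1.0)
```

If a strictly gap-free contract were ever needed, one option (not what the commit does) is a pending-record counter incremented on enqueue and decremented after the POST, guarded by a lock.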
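The named-constant and `None`-sentinel pattern from the third commit might look like this. Only the three constant names and values come from the commit; `FlushPolicy` and the `is_drained` callable (standing in for "queue empty AND no POST in flight") are hypothetical, and the poll comment is one reading of the motivation the commit mentions.

```python
import time
from typing import Callable, Optional


class FlushPolicy:
    """Hypothetical sketch: named timeouts plus a None sentinel for flush()."""

    FLUSH_DEFAULT_SEC = 5.0     # flush() deadline when the caller passes None
    FLUSH_POLL_SEC = 0.05       # how often flush() rechecks the drained state
    FORCE_STOP_JOIN_SEC = 2.0   # secondary join after force-stop (unused in this sketch)

    def __init__(self, is_drained: Callable[[], bool]):
        # assumption: is_drained() stands in for "queue empty AND no POST in flight"
        self._is_drained = is_drained

    def flush(self, timeout: Optional[float] = None) -> bool:
        # None means "use the class default", so a caller such as
        # shutdown(timeout=None) can forward its argument unchanged.
        if timeout is None:
            timeout = self.FLUSH_DEFAULT_SEC
        deadline = time.monotonic() + timeout
        while time.monotonic() < deadline:
            if self._is_drained():
                return True
            # Sleep and recheck: "drained" is the AND of two conditions owned by
            # different code paths, so no single primitive signals it.
            time.sleep(self.FLUSH_POLL_SEC)
        return self._is_drained()
```

Because the default is resolved at call time, overriding `FLUSH_DEFAULT_SEC` in a subclass or test retunes every caller that passes `None`; a literal default in the signature is bound once at definition time and would not pick that up.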
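The `_client` aliasing fix from the fourth commit, sketched with a hypothetical `FakeClient` that refuses to POST after `close()` and a hypothetical `Shipper` whose worker is parked mid-POST. The worker's `finally` always closes `self._client`; the final `/metrics` and `/close` POSTs go through a local `post_close_client`, so the two never share a session no matter when the worker exits. Apart from `post_close_client`, `_client` and `FORCE_STOP_JOIN_SEC`, the names are illustrative, and the demo join deadline is deliberately short.

```python
import threading
from typing import List


class FakeClient:
    """Hypothetical stand-in for the REST client; refuses to POST once closed."""

    def __init__(self, name: str):
        self.name = name
        self.closed = False
        self.posts: List[str] = []

    def post(self, path: str) -> None:
        if self.closed:
            raise RuntimeError(f"{self.name} client already closed")
        self.posts.append(path)

    def close(self) -> None:
        self.closed = True


class Shipper:
    FORCE_STOP_JOIN_SEC = 0.05  # short for the demo; the commit uses 2.0

    def __init__(self):
        self._client = FakeClient("worker")
        self.release_post = threading.Event()   # simulates a slow in-flight POST
        self._worker = threading.Thread(target=self._run, daemon=True)
        self._worker.start()

    def _run(self) -> None:
        try:
            self.release_post.wait()
            self._client.post("/logs")
        finally:
            # The worker closes whatever self._client points at when it exits.
            self._client.close()

    def shutdown(self) -> FakeClient:
        self._worker.join(timeout=self.FORCE_STOP_JOIN_SEC)
        # Force-stop path: the fresh client lives in a local, never in
        # self._client, so the worker's finally cannot close it mid-POST.
        post_close_client = FakeClient("post-close")
        try:
            post_close_client.post("/metrics")
            post_close_client.post("/close")
        finally:
            post_close_client.close()
        return post_close_client
```

Had `shutdown()` assigned the new client to `self._client`, the worker's `finally` could close it between the two final POSTs, and `FakeClient.post` would then raise.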
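The conftest change in the same commit amounts to patching both version calls. This self-contained sketch uses `unittest.mock.patch.multiple` against hypothetical `OpenMetadataStub` and `WorkflowStub` classes whose methods raise instead of calling `GET /system/version`. In a real conftest the same patch would more likely live in an autouse pytest fixture targeting the actual client class.

```python
from unittest import mock


class OpenMetadataStub:
    """Hypothetical stand-in for the metadata client used by Workflow."""

    def validate_versions(self) -> None:
        raise ConnectionError("GET /system/version: no server in unit tests")

    def log_server_version(self) -> None:
        raise ConnectionError("GET /system/version: no server in unit tests")


class WorkflowStub:
    """Hypothetical: mirrors Workflow.__init__ calling both version methods."""

    def __init__(self, metadata: OpenMetadataStub):
        metadata.validate_versions()
        metadata.log_server_version()   # the newly added call
        self.metadata = metadata


def build_offline_workflow() -> WorkflowStub:
    # Patch both siblings; patching only validate_versions would still let
    # log_server_version reach the network and fail.
    with mock.patch.multiple(
        OpenMetadataStub,
        validate_versions=mock.DEFAULT,
        log_server_version=mock.DEFAULT,
    ) as mocks:
        workflow = WorkflowStub(OpenMetadataStub())
        mocks["log_server_version"].assert_called_once_with()
    return workflow
```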
| File |
|---|
| `__init__.py` |
| `test_collaborative_super.py` |
| `test_credentials.py` |
| `test_datalake.py` |
| `test_deprecation.py` |
| `test_fqn_special_chars.py` |
| `test_helpers.py` |
| `test_logger.py` |
| `test_memory_limit.py` |
| `test_service_spec.py` |
| `test_source_hash.py` |
| `test_status_warning_handler.py` |
| `test_stored_procedures.py` |
| `test_streamable_logger.py` |
| `test_tag_utils.py` |