* fix(api): make closeStream idempotent when log storage is not configured
closeStream used to throw IllegalStateException("Log storage is not
configured") which the resource layer translates to a 500 response.
That made the contract surprising for callers: any defensive cleanup
path (exit handlers, retry logic, generic teardown) had to know in
advance whether streaming was configured before calling close, or eat
spurious server errors.
Closing a stream is naturally idempotent — same shape as DELETE on a
non-existent resource. When log storage is not configured, return
silently with a debug log so callers can call close() defensively
without checking state first.
Adds a unit test covering the no-op path.
* Add design spec for streamable logs stability fix
Captures the design discussion for fixing partial.txt and logs.txt
clobber bugs in S3LogStorage when ingestion runs hit idle gaps longer
than the 5-minute stream timeout.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Add full design flow doc for streamable ingestion logs
End-to-end documentation of the streamable logs feature: architecture,
storage layout, run lifecycle, read paths, abandoned-run recovery,
configuration, concurrency model, and observability. Reflects the
post-fix design captured in the streamable-logs-stability spec.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* Add implementation plan for streamable-logs stability fix
Step-by-step TDD plan grouped into 8 PR-sized tasks: config schema
additions, per-stream lock, pendingFlush + merge-always flush, multipart
removal, sweeper rewrite, /close rewrite, read-path correction, and
integration tests.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* feat(log-storage): add config fields for streamable-logs stability fix
Adds streamTimeoutHours, cleanupIntervalMinutes, partialFlushIntervalMinutes,
earlyFlushWatermarkBytes, pendingFlushAlertAfterFailures. Deprecates
streamTimeoutMinutes in favor of streamTimeoutHours. Pure schema-only
change; no Java code consumes these fields yet.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): add deprecated:true keyword and clarify watermark unit
Addresses code review on Task 1: project convention uses the JSON Schema
deprecated keyword alongside description annotation. Also clarifies that
earlyFlushWatermarkBytes default (5242880) equals 5 MB.
* feat(log-storage): wire new stability-fix config fields into S3LogStorage
Reads streamTimeoutHours, cleanupIntervalMinutes, partialFlushIntervalMinutes,
earlyFlushWatermarkBytes, pendingFlushAlertAfterFailures from
LogStorageConfiguration with sane defaults. No behavioral change yet —
values are stored but not consumed.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): broaden streamTimeoutMinutes deprecation warning + drop FQN
Addresses code review on Task 2: warning now fires whenever
streamTimeoutMinutes is set (not only for values < 30 min), since the
field is deprecated for all deployments. Also imports java.lang.reflect.Field
in the test helper instead of using a fully-qualified name (CLAUDE.md
no-FQN rule).
* refactor(log-storage): add per-stream ReentrantLock for S3LogStorage
Introduces streamLocks map and acquire/release helpers. appendLogs,
writePartialLogsForStream, closeStream, and cleanupExpiredStreams all
serialize on the per-stream lock. No behavior change; locking is
pure mutual-exclusion at this point.
* fix(log-storage): close iterator.remove race in cleanupExpiredStreams
Move iterator.remove() inside the per-stream lock to prevent a window
where a concurrent appendLogs sees the still-present closed StreamContext
and writes to a closed stream. Also clarifies the comment on flush(fqn,runId)
ordering and documents that streamLocks accumulates monotonically until
Tasks 7 and 8 add cleanup.
* feat(log-storage): track pendingFlush queue and totalLinesAppended counter
Each appendLogs now also populates per-stream pendingFlush (lines awaiting
flush) and totalLinesAppended (monotonic logical line counter). State is
written but not yet consumed; the new flush logic in the next commit reads it.
* fix(log-storage): document thread-safety + lifecycle on Task 4 maps, add test
Addresses review on Task 4: documents that pendingFlush ArrayList values
may only be accessed under the per-stream lock; clarifies that
consecutiveFlushFailures is written and consumed in Task 5 (not just
consumed); aligns its type with AtomicInteger for consistency with
the other counters; adds a test for the trailing-newline trim path.
* fix(log-storage): merge-always partial.txt PUT and persist offset in S3 metadata
Replaces the old writePartialLogsForStream that skipped the read-merge step
when partialLogOffsets[streamKey] was 0 (the canonical 80MB->KB clobber bug).
The new flush always reads existing partial.txt, appends a snapshot of
pendingFlush, and PUTs with offset state in S3 user-defined metadata.
Also adds an early-flush watermark trigger so high-burst writes don't
pile up unbounded in pendingFlush.
Closes the partial.txt-clobber half of the streamable-logs-stability spec.
* fix(log-storage): replace task-number comments with intent-describing language
Addresses code review on Task 5: production code comments should describe
invariants, not the planning-doc task that filled the gap. Also clarifies
the parse-before-lock and the byte-counter atomicity assumption.
* refactor(log-storage): remove MultipartS3OutputStream, rewrite closeStream as server-side copy
appendLogs no longer initiates a multipart upload; bytes flow only through
pendingFlush -> partial.txt PUTs.
closeStream now: (1) drains pendingFlush via final partial.txt PUT,
(2) issues CopyObjectRequest from partial.txt to logs.txt server-side,
(3) deletes partial.txt and the .active marker, (4) drops in-memory state.
Idempotent: a second /close sees no partial.txt (NoSuchKeyException) and
returns gracefully.
Closes the logs.txt-clobber half of the streamable-logs-stability spec
and finalizes the canonical /close flow.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): plug listener/lock leaks, propagate SSE on copy, recover counter from metadata
Addresses code review on combined Tasks 6+8:
- dropStreamState now removes activeListeners entries (SSE listener leak fix).
- cleanupExpiredStreams now removes streamLocks entries on expire (lock leak fix).
- copyPartialToLogs applies SSE configuration to CopyObjectRequest (was unencrypted on copy).
- writePartialLogsForStreamLocked reads last-flushed-line metadata from existing
partial.txt and uses it to keep totalLinesAppended monotonic across restarts.
- consecutiveFlushFailures reset uses computeIfAbsent + set(0) instead of allocating
a new AtomicInteger every successful flush.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* refactor(log-storage): rewrite sweeper as cleanupAbandonedStreams (24h/1h)
Bumps the idle threshold from 5 min to streamTimeoutHours (default 24h)
and the poll interval from 1 min to cleanupIntervalMinutes (default 1h).
On expire, finalizes the abandoned run by copying partial.txt -> logs.txt
server-side, deleting partial.txt, and dropping in-memory state — same
end-state as closeStream.
Also wires partialFlushIntervalMinutes into the periodic flush schedule
and removes the legacy streamTimeoutMs field that no longer drives behavior.
* fix(log-storage): preserve streamLocks entry on cleanup retry path
Addresses code review on Task 7: streamLocks.remove was unconditionally
in the finally block of finalizeAbandonedStream, so it ran even when the
sweeper returned early to retry next tick on a copy failure. That meant
the next sweep tick would create a fresh ReentrantLock, and any
concurrent appendLogs in the meantime would contend on a different lock
object than the retry, defeating mutual exclusion.
Now we only remove the lock entry once finalization has succeeded
(after dropStreamState). The retry path leaves the lock in place so
the next tick and any concurrent appendLogs see the same lock identity.
* fix(log-storage): include pendingFlush snapshot in mid-run reads
getCombinedLogsForActiveStream now appends the in-memory pendingFlush
snapshot to the partial.txt body when reading mid-run, so the UI's
paginated GET surfaces the most recent tail even before the next
scheduled flush has happened.
Only appends pendingFlush when a partial.txt file exists, avoiding
duplication in the fallback path where recentLogsCache already
includes those lines.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): tighten Task 9 read path safety + invariant comment
Addresses review on Task 9: the unsafe null-lock fallback in the
pendingFlush append path is removed (it was structurally unreachable
but a latent hazard for future lifecycle changes). The pendingFlush
read now happens entirely under the per-stream lock, with a
conservative skip if no lock entry exists.
Also documents the recentLogsCache-vs-pendingFlush invariant in the
fallback path and adds a total-count assertion to the new test.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* test(log-storage): add bug-reproducer ITs for streamable-logs stability
- testIdleGapDoesNotClobberPartial: two log bursts within an open run;
asserts both are present in the read response.
- testCloseProducesLogsTxtMatchingPartial: write, close, read; asserts
content survives the close.
- testCloseIsIdempotent: a second /close is a graceful no-op.
Tests are tolerant of the storage backend in the test environment
(DefaultLogStorage in CI may not persist; S3LogStorage in S3-configured
environments). Deep behavioral coverage is in S3LogStorageTest unit tests.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): address final-review critical bugs
- closeStream and finalizeAbandonedStream now propagate PUT failures
from writePartialLogsForStreamLocked (which returns boolean).
closeStream throws IOException; the sweeper retains state for retry.
Fixes silent data loss when the final flush PUT fails.
- streamLocks entries are no longer removed; this prevents an
acquire-vs-remove race that would break mutual exclusion. Memory
growth is bounded by maxConcurrentStreams in practice.
- cleanupAbandonedStreams re-checks expiration inside the per-stream
lock so a stream that was bumped by appendLogs between the scan
and the lock acquisition is not finalized.
- deleteLogs now acquires the per-stream lock before mutating state.
- getCombinedLogsForActiveStream appends pendingFlush in BOTH the
S3-found and memory-fallback branches, so reads aren't truncated
when recentLogsCache evicts oldest at its 1000-line cap.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): use pendingFlush as canonical mid-run read source (no duplicates)
The previous Issue 5 fix appended pendingFlush unconditionally, which
caused duplicate lines in the read response when the fallback branch
used recentLogsCache (since both are populated by the same appendLogs).
Now: in the foundPartialFile branch, append pendingFlush AFTER the S3
body (non-overlapping by construction). In the fallback branch
(no partial.txt yet), use pendingFlush directly as the canonical
source — this is more complete than recentLogsCache (1000-line cap)
and avoids the duplicate issue. recentLogsCache remains a defensive
fallback for the rare case where pendingFlush is empty in the fallback
path.
* Update generated TypeScript types
* chore(log-storage): drop dead abortIncompleteMultipartUpload lifecycle rule
The multipart upload write path was removed; the bucket lifecycle's
abortIncompleteMultipartUpload(7 days) rule served only as migration
cleanup for in-flight uploads from the old code at deploy time. After
the migration window it does nothing.
Drops the rule from configureLifecyclePolicy, the AWS SDK import, the
"7 days multipart cleanup" string in the startup log, and the
corresponding bullet in docs/streamable-logs.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* chore: ignore docs/superpowers/
Local-only working notes (specs, plans) live there and shouldn't be tracked.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* test(log-storage): tolerate DefaultLogStorage in CI for streamable-logs ITs
CI runs the integration tests against the bootstrap config which uses
DefaultLogStorage (delegates to k8s/Airflow which isn't running). The
storage returns:
- "No pods found for this pipeline" sentinel for getLogs
- non-2xx status (the SDK wraps it as statusCode -1) for /close
Adjustments:
- testIdleGapDoesNotClobberPartial: parse JSON, only assert when total>0.
When storage actually persists (S3 deployments), assert BOTH bursts
are present — that's the real "no clobber" check.
- postClose helper: tolerate any exception from the close call
(idempotency is the contract; transient errors are acceptable).
The deep behavioural coverage continues to live in S3LogStorageTest unit
tests where mock S3 is the storage backend.
* test
* fix
* Update generated TypeScript types
* fix
* Update generated TypeScript types
* fix(log-storage): record UTF-8 byte length in partial.txt total-bytes metadata
String.length() returns UTF-16 code units; for non-ASCII content this
diverged from the actual S3 object size, breaking the drift cross-check
documented in docs/streamable-logs.md.
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
* fix(log-storage): address PR review findings on S3LogStorage
Plumbs the documented timing knobs (cleanupIntervalMinutes, partialFlushIntervalMinutes,
earlyFlushWatermarkBytes, pendingFlushAlertAfterFailures) through LogStorageConfiguration
so operators can actually tune them. Replaces the unbounded streamLocks ConcurrentHashMap
with a Guava Striped<Lock> capped at 256 stripes, eliminating the per-(fqn, runId) memory
leak and the acquire-vs-remove race that a per-key map would have. Adds a Multipart
Upload + UploadPartCopy concatenation path for partial.txt >= 5 MB, avoiding the O(n^2)
total transfer and full in-JVM body merge that the prior GET+PUT-everything strategy hit
on long-running pipelines. Realigns docs/streamable-logs.md with the actual schema and
implementation, drops the broken superpowers/* spec link, and renames the misleading
testIdleGapDoesNotClobberPartial IT (which posted bursts back-to-back without simulating
any gap).
Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
---------
Co-authored-by: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
16 KiB
Streamable Ingestion Logs
This document describes the end-to-end design of OpenMetadata's streamable ingestion-pipeline log system: how logs flow from a running connector to durable S3 storage, how the UI reads them while a run is in progress, and how the system handles long idle gaps, restarts, and abandoned runs.
Overview
Ingestion pipelines (metadata, profiler, lineage, usage, dbt, etc.) emit logs as they run. Operators need to:
- Watch logs live while a pipeline is running, including for long-running connectors that can take hours.
- Read logs after the run ends, with a single canonical artifact per run.
- Recover gracefully from server restarts, network blips, and connector idle gaps.
OpenMetadata addresses this with a server-side log storage abstraction backed by S3 (or any S3-compatible store like MinIO). The connector pushes log batches over HTTP; the server persists them and serves both live and post-run reads.
Architecture
┌──────────────────────┐
│ Python ingestion │ POST /logs/{fqn}/{runId} (append)
│ connector │ POST /logs/{fqn}/{runId}/close (finalize)
│ (logs_mixin.py) │
└──────────┬───────────┘
│ HTTP
▼
┌──────────────────────┐
│ OpenMetadata server │
│ IngestionPipeline │
│ Resource │
└──────────┬───────────┘
│ LogStorageInterface
▼
┌──────────────────────┐ ┌──────────────────────┐
│ S3LogStorage │────────▶│ S3 / MinIO bucket │
│ (streaming, in-mem │ │ partial.txt │
│ buffers, sweeper) │ │ logs.txt │
└──────────┬───────────┘ └──────────────────────┘
│ SSE / GET (paginated / download)
▼
┌──────────────────────┐
│ OpenMetadata UI │
│ (live tail + history)│
└──────────────────────┘
The LogStorageInterface abstraction supports multiple backends:
| Backend | Purpose |
|---|---|
S3LogStorage |
Production: stores logs durably in S3 / MinIO. The focus of this document. |
DefaultLogStorage |
Backward-compat: delegates to the pipeline service client (Airflow / Argo). No first-class storage. |
This document covers the S3LogStorage implementation.
Storage Layout
Each pipeline run is identified by a (fqn, runId) tuple. On S3 the layout is:
{bucket}/{prefix}/ # prefix defaults to "pipeline-logs"
{sanitizedFQN}/{runId}/
partial.txt # readable view during the run
logs.txt # final artifact, materialized at /close
.active/{sanitizedFQN}/{runId}/{serverId} # heartbeat marker
partial.txt is the durable, readable view of an in-progress run. It is updated periodically as the connector appends batches. It carries durable offset state in S3 user-defined metadata:
| Metadata key | Purpose |
|---|---|
x-amz-meta-last-flushed-line |
Logical line counter at the moment of this PUT. Drives retry idempotency and post-restart recovery. |
x-amz-meta-total-bytes |
Cross-check on body size; helps detect drift. |
x-amz-meta-writer-epoch |
Bumped each time a fresh OM-server instance picks up the stream after a restart. |
x-amz-meta-writer-version |
Identifies the writer code version. Useful during migration windows. |
logs.txt is the canonical post-run artifact. It is created only at /close (or by the abandoned-run sweeper), as a server-side S3 copy of the final partial.txt. Content matches partial.txt exactly at the moment of close.
.active/... markers are dropped as a side effect of appendLogs. They have no functional role in correctness; they are operational hints for diagnostics ("which OM-server instance most recently saw this run").
A bucket lifecycle policy ensures cleanup:
expirationDays(default 30) on thepipeline-logs/prefix expires all logs after the retention window.
Run Lifecycle
1. Connector emits a batch
The Python ingestion runner buffers log lines and POSTs batches to the server:
POST /api/v1/services/ingestionPipelines/logs/{fqn}/{runId}
Content-Type: application/json
"<raw log content>" OR
{
"logs": "<base64-gzipped log content>",
"connectorId": "...",
"compressed": true
}
IngestionPipelineResource.writePipelineLogs decodes the body and calls repository.appendLogs(fqn, runId, content), which delegates to S3LogStorage.appendLogs.
2. Server-side append
S3LogStorage.appendLogs does five things, all in memory, all under a per-stream ReentrantLock:
- Increments
totalLinesAppended, the monotonic logical line counter that anchors retry idempotency. - Appends to
SimpleLogBuffer(in-memory ring, capacity 1000 lines). This is the source for the SSE/WebSocket live-tail UI experience. It is bounded; oldest lines evict on overflow. It is not load-bearing for durability. - Appends to
pendingFlush(in-memory queue, no fixed cap, byte-tracked). This is the durable-pending-write queue and survives until the next successful PUT. - Notifies SSE listeners, fanning out the new lines to any open live-tail HTTP connections.
- Schedules an early flush if
pendingFlushexceedsearlyFlushWatermarkBytes(default 5 MB). This protects against memory bloat under bursty writes.
A single-threaded cleanupExecutor schedules the periodic flush, the abandoned-run sweeper, and metrics updates.
3. Periodic flush to partial.txt
Every partialFlushIntervalMinutes (default 2) and on demand from the early-flush watermark, writePartialLogsForStream runs under the per-stream lock:
- Snapshot
pendingFlushand clear it. - If empty, no-op (idle streams cost nothing).
GetObject partial.txt→ readsContent-Lengthand metadata from the response headers. On 404, treat as empty.- Build new metadata (
last-flushed-line,total-bytes,writer-epoch,writer-version). - If existing body < 5 MB — read the body, build merged body = existing +
\n-joined snapshot,PutObjectatomically. - If existing body ≥ 5 MB — abort the body stream and concatenate server-side via Multipart Upload:
CreateMultipartUpload,UploadPartCopy(existing body as part 1),UploadPart(new content as part 2, the last part has no 5 MB minimum),CompleteMultipartUpload. The merged body never enters JVM heap and is not re-uploaded. - On failure, abort any in-flight multipart upload, re-merge the snapshot to the head of
pendingFlush, and try again next tick. No data loss.
Because pendingFlush is unbounded by the SimpleLogBuffer cap, no line is ever evicted before being flushed.
4. Live read while running
The UI's "live logs" view does two things in parallel:
- HTTP GET
/logs/{fqn}/{runId}?after={cursor}for paginated history. The server readspartial.txtfrom S3 and concatenates the in-memorypendingFlushsnapshot for the most-recent-tail bytes that haven't yet been flushed. The cursor is a line offset. - Server-Sent Events (SSE) for live tail. The endpoint registers a
LogStreamListeneragainst the stream key and pushes new lines asnotifyListenersfires from eachappendLogs.
This gives the user "everything written so far" via GET and "everything written in real time from now on" via SSE.
5. /close finalization
When the connector terminates (success, graceful failure, or graceful abort), it calls:
POST /api/v1/services/ingestionPipelines/logs/{fqn}/{runId}/close
S3LogStorage.closeStream runs under the per-stream lock:
- Final flush: drain remaining
pendingFlushtopartial.txt(same path as the periodic flush). - Server-side copy
partial.txt→logs.txt. Bytes do not transit through OM. Cheap and constant-time regardless of log size. - Delete
partial.txt. - Best-effort delete the
.active/{fqn}/{runId}/{serverId}marker. - Drop in-memory state for the stream (
activeStreams,pendingFlush,totalLinesAppended,recentLogsCache, the per-stream lock).
/close is idempotent. A second call finds no partial.txt and no in-memory state; it is a graceful no-op. A /close that arrives after the abandoned-run sweeper already finalized the stream behaves the same way.
6. Post-/close reads
Once /close completes, logs.txt is the canonical artifact. getLogs(fqn, runId) reads it directly. Pagination is by line offset; the response includes after (next cursor) and total (total bytes / lines).
There is also a download endpoint that streams the full file (or composes from segments / partial in legacy fallbacks).
Read Paths
| Endpoint | Pre-/close |
Post-/close |
|---|---|---|
GET /logs/{fqn}/{runId} |
Reads partial.txt + appends pendingFlush snapshot. Apply cursor pagination. |
Reads logs.txt. |
GET /logs/{fqn}/{runId}/download |
Streams partial.txt. |
Streams logs.txt. |
GET /logs/{fqn}/stream/{runId} (SSE) |
Registers a listener; replays last 100 buffered lines, then live-streams new lines. | (Not used post-close; the run is over.) |
Legacy partial.txt files written by older code (without S3 metadata) read normally; the new flush logic treats them as "no prior offset" and merges any new content correctly.
Abandoned-Run Recovery
Connectors can die without calling /close — process killed, OOM, network partition, infrastructure failure. To bound resource use and still produce a final logs.txt, a sweeper runs periodically:
- Schedule: every
cleanupIntervalMinutes(default 60). - Threshold:
streamTimeoutMinutessince lastappendLogs(default 1440 = 24h).
For each expired stream, the sweeper does the same finalization steps as /close (final flush, copy to logs.txt, delete partial.txt, drop in-memory state). The end result is identical: an abandoned run produces a finalized logs.txt artifact that the UI can read, just delayed.
The 24h default is intentionally lenient: typical idle gaps in slow connectors (waiting on source queries, batch boundaries, queues) are minutes-to-hours, not days. Operators can tune the threshold downward in deployments where memory pressure from many parallel runs requires more aggressive reclamation.
Failure Modes & Recovery
| Failure | Recovery |
|---|---|
| S3 PUT fails during periodic flush | pendingFlush snapshot is restored under the lock. Next tick retries. No data loss. |
| OM-server restart mid-run | All in-memory state lost. partial.txt on S3 retains all previously-flushed content. The next appendLogs re-creates state; the first flush after restart reads partial.txt (with metadata) and resumes from last-flushed-line. Worst-case loss: lines that were in pendingFlush at restart time, bounded above by partialFlushIntervalMinutes. |
Connector dies without /close |
Abandoned-run sweeper finalizes the run after streamTimeoutHours. logs.txt is materialized from the most recent partial.txt. |
/close retries after partial success |
All steps are idempotent. Second call finds no partial.txt and no in-memory state; no-op. |
Concurrent appendLogs and cleanup |
The per-stream lock serializes them. Cleanup finds the stream "fresh" again and skips it next tick. |
Bucket lifecycle expires partial.txt mid-run |
Should not happen at default expirationDays = 30. If misconfigured (very low retention), the next flush would treat it as a fresh partial.txt and start over. Recommended floor: 7 days. |
Configuration
All settings live under LogStorageConfiguration in openmetadata.yaml:
| Field | Default | Description |
|---|---|---|
bucketName |
(required) | S3 bucket for log storage. |
prefix |
pipeline-logs |
Key prefix within the bucket. |
enableServerSideEncryption |
true |
Apply SSE on every PUT. |
sseAlgorithm |
AES_256 |
Or AWS_KMS (requires kmsKeyId). |
storageClass |
STANDARD_IA |
S3 storage class for log objects. |
expirationDays |
30 | Bucket lifecycle: expire all logs after this many days. |
streamTimeoutMinutes |
1440 | Idle threshold (in minutes) before the abandoned-run sweeper finalizes a stream. |
cleanupIntervalMinutes |
60 | How often the sweeper wakes up to check for abandoned streams. |
partialFlushIntervalMinutes |
2 | Periodic pendingFlush → partial.txt cadence. |
earlyFlushWatermarkBytes |
5242880 (5 MB) | Triggers an out-of-band flush when pendingFlush exceeds this size. |
pendingFlushAlertAfterFailures |
10 | Emit an alerting metric after this many consecutive failed flushes for a stream. |
maxConcurrentStreams |
100 | Bound on in-flight pipeline runs per OM-server instance. |
awsConfig.* |
— | AWS credentials / region / endpoint (also supports IAM role + custom endpoints for MinIO). |
Concurrency Model
Coordination is a per-stream lock keyed by streamKey = fqn + "/" + runId. The lock is held for the duration of appendLogs, periodic flush, abandoned-run cleanup, and /close. Locks are backed by a Guava Striped<Lock> with a fixed stripe count, so memory does not grow with completed-run accumulation; the same key always maps to the same lock instance, eliminating the acquire-vs-remove race that a per-key map would have. False contention across stripes is bounded by maxConcurrentStreams << stripe count.
A single-threaded ScheduledExecutorService (cleanupExecutor) drives:
- Periodic flushes (
writePartialLogs) - Abandoned-run sweeper (
cleanupAbandonedStreams) - Metrics updates (
updateStreamMetrics) - One-shot early flushes scheduled by the watermark trigger
Under sustained burst load, scheduled tasks queue on this single thread. This is intentional: it bounds resource use and avoids unbounded thread creation under spikes. If a deployment regularly sees queue backlog, the watermark or flush interval can be tuned.
Observability
Key metrics exposed by StreamableLogsMetrics:
om_streamable_logs_log_shipment_*— distribution of append latencies.om_streamable_logs_logs_sent/logs_failed— counter of successful and failed appends.om_streamable_logs_batch_size— distribution of lines per batch.om_streamable_logs_s3_*— distribution of S3 read/write latencies and counters of S3 errors.om_streamable_logs_pending_part_uploads— gauge for monitoring queue backlog (legacy, will be retired with multipart removal).om_streamable_logs_multipart_uploads— gauge for active multipart uploads (legacy, will be retired).om_streamable_logs_pending_flush_bytes— gauge for in-memorypendingFlushsize per stream (new).om_streamable_logs_consecutive_flush_failures— gauge per stream (new).
Recommended alerts:
pending_flush_bytes> 50 MB sustained → memory pressure or persistent S3 failures.consecutive_flush_failures≥ 10 → S3 connectivity or auth issue.s3_errorsrate > 1/min → S3 health degradation.
Multi-Server Topology
The design assumes single-writer-per-run: an ALB / load balancer enforces sticky sessions for (fqn, runId) via the PIPELINE_SESSION cookie set on the first appendLogs response. All subsequent requests for the same run land on the same OM-server instance for the lifetime of the run.
If stickiness is broken (cookie stripped by a proxy, multi-cluster routing without coordination), two OM-server instances could write to the same partial.txt and clobber each other. This is out of scope for the current design. A future iteration could move offset state to the database for cross-server coordination.
References
- Source files:
openmetadata-service/src/main/java/org/openmetadata/service/logstorage/S3LogStorage.javaopenmetadata-service/src/main/java/org/openmetadata/service/logstorage/LogStorageFactory.javaopenmetadata-spec/src/main/java/org/openmetadata/service/logstorage/LogStorageInterface.javaopenmetadata-service/src/main/java/org/openmetadata/service/resources/services/ingestionpipelines/IngestionPipelineResource.javaingestion/src/metadata/utils/streamable_logger.pyingestion/src/metadata/ingestion/ometa/mixins/logs_mixin.py
- Related PRs: #23590, #24198, #24287, #24410