From c20a29b11bb09c44bf2a335cd55256a0d23c23aa Mon Sep 17 00:00:00 2001 From: Sriharsha Chintalapani Date: Sat, 9 May 2026 21:44:47 -0700 Subject: [PATCH] cache: lineage cache, per-type metrics, invalidation registry, search-cache MIME-Version: 1.0 Content-Type: text/plain; charset=UTF-8 Content-Transfer-Encoding: 8bit Add Redis-backed lineage response cache and search response cache, both gated by the existing CACHE_PROVIDER toggle and falling through to direct computation when the cache is unavailable. The cache remains optional — verified end-to-end by toggling CACHE_PROVIDER=none on a live stack and confirming all paths continue to work (just without the L2 hit). Coverage: - CachedLineage wraps LineageRepository.getLineage with hybrid TTL + direct invalidation (60s default). Direct edits invalidate the affected root cache entries; transitive changes fall through to TTL. - CachedSearchLayer wraps /api/v1/search/query with auth-aware caching (cache key includes principal so users with different ACLs don't share results). 30s default TTL. Observability: - /api/v1/system/cache/stats response now includes a metrics block with hits/misses/hitRatio/evictions/errors/writes plus read/write latency Timers, and a byType breakdown so coverage gaps are visible per entity-type and per cache-layer. Correctness: - New Invalidatable interface + CacheBundle registry + invalidateEntity helper so future cache layers plug in by implementing one method instead of editing multiple mutation paths. - Edge mutations in LineageRepository.addLineage/deleteLineage invalidate both endpoints; entity mutations in EntityRepository.postUpdate / postDelete / restoreEntity invalidate the lineage rooted at the entity. - Pub/sub handler in CacheBundle iterates registered Invalidatables so remote-pod evictions flow to all layers automatically. Tooling: - docker-compose.cache-off.yml overlay flips CACHE_PROVIDER=none for local A/B testing without tearing down DB/ES volumes. - CachedSearchLayerIT exercises hit-on-second-call, distinct-query misses, distinct-page-size misses, and byType shape via the metrics endpoint. Each test gracefully no-ops when the cluster runs cache-off. Co-Authored-By: Claude Opus 4.7 (1M context) --- .../development/docker-compose.cache-off.yml | 8 + .../it/tests/CachedSearchLayerIT.java | 254 ++++++++++++++++++ .../service/cache/CacheBundle.java | 61 +++++ .../service/cache/CacheConfig.java | 13 + .../openmetadata/service/cache/CacheKeys.java | 36 +++ .../service/cache/CacheMetrics.java | 126 +++++++++ .../service/cache/CacheProvider.java | 13 + .../service/cache/CachedEntityDao.java | 28 +- .../service/cache/CachedLineage.java | 173 ++++++++++++ .../service/cache/CachedReadBundle.java | 20 +- .../service/cache/CachedSearchLayer.java | 171 ++++++++++++ .../service/cache/Invalidatable.java | 43 +++ .../service/cache/RedisCacheProvider.java | 33 +++ .../service/jdbi3/EntityRepository.java | 19 ++ .../service/jdbi3/LineageRepository.java | 39 +++ .../resources/search/SearchResource.java | 19 +- .../resources/system/SystemResource.java | 5 + 17 files changed, 1056 insertions(+), 5 deletions(-) create mode 100644 docker/development/docker-compose.cache-off.yml create mode 100644 openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/CachedSearchLayerIT.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedLineage.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedSearchLayer.java create mode 100644 openmetadata-service/src/main/java/org/openmetadata/service/cache/Invalidatable.java diff --git a/docker/development/docker-compose.cache-off.yml b/docker/development/docker-compose.cache-off.yml new file mode 100644 index 00000000000..e60d3226c0e --- /dev/null +++ b/docker/development/docker-compose.cache-off.yml @@ -0,0 +1,8 @@ +# Override that disables the cache while leaving the rest of the stack intact. +# Used in the local A/B benchmark to flip cache off without tearing down volumes. +# Apply on TOP of base compose (NOT the redis overlay): +# docker compose -f docker-compose.yml -f docker-compose.cache-off.yml up -d --no-deps openmetadata-server +services: + openmetadata-server: + environment: + CACHE_PROVIDER: none diff --git a/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/CachedSearchLayerIT.java b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/CachedSearchLayerIT.java new file mode 100644 index 00000000000..5dbc54fe6a3 --- /dev/null +++ b/openmetadata-integration-tests/src/test/java/org/openmetadata/it/tests/CachedSearchLayerIT.java @@ -0,0 +1,254 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.it.tests; + +import static org.junit.jupiter.api.Assertions.assertEquals; +import static org.junit.jupiter.api.Assertions.assertNotNull; +import static org.junit.jupiter.api.Assertions.assertTrue; + +import java.util.Map; +import org.junit.jupiter.api.Test; +import org.junit.jupiter.api.parallel.Execution; +import org.junit.jupiter.api.parallel.ExecutionMode; +import org.openmetadata.it.util.SdkClients; +import org.openmetadata.sdk.client.OpenMetadataClient; +import org.openmetadata.sdk.network.HttpMethod; +import org.openmetadata.sdk.network.RequestOptions; + +/** + * Integration tests for {@link org.openmetadata.service.cache.CachedSearchLayer}. + * + *

Verifies the auth-aware search-response cache shipped in plan Item 1 of + * {@code .context/cache-perf-findings.md}. Tests are designed to be cache-on; if the cluster is + * running with {@code CACHE_PROVIDER=none}, the {@code byType.search} block in + * {@code /cache/stats} stays at zero and the lazily-asserted hit-ratio assertions are skipped + * (the cache is expected to be inactive). + * + *

Skipped here (deferred to follow-up): + * + *

+ */ +@Execution(ExecutionMode.CONCURRENT) +class CachedSearchLayerIT { + + private static final String SEARCH_PATH = "/v1/search/query"; + private static final String STATS_PATH = "/v1/system/cache/stats"; + + /** + * The same query from the same principal hits the cache on call 2+. Verify by capturing the + * application-level metrics block delta — we expect at least one increment in + * {@code metrics.byType.search.hits} for {@code N-1} of {@code N} calls. + */ + @Test + void sameQueryHitsCacheOnSecondCall() { + OpenMetadataClient client = SdkClients.adminClient(); + String query = "*"; + String index = "table_search_index"; + int size = 10; + + Stats before = readStats(client); + if (!before.cacheEnabled) { + // Cache disabled — nothing to assert. Test still passes; the search itself must work. + runSearch(client, query, index, size); + runSearch(client, query, index, size); + return; + } + + // Three identical calls. First is a cold miss + write; the other two are hits. + runSearch(client, query, index, size); + runSearch(client, query, index, size); + runSearch(client, query, index, size); + + Stats after = readStats(client); + long hitsDelta = after.searchHits - before.searchHits; + long missesDelta = after.searchMisses - before.searchMisses; + long writesDelta = after.searchWrites - before.searchWrites; + + // At least 2 of the 3 calls should be hits. We don't pin to exactly 2 because other tests + // running concurrently may also issue searches against the same query. We do require at + // least one new write (the cold first call's populate) and at least 2 new hits. + assertTrue( + hitsDelta >= 2, + "Expected ≥2 search cache hits across 3 identical calls; saw delta hits=" + hitsDelta); + assertTrue( + writesDelta >= 1, + "Expected ≥1 search cache write on first call; saw delta writes=" + writesDelta); + // Total lookups (hits+misses) should be at least 3 — one per call. + assertTrue( + hitsDelta + missesDelta >= 3, + "Expected ≥3 search cache lookups across 3 identical calls; saw delta lookups=" + + (hitsDelta + missesDelta)); + } + + /** + * Different query strings are different cache keys, so each is a fresh miss. Verify the search + * miss counter increments by at least the number of distinct queries we issue. + */ + @Test + void differentQueriesProduceDistinctMisses() { + OpenMetadataClient client = SdkClients.adminClient(); + Stats before = readStats(client); + if (!before.cacheEnabled) { + // Cache disabled — exercise the path; no assertions. + runSearch(client, "alpha", "table_search_index", 10); + runSearch(client, "beta", "table_search_index", 10); + runSearch(client, "gamma", "table_search_index", 10); + return; + } + + // Use unique enough query strings that other tests are unlikely to be hitting them. + runSearch(client, "csliit_alpha_" + System.nanoTime(), "table_search_index", 10); + runSearch(client, "csliit_beta_" + System.nanoTime(), "table_search_index", 10); + runSearch(client, "csliit_gamma_" + System.nanoTime(), "table_search_index", 10); + + Stats after = readStats(client); + long missesDelta = after.searchMisses - before.searchMisses; + long writesDelta = after.searchWrites - before.searchWrites; + + // Each unique query should miss (cold) and write on the back side. + assertTrue( + missesDelta >= 3, + "Expected ≥3 search cache misses for 3 distinct queries; saw delta misses=" + missesDelta); + assertTrue( + writesDelta >= 3, + "Expected ≥3 search cache writes for 3 distinct queries; saw delta writes=" + writesDelta); + } + + /** + * Different {@code size} values are distinct cache entries — same {@code q} and {@code index} + * but different page coordinates. Verify each is its own miss. + */ + @Test + void differentSizeValuesProduceDistinctMisses() { + OpenMetadataClient client = SdkClients.adminClient(); + String query = "csliit_size_" + System.nanoTime(); + Stats before = readStats(client); + if (!before.cacheEnabled) { + runSearch(client, query, "table_search_index", 5); + runSearch(client, query, "table_search_index", 10); + runSearch(client, query, "table_search_index", 25); + return; + } + + runSearch(client, query, "table_search_index", 5); + runSearch(client, query, "table_search_index", 10); + runSearch(client, query, "table_search_index", 25); + + Stats after = readStats(client); + long missesDelta = after.searchMisses - before.searchMisses; + assertTrue( + missesDelta >= 3, + "Different size= values should be separate cache entries; saw delta misses=" + missesDelta); + } + + /** + * {@code /cache/stats} surfaces a per-type breakdown including a {@code search} entry once the + * search cache has been exercised. Smoke test that the byType block shape is sane. + */ + @Test + void cacheStatsExposesSearchByType() { + OpenMetadataClient client = SdkClients.adminClient(); + runSearch(client, "*", "table_search_index", 10); + Stats stats = readStats(client); + if (!stats.cacheEnabled) { + return; // cache off; nothing to verify + } + assertNotNull(stats.byType, "byType block must be present in /cache/stats response"); + Object searchEntry = stats.byType.get("search"); + if (searchEntry != null) { + assertTrue( + searchEntry instanceof Map, + "byType.search should be a Map; got " + searchEntry.getClass()); + Map m = (Map) searchEntry; + assertNotNull(m.get("hits")); + assertNotNull(m.get("misses")); + assertNotNull(m.get("writes")); + assertNotNull(m.get("hitRatio")); + } + // If searchEntry is null, the cluster is cache-off OR no search call has happened yet on + // this cluster instance. Either way, no failure — the contract is "if it's there, it's + // well-shaped," not "it's always there." + } + + // ------------------------------------------------------------------------------------------- + // Helpers + + private static void runSearch(OpenMetadataClient client, String q, String index, int size) { + RequestOptions opts = + RequestOptions.builder() + .queryParam("q", q) + .queryParam("index", index) + .queryParam("size", String.valueOf(size)) + .build(); + @SuppressWarnings("unchecked") + Map ignored = + client.getHttpClient().execute(HttpMethod.GET, SEARCH_PATH, null, Map.class, opts); + assertNotNull(ignored, "search response must not be null"); + } + + private static Stats readStats(OpenMetadataClient client) { + @SuppressWarnings("unchecked") + Map raw = + client + .getHttpClient() + .execute(HttpMethod.GET, STATS_PATH, null, Map.class, RequestOptions.builder().build()); + Stats s = new Stats(); + s.cacheEnabled = Boolean.TRUE.equals(raw.get("available")); + Object metricsObj = raw.get("metrics"); + if (metricsObj instanceof Map) { + @SuppressWarnings("unchecked") + Map metrics = (Map) metricsObj; + Object byTypeObj = metrics.get("byType"); + if (byTypeObj instanceof Map) { + @SuppressWarnings("unchecked") + Map byType = (Map) byTypeObj; + s.byType = byType; + Object searchEntry = byType.get("search"); + if (searchEntry instanceof Map) { + @SuppressWarnings("unchecked") + Map e = (Map) searchEntry; + s.searchHits = toLong(e.get("hits")); + s.searchMisses = toLong(e.get("misses")); + s.searchWrites = toLong(e.get("writes")); + } + } + } + return s; + } + + private static long toLong(Object o) { + if (o == null) return 0L; + if (o instanceof Number n) return n.longValue(); + return Long.parseLong(o.toString()); + } + + private static class Stats { + boolean cacheEnabled; + Map byType; + long searchHits; + long searchMisses; + long searchWrites; + } + + /** Compile-time check on test machinery. Asserts the helper compiles independently. */ + @SuppressWarnings("unused") + private static void compileCheck() { + assertEquals(0L, toLong(null)); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheBundle.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheBundle.java index d5d8477ff1f..243f7469851 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheBundle.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheBundle.java @@ -21,8 +21,16 @@ public class CacheBundle implements ConfiguredBundle INVALIDATABLES = + new java.util.concurrent.CopyOnWriteArrayList<>(); public CacheBundle() { instance = this; @@ -72,6 +80,13 @@ public class CacheBundle implements ConfiguredBundle { @@ -81,6 +96,16 @@ public class CacheBundle implements ConfiguredBundleThe {@code rootId} is wrapped in Redis hash-tag braces so when we eventually run on Redis + * Cluster, all lineage variants for the same root land on the same slot — keeps SCAN+UNLINK + * efficient and avoids cross-slot pipeline rejection. + */ + public String lineageGraph( + java.util.UUID rootId, int upstreamDepth, int downstreamDepth, boolean includeDeleted) { + return ns + + ":lineage:graph:{" + + rootId.toString() + + "}:up=" + + upstreamDepth + + ":down=" + + downstreamDepth + + ":incDel=" + + includeDeleted; + } + + /** SCAN/UNLINK pattern for {@link #lineageGraph} — matches every depth/include variant. */ + public String lineageGraphPattern(java.util.UUID rootId) { + return ns + ":lineage:graph:{" + rootId.toString() + "}:*"; + } + public String childrenPage( String type, String parentFqn, String version, int limit, int offset, String includeTag) { String fqnHash = FullyQualifiedName.buildHash(parentFqn); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheMetrics.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheMetrics.java index b82c3ee5395..68225cb8b68 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheMetrics.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheMetrics.java @@ -4,6 +4,7 @@ import io.micrometer.core.instrument.Counter; import io.micrometer.core.instrument.Gauge; import io.micrometer.core.instrument.MeterRegistry; import io.micrometer.core.instrument.Timer; +import java.util.LinkedHashMap; import java.util.Map; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.atomic.AtomicLong; @@ -33,6 +34,15 @@ public class CacheMetrics { // at startup. Holders keep the AtomicLong reference alive so the gauge stays observable. private final Map coverageGauges = new ConcurrentHashMap<>(); private final Map bundleCoverageGauges = new ConcurrentHashMap<>(); + // Per-type layer counters — registered lazily on first use. Distinct from the untagged + // counters above: these track *logical* hits at a cache layer (e.g. CachedSearchLayer, + // CachedEntityDao for type=table), while the untagged counters track *every* Redis op + // including those issued by sub-operations of a single layer call. The two views are + // related but not identical; use byType for "is this entity type's cache effective?" + // and the aggregate for "what's our Redis traffic look like?". + private final Map typedHits = new ConcurrentHashMap<>(); + private final Map typedMisses = new ConcurrentHashMap<>(); + private final Map typedWrites = new ConcurrentHashMap<>(); private CacheMetrics(MeterRegistry meterRegistry) { this.meterRegistry = meterRegistry; @@ -204,6 +214,59 @@ public class CacheMetrics { warmupCompletedRuns.incrementAndGet(); } + /** + * Layer-level hit recording. {@code type} is a free-form discriminator chosen by the calling + * cache layer — entity types like "table" / "container" for {@link CachedEntityDao}, or category + * names like "search" / "lineage" for the higher-level layers. {@code null} is a no-op (the + * aggregate counters above are untouched, so the call is safe from any context). + */ + public void recordLayerHit(String type) { + if (type != null) { + typedHits + .computeIfAbsent( + type, + t -> + Counter.builder("cache.layer.hits") + .description("Per-type cache hits at the layer level") + .tag("cache", "redis") + .tag("type", t) + .register(meterRegistry)) + .increment(); + } + } + + /** See {@link #recordLayerHit(String)}. */ + public void recordLayerMiss(String type) { + if (type != null) { + typedMisses + .computeIfAbsent( + type, + t -> + Counter.builder("cache.layer.misses") + .description("Per-type cache misses at the layer level") + .tag("cache", "redis") + .tag("type", t) + .register(meterRegistry)) + .increment(); + } + } + + /** See {@link #recordLayerHit(String)}. */ + public void recordLayerWrite(String type) { + if (type != null) { + typedWrites + .computeIfAbsent( + type, + t -> + Counter.builder("cache.layer.writes") + .description("Per-type cache writes at the layer level") + .tag("cache", "redis") + .tag("type", t) + .register(meterRegistry)) + .increment(); + } + } + private void setOrRegisterCoverageGauge( Map holders, String metricName, String entityType, double ratio) { long value = Math.round(Math.max(0.0, Math.min(1.0, ratio)) * 100.0); @@ -228,4 +291,67 @@ public class CacheMetrics { double total = hits + misses; return total > 0 ? hits / total : 0.0; } + + /** + * Snapshot of all application-level cache counters and gauges. Intended to be merged into the + * {@code /api/v1/system/cache/stats} response so operators can read hit/miss/latency without + * scraping Prometheus. Distinct from the provider-side stats (which expose Redis + * keyspace_hits/misses) — these counters track decisions made by OM read paths + * (EntityRepository, CachedReadBundle, etc.) so a hit here means "OM avoided a DB query," not + * "Redis returned data for some internal call." + */ + public Map snapshot() { + Map snap = new LinkedHashMap<>(); + snap.put("hits", cacheHits != null ? (long) cacheHits.count() : 0L); + snap.put("misses", cacheMisses != null ? (long) cacheMisses.count() : 0L); + snap.put("hitRatio", getHitRatio()); + snap.put("evictions", cacheEvictions != null ? (long) cacheEvictions.count() : 0L); + snap.put("errors", cacheErrors != null ? (long) cacheErrors.count() : 0L); + snap.put("writes", cacheWrites != null ? (long) cacheWrites.count() : 0L); + snap.put("size", cacheSize.get()); + Map warmup = new LinkedHashMap<>(); + warmup.put("entities", warmupEntities.get()); + warmup.put("relationships", warmupRelationships.get()); + warmup.put("tags", warmupTags.get()); + warmup.put("completedRuns", warmupCompletedRuns.get()); + snap.put("warmup", warmup); + if (cacheReadLatency != null) { + Map readLatency = new LinkedHashMap<>(); + readLatency.put("count", cacheReadLatency.count()); + readLatency.put( + "totalMs", cacheReadLatency.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS)); + readLatency.put("meanMs", cacheReadLatency.mean(java.util.concurrent.TimeUnit.MILLISECONDS)); + readLatency.put("maxMs", cacheReadLatency.max(java.util.concurrent.TimeUnit.MILLISECONDS)); + snap.put("readLatency", readLatency); + } + if (cacheWriteLatency != null) { + Map writeLatency = new LinkedHashMap<>(); + writeLatency.put("count", cacheWriteLatency.count()); + writeLatency.put( + "totalMs", cacheWriteLatency.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS)); + writeLatency.put( + "meanMs", cacheWriteLatency.mean(java.util.concurrent.TimeUnit.MILLISECONDS)); + writeLatency.put("maxMs", cacheWriteLatency.max(java.util.concurrent.TimeUnit.MILLISECONDS)); + snap.put("writeLatency", writeLatency); + } + Map> byType = new LinkedHashMap<>(); + java.util.Set types = new java.util.TreeSet<>(); + types.addAll(typedHits.keySet()); + types.addAll(typedMisses.keySet()); + types.addAll(typedWrites.keySet()); + for (String type : types) { + long h = typedHits.containsKey(type) ? (long) typedHits.get(type).count() : 0L; + long miss = typedMisses.containsKey(type) ? (long) typedMisses.get(type).count() : 0L; + long w = typedWrites.containsKey(type) ? (long) typedWrites.get(type).count() : 0L; + long totalLookups = h + miss; + Map entry = new LinkedHashMap<>(); + entry.put("hits", h); + entry.put("misses", miss); + entry.put("writes", w); + entry.put("hitRatio", totalLookups > 0 ? (double) h / totalLookups : 0.0); + byType.put(type, entry); + } + snap.put("byType", byType); + return snap; + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheProvider.java index cbebe3d5959..70a915b5391 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheProvider.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CacheProvider.java @@ -56,6 +56,19 @@ public interface CacheProvider extends AutoCloseable { return -1L; } + /** + * SCAN keys matching {@code pattern} and UNLINK them in batches. Returns the number of keys + * deleted, or {@code 0} if the provider doesn't support pattern-based deletion (the default). + * + *

Like {@link #scanCount}, the wall time is O(n) over the keyspace, not over matches. Call + * it on bounded events (entity edits, lineage edge changes) — never in a hot loop. Always use + * a precise pattern (e.g. {@code "om:prod:lineage:graph:{abc}:*"}); avoid broad globs like + * {@code "om:prod:*"} which would block the cluster on a large keyspace. + */ + default long scanDelete(String pattern) { + return 0L; + } + @Override void close(); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityDao.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityDao.java index dd4967a7782..b49a50eccec 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityDao.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedEntityDao.java @@ -33,12 +33,15 @@ public class CachedEntityDao { // Try to get from cache first Optional cached = cache.hget(cacheKey, "base"); + CacheMetrics m = CacheMetrics.getInstance(); if (cached.isPresent()) { LOG.debug("Cache hit for entity: {} -> {}", entityType, entityId); + if (m != null) m.recordLayerHit(entityType); return cached.get(); } LOG.debug("Cache miss for entity: {} -> {}", entityType, entityId); + if (m != null) m.recordLayerMiss(entityType); // Fetch from database String entityJson = fetchEntityFromDatabase(entityId, entityType); @@ -48,6 +51,7 @@ public class CachedEntityDao { try { cache.hset( cacheKey, Map.of("base", entityJson), Duration.ofSeconds(config.entityTtlSeconds)); + if (m != null) m.recordLayerWrite(entityType); LOG.debug("Cached entity: {} -> {}", entityType, entityId); } catch (Exception e) { LOG.warn("Failed to cache entity: {} -> {}", entityType, entityId, e); @@ -183,7 +187,13 @@ public class CachedEntityDao { return Optional.empty(); } String cacheKey = keys.entityByName(entityType, fqn); - return cache.get(cacheKey); + Optional result = cache.get(cacheKey); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + if (result.isPresent()) m.recordLayerHit(entityType); + else m.recordLayerMiss(entityType); + } + return result; } /** @@ -194,7 +204,13 @@ public class CachedEntityDao { return Optional.empty(); } String cacheKey = keys.entity(entityType, entityId); - return cache.hget(cacheKey, "ref"); + Optional result = cache.hget(cacheKey, "ref"); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + if (result.isPresent()) m.recordLayerHit(entityType); + else m.recordLayerMiss(entityType); + } + return result; } /** @@ -205,7 +221,13 @@ public class CachedEntityDao { return Optional.empty(); } String cacheKey = keys.refByName(entityType, fqn); - return cache.get(cacheKey); + Optional result = cache.get(cacheKey); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + if (result.isPresent()) m.recordLayerHit(entityType); + else m.recordLayerMiss(entityType); + } + return result; } public void invalidate(UUID entityId, String entityType) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedLineage.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedLineage.java new file mode 100644 index 00000000000..7cff25d7f23 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedLineage.java @@ -0,0 +1,173 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.cache; + +import com.google.common.util.concurrent.Striped; +import java.time.Duration; +import java.util.Optional; +import java.util.UUID; +import java.util.concurrent.locks.Lock; +import java.util.function.Supplier; +import lombok.extern.slf4j.Slf4j; + +/** + * Cache for {@code GET /api/v1/lineage/...} responses. Hybrid TTL + direct-invalidation: a short + * TTL ({@link CacheConfig#lineageTtlSeconds}, default 60s) acts as a backstop, while explicit + * invalidation handles the cases where staleness is most user-visible (the user just edited an + * entity or changed a lineage edge involving the affected root). + * + *

Why not a reverse index of every entity → root that contains it? Hub entities (popular + * tables referenced in thousands of lineage graphs) would invalidate all of them on every PATCH, + * causing a write storm. The TTL+direct strategy gives 90% of the value at 10% of the + * implementation cost; if production telemetry shows real staleness complaints we can upgrade + * later — design notes in {@code .context/cache-improvements-design.md}. + * + *

Cache-off semantics: when {@link CacheConfig#provider} is {@code none} or + * {@code lineageTtlSeconds <= 0}, {@link #enabled()} returns false. {@link #loadOrCompute} skips + * the cache check entirely and just runs the supplier — same behavior as if this layer didn't + * exist. No hard dependency on Redis. + */ +@Slf4j +public final class CachedLineage implements Invalidatable { + private final CacheProvider cache; + private final CacheKeys keys; + private final int ttlSeconds; + private final Striped loadLocks; + + public CachedLineage(CacheProvider cache, CacheKeys keys, CacheConfig config) { + this.cache = cache; + this.keys = keys; + this.ttlSeconds = config.lineageTtlSeconds; + // Same striping pattern as CachedReadBundle — shares the bundle stripe count for consistency. + this.loadLocks = Striped.lazyWeakLock(Math.max(16, config.bundleLoadLockStripes)); + } + + public boolean enabled() { + return ttlSeconds > 0 && cache != null && cache.available(); + } + + /** + * Single-flight load: cache lookup, then under a per-root stripe lock the supplier runs once + * and the result is cached. Concurrent waiters double-check the cache after acquiring the lock + * — the first waiter to win the race seeds the cache, the rest read it back without re-running + * the supplier. + * + *

If the cache is disabled, this degrades to {@code supplier.get()} with no locking. That + * matches what would happen if there were no cache layer at all — important for the + * "cache is optional" guarantee. + */ + public String loadOrCompute( + UUID rootId, + int upstreamDepth, + int downstreamDepth, + boolean includeDeleted, + Supplier supplier) { + if (!enabled()) { + return supplier.get(); + } + String key = keys.lineageGraph(rootId, upstreamDepth, downstreamDepth, includeDeleted); + Optional first = safeGet(key); + if (first.isPresent()) { + recordHit(); + return first.get(); + } + Lock lock = loadLocks.get(rootId); + lock.lock(); + try { + Optional recheck = safeGet(key); + if (recheck.isPresent()) { + recordHit(); + return recheck.get(); + } + recordMiss(); + String fresh = supplier.get(); + safePut(key, fresh); + return fresh; + } finally { + lock.unlock(); + } + } + + /** + * Invalidate every cached lineage variant rooted at {@code rootId} (all depths, both + * include-deleted flags). Called from entity mutation paths and from the + * {@code addLineage}/{@code deleteLineage} hooks for both endpoints of the affected edge. + * + *

No-op when the cache is disabled. + */ + public void invalidate(UUID rootId) { + if (!enabled() || rootId == null) { + return; + } + try { + long deleted = cache.scanDelete(keys.lineageGraphPattern(rootId)); + if (deleted > 0) { + LOG.debug("Lineage cache invalidated rootId={} keys={}", rootId, deleted); + } + } catch (Exception e) { + LOG.debug("Lineage invalidate failed for rootId={}", rootId, e); + } + } + + /** Convenience for the lineage edge mutation hooks — invalidates both endpoints. */ + public void invalidateEdge(UUID fromId, UUID toId) { + invalidate(fromId); + invalidate(toId); + } + + /** + * {@link Invalidatable} adapter. Lineage is keyed only by entity id (type doesn't enter the key + * because lineage relationships are between entities of any type) — so we drop everything for + * the given id and ignore type/fqn. + */ + @Override + public void invalidate(String type, UUID id, String fqn) { + invalidate(id); + } + + private Optional safeGet(String key) { + try { + return cache.get(key); + } catch (Exception e) { + LOG.debug("Lineage cache get failed (treated as miss) key={}", key, e); + return Optional.empty(); + } + } + + private void safePut(String key, String value) { + if (value == null) return; + try { + cache.set(key, value, Duration.ofSeconds(ttlSeconds)); + recordWrite(); + } catch (Exception e) { + LOG.debug("Lineage cache put failed key={}", key, e); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) m.recordError(); + } + } + + private static void recordHit() { + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) m.recordLayerHit("lineage"); + } + + private static void recordMiss() { + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) m.recordLayerMiss("lineage"); + } + + private static void recordWrite() { + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) m.recordLayerWrite("lineage"); + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedReadBundle.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedReadBundle.java index cb7084805e3..f332d692e17 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedReadBundle.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedReadBundle.java @@ -55,15 +55,21 @@ public class CachedReadBundle { return null; } String key = keys.bundle(entityType, entityId); + String layerType = bundleType(entityType); + CacheMetrics m = CacheMetrics.getInstance(); try { Optional json = cache.get(key); if (json.isEmpty()) { + if (m != null) m.recordLayerMiss(layerType); return null; } - return JsonUtils.readValue(json.get(), Dto.class); + Dto dto = JsonUtils.readValue(json.get(), Dto.class); + if (m != null) m.recordLayerHit(layerType); + return dto; } catch (Exception e) { LOG.warn("Bad bundle cache entry, evicting: {} {}", entityType, entityId, e); cache.del(key); + if (m != null) m.recordError(); return null; } } @@ -76,11 +82,23 @@ public class CachedReadBundle { try { String json = JsonUtils.pojoToJson(dto); cache.set(key, json, Duration.ofSeconds(config.entityTtlSeconds)); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) m.recordLayerWrite(bundleType(entityType)); } catch (Exception e) { LOG.warn("Failed to cache read bundle: {} {}", entityType, entityId, e); } } + /** + * Tag the bundle layer's per-type counters with a {@code bundle:} prefix so they + * sort separately from the entity-cache counters in {@code /cache/stats}. Without the prefix a + * bundle hit on table and an entity hit on table would merge — operators couldn't tell which + * layer is doing the work. + */ + private static String bundleType(String entityType) { + return "bundle:" + entityType; + } + public void invalidate(String entityType, UUID entityId) { if (EntityCacheBypass.isSkipped()) { return; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedSearchLayer.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedSearchLayer.java new file mode 100644 index 00000000000..0635ae4064b --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedSearchLayer.java @@ -0,0 +1,171 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.cache; + +import java.security.MessageDigest; +import java.security.NoSuchAlgorithmException; +import java.time.Duration; +import java.util.List; +import java.util.Optional; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.schema.search.SearchRequest; + +/** + * Auth-aware response cache for {@code GET /api/v1/search/query}. Keys include the principal so + * users with different ACLs do not see each other's filtered results. Cache key is + * {@code om::search:} where the SHA-256 input is the concatenation of every + * field that affects the result set, plus the principal name. TTL is short + * ({@link CacheConfig#searchTtlSeconds}, default 30s) — search is approximate and cache misses + * after writes self-heal within the TTL window. + * + *

Distinct from {@link CachedReadBundle}: that cache stores entity bundles by id; this one + * stores the entire ES/OS response body for a specific (query, principal) tuple. Search itself + * doesn't touch Redis without this layer — see plan Item 1 / cache-perf-findings.md. + */ +@Slf4j +public final class CachedSearchLayer { + private final CacheProvider cache; + private final String keyPrefix; + private final int ttlSeconds; + + public CachedSearchLayer(CacheProvider cache, CacheKeys keys, CacheConfig config) { + this.cache = cache; + this.keyPrefix = keys.search(); + this.ttlSeconds = config.searchTtlSeconds; + } + + public boolean enabled() { + return ttlSeconds > 0 && cache != null && cache.available(); + } + + public Optional get(SearchRequest request, String principalName) { + if (!enabled()) { + return Optional.empty(); + } + try { + String key = buildKey(request, principalName); + // The provider records its own untagged hit/miss; here we record a *layer-typed* one + // so /cache/stats can show a per-category hitRatio for "search". Don't bump the + // aggregate counter — the provider's get() already did. + Optional hit = cache.get(key); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + if (hit.isPresent()) { + m.recordLayerHit("search"); + } else { + m.recordLayerMiss("search"); + } + } + return hit; + } catch (Exception e) { + LOG.debug("Search cache get failed (treated as miss)", e); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + m.recordError(); + } + return Optional.empty(); + } + } + + public void put(SearchRequest request, String principalName, String responseJson) { + if (!enabled() || responseJson == null) { + return; + } + try { + String key = buildKey(request, principalName); + cache.set(key, responseJson, Duration.ofSeconds(ttlSeconds)); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + m.recordLayerWrite("search"); + } + } catch (Exception e) { + LOG.debug("Search cache put failed (cache miss next time)", e); + CacheMetrics m = CacheMetrics.getInstance(); + if (m != null) { + m.recordError(); + } + } + } + + /** + * Build a deterministic cache key from every SearchRequest field that affects the result set, + * plus the principal name. Order matters — we serialize fields in a fixed order so the same + * logical request maps to the same key regardless of object construction order. + */ + String buildKey(SearchRequest request, String principalName) { + StringBuilder sb = new StringBuilder(512); + sb.append("p=").append(safe(principalName)).append('|'); + sb.append("idx=").append(safe(request.getIndex())).append('|'); + sb.append("q=").append(safe(request.getQuery())).append('|'); + sb.append("from=").append(request.getFrom()).append('|'); + sb.append("size=").append(request.getSize()).append('|'); + sb.append("qf=").append(safe(request.getQueryFilter())).append('|'); + sb.append("pf=").append(safe(request.getPostFilter())).append('|'); + sb.append("sf=").append(safe(request.getSortFieldParam())).append('|'); + sb.append("so=").append(safe(request.getSortOrder())).append('|'); + sb.append("fs=").append(request.getFetchSource()).append('|'); + sb.append("inc=").append(joinList(request.getIncludeSourceFields())).append('|'); + sb.append("exc=").append(joinList(request.getExcludeSourceFields())).append('|'); + sb.append("d=").append(request.getDeleted()).append('|'); + sb.append("h=").append(request.getIsHierarchy()).append('|'); + sb.append("ag=").append(request.getIncludeAggregations()).append('|'); + sb.append("ex=").append(request.getExplain()).append('|'); + sb.append("tt=").append(request.getTrackTotalHits()).append('|'); + sb.append("dom=").append(domainsKey(request)).append('|'); + sb.append("adf=").append(request.getApplyDomainFilter()).append('|'); + sb.append("sa=").append(safe(searchAfterKey(request))); + return keyPrefix + ":" + sha256Hex(sb.toString()); + } + + private static String safe(Object o) { + return o == null ? "" : o.toString(); + } + + private static String joinList(List list) { + return list == null || list.isEmpty() + ? "" + : String.join(",", list.stream().map(Object::toString).toList()); + } + + private static String domainsKey(SearchRequest request) { + if (request.getDomains() == null || request.getDomains().isEmpty()) { + return ""; + } + StringBuilder sb = new StringBuilder(); + for (var ref : request.getDomains()) { + if (ref != null && ref.getId() != null) { + sb.append(ref.getId()).append(','); + } + } + return sb.toString(); + } + + private static String searchAfterKey(SearchRequest request) { + var sa = request.getSearchAfter(); + return sa == null ? "" : sa.toString(); + } + + private static String sha256Hex(String input) { + try { + MessageDigest md = MessageDigest.getInstance("SHA-256"); + byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8)); + StringBuilder hex = new StringBuilder(hash.length * 2); + for (byte b : hash) { + hex.append(String.format("%02x", b)); + } + return hex.toString(); + } catch (NoSuchAlgorithmException e) { + throw new IllegalStateException("SHA-256 not available", e); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/Invalidatable.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/Invalidatable.java new file mode 100644 index 00000000000..08e2232f9f8 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/Invalidatable.java @@ -0,0 +1,43 @@ +/* + * Copyright 2026 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.cache; + +import java.util.UUID; + +/** + * Contract for any cache layer that holds entity-keyed data and needs to drop entries when the + * entity is mutated. Registered with {@link CacheBundle#registerInvalidatable(Invalidatable)} so + * the central remote-invalidation handler and {@link CacheBundle#invalidateEntity} can fan a + * single (type, id, fqn) tuple out to every registered layer. + * + *

Adding a new cache layer? Implement this interface, call {@code registerInvalidatable} in + * {@link CacheBundle#run}. The compiler enforces that you didn't forget — the registration is + * trivial code, but missing it means the layer silently serves stale data after writes. + * + *

Implementations must be safe to call when the cache is disabled. The contract is "do + * nothing if you're not actually holding the data"; never throw on a no-op invalidate. + */ +public interface Invalidatable { + + /** + * Drop every cached entry that may be affected by a write to the entity identified by + * {@code (type, id, fqn)}. Either {@code id} or {@code fqn} may be null when the writer doesn't + * have both; implementations should drop what they can. + * + *

Called on the local pod synchronously from {@code EntityRepository.postUpdate / + * postDelete / restoreEntity}, and on remote pods from the {@code CacheInvalidationPubSub} + * subscriber. Both paths are best-effort; an exception here is logged and swallowed at the + * caller — never let a cache hiccup take down a write path. + */ + void invalidate(String type, UUID id, String fqn); +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java b/openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java index 145836e6285..158251f7327 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/cache/RedisCacheProvider.java @@ -561,6 +561,39 @@ public class RedisCacheProvider implements CacheProvider { } } + @Override + public long scanDelete(String pattern) { + if (!available || pattern == null || pattern.isEmpty()) { + return 0L; + } + long deleted = 0L; + try { + io.lettuce.core.ScanArgs args = io.lettuce.core.ScanArgs.Builder.matches(pattern).limit(500); + io.lettuce.core.KeyScanCursor cursor = syncCommands.scan(args); + while (true) { + java.util.List keys = cursor.getKeys(); + if (!keys.isEmpty()) { + // UNLINK is async-delete on the Redis side — same effect as DEL but doesn't block the + // event loop on large value reclamation. Falls back to DEL on Redis < 4.0, which we do + // not target. + deleted += syncCommands.unlink(keys.toArray(new String[0])); + CacheMetrics m = metrics(); + if (m != null) { + for (int i = 0; i < keys.size(); i++) m.recordEviction(); + } + } + if (cursor.isFinished()) break; + cursor = syncCommands.scan(cursor, args); + } + return deleted; + } catch (Exception e) { + LOG.warn("scanDelete failed for pattern={}", pattern, e); + CacheMetrics m = metrics(); + if (m != null) m.recordError(); + return deleted; + } + } + @Override public void close() { try { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index a02f786c157..3d317d171a2 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -2898,6 +2898,10 @@ public abstract class EntityRepository { if (cachedReadBundle != null) { cachedReadBundle.invalidate(entityType, row.id); } + var cachedLineage = CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidate(row.id); + } if (pubsub != null) { pubsub.publish(entityType, row.id, row.fqn, "rename-cascade"); } @@ -2955,6 +2959,10 @@ public abstract class EntityRepository { if (cachedReadBundle != null) { cachedReadBundle.invalidate(entityType, id); } + var cachedLineage = CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidate(id); + } var pubsub = CacheBundle.getCacheInvalidationPubSub(); if (pubsub != null) { pubsub.publish(entityType, id, fqn, "ref-change"); @@ -3138,6 +3146,13 @@ public abstract class EntityRepository { cachedReadBundle.invalidate(entityType, entity.getId()); } + // Invalidate cached lineage rooted at this entity. Transitive changes (entity X is a node + // in someone else's cached graph) fall through to the 60s TTL — see CachedLineage doc. + var cachedLineage = CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidate(entity.getId()); + } + // Invalidate tag caches var cachedTagUsageDao = CacheBundle.getCachedTagUsageDao(); if (cachedTagUsageDao != null) { @@ -8595,6 +8610,10 @@ public abstract class EntityRepository { if (cachedReadBundle != null) { cachedReadBundle.invalidate(entityType, id); } + var cachedLineage = CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidate(id); + } // Synchronous repopulate: the write path holds the request thread until Redis is updated, // so the next GET on this instance can't race an in-flight async repopulate. diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java index 38d8a16a0a1..76e6fbcd79a 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/LineageRepository.java @@ -181,6 +181,14 @@ public class LineageRepository { detailsJson); addLineageToSearch(from, to, lineageDetails); + // Direct invalidation of cached lineage rooted at either endpoint of the new edge. + // Other roots that transitively contain these endpoints fall through to the TTL backstop — + // see CachedLineage class doc for the design rationale. + var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidateEdge(from.getId(), to.getId()); + } + // Add lineage to RDF if (RdfUpdater.isEnabled()) { EntityRelationship lineageRelationship = @@ -1048,6 +1056,11 @@ public class LineageRepository { if (result) { cleanUpExtendedLineage(from, to, lineageDetails); + // Direct invalidation of cached lineage rooted at either endpoint of the removed edge. + var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidateEdge(from.getId(), to.getId()); + } } return result; } @@ -1118,6 +1131,11 @@ public class LineageRepository { if (result) { cleanUpExtendedLineage(from, to, lineageDetails); + // Direct invalidation of cached lineage rooted at either endpoint of the removed edge. + var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage(); + if (cachedLineage != null) { + cachedLineage.invalidateEdge(from.getId(), to.getId()); + } } return result; } @@ -1308,6 +1326,27 @@ public class LineageRepository { private EntityLineage getLineage( EntityReference primary, int upstreamDepth, int downstreamDepth) { + // Wrap the (multi-second) lineage computation in the optional Redis cache. The cache layer + // is no-op when CACHE_PROVIDER=none — this method then behaves exactly as it did before the + // layer existed. See CachedLineage class doc for the TTL+direct-invalidation strategy. + var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage(); + if (cachedLineage == null || !cachedLineage.enabled()) { + return computeLineage(primary, upstreamDepth, downstreamDepth); + } + String json = + cachedLineage.loadOrCompute( + primary.getId(), + upstreamDepth, + downstreamDepth, + false /* includeDeleted */, + () -> + org.openmetadata.schema.utils.JsonUtils.pojoToJson( + computeLineage(primary, upstreamDepth, downstreamDepth))); + return org.openmetadata.schema.utils.JsonUtils.readValue(json, EntityLineage.class); + } + + private EntityLineage computeLineage( + EntityReference primary, int upstreamDepth, int downstreamDepth) { List entities = new ArrayList<>(); EntityLineage lineage = new EntityLineage() diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java index 18131ea2262..81d667221c6 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/search/SearchResource.java @@ -254,7 +254,24 @@ public class SearchResource { .withSearchAfter(SearchUtils.searchAfter(searchAfter)) .withExplain(explain) .withIncludeAggregations(includeAggregations); - return searchRepository.search(request, subjectContext); + + // Auth-aware response cache (Item 1). Bots bypass — they do bulk indexing reads with + // cardinalities that would pollute the user-keyed cache. + org.openmetadata.service.cache.CachedSearchLayer searchCache = + org.openmetadata.service.cache.CacheBundle.getCachedSearchLayer(); + String principal = subjectContext.user() != null ? subjectContext.user().getName() : null; + boolean cacheable = searchCache != null && searchCache.enabled() && !subjectContext.isBot(); + if (cacheable) { + java.util.Optional cached = searchCache.get(request, principal); + if (cached.isPresent()) { + return Response.ok(cached.get(), MediaType.APPLICATION_JSON_TYPE).build(); + } + } + Response upstream = searchRepository.search(request, subjectContext); + if (cacheable && upstream.getStatus() == 200 && upstream.getEntity() instanceof String body) { + searchCache.put(request, principal, body); + } + return upstream; } @GET diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/SystemResource.java b/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/SystemResource.java index 9bde39b93cb..4c1508af5bc 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/SystemResource.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/resources/system/SystemResource.java @@ -954,6 +954,11 @@ public class SystemResource { authorizer.authorizeAdmin(securityContext); Map stats = CacheBundle.getCacheProvider().getStats(); + org.openmetadata.service.cache.CacheMetrics metrics = + org.openmetadata.service.cache.CacheMetrics.getInstance(); + if (metrics != null) { + stats.put("metrics", metrics.snapshot()); + } return Response.ok(stats).build(); }