mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
cache: lineage cache, per-type metrics, invalidation registry, search-cache
Add Redis-backed lineage response cache and search response cache, both gated by the existing CACHE_PROVIDER toggle and falling through to direct computation when the cache is unavailable. The cache remains optional — verified end-to-end by toggling CACHE_PROVIDER=none on a live stack and confirming all paths continue to work (just without the L2 hit). Coverage: - CachedLineage wraps LineageRepository.getLineage with hybrid TTL + direct invalidation (60s default). Direct edits invalidate the affected root cache entries; transitive changes fall through to TTL. - CachedSearchLayer wraps /api/v1/search/query with auth-aware caching (cache key includes principal so users with different ACLs don't share results). 30s default TTL. Observability: - /api/v1/system/cache/stats response now includes a metrics block with hits/misses/hitRatio/evictions/errors/writes plus read/write latency Timers, and a byType breakdown so coverage gaps are visible per entity-type and per cache-layer. Correctness: - New Invalidatable interface + CacheBundle registry + invalidateEntity helper so future cache layers plug in by implementing one method instead of editing multiple mutation paths. - Edge mutations in LineageRepository.addLineage/deleteLineage invalidate both endpoints; entity mutations in EntityRepository.postUpdate / postDelete / restoreEntity invalidate the lineage rooted at the entity. - Pub/sub handler in CacheBundle iterates registered Invalidatables so remote-pod evictions flow to all layers automatically. Tooling: - docker-compose.cache-off.yml overlay flips CACHE_PROVIDER=none for local A/B testing without tearing down DB/ES volumes. - CachedSearchLayerIT exercises hit-on-second-call, distinct-query misses, distinct-page-size misses, and byType shape via the metrics endpoint. Each test gracefully no-ops when the cluster runs cache-off. Co-Authored-By: Claude Opus 4.7 (1M context) <noreply@anthropic.com>
This commit is contained in:
parent
41cfcf995e
commit
c20a29b11b
17 changed files with 1056 additions and 5 deletions
8
docker/development/docker-compose.cache-off.yml
Normal file
8
docker/development/docker-compose.cache-off.yml
Normal file
|
|
@ -0,0 +1,8 @@
|
|||
# Override that disables the cache while leaving the rest of the stack intact.
|
||||
# Used in the local A/B benchmark to flip cache off without tearing down volumes.
|
||||
# Apply on TOP of base compose (NOT the redis overlay):
|
||||
# docker compose -f docker-compose.yml -f docker-compose.cache-off.yml up -d --no-deps openmetadata-server
|
||||
services:
|
||||
openmetadata-server:
|
||||
environment:
|
||||
CACHE_PROVIDER: none
|
||||
|
|
@ -0,0 +1,254 @@
|
|||
/*
|
||||
* Copyright 2026 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.openmetadata.it.tests;
|
||||
|
||||
import static org.junit.jupiter.api.Assertions.assertEquals;
|
||||
import static org.junit.jupiter.api.Assertions.assertNotNull;
|
||||
import static org.junit.jupiter.api.Assertions.assertTrue;
|
||||
|
||||
import java.util.Map;
|
||||
import org.junit.jupiter.api.Test;
|
||||
import org.junit.jupiter.api.parallel.Execution;
|
||||
import org.junit.jupiter.api.parallel.ExecutionMode;
|
||||
import org.openmetadata.it.util.SdkClients;
|
||||
import org.openmetadata.sdk.client.OpenMetadataClient;
|
||||
import org.openmetadata.sdk.network.HttpMethod;
|
||||
import org.openmetadata.sdk.network.RequestOptions;
|
||||
|
||||
/**
|
||||
* Integration tests for {@link org.openmetadata.service.cache.CachedSearchLayer}.
|
||||
*
|
||||
* <p>Verifies the auth-aware search-response cache shipped in plan Item 1 of
|
||||
* {@code .context/cache-perf-findings.md}. Tests are designed to be cache-on; if the cluster is
|
||||
* running with {@code CACHE_PROVIDER=none}, the {@code byType.search} block in
|
||||
* {@code /cache/stats} stays at zero and the lazily-asserted hit-ratio assertions are skipped
|
||||
* (the cache is expected to be inactive).
|
||||
*
|
||||
* <p>Skipped here (deferred to follow-up):
|
||||
*
|
||||
* <ul>
|
||||
* <li>TTL expiry — would block CI for 30s+ per assertion. Manual verification only.
|
||||
* <li>{@code CACHE_PROVIDER=none} mode — needs a separate test profile that boots the server
|
||||
* with the cache-off overlay. Tracked in cache plan Phase 2.
|
||||
* <li>Cross-principal isolation — needs a non-admin token; we verify same-principal
|
||||
* hit/miss as a proxy for now.
|
||||
* </ul>
|
||||
*/
|
||||
@Execution(ExecutionMode.CONCURRENT)
|
||||
class CachedSearchLayerIT {
|
||||
|
||||
private static final String SEARCH_PATH = "/v1/search/query";
|
||||
private static final String STATS_PATH = "/v1/system/cache/stats";
|
||||
|
||||
/**
|
||||
* The same query from the same principal hits the cache on call 2+. Verify by capturing the
|
||||
* application-level metrics block delta — we expect at least one increment in
|
||||
* {@code metrics.byType.search.hits} for {@code N-1} of {@code N} calls.
|
||||
*/
|
||||
@Test
|
||||
void sameQueryHitsCacheOnSecondCall() {
|
||||
OpenMetadataClient client = SdkClients.adminClient();
|
||||
String query = "*";
|
||||
String index = "table_search_index";
|
||||
int size = 10;
|
||||
|
||||
Stats before = readStats(client);
|
||||
if (!before.cacheEnabled) {
|
||||
// Cache disabled — nothing to assert. Test still passes; the search itself must work.
|
||||
runSearch(client, query, index, size);
|
||||
runSearch(client, query, index, size);
|
||||
return;
|
||||
}
|
||||
|
||||
// Three identical calls. First is a cold miss + write; the other two are hits.
|
||||
runSearch(client, query, index, size);
|
||||
runSearch(client, query, index, size);
|
||||
runSearch(client, query, index, size);
|
||||
|
||||
Stats after = readStats(client);
|
||||
long hitsDelta = after.searchHits - before.searchHits;
|
||||
long missesDelta = after.searchMisses - before.searchMisses;
|
||||
long writesDelta = after.searchWrites - before.searchWrites;
|
||||
|
||||
// At least 2 of the 3 calls should be hits. We don't pin to exactly 2 because other tests
|
||||
// running concurrently may also issue searches against the same query. We do require at
|
||||
// least one new write (the cold first call's populate) and at least 2 new hits.
|
||||
assertTrue(
|
||||
hitsDelta >= 2,
|
||||
"Expected ≥2 search cache hits across 3 identical calls; saw delta hits=" + hitsDelta);
|
||||
assertTrue(
|
||||
writesDelta >= 1,
|
||||
"Expected ≥1 search cache write on first call; saw delta writes=" + writesDelta);
|
||||
// Total lookups (hits+misses) should be at least 3 — one per call.
|
||||
assertTrue(
|
||||
hitsDelta + missesDelta >= 3,
|
||||
"Expected ≥3 search cache lookups across 3 identical calls; saw delta lookups="
|
||||
+ (hitsDelta + missesDelta));
|
||||
}
|
||||
|
||||
/**
|
||||
* Different query strings are different cache keys, so each is a fresh miss. Verify the search
|
||||
* miss counter increments by at least the number of distinct queries we issue.
|
||||
*/
|
||||
@Test
|
||||
void differentQueriesProduceDistinctMisses() {
|
||||
OpenMetadataClient client = SdkClients.adminClient();
|
||||
Stats before = readStats(client);
|
||||
if (!before.cacheEnabled) {
|
||||
// Cache disabled — exercise the path; no assertions.
|
||||
runSearch(client, "alpha", "table_search_index", 10);
|
||||
runSearch(client, "beta", "table_search_index", 10);
|
||||
runSearch(client, "gamma", "table_search_index", 10);
|
||||
return;
|
||||
}
|
||||
|
||||
// Use unique enough query strings that other tests are unlikely to be hitting them.
|
||||
runSearch(client, "csliit_alpha_" + System.nanoTime(), "table_search_index", 10);
|
||||
runSearch(client, "csliit_beta_" + System.nanoTime(), "table_search_index", 10);
|
||||
runSearch(client, "csliit_gamma_" + System.nanoTime(), "table_search_index", 10);
|
||||
|
||||
Stats after = readStats(client);
|
||||
long missesDelta = after.searchMisses - before.searchMisses;
|
||||
long writesDelta = after.searchWrites - before.searchWrites;
|
||||
|
||||
// Each unique query should miss (cold) and write on the back side.
|
||||
assertTrue(
|
||||
missesDelta >= 3,
|
||||
"Expected ≥3 search cache misses for 3 distinct queries; saw delta misses=" + missesDelta);
|
||||
assertTrue(
|
||||
writesDelta >= 3,
|
||||
"Expected ≥3 search cache writes for 3 distinct queries; saw delta writes=" + writesDelta);
|
||||
}
|
||||
|
||||
/**
|
||||
* Different {@code size} values are distinct cache entries — same {@code q} and {@code index}
|
||||
* but different page coordinates. Verify each is its own miss.
|
||||
*/
|
||||
@Test
|
||||
void differentSizeValuesProduceDistinctMisses() {
|
||||
OpenMetadataClient client = SdkClients.adminClient();
|
||||
String query = "csliit_size_" + System.nanoTime();
|
||||
Stats before = readStats(client);
|
||||
if (!before.cacheEnabled) {
|
||||
runSearch(client, query, "table_search_index", 5);
|
||||
runSearch(client, query, "table_search_index", 10);
|
||||
runSearch(client, query, "table_search_index", 25);
|
||||
return;
|
||||
}
|
||||
|
||||
runSearch(client, query, "table_search_index", 5);
|
||||
runSearch(client, query, "table_search_index", 10);
|
||||
runSearch(client, query, "table_search_index", 25);
|
||||
|
||||
Stats after = readStats(client);
|
||||
long missesDelta = after.searchMisses - before.searchMisses;
|
||||
assertTrue(
|
||||
missesDelta >= 3,
|
||||
"Different size= values should be separate cache entries; saw delta misses=" + missesDelta);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@code /cache/stats} surfaces a per-type breakdown including a {@code search} entry once the
|
||||
* search cache has been exercised. Smoke test that the byType block shape is sane.
|
||||
*/
|
||||
@Test
|
||||
void cacheStatsExposesSearchByType() {
|
||||
OpenMetadataClient client = SdkClients.adminClient();
|
||||
runSearch(client, "*", "table_search_index", 10);
|
||||
Stats stats = readStats(client);
|
||||
if (!stats.cacheEnabled) {
|
||||
return; // cache off; nothing to verify
|
||||
}
|
||||
assertNotNull(stats.byType, "byType block must be present in /cache/stats response");
|
||||
Object searchEntry = stats.byType.get("search");
|
||||
if (searchEntry != null) {
|
||||
assertTrue(
|
||||
searchEntry instanceof Map,
|
||||
"byType.search should be a Map<String,Object>; got " + searchEntry.getClass());
|
||||
Map<?, ?> m = (Map<?, ?>) searchEntry;
|
||||
assertNotNull(m.get("hits"));
|
||||
assertNotNull(m.get("misses"));
|
||||
assertNotNull(m.get("writes"));
|
||||
assertNotNull(m.get("hitRatio"));
|
||||
}
|
||||
// If searchEntry is null, the cluster is cache-off OR no search call has happened yet on
|
||||
// this cluster instance. Either way, no failure — the contract is "if it's there, it's
|
||||
// well-shaped," not "it's always there."
|
||||
}
|
||||
|
||||
// -------------------------------------------------------------------------------------------
|
||||
// Helpers
|
||||
|
||||
private static void runSearch(OpenMetadataClient client, String q, String index, int size) {
|
||||
RequestOptions opts =
|
||||
RequestOptions.builder()
|
||||
.queryParam("q", q)
|
||||
.queryParam("index", index)
|
||||
.queryParam("size", String.valueOf(size))
|
||||
.build();
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> ignored =
|
||||
client.getHttpClient().execute(HttpMethod.GET, SEARCH_PATH, null, Map.class, opts);
|
||||
assertNotNull(ignored, "search response must not be null");
|
||||
}
|
||||
|
||||
private static Stats readStats(OpenMetadataClient client) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> raw =
|
||||
client
|
||||
.getHttpClient()
|
||||
.execute(HttpMethod.GET, STATS_PATH, null, Map.class, RequestOptions.builder().build());
|
||||
Stats s = new Stats();
|
||||
s.cacheEnabled = Boolean.TRUE.equals(raw.get("available"));
|
||||
Object metricsObj = raw.get("metrics");
|
||||
if (metricsObj instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> metrics = (Map<String, Object>) metricsObj;
|
||||
Object byTypeObj = metrics.get("byType");
|
||||
if (byTypeObj instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> byType = (Map<String, Object>) byTypeObj;
|
||||
s.byType = byType;
|
||||
Object searchEntry = byType.get("search");
|
||||
if (searchEntry instanceof Map) {
|
||||
@SuppressWarnings("unchecked")
|
||||
Map<String, Object> e = (Map<String, Object>) searchEntry;
|
||||
s.searchHits = toLong(e.get("hits"));
|
||||
s.searchMisses = toLong(e.get("misses"));
|
||||
s.searchWrites = toLong(e.get("writes"));
|
||||
}
|
||||
}
|
||||
}
|
||||
return s;
|
||||
}
|
||||
|
||||
private static long toLong(Object o) {
|
||||
if (o == null) return 0L;
|
||||
if (o instanceof Number n) return n.longValue();
|
||||
return Long.parseLong(o.toString());
|
||||
}
|
||||
|
||||
private static class Stats {
|
||||
boolean cacheEnabled;
|
||||
Map<String, Object> byType;
|
||||
long searchHits;
|
||||
long searchMisses;
|
||||
long searchWrites;
|
||||
}
|
||||
|
||||
/** Compile-time check on test machinery. Asserts the helper compiles independently. */
|
||||
@SuppressWarnings("unused")
|
||||
private static void compileCheck() {
|
||||
assertEquals(0L, toLong(null));
|
||||
}
|
||||
}
|
||||
|
|
@ -21,8 +21,16 @@ public class CacheBundle implements ConfiguredBundle<OpenMetadataApplicationConf
|
|||
private static CachedReadBundle cachedReadBundle;
|
||||
private static AncestorsCache ancestorsCache;
|
||||
private static ChildrenPageCache childrenPageCache;
|
||||
private static CachedSearchLayer cachedSearchLayer;
|
||||
private static CachedLineage cachedLineage;
|
||||
private static CacheInvalidationPubSub cacheInvalidationPubSub;
|
||||
private static CacheConfig cacheConfig;
|
||||
// Registry of cache layers that implement Invalidatable. Both the pub-sub handler (remote pod
|
||||
// writes) and CacheBundle.invalidateEntity (local mutations) iterate this list and fan out a
|
||||
// single (type, id, fqn) tuple to every registered layer. New cache layers should call
|
||||
// registerInvalidatable() in their owner — typically here in run(), right after construction.
|
||||
private static final java.util.List<Invalidatable> INVALIDATABLES =
|
||||
new java.util.concurrent.CopyOnWriteArrayList<>();
|
||||
|
||||
public CacheBundle() {
|
||||
instance = this;
|
||||
|
|
@ -72,6 +80,13 @@ public class CacheBundle implements ConfiguredBundle<OpenMetadataApplicationConf
|
|||
cachedReadBundle = new CachedReadBundle(cacheProvider, keys, cacheConfig);
|
||||
ancestorsCache = new AncestorsCache(cacheProvider, keys, cacheConfig);
|
||||
childrenPageCache = new ChildrenPageCache(cacheProvider, keys, cacheConfig);
|
||||
cachedSearchLayer = new CachedSearchLayer(cacheProvider, keys, cacheConfig);
|
||||
cachedLineage = new CachedLineage(cacheProvider, keys, cacheConfig);
|
||||
// Register all id-keyed cache layers that participate in entity-write invalidation.
|
||||
// Layers with type/fqn-keyed semantics (CachedReadBundle, AncestorsCache, etc.) keep
|
||||
// their existing wiring for now — see .context/cache-improvements-design.md P1.3 for
|
||||
// the full audit and the planned migration of those layers to the registry.
|
||||
registerInvalidatable(cachedLineage);
|
||||
cacheInvalidationPubSub = new CacheInvalidationPubSub(cacheConfig);
|
||||
cacheInvalidationPubSub.setHandler(
|
||||
msg -> {
|
||||
|
|
@ -81,6 +96,16 @@ public class CacheBundle implements ConfiguredBundle<OpenMetadataApplicationConf
|
|||
if (msg.id() != null && cachedReadBundle != null) {
|
||||
cachedReadBundle.invalidate(msg.type(), msg.id());
|
||||
}
|
||||
// Fan invalidation out to every Invalidatable registered with the bundle. This is
|
||||
// the path new cache layers should plug into — implement Invalidatable, call
|
||||
// registerInvalidatable, and the remote-pod invalidation Just Works.
|
||||
for (Invalidatable layer : INVALIDATABLES) {
|
||||
try {
|
||||
layer.invalidate(msg.type(), msg.id(), msg.fqn());
|
||||
} catch (Exception ex) {
|
||||
LOG.debug("Invalidatable {} failed for {}", layer, msg, ex);
|
||||
}
|
||||
}
|
||||
// Container-only derived caches: ancestors keyed by descendant FQN, children-page
|
||||
// keyed by parent FQN. Other entity types don't have these caches today, so this
|
||||
// gate keeps unrelated invalidations from doing redundant Redis work on every
|
||||
|
|
@ -151,6 +176,42 @@ public class CacheBundle implements ConfiguredBundle<OpenMetadataApplicationConf
|
|||
return childrenPageCache;
|
||||
}
|
||||
|
||||
public static CachedSearchLayer getCachedSearchLayer() {
|
||||
return cachedSearchLayer;
|
||||
}
|
||||
|
||||
public static CachedLineage getCachedLineage() {
|
||||
return cachedLineage;
|
||||
}
|
||||
|
||||
/**
|
||||
* Register an {@link Invalidatable} cache layer with the bundle. Both the pub-sub handler
|
||||
* (remote pod writes) and {@link #invalidateEntity} (local mutations) iterate registered
|
||||
* layers and call {@code invalidate(type, id, fqn)} on each. Idempotent — safe to call
|
||||
* multiple times for the same instance.
|
||||
*/
|
||||
public static void registerInvalidatable(Invalidatable layer) {
|
||||
if (layer != null && !INVALIDATABLES.contains(layer)) {
|
||||
INVALIDATABLES.add(layer);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Fan an entity-write invalidation out to every registered Invalidatable. Called from
|
||||
* {@code EntityRepository.postUpdate / postDelete / restoreEntity} on the local pod; remote
|
||||
* pods invoke the same fan-out via the pub-sub handler above. No-op if no layers are
|
||||
* registered (cache disabled or none registered yet).
|
||||
*/
|
||||
public static void invalidateEntity(String type, java.util.UUID id, String fqn) {
|
||||
for (Invalidatable layer : INVALIDATABLES) {
|
||||
try {
|
||||
layer.invalidate(type, id, fqn);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Invalidatable {} failed for type={} id={} fqn={}", layer, type, id, fqn, e);
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
public static CacheInvalidationPubSub getCacheInvalidationPubSub() {
|
||||
return cacheInvalidationPubSub;
|
||||
}
|
||||
|
|
|
|||
|
|
@ -19,6 +19,19 @@ public class CacheConfig {
|
|||
public int relationshipTtlSeconds = 3600; // 1 hour
|
||||
public int tagTtlSeconds = 3600; // 1 hour
|
||||
|
||||
// /api/v1/search/query response cache. Short TTL because search hits ES which usually
|
||||
// has its own request cache; 30s catches the typical "user types and re-searches the
|
||||
// same thing within a minute" pattern without serving badly stale results after writes.
|
||||
// Set to 0 to disable.
|
||||
public int searchTtlSeconds = 30;
|
||||
|
||||
// /api/v1/lineage/* response cache. Hybrid TTL + direct-invalidation strategy: a 60s TTL
|
||||
// backstops cases where a transitive change (an entity deep in the cached graph) wasn't
|
||||
// explicitly invalidated. Direct edits (entity rename/delete, lineage edge add/remove)
|
||||
// still invalidate the affected root cache entries immediately. Set to 0 to disable —
|
||||
// the read path falls through to LineageRepository.computeLineage as if no cache existed.
|
||||
public int lineageTtlSeconds = 60;
|
||||
|
||||
// Listing total-row counts. Short TTL because counts are best-effort: a freshly created
|
||||
// entity may not show up in paging.total for up to listCountTtlSeconds, but the list
|
||||
// itself is always live. Keeps repeated /containers, /tables, /dashboards listings
|
||||
|
|
|
|||
|
|
@ -110,6 +110,42 @@ public final class CacheKeys {
|
|||
* toggling the UI's "Deleted" switch would return a stale page from the other side until
|
||||
* the version stamp rotates.
|
||||
*/
|
||||
/**
|
||||
* Prefix for cached {@code GET /api/v1/search/query} responses. Per-principal,
|
||||
* per-(query+filters+pagination) entries hash-suffixed in {@link CachedSearchLayer}.
|
||||
*/
|
||||
public String search() {
|
||||
return ns + ":search";
|
||||
}
|
||||
|
||||
/**
|
||||
* Cached lineage graph keyed by the root entity. {@code upstreamDepth} and {@code
|
||||
* downstreamDepth} are folded into the key so each variant is its own cache entry. Invalidation
|
||||
* issues a SCAN + UNLINK for {@link #lineageGraphPattern} to drop every variant for that root
|
||||
* at once.
|
||||
*
|
||||
* <p>The {@code rootId} is wrapped in Redis hash-tag braces so when we eventually run on Redis
|
||||
* Cluster, all lineage variants for the same root land on the same slot — keeps SCAN+UNLINK
|
||||
* efficient and avoids cross-slot pipeline rejection.
|
||||
*/
|
||||
public String lineageGraph(
|
||||
java.util.UUID rootId, int upstreamDepth, int downstreamDepth, boolean includeDeleted) {
|
||||
return ns
|
||||
+ ":lineage:graph:{"
|
||||
+ rootId.toString()
|
||||
+ "}:up="
|
||||
+ upstreamDepth
|
||||
+ ":down="
|
||||
+ downstreamDepth
|
||||
+ ":incDel="
|
||||
+ includeDeleted;
|
||||
}
|
||||
|
||||
/** SCAN/UNLINK pattern for {@link #lineageGraph} — matches every depth/include variant. */
|
||||
public String lineageGraphPattern(java.util.UUID rootId) {
|
||||
return ns + ":lineage:graph:{" + rootId.toString() + "}:*";
|
||||
}
|
||||
|
||||
public String childrenPage(
|
||||
String type, String parentFqn, String version, int limit, int offset, String includeTag) {
|
||||
String fqnHash = FullyQualifiedName.buildHash(parentFqn);
|
||||
|
|
|
|||
|
|
@ -4,6 +4,7 @@ import io.micrometer.core.instrument.Counter;
|
|||
import io.micrometer.core.instrument.Gauge;
|
||||
import io.micrometer.core.instrument.MeterRegistry;
|
||||
import io.micrometer.core.instrument.Timer;
|
||||
import java.util.LinkedHashMap;
|
||||
import java.util.Map;
|
||||
import java.util.concurrent.ConcurrentHashMap;
|
||||
import java.util.concurrent.atomic.AtomicLong;
|
||||
|
|
@ -33,6 +34,15 @@ public class CacheMetrics {
|
|||
// at startup. Holders keep the AtomicLong reference alive so the gauge stays observable.
|
||||
private final Map<String, AtomicLong> coverageGauges = new ConcurrentHashMap<>();
|
||||
private final Map<String, AtomicLong> bundleCoverageGauges = new ConcurrentHashMap<>();
|
||||
// Per-type layer counters — registered lazily on first use. Distinct from the untagged
|
||||
// counters above: these track *logical* hits at a cache layer (e.g. CachedSearchLayer,
|
||||
// CachedEntityDao for type=table), while the untagged counters track *every* Redis op
|
||||
// including those issued by sub-operations of a single layer call. The two views are
|
||||
// related but not identical; use byType for "is this entity type's cache effective?"
|
||||
// and the aggregate for "what's our Redis traffic look like?".
|
||||
private final Map<String, Counter> typedHits = new ConcurrentHashMap<>();
|
||||
private final Map<String, Counter> typedMisses = new ConcurrentHashMap<>();
|
||||
private final Map<String, Counter> typedWrites = new ConcurrentHashMap<>();
|
||||
|
||||
private CacheMetrics(MeterRegistry meterRegistry) {
|
||||
this.meterRegistry = meterRegistry;
|
||||
|
|
@ -204,6 +214,59 @@ public class CacheMetrics {
|
|||
warmupCompletedRuns.incrementAndGet();
|
||||
}
|
||||
|
||||
/**
|
||||
* Layer-level hit recording. {@code type} is a free-form discriminator chosen by the calling
|
||||
* cache layer — entity types like "table" / "container" for {@link CachedEntityDao}, or category
|
||||
* names like "search" / "lineage" for the higher-level layers. {@code null} is a no-op (the
|
||||
* aggregate counters above are untouched, so the call is safe from any context).
|
||||
*/
|
||||
public void recordLayerHit(String type) {
|
||||
if (type != null) {
|
||||
typedHits
|
||||
.computeIfAbsent(
|
||||
type,
|
||||
t ->
|
||||
Counter.builder("cache.layer.hits")
|
||||
.description("Per-type cache hits at the layer level")
|
||||
.tag("cache", "redis")
|
||||
.tag("type", t)
|
||||
.register(meterRegistry))
|
||||
.increment();
|
||||
}
|
||||
}
|
||||
|
||||
/** See {@link #recordLayerHit(String)}. */
|
||||
public void recordLayerMiss(String type) {
|
||||
if (type != null) {
|
||||
typedMisses
|
||||
.computeIfAbsent(
|
||||
type,
|
||||
t ->
|
||||
Counter.builder("cache.layer.misses")
|
||||
.description("Per-type cache misses at the layer level")
|
||||
.tag("cache", "redis")
|
||||
.tag("type", t)
|
||||
.register(meterRegistry))
|
||||
.increment();
|
||||
}
|
||||
}
|
||||
|
||||
/** See {@link #recordLayerHit(String)}. */
|
||||
public void recordLayerWrite(String type) {
|
||||
if (type != null) {
|
||||
typedWrites
|
||||
.computeIfAbsent(
|
||||
type,
|
||||
t ->
|
||||
Counter.builder("cache.layer.writes")
|
||||
.description("Per-type cache writes at the layer level")
|
||||
.tag("cache", "redis")
|
||||
.tag("type", t)
|
||||
.register(meterRegistry))
|
||||
.increment();
|
||||
}
|
||||
}
|
||||
|
||||
private void setOrRegisterCoverageGauge(
|
||||
Map<String, AtomicLong> holders, String metricName, String entityType, double ratio) {
|
||||
long value = Math.round(Math.max(0.0, Math.min(1.0, ratio)) * 100.0);
|
||||
|
|
@ -228,4 +291,67 @@ public class CacheMetrics {
|
|||
double total = hits + misses;
|
||||
return total > 0 ? hits / total : 0.0;
|
||||
}
|
||||
|
||||
/**
|
||||
* Snapshot of all application-level cache counters and gauges. Intended to be merged into the
|
||||
* {@code /api/v1/system/cache/stats} response so operators can read hit/miss/latency without
|
||||
* scraping Prometheus. Distinct from the provider-side stats (which expose Redis
|
||||
* keyspace_hits/misses) — these counters track decisions made by OM read paths
|
||||
* (EntityRepository, CachedReadBundle, etc.) so a hit here means "OM avoided a DB query," not
|
||||
* "Redis returned data for some internal call."
|
||||
*/
|
||||
public Map<String, Object> snapshot() {
|
||||
Map<String, Object> snap = new LinkedHashMap<>();
|
||||
snap.put("hits", cacheHits != null ? (long) cacheHits.count() : 0L);
|
||||
snap.put("misses", cacheMisses != null ? (long) cacheMisses.count() : 0L);
|
||||
snap.put("hitRatio", getHitRatio());
|
||||
snap.put("evictions", cacheEvictions != null ? (long) cacheEvictions.count() : 0L);
|
||||
snap.put("errors", cacheErrors != null ? (long) cacheErrors.count() : 0L);
|
||||
snap.put("writes", cacheWrites != null ? (long) cacheWrites.count() : 0L);
|
||||
snap.put("size", cacheSize.get());
|
||||
Map<String, Object> warmup = new LinkedHashMap<>();
|
||||
warmup.put("entities", warmupEntities.get());
|
||||
warmup.put("relationships", warmupRelationships.get());
|
||||
warmup.put("tags", warmupTags.get());
|
||||
warmup.put("completedRuns", warmupCompletedRuns.get());
|
||||
snap.put("warmup", warmup);
|
||||
if (cacheReadLatency != null) {
|
||||
Map<String, Object> readLatency = new LinkedHashMap<>();
|
||||
readLatency.put("count", cacheReadLatency.count());
|
||||
readLatency.put(
|
||||
"totalMs", cacheReadLatency.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS));
|
||||
readLatency.put("meanMs", cacheReadLatency.mean(java.util.concurrent.TimeUnit.MILLISECONDS));
|
||||
readLatency.put("maxMs", cacheReadLatency.max(java.util.concurrent.TimeUnit.MILLISECONDS));
|
||||
snap.put("readLatency", readLatency);
|
||||
}
|
||||
if (cacheWriteLatency != null) {
|
||||
Map<String, Object> writeLatency = new LinkedHashMap<>();
|
||||
writeLatency.put("count", cacheWriteLatency.count());
|
||||
writeLatency.put(
|
||||
"totalMs", cacheWriteLatency.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS));
|
||||
writeLatency.put(
|
||||
"meanMs", cacheWriteLatency.mean(java.util.concurrent.TimeUnit.MILLISECONDS));
|
||||
writeLatency.put("maxMs", cacheWriteLatency.max(java.util.concurrent.TimeUnit.MILLISECONDS));
|
||||
snap.put("writeLatency", writeLatency);
|
||||
}
|
||||
Map<String, Map<String, Object>> byType = new LinkedHashMap<>();
|
||||
java.util.Set<String> types = new java.util.TreeSet<>();
|
||||
types.addAll(typedHits.keySet());
|
||||
types.addAll(typedMisses.keySet());
|
||||
types.addAll(typedWrites.keySet());
|
||||
for (String type : types) {
|
||||
long h = typedHits.containsKey(type) ? (long) typedHits.get(type).count() : 0L;
|
||||
long miss = typedMisses.containsKey(type) ? (long) typedMisses.get(type).count() : 0L;
|
||||
long w = typedWrites.containsKey(type) ? (long) typedWrites.get(type).count() : 0L;
|
||||
long totalLookups = h + miss;
|
||||
Map<String, Object> entry = new LinkedHashMap<>();
|
||||
entry.put("hits", h);
|
||||
entry.put("misses", miss);
|
||||
entry.put("writes", w);
|
||||
entry.put("hitRatio", totalLookups > 0 ? (double) h / totalLookups : 0.0);
|
||||
byType.put(type, entry);
|
||||
}
|
||||
snap.put("byType", byType);
|
||||
return snap;
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -56,6 +56,19 @@ public interface CacheProvider extends AutoCloseable {
|
|||
return -1L;
|
||||
}
|
||||
|
||||
/**
|
||||
* SCAN keys matching {@code pattern} and UNLINK them in batches. Returns the number of keys
|
||||
* deleted, or {@code 0} if the provider doesn't support pattern-based deletion (the default).
|
||||
*
|
||||
* <p>Like {@link #scanCount}, the wall time is O(n) over the keyspace, not over matches. Call
|
||||
* it on bounded events (entity edits, lineage edge changes) — never in a hot loop. Always use
|
||||
* a precise pattern (e.g. {@code "om:prod:lineage:graph:{abc}:*"}); avoid broad globs like
|
||||
* {@code "om:prod:*"} which would block the cluster on a large keyspace.
|
||||
*/
|
||||
default long scanDelete(String pattern) {
|
||||
return 0L;
|
||||
}
|
||||
|
||||
@Override
|
||||
void close();
|
||||
}
|
||||
|
|
|
|||
|
|
@ -33,12 +33,15 @@ public class CachedEntityDao {
|
|||
|
||||
// Try to get from cache first
|
||||
Optional<String> cached = cache.hget(cacheKey, "base");
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (cached.isPresent()) {
|
||||
LOG.debug("Cache hit for entity: {} -> {}", entityType, entityId);
|
||||
if (m != null) m.recordLayerHit(entityType);
|
||||
return cached.get();
|
||||
}
|
||||
|
||||
LOG.debug("Cache miss for entity: {} -> {}", entityType, entityId);
|
||||
if (m != null) m.recordLayerMiss(entityType);
|
||||
|
||||
// Fetch from database
|
||||
String entityJson = fetchEntityFromDatabase(entityId, entityType);
|
||||
|
|
@ -48,6 +51,7 @@ public class CachedEntityDao {
|
|||
try {
|
||||
cache.hset(
|
||||
cacheKey, Map.of("base", entityJson), Duration.ofSeconds(config.entityTtlSeconds));
|
||||
if (m != null) m.recordLayerWrite(entityType);
|
||||
LOG.debug("Cached entity: {} -> {}", entityType, entityId);
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to cache entity: {} -> {}", entityType, entityId, e);
|
||||
|
|
@ -183,7 +187,13 @@ public class CachedEntityDao {
|
|||
return Optional.empty();
|
||||
}
|
||||
String cacheKey = keys.entityByName(entityType, fqn);
|
||||
return cache.get(cacheKey);
|
||||
Optional<String> result = cache.get(cacheKey);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
if (result.isPresent()) m.recordLayerHit(entityType);
|
||||
else m.recordLayerMiss(entityType);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -194,7 +204,13 @@ public class CachedEntityDao {
|
|||
return Optional.empty();
|
||||
}
|
||||
String cacheKey = keys.entity(entityType, entityId);
|
||||
return cache.hget(cacheKey, "ref");
|
||||
Optional<String> result = cache.hget(cacheKey, "ref");
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
if (result.isPresent()) m.recordLayerHit(entityType);
|
||||
else m.recordLayerMiss(entityType);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
/**
|
||||
|
|
@ -205,7 +221,13 @@ public class CachedEntityDao {
|
|||
return Optional.empty();
|
||||
}
|
||||
String cacheKey = keys.refByName(entityType, fqn);
|
||||
return cache.get(cacheKey);
|
||||
Optional<String> result = cache.get(cacheKey);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
if (result.isPresent()) m.recordLayerHit(entityType);
|
||||
else m.recordLayerMiss(entityType);
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
||||
public void invalidate(UUID entityId, String entityType) {
|
||||
|
|
|
|||
173
openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedLineage.java
vendored
Normal file
173
openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedLineage.java
vendored
Normal file
|
|
@ -0,0 +1,173 @@
|
|||
/*
|
||||
* Copyright 2026 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.openmetadata.service.cache;
|
||||
|
||||
import com.google.common.util.concurrent.Striped;
|
||||
import java.time.Duration;
|
||||
import java.util.Optional;
|
||||
import java.util.UUID;
|
||||
import java.util.concurrent.locks.Lock;
|
||||
import java.util.function.Supplier;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
|
||||
/**
|
||||
* Cache for {@code GET /api/v1/lineage/...} responses. Hybrid TTL + direct-invalidation: a short
|
||||
* TTL ({@link CacheConfig#lineageTtlSeconds}, default 60s) acts as a backstop, while explicit
|
||||
* invalidation handles the cases where staleness is most user-visible (the user just edited an
|
||||
* entity or changed a lineage edge involving the affected root).
|
||||
*
|
||||
* <p>Why not a reverse index of every entity → root that contains it? Hub entities (popular
|
||||
* tables referenced in thousands of lineage graphs) would invalidate all of them on every PATCH,
|
||||
* causing a write storm. The TTL+direct strategy gives 90% of the value at 10% of the
|
||||
* implementation cost; if production telemetry shows real staleness complaints we can upgrade
|
||||
* later — design notes in {@code .context/cache-improvements-design.md}.
|
||||
*
|
||||
* <p>Cache-off semantics: when {@link CacheConfig#provider} is {@code none} or
|
||||
* {@code lineageTtlSeconds <= 0}, {@link #enabled()} returns false. {@link #loadOrCompute} skips
|
||||
* the cache check entirely and just runs the supplier — same behavior as if this layer didn't
|
||||
* exist. No hard dependency on Redis.
|
||||
*/
|
||||
@Slf4j
|
||||
public final class CachedLineage implements Invalidatable {
|
||||
private final CacheProvider cache;
|
||||
private final CacheKeys keys;
|
||||
private final int ttlSeconds;
|
||||
private final Striped<Lock> loadLocks;
|
||||
|
||||
public CachedLineage(CacheProvider cache, CacheKeys keys, CacheConfig config) {
|
||||
this.cache = cache;
|
||||
this.keys = keys;
|
||||
this.ttlSeconds = config.lineageTtlSeconds;
|
||||
// Same striping pattern as CachedReadBundle — shares the bundle stripe count for consistency.
|
||||
this.loadLocks = Striped.lazyWeakLock(Math.max(16, config.bundleLoadLockStripes));
|
||||
}
|
||||
|
||||
public boolean enabled() {
|
||||
return ttlSeconds > 0 && cache != null && cache.available();
|
||||
}
|
||||
|
||||
/**
|
||||
* Single-flight load: cache lookup, then under a per-root stripe lock the supplier runs once
|
||||
* and the result is cached. Concurrent waiters double-check the cache after acquiring the lock
|
||||
* — the first waiter to win the race seeds the cache, the rest read it back without re-running
|
||||
* the supplier.
|
||||
*
|
||||
* <p>If the cache is disabled, this degrades to {@code supplier.get()} with no locking. That
|
||||
* matches what would happen if there were no cache layer at all — important for the
|
||||
* "cache is optional" guarantee.
|
||||
*/
|
||||
public String loadOrCompute(
|
||||
UUID rootId,
|
||||
int upstreamDepth,
|
||||
int downstreamDepth,
|
||||
boolean includeDeleted,
|
||||
Supplier<String> supplier) {
|
||||
if (!enabled()) {
|
||||
return supplier.get();
|
||||
}
|
||||
String key = keys.lineageGraph(rootId, upstreamDepth, downstreamDepth, includeDeleted);
|
||||
Optional<String> first = safeGet(key);
|
||||
if (first.isPresent()) {
|
||||
recordHit();
|
||||
return first.get();
|
||||
}
|
||||
Lock lock = loadLocks.get(rootId);
|
||||
lock.lock();
|
||||
try {
|
||||
Optional<String> recheck = safeGet(key);
|
||||
if (recheck.isPresent()) {
|
||||
recordHit();
|
||||
return recheck.get();
|
||||
}
|
||||
recordMiss();
|
||||
String fresh = supplier.get();
|
||||
safePut(key, fresh);
|
||||
return fresh;
|
||||
} finally {
|
||||
lock.unlock();
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Invalidate every cached lineage variant rooted at {@code rootId} (all depths, both
|
||||
* include-deleted flags). Called from entity mutation paths and from the
|
||||
* {@code addLineage}/{@code deleteLineage} hooks for both endpoints of the affected edge.
|
||||
*
|
||||
* <p>No-op when the cache is disabled.
|
||||
*/
|
||||
public void invalidate(UUID rootId) {
|
||||
if (!enabled() || rootId == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
long deleted = cache.scanDelete(keys.lineageGraphPattern(rootId));
|
||||
if (deleted > 0) {
|
||||
LOG.debug("Lineage cache invalidated rootId={} keys={}", rootId, deleted);
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Lineage invalidate failed for rootId={}", rootId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/** Convenience for the lineage edge mutation hooks — invalidates both endpoints. */
|
||||
public void invalidateEdge(UUID fromId, UUID toId) {
|
||||
invalidate(fromId);
|
||||
invalidate(toId);
|
||||
}
|
||||
|
||||
/**
|
||||
* {@link Invalidatable} adapter. Lineage is keyed only by entity id (type doesn't enter the key
|
||||
* because lineage relationships are between entities of any type) — so we drop everything for
|
||||
* the given id and ignore type/fqn.
|
||||
*/
|
||||
@Override
|
||||
public void invalidate(String type, UUID id, String fqn) {
|
||||
invalidate(id);
|
||||
}
|
||||
|
||||
private Optional<String> safeGet(String key) {
|
||||
try {
|
||||
return cache.get(key);
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Lineage cache get failed (treated as miss) key={}", key, e);
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
private void safePut(String key, String value) {
|
||||
if (value == null) return;
|
||||
try {
|
||||
cache.set(key, value, Duration.ofSeconds(ttlSeconds));
|
||||
recordWrite();
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Lineage cache put failed key={}", key, e);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) m.recordError();
|
||||
}
|
||||
}
|
||||
|
||||
private static void recordHit() {
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) m.recordLayerHit("lineage");
|
||||
}
|
||||
|
||||
private static void recordMiss() {
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) m.recordLayerMiss("lineage");
|
||||
}
|
||||
|
||||
private static void recordWrite() {
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) m.recordLayerWrite("lineage");
|
||||
}
|
||||
}
|
||||
|
|
@ -55,15 +55,21 @@ public class CachedReadBundle {
|
|||
return null;
|
||||
}
|
||||
String key = keys.bundle(entityType, entityId);
|
||||
String layerType = bundleType(entityType);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
try {
|
||||
Optional<String> json = cache.get(key);
|
||||
if (json.isEmpty()) {
|
||||
if (m != null) m.recordLayerMiss(layerType);
|
||||
return null;
|
||||
}
|
||||
return JsonUtils.readValue(json.get(), Dto.class);
|
||||
Dto dto = JsonUtils.readValue(json.get(), Dto.class);
|
||||
if (m != null) m.recordLayerHit(layerType);
|
||||
return dto;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Bad bundle cache entry, evicting: {} {}", entityType, entityId, e);
|
||||
cache.del(key);
|
||||
if (m != null) m.recordError();
|
||||
return null;
|
||||
}
|
||||
}
|
||||
|
|
@ -76,11 +82,23 @@ public class CachedReadBundle {
|
|||
try {
|
||||
String json = JsonUtils.pojoToJson(dto);
|
||||
cache.set(key, json, Duration.ofSeconds(config.entityTtlSeconds));
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) m.recordLayerWrite(bundleType(entityType));
|
||||
} catch (Exception e) {
|
||||
LOG.warn("Failed to cache read bundle: {} {}", entityType, entityId, e);
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Tag the bundle layer's per-type counters with a {@code bundle:<entityType>} prefix so they
|
||||
* sort separately from the entity-cache counters in {@code /cache/stats}. Without the prefix a
|
||||
* bundle hit on table and an entity hit on table would merge — operators couldn't tell which
|
||||
* layer is doing the work.
|
||||
*/
|
||||
private static String bundleType(String entityType) {
|
||||
return "bundle:" + entityType;
|
||||
}
|
||||
|
||||
public void invalidate(String entityType, UUID entityId) {
|
||||
if (EntityCacheBypass.isSkipped()) {
|
||||
return;
|
||||
|
|
|
|||
171
openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedSearchLayer.java
vendored
Normal file
171
openmetadata-service/src/main/java/org/openmetadata/service/cache/CachedSearchLayer.java
vendored
Normal file
|
|
@ -0,0 +1,171 @@
|
|||
/*
|
||||
* Copyright 2026 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.openmetadata.service.cache;
|
||||
|
||||
import java.security.MessageDigest;
|
||||
import java.security.NoSuchAlgorithmException;
|
||||
import java.time.Duration;
|
||||
import java.util.List;
|
||||
import java.util.Optional;
|
||||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.openmetadata.schema.search.SearchRequest;
|
||||
|
||||
/**
|
||||
* Auth-aware response cache for {@code GET /api/v1/search/query}. Keys include the principal so
|
||||
* users with different ACLs do not see each other's filtered results. Cache key is
|
||||
* {@code om:<keyspace>:search:<sha256-hex>} where the SHA-256 input is the concatenation of every
|
||||
* field that affects the result set, plus the principal name. TTL is short
|
||||
* ({@link CacheConfig#searchTtlSeconds}, default 30s) — search is approximate and cache misses
|
||||
* after writes self-heal within the TTL window.
|
||||
*
|
||||
* <p>Distinct from {@link CachedReadBundle}: that cache stores entity bundles by id; this one
|
||||
* stores the entire ES/OS response body for a specific (query, principal) tuple. Search itself
|
||||
* doesn't touch Redis without this layer — see plan Item 1 / cache-perf-findings.md.
|
||||
*/
|
||||
@Slf4j
|
||||
public final class CachedSearchLayer {
|
||||
private final CacheProvider cache;
|
||||
private final String keyPrefix;
|
||||
private final int ttlSeconds;
|
||||
|
||||
public CachedSearchLayer(CacheProvider cache, CacheKeys keys, CacheConfig config) {
|
||||
this.cache = cache;
|
||||
this.keyPrefix = keys.search();
|
||||
this.ttlSeconds = config.searchTtlSeconds;
|
||||
}
|
||||
|
||||
public boolean enabled() {
|
||||
return ttlSeconds > 0 && cache != null && cache.available();
|
||||
}
|
||||
|
||||
public Optional<String> get(SearchRequest request, String principalName) {
|
||||
if (!enabled()) {
|
||||
return Optional.empty();
|
||||
}
|
||||
try {
|
||||
String key = buildKey(request, principalName);
|
||||
// The provider records its own untagged hit/miss; here we record a *layer-typed* one
|
||||
// so /cache/stats can show a per-category hitRatio for "search". Don't bump the
|
||||
// aggregate counter — the provider's get() already did.
|
||||
Optional<String> hit = cache.get(key);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
if (hit.isPresent()) {
|
||||
m.recordLayerHit("search");
|
||||
} else {
|
||||
m.recordLayerMiss("search");
|
||||
}
|
||||
}
|
||||
return hit;
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Search cache get failed (treated as miss)", e);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
m.recordError();
|
||||
}
|
||||
return Optional.empty();
|
||||
}
|
||||
}
|
||||
|
||||
public void put(SearchRequest request, String principalName, String responseJson) {
|
||||
if (!enabled() || responseJson == null) {
|
||||
return;
|
||||
}
|
||||
try {
|
||||
String key = buildKey(request, principalName);
|
||||
cache.set(key, responseJson, Duration.ofSeconds(ttlSeconds));
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
m.recordLayerWrite("search");
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.debug("Search cache put failed (cache miss next time)", e);
|
||||
CacheMetrics m = CacheMetrics.getInstance();
|
||||
if (m != null) {
|
||||
m.recordError();
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
/**
|
||||
* Build a deterministic cache key from every SearchRequest field that affects the result set,
|
||||
* plus the principal name. Order matters — we serialize fields in a fixed order so the same
|
||||
* logical request maps to the same key regardless of object construction order.
|
||||
*/
|
||||
String buildKey(SearchRequest request, String principalName) {
|
||||
StringBuilder sb = new StringBuilder(512);
|
||||
sb.append("p=").append(safe(principalName)).append('|');
|
||||
sb.append("idx=").append(safe(request.getIndex())).append('|');
|
||||
sb.append("q=").append(safe(request.getQuery())).append('|');
|
||||
sb.append("from=").append(request.getFrom()).append('|');
|
||||
sb.append("size=").append(request.getSize()).append('|');
|
||||
sb.append("qf=").append(safe(request.getQueryFilter())).append('|');
|
||||
sb.append("pf=").append(safe(request.getPostFilter())).append('|');
|
||||
sb.append("sf=").append(safe(request.getSortFieldParam())).append('|');
|
||||
sb.append("so=").append(safe(request.getSortOrder())).append('|');
|
||||
sb.append("fs=").append(request.getFetchSource()).append('|');
|
||||
sb.append("inc=").append(joinList(request.getIncludeSourceFields())).append('|');
|
||||
sb.append("exc=").append(joinList(request.getExcludeSourceFields())).append('|');
|
||||
sb.append("d=").append(request.getDeleted()).append('|');
|
||||
sb.append("h=").append(request.getIsHierarchy()).append('|');
|
||||
sb.append("ag=").append(request.getIncludeAggregations()).append('|');
|
||||
sb.append("ex=").append(request.getExplain()).append('|');
|
||||
sb.append("tt=").append(request.getTrackTotalHits()).append('|');
|
||||
sb.append("dom=").append(domainsKey(request)).append('|');
|
||||
sb.append("adf=").append(request.getApplyDomainFilter()).append('|');
|
||||
sb.append("sa=").append(safe(searchAfterKey(request)));
|
||||
return keyPrefix + ":" + sha256Hex(sb.toString());
|
||||
}
|
||||
|
||||
private static String safe(Object o) {
|
||||
return o == null ? "" : o.toString();
|
||||
}
|
||||
|
||||
private static String joinList(List<?> list) {
|
||||
return list == null || list.isEmpty()
|
||||
? ""
|
||||
: String.join(",", list.stream().map(Object::toString).toList());
|
||||
}
|
||||
|
||||
private static String domainsKey(SearchRequest request) {
|
||||
if (request.getDomains() == null || request.getDomains().isEmpty()) {
|
||||
return "";
|
||||
}
|
||||
StringBuilder sb = new StringBuilder();
|
||||
for (var ref : request.getDomains()) {
|
||||
if (ref != null && ref.getId() != null) {
|
||||
sb.append(ref.getId()).append(',');
|
||||
}
|
||||
}
|
||||
return sb.toString();
|
||||
}
|
||||
|
||||
private static String searchAfterKey(SearchRequest request) {
|
||||
var sa = request.getSearchAfter();
|
||||
return sa == null ? "" : sa.toString();
|
||||
}
|
||||
|
||||
private static String sha256Hex(String input) {
|
||||
try {
|
||||
MessageDigest md = MessageDigest.getInstance("SHA-256");
|
||||
byte[] hash = md.digest(input.getBytes(java.nio.charset.StandardCharsets.UTF_8));
|
||||
StringBuilder hex = new StringBuilder(hash.length * 2);
|
||||
for (byte b : hash) {
|
||||
hex.append(String.format("%02x", b));
|
||||
}
|
||||
return hex.toString();
|
||||
} catch (NoSuchAlgorithmException e) {
|
||||
throw new IllegalStateException("SHA-256 not available", e);
|
||||
}
|
||||
}
|
||||
}
|
||||
43
openmetadata-service/src/main/java/org/openmetadata/service/cache/Invalidatable.java
vendored
Normal file
43
openmetadata-service/src/main/java/org/openmetadata/service/cache/Invalidatable.java
vendored
Normal file
|
|
@ -0,0 +1,43 @@
|
|||
/*
|
||||
* Copyright 2026 Collate
|
||||
* Licensed under the Apache License, Version 2.0 (the "License");
|
||||
* you may not use this file except in compliance with the License.
|
||||
* You may obtain a copy of the License at
|
||||
* http://www.apache.org/licenses/LICENSE-2.0
|
||||
* Unless required by applicable law or agreed to in writing, software
|
||||
* distributed under the License is distributed on an "AS IS" BASIS,
|
||||
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
|
||||
* See the License for the specific language governing permissions and
|
||||
* limitations under the License.
|
||||
*/
|
||||
package org.openmetadata.service.cache;
|
||||
|
||||
import java.util.UUID;
|
||||
|
||||
/**
|
||||
* Contract for any cache layer that holds entity-keyed data and needs to drop entries when the
|
||||
* entity is mutated. Registered with {@link CacheBundle#registerInvalidatable(Invalidatable)} so
|
||||
* the central remote-invalidation handler and {@link CacheBundle#invalidateEntity} can fan a
|
||||
* single (type, id, fqn) tuple out to every registered layer.
|
||||
*
|
||||
* <p>Adding a new cache layer? Implement this interface, call {@code registerInvalidatable} in
|
||||
* {@link CacheBundle#run}. The compiler enforces that you didn't forget — the registration is
|
||||
* trivial code, but missing it means the layer silently serves stale data after writes.
|
||||
*
|
||||
* <p>Implementations must be safe to call when the cache is disabled. The contract is "do
|
||||
* nothing if you're not actually holding the data"; never throw on a no-op invalidate.
|
||||
*/
|
||||
public interface Invalidatable {
|
||||
|
||||
/**
|
||||
* Drop every cached entry that may be affected by a write to the entity identified by
|
||||
* {@code (type, id, fqn)}. Either {@code id} or {@code fqn} may be null when the writer doesn't
|
||||
* have both; implementations should drop what they can.
|
||||
*
|
||||
* <p>Called on the local pod synchronously from {@code EntityRepository.postUpdate /
|
||||
* postDelete / restoreEntity}, and on remote pods from the {@code CacheInvalidationPubSub}
|
||||
* subscriber. Both paths are best-effort; an exception here is logged and swallowed at the
|
||||
* caller — never let a cache hiccup take down a write path.
|
||||
*/
|
||||
void invalidate(String type, UUID id, String fqn);
|
||||
}
|
||||
|
|
@ -561,6 +561,39 @@ public class RedisCacheProvider implements CacheProvider {
|
|||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public long scanDelete(String pattern) {
|
||||
if (!available || pattern == null || pattern.isEmpty()) {
|
||||
return 0L;
|
||||
}
|
||||
long deleted = 0L;
|
||||
try {
|
||||
io.lettuce.core.ScanArgs args = io.lettuce.core.ScanArgs.Builder.matches(pattern).limit(500);
|
||||
io.lettuce.core.KeyScanCursor<String> cursor = syncCommands.scan(args);
|
||||
while (true) {
|
||||
java.util.List<String> keys = cursor.getKeys();
|
||||
if (!keys.isEmpty()) {
|
||||
// UNLINK is async-delete on the Redis side — same effect as DEL but doesn't block the
|
||||
// event loop on large value reclamation. Falls back to DEL on Redis < 4.0, which we do
|
||||
// not target.
|
||||
deleted += syncCommands.unlink(keys.toArray(new String[0]));
|
||||
CacheMetrics m = metrics();
|
||||
if (m != null) {
|
||||
for (int i = 0; i < keys.size(); i++) m.recordEviction();
|
||||
}
|
||||
}
|
||||
if (cursor.isFinished()) break;
|
||||
cursor = syncCommands.scan(cursor, args);
|
||||
}
|
||||
return deleted;
|
||||
} catch (Exception e) {
|
||||
LOG.warn("scanDelete failed for pattern={}", pattern, e);
|
||||
CacheMetrics m = metrics();
|
||||
if (m != null) m.recordError();
|
||||
return deleted;
|
||||
}
|
||||
}
|
||||
|
||||
@Override
|
||||
public void close() {
|
||||
try {
|
||||
|
|
|
|||
|
|
@ -2898,6 +2898,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||
if (cachedReadBundle != null) {
|
||||
cachedReadBundle.invalidate(entityType, row.id);
|
||||
}
|
||||
var cachedLineage = CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidate(row.id);
|
||||
}
|
||||
if (pubsub != null) {
|
||||
pubsub.publish(entityType, row.id, row.fqn, "rename-cascade");
|
||||
}
|
||||
|
|
@ -2955,6 +2959,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||
if (cachedReadBundle != null) {
|
||||
cachedReadBundle.invalidate(entityType, id);
|
||||
}
|
||||
var cachedLineage = CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidate(id);
|
||||
}
|
||||
var pubsub = CacheBundle.getCacheInvalidationPubSub();
|
||||
if (pubsub != null) {
|
||||
pubsub.publish(entityType, id, fqn, "ref-change");
|
||||
|
|
@ -3138,6 +3146,13 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||
cachedReadBundle.invalidate(entityType, entity.getId());
|
||||
}
|
||||
|
||||
// Invalidate cached lineage rooted at this entity. Transitive changes (entity X is a node
|
||||
// in someone else's cached graph) fall through to the 60s TTL — see CachedLineage doc.
|
||||
var cachedLineage = CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidate(entity.getId());
|
||||
}
|
||||
|
||||
// Invalidate tag caches
|
||||
var cachedTagUsageDao = CacheBundle.getCachedTagUsageDao();
|
||||
if (cachedTagUsageDao != null) {
|
||||
|
|
@ -8595,6 +8610,10 @@ public abstract class EntityRepository<T extends EntityInterface> {
|
|||
if (cachedReadBundle != null) {
|
||||
cachedReadBundle.invalidate(entityType, id);
|
||||
}
|
||||
var cachedLineage = CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidate(id);
|
||||
}
|
||||
|
||||
// Synchronous repopulate: the write path holds the request thread until Redis is updated,
|
||||
// so the next GET on this instance can't race an in-flight async repopulate.
|
||||
|
|
|
|||
|
|
@ -181,6 +181,14 @@ public class LineageRepository {
|
|||
detailsJson);
|
||||
addLineageToSearch(from, to, lineageDetails);
|
||||
|
||||
// Direct invalidation of cached lineage rooted at either endpoint of the new edge.
|
||||
// Other roots that transitively contain these endpoints fall through to the TTL backstop —
|
||||
// see CachedLineage class doc for the design rationale.
|
||||
var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidateEdge(from.getId(), to.getId());
|
||||
}
|
||||
|
||||
// Add lineage to RDF
|
||||
if (RdfUpdater.isEnabled()) {
|
||||
EntityRelationship lineageRelationship =
|
||||
|
|
@ -1048,6 +1056,11 @@ public class LineageRepository {
|
|||
|
||||
if (result) {
|
||||
cleanUpExtendedLineage(from, to, lineageDetails);
|
||||
// Direct invalidation of cached lineage rooted at either endpoint of the removed edge.
|
||||
var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidateEdge(from.getId(), to.getId());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
@ -1118,6 +1131,11 @@ public class LineageRepository {
|
|||
|
||||
if (result) {
|
||||
cleanUpExtendedLineage(from, to, lineageDetails);
|
||||
// Direct invalidation of cached lineage rooted at either endpoint of the removed edge.
|
||||
var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage();
|
||||
if (cachedLineage != null) {
|
||||
cachedLineage.invalidateEdge(from.getId(), to.getId());
|
||||
}
|
||||
}
|
||||
return result;
|
||||
}
|
||||
|
|
@ -1308,6 +1326,27 @@ public class LineageRepository {
|
|||
|
||||
private EntityLineage getLineage(
|
||||
EntityReference primary, int upstreamDepth, int downstreamDepth) {
|
||||
// Wrap the (multi-second) lineage computation in the optional Redis cache. The cache layer
|
||||
// is no-op when CACHE_PROVIDER=none — this method then behaves exactly as it did before the
|
||||
// layer existed. See CachedLineage class doc for the TTL+direct-invalidation strategy.
|
||||
var cachedLineage = org.openmetadata.service.cache.CacheBundle.getCachedLineage();
|
||||
if (cachedLineage == null || !cachedLineage.enabled()) {
|
||||
return computeLineage(primary, upstreamDepth, downstreamDepth);
|
||||
}
|
||||
String json =
|
||||
cachedLineage.loadOrCompute(
|
||||
primary.getId(),
|
||||
upstreamDepth,
|
||||
downstreamDepth,
|
||||
false /* includeDeleted */,
|
||||
() ->
|
||||
org.openmetadata.schema.utils.JsonUtils.pojoToJson(
|
||||
computeLineage(primary, upstreamDepth, downstreamDepth)));
|
||||
return org.openmetadata.schema.utils.JsonUtils.readValue(json, EntityLineage.class);
|
||||
}
|
||||
|
||||
private EntityLineage computeLineage(
|
||||
EntityReference primary, int upstreamDepth, int downstreamDepth) {
|
||||
List<EntityReference> entities = new ArrayList<>();
|
||||
EntityLineage lineage =
|
||||
new EntityLineage()
|
||||
|
|
|
|||
|
|
@ -254,7 +254,24 @@ public class SearchResource {
|
|||
.withSearchAfter(SearchUtils.searchAfter(searchAfter))
|
||||
.withExplain(explain)
|
||||
.withIncludeAggregations(includeAggregations);
|
||||
return searchRepository.search(request, subjectContext);
|
||||
|
||||
// Auth-aware response cache (Item 1). Bots bypass — they do bulk indexing reads with
|
||||
// cardinalities that would pollute the user-keyed cache.
|
||||
org.openmetadata.service.cache.CachedSearchLayer searchCache =
|
||||
org.openmetadata.service.cache.CacheBundle.getCachedSearchLayer();
|
||||
String principal = subjectContext.user() != null ? subjectContext.user().getName() : null;
|
||||
boolean cacheable = searchCache != null && searchCache.enabled() && !subjectContext.isBot();
|
||||
if (cacheable) {
|
||||
java.util.Optional<String> cached = searchCache.get(request, principal);
|
||||
if (cached.isPresent()) {
|
||||
return Response.ok(cached.get(), MediaType.APPLICATION_JSON_TYPE).build();
|
||||
}
|
||||
}
|
||||
Response upstream = searchRepository.search(request, subjectContext);
|
||||
if (cacheable && upstream.getStatus() == 200 && upstream.getEntity() instanceof String body) {
|
||||
searchCache.put(request, principal, body);
|
||||
}
|
||||
return upstream;
|
||||
}
|
||||
|
||||
@GET
|
||||
|
|
|
|||
|
|
@ -954,6 +954,11 @@ public class SystemResource {
|
|||
authorizer.authorizeAdmin(securityContext);
|
||||
|
||||
Map<String, Object> stats = CacheBundle.getCacheProvider().getStats();
|
||||
org.openmetadata.service.cache.CacheMetrics metrics =
|
||||
org.openmetadata.service.cache.CacheMetrics.getInstance();
|
||||
if (metrics != null) {
|
||||
stats.put("metrics", metrics.snapshot());
|
||||
}
|
||||
return Response.ok(stats).build();
|
||||
}
|
||||
|
||||
|
|
|
|||
Loading…
Reference in a new issue