Improve Slow request metric calculation; Add bulkSync config to fine-tune (#25275)

* Improve Slow request metric calculation; Add bulkSync config to fine-tune

* Add clear metric instrumentation for bulk operations

* Address gitar comments
This commit is contained in:
Sriharsha Chintalapani 2026-01-15 14:41:52 -08:00 committed by GitHub
parent faf38d4bb3
commit f81bb04fa2
No known key found for this signature in database
GPG key ID: B5690EEEBB952194
23 changed files with 2216 additions and 217 deletions

View file

@ -575,6 +575,17 @@ limits:
className: ${LIMITS_CLASS_NAME:-"org.openmetadata.service.limits.DefaultLimits"}
limitsConfigFile: ${LIMITS_CONFIG_FILE:-""}
# Bulk Operation Configuration
# Controls parallelism and resource usage for bulk API operations (e.g., bulk import/export)
# Uses a bounded thread pool to prevent connection pool exhaustion
bulkOperation:
# Max threads for bulk operations (recommendations: 2 vCore=5-8, 4 vCore=8-15, 8 vCore=15-25)
maxThreads: ${BULK_OPERATION_MAX_THREADS:-10}
# Max queued operations before rejection (returns 503)
queueSize: ${BULK_OPERATION_QUEUE_SIZE:-1000}
# Timeout in seconds for entire bulk operation
timeoutSeconds: ${BULK_OPERATION_TIMEOUT_SECONDS:-300}
web:
uriPath: ${WEB_CONF_URI_PATH:-"/api"}
hsts:

View file

@ -97,6 +97,7 @@ import org.openmetadata.service.exception.JsonMappingExceptionMapper;
import org.openmetadata.service.exception.OMErrorPageHandler;
import org.openmetadata.service.fernet.Fernet;
import org.openmetadata.service.governance.workflows.WorkflowHandler;
import org.openmetadata.service.jdbi3.BulkExecutor;
import org.openmetadata.service.jdbi3.CollectionDAO;
import org.openmetadata.service.jdbi3.EntityRepository;
import org.openmetadata.service.jdbi3.MigrationDAO;
@ -241,6 +242,9 @@ public class OpenMetadataApplication extends Application<OpenMetadataApplication
Entity.setJobDAO(jdbi.onDemand(JobDAO.class));
Entity.setJdbi(jdbi);
// Initialize bulk operation executor for bounded concurrent processing
BulkExecutor.initialize(catalogConfig.getBulkOperationConfiguration());
initializeSearchRepository(catalogConfig);
// Initialize the MigrationValidationClient, used in the Settings Repository
MigrationValidationClient.initialize(jdbi.onDemand(MigrationDAO.class), catalogConfig);

View file

@ -37,6 +37,7 @@ import org.openmetadata.schema.security.scim.ScimConfiguration;
import org.openmetadata.schema.security.secrets.SecretsManagerConfiguration;
import org.openmetadata.schema.service.configuration.elasticsearch.ElasticSearchConfiguration;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.service.config.BulkOperationConfiguration;
import org.openmetadata.service.config.OMWebConfiguration;
import org.openmetadata.service.config.ObjectStorageConfiguration;
import org.openmetadata.service.jdbi3.HikariCPDataSourceFactory;
@ -165,6 +166,17 @@ public class OpenMetadataApplicationConfig extends Configuration {
return cacheConfig;
}
@JsonProperty("bulkOperation")
@Valid
private BulkOperationConfiguration bulkOperationConfiguration;
public BulkOperationConfiguration getBulkOperationConfiguration() {
if (bulkOperationConfiguration == null) {
bulkOperationConfiguration = new BulkOperationConfiguration();
}
return bulkOperationConfiguration;
}
public String getApiRootPath() {
if (!(getServerFactory() instanceof DefaultServerFactory serverFactory)) {
return "";

View file

@ -535,7 +535,7 @@ public class CacheWarmupApp extends AbstractNativeApplication {
jobStats.getFailedRecords());
if (currentThroughput > 0) {
LOG.info("Average throughput: {:.1f} entities/sec", currentThroughput);
LOG.info("Average throughput: {} entities/sec", String.format("%.1f", currentThroughput));
}
}
}

View file

@ -0,0 +1,89 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.config;
import com.fasterxml.jackson.annotation.JsonProperty;
import jakarta.validation.constraints.Max;
import jakarta.validation.constraints.Min;
import lombok.Getter;
import lombok.Setter;
/**
* Configuration for bulk operations that controls parallelism and resource usage.
*
* <p>This uses a bounded thread pool to limit concurrent database operations, preventing bulk
* operations from exhausting the connection pool and starving regular API requests.
*
* <p>Environment variables (for Helm/Docker configuration):
*
* <ul>
* <li>BULK_OPERATION_MAX_THREADS - Maximum threads for bulk operations
* <li>BULK_OPERATION_QUEUE_SIZE - Maximum queued operations before rejection
* <li>BULK_OPERATION_TIMEOUT_SECONDS - Timeout for entire bulk operation
* </ul>
*
* <p>Example YAML configuration:
*
* <pre>
* bulkOperation:
* maxThreads: ${BULK_OPERATION_MAX_THREADS:-10}
* queueSize: ${BULK_OPERATION_QUEUE_SIZE:-1000}
* timeoutSeconds: ${BULK_OPERATION_TIMEOUT_SECONDS:-300}
* </pre>
*/
@Getter
@Setter
public class BulkOperationConfiguration {
/**
* Maximum number of threads for bulk operation processing. This directly controls how many
* concurrent database operations can occur during bulk processing.
*
* <p>Recommendations based on DB capacity:
*
* <ul>
* <li>2 vCore DB: 5-8
* <li>4 vCore DB: 8-15
* <li>8 vCore DB: 15-25
* <li>16+ vCore DB: 25-50
* </ul>
*
* <p>Default: 10 (conservative, works for most deployments)
*/
@JsonProperty
@Min(1)
@Max(100)
private int maxThreads = 10;
/**
* Maximum number of operations that can be queued waiting for a thread. When the queue is full,
* new bulk requests will be rejected with 503 Service Unavailable.
*
* <p>Default: 1000 (allows bursts while preventing memory exhaustion)
*/
@JsonProperty
@Min(100)
@Max(10000)
private int queueSize = 1000;
/**
* Timeout in seconds for the entire bulk operation. If the operation doesn't complete within this
* time, it will be cancelled and return partial results.
*
* <p>Default: 300 seconds (5 minutes)
*/
@JsonProperty
@Min(30)
@Max(3600)
private int timeoutSeconds = 300;
}

View file

@ -0,0 +1,223 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.jdbi3;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.ThreadPoolExecutor;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
import org.openmetadata.service.config.BulkOperationConfiguration;
/**
* Manages a bounded thread pool for bulk operations. This provides natural backpressure and
* prevents bulk operations from overwhelming the database connection pool.
*
* <p>Key design decisions:
*
* <ul>
* <li>Bounded thread pool limits concurrent DB operations directly
* <li>Bounded queue prevents memory exhaustion from too many pending tasks
* <li>Rejected execution throws exception so caller can return 503
* <li>Virtual threads are NOT used here because we need bounded concurrency
* </ul>
*/
@Slf4j
public class BulkExecutor {
private static volatile BulkExecutor instance;
private static final Object LOCK = new Object();
private static final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false);
@Getter private final ExecutorService executor;
@Getter private final int maxThreads;
@Getter private final int queueSize;
@Getter private final int timeoutSeconds;
private volatile boolean isShutdown = false;
private BulkExecutor(BulkOperationConfiguration config) {
this.maxThreads = config.getMaxThreads();
this.queueSize = config.getQueueSize();
this.timeoutSeconds = config.getTimeoutSeconds();
// Create a bounded thread pool with a bounded queue
// When queue is full, new submissions will throw RejectedExecutionException
this.executor =
new ThreadPoolExecutor(
maxThreads, // core pool size
maxThreads, // max pool size (same as core for predictable behavior)
60L,
TimeUnit.SECONDS, // idle thread timeout
new ArrayBlockingQueue<>(queueSize), // bounded queue
r -> {
Thread t = new Thread(r, "bulk-operation-worker");
t.setDaemon(true);
return t;
},
new ThreadPoolExecutor.AbortPolicy() // throw on rejection
);
LOG.info(
"BulkExecutor initialized: maxThreads={}, queueSize={}, timeoutSeconds={}",
maxThreads,
queueSize,
timeoutSeconds);
registerShutdownHook();
}
private static void registerShutdownHook() {
if (shutdownHookRegistered.compareAndSet(false, true)) {
Runtime.getRuntime()
.addShutdownHook(
new Thread(
() -> {
LOG.info("JVM shutdown detected, shutting down BulkExecutor gracefully...");
if (instance != null && !instance.isShutdown) {
instance.shutdown();
}
},
"bulk-executor-shutdown-hook"));
}
}
/** Initialize with configuration. Should be called during application startup. */
public static void initialize(BulkOperationConfiguration config) {
if (instance == null) {
synchronized (LOCK) {
if (instance == null) {
instance = new BulkExecutor(config);
}
}
}
}
/** Get the singleton instance. Creates with defaults if not initialized. */
public static BulkExecutor getInstance() {
if (instance == null) {
synchronized (LOCK) {
if (instance == null) {
LOG.warn("BulkExecutor not initialized, using defaults");
instance = new BulkExecutor(new BulkOperationConfiguration());
}
}
}
return instance;
}
/** Reset the singleton (for testing). */
public static void reset() {
synchronized (LOCK) {
if (instance != null) {
instance.isShutdown = true;
instance.executor.shutdownNow();
instance = null;
}
}
}
/**
* Check if the executor can accept more work.
*
* @return true if queue has capacity
*/
public boolean hasCapacity() {
if (executor instanceof ThreadPoolExecutor tpe) {
return tpe.getQueue().remainingCapacity() > 0;
}
return true;
}
/**
* Get current queue depth.
*
* @return number of tasks waiting in queue
*/
public int getQueueDepth() {
if (executor instanceof ThreadPoolExecutor tpe) {
return tpe.getQueue().size();
}
return 0;
}
/**
* Get number of currently active threads.
*
* @return active thread count
*/
public int getActiveCount() {
if (executor instanceof ThreadPoolExecutor tpe) {
return tpe.getActiveCount();
}
return 0;
}
/**
* Submit a task for execution. Throws RejectedExecutionException if queue is full.
*
* @param task the task to execute
* @throws RejectedExecutionException if the queue is full
*/
public void submit(Runnable task) throws RejectedExecutionException {
if (isShutdown) {
throw new RejectedExecutionException("BulkExecutor is shut down");
}
executor.execute(task);
}
/**
* Submit a task for execution and return a Future that can be used to cancel the task.
*
* @param task the task to execute
* @return a Future representing the pending task
* @throws RejectedExecutionException if the queue is full
*/
public Future<?> submitWithFuture(Runnable task) throws RejectedExecutionException {
if (isShutdown) {
throw new RejectedExecutionException("BulkExecutor is shut down");
}
return executor.submit(task);
}
/** Check if the executor has been shut down. */
public boolean isShutdown() {
return isShutdown;
}
/** Graceful shutdown with configurable timeout. */
public void shutdown() {
if (isShutdown) {
return;
}
isShutdown = true;
LOG.info("Shutting down BulkExecutor, waiting for pending tasks to complete...");
executor.shutdown();
try {
if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) {
LOG.warn(
"BulkExecutor did not terminate within {} seconds, forcing shutdown", timeoutSeconds);
executor.shutdownNow();
} else {
LOG.info("BulkExecutor shut down gracefully");
}
} catch (InterruptedException e) {
LOG.warn("BulkExecutor shutdown interrupted, forcing shutdown");
executor.shutdownNow();
Thread.currentThread().interrupt();
}
}
}

View file

@ -93,6 +93,9 @@ import com.google.common.util.concurrent.UncheckedExecutionException;
import com.google.gson.Gson;
import com.networknt.schema.Error;
import com.networknt.schema.Schema;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import jakarta.json.JsonPatch;
import jakarta.validation.ConstraintViolationException;
import jakarta.validation.constraints.NotNull;
@ -126,8 +129,6 @@ import java.util.UUID;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.TimeUnit;
import java.util.function.BiConsumer;
import java.util.function.BiPredicate;
@ -208,6 +209,7 @@ import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow;
import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext;
import org.openmetadata.service.jobs.JobDAO;
import org.openmetadata.service.lock.HierarchicalLockManager;
import org.openmetadata.service.monitoring.RequestLatencyContext;
import org.openmetadata.service.rdf.RdfUpdater;
import org.openmetadata.service.resources.tags.TagLabelUtil;
import org.openmetadata.service.resources.teams.RoleResource;
@ -6700,15 +6702,21 @@ public abstract class EntityRepository<T extends EntityInterface> {
return findByNameOrNull(entity.getFullyQualifiedName(), Include.ALL) != null;
}
private static final ExecutorService BULK_PROCESSING_EXECUTOR =
Executors.newVirtualThreadPerTaskExecutor();
private static final ExecutorService BOUNDED_BULK_EXECUTOR =
Executors.newFixedThreadPool(20, Thread.ofVirtual().factory());
private static final ConcurrentHashMap<String, CompletableFuture<BulkOperationResult>> BULK_JOBS =
new ConcurrentHashMap<>();
// Cached metrics to avoid Timer.builder overhead on every call
private static final ConcurrentHashMap<String, Timer> ENTITY_LATENCY_TIMERS =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Timer> ENTITY_QUEUE_WAIT_TIMERS =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Timer> BULK_OPERATION_TIMERS =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, DistributionSummary> BATCH_SIZE_SUMMARIES =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, DistributionSummary> SUCCESS_RATE_SUMMARIES =
new ConcurrentHashMap<>();
public CompletableFuture<BulkOperationResult> submitAsyncBulkOperation(
UriInfo uriInfo, List<T> entities, String userName) {
@ -6716,6 +6724,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
LOG.info(
"Submitting async bulk operation with jobId: {} for {} entities", jobId, entities.size());
// Use BulkExecutor for async operations too
CompletableFuture<BulkOperationResult> job =
CompletableFuture.supplyAsync(
() -> {
@ -6730,7 +6739,7 @@ public abstract class EntityRepository<T extends EntityInterface> {
return errorResult;
}
},
BULK_PROCESSING_EXECUTOR);
BulkExecutor.getInstance().getExecutor());
BULK_JOBS.put(jobId, job);
@ -6752,14 +6761,24 @@ public abstract class EntityRepository<T extends EntityInterface> {
List<BulkResponse> successRequests = new ArrayList<>();
List<BulkResponse> failedRequests = new ArrayList<>();
long bulkStartTime = System.nanoTime();
List<Long> entityLatenciesNanos = new ArrayList<>();
for (T entity : entities) {
long entityStartTime = System.nanoTime();
try {
createOrUpdate(uriInfo, entity, userName);
long entityDuration = System.nanoTime() - entityStartTime;
entityLatenciesNanos.add(entityDuration);
recordEntityMetrics(entityType, entityDuration, 0, true);
successRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.OK.getStatusCode()));
} catch (Exception e) {
long entityDuration = System.nanoTime() - entityStartTime;
entityLatenciesNanos.add(entityDuration);
recordEntityMetrics(entityType, entityDuration, 0, false);
LOG.warn("Failed to process entity in bulk operation", e);
failedRequests.add(
new BulkResponse()
@ -6769,21 +6788,44 @@ public abstract class EntityRepository<T extends EntityInterface> {
}
}
long totalDurationNanos = System.nanoTime() - bulkStartTime;
result.setNumberOfRowsProcessed(entities.size());
result.setNumberOfRowsPassed(successRequests.size());
result.setNumberOfRowsFailed(failedRequests.size());
result.setSuccessRequest(successRequests);
result.setFailedRequest(failedRequests);
if (failedRequests.size() > 0) {
result.setStatus(ApiStatus.PARTIAL_SUCCESS);
if (!failedRequests.isEmpty()) {
result.setStatus(successRequests.isEmpty() ? ApiStatus.FAILURE : ApiStatus.PARTIAL_SUCCESS);
}
// Calculate metrics
long avgEntityLatencyMs = 0;
long maxEntityLatencyMs = 0;
if (!entityLatenciesNanos.isEmpty()) {
avgEntityLatencyMs =
entityLatenciesNanos.stream().mapToLong(Long::longValue).sum()
/ entityLatenciesNanos.size()
/ 1_000_000;
maxEntityLatencyMs =
entityLatenciesNanos.stream().mapToLong(Long::longValue).max().orElse(0) / 1_000_000;
}
recordBulkMetrics(
entityType,
entities.size(),
successRequests.size(),
totalDurationNanos,
avgEntityLatencyMs,
maxEntityLatencyMs);
LOG.info(
"Async bulk operation completed: {} succeeded, {} failed out of {} total",
"Async bulk operation completed: {} succeeded, {} failed out of {} total, took {}ms",
successRequests.size(),
failedRequests.size(),
entities.size());
entities.size(),
totalDurationNanos / 1_000_000);
return result;
}
@ -6871,35 +6913,142 @@ public abstract class EntityRepository<T extends EntityInterface> {
List<BulkResponse> successRequests = Collections.synchronizedList(new ArrayList<>());
List<BulkResponse> failedRequests = Collections.synchronizedList(new ArrayList<>());
BulkExecutor bulkExecutor = BulkExecutor.getInstance();
// Track overall wall-clock time
long bulkStartTime = System.nanoTime();
// Check if system can accept more work
if (!bulkExecutor.hasCapacity()) {
LOG.warn(
"Bulk operation rejected: queue full (depth={}, max={})",
bulkExecutor.getQueueDepth(),
bulkExecutor.getQueueSize());
result.setStatus(ApiStatus.FAILURE);
result.setNumberOfRowsProcessed(0);
result.setNumberOfRowsFailed(entities.size());
for (T entity : entities) {
failedRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.SERVICE_UNAVAILABLE.getStatusCode())
.withMessage("System overloaded, please retry later"));
}
result.setFailedRequest(failedRequests);
recordBulkMetrics(entityType, entities.size(), 0, System.nanoTime() - bulkStartTime, 0, 0);
return result;
}
LOG.info(
"Starting bulk operation for {} {} entities (active={}, queued={})",
entities.size(),
entityType,
bulkExecutor.getActiveCount(),
bulkExecutor.getQueueDepth());
List<CompletableFuture<Void>> futures = new ArrayList<>();
// Track per-entity latencies for accurate metrics
List<Long> entityLatenciesNanos = Collections.synchronizedList(new ArrayList<>());
// Capture parent thread's latency context for propagation to worker threads
final RequestLatencyContext.RequestContext parentLatencyContext =
RequestLatencyContext.getContext();
for (T entity : entities) {
CompletableFuture<Void> future =
CompletableFuture.runAsync(
() -> {
CompletableFuture<Void> future = new CompletableFuture<>();
futures.add(future);
final long submitTime = System.nanoTime();
try {
bulkExecutor.submit(
() -> {
// Propagate latency context to worker thread for accurate DB/Search tracking
if (parentLatencyContext != null) {
RequestLatencyContext.setContext(parentLatencyContext);
}
try {
long entityStartTime = System.nanoTime();
long queueWaitTime = entityStartTime - submitTime;
try {
PutResponse<T> putResponse = bulkCreateOrUpdateEntity(uriInfo, entity, userName);
long entityDuration = System.nanoTime() - entityStartTime;
entityLatenciesNanos.add(entityDuration);
successRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.OK.getStatusCode()));
createChangeEventForBulkOperation(
putResponse.getEntity(), putResponse.getChangeType(), userName);
} catch (Exception e) {
LOG.warn("Failed to process entity in bulk operation", e);
failedRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.BAD_REQUEST.getStatusCode())
.withMessage(e.getMessage()));
}
},
BOUNDED_BULK_EXECUTOR);
futures.add(future);
// Record per-entity metrics
recordEntityMetrics(entityType, entityDuration, queueWaitTime, true);
future.complete(null);
} catch (Exception e) {
long entityDuration = System.nanoTime() - entityStartTime;
entityLatenciesNanos.add(entityDuration);
recordEntityMetrics(entityType, entityDuration, queueWaitTime, false);
handleBulkOperationError(entity, e, failedRequests);
future.complete(null); // Complete even on error so we don't hang
}
} finally {
// Clear context from worker thread to prevent memory leaks in pooled threads
if (parentLatencyContext != null) {
RequestLatencyContext.clearContext();
}
}
});
} catch (java.util.concurrent.RejectedExecutionException e) {
// Queue became full between check and submit
LOG.warn("Task rejected for entity: {}", entity.getFullyQualifiedName());
failedRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.SERVICE_UNAVAILABLE.getStatusCode())
.withMessage("System overloaded, please retry later"));
future.complete(null);
}
}
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join();
// Wait with timeout
boolean timedOut = false;
try {
CompletableFuture.allOf(futures.toArray(new CompletableFuture[0]))
.get(bulkExecutor.getTimeoutSeconds(), TimeUnit.SECONDS);
} catch (java.util.concurrent.TimeoutException e) {
timedOut = true;
LOG.error(
"Bulk operation timed out after {}s. Completed: {}, Failed: {}, Total: {}",
bulkExecutor.getTimeoutSeconds(),
successRequests.size(),
failedRequests.size(),
entities.size());
// Check each future to find which entities actually timed out
// futures[i] corresponds to entities[i]
for (int i = 0; i < futures.size(); i++) {
CompletableFuture<Void> future = futures.get(i);
if (!future.isDone()) {
T entity = entities.get(i);
failedRequests.add(
new BulkResponse()
.withRequest(entity.getFullyQualifiedName())
.withStatus(Status.REQUEST_TIMEOUT.getStatusCode())
.withMessage("Operation timed out"));
// Cancel the future to signal we're no longer interested
future.cancel(false);
}
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
LOG.warn("Bulk operation interrupted");
} catch (ExecutionException e) {
LOG.error("Unexpected error in bulk operation", e.getCause());
}
long totalDurationNanos = System.nanoTime() - bulkStartTime;
result.setNumberOfRowsProcessed(entities.size());
result.setNumberOfRowsPassed(successRequests.size());
@ -6907,16 +7056,148 @@ public abstract class EntityRepository<T extends EntityInterface> {
result.setSuccessRequest(successRequests);
result.setFailedRequest(failedRequests);
if (failedRequests.size() > 0) {
result.setStatus(ApiStatus.PARTIAL_SUCCESS);
if (!failedRequests.isEmpty()) {
result.setStatus(successRequests.isEmpty() ? ApiStatus.FAILURE : ApiStatus.PARTIAL_SUCCESS);
}
// Calculate and log detailed metrics
long avgEntityLatencyMs = 0;
long maxEntityLatencyMs = 0;
if (!entityLatenciesNanos.isEmpty()) {
avgEntityLatencyMs =
entityLatenciesNanos.stream().mapToLong(Long::longValue).sum()
/ entityLatenciesNanos.size()
/ 1_000_000;
maxEntityLatencyMs =
entityLatenciesNanos.stream().mapToLong(Long::longValue).max().orElse(0) / 1_000_000;
}
long totalDurationMs = totalDurationNanos / 1_000_000;
double throughput = entities.size() * 1000.0 / Math.max(1, totalDurationMs);
// Record bulk operation metrics
recordBulkMetrics(
entityType,
entities.size(),
successRequests.size(),
totalDurationNanos,
avgEntityLatencyMs,
maxEntityLatencyMs);
LOG.info(
"Bulk operation completed: {} succeeded, {} failed out of {} total",
"Bulk operation completed: entity={}, total={}, succeeded={}, failed={}, "
+ "wallClockMs={}, avgEntityMs={}, maxEntityMs={}, throughput={}/s",
entityType,
entities.size(),
successRequests.size(),
failedRequests.size(),
entities.size());
totalDurationMs,
avgEntityLatencyMs,
maxEntityLatencyMs,
String.format("%.1f", throughput));
return result;
}
private void recordEntityMetrics(
String entityType, long durationNanos, long queueWaitNanos, boolean success) {
// Per-entity processing time (cached, no histogram to reduce Prometheus cardinality)
// This fires for EVERY entity in a bulk operation, so we use simple timers.
// The bulk.operation.latency metric has histograms for percentile analysis.
String latencyKey = entityType + "|" + success;
Timer latencyTimer =
ENTITY_LATENCY_TIMERS.computeIfAbsent(
latencyKey,
k ->
Timer.builder("bulk.entity.latency")
.tag("entity", entityType)
.tag("success", String.valueOf(success))
.register(Metrics.globalRegistry));
latencyTimer.record(durationNanos, TimeUnit.NANOSECONDS);
// Queue wait time (cached, simple timer)
Timer queueTimer =
ENTITY_QUEUE_WAIT_TIMERS.computeIfAbsent(
entityType,
k ->
Timer.builder("bulk.entity.queue_wait")
.tag("entity", entityType)
.register(Metrics.globalRegistry));
queueTimer.record(queueWaitNanos, TimeUnit.NANOSECONDS);
}
private void recordBulkMetrics(
String entityType,
int totalEntities,
int successCount,
long durationNanos,
long avgEntityMs,
long maxEntityMs) {
// Total bulk operation time (cached)
Timer operationTimer =
BULK_OPERATION_TIMERS.computeIfAbsent(
entityType,
k ->
Timer.builder("bulk.operation.latency")
.tag("entity", entityType)
.publishPercentileHistogram(true)
.register(Metrics.globalRegistry));
operationTimer.record(durationNanos, TimeUnit.NANOSECONDS);
// Batch size distribution (cached)
DistributionSummary batchSizeSummary =
BATCH_SIZE_SUMMARIES.computeIfAbsent(
entityType,
k ->
DistributionSummary.builder("bulk.operation.batch_size")
.tag("entity", entityType)
.register(Metrics.globalRegistry));
batchSizeSummary.record(totalEntities);
// Success rate as distribution (cached, avoids gauge memory leak)
if (totalEntities > 0) {
DistributionSummary successRateSummary =
SUCCESS_RATE_SUMMARIES.computeIfAbsent(
entityType,
k ->
DistributionSummary.builder("bulk.operation.success_rate")
.tag("entity", entityType)
.register(Metrics.globalRegistry));
successRateSummary.record(successCount * 100.0 / totalEntities);
}
// Record success and failure counts for alerting (Micrometer caches counters internally)
Metrics.counter("bulk.operation.entities.success", "entity", entityType)
.increment(successCount);
Metrics.counter("bulk.operation.entities.failed", "entity", entityType)
.increment(totalEntities - successCount);
}
private void handleBulkOperationError(T entity, Exception e, List<BulkResponse> failedRequests) {
String fqn = entity.getFullyQualifiedName();
int statusCode;
String message;
// Categorize errors properly
if (e instanceof jakarta.ws.rs.WebApplicationException wae) {
statusCode = wae.getResponse().getStatus();
message = e.getMessage();
LOG.warn("Entity {} failed with status {}: {}", fqn, statusCode, message);
} else if (e instanceof java.sql.SQLException) {
statusCode = Status.INTERNAL_SERVER_ERROR.getStatusCode();
message = "Database error: " + e.getMessage();
LOG.error("Database error processing entity {}", fqn, e);
} else if (e instanceof IllegalArgumentException || e instanceof IllegalStateException) {
statusCode = Status.BAD_REQUEST.getStatusCode();
message = e.getMessage();
LOG.warn("Validation error for entity {}: {}", fqn, message);
} else {
statusCode = Status.INTERNAL_SERVER_ERROR.getStatusCode();
message = "Unexpected error: " + e.getMessage();
LOG.error("Unexpected error processing entity {}", fqn, e);
}
failedRequests.add(
new BulkResponse().withRequest(fqn).withStatus(statusCode).withMessage(message));
}
}

View file

@ -7,13 +7,30 @@ import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.time.Duration;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.atomic.AtomicInteger;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
import lombok.Getter;
import lombok.extern.slf4j.Slf4j;
/**
* Thread-local context for tracking request latencies using Micrometer.
* This provides accurate latency measurements including percentiles.
* Thread-local context for tracking request latencies using Micrometer. This provides accurate
* latency measurements including percentiles.
*
* <p><b>Thread-Safety and Multi-threaded Requests:</b>
*
* <p>This context can be shared across multiple worker threads using {@link #setContext} for
* operations like bulk processing. When shared:
*
* <ul>
* <li>Database and search time tracking uses atomic operations and aggregates correctly
* <li>Operation counts (dbOperationCount, searchOperationCount) aggregate correctly
* <li><b>Internal time calculation may be inaccurate</b> when multiple threads work concurrently,
* as the internal timer start point is shared. This is a known limitation.
* </ul>
*
* <p>For parallel operations, the "internal" percentage should be interpreted as approximate. The
* database and search percentages remain accurate as they track cumulative time across all threads.
*/
@Slf4j
public class RequestLatencyContext {
@ -56,6 +73,9 @@ public class RequestLatencyContext {
requestContext.set(context);
String normalizedEndpoint = normalizeUri(endpoint);
String timerKey = normalizedEndpoint + "|" + normalizedMethod;
// Total request timer with histogram for percentile analysis.
// Uses reduced SLO buckets (5 instead of 9) to minimize Prometheus cardinality.
// Histogram provides p50/p95/p99/p99.9 percentiles automatically.
Timer timer =
requestTimers.computeIfAbsent(
timerKey,
@ -68,15 +88,11 @@ public class RequestLatencyContext {
.minimumExpectedValue(Duration.ofMillis(1))
.maximumExpectedValue(Duration.ofSeconds(60))
.serviceLevelObjectives(
Duration.ofMillis(10),
Duration.ofMillis(50),
Duration.ofMillis(100),
Duration.ofMillis(200),
Duration.ofMillis(500),
Duration.ofSeconds(1),
Duration.ofSeconds(2),
Duration.ofSeconds(5),
Duration.ofSeconds(10))
Duration.ofMillis(100), // Fast response
Duration.ofMillis(500), // Acceptable
Duration.ofSeconds(1), // Slow
Duration.ofSeconds(5), // Very slow
Duration.ofSeconds(10)) // Timeout threshold
.register(Metrics.globalRegistry));
LOG.debug(
"Created/retrieved timer for endpoint: {}, method: {}, timer: {}",
@ -84,7 +100,7 @@ public class RequestLatencyContext {
normalizedMethod,
timer);
context.requestTimerSample = Timer.start(Metrics.globalRegistry);
context.internalTimerStartNanos = System.nanoTime();
context.internalTimerStartNanos.set(System.nanoTime());
}
/**
@ -96,12 +112,14 @@ public class RequestLatencyContext {
return null;
}
if (context.internalTimerStartNanos > 0) {
context.internalTime += System.nanoTime() - context.internalTimerStartNanos;
context.internalTimerStartNanos = 0;
// Atomically read and reset internalTimerStartNanos to prevent race conditions
// when multiple threads call this concurrently on the same context
long internalStart = context.internalTimerStartNanos.getAndSet(0);
if (internalStart > 0) {
context.internalTime.addAndGet(System.nanoTime() - internalStart);
}
context.dbOperationCount++;
context.dbOperationCount.incrementAndGet();
return Timer.start(Metrics.globalRegistry);
}
@ -115,9 +133,9 @@ public class RequestLatencyContext {
// Use the shared dummy timer to measure elapsed time without recording
long duration = timerSample.stop(DUMMY_TIMER);
context.dbTime += duration;
context.dbTime.addAndGet(duration);
context.internalTimerStartNanos = System.nanoTime();
context.internalTimerStartNanos.set(System.nanoTime());
}
public static Timer.Sample startSearchOperation() {
@ -125,12 +143,14 @@ public class RequestLatencyContext {
if (context == null) {
return null;
}
if (context.internalTimerStartNanos > 0) {
context.internalTime += System.nanoTime() - context.internalTimerStartNanos;
context.internalTimerStartNanos = 0;
// Atomically read and reset internalTimerStartNanos to prevent race conditions
long internalStart = context.internalTimerStartNanos.getAndSet(0);
if (internalStart > 0) {
context.internalTime.addAndGet(System.nanoTime() - internalStart);
}
context.searchOperationCount++;
context.searchOperationCount.incrementAndGet();
return Timer.start(Metrics.globalRegistry);
}
@ -144,10 +164,10 @@ public class RequestLatencyContext {
// Use the shared dummy timer to measure elapsed time without recording
long duration = timerSample.stop(DUMMY_TIMER);
context.searchTime += duration;
context.searchTime.addAndGet(duration);
// Resume internal timer
context.internalTimerStartNanos = System.nanoTime();
context.internalTimerStartNanos.set(System.nanoTime());
}
public static void endRequest() {
@ -172,12 +192,22 @@ public class RequestLatencyContext {
}
}
if (context.internalTimerStartNanos > 0) {
context.internalTime += System.nanoTime() - context.internalTimerStartNanos;
long finalInternalStart = context.internalTimerStartNanos.get();
if (finalInternalStart > 0) {
context.internalTime.addAndGet(System.nanoTime() - finalInternalStart);
}
// Record per-request timers (not per-operation)
// This gives us the total DB time for THIS request
// Get final values from atomic fields
long dbTimeNanos = context.dbTime.get();
long searchTimeNanos = context.searchTime.get();
long internalTimeNanos = context.internalTime.get();
int dbOps = context.dbOperationCount.get();
int searchOps = context.searchOperationCount.get();
// Record per-request component timers (not per-operation)
// These use simple timers without histograms to reduce Prometheus cardinality.
// The total request timer (request.latency.total) has histograms for percentile analysis.
// Component timers provide mean/max/count which is sufficient for bottleneck identification.
Timer dbTimer =
databaseTimers.computeIfAbsent(
timerKey,
@ -186,22 +216,9 @@ public class RequestLatencyContext {
.tag(ENDPOINT, normalizedEndpoint)
.tag(METHOD, context.method)
.description("Total database latency per request")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.maximumExpectedValue(Duration.ofSeconds(30))
.serviceLevelObjectives(
Duration.ofMillis(5),
Duration.ofMillis(10),
Duration.ofMillis(25),
Duration.ofMillis(50),
Duration.ofMillis(100),
Duration.ofMillis(250),
Duration.ofMillis(500),
Duration.ofSeconds(1),
Duration.ofSeconds(2))
.register(Metrics.globalRegistry));
if (context.dbTime > 0) {
dbTimer.record(context.dbTime, java.util.concurrent.TimeUnit.NANOSECONDS);
if (dbTimeNanos > 0) {
dbTimer.record(dbTimeNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
}
// Record total search time for THIS request
@ -213,22 +230,9 @@ public class RequestLatencyContext {
.tag(ENDPOINT, normalizedEndpoint)
.tag(METHOD, context.method)
.description("Total search latency per request")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.maximumExpectedValue(Duration.ofSeconds(30))
.serviceLevelObjectives(
Duration.ofMillis(5),
Duration.ofMillis(10),
Duration.ofMillis(25),
Duration.ofMillis(50),
Duration.ofMillis(100),
Duration.ofMillis(250),
Duration.ofMillis(500),
Duration.ofSeconds(1),
Duration.ofSeconds(2))
.register(Metrics.globalRegistry));
if (context.searchTime > 0) {
searchTimer.record(context.searchTime, java.util.concurrent.TimeUnit.NANOSECONDS);
if (searchTimeNanos > 0) {
searchTimer.record(searchTimeNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
}
// Record internal processing time for THIS request
@ -240,42 +244,29 @@ public class RequestLatencyContext {
.tag(ENDPOINT, normalizedEndpoint)
.tag(METHOD, context.method)
.description("Internal processing latency per request")
.publishPercentileHistogram(true)
.minimumExpectedValue(Duration.ofMillis(1))
.maximumExpectedValue(Duration.ofSeconds(10))
.serviceLevelObjectives(
Duration.ofMillis(1),
Duration.ofMillis(5),
Duration.ofMillis(10),
Duration.ofMillis(25),
Duration.ofMillis(50),
Duration.ofMillis(100),
Duration.ofMillis(250),
Duration.ofMillis(500),
Duration.ofSeconds(1))
.register(Metrics.globalRegistry));
if (context.internalTime > 0) {
internalTimer.record(context.internalTime, java.util.concurrent.TimeUnit.NANOSECONDS);
if (internalTimeNanos > 0) {
internalTimer.record(internalTimeNanos, java.util.concurrent.TimeUnit.NANOSECONDS);
}
// Record operation counts as distribution summaries to get avg/max/percentiles
if (context.dbOperationCount > 0) {
if (dbOps > 0) {
Metrics.summary(
"request.operations.database", ENDPOINT, normalizedEndpoint, METHOD, context.method)
.record(context.dbOperationCount);
.record(dbOps);
}
if (context.searchOperationCount > 0) {
if (searchOps > 0) {
Metrics.summary(
"request.operations.search", ENDPOINT, normalizedEndpoint, METHOD, context.method)
.record(context.searchOperationCount);
.record(searchOps);
}
if (context.totalTime > 0) {
long totalNanos = context.totalTime;
double dbPercent = (context.dbTime * 100.0) / totalNanos;
double searchPercent = (context.searchTime * 100.0) / totalNanos;
double internalPercent = (context.internalTime * 100.0) / totalNanos;
double dbPercent = (dbTimeNanos * 100.0) / totalNanos;
double searchPercent = (searchTimeNanos * 100.0) / totalNanos;
double internalPercent = (internalTimeNanos * 100.0) / totalNanos;
// Get or create percentage holder for this endpoint and method
PercentageHolder holder =
@ -315,15 +306,17 @@ public class RequestLatencyContext {
// Log slow requests (over 1 second)
if (context.totalTime > 1_000_000_000L) {
LOG.warn(
"Slow request detected - endpoint: {}, total: {}ms, db: {}ms ({}%), search: {}ms ({}%), internal: {}ms ({}%)",
"Slow request detected - endpoint: {}, total: {}ms, db: {}ms ({}%), search: {}ms ({}%), internal: {}ms ({}%), dbOps: {}, searchOps: {}",
context.endpoint,
context.totalTime / 1_000_000,
context.dbTime / 1_000_000,
(context.dbTime * 100) / context.totalTime,
context.searchTime / 1_000_000,
(context.searchTime * 100) / context.totalTime,
context.internalTime / 1_000_000,
(context.internalTime * 100) / context.totalTime);
dbTimeNanos / 1_000_000,
(dbTimeNanos * 100) / context.totalTime,
searchTimeNanos / 1_000_000,
(searchTimeNanos * 100) / context.totalTime,
internalTimeNanos / 1_000_000,
(internalTimeNanos * 100) / context.totalTime,
dbOps,
searchOps);
}
} finally {
@ -331,20 +324,64 @@ public class RequestLatencyContext {
}
}
/**
* Get the current request context for propagation to child threads. This allows virtual threads
* or async tasks to share the same timing context as the parent request thread.
*
* @return the current RequestContext, or null if not in a request context
*/
public static RequestContext getContext() {
return requestContext.get();
}
/**
* Set the request context in the current thread. Used by child threads to inherit the parent's
* timing context for accurate metrics tracking across thread boundaries.
*
* @param context the RequestContext to set (obtained from parent thread via getContext())
*/
public static void setContext(RequestContext context) {
if (context != null) {
requestContext.set(context);
}
}
/**
* Clear the request context from the current thread. Should be called in finally blocks by child
* threads that received context via setContext() to prevent memory leaks in pooled threads.
*/
public static void clearContext() {
requestContext.remove();
}
/**
* Reset all static state. This is primarily for testing to ensure clean state between tests. This
* clears all timer maps so that new timers will be created and registered with the current
* registry.
*/
public static void reset() {
requestContext.remove();
requestTimers.clear();
databaseTimers.clear();
searchTimers.clear();
internalTimers.clear();
percentageHolders.clear();
}
@Getter
private static class RequestContext {
public static class RequestContext {
final String endpoint;
final String method;
Timer.Sample requestTimerSample;
long internalTimerStartNanos = 0;
volatile Timer.Sample requestTimerSample;
final AtomicLong internalTimerStartNanos = new AtomicLong(0);
long totalTime = 0;
long dbTime = 0;
long searchTime = 0;
long internalTime = 0;
volatile long totalTime = 0;
final AtomicLong dbTime = new AtomicLong(0);
final AtomicLong searchTime = new AtomicLong(0);
final AtomicLong internalTime = new AtomicLong(0);
int dbOperationCount = 0;
int searchOperationCount = 0;
final AtomicInteger dbOperationCount = new AtomicInteger(0);
final AtomicInteger searchOperationCount = new AtomicInteger(0);
RequestContext(String endpoint, String method) {
this.endpoint = endpoint;

View file

@ -12,6 +12,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchResponse;
import es.co.elastic.clients.json.JsonData;
import es.co.elastic.clients.json.JsonpMapper;
import es.co.elastic.clients.util.NamedValue;
import io.micrometer.core.instrument.Timer;
import jakarta.json.JsonObject;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
@ -29,6 +30,7 @@ import org.openmetadata.schema.tests.DataQualityReport;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.exception.SearchException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.monitoring.RequestLatencyContext;
import org.openmetadata.service.resources.settings.SettingsCache;
import org.openmetadata.service.search.AggregationManagementClient;
import org.openmetadata.service.search.SearchAggregation;
@ -177,8 +179,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli
searchRequestBuilder.size(0);
searchRequestBuilder.timeout("30s");
SearchResponse<JsonData> searchResponse =
client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String responseJson = serializeSearchResponse(searchResponse);
return Response.status(Response.Status.OK).entity(responseJson).build();
@ -230,7 +239,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli
aggregations.size());
LOG.debug("Generic Aggregation - Full aggregations: {}", aggregations);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String response = serializeSearchResponse(searchResponse);
LOG.info(
@ -327,7 +344,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli
aggregations.size());
LOG.debug("Generic Aggregation with RBAC - Full aggregations: {}", aggregations);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String response = serializeSearchResponse(searchResponse);
LOG.info(
@ -416,8 +441,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli
searchRequestBuilder.size(0);
searchRequestBuilder.timeout("30s");
SearchResponse<JsonData> searchResponse =
client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String response = serializeSearchResponse(searchResponse);
JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject();
@ -520,7 +552,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli
// Build and execute search
SearchRequest searchRequest = requestBuilder.build(resolvedIndex);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
LOG.info("Entity type counts query for index '{}' (resolved: '{}')", index, resolvedIndex);

View file

@ -14,6 +14,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchRequest;
import es.co.elastic.clients.elasticsearch.core.SearchResponse;
import es.co.elastic.clients.elasticsearch.indices.GetMappingResponse;
import es.co.elastic.clients.json.JsonData;
import io.micrometer.core.instrument.Timer;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.io.StringReader;
@ -35,6 +36,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
import org.openmetadata.service.jdbi3.DataInsightChartRepository;
import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository;
import org.openmetadata.service.monitoring.RequestLatencyContext;
import org.openmetadata.service.search.DataInsightAggregatorClient;
import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchAggregatedUnusedAssetsCountAggregator;
import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchAggregatedUnusedAssetsSizeAggregator;
@ -77,7 +79,15 @@ public class ElasticSearchDataInsightAggregatorManager implements DataInsightAgg
new HashMap<>();
SearchRequest searchRequest =
aggregator.prepareSearchRequest(diChart, start, end, formulas, metricFormulaHolder, live);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return aggregator.processSearchResponse(
diChart, searchResponse, formulas, metricFormulaHolder);
}
@ -146,7 +156,15 @@ public class ElasticSearchDataInsightAggregatorManager implements DataInsightAgg
from,
queryFilter,
dataReportIndex);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(OK)
.entity(processDataInsightChartResult(searchResponse, dataInsightChartName))
.build();
@ -160,7 +178,15 @@ public class ElasticSearchDataInsightAggregatorManager implements DataInsightAgg
}
SearchRequest searchRequest = QueryCostRecordsAggregator.getQueryCostRecords(serviceName);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return QueryCostRecordsAggregator.parseQueryCostResponse(searchResponse);
}

View file

@ -29,6 +29,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchResponse;
import es.co.elastic.clients.elasticsearch.core.search.Hit;
import es.co.elastic.clients.json.JsonData;
import es.co.elastic.clients.json.JsonpMapper;
import io.micrometer.core.instrument.Timer;
import jakarta.json.Json;
import jakarta.json.JsonObject;
import jakarta.json.JsonReader;
@ -150,7 +151,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
b.must(
m -> m.term(t -> t.field("sourceUrl").value(sourceUrl))))));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String responseJson = serializeSearchResponse(response);
return Response.status(OK).entity(responseJson).build();
}
@ -177,7 +186,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
.filter(
f -> f.term(t -> t.field("deleted").value(deleted))))));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String responseJson = serializeSearchResponse(response);
return Response.status(OK).entity(responseJson).build();
}
@ -302,7 +319,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
try {
SearchRequest searchRequest = requestBuilder.build(index);
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
List<Map<String, Object>> results = new ArrayList<>();
if (response.hits().hits() != null) {
@ -435,7 +460,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
try {
SearchRequest searchRequest = requestBuilder.build(index);
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
List<Map<String, Object>> results = new ArrayList<>();
Object[] lastHitSortValues = null;
@ -528,11 +561,13 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
addAggregationsToNLQQuery(requestBuilder, request.getIndex());
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
// End search operation timing
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
// Cache successful queries
@ -621,7 +656,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
// Build and execute search request
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String responseJson = serializeSearchResponse(response);
LOG.debug("Direct query search completed successfully");
@ -813,7 +856,16 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
Query boolQuery = Query.of(q -> q.bool(b -> b.must(mustQueries).filter(filterQueries)));
return client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return response;
}
private void applySearchFilter(String filter, ElasticSearchRequestBuilder requestBuilder)
@ -1098,15 +1150,16 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
LOG.debug("Executing search on index: {}, query: {}", request.getIndex(), request.getQuery());
try {
io.micrometer.core.instrument.Timer.Sample searchTimerSample =
org.openmetadata.service.monitoring.RequestLatencyContext.startSearchOperation();
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
if (searchTimerSample != null) {
org.openmetadata.service.monitoring.RequestLatencyContext.endSearchOperation(
searchTimerSample);
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
if (!Boolean.TRUE.equals(request.getIsHierarchy())) {
@ -1194,7 +1247,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
// Execute search to get aggregations for parent terms
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
if (searchResponse.aggregations() != null
&& searchResponse.aggregations().containsKey("fqnParts_agg")) {
@ -1396,7 +1457,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
addAggregationsToNLQQuery(requestBuilder, request.getIndex());
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(Response.Status.OK)
.entity(serializeSearchResponse(searchResponse))
@ -1431,7 +1500,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
.query(query)
.size(1000));
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -1478,7 +1555,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
.query(query)
.size(1000));
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -1581,7 +1666,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient {
.query(finalMainQuery)
.size(1000));
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() == null) {

View file

@ -23,6 +23,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchRequest;
import es.co.elastic.clients.elasticsearch.core.SearchResponse;
import es.co.elastic.clients.elasticsearch.core.search.Hit;
import es.co.elastic.clients.json.JsonData;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.io.StringReader;
import java.util.HashMap;
@ -37,6 +38,7 @@ import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.exception.SearchException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.monitoring.RequestLatencyContext;
@Slf4j
public class EsUtils {
@ -129,7 +131,14 @@ public class EsUtils {
}
}
return client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
try {
return client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
}
public static Map<String, Object> searchEREntityByKey(
@ -183,7 +192,15 @@ public class EsUtils {
null,
null,
fieldsToRemove);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -298,7 +315,15 @@ public class EsUtils {
null,
null,
fieldsToRemove);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -376,7 +401,14 @@ public class EsUtils {
// Apply query filter
buildSearchSourceFilter(queryFilter, searchRequestBuilder);
return client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
try {
return client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
}
private static Query buildBoolQueriesWithShould(Map<String, Set<String>> keysAndValues) {

View file

@ -164,11 +164,11 @@ public class GuavaLineageGraphCache implements LineageGraphCache {
public void logStats() {
CacheStats stats = cache.stats();
LOG.info(
"Cache Statistics: size={}, hits={}, misses={}, hitRatio={:.2f}, evictions={}, loadTime={}ms",
"Cache Statistics: size={}, hits={}, misses={}, hitRatio={}, evictions={}, loadTime={}ms",
cache.size(),
stats.hitCount(),
stats.missCount(),
getHitRatio(),
String.format("%.2f", getHitRatio()),
stats.evictionCount(),
stats.totalLoadTime() / 1_000_000); // Convert nanos to millis
}

View file

@ -3,6 +3,7 @@ package org.openmetadata.service.search.opensearch;
import com.fasterxml.jackson.core.JsonProcessingException;
import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import io.micrometer.core.instrument.Timer;
import jakarta.json.JsonObject;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
@ -20,6 +21,7 @@ import org.openmetadata.schema.tests.DataQualityReport;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.exception.SearchException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.monitoring.RequestLatencyContext;
import org.openmetadata.service.resources.settings.SettingsCache;
import org.openmetadata.service.search.AggregationManagementClient;
import org.openmetadata.service.search.SearchAggregation;
@ -174,8 +176,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient
searchRequestBuilder.size(0);
searchRequestBuilder.timeout("30s");
SearchResponse<JsonData> searchResponse =
client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(Response.Status.OK).entity(searchResponse.toJsonString()).build();
} catch (Exception e) {
LOG.error("Failed to execute aggregation", e);
@ -217,8 +226,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient
searchRequestBuilder.size(0);
searchRequestBuilder.timeout("30s");
SearchResponse<JsonData> searchResponse =
client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String response = searchResponse.toJsonString();
JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject();
@ -296,8 +312,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient
searchRequestBuilder.size(0);
searchRequestBuilder.timeout("30s");
SearchResponse<JsonData> searchResponse =
client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String response = searchResponse.toJsonString();
JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject();
@ -373,8 +396,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient
searchRequestBuilder.size(0);
searchRequestBuilder.timeout("30s");
SearchResponse<JsonData> searchResponse =
client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String response = searchResponse.toJsonString();
JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject();
@ -460,7 +490,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient
// Build and execute search
SearchRequest searchRequest = requestBuilder.build(resolvedIndex);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
LOG.info("Entity type counts query for index '{}' (resolved: '{}')", index, resolvedIndex);

View file

@ -3,6 +3,7 @@ package org.openmetadata.service.search.opensearch;
import static jakarta.ws.rs.core.Response.Status.OK;
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
import io.micrometer.core.instrument.Timer;
import jakarta.ws.rs.core.Response;
import java.io.IOException;
import java.util.ArrayList;
@ -23,6 +24,7 @@ import org.openmetadata.service.Entity;
import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface;
import org.openmetadata.service.jdbi3.DataInsightChartRepository;
import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository;
import org.openmetadata.service.monitoring.RequestLatencyContext;
import org.openmetadata.service.search.DataInsightAggregatorClient;
import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchAggregatedUnusedAssetsCountAggregator;
import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchAggregatedUnusedAssetsSizeAggregator;
@ -76,7 +78,15 @@ public class OpenSearchDataInsightAggregatorManager implements DataInsightAggreg
new HashMap<>();
SearchRequest searchRequest =
aggregator.prepareSearchRequest(diChart, start, end, formulas, metricFormulaHolder, live);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return aggregator.processSearchResponse(
diChart, searchResponse, formulas, metricFormulaHolder);
}
@ -145,7 +155,15 @@ public class OpenSearchDataInsightAggregatorManager implements DataInsightAggreg
from,
queryFilter,
dataReportIndex);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(OK)
.entity(processDataInsightChartResultNew(searchResponse, dataInsightChartName))
.build();
@ -159,7 +177,15 @@ public class OpenSearchDataInsightAggregatorManager implements DataInsightAggreg
}
SearchRequest searchRequest = QueryCostRecordsAggregator.getQueryCostRecords(serviceName);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return QueryCostRecordsAggregator.parseQueryCostResponse(searchResponse);
}

View file

@ -170,7 +170,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
t.field("sourceUrl")
.value(FieldValue.of(sourceUrl)))))));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(OK).entity(response.toJsonString()).build();
}
@ -200,7 +208,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
t.field("deleted")
.value(FieldValue.of(deleted)))))));
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(OK).entity(response.toJsonString()).build();
}
@ -279,7 +295,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
try {
SearchRequest searchRequest = requestBuilder.build(index);
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
List<Map<String, Object>> results = new ArrayList<>();
if (response.hits().hits() != null) {
@ -449,7 +473,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
try {
SearchRequest searchRequest = requestBuilder.build(index);
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
List<Map<String, Object>> results = new ArrayList<>();
Object[] lastHitSortValues = null;
@ -607,7 +639,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
// Build and execute search request
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> response = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> response;
try {
response = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
String responseJson = response.toJsonString();
LOG.debug("Direct query search completed successfully");
@ -799,7 +839,14 @@ public class OpenSearchSearchManager implements SearchManagementClient {
Query boolQuery = Query.of(q -> q.bool(b -> b.must(mustQueries).filter(filterQueries)));
return client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
try {
return client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
}
/**
@ -1252,7 +1299,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
// Execute search to get aggregations for parent terms
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
if (searchResponse.aggregations() != null
&& searchResponse.aggregations().containsKey("fqnParts_agg")) {
@ -1441,7 +1496,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
addAggregationsToNLQQuery(requestBuilder, request.getIndex());
SearchRequest searchRequest = requestBuilder.build(request.getIndex());
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
return Response.status(Response.Status.OK).entity(searchResponse.toJsonString()).build();
} catch (Exception e) {
@ -1469,7 +1532,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
.query(query)
.size(1000));
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (var hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -1516,7 +1587,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
.query(query)
.size(1000));
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (var hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -1618,7 +1697,15 @@ public class OpenSearchSearchManager implements SearchManagementClient {
.query(finalBaseQuery)
.size(1000));
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (var hit : searchResponse.hits().hits()) {
if (hit.source() != null) {

View file

@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.JsonNode;
import com.fasterxml.jackson.databind.ObjectMapper;
import com.fasterxml.jackson.databind.node.ObjectNode;
import com.nimbusds.jose.util.Pair;
import io.micrometer.core.instrument.Timer;
import java.io.IOException;
import java.util.Base64;
import java.util.HashMap;
@ -27,6 +28,7 @@ import org.openmetadata.schema.api.lineage.LineageDirection;
import org.openmetadata.schema.utils.JsonUtils;
import org.openmetadata.sdk.exception.SearchException;
import org.openmetadata.service.Entity;
import org.openmetadata.service.monitoring.RequestLatencyContext;
import os.org.opensearch.client.json.JsonData;
import os.org.opensearch.client.opensearch.OpenSearchClient;
import os.org.opensearch.client.opensearch._types.FieldValue;
@ -135,8 +137,15 @@ public class OsUtils {
}
}
return client.search(
searchRequestBuilder.build(), os.org.opensearch.client.json.JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
try {
return client.search(
searchRequestBuilder.build(), os.org.opensearch.client.json.JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
}
public static Map<String, Object> searchEREntityByKey(
@ -190,7 +199,15 @@ public class OsUtils {
null,
null,
fieldsToRemove);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -306,7 +323,15 @@ public class OsUtils {
null,
null,
fieldsToRemove);
SearchResponse<JsonData> searchResponse = client.search(searchRequest, JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
SearchResponse<JsonData> searchResponse;
try {
searchResponse = client.search(searchRequest, JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
for (Hit<JsonData> hit : searchResponse.hits().hits()) {
if (hit.source() != null) {
@ -389,7 +414,14 @@ public class OsUtils {
// Apply query filter
buildSearchSourceFilter(queryFilter, searchRequestBuilder);
return client.search(searchRequestBuilder.build(), JsonData.class);
Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation();
try {
return client.search(searchRequestBuilder.build(), JsonData.class);
} finally {
if (searchTimerSample != null) {
RequestLatencyContext.endSearchOperation(searchTimerSample);
}
}
}
private static Query buildBoolQueriesWithShould(Map<String, Set<String>> keysAndValues) {

View file

@ -1,9 +1,13 @@
package org.openmetadata.service.util.jdbi;
import io.micrometer.core.instrument.Counter;
import io.micrometer.core.instrument.DistributionSummary;
import io.micrometer.core.instrument.Metrics;
import io.micrometer.core.instrument.Timer;
import java.time.temporal.ChronoUnit;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.regex.Pattern;
import lombok.extern.slf4j.Slf4j;
import org.jdbi.v3.core.statement.SqlLogger;
import org.jdbi.v3.core.statement.StatementContext;
@ -12,6 +16,32 @@ import org.openmetadata.service.monitoring.RequestLatencyContext;
@Slf4j
public class OMSqlLogger implements SqlLogger {
private static final String DB_TIMER_CONTEXT_KEY = "db.timer.context";
private static final Pattern SQL_TYPE_PATTERN =
Pattern.compile(
"^\\s*(SELECT|INSERT|UPDATE|DELETE|MERGE|CREATE|ALTER|DROP)\\b",
Pattern.CASE_INSENSITIVE);
private static volatile long slowQueryThresholdMs = 100;
private static final ConcurrentHashMap<String, Timer> LATENCY_TIMERS = new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Counter> QUERY_COUNTERS =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, Counter> SLOW_QUERY_COUNTERS =
new ConcurrentHashMap<>();
private static final ConcurrentHashMap<String, DistributionSummary> SLOW_QUERY_SUMMARIES =
new ConcurrentHashMap<>();
private static final Timer LEGACY_JDBI_TIMER = Metrics.timer("jdbi_requests_seconds");
private static final Timer LEGACY_LATENCY_TIMER = Metrics.timer("jdbi_latency_requests_seconds");
public static void setSlowQueryThresholdMs(long thresholdMs) {
slowQueryThresholdMs = thresholdMs;
LOG.info("Slow query threshold set to {} ms", thresholdMs);
}
public static long getSlowQueryThresholdMs() {
return slowQueryThresholdMs;
}
@Override
public void logBeforeExecution(StatementContext context) {
@ -19,10 +49,8 @@ public class OMSqlLogger implements SqlLogger {
LOG.debug("sql {}, parameters {}", context.getRenderedSql(), context.getBinding());
}
// Start database operation timing using Micrometer
try {
io.micrometer.core.instrument.Timer.Sample timerSample =
RequestLatencyContext.startDatabaseOperation();
Timer.Sample timerSample = RequestLatencyContext.startDatabaseOperation();
if (timerSample != null) {
context.define(DB_TIMER_CONTEXT_KEY, timerSample);
}
@ -33,20 +61,53 @@ public class OMSqlLogger implements SqlLogger {
@Override
public void logAfterExecution(StatementContext context) {
long elapsedTimeMillis = context.getElapsedTime(ChronoUnit.MILLIS);
long elapsedTime = context.getElapsedTime(ChronoUnit.SECONDS);
// Record using Micrometer API
Timer jdbiTimer = Metrics.timer("jdbi_requests_seconds");
jdbiTimer.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS);
Timer latencyTimer = Metrics.timer("jdbi_latency_requests_seconds");
latencyTimer.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS);
String queryType = extractQueryType(context.getRenderedSql());
// Use simple timer without histogram buckets for high-volume per-query metrics
// This significantly reduces Prometheus cardinality (histogram creates ~50 series per tag
// combo)
// Detailed latency analysis is available via slow query metrics below
Timer latencyTimer =
LATENCY_TIMERS.computeIfAbsent(
queryType,
type ->
Timer.builder("db.query.latency")
.tag("type", type)
.publishPercentileHistogram(false)
.register(Metrics.globalRegistry));
latencyTimer.record(elapsedTimeMillis, TimeUnit.MILLISECONDS);
Counter queryCounter =
QUERY_COUNTERS.computeIfAbsent(
queryType, type -> Metrics.counter("db.query.count", "type", type));
queryCounter.increment();
if (elapsedTimeMillis > slowQueryThresholdMs) {
Counter slowCounter =
SLOW_QUERY_COUNTERS.computeIfAbsent(
queryType, type -> Metrics.counter("db.query.slow", "type", type));
slowCounter.increment();
DistributionSummary slowSummary =
SLOW_QUERY_SUMMARIES.computeIfAbsent(
queryType,
type ->
DistributionSummary.builder("db.query.slow.latency")
.tag("type", type)
.register(Metrics.globalRegistry));
slowSummary.record(elapsedTimeMillis);
}
LEGACY_JDBI_TIMER.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS);
LEGACY_LATENCY_TIMER.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS);
// End database operation timing using Micrometer
try {
Object timerSample = context.getAttribute(DB_TIMER_CONTEXT_KEY);
if (timerSample instanceof io.micrometer.core.instrument.Timer.Sample) {
RequestLatencyContext.endDatabaseOperation(
(io.micrometer.core.instrument.Timer.Sample) timerSample);
if (timerSample instanceof Timer.Sample) {
RequestLatencyContext.endDatabaseOperation((Timer.Sample) timerSample);
}
} catch (Exception e) {
// Ignore - latency tracking is optional
@ -57,7 +118,18 @@ public class OMSqlLogger implements SqlLogger {
"sql {}, parameters {}, timeTaken {} ms",
context.getRenderedSql(),
context.getBinding(),
context.getElapsedTime(ChronoUnit.MILLIS));
elapsedTimeMillis);
}
}
private String extractQueryType(String sql) {
if (sql == null || sql.isBlank()) {
return "UNKNOWN";
}
var matcher = SQL_TYPE_PATTERN.matcher(sql);
if (matcher.find()) {
return matcher.group(1).toUpperCase();
}
return "OTHER";
}
}

View file

@ -0,0 +1,364 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.jdbi3;
import static org.junit.jupiter.api.Assertions.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.Future;
import java.util.concurrent.RejectedExecutionException;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicInteger;
import lombok.extern.slf4j.Slf4j;
import org.awaitility.Awaitility;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.openmetadata.service.config.BulkOperationConfiguration;
@Slf4j
class BulkExecutorTest {
@BeforeEach
void setUp() {
BulkExecutor.reset();
}
@AfterEach
void tearDown() {
BulkExecutor.reset();
}
@Test
void testDefaultInitialization() {
BulkExecutor executor = BulkExecutor.getInstance();
assertNotNull(executor);
assertEquals(10, executor.getMaxThreads());
assertEquals(1000, executor.getQueueSize());
assertEquals(300, executor.getTimeoutSeconds());
}
@Test
void testCustomInitialization() {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(5);
config.setQueueSize(500);
config.setTimeoutSeconds(120);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
assertEquals(5, executor.getMaxThreads());
assertEquals(500, executor.getQueueSize());
assertEquals(120, executor.getTimeoutSeconds());
}
@Test
void testConcurrencyLimit() throws InterruptedException {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(3);
config.setQueueSize(100);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
AtomicInteger maxConcurrent = new AtomicInteger(0);
AtomicInteger currentConcurrent = new AtomicInteger(0);
AtomicInteger completed = new AtomicInteger(0);
CountDownLatch allDone = new CountDownLatch(10);
for (int i = 0; i < 10; i++) {
executor.submit(
() -> {
try {
int current = currentConcurrent.incrementAndGet();
maxConcurrent.updateAndGet(max -> Math.max(max, current));
Thread.sleep(50); // Simulate work
currentConcurrent.decrementAndGet();
completed.incrementAndGet();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
allDone.countDown();
}
});
}
assertTrue(allDone.await(10, TimeUnit.SECONDS), "All tasks should complete");
assertEquals(10, completed.get(), "All 10 tasks should complete");
assertTrue(
maxConcurrent.get() <= 3,
"Max concurrent should not exceed 3, got: " + maxConcurrent.get());
LOG.info(
"Concurrency test: maxConcurrent={}, completed={}", maxConcurrent.get(), completed.get());
}
@Test
void testQueueCapacity() throws InterruptedException {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(1);
config.setQueueSize(5);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
CountDownLatch blocker = new CountDownLatch(1);
// Submit a blocking task
executor.submit(
() -> {
try {
blocker.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Fill the queue (5 tasks)
for (int i = 0; i < 5; i++) {
executor.submit(() -> {});
}
// Next submission should be rejected (queue full)
assertThrows(
RejectedExecutionException.class,
() -> executor.submit(() -> {}),
"Should reject when queue is full");
// Release blocker
blocker.countDown();
LOG.info("Queue capacity test passed");
}
@Test
void testHasCapacity() throws InterruptedException {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(1);
config.setQueueSize(2);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
assertTrue(executor.hasCapacity(), "Should have capacity initially");
CountDownLatch blocker = new CountDownLatch(1);
// Block the single thread
executor.submit(
() -> {
try {
blocker.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Fill queue
executor.submit(() -> {});
executor.submit(() -> {});
assertFalse(executor.hasCapacity(), "Should not have capacity when queue is full");
blocker.countDown();
// Wait for queue to drain
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(executor::hasCapacity);
assertTrue(executor.hasCapacity(), "Should have capacity after tasks complete");
}
@Test
void testActiveCountAndQueueDepth() throws InterruptedException {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(2);
config.setQueueSize(100);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
assertEquals(0, executor.getActiveCount(), "Initially no active threads");
assertEquals(0, executor.getQueueDepth(), "Initially empty queue");
CountDownLatch blocker = new CountDownLatch(1);
CountDownLatch tasksStarted = new CountDownLatch(2);
// Submit blocking tasks
for (int i = 0; i < 2; i++) {
executor.submit(
() -> {
tasksStarted.countDown();
try {
blocker.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
}
// Wait for tasks to start
tasksStarted.await(5, TimeUnit.SECONDS);
// Wait for threads to fully activate
Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> executor.getActiveCount() == 2);
assertEquals(2, executor.getActiveCount(), "Should have 2 active threads");
// Submit more tasks to queue
executor.submit(() -> {});
executor.submit(() -> {});
executor.submit(() -> {});
assertEquals(3, executor.getQueueDepth(), "Should have 3 queued tasks");
blocker.countDown();
// Wait for completion
Awaitility.await()
.atMost(5, TimeUnit.SECONDS)
.until(() -> executor.getActiveCount() == 0 && executor.getQueueDepth() == 0);
assertEquals(0, executor.getActiveCount(), "Should have no active threads after completion");
assertEquals(0, executor.getQueueDepth(), "Should have empty queue after completion");
}
@Test
void testSingletonBehavior() {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(5);
BulkExecutor.initialize(config);
BulkExecutor instance1 = BulkExecutor.getInstance();
BulkExecutor instance2 = BulkExecutor.getInstance();
assertSame(instance1, instance2, "Should return same singleton instance");
}
@Test
void testIsShutdown() {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(2);
config.setTimeoutSeconds(5);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
assertFalse(executor.isShutdown(), "Should not be shutdown initially");
executor.shutdown();
assertTrue(executor.isShutdown(), "Should be shutdown after shutdown() called");
}
@Test
void testSubmitRejectsAfterShutdown() {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(2);
config.setTimeoutSeconds(5);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
executor.shutdown();
assertThrows(
RejectedExecutionException.class,
() -> executor.submit(() -> {}),
"Should reject submit after shutdown");
}
@Test
void testSubmitWithFuture() throws Exception {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(2);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
AtomicBoolean taskExecuted = new AtomicBoolean(false);
Future<?> future =
executor.submitWithFuture(
() -> {
taskExecuted.set(true);
});
assertNotNull(future, "Should return a Future");
future.get(5, TimeUnit.SECONDS);
assertTrue(taskExecuted.get(), "Task should have executed");
assertTrue(future.isDone(), "Future should be done");
}
@Test
void testSubmitWithFutureRejectsAfterShutdown() {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(2);
config.setTimeoutSeconds(5);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
executor.shutdown();
assertThrows(
RejectedExecutionException.class,
() -> executor.submitWithFuture(() -> {}),
"Should reject submitWithFuture after shutdown");
}
@Test
void testFutureCancellation() throws Exception {
BulkOperationConfiguration config = new BulkOperationConfiguration();
config.setMaxThreads(1);
BulkExecutor.initialize(config);
BulkExecutor executor = BulkExecutor.getInstance();
CountDownLatch blocker = new CountDownLatch(1);
AtomicBoolean secondTaskRan = new AtomicBoolean(false);
// Block the single thread
executor.submitWithFuture(
() -> {
try {
blocker.await();
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
}
});
// Submit a cancellable task
Future<?> cancellableFuture =
executor.submitWithFuture(
() -> {
secondTaskRan.set(true);
});
// Cancel the queued task before it runs
boolean cancelled = cancellableFuture.cancel(false);
// Release the blocker
blocker.countDown();
// Wait a bit
Awaitility.await().atMost(2, TimeUnit.SECONDS).until(cancellableFuture::isDone);
assertTrue(
cancellableFuture.isCancelled() || cancelled,
"Future should be cancelled or able to cancel");
}
}

View file

@ -552,6 +552,486 @@ class RequestLatencyContextTest {
assertEquals("/api/v1/tables", MetricUtils.normalizeUri("/api/v1/tables?query=test&limit=10"));
}
@Test
void testContextPropagationToChildThreads() throws InterruptedException {
String endpoint = "/api/v1/tables/bulk";
// Start request in parent thread
RequestLatencyContext.startRequest(endpoint, "PUT");
simulateWork(10);
// Get context for propagation
RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext();
assertNotNull(parentContext, "Parent context should exist");
// Simulate bulk operation with child threads
Thread childThread1 =
new Thread(
() -> {
RequestLatencyContext.setContext(parentContext);
try {
Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation();
simulateWork(50);
RequestLatencyContext.endDatabaseOperation(dbSample);
} finally {
RequestLatencyContext.clearContext();
}
});
Thread childThread2 =
new Thread(
() -> {
RequestLatencyContext.setContext(parentContext);
try {
Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation();
simulateWork(75);
RequestLatencyContext.endDatabaseOperation(dbSample);
} finally {
RequestLatencyContext.clearContext();
}
});
childThread1.start();
childThread2.start();
childThread1.join();
childThread2.join();
simulateWork(10);
RequestLatencyContext.endRequest();
// Verify that DB time from both child threads was accumulated
String normalizedEndpoint = MetricUtils.normalizeUri(endpoint);
Timer dbTimer =
Metrics.globalRegistry
.find("request.latency.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "PUT")
.timer();
assertNotNull(dbTimer, "DB timer should exist");
double dbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
LOG.info(
"Total DB time from child threads: {}ms (expected ~125ms)", String.format("%.2f", dbMs));
// DB time should include operations from both child threads (50ms + 75ms = 125ms min)
assertTrue(
dbMs >= 100, "DB time should accumulate from child threads, expected ~125ms, got: " + dbMs);
}
@Test
void testContextGetSetClear() {
// Test getContext returns null when no context
assertNull(RequestLatencyContext.getContext(), "Should return null when no context");
// Start a request
RequestLatencyContext.startRequest("/api/v1/test", "GET");
// Get context should return non-null
RequestLatencyContext.RequestContext context = RequestLatencyContext.getContext();
assertNotNull(context, "Should return context after startRequest");
// Clear context
RequestLatencyContext.clearContext();
assertNull(RequestLatencyContext.getContext(), "Should return null after clearContext");
// Set context again
RequestLatencyContext.setContext(context);
assertNotNull(RequestLatencyContext.getContext(), "Should return context after setContext");
// Clean up
RequestLatencyContext.endRequest();
}
@Test
void testBulkOperationSimulation() throws InterruptedException {
// Simulates the actual bulk operation pattern used in EntityRepository
String endpoint = "/api/v1/tables/bulk";
int numEntities = 10;
int dbTimePerEntity = 20; // ms
RequestLatencyContext.startRequest(endpoint, "PUT");
simulateWork(5); // Initial processing
RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext();
// Simulate parallel entity processing like BOUNDED_BULK_EXECUTOR
java.util.List<Thread> threads = new java.util.ArrayList<>();
for (int i = 0; i < numEntities; i++) {
Thread thread =
new Thread(
() -> {
RequestLatencyContext.setContext(parentContext);
try {
// Each entity does a DB lookup and update
Timer.Sample dbSample1 = RequestLatencyContext.startDatabaseOperation();
simulateWork(dbTimePerEntity / 2); // findByName
RequestLatencyContext.endDatabaseOperation(dbSample1);
Timer.Sample dbSample2 = RequestLatencyContext.startDatabaseOperation();
simulateWork(dbTimePerEntity / 2); // update
RequestLatencyContext.endDatabaseOperation(dbSample2);
} finally {
RequestLatencyContext.clearContext();
}
});
threads.add(thread);
thread.start();
}
// Wait for all threads
for (Thread t : threads) {
t.join();
}
simulateWork(5); // Final processing
RequestLatencyContext.endRequest();
// Verify metrics
String normalizedEndpoint = MetricUtils.normalizeUri(endpoint);
Timer dbTimer =
Metrics.globalRegistry
.find("request.latency.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "PUT")
.timer();
Timer totalTimer =
Metrics.globalRegistry
.find("request.latency.total")
.tag("endpoint", normalizedEndpoint)
.tag("method", "PUT")
.timer();
assertNotNull(dbTimer, "DB timer should exist");
assertNotNull(totalTimer, "Total timer should exist");
double dbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
double totalMs = totalTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
// Expected: 10 entities * 20ms DB time = 200ms total DB time
// Due to parallelism, wall clock time is less, but accumulated DB time should be ~200ms
LOG.info(
"Bulk simulation: total={}ms, db={}ms, entities={}, expected db ~{}ms",
String.format("%.0f", totalMs),
String.format("%.0f", dbMs),
numEntities,
numEntities * dbTimePerEntity);
// DB time should be at least 80% of expected (allowing for timing variance)
double expectedDbTime = numEntities * dbTimePerEntity;
assertTrue(
dbMs >= expectedDbTime * 0.8,
String.format(
"DB time should be at least %.0fms (80%% of expected), got: %.0fms",
expectedDbTime * 0.8, dbMs));
// Verify operation count
var dbOperations =
Metrics.globalRegistry
.find("request.operations.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "PUT")
.summary();
assertNotNull(dbOperations, "Should have database operations summary");
assertEquals(
numEntities * 2,
dbOperations.totalAmount(),
"Should have " + (numEntities * 2) + " DB operations (2 per entity)");
}
@Test
void testConcurrentDbAndSearchOperations() throws InterruptedException {
String endpoint = "/api/v1/search/bulk";
RequestLatencyContext.startRequest(endpoint, "POST");
RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext();
// Thread 1: DB operations
Thread dbThread =
new Thread(
() -> {
RequestLatencyContext.setContext(parentContext);
try {
for (int i = 0; i < 3; i++) {
Timer.Sample sample = RequestLatencyContext.startDatabaseOperation();
simulateWork(30);
RequestLatencyContext.endDatabaseOperation(sample);
}
} finally {
RequestLatencyContext.clearContext();
}
});
// Thread 2: Search operations
Thread searchThread =
new Thread(
() -> {
RequestLatencyContext.setContext(parentContext);
try {
for (int i = 0; i < 2; i++) {
Timer.Sample sample = RequestLatencyContext.startSearchOperation();
simulateWork(50);
RequestLatencyContext.endSearchOperation(sample);
}
} finally {
RequestLatencyContext.clearContext();
}
});
dbThread.start();
searchThread.start();
dbThread.join();
searchThread.join();
RequestLatencyContext.endRequest();
String normalizedEndpoint = MetricUtils.normalizeUri(endpoint);
// Verify DB metrics
Timer dbTimer =
Metrics.globalRegistry
.find("request.latency.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "POST")
.timer();
assertNotNull(dbTimer, "DB timer should exist");
double dbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
assertTrue(dbMs >= 70, "DB time should be at least 70ms (3 * 30ms * 0.8), got: " + dbMs);
// Verify Search metrics
Timer searchTimer =
Metrics.globalRegistry
.find("request.latency.search")
.tag("endpoint", normalizedEndpoint)
.tag("method", "POST")
.timer();
assertNotNull(searchTimer, "Search timer should exist");
double searchMs = searchTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
assertTrue(
searchMs >= 80, "Search time should be at least 80ms (2 * 50ms * 0.8), got: " + searchMs);
// Verify operation counts
var dbOps =
Metrics.globalRegistry
.find("request.operations.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "POST")
.summary();
var searchOps =
Metrics.globalRegistry
.find("request.operations.search")
.tag("endpoint", normalizedEndpoint)
.tag("method", "POST")
.summary();
assertEquals(3, dbOps.totalAmount(), "Should have 3 DB operations");
assertEquals(2, searchOps.totalAmount(), "Should have 2 search operations");
LOG.info(
"Mixed operations: db={}ms (3 ops), search={}ms (2 ops)",
String.format("%.0f", dbMs),
String.format("%.0f", searchMs));
}
@Test
void testHighConcurrencyStressTest() throws InterruptedException {
// Stress test with many concurrent threads to catch race conditions
String endpoint = "/api/v1/stress/test";
int numThreads = 50;
int opsPerThread = 5;
RequestLatencyContext.startRequest(endpoint, "PUT");
RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext();
java.util.concurrent.CountDownLatch startLatch = new java.util.concurrent.CountDownLatch(1);
java.util.concurrent.CountDownLatch doneLatch =
new java.util.concurrent.CountDownLatch(numThreads);
for (int i = 0; i < numThreads; i++) {
new Thread(
() -> {
try {
startLatch.await(); // Wait for all threads to be ready
RequestLatencyContext.setContext(parentContext);
try {
for (int j = 0; j < opsPerThread; j++) {
Timer.Sample sample = RequestLatencyContext.startDatabaseOperation();
// Minimal work to focus on concurrency
Thread.sleep(1);
RequestLatencyContext.endDatabaseOperation(sample);
}
} finally {
RequestLatencyContext.clearContext();
}
} catch (InterruptedException e) {
Thread.currentThread().interrupt();
} finally {
doneLatch.countDown();
}
})
.start();
}
// Start all threads simultaneously
startLatch.countDown();
// Wait for completion
doneLatch.await();
RequestLatencyContext.endRequest();
// Verify operation count is correct despite high concurrency
String normalizedEndpoint = MetricUtils.normalizeUri(endpoint);
var dbOps =
Metrics.globalRegistry
.find("request.operations.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "PUT")
.summary();
assertNotNull(dbOps, "DB operations summary should exist");
int expectedOps = numThreads * opsPerThread;
assertEquals(
expectedOps,
dbOps.totalAmount(),
String.format(
"Should have exactly %d DB operations (%d threads * %d ops), got: %.0f",
expectedOps, numThreads, opsPerThread, dbOps.totalAmount()));
LOG.info(
"Stress test passed: {} threads, {} ops each, total ops recorded: {}",
numThreads,
opsPerThread,
(int) dbOps.totalAmount());
}
@Test
void testTimingAccuracyWithKnownDurations() throws InterruptedException {
// Test that timing measurements are reasonably accurate
String endpoint = "/api/v1/timing/test";
long expectedDbTime = 100; // ms
long expectedSearchTime = 75; // ms
long tolerance = 30; // Allow 30ms variance for CI environments
RequestLatencyContext.startRequest(endpoint, "GET");
Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation();
Thread.sleep(expectedDbTime);
RequestLatencyContext.endDatabaseOperation(dbSample);
Timer.Sample searchSample = RequestLatencyContext.startSearchOperation();
Thread.sleep(expectedSearchTime);
RequestLatencyContext.endSearchOperation(searchSample);
RequestLatencyContext.endRequest();
String normalizedEndpoint = MetricUtils.normalizeUri(endpoint);
Timer dbTimer =
Metrics.globalRegistry
.find("request.latency.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "GET")
.timer();
Timer searchTimer =
Metrics.globalRegistry
.find("request.latency.search")
.tag("endpoint", normalizedEndpoint)
.tag("method", "GET")
.timer();
double actualDbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
double actualSearchMs = searchTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS);
LOG.info(
"Timing accuracy: expected db={}ms got={}ms, expected search={}ms got={}ms",
expectedDbTime,
String.format("%.0f", actualDbMs),
expectedSearchTime,
String.format("%.0f", actualSearchMs));
assertTrue(
Math.abs(actualDbMs - expectedDbTime) <= tolerance,
String.format(
"DB time should be within %dms of %dms, got: %.0fms",
tolerance, expectedDbTime, actualDbMs));
assertTrue(
Math.abs(actualSearchMs - expectedSearchTime) <= tolerance,
String.format(
"Search time should be within %dms of %dms, got: %.0fms",
tolerance, expectedSearchTime, actualSearchMs));
}
@Test
void testPercentageCalculation() {
String endpoint = "/api/v1/percentage/test";
RequestLatencyContext.startRequest(endpoint, "GET");
// 100ms DB, 100ms search, ~100ms internal = ~33% each
simulateWork(50); // Internal
Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation();
simulateWork(100);
RequestLatencyContext.endDatabaseOperation(dbSample);
simulateWork(50); // Internal
Timer.Sample searchSample = RequestLatencyContext.startSearchOperation();
simulateWork(100);
RequestLatencyContext.endSearchOperation(searchSample);
RequestLatencyContext.endRequest();
String normalizedEndpoint = MetricUtils.normalizeUri(endpoint);
Gauge dbPercent =
Metrics.globalRegistry
.find("request.percentage.database")
.tag("endpoint", normalizedEndpoint)
.tag("method", "GET")
.gauge();
Gauge searchPercent =
Metrics.globalRegistry
.find("request.percentage.search")
.tag("endpoint", normalizedEndpoint)
.tag("method", "GET")
.gauge();
Gauge internalPercent =
Metrics.globalRegistry
.find("request.percentage.internal")
.tag("endpoint", normalizedEndpoint)
.tag("method", "GET")
.gauge();
assertNotNull(dbPercent, "DB percentage gauge should exist");
assertNotNull(searchPercent, "Search percentage gauge should exist");
assertNotNull(internalPercent, "Internal percentage gauge should exist");
double db = dbPercent.value();
double search = searchPercent.value();
double internal = internalPercent.value();
double total = db + search + internal;
LOG.info(
"Percentages: db={}%, search={}%, internal={}%, total={}%",
String.format("%.1f", db),
String.format("%.1f", search),
String.format("%.1f", internal),
String.format("%.1f", total));
// Total should be ~100%
assertTrue(
total >= 95 && total <= 105,
String.format("Total percentage should be ~100%%, got: %.1f%%", total));
// Each component should be roughly 25-40% given the timing
assertTrue(db >= 20 && db <= 45, String.format("DB should be 20-45%%, got: %.1f%%", db));
assertTrue(
search >= 20 && search <= 45,
String.format("Search should be 20-45%%, got: %.1f%%", search));
}
private void printDetailedMetrics(String endpoint) {
LOG.info("\n=== Detailed Metrics for {} ===", endpoint);

View file

@ -2882,8 +2882,8 @@ public class UserResourceTest extends EntityResourceTest<User, CreateUser> {
long minCacheHitTime = cacheHitTimes.stream().mapToLong(Long::longValue).min().orElse(0);
LOG.info(
"Cache HIT times - Avg: {:.2f}ms, Min: {}ms, Max: {}ms",
avgCacheHitTime,
"Cache HIT times - Avg: {}ms, Min: {}ms, Max: {}ms",
String.format("%.2f", avgCacheHitTime),
minCacheHitTime,
maxCacheHitTime);
@ -2891,8 +2891,10 @@ public class UserResourceTest extends EntityResourceTest<User, CreateUser> {
double performanceImprovement =
((double) cacheMissTime - avgCacheHitTime) / cacheMissTime * 100;
LOG.info(
"Performance improvement: {:.1f}% ({}ms → {:.1f}ms)",
performanceImprovement, cacheMissTime, avgCacheHitTime);
"Performance improvement: {}% ({}ms → {}ms)",
String.format("%.1f", performanceImprovement),
cacheMissTime,
String.format("%.1f", avgCacheHitTime));
// Assert significant performance improvement
assertTrue(
@ -2964,8 +2966,8 @@ public class UserResourceTest extends EntityResourceTest<User, CreateUser> {
LOG.info("Concurrent test results:");
LOG.info(" Total calls: {} across {} threads", totalCalls, threadCount);
LOG.info(" Total time: {}ms", totalConcurrentTime);
LOG.info(" Calls per second: {:.1f}", callsPerSecond);
LOG.info(" Avg concurrent call time: {:.2f}ms", avgConcurrentTime);
LOG.info(" Calls per second: {}", String.format("%.1f", callsPerSecond));
LOG.info(" Avg concurrent call time: {}ms", String.format("%.2f", avgConcurrentTime));
// Performance assertions for concurrent access
assertTrue(

View file

@ -247,8 +247,10 @@ class CachedPermissionPerformanceTest extends OpenMetadataApplicationTest {
double improvement = (cacheMissTime - cacheHitTime) / (double) cacheMissTime * 100;
LOG.info(
"Performance results - Cache miss: {:.2f}ms, Cache hit: {:.2f}ms, Improvement: {:.1f}%",
avgMissMs, avgHitMs, improvement);
"Performance results - Cache miss: {}ms, Cache hit: {}ms, Improvement: {}%",
String.format("%.2f", avgMissMs),
String.format("%.2f", avgHitMs),
String.format("%.1f", improvement));
// Cache should be at least 50% faster
assertTrue(

View file

@ -0,0 +1,48 @@
/*
* Copyright 2021 Collate
* Licensed under the Apache License, Version 2.0 (the "License");
* you may not use this file except in compliance with the License.
* You may obtain a copy of the License at
* http://www.apache.org/licenses/LICENSE-2.0
* Unless required by applicable law or agreed to in writing, software
* distributed under the License is distributed on an "AS IS" BASIS,
* WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
* See the License for the specific language governing permissions and
* limitations under the License.
*/
package org.openmetadata.service.util.jdbi;
import static org.junit.jupiter.api.Assertions.*;
import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
class OMSqlLoggerTest {
private long originalThreshold;
@BeforeEach
void setUp() {
originalThreshold = OMSqlLogger.getSlowQueryThresholdMs();
}
@AfterEach
void tearDown() {
OMSqlLogger.setSlowQueryThresholdMs(originalThreshold);
}
@Test
void testDefaultSlowQueryThreshold() {
assertEquals(100, OMSqlLogger.getSlowQueryThresholdMs());
}
@Test
void testSetSlowQueryThreshold() {
OMSqlLogger.setSlowQueryThresholdMs(500);
assertEquals(500, OMSqlLogger.getSlowQueryThresholdMs());
OMSqlLogger.setSlowQueryThresholdMs(50);
assertEquals(50, OMSqlLogger.getSlowQueryThresholdMs());
}
}