diff --git a/conf/openmetadata.yaml b/conf/openmetadata.yaml index b7905e7ce5a..7c453775a84 100644 --- a/conf/openmetadata.yaml +++ b/conf/openmetadata.yaml @@ -575,6 +575,17 @@ limits: className: ${LIMITS_CLASS_NAME:-"org.openmetadata.service.limits.DefaultLimits"} limitsConfigFile: ${LIMITS_CONFIG_FILE:-""} +# Bulk Operation Configuration +# Controls parallelism and resource usage for bulk API operations (e.g., bulk import/export) +# Uses a bounded thread pool to prevent connection pool exhaustion +bulkOperation: + # Max threads for bulk operations (recommendations: 2 vCore=5-8, 4 vCore=8-15, 8 vCore=15-25) + maxThreads: ${BULK_OPERATION_MAX_THREADS:-10} + # Max queued operations before rejection (returns 503) + queueSize: ${BULK_OPERATION_QUEUE_SIZE:-1000} + # Timeout in seconds for entire bulk operation + timeoutSeconds: ${BULK_OPERATION_TIMEOUT_SECONDS:-300} + web: uriPath: ${WEB_CONF_URI_PATH:-"/api"} hsts: diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java index 02b6131e29c..150e1da1b2d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/OpenMetadataApplication.java @@ -97,6 +97,7 @@ import org.openmetadata.service.exception.JsonMappingExceptionMapper; import org.openmetadata.service.exception.OMErrorPageHandler; import org.openmetadata.service.fernet.Fernet; import org.openmetadata.service.governance.workflows.WorkflowHandler; +import org.openmetadata.service.jdbi3.BulkExecutor; import org.openmetadata.service.jdbi3.CollectionDAO; import org.openmetadata.service.jdbi3.EntityRepository; import org.openmetadata.service.jdbi3.MigrationDAO; @@ -241,6 +242,9 @@ public class OpenMetadataApplication extends Application 0) { - LOG.info("Average throughput: {:.1f} entities/sec", currentThroughput); + LOG.info("Average throughput: {} entities/sec", String.format("%.1f", currentThroughput)); } } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/config/BulkOperationConfiguration.java b/openmetadata-service/src/main/java/org/openmetadata/service/config/BulkOperationConfiguration.java new file mode 100644 index 00000000000..73597f798a0 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/config/BulkOperationConfiguration.java @@ -0,0 +1,89 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.config; + +import com.fasterxml.jackson.annotation.JsonProperty; +import jakarta.validation.constraints.Max; +import jakarta.validation.constraints.Min; +import lombok.Getter; +import lombok.Setter; + +/** + * Configuration for bulk operations that controls parallelism and resource usage. + * + *

This uses a bounded thread pool to limit concurrent database operations, preventing bulk + * operations from exhausting the connection pool and starving regular API requests. + * + *

Environment variables (for Helm/Docker configuration): + * + *

+ * + *

Example YAML configuration: + * + *

+ * bulkOperation:
+ *   maxThreads: ${BULK_OPERATION_MAX_THREADS:-10}
+ *   queueSize: ${BULK_OPERATION_QUEUE_SIZE:-1000}
+ *   timeoutSeconds: ${BULK_OPERATION_TIMEOUT_SECONDS:-300}
+ * 
+ */ +@Getter +@Setter +public class BulkOperationConfiguration { + + /** + * Maximum number of threads for bulk operation processing. This directly controls how many + * concurrent database operations can occur during bulk processing. + * + *

Recommendations based on DB capacity: + * + *

+ * + *

Default: 10 (conservative, works for most deployments) + */ + @JsonProperty + @Min(1) + @Max(100) + private int maxThreads = 10; + + /** + * Maximum number of operations that can be queued waiting for a thread. When the queue is full, + * new bulk requests will be rejected with 503 Service Unavailable. + * + *

Default: 1000 (allows bursts while preventing memory exhaustion) + */ + @JsonProperty + @Min(100) + @Max(10000) + private int queueSize = 1000; + + /** + * Timeout in seconds for the entire bulk operation. If the operation doesn't complete within this + * time, it will be cancelled and return partial results. + * + *

Default: 300 seconds (5 minutes) + */ + @JsonProperty + @Min(30) + @Max(3600) + private int timeoutSeconds = 300; +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/BulkExecutor.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/BulkExecutor.java new file mode 100644 index 00000000000..3c16a606d99 --- /dev/null +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/BulkExecutor.java @@ -0,0 +1,223 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.jdbi3; + +import java.util.concurrent.ArrayBlockingQueue; +import java.util.concurrent.ExecutorService; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.ThreadPoolExecutor; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import lombok.Getter; +import lombok.extern.slf4j.Slf4j; +import org.openmetadata.service.config.BulkOperationConfiguration; + +/** + * Manages a bounded thread pool for bulk operations. This provides natural backpressure and + * prevents bulk operations from overwhelming the database connection pool. + * + *

Key design decisions: + * + *

+ */ +@Slf4j +public class BulkExecutor { + + private static volatile BulkExecutor instance; + private static final Object LOCK = new Object(); + private static final AtomicBoolean shutdownHookRegistered = new AtomicBoolean(false); + + @Getter private final ExecutorService executor; + @Getter private final int maxThreads; + @Getter private final int queueSize; + @Getter private final int timeoutSeconds; + private volatile boolean isShutdown = false; + + private BulkExecutor(BulkOperationConfiguration config) { + this.maxThreads = config.getMaxThreads(); + this.queueSize = config.getQueueSize(); + this.timeoutSeconds = config.getTimeoutSeconds(); + + // Create a bounded thread pool with a bounded queue + // When queue is full, new submissions will throw RejectedExecutionException + this.executor = + new ThreadPoolExecutor( + maxThreads, // core pool size + maxThreads, // max pool size (same as core for predictable behavior) + 60L, + TimeUnit.SECONDS, // idle thread timeout + new ArrayBlockingQueue<>(queueSize), // bounded queue + r -> { + Thread t = new Thread(r, "bulk-operation-worker"); + t.setDaemon(true); + return t; + }, + new ThreadPoolExecutor.AbortPolicy() // throw on rejection + ); + + LOG.info( + "BulkExecutor initialized: maxThreads={}, queueSize={}, timeoutSeconds={}", + maxThreads, + queueSize, + timeoutSeconds); + + registerShutdownHook(); + } + + private static void registerShutdownHook() { + if (shutdownHookRegistered.compareAndSet(false, true)) { + Runtime.getRuntime() + .addShutdownHook( + new Thread( + () -> { + LOG.info("JVM shutdown detected, shutting down BulkExecutor gracefully..."); + if (instance != null && !instance.isShutdown) { + instance.shutdown(); + } + }, + "bulk-executor-shutdown-hook")); + } + } + + /** Initialize with configuration. Should be called during application startup. */ + public static void initialize(BulkOperationConfiguration config) { + if (instance == null) { + synchronized (LOCK) { + if (instance == null) { + instance = new BulkExecutor(config); + } + } + } + } + + /** Get the singleton instance. Creates with defaults if not initialized. */ + public static BulkExecutor getInstance() { + if (instance == null) { + synchronized (LOCK) { + if (instance == null) { + LOG.warn("BulkExecutor not initialized, using defaults"); + instance = new BulkExecutor(new BulkOperationConfiguration()); + } + } + } + return instance; + } + + /** Reset the singleton (for testing). */ + public static void reset() { + synchronized (LOCK) { + if (instance != null) { + instance.isShutdown = true; + instance.executor.shutdownNow(); + instance = null; + } + } + } + + /** + * Check if the executor can accept more work. + * + * @return true if queue has capacity + */ + public boolean hasCapacity() { + if (executor instanceof ThreadPoolExecutor tpe) { + return tpe.getQueue().remainingCapacity() > 0; + } + return true; + } + + /** + * Get current queue depth. + * + * @return number of tasks waiting in queue + */ + public int getQueueDepth() { + if (executor instanceof ThreadPoolExecutor tpe) { + return tpe.getQueue().size(); + } + return 0; + } + + /** + * Get number of currently active threads. + * + * @return active thread count + */ + public int getActiveCount() { + if (executor instanceof ThreadPoolExecutor tpe) { + return tpe.getActiveCount(); + } + return 0; + } + + /** + * Submit a task for execution. Throws RejectedExecutionException if queue is full. + * + * @param task the task to execute + * @throws RejectedExecutionException if the queue is full + */ + public void submit(Runnable task) throws RejectedExecutionException { + if (isShutdown) { + throw new RejectedExecutionException("BulkExecutor is shut down"); + } + executor.execute(task); + } + + /** + * Submit a task for execution and return a Future that can be used to cancel the task. + * + * @param task the task to execute + * @return a Future representing the pending task + * @throws RejectedExecutionException if the queue is full + */ + public Future submitWithFuture(Runnable task) throws RejectedExecutionException { + if (isShutdown) { + throw new RejectedExecutionException("BulkExecutor is shut down"); + } + return executor.submit(task); + } + + /** Check if the executor has been shut down. */ + public boolean isShutdown() { + return isShutdown; + } + + /** Graceful shutdown with configurable timeout. */ + public void shutdown() { + if (isShutdown) { + return; + } + isShutdown = true; + LOG.info("Shutting down BulkExecutor, waiting for pending tasks to complete..."); + executor.shutdown(); + try { + if (!executor.awaitTermination(timeoutSeconds, TimeUnit.SECONDS)) { + LOG.warn( + "BulkExecutor did not terminate within {} seconds, forcing shutdown", timeoutSeconds); + executor.shutdownNow(); + } else { + LOG.info("BulkExecutor shut down gracefully"); + } + } catch (InterruptedException e) { + LOG.warn("BulkExecutor shutdown interrupted, forcing shutdown"); + executor.shutdownNow(); + Thread.currentThread().interrupt(); + } + } +} diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java index 1e725f09aea..35c4788b724 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/jdbi3/EntityRepository.java @@ -93,6 +93,9 @@ import com.google.common.util.concurrent.UncheckedExecutionException; import com.google.gson.Gson; import com.networknt.schema.Error; import com.networknt.schema.Schema; +import io.micrometer.core.instrument.DistributionSummary; +import io.micrometer.core.instrument.Metrics; +import io.micrometer.core.instrument.Timer; import jakarta.json.JsonPatch; import jakarta.validation.ConstraintViolationException; import jakarta.validation.constraints.NotNull; @@ -126,8 +129,6 @@ import java.util.UUID; import java.util.concurrent.CompletableFuture; import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.ExecutionException; -import java.util.concurrent.ExecutorService; -import java.util.concurrent.Executors; import java.util.concurrent.TimeUnit; import java.util.function.BiConsumer; import java.util.function.BiPredicate; @@ -208,6 +209,7 @@ import org.openmetadata.service.jdbi3.FeedRepository.TaskWorkflow; import org.openmetadata.service.jdbi3.FeedRepository.ThreadContext; import org.openmetadata.service.jobs.JobDAO; import org.openmetadata.service.lock.HierarchicalLockManager; +import org.openmetadata.service.monitoring.RequestLatencyContext; import org.openmetadata.service.rdf.RdfUpdater; import org.openmetadata.service.resources.tags.TagLabelUtil; import org.openmetadata.service.resources.teams.RoleResource; @@ -6700,15 +6702,21 @@ public abstract class EntityRepository { return findByNameOrNull(entity.getFullyQualifiedName(), Include.ALL) != null; } - private static final ExecutorService BULK_PROCESSING_EXECUTOR = - Executors.newVirtualThreadPerTaskExecutor(); - - private static final ExecutorService BOUNDED_BULK_EXECUTOR = - Executors.newFixedThreadPool(20, Thread.ofVirtual().factory()); - private static final ConcurrentHashMap> BULK_JOBS = new ConcurrentHashMap<>(); + // Cached metrics to avoid Timer.builder overhead on every call + private static final ConcurrentHashMap ENTITY_LATENCY_TIMERS = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap ENTITY_QUEUE_WAIT_TIMERS = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap BULK_OPERATION_TIMERS = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap BATCH_SIZE_SUMMARIES = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap SUCCESS_RATE_SUMMARIES = + new ConcurrentHashMap<>(); + public CompletableFuture submitAsyncBulkOperation( UriInfo uriInfo, List entities, String userName) { @@ -6716,6 +6724,7 @@ public abstract class EntityRepository { LOG.info( "Submitting async bulk operation with jobId: {} for {} entities", jobId, entities.size()); + // Use BulkExecutor for async operations too CompletableFuture job = CompletableFuture.supplyAsync( () -> { @@ -6730,7 +6739,7 @@ public abstract class EntityRepository { return errorResult; } }, - BULK_PROCESSING_EXECUTOR); + BulkExecutor.getInstance().getExecutor()); BULK_JOBS.put(jobId, job); @@ -6752,14 +6761,24 @@ public abstract class EntityRepository { List successRequests = new ArrayList<>(); List failedRequests = new ArrayList<>(); + long bulkStartTime = System.nanoTime(); + List entityLatenciesNanos = new ArrayList<>(); + for (T entity : entities) { + long entityStartTime = System.nanoTime(); try { createOrUpdate(uriInfo, entity, userName); + long entityDuration = System.nanoTime() - entityStartTime; + entityLatenciesNanos.add(entityDuration); + recordEntityMetrics(entityType, entityDuration, 0, true); successRequests.add( new BulkResponse() .withRequest(entity.getFullyQualifiedName()) .withStatus(Status.OK.getStatusCode())); } catch (Exception e) { + long entityDuration = System.nanoTime() - entityStartTime; + entityLatenciesNanos.add(entityDuration); + recordEntityMetrics(entityType, entityDuration, 0, false); LOG.warn("Failed to process entity in bulk operation", e); failedRequests.add( new BulkResponse() @@ -6769,21 +6788,44 @@ public abstract class EntityRepository { } } + long totalDurationNanos = System.nanoTime() - bulkStartTime; + result.setNumberOfRowsProcessed(entities.size()); result.setNumberOfRowsPassed(successRequests.size()); result.setNumberOfRowsFailed(failedRequests.size()); result.setSuccessRequest(successRequests); result.setFailedRequest(failedRequests); - if (failedRequests.size() > 0) { - result.setStatus(ApiStatus.PARTIAL_SUCCESS); + if (!failedRequests.isEmpty()) { + result.setStatus(successRequests.isEmpty() ? ApiStatus.FAILURE : ApiStatus.PARTIAL_SUCCESS); } + // Calculate metrics + long avgEntityLatencyMs = 0; + long maxEntityLatencyMs = 0; + if (!entityLatenciesNanos.isEmpty()) { + avgEntityLatencyMs = + entityLatenciesNanos.stream().mapToLong(Long::longValue).sum() + / entityLatenciesNanos.size() + / 1_000_000; + maxEntityLatencyMs = + entityLatenciesNanos.stream().mapToLong(Long::longValue).max().orElse(0) / 1_000_000; + } + + recordBulkMetrics( + entityType, + entities.size(), + successRequests.size(), + totalDurationNanos, + avgEntityLatencyMs, + maxEntityLatencyMs); + LOG.info( - "Async bulk operation completed: {} succeeded, {} failed out of {} total", + "Async bulk operation completed: {} succeeded, {} failed out of {} total, took {}ms", successRequests.size(), failedRequests.size(), - entities.size()); + entities.size(), + totalDurationNanos / 1_000_000); return result; } @@ -6871,35 +6913,142 @@ public abstract class EntityRepository { List successRequests = Collections.synchronizedList(new ArrayList<>()); List failedRequests = Collections.synchronizedList(new ArrayList<>()); + BulkExecutor bulkExecutor = BulkExecutor.getInstance(); + + // Track overall wall-clock time + long bulkStartTime = System.nanoTime(); + + // Check if system can accept more work + if (!bulkExecutor.hasCapacity()) { + LOG.warn( + "Bulk operation rejected: queue full (depth={}, max={})", + bulkExecutor.getQueueDepth(), + bulkExecutor.getQueueSize()); + result.setStatus(ApiStatus.FAILURE); + result.setNumberOfRowsProcessed(0); + result.setNumberOfRowsFailed(entities.size()); + for (T entity : entities) { + failedRequests.add( + new BulkResponse() + .withRequest(entity.getFullyQualifiedName()) + .withStatus(Status.SERVICE_UNAVAILABLE.getStatusCode()) + .withMessage("System overloaded, please retry later")); + } + result.setFailedRequest(failedRequests); + recordBulkMetrics(entityType, entities.size(), 0, System.nanoTime() - bulkStartTime, 0, 0); + return result; + } + + LOG.info( + "Starting bulk operation for {} {} entities (active={}, queued={})", + entities.size(), + entityType, + bulkExecutor.getActiveCount(), + bulkExecutor.getQueueDepth()); + List> futures = new ArrayList<>(); + // Track per-entity latencies for accurate metrics + List entityLatenciesNanos = Collections.synchronizedList(new ArrayList<>()); + + // Capture parent thread's latency context for propagation to worker threads + final RequestLatencyContext.RequestContext parentLatencyContext = + RequestLatencyContext.getContext(); + for (T entity : entities) { - CompletableFuture future = - CompletableFuture.runAsync( - () -> { + CompletableFuture future = new CompletableFuture<>(); + futures.add(future); + + final long submitTime = System.nanoTime(); + + try { + bulkExecutor.submit( + () -> { + // Propagate latency context to worker thread for accurate DB/Search tracking + if (parentLatencyContext != null) { + RequestLatencyContext.setContext(parentLatencyContext); + } + try { + long entityStartTime = System.nanoTime(); + long queueWaitTime = entityStartTime - submitTime; try { PutResponse putResponse = bulkCreateOrUpdateEntity(uriInfo, entity, userName); + long entityDuration = System.nanoTime() - entityStartTime; + entityLatenciesNanos.add(entityDuration); + successRequests.add( new BulkResponse() .withRequest(entity.getFullyQualifiedName()) .withStatus(Status.OK.getStatusCode())); createChangeEventForBulkOperation( putResponse.getEntity(), putResponse.getChangeType(), userName); - } catch (Exception e) { - LOG.warn("Failed to process entity in bulk operation", e); - failedRequests.add( - new BulkResponse() - .withRequest(entity.getFullyQualifiedName()) - .withStatus(Status.BAD_REQUEST.getStatusCode()) - .withMessage(e.getMessage())); - } - }, - BOUNDED_BULK_EXECUTOR); - futures.add(future); + // Record per-entity metrics + recordEntityMetrics(entityType, entityDuration, queueWaitTime, true); + future.complete(null); + } catch (Exception e) { + long entityDuration = System.nanoTime() - entityStartTime; + entityLatenciesNanos.add(entityDuration); + recordEntityMetrics(entityType, entityDuration, queueWaitTime, false); + handleBulkOperationError(entity, e, failedRequests); + future.complete(null); // Complete even on error so we don't hang + } + } finally { + // Clear context from worker thread to prevent memory leaks in pooled threads + if (parentLatencyContext != null) { + RequestLatencyContext.clearContext(); + } + } + }); + } catch (java.util.concurrent.RejectedExecutionException e) { + // Queue became full between check and submit + LOG.warn("Task rejected for entity: {}", entity.getFullyQualifiedName()); + failedRequests.add( + new BulkResponse() + .withRequest(entity.getFullyQualifiedName()) + .withStatus(Status.SERVICE_UNAVAILABLE.getStatusCode()) + .withMessage("System overloaded, please retry later")); + future.complete(null); + } } - CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])).join(); + // Wait with timeout + boolean timedOut = false; + try { + CompletableFuture.allOf(futures.toArray(new CompletableFuture[0])) + .get(bulkExecutor.getTimeoutSeconds(), TimeUnit.SECONDS); + } catch (java.util.concurrent.TimeoutException e) { + timedOut = true; + LOG.error( + "Bulk operation timed out after {}s. Completed: {}, Failed: {}, Total: {}", + bulkExecutor.getTimeoutSeconds(), + successRequests.size(), + failedRequests.size(), + entities.size()); + + // Check each future to find which entities actually timed out + // futures[i] corresponds to entities[i] + for (int i = 0; i < futures.size(); i++) { + CompletableFuture future = futures.get(i); + if (!future.isDone()) { + T entity = entities.get(i); + failedRequests.add( + new BulkResponse() + .withRequest(entity.getFullyQualifiedName()) + .withStatus(Status.REQUEST_TIMEOUT.getStatusCode()) + .withMessage("Operation timed out")); + // Cancel the future to signal we're no longer interested + future.cancel(false); + } + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + LOG.warn("Bulk operation interrupted"); + } catch (ExecutionException e) { + LOG.error("Unexpected error in bulk operation", e.getCause()); + } + + long totalDurationNanos = System.nanoTime() - bulkStartTime; result.setNumberOfRowsProcessed(entities.size()); result.setNumberOfRowsPassed(successRequests.size()); @@ -6907,16 +7056,148 @@ public abstract class EntityRepository { result.setSuccessRequest(successRequests); result.setFailedRequest(failedRequests); - if (failedRequests.size() > 0) { - result.setStatus(ApiStatus.PARTIAL_SUCCESS); + if (!failedRequests.isEmpty()) { + result.setStatus(successRequests.isEmpty() ? ApiStatus.FAILURE : ApiStatus.PARTIAL_SUCCESS); } + // Calculate and log detailed metrics + long avgEntityLatencyMs = 0; + long maxEntityLatencyMs = 0; + if (!entityLatenciesNanos.isEmpty()) { + avgEntityLatencyMs = + entityLatenciesNanos.stream().mapToLong(Long::longValue).sum() + / entityLatenciesNanos.size() + / 1_000_000; + maxEntityLatencyMs = + entityLatenciesNanos.stream().mapToLong(Long::longValue).max().orElse(0) / 1_000_000; + } + + long totalDurationMs = totalDurationNanos / 1_000_000; + double throughput = entities.size() * 1000.0 / Math.max(1, totalDurationMs); + + // Record bulk operation metrics + recordBulkMetrics( + entityType, + entities.size(), + successRequests.size(), + totalDurationNanos, + avgEntityLatencyMs, + maxEntityLatencyMs); + LOG.info( - "Bulk operation completed: {} succeeded, {} failed out of {} total", + "Bulk operation completed: entity={}, total={}, succeeded={}, failed={}, " + + "wallClockMs={}, avgEntityMs={}, maxEntityMs={}, throughput={}/s", + entityType, + entities.size(), successRequests.size(), failedRequests.size(), - entities.size()); + totalDurationMs, + avgEntityLatencyMs, + maxEntityLatencyMs, + String.format("%.1f", throughput)); return result; } + + private void recordEntityMetrics( + String entityType, long durationNanos, long queueWaitNanos, boolean success) { + // Per-entity processing time (cached, no histogram to reduce Prometheus cardinality) + // This fires for EVERY entity in a bulk operation, so we use simple timers. + // The bulk.operation.latency metric has histograms for percentile analysis. + String latencyKey = entityType + "|" + success; + Timer latencyTimer = + ENTITY_LATENCY_TIMERS.computeIfAbsent( + latencyKey, + k -> + Timer.builder("bulk.entity.latency") + .tag("entity", entityType) + .tag("success", String.valueOf(success)) + .register(Metrics.globalRegistry)); + latencyTimer.record(durationNanos, TimeUnit.NANOSECONDS); + + // Queue wait time (cached, simple timer) + Timer queueTimer = + ENTITY_QUEUE_WAIT_TIMERS.computeIfAbsent( + entityType, + k -> + Timer.builder("bulk.entity.queue_wait") + .tag("entity", entityType) + .register(Metrics.globalRegistry)); + queueTimer.record(queueWaitNanos, TimeUnit.NANOSECONDS); + } + + private void recordBulkMetrics( + String entityType, + int totalEntities, + int successCount, + long durationNanos, + long avgEntityMs, + long maxEntityMs) { + // Total bulk operation time (cached) + Timer operationTimer = + BULK_OPERATION_TIMERS.computeIfAbsent( + entityType, + k -> + Timer.builder("bulk.operation.latency") + .tag("entity", entityType) + .publishPercentileHistogram(true) + .register(Metrics.globalRegistry)); + operationTimer.record(durationNanos, TimeUnit.NANOSECONDS); + + // Batch size distribution (cached) + DistributionSummary batchSizeSummary = + BATCH_SIZE_SUMMARIES.computeIfAbsent( + entityType, + k -> + DistributionSummary.builder("bulk.operation.batch_size") + .tag("entity", entityType) + .register(Metrics.globalRegistry)); + batchSizeSummary.record(totalEntities); + + // Success rate as distribution (cached, avoids gauge memory leak) + if (totalEntities > 0) { + DistributionSummary successRateSummary = + SUCCESS_RATE_SUMMARIES.computeIfAbsent( + entityType, + k -> + DistributionSummary.builder("bulk.operation.success_rate") + .tag("entity", entityType) + .register(Metrics.globalRegistry)); + successRateSummary.record(successCount * 100.0 / totalEntities); + } + + // Record success and failure counts for alerting (Micrometer caches counters internally) + Metrics.counter("bulk.operation.entities.success", "entity", entityType) + .increment(successCount); + Metrics.counter("bulk.operation.entities.failed", "entity", entityType) + .increment(totalEntities - successCount); + } + + private void handleBulkOperationError(T entity, Exception e, List failedRequests) { + String fqn = entity.getFullyQualifiedName(); + int statusCode; + String message; + + // Categorize errors properly + if (e instanceof jakarta.ws.rs.WebApplicationException wae) { + statusCode = wae.getResponse().getStatus(); + message = e.getMessage(); + LOG.warn("Entity {} failed with status {}: {}", fqn, statusCode, message); + } else if (e instanceof java.sql.SQLException) { + statusCode = Status.INTERNAL_SERVER_ERROR.getStatusCode(); + message = "Database error: " + e.getMessage(); + LOG.error("Database error processing entity {}", fqn, e); + } else if (e instanceof IllegalArgumentException || e instanceof IllegalStateException) { + statusCode = Status.BAD_REQUEST.getStatusCode(); + message = e.getMessage(); + LOG.warn("Validation error for entity {}: {}", fqn, message); + } else { + statusCode = Status.INTERNAL_SERVER_ERROR.getStatusCode(); + message = "Unexpected error: " + e.getMessage(); + LOG.error("Unexpected error processing entity {}", fqn, e); + } + + failedRequests.add( + new BulkResponse().withRequest(fqn).withStatus(statusCode).withMessage(message)); + } } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/RequestLatencyContext.java b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/RequestLatencyContext.java index 43827fb3b41..c3af46813fa 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/RequestLatencyContext.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/monitoring/RequestLatencyContext.java @@ -7,13 +7,30 @@ import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import java.time.Duration; import java.util.concurrent.ConcurrentHashMap; +import java.util.concurrent.atomic.AtomicInteger; +import java.util.concurrent.atomic.AtomicLong; import java.util.concurrent.atomic.AtomicReference; import lombok.Getter; import lombok.extern.slf4j.Slf4j; /** - * Thread-local context for tracking request latencies using Micrometer. - * This provides accurate latency measurements including percentiles. + * Thread-local context for tracking request latencies using Micrometer. This provides accurate + * latency measurements including percentiles. + * + *

Thread-Safety and Multi-threaded Requests: + * + *

This context can be shared across multiple worker threads using {@link #setContext} for + * operations like bulk processing. When shared: + * + *

    + *
  • Database and search time tracking uses atomic operations and aggregates correctly + *
  • Operation counts (dbOperationCount, searchOperationCount) aggregate correctly + *
  • Internal time calculation may be inaccurate when multiple threads work concurrently, + * as the internal timer start point is shared. This is a known limitation. + *
+ * + *

For parallel operations, the "internal" percentage should be interpreted as approximate. The + * database and search percentages remain accurate as they track cumulative time across all threads. */ @Slf4j public class RequestLatencyContext { @@ -56,6 +73,9 @@ public class RequestLatencyContext { requestContext.set(context); String normalizedEndpoint = normalizeUri(endpoint); String timerKey = normalizedEndpoint + "|" + normalizedMethod; + // Total request timer with histogram for percentile analysis. + // Uses reduced SLO buckets (5 instead of 9) to minimize Prometheus cardinality. + // Histogram provides p50/p95/p99/p99.9 percentiles automatically. Timer timer = requestTimers.computeIfAbsent( timerKey, @@ -68,15 +88,11 @@ public class RequestLatencyContext { .minimumExpectedValue(Duration.ofMillis(1)) .maximumExpectedValue(Duration.ofSeconds(60)) .serviceLevelObjectives( - Duration.ofMillis(10), - Duration.ofMillis(50), - Duration.ofMillis(100), - Duration.ofMillis(200), - Duration.ofMillis(500), - Duration.ofSeconds(1), - Duration.ofSeconds(2), - Duration.ofSeconds(5), - Duration.ofSeconds(10)) + Duration.ofMillis(100), // Fast response + Duration.ofMillis(500), // Acceptable + Duration.ofSeconds(1), // Slow + Duration.ofSeconds(5), // Very slow + Duration.ofSeconds(10)) // Timeout threshold .register(Metrics.globalRegistry)); LOG.debug( "Created/retrieved timer for endpoint: {}, method: {}, timer: {}", @@ -84,7 +100,7 @@ public class RequestLatencyContext { normalizedMethod, timer); context.requestTimerSample = Timer.start(Metrics.globalRegistry); - context.internalTimerStartNanos = System.nanoTime(); + context.internalTimerStartNanos.set(System.nanoTime()); } /** @@ -96,12 +112,14 @@ public class RequestLatencyContext { return null; } - if (context.internalTimerStartNanos > 0) { - context.internalTime += System.nanoTime() - context.internalTimerStartNanos; - context.internalTimerStartNanos = 0; + // Atomically read and reset internalTimerStartNanos to prevent race conditions + // when multiple threads call this concurrently on the same context + long internalStart = context.internalTimerStartNanos.getAndSet(0); + if (internalStart > 0) { + context.internalTime.addAndGet(System.nanoTime() - internalStart); } - context.dbOperationCount++; + context.dbOperationCount.incrementAndGet(); return Timer.start(Metrics.globalRegistry); } @@ -115,9 +133,9 @@ public class RequestLatencyContext { // Use the shared dummy timer to measure elapsed time without recording long duration = timerSample.stop(DUMMY_TIMER); - context.dbTime += duration; + context.dbTime.addAndGet(duration); - context.internalTimerStartNanos = System.nanoTime(); + context.internalTimerStartNanos.set(System.nanoTime()); } public static Timer.Sample startSearchOperation() { @@ -125,12 +143,14 @@ public class RequestLatencyContext { if (context == null) { return null; } - if (context.internalTimerStartNanos > 0) { - context.internalTime += System.nanoTime() - context.internalTimerStartNanos; - context.internalTimerStartNanos = 0; + + // Atomically read and reset internalTimerStartNanos to prevent race conditions + long internalStart = context.internalTimerStartNanos.getAndSet(0); + if (internalStart > 0) { + context.internalTime.addAndGet(System.nanoTime() - internalStart); } - context.searchOperationCount++; + context.searchOperationCount.incrementAndGet(); return Timer.start(Metrics.globalRegistry); } @@ -144,10 +164,10 @@ public class RequestLatencyContext { // Use the shared dummy timer to measure elapsed time without recording long duration = timerSample.stop(DUMMY_TIMER); - context.searchTime += duration; + context.searchTime.addAndGet(duration); // Resume internal timer - context.internalTimerStartNanos = System.nanoTime(); + context.internalTimerStartNanos.set(System.nanoTime()); } public static void endRequest() { @@ -172,12 +192,22 @@ public class RequestLatencyContext { } } - if (context.internalTimerStartNanos > 0) { - context.internalTime += System.nanoTime() - context.internalTimerStartNanos; + long finalInternalStart = context.internalTimerStartNanos.get(); + if (finalInternalStart > 0) { + context.internalTime.addAndGet(System.nanoTime() - finalInternalStart); } - // Record per-request timers (not per-operation) - // This gives us the total DB time for THIS request + // Get final values from atomic fields + long dbTimeNanos = context.dbTime.get(); + long searchTimeNanos = context.searchTime.get(); + long internalTimeNanos = context.internalTime.get(); + int dbOps = context.dbOperationCount.get(); + int searchOps = context.searchOperationCount.get(); + + // Record per-request component timers (not per-operation) + // These use simple timers without histograms to reduce Prometheus cardinality. + // The total request timer (request.latency.total) has histograms for percentile analysis. + // Component timers provide mean/max/count which is sufficient for bottleneck identification. Timer dbTimer = databaseTimers.computeIfAbsent( timerKey, @@ -186,22 +216,9 @@ public class RequestLatencyContext { .tag(ENDPOINT, normalizedEndpoint) .tag(METHOD, context.method) .description("Total database latency per request") - .publishPercentileHistogram(true) - .minimumExpectedValue(Duration.ofMillis(1)) - .maximumExpectedValue(Duration.ofSeconds(30)) - .serviceLevelObjectives( - Duration.ofMillis(5), - Duration.ofMillis(10), - Duration.ofMillis(25), - Duration.ofMillis(50), - Duration.ofMillis(100), - Duration.ofMillis(250), - Duration.ofMillis(500), - Duration.ofSeconds(1), - Duration.ofSeconds(2)) .register(Metrics.globalRegistry)); - if (context.dbTime > 0) { - dbTimer.record(context.dbTime, java.util.concurrent.TimeUnit.NANOSECONDS); + if (dbTimeNanos > 0) { + dbTimer.record(dbTimeNanos, java.util.concurrent.TimeUnit.NANOSECONDS); } // Record total search time for THIS request @@ -213,22 +230,9 @@ public class RequestLatencyContext { .tag(ENDPOINT, normalizedEndpoint) .tag(METHOD, context.method) .description("Total search latency per request") - .publishPercentileHistogram(true) - .minimumExpectedValue(Duration.ofMillis(1)) - .maximumExpectedValue(Duration.ofSeconds(30)) - .serviceLevelObjectives( - Duration.ofMillis(5), - Duration.ofMillis(10), - Duration.ofMillis(25), - Duration.ofMillis(50), - Duration.ofMillis(100), - Duration.ofMillis(250), - Duration.ofMillis(500), - Duration.ofSeconds(1), - Duration.ofSeconds(2)) .register(Metrics.globalRegistry)); - if (context.searchTime > 0) { - searchTimer.record(context.searchTime, java.util.concurrent.TimeUnit.NANOSECONDS); + if (searchTimeNanos > 0) { + searchTimer.record(searchTimeNanos, java.util.concurrent.TimeUnit.NANOSECONDS); } // Record internal processing time for THIS request @@ -240,42 +244,29 @@ public class RequestLatencyContext { .tag(ENDPOINT, normalizedEndpoint) .tag(METHOD, context.method) .description("Internal processing latency per request") - .publishPercentileHistogram(true) - .minimumExpectedValue(Duration.ofMillis(1)) - .maximumExpectedValue(Duration.ofSeconds(10)) - .serviceLevelObjectives( - Duration.ofMillis(1), - Duration.ofMillis(5), - Duration.ofMillis(10), - Duration.ofMillis(25), - Duration.ofMillis(50), - Duration.ofMillis(100), - Duration.ofMillis(250), - Duration.ofMillis(500), - Duration.ofSeconds(1)) .register(Metrics.globalRegistry)); - if (context.internalTime > 0) { - internalTimer.record(context.internalTime, java.util.concurrent.TimeUnit.NANOSECONDS); + if (internalTimeNanos > 0) { + internalTimer.record(internalTimeNanos, java.util.concurrent.TimeUnit.NANOSECONDS); } // Record operation counts as distribution summaries to get avg/max/percentiles - if (context.dbOperationCount > 0) { + if (dbOps > 0) { Metrics.summary( "request.operations.database", ENDPOINT, normalizedEndpoint, METHOD, context.method) - .record(context.dbOperationCount); + .record(dbOps); } - if (context.searchOperationCount > 0) { + if (searchOps > 0) { Metrics.summary( "request.operations.search", ENDPOINT, normalizedEndpoint, METHOD, context.method) - .record(context.searchOperationCount); + .record(searchOps); } if (context.totalTime > 0) { long totalNanos = context.totalTime; - double dbPercent = (context.dbTime * 100.0) / totalNanos; - double searchPercent = (context.searchTime * 100.0) / totalNanos; - double internalPercent = (context.internalTime * 100.0) / totalNanos; + double dbPercent = (dbTimeNanos * 100.0) / totalNanos; + double searchPercent = (searchTimeNanos * 100.0) / totalNanos; + double internalPercent = (internalTimeNanos * 100.0) / totalNanos; // Get or create percentage holder for this endpoint and method PercentageHolder holder = @@ -315,15 +306,17 @@ public class RequestLatencyContext { // Log slow requests (over 1 second) if (context.totalTime > 1_000_000_000L) { LOG.warn( - "Slow request detected - endpoint: {}, total: {}ms, db: {}ms ({}%), search: {}ms ({}%), internal: {}ms ({}%)", + "Slow request detected - endpoint: {}, total: {}ms, db: {}ms ({}%), search: {}ms ({}%), internal: {}ms ({}%), dbOps: {}, searchOps: {}", context.endpoint, context.totalTime / 1_000_000, - context.dbTime / 1_000_000, - (context.dbTime * 100) / context.totalTime, - context.searchTime / 1_000_000, - (context.searchTime * 100) / context.totalTime, - context.internalTime / 1_000_000, - (context.internalTime * 100) / context.totalTime); + dbTimeNanos / 1_000_000, + (dbTimeNanos * 100) / context.totalTime, + searchTimeNanos / 1_000_000, + (searchTimeNanos * 100) / context.totalTime, + internalTimeNanos / 1_000_000, + (internalTimeNanos * 100) / context.totalTime, + dbOps, + searchOps); } } finally { @@ -331,20 +324,64 @@ public class RequestLatencyContext { } } + /** + * Get the current request context for propagation to child threads. This allows virtual threads + * or async tasks to share the same timing context as the parent request thread. + * + * @return the current RequestContext, or null if not in a request context + */ + public static RequestContext getContext() { + return requestContext.get(); + } + + /** + * Set the request context in the current thread. Used by child threads to inherit the parent's + * timing context for accurate metrics tracking across thread boundaries. + * + * @param context the RequestContext to set (obtained from parent thread via getContext()) + */ + public static void setContext(RequestContext context) { + if (context != null) { + requestContext.set(context); + } + } + + /** + * Clear the request context from the current thread. Should be called in finally blocks by child + * threads that received context via setContext() to prevent memory leaks in pooled threads. + */ + public static void clearContext() { + requestContext.remove(); + } + + /** + * Reset all static state. This is primarily for testing to ensure clean state between tests. This + * clears all timer maps so that new timers will be created and registered with the current + * registry. + */ + public static void reset() { + requestContext.remove(); + requestTimers.clear(); + databaseTimers.clear(); + searchTimers.clear(); + internalTimers.clear(); + percentageHolders.clear(); + } + @Getter - private static class RequestContext { + public static class RequestContext { final String endpoint; final String method; - Timer.Sample requestTimerSample; - long internalTimerStartNanos = 0; + volatile Timer.Sample requestTimerSample; + final AtomicLong internalTimerStartNanos = new AtomicLong(0); - long totalTime = 0; - long dbTime = 0; - long searchTime = 0; - long internalTime = 0; + volatile long totalTime = 0; + final AtomicLong dbTime = new AtomicLong(0); + final AtomicLong searchTime = new AtomicLong(0); + final AtomicLong internalTime = new AtomicLong(0); - int dbOperationCount = 0; - int searchOperationCount = 0; + final AtomicInteger dbOperationCount = new AtomicInteger(0); + final AtomicInteger searchOperationCount = new AtomicInteger(0); RequestContext(String endpoint, String method) { this.endpoint = endpoint; diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchAggregationManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchAggregationManager.java index e26712e3908..e124808a5f0 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchAggregationManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchAggregationManager.java @@ -12,6 +12,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchResponse; import es.co.elastic.clients.json.JsonData; import es.co.elastic.clients.json.JsonpMapper; import es.co.elastic.clients.util.NamedValue; +import io.micrometer.core.instrument.Timer; import jakarta.json.JsonObject; import jakarta.ws.rs.core.Response; import java.io.IOException; @@ -29,6 +30,7 @@ import org.openmetadata.schema.tests.DataQualityReport; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.exception.SearchException; import org.openmetadata.service.Entity; +import org.openmetadata.service.monitoring.RequestLatencyContext; import org.openmetadata.service.resources.settings.SettingsCache; import org.openmetadata.service.search.AggregationManagementClient; import org.openmetadata.service.search.SearchAggregation; @@ -177,8 +179,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli searchRequestBuilder.size(0); searchRequestBuilder.timeout("30s"); - SearchResponse searchResponse = - client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String responseJson = serializeSearchResponse(searchResponse); return Response.status(Response.Status.OK).entity(responseJson).build(); @@ -230,7 +239,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli aggregations.size()); LOG.debug("Generic Aggregation - Full aggregations: {}", aggregations); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String response = serializeSearchResponse(searchResponse); LOG.info( @@ -327,7 +344,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli aggregations.size()); LOG.debug("Generic Aggregation with RBAC - Full aggregations: {}", aggregations); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String response = serializeSearchResponse(searchResponse); LOG.info( @@ -416,8 +441,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli searchRequestBuilder.size(0); searchRequestBuilder.timeout("30s"); - SearchResponse searchResponse = - client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String response = serializeSearchResponse(searchResponse); JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject(); @@ -520,7 +552,15 @@ public class ElasticSearchAggregationManager implements AggregationManagementCli // Build and execute search SearchRequest searchRequest = requestBuilder.build(resolvedIndex); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } LOG.info("Entity type counts query for index '{}' (resolved: '{}')", index, resolvedIndex); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightAggregatorManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightAggregatorManager.java index 034caba1243..21fcc3dba7b 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightAggregatorManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchDataInsightAggregatorManager.java @@ -14,6 +14,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchRequest; import es.co.elastic.clients.elasticsearch.core.SearchResponse; import es.co.elastic.clients.elasticsearch.indices.GetMappingResponse; import es.co.elastic.clients.json.JsonData; +import io.micrometer.core.instrument.Timer; import jakarta.ws.rs.core.Response; import java.io.IOException; import java.io.StringReader; @@ -35,6 +36,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface; import org.openmetadata.service.jdbi3.DataInsightChartRepository; import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository; +import org.openmetadata.service.monitoring.RequestLatencyContext; import org.openmetadata.service.search.DataInsightAggregatorClient; import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchAggregatedUnusedAssetsCountAggregator; import org.openmetadata.service.search.elasticsearch.dataInsightAggregators.ElasticSearchAggregatedUnusedAssetsSizeAggregator; @@ -77,7 +79,15 @@ public class ElasticSearchDataInsightAggregatorManager implements DataInsightAgg new HashMap<>(); SearchRequest searchRequest = aggregator.prepareSearchRequest(diChart, start, end, formulas, metricFormulaHolder, live); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return aggregator.processSearchResponse( diChart, searchResponse, formulas, metricFormulaHolder); } @@ -146,7 +156,15 @@ public class ElasticSearchDataInsightAggregatorManager implements DataInsightAgg from, queryFilter, dataReportIndex); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(OK) .entity(processDataInsightChartResult(searchResponse, dataInsightChartName)) .build(); @@ -160,7 +178,15 @@ public class ElasticSearchDataInsightAggregatorManager implements DataInsightAgg } SearchRequest searchRequest = QueryCostRecordsAggregator.getQueryCostRecords(serviceName); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return QueryCostRecordsAggregator.parseQueryCostResponse(searchResponse); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java index f50859e44ae..d0a89d2b948 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/ElasticSearchSearchManager.java @@ -29,6 +29,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchResponse; import es.co.elastic.clients.elasticsearch.core.search.Hit; import es.co.elastic.clients.json.JsonData; import es.co.elastic.clients.json.JsonpMapper; +import io.micrometer.core.instrument.Timer; import jakarta.json.Json; import jakarta.json.JsonObject; import jakarta.json.JsonReader; @@ -150,7 +151,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { b.must( m -> m.term(t -> t.field("sourceUrl").value(sourceUrl)))))); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String responseJson = serializeSearchResponse(response); return Response.status(OK).entity(responseJson).build(); } @@ -177,7 +186,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { .filter( f -> f.term(t -> t.field("deleted").value(deleted)))))); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String responseJson = serializeSearchResponse(response); return Response.status(OK).entity(responseJson).build(); } @@ -302,7 +319,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { try { SearchRequest searchRequest = requestBuilder.build(index); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } List> results = new ArrayList<>(); if (response.hits().hits() != null) { @@ -435,7 +460,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { try { SearchRequest searchRequest = requestBuilder.build(index); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } List> results = new ArrayList<>(); Object[] lastHitSortValues = null; @@ -528,11 +561,13 @@ public class ElasticSearchSearchManager implements SearchManagementClient { addAggregationsToNLQQuery(requestBuilder, request.getIndex()); SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse response = client.search(searchRequest, JsonData.class); - - // End search operation timing - if (searchTimerSample != null) { - RequestLatencyContext.endSearchOperation(searchTimerSample); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } } // Cache successful queries @@ -621,7 +656,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { // Build and execute search request SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String responseJson = serializeSearchResponse(response); LOG.debug("Direct query search completed successfully"); @@ -813,7 +856,16 @@ public class ElasticSearchSearchManager implements SearchManagementClient { Query boolQuery = Query.of(q -> q.bool(b -> b.must(mustQueries).filter(filterQueries))); - return client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } + return response; } private void applySearchFilter(String filter, ElasticSearchRequestBuilder requestBuilder) @@ -1098,15 +1150,16 @@ public class ElasticSearchSearchManager implements SearchManagementClient { LOG.debug("Executing search on index: {}, query: {}", request.getIndex(), request.getQuery()); try { - io.micrometer.core.instrument.Timer.Sample searchTimerSample = - org.openmetadata.service.monitoring.RequestLatencyContext.startSearchOperation(); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); - - if (searchTimerSample != null) { - org.openmetadata.service.monitoring.RequestLatencyContext.endSearchOperation( - searchTimerSample); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } } if (!Boolean.TRUE.equals(request.getIsHierarchy())) { @@ -1194,7 +1247,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { // Execute search to get aggregations for parent terms SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } if (searchResponse.aggregations() != null && searchResponse.aggregations().containsKey("fqnParts_agg")) { @@ -1396,7 +1457,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { addAggregationsToNLQQuery(requestBuilder, request.getIndex()); SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(Response.Status.OK) .entity(serializeSearchResponse(searchResponse)) @@ -1431,7 +1500,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { .query(query) .size(1000)); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -1478,7 +1555,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { .query(query) .size(1000)); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -1581,7 +1666,15 @@ public class ElasticSearchSearchManager implements SearchManagementClient { .query(finalMainQuery) .size(1000)); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() == null) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/EsUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/EsUtils.java index 971dd7f11a0..cb77c449419 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/EsUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/elasticsearch/EsUtils.java @@ -23,6 +23,7 @@ import es.co.elastic.clients.elasticsearch.core.SearchRequest; import es.co.elastic.clients.elasticsearch.core.SearchResponse; import es.co.elastic.clients.elasticsearch.core.search.Hit; import es.co.elastic.clients.json.JsonData; +import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.io.StringReader; import java.util.HashMap; @@ -37,6 +38,7 @@ import org.openmetadata.schema.api.lineage.LineageDirection; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.exception.SearchException; import org.openmetadata.service.Entity; +import org.openmetadata.service.monitoring.RequestLatencyContext; @Slf4j public class EsUtils { @@ -129,7 +131,14 @@ public class EsUtils { } } - return client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + try { + return client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } } public static Map searchEREntityByKey( @@ -183,7 +192,15 @@ public class EsUtils { null, null, fieldsToRemove); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -298,7 +315,15 @@ public class EsUtils { null, null, fieldsToRemove); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -376,7 +401,14 @@ public class EsUtils { // Apply query filter buildSearchSourceFilter(queryFilter, searchRequestBuilder); - return client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + try { + return client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } } private static Query buildBoolQueriesWithShould(Map> keysAndValues) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/GuavaLineageGraphCache.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/GuavaLineageGraphCache.java index b59943f272e..fe930a1dc60 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/GuavaLineageGraphCache.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/lineage/GuavaLineageGraphCache.java @@ -164,11 +164,11 @@ public class GuavaLineageGraphCache implements LineageGraphCache { public void logStats() { CacheStats stats = cache.stats(); LOG.info( - "Cache Statistics: size={}, hits={}, misses={}, hitRatio={:.2f}, evictions={}, loadTime={}ms", + "Cache Statistics: size={}, hits={}, misses={}, hitRatio={}, evictions={}, loadTime={}ms", cache.size(), stats.hitCount(), stats.missCount(), - getHitRatio(), + String.format("%.2f", getHitRatio()), stats.evictionCount(), stats.totalLoadTime() / 1_000_000); // Convert nanos to millis } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchAggregationManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchAggregationManager.java index fec7b94e5ab..8d68b15cd0c 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchAggregationManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchAggregationManager.java @@ -3,6 +3,7 @@ package org.openmetadata.service.search.opensearch; import com.fasterxml.jackson.core.JsonProcessingException; import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; +import io.micrometer.core.instrument.Timer; import jakarta.json.JsonObject; import jakarta.ws.rs.core.Response; import java.io.IOException; @@ -20,6 +21,7 @@ import org.openmetadata.schema.tests.DataQualityReport; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.exception.SearchException; import org.openmetadata.service.Entity; +import org.openmetadata.service.monitoring.RequestLatencyContext; import org.openmetadata.service.resources.settings.SettingsCache; import org.openmetadata.service.search.AggregationManagementClient; import org.openmetadata.service.search.SearchAggregation; @@ -174,8 +176,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient searchRequestBuilder.size(0); searchRequestBuilder.timeout("30s"); - SearchResponse searchResponse = - client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(Response.Status.OK).entity(searchResponse.toJsonString()).build(); } catch (Exception e) { LOG.error("Failed to execute aggregation", e); @@ -217,8 +226,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient searchRequestBuilder.size(0); searchRequestBuilder.timeout("30s"); - SearchResponse searchResponse = - client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String response = searchResponse.toJsonString(); JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject(); @@ -296,8 +312,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient searchRequestBuilder.size(0); searchRequestBuilder.timeout("30s"); - SearchResponse searchResponse = - client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String response = searchResponse.toJsonString(); JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject(); @@ -373,8 +396,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient searchRequestBuilder.size(0); searchRequestBuilder.timeout("30s"); - SearchResponse searchResponse = - client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String response = searchResponse.toJsonString(); JsonObject jsonResponse = JsonUtils.readJson(response).asJsonObject(); @@ -460,7 +490,15 @@ public class OpenSearchAggregationManager implements AggregationManagementClient // Build and execute search SearchRequest searchRequest = requestBuilder.build(resolvedIndex); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } LOG.info("Entity type counts query for index '{}' (resolved: '{}')", index, resolvedIndex); diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightAggregatorManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightAggregatorManager.java index a2538e1196b..94c35b75dd8 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightAggregatorManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchDataInsightAggregatorManager.java @@ -3,6 +3,7 @@ package org.openmetadata.service.search.opensearch; import static jakarta.ws.rs.core.Response.Status.OK; import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty; +import io.micrometer.core.instrument.Timer; import jakarta.ws.rs.core.Response; import java.io.IOException; import java.util.ArrayList; @@ -23,6 +24,7 @@ import org.openmetadata.service.Entity; import org.openmetadata.service.dataInsight.DataInsightAggregatorInterface; import org.openmetadata.service.jdbi3.DataInsightChartRepository; import org.openmetadata.service.jdbi3.DataInsightSystemChartRepository; +import org.openmetadata.service.monitoring.RequestLatencyContext; import org.openmetadata.service.search.DataInsightAggregatorClient; import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchAggregatedUnusedAssetsCountAggregator; import org.openmetadata.service.search.opensearch.dataInsightAggregator.OpenSearchAggregatedUnusedAssetsSizeAggregator; @@ -76,7 +78,15 @@ public class OpenSearchDataInsightAggregatorManager implements DataInsightAggreg new HashMap<>(); SearchRequest searchRequest = aggregator.prepareSearchRequest(diChart, start, end, formulas, metricFormulaHolder, live); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return aggregator.processSearchResponse( diChart, searchResponse, formulas, metricFormulaHolder); } @@ -145,7 +155,15 @@ public class OpenSearchDataInsightAggregatorManager implements DataInsightAggreg from, queryFilter, dataReportIndex); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(OK) .entity(processDataInsightChartResultNew(searchResponse, dataInsightChartName)) .build(); @@ -159,7 +177,15 @@ public class OpenSearchDataInsightAggregatorManager implements DataInsightAggreg } SearchRequest searchRequest = QueryCostRecordsAggregator.getQueryCostRecords(serviceName); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return QueryCostRecordsAggregator.parseQueryCostResponse(searchResponse); } diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java index 57a72d76038..c93cdca9c0d 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OpenSearchSearchManager.java @@ -170,7 +170,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { t.field("sourceUrl") .value(FieldValue.of(sourceUrl))))))); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(OK).entity(response.toJsonString()).build(); } @@ -200,7 +208,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { t.field("deleted") .value(FieldValue.of(deleted))))))); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(OK).entity(response.toJsonString()).build(); } @@ -279,7 +295,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { try { SearchRequest searchRequest = requestBuilder.build(index); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } List> results = new ArrayList<>(); if (response.hits().hits() != null) { @@ -449,7 +473,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { try { SearchRequest searchRequest = requestBuilder.build(index); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } List> results = new ArrayList<>(); Object[] lastHitSortValues = null; @@ -607,7 +639,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { // Build and execute search request SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse response = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse response; + try { + response = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } String responseJson = response.toJsonString(); LOG.debug("Direct query search completed successfully"); @@ -799,7 +839,14 @@ public class OpenSearchSearchManager implements SearchManagementClient { Query boolQuery = Query.of(q -> q.bool(b -> b.must(mustQueries).filter(filterQueries))); - return client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + try { + return client.search(s -> s.index(indexName).query(boolQuery).size(1000), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } } /** @@ -1252,7 +1299,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { // Execute search to get aggregations for parent terms SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } if (searchResponse.aggregations() != null && searchResponse.aggregations().containsKey("fqnParts_agg")) { @@ -1441,7 +1496,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { addAggregationsToNLQQuery(requestBuilder, request.getIndex()); SearchRequest searchRequest = requestBuilder.build(request.getIndex()); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } return Response.status(Response.Status.OK).entity(searchResponse.toJsonString()).build(); } catch (Exception e) { @@ -1469,7 +1532,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { .query(query) .size(1000)); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (var hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -1516,7 +1587,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { .query(query) .size(1000)); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (var hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -1618,7 +1697,15 @@ public class OpenSearchSearchManager implements SearchManagementClient { .query(finalBaseQuery) .size(1000)); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (var hit : searchResponse.hits().hits()) { if (hit.source() != null) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OsUtils.java b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OsUtils.java index 476f0817ac8..3dfebf96be7 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OsUtils.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/search/opensearch/OsUtils.java @@ -13,6 +13,7 @@ import com.fasterxml.jackson.databind.JsonNode; import com.fasterxml.jackson.databind.ObjectMapper; import com.fasterxml.jackson.databind.node.ObjectNode; import com.nimbusds.jose.util.Pair; +import io.micrometer.core.instrument.Timer; import java.io.IOException; import java.util.Base64; import java.util.HashMap; @@ -27,6 +28,7 @@ import org.openmetadata.schema.api.lineage.LineageDirection; import org.openmetadata.schema.utils.JsonUtils; import org.openmetadata.sdk.exception.SearchException; import org.openmetadata.service.Entity; +import org.openmetadata.service.monitoring.RequestLatencyContext; import os.org.opensearch.client.json.JsonData; import os.org.opensearch.client.opensearch.OpenSearchClient; import os.org.opensearch.client.opensearch._types.FieldValue; @@ -135,8 +137,15 @@ public class OsUtils { } } - return client.search( - searchRequestBuilder.build(), os.org.opensearch.client.json.JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + try { + return client.search( + searchRequestBuilder.build(), os.org.opensearch.client.json.JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } } public static Map searchEREntityByKey( @@ -190,7 +199,15 @@ public class OsUtils { null, null, fieldsToRemove); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -306,7 +323,15 @@ public class OsUtils { null, null, fieldsToRemove); - SearchResponse searchResponse = client.search(searchRequest, JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + SearchResponse searchResponse; + try { + searchResponse = client.search(searchRequest, JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } for (Hit hit : searchResponse.hits().hits()) { if (hit.source() != null) { @@ -389,7 +414,14 @@ public class OsUtils { // Apply query filter buildSearchSourceFilter(queryFilter, searchRequestBuilder); - return client.search(searchRequestBuilder.build(), JsonData.class); + Timer.Sample searchTimerSample = RequestLatencyContext.startSearchOperation(); + try { + return client.search(searchRequestBuilder.build(), JsonData.class); + } finally { + if (searchTimerSample != null) { + RequestLatencyContext.endSearchOperation(searchTimerSample); + } + } } private static Query buildBoolQueriesWithShould(Map> keysAndValues) { diff --git a/openmetadata-service/src/main/java/org/openmetadata/service/util/jdbi/OMSqlLogger.java b/openmetadata-service/src/main/java/org/openmetadata/service/util/jdbi/OMSqlLogger.java index c3209397058..55ccfadb118 100644 --- a/openmetadata-service/src/main/java/org/openmetadata/service/util/jdbi/OMSqlLogger.java +++ b/openmetadata-service/src/main/java/org/openmetadata/service/util/jdbi/OMSqlLogger.java @@ -1,9 +1,13 @@ package org.openmetadata.service.util.jdbi; +import io.micrometer.core.instrument.Counter; +import io.micrometer.core.instrument.DistributionSummary; import io.micrometer.core.instrument.Metrics; import io.micrometer.core.instrument.Timer; import java.time.temporal.ChronoUnit; +import java.util.concurrent.ConcurrentHashMap; import java.util.concurrent.TimeUnit; +import java.util.regex.Pattern; import lombok.extern.slf4j.Slf4j; import org.jdbi.v3.core.statement.SqlLogger; import org.jdbi.v3.core.statement.StatementContext; @@ -12,6 +16,32 @@ import org.openmetadata.service.monitoring.RequestLatencyContext; @Slf4j public class OMSqlLogger implements SqlLogger { private static final String DB_TIMER_CONTEXT_KEY = "db.timer.context"; + private static final Pattern SQL_TYPE_PATTERN = + Pattern.compile( + "^\\s*(SELECT|INSERT|UPDATE|DELETE|MERGE|CREATE|ALTER|DROP)\\b", + Pattern.CASE_INSENSITIVE); + + private static volatile long slowQueryThresholdMs = 100; + + private static final ConcurrentHashMap LATENCY_TIMERS = new ConcurrentHashMap<>(); + private static final ConcurrentHashMap QUERY_COUNTERS = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap SLOW_QUERY_COUNTERS = + new ConcurrentHashMap<>(); + private static final ConcurrentHashMap SLOW_QUERY_SUMMARIES = + new ConcurrentHashMap<>(); + + private static final Timer LEGACY_JDBI_TIMER = Metrics.timer("jdbi_requests_seconds"); + private static final Timer LEGACY_LATENCY_TIMER = Metrics.timer("jdbi_latency_requests_seconds"); + + public static void setSlowQueryThresholdMs(long thresholdMs) { + slowQueryThresholdMs = thresholdMs; + LOG.info("Slow query threshold set to {} ms", thresholdMs); + } + + public static long getSlowQueryThresholdMs() { + return slowQueryThresholdMs; + } @Override public void logBeforeExecution(StatementContext context) { @@ -19,10 +49,8 @@ public class OMSqlLogger implements SqlLogger { LOG.debug("sql {}, parameters {}", context.getRenderedSql(), context.getBinding()); } - // Start database operation timing using Micrometer try { - io.micrometer.core.instrument.Timer.Sample timerSample = - RequestLatencyContext.startDatabaseOperation(); + Timer.Sample timerSample = RequestLatencyContext.startDatabaseOperation(); if (timerSample != null) { context.define(DB_TIMER_CONTEXT_KEY, timerSample); } @@ -33,20 +61,53 @@ public class OMSqlLogger implements SqlLogger { @Override public void logAfterExecution(StatementContext context) { + long elapsedTimeMillis = context.getElapsedTime(ChronoUnit.MILLIS); long elapsedTime = context.getElapsedTime(ChronoUnit.SECONDS); - // Record using Micrometer API - Timer jdbiTimer = Metrics.timer("jdbi_requests_seconds"); - jdbiTimer.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS); - Timer latencyTimer = Metrics.timer("jdbi_latency_requests_seconds"); - latencyTimer.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS); + String queryType = extractQueryType(context.getRenderedSql()); + + // Use simple timer without histogram buckets for high-volume per-query metrics + // This significantly reduces Prometheus cardinality (histogram creates ~50 series per tag + // combo) + // Detailed latency analysis is available via slow query metrics below + Timer latencyTimer = + LATENCY_TIMERS.computeIfAbsent( + queryType, + type -> + Timer.builder("db.query.latency") + .tag("type", type) + .publishPercentileHistogram(false) + .register(Metrics.globalRegistry)); + latencyTimer.record(elapsedTimeMillis, TimeUnit.MILLISECONDS); + + Counter queryCounter = + QUERY_COUNTERS.computeIfAbsent( + queryType, type -> Metrics.counter("db.query.count", "type", type)); + queryCounter.increment(); + + if (elapsedTimeMillis > slowQueryThresholdMs) { + Counter slowCounter = + SLOW_QUERY_COUNTERS.computeIfAbsent( + queryType, type -> Metrics.counter("db.query.slow", "type", type)); + slowCounter.increment(); + + DistributionSummary slowSummary = + SLOW_QUERY_SUMMARIES.computeIfAbsent( + queryType, + type -> + DistributionSummary.builder("db.query.slow.latency") + .tag("type", type) + .register(Metrics.globalRegistry)); + slowSummary.record(elapsedTimeMillis); + } + + LEGACY_JDBI_TIMER.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS); + LEGACY_LATENCY_TIMER.record((long) (elapsedTime * 1000), TimeUnit.MILLISECONDS); - // End database operation timing using Micrometer try { Object timerSample = context.getAttribute(DB_TIMER_CONTEXT_KEY); - if (timerSample instanceof io.micrometer.core.instrument.Timer.Sample) { - RequestLatencyContext.endDatabaseOperation( - (io.micrometer.core.instrument.Timer.Sample) timerSample); + if (timerSample instanceof Timer.Sample) { + RequestLatencyContext.endDatabaseOperation((Timer.Sample) timerSample); } } catch (Exception e) { // Ignore - latency tracking is optional @@ -57,7 +118,18 @@ public class OMSqlLogger implements SqlLogger { "sql {}, parameters {}, timeTaken {} ms", context.getRenderedSql(), context.getBinding(), - context.getElapsedTime(ChronoUnit.MILLIS)); + elapsedTimeMillis); } } + + private String extractQueryType(String sql) { + if (sql == null || sql.isBlank()) { + return "UNKNOWN"; + } + var matcher = SQL_TYPE_PATTERN.matcher(sql); + if (matcher.find()) { + return matcher.group(1).toUpperCase(); + } + return "OTHER"; + } } diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/BulkExecutorTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/BulkExecutorTest.java new file mode 100644 index 00000000000..30ab90c03e5 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/jdbi3/BulkExecutorTest.java @@ -0,0 +1,364 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.jdbi3; + +import static org.junit.jupiter.api.Assertions.*; + +import java.util.concurrent.CountDownLatch; +import java.util.concurrent.Future; +import java.util.concurrent.RejectedExecutionException; +import java.util.concurrent.TimeUnit; +import java.util.concurrent.atomic.AtomicBoolean; +import java.util.concurrent.atomic.AtomicInteger; +import lombok.extern.slf4j.Slf4j; +import org.awaitility.Awaitility; +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; +import org.openmetadata.service.config.BulkOperationConfiguration; + +@Slf4j +class BulkExecutorTest { + + @BeforeEach + void setUp() { + BulkExecutor.reset(); + } + + @AfterEach + void tearDown() { + BulkExecutor.reset(); + } + + @Test + void testDefaultInitialization() { + BulkExecutor executor = BulkExecutor.getInstance(); + assertNotNull(executor); + assertEquals(10, executor.getMaxThreads()); + assertEquals(1000, executor.getQueueSize()); + assertEquals(300, executor.getTimeoutSeconds()); + } + + @Test + void testCustomInitialization() { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(5); + config.setQueueSize(500); + config.setTimeoutSeconds(120); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + assertEquals(5, executor.getMaxThreads()); + assertEquals(500, executor.getQueueSize()); + assertEquals(120, executor.getTimeoutSeconds()); + } + + @Test + void testConcurrencyLimit() throws InterruptedException { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(3); + config.setQueueSize(100); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + AtomicInteger maxConcurrent = new AtomicInteger(0); + AtomicInteger currentConcurrent = new AtomicInteger(0); + AtomicInteger completed = new AtomicInteger(0); + CountDownLatch allDone = new CountDownLatch(10); + + for (int i = 0; i < 10; i++) { + executor.submit( + () -> { + try { + int current = currentConcurrent.incrementAndGet(); + maxConcurrent.updateAndGet(max -> Math.max(max, current)); + Thread.sleep(50); // Simulate work + currentConcurrent.decrementAndGet(); + completed.incrementAndGet(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + allDone.countDown(); + } + }); + } + + assertTrue(allDone.await(10, TimeUnit.SECONDS), "All tasks should complete"); + assertEquals(10, completed.get(), "All 10 tasks should complete"); + assertTrue( + maxConcurrent.get() <= 3, + "Max concurrent should not exceed 3, got: " + maxConcurrent.get()); + + LOG.info( + "Concurrency test: maxConcurrent={}, completed={}", maxConcurrent.get(), completed.get()); + } + + @Test + void testQueueCapacity() throws InterruptedException { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(1); + config.setQueueSize(5); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + CountDownLatch blocker = new CountDownLatch(1); + + // Submit a blocking task + executor.submit( + () -> { + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Fill the queue (5 tasks) + for (int i = 0; i < 5; i++) { + executor.submit(() -> {}); + } + + // Next submission should be rejected (queue full) + assertThrows( + RejectedExecutionException.class, + () -> executor.submit(() -> {}), + "Should reject when queue is full"); + + // Release blocker + blocker.countDown(); + + LOG.info("Queue capacity test passed"); + } + + @Test + void testHasCapacity() throws InterruptedException { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(1); + config.setQueueSize(2); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + assertTrue(executor.hasCapacity(), "Should have capacity initially"); + + CountDownLatch blocker = new CountDownLatch(1); + + // Block the single thread + executor.submit( + () -> { + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Fill queue + executor.submit(() -> {}); + executor.submit(() -> {}); + + assertFalse(executor.hasCapacity(), "Should not have capacity when queue is full"); + + blocker.countDown(); + + // Wait for queue to drain + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(executor::hasCapacity); + + assertTrue(executor.hasCapacity(), "Should have capacity after tasks complete"); + } + + @Test + void testActiveCountAndQueueDepth() throws InterruptedException { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(2); + config.setQueueSize(100); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + assertEquals(0, executor.getActiveCount(), "Initially no active threads"); + assertEquals(0, executor.getQueueDepth(), "Initially empty queue"); + + CountDownLatch blocker = new CountDownLatch(1); + CountDownLatch tasksStarted = new CountDownLatch(2); + + // Submit blocking tasks + for (int i = 0; i < 2; i++) { + executor.submit( + () -> { + tasksStarted.countDown(); + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + } + + // Wait for tasks to start + tasksStarted.await(5, TimeUnit.SECONDS); + + // Wait for threads to fully activate + Awaitility.await().atMost(5, TimeUnit.SECONDS).until(() -> executor.getActiveCount() == 2); + + assertEquals(2, executor.getActiveCount(), "Should have 2 active threads"); + + // Submit more tasks to queue + executor.submit(() -> {}); + executor.submit(() -> {}); + executor.submit(() -> {}); + + assertEquals(3, executor.getQueueDepth(), "Should have 3 queued tasks"); + + blocker.countDown(); + + // Wait for completion + Awaitility.await() + .atMost(5, TimeUnit.SECONDS) + .until(() -> executor.getActiveCount() == 0 && executor.getQueueDepth() == 0); + + assertEquals(0, executor.getActiveCount(), "Should have no active threads after completion"); + assertEquals(0, executor.getQueueDepth(), "Should have empty queue after completion"); + } + + @Test + void testSingletonBehavior() { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(5); + + BulkExecutor.initialize(config); + + BulkExecutor instance1 = BulkExecutor.getInstance(); + BulkExecutor instance2 = BulkExecutor.getInstance(); + + assertSame(instance1, instance2, "Should return same singleton instance"); + } + + @Test + void testIsShutdown() { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(2); + config.setTimeoutSeconds(5); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + assertFalse(executor.isShutdown(), "Should not be shutdown initially"); + + executor.shutdown(); + + assertTrue(executor.isShutdown(), "Should be shutdown after shutdown() called"); + } + + @Test + void testSubmitRejectsAfterShutdown() { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(2); + config.setTimeoutSeconds(5); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + executor.shutdown(); + + assertThrows( + RejectedExecutionException.class, + () -> executor.submit(() -> {}), + "Should reject submit after shutdown"); + } + + @Test + void testSubmitWithFuture() throws Exception { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(2); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + AtomicBoolean taskExecuted = new AtomicBoolean(false); + + Future future = + executor.submitWithFuture( + () -> { + taskExecuted.set(true); + }); + + assertNotNull(future, "Should return a Future"); + future.get(5, TimeUnit.SECONDS); + + assertTrue(taskExecuted.get(), "Task should have executed"); + assertTrue(future.isDone(), "Future should be done"); + } + + @Test + void testSubmitWithFutureRejectsAfterShutdown() { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(2); + config.setTimeoutSeconds(5); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + executor.shutdown(); + + assertThrows( + RejectedExecutionException.class, + () -> executor.submitWithFuture(() -> {}), + "Should reject submitWithFuture after shutdown"); + } + + @Test + void testFutureCancellation() throws Exception { + BulkOperationConfiguration config = new BulkOperationConfiguration(); + config.setMaxThreads(1); + + BulkExecutor.initialize(config); + BulkExecutor executor = BulkExecutor.getInstance(); + + CountDownLatch blocker = new CountDownLatch(1); + AtomicBoolean secondTaskRan = new AtomicBoolean(false); + + // Block the single thread + executor.submitWithFuture( + () -> { + try { + blocker.await(); + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } + }); + + // Submit a cancellable task + Future cancellableFuture = + executor.submitWithFuture( + () -> { + secondTaskRan.set(true); + }); + + // Cancel the queued task before it runs + boolean cancelled = cancellableFuture.cancel(false); + + // Release the blocker + blocker.countDown(); + + // Wait a bit + Awaitility.await().atMost(2, TimeUnit.SECONDS).until(cancellableFuture::isDone); + + assertTrue( + cancellableFuture.isCancelled() || cancelled, + "Future should be cancelled or able to cancel"); + } +} diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/RequestLatencyContextTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/RequestLatencyContextTest.java index c4d92bdf40e..51653d67f29 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/RequestLatencyContextTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/monitoring/RequestLatencyContextTest.java @@ -552,6 +552,486 @@ class RequestLatencyContextTest { assertEquals("/api/v1/tables", MetricUtils.normalizeUri("/api/v1/tables?query=test&limit=10")); } + @Test + void testContextPropagationToChildThreads() throws InterruptedException { + String endpoint = "/api/v1/tables/bulk"; + + // Start request in parent thread + RequestLatencyContext.startRequest(endpoint, "PUT"); + simulateWork(10); + + // Get context for propagation + RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext(); + assertNotNull(parentContext, "Parent context should exist"); + + // Simulate bulk operation with child threads + Thread childThread1 = + new Thread( + () -> { + RequestLatencyContext.setContext(parentContext); + try { + Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation(); + simulateWork(50); + RequestLatencyContext.endDatabaseOperation(dbSample); + } finally { + RequestLatencyContext.clearContext(); + } + }); + + Thread childThread2 = + new Thread( + () -> { + RequestLatencyContext.setContext(parentContext); + try { + Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation(); + simulateWork(75); + RequestLatencyContext.endDatabaseOperation(dbSample); + } finally { + RequestLatencyContext.clearContext(); + } + }); + + childThread1.start(); + childThread2.start(); + childThread1.join(); + childThread2.join(); + + simulateWork(10); + RequestLatencyContext.endRequest(); + + // Verify that DB time from both child threads was accumulated + String normalizedEndpoint = MetricUtils.normalizeUri(endpoint); + Timer dbTimer = + Metrics.globalRegistry + .find("request.latency.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "PUT") + .timer(); + + assertNotNull(dbTimer, "DB timer should exist"); + double dbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + + LOG.info( + "Total DB time from child threads: {}ms (expected ~125ms)", String.format("%.2f", dbMs)); + + // DB time should include operations from both child threads (50ms + 75ms = 125ms min) + assertTrue( + dbMs >= 100, "DB time should accumulate from child threads, expected ~125ms, got: " + dbMs); + } + + @Test + void testContextGetSetClear() { + // Test getContext returns null when no context + assertNull(RequestLatencyContext.getContext(), "Should return null when no context"); + + // Start a request + RequestLatencyContext.startRequest("/api/v1/test", "GET"); + + // Get context should return non-null + RequestLatencyContext.RequestContext context = RequestLatencyContext.getContext(); + assertNotNull(context, "Should return context after startRequest"); + + // Clear context + RequestLatencyContext.clearContext(); + assertNull(RequestLatencyContext.getContext(), "Should return null after clearContext"); + + // Set context again + RequestLatencyContext.setContext(context); + assertNotNull(RequestLatencyContext.getContext(), "Should return context after setContext"); + + // Clean up + RequestLatencyContext.endRequest(); + } + + @Test + void testBulkOperationSimulation() throws InterruptedException { + // Simulates the actual bulk operation pattern used in EntityRepository + String endpoint = "/api/v1/tables/bulk"; + int numEntities = 10; + int dbTimePerEntity = 20; // ms + + RequestLatencyContext.startRequest(endpoint, "PUT"); + simulateWork(5); // Initial processing + + RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext(); + + // Simulate parallel entity processing like BOUNDED_BULK_EXECUTOR + java.util.List threads = new java.util.ArrayList<>(); + for (int i = 0; i < numEntities; i++) { + Thread thread = + new Thread( + () -> { + RequestLatencyContext.setContext(parentContext); + try { + // Each entity does a DB lookup and update + Timer.Sample dbSample1 = RequestLatencyContext.startDatabaseOperation(); + simulateWork(dbTimePerEntity / 2); // findByName + RequestLatencyContext.endDatabaseOperation(dbSample1); + + Timer.Sample dbSample2 = RequestLatencyContext.startDatabaseOperation(); + simulateWork(dbTimePerEntity / 2); // update + RequestLatencyContext.endDatabaseOperation(dbSample2); + } finally { + RequestLatencyContext.clearContext(); + } + }); + threads.add(thread); + thread.start(); + } + + // Wait for all threads + for (Thread t : threads) { + t.join(); + } + + simulateWork(5); // Final processing + RequestLatencyContext.endRequest(); + + // Verify metrics + String normalizedEndpoint = MetricUtils.normalizeUri(endpoint); + + Timer dbTimer = + Metrics.globalRegistry + .find("request.latency.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "PUT") + .timer(); + Timer totalTimer = + Metrics.globalRegistry + .find("request.latency.total") + .tag("endpoint", normalizedEndpoint) + .tag("method", "PUT") + .timer(); + + assertNotNull(dbTimer, "DB timer should exist"); + assertNotNull(totalTimer, "Total timer should exist"); + + double dbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + double totalMs = totalTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + + // Expected: 10 entities * 20ms DB time = 200ms total DB time + // Due to parallelism, wall clock time is less, but accumulated DB time should be ~200ms + LOG.info( + "Bulk simulation: total={}ms, db={}ms, entities={}, expected db ~{}ms", + String.format("%.0f", totalMs), + String.format("%.0f", dbMs), + numEntities, + numEntities * dbTimePerEntity); + + // DB time should be at least 80% of expected (allowing for timing variance) + double expectedDbTime = numEntities * dbTimePerEntity; + assertTrue( + dbMs >= expectedDbTime * 0.8, + String.format( + "DB time should be at least %.0fms (80%% of expected), got: %.0fms", + expectedDbTime * 0.8, dbMs)); + + // Verify operation count + var dbOperations = + Metrics.globalRegistry + .find("request.operations.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "PUT") + .summary(); + assertNotNull(dbOperations, "Should have database operations summary"); + assertEquals( + numEntities * 2, + dbOperations.totalAmount(), + "Should have " + (numEntities * 2) + " DB operations (2 per entity)"); + } + + @Test + void testConcurrentDbAndSearchOperations() throws InterruptedException { + String endpoint = "/api/v1/search/bulk"; + + RequestLatencyContext.startRequest(endpoint, "POST"); + RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext(); + + // Thread 1: DB operations + Thread dbThread = + new Thread( + () -> { + RequestLatencyContext.setContext(parentContext); + try { + for (int i = 0; i < 3; i++) { + Timer.Sample sample = RequestLatencyContext.startDatabaseOperation(); + simulateWork(30); + RequestLatencyContext.endDatabaseOperation(sample); + } + } finally { + RequestLatencyContext.clearContext(); + } + }); + + // Thread 2: Search operations + Thread searchThread = + new Thread( + () -> { + RequestLatencyContext.setContext(parentContext); + try { + for (int i = 0; i < 2; i++) { + Timer.Sample sample = RequestLatencyContext.startSearchOperation(); + simulateWork(50); + RequestLatencyContext.endSearchOperation(sample); + } + } finally { + RequestLatencyContext.clearContext(); + } + }); + + dbThread.start(); + searchThread.start(); + dbThread.join(); + searchThread.join(); + + RequestLatencyContext.endRequest(); + + String normalizedEndpoint = MetricUtils.normalizeUri(endpoint); + + // Verify DB metrics + Timer dbTimer = + Metrics.globalRegistry + .find("request.latency.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "POST") + .timer(); + assertNotNull(dbTimer, "DB timer should exist"); + double dbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + assertTrue(dbMs >= 70, "DB time should be at least 70ms (3 * 30ms * 0.8), got: " + dbMs); + + // Verify Search metrics + Timer searchTimer = + Metrics.globalRegistry + .find("request.latency.search") + .tag("endpoint", normalizedEndpoint) + .tag("method", "POST") + .timer(); + assertNotNull(searchTimer, "Search timer should exist"); + double searchMs = searchTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + assertTrue( + searchMs >= 80, "Search time should be at least 80ms (2 * 50ms * 0.8), got: " + searchMs); + + // Verify operation counts + var dbOps = + Metrics.globalRegistry + .find("request.operations.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "POST") + .summary(); + var searchOps = + Metrics.globalRegistry + .find("request.operations.search") + .tag("endpoint", normalizedEndpoint) + .tag("method", "POST") + .summary(); + + assertEquals(3, dbOps.totalAmount(), "Should have 3 DB operations"); + assertEquals(2, searchOps.totalAmount(), "Should have 2 search operations"); + + LOG.info( + "Mixed operations: db={}ms (3 ops), search={}ms (2 ops)", + String.format("%.0f", dbMs), + String.format("%.0f", searchMs)); + } + + @Test + void testHighConcurrencyStressTest() throws InterruptedException { + // Stress test with many concurrent threads to catch race conditions + String endpoint = "/api/v1/stress/test"; + int numThreads = 50; + int opsPerThread = 5; + + RequestLatencyContext.startRequest(endpoint, "PUT"); + RequestLatencyContext.RequestContext parentContext = RequestLatencyContext.getContext(); + + java.util.concurrent.CountDownLatch startLatch = new java.util.concurrent.CountDownLatch(1); + java.util.concurrent.CountDownLatch doneLatch = + new java.util.concurrent.CountDownLatch(numThreads); + + for (int i = 0; i < numThreads; i++) { + new Thread( + () -> { + try { + startLatch.await(); // Wait for all threads to be ready + RequestLatencyContext.setContext(parentContext); + try { + for (int j = 0; j < opsPerThread; j++) { + Timer.Sample sample = RequestLatencyContext.startDatabaseOperation(); + // Minimal work to focus on concurrency + Thread.sleep(1); + RequestLatencyContext.endDatabaseOperation(sample); + } + } finally { + RequestLatencyContext.clearContext(); + } + } catch (InterruptedException e) { + Thread.currentThread().interrupt(); + } finally { + doneLatch.countDown(); + } + }) + .start(); + } + + // Start all threads simultaneously + startLatch.countDown(); + // Wait for completion + doneLatch.await(); + + RequestLatencyContext.endRequest(); + + // Verify operation count is correct despite high concurrency + String normalizedEndpoint = MetricUtils.normalizeUri(endpoint); + var dbOps = + Metrics.globalRegistry + .find("request.operations.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "PUT") + .summary(); + + assertNotNull(dbOps, "DB operations summary should exist"); + int expectedOps = numThreads * opsPerThread; + assertEquals( + expectedOps, + dbOps.totalAmount(), + String.format( + "Should have exactly %d DB operations (%d threads * %d ops), got: %.0f", + expectedOps, numThreads, opsPerThread, dbOps.totalAmount())); + + LOG.info( + "Stress test passed: {} threads, {} ops each, total ops recorded: {}", + numThreads, + opsPerThread, + (int) dbOps.totalAmount()); + } + + @Test + void testTimingAccuracyWithKnownDurations() throws InterruptedException { + // Test that timing measurements are reasonably accurate + String endpoint = "/api/v1/timing/test"; + long expectedDbTime = 100; // ms + long expectedSearchTime = 75; // ms + long tolerance = 30; // Allow 30ms variance for CI environments + + RequestLatencyContext.startRequest(endpoint, "GET"); + + Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation(); + Thread.sleep(expectedDbTime); + RequestLatencyContext.endDatabaseOperation(dbSample); + + Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); + Thread.sleep(expectedSearchTime); + RequestLatencyContext.endSearchOperation(searchSample); + + RequestLatencyContext.endRequest(); + + String normalizedEndpoint = MetricUtils.normalizeUri(endpoint); + + Timer dbTimer = + Metrics.globalRegistry + .find("request.latency.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "GET") + .timer(); + Timer searchTimer = + Metrics.globalRegistry + .find("request.latency.search") + .tag("endpoint", normalizedEndpoint) + .tag("method", "GET") + .timer(); + + double actualDbMs = dbTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + double actualSearchMs = searchTimer.totalTime(java.util.concurrent.TimeUnit.MILLISECONDS); + + LOG.info( + "Timing accuracy: expected db={}ms got={}ms, expected search={}ms got={}ms", + expectedDbTime, + String.format("%.0f", actualDbMs), + expectedSearchTime, + String.format("%.0f", actualSearchMs)); + + assertTrue( + Math.abs(actualDbMs - expectedDbTime) <= tolerance, + String.format( + "DB time should be within %dms of %dms, got: %.0fms", + tolerance, expectedDbTime, actualDbMs)); + assertTrue( + Math.abs(actualSearchMs - expectedSearchTime) <= tolerance, + String.format( + "Search time should be within %dms of %dms, got: %.0fms", + tolerance, expectedSearchTime, actualSearchMs)); + } + + @Test + void testPercentageCalculation() { + String endpoint = "/api/v1/percentage/test"; + + RequestLatencyContext.startRequest(endpoint, "GET"); + + // 100ms DB, 100ms search, ~100ms internal = ~33% each + simulateWork(50); // Internal + + Timer.Sample dbSample = RequestLatencyContext.startDatabaseOperation(); + simulateWork(100); + RequestLatencyContext.endDatabaseOperation(dbSample); + + simulateWork(50); // Internal + + Timer.Sample searchSample = RequestLatencyContext.startSearchOperation(); + simulateWork(100); + RequestLatencyContext.endSearchOperation(searchSample); + + RequestLatencyContext.endRequest(); + + String normalizedEndpoint = MetricUtils.normalizeUri(endpoint); + + Gauge dbPercent = + Metrics.globalRegistry + .find("request.percentage.database") + .tag("endpoint", normalizedEndpoint) + .tag("method", "GET") + .gauge(); + Gauge searchPercent = + Metrics.globalRegistry + .find("request.percentage.search") + .tag("endpoint", normalizedEndpoint) + .tag("method", "GET") + .gauge(); + Gauge internalPercent = + Metrics.globalRegistry + .find("request.percentage.internal") + .tag("endpoint", normalizedEndpoint) + .tag("method", "GET") + .gauge(); + + assertNotNull(dbPercent, "DB percentage gauge should exist"); + assertNotNull(searchPercent, "Search percentage gauge should exist"); + assertNotNull(internalPercent, "Internal percentage gauge should exist"); + + double db = dbPercent.value(); + double search = searchPercent.value(); + double internal = internalPercent.value(); + double total = db + search + internal; + + LOG.info( + "Percentages: db={}%, search={}%, internal={}%, total={}%", + String.format("%.1f", db), + String.format("%.1f", search), + String.format("%.1f", internal), + String.format("%.1f", total)); + + // Total should be ~100% + assertTrue( + total >= 95 && total <= 105, + String.format("Total percentage should be ~100%%, got: %.1f%%", total)); + + // Each component should be roughly 25-40% given the timing + assertTrue(db >= 20 && db <= 45, String.format("DB should be 20-45%%, got: %.1f%%", db)); + assertTrue( + search >= 20 && search <= 45, + String.format("Search should be 20-45%%, got: %.1f%%", search)); + } + private void printDetailedMetrics(String endpoint) { LOG.info("\n=== Detailed Metrics for {} ===", endpoint); diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/resources/teams/UserResourceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/resources/teams/UserResourceTest.java index 34a35d89411..00b7588faed 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/resources/teams/UserResourceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/resources/teams/UserResourceTest.java @@ -2882,8 +2882,8 @@ public class UserResourceTest extends EntityResourceTest { long minCacheHitTime = cacheHitTimes.stream().mapToLong(Long::longValue).min().orElse(0); LOG.info( - "Cache HIT times - Avg: {:.2f}ms, Min: {}ms, Max: {}ms", - avgCacheHitTime, + "Cache HIT times - Avg: {}ms, Min: {}ms, Max: {}ms", + String.format("%.2f", avgCacheHitTime), minCacheHitTime, maxCacheHitTime); @@ -2891,8 +2891,10 @@ public class UserResourceTest extends EntityResourceTest { double performanceImprovement = ((double) cacheMissTime - avgCacheHitTime) / cacheMissTime * 100; LOG.info( - "Performance improvement: {:.1f}% ({}ms → {:.1f}ms)", - performanceImprovement, cacheMissTime, avgCacheHitTime); + "Performance improvement: {}% ({}ms → {}ms)", + String.format("%.1f", performanceImprovement), + cacheMissTime, + String.format("%.1f", avgCacheHitTime)); // Assert significant performance improvement assertTrue( @@ -2964,8 +2966,8 @@ public class UserResourceTest extends EntityResourceTest { LOG.info("Concurrent test results:"); LOG.info(" Total calls: {} across {} threads", totalCalls, threadCount); LOG.info(" Total time: {}ms", totalConcurrentTime); - LOG.info(" Calls per second: {:.1f}", callsPerSecond); - LOG.info(" Avg concurrent call time: {:.2f}ms", avgConcurrentTime); + LOG.info(" Calls per second: {}", String.format("%.1f", callsPerSecond)); + LOG.info(" Avg concurrent call time: {}ms", String.format("%.2f", avgConcurrentTime)); // Performance assertions for concurrent access assertTrue( diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/security/CachedPermissionPerformanceTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/security/CachedPermissionPerformanceTest.java index 2c739ec81e5..789662607c6 100644 --- a/openmetadata-service/src/test/java/org/openmetadata/service/security/CachedPermissionPerformanceTest.java +++ b/openmetadata-service/src/test/java/org/openmetadata/service/security/CachedPermissionPerformanceTest.java @@ -247,8 +247,10 @@ class CachedPermissionPerformanceTest extends OpenMetadataApplicationTest { double improvement = (cacheMissTime - cacheHitTime) / (double) cacheMissTime * 100; LOG.info( - "Performance results - Cache miss: {:.2f}ms, Cache hit: {:.2f}ms, Improvement: {:.1f}%", - avgMissMs, avgHitMs, improvement); + "Performance results - Cache miss: {}ms, Cache hit: {}ms, Improvement: {}%", + String.format("%.2f", avgMissMs), + String.format("%.2f", avgHitMs), + String.format("%.1f", improvement)); // Cache should be at least 50% faster assertTrue( diff --git a/openmetadata-service/src/test/java/org/openmetadata/service/util/jdbi/OMSqlLoggerTest.java b/openmetadata-service/src/test/java/org/openmetadata/service/util/jdbi/OMSqlLoggerTest.java new file mode 100644 index 00000000000..173d2fc16c0 --- /dev/null +++ b/openmetadata-service/src/test/java/org/openmetadata/service/util/jdbi/OMSqlLoggerTest.java @@ -0,0 +1,48 @@ +/* + * Copyright 2021 Collate + * Licensed under the Apache License, Version 2.0 (the "License"); + * you may not use this file except in compliance with the License. + * You may obtain a copy of the License at + * http://www.apache.org/licenses/LICENSE-2.0 + * Unless required by applicable law or agreed to in writing, software + * distributed under the License is distributed on an "AS IS" BASIS, + * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. + * See the License for the specific language governing permissions and + * limitations under the License. + */ +package org.openmetadata.service.util.jdbi; + +import static org.junit.jupiter.api.Assertions.*; + +import org.junit.jupiter.api.AfterEach; +import org.junit.jupiter.api.BeforeEach; +import org.junit.jupiter.api.Test; + +class OMSqlLoggerTest { + + private long originalThreshold; + + @BeforeEach + void setUp() { + originalThreshold = OMSqlLogger.getSlowQueryThresholdMs(); + } + + @AfterEach + void tearDown() { + OMSqlLogger.setSlowQueryThresholdMs(originalThreshold); + } + + @Test + void testDefaultSlowQueryThreshold() { + assertEquals(100, OMSqlLogger.getSlowQueryThresholdMs()); + } + + @Test + void testSetSlowQueryThreshold() { + OMSqlLogger.setSlowQueryThresholdMs(500); + assertEquals(500, OMSqlLogger.getSlowQueryThresholdMs()); + + OMSqlLogger.setSlowQueryThresholdMs(50); + assertEquals(50, OMSqlLogger.getSlowQueryThresholdMs()); + } +}