mirror of
https://github.com/open-metadata/OpenMetadata
synced 2026-05-24 09:39:11 +00:00
Fix Search Index Contention (#18605)
* Fix Search Index Contention * Update searchIndexingAppConfig.json * Missing Error Logs and Stats --------- Co-authored-by: Pere Miquel Brull <peremiquelbrull@gmail.com>
This commit is contained in:
parent
f4fdafeb8a
commit
9a5dc61ca7
11 changed files with 179 additions and 113 deletions
|
|
@ -0,0 +1,5 @@
|
|||
-- Remove SearchIndexing for api Service, collection and endpoint
|
||||
DELETE er FROM entity_relationship er JOIN installed_apps ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
|
||||
DELETE er FROM entity_relationship er JOIN apps_marketplace ia ON er.fromId = ia.id OR er.toId = ia.id WHERE ia.name = 'SearchIndexingApplication';
|
||||
DELETE from installed_apps where name = 'SearchIndexingApplication';
|
||||
DELETE from apps_marketplace where name = 'SearchIndexingApplication';
|
||||
|
|
@ -0,0 +1,5 @@
|
|||
-- Remove SearchIndexing for api Service, collection and endpoint
|
||||
DELETE FROM entity_relationship er USING installed_apps ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
|
||||
DELETE FROM entity_relationship er USING apps_marketplace ia WHERE (er.fromId = ia.id OR er.toId = ia.id) AND ia.name = 'SearchIndexingApplication';
|
||||
DELETE from installed_apps where name = 'SearchIndexingApplication';
|
||||
DELETE from apps_marketplace where name = 'SearchIndexingApplication';
|
||||
|
|
@ -1,5 +1,6 @@
|
|||
package org.openmetadata.service.apps.bundles.searchIndex;
|
||||
|
||||
import static org.openmetadata.common.utils.CommonUtil.nullOrEmpty;
|
||||
import static org.openmetadata.service.Entity.API_COLLCECTION;
|
||||
import static org.openmetadata.service.Entity.API_ENDPOINT;
|
||||
import static org.openmetadata.service.Entity.API_SERVICE;
|
||||
|
|
@ -46,18 +47,14 @@ import static org.openmetadata.service.socket.WebSocketManager.SEARCH_INDEX_JOB_
|
|||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.ENTITY_TYPE_KEY;
|
||||
import static org.openmetadata.service.workflows.searchIndex.ReindexingUtil.isDataInsightIndex;
|
||||
|
||||
import java.util.ArrayList;
|
||||
import java.util.Collections;
|
||||
import java.util.HashMap;
|
||||
import java.util.List;
|
||||
import java.util.Map;
|
||||
import java.util.Set;
|
||||
import java.util.concurrent.BlockingQueue;
|
||||
import java.util.concurrent.CountDownLatch;
|
||||
import java.util.concurrent.ExecutionException;
|
||||
import java.util.concurrent.ExecutorService;
|
||||
import java.util.concurrent.Executors;
|
||||
import java.util.concurrent.Future;
|
||||
import java.util.concurrent.LinkedBlockingQueue;
|
||||
import java.util.concurrent.TimeUnit;
|
||||
import java.util.concurrent.atomic.AtomicReference;
|
||||
|
|
@ -65,7 +62,6 @@ import lombok.Getter;
|
|||
import lombok.extern.slf4j.Slf4j;
|
||||
import org.apache.commons.lang.exception.ExceptionUtils;
|
||||
import org.jetbrains.annotations.NotNull;
|
||||
import org.openmetadata.common.utils.CommonUtil;
|
||||
import org.openmetadata.schema.EntityInterface;
|
||||
import org.openmetadata.schema.EntityTimeSeriesInterface;
|
||||
import org.openmetadata.schema.analytics.ReportData;
|
||||
|
|
@ -154,22 +150,16 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
TEST_CASE_RESULT);
|
||||
|
||||
// Constants to replace magic numbers
|
||||
private static final int DEFAULT_PAYLOAD_SIZE = 100;
|
||||
private static final int DEFAULT_BATCH_SIZE = 5;
|
||||
private static final int DEFAULT_MAX_RETRIES = 1000;
|
||||
private static final int DEFAULT_TIMEOUT = 10000;
|
||||
private static final int MAX_CONSUMERS = 10;
|
||||
private BulkSink searchIndexSink;
|
||||
|
||||
@Getter private EventPublisherJob jobData;
|
||||
private final Object jobDataLock = new Object();
|
||||
private volatile boolean stopped = false;
|
||||
private ExecutorService producerExecutor;
|
||||
private ExecutorService consumerExecutor;
|
||||
private ExecutorService entityReaderExecutor;
|
||||
private ExecutorService producerExecutor;
|
||||
private final BlockingQueue<IndexingTask<?>> taskQueue = new LinkedBlockingQueue<>();
|
||||
private final AtomicReference<Stats> searchIndexStats = new AtomicReference<>();
|
||||
private final AtomicReference<Integer> batchSize = new AtomicReference<>(DEFAULT_BATCH_SIZE);
|
||||
private final AtomicReference<Integer> batchSize = new AtomicReference<>(5);
|
||||
|
||||
public SearchIndexApp(CollectionDAO collectionDAO, SearchRepository searchRepository) {
|
||||
super(collectionDAO, searchRepository);
|
||||
|
|
@ -245,20 +235,20 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
new OpenSearchIndexSink(
|
||||
searchRepository.getSearchClient(),
|
||||
jobData.getPayLoadSize(),
|
||||
DEFAULT_PAYLOAD_SIZE,
|
||||
DEFAULT_BATCH_SIZE,
|
||||
DEFAULT_MAX_RETRIES,
|
||||
DEFAULT_TIMEOUT);
|
||||
jobData.getMaxConcurrentRequests(),
|
||||
jobData.getMaxRetries(),
|
||||
jobData.getInitialBackoff(),
|
||||
jobData.getMaxBackoff());
|
||||
LOG.info("Initialized OpenSearchIndexSink.");
|
||||
} else {
|
||||
this.searchIndexSink =
|
||||
new ElasticSearchIndexSink(
|
||||
searchRepository.getSearchClient(),
|
||||
jobData.getPayLoadSize(),
|
||||
DEFAULT_PAYLOAD_SIZE,
|
||||
DEFAULT_BATCH_SIZE,
|
||||
DEFAULT_MAX_RETRIES,
|
||||
DEFAULT_TIMEOUT);
|
||||
jobData.getMaxConcurrentRequests(),
|
||||
jobData.getMaxRetries(),
|
||||
jobData.getInitialBackoff(),
|
||||
jobData.getMaxBackoff());
|
||||
LOG.info("Initialized ElasticSearchIndexSink.");
|
||||
}
|
||||
}
|
||||
|
|
@ -288,56 +278,68 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
}
|
||||
|
||||
private void performReindex(JobExecutionContext jobExecutionContext) throws InterruptedException {
|
||||
int numProducers = jobData.getEntities().size();
|
||||
int numConsumers = calculateNumberOfConsumers();
|
||||
int numProducers = jobData.getProducerThreads();
|
||||
int numConsumers = jobData.getConsumerThreads();
|
||||
LOG.info("Starting reindexing with {} producers and {} consumers.", numProducers, numConsumers);
|
||||
|
||||
producerExecutor = Executors.newFixedThreadPool(numProducers);
|
||||
consumerExecutor = Executors.newFixedThreadPool(numConsumers);
|
||||
entityReaderExecutor = Executors.newCachedThreadPool();
|
||||
CountDownLatch producerLatch = new CountDownLatch(numProducers);
|
||||
producerExecutor = Executors.newFixedThreadPool(numProducers);
|
||||
|
||||
try {
|
||||
for (String entityType : jobData.getEntities()) {
|
||||
producerExecutor.submit(
|
||||
() -> {
|
||||
try {
|
||||
reCreateIndexes(entityType);
|
||||
processEntityType(entityType);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error processing entity type {}", entityType, e);
|
||||
} finally {
|
||||
producerLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
for (int i = 0; i < numConsumers; i++) {
|
||||
consumerExecutor.submit(
|
||||
() -> {
|
||||
try {
|
||||
consumeTasks(jobExecutionContext);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.warn("Consumer thread interrupted.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
producerLatch.await();
|
||||
sendPoisonPills(numConsumers);
|
||||
processEntityReindex(jobExecutionContext);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error during reindexing process.", e);
|
||||
throw e;
|
||||
} finally {
|
||||
shutdownExecutor(producerExecutor, "ProducerExecutor", 1, TimeUnit.HOURS);
|
||||
shutdownExecutor(producerExecutor, "ReaderExecutor", 1, TimeUnit.MINUTES);
|
||||
shutdownExecutor(consumerExecutor, "ConsumerExecutor", 20, TimeUnit.SECONDS);
|
||||
shutdownExecutor(entityReaderExecutor, "ReaderExecutor", 20, TimeUnit.SECONDS);
|
||||
}
|
||||
}
|
||||
|
||||
private int calculateNumberOfConsumers() {
|
||||
return Math.min(Runtime.getRuntime().availableProcessors(), MAX_CONSUMERS);
|
||||
private void processEntityReindex(JobExecutionContext jobExecutionContext)
|
||||
throws InterruptedException {
|
||||
int numConsumers = jobData.getConsumerThreads();
|
||||
CountDownLatch producerLatch = new CountDownLatch(getTotalLatchCount(jobData.getEntities()));
|
||||
for (String entityType : jobData.getEntities()) {
|
||||
try {
|
||||
reCreateIndexes(entityType);
|
||||
int totalEntityRecords = getTotalEntityRecords(entityType);
|
||||
Source<?> source = createSource(entityType);
|
||||
int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
|
||||
if (totalEntityRecords > 0) {
|
||||
for (int i = 0; i < noOfThreads; i++) {
|
||||
int currentOffset = i * batchSize.get();
|
||||
producerExecutor.submit(
|
||||
() -> {
|
||||
try {
|
||||
processReadTask(entityType, source, currentOffset);
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error processing entity type {}", entityType, e);
|
||||
} finally {
|
||||
producerLatch.countDown();
|
||||
}
|
||||
});
|
||||
}
|
||||
}
|
||||
} catch (Exception e) {
|
||||
LOG.error("Error processing entity type {}", entityType, e);
|
||||
}
|
||||
}
|
||||
|
||||
for (int i = 0; i < numConsumers; i++) {
|
||||
consumerExecutor.submit(
|
||||
() -> {
|
||||
try {
|
||||
consumeTasks(jobExecutionContext);
|
||||
} catch (InterruptedException e) {
|
||||
Thread.currentThread().interrupt();
|
||||
LOG.warn("Consumer thread interrupted.");
|
||||
}
|
||||
});
|
||||
}
|
||||
|
||||
producerLatch.await();
|
||||
sendPoisonPills(numConsumers);
|
||||
}
|
||||
|
||||
private void consumeTasks(JobExecutionContext jobExecutionContext) throws InterruptedException {
|
||||
|
|
@ -548,29 +550,42 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
|
||||
private void processTask(IndexingTask<?> task, JobExecutionContext jobExecutionContext) {
|
||||
String entityType = task.entityType();
|
||||
List<?> entities = task.entities();
|
||||
ResultList<?> entities = task.entities();
|
||||
Map<String, Object> contextData = new HashMap<>();
|
||||
contextData.put(ENTITY_TYPE_KEY, entityType);
|
||||
|
||||
try {
|
||||
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<EntityInterface> entityList = (List<EntityInterface>) entities;
|
||||
List<EntityInterface> entityList = (List<EntityInterface>) entities.getData();
|
||||
searchIndexSink.write(entityList, contextData);
|
||||
} else {
|
||||
@SuppressWarnings("unchecked")
|
||||
List<EntityTimeSeriesInterface> entityList = (List<EntityTimeSeriesInterface>) entities;
|
||||
List<EntityTimeSeriesInterface> entityList =
|
||||
(List<EntityTimeSeriesInterface>) entities.getData();
|
||||
searchIndexSink.write(entityList, contextData);
|
||||
}
|
||||
|
||||
// After successful write, create a new StepStats for the current batch
|
||||
StepStats currentEntityStats = new StepStats();
|
||||
currentEntityStats.setSuccessRecords(entities.size());
|
||||
currentEntityStats.setFailedRecords(0);
|
||||
currentEntityStats.setSuccessRecords(entities.getData().size());
|
||||
currentEntityStats.setFailedRecords(entities.getErrors().size());
|
||||
// Do NOT set Total Records here
|
||||
|
||||
// Update statistics in a thread-safe manner
|
||||
synchronized (jobDataLock) {
|
||||
if (!entities.getErrors().isEmpty()) {
|
||||
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
|
||||
jobData.setFailure(
|
||||
new IndexingError()
|
||||
.withErrorSource(IndexingError.ErrorSource.READER)
|
||||
.withSubmittedCount(batchSize.get())
|
||||
.withSuccessCount(entities.getData().size())
|
||||
.withFailedCount(entities.getErrors().size())
|
||||
.withMessage(
|
||||
"Issues in Reading A Batch For Entities. Check Errors Corresponding to Entities.")
|
||||
.withFailedEntities(entities.getErrors()));
|
||||
}
|
||||
updateStats(entityType, currentEntityStats);
|
||||
}
|
||||
|
||||
|
|
@ -584,7 +599,7 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
|
||||
StepStats failedEntityStats = new StepStats();
|
||||
failedEntityStats.setSuccessRecords(0);
|
||||
failedEntityStats.setFailedRecords(entities.size());
|
||||
failedEntityStats.setFailedRecords(entities.getData().size());
|
||||
updateStats(entityType, failedEntityStats);
|
||||
}
|
||||
LOG.error("Unexpected error during processing task for entity {}", entityType, e);
|
||||
|
|
@ -601,14 +616,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
|
||||
PaginatedEntitiesSource paginatedSource =
|
||||
new PaginatedEntitiesSource(entityType, batchSize.get(), fields);
|
||||
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
|
||||
if (!nullOrEmpty(jobData.getAfterCursor())) {
|
||||
paginatedSource.getCursor().set(jobData.getAfterCursor());
|
||||
}
|
||||
source = paginatedSource;
|
||||
} else {
|
||||
PaginatedEntityTimeSeriesSource paginatedSource =
|
||||
new PaginatedEntityTimeSeriesSource(entityType, batchSize.get(), fields);
|
||||
if (!CommonUtil.nullOrEmpty(jobData.getAfterCursor())) {
|
||||
if (!nullOrEmpty(jobData.getAfterCursor())) {
|
||||
paginatedSource.getCursor().set(jobData.getAfterCursor());
|
||||
}
|
||||
source = paginatedSource;
|
||||
|
|
@ -617,17 +632,14 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
return source;
|
||||
}
|
||||
|
||||
private void processEntityType(String entityType)
|
||||
throws InterruptedException, ExecutionException {
|
||||
int totalEntityRecords = getTotalEntityRecords(entityType);
|
||||
if (totalEntityRecords > 0) {
|
||||
Source<?> source = createSource(entityType);
|
||||
int loadPerThread = calculateLoadPerThread(totalEntityRecords);
|
||||
List<Future<?>> futures = submitReaderTasks(entityType, source, loadPerThread);
|
||||
for (Future<?> future : futures) {
|
||||
future.get();
|
||||
}
|
||||
private int getTotalLatchCount(Set<String> entities) {
|
||||
int totalCount = 0;
|
||||
for (String entityType : entities) {
|
||||
int totalEntityRecords = getTotalEntityRecords(entityType);
|
||||
int noOfThreads = calculateNumberOfThreads(totalEntityRecords);
|
||||
totalCount += noOfThreads;
|
||||
}
|
||||
return totalCount;
|
||||
}
|
||||
|
||||
private int getTotalEntityRecords(String entityType) {
|
||||
|
|
@ -636,24 +648,12 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
.getTotalRecords();
|
||||
}
|
||||
|
||||
private List<Future<?>> submitReaderTasks(
|
||||
String entityType, Source<?> source, int loadPerThread) {
|
||||
List<Future<?>> futures = new ArrayList<>();
|
||||
for (int i = 0; i < loadPerThread; i++) {
|
||||
int currentOffset = i * batchSize.get();
|
||||
Future<?> future =
|
||||
entityReaderExecutor.submit(() -> processReadTask(entityType, source, currentOffset));
|
||||
futures.add(future);
|
||||
}
|
||||
return futures;
|
||||
}
|
||||
|
||||
private void processReadTask(String entityType, Source<?> source, int offset) {
|
||||
try {
|
||||
Object resultList = source.readWithCursor(RestUtil.encodeCursor(String.valueOf(offset)));
|
||||
if (resultList != null) {
|
||||
List<?> entities = extractEntities(entityType, resultList);
|
||||
if (entities != null && !entities.isEmpty()) {
|
||||
ResultList<?> entities = extractEntities(entityType, resultList);
|
||||
if (!nullOrEmpty(entities.getData())) {
|
||||
LOG.info(
|
||||
"Creating Indexing Task for entityType: {}, current offset: {}", entityType, offset);
|
||||
createIndexingTask(entityType, entities, offset);
|
||||
|
|
@ -664,16 +664,29 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
LOG.warn("Reader thread interrupted for entityType: {}", entityType);
|
||||
} catch (SearchIndexException e) {
|
||||
LOG.error("Error while reading source for entityType: {}", entityType, e);
|
||||
synchronized (jobDataLock) {
|
||||
jobData.setStatus(EventPublisherJob.Status.ACTIVE_ERROR);
|
||||
jobData.setFailure(e.getIndexingError());
|
||||
int remainingRecords = getRemainingRecordsToProcess(entityType);
|
||||
if (remainingRecords - batchSize.get() <= 0) {
|
||||
updateStats(
|
||||
entityType,
|
||||
new StepStats().withSuccessRecords(0).withFailedRecords(remainingRecords));
|
||||
} else {
|
||||
updateStats(
|
||||
entityType, new StepStats().withSuccessRecords(0).withFailedRecords(batchSize.get()));
|
||||
}
|
||||
}
|
||||
}
|
||||
}
|
||||
|
||||
private void createIndexingTask(String entityType, List<?> entities, int offset)
|
||||
private void createIndexingTask(String entityType, ResultList<?> entities, int offset)
|
||||
throws InterruptedException {
|
||||
IndexingTask<?> task = new IndexingTask<>(entityType, entities, offset);
|
||||
taskQueue.put(task);
|
||||
}
|
||||
|
||||
private synchronized int calculateLoadPerThread(int totalEntityRecords) {
|
||||
private synchronized int calculateNumberOfThreads(int totalEntityRecords) {
|
||||
int mod = totalEntityRecords % batchSize.get();
|
||||
if (mod == 0) {
|
||||
return totalEntityRecords / batchSize.get();
|
||||
|
|
@ -683,16 +696,26 @@ public class SearchIndexApp extends AbstractNativeApplication {
|
|||
}
|
||||
|
||||
@SuppressWarnings("unchecked")
|
||||
private List<?> extractEntities(String entityType, Object resultList) {
|
||||
private ResultList<?> extractEntities(String entityType, Object resultList) {
|
||||
if (!TIME_SERIES_ENTITIES.contains(entityType)) {
|
||||
return ((ResultList<? extends EntityInterface>) resultList).getData();
|
||||
return ((ResultList<? extends EntityInterface>) resultList);
|
||||
} else {
|
||||
return ((ResultList<? extends EntityTimeSeriesInterface>) resultList).getData();
|
||||
return ((ResultList<? extends EntityTimeSeriesInterface>) resultList);
|
||||
}
|
||||
}
|
||||
|
||||
private record IndexingTask<T>(String entityType, List<T> entities, int currentEntityOffset) {
|
||||
private synchronized int getRemainingRecordsToProcess(String entityType) {
|
||||
StepStats entityStats =
|
||||
((StepStats)
|
||||
searchIndexStats.get().getEntityStats().getAdditionalProperties().get(entityType));
|
||||
return entityStats.getTotalRecords()
|
||||
- entityStats.getFailedRecords()
|
||||
- entityStats.getSuccessRecords();
|
||||
}
|
||||
|
||||
private record IndexingTask<T>(
|
||||
String entityType, ResultList<T> entities, int currentEntityOffset) {
|
||||
public static final IndexingTask<?> POISON_PILL =
|
||||
new IndexingTask<>(null, Collections.emptyList(), -1);
|
||||
new IndexingTask<>(null, new ResultList<>(), -1);
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -278,10 +278,15 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||
description = "Flag to determine if indexes should be recreated.")
|
||||
boolean recreateIndexes,
|
||||
@Option(
|
||||
names = {"--num-threads"},
|
||||
names = {"--producer-threads"},
|
||||
defaultValue = "10",
|
||||
description = "Number of threads to use for processing.")
|
||||
int numThreads,
|
||||
int producerThreads,
|
||||
@Option(
|
||||
names = {"--consumer-threads"},
|
||||
defaultValue = "10",
|
||||
description = "Number of threads to use for processing.")
|
||||
int consumerThreads,
|
||||
@Option(
|
||||
names = {"--back-off"},
|
||||
defaultValue = "1000",
|
||||
|
|
@ -320,7 +325,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||
batchSize,
|
||||
payloadSize,
|
||||
recreateIndexes,
|
||||
numThreads,
|
||||
producerThreads,
|
||||
consumerThreads,
|
||||
backOff,
|
||||
maxBackOff,
|
||||
maxRequests,
|
||||
|
|
@ -336,7 +342,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||
int batchSize,
|
||||
long payloadSize,
|
||||
boolean recreateIndexes,
|
||||
int numThreads,
|
||||
int producerThreads,
|
||||
int consumerThreads,
|
||||
int backOff,
|
||||
int maxBackOff,
|
||||
int maxRequests,
|
||||
|
|
@ -354,7 +361,8 @@ public class OpenMetadataOperations implements Callable<Integer> {
|
|||
.withBatchSize(batchSize)
|
||||
.withPayLoadSize(payloadSize)
|
||||
.withRecreateIndex(recreateIndexes)
|
||||
.withNumberOfThreads(numThreads)
|
||||
.withProducerThreads(producerThreads)
|
||||
.withConsumerThreads(consumerThreads)
|
||||
.withInitialBackoff(backOff)
|
||||
.withMaxBackoff(maxBackOff)
|
||||
.withMaxConcurrentRequests(maxRequests)
|
||||
|
|
|
|||
|
|
@ -47,7 +47,8 @@
|
|||
"recreateIndex": false,
|
||||
"batchSize": "100",
|
||||
"payLoadSize": 104857600,
|
||||
"numberOfThreads": 5,
|
||||
"producerThreads": 10,
|
||||
"consumerThreads": 10,
|
||||
"maxConcurrentRequests": 100,
|
||||
"maxRetries": 3,
|
||||
"initialBackoff": 1000,
|
||||
|
|
|
|||
|
|
@ -61,6 +61,12 @@
|
|||
"recreateIndex": false,
|
||||
"batchSize": "100",
|
||||
"payLoadSize": 104857600,
|
||||
"producerThreads": 10,
|
||||
"consumerThreads": 10,
|
||||
"maxConcurrentRequests": 100,
|
||||
"maxRetries": 3,
|
||||
"initialBackoff": 1000,
|
||||
"maxBackoff": 10000,
|
||||
"searchIndexMappingLanguage": "EN"
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -44,11 +44,17 @@
|
|||
"existingJavaType": "java.lang.Long",
|
||||
"default": 104857600
|
||||
},
|
||||
"numberOfThreads": {
|
||||
"title": "Number of Threads",
|
||||
"producerThreads": {
|
||||
"title": "Number of Producer Threads",
|
||||
"description": "Number of threads to use for reindexing",
|
||||
"type": "integer",
|
||||
"default": 5
|
||||
"default": 10
|
||||
},
|
||||
"consumerThreads": {
|
||||
"title": "Number of Consumer Threads",
|
||||
"description": "Number of threads to use for reindexing",
|
||||
"type": "integer",
|
||||
"default": 10
|
||||
},
|
||||
"maxConcurrentRequests": {
|
||||
"title": "Max Concurrent Requests",
|
||||
|
|
@ -80,4 +86,4 @@
|
|||
}
|
||||
},
|
||||
"additionalProperties": false
|
||||
}
|
||||
}
|
||||
|
|
|
|||
|
|
@ -108,11 +108,17 @@
|
|||
"existingJavaType": "java.lang.Long",
|
||||
"default": 104857600
|
||||
},
|
||||
"numberOfThreads": {
|
||||
"title": "Number of Threads",
|
||||
"description": "Number of threads to use for reindexing",
|
||||
"producerThreads": {
|
||||
"title": "Number of Producer Threads to use",
|
||||
"description": "Number of producer threads to use for reindexing",
|
||||
"type": "integer",
|
||||
"default": 5
|
||||
"default": 10
|
||||
},
|
||||
"consumerThreads": {
|
||||
"title": "Number of Consumer Threads to use",
|
||||
"description": "Number of consumer threads to use for reindexing",
|
||||
"type": "integer",
|
||||
"default": 10
|
||||
},
|
||||
"maxConcurrentRequests": {
|
||||
"title": "Max Concurrent Requests",
|
||||
|
|
|
|||
|
|
@ -16,8 +16,14 @@
|
|||
"type": "integer",
|
||||
"default": 104857600
|
||||
},
|
||||
"numberOfThreads": {
|
||||
"title": "Number of Threads",
|
||||
"producerThreads": {
|
||||
"title": "Number of Producer Threads",
|
||||
"description": "Number of threads to use for reindexing",
|
||||
"type": "integer",
|
||||
"default": 5
|
||||
},
|
||||
"consumerThreads": {
|
||||
"title": "Number of Consumer Threads",
|
||||
"description": "Number of threads to use for reindexing",
|
||||
"type": "integer",
|
||||
"default": 5
|
||||
|
|
|
|||
Loading…
Reference in a new issue