OpenMetadata/docker/development/distributed-test/scripts/load-test-data.sh
Sriharsha Chintalapani 89f627da81
Distributed Search Indexing with Push Notifications (#24939)
* Add Distributed Indexing in Multi-Server scenarios

* Add Distributed Indexing in Multi-Server scenarios

* Update generated TypeScript types

* Handle Servers leaving and joining

* Update generated TypeScript types

* spotless fix

* Refactor Code for Single Server and Multiple Server

* Add Metrics and Search Index Orphaned Cleanup

* Add Language

* Add Test settings

* Add Test data

* Add Test data

* Update generated TypeScript types

* Add Load Test for more entities

* Add Stats fix

* Add server information

* Fix Staging Index unavailable to DistributedJobParticipant

* Fix Stats issue

* Align Tests

* Fix Stats and Error Handling

* participant stat fix

* Fix coordinator stats

* Add E2E failure tests

* Fix Stats for Reader and Sink

* Added flush for sinking stats

* Add language label

* Fix Entity Build Errors

* Missing commit

* Update generated TypeScript types

* Change runId to serverId

* Fix test failures

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
Co-authored-by: Mohit Yadav <105265192+mohityadav766@users.noreply.github.com>
Co-authored-by: mohitdeuex <mohit.y@deuexsolutions.com>
2026-01-23 06:12:05 +05:30

960 lines
35 KiB
Bash
Executable file

#!/bin/bash
# Load test data for distributed indexing testing
#
# Parses the command line into the NUM_* / SERVER_URL settings below.
# Every count must be a non-negative integer and the server must be an
# http(s) URL, because these values are later handed to the embedded
# Python loader.
set -e

# NOTE(review): SCRIPT_DIR is not referenced anywhere else in the visible
# script; it is kept unchanged.
# shellcheck disable=SC2034
SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"

# Default values
SERVER_URL="http://localhost:8585"
NUM_TABLES=20000
NUM_DASHBOARDS=5000
NUM_CHARTS=10000
NUM_PIPELINES=3000
NUM_TOPICS=3000
NUM_MLMODELS=2000
NUM_CONTAINERS=2000
NUM_SEARCH_INDEXES=1000
NUM_GLOSSARIES=50
NUM_TERMS_PER_GLOSSARY=100
NUM_CLASSIFICATIONS=20
NUM_TAGS_PER_CLASSIFICATION=50
NUM_DATABASES=10

#######################################
# Print a message to stderr and abort with status 1.
# Arguments: $* - message text
#######################################
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

#######################################
# Print the option summary to stdout.
#######################################
usage() {
  echo "Usage: $0 [OPTIONS]"
  echo ""
  echo "Options:"
  echo " --tables NUM Number of tables to create (default: 20000)"
  echo " --dashboards NUM Number of dashboards to create (default: 5000)"
  echo " --charts NUM Number of charts to create (default: 10000)"
  echo " --pipelines NUM Number of pipelines to create (default: 3000)"
  echo " --topics NUM Number of topics to create (default: 3000)"
  echo " --mlmodels NUM Number of ML models to create (default: 2000)"
  echo " --containers NUM Number of containers to create (default: 2000)"
  echo " --search-indexes NUM Number of search indexes to create (default: 1000)"
  echo " --glossaries NUM Number of glossaries to create (default: 50)"
  echo " --terms-per-glossary NUM Number of terms per glossary (default: 100)"
  echo " --classifications NUM Number of classifications to create (default: 20)"
  echo " --tags-per-classification NUM Number of tags per classification (default: 50)"
  echo " --databases NUM Number of databases (default: 10)"
  echo " --server URL Target server URL (default: http://localhost:8585)"
  echo " --quick Quick mode with smaller dataset (~10k entities)"
  echo " -h, --help Show this help message"
}

#######################################
# Validate a numeric option value and store it in the named global.
# Arguments:
#   $1 - name of the variable to assign
#   $2 - option as typed by the user (for error messages)
#   $3 - number of arguments still unparsed, including the option itself
#   $4 - candidate value (may be absent)
# Outputs:   error text on stderr when the value is missing or not numeric
#######################################
set_count() {
  local var_name=$1
  local option=$2
  local remaining=$3
  local value=${4-}

  if (( remaining < 2 )); then
    die "Option ${option} requires a value"
  fi
  if [[ ! "$value" =~ ^[0-9]+$ ]]; then
    die "Option ${option} expects a non-negative integer, got: ${value}"
  fi
  # Force base 10 so that a value such as 0012 is stored as 12 instead of
  # being read as octal by later arithmetic.
  printf -v "$var_name" '%d' "$((10#${value}))"
}

#######################################
# Parse command-line options into the NUM_* and SERVER_URL globals.
# Globals:   NUM_* and SERVER_URL (written)
# Arguments: the script's command-line arguments
# Returns:   exits 0 after --help, exits 1 on any invalid option or value
#######################################
parse_args() {
  # The URL ends up inside request URLs built by the Python loader, so
  # quotes, backslashes and whitespace are rejected up front.
  local url_pattern='^https?://[^[:space:]"\\]+$'

  while [[ $# -gt 0 ]]; do
    case "$1" in
      --tables)
        set_count NUM_TABLES "$1" "$#" "${2-}"
        shift 2
        ;;
      --dashboards)
        set_count NUM_DASHBOARDS "$1" "$#" "${2-}"
        shift 2
        ;;
      --charts)
        set_count NUM_CHARTS "$1" "$#" "${2-}"
        shift 2
        ;;
      --pipelines)
        set_count NUM_PIPELINES "$1" "$#" "${2-}"
        shift 2
        ;;
      --topics)
        set_count NUM_TOPICS "$1" "$#" "${2-}"
        shift 2
        ;;
      --mlmodels)
        set_count NUM_MLMODELS "$1" "$#" "${2-}"
        shift 2
        ;;
      --containers)
        set_count NUM_CONTAINERS "$1" "$#" "${2-}"
        shift 2
        ;;
      --search-indexes)
        set_count NUM_SEARCH_INDEXES "$1" "$#" "${2-}"
        shift 2
        ;;
      --glossaries)
        set_count NUM_GLOSSARIES "$1" "$#" "${2-}"
        shift 2
        ;;
      --terms-per-glossary)
        set_count NUM_TERMS_PER_GLOSSARY "$1" "$#" "${2-}"
        shift 2
        ;;
      --classifications)
        set_count NUM_CLASSIFICATIONS "$1" "$#" "${2-}"
        shift 2
        ;;
      --tags-per-classification)
        set_count NUM_TAGS_PER_CLASSIFICATION "$1" "$#" "${2-}"
        shift 2
        ;;
      --databases)
        set_count NUM_DATABASES "$1" "$#" "${2-}"
        shift 2
        ;;
      --server)
        if (( $# < 2 )); then
          die "Option $1 requires a value"
        fi
        if [[ ! "$2" =~ $url_pattern ]]; then
          die "Option $1 expects an http(s) URL without quotes, backslashes or whitespace, got: $2"
        fi
        # Drop one trailing slash so that appended API paths do not start
        # with a double slash.
        SERVER_URL="${2%/}"
        shift 2
        ;;
      --quick)
        # Quick mode for rapid testing - smaller dataset
        NUM_TABLES=3000
        NUM_DASHBOARDS=1000
        NUM_CHARTS=2000
        NUM_PIPELINES=500
        NUM_TOPICS=500
        NUM_MLMODELS=300
        NUM_CONTAINERS=300
        NUM_SEARCH_INDEXES=200
        NUM_GLOSSARIES=10
        NUM_TERMS_PER_GLOSSARY=50
        NUM_CLASSIFICATIONS=5
        NUM_TAGS_PER_CLASSIFICATION=20
        shift
        ;;
      -h|--help)
        usage
        exit 0
        ;;
      *)
        die "Unknown option: $1"
        ;;
    esac
  done
}

parse_args "$@"
# Derived counts: child entities are created per parent, so their totals are
# products of the two matching settings.
NUM_GLOSSARY_TERMS=$((NUM_GLOSSARIES * NUM_TERMS_PER_GLOSSARY))
NUM_TAGS=$((NUM_CLASSIFICATIONS * NUM_TAGS_PER_CLASSIFICATION))

# Grand total of everything the loader is asked to create.
TOTAL=0
for entity_count in \
  NUM_TABLES NUM_DASHBOARDS NUM_CHARTS NUM_PIPELINES NUM_TOPICS NUM_MLMODELS \
  NUM_CONTAINERS NUM_SEARCH_INDEXES NUM_GLOSSARIES NUM_GLOSSARY_TERMS \
  NUM_CLASSIFICATIONS NUM_TAGS; do
  # entity_count holds a variable name; arithmetic evaluation resolves it.
  TOTAL=$((TOTAL + entity_count))
done

# Show the plan before any request is sent.
cat <<PLAN
======================================
Loading Test Data for Distributed Indexing
======================================
Server: $SERVER_URL

Entity counts:
 - Tables: $NUM_TABLES
 - Dashboards: $NUM_DASHBOARDS
 - Charts: $NUM_CHARTS
 - Pipelines: $NUM_PIPELINES
 - Topics: $NUM_TOPICS
 - ML Models: $NUM_MLMODELS
 - Containers: $NUM_CONTAINERS
 - Search Indexes: $NUM_SEARCH_INDEXES
 - Glossaries: $NUM_GLOSSARIES
 - Glossary Terms: $NUM_GLOSSARY_TERMS
 - Classifications: $NUM_CLASSIFICATIONS
 - Tags: $NUM_TAGS
 --------------------------
 - Total: $TOTAL

PLAN
# Use Python with urllib (built-in, no extra packages needed)
#
# Settings travel to Python through the environment and the heredoc delimiter
# is quoted, so the shell never pastes argument text into the Python source.
export SERVER_URL NUM_TABLES NUM_DASHBOARDS NUM_CHARTS NUM_PIPELINES \
  NUM_TOPICS NUM_MLMODELS NUM_CONTAINERS NUM_SEARCH_INDEXES NUM_GLOSSARIES \
  NUM_TERMS_PER_GLOSSARY NUM_CLASSIFICATIONS NUM_TAGS_PER_CLASSIFICATION \
  NUM_DATABASES
python3 << 'EOF'
import urllib.request
import urllib.error
import json
import os
import sys
import time
import random
from concurrent.futures import ThreadPoolExecutor, as_completed


def _read_count(name):
    """Return environment variable *name* as a non-negative int.

    Exits the program with a readable message when the variable is missing,
    not an integer, or negative.
    """
    raw = os.environ.get(name, "")
    try:
        value = int(raw)
    except ValueError:
        sys.exit(f"{name} must be a non-negative integer, got {raw!r}")
    if value < 0:
        sys.exit(f"{name} must be a non-negative integer, got {raw!r}")
    return value


# Settings exported by the surrounding shell script.
SERVER_URL = os.environ["SERVER_URL"]
NUM_TABLES = _read_count("NUM_TABLES")
NUM_DASHBOARDS = _read_count("NUM_DASHBOARDS")
NUM_CHARTS = _read_count("NUM_CHARTS")
NUM_PIPELINES = _read_count("NUM_PIPELINES")
NUM_TOPICS = _read_count("NUM_TOPICS")
NUM_MLMODELS = _read_count("NUM_MLMODELS")
NUM_CONTAINERS = _read_count("NUM_CONTAINERS")
NUM_SEARCH_INDEXES = _read_count("NUM_SEARCH_INDEXES")
NUM_GLOSSARIES = _read_count("NUM_GLOSSARIES")
NUM_TERMS_PER_GLOSSARY = _read_count("NUM_TERMS_PER_GLOSSARY")
NUM_CLASSIFICATIONS = _read_count("NUM_CLASSIFICATIONS")
NUM_TAGS_PER_CLASSIFICATION = _read_count("NUM_TAGS_PER_CLASSIFICATION")
NUM_DATABASES = _read_count("NUM_DATABASES")

print(f"Connecting to {SERVER_URL}...")
sys.stdout.flush()
def make_request(url, data=None, method="GET", headers=None):
    """Send one JSON HTTP request and return a (status, body) pair.

    Args:
        url: Absolute URL to call.
        data: Optional JSON-serialisable payload; a falsy value sends no body.
        method: HTTP verb.
        headers: Optional extra headers. The mapping is copied, never modified.

    Returns:
        (status, parsed_json) when the response body is valid JSON,
        (status, raw_text) when a successful response is empty or not JSON,
        (http_error_code, error_text) when the server answers with an HTTP error,
        (0, message) for any other failure, including a malformed URL.
    """
    # Work on a copy: callers pass one shared dict from many worker threads.
    request_headers = dict(headers) if headers else {}
    request_headers["Content-Type"] = "application/json"
    payload = json.dumps(data).encode('utf-8') if data else None
    try:
        # Building the Request can raise on a malformed URL, so it is guarded too.
        req = urllib.request.Request(url, data=payload, headers=request_headers, method=method)
        with urllib.request.urlopen(req, timeout=60) as response:
            status = response.status
            raw_body = response.read().decode('utf-8')
    except urllib.error.HTTPError as e:
        try:
            body = e.read().decode('utf-8')
        except Exception:
            body = str(e)
        return e.code, body
    except Exception as e:
        return 0, str(e)
    try:
        return status, json.loads(raw_body)
    except ValueError:
        # Keep the real status code even when the body cannot be parsed.
        return status, raw_body
# Use admin JWT token directly, unless OM_JWT_TOKEN is set in the environment
import os

# Built-in admin token used when no override is supplied.
DEFAULT_ADMIN_JWT = "eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg"


def resolve_token(environ=None):
    """Return the bearer token to use for every request.

    Args:
        environ: Mapping to read OM_JWT_TOKEN from; defaults to os.environ.

    Returns:
        The value of OM_JWT_TOKEN when it is set and non-empty, otherwise the
        built-in admin token.
    """
    source = os.environ if environ is None else environ
    return source.get("OM_JWT_TOKEN") or DEFAULT_ADMIN_JWT


token = resolve_token()
if token == DEFAULT_ADMIN_JWT:
    print("Using admin JWT token for authentication.")
else:
    print("Using JWT token from OM_JWT_TOKEN for authentication.")
# Headers shared by every request issued below.
headers = {"Content-Type": "application/json"}
if token:
    headers["Authorization"] = f"Bearer {token}"
# Wall-clock start of the whole load, used for the final timing summary.
overall_start = time.time()

# One success counter per entity type; every section stores its own total.
stats = dict.fromkeys(
    (
        "tables", "dashboards", "charts", "pipelines", "topics",
        "mlmodels", "containers", "searchIndexes",
        "glossaries", "glossaryTerms", "classifications", "tags",
    ),
    0,
)

# Store created entity FQNs for linking
dashboard_fqns = list()
# ========== CLASSIFICATIONS & TAGS ==========
# Classifications are sent one at a time; the tags of each classification are
# then sent in parallel by 10 worker threads.
if NUM_CLASSIFICATIONS > 0:
    print("")
    print("=" * 50)
    print("Creating Classifications and Tags")
    print("=" * 50)

    def create_tag(args):
        """PUT one tag; args is (classification FQN, tag index). True on 200/201."""
        class_fqn, tag_idx = args
        tag_data = {
            "name": f"Tag_{tag_idx:04d}",
            "classification": class_fqn,
            "description": f"Test tag {tag_idx} in classification"
        }
        status, _ = make_request(f"{SERVER_URL}/api/v1/tags", data=tag_data, method="PUT", headers=headers)
        return status in [200, 201]

    created_classifications = 0
    created_tags = 0
    for i in range(NUM_CLASSIFICATIONS):
        classification_data = {
            "name": f"TestClassification_{i:04d}",
            "description": f"Test classification {i} for distributed indexing testing"
        }
        status, resp = make_request(
            f"{SERVER_URL}/api/v1/classifications",
            data=classification_data,
            method="PUT",
            headers=headers
        )
        if status in [200, 201] and isinstance(resp, dict):
            created_classifications += 1
            classification_fqn = resp["fullyQualifiedName"]
            # Create tags for this classification
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = [executor.submit(create_tag, (classification_fqn, j)) for j in range(NUM_TAGS_PER_CLASSIFICATION)]
                for future in as_completed(futures):
                    if future.result():
                        created_tags += 1
        else:
            # Report the failure; its tags are skipped because there is no parent.
            print(f" Failed to create classification {classification_data['name']}: {status} - {resp}")
        if (i + 1) % 5 == 0 or i == NUM_CLASSIFICATIONS - 1:
            print(f" Classifications: {created_classifications}/{NUM_CLASSIFICATIONS}, Tags: {created_tags}/{NUM_CLASSIFICATIONS * NUM_TAGS_PER_CLASSIFICATION}")
            sys.stdout.flush()
    stats["classifications"] = created_classifications
    stats["tags"] = created_tags
    print(f"Classifications completed: {created_classifications} created")
    print(f"Tags completed: {created_tags} created")
# ========== GLOSSARIES & TERMS ==========
# Glossaries are sent one at a time; the terms of each glossary are then sent
# in parallel by 10 worker threads.
if NUM_GLOSSARIES > 0:
    print("")
    print("=" * 50)
    print("Creating Glossaries and Terms")
    print("=" * 50)

    def create_term(args):
        """PUT one glossary term; args is (glossary FQN, term index). True on 200/201."""
        gloss_fqn, term_idx = args
        term_data = {
            "name": f"Term_{term_idx:04d}",
            "glossary": gloss_fqn,
            "displayName": f"Term {term_idx}",
            "description": f"Test glossary term {term_idx}"
        }
        status, _ = make_request(f"{SERVER_URL}/api/v1/glossaryTerms", data=term_data, method="PUT", headers=headers)
        return status in [200, 201]

    created_glossaries = 0
    created_terms = 0
    for i in range(NUM_GLOSSARIES):
        glossary_data = {
            "name": f"TestGlossary_{i:04d}",
            "displayName": f"Test Glossary {i}",
            "description": f"Test glossary {i} for distributed indexing testing"
        }
        status, resp = make_request(
            f"{SERVER_URL}/api/v1/glossaries",
            data=glossary_data,
            method="PUT",
            headers=headers
        )
        if status in [200, 201] and isinstance(resp, dict):
            created_glossaries += 1
            glossary_fqn = resp["fullyQualifiedName"]
            # Create terms for this glossary
            with ThreadPoolExecutor(max_workers=10) as executor:
                futures = [executor.submit(create_term, (glossary_fqn, j)) for j in range(NUM_TERMS_PER_GLOSSARY)]
                for future in as_completed(futures):
                    if future.result():
                        created_terms += 1
        else:
            # Report the failure; its terms are skipped because there is no parent.
            print(f" Failed to create glossary {glossary_data['name']}: {status} - {resp}")
        if (i + 1) % 10 == 0 or i == NUM_GLOSSARIES - 1:
            print(f" Glossaries: {created_glossaries}/{NUM_GLOSSARIES}, Terms: {created_terms}/{NUM_GLOSSARIES * NUM_TERMS_PER_GLOSSARY}")
            sys.stdout.flush()
    stats["glossaries"] = created_glossaries
    stats["glossaryTerms"] = created_terms
    print(f"Glossaries completed: {created_glossaries} created")
    print(f"Glossary Terms completed: {created_terms} created")
# ========== TABLES ==========
# Creates one Mysql-typed database service, NUM_DATABASES databases, one
# "public" schema per database, and then NUM_TABLES tables spread over the
# schemas that were created. A failed service creation aborts the program.
if NUM_TABLES > 0:
    print("")
    print("=" * 50)
    print("Creating Tables")
    print("=" * 50)
    # Create database service
    print("Creating database service...")
    sys.stdout.flush()
    service_data = {
        "name": "test-service-distributed",
        "serviceType": "Mysql",
        "connection": {
            "config": {
                "type": "Mysql",
                "username": "test",
                "authType": {"password": "test"},
                "hostPort": "localhost:3306"
            }
        }
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/databaseServices",
        data=service_data,
        method="PUT",
        headers=headers
    )
    if status in [200, 201] and isinstance(resp, dict):
        db_service_fqn = resp["fullyQualifiedName"]
        print(f"Database service created: {db_service_fqn}")
    else:
        print(f"Failed to create database service: {status} - {resp}")
        sys.exit(1)
    # Create databases
    print(f"Creating {NUM_DATABASES} databases...")
    sys.stdout.flush()
    database_fqns = []
    for i in range(NUM_DATABASES):
        db_data = {"name": f"test_db_{i:04d}", "service": db_service_fqn}
        status, resp = make_request(f"{SERVER_URL}/api/v1/databases", data=db_data, method="PUT", headers=headers)
        if status in [200, 201] and isinstance(resp, dict):
            database_fqns.append(resp["fullyQualifiedName"])
    print(f"Created {len(database_fqns)} databases")
    # Create schemas
    print("Creating schemas...")
    schema_fqns = []
    for db_fqn in database_fqns:
        schema_data = {"name": "public", "database": db_fqn}
        status, resp = make_request(f"{SERVER_URL}/api/v1/databaseSchemas", data=schema_data, method="PUT", headers=headers)
        if status in [200, 201] and isinstance(resp, dict):
            schema_fqns.append(resp["fullyQualifiedName"])
    print(f"Created {len(schema_fqns)} schemas")
    if not schema_fqns:
        print("ERROR: No schemas created. Cannot continue with tables.")
    else:
        # Create tables
        print(f"Creating {NUM_TABLES} tables...")
        sys.stdout.flush()
        # Split NUM_TABLES over the schemas. The first "remainder" schemas get
        # one extra table, so the requested total is reached even when it is
        # not a multiple of the schema count.
        tables_per_schema, remainder = divmod(NUM_TABLES, len(schema_fqns))
        created = 0
        failed = 0
        start_time = time.time()

        def create_table(args):
            """PUT one four-column table; args is (schema FQN, table index). True on 200/201."""
            schema_fqn, table_idx = args
            table_data = {
                "name": f"table_{table_idx:06d}",
                "databaseSchema": schema_fqn,
                "columns": [
                    {"name": "id", "dataType": "BIGINT", "description": "Primary key"},
                    {"name": "name", "dataType": "VARCHAR", "dataLength": 255, "description": "Name field"},
                    {"name": "created_at", "dataType": "TIMESTAMP", "description": "Creation timestamp"},
                    {"name": "data", "dataType": "JSON", "description": "JSON data field"}
                ]
            }
            status, _ = make_request(f"{SERVER_URL}/api/v1/tables", data=table_data, method="PUT", headers=headers)
            return status in [200, 201]

        # Prepare tasks: table indexes are handed out in contiguous runs per schema.
        tasks = []
        table_idx = 0
        for position, schema_fqn in enumerate(schema_fqns):
            share = tables_per_schema + (1 if position < remainder else 0)
            for _ in range(share):
                tasks.append((schema_fqn, table_idx))
                table_idx += 1
        # Execute with thread pool
        with ThreadPoolExecutor(max_workers=20) as executor:
            futures = {executor.submit(create_table, task): task for task in tasks}
            for future in as_completed(futures):
                if future.result():
                    created += 1
                else:
                    failed += 1
                total = created + failed
                if total % 1000 == 0 or total == len(tasks):
                    elapsed = time.time() - start_time
                    rate = total / elapsed if elapsed > 0 else 0
                    print(f" Tables: {total}/{NUM_TABLES} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["tables"] = created
        print(f"Tables completed: {created} created, {failed} failed")
# ========== DASHBOARDS ==========
# Registers a Looker-typed dashboard service, then creates NUM_DASHBOARDS
# dashboards under it with 20 worker threads. The FQN of every dashboard that
# was created is appended to dashboard_fqns.
if NUM_DASHBOARDS > 0:
    for banner_line in ("", "=" * 50, "Creating Dashboards", "=" * 50):
        print(banner_line)

    # Create dashboard service
    print("Creating dashboard service...")
    sys.stdout.flush()
    looker_config = {
        "type": "Looker",
        "clientId": "test-client-id",
        "clientSecret": "test-client-secret",
        "hostPort": "https://looker.example.com"
    }
    dashboard_service_data = {
        "name": "test-dashboard-service",
        "serviceType": "Looker",
        "connection": {"config": looker_config}
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/dashboardServices",
        data=dashboard_service_data,
        method="PUT",
        headers=headers
    )
    service_ready = status in [200, 201] and isinstance(resp, dict)
    dashboard_service_fqn = resp["fullyQualifiedName"] if service_ready else None
    if service_ready:
        print(f"Dashboard service created: {dashboard_service_fqn}")
    else:
        print(f"Failed to create dashboard service: {status} - {resp}")

    if dashboard_service_fqn:
        # Create dashboards
        print(f"Creating {NUM_DASHBOARDS} dashboards...")
        sys.stdout.flush()

        def create_dashboard(idx):
            """PUT dashboard number idx; return its FQN, or None when the call fails."""
            dashboard_data = {
                "name": f"dashboard_{idx:06d}",
                "service": dashboard_service_fqn,
                "displayName": f"Test Dashboard {idx}",
                "description": f"Auto-generated test dashboard {idx} for distributed indexing testing"
            }
            status, resp = make_request(f"{SERVER_URL}/api/v1/dashboards", data=dashboard_data, method="PUT", headers=headers)
            if status not in [200, 201] or not isinstance(resp, dict):
                return None
            return resp.get("fullyQualifiedName")

        created = 0
        failed = 0
        start_time = time.time()
        # Execute with thread pool
        with ThreadPoolExecutor(max_workers=20) as executor:
            pending = [executor.submit(create_dashboard, number) for number in range(NUM_DASHBOARDS)]
            for finished in as_completed(pending):
                new_fqn = finished.result()
                if not new_fqn:
                    failed += 1
                else:
                    created += 1
                    dashboard_fqns.append(new_fqn)
                processed = created + failed
                if processed % 1000 == 0 or processed == NUM_DASHBOARDS:
                    elapsed = time.time() - start_time
                    rate = processed / elapsed if elapsed > 0 else 0
                    print(f" Dashboards: {processed}/{NUM_DASHBOARDS} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["dashboards"] = created
        print(f"Dashboards completed: {created} created, {failed} failed")
# ========== CHARTS ==========
# Creates NUM_CHARTS charts under the dashboard service with 20 worker
# threads. Runs only when at least one dashboard was created, which also
# guarantees that dashboard_service_fqn is set.
if NUM_CHARTS > 0 and dashboard_fqns:
    print("")
    print("=" * 50)
    print("Creating Charts")
    print("=" * 50)
    print(f"Creating {NUM_CHARTS} charts linked to dashboards...")
    sys.stdout.flush()
    created = 0
    failed = 0
    start_time = time.time()

    def create_chart(idx):
        """PUT chart number idx with a random chart type. True on 200/201."""
        # NOTE(review): the request body carries no dashboard reference, so a
        # chart is only attached to the dashboard service. Confirm whether a
        # per-dashboard link was intended.
        chart_data = {
            "name": f"chart_{idx:06d}",
            "service": dashboard_service_fqn,
            "displayName": f"Test Chart {idx}",
            "chartType": random.choice(["Line", "Bar", "Pie", "Area", "Scatter", "Table"]),
            "description": f"Auto-generated test chart {idx}"
        }
        status, _ = make_request(f"{SERVER_URL}/api/v1/charts", data=chart_data, method="PUT", headers=headers)
        return status in [200, 201]

    with ThreadPoolExecutor(max_workers=20) as executor:
        futures = {executor.submit(create_chart, i): i for i in range(NUM_CHARTS)}
        for future in as_completed(futures):
            if future.result():
                created += 1
            else:
                failed += 1
            total = created + failed
            if total % 1000 == 0 or total == NUM_CHARTS:
                elapsed = time.time() - start_time
                rate = total / elapsed if elapsed > 0 else 0
                print(f" Charts: {total}/{NUM_CHARTS} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                sys.stdout.flush()
    stats["charts"] = created
    print(f"Charts completed: {created} created, {failed} failed")
elif NUM_CHARTS > 0:
    # Say why the requested charts are missing from the summary.
    print("")
    print("Skipping charts: no dashboards were created.")
# ========== PIPELINES ==========
# Registers an Airflow-typed pipeline service, then creates NUM_PIPELINES
# pipelines under it with 20 worker threads.
if NUM_PIPELINES > 0:
    for banner_line in ("", "=" * 50, "Creating Pipelines", "=" * 50):
        print(banner_line)

    # Create pipeline service
    print("Creating pipeline service...")
    sys.stdout.flush()
    airflow_config = {
        "type": "Airflow",
        "hostPort": "http://airflow.example.com:8080",
        "connection": {
            "type": "BackendConnection"
        }
    }
    pipeline_service_data = {
        "name": "test-pipeline-service",
        "serviceType": "Airflow",
        "connection": {"config": airflow_config}
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/pipelineServices",
        data=pipeline_service_data,
        method="PUT",
        headers=headers
    )
    service_ready = status in [200, 201] and isinstance(resp, dict)
    pipeline_service_fqn = resp["fullyQualifiedName"] if service_ready else None
    if service_ready:
        print(f"Pipeline service created: {pipeline_service_fqn}")
    else:
        print(f"Failed to create pipeline service: {status} - {resp}")

    if pipeline_service_fqn:
        # Create pipelines
        print(f"Creating {NUM_PIPELINES} pipelines...")
        sys.stdout.flush()

        def create_pipeline(idx):
            """PUT pipeline number idx. True on 200/201."""
            pipeline_data = {
                "name": f"pipeline_{idx:06d}",
                "service": pipeline_service_fqn,
                "displayName": f"Test Pipeline {idx}",
                "description": f"Auto-generated test pipeline {idx} for distributed indexing testing"
            }
            status, _ = make_request(f"{SERVER_URL}/api/v1/pipelines", data=pipeline_data, method="PUT", headers=headers)
            return status in [200, 201]

        created = 0
        failed = 0
        start_time = time.time()
        # Execute with thread pool
        with ThreadPoolExecutor(max_workers=20) as executor:
            pending = [executor.submit(create_pipeline, number) for number in range(NUM_PIPELINES)]
            for finished in as_completed(pending):
                if not finished.result():
                    failed += 1
                else:
                    created += 1
                processed = created + failed
                if processed % 1000 == 0 or processed == NUM_PIPELINES:
                    elapsed = time.time() - start_time
                    rate = processed / elapsed if elapsed > 0 else 0
                    print(f" Pipelines: {processed}/{NUM_PIPELINES} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["pipelines"] = created
        print(f"Pipelines completed: {created} created, {failed} failed")
# ========== TOPICS ==========
# Registers a Kafka-typed messaging service, then creates NUM_TOPICS topics
# under it with 20 worker threads.
if NUM_TOPICS > 0:
    for banner_line in ("", "=" * 50, "Creating Topics", "=" * 50):
        print(banner_line)

    # Create messaging service
    print("Creating messaging service...")
    sys.stdout.flush()
    kafka_config = {
        "type": "Kafka",
        "bootstrapServers": "localhost:9092"
    }
    messaging_service_data = {
        "name": "test-messaging-service",
        "serviceType": "Kafka",
        "connection": {"config": kafka_config}
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/messagingServices",
        data=messaging_service_data,
        method="PUT",
        headers=headers
    )
    service_ready = status in [200, 201] and isinstance(resp, dict)
    messaging_service_fqn = resp["fullyQualifiedName"] if service_ready else None
    if service_ready:
        print(f"Messaging service created: {messaging_service_fqn}")
    else:
        print(f"Failed to create messaging service: {status} - {resp}")

    if messaging_service_fqn:
        # Create topics
        print(f"Creating {NUM_TOPICS} topics...")
        sys.stdout.flush()

        def create_topic(idx):
            """PUT topic number idx (3 partitions, replication factor 1). True on 200/201."""
            topic_data = {
                "name": f"topic_{idx:06d}",
                "service": messaging_service_fqn,
                "partitions": 3,
                "replicationFactor": 1,
                "description": f"Auto-generated test topic {idx} for distributed indexing testing"
            }
            status, _ = make_request(f"{SERVER_URL}/api/v1/topics", data=topic_data, method="PUT", headers=headers)
            return status in [200, 201]

        created = 0
        failed = 0
        start_time = time.time()
        # Execute with thread pool
        with ThreadPoolExecutor(max_workers=20) as executor:
            pending = [executor.submit(create_topic, number) for number in range(NUM_TOPICS)]
            for finished in as_completed(pending):
                if not finished.result():
                    failed += 1
                else:
                    created += 1
                processed = created + failed
                if processed % 1000 == 0 or processed == NUM_TOPICS:
                    elapsed = time.time() - start_time
                    rate = processed / elapsed if elapsed > 0 else 0
                    print(f" Topics: {processed}/{NUM_TOPICS} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["topics"] = created
        print(f"Topics completed: {created} created, {failed} failed")
# ========== ML MODELS ==========
# Registers an Mlflow-typed ML model service, then creates NUM_MLMODELS models
# under it with 20 worker threads, each with a randomly chosen algorithm name.
if NUM_MLMODELS > 0:
    for banner_line in ("", "=" * 50, "Creating ML Models", "=" * 50):
        print(banner_line)

    # Create ML model service
    print("Creating ML model service...")
    sys.stdout.flush()
    mlflow_config = {
        "type": "Mlflow",
        "trackingUri": "http://mlflow.example.com:5000",
        "registryUri": "http://mlflow.example.com:5000"
    }
    mlmodel_service_data = {
        "name": "test-mlmodel-service",
        "serviceType": "Mlflow",
        "connection": {"config": mlflow_config}
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/mlmodelServices",
        data=mlmodel_service_data,
        method="PUT",
        headers=headers
    )
    service_ready = status in [200, 201] and isinstance(resp, dict)
    mlmodel_service_fqn = resp["fullyQualifiedName"] if service_ready else None
    if service_ready:
        print(f"ML model service created: {mlmodel_service_fqn}")
    else:
        print(f"Failed to create ML model service: {status} - {resp}")

    if mlmodel_service_fqn:
        print(f"Creating {NUM_MLMODELS} ML models...")
        sys.stdout.flush()
        # Pool of algorithm names a model is labelled with.
        algorithms = ["LinearRegression", "RandomForest", "XGBoost", "NeuralNetwork", "SVM", "KMeans", "DecisionTree"]

        def create_mlmodel(idx):
            """PUT ML model number idx. True on 200/201."""
            mlmodel_data = {
                "name": f"mlmodel_{idx:06d}",
                "service": mlmodel_service_fqn,
                "algorithm": random.choice(algorithms),
                "displayName": f"Test ML Model {idx}",
                "description": f"Auto-generated test ML model {idx}"
            }
            status, _ = make_request(f"{SERVER_URL}/api/v1/mlmodels", data=mlmodel_data, method="PUT", headers=headers)
            return status in [200, 201]

        created = 0
        failed = 0
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=20) as executor:
            pending = [executor.submit(create_mlmodel, number) for number in range(NUM_MLMODELS)]
            for finished in as_completed(pending):
                if not finished.result():
                    failed += 1
                else:
                    created += 1
                processed = created + failed
                if processed % 500 == 0 or processed == NUM_MLMODELS:
                    elapsed = time.time() - start_time
                    rate = processed / elapsed if elapsed > 0 else 0
                    print(f" ML Models: {processed}/{NUM_MLMODELS} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["mlmodels"] = created
        print(f"ML Models completed: {created} created, {failed} failed")
# ========== CONTAINERS ==========
# Registers an S3-typed storage service, then creates NUM_CONTAINERS
# containers under it with 20 worker threads.
if NUM_CONTAINERS > 0:
    for banner_line in ("", "=" * 50, "Creating Containers", "=" * 50):
        print(banner_line)

    # Create storage service
    print("Creating storage service...")
    sys.stdout.flush()
    s3_config = {
        "type": "S3",
        "awsConfig": {
            "awsAccessKeyId": "test-key",
            "awsSecretAccessKey": "test-secret",
            "awsRegion": "us-east-1"
        }
    }
    storage_service_data = {
        "name": "test-storage-service",
        "serviceType": "S3",
        "connection": {"config": s3_config}
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/storageServices",
        data=storage_service_data,
        method="PUT",
        headers=headers
    )
    service_ready = status in [200, 201] and isinstance(resp, dict)
    storage_service_fqn = resp["fullyQualifiedName"] if service_ready else None
    if service_ready:
        print(f"Storage service created: {storage_service_fqn}")
    else:
        print(f"Failed to create storage service: {status} - {resp}")

    if storage_service_fqn:
        print(f"Creating {NUM_CONTAINERS} containers...")
        sys.stdout.flush()

        def create_container(idx):
            """PUT container number idx. True on 200/201."""
            container_data = {
                "name": f"container_{idx:06d}",
                "service": storage_service_fqn,
                "displayName": f"Test Container {idx}",
                "description": f"Auto-generated test container {idx}"
            }
            status, _ = make_request(f"{SERVER_URL}/api/v1/containers", data=container_data, method="PUT", headers=headers)
            return status in [200, 201]

        created = 0
        failed = 0
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=20) as executor:
            pending = [executor.submit(create_container, number) for number in range(NUM_CONTAINERS)]
            for finished in as_completed(pending):
                if not finished.result():
                    failed += 1
                else:
                    created += 1
                processed = created + failed
                if processed % 500 == 0 or processed == NUM_CONTAINERS:
                    elapsed = time.time() - start_time
                    rate = processed / elapsed if elapsed > 0 else 0
                    print(f" Containers: {processed}/{NUM_CONTAINERS} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["containers"] = created
        print(f"Containers completed: {created} created, {failed} failed")
# ========== SEARCH INDEXES ==========
# Registers an ElasticSearch-typed search service, then creates
# NUM_SEARCH_INDEXES three-field search indexes under it with 20 worker threads.
if NUM_SEARCH_INDEXES > 0:
    for banner_line in ("", "=" * 50, "Creating Search Indexes", "=" * 50):
        print(banner_line)

    # Create search service
    print("Creating search service...")
    sys.stdout.flush()
    elastic_config = {
        "type": "ElasticSearch",
        "hostPort": "http://elasticsearch.example.com:9200"
    }
    search_service_data = {
        "name": "test-search-service",
        "serviceType": "ElasticSearch",
        "connection": {"config": elastic_config}
    }
    status, resp = make_request(
        f"{SERVER_URL}/api/v1/services/searchServices",
        data=search_service_data,
        method="PUT",
        headers=headers
    )
    service_ready = status in [200, 201] and isinstance(resp, dict)
    search_service_fqn = resp["fullyQualifiedName"] if service_ready else None
    if service_ready:
        print(f"Search service created: {search_service_fqn}")
    else:
        print(f"Failed to create search service: {status} - {resp}")

    if search_service_fqn:
        print(f"Creating {NUM_SEARCH_INDEXES} search indexes...")
        sys.stdout.flush()

        def create_search_index(idx):
            """PUT search index number idx. True on 200/201."""
            index_fields = [
                {"name": "id", "dataType": "KEYWORD"},
                {"name": "content", "dataType": "TEXT"},
                {"name": "timestamp", "dataType": "DATE"}
            ]
            search_index_data = {
                "name": f"search_index_{idx:06d}",
                "service": search_service_fqn,
                "displayName": f"Test Search Index {idx}",
                "description": f"Auto-generated test search index {idx}",
                "fields": index_fields
            }
            status, _ = make_request(f"{SERVER_URL}/api/v1/searchIndexes", data=search_index_data, method="PUT", headers=headers)
            return status in [200, 201]

        created = 0
        failed = 0
        start_time = time.time()
        with ThreadPoolExecutor(max_workers=20) as executor:
            pending = [executor.submit(create_search_index, number) for number in range(NUM_SEARCH_INDEXES)]
            for finished in as_completed(pending):
                if not finished.result():
                    failed += 1
                else:
                    created += 1
                processed = created + failed
                if processed % 200 == 0 or processed == NUM_SEARCH_INDEXES:
                    elapsed = time.time() - start_time
                    rate = processed / elapsed if elapsed > 0 else 0
                    print(f" Search Indexes: {processed}/{NUM_SEARCH_INDEXES} ({rate:.1f}/sec) - Created: {created}, Failed: {failed}")
                    sys.stdout.flush()
        stats["searchIndexes"] = created
        print(f"Search Indexes completed: {created} created, {failed} failed")
# ========== SUMMARY ==========
# Prints one line per entity type from stats, the grand total, the elapsed
# time and, when any time has passed, the overall creation rate.
overall_elapsed = time.time() - overall_start
total_created = sum(stats.values())

for heading_line in ("", "=" * 50, "Test Data Loading Complete!", "=" * 50, "", "Summary:"):
    print(heading_line)

# (label shown to the user, key in stats), in display order.
summary_rows = (
    ("Tables", "tables"),
    ("Dashboards", "dashboards"),
    ("Charts", "charts"),
    ("Pipelines", "pipelines"),
    ("Topics", "topics"),
    ("ML Models", "mlmodels"),
    ("Containers", "containers"),
    ("Search Indexes", "searchIndexes"),
    ("Glossaries", "glossaries"),
    ("Glossary Terms", "glossaryTerms"),
    ("Classifications", "classifications"),
    ("Tags", "tags"),
)
for row_label, stats_key in summary_rows:
    print(f" {row_label}: {stats[stats_key]:>6}")
print(" --------------------------")
print(f" Total: {total_created:>6}")
print("")
print(f"Time: {overall_elapsed:.1f} seconds")
if overall_elapsed > 0:
    entities_per_second = total_created / overall_elapsed
    print(f"Rate: {entities_per_second:.1f} entities/second")
EOF
# Closing hint: a blank line followed by the suggested next step.
printf '\n%s\n' "Test data loaded. You can now trigger reindexing with: ./scripts/trigger-reindex.sh"