OpenMetadata/bin/distributed-test/scripts/perf-test.sh
Mohit Yadav b59aa7fc44
Improve indexing (#26154)
* Add Prometheus metrics for reindexing pipeline via Micrometer. Bridge the existing reindexing atomic counters to Prometheus so operators can alert on failures, latency spikes, and backpressure without relying solely on database-flushed stats.

  - Add ReindexingMetrics singleton (initialize/getInstance pattern matching
    CacheMetrics) with job lifecycle counters, stage success/failed/warnings
    counters, bulk request timers with SLA buckets, payload size distribution,
    backpressure and promotion counters, and active/pending gauges
  - Register in MicrometerBundle after StreamableLogsMetrics
  - Instrument ReindexingOrchestrator.run() with job started/completed/failed/stopped
  - Bridge StageStatsTracker.flush() deltas to Prometheus per stage and entity type
  - Add bulk request latency timer and payload size recording in OpenSearchBulkSink
  - Record backpressure events in SearchIndexExecutor.handleBackpressure()
  - Record promotion success/failure in DefaultRecreateHandler
  - Add ReindexingMetricsTest with 24 tests covering all metric types

* Add Improvements

* Auto Gene

* Use Auto Config in distributed

* Fix Partition Claim Spread

* Make partition use config

* Correct total count

* Fix Wait time to 5 mins

* Revert om yaml

* Fix Sink sync

* Add Failure Handling at different stages

* Update script to create entities

* Move to scripts

* Add usage and fix script

* Fix Script

* Update generated TypeScript types

* Fix Staging miss

* Fix Stats reconciliation issue

* Revert workflow handler

* Fix Partition worker early sync

* Update Logs

* Update logs EntityRepository

* Error failure test

* Review Comments fix

* Fix Non Distributed live feed

* Fix Non Distributed stats feed

* Fix Review comments

* Fix Time Series cutoff

* Update generated TypeScript types

* Md

* Benchmark addition

* Fix date time warning

* Update load test to do benchmark analysis

* Diagnostic and update perf test

* Move load test to bin

* Fix Review Comments

* Add numeric values

* Move to localhost by default

* Fix Perf test issues

* Review Comments

* Add Preflight Fixes

* Add Preflight fixes for stale entry

* Remove stale entry on ApplicationHandler

---------

Co-authored-by: github-actions[bot] <github-actions[bot]@users.noreply.github.com>
2026-03-03 16:39:27 +05:30

2700 lines
124 KiB
Bash
Executable file

#!/bin/bash
# Load test data for distributed indexing testing
# Supports 30+ entity types including time-series data, lineage, and data quality
# Includes benchmarking, latency tracking, and cluster sizing recommendations
# Abort on the first unhandled command failure.
set -o errexit

# Absolute directory containing this script.
SCRIPT_DIR=$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)

# ── Default values (backward-compatible ~46k) ────────────────────────────────
SERVER_URL="http://localhost:8585"

# Data assets.
NUM_TABLES=20000 NUM_TOPICS=3000 NUM_DASHBOARDS=5000 NUM_CHARTS=10000
NUM_PIPELINES=3000 NUM_STORED_PROCEDURES=0 NUM_CONTAINERS=2000
NUM_SEARCH_INDEXES=1000 NUM_MLMODELS=2000 NUM_QUERIES=0
NUM_DASHBOARD_DATA_MODELS=0

# Data quality.
NUM_TEST_SUITES=0 NUM_TEST_CASES=0

# Governance.
NUM_GLOSSARY_TERMS=5000 NUM_GLOSSARIES=50 NUM_TAGS=1000 NUM_CLASSIFICATIONS=20

# Organisation, APIs and lineage (all off unless requested).
NUM_USERS=0 NUM_TEAMS=0 NUM_DOMAINS=0 NUM_DATA_PRODUCTS=0
NUM_API_COLLECTIONS=0 NUM_API_ENDPOINTS=0 NUM_LINEAGE_EDGES=0

# Time-series data (all off unless requested).
NUM_TEST_CASE_RESULTS=0 NUM_ENTITY_REPORT_DATA=0
NUM_WEB_ANALYTIC_VIEWS=0 NUM_WEB_ANALYTIC_ACTIVITY=0
NUM_RAW_COST_ANALYSIS=0 NUM_AGG_COST_ANALYSIS=0

# Run configuration; empty strings mean "not given on the command line".
NUM_WORKERS=20
SCALE_APPLIED="" ONLY_ENTITIES="" OUTPUT_PATH="" AUTH_TOKEN=""
RAMP_MODE="" RAMP_BATCH=100 ADMIN_PORT=""
# ── Argument parsing ─────────────────────────────────────────────────────────
# The parser is split into small helpers; parse_args is called with the script
# arguments at the end of this section. Every value is validated before it is
# stored, because the values are later evaluated in shell arithmetic and
# substituted verbatim into the embedded Python program.

# Print the given message to stderr and abort with status 1.
die() {
  printf '%s\n' "$*" >&2
  exit 1
}

# require_value FLAG ARGC
# Abort unless FLAG (the current argument) is followed by at least one more
# argument. ARGC is the number of arguments left, FLAG included.
require_value() {
  (( $2 >= 2 )) || die "Missing value for $1"
}

# set_count VAR FLAG [VALUE ...]
# Store VALUE in the global variable VAR. VALUE must be a non-negative decimal
# integer; leading zeros are dropped so the number is not read as octal by the
# shell or rejected by Python.
set_count() {
  local var=$1 flag=$2
  (( $# >= 3 )) || die "Missing value for ${flag}"
  local value=$3
  [[ "$value" =~ ^[0-9]+$ ]] \
    || die "Invalid value for ${flag}: '${value}' (expected a non-negative integer)"
  printf -v "$var" '%s' "$((10#${value}))"
}

# set_text VAR FLAG [VALUE ...]
# Store VALUE in the global variable VAR. Double quotes, backslashes and
# newlines are rejected: the value ends up inside a double-quoted Python
# string literal and those characters would break or alter that literal.
set_text() {
  local var=$1 flag=$2
  (( $# >= 3 )) || die "Missing value for ${flag}"
  local value=$3
  case "$value" in
    *\"* | *\\* | *$'\n'*)
      die "Invalid value for ${flag}: double quotes, backslashes and newlines are not allowed"
      ;;
  esac
  printf -v "$var" '%s' "$value"
}

# apply_scale_preset NAME
# Set every NUM_* count to the values of the named preset.
# Returns 0 when NAME is a known preset, 1 otherwise (counts left untouched).
apply_scale_preset() {
  case "$1" in
    xlarge)
      NUM_TABLES=2500000; NUM_TOPICS=400000; NUM_DASHBOARDS=200000; NUM_CHARTS=400000
      NUM_PIPELINES=100000; NUM_STORED_PROCEDURES=100000; NUM_CONTAINERS=75000
      NUM_SEARCH_INDEXES=50000; NUM_MLMODELS=50000; NUM_QUERIES=100000
      NUM_DASHBOARD_DATA_MODELS=50000; NUM_TEST_SUITES=1500; NUM_TEST_CASES=150000
      NUM_GLOSSARY_TERMS=50000; NUM_GLOSSARIES=500; NUM_TAGS=10000; NUM_CLASSIFICATIONS=200
      NUM_USERS=5000; NUM_TEAMS=500; NUM_DOMAINS=50; NUM_DATA_PRODUCTS=500
      NUM_API_COLLECTIONS=500; NUM_API_ENDPOINTS=50000; NUM_LINEAGE_EDGES=200000
      NUM_TEST_CASE_RESULTS=300000; NUM_ENTITY_REPORT_DATA=50000
      NUM_WEB_ANALYTIC_VIEWS=100000; NUM_WEB_ANALYTIC_ACTIVITY=50000
      NUM_RAW_COST_ANALYSIS=25000; NUM_AGG_COST_ANALYSIS=25000
      ;;
    large)
      NUM_TABLES=1000000; NUM_TOPICS=160000; NUM_DASHBOARDS=80000; NUM_CHARTS=160000
      NUM_PIPELINES=40000; NUM_STORED_PROCEDURES=40000; NUM_CONTAINERS=30000
      NUM_SEARCH_INDEXES=20000; NUM_MLMODELS=20000; NUM_QUERIES=40000
      NUM_DASHBOARD_DATA_MODELS=20000; NUM_TEST_SUITES=600; NUM_TEST_CASES=60000
      NUM_GLOSSARY_TERMS=20000; NUM_GLOSSARIES=200; NUM_TAGS=4000; NUM_CLASSIFICATIONS=80
      NUM_USERS=2000; NUM_TEAMS=200; NUM_DOMAINS=20; NUM_DATA_PRODUCTS=200
      NUM_API_COLLECTIONS=200; NUM_API_ENDPOINTS=20000; NUM_LINEAGE_EDGES=80000
      NUM_TEST_CASE_RESULTS=120000; NUM_ENTITY_REPORT_DATA=20000
      NUM_WEB_ANALYTIC_VIEWS=40000; NUM_WEB_ANALYTIC_ACTIVITY=20000
      NUM_RAW_COST_ANALYSIS=10000; NUM_AGG_COST_ANALYSIS=10000
      ;;
    medium)
      NUM_TABLES=250000; NUM_TOPICS=40000; NUM_DASHBOARDS=20000; NUM_CHARTS=40000
      NUM_PIPELINES=10000; NUM_STORED_PROCEDURES=10000; NUM_CONTAINERS=7500
      NUM_SEARCH_INDEXES=5000; NUM_MLMODELS=5000; NUM_QUERIES=10000
      NUM_DASHBOARD_DATA_MODELS=5000; NUM_TEST_SUITES=150; NUM_TEST_CASES=15000
      NUM_GLOSSARY_TERMS=5000; NUM_GLOSSARIES=50; NUM_TAGS=1000; NUM_CLASSIFICATIONS=20
      NUM_USERS=500; NUM_TEAMS=50; NUM_DOMAINS=5; NUM_DATA_PRODUCTS=50
      NUM_API_COLLECTIONS=50; NUM_API_ENDPOINTS=5000; NUM_LINEAGE_EDGES=20000
      NUM_TEST_CASE_RESULTS=30000; NUM_ENTITY_REPORT_DATA=5000
      NUM_WEB_ANALYTIC_VIEWS=10000; NUM_WEB_ANALYTIC_ACTIVITY=5000
      NUM_RAW_COST_ANALYSIS=2500; NUM_AGG_COST_ANALYSIS=2500
      ;;
    10k)
      NUM_TABLES=5000; NUM_TOPICS=800; NUM_DASHBOARDS=400; NUM_CHARTS=800
      NUM_PIPELINES=200; NUM_STORED_PROCEDURES=200; NUM_CONTAINERS=150
      NUM_SEARCH_INDEXES=100; NUM_MLMODELS=100; NUM_QUERIES=200
      NUM_DASHBOARD_DATA_MODELS=100; NUM_TEST_SUITES=3; NUM_TEST_CASES=300
      NUM_GLOSSARY_TERMS=100; NUM_GLOSSARIES=1; NUM_TAGS=20; NUM_CLASSIFICATIONS=1
      NUM_USERS=10; NUM_TEAMS=1; NUM_DOMAINS=1; NUM_DATA_PRODUCTS=1
      NUM_API_COLLECTIONS=1; NUM_API_ENDPOINTS=100; NUM_LINEAGE_EDGES=400
      NUM_TEST_CASE_RESULTS=600; NUM_ENTITY_REPORT_DATA=100
      NUM_WEB_ANALYTIC_VIEWS=200; NUM_WEB_ANALYTIC_ACTIVITY=100
      NUM_RAW_COST_ANALYSIS=50; NUM_AGG_COST_ANALYSIS=50
      ;;
    small|50k)
      NUM_TABLES=25000; NUM_TOPICS=4000; NUM_DASHBOARDS=2000; NUM_CHARTS=4000
      NUM_PIPELINES=1000; NUM_STORED_PROCEDURES=1000; NUM_CONTAINERS=750
      NUM_SEARCH_INDEXES=500; NUM_MLMODELS=500; NUM_QUERIES=1000
      NUM_DASHBOARD_DATA_MODELS=500; NUM_TEST_SUITES=15; NUM_TEST_CASES=1500
      NUM_GLOSSARY_TERMS=500; NUM_GLOSSARIES=5; NUM_TAGS=100; NUM_CLASSIFICATIONS=2
      NUM_USERS=50; NUM_TEAMS=5; NUM_DOMAINS=1; NUM_DATA_PRODUCTS=5
      NUM_API_COLLECTIONS=5; NUM_API_ENDPOINTS=500; NUM_LINEAGE_EDGES=2000
      NUM_TEST_CASE_RESULTS=3000; NUM_ENTITY_REPORT_DATA=500
      NUM_WEB_ANALYTIC_VIEWS=1000; NUM_WEB_ANALYTIC_ACTIVITY=500
      NUM_RAW_COST_ANALYSIS=250; NUM_AGG_COST_ANALYSIS=250
      ;;
    100k)
      NUM_TABLES=50000; NUM_TOPICS=8000; NUM_DASHBOARDS=4000; NUM_CHARTS=8000
      NUM_PIPELINES=2000; NUM_STORED_PROCEDURES=2000; NUM_CONTAINERS=1500
      NUM_SEARCH_INDEXES=1000; NUM_MLMODELS=1000; NUM_QUERIES=2000
      NUM_DASHBOARD_DATA_MODELS=1000; NUM_TEST_SUITES=30; NUM_TEST_CASES=3000
      NUM_GLOSSARY_TERMS=1000; NUM_GLOSSARIES=10; NUM_TAGS=200; NUM_CLASSIFICATIONS=4
      NUM_USERS=100; NUM_TEAMS=10; NUM_DOMAINS=2; NUM_DATA_PRODUCTS=10
      NUM_API_COLLECTIONS=10; NUM_API_ENDPOINTS=1000; NUM_LINEAGE_EDGES=4000
      NUM_TEST_CASE_RESULTS=6000; NUM_ENTITY_REPORT_DATA=1000
      NUM_WEB_ANALYTIC_VIEWS=2000; NUM_WEB_ANALYTIC_ACTIVITY=1000
      NUM_RAW_COST_ANALYSIS=500; NUM_AGG_COST_ANALYSIS=500
      ;;
    150k)
      NUM_TABLES=75000; NUM_TOPICS=12000; NUM_DASHBOARDS=6000; NUM_CHARTS=12000
      NUM_PIPELINES=3000; NUM_STORED_PROCEDURES=3000; NUM_CONTAINERS=2250
      NUM_SEARCH_INDEXES=1500; NUM_MLMODELS=1500; NUM_QUERIES=3000
      NUM_DASHBOARD_DATA_MODELS=1500; NUM_TEST_SUITES=45; NUM_TEST_CASES=4500
      NUM_GLOSSARY_TERMS=1500; NUM_GLOSSARIES=15; NUM_TAGS=300; NUM_CLASSIFICATIONS=6
      NUM_USERS=150; NUM_TEAMS=15; NUM_DOMAINS=3; NUM_DATA_PRODUCTS=15
      NUM_API_COLLECTIONS=15; NUM_API_ENDPOINTS=1500; NUM_LINEAGE_EDGES=6000
      NUM_TEST_CASE_RESULTS=9000; NUM_ENTITY_REPORT_DATA=1500
      NUM_WEB_ANALYTIC_VIEWS=3000; NUM_WEB_ANALYTIC_ACTIVITY=1500
      NUM_RAW_COST_ANALYSIS=750; NUM_AGG_COST_ANALYSIS=750
      ;;
    200k)
      NUM_TABLES=100000; NUM_TOPICS=16000; NUM_DASHBOARDS=8000; NUM_CHARTS=16000
      NUM_PIPELINES=4000; NUM_STORED_PROCEDURES=4000; NUM_CONTAINERS=3000
      NUM_SEARCH_INDEXES=2000; NUM_MLMODELS=2000; NUM_QUERIES=4000
      NUM_DASHBOARD_DATA_MODELS=2000; NUM_TEST_SUITES=60; NUM_TEST_CASES=6000
      NUM_GLOSSARY_TERMS=2000; NUM_GLOSSARIES=20; NUM_TAGS=400; NUM_CLASSIFICATIONS=8
      NUM_USERS=200; NUM_TEAMS=20; NUM_DOMAINS=4; NUM_DATA_PRODUCTS=20
      NUM_API_COLLECTIONS=20; NUM_API_ENDPOINTS=2000; NUM_LINEAGE_EDGES=8000
      NUM_TEST_CASE_RESULTS=12000; NUM_ENTITY_REPORT_DATA=2000
      NUM_WEB_ANALYTIC_VIEWS=4000; NUM_WEB_ANALYTIC_ACTIVITY=2000
      NUM_RAW_COST_ANALYSIS=1000; NUM_AGG_COST_ANALYSIS=1000
      ;;
    250k)
      NUM_TABLES=125000; NUM_TOPICS=20000; NUM_DASHBOARDS=10000; NUM_CHARTS=20000
      NUM_PIPELINES=5000; NUM_STORED_PROCEDURES=5000; NUM_CONTAINERS=3750
      NUM_SEARCH_INDEXES=2500; NUM_MLMODELS=2500; NUM_QUERIES=5000
      NUM_DASHBOARD_DATA_MODELS=2500; NUM_TEST_SUITES=75; NUM_TEST_CASES=7500
      NUM_GLOSSARY_TERMS=2500; NUM_GLOSSARIES=25; NUM_TAGS=500; NUM_CLASSIFICATIONS=10
      NUM_USERS=250; NUM_TEAMS=25; NUM_DOMAINS=5; NUM_DATA_PRODUCTS=25
      NUM_API_COLLECTIONS=25; NUM_API_ENDPOINTS=2500; NUM_LINEAGE_EDGES=10000
      NUM_TEST_CASE_RESULTS=15000; NUM_ENTITY_REPORT_DATA=2500
      NUM_WEB_ANALYTIC_VIEWS=5000; NUM_WEB_ANALYTIC_ACTIVITY=2500
      NUM_RAW_COST_ANALYSIS=1250; NUM_AGG_COST_ANALYSIS=1250
      ;;
    300k)
      NUM_TABLES=150000; NUM_TOPICS=24000; NUM_DASHBOARDS=12000; NUM_CHARTS=24000
      NUM_PIPELINES=6000; NUM_STORED_PROCEDURES=6000; NUM_CONTAINERS=4500
      NUM_SEARCH_INDEXES=3000; NUM_MLMODELS=3000; NUM_QUERIES=6000
      NUM_DASHBOARD_DATA_MODELS=3000; NUM_TEST_SUITES=90; NUM_TEST_CASES=9000
      NUM_GLOSSARY_TERMS=3000; NUM_GLOSSARIES=30; NUM_TAGS=600; NUM_CLASSIFICATIONS=12
      NUM_USERS=300; NUM_TEAMS=30; NUM_DOMAINS=6; NUM_DATA_PRODUCTS=30
      NUM_API_COLLECTIONS=30; NUM_API_ENDPOINTS=3000; NUM_LINEAGE_EDGES=12000
      NUM_TEST_CASE_RESULTS=18000; NUM_ENTITY_REPORT_DATA=3000
      NUM_WEB_ANALYTIC_VIEWS=6000; NUM_WEB_ANALYTIC_ACTIVITY=3000
      NUM_RAW_COST_ANALYSIS=1500; NUM_AGG_COST_ANALYSIS=1500
      ;;
    350k)
      NUM_TABLES=175000; NUM_TOPICS=28000; NUM_DASHBOARDS=14000; NUM_CHARTS=28000
      NUM_PIPELINES=7000; NUM_STORED_PROCEDURES=7000; NUM_CONTAINERS=5250
      NUM_SEARCH_INDEXES=3500; NUM_MLMODELS=3500; NUM_QUERIES=7000
      NUM_DASHBOARD_DATA_MODELS=3500; NUM_TEST_SUITES=105; NUM_TEST_CASES=10500
      NUM_GLOSSARY_TERMS=3500; NUM_GLOSSARIES=35; NUM_TAGS=700; NUM_CLASSIFICATIONS=14
      NUM_USERS=350; NUM_TEAMS=35; NUM_DOMAINS=7; NUM_DATA_PRODUCTS=35
      NUM_API_COLLECTIONS=35; NUM_API_ENDPOINTS=3500; NUM_LINEAGE_EDGES=14000
      NUM_TEST_CASE_RESULTS=21000; NUM_ENTITY_REPORT_DATA=3500
      NUM_WEB_ANALYTIC_VIEWS=7000; NUM_WEB_ANALYTIC_ACTIVITY=3500
      NUM_RAW_COST_ANALYSIS=1750; NUM_AGG_COST_ANALYSIS=1750
      ;;
    400k)
      NUM_TABLES=200000; NUM_TOPICS=32000; NUM_DASHBOARDS=16000; NUM_CHARTS=32000
      NUM_PIPELINES=8000; NUM_STORED_PROCEDURES=8000; NUM_CONTAINERS=6000
      NUM_SEARCH_INDEXES=4000; NUM_MLMODELS=4000; NUM_QUERIES=8000
      NUM_DASHBOARD_DATA_MODELS=4000; NUM_TEST_SUITES=120; NUM_TEST_CASES=12000
      NUM_GLOSSARY_TERMS=4000; NUM_GLOSSARIES=40; NUM_TAGS=800; NUM_CLASSIFICATIONS=16
      NUM_USERS=400; NUM_TEAMS=40; NUM_DOMAINS=8; NUM_DATA_PRODUCTS=40
      NUM_API_COLLECTIONS=40; NUM_API_ENDPOINTS=4000; NUM_LINEAGE_EDGES=16000
      NUM_TEST_CASE_RESULTS=24000; NUM_ENTITY_REPORT_DATA=4000
      NUM_WEB_ANALYTIC_VIEWS=8000; NUM_WEB_ANALYTIC_ACTIVITY=4000
      NUM_RAW_COST_ANALYSIS=2000; NUM_AGG_COST_ANALYSIS=2000
      ;;
    450k)
      NUM_TABLES=225000; NUM_TOPICS=36000; NUM_DASHBOARDS=18000; NUM_CHARTS=36000
      NUM_PIPELINES=9000; NUM_STORED_PROCEDURES=9000; NUM_CONTAINERS=6750
      NUM_SEARCH_INDEXES=4500; NUM_MLMODELS=4500; NUM_QUERIES=9000
      NUM_DASHBOARD_DATA_MODELS=4500; NUM_TEST_SUITES=135; NUM_TEST_CASES=13500
      NUM_GLOSSARY_TERMS=4500; NUM_GLOSSARIES=45; NUM_TAGS=900; NUM_CLASSIFICATIONS=18
      NUM_USERS=450; NUM_TEAMS=45; NUM_DOMAINS=9; NUM_DATA_PRODUCTS=45
      NUM_API_COLLECTIONS=45; NUM_API_ENDPOINTS=4500; NUM_LINEAGE_EDGES=18000
      NUM_TEST_CASE_RESULTS=27000; NUM_ENTITY_REPORT_DATA=4500
      NUM_WEB_ANALYTIC_VIEWS=9000; NUM_WEB_ANALYTIC_ACTIVITY=4500
      NUM_RAW_COST_ANALYSIS=2250; NUM_AGG_COST_ANALYSIS=2250
      ;;
    500k)
      NUM_TABLES=250000; NUM_TOPICS=40000; NUM_DASHBOARDS=20000; NUM_CHARTS=40000
      NUM_PIPELINES=10000; NUM_STORED_PROCEDURES=10000; NUM_CONTAINERS=7500
      NUM_SEARCH_INDEXES=5000; NUM_MLMODELS=5000; NUM_QUERIES=10000
      NUM_DASHBOARD_DATA_MODELS=5000; NUM_TEST_SUITES=150; NUM_TEST_CASES=15000
      NUM_GLOSSARY_TERMS=5000; NUM_GLOSSARIES=50; NUM_TAGS=1000; NUM_CLASSIFICATIONS=20
      NUM_USERS=500; NUM_TEAMS=50; NUM_DOMAINS=10; NUM_DATA_PRODUCTS=50
      NUM_API_COLLECTIONS=50; NUM_API_ENDPOINTS=5000; NUM_LINEAGE_EDGES=20000
      NUM_TEST_CASE_RESULTS=30000; NUM_ENTITY_REPORT_DATA=5000
      NUM_WEB_ANALYTIC_VIEWS=10000; NUM_WEB_ANALYTIC_ACTIVITY=5000
      NUM_RAW_COST_ANALYSIS=2500; NUM_AGG_COST_ANALYSIS=2500
      ;;
    *)
      return 1
      ;;
  esac
}

# Write the usage text to stdout.
print_usage() {
  cat << 'HELPEOF'
Usage: perf-test.sh [OPTIONS]
Scale presets:
--scale {10k|50k|100k|...|500k|small|medium|large|xlarge}
Apply a preset (see table below)
--quick Quick mode (~10k entities)
Individual entity counts override any preset:
--tables NUM --dashboards NUM --charts NUM
--pipelines NUM --topics NUM --mlmodels NUM
--containers NUM --search-indexes NUM --stored-procedures NUM
--queries NUM --data-models NUM --test-suites NUM
--test-cases NUM --glossaries NUM --glossary-terms NUM
--classifications NUM --tags NUM --users NUM
--teams NUM --domains NUM --data-products NUM
--api-collections NUM --api-endpoints NUM --lineage-edges NUM
Time-series entity counts:
--test-case-results NUM --entity-report-data NUM
--web-analytic-views NUM --web-analytic-activity NUM
--raw-cost-analysis NUM --aggregated-cost-analysis NUM
Benchmarking & filtering:
--only ENTITIES Comma-separated entity types to create (e.g. tables,charts,topics)
When specified, only listed entities run. Prerequisites auto-enabled.
Valid names: tables, topics, dashboards, charts, pipelines,
storedProcedures, containers, searchIndexes, mlmodels, queries,
dashboardDataModels, testSuites, testCases, glossaries,
glossaryTerms, users, teams, domains, dataProducts,
apiCollections, apiEndpoints, lineageEdges, testCaseResults,
entityReportData, webAnalyticViews, webAnalyticActivity,
rawCostAnalysis, aggCostAnalysis
--output PATH Write JSON benchmark report to PATH
(default: benchmark-report-{timestamp}.json in current dir)
--token TOKEN Auth token (overrides hardcoded default)
--ramp Run concurrency ramp test before main load
--ramp-batch NUM Entities per ramp level (default: 100)
--admin-port PORT Admin port for Prometheus metrics scraping
Other:
--server URL Target server URL (default: http://localhost:8585)
--workers NUM Concurrent workers (default: 20)
-h, --help Show this help message
Scale preset totals (numeric):
10k ~10K 50k ~50K 100k ~100K 150k ~150K 200k ~200K
250k ~250K 300k ~300K 350k ~350K 400k ~400K 450k ~450K
500k ~500K
Scale preset totals (named):
xlarge ~5M large ~2M medium ~500K small ~50K
Examples:
# Quick benchmark with just tables
./perf-test.sh --only tables --tables 100 --workers 5
# Full benchmark with JSON output
./perf-test.sh --quick --workers 10 --output /tmp/bench.json
# Only tables and charts, custom counts
./perf-test.sh --only tables,charts --tables 5000 --charts 2000
# Ramp test to find optimal concurrency
./perf-test.sh --ramp --only tables --tables 500 --workers 32
# Full benchmark with Prometheus metrics from admin port
./perf-test.sh --quick --workers 10 --admin-port 8586 --output /tmp/bench.json
HELPEOF
}

# parse_args [ARG ...]
# Walk the command line left to right and update the configuration globals.
# Later options win, so an individual count given after --scale/--quick
# overrides the preset. Exits 0 after --help and 1 on any invalid input.
parse_args() {
  while (( $# > 0 )); do
    case "$1" in
      --scale)
        SCALE_APPLIED="${2-}"
        apply_scale_preset "${2-}" \
          || die "Unknown scale: ${2-} (use 10k|50k|100k|150k|200k|250k|300k|350k|400k|450k|500k|small|medium|large|xlarge)"
        shift 2
        ;;
      --tables) set_count NUM_TABLES "$@"; shift 2 ;;
      --dashboards) set_count NUM_DASHBOARDS "$@"; shift 2 ;;
      --charts) set_count NUM_CHARTS "$@"; shift 2 ;;
      --pipelines) set_count NUM_PIPELINES "$@"; shift 2 ;;
      --topics) set_count NUM_TOPICS "$@"; shift 2 ;;
      --mlmodels) set_count NUM_MLMODELS "$@"; shift 2 ;;
      --containers) set_count NUM_CONTAINERS "$@"; shift 2 ;;
      --search-indexes) set_count NUM_SEARCH_INDEXES "$@"; shift 2 ;;
      --stored-procedures) set_count NUM_STORED_PROCEDURES "$@"; shift 2 ;;
      --queries) set_count NUM_QUERIES "$@"; shift 2 ;;
      --data-models) set_count NUM_DASHBOARD_DATA_MODELS "$@"; shift 2 ;;
      --test-suites) set_count NUM_TEST_SUITES "$@"; shift 2 ;;
      --test-cases) set_count NUM_TEST_CASES "$@"; shift 2 ;;
      --glossaries) set_count NUM_GLOSSARIES "$@"; shift 2 ;;
      --glossary-terms) set_count NUM_GLOSSARY_TERMS "$@"; shift 2 ;;
      --classifications) set_count NUM_CLASSIFICATIONS "$@"; shift 2 ;;
      --tags) set_count NUM_TAGS "$@"; shift 2 ;;
      --users) set_count NUM_USERS "$@"; shift 2 ;;
      --teams) set_count NUM_TEAMS "$@"; shift 2 ;;
      --domains) set_count NUM_DOMAINS "$@"; shift 2 ;;
      --data-products) set_count NUM_DATA_PRODUCTS "$@"; shift 2 ;;
      --api-collections) set_count NUM_API_COLLECTIONS "$@"; shift 2 ;;
      --api-endpoints) set_count NUM_API_ENDPOINTS "$@"; shift 2 ;;
      --lineage-edges) set_count NUM_LINEAGE_EDGES "$@"; shift 2 ;;
      --test-case-results) set_count NUM_TEST_CASE_RESULTS "$@"; shift 2 ;;
      --entity-report-data) set_count NUM_ENTITY_REPORT_DATA "$@"; shift 2 ;;
      --web-analytic-views) set_count NUM_WEB_ANALYTIC_VIEWS "$@"; shift 2 ;;
      --web-analytic-activity) set_count NUM_WEB_ANALYTIC_ACTIVITY "$@"; shift 2 ;;
      --raw-cost-analysis) set_count NUM_RAW_COST_ANALYSIS "$@"; shift 2 ;;
      --aggregated-cost-analysis) set_count NUM_AGG_COST_ANALYSIS "$@"; shift 2 ;;
      --workers) set_count NUM_WORKERS "$@"; shift 2 ;;
      --only) set_text ONLY_ENTITIES "$@"; shift 2 ;;
      --output) set_text OUTPUT_PATH "$@"; shift 2 ;;
      --token) set_text AUTH_TOKEN "$@"; shift 2 ;;
      --ramp) RAMP_MODE="true"; shift ;;
      --ramp-batch) set_count RAMP_BATCH "$@"; shift 2 ;;
      --admin-port) set_count ADMIN_PORT "$@"; shift 2 ;;
      # Accepted for backward compatibility; the value is discarded.
      # --databases is auto-calculated now; for the other two use
      # --glossary-terms and --tags respectively.
      --databases | --terms-per-glossary | --tags-per-classification)
        require_value "$1" "$#"
        shift 2
        ;;
      --server) set_text SERVER_URL "$@"; shift 2 ;;
      --quick)
        SCALE_APPLIED="quick"
        NUM_TABLES=3000; NUM_DASHBOARDS=1000; NUM_CHARTS=2000; NUM_PIPELINES=500
        NUM_TOPICS=500; NUM_MLMODELS=300; NUM_CONTAINERS=300; NUM_SEARCH_INDEXES=200
        NUM_STORED_PROCEDURES=200; NUM_QUERIES=200; NUM_DASHBOARD_DATA_MODELS=100
        NUM_TEST_SUITES=5; NUM_TEST_CASES=500; NUM_GLOSSARY_TERMS=500; NUM_GLOSSARIES=10
        NUM_TAGS=100; NUM_CLASSIFICATIONS=5; NUM_USERS=20; NUM_TEAMS=3; NUM_DOMAINS=1
        NUM_DATA_PRODUCTS=3; NUM_API_COLLECTIONS=3; NUM_API_ENDPOINTS=100
        NUM_LINEAGE_EDGES=500; NUM_TEST_CASE_RESULTS=1000; NUM_ENTITY_REPORT_DATA=100
        NUM_WEB_ANALYTIC_VIEWS=200; NUM_WEB_ANALYTIC_ACTIVITY=100
        NUM_RAW_COST_ANALYSIS=50; NUM_AGG_COST_ANALYSIS=50
        shift
        ;;
      -h | --help)
        print_usage
        exit 0
        ;;
      *)
        die "Unknown option: $1"
        ;;
    esac
  done
}

parse_args "$@"
# Grand total: add up every per-type count, one variable at a time.
TOTAL=0
for _count_var in \
  NUM_TABLES NUM_TOPICS NUM_DASHBOARDS NUM_CHARTS NUM_PIPELINES \
  NUM_STORED_PROCEDURES NUM_CONTAINERS NUM_SEARCH_INDEXES NUM_MLMODELS \
  NUM_QUERIES NUM_DASHBOARD_DATA_MODELS NUM_TEST_SUITES NUM_TEST_CASES \
  NUM_GLOSSARY_TERMS NUM_GLOSSARIES NUM_TAGS NUM_CLASSIFICATIONS \
  NUM_USERS NUM_TEAMS NUM_DOMAINS NUM_DATA_PRODUCTS \
  NUM_API_COLLECTIONS NUM_API_ENDPOINTS NUM_LINEAGE_EDGES \
  NUM_TEST_CASE_RESULTS NUM_ENTITY_REPORT_DATA \
  NUM_WEB_ANALYTIC_VIEWS NUM_WEB_ANALYTIC_ACTIVITY \
  NUM_RAW_COST_ANALYSIS NUM_AGG_COST_ANALYSIS; do
  # The variable *name* is spliced into the expression, so the shell's
  # arithmetic evaluator resolves its value.
  TOTAL=$((TOTAL + ${_count_var}))
done
unset _count_var
# ── Run banner: target server, active options and per-type counts ────────────
printf '%s\n' \
  "======================================" \
  "Loading Test Data for Distributed Indexing" \
  "======================================" \
  "Server: $SERVER_URL"
if [[ -n "$SCALE_APPLIED" ]]; then
  printf '%s\n' "Scale: $SCALE_APPLIED"
fi
printf '%s\n' "Workers: $NUM_WORKERS"
# Optional settings are listed only when they were given.
if [[ -n "$ONLY_ENTITIES" ]]; then
  printf '%s\n' "Only: $ONLY_ENTITIES"
fi
if [[ -n "$RAMP_MODE" ]]; then
  printf '%s\n' "Ramp: enabled (batch=$RAMP_BATCH)"
fi
if [[ -n "$ADMIN_PORT" ]]; then
  printf '%s\n' "Admin: port $ADMIN_PORT (Prometheus metrics)"
fi
printf '%s\n' "" "Entity counts:"
# One "label|variable" pair per row; the variable is read indirectly.
while IFS='|' read -r _row_label _row_var; do
  printf " %-26s %s\n" "$_row_label" "${!_row_var}"
done << 'COUNT_ROWS'
Tables:|NUM_TABLES
Topics:|NUM_TOPICS
Dashboards:|NUM_DASHBOARDS
Charts:|NUM_CHARTS
Pipelines:|NUM_PIPELINES
Stored Procedures:|NUM_STORED_PROCEDURES
Containers:|NUM_CONTAINERS
Search Indexes:|NUM_SEARCH_INDEXES
ML Models:|NUM_MLMODELS
Queries:|NUM_QUERIES
Dashboard Data Models:|NUM_DASHBOARD_DATA_MODELS
Test Suites:|NUM_TEST_SUITES
Test Cases:|NUM_TEST_CASES
Glossaries:|NUM_GLOSSARIES
Glossary Terms:|NUM_GLOSSARY_TERMS
Classifications:|NUM_CLASSIFICATIONS
Tags:|NUM_TAGS
Users:|NUM_USERS
Teams:|NUM_TEAMS
Domains:|NUM_DOMAINS
Data Products:|NUM_DATA_PRODUCTS
API Collections:|NUM_API_COLLECTIONS
API Endpoints:|NUM_API_ENDPOINTS
Lineage Edges:|NUM_LINEAGE_EDGES
Test Case Results (TS):|NUM_TEST_CASE_RESULTS
Entity Report Data (TS):|NUM_ENTITY_REPORT_DATA
Web Analytic Views (TS):|NUM_WEB_ANALYTIC_VIEWS
Web Analytic Activity (TS):|NUM_WEB_ANALYTIC_ACTIVITY
Raw Cost Analysis (TS):|NUM_RAW_COST_ANALYSIS
Agg Cost Analysis (TS):|NUM_AGG_COST_ANALYSIS
COUNT_ROWS
unset _row_label _row_var
printf '%s\n' " --------------------------"
printf " %-26s %s\n" "Total:" "$TOTAL"
printf '%s\n' ""
python3 << PYEOF
import urllib.request
import urllib.error
import json
import sys
import time
import random
import uuid
import threading
import os
from concurrent.futures import ThreadPoolExecutor, as_completed
from datetime import datetime, timedelta, timezone
# Settings handed over from the surrounding shell script. This program text
# is fed to python3 through an unquoted here-document, so each shell
# placeholder below has already been replaced by its value when Python
# parses these lines.
SERVER_URL = "${SERVER_URL}"
NUM_WORKERS = ${NUM_WORKERS}
# ── Entity counts ────────────────────────────────────────────────────────────
# Substituted as bare numeric literals (no quotes around the placeholder).
NUM_TABLES = ${NUM_TABLES}
NUM_TOPICS = ${NUM_TOPICS}
NUM_DASHBOARDS = ${NUM_DASHBOARDS}
NUM_CHARTS = ${NUM_CHARTS}
NUM_PIPELINES = ${NUM_PIPELINES}
NUM_STORED_PROCEDURES = ${NUM_STORED_PROCEDURES}
NUM_CONTAINERS = ${NUM_CONTAINERS}
NUM_SEARCH_INDEXES = ${NUM_SEARCH_INDEXES}
NUM_MLMODELS = ${NUM_MLMODELS}
NUM_QUERIES = ${NUM_QUERIES}
NUM_DASHBOARD_DATA_MODELS = ${NUM_DASHBOARD_DATA_MODELS}
NUM_TEST_SUITES = ${NUM_TEST_SUITES}
NUM_TEST_CASES = ${NUM_TEST_CASES}
NUM_GLOSSARY_TERMS = ${NUM_GLOSSARY_TERMS}
NUM_GLOSSARIES = ${NUM_GLOSSARIES}
NUM_TAGS = ${NUM_TAGS}
NUM_CLASSIFICATIONS = ${NUM_CLASSIFICATIONS}
NUM_USERS = ${NUM_USERS}
NUM_TEAMS = ${NUM_TEAMS}
NUM_DOMAINS = ${NUM_DOMAINS}
NUM_DATA_PRODUCTS = ${NUM_DATA_PRODUCTS}
NUM_API_COLLECTIONS = ${NUM_API_COLLECTIONS}
NUM_API_ENDPOINTS = ${NUM_API_ENDPOINTS}
NUM_LINEAGE_EDGES = ${NUM_LINEAGE_EDGES}
NUM_TEST_CASE_RESULTS = ${NUM_TEST_CASE_RESULTS}
NUM_ENTITY_REPORT_DATA = ${NUM_ENTITY_REPORT_DATA}
NUM_WEB_ANALYTIC_VIEWS = ${NUM_WEB_ANALYTIC_VIEWS}
NUM_WEB_ANALYTIC_ACTIVITY = ${NUM_WEB_ANALYTIC_ACTIVITY}
NUM_RAW_COST_ANALYSIS = ${NUM_RAW_COST_ANALYSIS}
NUM_AGG_COST_ANALYSIS = ${NUM_AGG_COST_ANALYSIS}
# Raw string options; an empty string means the option was not supplied.
# NOTE(review): the values are pasted between double quotes as-is, so a value
# containing a double quote or a backslash would change or break the literal.
ONLY_ENTITIES_RAW = "${ONLY_ENTITIES}"
OUTPUT_PATH_RAW = "${OUTPUT_PATH}"
AUTH_TOKEN_RAW = "${AUTH_TOKEN}"
# Falls back to "default" when no preset name was substituted.
SCALE_APPLIED = "${SCALE_APPLIED}" or "default"
# True only when the substituted text is exactly "true".
RAMP_MODE = "${RAMP_MODE}" == "true"
RAMP_BATCH = ${RAMP_BATCH}
ADMIN_PORT_RAW = "${ADMIN_PORT}"
# Auto-calculate database/schema counts
# One database per 50,000 tables (integer division), never fewer than one.
NUM_DATABASES = max(1, NUM_TABLES // 50000)
# Tables divided by (databases * 5000), clamped to the range 1..20; fixed at 1
# when no tables are requested.
SCHEMAS_PER_DB = min(20, max(1, NUM_TABLES // (NUM_DATABASES * 5000))) if NUM_TABLES > 0 else 1
# ── Entity filter (--only) ───────────────────────────────────────────────────
# Split the comma-separated selection into a set of trimmed names. Blank
# pieces are discarded, so an empty or whitespace-only selection produces an
# empty set.
ONLY_ENTITIES = {
    name
    for name in (piece.strip() for piece in ONLY_ENTITIES_RAW.split(","))
    if name
}
ENTITY_PREREQUISITES = {
"tables": {"_services_db", "_infra_db"},
"storedProcedures": {"_services_db", "_infra_db"},
"queries": {"_services_db"},
"dashboards": {"_services_dashboard"},
"charts": {"_services_dashboard"},
"dashboardDataModels": {"_services_dashboard"},
"topics": {"_services_messaging"},
"pipelines": {"_services_pipeline"},
"mlmodels": {"_services_mlmodel"},
"containers": {"_services_storage"},
"searchIndexes": {"_services_search"},
"apiCollections": {"_services_api"},
"apiEndpoints": {"_services_api", "apiCollections"},
"dataProducts": {"domains"},
"glossaryTerms": {"glossaries"},
"tags": {"classifications"},
"testCases": {"tables", "_services_db", "_infra_db"},
"testCaseResults": {"testCases", "tables", "_services_db", "_infra_db"},
"lineageEdges": {"tables", "_services_db", "_infra_db"},
}
def _resolve_prerequisites(entities):
resolved = set(entities)
changed = True
while changed:
changed = False
for e in list(resolved):
for prereq in ENTITY_PREREQUISITES.get(e, set()):
if prereq not in resolved:
resolved.add(prereq)
changed = True
return resolved
# Expand the selection with its prerequisites, then move the infrastructure
# tags (names starting with an underscore) into their own set. Both sets stay
# empty when no filter is active.
_resolved = _resolve_prerequisites(ONLY_ENTITIES) if ONLY_ENTITIES else set()
_infra_needed = {name for name in _resolved if name.startswith("_")}
_resolved.difference_update(_infra_needed)


def should_run(entity_name):
    """Tell whether entity_name is selected, or required by a selected type.

    Always True when no filter is active.
    """
    if ONLY_ENTITIES:
        return entity_name in ONLY_ENTITIES or entity_name in _resolved
    return True


def _need_infra(tag):
    """Tell whether the infrastructure tag is required by the selection.

    Always True when no filter is active.
    """
    return (not ONLY_ENTITIES) or tag in _infra_needed


print("Connecting to {}...".format(SERVER_URL), flush=True)
# ── HTTP helper with retry ───────────────────────────────────────────────────
def make_request(url, data=None, method="GET", headers=None, retries=3):
    """Send a JSON HTTP request and return a (status, body) pair.

    Args:
        url: Full URL to call.
        data: JSON-serialisable payload, or None for a request without a
            body. Falsy payloads such as {} or [] are sent as JSON.
        method: HTTP method name.
        headers: Extra request headers. The mapping is copied, never
            modified; Content-Type is always set to application/json.
        retries: Maximum number of attempts.

    Returns:
        (status, parsed) for a completed response, where parsed is the decoded
        JSON document, {} for an empty body, or the raw text when the body is
        not valid JSON.
        (code, text) for an HTTP error response, with the raw error body.
        (0, message) when no HTTP response was obtained at all.

    HTTP 5xx responses and non-HTTP failures (connection errors, timeouts)
    are retried with exponential backoff (1s, 2s, ...); 4xx responses are
    returned immediately.
    """
    # Work on a copy so the caller's dict is left untouched.
    request_headers = dict(headers) if headers else {}
    request_headers["Content-Type"] = "application/json"
    # Compare against None: an empty dict or list is still a payload.
    encoded = json.dumps(data).encode("utf-8") if data is not None else None
    for attempt in range(retries):
        can_retry = attempt < retries - 1
        req = urllib.request.Request(
            url, data=encoded, headers=request_headers, method=method
        )
        try:
            with urllib.request.urlopen(req, timeout=60) as resp:
                body = resp.read().decode("utf-8")
                try:
                    parsed = json.loads(body) if body.strip() else {}
                except json.JSONDecodeError:
                    parsed = body
                return resp.status, parsed
        except urllib.error.HTTPError as e:
            try:
                body = e.read().decode("utf-8")
            except Exception:
                body = str(e)
            if e.code >= 500 and can_retry:
                time.sleep(2 ** attempt)
                continue
            return e.code, body
        except Exception as e:
            if can_retry:
                time.sleep(2 ** attempt)
                continue
            return 0, str(e)
    # Only reached when retries is zero or negative.
    return 0, "max retries exceeded"
# ── Auth token ───────────────────────────────────────────────────────────────
# A token supplied through --token wins; otherwise the built-in admin JWT is used.
token = AUTH_TOKEN_RAW.strip()
if token:
    print("Using token from --token argument.")
else:
    token = ("eyJraWQiOiJHYjM4OWEtOWY3Ni1nZGpzLWE5MmotMDI0MmJrOTQzNTYiLCJ0eXAiOiJKV1QiLCJhbGciOiJSUzI1NiJ9.eyJzdWIiOiJhZG1pbiIsImlzQm90IjpmYWxzZSwiaXNzIjoib3Blbi1tZXRhZGF0YS5vcmciLCJpYXQiOjE2NjM5Mzg0NjIsImVtYWlsIjoiYWRtaW5Ab3Blbm1ldGFkYXRhLm9yZyJ9.tS8um_5DKu7HgzGBzS1VTA5uUjKWOCU0B_j08WXBiEC0mr0zNREkqVfwFDD-d24HlNEbrqioLsBuFRiwIWKc1m_ZlVQbG7P36RUxhuv2vbSp80FKyNM-Tj93FDzq91jsyNmsQhyNv_fNr3TXfzzSPjHt8Go0FMMP66weoKMgW2PbXlhVKwEuXUHyakLLzewm9UMeQaEiRzhiTMU3UkLXcKbYEJJvfNFcLwSl9W8JCO_l0Yj3ud-qt_nQYEZwqW6u5nfdQllN133iikV4fM5QZsMCnm8Rq1mvLR0y9bmJiD7fwM1tmJ791TUWqmKaTnP49U493VanKpUAfzIiOiIbhg")
    print("Using default admin JWT token for authentication.")
# Headers sent with every authenticated request.
headers = {
    "Content-Type": "application/json",
    "Authorization": "Bearer " + token,
}
# ── Output path ──────────────────────────────────────────────────────────────
# Use the path given with --output, else a timestamped file name (local time).
output_path = OUTPUT_PATH_RAW.strip()
if not output_path:
    ts_str = datetime.now().strftime("%Y%m%d-%H%M%S")
    output_path = "benchmark-report-{}.json".format(ts_str)
# ══════════════════════════════════════════════════════════════════════════════
# SERVER INTROSPECTION
# ══════════════════════════════════════════════════════════════════════════════
class ServerIntrospector:
    """Gathers server metadata and runtime metrics around a benchmark run.

    Everything collected is stored in self.info under these keys: version,
    revision, status, entity_counts, service_counts, prometheus_<label> and
    diagnostics_<label>.
    """

    def __init__(self, server_url, req_headers, admin_port=None):
        """Remember the connection settings; performs no I/O.

        Args:
            server_url: Base URL of the server API.
            req_headers: Headers sent with every API request.
            admin_port: Port of the metrics endpoint, or None to skip
                Prometheus scraping.
        """
        self.server_url = server_url
        self.req_headers = req_headers
        self.admin_port = admin_port
        self.info = {}

    def _parse_url_host(self):
        """Return the host name of server_url, or "localhost" if it has none."""
        from urllib.parse import urlparse
        parsed = urlparse(self.server_url)
        return parsed.hostname or "localhost"

    def _get(self, path):
        """GET server_url + path with the stored headers (at most two attempts)."""
        return make_request(
            f"{self.server_url}{path}",
            method="GET", headers=self.req_headers, retries=2,
        )

    def collect(self):
        """Query the system endpoints and return the populated info dict.

        Every section falls back to a placeholder value when its endpoint
        does not answer with HTTP 200 and a JSON object. Prometheus metrics
        are scraped (label "before") only when an admin port is configured;
        diagnostics (label "before") are always attempted.
        """
        print("Collecting server introspection data...")
        sys.stdout.flush()

        status, resp = self._get("/api/v1/system/version")
        if status == 200 and isinstance(resp, dict):
            self.info["version"] = resp.get("version", "unknown")
            self.info["revision"] = resp.get("revision", "unknown")
            print(f" Server version: {self.info['version']} (rev: {self.info['revision']})")
        else:
            self.info["version"] = "unreachable"
            self.info["revision"] = "unknown"
            print(f" Could not fetch version (status={status})")

        status, resp = self._get("/api/v1/system/status")
        if status == 200 and isinstance(resp, dict):
            self.info["status"] = resp
            # Only dict-valued entries are counted as components.
            healthy_components = sum(1 for v in resp.values() if isinstance(v, dict) and v.get("passed"))
            total_components = sum(1 for v in resp.values() if isinstance(v, dict))
            print(f" Component health: {healthy_components}/{total_components} healthy")
        else:
            self.info["status"] = {"error": f"status={status}"}
            print(f" Could not fetch status (status={status})")

        status, resp = self._get("/api/v1/system/entities/count")
        if status == 200 and isinstance(resp, dict):
            self.info["entity_counts"] = resp
            total_entities = sum(v for v in resp.values() if isinstance(v, (int, float)))
            print(f" Pre-existing entities: {total_entities:,}")
        else:
            self.info["entity_counts"] = {}
            print(f" Could not fetch entity counts (status={status})")

        status, resp = self._get("/api/v1/system/services/count")
        if status == 200 and isinstance(resp, dict):
            self.info["service_counts"] = resp
        else:
            self.info["service_counts"] = {}

        if self.admin_port:
            self._scrape_prometheus("before")
        self.collect_diagnostics("before")
        sys.stdout.flush()
        return self.info

    def _scrape_prometheus(self, label):
        """Fetch /prometheus from the admin port and store the parsed metrics.

        The result goes to info["prometheus_<label>"]; on any failure an
        {"error": message} dict is stored instead. The request uses plain
        http and carries no auth headers.
        """
        host = self._parse_url_host()
        prom_url = f"http://{host}:{self.admin_port}/prometheus"
        print(f" Scraping Prometheus metrics ({label}) from {prom_url}...")
        sys.stdout.flush()
        try:
            req = urllib.request.Request(prom_url, method="GET")
            with urllib.request.urlopen(req, timeout=10) as resp:
                body = resp.read().decode("utf-8")
            metrics = self._parse_prometheus(body)
            key = f"prometheus_{label}"
            self.info[key] = metrics
            heap_used = metrics.get("jvm_memory_bytes_used_heap", "N/A")
            heap_max = metrics.get("jvm_memory_bytes_max_heap", "N/A")
            # The heap line is printed only when both figures were found.
            if isinstance(heap_used, (int, float)) and isinstance(heap_max, (int, float)):
                print(f" JVM heap: {heap_used / 1048576:.0f}MB / {heap_max / 1048576:.0f}MB")
            thread_count = metrics.get("jvm_threads_current", "N/A")
            print(f" JVM threads: {thread_count}")
            db_active = metrics.get("hikaricp_connections_active", "N/A")
            db_idle = metrics.get("hikaricp_connections_idle", "N/A")
            db_total = metrics.get("hikaricp_connections_total", "N/A")
            print(f" DB pool: active={db_active}, idle={db_idle}, total={db_total}")
        except Exception as e:
            self.info[f"prometheus_{label}"] = {"error": str(e)}
            print(f" Failed to scrape Prometheus: {e}")
        sys.stdout.flush()

    def collect_diagnostics(self, label):
        """Fetch /api/v1/system/diagnostics and print a short summary.

        Returns the response dict, which is also stored in
        info["diagnostics_<label>"], or None (stored as None) when the
        endpoint does not return HTTP 200 with a JSON object or when
        summarising the response fails.
        """
        diag_url = f"{self.server_url}/api/v1/system/diagnostics"
        print(f" Collecting diagnostics ({label}) from {diag_url}...")
        sys.stdout.flush()
        try:
            status, resp = make_request(diag_url, method="GET", headers=self.req_headers, retries=2)
            if status == 200 and isinstance(resp, dict):
                self.info[f"diagnostics_{label}"] = resp
                jvm = resp.get("jvm", {})
                jetty = resp.get("jetty", {})
                db = resp.get("database", {})
                bulk = resp.get("bulk_executor", {})
                heap_pct = jvm.get("heap_usage_pct", "N/A")
                print(f" JVM heap: {heap_pct}% used, GC pauses: {jvm.get('gc_pause_total_ms', 0)}ms")
                print(f" Jetty: {jetty.get('threads_busy', '?')}/{jetty.get('threads_max', '?')} "
                      f"threads busy ({jetty.get('utilization_pct', 0)}%), "
                      f"queue: {jetty.get('queue_size', 0)}")
                print(f" DB pool: {db.get('pool_active', '?')}/{db.get('pool_max', '?')} "
                      f"active ({db.get('pool_usage_pct', 0)}%), "
                      f"pending: {db.get('pool_pending', 0)}")
                print(f" Bulk executor: queue {bulk.get('queue_depth', 0)}/{bulk.get('queue_capacity', '?')} "
                      f"({bulk.get('queue_usage_pct', 0)}%)")
                return resp
            else:
                print(f" Diagnostics endpoint returned status={status} (may not be available)")
                self.info[f"diagnostics_{label}"] = None
                return None
        except Exception as e:
            print(f" Failed to collect diagnostics: {e}")
            self.info[f"diagnostics_{label}"] = None
            return None

    def scrape_after(self):
        """Scrape Prometheus again (label "after") when an admin port is set."""
        if self.admin_port:
            self._scrape_prometheus("after")

    @staticmethod
    def _split_sample(line):
        """Split one exposition line into (name_with_labels, remaining_fields).

        A sample has the form: name{labels} value [timestamp]. Label values
        are quoted strings that may contain blanks and closing braces, so the
        label set is scanned with quote and escape tracking instead of being
        cut at the first blank.
        """
        fields = line.split()
        if "{" not in fields[0]:
            return fields[0], fields[1:]
        text = line.strip()
        # chr(92) is the backslash; it is spelled this way so that no literal
        # backslash appears in this program text.
        escape_char = chr(92)
        in_quotes = False
        escaped = False
        for idx in range(text.find("{") + 1, len(text)):
            ch = text[idx]
            if escaped:
                escaped = False
            elif ch == escape_char:
                escaped = True
            elif ch == '"':
                in_quotes = not in_quotes
            elif ch == "}" and not in_quotes:
                return text[:idx + 1], text[idx + 1:].split()
        # Unterminated label set: fall back to plain whitespace splitting.
        return fields[0], fields[1:]

    @staticmethod
    def _parse_prometheus(text):
        """Extract the tracked JVM, connection-pool and Jetty metrics.

        Returns a dict of metric name to float value. Heap memory samples are
        stored as jvm_memory_bytes_used_heap / jvm_memory_bytes_max_heap,
        non-heap memory samples are skipped, and every other tracked metric
        is stored under its bare name (labels dropped, first sample wins).
        """
        metrics = {}
        target_prefixes = (
            "jvm_memory_bytes_used", "jvm_memory_bytes_max",
            "jvm_threads_current", "jvm_threads_daemon",
            "hikaricp_connections_active", "hikaricp_connections_idle",
            "hikaricp_connections_total", "hikaricp_connections_pending",
            "io_dropwizard_jetty_MutableServletContextHandler_percent",
            "io_dropwizard_jetty_MutableServletContextHandler_requests",
        )
        for line in text.split("\n"):
            if line.startswith("#") or not line.strip():
                continue
            metric_name, rest = ServerIntrospector._split_sample(line)
            if not rest:
                continue
            try:
                value = float(rest[0])
            except ValueError:
                continue
            if not metric_name.startswith(target_prefixes):
                continue
            if 'area="heap"' in metric_name:
                if "bytes_used" in metric_name:
                    metrics["jvm_memory_bytes_used_heap"] = value
                elif "bytes_max" in metric_name:
                    metrics["jvm_memory_bytes_max_heap"] = value
            elif 'area="nonheap"' not in metric_name:
                simple_name = metric_name.split("{")[0]
                if simple_name not in metrics:
                    metrics[simple_name] = value
        return metrics

    def report_section(self):
        """Return a shallow copy of the collected info."""
        return dict(self.info)
# Introspect the server before generating load; a blank admin port is passed as None.
introspector = ServerIntrospector(
    SERVER_URL,
    dict(headers),
    admin_port=(ADMIN_PORT_RAW.strip() or None),
)
server_info = introspector.collect()
# ══════════════════════════════════════════════════════════════════════════════
# BENCHMARK INFRASTRUCTURE
# ══════════════════════════════════════════════════════════════════════════════
class BenchmarkCollector:
    """Accumulates per-request results for one entity batch and summarises them.

    ``record_success`` / ``record_failure`` are safe to call from worker
    threads (they take ``self.lock``). The analysis methods are meant to be
    called once the batch has finished.

    Attributes:
        latencies: request latencies in seconds, in completion order; both
            successful and failed requests are included.
        errors: sample of failures (at most ``MAX_ERROR_SAMPLES`` entries).
        error_counts: failure count per status key, covering every failure,
            not only the sampled ones.
        window_counts: ``(timestamp, completed_requests)`` samples appended by
            the caller, used for the throughput-over-time series.
    """

    # Cap on stored error samples so a failing run does not hoard memory.
    MAX_ERROR_SAMPLES = 50

    def __init__(self, entity_name, requested_count):
        self.entity_name = entity_name
        self.requested = requested_count
        self.latencies = []
        self.errors = []
        self.error_counts = {}
        self.created = 0
        self.failed = 0
        self.start_time = None
        self.end_time = None
        self.lock = threading.Lock()
        self.window_counts = []

    @staticmethod
    def _error_key(status):
        """Map a status code to its breakdown key (0 means no HTTP response)."""
        return "connection_error" if status == 0 else str(status)

    def record_success(self, latency_s):
        """Record one successful request and its latency in seconds."""
        with self.lock:
            self.latencies.append(latency_s)
            self.created += 1

    def record_failure(self, latency_s, status, error):
        """Record one failed request; keeps a truncated sample of the error."""
        with self.lock:
            self.latencies.append(latency_s)
            self.failed += 1
            key = self._error_key(status)
            self.error_counts[key] = self.error_counts.get(key, 0) + 1
            if len(self.errors) < self.MAX_ERROR_SAMPLES:
                self.errors.append({"status": status, "error": str(error)[:200]})

    def _error_breakdown(self):
        """Return failure counts per status key for ALL recorded failures."""
        # Counted at record time: the ``errors`` sample is capped, so deriving
        # the breakdown from it would under-report once the cap is reached.
        return dict(self.error_counts)

    def _latency_analysis(self):
        """Derive distribution-shape indicators and human-readable findings.

        Returns an empty dict when fewer than 10 requests were recorded.
        """
        if len(self.latencies) < 10:
            return {}
        s = sorted(self.latencies)
        n = len(s)
        p50 = s[int(n * 0.50)] * 1000
        p90 = s[min(int(n * 0.90), n - 1)] * 1000
        p95 = s[min(int(n * 0.95), n - 1)] * 1000
        p99 = s[min(int(n * 0.99), n - 1)] * 1000
        p90_p50_ratio = round(p90 / p50, 1) if p50 > 0 else 0
        p99_p95_ratio = round(p99 / p95, 1) if p95 > 0 else 0
        bimodal = p90_p50_ratio > 5.0
        degradation_pct = 0.0
        if n >= 20:
            # Degradation compares the first and last 20% of requests in
            # completion order, so the unsorted list must be used here.
            chunk = max(1, n // 5)
            first_20 = self.latencies[:chunk]
            last_20 = self.latencies[-chunk:]
            avg_first = sum(first_20) / len(first_20)
            avg_last = sum(last_20) / len(last_20)
            if avg_first > 0:
                degradation_pct = round((avg_last / avg_first - 1) * 100, 1)
        result = {
            "bimodal": bimodal,
            "p90_p50_ratio": p90_p50_ratio,
            "p99_p95_ratio": p99_p95_ratio,
            "degradation_pct": degradation_pct,
        }
        findings = []
        if bimodal:
            findings.append(
                f"Bimodal latency distribution (p50={p50:.0f}ms, p90={p90:.0f}ms, "
                f"ratio={p90_p50_ratio}x) -- likely DB connection pool or thread pool wait"
            )
        if p99_p95_ratio > 3.0:
            findings.append(
                f"Extreme tail latency (p95={p95:.0f}ms, p99={p99:.0f}ms, "
                f"ratio={p99_p95_ratio}x) -- possible GC pauses or lock contention"
            )
        if degradation_pct > 100.0:
            findings.append(
                f"Sustained load degradation: last 20% of requests {degradation_pct:.0f}% "
                f"slower than first 20% -- resource exhaustion"
            )
        result["findings"] = findings
        return result

    def percentile(self, p):
        """Return the p-th percentile latency in seconds (0 when nothing was recorded)."""
        if not self.latencies:
            return 0
        s = sorted(self.latencies)
        k = int(len(s) * p / 100)
        return s[min(k, len(s) - 1)]

    def _throughput_windows(self):
        """Turn ``window_counts`` samples into a requests-per-second series.

        Samples are grouped into 5-second buckets relative to the first
        sample. Each bucket's rate is the growth of the completed-request
        counter since the previous bucket's last sample, divided by the time
        actually elapsed between those two samples. Buckets without samples
        are omitted, and the following bucket's rate then covers the gap.

        Returns:
            list of ``{"elapsed_s": <bucket end>, "rps": <rate>}`` dicts; empty
            when fewer than two samples exist.
        """
        samples = list(self.window_counts)
        if len(samples) < 2:
            return []
        bucket_size = 5
        start_ts = samples[0][0]
        # Samples are appended in time order, so the last write per bucket wins.
        last_in_bucket = {}
        for ts, count in samples:
            last_in_bucket[int((ts - start_ts) // bucket_size)] = (ts, count)
        buckets = []
        ref_ts, ref_count = samples[0]
        for idx in sorted(last_in_bucket):
            ts, count = last_in_bucket[idx]
            span = ts - ref_ts
            if span <= 0:
                # Only the reference sample fell into this bucket; no rate to report.
                continue
            buckets.append({
                "elapsed_s": round(float((idx + 1) * bucket_size), 1),
                "rps": round((count - ref_count) / span, 1),
            })
            ref_ts, ref_count = ts, count
        return buckets

    def summary(self):
        """Build the report dict for this batch, or ``None`` if nothing was recorded."""
        n = len(self.latencies)
        if n == 0:
            return None
        if self.start_time:
            # A still-running batch is measured up to "now".
            elapsed = (self.end_time or time.time()) - self.start_time
        else:
            elapsed = 0
        s = sorted(self.latencies)
        result = {
            "requested": self.requested,
            "total_requests": n,
            "created": self.created,
            "failed": self.failed,
            "error_rate_pct": round(self.failed / n * 100, 2) if n > 0 else 0,
            "wall_clock_s": round(elapsed, 2),
            "throughput_rps": round(n / elapsed, 2) if elapsed > 0 else 0,
            "latency_ms": {
                "min": round(s[0] * 1000, 1),
                "p50": round(self.percentile(50) * 1000, 1),
                "p75": round(self.percentile(75) * 1000, 1),
                "p90": round(self.percentile(90) * 1000, 1),
                "p95": round(self.percentile(95) * 1000, 1),
                "p99": round(self.percentile(99) * 1000, 1),
                "max": round(s[-1] * 1000, 1),
                "avg": round(sum(s) / len(s) * 1000, 1),
            },
            "throughput_over_time": self._throughput_windows(),
            "errors_sample": self.errors[:10],
            "latency_analysis": self._latency_analysis(),
            "error_breakdown": self._error_breakdown(),
        }
        return result
class HealthMonitor:
    """Background poller that records server health while the load test runs.

    A daemon thread is started by the constructor. Every ``interval`` seconds
    it times a GET of ``/api/v1/system/version`` and, at most once per
    ``diagnostics_interval`` seconds, stores a diagnostics snapshot.

    Args:
        server_url: base URL of the server under test.
        req_headers: headers sent with every probe.
        interval: seconds between health probes.
        diagnostics_interval: minimum seconds between diagnostics snapshots.
    """

    def __init__(self, server_url, req_headers, interval=5, diagnostics_interval=10):
        self.server_url = server_url
        self.req_headers = req_headers
        self.interval = interval
        self.diagnostics_interval = diagnostics_interval
        self.samples = []
        self.diagnostics_samples = []
        self.running = True
        # Lets stop() wake the poller immediately instead of waiting out a sleep.
        self._stop_event = threading.Event()
        self.start_time = time.time()
        self._last_diag_time = 0
        self._thread = threading.Thread(target=self._poll, daemon=True)
        self._thread.start()

    def _poll(self):
        """Thread body: probe, record a sample, optionally sample diagnostics, wait."""
        while self.running:
            t0 = time.time()
            try:
                status, _ = make_request(
                    f"{self.server_url}/api/v1/system/version",
                    method="GET", headers=self.req_headers, retries=1,
                )
            except Exception:
                # Any failure to probe is recorded as an unhealthy sample.
                status = 0
            latency = (time.time() - t0) * 1000
            self.samples.append({
                "timestamp": time.time(),
                "elapsed_s": round(time.time() - self.start_time, 1),
                "latency_ms": round(latency, 1),
                "status": status,
                "healthy": status == 200,
            })
            now = time.time()
            if now - self._last_diag_time >= self.diagnostics_interval:
                self._last_diag_time = now
                self._sample_diagnostics()
            # Interruptible replacement for time.sleep(interval).
            if self._stop_event.wait(self.interval):
                break

    def _sample_diagnostics(self):
        """Store one diagnostics snapshot tagged with elapsed time; failures are ignored."""
        diag_url = f"{self.server_url}/api/v1/system/diagnostics"
        try:
            status, resp = make_request(diag_url, method="GET", headers=self.req_headers, retries=1)
            if status == 200 and isinstance(resp, dict):
                resp["_sample_elapsed_s"] = round(time.time() - self.start_time, 1)
                self.diagnostics_samples.append(resp)
        except Exception:
            # Diagnostics are optional; a missing endpoint must not stop polling.
            pass

    def stop(self):
        """Ask the poller to finish and wait (bounded) for the thread to exit."""
        self.running = False
        self._stop_event.set()
        self._thread.join(timeout=self.interval + 2)

    def summary(self):
        """Return counts, latency stats (ms) and the full sample timeline."""
        if not self.samples:
            return {
                "total_checks": 0, "healthy": 0, "unhealthy": 0,
                "health_latency_ms": {}, "timeline": [],
            }
        healthy = [s for s in self.samples if s["healthy"]]
        unhealthy = [s for s in self.samples if not s["healthy"]]
        latencies = sorted([s["latency_ms"] for s in self.samples])
        n = len(latencies)
        result = {
            "total_checks": n,
            "healthy": len(healthy),
            "unhealthy": len(unhealthy),
            "health_latency_ms": {
                "min": round(latencies[0], 1),
                "avg": round(sum(latencies) / n, 1),
                "p95": round(latencies[min(int(n * 0.95), n - 1)], 1),
                "max": round(latencies[-1], 1),
            },
            "timeline": self.samples,
        }
        if self.diagnostics_samples:
            result["diagnostics_during"] = self.diagnostics_samples
        return result
# ── Start health monitor ────────────────────────────────────────────────────
# Health polling starts before any load is generated and runs in the background.
health_monitor = HealthMonitor(SERVER_URL, dict(headers), interval=5)
overall_start = time.time()
# Created-count per entity, BenchmarkCollector per entity, wall clock per phase.
stats, benchmarks, phase_timings = {}, {}, {}
# ── Collected IDs for linking phases ─────────────────────────────────────────
collected_table_ids, collected_dashboard_ids, collected_pipeline_ids = [], [], []
collected_test_case_fqns = []
# Guards the collected_* lists, which are appended to from worker threads.
collect_lock = threading.Lock()
# Upper bound on how many created entities are remembered per type.
MAX_COLLECT = max(10000, NUM_TEST_CASE_RESULTS, NUM_LINEAGE_EDGES * 2)
# ── Generic batch creator (with benchmarking) ───────────────────────────────
def create_entity_batch(entity_name, count, payload_fn, workers=None, collect_fn=None,
                        log_interval=None):
    """Create ``count`` entities concurrently and benchmark the requests.

    Args:
        entity_name: label used for logging and as the key in the module-level
            ``stats`` / ``benchmarks`` dicts.
        count: number of entities to request; ``<= 0`` is a no-op.
        payload_fn: ``idx -> dict`` request body. The special keys ``__url__``
            (required) and ``__method__`` (default ``"PUT"``) are removed from
            the dict before sending. Returning ``None`` skips the index.
        workers: thread pool size; defaults to ``NUM_WORKERS``.
        collect_fn: optional ``(idx, response_dict)`` callback run after each
            successful request.
        log_interval: print progress every this many completed requests;
            defaults to roughly 5% of ``count``.

    Returns:
        ``(created_count, BenchmarkCollector)``, or ``(0, None)`` when
        ``count <= 0``.
    """
    if count <= 0:
        return 0, None
    if workers is None:
        workers = NUM_WORKERS
    if log_interval is None:
        log_interval = max(1, count // 20)
    elif log_interval < 1:
        # Used as a modulus below; zero would raise inside every worker.
        log_interval = 1
    bench = BenchmarkCollector(entity_name, count)
    bench.start_time = time.time()
    print(f"\nCreating {count} {entity_name}...")
    sys.stdout.flush()
    counter_lock = threading.Lock()
    stop_sampling = threading.Event()

    def _sampler():
        # Once per second, snapshot how many requests have completed so far.
        while not stop_sampling.is_set():
            with counter_lock:
                total = bench.created + bench.failed
            bench.window_counts.append((time.time(), total))
            stop_sampling.wait(1)

    sampler_thread = threading.Thread(target=_sampler, daemon=True)
    sampler_thread.start()

    def _work(idx):
        t0 = time.time()
        payload = payload_fn(idx)
        if payload is None:
            return
        url = payload.pop("__url__", None)
        method = payload.pop("__method__", "PUT")
        if url is None:
            return
        status, resp = make_request(url, data=payload, method=method, headers=headers)
        latency = time.time() - t0
        ok = status in [200, 201]
        if ok:
            bench.record_success(latency)
        else:
            bench.record_failure(latency, status, resp)
        with counter_lock:
            total = bench.created + bench.failed
            if total % log_interval == 0 or total == count:
                elapsed = time.time() - bench.start_time
                rate = total / elapsed if elapsed > 0 else 0
                print(f" {entity_name}: {total}/{count} ({rate:.1f}/sec) - OK: {bench.created}, Fail: {bench.failed}")
                sys.stdout.flush()
        if ok and collect_fn and isinstance(resp, dict):
            collect_fn(idx, resp)

    worker_errors = 0
    try:
        with ThreadPoolExecutor(max_workers=workers) as executor:
            futures = [executor.submit(_work, i) for i in range(count)]
            for f in as_completed(futures):
                try:
                    f.result()
                except Exception as exc:
                    # A raising payload_fn/collect_fn must not abort the batch,
                    # but it should not vanish either: show the first few.
                    worker_errors += 1
                    if worker_errors <= 5:
                        print(f" {entity_name}: unexpected worker error: {exc!r}")
                        sys.stdout.flush()
    finally:
        # Stop and reap the sampler so it cannot append after the batch ends.
        stop_sampling.set()
        sampler_thread.join(timeout=2)
    bench.end_time = time.time()
    # Closing sample so completions after the last 1s tick are not lost.
    bench.window_counts.append((bench.end_time, bench.created + bench.failed))
    stats[entity_name] = bench.created
    benchmarks[entity_name] = bench
    elapsed = time.time() - bench.start_time
    p50 = bench.percentile(50) * 1000
    p95 = bench.percentile(95) * 1000
    print(f"{entity_name} done: {bench.created} created, {bench.failed} failed "
          f"({elapsed:.1f}s, p50={p50:.0f}ms, p95={p95:.0f}ms)")
    if worker_errors:
        print(f" {entity_name}: {worker_errors} requests aborted by unexpected worker errors")
    sys.stdout.flush()
    return bench.created, bench
# ── Ramp tester ──────────────────────────────────────────────────────────────
class RampTester:
    """Runs the same small create workload at increasing worker counts.

    Worker counts double from 1 up to ``max_workers`` (which is always the
    final level). Each level creates ``batch_size`` entities and records
    throughput and latency percentiles. After ``run``, ``optimal_workers``
    holds the level with the highest throughput and ``saturation_point`` the
    first level whose p95 is more than 3x the previous level's p95 (or None).

    Args:
        server_url: base URL of the server under test.
        req_headers: headers sent with every request.
        max_workers: highest concurrency level to try.
        batch_size: number of entities created per level.
    """

    def __init__(self, server_url, req_headers, max_workers, batch_size=100):
        self.server_url = server_url
        self.req_headers = req_headers
        self.max_workers = max_workers
        self.batch_size = batch_size
        self.levels = []
        self.optimal_workers = 1
        self.saturation_point = None

    def _ramp_levels(self):
        """Return the worker counts to test: powers of two, then ``max_workers``."""
        levels = []
        w = 1
        while w <= self.max_workers:
            levels.append(w)
            w *= 2
        if levels and levels[-1] < self.max_workers:
            levels.append(self.max_workers)
        return levels

    def run(self, entity_type="tables", schema_fqns_list=None, service_fqn=None):
        """Execute every ramp level, printing one result row per level.

        Args:
            entity_type: ``"tables"``, ``"topics"``, ``"dashboards"`` or
                ``"pipelines"``; anything else falls back to tables.
            schema_fqns_list: schema FQNs to spread ramp tables across.
            service_fqn: service FQN for the non-table entity types.
        """
        levels = self._ramp_levels()
        print(f"\nRAMP TEST ({entity_type}, {self.batch_size} entities per level)")
        print(f"{'Workers':>7} {'RPS':>7} {'p50ms':>7} {'p95ms':>7} {'p99ms':>7} {'Errors':>6}")
        sys.stdout.flush()
        best_rps = 0
        best_workers = 1
        prev_p95 = 0
        for worker_count in levels:
            level_result = self._run_level(worker_count, entity_type, schema_fqns_list, service_fqn)
            self.levels.append(level_result)
            marker = ""
            if level_result["rps"] > best_rps:
                best_rps = level_result["rps"]
                best_workers = worker_count
                self.optimal_workers = worker_count
            if (prev_p95 > 0 and level_result["p95_ms"] > prev_p95 * 3
                    and self.saturation_point is None):
                self.saturation_point = worker_count
                marker = " <- saturation"
            elif level_result["rps"] >= best_rps and level_result["errors"] == 0:
                marker = " <- optimal"
            # A level without any latency data must not reset the baseline.
            prev_p95 = level_result["p95_ms"] if level_result["p95_ms"] > 0 else prev_p95
            print(f"{worker_count:>7} {level_result['rps']:>7.1f} {level_result['p50_ms']:>7.0f} "
                  f"{level_result['p95_ms']:>7.0f} {level_result['p99_ms']:>7.0f} "
                  f"{level_result['errors']:>6}{marker}")
            sys.stdout.flush()
        self.optimal_workers = best_workers
        print(f"Optimal concurrency: {self.optimal_workers} workers "
              f"({best_rps:.1f} rps, p95={self._get_p95_for(self.optimal_workers):.0f}ms)")
        if self.saturation_point:
            print(f"Saturation point: {self.saturation_point} workers")
        print("")
        sys.stdout.flush()

    def _get_p95_for(self, workers):
        """Return the recorded p95 (ms) of the level run with ``workers``, else 0."""
        for level in self.levels:
            if level["workers"] == workers:
                return level["p95_ms"]
        return 0

    def _payload_for(self, worker_count, entity_type, schema_fqns_list, service_fqn, idx):
        """Build the request payload for entity ``idx`` of a level (None if impossible).

        Names embed the worker count so levels do not overwrite each other's
        entities.
        """
        if entity_type == "topics" and service_fqn:
            return {
                "__url__": f"{self.server_url}/api/v1/topics",
                "name": f"ramp_topic_{worker_count}w_{idx:06d}",
                "service": service_fqn,
                "partitions": 3,
                "replicationFactor": 1,
            }
        if entity_type == "dashboards" and service_fqn:
            return {
                "__url__": f"{self.server_url}/api/v1/dashboards",
                "name": f"ramp_dashboard_{worker_count}w_{idx:06d}",
                "service": service_fqn,
            }
        if entity_type == "pipelines" and service_fqn:
            return {
                "__url__": f"{self.server_url}/api/v1/pipelines",
                "name": f"ramp_pipeline_{worker_count}w_{idx:06d}",
                "service": service_fqn,
            }
        # Tables are both the primary case and the fallback for everything else.
        if schema_fqns_list:
            return {
                "__url__": f"{self.server_url}/api/v1/tables",
                "name": f"ramp_table_{worker_count}w_{idx:07d}",
                "databaseSchema": schema_fqns_list[idx % len(schema_fqns_list)],
                "columns": [
                    {"name": "id", "dataType": "BIGINT"},
                    {"name": "name", "dataType": "VARCHAR", "dataLength": 255},
                ],
            }
        return None

    def _run_level(self, worker_count, entity_type, schema_fqns_list, service_fqn):
        """Create ``batch_size`` entities with ``worker_count`` threads.

        Returns:
            dict with ``workers``, ``rps``, ``p50_ms``, ``p95_ms``, ``p99_ms``
            and ``errors`` (non-2xx responses plus worker exceptions).
        """
        latencies = []
        errors = 0
        error_lock = threading.Lock()

        def _work(idx):
            nonlocal errors
            t0 = time.time()
            payload = self._payload_for(worker_count, entity_type, schema_fqns_list,
                                        service_fqn, idx)
            if payload is None:
                return
            url = payload.pop("__url__", None)
            method = payload.pop("__method__", "PUT")
            if url is None:
                return
            status, resp = make_request(url, data=payload, method=method,
                                        headers=self.req_headers)
            latency = time.time() - t0
            with error_lock:
                latencies.append(latency)
                if status not in [200, 201]:
                    errors += 1

        start = time.time()
        with ThreadPoolExecutor(max_workers=worker_count) as executor:
            futures = [executor.submit(_work, i) for i in range(self.batch_size)]
            for f in as_completed(futures):
                try:
                    f.result()
                except Exception:
                    with error_lock:
                        errors += 1
        elapsed = time.time() - start
        if not latencies:
            return {"workers": worker_count, "rps": 0, "p50_ms": 0, "p95_ms": 0,
                    "p99_ms": 0, "errors": errors}
        s = sorted(latencies)
        n = len(s)
        return {
            "workers": worker_count,
            "rps": round(n / elapsed, 1) if elapsed > 0 else 0,
            "p50_ms": round(s[int(n * 0.50)] * 1000, 0),
            "p95_ms": round(s[min(int(n * 0.95), n - 1)] * 1000, 0),
            "p99_ms": round(s[min(int(n * 0.99), n - 1)] * 1000, 0),
            "errors": errors,
        }

    def report_section(self):
        """Return the ramp results for the report, or ``None`` if ``run`` recorded nothing."""
        if not self.levels:
            return None
        analysis_parts = []
        if self.optimal_workers:
            opt = next((l for l in self.levels if l["workers"] == self.optimal_workers), None)
            if opt:
                analysis_parts.append(
                    f"Throughput peaks at {self.optimal_workers} workers ({opt['rps']} rps)."
                )
        if self.saturation_point:
            sat = next((l for l in self.levels if l["workers"] == self.saturation_point), None)
            if sat:
                # Only claim errors when the saturated level actually had some.
                errors_note = " and errors appear" if sat.get("errors", 0) > 0 else ""
                analysis_parts.append(
                    f"At {self.saturation_point}+ workers, p95 exceeds {sat['p95_ms']:.0f}ms"
                    f"{errors_note} -- thread pool saturation."
                )
        return {
            "batch_size": self.batch_size,
            "levels": self.levels,
            "optimal_workers": self.optimal_workers,
            "saturation_point": self.saturation_point,
            "analysis": " ".join(analysis_parts) if analysis_parts else "Ramp test completed.",
        }
# ── Helper to create a service ───────────────────────────────────────────────
def create_service(endpoint, data):
    """PUT a service definition and return its fully qualified name.

    Args:
        endpoint: path segment under ``/api/v1/services/`` (e.g. ``databaseServices``).
        data: request body for the service.

    Returns:
        The service FQN from the response, or ``None`` when the request did
        not return 200/201 with a JSON object.
    """
    service_url = f"{SERVER_URL}/api/v1/services/{endpoint}"
    status, resp = make_request(service_url, data=data, method="PUT", headers=headers)
    if status not in [200, 201] or not isinstance(resp, dict):
        print(f" Failed to create service: {status} - {resp}")
        return None
    fqn = resp["fullyQualifiedName"]
    print(f" Service created: {fqn}")
    return fqn
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 1: Metadata (no dependencies)
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print(f"\n{'=' * 60}")
print("PHASE 1: Metadata (domains, classifications, glossaries, users, teams)")
print("=" * 60)
# ── Domains ──────────────────────────────────────────────────────────────────
domain_fqns = []
if should_run("domains") and NUM_DOMAINS > 0:
    def domain_payload(idx):
        """Request payload for the idx-th test domain."""
        return {
            "__url__": f"{SERVER_URL}/api/v1/domains",
            "name": f"TestDomain_{idx:04d}",
            "domainType": "Aggregate",
            "description": f"Test domain {idx} for load testing",
        }

    def collect_domain(idx, resp):
        """Remember the created domain's FQN, falling back to its name."""
        fqn = resp.get("fullyQualifiedName", f"TestDomain_{idx:04d}")
        with collect_lock:
            domain_fqns.append(fqn)

    create_entity_batch("domains", NUM_DOMAINS, domain_payload, collect_fn=collect_domain)
# ── Classifications & Tags ───────────────────────────────────────────────────
classification_fqns = []
if should_run("classifications") and NUM_CLASSIFICATIONS > 0:
    print(f"\nCreating {NUM_CLASSIFICATIONS} classifications...")
    sys.stdout.flush()
    # Created one at a time: the tags below need the FQNs that come back.
    for i in range(NUM_CLASSIFICATIONS):
        data = {
            "name": f"TestClassification_{i:04d}",
            "description": f"Test classification {i} for load testing",
        }
        status, resp = make_request(f"{SERVER_URL}/api/v1/classifications", data=data,
                                    method="PUT", headers=headers)
        if status in [200, 201] and isinstance(resp, dict):
            classification_fqns.append(resp["fullyQualifiedName"])
    stats["classifications"] = len(classification_fqns)
    print(f"classifications done: {len(classification_fqns)} created")
if should_run("tags") and NUM_TAGS > 0 and classification_fqns:
    # Ceiling division: with floor division the remainder was dropped and fewer
    # than NUM_TAGS tags were created whenever NUM_TAGS was not a multiple of
    # the classification count. The slice below enforces the exact total.
    tags_per_class = max(1, -(-NUM_TAGS // len(classification_fqns)))
    tag_assignments = [
        (cfqn, j)
        for cfqn in classification_fqns
        for j in range(tags_per_class)
    ][:NUM_TAGS]

    def tag_payload(idx):
        """Request payload for the idx-th (classification, tag number) pair."""
        cfqn, j = tag_assignments[idx]
        return {
            "__url__": f"{SERVER_URL}/api/v1/tags",
            "name": f"Tag_{j:05d}",
            "classification": cfqn,
            "description": f"Test tag {j}",
        }

    create_entity_batch("tags", len(tag_assignments), tag_payload)
# ── Glossaries & Terms ───────────────────────────────────────────────────────
glossary_fqns = []
if should_run("glossaries") and NUM_GLOSSARIES > 0:
    print(f"\nCreating {NUM_GLOSSARIES} glossaries...")
    sys.stdout.flush()
    # Created one at a time: the terms below need the FQNs that come back.
    for i in range(NUM_GLOSSARIES):
        data = {
            "name": f"TestGlossary_{i:04d}",
            "displayName": f"Test Glossary {i}",
            "description": f"Test glossary {i} for load testing",
        }
        status, resp = make_request(f"{SERVER_URL}/api/v1/glossaries", data=data,
                                    method="PUT", headers=headers)
        if status in [200, 201] and isinstance(resp, dict):
            glossary_fqns.append(resp["fullyQualifiedName"])
    stats["glossaries"] = len(glossary_fqns)
    print(f"glossaries done: {len(glossary_fqns)} created")
if should_run("glossaryTerms") and NUM_GLOSSARY_TERMS > 0 and glossary_fqns:
    # Ceiling division: with floor division the remainder was dropped and fewer
    # than NUM_GLOSSARY_TERMS terms were created whenever the total was not a
    # multiple of the glossary count. The slice below enforces the exact total.
    terms_per_glossary = max(1, -(-NUM_GLOSSARY_TERMS // len(glossary_fqns)))
    term_assignments = [
        (gfqn, j)
        for gfqn in glossary_fqns
        for j in range(terms_per_glossary)
    ][:NUM_GLOSSARY_TERMS]

    def term_payload(idx):
        """Request payload for the idx-th (glossary, term number) pair."""
        gfqn, j = term_assignments[idx]
        return {
            "__url__": f"{SERVER_URL}/api/v1/glossaryTerms",
            "name": f"Term_{j:05d}",
            "glossary": gfqn,
            "displayName": f"Term {j}",
            "description": f"Test glossary term {j}",
        }

    create_entity_batch("glossaryTerms", len(term_assignments), term_payload)
# ── Users ────────────────────────────────────────────────────────────────────
if should_run("users") and NUM_USERS > 0:
    def user_payload(idx):
        """Request payload for the idx-th test user."""
        login = f"testuser_{idx:05d}"
        return {
            "__url__": f"{SERVER_URL}/api/v1/users",
            "name": login,
            "email": f"{login}@example.com",
            "displayName": f"Test User {idx}",
            "description": f"Test user {idx} for load testing",
        }

    create_entity_batch("users", NUM_USERS, user_payload)
# ── Teams ────────────────────────────────────────────────────────────────────
if should_run("teams") and NUM_TEAMS > 0:
    def team_payload(idx):
        """Request payload for the idx-th test team (always of type Group)."""
        return dict([
            ("__url__", f"{SERVER_URL}/api/v1/teams"),
            ("name", f"testteam_{idx:04d}"),
            ("displayName", f"Test Team {idx}"),
            ("description", f"Test team {idx} for load testing"),
            ("teamType", "Group"),
        ])

    create_entity_batch("teams", NUM_TEAMS, team_payload)
# Phase 1 ends here; record how long it took.
phase_timings["phase_1_metadata"] = {"wall_clock_s": round(time.time() - phase_start, 2)}
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 2: Services
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print(f"\n{'=' * 60}")
print("PHASE 2: Services")
print("=" * 60)
# Every service FQN starts as None and stays None when the service is not
# needed or could not be created.
db_service_fqn = dashboard_service_fqn = pipeline_service_fqn = None
messaging_service_fqn = mlmodel_service_fqn = storage_service_fqn = None
search_service_fqn = api_service_fqn = None
# A service is needed when any entity type that lives under it is selected,
# or when the matching infra flag asks for it.
need_db_svc = (
    any(should_run(entity) for entity in (
        "tables", "storedProcedures", "queries", "testCases", "testCaseResults",
        "lineageEdges"))
    or _need_infra("_services_db")
)
need_dashboard_svc = (
    any(should_run(entity) for entity in ("dashboards", "charts", "dashboardDataModels"))
    or _need_infra("_services_dashboard")
)
need_pipeline_svc = should_run("pipelines") or _need_infra("_services_pipeline")
need_messaging_svc = should_run("topics") or _need_infra("_services_messaging")
need_mlmodel_svc = should_run("mlmodels") or _need_infra("_services_mlmodel")
need_storage_svc = should_run("containers") or _need_infra("_services_storage")
need_search_svc = should_run("searchIndexes") or _need_infra("_services_search")
need_api_svc = (
    any(should_run(entity) for entity in ("apiCollections", "apiEndpoints"))
    or _need_infra("_services_api")
)
if need_db_svc:
    db_service_fqn = create_service("databaseServices", {
        "name": "test-service-distributed",
        "serviceType": "Mysql",
        "connection": {
            "config": {
                "type": "Mysql",
                "username": "test",
                "authType": {"password": "test"},
                "hostPort": "localhost:3306",
            },
        },
    })
if need_dashboard_svc:
    dashboard_service_fqn = create_service("dashboardServices", {
        "name": "test-dashboard-service",
        "serviceType": "Looker",
        "connection": {
            "config": {
                "type": "Looker",
                "clientId": "test-client-id",
                "clientSecret": "test-client-secret",
                "hostPort": "https://looker.example.com",
            },
        },
    })
if need_pipeline_svc:
    pipeline_service_fqn = create_service("pipelineServices", {
        "name": "test-pipeline-service",
        "serviceType": "Airflow",
        "connection": {
            "config": {
                "type": "Airflow",
                "hostPort": "http://airflow.example.com:8080",
                "connection": {"type": "Backend"},
            },
        },
    })
if need_messaging_svc:
    messaging_service_fqn = create_service("messagingServices", {
        "name": "test-messaging-service",
        "serviceType": "Kafka",
        "connection": {
            "config": {"type": "Kafka", "bootstrapServers": "localhost:9092"},
        },
    })
if need_mlmodel_svc:
    mlmodel_service_fqn = create_service("mlmodelServices", {
        "name": "test-mlmodel-service",
        "serviceType": "Mlflow",
        "connection": {
            "config": {
                "type": "Mlflow",
                "trackingUri": "http://mlflow.example.com:5000",
                "registryUri": "http://mlflow.example.com:5000",
            },
        },
    })
if need_storage_svc:
    storage_service_fqn = create_service("storageServices", {
        "name": "test-storage-service",
        "serviceType": "S3",
        "connection": {
            "config": {
                "type": "S3",
                "awsConfig": {
                    "awsAccessKeyId": "test-key",
                    "awsSecretAccessKey": "test-secret",
                    "awsRegion": "us-east-1",
                },
            },
        },
    })
if need_search_svc:
    search_service_fqn = create_service("searchServices", {
        "name": "test-search-service",
        "serviceType": "ElasticSearch",
        "connection": {
            "config": {
                "type": "ElasticSearch",
                "hostPort": "http://elasticsearch.example.com:9200",
            },
        },
    })
if need_api_svc:
    api_service_fqn = create_service("apiServices", {
        "name": "test-api-service",
        "serviceType": "Rest",
        "connection": {
            "config": {
                "type": "Rest",
                "openAPISchemaConnection": {
                    "openAPISchemaURL": "http://api.example.com/openapi.json",
                },
            },
        },
    })
phase_timings["phase_2_services"] = {"wall_clock_s": round(time.time() - phase_start, 2)}
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 3: Infrastructure (databases, schemas, apiCollections)
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print("\n" + "=" * 60)
print("PHASE 3: Infrastructure (databases, schemas, API collections)")
print("=" * 60)
# ── Databases & Schemas ──────────────────────────────────────────────────────
schema_fqns = []
need_db_infra = (should_run("tables") or should_run("storedProcedures") or should_run("queries")
                 or should_run("testCases") or should_run("testCaseResults")
                 or should_run("lineageEdges") or _need_infra("_infra_db"))
if db_service_fqn and need_db_infra and NUM_TABLES > 0:
    print(f"Creating {NUM_DATABASES} databases with {SCHEMAS_PER_DB} schemas each...")
    sys.stdout.flush()
    # Counted separately so the summary shows what was created, not what was requested.
    databases_created = 0
    for i in range(NUM_DATABASES):
        db_data = {"name": f"test_db_{i:04d}", "service": db_service_fqn}
        status, resp = make_request(f"{SERVER_URL}/api/v1/databases", data=db_data,
                                    method="PUT", headers=headers)
        if status not in [200, 201] or not isinstance(resp, dict):
            print(f" Failed to create database {db_data['name']}: {status}")
            continue
        databases_created += 1
        db_fqn = resp["fullyQualifiedName"]
        for s in range(SCHEMAS_PER_DB):
            # A single schema per database gets the conventional name "public".
            schema_name = f"schema_{s:03d}" if SCHEMAS_PER_DB > 1 else "public"
            s_data = {"name": schema_name, "database": db_fqn}
            s_status, s_resp = make_request(f"{SERVER_URL}/api/v1/databaseSchemas",
                                            data=s_data, method="PUT", headers=headers)
            if s_status in [200, 201] and isinstance(s_resp, dict):
                schema_fqns.append(s_resp["fullyQualifiedName"])
    print(f" Created {databases_created} databases, {len(schema_fqns)} schemas")
# ── API Collections ──────────────────────────────────────────────────────────
api_collection_fqns = []
if should_run("apiCollections") and api_service_fqn and NUM_API_COLLECTIONS > 0:
    def api_coll_payload(idx):
        """Request payload for the idx-th API collection under the API service."""
        return {
            "__url__": f"{SERVER_URL}/api/v1/apiCollections",
            "name": f"api_collection_{idx:04d}",
            "service": api_service_fqn,
            "displayName": f"API Collection {idx}",
            "description": f"Test API collection {idx}",
        }

    def collect_api_coll(idx, resp):
        """Remember the collection FQN, falling back to the expected one."""
        fallback_fqn = f"test-api-service.api_collection_{idx:04d}"
        with collect_lock:
            api_collection_fqns.append(resp.get("fullyQualifiedName", fallback_fqn))

    create_entity_batch(
        "apiCollections",
        NUM_API_COLLECTIONS,
        api_coll_payload,
        collect_fn=collect_api_coll,
    )
# Phase 3 ends here; record how long it took.
phase_timings["phase_3_infrastructure"] = {"wall_clock_s": round(time.time() - phase_start, 2)}
# ══════════════════════════════════════════════════════════════════════════════
# RAMP TEST (if enabled)
# ══════════════════════════════════════════════════════════════════════════════
ramp_result = None
if RAMP_MODE:
    ramp_tester = RampTester(
        SERVER_URL,
        dict(headers),
        max_workers=NUM_WORKERS,
        batch_size=RAMP_BATCH,
    )
    # Ramp on the first supported type named in ONLY_ENTITIES, else on tables.
    ramp_entity_type = "tables"
    if ONLY_ENTITIES:
        ramp_entity_type = next(
            (c for c in ("tables", "topics", "dashboards", "pipelines") if c in ONLY_ENTITIES),
            "tables",
        )
    ramp_schema_fqns = schema_fqns or None
    # Tables need no service FQN; the other types use their own service.
    ramp_service_fqn = {
        "topics": messaging_service_fqn,
        "dashboards": dashboard_service_fqn,
        "pipelines": pipeline_service_fqn,
    }.get(ramp_entity_type)
    ramp_tester.run(
        entity_type=ramp_entity_type,
        schema_fqns_list=ramp_schema_fqns,
        service_fqn=ramp_service_fqn,
    )
    ramp_result = ramp_tester.report_section()
    if ramp_result:
        ramp_result["entity_type"] = ramp_entity_type
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 4: Core entities
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print(f"\n{'=' * 60}")
print("PHASE 4: Core entities")
print("=" * 60)
# ── Tables ───────────────────────────────────────────────────────────────────
if should_run("tables") and schema_fqns and NUM_TABLES > 0:
    def table_payload(idx):
        """Request payload for the idx-th table; schemas are used round-robin."""
        columns = [
            {"name": "id", "dataType": "BIGINT", "description": "Primary key"},
            {"name": "name", "dataType": "VARCHAR", "dataLength": 255},
            {"name": "created_at", "dataType": "TIMESTAMP"},
            {"name": "data", "dataType": "JSON"},
        ]
        return {
            "__url__": f"{SERVER_URL}/api/v1/tables",
            "name": f"table_{idx:07d}",
            "databaseSchema": schema_fqns[idx % len(schema_fqns)],
            "columns": columns,
        }

    def collect_table(idx, resp):
        """Keep (id, FQN) of created tables, up to MAX_COLLECT entries."""
        with collect_lock:
            if len(collected_table_ids) >= MAX_COLLECT:
                return
            collected_table_ids.append((resp["id"], resp.get("fullyQualifiedName", "")))

    create_entity_batch("tables", NUM_TABLES, table_payload, collect_fn=collect_table)
# ── Dashboards ───────────────────────────────────────────────────────────────
if should_run("dashboards") and dashboard_service_fqn and NUM_DASHBOARDS > 0:
    def dashboard_payload(idx):
        """Request payload for the idx-th dashboard under the dashboard service."""
        return {
            "__url__": f"{SERVER_URL}/api/v1/dashboards",
            "name": f"dashboard_{idx:06d}",
            "service": dashboard_service_fqn,
            "displayName": f"Test Dashboard {idx}",
            "description": f"Test dashboard {idx}",
        }

    def collect_dashboard(idx, resp):
        """Keep (id, FQN) of created dashboards, up to MAX_COLLECT entries."""
        with collect_lock:
            if len(collected_dashboard_ids) >= MAX_COLLECT:
                return
            collected_dashboard_ids.append((resp["id"], resp.get("fullyQualifiedName", "")))

    create_entity_batch(
        "dashboards",
        NUM_DASHBOARDS,
        dashboard_payload,
        collect_fn=collect_dashboard,
    )
# ── Charts ───────────────────────────────────────────────────────────────────
if should_run("charts") and dashboard_service_fqn and NUM_CHARTS > 0:
    chart_types = ["Line", "Bar", "Pie", "Area", "Scatter", "Table"]

    def chart_payload(idx):
        """Request payload for the idx-th chart; chart types are cycled."""
        kind = chart_types[idx % len(chart_types)]
        return {
            "__url__": f"{SERVER_URL}/api/v1/charts",
            "name": f"chart_{idx:06d}",
            "service": dashboard_service_fqn,
            "displayName": f"Test Chart {idx}",
            "chartType": kind,
            "description": f"Test chart {idx}",
        }

    create_entity_batch("charts", NUM_CHARTS, chart_payload)
# ── Topics ───────────────────────────────────────────────────────────────────
if should_run("topics") and messaging_service_fqn and NUM_TOPICS > 0:
    def topic_payload(idx):
        """Request payload for the idx-th topic (3 partitions, replication factor 1)."""
        return dict([
            ("__url__", f"{SERVER_URL}/api/v1/topics"),
            ("name", f"topic_{idx:06d}"),
            ("service", messaging_service_fqn),
            ("partitions", 3),
            ("replicationFactor", 1),
            ("description", f"Test topic {idx}"),
        ])

    create_entity_batch("topics", NUM_TOPICS, topic_payload)
# ── Pipelines ────────────────────────────────────────────────────────────────
if should_run("pipelines") and pipeline_service_fqn and NUM_PIPELINES > 0:
    def pipeline_payload(idx):
        """Request payload for the idx-th pipeline under the pipeline service."""
        return {
            "__url__": f"{SERVER_URL}/api/v1/pipelines",
            "name": f"pipeline_{idx:06d}",
            "service": pipeline_service_fqn,
            "displayName": f"Test Pipeline {idx}",
            "description": f"Test pipeline {idx}",
        }

    def collect_pipeline(idx, resp):
        """Keep (id, FQN) of created pipelines, up to MAX_COLLECT entries."""
        with collect_lock:
            if len(collected_pipeline_ids) >= MAX_COLLECT:
                return
            collected_pipeline_ids.append((resp["id"], resp.get("fullyQualifiedName", "")))

    create_entity_batch(
        "pipelines",
        NUM_PIPELINES,
        pipeline_payload,
        collect_fn=collect_pipeline,
    )
# ── Stored Procedures ────────────────────────────────────────────────────────
if should_run("storedProcedures") and db_service_fqn and schema_fqns and NUM_STORED_PROCEDURES > 0:
    def stored_proc_payload(idx):
        """Request payload for the idx-th stored procedure; schemas are used round-robin."""
        procedure_code = {
            "code": f"CREATE PROCEDURE sp_{idx}() BEGIN SELECT 1; END",
            "language": "SQL",
        }
        return {
            "__url__": f"{SERVER_URL}/api/v1/storedProcedures",
            "name": f"stored_proc_{idx:06d}",
            "databaseSchema": schema_fqns[idx % len(schema_fqns)],
            "storedProcedureCode": procedure_code,
            "description": f"Test stored procedure {idx}",
        }

    create_entity_batch("storedProcedures", NUM_STORED_PROCEDURES, stored_proc_payload)
# ── ML Models ────────────────────────────────────────────────────────────────
if should_run("mlmodels") and mlmodel_service_fqn and NUM_MLMODELS > 0:
    algorithms = [
        "LinearRegression", "RandomForest", "XGBoost", "NeuralNetwork",
        "SVM", "KMeans", "DecisionTree",
    ]

    def mlmodel_payload(idx):
        """Request payload for the idx-th ML model; algorithms are cycled."""
        algorithm = algorithms[idx % len(algorithms)]
        return {
            "__url__": f"{SERVER_URL}/api/v1/mlmodels",
            "name": f"mlmodel_{idx:06d}",
            "service": mlmodel_service_fqn,
            "algorithm": algorithm,
            "displayName": f"Test ML Model {idx}",
            "description": f"Test ML model {idx}",
        }

    create_entity_batch("mlmodels", NUM_MLMODELS, mlmodel_payload)
# ── Containers ───────────────────────────────────────────────────────────────
if should_run("containers") and storage_service_fqn and NUM_CONTAINERS > 0:
    def container_payload(idx):
        """Return the request dict for the idx-th storage container."""
        endpoint = f"{SERVER_URL}/api/v1/containers"
        container_name = f"container_{idx:06d}"
        return {
            "__url__": endpoint,
            "name": container_name,
            "service": storage_service_fqn,
            "displayName": f"Test Container {idx}",
            "description": f"Test container {idx}",
        }

    create_entity_batch("containers", NUM_CONTAINERS, container_payload)
# ── Search Indexes ───────────────────────────────────────────────────────────
if should_run("searchIndexes") and search_service_fqn and NUM_SEARCH_INDEXES > 0:
    def search_idx_payload(idx):
        """Return the request dict for the idx-th search index.

        Every index gets the same three fields (id, content, timestamp); the
        list is rebuilt on each call so payloads never share it.
        """
        index_fields = [
            {"name": "id", "dataType": "KEYWORD"},
            {"name": "content", "dataType": "TEXT"},
            {"name": "timestamp", "dataType": "DATE"},
        ]
        return {
            "__url__": f"{SERVER_URL}/api/v1/searchIndexes",
            "name": f"search_index_{idx:06d}",
            "service": search_service_fqn,
            "displayName": f"Test Search Index {idx}",
            "description": f"Test search index {idx}",
            "fields": index_fields,
        }

    create_entity_batch("searchIndexes", NUM_SEARCH_INDEXES, search_idx_payload)
# ── Queries ──────────────────────────────────────────────────────────────────
if should_run("queries") and NUM_QUERIES > 0 and db_service_fqn:
    def query_payload(idx):
        """Return the request dict for the idx-th query.

        The SQL text cycles through table numbers modulo NUM_TABLES; max(1, ...)
        keeps the modulo valid when NUM_TABLES is 0.
        """
        table_number = idx % max(1, NUM_TABLES)
        sql_text = f"SELECT * FROM table_{table_number:07d} WHERE id = {idx}"
        return {
            "__url__": f"{SERVER_URL}/api/v1/queries",
            "name": f"query_{idx:06d}",
            "query": sql_text,
            "service": db_service_fqn,
            "description": f"Test query {idx}",
        }

    create_entity_batch("queries", NUM_QUERIES, query_payload)
# ── Dashboard Data Models ────────────────────────────────────────────────────
if should_run("dashboardDataModels") and dashboard_service_fqn and NUM_DASHBOARD_DATA_MODELS > 0:
    # Data model types are assigned round-robin by index.
    dm_types = ["MetabaseDataModel", "SupersetDataModel", "TableauDataModel"]

    def data_model_payload(idx):
        """Return the request dict for the idx-th dashboard data model."""
        model_columns = [
            {"name": "id", "dataType": "BIGINT"},
            {"name": "value", "dataType": "VARCHAR", "dataLength": 255},
        ]
        model_type = dm_types[idx % len(dm_types)]
        return {
            "__url__": f"{SERVER_URL}/api/v1/dashboard/datamodels",
            "name": f"data_model_{idx:06d}",
            "service": dashboard_service_fqn,
            "dataModelType": model_type,
            "displayName": f"Test Data Model {idx}",
            "description": f"Test dashboard data model {idx}",
            "columns": model_columns,
        }

    create_entity_batch("dashboardDataModels", NUM_DASHBOARD_DATA_MODELS, data_model_payload)
# ── API Endpoints ────────────────────────────────────────────────────────────
# Needs an API service FQN and at least one API collection FQN to attach to.
if should_run("apiEndpoints") and api_service_fqn and api_collection_fqns and NUM_API_ENDPOINTS > 0:
    http_methods = ["GET", "POST", "PUT", "DELETE", "PATCH"]

    def api_endpoint_payload(idx):
        """Return the request dict for the idx-th API endpoint.

        Both the parent collection and the HTTP method are picked round-robin
        by index.
        """
        parent_collection = api_collection_fqns[idx % len(api_collection_fqns)]
        method = http_methods[idx % len(http_methods)]
        return {
            "__url__": f"{SERVER_URL}/api/v1/apiEndpoints",
            "name": f"endpoint_{idx:06d}",
            "apiCollection": parent_collection,
            "endpointURL": f"https://api.example.com/v1/resource_{idx}",
            "requestMethod": method,
            "displayName": f"Test Endpoint {idx}",
            "description": f"Test API endpoint {idx}",
        }

    create_entity_batch("apiEndpoints", NUM_API_ENDPOINTS, api_endpoint_payload)
# ── Data Products ────────────────────────────────────────────────────────────
# Data products need at least one domain FQN to attach to.
if should_run("dataProducts") and NUM_DATA_PRODUCTS > 0 and domain_fqns:
    def data_product_payload(idx):
        """Return the request dict for the idx-th data product.

        Each product is placed in exactly one domain, chosen round-robin.
        """
        owning_domain = domain_fqns[idx % len(domain_fqns)]
        return {
            "__url__": f"{SERVER_URL}/api/v1/dataProducts",
            "name": f"data_product_{idx:04d}",
            "domains": [owning_domain],
            "displayName": f"Test Data Product {idx}",
            "description": f"Test data product {idx}",
        }

    create_entity_batch("dataProducts", NUM_DATA_PRODUCTS, data_product_payload)

# Wall-clock seconds since phase_start was last set, rounded to 2 decimals.
phase_timings["phase_4_core_entities"] = {
    "wall_clock_s": round(time.time() - phase_start, 2),
}
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 5: Data Quality (test suites, test cases)
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print("\n" + 60 * "=")
print("PHASE 5: Data Quality (test suites, test cases)")
print(60 * "=")

# FQNs of the test suites created below (filled in by collect_test_suite).
test_suite_fqns = []
if should_run("testSuites") and NUM_TEST_SUITES > 0:
    def test_suite_payload(idx):
        """Return the request dict for the idx-th test suite."""
        suite_name = f"testSuite_{idx:04d}"
        return {
            "__url__": f"{SERVER_URL}/api/v1/dataQuality/testSuites",
            "name": suite_name,
            "displayName": f"Test Suite {idx}",
            "description": f"Test suite {idx} for load testing",
        }

    def collect_test_suite(idx, resp):
        """Record the suite FQN, falling back to the generated name."""
        fallback_name = f"testSuite_{idx:04d}"
        with collect_lock:
            suite_fqn = resp.get("fullyQualifiedName", fallback_name)
            test_suite_fqns.append(suite_fqn)

    create_entity_batch(
        "testSuites",
        NUM_TEST_SUITES,
        test_suite_payload,
        collect_fn=collect_test_suite,
    )
# Test cases are attached to tables collected earlier, so at least one
# (id, FQN) pair must be present in collected_table_ids.
if should_run("testCases") and NUM_TEST_CASES > 0 and collected_table_ids:
    # (test definition name, parameter values) pairs.
    table_level_defs = [
        ("tableRowCountToEqual", [{"name": "value", "value": "100"}]),
        ("tableColumnCountToEqual", [{"name": "columnCount", "value": "4"}]),
    ]
    column_level_defs = [
        ("columnValuesToBeNotNull", []),
        ("columnValuesToBeUnique", []),
    ]
    column_names = ["id", "name", "created_at", "data"]

    def test_case_payload(idx):
        """Return the request dict for the idx-th test case.

        Every third index (idx % 3 == 0) yields a table-level test; all other
        indices yield a column-level test on one of column_names.
        """
        _, table_fqn = collected_table_ids[idx % len(collected_table_ids)]
        is_table_level = idx % 3 == 0
        candidates = table_level_defs if is_table_level else column_level_defs
        defn, params = candidates[idx % len(candidates)]
        if is_table_level:
            entity_link = f"<#E::table::{table_fqn}>"
        else:
            col = column_names[idx % len(column_names)]
            entity_link = f"<#E::table::{table_fqn}::columns::{col}>"
        return {
            "__url__": f"{SERVER_URL}/api/v1/dataQuality/testCases",
            "name": f"testCase_{idx:06d}",
            "entityLink": entity_link,
            "testDefinition": defn,
            "parameterValues": params,
        }

    def collect_test_case(idx, resp):
        """Record the created test case FQN, capped at MAX_COLLECT entries."""
        with collect_lock:
            if len(collected_test_case_fqns) >= MAX_COLLECT:
                return
            fqn = resp.get("fullyQualifiedName")
            if not fqn:
                # Responses without an FQN are not recorded.
                return
            collected_test_case_fqns.append(fqn)

    create_entity_batch(
        "testCases",
        NUM_TEST_CASES,
        test_case_payload,
        collect_fn=collect_test_case,
    )

phase_timings["phase_5_data_quality"] = {
    "wall_clock_s": round(time.time() - phase_start, 2),
}
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 6: Lineage
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print("\n" + "=" * 60)
print("PHASE 6: Lineage")
print("=" * 60)
if should_run("lineageEdges") and NUM_LINEAGE_EDGES > 0 and collected_table_ids:
    # Target mix: 60% table->table, 25% table->dashboard, remainder
    # pipeline->table.
    n_t2t = int(NUM_LINEAGE_EDGES * 0.60)
    n_t2d = int(NUM_LINEAGE_EDGES * 0.25)
    n_p2t = NUM_LINEAGE_EDGES - n_t2t - n_t2d

    # Fold the share of any missing endpoint type into table->table edges.
    # This must happen BEFORE the task list is built; adjusting the counters
    # afterwards has no effect and silently drops those edges.
    if not collected_dashboard_ids:
        n_t2t += n_t2d
        n_t2d = 0
    if not collected_pipeline_ids:
        n_t2t += n_p2t
        n_p2t = 0

    n_tables = len(collected_table_ids)
    # Each task is (from_type, from_id, to_type, to_id).
    lineage_tasks = []

    # table[i] -> table[i + 1], wrapping around the collected tables.
    for i in range(n_t2t):
        lineage_tasks.append(("table", collected_table_ids[i % n_tables][0],
                              "table", collected_table_ids[(i + 1) % n_tables][0]))

    # table[i] -> dashboard[i]; n_t2d is 0 when no dashboards were collected.
    for i in range(n_t2d):
        to_idx = i % len(collected_dashboard_ids)
        lineage_tasks.append(("table", collected_table_ids[i % n_tables][0],
                              "dashboard", collected_dashboard_ids[to_idx][0]))

    # pipeline[i] -> table offset by half the table list; n_p2t is 0 when no
    # pipelines were collected.
    for i in range(n_p2t):
        from_idx = i % len(collected_pipeline_ids)
        to_idx = (i + n_tables // 2) % n_tables
        lineage_tasks.append(("pipeline", collected_pipeline_ids[from_idx][0],
                              "table", collected_table_ids[to_idx][0]))

    actual_edges = len(lineage_tasks)

    def lineage_payload(idx):
        """Return the PUT request dict for the idx-th planned edge.

        Returns None when idx is past the end of lineage_tasks.
        """
        if idx >= len(lineage_tasks):
            return None
        from_type, from_id, to_type, to_id = lineage_tasks[idx]
        return {
            "__url__": f"{SERVER_URL}/api/v1/lineage",
            "__method__": "PUT",
            "edge": {
                "fromEntity": {"id": from_id, "type": from_type},
                "toEntity": {"id": to_id, "type": to_type},
            },
        }

    create_entity_batch("lineageEdges", actual_edges, lineage_payload,
                        workers=min(10, NUM_WORKERS))
elif should_run("lineageEdges") and NUM_LINEAGE_EDGES > 0:
    # Lineage was requested but there is nothing to connect.
    print("Skipping lineage: no table IDs collected")
    stats["lineageEdges"] = 0
phase_timings["phase_6_lineage"] = {"wall_clock_s": round(time.time() - phase_start, 2)}
# ══════════════════════════════════════════════════════════════════════════════
# PHASE 7: Time-Series Data
# ══════════════════════════════════════════════════════════════════════════════
phase_start = time.time()
print("\n" + 60 * "=")
print("PHASE 7: Time-Series Data")
print(60 * "=")

# Time-series writes use at most 10 workers.
ts_workers = min(10, NUM_WORKERS)
# Reference "now" in epoch milliseconds; payload timestamps count back from it.
base_ts = int(time.time() * 1000)

# ── Test Case Results ────────────────────────────────────────────────────────
if should_run("testCaseResults") and NUM_TEST_CASE_RESULTS > 0 and collected_test_case_fqns:
    statuses = ["Success", "Failed", "Aborted"]

    def tc_result_payload(idx):
        """Return the POST request dict for the idx-th test case result.

        Results cycle over the collected test case FQNs; each successive index
        is stamped 3,600,000 ms (one hour) earlier than the previous one.
        """
        target_fqn = collected_test_case_fqns[idx % len(collected_test_case_fqns)]
        # safe="" percent-encodes every reserved character, "/" included.
        encoded_fqn = urllib.request.quote(target_fqn, safe="")
        result_ts = base_ts - (idx * 3600000)
        measured = str(round(random.uniform(0, 100), 2))
        return {
            "__url__": f"{SERVER_URL}/api/v1/dataQuality/testCases/testCaseResults/{encoded_fqn}",
            "__method__": "POST",
            "timestamp": result_ts,
            "testCaseStatus": statuses[idx % len(statuses)],
            "result": f"Test result {idx}",
            "testResultValue": [{"name": "value", "value": measured}],
        }

    create_entity_batch(
        "testCaseResults",
        NUM_TEST_CASE_RESULTS,
        tc_result_payload,
        workers=ts_workers,
    )
elif should_run("testCaseResults") and NUM_TEST_CASE_RESULTS > 0:
    print("Skipping testCaseResults: no test case FQNs collected")
    stats["testCaseResults"] = 0
# ── Entity Report Data ───────────────────────────────────────────────────────
if should_run("entityReportData") and NUM_ENTITY_REPORT_DATA > 0:
    entity_types_for_report = ["table", "topic", "dashboard", "pipeline", "mlmodel"]

    def entity_report_payload(idx):
        """Return the POST request dict for the idx-th entityReportData row.

        Each successive index is stamped 86,400,000 ms (one day) earlier.
        hasOwner and missingOwner always add up to entityCount.
        """
        report_ts = base_ts - (idx * 86400000)
        e_type = entity_types_for_report[idx % len(entity_types_for_report)]
        # Random draws happen in this order: count, owners, completed, missing.
        entity_count = random.randint(1, 1000)
        has_owner = random.randint(0, entity_count)
        completed = random.randint(0, 100)
        missing = random.randint(0, 50)
        row = {
            "entityType": e_type,
            "entityTier": f"Tier.Tier{(idx % 5) + 1}",
            "serviceName": "test-service-distributed",
            "completedDescriptions": completed,
            "missingDescriptions": missing,
            "hasOwner": has_owner,
            "missingOwner": entity_count - has_owner,
            "entityCount": entity_count,
        }
        return {
            "__url__": f"{SERVER_URL}/api/v1/analytics/dataInsights/data",
            "__method__": "POST",
            "timestamp": report_ts,
            "reportDataType": "entityReportData",
            "data": row,
        }

    create_entity_batch(
        "entityReportData",
        NUM_ENTITY_REPORT_DATA,
        entity_report_payload,
        workers=ts_workers,
    )
# ── Web Analytic Entity Views ────────────────────────────────────────────────
if should_run("webAnalyticViews") and NUM_WEB_ANALYTIC_VIEWS > 0:
    def web_view_payload(idx):
        """Return the POST request dict for the idx-th entity-view row.

        Rows are stamped 60,000 ms (one minute) apart, counting back from
        base_ts, and point at a synthetic table FQN derived from the index.
        """
        view_ts = base_ts - (idx * 60000)
        table_number = idx % max(1, NUM_TABLES)
        viewed_fqn = f"test-service-distributed.test_db_0000.public.table_{table_number:07d}"
        row = {
            "entityType": "table",
            "entityFqn": viewed_fqn,
            "entityHref": f"{SERVER_URL}/table/{viewed_fqn}",
            "owner": f"user_{idx % 50}",
            "views": random.randint(1, 500),
        }
        return {
            "__url__": f"{SERVER_URL}/api/v1/analytics/dataInsights/data",
            "__method__": "POST",
            "timestamp": view_ts,
            "reportDataType": "webAnalyticEntityViewReportData",
            "data": row,
        }

    create_entity_batch(
        "webAnalyticViews",
        NUM_WEB_ANALYTIC_VIEWS,
        web_view_payload,
        workers=ts_workers,
    )
# ── Web Analytic User Activity ───────────────────────────────────────────────
if should_run("webAnalyticActivity") and NUM_WEB_ANALYTIC_ACTIVITY > 0:
    def web_activity_payload(idx):
        """Return the POST request dict for the idx-th user-activity row.

        Rows are stamped 60,000 ms (one minute) apart, counting back from
        base_ts; the row's lastSession equals its own timestamp. userId is a
        freshly generated UUID on every call.
        """
        activity_ts = base_ts - (idx * 60000)
        # Draw order: userId, then sessions, session duration, page views.
        row = {
            "userName": f"testuser_{idx % max(1, NUM_USERS):05d}",
            "userId": str(uuid.uuid4()),
            "team": "test-team",
            "totalSessions": random.randint(1, 20),
            "totalSessionDuration": random.randint(10, 3600),
            "totalPageView": random.randint(1, 100),
            "lastSession": activity_ts,
        }
        return {
            "__url__": f"{SERVER_URL}/api/v1/analytics/dataInsights/data",
            "__method__": "POST",
            "timestamp": activity_ts,
            "reportDataType": "webAnalyticUserActivityReportData",
            "data": row,
        }

    create_entity_batch(
        "webAnalyticActivity",
        NUM_WEB_ANALYTIC_ACTIVITY,
        web_activity_payload,
        workers=ts_workers,
    )
# ── Raw Cost Analysis ────────────────────────────────────────────────────────
if should_run("rawCostAnalysis") and NUM_RAW_COST_ANALYSIS > 0:
    def raw_cost_payload(idx):
        """Return the POST request dict for the idx-th raw cost row.

        Uses a collected (id, FQN) table pair when any exist; otherwise it
        falls back to a random UUID and a synthetic FQN built from the index.
        Rows are stamped 86,400,000 ms (one day) apart.
        """
        cost_ts = base_ts - (idx * 86400000)
        if not collected_table_ids:
            table_id = str(uuid.uuid4())
            table_fqn = f"test-service-distributed.test_db_0000.public.table_{idx % max(1, NUM_TABLES):07d}"
        else:
            table_id, table_fqn = collected_table_ids[idx % len(collected_table_ids)]
        entity_ref = {
            "id": table_id,
            "type": "table",
            "fullyQualifiedName": table_fqn,
        }
        return {
            "__url__": f"{SERVER_URL}/api/v1/analytics/dataInsights/data",
            "__method__": "POST",
            "timestamp": cost_ts,
            "reportDataType": "rawCostAnalysisReportData",
            "data": {
                "entity": entity_ref,
                "sizeInByte": round(random.uniform(100.0, 100000.0), 2),
            },
        }

    create_entity_batch(
        "rawCostAnalysis",
        NUM_RAW_COST_ANALYSIS,
        raw_cost_payload,
        workers=ts_workers,
    )
# ── Aggregated Cost Analysis ─────────────────────────────────────────────────
if should_run("aggCostAnalysis") and NUM_AGG_COST_ANALYSIS > 0:
    def agg_cost_payload(idx):
        """Return the POST request dict for the idx-th aggregated cost row.

        All numeric fields are random. Rows are stamped 86,400,000 ms (one day)
        apart, counting back from base_ts.
        """
        windows = ("threeDays", "sevenDays", "fourteenDays", "thirtyDays", "sixtyDays")

        def per_window(low, highs):
            # One randint(low, high) per window, drawn in window order.
            return {w: random.randint(low, high) for w, high in zip(windows, highs)}

        cost_ts = base_ts - (idx * 86400000)
        # The dict literals below are evaluated top to bottom, which fixes the
        # order in which random values are drawn.
        row = {
            "entityType": "table",
            "serviceName": "test-service-distributed",
            "serviceType": "BigQuery",
            "totalSize": round(random.uniform(1000.0, 1000000.0), 2),
            "totalCount": random.randint(100, 100000),
            "unusedDataAssets": {
                "count": per_window(1, (50, 40, 30, 20, 10)),
                "size": per_window(100, (10000, 9000, 8000, 7000, 6000)),
                "totalSize": random.randint(1000, 50000),
                "totalCount": random.randint(1, 50),
            },
            "frequentlyUsedDataAssets": {
                "count": per_window(1, (10, 20, 30, 40, 50)),
                "size": per_window(1000, (50000, 60000, 70000, 80000, 90000)),
                "totalSize": random.randint(1000, 100000),
                "totalCount": random.randint(1, 50),
            },
        }
        return {
            "__url__": f"{SERVER_URL}/api/v1/analytics/dataInsights/data",
            "__method__": "POST",
            "timestamp": cost_ts,
            "reportDataType": "aggregatedCostAnalysisReportData",
            "data": row,
        }

    create_entity_batch(
        "aggCostAnalysis",
        NUM_AGG_COST_ANALYSIS,
        agg_cost_payload,
        workers=ts_workers,
    )

phase_timings["phase_7_time_series"] = {
    "wall_clock_s": round(time.time() - phase_start, 2),
}
# ══════════════════════════════════════════════════════════════════════════════
# STOP HEALTH MONITOR & BUILD REPORT
# ══════════════════════════════════════════════════════════════════════════════
# The health monitor is stopped before the post-run introspector calls, so
# those calls are made after health monitoring has ended.
health_monitor.stop()
introspector.scrape_after()
introspector.collect_diagnostics("after")
# Elapsed time is measured after the calls above, so it includes their duration.
overall_elapsed = time.time() - overall_start
# Sum of every per-entity counter in stats.
total_created = sum(stats.values())
# ── Cluster sizing analysis ──────────────────────────────────────────────────
def generate_cluster_sizing(report, ramp_data=None):
    """Derive sizing findings and tuning recommendations from a benchmark report.

    Args:
        report: Report dict. Must contain report["metadata"]["workers"]; the
            "entities", "server_health" and "overall" sections are optional.
        ramp_data: Optional ramp-test result dict; when truthy, its
            "optimal_workers" value drives the concurrency recommendation.

    Returns:
        Dict with keys "assessment" ("undersized" / "marginal" / "adequate"),
        "findings" (list of str), "recommendations" (dict), "config_summary"
        (list of "ENV=value" strings) and "server_side_analysis".
    """
    findings, recommendations, config_summary = [], {}, []
    entity_data = report.get("entities", {})
    health = report.get("server_health", {})
    measured_workers = report["metadata"]["workers"]

    max_p95 = 0
    latency_sum = 0
    latency_samples = 0
    has_503 = has_504_or_timeout = has_conn_refused = has_bimodal = False

    # Pass 1: per-entity latency, latency-analysis findings and error codes.
    for entity, data in entity_data.items():
        if not data:
            continue
        latency = data.get("latency_ms", {})
        p95 = latency.get("p95", 0)
        max_p95 = max(max_p95, p95)
        latency_samples += 1
        latency_sum += latency.get("avg", 0)

        if p95 > 5000:
            findings.append(f"{entity} PUT p95={p95:.0f}ms -- severe bottleneck")
        elif p95 > 1000:
            findings.append(f"{entity} PUT p95={p95:.0f}ms -- moderate latency")

        latency_analysis = data.get("latency_analysis", {})
        has_bimodal = has_bimodal or bool(latency_analysis.get("bimodal"))
        findings.extend(
            f"{entity}: {finding}" for finding in latency_analysis.get("findings", [])
        )

        for code, count in data.get("error_breakdown", {}).items():
            note = None
            if code == "503":
                has_503 = True
                note = (
                    f"{entity}: {count}x HTTP 503 -- admission control rejection "
                    f"(BulkExecutor queue full)"
                )
            elif code in ("504", "timeout"):
                has_504_or_timeout = True
                note = (
                    f"{entity}: {count}x HTTP {code} -- request timeout, "
                    f"thread pool likely exhausted"
                )
            elif code == "connection_error":
                has_conn_refused = True
                note = (
                    f"{entity}: {count}x connection refused/reset -- "
                    f"server accept queue full"
                )
            elif code == "500":
                note = f"{entity}: {count}x HTTP 500 -- internal server error, check logs"
            elif code == "429":
                note = f"{entity}: {count}x HTTP 429 -- rate limited"
            if note is not None:
                findings.append(note)

    # Mean of the per-entity averages (not weighted by request count).
    avg_latency_ms = latency_sum / latency_samples if latency_samples > 0 else 0

    total_checks = health.get("total_checks", 0)
    unhealthy_count = health.get("unhealthy", 0)
    if unhealthy_count > 0 and total_checks > 0:
        pct = unhealthy_count / total_checks * 100
        findings.append(f"Health check failed {unhealthy_count} times ({pct:.0f}%) -- server overwhelmed")

    # Pass 2: compare the first and last quarter of each throughput timeline.
    for entity, data in entity_data.items():
        if not data:
            continue
        windows = data.get("throughput_over_time", [])
        if len(windows) < 4:
            continue
        q_len = max(1, len(windows) // 4)
        first_q = [w["rps"] for w in windows[:q_len]]
        last_q = [w["rps"] for w in windows[-q_len:]]
        if not (first_q and last_q):
            continue
        avg_first = sum(first_q) / len(first_q)
        avg_last = sum(last_q) / len(last_q)
        # Flag only when the final quarter runs below half of the first.
        if avg_first > 0 and avg_last / avg_first < 0.5:
            degradation = (1 - avg_last / avg_first) * 100
            findings.append(
                f"{entity}: throughput degraded {degradation:.0f}% "
                f"({avg_first:.0f} rps -> {avg_last:.0f} rps) -- resource exhaustion"
            )

    overall_rps = report.get("overall", {}).get("overall_throughput_rps", 0)
    # 150 threads, each busy for avg_latency_ms per request.
    effective_capacity = int(150 / (avg_latency_ms / 1000)) if avg_latency_ms > 0 else 0

    # ── Server threads ──
    needs_more_threads = max_p95 > 2000 or has_504_or_timeout
    if overall_rps < effective_capacity * 0.5:
        capacity_suffix = " -- threads are blocked on DB/search."
    else:
        capacity_suffix = "."
    recommendations["server_threads"] = {
        "analysis": (
            f"{measured_workers} concurrent writers with 150 max server threads. "
            f"Each PUT holds a thread for ~{avg_latency_ms:.0f}ms avg. "
            f"Effective capacity: ~{effective_capacity} rps. "
            f"Measured: {overall_rps:.0f} rps"
        ) + capacity_suffix,
        "current_env": "SERVER_MAX_THREADS=150",
        "recommended_env": "SERVER_MAX_THREADS=300" if needs_more_threads else "SERVER_MAX_THREADS=150",
        "yaml_path": "server.applicationConnectors[0].maxThreads",
    }
    if needs_more_threads:
        config_summary.append("SERVER_MAX_THREADS=300")

    # ── Virtual threads (always recommended) ──
    recommendations["virtual_threads"] = {
        "analysis": "Java 21 virtual threads can eliminate thread pool as bottleneck for I/O-bound workloads.",
        "current_env": "SERVER_ENABLE_VIRTUAL_THREAD=false",
        "recommended_env": "SERVER_ENABLE_VIRTUAL_THREAD=true",
        "yaml_path": "server.enableVirtualThreads",
    }
    config_summary.append("SERVER_ENABLE_VIRTUAL_THREAD=true")

    # ── DB pool: 100 baseline, 150 under concurrency/bimodality, 200 if severe ──
    db_pool_rec = 100
    if measured_workers > 15 or has_bimodal:
        db_pool_rec = 150
    if max_p95 > 5000:
        db_pool_rec = 200
    pool_utilization = min(99, measured_workers * 100 // 100)
    recommendations["db_pool"] = {
        "analysis": (
            f"{measured_workers} concurrent writers with 100 max DB connections. "
            f"Each PUT does multiple DB round-trips. "
            f"At {measured_workers} concurrent, pool utilization ~{pool_utilization}%. "
            f"Connection wait time may add 50-200ms."
        ),
        "current_env": "DB_CONNECTION_POOL_MAX_SIZE=100",
        "recommended_env": f"DB_CONNECTION_POOL_MAX_SIZE={db_pool_rec}",
        "yaml_path": "database.hikari.maximumPoolSize",
    }
    if db_pool_rec > 100:
        config_summary.append(f"DB_CONNECTION_POOL_MAX_SIZE={db_pool_rec}")

    if has_bimodal:
        recommendations["db_connection_timeout"] = {
            "analysis": (
                "Connection timeout 30s. If pool is exhausted, requests wait up to 30s "
                "for a connection -- this explains the bimodal latency pattern."
            ),
            "current_env": "DB_CONNECTION_TIMEOUT=30000",
            "recommended_env": "DB_CONNECTION_TIMEOUT=10000",
            "note": "Fail faster to prevent cascading timeouts",
        }
        config_summary.append("DB_CONNECTION_TIMEOUT=10000")

    if max_p95 > 2000 or measured_workers > 10:
        recommendations["search_connections"] = {
            "analysis": (
                "Search max connections: 30 total, 10 per route. "
                "Async indexing means search rarely blocks PUTs directly, "
                "but under heavy write load the search queue can back up."
            ),
            "current_env": "ELASTICSEARCH_MAX_CONN_TOTAL=30",
            "recommended_env": "ELASTICSEARCH_MAX_CONN_TOTAL=50",
        }
        config_summary.append("ELASTICSEARCH_MAX_CONN_TOTAL=50")

    if has_503:
        recommendations["bulk_operation"] = {
            "analysis": "HTTP 503 errors indicate BulkExecutor queue is full.",
            "current_env": "BULK_OPERATION_QUEUE_SIZE=1000, BULK_OPERATION_MAX_THREADS=10",
            "recommended_env": "BULK_OPERATION_QUEUE_SIZE=2000, BULK_OPERATION_MAX_THREADS=20",
        }
        config_summary.extend(
            ["BULK_OPERATION_QUEUE_SIZE=2000", "BULK_OPERATION_MAX_THREADS=20"]
        )

    if has_conn_refused:
        recommendations["accept_queue"] = {
            "analysis": "Connection refused/reset errors indicate server accept queue is full.",
            "current_env": "SERVER_ACCEPT_QUEUE_SIZE=256",
            "recommended_env": "SERVER_ACCEPT_QUEUE_SIZE=512",
        }
        config_summary.append("SERVER_ACCEPT_QUEUE_SIZE=512")

    # ── Client concurrency: ramp result wins; otherwise back off on high p95 ──
    ramp_optimal = None
    if ramp_data:
        ramp_optimal = ramp_data.get("optimal_workers", measured_workers)
        recommendations["api_concurrency"] = {
            "current": measured_workers,
            "recommended": ramp_optimal,
            "reason": f"Ramp test shows optimal throughput at {ramp_optimal} workers",
        }
    else:
        if max_p95 > 5000:
            divisor = 4
        elif max_p95 > 2000:
            divisor = 2
        else:
            divisor = None
        rec_workers = measured_workers if divisor is None else max(2, measured_workers // divisor)
        recommendations["api_concurrency"] = {
            "current": measured_workers,
            "recommended": rec_workers,
            "reason": f"Max p95 latency {max_p95:.0f}ms across entity types",
        }

    # ── Time-to-load estimates at the (possibly ramp-scaled) throughput ──
    if overall_rps > 0:
        rec_rps = overall_rps
        if ramp_optimal and ramp_optimal != measured_workers:
            scale_factor = ramp_optimal / measured_workers if measured_workers > 0 else 1
            # The scaling factor is clamped to the range [0.5, 2.0].
            rec_rps = overall_rps * max(0.5, min(2.0, scale_factor))
        estimates = {}
        for target in [50000, 250000, 1000000, 5000000]:
            secs = target / rec_rps
            if secs < 3600:
                pretty = f"~{secs / 60:.0f} min"
            else:
                pretty = f"~{secs / 3600:.1f} hours"
            estimates[f"{target // 1000}k_entities"] = pretty
        recommendations["estimated_times"] = estimates

    unhealthy_ratio = unhealthy_count / max(1, total_checks)
    if max_p95 > 5000 or unhealthy_ratio > 0.1 or has_503:
        assessment = "undersized"
    elif max_p95 > 2000 or has_bimodal:
        assessment = "marginal"
    else:
        assessment = "adequate"

    # The helper is handed findings/recommendations/config_summary and may
    # append to them, so it runs before the result dict is assembled.
    server_side = _build_server_side_analysis(report, findings, recommendations, config_summary)
    return {
        "assessment": assessment,
        "findings": findings,
        "recommendations": recommendations,
        "config_summary": config_summary,
        "server_side_analysis": server_side,
    }
def _build_server_side_analysis(report, findings, recommendations, config_summary):
si = report.get("server_info", {})
diag_before = si.get("diagnostics_before")
diag_after = si.get("diagnostics_after")
if not diag_after:
return {"available": False}
analysis = {"available": True, "bottlenecks": [], "latency_breakdown": {}}
jvm = diag_after.get("jvm", {})
jetty = diag_after.get("jetty", {})
db = diag_after.get("database", {})
bulk = diag_after.get("bulk_executor", {})
req_latency = diag_after.get("request_latency", {})
analysis["snapshot_after"] = {
"jvm_heap_pct": jvm.get("heap_usage_pct", 0),
"gc_pause_total_ms": jvm.get("gc_pause_total_ms", 0),
"jetty_utilization_pct": jetty.get("utilization_pct", 0),
"jetty_queue_size": jetty.get("queue_size", 0),
"db_pool_usage_pct": db.get("pool_usage_pct", 0),
"db_pool_pending": db.get("pool_pending", 0),
"bulk_queue_usage_pct": bulk.get("queue_usage_pct", 0),
}
if diag_before:
jvm_before = diag_before.get("jvm", {})
gc_before = jvm_before.get("gc_pause_total_ms", 0)
gc_after = jvm.get("gc_pause_total_ms", 0)
analysis["gc_pause_delta_ms"] = gc_after - gc_before
for ep_key, ep_data in req_latency.items():
db_pct = ep_data.get("db_pct", 0)
search_pct = ep_data.get("search_pct", 0)
internal_pct = ep_data.get("internal_pct", 0)
db_pool_pct = db.get("pool_usage_pct", 0)
if db_pct > 60 and db_pool_pct > 80:
bottleneck = (f"DB bottleneck on {ep_key}: {db_pct}% of request time in DB, "
f"pool at {db_pool_pct}% utilization")
analysis["bottlenecks"].append(bottleneck)
findings.append(bottleneck)
if "db_pool_increase" not in recommendations:
recommendations["db_pool_increase"] = {
"analysis": f"DB pool at {db_pool_pct}% with {db_pct}% of latency in DB",
"recommended_env": "DB_CONNECTION_POOL_MAX_SIZE=150",
}
config_summary.append("DB_CONNECTION_POOL_MAX_SIZE=150")
if search_pct > 30:
bottleneck = f"Search pressure on {ep_key}: {search_pct}% of request time in search"
analysis["bottlenecks"].append(bottleneck)
findings.append(bottleneck)
analysis["latency_breakdown"][ep_key] = {
"avg_total_ms": ep_data.get("avg_total_ms", 0),
"db_pct": db_pct,
"search_pct": search_pct,
"internal_pct": internal_pct,
}
jetty_util = jetty.get("utilization_pct", 0)
jetty_queue = jetty.get("queue_size", 0)
if jetty_util > 90 and jetty_queue > 0:
queue_time = jetty.get("queue_time_avg_ms", 0)
bottleneck = (f"Thread pool saturated: {jetty_util}% utilization, "
f"{jetty_queue} requests queued, avg queue wait {queue_time}ms")
analysis["bottlenecks"].append(bottleneck)
findings.append(bottleneck)
bulk_queue_pct = bulk.get("queue_usage_pct", 0)
if bulk_queue_pct > 70:
bottleneck = f"Bulk executor queue at {bulk_queue_pct}%, near rejection threshold"
analysis["bottlenecks"].append(bottleneck)
findings.append(bottleneck)
if "bulk_operation" not in recommendations:
recommendations["bulk_operation"] = {
"analysis": f"Bulk executor queue at {bulk_queue_pct}%",
"recommended_env": "BULK_OPERATION_QUEUE_SIZE=2000, BULK_OPERATION_MAX_THREADS=20",
}
config_summary.append("BULK_OPERATION_QUEUE_SIZE=2000")
config_summary.append("BULK_OPERATION_MAX_THREADS=20")
heap_pct = jvm.get("heap_usage_pct", 0)
if heap_pct > 85:
bottleneck = f"JVM heap at {heap_pct}%, GC pressure likely causing tail latency"
analysis["bottlenecks"].append(bottleneck)
findings.append(bottleneck)
return analysis
# ── Build report ─────────────────────────────────────────────────────────────
# Keep only the benchmarks whose summary() is truthy.
entity_summaries = {}
for name, bench in benchmarks.items():
    s = bench.summary()
    if not s:
        continue
    entity_summaries[name] = s

report = {
    "metadata": {
        "timestamp": datetime.now(timezone.utc).strftime("%Y-%m-%dT%H:%M:%SZ"),
        "server_url": SERVER_URL,
        "workers": NUM_WORKERS,
        "scale": SCALE_APPLIED,
        "only_entities": list(ONLY_ENTITIES) if ONLY_ENTITIES else None,
        "script_version": "3.0",
    },
    "server_info": introspector.report_section(),
    "server_health": health_monitor.summary(),
    "entities": entity_summaries,
    "phases": phase_timings,
}
report["overall"] = {
    "total_entities_created": total_created,
    "total_wall_clock_s": round(overall_elapsed, 2),
    "overall_throughput_rps": round(total_created / overall_elapsed, 2) if overall_elapsed > 0 else 0,
    "total_errors": sum(b.failed for b in benchmarks.values()),
    # Failures divided by the number of recorded latencies; max(1, ...) avoids
    # a division by zero when nothing was recorded.
    "overall_error_rate_pct": round(
        sum(b.failed for b in benchmarks.values())
        / max(1, sum(len(b.latencies) for b in benchmarks.values()))
        * 100,
        2,
    ),
}
if ramp_result:
    report["ramp_test"] = ramp_result

# Surface the diagnostics snapshots at the top level of the report as well.
si = introspector.report_section()
for _diag_key in ("diagnostics_before", "diagnostics_after"):
    if si.get(_diag_key):
        report[_diag_key] = si[_diag_key]

# Sizing analysis reads the report assembled above.
report["cluster_sizing"] = generate_cluster_sizing(report, ramp_data=ramp_result)
# ── Write JSON report ────────────────────────────────────────────────────────
# A JSON round-trip produces an independent copy, so removing the health
# timeline from the file version leaves the in-memory report untouched.
report_for_file = json.loads(json.dumps(report))
if "timeline" in report_for_file.get("server_health", ()):
    del report_for_file["server_health"]["timeline"]
try:
    with open(output_path, "w") as f:
        f.write(json.dumps(report_for_file, indent=2))
    print(f"\nJSON report written to: {output_path}")
except Exception as e:
    # Failing to write the report is reported but does not abort the script.
    print(f"\nFailed to write JSON report: {e}")
# ── Pretty-print summary table ──────────────────────────────────────────────
def _format_error_pct(pct):
    """Render an error percentage with one decimal; non-positive values render as 0.0%."""
    return f"{pct:.1f}%" if pct > 0 else "0.0%"


def _print_entity_table(report):
    """Print one row per entity from report["entities"], then the overall totals row."""
    thin_rule = "\u2500" * 70
    print(f"{'Entity':<22} {'Count':>7} {'Rate/s':>8} {'p50ms':>7} {'p95ms':>7} {'p99ms':>7} {'Errors':>7}")
    print(thin_rule)
    for name, data in report["entities"].items():
        count = data["created"]
        rps = data["throughput_rps"]
        latency = data["latency_ms"]
        p50 = latency["p50"]
        p95 = latency["p95"]
        p99 = latency["p99"]
        err_str = _format_error_pct(data["error_rate_pct"])
        print(f"{name:<22} {count:>7} {rps:>8.1f} {p50:>7.0f} {p95:>7.0f} {p99:>7.0f} {err_str:>7}")
    print(thin_rule)
    overall = report["overall"]
    total = overall["total_entities_created"]
    overall_rps = overall["overall_throughput_rps"]
    err_str = _format_error_pct(overall["overall_error_rate_pct"])
    # The latency columns are left blank on the totals row.
    print(f"{'Overall':<22} {total:>7} {overall_rps:>8.1f} {'':>7} {'':>7} {'':>7} {err_str:>7}")
    print("")


def _print_error_breakdown(entities):
    """Print code=count pairs for every entity that has an error_breakdown."""
    heading_printed = False
    for name, data in entities.items():
        eb = data.get("error_breakdown", {})
        if not eb:
            continue
        if not heading_printed:
            print("ERROR BREAKDOWN:")
            heading_printed = True
        codes = ", ".join(f"{code}={cnt}" for code, cnt in sorted(eb.items()))
        print(f" {name}: {codes}")
    if heading_printed:
        print("")


def _print_latency_findings(entities):
    """Print each entity's latency_analysis findings, one per line."""
    heading_printed = False
    for name, data in entities.items():
        la = data.get("latency_analysis", {})
        for finding in la.get("findings", []):
            if not heading_printed:
                print("LATENCY ANALYSIS:")
                heading_printed = True
            print(f" {name}: {finding}")
    if heading_printed:
        print("")


def _print_server_health(health):
    """Print the healthy/total check ratio and the p95 health-check latency."""
    total_checks = health["total_checks"]
    healthy = health["healthy"]
    if total_checks > 0:
        health_pct = healthy / total_checks * 100
        health_p95 = health.get("health_latency_ms", {}).get("p95", 0)
        print(f"Server Health: {healthy}/{total_checks} checks healthy ({health_pct:.1f}%)")
        print(f"Health p95 latency: {health_p95:.0f}ms")
    else:
        print("Server Health: no checks recorded")
    print("")


def _print_cluster_sizing(sizing):
    """Print the sizing assessment, its findings and the headline recommendations."""
    assessment = sizing.get("assessment", "unknown")
    indicator = {"undersized": "!! UNDERSIZED", "marginal": "? MARGINAL", "adequate": "OK ADEQUATE"}.get(
        assessment, f" {assessment.upper()}"
    )
    print(f"CLUSTER SIZING: {indicator}")
    for finding in sizing.get("findings", []):
        print(f" - {finding}")
    recs = sizing.get("recommendations", {})
    concurrency = recs.get("api_concurrency", {})
    if concurrency:
        print(f" - Recommend: {concurrency.get('recommended', '?')} workers "
              f"(currently {concurrency.get('current', '?')})")
    estimates = recs.get("estimated_times", {})
    if "1000k_entities" in estimates:
        print(f" - At current rate: 1M entities = {estimates['1000k_entities']}")
    print("")


def _print_server_side_breakdown(report, ssa):
    """Print the JVM/Jetty/DB pool/bulk executor figures and the PUT latency split."""
    print("SERVER-SIDE BREAKDOWN (from /api/v1/system/diagnostics):")
    snap = ssa.get("snapshot_after", {})
    gc_delta = ssa.get("gc_pause_delta_ms", 0)
    si_info = report.get("server_info", {})
    # The key can be present with a None value, so fall back to an empty dict
    # rather than calling .get on None.
    diag_after = si_info.get("diagnostics_after") or {}
    jvm_d = diag_after.get("jvm", {})
    jetty_d = diag_after.get("jetty", {})
    db_d = diag_after.get("database", {})
    bulk_d = diag_after.get("bulk_executor", {})
    heap_used_gb = jvm_d.get("heap_used_bytes", 0) / (1024**3)
    heap_max_gb = jvm_d.get("heap_max_bytes", 0) / (1024**3)
    print(f" JVM: heap {heap_used_gb:.1f}GB/{heap_max_gb:.1f}GB "
          f"({snap.get('jvm_heap_pct', 0)}%), "
          f"GC pauses +{gc_delta}ms during load")
    print(f" Jetty: {jetty_d.get('threads_busy', '?')}/{jetty_d.get('threads_max', '?')} "
          f"threads busy ({snap.get('jetty_utilization_pct', 0)}%), "
          f"queue depth: {snap.get('jetty_queue_size', 0)}")
    print(f" DB Pool: {db_d.get('pool_active', '?')}/{db_d.get('pool_max', '?')} "
          f"active ({snap.get('db_pool_usage_pct', 0)}%), "
          f"{snap.get('db_pool_pending', 0)} pending connections")
    print(f" Bulk Executor: queue {bulk_d.get('queue_depth', 0)}/"
          f"{bulk_d.get('queue_capacity', '?')} "
          f"({snap.get('bulk_queue_usage_pct', 0)}%)")
    breakdown = ssa.get("latency_breakdown", {})
    put_entries = {k: v for k, v in breakdown.items() if k.startswith("PUT")}
    if put_entries:
        print("")
        print(" Latency Breakdown (PUT endpoints):")
        print(f" {'Endpoint':<30} {'Total':>8} {'DB%':>6} {'Search%':>9} {'Internal%':>11}")
        for ep, bd in sorted(put_entries.items()):
            ep_short = ep.replace("PUT ", "")[:28]
            print(f" {ep_short:<30} {bd['avg_total_ms']:>7.0f}ms "
                  f"{bd['db_pct']:>5.1f}% {bd['search_pct']:>8.1f}% "
                  f"{bd['internal_pct']:>10.1f}%")
    bottlenecks = ssa.get("bottlenecks", [])
    if bottlenecks:
        print("")
        # Only the first listed bottleneck is shown.
        primary = bottlenecks[0]
        print(f" BOTTLENECK: {primary}")
    print("")


def print_summary_table(report):
    """Print the human-readable benchmark summary for a finished run.

    Args:
        report: The report dict assembled by this script. The keys "entities",
            "overall" and "server_health" are read with plain indexing and
            must be present; "server_info" and "cluster_sizing" are optional.

    Returns:
        None. Everything is written to standard output.

    Raises:
        KeyError: if a required key is missing from the report.

    The module-level output_path is read only to echo the report location.
    """
    heavy_rule = "\u2550" * 70
    print("")
    print(heavy_rule)
    print("BENCHMARK RESULTS")
    print(heavy_rule)
    si = report.get("server_info", {})
    if si.get("version"):
        # A missing, None or empty revision prints as "?" instead of failing
        # on the slice.
        revision = str(si.get("revision") or "?")[:8]
        print(f"Server: {si.get('version', '?')} (rev: {revision})")
    print("")

    _print_entity_table(report)
    _print_error_breakdown(report["entities"])
    _print_latency_findings(report["entities"])
    _print_server_health(report["server_health"])

    sizing = report.get("cluster_sizing", {})
    _print_cluster_sizing(sizing)
    ssa = sizing.get("server_side_analysis", {})
    if ssa.get("available"):
        _print_server_side_breakdown(report, ssa)

    config_summary = sizing.get("config_summary", [])
    if config_summary:
        print("RECOMMENDED CONFIGURATION:")
        for env_line in config_summary:
            print(f" export {env_line}")
        print("")

    print(f"Full report: {output_path}")
    print(f"Total time: {report['overall']['total_wall_clock_s']:.1f}s")
print_summary_table(report)

# Print the size of each collection of identifiers named in the heading below.
print("")
print("Collected IDs for linking:")
for id_label, id_bucket in (
    ("Table IDs", collected_table_ids),
    ("Dashboard IDs", collected_dashboard_ids),
    ("Pipeline IDs", collected_pipeline_ids),
    ("Test Case FQNs", collected_test_case_fqns),
):
    print(f" {id_label}: {len(id_bucket)}")
PYEOF
# Closing hint printed after the Python heredoc above: a blank line, then the
# pointer to the reindex trigger script.
printf '\n%s\n' "Test data loaded. You can now trigger reindexing with: ./scripts/trigger-reindex.sh"