diff --git a/README.md b/README.md
index be9f7f7..97a14d4 100644
--- a/README.md
+++ b/README.md
@@ -46,4 +46,4 @@ example: ./run_pareto_analysis.sh 3cNWY5 wiki10m
 
 Serve the webui on port 8000:
 
-    cd web-ui-new; python3 -m http.server
+    cd web-ui; python3 -m http.server
diff --git a/generate-combinations.py b/generate-combinations.py
index fbb93dc..0d4de98 100644
--- a/generate-combinations.py
+++ b/generate-combinations.py
@@ -59,68 +59,59 @@
         else:
             algo_variants[param] = value
 
-    # Generate all combination of variants. For each combination, generate a hashed ID, and a file with the
-    # name pattern as <algo>-<hash>.json. The file should contain the invariants as is, and the variants as the current combination.
     if algo_variants:
-        # Separate efSearch from other variants if it exists
         efSearch_values = None
+        efSearchScaleFactor_values = None
         other_variant_keys = []
         other_variant_values = []
-
+
         for key, value in algo_variants.items():
             if key == 'efSearch':
                 efSearch_values = value
+            elif key == 'efSearchScaleFactor':
+                efSearchScaleFactor_values = value
             else:
                 other_variant_keys.append(key)
                 other_variant_values.append(value)
-
-        # Generate combinations with efSearch at the beginning (innermost loop)
-        if efSearch_values and other_variant_keys:
-            # Generate combinations of other parameters first
+
+        if (efSearch_values or efSearchScaleFactor_values) and other_variant_keys:
            for other_combination in itertools.product(*other_variant_values):
                 other_variants = dict(zip(other_variant_keys, other_combination))
-                # Then iterate through efSearch values
-                for ef_index, ef_value in enumerate(efSearch_values):
+                search_values = efSearch_values if efSearch_values else efSearchScaleFactor_values
+                search_key = 'efSearch' if efSearch_values else 'efSearchScaleFactor'
+                for ef_index, ef_value in enumerate(search_values):
                     current_variants = other_variants.copy()
-                    current_variants['efSearch'] = ef_value
-
-                    # Skip if cagraIntermediateDegree < cagraGraphDegree
+                    current_variants[search_key] = ef_value
+
                     if 'cagraIntermediateDegree' in current_variants and 'cagraGraphDegree' in current_variants:
                         if current_variants['cagraIntermediateDegree'] < current_variants['cagraGraphDegree']:
-                            print(f"\t\tSkipping combination: cagraIntermediateDegree ({current_variants['cagraIntermediateDegree']}) < cagraGraphDegree ({current_variants['cagraGraphDegree']})")
                             continue
-
-                    # Skip if hnswMaxConn > hnswBeamWidth
+
                     if 'hnswMaxConn' in current_variants and 'hnswBeamWidth' in current_variants:
                         if current_variants['hnswMaxConn'] > current_variants['hnswBeamWidth']:
-                            print(f"\t\tSkipping combination: hnswMaxConn ({current_variants['hnswMaxConn']}) > hnswBeamWidth ({current_variants['hnswBeamWidth']})")
                             continue
-
-                    # Generate hash only from other_variants (excluding efSearch)
+
                     base_hash = hashlib.md5(json.dumps(other_variants, sort_keys=True).encode()).hexdigest()[:8]
-                    hash_id = f"{base_hash}-ef{ef_value}"
-
+                    hash_id = f"{base_hash}-ef{ef_value}" if search_key == 'efSearch' else f"{base_hash}-efs{ef_value}"
+
                     config = algo_invariants.copy()
                     config.update(current_variants)
-
-                    # For multiple efSearch combinations: subsequent ones skip indexing
-                    if len(efSearch_values) > 1 and ef_index > 0:
+
+                    if len(search_values) > 1 and ef_index > 0:
                         config['skipIndexing'] = True
-
-                    # Set cleanIndexDirectory based on position
+
                     if ef_index == 0:
                         config['cleanIndexDirectory'] = False
-                    elif ef_index == len(efSearch_values) - 1:
+                    elif ef_index == len(search_values) - 1:
                         config['cleanIndexDirectory'] = True
                     else:
                         config['cleanIndexDirectory'] = False
-
-                    # Use base_hash for index directory paths
+
                     if 'hnswIndexDirPath' in config:
                         config['hnswIndexDirPath'] = f"hnswIndex-{base_hash}"
                     if 'cuvsIndexDirPath' in config:
                         config['cuvsIndexDirPath'] = f"cuvsIndex-{base_hash}"
-
+
                     filename = f"{algo}-{hash_id}.json"
                     sweep_dir = f"{args.configs_dir}/{sweep}"
                     filepath = f"{sweep_dir}/{filename}"
@@ -128,35 +119,32 @@
                     with open(filepath, 'w') as f:
                         json.dump(config, f, indent=2)
                     print(f"\tGenerated config file: {filepath}")
-        elif efSearch_values:
-            # Only efSearch values, no other variants
-            for ef_index, ef_value in enumerate(efSearch_values):
-                current_variants = {'efSearch': ef_value}
-                # Generate hash from empty dict since no other variants exist
+        elif efSearch_values or efSearchScaleFactor_values:
+            search_values = efSearch_values if efSearch_values else efSearchScaleFactor_values
+            search_key = 'efSearch' if efSearch_values else 'efSearchScaleFactor'
+            for ef_index, ef_value in enumerate(search_values):
+                current_variants = {search_key: ef_value}
                 base_hash = hashlib.md5(json.dumps({}, sort_keys=True).encode()).hexdigest()[:8]
-                hash_id = f"{base_hash}-ef{ef_value}"
-
+                hash_id = f"{base_hash}-ef{ef_value}" if search_key == 'efSearch' else f"{base_hash}-efs{ef_value}"
+
                 config = algo_invariants.copy()
                 config.update(current_variants)
-
-                # For multiple efSearch combinations: subsequent ones skip indexing
-                if len(efSearch_values) > 1 and ef_index > 0:
+
+                if len(search_values) > 1 and ef_index > 0:
                     config['skipIndexing'] = True
-
-                # Set cleanIndexDirectory based on position
+
                 if ef_index == 0:
                     config['cleanIndexDirectory'] = False
-                elif ef_index == len(efSearch_values) - 1:
+                elif ef_index == len(search_values) - 1:
                     config['cleanIndexDirectory'] = True
                 else:
                     config['cleanIndexDirectory'] = False
-
-                # Use base_hash for index directory paths
+
                 if 'hnswIndexDirPath' in config:
                     config['hnswIndexDirPath'] = f"hnswIndex-{base_hash}"
                 if 'cuvsIndexDirPath' in config:
                     config['cuvsIndexDirPath'] = f"cuvsIndex-{base_hash}"
-
+
                 filename = f"{algo}-{hash_id}.json"
                 sweep_dir = f"{args.configs_dir}/{sweep}"
                 filepath = f"{sweep_dir}/{filename}"
@@ -165,26 +153,21 @@
                     json.dump(config, f, indent=2)
                 print(f"\tGenerated config file: {filepath}")
         else:
-            # No efSearch, use original logic
             variant_keys = list(algo_variants.keys())
             variant_values = list(algo_variants.values())
             for combination in itertools.product(*variant_values):
                 current_variants = dict(zip(variant_keys, combination))
-
-                # Skip if cagraIntermediateDegree < cagraGraphDegree
+
                 if 'cagraIntermediateDegree' in current_variants and 'cagraGraphDegree' in current_variants:
                     if current_variants['cagraIntermediateDegree'] < current_variants['cagraGraphDegree']:
-                        print(f"\t\tSkipping combination: cagraIntermediateDegree ({current_variants['cagraIntermediateDegree']}) < cagraGraphDegree ({current_variants['cagraGraphDegree']})")
                         continue
-
-                # Skip if hnswMaxConn > hnswBeamWidth
+
                 if 'hnswMaxConn' in current_variants and 'hnswBeamWidth' in current_variants:
                     if current_variants['hnswMaxConn'] > current_variants['hnswBeamWidth']:
-                        print(f"\t\tSkipping combination: hnswMaxConn ({current_variants['hnswMaxConn']}) > hnswBeamWidth ({current_variants['hnswBeamWidth']})")
                         continue
-
+
                 hash_id = hashlib.md5(json.dumps(current_variants, sort_keys=True).encode()).hexdigest()[:8]
-
+
                 config = algo_invariants.copy()
                 config.update(current_variants)
                 filename = f"{algo}-{hash_id}.json"
@@ -194,6 +177,16 @@
                 with open(filepath, 'w') as f:
                     json.dump(config, f, indent=2)
                 print(f"\tGenerated config file: {filepath}")
-
-
+    else:
+        hash_id = hashlib.md5(json.dumps({}, sort_keys=True).encode()).hexdigest()[:8]
+        config = algo_invariants.copy()
+        filename = f"{algo}-{hash_id}.json"
+        sweep_dir = f"{args.configs_dir}/{sweep}"
+        filepath = f"{sweep_dir}/{filename}"
+        os.makedirs(sweep_dir, exist_ok=True)
+        with open(filepath, 'w') as f:
+            json.dump(config, f, indent=2)
+        print(f"\tGenerated config file: {filepath}")
+
+    print("----------------------")
diff --git a/plot_pareto.py b/plot_pareto.py
index a481b23..cec4da8 100755
--- a/plot_pareto.py
+++ b/plot_pareto.py
@@ -126,12 +126,16 @@ def create_plot_search(
     # Sorting by mean y-value helps aligning plots with labels
     def mean_y(algo):
         points = np.array(all_data[algo], dtype=object)
+        if len(points) == 0 or points.ndim < 2:
+            return float('inf')
         return -np.log(np.array(points[:, 3], dtype=np.float32)).mean()
 
     # Find range for logit x-scale
     min_x, max_x = 1, 0
     for algo in sorted(all_data.keys(), key=mean_y):
         points = np.array(all_data[algo], dtype=object)
+        if len(points) == 0 or points.ndim < 2:
+            continue
         xs = points[:, 2]
         ys = points[:, 3]
         min_x = min([min_x] + [x for x in xs if x > 0])
@@ -226,10 +230,14 @@ def create_plot_build(
     # Sorting by mean y-value helps aligning plots with labels
     def mean_y(algo):
         points = np.array(search_results[algo], dtype=object)
+        if len(points) == 0 or points.ndim < 2:
+            return float('inf')
         return -np.log(np.array(points[:, 3], dtype=np.float32)).mean()
 
     for pos, algo in enumerate(sorted(search_results.keys(), key=mean_y)):
         points = np.array(search_results[algo], dtype=object)
+        if len(points) == 0 or points.ndim < 2:
+            continue
         # x is recall, ls is algo_name, idxs is index_name
         xs = points[:, 2]
         ls = points[:, 0]
@@ -279,33 +287,32 @@ def mean_y(algo):
     df = pd.DataFrame(data, index=index)
     df.replace(0.0, np.nan, inplace=True)
     df = df.dropna(how="all")
+
+    if df.empty or df.shape[1] == 0:
+        print("Skipping build plot: no data points in recall buckets >= 80%")
+        return
+
     plt.figure(figsize=(12, 9))
     ax = df.plot.bar(rot=0, color=colors)
     fig = ax.get_figure()
 
-    # Add speedup annotations
     if 'LUCENE_HNSW' in df.columns and 'CAGRA_HNSW' in df.columns:
         y_max = ax.get_ylim()[1]
-
         for i, bucket in enumerate(df.index):
             lucene_time = df.loc[bucket, 'LUCENE_HNSW']
             cagra_time = df.loc[bucket, 'CAGRA_HNSW']
-
             if pd.notna(lucene_time) and pd.notna(cagra_time) and lucene_time > 0 and cagra_time > 0:
                 speedup = lucene_time / cagra_time
-                # Position annotations just above the bars, below subtitle
                 ax.text(i, y_max * 0.98, f'{speedup:.1f}x',
                         ha='center', va='bottom', fontsize=9, fontweight='bold',
                         bbox=dict(boxstyle='round,pad=0.2', facecolor='white', alpha=0.9, edgecolor='gray'))
 
     print(f"writing build output to {fn_out}")
-    plt.title(
-        "Average Build Time within Recall Range "
-        f"for k={k} n_queries={n_queries}"
-    )
+    plt.title(f"Average Build Time within Recall Range for k={k} n_queries={n_queries}")
     plt.suptitle(f"{dataset}")
     plt.ylabel("Build Time (s)")
     fig.savefig(fn_out)
+    plt.close()
 
 
 def load_lines(results_path, result_files, method, index_key, mode, time_unit):
diff --git a/pom.xml b/pom.xml
index eed8e93..ac2acb6 100644
--- a/pom.xml
+++ b/pom.xml
@@ -16,14 +16,6 @@ <project.build.sourceEncoding>UTF-8</project.build.sourceEncoding>
-
-  <repositories>
-    <repository>
-      <id>searchscale-maven</id>
-      <name>SearchScale Maven</name>
-      <url>https://maven.searchscale.com/snapshots</url>
-    </repository>
-  </repositories>
@@ -37,15 +29,15 @@
-    <dependency>
-      <groupId>com.nvidia.cuvs.lucene</groupId>
-      <artifactId>cuvs-lucene</artifactId>
-      <version>25.10.0-33318-SNAPSHOT</version>
-    </dependency>
+    <dependency>
+      <groupId>com.nvidia.cuvs.lucene</groupId>
+      <artifactId>cuvs-lucene</artifactId>
+      <version>25.10.0</version>
+    </dependency>
     <dependency>
       <groupId>com.nvidia.cuvs</groupId>
       <artifactId>cuvs-java</artifactId>
-      <version>25.10.0-55985-SNAPSHOT</version>
+      <version>25.10.0</version>
     </dependency>
     <dependency>
       <groupId>org.apache.lucene</groupId>
diff --git a/run_queries.py b/run_queries.py
index e494a92..e508511 100644
--- a/run_queries.py
+++ b/run_queries.py
@@ -18,8 +18,8 @@ def parse_arguments():
                         help='Number of warmup queries before measurements (default: 20)')
     parser.add_argument('--top-k', type=int, default=100,
                         help='Number of nearest neighbors to retrieve (default: 100)')
-    parser.add_argument('--ef-search', type=int, default=800,
-                        help='Overfetch parameter for topK (default: 800)')
+    parser.add_argument('--ef-search-scale-factor', type=float, default=2.0,
+                        help='efSearch scale factor (efSearch = efSearchScaleFactor * topK, default: 2.0)')
 
     # File paths
     parser.add_argument('--query-file', type=str, default='data/queries.fbin',
@@ -38,29 +38,62 @@ def parse_arguments():
                         help='Name of the vector field in Solr (default: article_vector)')
 
     # Other options
-    parser.add_argument('--timeout', type=int, default=30,
-                        help='Request timeout in seconds (default: 30)')
+    parser.add_argument('--timeout', type=int, default=120,
+                        help='Request timeout in seconds (default: 120)')
     parser.add_argument('--skip-queries', type=int, default=0,
                         help='Number of queries to skip from the beginning of the file (default: 0)')
 
     return parser.parse_args()
 
 def read_query_vectors(filename, num_to_read, skip=0):
-    """Read query vectors from fbin file"""
-    with open(filename, 'rb') as f:
-        num_vectors = struct.unpack('I', f.read(4))[0]
-        dim = struct.unpack('I', f.read(4))[0]
-
-        # Skip vectors if needed
-        if skip > 0:
-            f.seek(8 + skip * dim * 4)  # 8 bytes header + skip vectors
-
-        vectors = []
-        for i in range(min(num_to_read, num_vectors - skip)):
-            vector = np.frombuffer(f.read(dim * 4), dtype=np.float32)
-            vectors.append(vector)
-
-        return vectors
+    """Read query vectors from .fbin or .fvecs file"""
+    is_fbin = filename.endswith('.fbin')
+    is_fvecs = filename.endswith('.fvecs') or filename.endswith('.fvecs.gz')
+
+    if is_fbin:
+        with open(filename, 'rb') as f:
+            num_vectors = struct.unpack('<I', f.read(4))[0]
+            dim = struct.unpack('<I', f.read(4))[0]
+
+            if skip > 0:
+                f.seek(8 + skip * dim * 4)
+
+            vectors = []
+            for i in range(min(num_to_read, num_vectors - skip)):
+                vector = np.frombuffer(f.read(dim * 4), dtype=np.float32)
+                vectors.append(vector)
+
+            return vectors
+    elif is_fvecs:
+        import gzip
+        open_func = gzip.open if filename.endswith('.gz') else open
+
+        with open_func(filename, 'rb') as f:
+            vectors = []
+            skipped = 0
+
+            while len(vectors) < num_to_read:
+                dim_bytes = f.read(4)
+                if len(dim_bytes) < 4:
+                    break
+
+                dim = struct.unpack('<I', dim_bytes)[0]
+                vector = np.frombuffer(f.read(dim * 4), dtype=np.float32)
+
+                if skipped < skip:
+                    skipped += 1
+                    continue
+
+                vectors.append(vector)
+
+            return vectors
 
         if skip > 0:
             f.seek(8 + skip * gt_k * 4)
@@ -82,15 +114,13 @@ def perform_knn_query_with_timing(query_vector, topK, args):
     """Perform single KNN query and measure latency"""
     url = f"{args.solr_url}/solr/{args.collection}/select?omitHeader=true"
-
-    # Convert vector to string format for Solr
     vector_str = "[" + ",".join(map(str, query_vector)) + "]"
 
     payload = {
         "fields": "id,score",
         "query": {"lucene": {
             "df": "name",
-            "query": "{!knn f=" + args.vector_field + " topK=" + str(args.ef_search) + "}" + vector_str
+            "query": "{!knn f=" + args.vector_field + " topK=" + str(topK) + " efSearchScaleFactor=" + str(args.ef_search_scale_factor) + "}" + vector_str
         }
         },
         "limit": topK
@@ -100,16 +130,13 @@
     }
 
     try:
-        # Measure latency
         start_time = time.perf_counter()
         response = requests.request("GET", url, json=payload, headers=headers, timeout=args.timeout)
         end_time = time.perf_counter()
 
-        latency_ms = (end_time - start_time) * 1000  # Convert to milliseconds
+        latency_ms = (end_time - start_time) * 1000
 
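+        # Wall-clock latency covers the full HTTP round trip; client-side JSON parsing below is excluded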
         response_data = response.json()
-
-        # Extract IDs from response
         docs = response_data.get('response', {}).get('docs', [])
         retrieved_ids = [int(doc['id']) for doc in docs]
@@ -136,30 +163,23 @@ def run_warmup_queries(query_vectors, num_warmup, args):
         _, _ = perform_knn_query_with_timing(vector, topK=args.top_k, args=args)
 
 def main():
-    # Parse command line arguments
     args = parse_arguments()
 
-    # Read all query vectors (warmup + test)
     total_vectors_needed = args.warmup_queries + args.num_queries
     all_query_vectors = read_query_vectors(args.query_file, total_vectors_needed, skip=args.skip_queries)
 
-    # Split into warmup and test vectors
     warmup_vectors = all_query_vectors[:args.warmup_queries]
     test_vectors = all_query_vectors[args.warmup_queries:]
 
-    # Read ground truth (only for test queries, skip warmup)
     ground_truth_all = read_ground_truth(args.neighbors_file, num_queries=args.num_queries,
                                          skip=args.warmup_queries + args.skip_queries, k=args.top_k)
 
-    # Run warmup queries
     run_warmup_queries(warmup_vectors, args.warmup_queries, args)
 
-    # Process test queries and measure latency
     recalls = []
     latencies = []
 
     for i, (query_vector, ground_truth) in enumerate(zip(test_vectors, ground_truth_all)):
-        # Perform query with timing
         retrieved_ids, latency_ms = perform_knn_query_with_timing(query_vector, topK=args.top_k, args=args)
 
         if len(retrieved_ids) > 0 and latency_ms is not None:
@@ -167,31 +187,26 @@
             recalls.append(recall)
             latencies.append(latency_ms)
         else:
-            recalls.append(0.0)  # Failed query
+            recalls.append(0.0)
 
-    # Calculate and report only the two metrics
     avg_recall = np.mean(recalls) * 100
     mean_latency = np.mean(latencies)
 
     print(f"Average Recall@{args.top_k}: {avg_recall:.2f}%")
     print(f"Mean Latency: {mean_latency:.2f}ms")
 
-    # Read existing results file if it exists, otherwise create new structure
     try:
         with open(args.output_file, "r") as f:
             results = json.load(f)
     except FileNotFoundError:
         results = {}
 
-    # Ensure metrics section exists
     if "metrics" not in results:
         results["metrics"] = {}
 
-    # Add the new metrics
     results["metrics"]["recall-accuracy"] = avg_recall
     results["metrics"]["mean-latency"] = mean_latency
 
-    # Save results to output file
     with open(args.output_file, "w") as f:
         json.dump(results, f, indent=2)
diff --git a/run_sweep.sh b/run_sweep.sh
index 8d768fb..4313c9a 100755
--- a/run_sweep.sh
+++ b/run_sweep.sh
@@ -1,5 +1,10 @@
 #!/bin/bash
+
+# Always run from the repo root (directory of this script)
+SCRIPT_DIR="$(cd "$(dirname "${BASH_SOURCE[0]}")" && pwd)"
+cd "$SCRIPT_DIR" || exit 1
+
 # Parse command-line arguments
 while getopts ":-:" opt; do
   case $OPTARG in
@@ -31,7 +36,6 @@ done
 DATA_DIR=${DATA_DIR:-datasets}
 DATASETS_FILE=${DATASETS_FILE:-datasets.json}
 MODE=${MODE:-lucene}
-SWEEPS_FILE=${SWEEPS_FILE:-sweeps.json}
 CONFIGS_DIR=${CONFIGS_DIR:-configs}
 TIMESTAMP=$(date +"%Y%m%d_%H%M%S")
 RESULTS_DIR=${RESULTS_DIR:-results}
@@ -43,9 +47,13 @@
 if [ "$MODE" != "lucene" ] && [ "$MODE" != "solr" ]; then
   exit 1
 fi
 
-# Set mode-specific defaults
-if [ "$MODE" = "solr" ]; then
-  SWEEPS_FILE=${SWEEPS_FILE:-solr-sweeps.json}
+# Set mode-specific defaults for sweeps file
+if [ -z "$SWEEPS_FILE" ]; then
+  if [ "$MODE" = "solr" ]; then
+    SWEEPS_FILE="solr-sweeps.json"
+  else
+    SWEEPS_FILE="sweeps.json"
+  fi
 fi
 
 BENCHMARKID=$(head /dev/urandom | tr -dc A-Za-z0-9 | head -c 6)
@@ -155,9 +163,10 @@
       echo "$SWEEP_NAME/$CONFIG_NAME: FAILED" >> "$SUMMARY_FILE"
     fi
   elif [ "$MODE" = "solr" ]; then
-    # Extract dataset name from sweeps file
"$SWEEPS_FILE") - BATCHES_DIR="${DATASET_NAME}_batches" + # Extract dataset name from sweeps file (use the first key in the sweeps file) + DATASET_NAME=$(jq -r "keys[0]" "$SWEEPS_FILE") + DATASET_FROM_SWEEP=$(jq -r ".[\"$DATASET_NAME\"].dataset" "$SWEEPS_FILE") + BATCHES_DIR="${DATASET_FROM_SWEEP}_batches" SOLR_UPDATE_URL="http://localhost:8983/solr/test/update?commit=true&overwrite=false" # Run Solr benchmark using the new format: config batches_dir solr_url results_dir @@ -181,13 +190,13 @@ if [ "$RUN_BENCHMARKS" = "true" ]; then index_hash=$(echo "$CONFIG_NAME" | sed -E 's/.*-([a-f0-9]{8})(-.+)?$/\1/') if [ ${#index_hash} -eq 8 ]; then algo=$(jq -r '.algoToRun' "$config_file") - if [ "$algo" = "LUCENE_HNSW" ] && ! jq -e '.metrics["cuvs-indexing-time"]' "$results_file" >/dev/null 2>&1; then + if [ "$algo" = "LUCENE_HNSW" ] && ! jq -e '.metrics["hnsw-total-time-ms"]' "$results_file" >/dev/null 2>&1; then metric_type="cuvs" - elif [ "$algo" = "CAGRA_HNSW" ] && ! jq -e '.metrics["cuvs-indexing-time"]' "$results_file" >/dev/null 2>&1; then + elif [ "$algo" = "CAGRA_HNSW" ] && ! jq -e '.metrics["cuvs-total-time-ms"]' "$results_file" >/dev/null 2>&1; then metric_type="cuvs" - elif [ "$algo" = "hnsw" ] && ! jq -e '.metrics["cuvs-indexing-time"]' "$results_file" >/dev/null 2>&1; then + elif [ "$algo" = "hnsw" ] && ! jq -e '.metrics["solr-total-indexing-time-ms"]' "$results_file" >/dev/null 2>&1; then metric_type="cuvs" - elif [ "$algo" = "cagra_hnsw" ] && ! jq -e '.metrics["cuvs-indexing-time"]' "$results_file" >/dev/null 2>&1; then + elif [ "$algo" = "cagra_hnsw" ] && ! jq -e '.metrics["solr-total-indexing-time-ms"]' "$results_file" >/dev/null 2>&1; then metric_type="cuvs" else metric_type="" diff --git a/solr-benchmarks.sh b/solr-benchmarks.sh index 9b357a6..743949e 100755 --- a/solr-benchmarks.sh +++ b/solr-benchmarks.sh @@ -14,6 +14,8 @@ JAVABIN_FILES_DIR="$2" URL="$3" RESULTS_DIR="$4" +mkdir -p "$RESULTS_DIR" + # Check if config file exists if [ ! 
-f "$CONFIG_FILE" ]; then echo "Error: Config file '$CONFIG_FILE' not found" @@ -29,7 +31,7 @@ fi # Extract variables from config file using jq VECTOR_DIMENSION=$(jq -r '.vectorDimension' "$CONFIG_FILE") KNN_ALGORITHM=$(jq -r '.algoToRun' "$CONFIG_FILE") -EF_SEARCH=$(jq -r '.efSearch' "$CONFIG_FILE") +EF_SEARCH_SCALE_FACTOR=$(jq -r '.efSearchScaleFactor' "$CONFIG_FILE") WARMUP_QUERIES=$(jq -r '.numWarmUpQueries' "$CONFIG_FILE") TOTAL_QUERIES=$(jq -r '.numQueriesToRun' "$CONFIG_FILE") QUERY_FILE=$(jq -r '.queryFile' "$CONFIG_FILE") @@ -45,9 +47,6 @@ BEAM_WIDTH=$(jq -r 'if .hnswBeamWidth == null or .hnswBeamWidth == "null" then " CLEAN_INDEX_DIRECTORY=$(jq -r 'if has("cleanIndexDirectory") then .cleanIndexDirectory else true end' "$CONFIG_FILE") SKIP_INDEXING=$(jq -r 'if has("skipIndexing") then .skipIndexing else false end' "$CONFIG_FILE") -echo "DEBUG: CLEAN_INDEX_DIRECTORY=$CLEAN_INDEX_DIRECTORY" -echo "DEBUG: SKIP_INDEXING=$SKIP_INDEXING" - # Extract Solr URL from the update URL parameter SOLR_URL=$(echo "$URL" | sed 's|/solr/.*||') @@ -55,20 +54,191 @@ SOLR_URL=$(echo "$URL" | sed 's|/solr/.*||') DATA_DIR="data" DATASET_FILENAME="wiki_all_10M.tar" SOLR_GITHUB_REPO="https://github.com/apache/solr.git" -SOLR_ROOT=solr-10.0.0-SNAPSHOT -NPARALLEL=8 +SOLR_ROOT=solr-11.0.0-SNAPSHOT +NPARALLEL=${NPARALLEL:-4} SIMILARITY_FUNCTION=${SIMILARITY_FUNCTION:-euclidean} RAM_BUFFER_SIZE_MB=${RAM_BUFFER_SIZE_MB:-20000} +SOLR_HEAP=${SOLR_HEAP:-16G} + +METRICS_PID="" + +wait_for_solr() { + local url="$1" + local retries="${2:-60}" + for _ in $(seq 1 "$retries"); do + if curl -sSf "${url}/solr/admin/info/system?wt=json" >/dev/null 2>&1; then + return 0 + fi + sleep 1 + done + return 1 +} + +start_metrics() { + if [ -n "${METRICS_PID:-}" ]; then + return + fi + + mkdir -p "$RESULTS_DIR" + SOLR_URL="$SOLR_URL" RESULTS_DIR="$RESULTS_DIR" python3 -u << 'METRICS_EOF' > "$RESULTS_DIR/metrics.log" 2>&1 & +import urllib.request +import json +import time +import signal +import sys +import math +import os + +solr_url = os.environ["SOLR_URL"] +results_dir = os.environ["RESULTS_DIR"] +start_time = int(time.time() * 1000) +memory_samples = [] +cpu_samples = [] + +PREFERRED_SCOPE = "io.opentelemetry.runtime-telemetry-java17" + +def save_metrics(): + import os + os.makedirs(results_dir, exist_ok=True) + with open(f"{results_dir}/memory_metrics.json", "w") as f: + json.dump({"memory_samples": memory_samples}, f, indent=2) + with open(f"{results_dir}/cpu_metrics.json", "w") as f: + json.dump({"cpu_samples": cpu_samples}, f, indent=2) + +def handle_signal(signum, frame): + save_metrics() + sys.exit(0) + +signal.signal(signal.SIGTERM, handle_signal) +signal.signal(signal.SIGINT, handle_signal) + +def parse_labels(label_str: str): + labels = {} + if not label_str: + return labels + parts = [] + cur = [] + in_quotes = False + for ch in label_str: + if ch == '"' and (not cur or cur[-1] != '\\'): + in_quotes = not in_quotes + if ch == ',' and not in_quotes: + parts.append(''.join(cur)) + cur = [] + else: + cur.append(ch) + if cur: + parts.append(''.join(cur)) + for p in parts: + if '=' not in p: + continue + k, v = p.split('=', 1) + labels[k.strip()] = v.strip().strip('"') + return labels + +def parse_prometheus(text: str): + points = [] + for line in text.splitlines(): + if not line or line[0] == '#': + continue + try: + left, val = line.rsplit(' ', 1) + except ValueError: + continue + if '{' in left: + name, rest = left.split('{', 1) + labels = parse_labels(rest.rstrip('}')) + else: + name, labels = left, {} + try: + value = 
+            value = float(val)
+        except ValueError:
+            continue
+        points.append((name, labels, value))
+    return points
+
+def pick_scope(points):
+    scopes = set()
+    for _, labels, _ in points:
+        s = labels.get("otel_scope_name")
+        if s:
+            scopes.add(s)
+    if PREFERRED_SCOPE in scopes:
+        return PREFERRED_SCOPE
+    return next(iter(scopes), None)
+
+for _ in range(60):
+    try:
+        urllib.request.urlopen(f"{solr_url}/solr/admin/metrics?wt=prometheus", timeout=5)
+        break
+    except Exception:
+        time.sleep(1)
+else:
+    print("Error: Solr metrics endpoint not ready", file=sys.stderr)
+    sys.exit(1)
+
+while True:
+    try:
+        elapsed = int(time.time() * 1000) - start_time
+
+        sys_resp = urllib.request.urlopen(f"{solr_url}/solr/admin/info/system?wt=json", timeout=5)
+        sys_data = json.loads(sys_resp.read().decode("utf-8"))
+        heap_used = sys_data["jvm"]["memory"]["raw"]["used"]
+        heap_max = sys_data["jvm"]["memory"]["raw"]["max"]
+
+        resp = urllib.request.urlopen(f"{solr_url}/solr/admin/metrics?wt=prometheus", timeout=10)
+        text = resp.read().decode("utf-8", errors="replace")
+        points = parse_prometheus(text)
+        scope = pick_scope(points)
+
+        nonheap_used = 0.0
+        proc_cpu = None
+        sys_cpu = None
+
+        for name, labels, value in points:
+            if scope and labels.get("otel_scope_name") not in (None, scope):
+                continue
+            if name == "jvm_memory_used_bytes":
+                if labels.get("jvm_memory_type") == "non_heap":
+                    nonheap_used += value
+            elif name == "jvm_cpu_recent_utilization_ratio":
+                proc_cpu = value
+            elif name == "jvm_system_cpu_utilization_ratio":
+                sys_cpu = value
+
+        if proc_cpu is None or math.isnan(proc_cpu):
+            proc_cpu = 0.0
+        if sys_cpu is None or math.isnan(sys_cpu):
+            sys_cpu = 0.0
+
+        memory_samples.append({
+            "timestamp": elapsed,
+            "heapUsed": int(heap_used),
+            "heapMax": int(heap_max),
+            "offHeapUsed": int(nonheap_used),
+        })
+        cpu_samples.append({
+            "timestamp": elapsed,
+            "cpuUsagePercent": float(proc_cpu) * 100.0,
+            "systemCpuUsagePercent": float(sys_cpu) * 100.0,
+        })
+
+        time.sleep(2)
+    except Exception as e:
+        print(f"Metrics collection error: {e}", file=sys.stderr)
+        time.sleep(2)
+METRICS_EOF
+
+  METRICS_PID=$!
+  echo "Started metrics collection (PID: $METRICS_PID)"
+  trap "kill -TERM $METRICS_PID 2>/dev/null; wait $METRICS_PID 2>/dev/null" EXIT
+}
 
-# Only proceed with indexing if skipIndexing is false
 if [ "$SKIP_INDEXING" = "false" ]; then
 
-# Generate configset files on the fly
 mkdir -p temp-configset
 
-# Generate managed-schema
 if [ "$KNN_ALGORITHM" = "hnsw" ]; then
-  # For HNSW: Include hnswMaxConnections and hnswBeamWidth in fieldType
   cat > temp-configset/managed-schema << EOF
@@ -88,7 +258,6 @@
 EOF
 else
-  # For CAGRA_HNSW: Keep original format without HNSW-specific parameters
   cat > temp-configset/managed-schema << EOF
@@ -108,9 +277,7 @@
 EOF
 fi
 
-# Generate solrconfig.xml
 if [ "$KNN_ALGORITHM" = "hnsw" ]; then
-  # For HNSW: Remove codecFactory altogether
   cat > temp-configset/solrconfig.xml << EOF
@@ -129,7 +296,7 @@
       <maxTime>\${solr.autoCommit.maxTime:1500000}</maxTime>
-      <openSearcher>false</openSearcher>
+      <openSearcher>true</openSearcher>
     </autoCommit>
 
     <autoSoftCommit>
       <maxTime>\${solr.autoSoftCommit.maxTime:1500000}</maxTime>
@@ -147,7 +314,6 @@
 EOF
 else
-  # For CAGRA_HNSW: Keep codecFactory as before
   cat > temp-configset/solrconfig.xml << EOF
@@ -166,7 +332,7 @@
       <maxTime>\${solr.autoCommit.maxTime:1500000}</maxTime>
-      <openSearcher>false</openSearcher>
+      <openSearcher>true</openSearcher>
     </autoCommit>
 
     <autoSoftCommit>
       <maxTime>\${solr.autoSoftCommit.maxTime:1500000}</maxTime>
@@ -194,79 +360,163 @@
 EOF
 fi
 
+pkill -9 java 2>/dev/null || true
+sleep 2
+
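+# Fail fast if the Solr tarball is missing rather than reusing a stale install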
-f "$SOLR_ROOT.tgz" ]; then + echo "Error: Solr tarball '$SOLR_ROOT.tgz' not found" + exit 1 +fi +rm -rf $SOLR_ROOT +if ! tar -xf "$SOLR_ROOT.tgz"; then + echo "Error: Failed to extract Solr tarball" + exit 1 +fi -# Load cuvs module, start Solr -pkill -9 java; rm -rf $SOLR_ROOT -tar -xf $SOLR_ROOT.tgz cd $SOLR_ROOT -cp log4j2.xml solr-10.0.0-SNAPSHOT/server/resources/log4j2.xml -cp modules/cuvs/lib/*.jar server/solr-webapp/webapp/WEB-INF/lib/ -bin/solr start -m 29G +cp modules/cuvs/lib/*.jar server/solr-webapp/webapp/WEB-INF/lib/ 2>/dev/null || true +if ! bin/solr start -m "$SOLR_HEAP"; then + echo "Error: Failed to start Solr" + exit 1 +fi cd .. -# Create collection with dynamically generated configset -(cd temp-configset && zip -r - *) | curl -X POST --header "Content-Type:application/octet-stream" --data-binary @- "$SOLR_URL/solr/admin/configs?action=UPLOAD&name=cuvs" -curl "$SOLR_URL/solr/admin/collections?action=CREATE&name=test&numShards=1&collection.configName=cuvs" -# Create results file with configuration object +if ! wait_for_solr "$SOLR_URL" 180; then + echo "Error: Solr did not start on $SOLR_URL after 180 seconds" + ps aux | grep -i solr | grep -v grep || echo "No Solr process found" + exit 1 +fi + +(cd temp-configset && zip -r - *) | curl -X POST --header "Content-Type:application/octet-stream" --data-binary @- "$SOLR_URL/solr/admin/configs?action=UPLOAD&name=cuvs" >/dev/null 2>&1 + +curl "$SOLR_URL/solr/admin/collections?action=CREATE&name=test&numShards=1&collection.configName=cuvs" >/dev/null 2>&1 + +start_metrics + python3 << EOF import json import os -# Ensure results directory exists os.makedirs("$RESULTS_DIR", exist_ok=True) -# Read the config file and put it as-is into the configuration section with open("$CONFIG_FILE", "r") as config_file: config_data = json.load(config_file) -# Create initial results file with configuration and metrics results = { "configuration": config_data, "metrics": {} } -# Read javabin preparation time if available javabin_time_file = "$JAVABIN_FILES_DIR" + "_preparation_time.txt" if os.path.exists(javabin_time_file): with open(javabin_time_file, "r") as f: javabin_prep_time = int(f.read().strip()) - results["metrics"]["javabin-preparation-time"] = javabin_prep_time + results["metrics"]["javabin-generation-time-ms"] = javabin_prep_time with open("$RESULTS_DIR/results.json", "w") as f: json.dump(results, f, indent=2) EOF -start_time=$(date +%s%N) # Record start time in nanoseconds +start_time=$(date +%s%N) -# Loop through each file in the directory and post it in batches using Python python3 << EOF -import subprocess from pathlib import Path -from concurrent.futures import ThreadPoolExecutor +from concurrent.futures import ThreadPoolExecutor, as_completed +import requests +import time files = sorted(Path("$JAVABIN_FILES_DIR").glob("*")) -def upload(f): - print(f"Uploading {f}...") - subprocess.run(["http", "--ignore-stdin", "POST", "$URL", "Content-Type:application/javabin", f"@{f}"]) - print(f"Completed {f}") +failed_files = [] +total_upload_time_ms = 0.0 + +def upload(f, max_retries=3): + for attempt in range(max_retries): + try: + if attempt > 0: + wait_time = min((attempt + 1) * 5, 30) + print(f"Uploading {f.name}... (attempt {attempt + 1}/{max_retries}, waiting {wait_time}s)", flush=True) + time.sleep(wait_time) + else: + print(f"Uploading {f.name}... 
+                print(f"Uploading {f.name}... (attempt {attempt + 1}/{max_retries})", flush=True)
+
+            batch_start = time.perf_counter()
+            with open(f, "rb") as fh:
+                r = requests.post(
+                    "$URL",
+                    data=fh,
+                    headers={"Content-Type": "application/javabin"},
+                    timeout=600,
+                )
+            r.raise_for_status()
+            batch_time = (time.perf_counter() - batch_start) * 1000
+
+            print(f"Completed {f.name} (size: {f.stat().st_size / 1024 / 1024:.1f}MB, time: {batch_time:.1f}ms)", flush=True)
+            time.sleep(0.5)
+            return batch_time
+        except requests.exceptions.HTTPError as e:
+            if e.response.status_code >= 500 and attempt < max_retries - 1:
+                continue
+            print(f"Failed to upload {f.name}: HTTP {e.response.status_code if hasattr(e, 'response') else 'unknown'}", flush=True)
+            return None
+        except requests.exceptions.ConnectionError as e:
+            if attempt < max_retries - 1:
+                continue
+            print(f"Failed to upload {f.name}: Connection error", flush=True)
+            return None
+        except Exception as e:
+            if attempt < max_retries - 1:
+                continue
+            print(f"Failed to upload {f.name}: {e}", flush=True)
+            return None
+    return None
 
 print(f"Starting batch upload with {$NPARALLEL} parallel workers")
 with ThreadPoolExecutor(max_workers=$NPARALLEL) as ex:
-    list(ex.map(upload, files))
-print("All uploads completed")
+    futures = {ex.submit(upload, f): f for f in files}
+    for future in as_completed(futures):
+        f = futures[future]
+        try:
+            batch_time = future.result()
+            if batch_time is None:
+                failed_files.append(f)
+            else:
+                total_upload_time_ms += batch_time
+        except Exception as e:
+            print(f"Exception for {f.name}: {e}", flush=True)
+            failed_files.append(f)
+
+if failed_files:
+    print(f"\nWarning: {len(failed_files)} files failed to upload:", flush=True)
+    for f in failed_files:
+        print(f"  - {f.name}", flush=True)
+    exit(1)
+else:
+    print("All uploads completed successfully", flush=True)
+    print(f"Total batch upload time: {total_upload_time_ms:.1f}ms", flush=True)
+
+    import json
+    with open("$RESULTS_DIR/upload_timing.json", "w") as f:
+        json.dump({"total_batch_upload_time_ms": total_upload_time_ms}, f, indent=2)
 EOF
-end_time=$(date +%s%N)  # Record end time in nanoseconds
+INDEXING_EXIT_CODE=$?
+
+if [ $INDEXING_EXIT_CODE -ne 0 ]; then
+  echo "Error: Indexing failed with exit code $INDEXING_EXIT_CODE"
+  exit $INDEXING_EXIT_CODE
+fi
+
+end_time=$(date +%s%N)
+duration=$(( (end_time - start_time) / 1000000 ))
 
-duration=$(( (end_time - start_time) / 1000000 ))  # Calculate duration in milliseconds
-echo "Execution time: $duration ms"
-echo "Done!"
+export RESULTS_DIR
+export DURATION=$duration
 
-fi  # End of skipIndexing check
+fi
+
+start_metrics
 
-# Run query benchmarks
-echo "Running query benchmarks..."
 python3 run_queries.py \
-    --ef-search $EF_SEARCH \
+    --ef-search-scale-factor $EF_SEARCH_SCALE_FACTOR \
     --warmup-queries $WARMUP_QUERIES \
     --num-queries $TOTAL_QUERIES \
     --query-file $QUERY_FILE \
@@ -276,35 +526,57 @@
     --vector-field $VECTOR_COL_NAME \
     --top-k $TOP_K \
     --output-file "$RESULTS_DIR/results.json"
+QUERY_EXIT_CODE=$?
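+# A non-zero exit from run_queries.py aborts the run below instead of being recorded as results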
+
+if [ $QUERY_EXIT_CODE -ne 0 ]; then
+  echo "Error: Query benchmarks failed with exit code $QUERY_EXIT_CODE"
+  exit $QUERY_EXIT_CODE
+fi
 
-# Add indexing time to results.json only if indexing was performed
 if [ "$SKIP_INDEXING" = "false" ]; then
-    python3 -c "
+    python3 << 'EOF'
 import json
-with open('$RESULTS_DIR/results.json', 'r+') as f:
+import os
+
+results_dir = os.environ.get('RESULTS_DIR')
+duration = int(os.environ.get('DURATION', '0'))
+
+with open(f'{results_dir}/results.json', 'r+') as f:
     results = json.load(f)
     if 'metrics' not in results:
         results['metrics'] = {}
-    results['metrics']['cuvs-indexing-time'] = $duration
+
+    results['metrics']['solr-total-indexing-time-ms'] = duration
+
+    upload_timing_file = f'{results_dir}/upload_timing.json'
+    if os.path.exists(upload_timing_file):
+        with open(upload_timing_file, 'r') as timing_file:
+            timing_data = json.load(timing_file)
+            results['metrics']['solr-batch-upload-time-ms'] = timing_data['total_batch_upload_time_ms']
+
     f.seek(0)
     json.dump(results, f, indent=2)
     f.truncate()
-"
+EOF
+    METRICS_EXIT_CODE=$?
+    if [ $METRICS_EXIT_CODE -ne 0 ]; then
+        echo "Warning: Failed to add timing metrics to results.json (non-fatal)"
+    fi
+fi
+
+if [ -n "${METRICS_PID:-}" ]; then
+    kill -TERM $METRICS_PID 2>/dev/null || true
+    wait $METRICS_PID 2>/dev/null || true
 fi
 
-# Cleanup
 rm -rf temp-configset
 
-echo "DEBUG: About to check CLEAN_INDEX_DIRECTORY condition: '$CLEAN_INDEX_DIRECTORY'"
 if [ "$CLEAN_INDEX_DIRECTORY" = "true" ]; then
-    echo "DEBUG: CLEAN_INDEX_DIRECTORY is true, stopping and cleaning Solr"
    cd $SOLR_ROOT
    bin/solr stop -p 8983
    cd ..
    rm -rf $SOLR_ROOT
-    echo "Stopped Solr and cleaned it up..."
-else
-    echo "DEBUG: CLEAN_INDEX_DIRECTORY is false, preserving Solr for reuse"
 fi
 
 wait
+exit 0
diff --git a/solr-setup.sh b/solr-setup.sh
index e6daea3..a14a924 100755
--- a/solr-setup.sh
+++ b/solr-setup.sh
@@ -88,9 +88,8 @@ ALGORITHM_PARAMS=$(jq -r ".[\"$DATASET_NAME\"].algorithms[\"$ALGORITHM_NAME\"]"
 
 # Extract specific algorithm parameters with defaults
 if [ "$ALGORITHM_NAME" = "cagra_hnsw" ]; then
     CUVS_WRITER_THREADS=$(echo "$ALGORITHM_PARAMS" | jq -r '.cuvsWriterThreads // 16')
-    # Use first values from arrays for setup
-    INT_GRAPH_DEGREE=$(echo "$ALGORITHM_PARAMS" | jq -r '.cagraIntermediateGraphDegree[0] // 128')
-    GRAPH_DEGREE=$(echo "$ALGORITHM_PARAMS" | jq -r '.cagraGraphDegree[0] // 64')
+    INT_GRAPH_DEGREE=$(echo "$ALGORITHM_PARAMS" | jq -r 'if .cagraIntermediateGraphDegree | type == "array" then .cagraIntermediateGraphDegree[0] else .cagraIntermediateGraphDegree end // 128')
+    GRAPH_DEGREE=$(echo "$ALGORITHM_PARAMS" | jq -r 'if .cagraGraphDegree | type == "array" then .cagraGraphDegree[0] else .cagraGraphDegree end // 64')
     HNSW_LAYERS=$(echo "$ALGORITHM_PARAMS" | jq -r '.cagraHnswLayers // 1')
 fi
@@ -108,7 +107,7 @@
 JFG_GITHUB_URL="https://github.com/SearchScale/solr-javabin-generator.git"
 SOLR_DIR="solr"
 SOLR_GITHUB_REPO="https://github.com/apache/solr.git"
 SOLR_CUVS_MODULE_BRANCH="main"
-SOLR_ROOT=solr-10.0.0-SNAPSHOT
+SOLR_ROOT=solr-11.0.0-SNAPSHOT
 JAVABIN_FILES_DIR="${DATASET_FROM_SWEEP}_batches"
 SOLR_URL="http://localhost:8983"
 URL="$SOLR_URL/solr/test/update?commit=true&overwrite=false"
@@ -143,15 +142,13 @@ if [ ! -d "$SOLR_DIR" ]; then
     cd ..
 fi
 
-# Use the javabin file generator to generate javabin files
 rm -rf $JAVABIN_FILES_DIR
-d "$JAVABIN_FILES_DIR" ]; then - javabin_start_time=$(date +%s%N) # Record start time in nanoseconds + javabin_start_time=$(date +%s%N) java -jar $JFG_DIR/target/javabin-generator-1.0-SNAPSHOT-jar-with-dependencies.jar data_file=$DATASET_FILE output_dir=$JAVABIN_FILES_DIR batch_size=$BATCH_SIZE docs_count=$DOCS_COUNT threads=all - javabin_end_time=$(date +%s%N) # Record end time in nanoseconds - javabin_duration=$(( (javabin_end_time - javabin_start_time) / 1000000 )) # Calculate duration in milliseconds + javabin_end_time=$(date +%s%N) + javabin_duration=$(( (javabin_end_time - javabin_start_time) / 1000000 )) echo "JavaBin preparation time: $javabin_duration ms" - # Store the timing in a file for the benchmark script to read echo "$javabin_duration" > ${JAVABIN_FILES_DIR}_preparation_time.txt fi diff --git a/solr-sweeps.json b/solr-sweeps.json index 109a17e..c10b5d4 100644 --- a/solr-sweeps.json +++ b/solr-sweeps.json @@ -7,11 +7,11 @@ "numWarmUpQueries": 20, "batchSize": 500000, "topK": 100, - "efSearch": [ - 800, - 600, - 400, - 200 + "efSearchScaleFactor": [ + 8.0, + 6.0, + 4.0, + 2.0 ], "vectorColName": "article_vector", "cleanIndexDirectory": true diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java index 072a112..297be4c 100644 --- a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java +++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/LuceneCuvsBenchmarks.java @@ -10,9 +10,7 @@ import java.nio.file.Files; import java.nio.file.Path; import java.nio.file.Paths; -import java.text.SimpleDateFormat; import java.util.ArrayList; -import java.util.Calendar; import java.util.Collections; import java.util.LinkedHashMap; import java.util.List; @@ -62,7 +60,7 @@ import org.slf4j.LoggerFactory; import com.nvidia.cuvs.lucene.GPUKnnFloatVectorQuery; -import com.nvidia.cuvs.lucene.Lucene101AcceleratedHNSWCodec; +import com.nvidia.cuvs.lucene.Lucene99AcceleratedHNSWVectorsFormat; public class LuceneCuvsBenchmarks { @@ -80,7 +78,7 @@ private static void setPerThreadRAMLimit(IndexWriterConfig config, int limitMB) // First, try to find the field in IndexWriterConfig java.lang.reflect.Field field = null; Class clazz = config.getClass(); - + // Try to find the field in the class hierarchy while (clazz != null && field == null) { try { @@ -90,7 +88,7 @@ private static void setPerThreadRAMLimit(IndexWriterConfig config, int limitMB) clazz = clazz.getSuperclass(); } } - + if (field == null) { // If not found in IndexWriterConfig, try LiveIndexWriterConfig clazz = config.getClass().getSuperclass(); @@ -102,7 +100,7 @@ private static void setPerThreadRAMLimit(IndexWriterConfig config, int limitMB) } } } - + if (field != null) { field.setAccessible(true); field.setInt(config, limitMB); @@ -124,12 +122,12 @@ public static void main(String[] args) throws Throwable { } BenchmarkConfiguration config = Util.newObjectMapper().readValue(new File(args[0]), BenchmarkConfiguration.class); - + // Override benchmarkID if provided as command line argument if (args.length >= 2) { config.benchmarkID = args[1]; } - + // Override resultsDirectory if provided as command line argument if (args.length >= 3) { config.resultsDirectory = args[2]; @@ -138,6 +136,9 @@ public static void main(String[] args) throws Throwable { List queryResults = Collections.synchronizedList(new ArrayList()); config.debugPrintArguments(); + MetricsCollector metricsCollector = new MetricsCollector(); 
+    metricsCollector.start();
+
     // [0] Pre-check
     Util.preCheck(config);
@@ -228,8 +229,8 @@
     luceneHNSWWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
     // Use reflection to bypass the 2048MB per-thread limit and set it to 10GB
     setPerThreadRAMLimit(luceneHNSWWriterConfig, 10240); // 10GB per thread
-    log.info("Configured HNSW writer - MaxBufferedDocs: {}, RAMBufferSizeMB: {}, PerThreadRAMLimit: {} MB", 
-        config.flushFreq, luceneHNSWWriterConfig.getRAMBufferSizeMB(), 
+    log.info("Configured HNSW writer - MaxBufferedDocs: {}, RAMBufferSizeMB: {}, PerThreadRAMLimit: {} MB",
+        config.flushFreq, luceneHNSWWriterConfig.getRAMBufferSizeMB(),
         luceneHNSWWriterConfig.getRAMPerThreadHardLimitMB());
 
     IndexWriterConfig cuvsIndexWriterConfig = new IndexWriterConfig(new StandardAnalyzer());
@@ -244,7 +245,7 @@
     cuvsIndexWriterConfig.setMergePolicy(NoMergePolicy.INSTANCE);
     // Use reflection to bypass the 2048MB per-thread limit and set it to 10GB
     setPerThreadRAMLimit(cuvsIndexWriterConfig, 10240); // 10GB per thread
-    log.info("Configured CuVS writer - MaxBufferedDocs: {}, RAMBufferSizeMB: {}, PerThreadRAMLimit: {} MB", 
+    log.info("Configured CuVS writer - MaxBufferedDocs: {}, RAMBufferSizeMB: {}, PerThreadRAMLimit: {} MB",
         config.flushFreq, cuvsIndexWriterConfig.getRAMBufferSizeMB(),
         cuvsIndexWriterConfig.getRAMPerThreadHardLimitMB());
@@ -259,8 +260,8 @@
     IndexWriter luceneHnswIndexWriter = null;
     IndexWriter cuvsIndexWriter = null;
-    
-    
+
+
     if (config.algoToRun.equalsIgnoreCase("LUCENE_HNSW")) {
       if (!config.createIndexInMemory) {
         Path hnswIndex = Path.of(config.hnswIndexDirPath);
@@ -289,17 +290,17 @@
       }
 
       var formatName = writer.getConfig().getCodec().knnVectorsFormat().getName();
-      
-      boolean isCuVSIndexing = formatName.equals("Lucene99AcceleratedHNSWVectorsFormat");
+
+      boolean isCuVSIndexing = "CAGRA_HNSW".equalsIgnoreCase(config.algoToRun);
       log.info("Indexing documents using {} ...", formatName);
       long indexStartTime = System.currentTimeMillis();
-      indexDocuments(writer, config, titles, vectorProvider);
+      indexDocuments(writer, config, titles, vectorProvider, metrics, isCuVSIndexing);
       long indexTimeTaken = System.currentTimeMillis() - indexStartTime;
       if (isCuVSIndexing) {
-        metrics.put("cuvs-indexing-time", indexTimeTaken);
+        metrics.put("cuvs-total-time-ms", indexTimeTaken);
       } else {
-        metrics.put("hnsw-indexing-time", indexTimeTaken);
+        metrics.put("hnsw-total-time-ms", indexTimeTaken);
       }
       log.info("Time taken for index building (end to end): {} ms", indexTimeTaken);
@@ -329,7 +330,7 @@
             writer == cuvsIndexWriter ? config.cuvsIndexDirPath : config.hnswIndexDirPath, e);
       }
     }
-
+
     Directory indexDir = MMapDirectory.open("CAGRA_HNSW".equals(config.algoToRun) ? Path.of(config.cuvsIndexDirPath) : Path.of(config.hnswIndexDirPath));
     log.info("Index directory is: {} (using memory-mapped files)", indexDir);
     log.info("Querying documents using {} ...", config.algoToRun);
@@ -339,34 +340,35 @@
     Util.calculateRecallAccuracy(queryResults, metrics, "CAGRA_HNSW".equalsIgnoreCase(config.algoToRun));
 
+    metricsCollector.stop();
+
     String resultsJson = Util.newObjectMapper().writerWithDefaultPrettyPrinter()
         .writeValueAsString(Map.of("configuration", config, "metrics", metrics));
 
     if (config.saveResultsOnDisk) {
-      // Use the resultsDirectory directly if provided
       String resultsDir = config.resultsDirectory != null ? config.resultsDirectory : "results";
       File results = new File(resultsDir);
       if (!results.exists()) {
         results.mkdirs();
       }
-      // Save results.json directly to the specified directory
       FileUtils.write(
           new File(results.toString() + "/results.json"), resultsJson, Charset.forName("UTF-8"));
-
-      // Save CSV with neighbors data
       Util.writeCSV(queryResults, results.toString() + "/neighbors.csv");
-
+
+      try {
+        metricsCollector.writeToFiles(resultsDir);
+      } catch (IOException e) {
+        log.error("Failed to write metrics files", e);
+      }
+
       log.info("Results saved to directory: {}", resultsDir);
     }
 
     log.info("\n-----\nOverall metrics: " + metrics + "\nMetrics: \n" + resultsJson + "\n-----");
-
-    // Close the index directory before cleaning
+
     indexDir.close();
-
-    // Clean index directory after benchmarks complete if requested
     if (config.cleanIndexDirectory && !config.createIndexInMemory) {
       Path indexPath = null;
       if (config.algoToRun.equalsIgnoreCase("LUCENE_HNSW")) {
@@ -374,7 +376,7 @@
       } else if (config.algoToRun.equalsIgnoreCase("CAGRA_HNSW")) {
         indexPath = Path.of(config.cuvsIndexDirPath);
       }
-
+
       if (indexPath != null) {
         try {
           log.info("Cleaning index directory: {}", indexPath);
@@ -386,6 +388,9 @@
         }
       }
     } finally {
+      if (metricsCollector != null) {
+        metricsCollector.stop();
+      }
       if (vectorProvider != null) {
         vectorProvider.close();
       }
@@ -393,16 +398,18 @@
   }
 
   private static void indexDocuments(IndexWriter writer, BenchmarkConfiguration config, List<String> titles,
-      VectorProvider vectorProvider) throws IOException, InterruptedException {
+      VectorProvider vectorProvider, Map<String, Object> metrics, boolean isCuVSIndexing) throws IOException, InterruptedException {
     int threads = config.numIndexThreads;
     ExecutorService pool = Executors.newFixedThreadPool(threads);
     AtomicInteger numDocsIndexed = new AtomicInteger(0);
     log.info("Starting indexing with {} threads.", threads);
-    log.info("IndexWriter config - MaxBufferedDocs: {}, RAMBufferSizeMB: {}", 
+    log.info("IndexWriter config - MaxBufferedDocs: {}, RAMBufferSizeMB: {}",
         writer.getConfig().getMaxBufferedDocs(), writer.getConfig().getRAMBufferSizeMB());
 
     final int numDocsToIndex = Math.min(config.numDocs, vectorProvider.size());
 
+    long indexingStartTime = System.nanoTime();
+
     for (int i = 0; i < threads; i++) {
       pool.submit(() -> {
         int localCount = 0;
@@ -441,16 +448,33 @@
     pool.shutdown();
     pool.awaitTermination(Long.MAX_VALUE, TimeUnit.SECONDS);
 
-    // log.info("Calling forceMerge(1).");
-    // writer.forceMerge(1);
+    long indexingEndTime = System.nanoTime();
+    long indexingTimeMs = TimeUnit.NANOSECONDS.toMillis(indexingEndTime - indexingStartTime);
+
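+    // addDocument() work ends here; the commit below is what the *-serialization-time-ms metrics measure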
log.info("Calling commit."); + long serializationStartTime = System.nanoTime(); writer.commit(); + long serializationEndTime = System.nanoTime(); + long serializationTimeMs = TimeUnit.NANOSECONDS.toMillis(serializationEndTime - serializationStartTime); + writer.close(); + + if (isCuVSIndexing) { + metrics.put("cuvs-indexing-time-ms", indexingTimeMs); + metrics.put("cuvs-serialization-time-ms", serializationTimeMs); + log.info("CuVS indexing time (IndexWriter.addDocument): {} ms", indexingTimeMs); + log.info("CuVS serialization time (IndexWriter.commit): {} ms", serializationTimeMs); + } else { + metrics.put("hnsw-indexing-time-ms", indexingTimeMs); + metrics.put("hnsw-serialization-time-ms", serializationTimeMs); + log.info("HNSW indexing time (IndexWriter.addDocument): {} ms", indexingTimeMs); + log.info("HNSW serialization time (IndexWriter.commit): {} ms", serializationTimeMs); + } } private static void search(Directory directory, BenchmarkConfiguration config, boolean useCuVS, Map metrics, List queryResults, List groundTruth) { - + DB db = null; try (IndexReader indexReader = DirectoryReader.open(directory)) { IndexSearcher indexSearcher = new IndexSearcher(indexReader); @@ -504,7 +528,7 @@ private static void search(Directory directory, BenchmarkConfiguration config, b config.cagraSearchWidth); } else { int effectiveEfSearch = config.getEffectiveEfSearch(); - query = new KnnFloatVectorQuery(config.vectorColName, queryVector, effectiveEfSearch); + query = new KnnFloatVectorQuery(config.vectorColName, queryVector, effectiveEfSearch); } TopDocs topDocs; @@ -553,8 +577,8 @@ private static void search(Directory directory, BenchmarkConfiguration config, b double retrievalTimeTakenMs = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - retrievalStartTime); if (currentQueryId > config.numWarmUpQueries) { retrievalLatencies.put(queryId.get(), retrievalTimeTakenMs); - } - + } + // Debug: Log results for all queries log.info("Query " + currentQueryId + " - First 5 neighbors: " + neighbors.subList(0, Math.min(5, neighbors.size()))); log.info("Query " + currentQueryId + " - First 5 distances: " + scores.subList(0, Math.min(5, scores.size()))); @@ -564,7 +588,7 @@ private static void search(Directory directory, BenchmarkConfiguration config, b var s = useCuVS ? "lucene_cuvs" : "lucene_hnsw"; if (currentQueryId > config.numWarmUpQueries) { QueryResult result = new QueryResult(s, currentQueryId, neighbors, groundTruth.get(currentQueryId), scores, - searchTimeTakenMs); + searchTimeTakenMs); queryResults.add(result); } else { log.info("Skipping warmup query: {}", currentQueryId); @@ -616,15 +640,26 @@ public KnnVectorsFormat getKnnVectorsFormatForField(String field) { } private static Codec getCuVSCodec(BenchmarkConfiguration config) { - // Use Lucene101AcceleratedHNSWCodec with configurable parameters + // Use Lucene99AcceleratedHNSWVectorsFormat wrapped in a Codec // Constructor signature: (cuvsWriterThreads, intGraphDegree, graphDegree, hnswLayers, maxConn, beamWidth) - return new Lucene101AcceleratedHNSWCodec( - config.cuvsWriterThreads, - config.cagraIntermediateGraphDegree, - config.cagraGraphDegree, - config.cagraHnswLayers, - config.hnswMaxConn, - config.hnswBeamWidth); + // Use defaults for hnswMaxConn and hnswBeamWidth if not set (needed for HNSW graph construction) + int maxConn = config.hnswMaxConn > 0 ? config.hnswMaxConn : 16; + int beamWidth = config.hnswBeamWidth > 0 ? config.hnswBeamWidth : 100; + int hnswLayers = config.cagraHnswLayers > 0 ? 
+    int hnswLayers = config.cagraHnswLayers > 0 ? config.cagraHnswLayers : 1;
+
+    return new Lucene101Codec(Mode.BEST_SPEED) {
+      @Override
+      public KnnVectorsFormat getKnnVectorsFormatForField(String field) {
+        KnnVectorsFormat knnFormat = new Lucene99AcceleratedHNSWVectorsFormat(
+            config.cuvsWriterThreads,
+            config.cagraIntermediateGraphDegree,
+            config.cagraGraphDegree,
+            hnswLayers,
+            maxConn,
+            beamWidth);
+        return new HighDimensionKnnVectorsFormat(knnFormat, config.vectorDimension);
+      }
+    };
   }
 
   // Removed ConfigurableCuVSCodec - using CuVSCPUSearchCodec directly with better error handling
diff --git a/src/main/java/com/searchscale/lucene/cuvs/benchmarks/MetricsCollector.java b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/MetricsCollector.java
new file mode 100644
index 0000000..65516f3
--- /dev/null
+++ b/src/main/java/com/searchscale/lucene/cuvs/benchmarks/MetricsCollector.java
@@ -0,0 +1,153 @@
+package com.searchscale.lucene.cuvs.benchmarks;
+
+import java.io.File;
+import java.io.IOException;
+import java.lang.management.ManagementFactory;
+import java.lang.management.MemoryMXBean;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.sun.management.OperatingSystemMXBean;
+
+import org.apache.commons.io.FileUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class MetricsCollector {
+  private static final Logger log = LoggerFactory.getLogger(MetricsCollector.class);
+  private static final int SAMPLE_INTERVAL_MS = 2000;
+
+  private final AtomicBoolean running = new AtomicBoolean(false);
+  private Thread collectorThread;
+  private final List<MemorySample> memorySamples = new ArrayList<>();
+  private final List<CpuSample> cpuSamples = new ArrayList<>();
+  private final MemoryMXBean memoryBean = ManagementFactory.getMemoryMXBean();
+  private final OperatingSystemMXBean osBean =
+      (OperatingSystemMXBean) ManagementFactory.getOperatingSystemMXBean();
+  private final long startTime = System.currentTimeMillis();
+
+  public void start() {
+    if (running.get()) {
+      return;
+    }
+
+    running.set(true);
+    collectorThread = new Thread(() -> {
+      while (running.get()) {
+        try {
+          collectSample();
+          Thread.sleep(SAMPLE_INTERVAL_MS);
+        } catch (InterruptedException e) {
+          Thread.currentThread().interrupt();
+          break;
+        } catch (Exception e) {
+          log.warn("Error collecting metrics", e);
+        }
+      }
+    });
+
+    collectorThread.setDaemon(true);
+    collectorThread.start();
+    log.info("Started metrics collection");
+  }
+
+  public void stop() {
+    if (!running.get()) {
+      return;
+    }
+
+    running.set(false);
+    if (collectorThread != null) {
+      collectorThread.interrupt();
+      try {
+        collectorThread.join(3000);
+      } catch (InterruptedException e) {
+        Thread.currentThread().interrupt();
+      }
+    }
+
+    log.info("Collected {} memory samples, {} CPU samples",
+        memorySamples.size(), cpuSamples.size());
+  }
+
+  private void collectSample() {
+    long elapsed = System.currentTimeMillis() - startTime;
+
+    long heapUsed = memoryBean.getHeapMemoryUsage().getUsed();
+    long heapMax = memoryBean.getHeapMemoryUsage().getMax();
+    long nonHeapUsed = memoryBean.getNonHeapMemoryUsage().getUsed();
+    memorySamples.add(new MemorySample(elapsed, heapUsed, heapMax, nonHeapUsed));
+
+    double processCpu = osBean.getProcessCpuLoad() * 100.0;
+    double systemCpu = osBean.getSystemCpuLoad() * 100.0;
+    if (Double.isNaN(processCpu)) processCpu = 0.0;
+    if (Double.isNaN(systemCpu)) systemCpu = 0.0;
+    cpuSamples.add(new CpuSample(elapsed, processCpu, systemCpu));
+  }
+
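+  /** Writes memory_metrics.json and cpu_metrics.json into the given results directory. */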
+  public void writeToFiles(String resultsDir) throws IOException {
+    File dir = new File(resultsDir);
+    if (!dir.exists()) {
+      dir.mkdirs();
+    }
+
+    ObjectMapper mapper = Util.newObjectMapper();
+
+    File memoryFile = new File(dir, "memory_metrics.json");
+    String memoryJson = mapper.writerWithDefaultPrettyPrinter()
+        .writeValueAsString(new MemoryMetrics(memorySamples));
+    FileUtils.write(memoryFile, memoryJson, "UTF-8");
+
+    File cpuFile = new File(dir, "cpu_metrics.json");
+    String cpuJson = mapper.writerWithDefaultPrettyPrinter()
+        .writeValueAsString(new CpuMetrics(cpuSamples));
+    FileUtils.write(cpuFile, cpuJson, "UTF-8");
+
+    log.info("Metrics written to {}", dir);
+  }
+
+  static class MemorySample {
+    public long timestamp;
+    public long heapUsed;
+    public long heapMax;
+    public long offHeapUsed;
+
+    public MemorySample(long timestamp, long heapUsed, long heapMax, long offHeapUsed) {
+      this.timestamp = timestamp;
+      this.heapUsed = heapUsed;
+      this.heapMax = heapMax;
+      this.offHeapUsed = offHeapUsed;
+    }
+  }
+
+  static class CpuSample {
+    public long timestamp;
+    public double cpuUsagePercent;
+    public double systemCpuUsagePercent;
+
+    public CpuSample(long timestamp, double cpuUsagePercent, double systemCpuUsagePercent) {
+      this.timestamp = timestamp;
+      this.cpuUsagePercent = cpuUsagePercent;
+      this.systemCpuUsagePercent = systemCpuUsagePercent;
+    }
+  }
+
+  static class MemoryMetrics {
+    public List<MemorySample> memory_samples;
+
+    public MemoryMetrics(List<MemorySample> samples) {
+      this.memory_samples = samples;
+    }
+  }
+
+  static class CpuMetrics {
+    public List<CpuSample> cpu_samples;
+
+    public CpuMetrics(List<CpuSample> samples) {
+      this.cpu_samples = samples;
+    }
+  }
+}
diff --git a/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec b/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec
deleted file mode 100644
index 1b1f3a6..0000000
--- a/src/main/resources/META-INF/services/org.apache.lucene.codecs.Codec
+++ /dev/null
@@ -1,3 +0,0 @@
-com.nvidia.cuvs.lucene.CuVS2510GPUSearchCodec
-com.nvidia.cuvs.lucene.Lucene101AcceleratedHNSWCodec
-org.apache.lucene.codecs.lucene101.Lucene101Codec
\ No newline at end of file
diff --git a/web-ui-new/index.html b/web-ui/index.html
similarity index 100%
rename from web-ui-new/index.html
rename to web-ui/index.html
diff --git a/web-ui-new/js/dashboard.js b/web-ui/js/dashboard.js
similarity index 98%
rename from web-ui-new/js/dashboard.js
rename to web-ui/js/dashboard.js
index b13f579..a25ab4f 100644
--- a/web-ui-new/js/dashboard.js
+++ b/web-ui/js/dashboard.js
@@ -181,7 +181,8 @@
     const lines = summaryText.split('\n');
 
     for (const line of lines) {
-      const match = line.match(/^([\w-]+\/[\w-]+):/);
"wiki10m/cagra_hnsw-b7fa14d3-efs2.0:") + const match = line.match(/^([^:]+):/); if (match) { const configPath = match[1].trim(); configs.push(configPath); @@ -205,23 +206,26 @@ class BenchmarkDashboard { if (metrics['recall-accuracy'] !== undefined) { recallKey = 'recall-accuracy'; - indexingTimeKey = 'cuvs-indexing-time'; + indexingTimeKey = 'solr-total-indexing-time-ms'; indexSizeKey = 'cuvs-index-size'; meanLatencyKey = 'mean-latency'; } else if (algoType === 'CAGRA_HNSW' || algoType === 'cagra_hnsw') { recallKey = 'cuvs-recall-accuracy'; - indexingTimeKey = 'cuvs-indexing-time'; + indexingTimeKey = 'cuvs-total-time-ms'; indexSizeKey = 'cuvs-index-size'; meanLatencyKey = 'hnsw-mean-latency'; } else if (algoType === 'LUCENE_HNSW' || algoType === 'hnsw') { recallKey = 'hnsw-recall-accuracy'; - indexingTimeKey = 'hnsw-indexing-time'; + indexingTimeKey = 'hnsw-total-time-ms'; indexSizeKey = 'hnsw-index-size'; meanLatencyKey = 'hnsw-mean-latency'; } else { recallKey = metrics['recall-accuracy'] !== undefined ? 'recall-accuracy' : metrics['cuvs-recall-accuracy'] !== undefined ? 'cuvs-recall-accuracy' : 'hnsw-recall-accuracy'; - indexingTimeKey = metrics['cuvs-indexing-time'] !== undefined ? 'cuvs-indexing-time' : 'hnsw-indexing-time'; + indexingTimeKey = metrics['cuvs-total-time-ms'] !== undefined ? 'cuvs-total-time-ms' : + metrics['hnsw-total-time-ms'] !== undefined ? 'hnsw-total-time-ms' : + metrics['solr-total-indexing-time-ms'] !== undefined ? 'solr-total-indexing-time-ms' : + metrics['cuvs-indexing-time'] !== undefined ? 'cuvs-indexing-time' : 'hnsw-indexing-time'; indexSizeKey = metrics['cuvs-index-size'] !== undefined ? 'cuvs-index-size' : 'hnsw-index-size'; meanLatencyKey = metrics['mean-latency'] !== undefined ? 'mean-latency' : 'hnsw-mean-latency'; } @@ -824,6 +828,7 @@ class BenchmarkDashboard { + `; diff --git a/web-ui-new/results b/web-ui/results similarity index 100% rename from web-ui-new/results rename to web-ui/results