engine/base_client/search.py: 121 changes (92 additions, 29 deletions)
@@ -164,13 +164,26 @@ def cycling_query_generator(queries, total_count):

             # Process queries with progress updates
             results = []
+            total_insert_count = 0
+            total_search_count = 0
+            all_insert_latencies = []
+            all_search_latencies = []

             for query in used_queries:
-                results.append(search_one(query))
+                if random.random() < insert_fraction:
+                    precision, latency = insert_one(query)
+                    total_insert_count += 1
+                    all_insert_latencies.append(latency)
+                    results.append(('insert', precision, latency))
+                else:
+                    precision, latency = search_one(query)
+                    total_search_count += 1
+                    all_search_latencies.append(latency)
+                    results.append(('search', precision, latency))
                 pbar.update(1)

             # Close the progress bar
             pbar.close()

             total_time = time.perf_counter() - start
         else:
             # Dynamically calculate chunk size based on total_query_count
@@ -206,10 +219,19 @@ def cycling_query_generator(queries, total_count):

             # Collect results from all worker processes
             results = []
+            total_insert_count = 0
+            total_search_count = 0
+            all_insert_latencies = []
+            all_search_latencies = []
             min_start_time = time.perf_counter()

             for _ in processes:
-                proc_start_time, chunk_results = result_queue.get()
+                proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
                 results.extend(chunk_results)
+                total_insert_count += insert_count
+                total_search_count += search_count
+                all_insert_latencies.extend(insert_latencies)
+                all_search_latencies.extend(search_latencies)

                 # Update min_start_time if necessary
                 if proc_start_time < min_start_time:
@@ -222,24 +244,53 @@ def cycling_query_generator(queries, total_count):
             for process in processes:
                 process.join()

-        # Extract precisions and latencies (outside the timed section)
-        precisions, latencies = zip(*results)
+        # Extract overall precisions and latencies
+        all_precisions = [result[1] for result in results]
+        all_latencies = [result[2] for result in results]
+
+        # Calculate search-only precisions (exclude inserts from precision calculation)
+        search_precisions = [result[1] for result in results if result[0] == 'search']

         self.__class__.delete_client()

         return {
+            # Overall metrics
             "total_time": total_time,
-            "mean_time": np.mean(latencies),
-            "mean_precisions": np.mean(precisions),
-            "std_time": np.std(latencies),
-            "min_time": np.min(latencies),
-            "max_time": np.max(latencies),
-            "rps": len(latencies) / total_time,
-            "p50_time": np.percentile(latencies, 50),
-            "p95_time": np.percentile(latencies, 95),
-            "p99_time": np.percentile(latencies, 99),
-            "precisions": precisions,
-            "latencies": latencies,
+            "total_operations": len(all_latencies),
+            "rps": len(all_latencies) / total_time,
+
+            # Search metrics
+            "search_count": total_search_count,
+            "search_rps": total_search_count / total_time if total_search_count > 0 else 0,
+            "mean_search_time": np.mean(all_search_latencies) if all_search_latencies else 0,
+            "mean_search_precision": np.mean(search_precisions) if search_precisions else 0,
+            "p50_search_time": np.percentile(all_search_latencies, 50) if all_search_latencies else 0,
+            "p95_search_time": np.percentile(all_search_latencies, 95) if all_search_latencies else 0,
+            "p99_search_time": np.percentile(all_search_latencies, 99) if all_search_latencies else 0,
+
+            # Insert metrics
+            "insert_count": total_insert_count,
+            "insert_rps": total_insert_count / total_time if total_insert_count > 0 else 0,
+            "mean_insert_time": np.mean(all_insert_latencies) if all_insert_latencies else 0,
+            "p50_insert_time": np.percentile(all_insert_latencies, 50) if all_insert_latencies else 0,
+            "p95_insert_time": np.percentile(all_insert_latencies, 95) if all_insert_latencies else 0,
+            "p99_insert_time": np.percentile(all_insert_latencies, 99) if all_insert_latencies else 0,
+
+            # Mixed workload metrics
+            "actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0,
+            "target_insert_fraction": insert_fraction,
+
+            # Legacy compatibility (for existing code that expects these)
+            "mean_time": np.mean(all_latencies),
+            "mean_precisions": np.mean(search_precisions) if search_precisions else 1.0,  # Only search precisions
+            "std_time": np.std(all_latencies),
+            "min_time": np.min(all_latencies),
+            "max_time": np.max(all_latencies),
+            "p50_time": np.percentile(all_latencies, 50),
+            "p95_time": np.percentile(all_latencies, 95),
+            "p99_time": np.percentile(all_latencies, 99),
+            "precisions": search_precisions,  # Only search precisions
+            "latencies": all_latencies,
         }

     def setup_search(self):
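For context, a sketch of how a consumer might read the reworked metrics dict; the `searcher` call site and the printed fields' formatting below are illustrative assumptions, not part of this PR:

    metrics = searcher.search_all(distance, queries)  # assumed call site

    # Throughput now counts both operation types.
    print(f"{metrics['total_operations']} ops @ {metrics['rps']:.1f} ops/s")

    # Search and insert latencies are reported separately, so slow inserts
    # no longer skew search percentiles.
    print(f"search p99: {metrics['p99_search_time']:.4f}s")
    print(f"insert p99: {metrics['p99_insert_time']:.4f}s")

    # The realized mix should approach the requested insert fraction.
    drift = abs(metrics['actual_insert_fraction'] - metrics['target_insert_fraction'])
    print(f"insert-mix drift: {drift:.3f}")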
Expand All @@ -259,6 +310,27 @@ def chunked_iterable(iterable, size):
while chunk := list(islice(it, size)):
yield chunk

def process_chunk(chunk, search_one, insert_one, insert_fraction):
results = []
insert_count = 0
search_count = 0
insert_latencies = []
search_latencies = []

for i, query in enumerate(chunk):
if random.random() < insert_fraction:
precision, latency = insert_one(query)
insert_count += 1
insert_latencies.append(latency)
results.append(('insert', precision, latency))
else:
precision, latency = search_one(query)
search_count += 1
search_latencies.append(latency)
results.append(('search', precision, latency))

return results, insert_count, search_count, insert_latencies, search_latencies

# Function to be executed by each worker process
def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
self.init_client(
@@ -270,16 +342,7 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
     self.setup_search()

     start_time = time.perf_counter()
-    results = process_chunk(chunk, search_one, insert_one, insert_fraction)
-    result_queue.put((start_time, results))
-
-
-def process_chunk(chunk, search_one, insert_one, insert_fraction):
-    results = []
-    for i, query in enumerate(chunk):
-        if random.random() < insert_fraction:
-            result = insert_one(query)
-        else:
-            result = search_one(query)
-        results.append(result)
-    return results
+    results, insert_count, search_count, insert_latencies, search_latencies = process_chunk(
+        chunk, search_one, insert_one, insert_fraction
+    )
+    result_queue.put((start_time, results, insert_count, search_count, insert_latencies, search_latencies))
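To sanity-check the mixing logic in isolation, a minimal sketch with stub callables; it assumes process_chunk as defined in this patch is importable and that the module imports random, and the stub latencies are illustrative:

    def stub_insert_one(query):
        # Mimics insert_one's (precision, latency) return contract.
        return 1.0, 0.002

    def stub_search_one(query):
        # Mimics search_one's (precision, latency) return contract.
        return 0.95, 0.001

    results, ins, srch, ins_lat, srch_lat = process_chunk(
        list(range(10_000)), stub_search_one, stub_insert_one, insert_fraction=0.1
    )

    # Each query independently becomes an insert with probability insert_fraction,
    # so the realized mix converges on the target for large chunks.
    print(ins / (ins + srch))        # ~0.10
    print(len(results) == 10_000)    # one tagged (kind, precision, latency) tuple per query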