
Commit 0b9f3ee

Merge pull request #42 from mpdimitr/mixed-workload

exposed metrics for inserts

2 parents 6a6707f + bc7074d

1 file changed: 92 additions, 29 deletions


engine/base_client/search.py

@@ -164,13 +164,26 @@ def cycling_query_generator(queries, total_count):
 
             # Process queries with progress updates
             results = []
+            total_insert_count = 0
+            total_search_count = 0
+            all_insert_latencies = []
+            all_search_latencies = []
+
             for query in used_queries:
-                results.append(search_one(query))
+                if random.random() < insert_fraction:
+                    precision, latency = insert_one(query)
+                    total_insert_count += 1
+                    all_insert_latencies.append(latency)
+                    results.append(('insert', precision, latency))
+                else:
+                    precision, latency = search_one(query)
+                    total_search_count += 1
+                    all_search_latencies.append(latency)
+                    results.append(('search', precision, latency))
                 pbar.update(1)
 
             # Close the progress bar
             pbar.close()
-
             total_time = time.perf_counter() - start
         else:
             # Dynamically calculate chunk size based on total_query_count
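The insert/search mix in this loop is decided per operation by an independent coin flip, so the realized insert share only matches insert_fraction in expectation, not exactly. A minimal standalone sketch of the same selection logic (the stub callables below are illustrative placeholders, not the benchmark's real insert_one/search_one):

    import random

    def run_mixed(queries, insert_fraction, insert_one, search_one):
        # Each query is routed by an independent Bernoulli trial, as in the patch.
        ops = []
        for q in queries:
            if random.random() < insert_fraction:
                ops.append(('insert', *insert_one(q)))
            else:
                ops.append(('search', *search_one(q)))
        return ops

    # Stub operations returning (precision, latency) pairs, like the patched code expects.
    insert_stub = lambda q: (1.0, 0.002)
    search_stub = lambda q: (0.95, 0.001)

    ops = run_mixed(range(10_000), 0.1, insert_stub, search_stub)
    actual = sum(1 for kind, _, _ in ops if kind == 'insert') / len(ops)
    print(f"target=0.1 actual={actual:.3f}")  # close to 0.1, but not exact

This is why the patch also reports actual_insert_fraction alongside target_insert_fraction further down.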
@@ -206,10 +219,19 @@ def cycling_query_generator(queries, total_count):
 
             # Collect results from all worker processes
             results = []
+            total_insert_count = 0
+            total_search_count = 0
+            all_insert_latencies = []
+            all_search_latencies = []
             min_start_time = time.perf_counter()
+
             for _ in processes:
-                proc_start_time, chunk_results = result_queue.get()
+                proc_start_time, chunk_results, insert_count, search_count, insert_latencies, search_latencies = result_queue.get()
                 results.extend(chunk_results)
+                total_insert_count += insert_count
+                total_search_count += search_count
+                all_insert_latencies.extend(insert_latencies)
+                all_search_latencies.extend(search_latencies)
 
                 # Update min_start_time if necessary
                 if proc_start_time < min_start_time:
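Each worker now puts a 6-tuple on the result queue and the parent reduces the tuples field by field. A self-contained sketch of that contract (the toy worker body is a stand-in for the repo's worker_function, and the hard-coded results are fabricated for illustration):

    import time
    from multiprocessing import Process, Queue

    def toy_worker(result_queue):
        start_time = time.perf_counter()
        # Pretend this worker ran 3 searches and 1 insert.
        results = [('search', 0.9, 0.001)] * 3 + [('insert', 1.0, 0.002)]
        result_queue.put((start_time, results, 1, 3, [0.002], [0.001, 0.001, 0.001]))

    if __name__ == "__main__":
        q = Queue()
        procs = [Process(target=toy_worker, args=(q,)) for _ in range(2)]
        for p in procs:
            p.start()

        results, total_inserts, total_searches = [], 0, 0
        min_start = time.perf_counter()
        for _ in procs:
            start, chunk, ins, srch, ins_lat, srch_lat = q.get()
            results.extend(chunk)
            total_inserts += ins
            total_searches += srch
            min_start = min(min_start, start)  # earliest worker start wins

        for p in procs:
            p.join()
        print(total_inserts, total_searches, len(results))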
@@ -222,24 +244,53 @@ def cycling_query_generator(queries, total_count):
             for process in processes:
                 process.join()
 
-        # Extract precisions and latencies (outside the timed section)
-        precisions, latencies = zip(*results)
+        # Extract overall precisions and latencies
+        all_precisions = [result[1] for result in results]
+        all_latencies = [result[2] for result in results]
+
+        # Calculate search-only precisions (exclude inserts from precision calculation)
+        search_precisions = [result[1] for result in results if result[0] == 'search']
 
         self.__class__.delete_client()
 
         return {
+            # Overall metrics
             "total_time": total_time,
-            "mean_time": np.mean(latencies),
-            "mean_precisions": np.mean(precisions),
-            "std_time": np.std(latencies),
-            "min_time": np.min(latencies),
-            "max_time": np.max(latencies),
-            "rps": len(latencies) / total_time,
-            "p50_time": np.percentile(latencies, 50),
-            "p95_time": np.percentile(latencies, 95),
-            "p99_time": np.percentile(latencies, 99),
-            "precisions": precisions,
-            "latencies": latencies,
+            "total_operations": len(all_latencies),
+            "rps": len(all_latencies) / total_time,
+
+            # Search metrics
+            "search_count": total_search_count,
+            "search_rps": total_search_count / total_time if total_search_count > 0 else 0,
+            "mean_search_time": np.mean(all_search_latencies) if all_search_latencies else 0,
+            "mean_search_precision": np.mean(search_precisions) if search_precisions else 0,
+            "p50_search_time": np.percentile(all_search_latencies, 50) if all_search_latencies else 0,
+            "p95_search_time": np.percentile(all_search_latencies, 95) if all_search_latencies else 0,
+            "p99_search_time": np.percentile(all_search_latencies, 99) if all_search_latencies else 0,
+
+            # Insert metrics
+            "insert_count": total_insert_count,
+            "insert_rps": total_insert_count / total_time if total_insert_count > 0 else 0,
+            "mean_insert_time": np.mean(all_insert_latencies) if all_insert_latencies else 0,
+            "p50_insert_time": np.percentile(all_insert_latencies, 50) if all_insert_latencies else 0,
+            "p95_insert_time": np.percentile(all_insert_latencies, 95) if all_insert_latencies else 0,
+            "p99_insert_time": np.percentile(all_insert_latencies, 99) if all_insert_latencies else 0,
+
+            # Mixed workload metrics
+            "actual_insert_fraction": total_insert_count / len(all_latencies) if len(all_latencies) > 0 else 0,
+            "target_insert_fraction": insert_fraction,
+
+            # Legacy compatibility (for existing code that expects these)
+            "mean_time": np.mean(all_latencies),
+            "mean_precisions": np.mean(search_precisions) if search_precisions else 1.0,  # Only search precisions
+            "std_time": np.std(all_latencies),
+            "min_time": np.min(all_latencies),
+            "max_time": np.max(all_latencies),
+            "p50_time": np.percentile(all_latencies, 50),
+            "p95_time": np.percentile(all_latencies, 95),
+            "p99_time": np.percentile(all_latencies, 99),
+            "precisions": search_precisions,  # Only search precisions
+            "latencies": all_latencies,
         }
 
     def setup_search(self):
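The zero-guards in the dict mean a pure-search run reports insert_rps == 0 instead of raising on an empty list. A hypothetical consumer of the returned dict (key names are the ones defined in this diff; the ms conversion assumes latencies are perf_counter durations in seconds, which matches the timing code above):

    # Assuming `metrics` is the dict returned by the patched method above.
    def summarize(metrics):
        print(f"overall: {metrics['rps']:.1f} ops/s over {metrics['total_operations']} ops")
        print(f"search:  {metrics['search_rps']:.1f} rps, "
              f"p99 {metrics['p99_search_time'] * 1000:.2f} ms, "
              f"precision {metrics['mean_search_precision']:.3f}")
        print(f"insert:  {metrics['insert_rps']:.1f} rps, "
              f"p99 {metrics['p99_insert_time'] * 1000:.2f} ms")
        drift = metrics['actual_insert_fraction'] - metrics['target_insert_fraction']
        print(f"insert fraction drift: {drift:+.4f}")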
@@ -259,6 +310,27 @@ def chunked_iterable(iterable, size):
     while chunk := list(islice(it, size)):
         yield chunk
 
+def process_chunk(chunk, search_one, insert_one, insert_fraction):
+    results = []
+    insert_count = 0
+    search_count = 0
+    insert_latencies = []
+    search_latencies = []
+
+    for i, query in enumerate(chunk):
+        if random.random() < insert_fraction:
+            precision, latency = insert_one(query)
+            insert_count += 1
+            insert_latencies.append(latency)
+            results.append(('insert', precision, latency))
+        else:
+            precision, latency = search_one(query)
+            search_count += 1
+            search_latencies.append(latency)
+            results.append(('search', precision, latency))
+
+    return results, insert_count, search_count, insert_latencies, search_latencies
+
 # Function to be executed by each worker process
 def worker_function(self, distance, search_one, insert_one, chunk, result_queue, insert_fraction=0.0):
     self.init_client(
@@ -270,16 +342,7 @@ def worker_function(self, distance, search_one, insert_one, chunk, result_queue,
     self.setup_search()
 
     start_time = time.perf_counter()
-    results = process_chunk(chunk, search_one, insert_one, insert_fraction)
-    result_queue.put((start_time, results))
-
-
-def process_chunk(chunk, search_one, insert_one, insert_fraction):
-    results = []
-    for i, query in enumerate(chunk):
-        if random.random() < insert_fraction:
-            result = insert_one(query)
-        else:
-            result = search_one(query)
-        results.append(result)
-    return results
+    results, insert_count, search_count, insert_latencies, search_latencies = process_chunk(
+        chunk, search_one, insert_one, insert_fraction
+    )
+    result_queue.put((start_time, results, insert_count, search_count, insert_latencies, search_latencies))
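Since process_chunk is a pure function of its callables, it can be smoke-tested without a live client or worker processes. A quick check with stubbed operations (the stubs and the import path are assumptions based on the file shown in this diff; the real search_one/insert_one hit the engine):

    import random

    # Assuming the patched module is importable from the path in this diff.
    from engine.base_client.search import process_chunk

    # Stubs standing in for the client-bound callables; both return
    # (precision, latency) pairs, matching what the patched code expects.
    search_stub = lambda q: (0.9, 0.001)
    insert_stub = lambda q: (1.0, 0.003)

    random.seed(42)  # make the per-query coin flips reproducible
    results, n_ins, n_srch, ins_lat, srch_lat = process_chunk(
        list(range(1000)), search_stub, insert_stub, insert_fraction=0.2
    )
    assert n_ins + n_srch == len(results) == 1000
    assert len(ins_lat) == n_ins and len(srch_lat) == n_srch
    print(f"inserts={n_ins} searches={n_srch}")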
