Limit concurrent requests to 28000 #19

Open · wants to merge 29 commits into main

199 changes: 105 additions & 94 deletions benchmark_serving.py
@@ -25,6 +25,7 @@
from datetime import datetime
import json
import random
import sys
import requests
import time
from typing import AsyncGenerator, List, Optional, Tuple, Dict
@@ -45,6 +46,7 @@
MIN_SEQ_LEN = 4
NEW_TEXT_KEY = "\nOutput:\n"
PROMETHEUS_PORT = 9090
CONNECTIONS_LIMIT = 28000

# Prometheus Metrics
prompt_length_metric = Histogram("LatencyProfileGenerator:prompt_length", "Input prompt length", buckets=[2**i for i in range(1, 16)])
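
CONNECTIONS_LIMIT is applied later through aiohttp's TCPConnector; it raises the cap well above aiohttp's default of 100 connections per connector while still giving the benchmark a hard ceiling it can detect. A minimal sketch of what the setting means (not part of the diff):

```python
# Sketch only: with limit=CONNECTIONS_LIMIT, at most that many TCP connections are
# open at once; further requests wait for a free connection instead of opening a
# new socket.
import aiohttp

async def make_limited_session() -> aiohttp.ClientSession:
    connector = aiohttp.TCPConnector(limit=CONNECTIONS_LIMIT)
    return aiohttp.ClientSession(connector=connector)  # caller is responsible for closing
```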
@@ -53,13 +55,24 @@
tpot_metric = Histogram('LatencyProfileGenerator:time_per_output_token', 'Time per output token per request (excluding first token)')
ttft_metric = Histogram('LatencyProfileGenerator:time_to_first_token', 'Time to first token per request')
active_requests_metric = Gauge('LatencyProfileGenerator:active_requests', 'How many requests actively being processed')
active_connections_metric = Gauge('LatencyProfileGenerator:active_connections', 'How many active connections')

# Exhausted-connections warning should only be printed once per run
connection_limit_reached = False

# Add trace config for monitoring in flight requests
async def on_request_start(session, trace_config_ctx, params):
global connection_limit_reached
active_requests_metric.inc()
active_connections_metric.set(len(session.connector._acquired))
if not connection_limit_reached and len(session.connector._acquired) == CONNECTIONS_LIMIT:
print("Warning: Connection limit reached. Omitting server metrics due to inaccuracy")
connection_limit_reached = True


async def on_request_end(session, trace_config_ctx, params):
active_requests_metric.dec()
active_connections_metric.set(len(session.connector._acquired))

trace_config = aiohttp.TraceConfig()
trace_config.on_request_start.append(on_request_start)
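
A rough usage sketch for the trace hooks above (assumed wiring, not from the diff; note that `session.connector._acquired` is a private aiohttp attribute, so the connection gauge relies on an implementation detail that may change between aiohttp releases):

```python
import asyncio
import aiohttp

async def traced_request(url: str) -> None:
    connector = aiohttp.TCPConnector(limit=CONNECTIONS_LIMIT)
    async with aiohttp.ClientSession(connector=connector,
                                     trace_configs=[trace_config]) as session:
        # on_request_start / on_request_end fire around this call and update the gauges.
        async with session.get(url) as resp:
            await resp.text()

# asyncio.run(traced_request("http://localhost:8000/health"))  # hypothetical endpoint
```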
@@ -153,6 +166,7 @@ def init_errors_map() -> Dict[str, int]:

async def send_stream_request(
clientSession: any,
backend: str,
api_url: str,
prompt: str,
prompt_len: int,
@@ -198,51 +212,50 @@ async def send_stream_request(
most_recent_timestamp = st
output = ""
timeout = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(timeout=timeout,trust_env=True) as session:
try:
async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
async for chunk_bytes in response.content.iter_chunks():
chunk_bytes = chunk_bytes[0].strip()
if not chunk_bytes:
continue
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = timestamp - st
else:
itl.append(timestamp - most_recent_timestamp)
most_recent_timestamp = timestamp
if backend == "vllm":
if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
elif backend == "jetstream":
if chunk_bytes.decode("utf-8") != "":
output += json.loads(chunk_bytes.decode("utf-8"))["text"]

except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors
try:
async with clientSession.post(api_url, headers=headers, json=pload, ssl=False) as response:
async for chunk_bytes in response.content.iter_chunks():
chunk_bytes = chunk_bytes[0].strip()
if not chunk_bytes:
continue
timestamp = time.perf_counter()
# First token
if ttft == 0.0:
ttft = timestamp - st
else:
itl.append(timestamp - most_recent_timestamp)
most_recent_timestamp = timestamp
if backend == "vllm":
if chunk_bytes.decode("utf-8")[6:] != "[DONE]":
output += json.loads(chunk_bytes.decode("utf-8")[6:])["choices"][0]["text"]
elif backend == "jetstream":
if chunk_bytes.decode("utf-8") != "":
output += json.loads(chunk_bytes.decode("utf-8"))["text"]

except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors
request_end_time = time.time()
output_token_ids = tokenizer(output).input_ids
output_len = len(output_token_ids)
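
For clarity, the timing bookkeeping in the streaming path above reduces to the following (illustrative helper, not in the PR): the first chunk after the request starts gives TTFT, and the gaps between subsequent chunks give the inter-token latencies collected in `itl`.

```python
from typing import List, Tuple

def stream_timing(start: float, chunk_times: List[float]) -> Tuple[float, List[float]]:
    """Return (ttft, itl) given the stream start time and each chunk's arrival time."""
    ttft = chunk_times[0] - start                                 # time to first token
    itl = [b - a for a, b in zip(chunk_times, chunk_times[1:])]   # gaps between later chunks
    return ttft, itl

# stream_timing(0.0, [0.12, 0.15, 0.19]) ≈ (0.12, [0.03, 0.04])
```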
@@ -259,6 +272,7 @@
return request_latency, ttft, itl, None

async def send_request(
clientSession: any,
backend: str,
api_url: str,
prompt: str,
@@ -343,41 +357,37 @@ async def send_request(
else:
raise ValueError(f"Unknown backend: {backend}")

# Set client timeout to be 3 hrs.
timeout = aiohttp.ClientTimeout(total=timeout)
async with aiohttp.ClientSession(timeout=timeout,trust_env=True,trace_configs=[trace_config]) as session:
while True:
try:
async with session.post(api_url, headers=headers, json=pload, ssl=False) as response:
output = await response.json()

# Re-send the request if it failed.
if "error" not in output:
break
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors
while True:
try:
async with clientSession.post(api_url, headers=headers, json=pload, ssl=False, timeout=None) as response:
output = await response.json()
# Re-send the request if it failed.
if "error" not in output:
break
except aiohttp.client_exceptions.ClientConnectorError as client_err:
errors["ClientConnectorError"] += 1
print(f"ClientConnectorError: {client_err}")
return None, None, None, errors
except asyncio.TimeoutError as timeout_err:
errors["TimeoutError"] += 1
print(f"TimeoutError: {timeout_err}")
return None, None, None, errors
except aiohttp.client_exceptions.ClientOSError as e:
errors["ClientOSError"] += 1
print(f"ClientOSError: {e}")
return None, None, None, errors
except aiohttp.client_exceptions.ContentTypeError as e:
print(f"ContentTypeError: {e}, response: {response}")
errors["ContentTypeError"] += 1
return None, None, None, errors
except aiohttp.client_exceptions.ServerDisconnectedError as e:
errors["ServerDisconnectedError"] += 1
print(f"ServerDisconnectedError: {e}")
return None, None, None, errors
except Exception as e:
print(f"Unknown error {e}")
errors["unknown_error"] += 1
return None, None, None, errors

request_end_time = time.time()
# Naive HF transformers generation and TensorRT-LLM generation stops at EOS
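
The non-streaming path now posts through the shared session and re-sends until the backend replies without an "error" field; the PR passes `timeout=None`, so there is no client-side timeout. A compact sketch of that retry loop (hypothetical helper name):

```python
import aiohttp

async def post_until_success(session: aiohttp.ClientSession, url: str, payload: dict) -> dict:
    """Re-send the request until the backend returns a body without an 'error' key."""
    while True:
        async with session.post(url, json=payload, ssl=False, timeout=None) as response:
            body = await response.json()
        if "error" not in body:
            return body
```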
@@ -414,15 +424,15 @@ async def send_request(
return request_latency, None, None, None


async def run_single_request(args: argparse.Namespace, api_url: str, tokenizer: PreTrainedTokenizerBase,
async def run_single_request(args: argparse.Namespace, clientSession: any, api_url: str, tokenizer: PreTrainedTokenizerBase,
prompt: str, prompt_len: int, output_len: int, chosen_model: str) -> Tuple[str, Tuple]:
if args.stream_request:
result = await send_stream_request(
args.backend, api_url, prompt, prompt_len, output_len,
clientSession, args.backend, api_url, prompt, prompt_len, output_len,
args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, chosen_model, args.request_timeout,)
else:
result = await send_request(
args.backend, api_url, prompt, prompt_len, output_len,
clientSession, args.backend, api_url, prompt, prompt_len, output_len,
args.best_of, args.use_beam_search, args.top_k, tokenizer, args.sax_model, chosen_model, args.request_timeout,)
return chosen_model, result

@@ -456,16 +466,16 @@ async def benchmark(
benchmark_start_time = time.time()
tasks: List[asyncio.Task] = []
prompts_sent = 0
async for request in generate_next_request(input_requests, args.request_rate):
if prompts_sent >= args.num_prompts:
break
prompt, prompt_len, output_len = request
chosen_model = random.choices(model_names, weights=model_weights)[0]
task = asyncio.create_task(run_single_request(args, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model))
tasks.append(task)
prompts_sent += 1

results = await asyncio.gather(*tasks)
async with aiohttp.ClientSession(trust_env=False, connector=aiohttp.TCPConnector(keepalive_timeout=30, enable_cleanup_closed=True, limit=CONNECTIONS_LIMIT,),timeout=None, trace_configs=[trace_config]) as clientSession:
async for request in generate_next_request(input_requests, args.request_rate):
if prompts_sent >= args.num_prompts:
break
prompt, prompt_len, output_len = request
chosen_model = random.choices(model_names, weights=model_weights)[0]
task = asyncio.create_task(run_single_request(args, clientSession, api_url, tokenizer, prompt, prompt_len, output_len, chosen_model))
tasks.append(task)
prompts_sent += 1
results = await asyncio.gather(*tasks)

overall_results = {"latencies": [], "ttfts": [], "itls": [], "tpots": [], "errors": init_errors_map()}
per_model_results: Dict[str, Dict[str, List]] = {}
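
The heart of the change is above: a single ClientSession, backed by a TCPConnector capped at CONNECTIONS_LIMIT, is shared by every request task rather than each request opening its own session. A condensed sketch of the pattern (the `send_one` helper and request list are illustrative, not from the PR):

```python
import asyncio
import aiohttp

async def run_all(requests: list, api_url: str) -> list:
    connector = aiohttp.TCPConnector(limit=CONNECTIONS_LIMIT,
                                     keepalive_timeout=30,
                                     enable_cleanup_closed=True)
    async with aiohttp.ClientSession(connector=connector, trust_env=False,
                                     trace_configs=[trace_config]) as session:
        # send_one(...) stands in for run_single_request above (hypothetical helper).
        tasks = [asyncio.create_task(send_one(session, api_url, r)) for r in requests]
        return await asyncio.gather(*tasks)
```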
@@ -830,7 +840,8 @@ def print_and_save_result(args: argparse.Namespace, benchmark_duration, total_re
}

server_metrics = {}
if args.scrape_server_metrics:
global connection_limit_reached
if args.scrape_server_metrics and not connection_limit_reached:
server_metrics = print_metrics(metrics_to_scrape(args.backend), benchmark_duration, args.pm_namespace, args.pm_job)
if args.save_json_results:
save_json_results(args, benchmark_result, server_metrics, model, errors)