Skip to content
Open
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension

Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
1 change: 1 addition & 0 deletions llm_bench/README.md
Original file line number Diff line number Diff line change
Expand Up @@ -65,6 +65,7 @@ Generation options:
- `--chat`: specify to call chat API instead of raw completions
- `--stream`: stream the result back. Enabling this gives "time to first token" and "time per token" metrics
- (optional) `--logprobs`: corresponds to `logprobs` API parameter. For some providers, it's needed for output token counting in streaming mode.
- `--num-queries`: stop sending new requests once the specified number of queries has been sent.

### Writing results

Expand Down
46 changes: 45 additions & 1 deletion llm_bench/load_test.py
Original file line number Diff line number Diff line change
Expand Up @@ -147,6 +147,8 @@ class InitTracker:
logging_params = None
environment = None
tokenizer = None
req_sent = 0
req_quit_called = False

@classmethod
def notify_init(cls, environment, logging_params):
Expand All @@ -163,13 +165,18 @@ def notify_init(cls, environment, logging_params):
@classmethod
def notify_first_request(cls):
with cls.lock:
if cls.environment.parsed_options.qps is not None and cls.first_request_done == 0:
if (
cls.environment.parsed_options.qps is not None
and cls.first_request_done == 0
and cls.environment.parsed_options.num_queries is None
):
# if in QPS mode, reset after first successful request comes back
cls.reset_stats()
cls.first_request_done += 1
if (
cls.environment.parsed_options.qps is not None
and cls.first_request_done == 0
and cls.environment.parsed_options.num_queries is None
and cls.users == cls.first_request_done
):
# if in fixed load mode, reset after all users issued one request (we're in a steady state)
Expand Down Expand Up @@ -202,6 +209,35 @@ def load_tokenizer(cls, dir):
cls.tokenizer.add_bos_token = False
cls.tokenizer.add_eos_token = False
return cls.tokenizer

@classmethod
def try_acquire_request(cls):
    """
    Thread-safe: attempt to reserve one request slot.

    Returns True if the caller should proceed to send a request. That is
    always the case when ``num_queries`` is unset; otherwise it holds only
    while fewer than ``num_queries`` slots have been handed out.

    Returns False once ``num_queries`` slots have already been reserved. The
    first rejected call (not the call that takes the last slot) starts a
    single daemon thread that asks the runner to stop; every later rejected
    call only returns False.
    """
    num_queries = cls.environment.parsed_options.num_queries
    if num_queries is None:
        # No limit configured: nothing to count, so the lock is not needed.
        return True

    def do_quit():
        # Runs on its own thread so the calling user task is not blocked
        # while the runner shuts down.
        try:
            runner = getattr(cls.environment, "runner", None)
            if runner:
                print(
                    f"Reached num-queries={num_queries}, "
                    "signaling shutdown after in-flight requests finish"
                )
                # NOTE(review): confirm the runner's stop() accepts the
                # `graceful` keyword; if it does not, the TypeError is
                # reported by the handler below and the test keeps running.
                runner.stop(graceful=True)
                runner.user_count = 0
        except Exception as e:
            # Best-effort shutdown: report the failure instead of letting the
            # exception die silently with the daemon thread.
            print(f"Failed to quit runner: {e}")

    with cls.lock:
        if cls.req_sent < num_queries:
            cls.req_sent += 1
            return True
        # Limit reached: only the first rejected caller schedules the stop.
        schedule_stop = not cls.req_quit_called
        cls.req_quit_called = True

    # Start the thread after releasing the lock so the critical section only
    # covers the counter and the flag.
    if schedule_stop:
        threading.Thread(target=do_quit, daemon=True).start()
    return False


events.spawning_complete.add_listener(InitTracker.notify_spawning_complete)
Expand Down Expand Up @@ -752,6 +788,8 @@ def insert_image_placeholders(self, prompt, num_images, prompt_images_positionin

@task
def generate_text(self):
if not InitTracker.try_acquire_request():
return
max_tokens = self.max_tokens_sampler.sample()
prompt, images = self._get_input()
data = self.provider_formatter.format_payload(prompt, max_tokens, images)
Expand Down Expand Up @@ -1028,6 +1066,12 @@ def init_parser(parser):
default="constant",
help="Must be used with --qps. Specifies how to space out requests: equally ('constant') or by sampling wait times from a distribution ('uniform' or 'exponential'). Expected QPS is going to match --qps",
)
parser.add_argument(
"--num-queries",
type=int,
default=None,
help="Stop sending requests after reaching specified number of queries.",
)
parser.add_argument(
"--burst",
type=float,
Expand Down