Skip to content

Commit c3aa588

Browse files
committed
Update LMCache connector following the addition of KV events to KVConnectorOutput
This commit follows the recommendation from @markmc to use a dedicated events property instead of piggybacking on stats. PR vllm-project#28309 adds the events property to KVConnectorOutput; this commit picks up the new property and uses it to pass the KV cache events from the worker side to the scheduler side. Signed-off-by: Martin Hickey <[email protected]>
1 parent 15e7aa8 commit c3aa588

File tree

1 file changed

+11
-64
lines changed

1 file changed

+11
-64
lines changed

vllm/distributed/kv_transfer/kv_connector/v1/lmcache_connector.py

Lines changed: 11 additions & 64 deletions
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,7 @@
11
# SPDX-License-Identifier: Apache-2.0
22
# SPDX-FileCopyrightText: Copyright contributors to the vLLM project
3+
import time
34
from collections.abc import Iterable
4-
from dataclasses import dataclass
55
from typing import TYPE_CHECKING, Any, Optional
66

77
import torch
@@ -10,13 +10,12 @@
1010
)
1111

1212
from vllm.config import VllmConfig
13-
from vllm.distributed.kv_events import BlockStored, KVCacheEvent
13+
from vllm.distributed.kv_events import BlockStored, KVCacheEvent, KVEventBatch
1414
from vllm.distributed.kv_transfer.kv_connector.v1.base import (
1515
KVConnectorBase_V1,
1616
KVConnectorMetadata,
1717
KVConnectorRole,
1818
)
19-
from vllm.distributed.kv_transfer.kv_connector.v1.metrics import KVConnectorStats
2019
from vllm.logger import init_logger
2120
from vllm.v1.core.sched.output import SchedulerOutput
2221
from vllm.v1.outputs import KVConnectorOutput
@@ -31,50 +30,6 @@
3130
logger = init_logger(__name__)
3231

3332

34-
@dataclass
35-
class LMCacheKVEvents(KVConnectorStats):
36-
"""
37-
Maintain a list of KV events
38-
"""
39-
40-
def aggregate(self, other: "KVConnectorStats") -> "LMCacheKVEvents":
41-
if not other and not isinstance(other, LMCacheKVEvents):
42-
raise TypeError("Can only aggregate with another LMCacheKVEvents")
43-
44-
if other.is_empty():
45-
return self
46-
47-
if self.is_empty():
48-
self.data["kv_events"] = []
49-
50-
other_events = other.get_kv_events()
51-
for other_event in other_events:
52-
self.data["kv_events"].append(other_event)
53-
54-
return self
55-
56-
def reset(self):
57-
self.data.clear()
58-
59-
def reduce(self) -> dict[str, int | float]:
60-
return {
61-
"kv_events": 0,
62-
}
63-
64-
def add_kv_event(self, event: BlockStored):
65-
if self.is_empty():
66-
self.data["kv_events"] = []
67-
self.data["kv_events"].append(event)
68-
69-
def get_kv_events(self) -> list[BlockStored] | None:
70-
if self.is_empty():
71-
return None
72-
return self.data["kv_events"]
73-
74-
def is_empty(self) -> bool:
75-
return not self.data or self.data.get("kv_events", 0) == 0
76-
77-
7833
class LMCacheConnectorV1(KVConnectorBase_V1):
7934
def __init__(
8035
self,
@@ -187,20 +142,18 @@ def get_finished(
187142
"""
188143
return self._lmcache_engine.get_finished(finished_req_ids)
189144

190-
def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
145+
def get_kv_connector_kv_cache_events(self) -> Optional["KVEventBatch"]:
191146
"""
192-
Get the KV connector stats collected during the last interval.
147+
Get the KV connector kv cache events collected during the last interval.
193148
"""
194-
assert self._lmcache_engine is not None
195-
196149
events = self._lmcache_engine.get_kv_events()
197150
if not events:
198151
return None
199152

200-
lmcache_kv_events: LMCacheKVEvents | None = None
153+
lmcache_kv_events: KVEventBatch | None = None
201154
for event in events:
202155
if lmcache_kv_events is None:
203-
lmcache_kv_events = LMCacheKVEvents()
156+
lmcache_kv_events = KVEventBatch(ts=time.time(), events=[])
204157
block = BlockStored(
205158
block_hashes=event.block_hashes,
206159
parent_block_hash=event.parent_block_hash,
@@ -209,7 +162,7 @@ def get_kv_connector_stats(self) -> Optional["KVConnectorStats"]:
209162
block_size=event.block_size,
210163
medium=event.medium,
211164
)
212-
lmcache_kv_events.add_kv_event(block)
165+
lmcache_kv_events.events.append(block)
213166

214167
return lmcache_kv_events
215168

@@ -269,14 +222,14 @@ def update_connector_output(self, connector_output: KVConnectorOutput):
269222
connectors output.
270223
"""
271224
# Get the KV events
272-
kv_events = connector_output.kv_connector_stats
225+
kv_events = connector_output.kv_cache_events
273226
if (
274227
not kv_events
275-
or not isinstance(kv_events, LMCacheKVEvents)
276-
or kv_events.is_empty()
228+
or not isinstance(kv_events, KVEventBatch)
229+
or not kv_events.events
277230
):
278231
return
279-
self._kv_events = kv_events.get_kv_events()
232+
self._kv_events.extend(kv_events.events)
280233
return
281234

282235
def request_finished(
@@ -306,9 +259,3 @@ def take_events(self) -> Iterable["KVCacheEvent"]:
306259
if self._kv_events is not None:
307260
yield from self._kv_events
308261
self._kv_events.clear()
309-
310-
@classmethod
311-
def build_kv_connector_stats(
312-
cls, data: dict[str, Any] | None = None
313-
) -> KVConnectorStats | None:
314-
return LMCacheKVEvents(data=data) if data is not None else LMCacheKVEvents()

0 commit comments

Comments
 (0)