Skip to content

Commit 7a30af5

Browse files
Replay changing config (#56)
**Summary**: Replay now applies config deltas. **Demo**: Modified test that passes. <img width="626" alt="Screenshot 2024-12-24 at 20 18 09" src="https://github.com/user-attachments/assets/bcbf5e9c-f68c-452c-8a4e-47b1f226c63f" /> **Details**: * Undoing query knobs is not yet handled. How to handle it depends on whether the interface takes config deltas or full configs for query knobs, which in turn depends on how the agents work.
1 parent 3cd655f commit 7a30af5

File tree

5 files changed

+107
-38
lines changed

5 files changed

+107
-38
lines changed

env/integtest_replay.py

+36-1
Original file line numberDiff line numberDiff line change
@@ -1,7 +1,14 @@
11
import unittest
22

3+
from benchmark.tpch.constants import DEFAULT_TPCH_SEED
34
from env.integtest_util import IntegtestWorkspace, MockTuningAgent
45
from env.replay import replay
6+
from env.tuning_agent import (
7+
DBMSConfigDelta,
8+
IndexesDelta,
9+
QueryKnobsDelta,
10+
SysKnobsDelta,
11+
)
512

613

714
class ReplayTests(unittest.TestCase):
@@ -11,7 +18,35 @@ def setUpClass() -> None:
1118

1219
def test_replay(self) -> None:
1320
agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())
14-
replay(IntegtestWorkspace.get_dbgym_cfg(), agent.tuning_agent_artifacts_dpath)
21+
agent.delta_to_return = DBMSConfigDelta(
22+
indexes=IndexesDelta(
23+
["CREATE INDEX idx_orders_custkey ON orders(o_custkey)"]
24+
),
25+
sysknobs=SysKnobsDelta(
26+
{"shared_buffers": "2GB"},
27+
),
28+
qknobs=QueryKnobsDelta(
29+
{
30+
f"S{DEFAULT_TPCH_SEED}-Q1": [
31+
"set enable_hashagg = off",
32+
"set enable_sort = on",
33+
],
34+
}
35+
),
36+
)
37+
agent.step()
38+
replay_data = replay(
39+
IntegtestWorkspace.get_dbgym_cfg(), agent.tuning_agent_artifacts_dpath
40+
)
41+
42+
# We do some very simple sanity checks here due to the inherent randomness of executing a workload.
43+
# We check that there is one data point for the initial config and one for the config change.
44+
self.assertEqual(len(replay_data), 2)
45+
# We check that the second step is faster.
46+
self.assertLess(replay_data[1][0], replay_data[0][0])
47+
# We check that no queries timed out in either step.
48+
self.assertEqual(replay_data[0][1], 0)
49+
self.assertEqual(replay_data[1][1], 0)
1550

1651

1752
if __name__ == "__main__":

env/integtest_tuning_agent.py

+21-13
Original file line numberDiff line numberDiff line change
@@ -23,37 +23,45 @@ def make_config(letter: str) -> DBMSConfigDelta:
2323
qknobs=QueryKnobsDelta({letter: [letter]}),
2424
)
2525

26-
def test_get_step_delta(self) -> None:
26+
def test_get_delta_at_step(self) -> None:
2727
agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())
2828

29-
agent.config_to_return = PostgresConnTests.make_config("a")
29+
agent.delta_to_return = PostgresConnTests.make_config("a")
3030
agent.step()
31-
agent.config_to_return = PostgresConnTests.make_config("b")
31+
agent.delta_to_return = PostgresConnTests.make_config("b")
3232
agent.step()
33-
agent.config_to_return = PostgresConnTests.make_config("c")
33+
agent.delta_to_return = PostgresConnTests.make_config("c")
3434
agent.step()
3535

3636
reader = TuningAgentArtifactsReader(agent.tuning_agent_artifacts_dpath)
3737

38-
self.assertEqual(reader.get_step_delta(1), PostgresConnTests.make_config("b"))
39-
self.assertEqual(reader.get_step_delta(0), PostgresConnTests.make_config("a"))
40-
self.assertEqual(reader.get_step_delta(1), PostgresConnTests.make_config("b"))
41-
self.assertEqual(reader.get_step_delta(2), PostgresConnTests.make_config("c"))
38+
self.assertEqual(
39+
reader.get_delta_at_step(1), PostgresConnTests.make_config("b")
40+
)
41+
self.assertEqual(
42+
reader.get_delta_at_step(0), PostgresConnTests.make_config("a")
43+
)
44+
self.assertEqual(
45+
reader.get_delta_at_step(1), PostgresConnTests.make_config("b")
46+
)
47+
self.assertEqual(
48+
reader.get_delta_at_step(2), PostgresConnTests.make_config("c")
49+
)
4250

43-
def test_get_all_deltas(self) -> None:
51+
def test_get_all_deltas_in_order(self) -> None:
4452
agent = MockTuningAgent(IntegtestWorkspace.get_dbgym_cfg())
4553

46-
agent.config_to_return = PostgresConnTests.make_config("a")
54+
agent.delta_to_return = PostgresConnTests.make_config("a")
4755
agent.step()
48-
agent.config_to_return = PostgresConnTests.make_config("b")
56+
agent.delta_to_return = PostgresConnTests.make_config("b")
4957
agent.step()
50-
agent.config_to_return = PostgresConnTests.make_config("c")
58+
agent.delta_to_return = PostgresConnTests.make_config("c")
5159
agent.step()
5260

5361
reader = TuningAgentArtifactsReader(agent.tuning_agent_artifacts_dpath)
5462

5563
self.assertEqual(
56-
reader.get_all_deltas(),
64+
reader.get_all_deltas_in_order(),
5765
[
5866
PostgresConnTests.make_config("a"),
5967
PostgresConnTests.make_config("b"),

env/integtest_util.py

+5-5
Original file line numberDiff line numberDiff line change
@@ -92,14 +92,14 @@ def get_default_metadata() -> TuningAgentMetadata:
9292
class MockTuningAgent(TuningAgent):
9393
def __init__(self, *args: Any, **kwargs: Any) -> None:
9494
super().__init__(*args, **kwargs)
95-
self.config_to_return: Optional[DBMSConfigDelta] = None
95+
self.delta_to_return: Optional[DBMSConfigDelta] = None
9696

9797
def _get_metadata(self) -> TuningAgentMetadata:
9898
return IntegtestWorkspace.get_default_metadata()
9999

100100
def _step(self) -> DBMSConfigDelta:
101-
assert self.config_to_return is not None
102-
ret = self.config_to_return
103-
# Setting this ensures you must set self.config_to_return every time.
104-
self.config_to_return = None
101+
assert self.delta_to_return is not None
102+
ret = self.delta_to_return
103+
# Setting this ensures you must set self.delta_to_return every time.
104+
self.delta_to_return = None
105105
return ret

env/replay.py

+37-12
Original file line numberDiff line numberDiff line change
@@ -1,18 +1,24 @@
1-
import logging
1+
from collections import defaultdict
22
from pathlib import Path
33

44
from env.pg_conn import PostgresConn
55
from env.tuning_agent import TuningAgentArtifactsReader
66
from env.workload import Workload
7-
from util.log import DBGYM_OUTPUT_LOGGER_NAME
87
from util.pg import DEFAULT_POSTGRES_PORT
98
from util.workspace import DBGymConfig
109

1110

12-
# TODO: make it return the full replay data.
13-
def replay(dbgym_cfg: DBGymConfig, tuning_agent_artifacts_dpath: Path) -> None:
14-
reader = TuningAgentArtifactsReader(tuning_agent_artifacts_dpath)
11+
def replay(
12+
dbgym_cfg: DBGymConfig, tuning_agent_artifacts_dpath: Path
13+
) -> list[tuple[float, int]]:
14+
"""
15+
Returns the total runtime and the number of timed out queries for each step.
1516
17+
The first step will use no configuration changes.
18+
"""
19+
replay_data: list[tuple[float, int]] = []
20+
21+
reader = TuningAgentArtifactsReader(tuning_agent_artifacts_dpath)
1622
pg_conn = PostgresConn(
1723
dbgym_cfg,
1824
DEFAULT_POSTGRES_PORT,
@@ -28,21 +34,40 @@ def replay(dbgym_cfg: DBGymConfig, tuning_agent_artifacts_dpath: Path) -> None:
2834

2935
pg_conn.restore_pristine_snapshot()
3036
pg_conn.restart_postgres()
31-
total_runtime, num_timed_out_queries = time_workload(pg_conn, workload)
32-
print(f"Total runtime: {total_runtime / 1e6} seconds")
33-
print(f"Number of timed out queries: {num_timed_out_queries}")
37+
qknobs: defaultdict[str, list[str]] = defaultdict(list)
38+
replay_data.append(time_workload(pg_conn, workload, qknobs))
39+
40+
for delta in reader.get_all_deltas_in_order():
41+
pg_conn.restart_with_changes(delta.sysknobs)
42+
43+
for index in delta.indexes:
44+
pg_conn.psql(index)
45+
46+
for query, knobs in delta.qknobs.items():
47+
# TODO: account for deleting a knob if we are representing knobs as deltas.
48+
qknobs[query].extend(knobs)
49+
50+
replay_data.append(time_workload(pg_conn, workload, qknobs))
51+
3452
pg_conn.shutdown_postgres()
53+
return replay_data
3554

3655

37-
def time_workload(pg_conn: PostgresConn, workload: Workload) -> tuple[float, int]:
56+
def time_workload(
57+
pg_conn: PostgresConn, workload: Workload, qknobs: dict[str, list[str]]
58+
) -> tuple[float, int]:
3859
"""
39-
It returns the total runtime and the number of timed out queries.
60+
Returns the total runtime and the number of timed out queries.
4061
"""
4162
total_runtime: float = 0
4263
num_timed_out_queries: int = 0
4364

44-
for query in workload.get_queries_in_order():
45-
runtime, did_time_out, _ = pg_conn.time_query(query)
65+
for qid in workload.get_query_order():
66+
query = workload.get_query(qid)
67+
this_query_knobs = qknobs[qid]
68+
runtime, did_time_out, _ = pg_conn.time_query(
69+
query, query_knobs=this_query_knobs
70+
)
4671
total_runtime += runtime
4772
if did_time_out:
4873
num_timed_out_queries += 1

env/tuning_agent.py

+8-7
Original file line numberDiff line numberDiff line change
@@ -9,6 +9,7 @@
99
# These types are only given as the outputs of tuning agents.
1010
IndexesDelta = NewType("IndexesDelta", list[str])
1111
SysKnobsDelta = NewType("SysKnobsDelta", dict[str, str])
12+
# TODO: I haven't decided whether these should be deltas or full configs. I'll figure this out once I integrate Proto-X and UDO.
1213
QueryKnobsDelta = NewType("QueryKnobsDelta", dict[str, list[str]])
1314

1415

@@ -67,7 +68,7 @@ class DBMSConfigDelta:
6768
qknobs: QueryKnobsDelta
6869

6970

70-
def get_step_delta_fpath(tuning_agent_artifacts_dpath: Path, step_num: int) -> Path:
71+
def get_delta_at_step_fpath(tuning_agent_artifacts_dpath: Path, step_num: int) -> Path:
7172
return tuning_agent_artifacts_dpath / f"step{step_num}_delta.json"
7273

7374

@@ -96,7 +97,7 @@ def step(self) -> None:
9697
curr_step_num = self.next_step_num
9798
self.next_step_num += 1
9899
dbms_cfg_delta = self._step()
99-
with get_step_delta_fpath(
100+
with get_delta_at_step_fpath(
100101
self.tuning_agent_artifacts_dpath, curr_step_num
101102
).open("w") as f:
102103
json.dump(asdict(dbms_cfg_delta), f)
@@ -123,7 +124,7 @@ def __init__(self, tuning_agent_artifacts_dpath: Path) -> None:
123124
self.tuning_agent_artifacts_dpath = tuning_agent_artifacts_dpath
124125
assert is_fully_resolved(self.tuning_agent_artifacts_dpath)
125126
num_steps = 0
126-
while get_step_delta_fpath(
127+
while get_delta_at_step_fpath(
127128
self.tuning_agent_artifacts_dpath, num_steps
128129
).exists():
129130
num_steps += 1
@@ -141,9 +142,9 @@ def get_metadata(self) -> TuningAgentMetadata:
141142
pgbin_path=Path(data["pgbin_path"]),
142143
)
143144

144-
def get_step_delta(self, step_num: int) -> DBMSConfigDelta:
145+
def get_delta_at_step(self, step_num: int) -> DBMSConfigDelta:
145146
assert step_num >= 0 and step_num < self.num_steps
146-
with get_step_delta_fpath(self.tuning_agent_artifacts_dpath, step_num).open(
147+
with get_delta_at_step_fpath(self.tuning_agent_artifacts_dpath, step_num).open(
147148
"r"
148149
) as f:
149150
data = json.load(f)
@@ -153,5 +154,5 @@ def get_step_delta(self, step_num: int) -> DBMSConfigDelta:
153154
qknobs=data["qknobs"],
154155
)
155156

156-
def get_all_deltas(self) -> list[DBMSConfigDelta]:
157-
return [self.get_step_delta(step_num) for step_num in range(self.num_steps)]
157+
def get_all_deltas_in_order(self) -> list[DBMSConfigDelta]:
158+
return [self.get_delta_at_step(step_num) for step_num in range(self.num_steps)]

0 commit comments

Comments
 (0)