Skip to content

Commit 81a7d9c

Browse files
authored
Benchmark lots of schemas (#273)
For schema based sharding benchmarks it is useful to be able to create and benchmark lots of schemas.
1 parent 43250eb commit 81a7d9c

File tree

3 files changed

+150
-0
lines changed

3 files changed

+150
-0
lines changed

.gitignore

+1
Original file line numberDiff line numberDiff line change
@@ -1,2 +1,3 @@
11
*.pyc
22
.vscode/
3+
.venv

schema-bench/bench.py

+147
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,147 @@
1+
#!/usr/bin/env python3
2+
3+
import fire
4+
import psycopg
5+
from psycopg import sql
6+
from concurrent.futures import ProcessPoolExecutor
7+
from itertools import repeat
8+
import multiprocessing
9+
import sys
10+
import subprocess
11+
import tempfile
12+
13+
14+
def eprint(*args, **kwargs):
15+
"""eprint prints to stderr"""
16+
17+
print(*args, file=sys.stderr, **kwargs)
18+
19+
20+
def run(command, *args, check=True, shell=True, silent=False, **kwargs):
21+
"""run runs the given command and prints it to stderr"""
22+
23+
if not silent:
24+
eprint(f"+ {command} ")
25+
if silent:
26+
kwargs.setdefault("stdout", subprocess.DEVNULL)
27+
return subprocess.run(command, *args, check=check, shell=shell, **kwargs)
28+
29+
30+
def capture(command, *args, **kwargs):
31+
"""runs the given command and returns its output as a string"""
32+
return run(command, *args, stdout=subprocess.PIPE, text=True, **kwargs).stdout
33+
34+
35+
def create_schemas(args):
36+
connection_string = args[0]
37+
table_count = args[1]
38+
citus = args[2]
39+
foreign_key = args[3]
40+
indexes = args[4]
41+
42+
with psycopg.connect(
43+
connection_string, prepare_threshold=None, autocommit=True
44+
) as conn:
45+
if citus:
46+
conn.execute("SET citus.enable_schema_based_sharding TO ON")
47+
48+
for i in indexes:
49+
schema_string = f"schema_bench_{i}"
50+
schema = sql.Identifier(schema_string)
51+
conn.execute(
52+
sql.SQL("DROP SCHEMA IF EXISTS {} CASCADE").format(
53+
schema,
54+
)
55+
)
56+
fkey_table = sql.Identifier(schema_string, f"table_0")
57+
conn.execute(sql.SQL("CREATE SCHEMA {}").format(schema))
58+
for j in range(table_count):
59+
table = sql.Identifier(schema_string, f"table_{j}")
60+
conn.execute(
61+
sql.SQL(
62+
"CREATE TABLE {}(id bigserial PRIMARY KEY, data text, number bigint, num2 int, string varchar, cased_string citext)"
63+
).format(table)
64+
)
65+
conn.execute(
66+
sql.SQL("CREATE INDEX ON {} (num2)").format(table)
67+
)
68+
if foreign_key:
69+
fkey = sql.Identifier(f"table_{j}_fkey")
70+
conn.execute(
71+
sql.SQL("ALTER TABLE {} ADD CONSTRAINT {} FOREIGN KEY (number) REFERENCES {}(id)").format(table, fkey, fkey_table)
72+
)
73+
74+
75+
def chunkify(l, n):
76+
"""Yield n number of striped chunks from l."""
77+
return [l[i::n] for i in range(n)]
78+
79+
80+
class Bench:
81+
def build(
82+
self,
83+
start=0,
84+
scale=100,
85+
table_count=10,
86+
concurrency=multiprocessing.cpu_count(),
87+
connection_string="",
88+
foreign_key=False,
89+
citus=True,
90+
):
91+
with psycopg.connect(
92+
connection_string, prepare_threshold=None, autocommit=True
93+
) as conn:
94+
conn.execute('CREATE EXTENSION IF NOT EXISTS citext')
95+
96+
chunks = chunkify(list(range(start, start+scale)), concurrency)
97+
with ProcessPoolExecutor(
98+
max_workers=concurrency,
99+
) as executor:
100+
for _ in executor.map(
101+
create_schemas,
102+
zip(repeat(connection_string), repeat(table_count), repeat(citus), repeat(foreign_key), chunks)
103+
):
104+
pass
105+
106+
def run(
107+
self,
108+
scale=100,
109+
table_count=10,
110+
connection_string="",
111+
client=multiprocessing.cpu_count(),
112+
jobs=multiprocessing.cpu_count(),
113+
time=5,
114+
progress=5,
115+
):
116+
with tempfile.NamedTemporaryFile("w") as f:
117+
f.write(
118+
f"""
119+
\\set schemaid random(0, {scale - 1})
120+
\\set tableid random(0, {table_count - 1})
121+
SELECT data, number FROM schema_bench_:schemaid.table_:tableid WHERE id = 1;
122+
""",
123+
)
124+
f.flush()
125+
126+
run(
127+
[
128+
"pgbench",
129+
"--file",
130+
f.name,
131+
"--client",
132+
str(client),
133+
"--jobs",
134+
str(jobs),
135+
"--time",
136+
str(time),
137+
"--progress",
138+
str(progress),
139+
"--no-vacuum",
140+
connection_string,
141+
],
142+
shell=False,
143+
)
144+
145+
146+
if __name__ == "__main__":
147+
fire.Fire(Bench)

schema-bench/requirements.txt

+2
Original file line numberDiff line numberDiff line change
@@ -0,0 +1,2 @@
1+
fire
2+
psycopg[c,pool]

0 commit comments

Comments
 (0)