Skip to content

Commit

Permalink
moved pg.py stuff into dbms/postgres/cli.py
Browse files Browse the repository at this point in the history
  • Loading branch information
wangpatrick57 committed Dec 30, 2024
1 parent d4a3e99 commit 7c05fa6
Show file tree
Hide file tree
Showing 3 changed files with 111 additions and 128 deletions.
118 changes: 106 additions & 12 deletions dbms/postgres/cli.py
Original file line number Diff line number Diff line change
Expand Up @@ -6,9 +6,12 @@
import shutil
import subprocess
from pathlib import Path
from typing import Optional
from typing import Any, Optional

import click
import pglast
import psutil
import psycopg
import sqlalchemy
from gymlib import (
get_dbdata_tgz_symlink_path,
Expand All @@ -17,22 +20,12 @@
linkname_to_name,
)
from gymlib.shell import subprocess_run
from sqlalchemy import create_engine, text

from benchmark.constants import DEFAULT_SCALE_FACTOR
from benchmark.job.load_info import JobLoadInfo
from benchmark.tpch.load_info import TpchLoadInfo
from dbms.load_info_base_class import LoadInfoBaseClass
from util.pg import (
DBGYM_POSTGRES_DBNAME,
DBGYM_POSTGRES_PASS,
DBGYM_POSTGRES_USER,
DEFAULT_POSTGRES_DBNAME,
DEFAULT_POSTGRES_PORT,
SHARED_PRELOAD_LIBRARIES,
create_sqlalchemy_conn,
sql_file_execute,
sqlalchemy_conn_execute,
)
from util.workspace import (
WORKSPACE_PATH_PLACEHOLDER,
DBGymWorkspace,
Expand All @@ -42,6 +35,13 @@
is_ssd,
)

# Credentials and database name embedded in the connection strings built by
# get_connstr() and get_kv_connstr().
DBGYM_POSTGRES_USER = "dbgym_user"
DBGYM_POSTGRES_PASS = "dbgym_pass"
DBGYM_POSTGRES_DBNAME = "dbgym"
# NOTE(review): presumably the database that already exists before the dbgym
# database is created -- confirm at the use sites.
DEFAULT_POSTGRES_DBNAME = "postgres"
# Default value of the pgport parameter of the connection helpers in this module.
DEFAULT_POSTGRES_PORT = 5432
# Comma-separated library names. NOTE(review): presumably the value given to
# Postgres's shared_preload_libraries setting -- confirm at the use sites.
SHARED_PRELOAD_LIBRARIES = "boot,pg_hint_plan,pg_prewarm"


@click.group(name="postgres")
@click.pass_obj
Expand Down Expand Up @@ -359,3 +359,97 @@ def _start_or_stop_postgres(
f"./pg_ctl -D \"{dbdata_path}\" -o '-p {pgport}' stop",
cwd=pgbin_path,
)


def sqlalchemy_conn_execute(
    conn: sqlalchemy.Connection, sql: str
) -> sqlalchemy.engine.CursorResult[Any]:
    """Run one raw SQL string on an open SQLAlchemy connection.

    Args:
        conn: The connection to execute on.
        sql: SQL source; it is wrapped in ``text()`` before being executed.

    Returns:
        The result object produced by ``conn.execute``.
    """
    statement = text(sql)
    return conn.execute(statement)


def sql_file_queries(dbgym_workspace: DBGymWorkspace, filepath: Path) -> list[str]:
    """Read a SQL file and return the individual queries it contains.

    Lines that begin with ``--`` and lines that are empty after stripping
    whitespace are dropped. The remaining lines are concatenated unchanged
    and handed to ``pglast.split``.

    Args:
        dbgym_workspace: Workspace whose ``open_and_save`` is used to open the file.
        filepath: Path of the SQL file to read.

    Returns:
        The queries as returned by ``pglast.split``.
    """
    with dbgym_workspace.open_and_save(filepath) as sql_file:
        kept_lines = [
            raw_line
            for raw_line in sql_file
            if not raw_line.startswith("--") and raw_line.strip()
        ]
    return pglast.split("".join(kept_lines))


def sql_file_execute(
    dbgym_workspace: DBGymWorkspace, conn: sqlalchemy.Connection, filepath: Path
) -> None:
    """Execute every query of a SQL file on ``conn``.

    The queries are obtained from ``sql_file_queries`` and executed one at a
    time, in the order that function returns them.

    Args:
        dbgym_workspace: Workspace used to open the file.
        conn: SQLAlchemy connection the queries are executed on.
        filepath: Path of the SQL file.
    """
    statements = sql_file_queries(dbgym_workspace, filepath)
    for statement in statements:
        sqlalchemy_conn_execute(conn, statement)


# pgport is an argument because, when doing agent HPO, we want to run multiple instances of Postgres
# at the same time. In this situation, they need to have different ports.
def get_connstr(pgport: int = DEFAULT_POSTGRES_PORT, use_psycopg: bool = True) -> str:
    """Build the URL-style connection string for the dbgym database on localhost.

    Args:
        pgport: Port to put in the connection string.
        use_psycopg: Whether the string will be passed to ``psycopg.connect()``.
            True yields the ``postgresql`` scheme; False yields
            ``postgresql+psycopg``.

    Returns:
        A string of the form ``<scheme>://<user>:<pass>@localhost:<port>/<dbname>``.
    """
    # Counterintuitively, the "+psycopg" suffix is only needed when we are *not*
    # calling psycopg.connect() ourselves.
    if use_psycopg:
        scheme = "postgresql"
    else:
        scheme = "postgresql+psycopg"
    credentials = f"{DBGYM_POSTGRES_USER}:{DBGYM_POSTGRES_PASS}"
    location = f"localhost:{pgport}/{DBGYM_POSTGRES_DBNAME}"
    return f"{scheme}://{credentials}@{location}"


def get_kv_connstr(pgport: int = DEFAULT_POSTGRES_PORT) -> str:
    """Build the space-separated ``key=value`` connection string for the dbgym database.

    Args:
        pgport: Port to put in the ``port`` field.

    Returns:
        ``host``, ``port``, ``user``, ``password`` and ``dbname`` fields, in that
        order, joined by single spaces.
    """
    fields = (
        ("host", "localhost"),
        ("port", pgport),
        ("user", DBGYM_POSTGRES_USER),
        ("password", DBGYM_POSTGRES_PASS),
        ("dbname", DBGYM_POSTGRES_DBNAME),
    )
    return " ".join(f"{key}={value}" for key, value in fields)


def create_psycopg_conn(pgport: int = DEFAULT_POSTGRES_PORT) -> psycopg.Connection[Any]:
    """Open a psycopg connection to the dbgym database on the given port.

    The connection is opened with ``autocommit=True`` and
    ``prepare_threshold=None`` (per psycopg's documentation, ``None`` disables
    automatic statement preparation).

    Args:
        pgport: Port of the Postgres instance to connect to.

    Returns:
        The connection returned by ``psycopg.connect``.
    """
    return psycopg.connect(
        get_connstr(pgport=pgport, use_psycopg=True),
        autocommit=True,
        prepare_threshold=None,
    )


def create_sqlalchemy_conn(
    pgport: int = DEFAULT_POSTGRES_PORT,
) -> sqlalchemy.Connection:
    """Open a SQLAlchemy connection to the dbgym database on the given port.

    A new engine is created on every call, with its execution options setting
    the isolation level to ``AUTOCOMMIT``.

    Args:
        pgport: Port of the Postgres instance to connect to.

    Returns:
        A connection obtained from the newly created engine.
    """
    execution_options = {"isolation_level": "AUTOCOMMIT"}
    url = get_connstr(pgport=pgport, use_psycopg=False)
    return create_engine(url, execution_options=execution_options).connect()


def get_is_postgres_running() -> bool:
    """
    Tell whether get_running_postgres_ports() finds at least one port.

    This is often used in assertions to ensure that Postgres isn't running before we
    execute some code.
    There is intentionally no companion function that forcefully *stops* all Postgres
    instances. That would be risky because it could accidentally stop instances it
    wasn't supposed to (e.g. Postgres instances run by other users on the same machine).
    Stopping Postgres instances is thus the responsibility of the human.
    """
    return bool(get_running_postgres_ports())


def get_running_postgres_ports() -> list[int]:
    """
    Returns the distinct ports on which a process named "postgres" is listening.

    There are ways to check with psycopg/sqlalchemy. However, psutil is used here
    to keep the check as simple as possible and orthogonal to how connections work.

    Sockets whose owning process cannot be determined or inspected (no pid
    reported, process already gone, or access denied) are skipped.

    Returns:
        Each port at most once, in the order it was first seen.
    """
    running_ports: list[int] = []

    for conn in psutil.net_connections(kind="inet"):
        if conn.status != "LISTEN":
            continue
        # psutil reports pid=None when it cannot tell which process owns the
        # socket. psutil.Process(None) would silently refer to the *current*
        # process, so skip such sockets explicitly.
        if conn.pid is None:
            continue
        try:
            is_postgres = psutil.Process(conn.pid).name() == "postgres"
        except (psutil.NoSuchProcess, psutil.AccessDenied):
            continue
        port = conn.laddr.port
        # A server bound to several addresses (e.g. IPv4 and IPv6) shows up once
        # per listening socket; report its port only once.
        if is_postgres and port not in running_ports:
            running_ports.append(port)

    return running_ports
6 changes: 5 additions & 1 deletion env/pg_conn.py
Original file line number Diff line number Diff line change
Expand Up @@ -22,7 +22,11 @@
from plumbum import local
from psycopg.errors import ProgramLimitExceeded, QueryCanceled

from util.pg import DBGYM_POSTGRES_DBNAME, SHARED_PRELOAD_LIBRARIES, get_kv_connstr
from dbms.postgres.cli import (
DBGYM_POSTGRES_DBNAME,
SHARED_PRELOAD_LIBRARIES,
get_kv_connstr,
)
from util.workspace import DBGymWorkspace, parent_path_of_path

CONNECT_TIMEOUT = 300
Expand Down
115 changes: 0 additions & 115 deletions util/pg.py

This file was deleted.

0 comments on commit 7c05fa6

Please sign in to comment.