[Python/Blacksheep] Adjust the unit startup mode #9886

Open · wants to merge 33 commits into base: master
152 changes: 152 additions & 0 deletions frameworks/Python/blacksheep/app-socketify.py
@@ -0,0 +1,152 @@
import multiprocessing
import os
import psycopg
import platform
import random
import blacksheep as bs
import jinja2
from pathlib import Path
from psycopg_pool import AsyncConnectionPool

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = %s'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=%s WHERE id=%s'
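# psycopg uses %s placeholders here; the asyncpg variant in app.py uses $1/$2 for the same statements.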
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
CORE_COUNT = multiprocessing.cpu_count()

MAX_POOL_SIZE = CORE_COUNT * 2
MIN_POOL_SIZE = max(1, MAX_POOL_SIZE // 2)
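# Pool bounds are per worker process: each fork opens its own pool in the on_start hook below.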
db_pool = None

async def setup_db(app):
global db_pool
conninfo = (
f"postgresql://{os.getenv('PGUSER', 'benchmarkdbuser')}:{os.getenv('PGPASS', 'benchmarkdbpass')}"
f"@tfb-database:5432/hello_world"
)
db_pool = AsyncConnectionPool(
conninfo=conninfo,
min_size=MIN_POOL_SIZE,
max_size=MAX_POOL_SIZE,
open=False,
timeout=5.0,
max_lifetime=1800,
)
await db_pool.open()

async def shutdown_db(app):
global db_pool
if db_pool is not None:
await db_pool.close()
db_pool = None

def load_fortunes_template():
with Path("templates/fortune.html").open("r") as f:
return jinja2.Template(f.read())

fortune_template = load_fortunes_template()

app = bs.Application()
app.on_start += setup_db
app.on_stop += shutdown_db

def get_num_queries(request):
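    """Parse the 'queries' query parameter, clamped to the 1..500 range the benchmark requires."""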
try:
value = request.query.get('queries')
if value is None:
return 1
query_count = int(value[0])
except (KeyError, IndexError, ValueError):
return 1
return min(max(query_count, 1), 500)

JSON_CONTENT_TYPE = b"application/json"

@bs.get('/json')
async def json_test(request):
return bs.json({'message': 'Hello, world!'})

@bs.get('/db')
async def single_db_query_test(request):
row_id = random.randint(1, 10000)
async with db_pool.connection() as db_conn:
async with db_conn.cursor() as cursor:
await cursor.execute(READ_ROW_SQL, (row_id,))
number = await cursor.fetchone()
return bs.json({'id': row_id, 'randomNumber': number[1]})

@bs.get('/queries')
async def multiple_db_queries_test(request):
num_queries = get_num_queries(request)
    row_ids = random.sample(range(1, 10001), num_queries)  # ids run 1..10000 inclusive
worlds = []
async with db_pool.connection() as db_conn:
async with db_conn.cursor() as cursor:
for row_id in row_ids:
await cursor.execute(READ_ROW_SQL, (row_id,))
number = await cursor.fetchone()
worlds.append({"id": row_id, "randomNumber": number[1]})
return bs.json(worlds)

@bs.get('/fortunes')
async def fortunes_test(request):
async with db_pool.connection() as db_conn:
async with db_conn.cursor() as cursor:
await cursor.execute("SELECT * FROM Fortune")
fortunes = await cursor.fetchall()
fortunes.append(ADDITIONAL_ROW)
fortunes.sort(key=lambda row: row[1])
data = fortune_template.render(fortunes=fortunes)
return bs.html(data)

@bs.get('/updates')
async def db_updates_test(request):
num_queries = get_num_queries(request)
    updates = sorted(zip(
        random.sample(range(1, 10001), num_queries),
        random.sample(range(1, 10001), num_queries)
    ), key=lambda x: x[0])  # sort by row id so concurrent transactions lock rows in a consistent order
worlds = [{"id": row_id, "randomNumber": number} for row_id, number in updates]
    # Retry the whole transaction a few times: even with sorted ids, concurrent
    # clients can still deadlock or abort the pipeline under load.
    for _ in range(5):
        async with db_pool.connection() as db_conn:
            try:
                await db_conn.execute("SET TRANSACTION ISOLATION LEVEL READ COMMITTED")
                async with db_conn.cursor() as cursor:
                    for row_id, _ in updates:
                        await cursor.execute(READ_ROW_SQL, (row_id,))
                        await cursor.fetchone()
                    for _ in range(5):
                        try:
                            # WRITE_ROW_SQL takes (randomnumber, id), so swap each pair.
                            await cursor.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])
                            return bs.json(worlds)
                        except psycopg.errors.DeadlockDetected:
                            await db_conn.rollback()
                            continue
            except (psycopg.errors.OperationalError, psycopg.errors.PipelineAborted):
                await db_conn.rollback()
                continue
    raise RuntimeError("could not complete the update transaction after repeated retries")

@bs.get('/plaintext')
async def plaintext_test(request):
return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))

if platform.python_implementation() == 'PyPy':
import logging
from socketify import ASGI
    workers = multiprocessing.cpu_count()
if os.environ.get('TRAVIS') == 'true':
workers = 2

def run_app():
        ASGI(app).listen(8080, lambda config: logging.info(f"Listening on http://localhost:{config.port}")).run()

    def create_fork():
        pid = os.fork()
        if pid == 0:  # os.fork() returns 0 in the child, which runs the server
            run_app()

    # The parent process also serves, so fork only workers - 1 children.
    for _ in range(1, workers):
        create_fork()
run_app()
72 changes: 48 additions & 24 deletions frameworks/Python/blacksheep/app.py
@@ -1,28 +1,29 @@
import multiprocessing
import os
import asyncpg
import platform
import random
import asyncio
from operator import itemgetter
import blacksheep as bs
import jinja2
import msgspec
from pathlib import Path

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
MAX_POOL_SIZE = 1000 // multiprocessing.cpu_count()
MIN_POOL_SIZE = max(int(MAX_POOL_SIZE / 2), 1)
db_pool = None
key = itemgetter(1)

try:
import uvloop
asyncio.set_event_loop_policy(uvloop.EventLoopPolicy())
except Exception:
...

READ_ROW_SQL = 'SELECT "id", "randomnumber" FROM "world" WHERE id = $1'
WRITE_ROW_SQL = 'UPDATE "world" SET "randomnumber"=$1 WHERE id=$2'
ADDITIONAL_ROW = [0, "Additional fortune added at request time."]
MAX_CONNECTIONS = 1900
CORE_COUNT = multiprocessing.cpu_count()
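# 1900 is a connection budget for the whole machine; each worker pool takes a 1/CORE_COUNT share.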
MAX_POOL_SIZE = max(1, int(os.getenv('MAX_POOL_SIZE', MAX_CONNECTIONS // CORE_COUNT)))
MIN_POOL_SIZE = max(1, int(os.getenv('MIN_POOL_SIZE', MAX_POOL_SIZE // 2)))

db_pool = None

async def setup_db(app):
global db_pool
db_pool = await asyncpg.create_pool(
@@ -35,6 +36,12 @@ async def setup_db(app):
max_size=MAX_POOL_SIZE,
)

async def shutdown_db(app):
"""Close asyncpg connection pool for the current process."""
global db_pool
if db_pool is not None:
await db_pool.close()
db_pool = None

def load_fortunes_template():
with Path("templates/fortune.html").open("r") as f:
@@ -45,7 +52,7 @@ def load_fortunes_template():

app = bs.Application()
app.on_start += setup_db

app.on_stop += shutdown_db

def get_num_queries(request):
try:
@@ -55,14 +62,9 @@ def get_num_queries(request):
query_count = int(value[0])
except (KeyError, IndexError, ValueError):
return 1
if query_count < 1:
return 1
if query_count > 500:
return 500
return query_count
return min(max(query_count, 1), 500)

ENCODER = msgspec.json.Encoder()
DECODER = msgspec.json.Decoder()
JSON_CONTENT_TYPE = b"application/json"
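# A single module-level msgspec encoder is built once and reused by jsonify for every response.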
def jsonify(
data,
@@ -122,26 +124,25 @@ async def fortunes_test(request):
fortunes = await db_conn.fetch("SELECT * FROM Fortune")

fortunes.append(ADDITIONAL_ROW)
fortunes.sort(key = key)
fortunes.sort(key=lambda row: row[1])
data = fortune_template.render(fortunes=fortunes)
return bs.html(data)


@bs.get('/updates')
async def db_updates_test(request):
num_queries = get_num_queries(request)
ids = sorted(random.sample(range(1, 10000 + 1), num_queries))
numbers = sorted(random.sample(range(1, 10000), num_queries))
updates = list(zip(ids, numbers))

# worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
    updates = sorted(zip(
        random.sample(range(1, 10001), num_queries),
        random.sample(range(1, 10001), num_queries)
    ), key=lambda x: x[0])  # sort by row id so concurrent transactions lock rows in a consistent order
worlds = [Result(id=row_id, randomNumber=number) for row_id, number in updates]
# worlds = [ {"id": row_id, "randomNumber": number} for row_id, number in updates ]
async with db_pool.acquire() as db_conn:
statement = await db_conn.prepare(READ_ROW_SQL)
for row_id, _ in updates:
await statement.fetchval(row_id)
        await db_conn.executemany(WRITE_ROW_SQL, [(number, row_id) for row_id, number in updates])

return jsonify(worlds)


@@ -150,3 +151,26 @@ async def plaintext_test(request):
return bs.Response(200, content=bs.Content(b"text/plain", b'Hello, World!'))
#return bs.text('Hello, World!')


if platform.python_implementation() == 'PyPy':
    import logging
    from socketify import ASGI
    workers = multiprocessing.cpu_count()
    if os.environ.get('TRAVIS') == 'true':
        workers = 2

def run_app():
        ASGI(app).listen(8080, lambda config: logging.info(f"Listening on http://localhost:{config.port}")).run()


    def create_fork():
        pid = os.fork()
        if pid == 0:  # os.fork() returns 0 in the child, which runs the server
            run_app()


    # The parent process also serves, so fork only workers - 1 children.
    for _ in range(1, workers):
        create_fork()

run_app()
23 changes: 23 additions & 0 deletions frameworks/Python/blacksheep/benchmark_config.json
@@ -46,6 +46,29 @@
"display_name": "blacksheep-nginx-unit",
"versus": "None",
"notes": ""
},
"pypy-socketify": {
"json_url": "/json",
"fortune_url": "/fortunes",
"plaintext_url": "/plaintext",
"db_url": "/db",
"query_url": "/queries?queries=",
"update_url": "/updates?queries=",
"port": 8080,
"approach": "Realistic",
"classification": "Micro",
"framework": "blacksheep",
"language": "Python",
"flavor": "Python3",
"platform": "ASGI",
"webserver": "Socketify.py",
"os": "Linux",
"orm": "Raw",
"database_os": "Linux",
"database": "Postgres",
"display_name": "Blacksheep [Socketify.py PyPy3]",
"versus": "None",
"notes": ""
}
}]
}
18 changes: 12 additions & 6 deletions frameworks/Python/blacksheep/blacksheep-nginx-unit.dockerfile
@@ -4,14 +4,20 @@ WORKDIR /blacksheep

COPY ./ /blacksheep

RUN pip3 install -U pip
RUN pip3 install Cython==3.0.12
RUN pip3 install -r /blacksheep/requirements.txt
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
RUN apt-get update && apt-get install -y libuv1

RUN chmod +x start-unit.sh
RUN pip3 install -U pip -q
RUN pip3 install Cython==3.0.12 -q
RUN pip3 install -r /blacksheep/requirements.txt -q
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q

ENV PGSSLMODE=disable
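# Rewrite unit-config.json at build time so NGINX Unit starts one process per available core.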
RUN CORE_COUNT=$(nproc) && \
sed -i "s|\"processes\": [0-9]*|\"processes\": $CORE_COUNT|g" /blacksheep/unit-config.json

RUN chmod +x start-unit.sh
ENTRYPOINT []
EXPOSE 8080
CMD ["./start-unit.sh"]

# CMD ["unitd", "--no-daemon", "--control", "unix:/var/run/control.unit.sock"]
CMD ["./start-unit.sh"]
15 changes: 15 additions & 0 deletions frameworks/Python/blacksheep/blacksheep-pypy-socketify.dockerfile
@@ -0,0 +1,15 @@
FROM pypy:3.11-bookworm

COPY ./ /blacksheep

WORKDIR /blacksheep

# libuv1 backs socketify's event loop; libpq5 is the client library psycopg loads at runtime.
RUN apt-get update && apt-get install -y libuv1 libpq5

RUN pip3 install -r /blacksheep/requirements.txt
RUN pip3 install -r /blacksheep/requirements-pypy.txt

EXPOSE 8080

CMD python ./app-socketify.py

13 changes: 7 additions & 6 deletions frameworks/Python/blacksheep/blacksheep.dockerfile
@@ -4,12 +4,13 @@ WORKDIR /blacksheep

COPY ./ /blacksheep

RUN pip3 install -U pip
RUN pip3 install Cython==3.0.12
RUN pip3 install -r /blacksheep/requirements.txt
RUN pip3 install -r /blacksheep/requirements-gunicorn.txt
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt
RUN apt-get update && apt-get install -y libuv1

RUN pip3 install -U pip -q
RUN pip3 install Cython==3.0.12 -q
RUN pip3 install -r /blacksheep/requirements.txt -q
RUN pip3 install -r /blacksheep/requirements-uvicorn.txt -q
ENV GUNICORN=1
EXPOSE 8080

CMD gunicorn app:app -k uvicorn.workers.UvicornWorker -c blacksheep_conf.py
CMD gunicorn app:app -k uvicorn_worker.UvicornWorker -c blacksheep_conf.py
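# UvicornWorker now ships in the separate uvicorn-worker package rather than uvicorn.workers.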