feat: Create Individual Virtual Env for the Kernel #74

Open · wants to merge 7 commits into base: main
1 change: 1 addition & 0 deletions .gitignore
@@ -4,6 +4,7 @@

# Program related
process_pids/
kernel.*
kernel_connection_file.json

# Python stuff
3 changes: 2 additions & 1 deletion frontend/src/App.tsx
@@ -53,7 +53,7 @@ function App() {
])
);
let [waitingForSystem, setWaitingForSystem] = useState<WaitingStates>(
WaitingStates.Idle
WaitingStates.StartingKernel
);
const chatScrollRef = React.useRef<HTMLDivElement>(null);

@@ -78,6 +78,7 @@ function App() {
const handleCommand = (command: string) => {
if (command == "reset") {
addMessage({ text: "Restarting the kernel.", type: "message", role: "system" });
setWaitingForSystem(WaitingStates.StartingKernel);

fetch(`${Config.API_ADDRESS}/restart`, {
method: "POST",
1 change: 1 addition & 0 deletions frontend/src/components/Chat.tsx
@@ -94,6 +94,7 @@ function Message(props: {


export enum WaitingStates {
StartingKernel = "Starting Kernel",
GeneratingCode = "Generating code",
RunningCode = "Running code",
UploadingFile = "Uploading file",
2 changes: 1 addition & 1 deletion gpt_code_ui/kernel_program/config.py
@@ -15,4 +15,4 @@ def get_logger():
logger = logging.getLogger(__name__)
if "DEBUG" in os.environ:
logger.setLevel(logging.DEBUG)
return logger
return logger
166 changes: 112 additions & 54 deletions gpt_code_ui/kernel_program/kernel_manager.py
@@ -1,14 +1,16 @@
import sys
import subprocess
import os
import shutil
import atexit
import queue
import json
import signal
import pathlib
import threading
import time
import atexit
import traceback
import venv

from time import sleep
from jupyter_client import BlockingKernelClient
@@ -56,7 +58,7 @@ def cleanup_spawned_processes():
os.kill(pid, signal.CTRL_BREAK_EVENT)
else:
os.kill(pid, signal.SIGKILL)

# After successful kill, cleanup pid file
os.remove(fp)

@@ -128,29 +130,27 @@ def flush_kernel_msgs(kc, tries=1, timeout=0.2):
while True:
try:
msg = kc.get_iopub_msg(timeout=timeout)
if msg["msg_type"] == "execute_result":
if "text/plain" in msg["content"]["data"]:
send_message(
msg["content"]["data"]["text/plain"], "message_raw"
)
if msg["msg_type"] == "display_data":
if "image/png" in msg["content"]["data"]:
# Convert to Slack upload
send_message(
msg["content"]["data"]["image/png"],
message_type="image/png",
)
elif "text/plain" in msg["content"]["data"]:
send_message(msg["content"]["data"]["text/plain"])

elif msg["msg_type"] == "stream":
logger.debug("Received stream output %s" % msg["content"]["text"])
send_message(msg["content"]["text"])
elif msg["msg_type"] == "error":
send_message(
utils.escape_ansi("\n".join(msg["content"]["traceback"])),
"message_raw",
)
msg_type = msg["msg_type"]
msg_content = msg["content"]

logger.debug(f'Received "{msg_type}" output: {msg_content}')

if msg_type in ("execute_result", "display_data"):
content_data = msg_content["data"]

if "image/png" in content_data:
send_message(content_data["image/png"], message_type="image/png")
elif "image/jpeg" in content_data:
send_message(content_data["image/jpeg"], message_type="image/jpeg")
elif "text/plain" in content_data:
send_message(content_data["text/plain"], "message_raw" if msg_type == "execute_result" else "message")

elif msg_type == "stream":
send_message(msg_content["text"])

elif msg_type == "error":
send_message(utils.escape_ansi("\n".join(msg["content"]["traceback"])), "message_error")

except queue.Empty:
hit_empty += 1
if hit_empty == tries:
@@ -167,58 +167,116 @@ def flush_kernel_msgs(kc, tries=1, timeout=0.2):
logger.debug(f"{e} [{type(e)}")


def start_kernel():
kernel_connection_file = os.path.join(os.getcwd(), "kernel_connection_file.json")
def create_venv(venv_dir: pathlib.Path, install_default_packages: bool) -> pathlib.Path:
venv_bindir = venv_dir / 'bin'
venv_python_executable = venv_bindir / os.path.basename(sys.executable)

if os.path.isfile(kernel_connection_file):
os.remove(kernel_connection_file)
if os.path.isdir(kernel_connection_file):
os.rmdir(kernel_connection_file)
if not os.path.isdir(venv_dir):
# create virtual env inside venv_dir directory
venv.create(venv_dir, system_site_packages=True, with_pip=True, upgrade_deps=True)

launch_kernel_script_path = os.path.join(
pathlib.Path(__file__).parent.resolve(), "launch_kernel.py"
)
if install_default_packages:
# install wheel because some packages do not like being installed without
subprocess.run([venv_python_executable, '-m', 'pip', 'install', 'wheel>=0.41,<1.0'])
# install all default packages into the venv
default_packages = [
"ipykernel>=6,<7",
"numpy>=1.24,<1.25",
"dateparser>=1.1,<1.2",
"pandas>=1.5,<1.6",
"geopandas>=0.13,<0.14",
"tabulate>=0.9.0<1.0",
"PyPDF2>=3.0,<3.1",
"pdfminer>=20191125,<20191200",
"pdfplumber>=0.9,<0.10",
"matplotlib>=3.7,<3.8",
"openpyxl>=3.1.2,<4",
]
subprocess.run([venv_python_executable, '-m', 'pip', 'install'] + default_packages)

# get base env library path as we need this to refer to it from a derived venv
site_packages = subprocess.check_output([venv_python_executable, '-c', 'import sysconfig; print(sysconfig.get_paths()["purelib"])'])
site_packages = site_packages.decode('utf-8').split('\n')[0]

return pathlib.Path(site_packages)


def create_derived_venv(base_venv: pathlib.Path, venv_dir: pathlib.Path):
site_packages_base = create_venv(base_venv, install_default_packages=True)
site_packages_derived = create_venv(venv_dir, install_default_packages=False)

os.makedirs('workspace/', exist_ok=True)
# create a link from derived venv into the base venv, see https://stackoverflow.com/a/75545634
with open(site_packages_derived / '_base_packages.pth', 'w') as pth:
pth.write(f'{site_packages_base}\n')

venv_bindir = venv_dir / 'bin'
venv_python_executable = venv_bindir / os.path.basename(sys.executable)

return venv_bindir, venv_python_executable


def start_kernel(id: str):
cwd = pathlib.Path(os.getcwd())
kernel_dir = cwd / f'kernel.{id}'
base_dir = cwd / 'kernel.base'

# Cleanup potential leftovers
shutil.rmtree(kernel_dir, ignore_errors=True)
os.makedirs(kernel_dir)

kernel_env = os.environ.copy()
kernel_connection_file = kernel_dir / "kernel_connection_file.json"
launch_kernel_script_path = pathlib.Path(__file__).parent.resolve() / "launch_kernel.py"

kernel_venv_dir = kernel_dir / 'venv'
kernel_venv_bindir, kernel_python_executable = create_derived_venv(base_dir, kernel_venv_dir)
kernel_env['PATH'] = str(kernel_venv_bindir) + os.pathsep + kernel_env['PATH']

# start the kernel using the virtual env python executable
kernel_process = subprocess.Popen(
[
sys.executable,
kernel_python_executable,
launch_kernel_script_path,
"--IPKernelApp.connection_file",
kernel_connection_file,
"--matplotlib=inline",
"--quiet",
],
cwd='workspace/'
cwd=kernel_dir,
env=kernel_env,
)
# Write PID for caller to kill
str_kernel_pid = str(kernel_process.pid)
os.makedirs(config.KERNEL_PID_DIR, exist_ok=True)
with open(os.path.join(config.KERNEL_PID_DIR, str_kernel_pid + ".pid"), "w") as p:
p.write("kernel")

utils.store_pid(kernel_process.pid, "kernel")

# Wait for kernel connection file to be written
while True:
if not os.path.isfile(kernel_connection_file):
try:
with open(kernel_connection_file, 'r') as fp:
json.load(fp)
except (FileNotFoundError, json.JSONDecodeError):
# Either file was not yet there or incomplete (then JSON parsing failed)
sleep(0.1)
pass
else:
# Keep looping if JSON parsing fails, file may be partially written
try:
with open(kernel_connection_file, 'r') as fp:
json.load(fp)
break
except json.JSONDecodeError:
pass
break

# Client
kc = BlockingKernelClient(connection_file=kernel_connection_file)
kc = BlockingKernelClient(connection_file=str(kernel_connection_file))
kc.load_connection_file()
kc.start_channels()
kc.wait_for_ready()
return kc
return kc, kernel_dir


if __name__ == "__main__":
kc = start_kernel()
start_snakemq(kc)
try:
kernel_id = sys.argv[1]
except IndexError:
logger.exception('Missing kernel ID command line parameter')
else:
kc, kernel_dir = start_kernel(id=kernel_id)

# make sure the dir with the virtualenv will be deleted after kernel termination
atexit.register(lambda: shutil.rmtree(kernel_dir, ignore_errors=True))

start_snakemq(kc)
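
For reference, a minimal sketch of the .pth trick that create_derived_venv() relies on (see https://stackoverflow.com/a/75545634): a text file dropped into the derived venv's site-packages lists the base venv's site-packages path, and Python's site module appends that line to sys.path at interpreter start-up. The paths below are hypothetical stand-ins for the values the PR computes via sysconfig:

import pathlib

base_site_packages = pathlib.Path('/tmp/kernel.base/lib/python3.11/site-packages')          # assumed base venv path
derived_site_packages = pathlib.Path('/tmp/kernel.demo/venv/lib/python3.11/site-packages')  # assumed per-kernel venv path
derived_site_packages.mkdir(parents=True, exist_ok=True)

# site.py reads every *.pth file in site-packages at start-up and appends each line that
# names an existing directory to sys.path, so a kernel running in the derived venv can
# import the default packages that were installed only once, into the base venv.
(derived_site_packages / '_base_packages.pth').write_text(f'{base_site_packages}\n')

This keeps the expensive installs (pandas, matplotlib, etc.) in kernel.base while each per-kernel venv stays cheap to create and delete.
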
2 changes: 1 addition & 1 deletion gpt_code_ui/kernel_program/launch_kernel.py
@@ -1,4 +1,4 @@
if __name__ == "__main__":
from ipykernel import kernelapp as app

app.launch_new_instance()
app.launch_new_instance()
34 changes: 17 additions & 17 deletions gpt_code_ui/kernel_program/main.py
@@ -7,7 +7,6 @@
import time

import asyncio
import json
import threading

from queue import Queue
@@ -47,24 +46,26 @@
app = Flask(__name__)
CORS(app)


def start_kernel_manager():
global kernel_manager_process

kernel_manager_script_path = os.path.join(
pathlib.Path(__file__).parent.resolve(), "kernel_manager.py"
)
kernel_manager_process = subprocess.Popen(
[sys.executable, kernel_manager_script_path]
)
kernel_manager_process = subprocess.Popen([
sys.executable,
kernel_manager_script_path,
'workspace', # This will be used as part of the folder name for the workspace and to create the venv inside. Can be anything, but using 'workspace' makes file up-/download very simple
])

utils.store_pid(kernel_manager_process.pid, "kernel_manager")

# Write PID as <pid>.pid to config.KERNEL_PID_DIR
os.makedirs(config.KERNEL_PID_DIR, exist_ok=True)
with open(os.path.join(config.KERNEL_PID_DIR, "%d.pid" % kernel_manager_process.pid), "w") as p:
p.write("kernel_manager")

def cleanup_kernel_program():
kernel_manager.cleanup_spawned_processes()


async def start_snakemq():
global messaging

@@ -77,7 +78,7 @@ def on_recv(conn, ident, message):
if message["value"] == "ready":
logger.debug("Kernel is ready.")
result_queue.put({
"value":"Kernel is ready.",
"value": "Kernel is ready.",
"type": "message"
})

@@ -97,8 +98,9 @@ def send_queued_messages():
while True:
if send_queue.qsize() > 0:
message = send_queue.get()
utils.send_json(messaging,
{"type": "execute", "value": message["command"]},
utils.send_json(
messaging,
{"type": "execute", "value": message["command"]},
config.IDENT_KERNEL_MANAGER
)
time.sleep(0.1)
@@ -117,7 +119,7 @@ async def async_link_loop():

@app.route("/api", methods=["POST", "GET"])
def handle_request():

if request.method == "GET":
# Handle GET requests by sending everything that's in the receive_queue
results = [result_queue.get() for _ in range(result_queue.qsize())]
@@ -128,7 +130,8 @@ def handle_request():
send_queue.put(data)

return jsonify({"result": "success"})



@app.route("/restart", methods=["POST"])
def handle_restart():

@@ -152,9 +155,6 @@ async def main():
def run_flask_app():
app.run(host="0.0.0.0", port=APP_PORT)


if __name__ == "__main__":
asyncio.run(main())
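
Putting the pieces together: main.py now passes a kernel id ('workspace') on the command line, and kernel_manager.py turns that id into a per-kernel working directory with its own derived virtual env. A hedged sketch of an equivalent manual invocation, with the script path assumed relative to the project root:

import pathlib
import subprocess
import sys

kernel_manager_script = pathlib.Path('gpt_code_ui/kernel_program/kernel_manager.py')  # assumed repo-relative path

# The positional argument is the kernel id. In its working directory the manager creates:
#   kernel.base/                                  shared base venv holding the default packages (matched by the new "kernel.*" .gitignore entry)
#   kernel.workspace/                             per-kernel dir, wiped and recreated on every start and used as the kernel's cwd
#   kernel.workspace/venv/                        derived venv whose _base_packages.pth points at kernel.base
#   kernel.workspace/kernel_connection_file.json  written by the IPython kernel once it is ready
subprocess.Popen([sys.executable, str(kernel_manager_script), 'workspace'])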