1+ """A stateful code interpreter tool that can execute Python code.
2+
3+ This module provides tools to execute Python code in isolated Jupyter kernels
4+ with stateful sessions. It's particularly useful for complex computations,
5+ data analysis, and multi-step programming tasks.
6+
7+ Key Features:
8+ - Stateful execution: Variables and imports persist across multiple calls with the same session_id
9+ - Isolated environment: Each session runs in its own Jupyter kernel for security
10+ - Rich output handling: Captures stdout, expression results, and error messages
11+ - Resource management: Explicit session lifecycle with start/stop controls
12+
13+ Setup:
14+ To use this tool, ensure Jupyter client and ipykernel are installed:
15+
16+ pip install jupyter_client ipykernel
17+
18+ If needed, register the kernel:
19+
20+ python -m ipykernel install --user
21+
22+ Example Usage Scenarios:
23+
24+ 1. Data Analysis:
25+ execute_code(
26+ session_id="data_analysis",
27+ code='''
28+ import pandas as pd
29+ import numpy as np
30+ data = pd.DataFrame({'x': [1, 2, 3, 4, 5], 'y': [2, 4, 6, 8, 10]})
31+ print("Correlation:", data.corr().iloc[0, 1])
32+ print("Mean of x:", data['x'].mean())
33+ '''
34+ )
35+
36+ 2. Multi-step Mathematical Computation:
37+ # Step 1: Define complex function
38+ execute_code(
39+ session_id="math_calc",
40+ code='''
41+ import math
42+ def complex_calculation(x, y):
43+ return math.sin(x) * math.cos(y) + math.sqrt(x**2 + y**2)
44+ '''
45+ )
46+
47+ # Step 2: Use the function
48+ execute_code(
49+ session_id="math_calc",
50+ code='''
51+ result = complex_calculation(3.14, 2.71)
52+ print(f"Result: {result}")
53+ '''
54+ )
55+
56+ 3. Machine Learning Prototype:
57+ execute_code(
58+ session_id="ml_prototype",
59+ code='''
60+ from sklearn.linear_model import LinearRegression
61+ import numpy as np
62+ X = np.array([[1], [2], [3], [4], [5]])
63+ y = np.array([2, 4, 6, 8, 10])
64+ model = LinearRegression().fit(X, y)
65+ prediction = model.predict([[6]])
66+ print(f"Prediction for x=6: {prediction[0]}")
67+ '''
68+ )
69+
70+ 4. Simple Calculations:
71+ execute_code(
72+ session_id="simple_calc",
73+ code="print('Hello, OxyGent!'); result = 10 + 20; print(f'Sum: {result}')"
74+ )
75+
76+ # Clean up when done
77+ stop_session(session_id="simple_calc")
78+
79+ 5. Error Handling:
80+ execute_code(
81+ session_id="error_test",
82+ code="undefined_variable + 1"
83+ )
84+ # Returns: "NameError: name 'undefined_variable' is not defined"
85+
86+ 6. Session State Persistence:
87+ # First call - initialize data
88+ execute_code(
89+ session_id="persistent_session",
90+ code="data = [1, 2, 3, 4, 5]; print('Data initialized')"
91+ )
92+
93+ # Second call - use data from first call
94+ execute_code(
95+ session_id="persistent_session",
96+ code="print('Length:', len(data)); print('Sum:', sum(data))"
97+ )
98+
99+ # Clean up
100+ stop_session(session_id="persistent_session")
101+ """
102+
103+ import asyncio
104+ import logging
105+ import threading
106+ import time
107+ from queue import Empty
108+
109+ from jupyter_client .manager import KernelManager
110+ from pydantic import Field
111+
112+ from oxygent .oxy import FunctionHub
113+
114+ logger = logging .getLogger (__name__ )
115+
116+ code_interpreter_tools = FunctionHub (name = "code_interpreter_tools" )
117+
118+
119+ class CodeInterpreter :
120+ """Synchronous class to manage Jupyter kernels and execute code.
121+
122+ This class handles the lifecycle of Jupyter kernels and provides
123+ thread-safe code execution capabilities. Each session gets its own
124+ isolated kernel environment.
125+ """
126+
127+ def __init__ (self ):
128+ """Initialize the CodeInterpreter with empty sessions dictionary."""
129+ self .sessions : dict [str , dict ] = {}
130+ self ._global_lock = threading .RLock ()
131+
132+ def start_kernel (self , session_id : str ):
133+ """Start a new Jupyter kernel for the given session ID.
134+
135+ If a kernel already exists for this session_id, returns the existing one.
136+
137+ Args:
138+ session_id (str): Unique identifier for the session
139+
140+ Returns:
141+ dict: Session dictionary containing kernel manager, client, and lock
142+
143+ Raises:
144+ RuntimeError: If kernel fails to start
145+ """
146+ with self ._global_lock :
147+ session = self .sessions .get (session_id )
148+ if session :
149+ return session
150+ km = KernelManager ()
151+ try :
152+ km .start_kernel ()
153+ client = km .client ()
154+ client .start_channels ()
155+ # Wait for the kernel to be ready to avoid first-call race
156+ try :
157+ # Some client impls provide wait_for_ready
158+ wait_for_ready = getattr (client , "wait_for_ready" , None )
159+ if callable (wait_for_ready ):
160+ wait_for_ready (timeout = 30 )
161+ else :
162+ # Fallback: small grace period to allow kernel to initialize
163+ time .sleep (0.5 )
164+ except Exception as e :
165+ logger .debug ("Kernel wait_for_ready encountered an issue: %s" , e )
166+ except Exception as e :
167+ logger .error ("Failed to start kernel for session %s: %s" , session_id , e )
168+ raise RuntimeError (f"Error starting Jupyter kernel: { e } " ) from e
169+ session = {
170+ "kernel_manager" : km ,
171+ "client" : client ,
172+ "lock" : threading .RLock (),
173+ }
174+ self .sessions [session_id ] = session
175+ return session
176+
177+ def stop_kernel (self , session_id : str ):
178+ """Stop and cleanup the Jupyter kernel for the given session ID.
179+
180+ Args:
181+ session_id (str): Unique identifier for the session to stop
182+ """
183+ with self ._global_lock :
184+ if session_id in self .sessions :
185+ session = self .sessions [session_id ]
186+ try :
187+ session ["client" ].stop_channels ()
188+ except Exception as e :
189+ logger .debug ("stop_channels error for %s: %s" , session_id , e )
190+ try :
191+ session ["kernel_manager" ].shutdown_kernel ()
192+ except Exception as e :
193+ logger .debug ("shutdown_kernel error for %s: %s" , session_id , e )
194+ del self .sessions [session_id ]
195+
196+ def _collect_outputs (self , client , msg_id : str , total_timeout : float = 30.0 ) -> str :
197+ """Collect all output messages from the kernel execution.
198+
199+ Args:
200+ client: Jupyter client instance
201+ msg_id (str): Message ID to track
202+ total_timeout (float): Maximum time to wait for output
203+
204+ Returns:
205+ str: Combined output from all messages
206+ """
207+ outputs : list [str ] = []
208+ deadline = time .time () + total_timeout
209+ saw_idle = False
210+ while time .time () < deadline :
211+ try :
212+ msg = client .get_iopub_msg (timeout = 0.5 )
213+ except Empty :
214+ continue
215+ except Exception as e :
216+ logger .debug ("get_iopub_msg error: %s" , e )
217+ continue
218+
219+ msg_type = msg .get ("header" , {}).get ("msg_type" )
220+ parent_id = msg .get ("parent_header" , {}).get ("msg_id" )
221+
222+ # 'status' messages may not always carry the same parent id
223+ if msg_type != "status" and parent_id != msg_id :
224+ continue
225+
226+ if msg_type == "status" :
227+ state = msg .get ("content" , {}).get ("execution_state" )
228+ if state == "idle" :
229+ saw_idle = True
230+ break
231+ continue
232+
233+ if msg_type == "stream" :
234+ outputs .append (msg .get ("content" , {}).get ("text" , "" ))
235+ elif msg_type in ("execute_result" , "display_data" ):
236+ data = msg .get ("content" , {}).get ("data" , {})
237+ text = data .get ("text/plain" )
238+ if text :
239+ outputs .append (text )
240+ elif msg_type == "error" :
241+ ename = msg .get ("content" , {}).get ("ename" , "" )
242+ evalue = msg .get ("content" , {}).get ("evalue" , "" )
243+ outputs .append (f"{ ename } : { evalue } " )
244+
245+ # Best-effort: ensure we have received execute_reply for our message
246+ try :
247+ while time .time () < deadline :
248+ reply = client .get_shell_msg (timeout = 0.1 )
249+ if reply .get ("parent_header" , {}).get ("msg_id" ) == msg_id :
250+ break
251+ except Exception :
252+ pass
253+
254+ return "\n " .join ([o for o in outputs if o ]).strip ()
255+
256+ def execute_code (self , session_id : str , code : str ) -> str :
257+ """Execute Python code in the specified session's kernel.
258+
259+ Args:
260+ session_id (str): Session identifier
261+ code (str): Python code to execute
262+
263+ Returns:
264+ str: Output from code execution
265+
266+ Raises:
267+ RuntimeError: If kernel fails to start or execute code
268+ """
269+ session = self .start_kernel (session_id )
270+ client = session ["client" ]
271+ # Serialize execution per session to prevent concurrent reads on client queues
272+ with session ["lock" ]:
273+ msg_id = client .execute (code )
274+ return self ._collect_outputs (client , msg_id )
275+
276+
277+ code_interpreter_instance = CodeInterpreter ()
278+
279+
280+ @code_interpreter_tools .tool (
281+ description = "Executes Python code in a stateful session. Use the same session_id to maintain state across multiple calls."
282+ )
283+ async def execute_code (
284+ session_id : str = Field (
285+ description = "The identifier for the execution session. All code with the same session_id will run in the same environment."
286+ ),
287+ code : str = Field (description = "The Python code to execute." ),
288+ ) -> str :
289+ """Execute Python code in a stateful Jupyter kernel session.
290+
291+ This tool runs Python code in an isolated Jupyter kernel, allowing for
292+ stateful execution across multiple calls with the same session_id.
293+
294+ Args:
295+ session_id (str): Identifier for the execution session. All code with
296+ the same session_id shares the same kernel environment.
297+ code (str): Python code to execute. Can be multiple lines.
298+
299+ Returns:
300+ str: The output from the code execution, including printed text,
301+ expression results, and error messages.
302+
303+ Example:
304+ >>> execute_code(session_id="calc", code="x = 5; y = 10; print(x + y)")
305+ '15'
306+
307+ >>> execute_code(session_id="calc", code="print(x * y)") # Uses variables from previous call
308+ '50'
309+
310+ Note:
311+ - Variables and imports persist within the same session
312+ - Each session runs in an isolated Jupyter kernel
313+ - Remember to call stop_session when finished to free resources
314+ - Errors are returned as formatted strings, not raised as exceptions
315+ """
316+ loop = asyncio .get_running_loop ()
317+ if not session_id or not isinstance (session_id , str ):
318+ return "Error: 'session_id' must be a non-empty string"
319+ if not code or not isinstance (code , str ):
320+ return "Error: 'code' must be a non-empty string"
321+ try :
322+ result = await loop .run_in_executor (
323+ None ,
324+ code_interpreter_instance .execute_code ,
325+ session_id ,
326+ code ,
327+ )
328+ return result
329+ except Exception as e :
330+ logger .warning ("Code execution failed for session %s: %s" , session_id , e )
331+ return f"Error: { e } "
332+
333+
334+ @code_interpreter_tools .tool (description = "Stops a session and cleans up its resources." )
335+ async def stop_session (
336+ session_id : str = Field (
337+ description = "The identifier for the execution session to stop."
338+ ),
339+ ) -> str :
340+ """Stop a session and clean up its resources.
341+
342+ This tool terminates the Jupyter kernel associated with the session
343+ and frees up all resources.
344+
345+ Args:
346+ session_id (str): Identifier for the session to stop.
347+
348+ Returns:
349+ str: Confirmation message that the session has been stopped.
350+
351+ Example:
352+ >>> stop_session(session_id="calc")
353+ 'Session calc stopped.'
354+
355+ Note:
356+ - Always call this when finished with a session to free resources
357+ - Once stopped, the session cannot be resumed
358+ - Errors are returned as formatted strings, not raised as exceptions
359+ """
360+ loop = asyncio .get_running_loop ()
361+ try :
362+ await loop .run_in_executor (None , code_interpreter_instance .stop_kernel , session_id )
363+ return f"Session { session_id } stopped."
364+ except Exception as e :
365+ logger .warning ("Failed to stop session %s: %s" , session_id , e )
366+ return f"Error: { e } "
0 commit comments