Commit 66101c9

k11kirky and Twixes authored
Feat: Add llm observability to python sdk (#158)
Co-authored-by: Michael Matloka <[email protected]>
1 parent 05932b3 commit 66101c9

File tree

5 files changed (+657, −0 lines)


llm_observability_examples.py (new file, +149 lines)

import asyncio  # used by the commented-out async entry point at the bottom
import os
import uuid

import posthog
from posthog.ai import AsyncOpenAI, OpenAI

# Example credentials - replace these with your own or use environment variables
posthog.project_api_key = os.getenv("POSTHOG_PROJECT_API_KEY", "your-project-api-key")
posthog.personal_api_key = os.getenv("POSTHOG_PERSONAL_API_KEY", "your-personal-api-key")
posthog.host = os.getenv("POSTHOG_HOST", "http://localhost:8000")  # Or https://app.posthog.com
posthog.debug = True

openai_client = OpenAI(
    api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key"),
    posthog_client=posthog,
)

async_openai_client = AsyncOpenAI(
    api_key=os.getenv("OPENAI_API_KEY", "your-openai-api-key"),
    posthog_client=posthog,
)


def main_sync():
    trace_id = str(uuid.uuid4())
    print("Trace ID:", trace_id)
    distinct_id = "test_distinct_id"
    properties = {"test_property": "test_value"}

    try:
        basic_openai_call(distinct_id, trace_id, properties)
        streaming_openai_call(distinct_id, trace_id, properties)
        non_instrumented_openai_call()
    except Exception as e:
        print("Error during OpenAI call:", str(e))


async def main_async():
    trace_id = str(uuid.uuid4())
    print("Trace ID:", trace_id)
    distinct_id = "test_distinct_id"
    properties = {"test_property": "test_value"}

    try:
        await basic_async_openai_call(distinct_id, trace_id, properties)
        await streaming_async_openai_call(distinct_id, trace_id, properties)
    except Exception as e:
        print("Error during OpenAI call:", str(e))


def basic_openai_call(distinct_id, trace_id, properties):
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a complex problem solver."},
            {"role": "user", "content": "Explain quantum computing in simple terms."},
        ],
        max_tokens=100,
        temperature=0.7,
        posthog_distinct_id=distinct_id,
        posthog_trace_id=trace_id,
        posthog_properties=properties,
    )
    print(response)
    if response and response.choices:
        print("OpenAI response:", response.choices[0].message.content)
    else:
        print("No response or unexpected format returned.")
    return response


async def basic_async_openai_call(distinct_id, trace_id, properties):
    response = await async_openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a complex problem solver."},
            {"role": "user", "content": "Explain quantum computing in simple terms."},
        ],
        max_tokens=100,
        temperature=0.7,
        posthog_distinct_id=distinct_id,
        posthog_trace_id=trace_id,
        posthog_properties=properties,
    )
    if response and hasattr(response, "choices"):
        print("OpenAI response:", response.choices[0].message.content)
    else:
        print("No response or unexpected format returned.")
    return response


def streaming_openai_call(distinct_id, trace_id, properties):
    response = openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a complex problem solver."},
            {"role": "user", "content": "Explain quantum computing in simple terms."},
        ],
        max_tokens=100,
        temperature=0.7,
        stream=True,
        posthog_distinct_id=distinct_id,
        posthog_trace_id=trace_id,
        posthog_properties=properties,
    )

    for chunk in response:
        if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
            print(chunk.choices[0].delta.content or "", end="")

    return response
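
# Note: with stream=True the wrapper only captures its $ai_generation event
# once the stream is consumed (capture runs when the underlying generator
# finishes or is closed), so be sure to iterate the response as above.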


async def streaming_async_openai_call(distinct_id, trace_id, properties):
    response = await async_openai_client.chat.completions.create(
        model="gpt-4o-mini",
        messages=[
            {"role": "system", "content": "You are a complex problem solver."},
            {"role": "user", "content": "Explain quantum computing in simple terms."},
        ],
        max_tokens=100,
        temperature=0.7,
        stream=True,
        posthog_distinct_id=distinct_id,
        posthog_trace_id=trace_id,
        posthog_properties=properties,
    )

    async for chunk in response:
        if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
            print(chunk.choices[0].delta.content or "", end="")

    return response


def non_instrumented_openai_call():
    # Image generation is not wrapped, so no $ai_generation event is captured here.
    response = openai_client.images.generate(model="dall-e-3", prompt="A cute baby hedgehog", n=1, size="1024x1024")
    print(response)
    return response


# HOW TO RUN: comment out one of these entry points to run the other.
if __name__ == "__main__":
    main_sync()
    # asyncio.run(main_async())
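
One practical note for a short-lived script like this: the PostHog Python client queues events and sends them from a background worker, so exiting immediately can drop the captured $ai_generation events. A minimal sketch using the module-level helpers the posthog package exposes:

posthog.flush()     # block until queued events have been sent
posthog.shutdown()  # flush, then stop the background worker

Calling these at the end of main_sync() (or after asyncio.run(main_async())) should be enough.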

posthog/ai/__init__.py (new file, +4 lines)

from .providers.openai.openai import OpenAI
from .providers.openai.openai_async import AsyncOpenAI

__all__ = ["OpenAI", "AsyncOpenAI"]
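
These re-exports make the wrapper close to a drop-in replacement for the OpenAI SDK's own clients; the only addition is the required posthog_client argument, plus the optional posthog_* keyword arguments on create. A minimal sketch:

import posthog
from posthog.ai import OpenAI  # instead of: from openai import OpenAI

client = OpenAI(api_key="sk-...", posthog_client=posthog)
# chat.completions.create(...) calls now also emit an $ai_generation event.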

posthog/ai/providers/openai/openai.py (new file, +167 lines)

import time
import uuid
from typing import Any, Dict, Optional

try:
    import openai
    import openai.resources
except ImportError:
    raise ModuleNotFoundError("Please install the OpenAI SDK to use this feature: 'pip install openai'")

from posthog.ai.utils import call_llm_and_track_usage, get_model_params
from posthog.client import Client as PostHogClient


class OpenAI(openai.OpenAI):
    """
    A wrapper around the OpenAI SDK that automatically sends LLM usage events to PostHog.
    """

    _ph_client: PostHogClient

    def __init__(self, posthog_client: PostHogClient, **kwargs):
        """
        Args:
            posthog_client: PostHog client used to capture the LLM usage events.
            **kwargs: Any additional keyword args passed through to openai.OpenAI
                (e.g. api_key="...", organization="...").
        """
        super().__init__(**kwargs)
        self._ph_client = posthog_client
        self.chat = WrappedChat(self)


class WrappedChat(openai.resources.chat.Chat):
    _client: OpenAI

    @property
    def completions(self):
        return WrappedCompletions(self._client)


class WrappedCompletions(openai.resources.chat.completions.Completions):
    _client: OpenAI

    def create(
        self,
        posthog_distinct_id: Optional[str] = None,
        posthog_trace_id: Optional[str] = None,
        posthog_properties: Optional[Dict[str, Any]] = None,
        **kwargs: Any,
    ):
        distinct_id = posthog_distinct_id or uuid.uuid4()

        if kwargs.get("stream", False):
            return self._create_streaming(
                distinct_id,
                posthog_trace_id,
                posthog_properties,
                **kwargs,
            )

        return call_llm_and_track_usage(
            distinct_id,
            self._client._ph_client,
            posthog_trace_id,
            posthog_properties,
            self._client.base_url,
            super().create,
            **kwargs,
        )

    def _create_streaming(
        self,
        distinct_id: str,
        posthog_trace_id: Optional[str],
        posthog_properties: Optional[Dict[str, Any]],
        **kwargs: Any,
    ):
        start_time = time.time()
        usage_stats: Dict[str, int] = {}
        accumulated_content = []
        # Ask OpenAI to append a final chunk carrying token usage to the stream.
        if "stream_options" not in kwargs:
            kwargs["stream_options"] = {}
        kwargs["stream_options"]["include_usage"] = True
        response = super().create(**kwargs)

        def generator():
            nonlocal usage_stats
            nonlocal accumulated_content
            try:
                for chunk in response:
                    if hasattr(chunk, "usage") and chunk.usage:
                        usage_stats = {
                            k: getattr(chunk.usage, k, 0)
                            for k in [
                                "prompt_tokens",
                                "completion_tokens",
                                "total_tokens",
                            ]
                        }

                    if hasattr(chunk, "choices") and chunk.choices and len(chunk.choices) > 0:
                        content = chunk.choices[0].delta.content
                        if content:
                            accumulated_content.append(content)

                    yield chunk

            finally:
                # Runs when the consumer exhausts or closes the stream.
                end_time = time.time()
                latency = end_time - start_time
                output = "".join(accumulated_content)
                self._capture_streaming_event(
                    distinct_id,
                    posthog_trace_id,
                    posthog_properties,
                    kwargs,
                    usage_stats,
                    latency,
                    output,
                )

        return generator()
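
    # Design note: the $ai_generation event is captured in the generator's
    # `finally` block above, so the recorded latency spans from the request to
    # the last chunk the caller reads, and a stream that is never iterated
    # captures no event at all.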

    def _capture_streaming_event(
        self,
        distinct_id: str,
        posthog_trace_id: Optional[str],
        posthog_properties: Optional[Dict[str, Any]],
        kwargs: Dict[str, Any],
        usage_stats: Dict[str, int],
        latency: float,
        output: str,
    ):
        if posthog_trace_id is None:
            posthog_trace_id = uuid.uuid4()

        event_properties = {
            "$ai_provider": "openai",
            "$ai_model": kwargs.get("model"),
            "$ai_model_parameters": get_model_params(kwargs),
            "$ai_input": kwargs.get("messages"),
            "$ai_output": {
                "choices": [
                    {
                        "content": output,
                        "role": "assistant",
                    }
                ]
            },
            "$ai_request_url": str(self._client.base_url.join("chat/completions")),
            "$ai_http_status": 200,
            "$ai_input_tokens": usage_stats.get("prompt_tokens", 0),
            "$ai_output_tokens": usage_stats.get("completion_tokens", 0),
            "$ai_latency": latency,
            "$ai_trace_id": posthog_trace_id,
            "$ai_posthog_properties": posthog_properties,
        }

        if hasattr(self._client._ph_client, "capture"):
            self._client._ph_client.capture(
                distinct_id=distinct_id,
                event="$ai_generation",
                properties=event_properties,
            )
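
posthog/ai/utils.py is among the five changed files, but its hunk is not shown in this excerpt. Purely as an illustration, inferred from the call site in create and from the property shape used by _capture_streaming_event above, the non-streaming helper plausibly looks something like the sketch below; treat every name and field in it as an assumption rather than the committed implementation.

# Hypothetical sketch of posthog/ai/utils.call_llm_and_track_usage -- the real
# implementation is in a hunk not shown in this excerpt.
import time
import uuid
from typing import Any, Callable, Dict, Optional


def call_llm_and_track_usage(
    distinct_id,
    ph_client,
    posthog_trace_id: Optional[str],
    posthog_properties: Optional[Dict[str, Any]],
    base_url,
    call_method: Callable[..., Any],
    **kwargs: Any,
):
    start_time = time.time()
    response = call_method(**kwargs)  # the wrapped Completions.create
    latency = time.time() - start_time

    usage = getattr(response, "usage", None)
    if hasattr(ph_client, "capture"):
        ph_client.capture(
            distinct_id=distinct_id,
            event="$ai_generation",
            properties={
                "$ai_provider": "openai",
                "$ai_model": kwargs.get("model"),
                "$ai_input": kwargs.get("messages"),
                "$ai_http_status": 200,
                "$ai_input_tokens": getattr(usage, "prompt_tokens", 0) if usage else 0,
                "$ai_output_tokens": getattr(usage, "completion_tokens", 0) if usage else 0,
                "$ai_latency": latency,
                "$ai_trace_id": posthog_trace_id or uuid.uuid4(),
                "$ai_posthog_properties": posthog_properties,
            },
        )
    return response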
