Skip to content
New issue

Have a question about this project? Sign up for a free GitHub account to open an issue and contact its maintainers and the community.

By clicking “Sign up for GitHub”, you agree to our terms of service and privacy statement. We’ll occasionally send you account related emails.

Already on GitHub? Sign in to your account

Python: draft initial implementation of Realtime API #10127

Draft
wants to merge 4 commits into
base: main
Choose a base branch
from
Draft
Show file tree
Hide file tree
Changes from all commits
Commits
File filter

Filter by extension

Filter by extension


Conversations
Failed to load comments.
Loading
Jump to
Jump to file
Failed to load files.
Loading
Diff view
Diff view
3 changes: 3 additions & 0 deletions python/pyproject.toml
Original file line number Diff line number Diff line change
Expand Up @@ -123,6 +123,9 @@ dapr = [
"dapr-ext-fastapi>=1.14.0",
"flask-dapr>=1.14.0"
]
openai_realtime = [
"openai[realtime] ~= 1.0"
]

[tool.uv]
prerelease = "if-necessary-or-explicit"
Expand Down
192 changes: 192 additions & 0 deletions python/samples/concepts/audio/04-chat_with_realtime_api.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,192 @@
# Copyright (c) Microsoft. All rights reserved.
import asyncio
import contextlib
import logging
import signal
from typing import Any

from openai.types.beta.realtime.realtime_server_event import RealtimeServerEvent

from samples.concepts.audio.audio_player_async import AudioPlayerAsync
from samples.concepts.audio.audio_recorder_stream import AudioRecorderStream
from semantic_kernel import Kernel
from semantic_kernel.connectors.ai import FunctionChoiceBehavior
from semantic_kernel.connectors.ai.open_ai import (
OpenAIRealtime,
OpenAIRealtimeExecutionSettings,
TurnDetection,
)
from semantic_kernel.connectors.ai.prompt_execution_settings import PromptExecutionSettings
from semantic_kernel.connectors.ai.realtime_client_base import RealtimeClientBase
from semantic_kernel.contents import AudioContent, ChatHistory, StreamingTextContent
from semantic_kernel.functions import kernel_function

logging.basicConfig(level=logging.WARNING)
logger = logging.getLogger(__name__)

# This simple sample demonstrates how to use the OpenAI Realtime API to create
# a chat bot that can listen and respond directly through audio.
# It requires installing:
# - semantic-kernel[openai_realtime]
# - pyaudio
# - sounddevice
# - pydub
# e.g. pip install semantic-kernel[openai_realtime] pyaudio sounddevice pydub

# The characteristics of your speaker and microphone are a big factor in a smooth conversation
# so you may need to try out different devices for each.
# you can also play around with the turn_detection settings to get the best results.
# It has device id's set in the AudioRecorderStream and AudioPlayerAsync classes,
# so you may need to adjust these for your system.
# you can check the available devices by uncommenting the line below the function


def check_audio_devices():
    """Print all audio devices known to sounddevice, to help pick device ids."""
    import sounddevice as sd  # type: ignore

    devices = sd.query_devices()
    print(devices)


# check_audio_devices()


class Speaker:
"""This is a simple class that opens the session with the realtime api and plays the audio response.

At the same time it prints the transcript of the conversation to the console.
"""

def __init__(self, audio_player: AudioPlayerAsync, realtime_client: RealtimeClientBase, kernel: Kernel):
self.audio_player = audio_player
self.realtime_client = realtime_client
self.kernel = kernel

async def play(
self,
chat_history: ChatHistory,
settings: OpenAIRealtimeExecutionSettings,
print_transcript: bool = True,
) -> None:
# reset the frame count for the audio player
self.audio_player.reset_frame_count()
# open the connection to the realtime api
async with self.realtime_client as client:
# update the session with the chat_history and settings
await client.update_session(settings=settings, chat_history=chat_history)
# print the start message of the transcript
if print_transcript:
print("Mosscap (transcript): ", end="")
try:
# start listening for events
async for content in self.realtime_client.event_listener(settings=settings, kernel=self.kernel):
if not content:
continue
# the contents returned should be StreamingChatMessageContent
# so we will loop through the items within it.
for item in content.items:
match item:
case StreamingTextContent():
if print_transcript:
print(item.text, end="")
await asyncio.sleep(0.01)
continue
case AudioContent():
self.audio_player.add_data(item.data)
await asyncio.sleep(0.01)
continue
except asyncio.CancelledError:
print("\nThanks for talking to Mosscap!")


class Microphone:
    """This is a simple class that opens the microphone and sends the audio to the realtime api."""

    def __init__(self, audio_recorder: AudioRecorderStream, realtime_client: RealtimeClientBase):
        self.audio_recorder = audio_recorder
        self.realtime_client = realtime_client

    async def record_audio(self):
        """Forward captured audio chunks to the realtime session's input buffer."""
        try:
            async for chunk in self.audio_recorder.stream_audio_content():
                if chunk.data:
                    await self.realtime_client.send_event(
                        "input_audio_buffer.append",
                        content=chunk,
                    )
                # brief pause keeps the event loop responsive between chunks
                await asyncio.sleep(0.01)
        except asyncio.CancelledError:
            # cancelled on shutdown (ctrl+c) — exit quietly
            pass


# this function is used to stop the processes when ctrl + c is pressed
def signal_handler():
    """Cancel every task on the running loop; registered for SIGINT (ctrl+c)."""
    for pending in asyncio.all_tasks():
        pending.cancel()


@kernel_function
def get_weather(location: str) -> str:
    """Get the weather for a location.

    Demo plugin function for realtime function calling; always reports sunny.
    """
    # lazy %-args: the message is only formatted when DEBUG logging is enabled
    logger.debug("Getting weather for %s", location)
    return f"The weather in {location} is sunny."


def response_created_callback(
    event: RealtimeServerEvent, settings: PromptExecutionSettings | None = None, **kwargs: Any
) -> None:
    """Print an empty line so each new response starts on a fresh console line.

    Registered on the realtime client for the "response.created" event.
    The event, settings and kwargs are accepted to satisfy the handler
    signature but are not used here.
    """
    print("")


async def main() -> None:
    """Wire up the kernel, realtime client, speaker and microphone, then run the chat."""
    # setup the asyncio loop with the signal event handler.
    # get_running_loop() replaces the (deprecated-in-coroutines) get_event_loop();
    # we are already running under asyncio.run() here.
    loop = asyncio.get_running_loop()
    loop.add_signal_handler(signal.SIGINT, signal_handler)

    # create the Kernel and add a simple function for function calling.
    kernel = Kernel()
    kernel.add_function(plugin_name="weather", function_name="get_weather", function=get_weather)

    # create the realtime client and register the response created callback
    realtime_client = OpenAIRealtime(ai_model_id="gpt-4o-realtime-preview-2024-12-17")
    realtime_client.register_event_handler("response.created", response_created_callback)

    # create the speaker and microphone
    # NOTE(review): device ids 7 and 2 are machine-specific — adjust for your
    # system (see check_audio_devices above).
    speaker = Speaker(AudioPlayerAsync(device_id=7), realtime_client, kernel)
    microphone = Microphone(AudioRecorderStream(device_id=2), realtime_client)

    # Create the settings for the session.
    # The key thing to decide on is whether to enable the server_vad turn detection.
    # If turn detection is turned off (by setting turn_detection=None), you will have to send
    # the "input_audio_buffer.commit" and "response.create" events to the realtime api
    # to signal the end of the user's turn and start the response.

    # The realtime api does not use a system message, but takes instructions as a parameter for a session
    instructions = """
    You are a chat bot. Your name is Mosscap and
    you have one goal: figure out what people need.
    Your full name, should you need to know it, is
    Splendid Speckled Mosscap. You communicate
    effectively, but you tend to answer with long
    flowery prose.
    """
    # but we can add a chat history to the conversation after starting it
    chat_history = ChatHistory()
    chat_history.add_user_message("Hi there, who are you?")
    chat_history.add_assistant_message("I am Mosscap, a chat bot. I'm trying to figure out what people need.")

    settings = OpenAIRealtimeExecutionSettings(
        instructions=instructions,
        voice="sage",
        turn_detection=TurnDetection(type="server_vad", create_response=True, silence_duration_ms=800, threshold=0.8),
        function_choice_behavior=FunctionChoiceBehavior.Auto(),
    )
    # start the speaker and the microphone concurrently; suppress the
    # CancelledError raised when signal_handler cancels the tasks on ctrl+c.
    with contextlib.suppress(asyncio.CancelledError):
        await asyncio.gather(speaker.play(chat_history, settings), microphone.record_audio())


if __name__ == "__main__":
    # Explain how to interact, then run the sample until ctrl+c.
    # Fixed: the implicitly concatenated literals were missing a separating
    # space, printing "...responding.Press ctrl + c...".
    print(
        "Instruction: start speaking, when you stop the API should detect you finished and start responding. "
        "Press ctrl + c to stop the program."
    )
    asyncio.run(main())
75 changes: 75 additions & 0 deletions python/samples/concepts/audio/audio_player_async.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,75 @@
# Copyright (c) Microsoft. All rights reserved.

import threading
from collections import deque

import numpy as np
import pyaudio
import sounddevice as sd

CHUNK_LENGTH_S = 0.05 # 100ms
SAMPLE_RATE = 24000
FORMAT = pyaudio.paInt16
CHANNELS = 1


class AudioPlayerAsync:
    """Plays queued pcm16 mono audio through a sounddevice output stream."""

    def __init__(self, device_id: int | None = None):
        # Queue of pending int16 numpy chunks. A deque gives O(1) pops/pushes
        # at the left end, where list.pop(0) / list.insert(0, ...) are O(n).
        self.queue: deque = deque()
        self.lock = threading.Lock()
        self.stream = sd.OutputStream(
            callback=self.callback,
            samplerate=SAMPLE_RATE,
            channels=CHANNELS,
            dtype=np.int16,
            blocksize=int(CHUNK_LENGTH_S * SAMPLE_RATE),  # 50 ms blocks
            device=device_id,
        )
        self.playing = False
        self._frame_count = 0

    def callback(self, outdata, frames, time, status):  # noqa
        """sounddevice callback: fill `outdata` with up to `frames` queued samples."""
        with self.lock:
            data = np.empty(0, dtype=np.int16)

            # get next item from queue if there is still space in the buffer
            while len(data) < frames and len(self.queue) > 0:
                item = self.queue.popleft()
                frames_needed = frames - len(data)
                data = np.concatenate((data, item[:frames_needed]))
                if len(item) > frames_needed:
                    # push the unconsumed tail back for the next callback
                    self.queue.appendleft(item[frames_needed:])

            self._frame_count += len(data)

            # fill the rest of the frames with zeros if there is no more data
            if len(data) < frames:
                data = np.concatenate((data, np.zeros(frames - len(data), dtype=np.int16)))

        outdata[:] = data.reshape(-1, 1)

    def reset_frame_count(self):
        """Reset the played-frame counter (called before each new response)."""
        self._frame_count = 0

    def get_frame_count(self):
        """Return the number of frames handed to the output stream so far."""
        return self._frame_count

    def add_data(self, data: bytes):
        """Queue raw pcm16 bytes for playback, starting the stream if idle."""
        with self.lock:
            # bytes is pcm16 single channel audio data, convert to numpy array
            np_data = np.frombuffer(data, dtype=np.int16)
            self.queue.append(np_data)
            if not self.playing:
                self.start()

    def start(self):
        """Start (or resume) the output stream."""
        self.playing = True
        self.stream.start()

    def stop(self):
        """Stop the output stream and drop any queued audio."""
        self.playing = False
        self.stream.stop()
        with self.lock:
            self.queue.clear()

    def terminate(self):
        """Close the underlying output stream."""
        self.stream.close()
60 changes: 60 additions & 0 deletions python/samples/concepts/audio/audio_recorder_stream.py
Original file line number Diff line number Diff line change
@@ -0,0 +1,60 @@
# Copyright (c) Microsoft. All rights reserved.

import asyncio
import base64
from collections.abc import AsyncGenerator
from typing import Any, ClassVar, cast

from pydantic import BaseModel

from semantic_kernel.contents.audio_content import AudioContent


class AudioRecorderStream(BaseModel):
    """Stream microphone audio as base64-encoded AudioContent chunks.

    Opens a sounddevice input stream on the configured device and yields
    20 ms chunks of pcm16 mono audio until cancelled or interrupted.
    (Previous docstring described a spacebar-driven WAV-file recorder,
    which is not what this class does.)
    """

    # Audio recording parameters
    CHANNELS: ClassVar[int] = 1
    SAMPLE_RATE: ClassVar[int] = 24000
    CHUNK_LENGTH_S: ClassVar[float] = 0.05
    # input device index; None selects the system default
    device_id: int | None = None

    async def stream_audio_content(self) -> AsyncGenerator[AudioContent, None]:
        """Yield 20 ms chunks of microphone audio as base64 AudioContent."""
        import sounddevice as sd  # type: ignore

        # device_info = sd.query_devices()
        # print(device_info)

        # 20 ms worth of samples per read
        read_size = int(self.SAMPLE_RATE * 0.02)

        stream = sd.InputStream(
            channels=self.CHANNELS,
            samplerate=self.SAMPLE_RATE,
            dtype="int16",
            device=self.device_id,
        )
        stream.start()
        try:
            while True:
                if stream.read_available < read_size:
                    # not enough samples buffered yet: yield to the event loop
                    await asyncio.sleep(0)
                    continue

                data, _ = stream.read(read_size)
                yield AudioContent(data=base64.b64encode(cast(Any, data)), data_format="base64", mime_type="audio/wav")
        except KeyboardInterrupt:
            pass
        finally:
            stream.stop()
            stream.close()
Original file line number Diff line number Diff line change
Expand Up @@ -276,7 +276,9 @@ async def get_streaming_chat_message_contents(
for msg in messages:
if msg is not None:
all_messages.append(msg)
if any(isinstance(item, FunctionCallContent) for item in msg.items):
if not function_call_returned and any(
isinstance(item, FunctionCallContent) for item in msg.items
):
function_call_returned = True
yield messages

Expand Down Expand Up @@ -442,7 +444,10 @@ def _get_ai_model_id(self, settings: "PromptExecutionSettings") -> str:
return getattr(settings, "ai_model_id", self.ai_model_id) or self.ai_model_id

def _yield_function_result_messages(self, function_result_messages: list) -> bool:
"""Determine if the function result messages should be yielded."""
"""Determine if the function result messages should be yielded.

If there are messages and if the first message has items, then yield the messages.
"""
return len(function_result_messages) > 0 and len(function_result_messages[0].items) > 0

# endregion
Loading
Loading