M Sea Bass

Step-by-Step Guide: Interruption Management with OpenAI Realtime API

This article shows how to implement conversation interruption with the OpenAI Realtime API, so that the AI's audio stops as soon as the user starts speaking.

The details of the implementation are available in the GitHub repository.

This implementation is based on the code from Azure-Samples/aoai-realtime-audio-sdk. A detailed explanation of the code can be found in this article.

Audio Input

In this implementation, we use the microphone and speaker of the local PC for audio input and output.

The audio captured from the microphone is sent to the OpenAI Realtime API server for processing.

To capture audio from the local PC's microphone, we use the stream functionality of the pyaudio library. The following code sets up a stream for audio input:

p = pyaudio.PyAudio()
input_default_input_index = p.get_default_input_device_info()['index']
input_stream = p.open(
    format=STREAM_FORMAT,
    channels=INPUT_CHANNELS,
    rate=INPUT_SAMPLE_RATE,
    input=True,
    output=False,
    frames_per_buffer=INPUT_CHUNK_SIZE,
    input_device_index=input_default_input_index,
    start=False,
)
input_stream.start_stream()
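
The constants used above (STREAM_FORMAT, INPUT_CHANNELS, INPUT_SAMPLE_RATE, INPUT_CHUNK_SIZE) are not shown in this excerpt. As a rough sketch, assuming 16-bit mono PCM at 24 kHz and a hypothetical chunk size of 1024 frames, the following shows how much data and how much audio time one read covers. All values and helper names here are assumptions for illustration; check the repository for the real ones.

```python
# Assumed values for illustration only; the article does not show the real constants.
SAMPLE_WIDTH_BYTES = 2      # 16-bit PCM (what pyaudio.paInt16 would produce)
INPUT_CHANNELS = 1          # mono
INPUT_SAMPLE_RATE = 24_000  # Hz
INPUT_CHUNK_SIZE = 1024     # frames per read (hypothetical)


def chunk_bytes(frames: int, channels: int, sample_width: int) -> int:
    """Size in bytes of one buffer holding `frames` frames."""
    return frames * channels * sample_width


def chunk_duration_ms(frames: int, sample_rate: int) -> float:
    """Amount of audio, in milliseconds, that one buffer holds."""
    return frames / sample_rate * 1000


size = chunk_bytes(INPUT_CHUNK_SIZE, INPUT_CHANNELS, SAMPLE_WIDTH_BYTES)
duration = chunk_duration_ms(INPUT_CHUNK_SIZE, INPUT_SAMPLE_RATE)
```

Under these assumptions each read returns about 2 KB covering roughly 43 ms of audio. A smaller chunk lowers latency but means more reads and more messages per second.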

Audio capture runs in a separate thread (threading.Thread) so that reading from the microphone does not block the rest of the program. Each chunk read from the microphone is base64-encoded and put on a queue.

def listen_audio(input_stream: pyaudio.Stream):
    while True:
        audio_data = input_stream.read(INPUT_CHUNK_SIZE, exception_on_overflow=False)
        if audio_data is None:
            continue
        base64_audio = base64.b64encode(audio_data).decode("utf-8")
        audio_input_queue.put(base64_audio)

threading.Thread(target=listen_audio, args=(input_stream,), daemon=True).start()
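
To make the encoding step concrete, here is a minimal sketch that works without a microphone. The helper name enqueue_chunk and the 2048-byte block of silence are assumptions for illustration, not part of the original code.

```python
import base64
import queue

audio_input_queue = queue.Queue()


def enqueue_chunk(pcm_chunk: bytes) -> str:
    """Base64-encode one raw PCM chunk and put the resulting text on the queue."""
    encoded = base64.b64encode(pcm_chunk).decode("utf-8")
    audio_input_queue.put(encoded)
    return encoded


# 2048 bytes of silence stand in for one microphone read.
encoded = enqueue_chunk(b"\x00" * 2048)
encoded_length = len(encoded)
```

Base64 turns every 3 bytes into 4 characters, so the text sent over the connection is about a third larger than the raw audio.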

The base64 strings stored in the queue are sent to the OpenAI Realtime API server as "input_audio_buffer.append" messages.

async def send_audio(client: RTLowLevelClient):
    while not client.closed:
        base64_audio = await asyncio.get_event_loop().run_in_executor(None, audio_input_queue.get)
        await client.send(InputAudioBufferAppendMessage(audio=base64_audio))
        await asyncio.sleep(0)
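
The pattern above bridges a blocking queue.Queue and the asyncio event loop. The sketch below reproduces it with a hypothetical FakeClient in place of RTLowLevelClient, plain dictionaries in place of InputAudioBufferAppendMessage, and a None sentinel to end the loop. The client, the sentinel, and the demo function are all assumptions added for this example.

```python
import asyncio
import queue

audio_input_queue = queue.Queue()


class FakeClient:
    """Hypothetical stand-in for RTLowLevelClient that records what is sent."""

    def __init__(self) -> None:
        self.sent = []

    async def send(self, message: dict) -> None:
        self.sent.append(message)


async def send_audio(client: FakeClient) -> None:
    loop = asyncio.get_running_loop()
    while True:
        # queue.Queue.get() blocks, so it runs in the default thread pool
        # and the event loop stays free to receive messages meanwhile.
        base64_audio = await loop.run_in_executor(None, audio_input_queue.get)
        if base64_audio is None:  # sentinel added for this sketch
            break
        await client.send({"type": "input_audio_buffer.append", "audio": base64_audio})


def demo() -> list:
    for item in ("AAAA", "BBBB", None):
        audio_input_queue.put(item)
    client = FakeClient()
    asyncio.run(send_audio(client))
    return client.sent
```

A sentinel like this is one way to release a worker that is waiting inside a blocking get() when the program wants to shut down.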

Audio Output

Audio playback is performed through the local PC's speakers using the audio data received from the OpenAI Realtime API server.

The audio data arrives from the server as "response.audio.delta" messages. Each payload is base64-encoded, so it is decoded, split into chunks of OUTPUT_CHUNK_SIZE bytes, and put on a queue for playback.

async def receive_messages(client: RTLowLevelClient):
    while True:
        message = await client.recv()
        if message is None:
            continue
        match message.type:
            case "response.audio.delta":
                audio_data = base64.b64decode(message.delta)
                for i in range(0, len(audio_data), OUTPUT_CHUNK_SIZE):
                    audio_output_queue.put(audio_data[i:i+OUTPUT_CHUNK_SIZE])
                await asyncio.sleep(0)
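
The decode-and-split step can be tried in isolation. The following is a small sketch with a hypothetical helper split_into_chunks and a made-up 10-byte payload in place of real audio.

```python
import base64


def split_into_chunks(audio_data: bytes, chunk_size: int) -> list:
    """Split decoded audio into consecutive pieces of at most chunk_size bytes."""
    return [audio_data[i:i + chunk_size] for i in range(0, len(audio_data), chunk_size)]


# A made-up payload standing in for message.delta.
delta = base64.b64encode(bytes(range(10))).decode("utf-8")
chunks = split_into_chunks(base64.b64decode(delta), 4)
chunk_lengths = [len(c) for c in chunks]
```

The last chunk may be shorter than the others. With 16-bit audio, choosing a chunk size that is a multiple of the frame size (2 bytes for mono) keeps a sample from being split across two chunks.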

A separate thread (threading.Thread) takes the chunks from the queue and writes them to the output stream, so playback through the local PC's speakers continues smoothly while new messages are still being received.

def play_audio(output_stream: pyaudio.Stream):
    while True:
        audio_data = audio_output_queue.get()
        output_stream.write(audio_data)

p = pyaudio.PyAudio()
output_default_output_index = p.get_default_output_device_info()['index']
output_stream = p.open(
    format=STREAM_FORMAT,
    channels=OUTPUT_CHANNELS,
    rate=OUTPUT_SAMPLE_RATE,
    input=False,
    output=True,
    frames_per_buffer=OUTPUT_CHUNK_SIZE,
    output_device_index=output_default_output_index,
    start=False,
)
output_stream.start_stream()

threading.Thread(target=play_audio, args=(output_stream,), daemon=True).start()

Conversation Interruption Handling

The OpenAI Realtime API automatically detects conversation segments on the server side. As a result, it can detect new user speech and start generating a new response even while the AI is still responding.

However, the client has to handle one thing itself: audio that has already been received and queued on the local PC keeps playing unless it is stopped explicitly. To make the interruption feel natural, playback must stop as soon as the user starts speaking. The server reports detected user speech with an "input_audio_buffer.speech_started" message. When this message arrives, playback is stopped by clearing the audio data stored in the queue.

async def receive_messages(client: RTLowLevelClient):
    while True:
        message = await client.recv()
        # print(f"{message=}")
        if message is None:
            continue
        match message.type:
            case "input_audio_buffer.speech_started":
                print("Input Audio Buffer Speech Started Message")
                print(f"  Item Id: {message.item_id}")
                print(f"  Audio Start [ms]: {message.audio_start_ms}")
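                # Drop all queued audio so playback stops. get_nowait() never blocks,
                # even if the playback thread empties the queue first (needs `import queue`).
                while True:
                    try:
                        audio_output_queue.get_nowait()
                    except queue.Empty:
                        break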
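
Because the playback thread takes items from the same queue, a check with empty() followed by a blocking get() can stall if the other thread takes the last item in between. One non-blocking way to clear the queue is sketched below. The helper name drain_queue is an assumption for illustration and is not part of the original code.

```python
import queue


def drain_queue(q: queue.Queue) -> int:
    """Discard every item currently queued without blocking; return how many were dropped."""
    dropped = 0
    while True:
        try:
            q.get_nowait()
        except queue.Empty:
            return dropped
        dropped += 1


audio_output_queue = queue.Queue()
for _ in range(5):
    audio_output_queue.put(b"\x00" * 1024)

dropped = drain_queue(audio_output_queue)
```

Clearing the queue only removes audio that has not yet been handed to the output stream. The chunk currently being written still plays to the end, so smaller output chunks should make the interruption feel more immediate.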

The audio output code itself needs no modifications; it works as described earlier. Once the queue is empty, the playback thread simply waits until new audio arrives.

Conclusion

In this article, I introduced a Python implementation of conversation interruption.

I hope this article proves helpful to anyone who faces challenges with stopping AI speech effectively, as I did.

Additionally, how the stream instances are defined and configured (for example, the sample rate and frames_per_buffer) can affect the quality of audio playback. If playback sounds choppy or drops out, reviewing these settings might help improve the situation.

Thank you for reading until the end.
