Imagine a fast food restaurant taking orders in real-time using a speech-to-text API. The challenge is the customer will start speaking and sending audio data before the WebSocket connection opens. We need a way to capture that audio along with transcribing whatever the customers say after the WebSocket has been opened until they are finished speaking their order.
One solution is using a buffer, or a queue, to store the audio data before the WebSocket is connected. In Python, we can implement a buffer by using a list. We can add the audio data in bytes to the queue before the WebSocket connection is made and even continue using the buffer during the speech-to-text transcription after the connection is made.
In the next section, we will see to implement this solution using Python and the Deepgram speech-to-text API.
Using a Buffer in Python to Store Audio Data from Speech-to-Text Transcription
To run this code you’ll need a few things.
- Grab a Deepgram API key from Deepgram
- Install the following packages using
pip
:
pip install deepgram-sdk
pip install PyAudio
The following is the solution implemented in Python with a quick explanation of the code:
import pyaudio
import asyncio
import websockets
import os
import json
DEEPGRAM_API_KEY = "YOUR_DEEPGRAM_API_KEY"
FORMAT = pyaudio.paInt16
CHANNELS = 1
RATE = 16000
CHUNK = 8000
audio_queue = asyncio.Queue()
def callback(input_data, frame_count, time_info, status_flags):
audio_queue.put_nowait(input_data)
return (input_data, pyaudio.paContinue)
async def microphone():
audio = pyaudio.PyAudio()
stream = audio.open(
format = FORMAT,
channels = CHANNELS,
rate = RATE,
input = True,
frames_per_buffer = CHUNK,
stream_callback = callback
)
stream.start_stream()
while stream.is_active():
await asyncio.sleep(0.1)
stream.stop_stream()
stream.close()
async def process():
extra_headers = {
'Authorization': 'token ' + DEEPGRAM_API_KEY
}
async with websockets.connect('wss://api.deepgram.com/v1/listen?encoding=linear16&sample_rate=16000&channels=1', extra_headers = extra_headers) as ws:
async def sender(ws): # sends audio to websocket
try:
while True:
data = await audio_queue.get().
await ws.send(data)
except Exception as e:
print('Error while sending: ', + str(e))
raise
async def receiver(ws):
async for msg in ws:
msg = json.loads(msg)
transcript = msg['channel']['alternatives'][0]['transcript']
if transcript:
print(f'Transcript = {transcript}')
await asyncio.gather(sender(ws), receiver(ws))
async def run():
await asyncio.gather(microphone(),process())
if __name__ == '__main__':
asyncio.run(run())
Python Code Explanation for Using a Buffer with Speech-to-Text Transcription
Since we’re working with Python’s asyncio, we need to create a callback function as defined by PyAudio. This callback puts an item into the queue without blocking.
def callback(input_data, frame_count, time_info, status_flags):
audio_queue.put_nowait(input_data)
return (input_data, pyaudio.paContinue)
We define a microphone()
function, create a stream
based on PyAudio, and pass in our callback in stream_callback
. We then start the stream and loop through it while it’s active.
async def microphone():
audio = pyaudio.PyAudio()
stream = audio.open(
format = FORMAT,
channels = CHANNELS,
rate = RATE,
input = True,
frames_per_buffer = CHUNK,
stream_callback = callback
)
stream.start_stream()
while stream.is_active():
await asyncio.sleep(0.1)
stream.stop_stream()
stream.close()
Next, we define an outer function called process()
that gets the authorization for Deepgram. We create a context manager to async with websockets.connect
to connect to the Deepgram WebSocket server.
The sender()
function sends audio to the WebSocket. The buffer audio_queue.get()
removes and returns an item from the queue. If the queue is empty, it waits until an item is available.
The reciever()
function receives the transcript, parses the JSON response, and prints the transcript to the console.
Lastly, we run the program using asyncio.run(run())
inside of main
.
async def process():
extra_headers = {
'Authorization': 'token ' + DEEPGRAM_API_KEY
}
async with websockets.connect('wss://api.deepgram.com/v1/listen?encoding=linear16&sample_rate=16000&channels=1', extra_headers = extra_headers) as ws:
async def sender(ws):
try:
while True:
data = await audio_queue.get().
await ws.send(data)
except Exception as e:
print('Error while sending: ', + str(e))
raise
async def receiver(ws): # receives the transcript
async for msg in ws:
msg = json.loads(msg)
transcript = msg['channel']['alternatives'][0]['transcript']
if transcript:
print(f'Transcript = {transcript}')
await asyncio.gather(sender(ws), receiver(ws))
async def run():
await asyncio.gather(microphone(),process())
if __name__ == '__main__':
asyncio.run(run())
Conclusion
We hope you enjoyed this short project. If you need help with the tutorial or running the code please don’t hesitate to reach out to us. The best place to start is in our GitHub Discussions.
Top comments (0)