# Build an Agora Transcription Bot with AssemblyAI Universal-3 Pro
This tutorial walks through building a real-time transcription bot in Python that joins an Agora channel as a silent observer, captures each participant's audio as raw PCM frames, and streams it to AssemblyAI Universal-3 Pro Streaming for speaker-aware transcripts.
The full source is available at github.com/kelseyefoster/voice-agent-agora-universal-3-pro.
## Why This Stack?
Agora's Python Server SDK lets a server-side bot join channels, subscribe to participant audio as raw PCM frames, and optionally publish audio back — without any browser or mobile client. This PCM stream format aligns directly with what AssemblyAI Universal-3 Pro Streaming expects, making the integration unusually clean.
| Metric | AssemblyAI Universal-3 Pro | Agora Built-in STT |
|---|---|---|
| P50 latency | 307ms | ~600–900ms |
| Word Error Rate | 8.9% | ~14–18% |
| Speaker diarization | ✅ Real-time | ❌ |
| Languages | 99+ | Limited |
## Prerequisites
- Python 3.9+
- Agora Console account (App ID and App Certificate)
- AssemblyAI API key
## Quick Start

```bash
git clone https://github.com/kelseyefoster/voice-agent-agora-universal-3-pro
cd voice-agent-agora-universal-3-pro
pip install -r requirements.txt
cp .env.example .env
# Fill in AGORA_APP_ID, AGORA_APP_CERT, ASSEMBLYAI_API_KEY
python bot.py --channel my-channel
```
The bot joins the channel, opens one AssemblyAI WebSocket per participant, and prints completed turn transcripts to stdout. Press Ctrl+C to stop cleanly.
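The `--channel` flag in that last command can be handled with a small `argparse` setup. This is a sketch of the CLI surface only; the real `bot.py` may accept more flags, and the `--uid` option here is our own addition (defaulting to the `AGORA_BOT_UID` value of 9999 used below):

```python
import argparse

def parse_args(argv=None):
    """Parse the bot's command line (sketch; real bot.py may differ)."""
    parser = argparse.ArgumentParser(description="Agora transcription bot")
    parser.add_argument("--channel", required=True,
                        help="Agora channel to join")
    parser.add_argument("--uid", type=int, default=9999,
                        help="UID the bot joins with (hypothetical flag)")
    return parser.parse_args(argv)
```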
## Environment Variables

```
AGORA_APP_ID=your_agora_app_id
AGORA_APP_CERT=your_agora_certificate
AGORA_CHANNEL=my-channel
AGORA_BOT_UID=9999
ASSEMBLYAI_API_KEY=your_assemblyai_api_key
```
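With the variables in place, the bot can read them at startup. Here is a minimal loader as a sketch; the helper name `load_config` and the fail-fast behavior on missing keys are our choices, not code from the repo:

```python
import os

# The three secrets the bot cannot run without.
REQUIRED = ["AGORA_APP_ID", "AGORA_APP_CERT", "ASSEMBLYAI_API_KEY"]

def load_config() -> dict:
    """Read settings from the environment, failing fast on missing keys."""
    missing = [k for k in REQUIRED if not os.getenv(k)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "app_id": os.environ["AGORA_APP_ID"],
        "app_cert": os.environ["AGORA_APP_CERT"],
        "api_key": os.environ["ASSEMBLYAI_API_KEY"],
        "channel": os.getenv("AGORA_CHANNEL", "my-channel"),
        "bot_uid": int(os.getenv("AGORA_BOT_UID", "9999")),
    }
```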
## How It Works

### 1. Join the channel as an audience bot

```python
from agora.rtc.agora_service import AgoraService, AgoraServiceConfig
from agora.rtc.rtc_connection import RTCConnConfig
from agora.rtc.agora_base import (
    AudioScenarioType,
    ChannelProfileType,
    ClientRoleType,
)

cfg = AgoraServiceConfig()
cfg.appid = AGORA_APP_ID
cfg.enable_audio_processor = True
cfg.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS

service = AgoraService()
service.initialize(cfg)

# Audience role: the bot subscribes to everyone but never publishes.
conn_cfg = RTCConnConfig(
    client_role_type=ClientRoleType.CLIENT_ROLE_AUDIENCE,
    channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
)
connection = service.create_rtc_connection(conn_cfg)

# `token`, `channel`, and `bot_uid` come from your config
# (see Production Token Generation below).
connection.connect(token, channel, str(bot_uid))
```
### 2. Configure 16 kHz audio output before subscribing

```python
agora_channel = connection.get_local_user()

# Set BEFORE subscribe_all_audio, so Agora delivers 16 kHz mono
# directly and no resampling step is needed on our side.
agora_channel.set_playback_audio_frame_before_mixing_parameters(
    num_of_channels=1,
    sample_rate=16000,
)
agora_channel.subscribe_all_audio()
```

Each `PcmAudioFrame` now contains 160 samples (10 ms) of 16-bit little-endian PCM at 16 kHz mono, which is exactly the format AssemblyAI expects.
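The frame sizing is easy to sanity-check: at 16 kHz mono with 16-bit samples, a 10 ms frame works out to 160 samples, or 320 bytes on the wire:

```python
SAMPLE_RATE = 16_000   # Hz, as configured above
CHANNELS = 1           # mono
BYTES_PER_SAMPLE = 2   # 16-bit PCM
FRAME_MS = 10          # Agora delivers audio in 10 ms frames

samples_per_frame = SAMPLE_RATE * FRAME_MS // 1000
bytes_per_frame = samples_per_frame * CHANNELS * BYTES_PER_SAMPLE
```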
3. Open one AssemblyAI WebSocket per participant
AAI_WS_URL = (
"wss://streaming.assemblyai.com/v3/ws"
f"?sample_rate=16000"
"&speech_model=u3-rt-pro"
"&format_turns=true"
)
async def stream_participant(agora_channel, uid: int, api_key: str):
headers = {"Authorization": api_key}
async with websockets.connect(AAI_WS_URL, additional_headers=headers) as ws:
begin = json.loads(await ws.recv())
print(f"[uid={uid}] Session: {begin['id']}")
async def send_audio():
async for frame in agora_channel.get_audio_frames(uid):
await ws.send(frame.data)
async def recv_transcripts():
async for message in ws:
event = json.loads(message)
if event["type"] == "Turn" and event.get("end_of_turn"):
print(f"[uid={uid}] {event['transcript']}")
await asyncio.gather(send_audio(), recv_transcripts())
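The message handling is easy to factor into a small, testable helper. The field names (`type`, `end_of_turn`, `transcript`) match the events used above; the helper itself is our refactoring, not repo code:

```python
import json
from typing import Optional

def handle_turn_message(raw: str) -> Optional[str]:
    """Return the transcript for a completed turn, else None.

    Partial turns (end_of_turn=False) stream in continuously; only
    the final transcript of a turn is surfaced here.
    """
    event = json.loads(raw)
    if event.get("type") == "Turn" and event.get("end_of_turn"):
        return event.get("transcript") or None
    return None
```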
4. Track participants dynamically
active_streams: dict[int, asyncio.Task] = {}
def on_user_joined(uid: int):
task = asyncio.create_task(stream_participant(agora_channel, uid, api_key))
active_streams[uid] = task
def on_user_left(uid: int, reason: int):
if uid in active_streams:
active_streams[uid].cancel()
del active_streams[uid]
connection.register_observer_callback("on_user_joined", on_user_joined)
connection.register_observer_callback("on_user_offline", on_user_left)
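That dict-based bookkeeping can be wrapped in a small class that also guards against duplicate join events and supports whole-bot shutdown. This `StreamRegistry` is our sketch, not part of the Agora SDK:

```python
import asyncio
from typing import Dict

class StreamRegistry:
    """Track one transcription task per participant uid."""

    def __init__(self):
        self._tasks: Dict[int, asyncio.Task] = {}

    def add(self, uid: int, coro) -> None:
        if uid in self._tasks:
            coro.close()  # ignore a duplicate join event
            return
        self._tasks[uid] = asyncio.create_task(coro)

    def remove(self, uid: int) -> None:
        task = self._tasks.pop(uid, None)
        if task is not None:
            # Raises CancelledError inside stream_participant.
            task.cancel()

    async def shutdown(self) -> None:
        # Cancel everything and wait for the tasks to unwind.
        for task in self._tasks.values():
            task.cancel()
        await asyncio.gather(*self._tasks.values(), return_exceptions=True)
        self._tasks.clear()
```

The `on_user_joined` / `on_user_offline` callbacks would then delegate to `add` and `remove`.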
### 5. Terminate cleanly

```python
async def close_stream(ws):
    # Ask AssemblyAI to flush and close, then wait for its confirmation.
    await ws.send(json.dumps({"type": "Terminate"}))
    async for message in ws:
        event = json.loads(message)
        if event["type"] == "Termination":
            print(f"Audio processed: {event['audio_duration_seconds']}s")
            break
```
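The handshake can be exercised without a live session by driving it against a stub socket. `FakeWS` below is purely illustrative, and this variant of `close_stream` returns the duration instead of printing it so the result is easy to check:

```python
import asyncio
import json

class FakeWS:
    """Minimal stand-in for a websockets connection (illustrative only)."""

    def __init__(self):
        self.sent = []
        self._incoming = [json.dumps(
            {"type": "Termination", "audio_duration_seconds": 12.5})]

    async def send(self, msg):
        self.sent.append(msg)

    def __aiter__(self):
        return self

    async def __anext__(self):
        if not self._incoming:
            raise StopAsyncIteration
        return self._incoming.pop(0)

async def close_stream(ws):
    # Same Terminate/Termination handshake as above.
    await ws.send(json.dumps({"type": "Terminate"}))
    async for message in ws:
        event = json.loads(message)
        if event["type"] == "Termination":
            return event["audio_duration_seconds"]
```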
## Production Token Generation

```bash
pip install agora-token-builder
```

```python
import time

from agora_token_builder import RtcTokenBuilder, Role_Subscriber

def generate_bot_token(app_id, app_cert, channel, uid):
    # Subscriber role is enough: the bot never publishes audio.
    expire = int(time.time()) + 3600  # valid for one hour
    return RtcTokenBuilder.buildTokenWithUid(
        app_id, app_cert, channel, uid, Role_Subscriber, expire
    )
```
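A one-hour token will expire mid-meeting for long sessions, so plan to renew before the deadline. A hypothetical `needs_refresh` helper (the name and the five-minute margin are our choices) keeps the renewal decision testable:

```python
import time
from typing import Optional

REFRESH_MARGIN = 300  # renew five minutes before expiry (arbitrary choice)

def needs_refresh(expire_ts: int, now: Optional[float] = None) -> bool:
    """True once the current time is within REFRESH_MARGIN of expiry."""
    if now is None:
        now = time.time()
    return now >= expire_ts - REFRESH_MARGIN
```

A background task could poll this periodically and call `generate_bot_token` again when it returns True.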
## Extending the Bot

The `end_of_turn` transcript is a clean signal to drive downstream logic:

```python
if event["type"] == "Turn" and event.get("end_of_turn"):
    transcript = event["transcript"]
    # Option A: send to an LLM
    await send_to_llm(uid, transcript)
    # Option B: store in a database
    await db.insert(uid=uid, text=transcript)
    # Option C: trigger a webhook
    await post_webhook({"uid": uid, "text": transcript})
```
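For Option B, even before wiring up a real database, an in-memory store keyed by uid keeps turns ordered per speaker. `TranscriptLog` is a sketch of ours, not code from the repo:

```python
import time
from collections import defaultdict
from typing import Dict, List, Tuple

class TranscriptLog:
    """In-memory, per-speaker transcript store (illustrative only)."""

    def __init__(self):
        # uid -> list of (timestamp, transcript) in arrival order
        self._turns: Dict[int, List[Tuple[float, str]]] = defaultdict(list)

    def add(self, uid: int, text: str) -> None:
        self._turns[uid].append((time.time(), text))

    def for_speaker(self, uid: int) -> List[str]:
        return [text for _, text in self._turns[uid]]
```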