Mart Schweiger

Posted on • Originally published at github.com
Build an Agora Transcription Bot with AssemblyAI Universal-3 Pro


This tutorial walks through building a real-time transcription bot in Python that joins an Agora channel as a silent observer, captures each participant's audio as raw PCM frames, and streams it to AssemblyAI Universal-3 Pro Streaming for speaker-aware transcripts.

The full source is available at github.com/kelseyefoster/voice-agent-agora-universal-3-pro.

Why This Stack?

Agora's Python Server SDK lets a server-side bot join channels, subscribe to participant audio as raw PCM frames, and optionally publish audio back — without any browser or mobile client. This PCM stream format aligns directly with what AssemblyAI Universal-3 Pro Streaming expects, making the integration unusually clean.

| Metric | AssemblyAI Universal-3 Pro | Agora Built-in STT |
|---|---|---|
| P50 latency | 307 ms | ~600–900 ms |
| Word Error Rate | 8.9% | ~14–18% |
| Speaker diarization | ✅ Real-time | |
| Languages | 99+ | Limited |

Prerequisites

- Python 3.9 or newer (the code uses built-in generic type hints)
- An Agora project with an App ID and App Certificate
- An AssemblyAI API key

Quick Start

git clone https://github.com/kelseyefoster/voice-agent-agora-universal-3-pro
cd voice-agent-agora-universal-3-pro

pip install -r requirements.txt
cp .env.example .env
# Fill in AGORA_APP_ID, AGORA_APP_CERT, ASSEMBLYAI_API_KEY

python bot.py --channel my-channel

The bot joins the channel, opens one AssemblyAI WebSocket per participant, and prints completed turn transcripts to stdout. Press Ctrl+C to stop cleanly.
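The repo's bot.py takes the channel name on the command line. A minimal sketch of that CLI surface using argparse (the --uid flag and defaults here are illustrative assumptions; the actual script may expose different options):

```python
import argparse

def parse_args(argv=None):
    """Parse the bot's command-line flags."""
    parser = argparse.ArgumentParser(description="Agora transcription bot")
    parser.add_argument("--channel", required=True,
                        help="Agora channel name to join")
    parser.add_argument("--uid", type=int, default=9999,
                        help="UID the bot uses inside the channel")
    return parser.parse_args(argv)
```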

Environment Variables

AGORA_APP_ID=your_agora_app_id
AGORA_APP_CERT=your_agora_certificate
AGORA_CHANNEL=my-channel
AGORA_BOT_UID=9999
ASSEMBLYAI_API_KEY=your_assemblyai_api_key
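Failing fast on missing credentials beats a cryptic error mid-connect. One way to load and validate these settings before the bot starts, sketched with only the standard library (the variable names match the .env above; the defaults mirror its sample values):

```python
import os

REQUIRED = ("AGORA_APP_ID", "AGORA_APP_CERT", "ASSEMBLYAI_API_KEY")

def load_config(env=None) -> dict:
    """Read required settings from the environment, failing fast if any are absent."""
    env = os.environ if env is None else env
    missing = [k for k in REQUIRED if not env.get(k)]
    if missing:
        raise RuntimeError(f"Missing environment variables: {', '.join(missing)}")
    return {
        "app_id": env["AGORA_APP_ID"],
        "app_cert": env["AGORA_APP_CERT"],
        "api_key": env["ASSEMBLYAI_API_KEY"],
        "channel": env.get("AGORA_CHANNEL", "my-channel"),
        "bot_uid": int(env.get("AGORA_BOT_UID", "9999")),
    }
```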

How It Works

1. Join the channel as an audience bot

from agora.rtc.agora_service import AgoraService, AgoraServiceConfig
from agora.rtc.rtc_connection import RTCConnConfig
from agora.rtc.agora_base import ClientRoleType, ChannelProfileType, AudioScenarioType

cfg = AgoraServiceConfig()
cfg.appid = AGORA_APP_ID
cfg.enable_audio_processor = True
cfg.audio_scenario = AudioScenarioType.AUDIO_SCENARIO_CHORUS

service = AgoraService()
service.initialize(cfg)

conn_cfg = RTCConnConfig(
    client_role_type=ClientRoleType.CLIENT_ROLE_AUDIENCE,
    channel_profile=ChannelProfileType.CHANNEL_PROFILE_LIVE_BROADCASTING,
)
connection = service.create_rtc_connection(conn_cfg)
connection.connect(token, channel, str(bot_uid))

2. Configure 16 kHz audio output before subscribing

agora_channel = connection.get_local_user()

# Set BEFORE subscribe_all_audio — eliminates resampling
agora_channel.set_playback_audio_frame_before_mixing_parameters(
    num_of_channels=1,
    sample_rate=16000,
)
agora_channel.subscribe_all_audio()

Each PcmAudioFrame will contain 160 samples of 16-bit little-endian PCM at 16 kHz mono — exactly what AssemblyAI expects.
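The arithmetic behind those numbers: 160 samples at 16 kHz implies 10 ms frames, and 16-bit samples mean two bytes each, so every frame carries 320 bytes of audio:

```python
SAMPLE_RATE = 16_000      # Hz, mono
FRAME_MS = 10             # implied by 160 samples per frame at 16 kHz
BYTES_PER_SAMPLE = 2      # 16-bit little-endian PCM

samples_per_frame = SAMPLE_RATE * FRAME_MS // 1000   # 160
bytes_per_frame = samples_per_frame * BYTES_PER_SAMPLE  # 320
```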

3. Open one AssemblyAI WebSocket per participant

import asyncio
import json

import websockets

AAI_WS_URL = (
    "wss://streaming.assemblyai.com/v3/ws"
    "?sample_rate=16000"
    "&speech_model=u3-rt-pro"
    "&format_turns=true"
)

async def stream_participant(agora_channel, uid: int, api_key: str):
    headers = {"Authorization": api_key}
    async with websockets.connect(AAI_WS_URL, additional_headers=headers) as ws:
        begin = json.loads(await ws.recv())
        print(f"[uid={uid}] Session: {begin['id']}")

        async def send_audio():
            async for frame in agora_channel.get_audio_frames(uid):
                await ws.send(frame.data)

        async def recv_transcripts():
            async for message in ws:
                event = json.loads(message)
                if event["type"] == "Turn" and event.get("end_of_turn"):
                    print(f"[uid={uid}] {event['transcript']}")

        await asyncio.gather(send_audio(), recv_transcripts())
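Sending one WebSocket message per 10 ms frame is chatty; AssemblyAI's streaming docs recommend larger audio chunks per message (treat the exact minimum as an assumption and check the current docs). A small accumulator can batch frames before each ws.send:

```python
from typing import Optional

class FrameBatcher:
    """Accumulate small PCM frames until a larger chunk is ready to send."""

    def __init__(self, target_bytes: int = 3200):
        # 3200 bytes = 100 ms of 16 kHz mono 16-bit PCM
        self.target = target_bytes
        self._buf = bytearray()

    def add(self, frame: bytes) -> Optional[bytes]:
        """Return a chunk once enough audio has accumulated, else None."""
        self._buf.extend(frame)
        if len(self._buf) >= self.target:
            chunk = bytes(self._buf)
            self._buf.clear()
            return chunk
        return None
```

Inside send_audio this becomes: if (chunk := batcher.add(frame.data)) is not None: await ws.send(chunk).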

4. Track participants dynamically

active_streams: dict[int, asyncio.Task] = {}

def on_user_joined(uid: int):
    task = asyncio.create_task(stream_participant(agora_channel, uid, api_key))
    active_streams[uid] = task

def on_user_left(uid: int, reason: int):
    if uid in active_streams:
        active_streams[uid].cancel()
        del active_streams[uid]

connection.register_observer_callback("on_user_joined", on_user_joined)
connection.register_observer_callback("on_user_offline", on_user_left)

5. Terminate cleanly

async def close_stream(ws):
    await ws.send(json.dumps({"type": "Terminate"}))
    async for message in ws:
        event = json.loads(message)
        if event["type"] == "Termination":
            print(f"Audio processed: {event['audio_duration_seconds']}s")
            break

Production Token Generation

pip install agora-token-builder

from agora_token_builder import RtcTokenBuilder, Role_Subscriber
import time

def generate_bot_token(app_id, app_cert, channel, uid):
    expire = int(time.time()) + 3600
    return RtcTokenBuilder.buildTokenWithUid(
        app_id, app_cert, channel, uid, Role_Subscriber, expire
    )
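Since the token above expires after an hour, a long-running bot needs to reconnect with a fresh one before then. A small helper for deciding when (the 5-minute renewal margin is an arbitrary choice, not an Agora requirement):

```python
import time
from typing import Optional

TOKEN_TTL = 3600        # seconds, matches the expiry in generate_bot_token
RENEW_MARGIN = 300      # reconnect 5 minutes before expiry

def needs_renewal(issued_at: float, now: Optional[float] = None) -> bool:
    """True once the current token is within RENEW_MARGIN of expiring."""
    if now is None:
        now = time.time()
    return now >= issued_at + TOKEN_TTL - RENEW_MARGIN
```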

Extending the Bot

The end_of_turn transcript is a clean signal to drive downstream logic:

if event["type"] == "Turn" and event.get("end_of_turn"):
    transcript = event["transcript"]

    # Option A: send to an LLM
    await send_to_llm(uid, transcript)

    # Option B: store in a database
    await db.insert(uid=uid, text=transcript)

    # Option C: trigger a webhook
    await post_webhook({"uid": uid, "text": transcript})
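The post_webhook helper in Option C is not defined in the snippet. One stdlib-only way to sketch it, pushing the blocking HTTP call onto a worker thread so transcription is never stalled (the URL is a placeholder, and the injectable send parameter exists only so the function can be exercised without real network I/O):

```python
import asyncio
import json
import urllib.request

async def post_webhook(payload: dict, url: str = "https://example.com/hook",
                       send=None) -> int:
    """POST a JSON payload off the event loop; returns the HTTP status code."""
    body = json.dumps(payload).encode()

    def default_send() -> int:
        req = urllib.request.Request(
            url,
            data=body,
            headers={"Content-Type": "application/json"},
        )
        with urllib.request.urlopen(req) as resp:   # blocking I/O
            return resp.status

    # Run the blocking call in a worker thread
    return await asyncio.to_thread(send or default_send)
```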

Resources

- Full source code: github.com/kelseyefoster/voice-agent-agora-universal-3-pro
