1. Overall Architecture
1.1 System Hierarchical Structure
OpenAvatarChat adopts a layered architecture, divided into three levels from top to bottom:

Architecture Description:
1. ChatEngine (Top Layer)
The core of the system, managing the entire chat engine
Responsible for initialization, configuration loading, and Handler management
Supports concurrent multi-session operation, with each session running independently
2. ChatSession (Middle Layer)
Corresponds to a user session (one WebRTC connection)
Manages all Handler instances within the session
Manages data flow, threads, and queues
3. Handler (Bottom Layer)
Functional modules responsible for specific task processing
Includes: RTC client, VAD, ASR, LLM, TTS, Avatar, etc.
Each Handler creates an independent instance when the session starts
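In code terms, the nesting can be sketched as follows (a simplified illustration based on the methods described in section 1.2; argument lists are abbreviated):

# Simplified sketch of the three-layer nesting
engine = ChatEngine()
engine.initialize(engine_config)    # top layer: load and register all Handlers once

# middle layer: one ChatSession per WebRTC connection
session, handler_envs = engine.create_client_session(session_info, client_handler)

# bottom layer: inside the session, every configured Handler gets its own
# context, input queue, and processing thread

engine.stop_session(session_id)    # tear the session down when the client disconnects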
1.2 Core Component Description
ChatEngine (src/chat_engine/chat_engine.py)
Responsibilities:
System initialization and management
Creation and initialization of HandlerManager
Creation and destruction of sessions
Management of concurrent multi-session operation
Key Methods:
def initialize(engine_config, app=None, ui=None):
    # Initialize the HandlerManager
    # Load all Handlers
    # Set up the client Handler
    ...

def create_client_session(session_info, client_handler):
    # Create a new ChatSession
    # Prepare the Handler environment
    # Return the session and Handler environment
    ...

def stop_session(session_id):
    # Stop and destroy the session
    ...
HandlerManager (src/chat_engine/core/handler_manager.py)
Responsibilities:
Dynamically load Handler modules from configuration files
Key Data Structure:
handler_registries = {
    "RtcClient": HandlerRegistry(
        base_info=HandlerBaseInfo(...),
        handler=...,           # RtcClient instance
        handler_config=...,    # configuration object
    ),
    "SileroVad": HandlerRegistry(...),
    ...
}
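Because Handlers are loaded dynamically from configuration, each registry entry is typically built via a dynamic import. A minimal sketch of that pattern (the module path and class name below are hypothetical, not the project's actual layout):

import importlib

def load_handler_class(module_path: str, class_name: str):
    # Illustrative dynamic-loading pattern: import the module named in
    # the configuration file, then look up the Handler class inside it
    module = importlib.import_module(module_path)    # e.g. "handlers.vad.silerovad"
    return getattr(module, class_name)               # e.g. "SileroVadHandler"

# handler_cls = load_handler_class("handlers.vad.silerovad", "SileroVadHandler")
# A registry entry would then wrap handler_cls(...) together with its config.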
ChatSession (src/chat_engine/core/chat_session.py)
Responsibilities:
Manage data flow for a single session
Create and manage Handler instances
Key Data Structure:
# Data routing table: Data type → Handler input queue
data_sinks = {
    ChatDataType.MIC_AUDIO: [
        DataSink(owner="SileroVad", sink_queue=vad_queue),
    ],
    ChatDataType.HUMAN_TEXT: [
        DataSink(owner="LLM_Bailian", sink_queue=llm_queue),
    ],
}
# Handler records: Handler name → Handler environment
handlers = {
    "SileroVad": HandlerRecord(env=HandlerEnv(...)),
    ...
}
2. Data Flow Process
2.1 Complete Data Flow Architecture Diagram
The complete data flow is as follows:

2.2 Detailed Data Flow Process
Step 1: Client Input

Step 2: Data Distribution (Subscription Distribution)

Key Mechanisms:
data_sinks is a mapping table from data types to Handler input queues.
The system automatically finds all subscribers based on the data type.
Data is simultaneously distributed to all Handlers that have subscribed to that data type.
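In essence, the distribution step is a dictionary lookup followed by a fan-out; a minimal sketch (the full distribute_data() shown in section 4.2 additionally skips the data's own source and handles final client outputs):

# Fan-out sketch: every subscriber of this data type receives the
# same item in its own input queue
for sink in data_sinks.get(data.type, []):
    sink.sink_queue.put_nowait(data)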
Step 3: Handler Processing
Each Handler has an independent processing thread that reads data from its own input queue:

Step 4: Chained Data Flow
Data automatically forms a processing chain based on the input and output definitions of the Handlers:
MIC_AUDIO → HUMAN_AUDIO → HUMAN_TEXT → AVATAR_TEXT → AVATAR_AUDIO → AVATAR_VIDEO
Step 5: Client Output

2.3 Key Data Structures: Queues and Routing
Input Queues:
# Client input queues (created by the RTC Client Handler)
input_queues = {
    EngineChannelType.AUDIO: asyncio.Queue(),
    EngineChannelType.VIDEO: asyncio.Queue(),
    EngineChannelType.TEXT: asyncio.Queue(),
}
Handler Input Queues:
# Each Handler has its own input queue
vad_input_queue = queue.Queue() # Input queue for SileroVad
asr_input_queue = queue.Queue() # Input queue for SenseVoice
llm_input_queue = queue.Queue() # Input queue for LLM_Bailian
tts_input_queue = queue.Queue() # Input queue for Edge_TTS
avatar_input_queue = queue.Queue() # Input queue for AvatarMusetalk
Data Routing Table (data_sinks):
# Data type → List of Handlers that subscribe to this type
data_sinks = {
    ChatDataType.MIC_AUDIO: [
        DataSink(owner="SileroVad", sink_queue=vad_input_queue),
    ],
    ChatDataType.HUMAN_AUDIO: [
        DataSink(owner="SenseVoice", sink_queue=asr_input_queue),
    ],
    ChatDataType.HUMAN_TEXT: [
        DataSink(owner="LLM_Bailian", sink_queue=llm_input_queue),
    ],
    ChatDataType.AVATAR_TEXT: [
        DataSink(owner="Edge_TTS", sink_queue=tts_input_queue),
    ],
    ChatDataType.AVATAR_AUDIO: [
        DataSink(owner="AvatarMusetalk", sink_queue=avatar_input_queue),
    ],
}
Output Queue Mapping:
# (Handler Name, Data Type) → Output Queue
outputs = {
    ("AvatarMusetalk", ChatDataType.AVATAR_VIDEO): DataSink(
        sink_queue=output_queues[EngineChannelType.VIDEO]
    ),
}
3. The Essence of Handler
3.1 What is a Handler?
A Handler is an independent functional module, and each Handler is responsible for a specific task:
RTC Client Handler: Manages WebRTC connections, receives user input, and sends output
SileroVad Handler: Voice Activity Detection (VAD), detects whether the user is speaking
SenseVoice Handler: Speech Recognition (ASR), converts speech into text
LLM Handler: Large Language Model, generates response text
TTS Handler: Text-to-Speech (TTS), converts text into audio
Avatar Handler: Avatar driving, generates video from audio
3.2 The Nature of a Handler: Independent Threads
Key Understanding: Each Handler creates an independent thread when the session starts.

Thread Operation Mode:
# Core loop of the handler_pumper thread
def handler_pumper(session_context, handler_env, sinks, outputs):
    shared_states = session_context.shared_states
    input_queue = handler_env.input_queue    # The Handler's input queue
    while shared_states.active:    # Keep running while the session is active
        try:
            # 1. Read data from the input queue
            input_data = input_queue.get_nowait()
        except queue.Empty:
            time.sleep(0.03)    # Sleep for 30 ms when the queue is empty
            continue
        # 2. Call the Handler to process the data
        handler_result = handler_env.handler.handle(
            handler_env.context,
            input_data,
            handler_env.output_info
        )
        # 3. Submit the processed result; the submitter then calls
        #    distribute_data() to route it to the next Handler
        ChatDataSubmitter.submit(handler_result)
3.3 The Lifecycle of a Handler
Stage 1: Load (load)
When the system starts, each Handler executes a load:
handler.load(engine_config, handler_config)
Purpose:
Load model files
Initialize global resources
Prepare the Handler runtime environment
Examples:
SileroVad: Load the VAD model
SenseVoice: Load the ASR model
LLM: Initialize the API client
Avatar: Load the avatar model
Stage 2: Create Context (create_context)
When each session is created, an independent context is created for each Handler:
handler_context = handler.create_context(session_context, handler_config)
Purpose:
Create independent, session-level state for each Handler
For example: the LLM creates its conversation history, and the ASR creates an audio buffer
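As an illustration, the per-session context of an LLM Handler might look like the following hypothetical sketch (LLMHandlerContext is an invented name, not a class from the project):

from dataclasses import dataclass, field

@dataclass
class LLMHandlerContext:
    # Hypothetical per-session state: each session owns its own history,
    # so concurrent sessions never share conversation state
    session_id: str
    history: list = field(default_factory=list)    # [{"role": ..., "content": ...}]

def create_context(self, session_context, handler_config):
    return LLMHandlerContext(session_id=session_context.session_id)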
Stage 3: Handle (handle)
During the session, the Handler continuously processes data:
handler_result = handler.handle(context, inputs, output_definitions)
Features:
Each Handler runs in its own thread
It reads data from its own input queue
After processing, it outputs the result
Stage 4: Destroy Context (destroy_context)
When the session ends, the Handler context is cleaned up:
handler.destroy_context(handler_context)
Purpose:
Release session-related resources
3.4 Interface Definition of Handlers
All Handlers inherit from HandlerBase and must implement the following interfaces:
class HandlerBase(ABC):
    @abstractmethod
    def load(self, engine_config, handler_config):
        """Load the Handler (e.g., load models)"""
        pass

    @abstractmethod
    def create_context(self, session_context, handler_config):
        """Create the Handler context"""
        pass

    @abstractmethod
    def handle(self, context, inputs, output_definitions):
        """Process input data"""
        pass

    @abstractmethod
    def get_handler_detail(self, session_context, context):
        """Declare input and output data types"""
        return HandlerDetail(
            inputs={...},     # Input type definitions
            outputs={...}     # Output type definitions
        )

    @abstractmethod
    def destroy_context(self, context):
        """Destroy the Handler context"""
        pass
3.5 Key Method of Handler: get_handler_detail
This is the key method for interaction between the Handler and the system. The Handler declares its inputs and outputs through this method:
def get_handler_detail(self, session_context, context) -> HandlerDetail:
    return HandlerDetail(
        inputs={
            ChatDataType.MIC_AUDIO: HandlerDataInfo(
                type=ChatDataType.MIC_AUDIO,
                # Other configurations...
            )
        },
        outputs={
            ChatDataType.HUMAN_AUDIO: HandlerDataInfo(
                type=ChatDataType.HUMAN_AUDIO,
                definition=output_definition,
            )
        }
    )
How the System Uses It:
1. During the prepare_handler() stage, the system calls get_handler_detail().
2. Based on the returned inputs, the system creates a data routing table:
for input_type, input_info in io_detail.inputs.items():
    sink_list = data_sinks.setdefault(input_type, [])
    data_sink = DataSink(
        owner=handler_name,
        sink_queue=handler_input_queue
    )
    sink_list.append(data_sink)
3. When data of that type arrives, the system automatically distributes it to the Handler’s input queue.
4. Handler Collaborative Mechanism
4.1 Data Subscription Mechanism
Core Idea: Handlers “subscribe” to data by declaring input types, and the system automatically establishes data routing.
Establishing Subscription Relationships

Subscription Example
For example, in the glut3.yaml configuration:
# SileroVad subscribes to MIC_AUDIO
data_sinks[ChatDataType.MIC_AUDIO] = [
    DataSink(owner="SileroVad", sink_queue=vad_queue),
]

# SenseVoice subscribes to HUMAN_AUDIO (SileroVad's output)
data_sinks[ChatDataType.HUMAN_AUDIO] = [
    DataSink(owner="SenseVoice", sink_queue=asr_queue),
]

# LLM_Bailian subscribes to HUMAN_TEXT (SenseVoice's output)
data_sinks[ChatDataType.HUMAN_TEXT] = [
    DataSink(owner="LLM_Bailian", sink_queue=llm_queue),
]

# Edge_TTS subscribes to AVATAR_TEXT (LLM's output)
data_sinks[ChatDataType.AVATAR_TEXT] = [
    DataSink(owner="Edge_TTS", sink_queue=tts_queue),
]

# AvatarMusetalk subscribes to AVATAR_AUDIO (TTS's output)
data_sinks[ChatDataType.AVATAR_AUDIO] = [
    DataSink(owner="AvatarMusetalk", sink_queue=avatar_queue),
]
4.2 Data Distribution Mechanism (Subscription Distribution)
When data arrives, the system automatically distributes it through distribute_data():
def distribute_data(data: ChatData, sinks, outputs):
    # 1. Check if it's a final output (sent directly to the client)
    source_key = (data.source, data.type)
    if source_key in outputs:
        outputs[source_key].sink_queue.put_nowait(data)
    # 2. Find all Handlers subscribed to this data type
    sink_list = sinks.get(data.type, [])
    # 3. Distribute to all subscribers
    for sink in sink_list:
        if sink.owner == data.source:
            continue    # Skip the data source itself
        sink.sink_queue.put_nowait(data)    # Put into the Handler's input queue
Key Points:
Data is automatically routed based on type.
A piece of data can be distributed to multiple subscribers simultaneously.
Handlers are completely decoupled and unaware of each other’s existence.
4.3 Handler Parallel Processing Mechanism
Parallel Execution
All Handler threads run simultaneously without blocking each other:
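How the threads are started can be sketched roughly as follows (simplified; the actual ChatSession also wires each thread to its sinks and outputs):

import threading

# One pump thread per Handler; all run concurrently and interact
# only through their input queues
for name, record in handlers.items():
    thread = threading.Thread(
        target=handler_pumper,
        args=(session_context, record.env, data_sinks, outputs),
        name=f"pumper-{name}",
        daemon=True,
    )
    thread.start()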

Data Flow Sequence Guarantee
Although Handlers run in parallel, the data flow is sequential:
MIC_AUDIO → HUMAN_AUDIO → HUMAN_TEXT → AVATAR_TEXT → AVATAR_AUDIO → AVATAR_VIDEO
Why the Sequence is Guaranteed
1. Data Type Driven:
SenseVoice subscribes to HUMAN_AUDIO (not MIC_AUDIO)
SenseVoice only receives data when HUMAN_AUDIO is produced
2. Queue Buffering:
Each Handler has its own input queue.
The queue automatically buffers data to ensure the sequence.
3. VAD’s Speech End Marker:
VAD outputs the human_speech_end marker during processing.
ASR waits for this marker before performing inference.
This ensures that a complete speech segment is processed.
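Points 2 and 3 combine into a general "accumulate until the end marker" pattern that the ASR, TTS, and Avatar handlers in section 5 all follow; compressed into one sketch here (using the ASR's marker as the example):

def handle(self, context, inputs, output_definitions):
    # Accumulate-until-marker sketch: buffer every chunk, run
    # inference only once the upstream end marker arrives
    context.buffer.append(inputs.data.get_main_data())
    if inputs.data.has_meta('human_speech_end'):    # marker emitted by VAD
        result = self.model(context.buffer)         # inference on the complete segment
        yield ChatData(type=ChatDataType.HUMAN_TEXT, data=result)
        context.buffer.clear()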
4.4 Decoupling of Handlers
Complete Decoupling
Handlers do not communicate directly with each other, only interact through data types:
❌ Incorrect Way (Tightly Coupled):
SileroVad → Direct Call → SenseVoice.handle()
✅ Correct Way (Decoupled):
SileroVad → Outputs HUMAN_AUDIO → System Distributes → SenseVoice Input Queue
Benefits of Decoupling
1. Easy to Extend: Adding a new Handler only requires declaring input/output without modifying existing Handlers.
2. Flexible Combination: Handlers can be flexibly combined through configuration files.
3. Easy to Test: Each Handler can be tested independently.
4. Easy to Maintain: Handlers have clear responsibilities and do not interfere with each other.
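To make point 1 concrete, here is a sketch of a hypothetical EchoHandler (not part of the project) that could be added purely by implementing the interface from section 3.4. It subscribes to HUMAN_TEXT and re-emits it as AVATAR_TEXT, and no existing Handler has to change:

class EchoHandler(HandlerBase):
    # Hypothetical Handler: subscribes to HUMAN_TEXT and echoes it
    # back as AVATAR_TEXT; existing Handlers are untouched
    def load(self, engine_config, handler_config):
        pass    # nothing to load

    def create_context(self, session_context, handler_config):
        return {}    # no per-session state needed

    def get_handler_detail(self, session_context, context):
        return HandlerDetail(
            inputs={ChatDataType.HUMAN_TEXT: HandlerDataInfo(type=ChatDataType.HUMAN_TEXT)},
            outputs={ChatDataType.AVATAR_TEXT: HandlerDataInfo(type=ChatDataType.AVATAR_TEXT)},
        )

    def handle(self, context, inputs, output_definitions):
        yield ChatData(type=ChatDataType.AVATAR_TEXT, data=inputs.data.get_main_data())

    def destroy_context(self, context):
        pass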
4.5 Session End Mechanism
Shared Flag Control
All threads share a flag: shared_states.active
# While the session is running
shared_states.active = True

# All threads loop and check the flag
while shared_states.active:
    # Process data
    ...

# When the session ends
shared_states.active = False
# All threads automatically exit their loops
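The matching teardown can be sketched as below (pump_threads is a hypothetical list of the session's pump threads; destroying the contexts corresponds to Stage 4 in section 3.3):

# Teardown sketch: flip the shared flag, wait for every pump thread
# to notice it and leave its loop, then clean up Handler contexts
shared_states.active = False
for t in pump_threads:
    t.join()
for record in handlers.values():
    record.env.handler.destroy_context(record.env.context)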
End Process

5. Detailed Explanation of Handlers
5.1 RTC Client Handler
Function: Manages WebRTC connections and handles bidirectional communication with the client.
Input: Client audio/video/text (received via WebRTC)
Output: Avatar video/audio (sent via WebRTC)
Key Code Locations:
src/handlers/client/rtc_client/client_handler_rtc.py
src/service/rtc_service/rtc_stream.py
Workflow:

5.2 SileroVad Handler (Voice Activity Detection)
Function: Detects whether the user is speaking and filters out silence.
Input: ChatDataType.MIC_AUDIO (raw audio)
Output: ChatDataType.HUMAN_AUDIO (human speech audio, with speech activity markers)
Key Code Location: src/handlers/vad/silerovad/vad_handler_silero.py
Key Methods:
def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.MIC_AUDIO: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.HUMAN_AUDIO: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. Extract audio from the input
    audio_data = inputs.data.get_main_data()
    # 2. VAD model inference
    is_speech = self.model(audio_data)
    # 3. If speech is detected, output HUMAN_AUDIO
    if is_speech:
        yield ChatData(type=HUMAN_AUDIO, data=audio_data)
Features:
Real-time processing with streaming output
Output contains human_speech_start and human_speech_end markers
ASR relies on these markers to determine when to perform recognition
5.3 SenseVoice Handler (Speech Recognition)
Function: Converts speech to text.
Input: ChatDataType.HUMAN_AUDIO (human speech audio)
Output: ChatDataType.HUMAN_TEXT (recognized text)
Key Code Location: src/handlers/asr/sensevoice/asr_handler_sensevoice.py
Key Methods:
def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.HUMAN_AUDIO: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.HUMAN_TEXT: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. Accumulate audio data
    context.audio_buffer.append(inputs.data.get_main_data())
    # 2. Check for the human_speech_end marker
    if inputs.data.has_meta('human_speech_end'):
        # 3. Perform ASR inference
        text = self.model(context.audio_buffer)
        # 4. Output the recognized text
        yield ChatData(type=HUMAN_TEXT, data=text)
        # 5. Clear the buffer
        context.audio_buffer.clear()
Features:
Accumulates audio and waits for the speech end marker
Performs ASR on complete speech segments
Text format output: <|zh|><|NEUTRAL|><|Speech|><|woitn|>你好 (你好 = "hello")
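Downstream consumers normally want only the plain text, so these leading tags have to be stripped; a small illustrative helper (not part of the project; the exact tag set is model-dependent):

import re

def strip_sensevoice_tags(text: str) -> str:
    # Remove metadata tokens of the form <|...|>, leaving only the
    # recognized text
    return re.sub(r"<\|[^|]*\|>", "", text)

# strip_sensevoice_tags("<|zh|><|NEUTRAL|><|Speech|><|woitn|>你好") -> "你好"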
5.4 LLM Handler (Large Language Model)
Function: Understands user input and generates response text.
Input: ChatDataType.HUMAN_TEXT (user text)
Output: ChatDataType.AVATAR_TEXT (AI response text)
Key Code Location: src/handlers/llm/openai_compatible/llm_handler_openai_compatible.py
Key Methods:
def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.HUMAN_TEXT: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.AVATAR_TEXT: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. Update the conversation history
    context.history.add_user_message(inputs.data.get_main_data())
    # 2. Call the LLM API (streaming)
    response = self.client.chat.completions.create(
        model=self.model_name,
        messages=context.history.get_messages(),
        stream=True
    )
    # 3. Stream the output text
    for chunk in response:
        text = chunk.choices[0].delta.content
        if text:
            yield ChatData(type=AVATAR_TEXT, data=text)
Features:
Maintains conversation history
Configurable for different LLM models (Bailian, OpenAI compatible, etc.)
5.5 Edge_TTS Handler (Text-to-Speech)
Function: Converts text to speech.
Input: ChatDataType.AVATAR_TEXT (AI response text)
Output: ChatDataType.AVATAR_AUDIO (generated audio)
Key Code Location: src/handlers/tts/edgetts/tts_handler_edgetts.py
Key Methods:
def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.AVATAR_TEXT: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.AVATAR_AUDIO: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. Accumulate text
    context.text_buffer += inputs.data.get_main_data()
    # 2. Check for the text end marker
    if inputs.data.has_meta('text_end'):
        # 3. Call the TTS API to generate audio
        audio = edge_tts.generate(
            text=context.text_buffer,
            voice=self.voice
        )
        # 4. Output the audio stream
        for audio_chunk in audio:
            yield ChatData(type=AVATAR_AUDIO, data=audio_chunk)
        # 5. Clear the buffer
        context.text_buffer = ""
Features:
Accumulates text and waits for a complete sentence
Supports multiple voices (selectable via configuration)
5.6 AvatarMusetalk Handler (Avatar Driving)
Function: Generates avatar video (lip-sync) from audio.
Input: ChatDataType.AVATAR_AUDIO (TTS-generated audio)
Output: ChatDataType.AVATAR_VIDEO (avatar video frames)
Key Code Location: src/handlers/avatar/musetalk/avatar_handler_musetalk.py
Key Methods:
def get_handler_detail(self, ...):
    return HandlerDetail(
        inputs={
            ChatDataType.AVATAR_AUDIO: HandlerDataInfo(...)
        },
        outputs={
            ChatDataType.AVATAR_VIDEO: HandlerDataInfo(...)
        }
    )

def handle(self, context, inputs, output_definitions):
    # 1. Accumulate audio data
    context.audio_buffer.append(inputs.data.get_main_data())
    # 2. Check for the audio end marker
    if inputs.data.has_meta('audio_end'):
        # 3. MuseTalk model processing
        video_frames = self.model(
            audio=context.audio_buffer,
            avatar_image=context.avatar_image
        )
        # 4. Output the video frame stream
        for frame in video_frames:
            yield ChatData(type=AVATAR_VIDEO, data=frame)
        # 5. Clear the buffer
        context.audio_buffer.clear()
Features:
Uses the MuseTalk model for inference
5.7 Summary of the Handler Processing Chain
RTC Client (MIC_AUDIO) → SileroVad (HUMAN_AUDIO) → SenseVoice (HUMAN_TEXT) → LLM_Bailian (AVATAR_TEXT) → Edge_TTS (AVATAR_AUDIO) → AvatarMusetalk (AVATAR_VIDEO) → RTC Client
6. Quick Reference
6.1 Key Code Locations
| Function | File Path | Key Method |
|---|---|---|
| Main Entry | src/glut.py | main() |
| Engine Initialization | src/chat_engine/chat_engine.py | ChatEngine.initialize() |
| Handler Loading | src/chat_engine/core/handler_manager.py | HandlerManager.initialize() |
| Session Creation | src/chat_engine/chat_engine.py | ChatEngine.create_client_session() |
| Data Distribution | src/chat_engine/core/chat_session.py | ChatSession.distribute_data() |
| Input Processing | src/chat_engine/core/chat_session.py | ChatSession.inputs_pumper() |
| Handler Processing | src/chat_engine/core/chat_session.py | ChatSession.handler_pumper() |
6.2 Key Data Structures
Data Types (ChatDataType):
MIC_AUDIO # Microphone audio
HUMAN_AUDIO # Human speech audio
HUMAN_TEXT # User text
AVATAR_TEXT # AI response text
AVATAR_AUDIO # TTS audio
AVATAR_VIDEO # Avatar video
Data Routing Table (data_sinks):
data_sinks: Dict[ChatDataType, List[DataSink]]
# Data type → List of Handlers subscribed to this type
Handler Registry:
handler_registries: Dict[str, HandlerRegistry]
# Handler name → Handler registration info
6.3 Core Execution Flow
ChatEngine.initialize() → HandlerManager loads all Handlers → client connects → ChatEngine.create_client_session() → each Handler creates its context and pump thread → data flows through the subscription chain → session ends → contexts are destroyed
6.4 Key Features of Modularity
1. Configuration-driven: Define Handler combinations through YAML configuration files.
2. Dynamic Loading: Import and instantiate Handlers dynamically at runtime based on the configuration.
3. Data-driven Routing: Automatically distribute data based on data types, with Handlers unaware of each other.
4. Asynchronous Processing: Each Handler runs in its own thread, communicating via queues.
5. Loose Coupling: Handlers do not depend on each other directly, only on data types.
6. Easy to Extend: To add a new Handler, simply implement the HandlerBase interface.
7. Summary
OpenAvatarChat adopts a layered, modular architecture design:
Top Layer (ChatEngine): Manages the entire system and supports concurrent multi-session operation.
Middle Layer (ChatSession): Manages a single session and coordinates the collaborative work of Handlers.
Bottom Layer (Handler): Independent functional modules that communicate via data types.
Core Mechanisms:
Data Subscription: Handlers subscribe to data by declaring input types.
Automatic Routing: The system automatically distributes data based on data types.
Parallel Processing: Handlers run concurrently in independent threads.
Queue Communication: Communication between Handlers is asynchronous and decoupled via queues.
This design achieves a highly cohesive, loosely coupled architecture that makes the system easy to extend, maintain, and test.