I believe the next big leap in AI will be in the robotics space, taking the reasoning power of Large Language Models (LLMs) and putting it into physical bodies. While focusing on the utility of a robot, like having it perform tasks or chores, is a compelling area of development, we often overlook the interaction between the human user and the machine itself. Rather than focusing on what the robot can do, we should also look at how a robot's movements and actions can trigger specific expectations and social norms for the person interacting with it.
Recently I’ve been experimenting with the Open Duck Mini, and while finally getting the robot to move its head, walk, and generally just do something is an amazing feeling all on its own, I wanted to take some time and explore the topics of human-robot interaction (HRI) and character development. To do this, I built a custom animation tool that helps define animations that the robot can run through, allowing the robot to move in ways that humans naturally recognize as conveying a message, such as nodding or shaking its head.
With some animations defined, I paired them with the Gemini Live API, which serves as an excellent tool for HRI by allowing people to speak directly to the robot and trigger these gestures so the machine can interact with the world in an appropriate, recognizable way. All of the code for this project, including the animation tool client, Flask API server, and playback scripts, is available on GitHub.
Note: It’s worth noting that I recorded this with all durations set to 300ms, which is why it moves quickly and then directly into the next keyframe interpolation. You can slow down animations by extending the duration of the steps along with changing the easing functions, which I’ll cover more later in this article.
For most of the middleware that I'll discuss here today, I was able to generate the code using Gemini and AI Studio to get what I wanted done quickly, helping me bridge the gap between hardware control and high-level AI reasoning without getting bogged down in the setup. This let me go from defining the idea over a few days of casual thinking, generally while I was gardening or doing something around the house, to spending less than a day creating the end project. That said, I closely orchestrated what was generated and reviewed the code to avoid unexpected hiccups.
Character First: The Disney Approach to HRI
We can learn a lot from the animators and imagineers at Disney, who I highly admire for all of the cool work they do with character creation and human-robot interaction (if you’re unfamiliar with it, definitely look up their concept of Living Characters and what they’ve done in that space; it’s kind of chaotic, but I love it). Often the characters that Disney is building are already defined in a movie or TV show, but sometimes they’re brand new and need to be fleshed out for how they interact in a given situation, what physical quirks they have, how they speak, if there is an accent or specific dialect that’s appropriate, and a whole lot of other things that need to be considered. They establish the character’s personality and the specific ways the character expresses emotions like happiness, curiosity, or frustration, and then they engineer the physical robot to embody that character.
While a robot does not strictly need a personality to function, defining these traits early in the design phase helps ease the interaction between the machine and the people around it. By establishing what personality the robot should project as it is being built, we can ensure its physical behaviors align with human social expectations. Disney's work, such as their autonomous walking Olaf robot, which they’ve released a bit on their process of getting a proper heel to toe gait, shows how combining artistic animation principles with robotics can create machines that feel alive and responsive, rather than merely mechanical.
Along with their work in robotics, I’ve had a copy of The Illusion of Life on my shelf for years, as it’s an amazing reference for general animation (I’ve been drawing as a hobby for a few years), and something I used regularly when working as an Android developer years ago. Some of the concepts, such as the various ways to express reality in cartoons, and especially the concepts around variations in movement speed that are needed when interpolating between key frames, were really useful for this project.
As this open source robot platform is based on a Star Wars droid (which, as a member of the 501st and Rebel Legion costuming charity groups, I knew I “needed” to make one), the personality can be focused on cute beeps and whirls instead of words, and with the way the two legs hold itself being similar to a duck (hence the robot name), I took some inspiration from the overly exaggerated movements of early Donald Duck animations.
Middleware: Why We Build Our Own
Designing physical gestures requires a direct link between the animator's intent and the robot's hardware. Because public animation tools for custom-built hardware are practically non-existent, I needed to develop a custom pipeline to manually position the duck, record those motor configurations as keyframes, and compile them into structured action plans that could execute in sync with audio files that enrich my robot’s response personality. To achieve this quickly, I used AI Studio to generate a custom Flask API server for interfacing with the robot and a companion applet for changing settings on the key frames.
Designing Gestures with the Animation Tool
The workflow centers around a companion applet running on a laptop, which communicates over the network with the Flask server hosted on the Raspberry Pi Zero 2W inside the robot. This applet allows me to read the live motor positions as I physically pose the robot (focused on motor IDs 30, 31, 32, and 33, which are the neck and head motors on the Open Duck Mini), decide what other emotive features should be used, such as eye and projector lights and antenna orientations, and save the entire sequence as a JSON action script that can later be read by my robot’s python code.
The reason this is using SSH instead of a direct USB connection to the robot is two fold: the Raspberry Pi Zero 2W has one USB port that I expanded with a small hub to get a USB microphone added (I could have used an i2s microphone, but I wanted options), but cable management is kind of a nightmare, so I wanted less attached to my robot. That USB microphone also requires that the robot be in a USB host orientation, rather than peripheral, so instead of messing up my working configuration, I just went for an all software approach. I’m also able to take the robot away from home and work on it because I configured the Pi to connect to my home network, and if it doesn’t find that network, it turns on its own hotspot that I can connect to for creating a connection from my laptop.
Here is an example of an action script generated by the tool with the pose in a neutral position, though each frame becomes another node in the keyframes array:
{
"keyframes": [
{
"id": "mpzph7m3gpw97yqw2c",
"durationMs": 300,
"pauseMs": 200,
"motors": {
"30": 0.5905826033359718,
"31": 0.3988350048502669,
"32": -0.18867963690993372,
"33": -0.15186409800067846
},
"lightsOn": false,
"projectorOn": false,
"interpolation": "bezier",
"antennas": {
"left": "center",
"right": "center"
}
},
...
],
"globalSound": "beep1.wav"
}
The script defines keyframes with target motor positions, lights, and antenna angles. It also specifies a globalSound (like beep1.wav) to play alongside the movement.
The Flask API Server
The API server running on the Raspberry Pi serves as the bridge between the high-level animation design and the physical hardware. It exposes endpoints to read the current motor positions during the recording phase and to execute the playbacks of the saved JSON files for testing (there’s a separate Python code snippet for directly playing back an animation during regular robot use).
Again, this code was generated, but I planned out what it should look like and reviewed it for quality, so let’s check it out in smaller chunks.
1. Initialization and Setup
First, we set up the Flask application, enable Cross-Origin Resource Sharing (CORS) so the laptop applet can communicate with it, and initialize the hardware interface (HWI) for the motors that we’ll be reading. We also define the GPIO pins for the LEDs, projector/flashlight, speaker for audio playback, and small servos that control the duck's antennas.
from flask import Flask, request, jsonify
from flask_cors import CORS
from mini_bdx_runtime.duck_config import DuckConfig
from mini_bdx_runtime.rustypot_position_hwi import HWI
from gpiozero import LED, AngularServo
import time
app = Flask(__name__)
CORS(app)
config = DuckConfig()
hwi = HWI(config)
motor_ids = [30, 31, 32, 33]
SPEAKER_INDEX = 0
led1 = LED(23)
led2 = LED(24)
projector = LED(25)
servo_left = AngularServo(12, min_angle=-90, max_angle=90)
servo_right = AngularServo(13, min_angle=-90, max_angle=90)
Note on Imports: You’ll notice we are importing from
mini_bdx_runtime. This is the core runtime package from the original BDX robot project. It wraps the low-level motor communication (usingrustypotto talk to the Dynamixel Protocol 2.0 used by the Feetech servos), saving us from writing raw serial byte packets ourselves.
2. Hardware Helper and Dampening Functions
Next, we define helper functions to control the physical components, a mathematical helper for Bezier interpolation, and hardware dampening functions.
While Bezier interpolation in software handles the mathematical path, we also dynamically adjust the motors' internal hardware limits (acceleration and velocity profiles) using the rustypot library. This allows us to create effects like "viscous" (sluggish, heavy) or "clamped" (snappy but smoothed) movements by writing directly to the motor registers.
def set_lights(on):
if on:
led1.on()
led2.on()
else:
led1.off()
led2.off()
def set_projector(on):
if on:
projector.on()
else:
projector.off()
def get_antenna_angle(pos_str):
if pos_str == 'back': return -90
if pos_str == 'forward': return 90
return 0
def set_antennas(positions):
servo_left.angle = get_antenna_angle(positions.get('left', 'center'))
servo_right.angle = get_antenna_angle(positions.get('right', 'center'))
ADDR_PROFILE_ACCEL = 108
ADDR_PROFILE_VELOC = 112
def set_hardware_dampening(accel, veloc):
try:
for mid in motor_ids:
if hasattr(hwi.io, 'write_data'):
hwi.io.write_data(mid, ADDR_PROFILE_ACCEL, accel, 4)
hwi.io.write_data(mid, ADDR_PROFILE_VELOC, veloc, 4)
except Exception as e:
pass
def apply_interpolation_dampening(interp_type):
if interp_type == 'linear':
set_hardware_dampening(0, 400)
elif interp_type == 'bezier_viscous':
set_hardware_dampening(10, 200)
elif interp_type == 'bezier_clamped':
set_hardware_dampening(40, 500)
else: # bezier
set_hardware_dampening(30, 400)
def bezier_interpolate(t, type_str):
if type_str == 'linear':
return t
if type_str == 'bezier':
return t * t * (3.0 - 2.0 * t) # Standard smoothstep (cubic Hermite)
if type_str == 'bezier_viscous':
return t * t * t * (t * (t * 6.0 - 15.0) + 10.0) # Quintic ease-in-out (smoother start/end)
if type_str == 'bezier_clamped':
return 1.0 - (1.0 - t) ** 3 # Cubic ease-out (fast start, slow end)
return t
3. Reading Motor Positions
To record keyframes in the applet, we need to read the current state of the robot. The /read endpoint queries the hardware interface and returns the current positions of the four head motors.
@app.route('/read', methods=['GET'])
def read_pos():
try:
positions = hwi.io.read_present_position(motor_ids)
res = {str(mid): pos for mid, pos in zip(motor_ids, positions)}
return jsonify(res)
except Exception as e:
return jsonify({"error": str(e)}), 500
4. The Playback Loop (Gestures & Audio)
The /play endpoint is where the magic happens. It receives the JSON action script, triggers the audio playback (assuming there’s an associated audio file) asynchronously (using sounddevice), and then steps through the keyframes, interpolating motor positions at a strict 30Hz update rate to prevent jitter.
@app.route('/play', methods=['POST'])
def play_macro():
data = request.json
keyframes = data.get('keyframes', [])
global_sound = data.get('globalSound', '')
# Handle Audio Playback Asynchronously
if global_sound:
try:
import sounddevice as sd
import soundfile as sf
import numpy as np
import os
sound_path = f"assets/{global_sound}"
if os.path.exists(sound_path):
audio_data, fs = sf.read(sound_path, dtype='float32')
if len(audio_data.shape) == 1:
audio_data = audio_data.reshape(-1, 1)
audio_data = np.tile(audio_data, (1, 2))
audio_data = audio_data * 2.0
sd.play(audio_data, fs, device=SPEAKER_INDEX)
else:
print(f"Sound file not found: {sound_path}")
except Exception as e:
print(f"Failed to play sound: {e}")
# Motor Sequencing Loop
try:
start_positions = hwi.io.read_present_position(motor_ids)
current_positions = {str(mid): pos for mid, pos in zip(motor_ids, start_positions)}
except:
current_positions = {str(mid): 0 for mid in motor_ids}
try:
hwi.io.enable_torque(motor_ids)
except:
pass
for frame in keyframes:
set_lights(frame.get('lightsOn', False))
set_projector(frame.get('projectorOn', False))
set_antennas(frame.get('antennas', {}))
dur_sec = frame.get('durationMs', 1000) / 1000.0
steps = max(1, int(dur_sec * 30)) # 30Hz update rate
interp = frame.get('interpolation', 'linear')
apply_interpolation_dampening(interp)
target_motors = frame.get('motors', {})
start_frame_pos = current_positions.copy()
start_time = time.time()
for step in range(1, steps + 1):
t = step / float(steps)
eased_t = bezier_interpolate(t, interp)
step_targets = []
for mid in motor_ids:
s_val = start_frame_pos.get(str(mid), current_positions.get(str(mid), 0))
end_val = target_motors.get(str(mid), target_motors.get(mid, s_val))
val = s_val + (end_val - s_val) * eased_t
step_targets.append(float(val))
current_positions[str(mid)] = val
try:
hwi.io.write_goal_position(motor_ids, step_targets)
except Exception as e:
print(f"HW error on step {step}: {e}")
# Strict timing to prevent jitter and stalls
target_time = start_time + step * (dur_sec / steps)
now = time.time()
if target_time > now:
time.sleep(target_time - now)
pause_sec = frame.get('pauseMs', 0) / 1000.0
if pause_sec > 0:
time.sleep(pause_sec)
# Reset hardware to default state after animation
set_lights(False)
set_projector(False)
set_antennas({'left': 'center', 'right': 'center'})
try:
hwi.io.disable_torque(motor_ids)
except:
pass
return jsonify({"success": True})
if __name__ == '__main__':
app.run(host='0.0.0.0', port=5000)
Hardware Dampening vs. Software Timing
You might wonder: if we are already calculating Bezier curves in Python, why do we need to write to the motor's hardware registers?
In robotics, the best results come from combining Software Timing and Hardware Dampening:
Software Timing (Duration & Easing): The Python loop computes precise, time-synchronized positions (e.g., at 30Hz) and sends them to the motors. This ensures that all joints (like neck and head) arrive at their targets at the exact same millisecond, keeping the motion coordinated. It also allows for complex easing curves (like overshoot or anticipation) that hardware cannot calculate on its own.
Hardware Dampening (Smoothing & Physics): The Feetech STS3215 motors have internal PID controllers and motion profiles. By writing to Profile Acceleration (Address 108) and Profile Velocity (Address 112) using the rustypot library, we tune how the hardware physically reacts. This acts as a physical low-pass filter, rounding off any micro-jitters caused by USB-to-Serial (how the Raspberry Pi Zero connects to the motors) communication latency and preventing the motors from buzzing or jerking.
For example, bezier_viscous caps the hardware acceleration and velocity, forcing the motor to "drag" behind the software commands, creating a heavy, fluid, "underwater" feel. Conversely, bezier_clamped allows aggressive tracking for sharp movements but maintains enough acceleration limit to protect the gears from grinding. With these different interpolators in play, you will need to be considerate of how long you specify the motors to take to reach their position, or provide enough of a post-keyframe wait period for things to catch up so that animations aren’t cut short during playback.
Real-Time Interaction with the Gemini Live API
To transform these static gestures into responsive behaviors, I integrated the Gemini Live API into a python script running on the robot. The Live API is particularly well-suited for human-robot interaction because it supports low-latency, low-overhead bidirectional streaming of audio, allowing the user to speak naturally to the machine.
Instead of configuring the model to return voice responses, I structured the interaction so that the robot communicates entirely through physical actions. By utilizing system instructions and function calling, I instructed Gemini to act as a silent physical agent that translates the user's speech into gesture triggers.
The model is configured with the following system instruction:
"You are a physical robot. Do not speak. Respond ONLY using tools. Use trigger_action with 'yes' to agree, 'no' to disagree, or 'beep1' to acknowledge neutrally."
This setup ensures that when the user speaks, Gemini evaluates the input and selects the most appropriate gesture to trigger, reinforcing social norms, like nodding to show agreement, without relying on synthetic speech. In a more robust project, we could add dozens of animations for different emotions and moods to truly give the robot a specific personality.
Let's look at how the Live API integration script is constructed.
1. Imports, Hardware, Audio, and Dampening Setup
We start by importing the necessary libraries (including the google-genai SDK), configuring the audio parameters, and setting up the hardware dampening registers and helper functions. The script needs to handle both microphone input and speaker output, as well as managing physical motor dampening on the fly.
import asyncio
import os
import sys
import json
import time
import pyaudio
import numpy as np
import soundfile as sf
from google import genai
from google.genai import types
from mini_bdx_runtime.duck_config import DuckConfig
from mini_bdx_runtime.rustypot_position_hwi import HWI
from gpiozero import LED, AngularServo
config_hw = DuckConfig()
hwi = HWI(config_hw)
motor_ids = [30, 31, 32, 33]
MIC_INDEX = 1
SPEAKER_INDEX = 0
led1, led2, projector = LED(23), LED(24), LED(25)
servo_left = AngularServo(12, min_angle=-90, max_angle=90)
servo_right = AngularServo(13, min_angle=-90, max_angle=90)
FORMAT = pyaudio.paInt16
CHANNELS = 1
HW_SAMPLE_RATE = 48000
API_SEND_RATE = 16000
CHUNK_SIZE = 1024
MIC_FACTOR = 3
pya = pyaudio.PyAudio()
audio_queue_mic = asyncio.Queue(maxsize=5)
audio_queue_output = asyncio.Queue()
# Register addresses for DYNAMIXEL Protocol 2.0 Dampening (used by Feetech STS3215)
ADDR_PROFILE_ACCEL = 108
ADDR_PROFILE_VELOC = 112
def set_hardware_dampening(accel, veloc):
"""Writes to motor registers to handle hardware-level smoothing."""
try:
for mid in motor_ids:
if hasattr(hwi.io, 'write_data'):
hwi.io.write_data(mid, ADDR_PROFILE_ACCEL, accel, 4)
hwi.io.write_data(mid, ADDR_PROFILE_VELOC, veloc, 4)
except Exception:
pass
def apply_interpolation_dampening(interp_type):
"""Sets different hardware snappiness based on the JSON interpolation."""
if interp_type == 'linear':
set_hardware_dampening(0, 400)
elif interp_type == 'bezier_viscous':
set_hardware_dampening(10, 200)
elif interp_type == 'bezier_clamped':
set_hardware_dampening(40, 500)
else: # standard bezier
set_hardware_dampening(30, 400)
def bezier_interpolate(t, type_str):
if type_str == 'linear':
return t
if type_str == 'bezier':
return t * t * (3.0 - 2.0 * t)
if type_str == 'bezier_viscous':
return t * t * t * (t * (t * 6.0 - 15.0) + 10.0)
if type_str == 'bezier_clamped':
return 1.0 - (1.0 - t) ** 3
return t
2. Live API Configuration
Here we initialize the Gemini client and define the LiveConnectConfig object. We pass our system instruction and define the trigger_action tool, which tells the Gemini Live API what actions it is able to perform.
api_key = os.environ.get("GEMINI_API_KEY")
client = genai.Client(api_key=api_key, http_options={'api_version': 'v1alpha'})
live_config = types.LiveConnectConfig(
response_modalities=["AUDIO"],
system_instruction=types.Content(parts=[types.Part(text=(
"You are a physical robot. Do not speak. Respond ONLY using tools. "
"Use trigger_action with 'yes' to agree, 'no' to disagree, or 'beep1' to acknowledge neutrally."
))]),
tools=[types.Tool(function_declarations=[
types.FunctionDeclaration(
name="trigger_action",
description="Triggers a robot animation and sound.",
parameters=types.Schema(
type="OBJECT",
properties={"action_name": types.Schema(type="STRING", description="Action name like 'yes' or 'beep1'")},
required=["action_name"]
)
)
])]
)
Note: You'll need to install the dependency via
pip install -q -U google-genaiand export your API key withexport GEMINI_API_KEY="your_key". You can find more information on Gemini API keys here.
3. Audio Input Handling
These two asynchronous tasks handle capturing audio from the physical microphone and sending it to the active Gemini Live session. Since the API expects 16kHz audio, but my mic runs at 48kHz, I downsample the audio in listen_audio by slicing the numpy array. As this is specific to my own hardware modification, your own project may need some modifications here.
async def listen_audio():
stream = await asyncio.to_thread(
pya.open, format=FORMAT, channels=CHANNELS, rate=HW_SAMPLE_RATE,
input=True, input_device_index=MIC_INDEX, frames_per_buffer=CHUNK_SIZE * MIC_FACTOR
)
while True:
data = await asyncio.to_thread(stream.read, CHUNK_SIZE * MIC_FACTOR, exception_on_overflow=False)
audio_array = np.frombuffer(data, dtype=np.int16)
resampled = audio_array[::MIC_FACTOR].tobytes()
await audio_queue_mic.put({"data": resampled, "mime_type": "audio/pcm"})
async def send_realtime(session):
while True:
msg = await audio_queue_mic.get()
await session.send_realtime_input(audio=msg)
If you are running this on a Pi and get PyAudio errors, it is likely because
MIC_INDEXorSPEAKER_INDEXdoesn't match your hardware. For example, my USB microphone and speaker index swapped at some point when running two pieces of hardware, so I had to go back and update my code values. You can find your specific indexes by running a quick Python script on the Pi:import pyaudio p = pyaudio.PyAudio() for i in range(p.get_device_count()): print(f"Index {i}: {p.get_device_info_by_index(i)['name']}")
python
Just run that, look for your USB Mic and Speaker names, and update the index numbers in the setup config
4. Receiving Responses and Dispatching Actions
The receive_and_trigger task listens for responses from Gemini. When the model decides to trigger an action, it sends a tool call. The script intercepts this, spawns a background thread to play the animation JSON that was generated via the animation tool, and sends a success response back to Gemini.
async def receive_and_trigger(session):
while True:
turn = session.receive()
async for response in turn:
if response.tool_call:
for call in response.tool_call.function_calls:
if call.name == "trigger_action":
action = call.args.get("action_name")
asyncio.create_task(asyncio.to_thread(play_animation_task, action))
await session.send_tool_response(
function_responses=[types.FunctionResponse(
id=call.id, name=call.name, response={"status": "ok"}
)]
)
sc = response.server_content
if sc and sc.input_transcription:
print(f"You: {sc.input_transcription.text}")
5. Playing Animations and Audio Output
When an action is triggered, play_animation_task loads the corresponding JSON file, handles the motor sequencing, and loads the audio file. Because the Live API connection is active, we use a shared speaker queue (audio_queue_output) and a unified play_audio task to write the bytes to the speaker hardware.
Because this is a longer bit of code, let’s break it down into its key components.
A. Loading the Action Plan
First, the function locates and loads the JSON file that defines the animation. If the file is missing, it exits early to prevent crashes.
def play_animation_task(action_name):
"""Parses JSON, triggers motors, and queues audio for the shared speaker task."""
try:
json_path = f"assets/{action_name}.json"
if not os.path.exists(json_path):
print(f"File not found: {json_path}")
return
with open(json_path, 'r') as f:
data = json.load(f)
B. Audio Processing and Queuing
If the animation has an associated sound, we load the WAV file. Because our hardware might require a different sample rate than the file, we perform real-time resampling. We also apply digital normalization and a 3x gain boost so the duck can be heard over the motor noise before converting the audio to PCM bytes and pushing it to the output queue.
global_sound = data.get('globalSound')
if global_sound:
sound_path = f"assets/{global_sound}"
if os.path.exists(sound_path):
# Read file as float32 for high-quality gain math
audio_data, samplerate = sf.read(sound_path, dtype='float32')
if len(audio_data.shape) > 1: audio_data = audio_data[:, 0]
# Resample to hardware 48kHz
if samplerate != HW_SAMPLE_RATE:
duration = len(audio_data) / samplerate
audio_data = np.interp(
np.linspace(0, len(audio_data), int(duration * HW_SAMPLE_RATE)),
np.arange(len(audio_data)),
audio_data
)
# Digital Normalization + Volume Boost
max_val = np.max(np.abs(audio_data))
if max_val > 0: audio_data = (audio_data / max_val)
audio_data = audio_data * 3.0 # Strong 3x Gain
final_pcm = (audio_data * 32767).clip(-32768, 32767).astype(np.int16)
audio_queue_output.put_nowait(final_pcm.tobytes())
print(f"ROBOT ACTION: {action_name} (Audio: {global_sound})")
C. Motor Initialization
Before we can move the motors, we need to enable torque and read their current positions so we know where to start the interpolation.
keyframes = data.get('keyframes', [])
try:
hwi.io.enable_torque(motor_ids)
start_pos = hwi.io.read_present_position(motor_ids)
current_pos = {str(mid): pos for mid, pos in zip(motor_ids, start_pos)}
except: current_pos = {str(mid): 0 for mid in motor_ids}
D. The Sequencing Loop
This is the core loop. For each keyframe, we set the peripheral states (lights, projector, antennas) and calculate the intermediate motor positions using Bezier interpolation. The loop runs at a target rate, sleeping as necessary to maintain strict timing and prevent jitter.
for frame in keyframes:
set_lights(frame.get('lightsOn', False))
set_projector(frame.get('projectorOn', False))
set_antennas(frame.get('antennas', {}))
interp = frame.get('interpolation', 'linear')
apply_interpolation_dampening(interp)
dur = frame.get('durationMs', 1000) / 1000.0
steps = max(1, int(dur * 30))
target_motors = frame.get('motors', {})
start_frame_pos = current_pos.copy()
start_t = time.time()
for step in range(1, steps + 1):
t = step / float(steps)
eased_t = bezier_interpolate(t, interp)
step_targets = []
for mid in motor_ids:
s_val = start_frame_pos.get(str(mid), current_pos.get(str(mid), 0))
e_val = target_motors.get(str(mid), s_val)
val = s_val + (e_val - s_val) * eased_t
step_targets.append(float(val))
current_pos[str(mid)] = val
try: hwi.io.write_goal_position(motor_ids, step_targets)
except: pass
target_time = start_t + step * (dur / steps)
now = time.time()
if target_time > now: time.sleep(target_time - now)
if frame.get('pauseMs', 0) > 0: time.sleep(frame['pauseMs'] / 1000.0)
E. Cleanup and Reset
Once the animation completes, we turn off the lights and projector, center the antennas, and disable torque on the motors to prevent overheating. This is where you could change these defaults if your own application required it.
# Reset components after animation
set_lights(False); set_projector(False); set_antennas({'left': 'center', 'right': 'center'})
try: hwi.io.disable_torque(motor_ids)
except: pass
except Exception as e:
print(f"Action Error: {e}")
6. The Unified Speaker Task and Main Loop
Finally, the play_audio task continuously monitors audio_queue_output and writes any outgoing sound bytes to the speaker hardware. The run function connects the session and orchestrates all the concurrent tasks.
async def play_audio():
"""Unified speaker task for all robot sounds."""
stream = await asyncio.to_thread(
pya.open, format=FORMAT, channels=CHANNELS, rate=HW_SAMPLE_RATE,
output=True, output_device_index=SPEAKER_INDEX
)
while True:
bytestream = await audio_queue_output.get()
await asyncio.to_thread(stream.write, bytestream)
async def run():
try:
async with client.aio.live.connect(
model="gemini-3.1-flash-live-preview", config=live_config
) as live_session:
print("Robot Ready. Monitoring mic...")
async with asyncio.TaskGroup() as tg:
tg.create_task(send_realtime(live_session))
tg.create_task(listen_audio())
tg.create_task(receive_and_trigger(live_session))
tg.create_task(play_audio())
except Exception as e: print(f"Error: {e}")
finally: pya.terminate()
if __name__ == "__main__":
try: asyncio.run(run())
except KeyboardInterrupt: pass
Future Explorations
While the Open Duck Mini is a small-scale robot, the engineering challenges involved in this project, such as latency management, audio resampling, real-time synchronization, and hardware-software co-design, are ideal for learning about the challenges faced in large-scale robotics development.
Using generative AI to develop the necessary middleware allowed me to build a complete HRI pipeline in a fraction of the time it would have taken traditionally. By combining flexible models like Gemini with custom physical gestures, we can start building robots that don't just execute commands, but participate in social spaces in a recognizable way.
One thing I would like to do at some point is start modifying this robot to be more robust. I want to put neopixel lights in for the eyes to support various colors based on the robot’s ‘mood’, upgrade from the Raspberry Pi Zero 2w to a Raspberry Pi 5 or Jetson Orin Nano, potentially replace the motors with 12v motors (including the necessary power upgrades). At that point though I may as well build a new BDX droid, which my wallet wouldn’t appreciate, but it’s so cool, so we’ll see :)
I'm looking forward to expanding this setup, and I hope this encourages other makers to explore the intersection of LLMs and physical embodiment. To this end, the next tools to explore would be the Gemini Robotics-ER model for perception and task orchestration, as well as the MuJoCo simulation tool.

Top comments (0)