agent-2 demonstrates how to drive an Avaluma avatar from an external audio source, completely bypassing the LiveKit Agents voice pipeline. Instead of routing through STT, LLM, and TTS, you connect to the LiveKit room as an independent participant and stream raw PCM audio bytes directly to the avatar using the lk.audio_stream DataStream topic. This pattern gives you full control over the audio content — use your own TTS service, play pre-recorded files, or pipe in any audio source you choose.
When to Use This Pattern
Use the external audio pattern when:
- You have an existing TTS or audio generation service you want to keep
- You need to play pre-recorded audio segments through the avatar
- You want to drive the avatar independently of an AgentSession pipeline
- You are integrating Avaluma into a system that already manages its own audio routing
Pipeline
WAV file → DataStream (lk.audio_stream) → Avaluma Avatar → Video stream
                                                │
                                          Avatar Server
                                      (animates .hvia file)
Setup
Set your avatar ID
Open agents/2-agent-with-external-audio/agent-2.py and set avatar_id to your .hvia filename without the extension:
avatar_id = "your-avatar-id"
Start the agent
Launch livekit-agent-2 with Docker Compose:
docker compose up livekit-agent-2 -d
This service mounts three additional paths compared to livekit-agent-1: the agent script, the external sender module, and the assets/ directory containing sample WAV files.
Agent Code
agent-2.py sets up the AvatarSession and AgentSession, then spawns the external audio sender as an asyncio task for local simulation:
import asyncio
import os

import external_audio_sender
from avaluma_livekit_plugin import AvatarSession
from dotenv import load_dotenv
from livekit.agents import (
    Agent,
    AgentSession,
    JobContext,
    WorkerOptions,
    cli,
)

load_dotenv(".env.local")

agent_name = "agent-2"
avatar_id = "260218-Avaluma_Avatar_Kadda_v5"
license_key = os.getenv("AVALUMA_LICENSE_KEY", "")
avatar_server_url = os.getenv("AVATAR_SERVER_URL", "https://api.avaluma.ai")


class Assistant(Agent):
    def __init__(self) -> None:
        super().__init__(
            instructions="""You are a helpful virtual AI assistant. The user is interacting with you via voice, even if you perceive the conversation as text.
            You eagerly assist users with their questions by providing information from your extensive knowledge.
            Your responses are concise, to the point, and without any complex formatting or punctuation including emojis, asterisks, or other symbols.
            You are curious, friendly, and have a sense of humor.""",
        )


async def entrypoint(ctx: JobContext):
    ctx.log_context_fields = {
        "room": ctx.room.name,
    }

    session = AgentSession()

    # os.getenv() falls back to "" above, so test for an empty value rather than None.
    if not license_key:
        raise ValueError("AVALUMA_LICENSE_KEY is not set")

    avatar = AvatarSession(
        license_key=license_key,  # Your License Key
        avatar_id=avatar_id,  # Avatar identifier (Name of .hvia file)
        avatar_server_url=avatar_server_url,
    )
    await avatar.start(agent_session=session, room=ctx.room)

    await session.start(
        agent=Assistant(),
        room=ctx.room,
    )
    await ctx.connect()

    # Audio from an external service via DataStream, bypassing AgentSession.
    # In production, external_audio_sender.run() would live in a separate process/service
    # and connect to the room independently; it is started here only for local simulation.
    asyncio.create_task(external_audio_sender.run(ctx.room.name))


if __name__ == "__main__":
    cli.run_app(
        WorkerOptions(
            entrypoint,
            agent_name=agent_name,
        )
    )
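agent-2.py reads its settings from environment variables, and a missing value tends to surface later as a less obvious connection or authentication error. A minimal sketch of a fail-fast check is shown here; `require_env` is a hypothetical helper and not part of the Avaluma plugin or the LiveKit SDK.

```python
import os


def require_env(name: str) -> str:
    """Return the value of an environment variable, or raise if it is unset or empty."""
    value = os.environ.get(name, "").strip()
    if not value:
        raise RuntimeError(f"{name} is not set")
    return value
```

With this in place, a line such as `license_key = require_env("AVALUMA_LICENSE_KEY")` stops the process at startup with a message that names the missing variable.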
External Audio Sender
external_audio_sender.py is the heart of the pattern. It connects to the LiveKit room as an independent participant and streams a WAV file to the avatar via DataStream at a fixed interval (every 5 seconds by default):
"""
Simulates an external audio service that sends audio directly to the avatar
via LiveKit DataStream, without any access to AgentSession or the agent pipeline.

In production this would be a completely separate process or microservice that
independently connects to the LiveKit room and streams audio to the avatar.
"""

import asyncio
import os
import uuid
import wave

from livekit import api, rtc

AUDIO_STREAM_TOPIC = "lk.audio_stream"
WAV_PATH = os.path.join(os.path.dirname(__file__), "assets/hello_world_16kHz.wav")


def _load_wav_data(path: str) -> tuple[bytes, int, int]:
    with wave.open(path, "rb") as wf:
        sample_rate = wf.getframerate()
        num_channels = wf.getnchannels()
        raw_data = wf.readframes(wf.getnframes())
    return raw_data, sample_rate, num_channels


async def _wait_for_avatar(room: rtc.Room) -> rtc.RemoteParticipant:
    """Return the avatar participant as soon as it appears in the room."""
    for p in room.remote_participants.values():
        if p.identity.startswith("avatar-"):
            return p

    loop = asyncio.get_running_loop()
    fut: asyncio.Future[rtc.RemoteParticipant] = loop.create_future()

    def _on_participant_connected(participant: rtc.RemoteParticipant) -> None:
        if participant.identity.startswith("avatar-") and not fut.done():
            fut.set_result(participant)

    room.on("participant_connected", _on_participant_connected)
    try:
        return await fut
    finally:
        room.off("participant_connected", _on_participant_connected)


async def run(room_name: str, interval: float = 5.0) -> None:
    """
    Create a token, connect to the LiveKit room as an independent participant,
    and send WAV audio directly to the avatar via DataStream every `interval` seconds.
    This bypasses AgentSession entirely; the token is the only thing needed.
    """
    livekit_url = os.getenv("LIVEKIT_URL", "")
    livekit_api_key = os.getenv("LIVEKIT_API_KEY", "")
    livekit_api_secret = os.getenv("LIVEKIT_API_SECRET", "")

    token = (
        api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret)
        .with_identity("external-audio-sender")
        .with_name("External Audio Sender")
        .with_grants(api.VideoGrants(room_join=True, room=room_name))
        .with_kind("agent")  # required: the avatar only accepts streams from agent-kind participants
        .to_jwt()
    )

    room = rtc.Room()
    await room.connect(livekit_url, token)
    room.register_byte_stream_handler("lk.agent.session", lambda _reader, _identity: None)

    raw_data, sample_rate, num_channels = _load_wav_data(WAV_PATH)
    avatar = await _wait_for_avatar(room)

    while True:
        await asyncio.sleep(interval)
        writer = await room.local_participant.stream_bytes(
            name=f"AUDIO_{uuid.uuid4().hex[:8]}",
            topic=AUDIO_STREAM_TOPIC,
            destination_identities=[avatar.identity],
            attributes={
                "sample_rate": str(sample_rate),
                "num_channels": str(num_channels),
            },
        )
        await writer.write(raw_data)
        await writer.aclose()  # closing the stream signals the avatar to render the segment
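The loop in run() sends a new segment every `interval` seconds regardless of how long the clip is. If a clip could be longer than the interval, it may help to compute its playback duration before choosing how long to sleep. The sketch below is only an illustration: `pcm_duration_seconds` is a hypothetical helper, and the default of 2 bytes per sample (16-bit PCM) is an assumption that the source does not state.

```python
def pcm_duration_seconds(
    num_bytes: int,
    sample_rate: int,
    num_channels: int,
    sample_width: int = 2,  # assumption: 16-bit PCM, i.e. 2 bytes per sample
) -> float:
    """Return the playback duration of a raw PCM buffer in seconds."""
    bytes_per_second = sample_rate * num_channels * sample_width
    if bytes_per_second <= 0:
        raise ValueError("sample_rate, num_channels and sample_width must be positive")
    return num_bytes / bytes_per_second
```

When the audio comes from a WAV file, the real sample width can be read with `wf.getsampwidth()` instead of relying on the default. A sender could then sleep for the larger of `interval` and the computed duration between segments.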
DataStream Audio Protocol
The external sender communicates with the avatar over LiveKit DataStream using the protocol that AvatarSession expects. Use these values exactly when building your own sender:
| Property | Value | Notes |
|---|---|---|
| Topic | lk.audio_stream | Must be this exact string |
| Destination | Avatar participant identity | Identity always starts with avatar-; use _wait_for_avatar() to locate it |
| Attribute: sample_rate | String, e.g. "16000" | Read from your audio source; must match the actual PCM data |
| Attribute: num_channels | String, e.g. "1" | Read from your audio source |
| Payload | Raw PCM bytes | Pass the bytes directly; no encoding needed |
| End of segment | await writer.aclose() | Closing the stream writer signals the avatar to flush and render the segment |
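To make the table concrete, here is a small sketch that turns WAV file bytes (for example, the response body of a TTS service) into the payload and string attributes listed above. It uses only the standard library. `wav_to_stream_parts` and `make_silent_wav` are hypothetical helpers, and the 16-bit check is an assumption, since the protocol has no attribute for sample width.

```python
import io
import wave


def wav_to_stream_parts(wav_bytes: bytes) -> tuple[bytes, dict[str, str]]:
    """Split WAV file bytes into a raw PCM payload and DataStream attributes."""
    with wave.open(io.BytesIO(wav_bytes), "rb") as wf:
        if wf.getsampwidth() != 2:
            # Assumption (not stated in the source): the avatar expects 16-bit samples.
            raise ValueError("expected 16-bit PCM audio")
        payload = wf.readframes(wf.getnframes())
        # Attribute values must be strings, as shown in the table.
        attributes = {
            "sample_rate": str(wf.getframerate()),
            "num_channels": str(wf.getnchannels()),
        }
    return payload, attributes


def make_silent_wav(sample_rate: int = 16000, seconds: float = 0.5) -> bytes:
    """Build a mono 16-bit WAV of silence in memory, handy for testing a sender."""
    buf = io.BytesIO()
    with wave.open(buf, "wb") as wf:
        wf.setnchannels(1)
        wf.setsampwidth(2)
        wf.setframerate(sample_rate)
        wf.writeframes(b"\x00\x00" * int(sample_rate * seconds))
    return buf.getvalue()
```

The returned `attributes` dict can be passed as the `attributes` argument of `stream_bytes()`, and `payload` to `writer.write()`.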
Token Requirements
The sender participant must use with_kind("agent") when creating its LiveKit access token. The avatar only accepts audio streams from participants with agent kind — streams from other participant types are ignored.
token = (
    api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret)
    .with_identity("external-audio-sender")
    .with_grants(api.VideoGrants(room_join=True, room=room_name))
    .with_kind("agent")  # required
    .to_jwt()
)
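If the avatar appears to ignore your streams, one thing worth checking is whether the token really carries the agent kind. The sketch below decodes the payload of a JWT without verifying its signature, which is acceptable for local debugging only. `decode_jwt_payload` is a hypothetical helper, and the claim name `kind` is an assumption about how the LiveKit SDK serializes the token.

```python
import base64
import json


def decode_jwt_payload(token: str) -> dict:
    """Decode the payload segment of a JWT WITHOUT verifying the signature (debug use only)."""
    parts = token.split(".")
    if len(parts) != 3:
        raise ValueError("not a JWT: expected three dot-separated segments")
    payload_b64 = parts[1]
    # JWT segments are base64url-encoded with the padding stripped; restore it.
    padded = payload_b64 + "=" * (-len(payload_b64) % 4)
    return json.loads(base64.urlsafe_b64decode(padded))
```

For example, `decode_jwt_payload(token).get("kind")` would be expected to return `"agent"` for a token built as shown above, assuming the claim name is correct.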
Waiting for the Avatar Participant
Before streaming, you must resolve the avatar’s participant identity. The _wait_for_avatar() helper handles both the case where the avatar has already joined and the case where it joins after your sender connects:
async def _wait_for_avatar(room: rtc.Room) -> rtc.RemoteParticipant:
    """Return the avatar participant as soon as it appears in the room."""
    for p in room.remote_participants.values():
        if p.identity.startswith("avatar-"):
            return p

    loop = asyncio.get_running_loop()
    fut: asyncio.Future[rtc.RemoteParticipant] = loop.create_future()

    def _on_participant_connected(participant: rtc.RemoteParticipant) -> None:
        if participant.identity.startswith("avatar-") and not fut.done():
            fut.set_result(participant)

    room.on("participant_connected", _on_participant_connected)
    try:
        return await fut
    finally:
        room.off("participant_connected", _on_participant_connected)
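As written, _wait_for_avatar() waits indefinitely if the avatar never joins. A standalone sender may prefer to give up after a while. The sketch below wraps any awaitable in a timeout using `asyncio.wait_for`; `with_timeout` is a hypothetical helper, and any timeout value you pick is your own choice rather than a value from the source.

```python
import asyncio
from typing import Awaitable, TypeVar

T = TypeVar("T")


async def with_timeout(awaitable: Awaitable[T], timeout: float, what: str) -> T:
    """Await `awaitable`, raising RuntimeError if it does not finish within `timeout` seconds."""
    try:
        return await asyncio.wait_for(awaitable, timeout=timeout)
    except asyncio.TimeoutError:
        raise RuntimeError(f"timed out after {timeout:.1f}s waiting for {what}") from None
```

Usage would look like `avatar = await with_timeout(_wait_for_avatar(room), 30.0, "avatar participant")`. On timeout, `asyncio.wait_for` cancels the inner coroutine, so the `finally` block in _wait_for_avatar() still runs and unregisters the event handler.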
In the agent-2 example, external_audio_sender.run() is launched as an asyncio task inside the agent process for convenience during local development. In production, run the external sender as a completely separate service with its own LiveKit token — it needs no access to the agent process or the AgentSession. The only thing it needs is the room name and valid LiveKit credentials.