agent-2 zeigt, wie du einen Avaluma-Avatar aus einer externen Audioquelle steuerst und dabei die LiveKit-Agents-Sprach-Pipeline vollständig umgehst. Statt durch STT, LLM und TTS zu leiten, verbindest du dich als unabhängiger Teilnehmer mit dem LiveKit-Raum und streamst rohe PCM-Audio-Bytes direkt über das DataStream-Topic lk.audio_stream an den Avatar. Dieses Muster gibt dir volle Kontrolle über den Audioinhalt — nutze deinen eigenen TTS-Dienst, spiele vorab aufgenommene Dateien ab oder leite eine beliebige Audioquelle ein.
Wann dieses Muster verwenden
Nutze das Muster für externes Audio, wenn:
- Du einen bestehenden TTS- oder Audio-Generierungsdienst behalten möchtest
- Du vorab aufgenommene Audiosegmente über den Avatar abspielen musst
- Du den Avatar unabhängig von einer
AgentSession-Pipeline steuern willst
- Du Avaluma in ein System integrierst, das sein eigenes Audio-Routing verwaltet
Pipeline
WAV-Datei → DataStream (lk.audio_stream) → Avaluma-Avatar → Video-Stream
│
Avatar-Server
(animiert .hvia-Datei)
Setup
Avatar-ID setzen
Öffne agents/2-agent-with-external-audio/agent-2.py und setze avatar_id auf den Namen deiner .hvia-Datei ohne Endung:avatar_id = "your-avatar-id"
Agent starten
Starte livekit-agent-2 mit Docker Compose:docker compose up livekit-agent-2 -d
Dieser Service mountet drei zusätzliche Pfade im Vergleich zu livekit-agent-1: das Agent-Skript, das externe Sender-Modul und das assets/-Verzeichnis mit Beispiel-WAV-Dateien.
Agent-Code
agent-2.py richtet AvatarSession und AgentSession ein und startet dann den externen Audio-Sender als asyncio-Task für die lokale Simulation:
import asyncio
import os
import external_audio_sender
from avaluma_livekit_plugin import AvatarSession
from dotenv import load_dotenv
from livekit.agents import (
Agent,
AgentSession,
JobContext,
WorkerOptions,
cli,
)
load_dotenv(".env.local")
agent_name = "agent-2"
avatar_id = "260218-Avaluma_Avatar_Kadda_v5"
license_key = os.getenv("AVALUMA_LICENSE_KEY", "")
avatar_server_url = os.getenv("AVATAR_SERVER_URL", "https://api.avaluma.ai")
class Assistant(Agent):
def __init__(self) -> None:
super().__init__(
instructions="""You are a helpful virtual AI assistant. The user is interacting with you via voice, even if you perceive the conversation as text.
You eagerly assist users with their questions by providing information from your extensive knowledge.
Your responses are concise, to the point, and without any complex formatting or punctuation including emojis, asterisks, or other symbols.
You are curious, friendly, and have a sense of humor.""",
)
async def entrypoint(ctx: JobContext):
ctx.log_context_fields = {
"room": ctx.room.name,
}
session = AgentSession()
if license_key is None:
raise ValueError("AVALUMA_LICENSE_KEY is not set")
avatar = AvatarSession(
license_key=license_key, # Your License Key
avatar_id=avatar_id, # Avatar identifier (Name of .hvia file)
avatar_server_url=avatar_server_url,
)
await avatar.start(agent_session=session, room=ctx.room)
await session.start(
agent=Assistant(),
room=ctx.room,
)
await ctx.connect()
# Audio from an external service via DataStream, bypassing AgentSession.
# In production, external_audio_sender.run() would live in a separate process/service
# and connect to the room independently — started here only for local simulation.
asyncio.create_task(external_audio_sender.run(ctx.room.name))
if __name__ == "__main__":
cli.run_app(
WorkerOptions(
entrypoint,
agent_name=agent_name,
)
)
Externer Audio-Sender
external_audio_sender.py ist das Herzstück des Musters. Er verbindet sich als unabhängiger Teilnehmer mit dem LiveKit-Raum und streamt alle paar Sekunden eine WAV-Datei per DataStream an den Avatar:
"""
Simulates an external audio service that sends audio directly to the avatar
via LiveKit DataStream — without any access to AgentSession or the agent pipeline.
In production this would be a completely separate process or microservice that
independently connects to the LiveKit room and streams audio to the avatar.
"""
import asyncio
import os
import uuid
import wave
from livekit import api, rtc
AUDIO_STREAM_TOPIC = "lk.audio_stream"
WAV_PATH = os.path.join(os.path.dirname(__file__), "assets/hello_world_16kHz.wav")
def _load_wav_data(path: str) -> tuple[bytes, int, int]:
with wave.open(path, "rb") as wf:
sample_rate = wf.getframerate()
num_channels = wf.getnchannels()
raw_data = wf.readframes(wf.getnframes())
return raw_data, sample_rate, num_channels
async def _wait_for_avatar(room: rtc.Room) -> rtc.RemoteParticipant:
"""Return the avatar participant as soon as it appears in the room."""
for p in room.remote_participants.values():
if p.identity.startswith("avatar-"):
return p
loop = asyncio.get_running_loop()
fut: asyncio.Future[rtc.RemoteParticipant] = loop.create_future()
def _on_participant_connected(participant: rtc.RemoteParticipant) -> None:
if participant.identity.startswith("avatar-") and not fut.done():
fut.set_result(participant)
room.on("participant_connected", _on_participant_connected)
try:
return await fut
finally:
room.off("participant_connected", _on_participant_connected)
async def run(room_name: str, interval: float = 5.0) -> None:
"""
Create a token, connect to the LiveKit room as an independent participant,
and send WAV audio directly to the avatar via DataStream every `interval` seconds.
This bypasses AgentSession entirely — the token is the only thing needed.
"""
livekit_url = os.getenv("LIVEKIT_URL", "")
livekit_api_key = os.getenv("LIVEKIT_API_KEY", "")
livekit_api_secret = os.getenv("LIVEKIT_API_SECRET", "")
token = (
api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret)
.with_identity("external-audio-sender")
.with_name("External Audio Sender")
.with_grants(api.VideoGrants(room_join=True, room=room_name))
.with_kind("agent") # required — the avatar only accepts streams from agent-kind participants
.to_jwt()
)
room = rtc.Room()
await room.connect(livekit_url, token)
room.register_byte_stream_handler("lk.agent.session", lambda _reader, _identity: None)
raw_data, sample_rate, num_channels = _load_wav_data(WAV_PATH)
avatar = await _wait_for_avatar(room)
while True:
await asyncio.sleep(interval)
writer = await room.local_participant.stream_bytes(
name=f"AUDIO_{uuid.uuid4().hex[:8]}",
topic=AUDIO_STREAM_TOPIC,
destination_identities=[avatar.identity],
attributes={
"sample_rate": str(sample_rate),
"num_channels": str(num_channels),
},
)
await writer.write(raw_data)
await writer.aclose() # closing the stream signals the avatar to render the segment
DataStream-Audio-Protokoll
Der externe Sender kommuniziert mit dem Avatar über LiveKit-DataStream nach dem Protokoll, das AvatarSession erwartet. Verwende diese Werte exakt, wenn du deinen eigenen Sender baust:
| Eigenschaft | Wert | Hinweise |
|---|
| Topic | lk.audio_stream | Muss exakt diese Zeichenkette sein |
| Ziel | Identität des Avatar-Teilnehmers | Identität beginnt immer mit avatar- — nutze _wait_for_avatar(), um sie zu finden |
Attribut: sample_rate | String, z. B. "16000" | Aus deiner Audioquelle gelesen; muss zu den tatsächlichen PCM-Daten passen |
Attribut: num_channels | String, z. B. "1" | Aus deiner Audioquelle gelesen |
| Payload | Rohe PCM-Bytes | Bytes direkt übergeben — keine Kodierung nötig |
| Ende eines Segments | await writer.aclose() | Das Schließen des Stream-Writers signalisiert dem Avatar, das Segment zu rendern |
Token-Anforderungen
Der Sender-Teilnehmer muss with_kind("agent") beim Erstellen seines LiveKit-Access-Tokens verwenden. Der Avatar akzeptiert nur Audio-Streams von Teilnehmern mit Agent-Art — Streams von anderen Teilnehmertypen werden ignoriert.
token = (
api.AccessToken(api_key=livekit_api_key, api_secret=livekit_api_secret)
.with_identity("external-audio-sender")
.with_grants(api.VideoGrants(room_join=True, room=room_name))
.with_kind("agent") # required
.to_jwt()
)
Auf den Avatar-Teilnehmer warten
Vor dem Streamen musst du die Teilnehmer-Identität des Avatars auflösen. Der Helfer _wait_for_avatar() deckt beide Fälle ab — der Avatar ist bereits beigetreten oder tritt erst bei, nachdem dein Sender verbunden hat:
async def _wait_for_avatar(room: rtc.Room) -> rtc.RemoteParticipant:
"""Return the avatar participant as soon as it appears in the room."""
for p in room.remote_participants.values():
if p.identity.startswith("avatar-"):
return p
loop = asyncio.get_running_loop()
fut: asyncio.Future[rtc.RemoteParticipant] = loop.create_future()
def _on_participant_connected(participant: rtc.RemoteParticipant) -> None:
if participant.identity.startswith("avatar-") and not fut.done():
fut.set_result(participant)
room.on("participant_connected", _on_participant_connected)
try:
return await fut
finally:
room.off("participant_connected", _on_participant_connected)
Im agent-2-Beispiel wird external_audio_sender.run() der Einfachheit halber während der lokalen Entwicklung als asyncio-Task innerhalb des Agent-Prozesses gestartet. In der Produktion betreibst du den externen Sender als völlig separaten Dienst mit eigenem LiveKit-Token — er benötigt keinen Zugriff auf den Agent-Prozess oder die AgentSession. Er braucht nur den Raumnamen und gültige LiveKit-Zugangsdaten.