Call Gemini Realtime API with Audio Input/Output
Note: this requires LiteLLM Proxy v1.70.1 or later.
- Set up config.yaml for LiteLLM Proxy
model_list:
  - model_name: "gemini-2.0-flash"
    litellm_params:
      model: gemini/gemini-2.0-flash-live-001
    model_info:
      mode: realtime
- Start LiteLLM Proxy
litellm-proxy start
- Run test script
import asyncio
import websockets
import json
import base64
from dotenv import load_dotenv
import wave
import base64
import soundfile as sf
import sounddevice as sd
import io
import numpy as np
# Connection settings for the LiteLLM Proxy realtime endpoint.
# NOTE: these are hard-coded placeholders; nothing is read from the
# environment here, so edit the values below before running the script.

# Replace with your LiteLLM API key.
OPENAI_API_KEY = "sk-1234"

# Substitute {PROXY_URL} with the host[:port] of your proxy. For a secure
# connection use `wss://{PROXY_URL}/v1/realtime?model=gemini-2.0-flash`.
OPENAI_API_URL = "ws://{PROXY_URL}/v1/realtime?model=gemini-2.0-flash"

# Replace with the path of the .wav file to stream to the model.
WAV_FILE_PATH = "/path/to/audio.wav"
async def send_session_update(ws):
    """Send the initial ``session.update`` event over the websocket.

    Args:
        ws: Open websocket connection exposing an awaitable ``send`` method.
    """
    # Session settings requested for this conversation.
    session_settings = {
        "conversation_id": "123456",
        "language": "en-US",
        "transcription_mode": "fast",
        "modalities": ["text"],
    }
    payload = json.dumps({"type": "session.update", "session": session_settings})
    await ws.send(payload)
async def send_audio_file(ws, file_path, *, chunk_size=1024, delay=0.1):
    """Stream a WAV file to the websocket as base64-encoded audio chunks.

    The file is read ``chunk_size`` frames at a time. Each chunk is sent as
    an ``input_audio_buffer.append`` event, followed by a pause of ``delay``
    seconds. After the last chunk an ``input_audio_buffer.end`` event is sent.

    Args:
        ws: Open websocket connection exposing an awaitable ``send`` method.
        file_path: Path of the ``.wav`` file to stream.
        chunk_size: Number of audio frames read and sent per message.
        delay: Seconds to sleep after each chunk to simulate real-time
            streaming; pass ``0`` to send as fast as possible.

    Raises:
        wave.Error: If ``file_path`` is not a readable WAV file.
    """
    with wave.open(file_path, "rb") as wav_file:
        while True:
            chunk = wav_file.readframes(chunk_size)
            if not chunk:  # readframes returns b"" once the file is exhausted
                break
            audio_message = {
                "type": "input_audio_buffer.append",
                "audio": base64.b64encode(chunk).decode("utf-8"),
            }
            await ws.send(json.dumps(audio_message))
            # Pace the upload so it resembles a live audio stream.
            await asyncio.sleep(delay)

    # Tell the server that no more audio will follow.
    await ws.send(json.dumps({"type": "input_audio_buffer.end"}))
def play_base64_audio(base64_string, sample_rate=24000, channels=1):
    """Decode base64-encoded 16-bit PCM audio and play it, blocking until done.

    Args:
        base64_string: Base64 text whose decoded bytes are interpreted as
            16-bit signed integer samples (interleaved when multi-channel).
        sample_rate: Playback sample rate in Hz.
        channels: Number of interleaved channels in the data. Any value
            greater than 1 is reshaped to ``(frames, channels)``.

    Returns:
        None. Returns immediately, without touching the audio device, when
        the payload holds no complete frame.
    """
    pcm_bytes = base64.b64decode(base64_string)

    # Keep only whole frames: a trailing partial sample/frame would make
    # ``np.frombuffer`` or ``reshape`` raise instead of playing the rest.
    frame_bytes = 2 * max(channels, 1)
    pcm_bytes = pcm_bytes[: len(pcm_bytes) - len(pcm_bytes) % frame_bytes]
    if not pcm_bytes:
        return

    samples = np.frombuffer(pcm_bytes, dtype=np.int16)
    if channels > 1:
        # One row per frame, one column per channel.
        samples = samples.reshape(-1, channels)

    # Scale int16 samples to float32 in [-1.0, 1.0).
    audio_float = samples.astype(np.float32) / 32768.0

    sd.play(audio_float, sample_rate)
    sd.wait()  # block until playback has finished
def combine_base64_audio(base64_strings):
    """Merge several base64-encoded byte strings into one base64 string.

    Each item is decoded to bytes, the decoded pieces are concatenated in
    order, and the result is re-encoded.

    Args:
        base64_strings: Iterable of base64-encoded strings.

    Returns:
        The base64 encoding (as ``str``) of the concatenated decoded bytes.
    """
    raw_audio = b"".join(map(base64.b64decode, base64_strings))
    return base64.b64encode(raw_audio).decode("utf-8")
async def listen_in_background(ws):
    """Receive server events until the connection closes, playing audio deltas.

    Every decoded message is printed. Messages of type
    ``response.audio.delta`` that carry a non-empty ``delta`` are handed to
    ``play_base64_audio``.

    Args:
        ws: Open websocket connection exposing an awaitable ``recv`` method.
    """
    loop = asyncio.get_running_loop()
    try:
        while True:
            response = await ws.recv()
            try:
                message_json = json.loads(response)
            except (TypeError, ValueError) as exc:
                # A single malformed frame should not end the whole session.
                print(f"Skipping non-JSON message: {exc}")
                continue
            print(f"message_json: {message_json}")
            if not isinstance(message_json, dict):
                continue
            if message_json.get("type") == "response.audio.delta" and message_json.get("delta"):
                # Playback blocks until the clip finishes; run it in a worker
                # thread so the event loop (and the audio upload) keeps going.
                await loop.run_in_executor(None, play_base64_audio, message_json["delta"])
    except websockets.ConnectionClosed:
        print("END OF STREAM")
    except Exception as exc:
        # Report real failures instead of disguising them as end of stream.
        print(f"Listener stopped after error: {exc!r}")
async def main(response_timeout=30.0):
    """Connect to the proxy, stream the WAV file and listen for responses.

    Args:
        response_timeout: Seconds to keep listening after the audio has been
            sent. Pass ``None`` to wait until the listener finishes on its own.
    """
    async with websockets.connect(
        OPENAI_API_URL,
        additional_headers={
            "Authorization": f"Bearer {OPENAI_API_KEY}",
            "OpenAI-Beta": "realtime=v1",
        },
    ) as ws:
        # Keep a reference to the task: the event loop only holds weak
        # references, so an unreferenced task may be garbage-collected.
        listener = asyncio.create_task(listen_in_background(ws=ws))
        try:
            await send_session_update(ws)
            await send_audio_file(ws, WAV_FILE_PATH)
            # Stay connected after the upload; otherwise the socket closes
            # before responses to the audio can arrive.
            try:
                await asyncio.wait_for(listener, timeout=response_timeout)
            except asyncio.TimeoutError:
                print(f"No end of stream within {response_timeout} seconds; closing connection")
        finally:
            listener.cancel()  # no-op if the listener already finished


if __name__ == "__main__":
    asyncio.run(main())