OpenAI Realtime WebSocket Integration
This guide walks you through integrating Bandwidth's Programmable Voice API with OpenAI's Realtime API over a WebSocket connection. This integration allows you to leverage OpenAI's advanced AI capabilities in your programmable call flows.
What you'll need
- A phone number associated with a Voice Configuration Package + a Programmable Voice Application
- Your OpenAI API Key
- Docker
- A publicly accessible server to host your webhook + websocket application (e.g., using ngrok)
- (Optional) Our sample application to get started: bandwidth-samples/openai-realtime-websockets-python
Call Flow
Before we dive in, let's walk through what an inbound call flow looks like with this integration.
This flow demonstrates a call that is answered by an AI agent and then transferred to a human agent. Let's break it down:
- A caller dials your Bandwidth number.
- Bandwidth sends a webhook to your server indicating an inbound call.
- Your server responds with BXML containing a <StartStream> verb, instructing Bandwidth to start streaming audio to your WebSocket server.
- Bandwidth initiates a stream event to your WebSocket server.
- Your WebSocket server establishes a session with OpenAI's Realtime API.
- Bandwidth streams the caller's audio to your WebSocket server, which forwards it to OpenAI.
- The caller has a conversation with the AI agent.
- When the AI agent determines that the call should be transferred to a human agent, it invokes the transfer tool defined for the OpenAI session.
- Your WebSocket server receives the transfer instruction and makes a PUT /calls/{callId} request to Bandwidth to transfer the call.
- Bandwidth transfers the call to the specified human agent.
- The caller continues the conversation with the human agent.
Let's Build It!
For convenience, we have provided a sample application to get you started. You can find it here: bandwidth-samples/openai-realtime-websockets-python. The sample application is built using Python and FastAPI, but you can use any language or framework you prefer, such as NodeJS + Express or Java + Spring.
To run the sample application, simply clone the repository and follow the instructions in the README.
The following sections will walk you through the sample application code to help you understand how it works.
Setup our Environment
Let's first clone the sample application:
git clone https://github.com/Bandwidth-Samples/openai-realtime-websockets-python
cd openai-realtime-websockets-python
The application provides a docker compose file to help you get started quickly, but you can also run the application via your local Python environment if you prefer.
First - ensure you have a .env file in the root of the project with the following variables:
export BW_ACCOUNT_ID="your_bw_account_id_here"
export BW_USERNAME="your_bw_username_here"
export BW_PASSWORD="your_bw_password_here"
export OPENAI_API_KEY="your_openai_api_key_here"
export TRANSFER_TO="+19195554321"
export BASE_URL="https://someNgrokId.ngrok-free.app"
export LOG_LEVEL="INFO"
export LOCAL_PORT=3000
Using Docker
docker compose up --build
Using Local Python Environment
python -m venv .venv
source .venv/bin/activate
cd app
pip install -r requirements.txt
python main.py
A successful startup should log the following:
INFO: Will watch for changes in these directories: ['/app']
INFO: Uvicorn running on http://0.0.0.0:3000 (Press CTRL+C to quit)
INFO: Started reloader process [1] using WatchFiles
INFO: Started server process [8]
INFO: Waiting for application startup.
INFO: Application startup complete.
The application runs on port 3000 by default; this can be overridden by setting the LOCAL_PORT environment variable.
Creating our FastAPI Server
The sample application uses FastAPI to create a simple web server that handles the incoming HTTP webhooks from Bandwidth and the WebSocket connection for the audio stream.
The sample application also provides a models directory that contains Pydantic models for the Bandwidth webhook and stream events. We won't define those models here, but you can find them in the models directory of the sample application.
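As a point of reference, a minimal InitiateCallback model might look roughly like the sketch below. The field names are assumptions based on Bandwidth's initiate callback payload; the sample's actual models may differ.
# Illustrative sketch of an initiate callback model (not the sample's exact code)
from pydantic import BaseModel, Field

class InitiateCallback(BaseModel):
    event_type: str = Field(alias="eventType")
    call_id: str = Field(alias="callId")
    from_number: str = Field(alias="from")
    to_number: str = Field(alias="to")
    call_url: str | None = Field(default=None, alias="callUrl")

    model_config = {"populate_by_name": True}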
# main.py
#!/usr/bin/env python3
# ...imports...

# Set our Environment Variables
try:
    BW_ACCOUNT = os.environ["BW_ACCOUNT_ID"]
    BW_USERNAME = os.environ["BW_USERNAME"]
    BW_PASSWORD = os.environ["BW_PASSWORD"]
    OPENAI_API_KEY = os.environ["OPENAI_API_KEY"]
    TRANSFER_TO = os.environ["TRANSFER_TO"]
    BASE_URL = os.environ["BASE_URL"]
    LOG_LEVEL = os.environ["LOG_LEVEL"].upper()
    LOCAL_PORT = int(os.environ.get("LOCAL_PORT", 3000))
except KeyError:
    print("environment variables not set")
    exit(1)

app = FastAPI()

# Health Check
@app.get("/health", status_code=http.HTTPStatus.NO_CONTENT)
def health():
    return

# Handle Inbound Call Event from Bandwidth
@app.post("/webhooks/bandwidth/voice/initiate", status_code=http.HTTPStatus.OK)
def handle_initiate_event(callback: InitiateCallback) -> Response:
    return Response()

# Handle Inbound WebSocket Connection from Bandwidth
@app.websocket("/ws")
async def handle_inbound_websocket(bandwidth_websocket: WebSocket, call_id: str = None):
    return

def start_server(port: int) -> None:
    uvicorn.run(
        "main:app",
        host="0.0.0.0",
        port=port,
        log_level="debug",
        reload=True,
    )

if __name__ == "__main__":
    start_server(LOCAL_PORT)
The above code snippet creates a simple FastAPI server with three endpoints:
- A health check endpoint at /health that returns a 204 No Content status code.
- A POST endpoint at /webhooks/bandwidth/voice/initiate that handles inbound call events from Bandwidth.
- A WebSocket endpoint at /ws that handles an inbound WebSocket connection from Bandwidth for the bi-directional audio stream.
Handle Inbound Call Event
When a call is received on your Bandwidth number, Bandwidth will send a webhook to your server at the /webhooks/bandwidth/voice/initiate endpoint.
@app.post("/webhooks/bandwidth/voice/initiate", status_code=http.HTTPStatus.OK)
def handle_initiate_event(callback: InitiateCallback) -> Response:
call_id = callback.call_id
websocket_url = f"wss://{BASE_URL.replace('https://', '').replace('http://', '')}/ws"
start_stream = StartStream(
destination=f"{websocket_url}?call_id={call_id}",
mode="bidirectional",
name=call_id,
destination_username="foo",
destination_password="bar"
)
stop_stream = StopStream(name=call_id, wait="true")
bxml_response = Bxml(nested_verbs=[start_stream, stop_stream])
return Response(status_code=http.HTTPStatus.OK, content=bxml_response.to_bxml(), media_type="application/xml")
The above code snippet does the following:
- Parses the incoming webhook payload into an InitiateCallback Pydantic model.
- Extracts the callId from the webhook payload.
- Constructs the WebSocket URL that Bandwidth will use to stream audio to your server.
- Creates a <StartStream> verb with the WebSocket URL and other necessary parameters.
- Creates a <StopStream> verb to stop the stream when the call ends.
- Constructs a BXML response containing the <StartStream> and <StopStream> verbs.
- Returns the BXML response to Bandwidth.
Consider adding <StartRecording transcribe="true" /> before the <StartStream> verb to record and transcribe the call.
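For example, assuming the Bandwidth SDK exposes a StartRecording BXML verb model (the parameter names here are illustrative), the response above could be extended like so:
# Illustrative sketch: record and transcribe the call alongside the stream
start_recording = StartRecording(transcribe="true")
bxml_response = Bxml(nested_verbs=[start_recording, start_stream, stop_stream])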
Handle Inbound WebSocket Connection
When Bandwidth receives the BXML response with the <StartStream> verb, it will initiate a WebSocket connection to your server at the /ws endpoint.
@app.websocket("/ws")
async def handle_inbound_websocket(bandwidth_websocket: WebSocket, call_id: str = None):
await bandwidth_websocket.accept()
if not call_id:
await bandwidth_websocket.close(code=1008, reason="Missing call_id parameter")
return
async with websockets.connect(
f"wss://api.openai.com/v1/realtime?model=gpt-realtime&temperature={AGENT_TEMPERATURE}",
additional_headers={
"Authorization": f"Bearer {OPENAI_API_KEY}"
}
) as openai_websocket:
await initialize_openai_session(openai_websocket)
await asyncio.gather(
receive_from_bandwidth_ws(bandwidth_websocket, openai_websocket),
receive_from_openai_ws(openai_websocket, bandwidth_websocket, call_id)
)
The above code snippet does the following:
- Accepts the incoming WebSocket connection from Bandwidth.
- Extracts the call_id query parameter from the WebSocket URL.
- Establishes a WebSocket connection to OpenAI's Realtime API with the appropriate model and temperature settings.
- Initializes the OpenAI session by sending a session configuration message with the desired prompt and tools.
- Uses asyncio.gather to concurrently handle receiving messages from both Bandwidth and OpenAI.
We use a call_id query parameter to correlate the WebSocket connection with the Bandwidth call. The call_id value is also present in the start message Bandwidth sends over the WebSocket, but we need it available when handling OpenAI tool calls.
For demo purposes, the query parameter was the simplest approach. Your implementation may vary, or may not require the call ID at all.
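The initialize_openai_session helper isn't shown above. Conceptually, it sends a session.update message that configures the agent's instructions and the tools it may call (the sample also configures audio settings so the session works with Bandwidth's PCMU stream). Below is a rough sketch; AGENT_PROMPT and AGENT_TOOLS are placeholders, and the exact session schema should be checked against OpenAI's Realtime API reference.
async def initialize_openai_session(openai_websocket):
    # Rough sketch: tell OpenAI how the agent should behave and which tools
    # it may call. The exact session fields depend on the Realtime API version.
    session_update = {
        "type": "session.update",
        "session": {
            "instructions": AGENT_PROMPT,  # placeholder prompt text
            "tools": AGENT_TOOLS,          # e.g. the transfer_call tool shown later
        },
    }
    await openai_websocket.send(json.dumps(session_update))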
Broker the WebSocket Connections
The receive_from_bandwidth_ws and receive_from_openai_ws functions are responsible for brokering the audio and messages between Bandwidth and OpenAI.
async def receive_from_bandwidth_ws(bandwidth_websocket: WebSocket, openai_websocket: ClientConnection):
    try:
        async for message in bandwidth_websocket.iter_json():
            event = BandwidthStreamEvent.model_validate(message)
            match event.event_type:
                case StreamEventType.STREAM_STARTED:
                    logger.info(f"Stream started for call ID: {event.metadata.call_id}")
                case StreamEventType.MEDIA:
                    # Forward the base64 PCMU payload from Bandwidth to OpenAI
                    audio_append = {
                        "type": "input_audio_buffer.append",
                        "audio": event.payload
                    }
                    await openai_websocket.send(json.dumps(audio_append))
                case StreamEventType.STREAM_STOPPED:
                    logger.info("stream stopped")
                    await bandwidth_websocket.close()
                    await openai_websocket.close()
                case _:
                    logger.warning(f"Unhandled event type: {event.event_type}")
    except websockets.exceptions.ConnectionClosedError as e:
        logger.error(f"WebSocket connection closed with error: {e}")
        await bandwidth_websocket.close()
        await openai_websocket.close()


async def receive_from_openai_ws(openai_websocket: ClientConnection, bandwidth_websocket: WebSocket, call_id: str):
    last_assistant_item = None
    try:
        async for message in openai_websocket:
            openai_message = json.loads(message)
            match openai_message.get('type'):
                case 'response.output_audio.delta' if 'delta' in openai_message:
                    # OpenAI's audio delta is base64-encoded PCMU; send it back to
                    # Bandwidth as a PLAY_AUDIO event.
                    audio_payload = base64.b64encode(base64.b64decode(openai_message['delta'])).decode('utf-8')
                    media = StreamMedia(
                        content_type="audio/pcmu",
                        payload=audio_payload
                    )
                    play_audio_event = BandwidthStreamEvent(
                        event_type=StreamEventType.PLAY_AUDIO,
                        media=media
                    )
                    await bandwidth_websocket.send_text(play_audio_event.model_dump_json(by_alias=True, exclude_none=True))
                case 'response.output_audio_transcript.done':
                    logger.info(openai_message.get('transcript'))
                case 'conversation.item.done':
                    if openai_message.get('item').get('type') == 'function_call':
                        function_name = openai_message.get('item').get('name')
                        handle_tool_call(function_name, call_id)
                case 'input_audio_buffer.speech_started':
                    # The caller started speaking over the agent: truncate the agent's
                    # in-progress item and clear any audio Bandwidth is still playing.
                    if last_assistant_item:
                        truncate_event = {
                            "type": "conversation.item.truncate",
                            "item_id": last_assistant_item,
                            "content_index": 0,
                            "audio_end_ms": 0
                        }
                        await openai_websocket.send(json.dumps(truncate_event))
                    clear_event = BandwidthStreamEvent(
                        event_type=StreamEventType.CLEAR,
                    )
                    await bandwidth_websocket.send_text(clear_event.model_dump_json(by_alias=True, exclude_none=True))
                    last_assistant_item = None
                case 'error':
                    logger.error(f"OpenAI Error: {openai_message.get('error').get('message')}")
                case _:
                    logger.debug(f"Unhandled OpenAI message type: {openai_message.get('type')}")
            # Track the most recent item ID so it can be truncated if the caller interrupts
            if openai_message.get('item'):
                last_assistant_item = openai_message.get('item').get('id')
    except websockets.exceptions.ConnectionClosedError as e:
        logger.error(f"OpenAI WebSocket connection closed with error: {e}")
        await bandwidth_websocket.close()
        await openai_websocket.close()
Bandwidth Messages
The receive_from_bandwidth_ws function listens for messages from Bandwidth and handles them based on the event type:
- For start events, it logs the start of the stream.
- For media events, it forwards the base64 audio payload to OpenAI as an input_audio_buffer.append message (see the example below).
- For stop events, it closes both WebSocket connections.
- For unhandled event types, it logs a warning, as these three are the only events we expect to receive.
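For reference, a media event arriving from Bandwidth has roughly the following shape. The field names here are assumptions; the sample's BandwidthStreamEvent model defines the exact schema.
# Illustrative shape of an inbound "media" event from Bandwidth (assumed field names)
media_event = {
    "eventType": "media",
    "payload": "<base64-encoded PCMU audio>",
}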
OpenAI Messages
The receive_from_openai_ws function listens for messages from OpenAI and handles them based on the message type:
- For response.output_audio.delta messages, it encodes the audio delta and sends it to Bandwidth as a PLAY_AUDIO event (see the sketch after this list).
- For response.output_audio_transcript.done messages, it logs the transcript of the audio.
- For conversation.item.done messages, it checks if the item is a function_call and invokes the handle_tool_call function to process the tool call.
- For input_audio_buffer.speech_started messages, it sends a conversation.item.truncate message to OpenAI to truncate the last assistant item and sends a CLEAR event to Bandwidth to clear any played audio.
- For error messages, it logs the error message.
- For unhandled message types, it logs a debug message.
- It also keeps track of the last assistant item ID for truncation purposes.
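For reference, the PLAY_AUDIO event serialized back to Bandwidth looks roughly like the following. The field names are assumptions based on the camelCase aliases used by the sample's models.
# Illustrative shape of the playAudio event sent back to Bandwidth (assumed field names)
play_audio_event = {
    "eventType": "playAudio",
    "media": {
        "contentType": "audio/pcmu",
        "payload": "<base64-encoded PCMU audio from OpenAI>",
    },
}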
Tools
The handle_tool_call function processes tool calls from OpenAI. In this example, we only handle the transfer tool, which transfers the call to a human agent.
def handle_tool_call(function_name: str, call_id: str = None):
    match function_name:
        case 'transfer_call':
            logger.info("Request to transfer_call received")
            transfer_number = PhoneNumber(TRANSFER_TO)
            transfer_bxml = Transfer([transfer_number])
            update_call_bxml = Bxml([transfer_bxml])
            try:
                bandwidth_voice_api_instance.update_call_bxml(BW_ACCOUNT, call_id, update_call_bxml.to_bxml())
            except Exception as e:
                logger.error(f"Error transferring call: {e}")
        case _:
            logger.warning(f"Unhandled function call: {function_name}")
    return
Tools must be provided to the agent when the session is initialized. In this example, we provide the transfer_call tool to the agent. Tools are a powerful way to extend the capabilities of your AI agent.
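As an illustration, the transfer_call tool could be described to the model in the session configuration roughly as follows. The exact tool schema should be verified against OpenAI's Realtime API reference.
# Illustrative tool definition included in the session configuration so the
# model knows it can request a transfer to a human agent.
transfer_call_tool = {
    "type": "function",
    "name": "transfer_call",
    "description": "Transfer the caller to a human agent when they ask for one.",
    "parameters": {
        "type": "object",
        "properties": {},
        "required": [],
    },
}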
Connect to your Public Server
Now that we have our application running locally, we need to expose it to the internet so that Bandwidth can reach the webhook and WebSocket endpoints. You can use a tool like ngrok to create a secure tunnel to your local server.
In a new terminal window, run the following command:
ngrok http 3000
This will give you a public URL that you can use to configure your Bandwidth Voice Application.
The URL generated by ngrok is what you will use as your BASE_URL environment variable. ngrok must be started before running docker compose up if you are running the sample application.
Configure your Programmable Voice Application
This guide assumes that you have created a Programmable Voice Application and associated it with a phone number. If you haven't done this yet, please refer to the account setup guide to learn more.
Once your application is created, set the Inbound Call webhook URL to point to your public server's /webhooks/bandwidth/voice/initiate endpoint and the Status Callback URL to your /webhooks/bandwidth/voice/status endpoint.
https://someNgrokId.ngrok-free.app/webhooks/bandwidth/voice/initiate
https://someNgrokId.ngrok-free.app/webhooks/bandwidth/voice/status
Test the Integration
Now that everything is set up, you can test the integration by calling your Bandwidth phone number. You should be connected to the AI agent, and you can have a conversation with it. When you say "transfer me to a human agent", the call should be transferred to the number specified in the TRANSFER_TO environment variable.