Deploy bidirectional streaming agents with Vertex AI Agent Engine and Live API

This blog was co-authored by Hanfei Sun and Huang Xia, Software Engineers, Vertex AI Agent Engine.

TL;DR: Vertex AI Agent Engine now integrates with the Live API to enable real-time, bidirectional streaming agents. This allows for low-latency, human-like conversations using text and audio. This post demonstrates how to quickly build a streaming agent with the Agent Development Kit (ADK), leveraging a fully managed, serverless platform that handles deployment as a stateful, tool-using streaming endpoint without manual stream management.

For years, we’ve built AI interactions around a fundamental, unnatural pattern: request-response. The user speaks, a spinner appears, we wait, and the AI finally responds. This isn’t a conversation; it’s a turn-based query.

Human dialogue is messy, real-time, and bidirectional. We talk over each other, we interrupt, and we expect immediate feedback. This is precisely the capability that Google’s Multimodal Live API delivers. It’s a shift from turn-based queries to a continuous, low-latency, bidirectional stream.

But how do you take this API and build scalable agentic applications with it? On your own, you would need to architect and maintain a stateful, persistent WebSocket connection layer that proxies bidirectional streams between your agents and the Gemini API, while managing the complexities of asynchronous I/O, connection state, and error handling.

This is where Vertex AI Agent Engine comes in: our fully managed, serverless platform for deploying agents now integrates with Google’s Multimodal Live API.

Instead of just telling you that it works, let’s look under the hood. This post walks through two primary paths to deploying a real-time streaming agent, moving from full low-level control to a powerful high-level abstraction.

Understanding Live API integration for bidirectional streaming agents

The Live API itself is a bidirectional, multimodal streaming interface to Gemini.

Bidirectional streaming provides a persistent, two-way communication channel between your application and the agent, letting you move beyond turn-based, request-response patterns.

Bidirectional streaming means data flows continuously in both directions over a single, persistent connection (typically a WebSocket). You can send information, and the AI can start responding even before you finish, allowing you to interrupt it just like in a real chat.
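
To make the idea concrete, here is a minimal sketch (independent of Agent Engine) of bidirectional flow over a single WebSocket using the Python websockets library: one task keeps sending while another concurrently receives. The echo endpoint URL is a placeholder.

import asyncio
import websockets  # pip install websockets

async def main() -> None:
    # Placeholder echo endpoint; substitute any WebSocket server you control.
    async with websockets.connect("wss://echo.example.com") as ws:

        async def sender() -> None:
            # Keep sending without waiting for replies to come back first.
            for msg in ("hello", "are you still there?", "bye"):
                await ws.send(msg)
                await asyncio.sleep(0.5)

        async def receiver() -> None:
            # Replies arrive whenever the server sends them, interleaved with
            # our outgoing messages; runs until the connection closes.
            async for reply in ws:
                print("received:", reply)

        await asyncio.gather(sender(), receiver())

asyncio.run(main())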

Multimodal means it handles multiple types of data. You can stream text, your voice (Audio), or even a camera feed (Video) in, and you can get text or natural-sounding spoken replies (Audio) streamed out.

Manually building a server that manages these persistent WebSocket connections, handles concurrent audio streams, and serves your agents is a tricky engineering challenge. This is the problem Agent Engine solves. Instead of building a WebSocket server, you deploy a Python object. Agent Engine provides the serverless infrastructure, the public-facing endpoint, and the logic to manage the real-time stream, connecting it directly to your Python code.

Get started with Live API on Agent Engine

To get started, let’s examine how the integration between Live API and Agent Engine works at a lower level. This “Path 1” gives you full control over the streaming logic. You can find the original code in this tutorial.

Define the bidirectional method

The key integration point is a single asynchronous method signature you define in your agent class:

async def bidi_stream_query(self, input_queue: asyncio.Queue) -> AsyncIterator[Dict[str, Any]]:
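
For intuition, here is a minimal sketch of what an implementation of this contract can look like: a hypothetical EchoAgent that simply streams back whatever the client sends (the tutorial notebook referenced in this post includes a similar echo example).

import asyncio
from typing import Any, AsyncIterator, Dict

class EchoAgent:

    async def bidi_stream_query(
        self,
        input_queue: asyncio.Queue
    ) -> AsyncIterator[Dict[str, Any]]:
        while True:
            # Agent Engine puts every client message onto this queue
            request = await input_queue.get()
            # Anything you yield is streamed back to the client
            yield {"echo": request}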

In particular, you start by developing a Python class, like the LiveAudioAgent below, and deploying it to Agent Engine.

import asyncio
from typing import Any, AsyncIterator, Dict

import websockets  # third-party dependency; include it in your requirements

class LiveAudioAgent:

    def set_up(self):
        # Build the WebSocket URL for the Gemini Live API backend
        self.gemini_api_url = f"wss://{LOCATION}-aiplatform.googleapis.com/..."
        # Configuration to request AUDIO responses
        self.gemini_config = {"response_modalities": ["AUDIO"]}

    async def _authenticate(self):
        # Helper to get Google auth tokens
        ...

    async def bidi_stream_query(
        self,
        input_queue: asyncio.Queue  # Agent Engine puts client messages here
    ) -> AsyncIterator[Dict[str, Any]]:

        token = await self._authenticate()
        headers = {"Authorization": f"Bearer {token}"}

        # Your agent creates its OWN connection to the Gemini Live API
        async with websockets.connect(self.gemini_api_url, extra_headers=headers) as websocket:

            # Setup the backend connection (e.g., tell it which model to use)
            await self._setup_session(websocket)

            # Run the main loop
            while True:
                # Wait for input from the client (via Agent Engine)
                request = await input_queue.get()
                text_input = request["input"]

                # Send that input to the Gemini Live API
                await self._send_text(websocket, text_input)

                # Listen for the streaming response (audio) from Gemini
                async for audio_data in self._receive_audio(websocket):
                    # Yield the audio chunk back to the client (via Agent Engine)
                    yield audio_data

Once this class is deployed, a client app can connect to your agent’s streaming endpoint. Agent Engine establishes the persistent connection and calls your bidi_stream_query method, passing it an asyncio.Queue. Any message the client sends (e.g., user text) is automatically put onto this queue, so your code simply needs to await input_queue.get() to receive input. Finally, to send data back to the client (like text or audio chunks), your function just needs to yield it.

Notice how, inside the bidi_stream_query method, you have complete control. As shown in the LiveAudioAgent example, your code can open its own secondary WebSocket connection directly to the Gemini Live API backend. The method becomes a real-time bridge: it takes client input from the queue, forwards it to the Live API, receives audio chunks back from Gemini, and yields those chunks directly back to the client.

(Optional) Register the method

But how does Agent Engine know your method is for bidi streaming? You can make this explicit by implementing the register_operations method. This tells the platform which methods map to which execution modes: synchronous (""), streaming ("stream"), or bidirectional streaming ("bidi_stream").

class LiveAudioAgent:

    def register_operations(self):
        return {
            # The list of synchronous operations
            "": ["query"],
            # The list of streaming operations
            "stream": ["stream_query"],
            # The list of bidi streaming operations
            "bidi_stream": ["bidi_stream_query"]
        }

Connect to the deployed agent

At this point, you have your streaming agent. To deploy it, you must include any dependencies (like websockets) in your requirements. Once deployed, your client application can connect to it, using the config parameter to indicate which registered bidi method it wants to call.
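
As an illustration, a deployment call might look like the sketch below. It mirrors the client.agent_engines.create(...) pattern shown later in this post, but the exact config keys (such as requirements and display_name) depend on your SDK version, so treat them as assumptions.

# Hypothetical deployment sketch; parameter names may differ across SDK versions.
remote_live_agent = client.agent_engines.create(
    agent=LiveAudioAgent(),
    config={
        "display_name": "live-audio-agent",
        # Any third-party packages your agent imports (e.g., websockets)
        # must be listed so the managed runtime can install them.
        "requirements": ["websockets"],
    },
)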

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        # Specify which bidi_stream method to run
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    
    # ... send/receive logic ...
    await connection.send({"input": "Hello!"})
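
    # To read the agent's streamed output, keep calling connection.receive()
    # on the same connection. This is a minimal sketch that continues inside
    # the `async with` block above; the "bidiStreamOutput" envelope key mirrors
    # the ADK example later in this post, so treat the exact response shape
    # as an assumption for your own agent.
    while True:
        response = await connection.receive()
        output = response.get("bidiStreamOutput")
        if not output:
            break
        print("Received chunk:", output)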

Simplifying real-time streaming agents with Agent Development Kit (ADK) on Vertex AI Agent Engine

Now that you have a better understanding of how the integration works, let’s put everything together to build an agent with Agent Development Kit (ADK).

When you use ADK, you gain the capabilities of the Live API without writing a single line of WebSocket or queue-handling code.

You define your agent not as a streaming class, but declaratively using the ADK’s LlmAgent class. You give it an instruction, a list of Python tools (like get_weather), and a Live API-compatible model. Then, you wrap your ADK Agent in an AdkApp object and deploy that single object to Agent Engine.

The AdkApp class has been extended to automatically provide a bidi_stream_query endpoint for you; you don’t have to write it. When a client connects to your deployed ADK agent’s streaming endpoint, the ADK’s internal logic handles everything:

  • It automatically manages the connection to the Gemini Live API.
  • It can handle session and conversation memory.
  • It listens to the user’s streamed request.
  • It orchestrates tool calls (like calling your get_weather function) mid-conversation.
  • It generates a response and automatically streams back both text and synthesized audio, allowing for natural, interruptible, tool-using conversations out of the box.

The following shows what the integration looks like.

from google.adk import Agent, AdkApp
from google.adk.tools import FunctionTool
from google.adk.sessions.in_memory_session_service import InMemorySessionService
from google.adk.memory.in_memory_memory_service import InMemoryMemoryService

# 1. Define your tools (just standard Python functions)
async def get_weather(location: str) -> dict:
    """Gets current weather for a location."""
    # ... (logic to call a real API)
    return {"temp": "75°F", "conditions": "Sunny"}

# 2. Define your agent declaratively
weather_agent = Agent(
    model='gemini-2.0-flash-live-preview-04-09', # A Live API-compatible model
    name='weather_assistant',
    instruction="You are a friendly weather assistant. Use your tools.",
    tools=[
        FunctionTool(get_weather),
    ],
)

# 3. Wrap your agent in the deployable AdkApp
# This example uses in-memory services for simplicity
app = AdkApp(
    agent=weather_agent,
    session_service_builder=InMemorySessionService,
    memory_service_builder=InMemoryMemoryService
)

# 4. Deploy this 'app' object to Agent Engine.
# client.agent_engines.create(agent=app, ...)
#
# That's it. You now have a fully-managed, scalable, real-time
# audio agent endpoint that can use your Python tools.

Interact with the deployed ADK Agent

Interacting with a deployed ADK agent is also standardized. You send a LiveRequest object and parse the resulting Event objects, which can contain text, audio, or tool call information.

import numpy as np
from google.adk.agents.live_request_queue import LiveRequest
from google.adk.events import Event
from google.genai import types

# Helper to format the text input as a LiveRequest
def prepare_live_request(input_text: str) -> LiveRequest:
    part = types.Part.from_text(text=input_text)
    content = types.Content(parts=[part])
    return LiveRequest(content=content)

async with client.aio.live.agent_engines.connect(
        agent_engine=remote_live_agent.api_resource.name,
        config={"class_method": "bidi_stream_query"}
        ) as connection:
    
    first_req = True
    while True:
        input_text = input("Enter your question: ")
        if input_text == "exit":
            break
        
        # Send the LiveRequest
        if first_req:
            await connection.send({
                "user_id": "some-user-id",
                "live_request": prepare_live_request(input_text).dict()
            })
            first_req = False
        else:
            await connection.send(prepare_live_request(input_text).dict())

        audio_data = []
        # Listen for events (text, audio, tool calls, etc.)
        while True:
            response = await connection.receive()
            event = Event.model_validate(response["bidiStreamOutput"])
            part = event.content and event.content.parts and event.content.parts[0]

            if not part:
                # End of turn
                break
            
            # Check if the part contains audio data
            if part.inline_data and part.inline_data.data:
                chunk_data = part.inline_data.data
                data = np.frombuffer(chunk_data, dtype=np.int16)
                audio_data.append(data)
            elif part.text:
                print(f"Agent (text): {part.text}")
        
        if audio_data:
            # Concatenate and play the full audio response
            concatenated_audio = np.concatenate(audio_data)
            # display(Audio(concatenated_audio, rate=24000, autoplay=True)) # for notebooks
            print("[Audio response received]")

Thanks to this integration between Live API, Agent Development Kit, and Vertex AI Agent Engine, you can now focus on your agent’s logic (its tools and instructions). Agent Engine and ADK handle the complex infrastructure of real-time, multimodal conversation.

Conclusion

We’ve explored how to deploy real-time, bidirectional streaming agents on Vertex AI Agent Engine. The choice, as always, comes down to your specific needs.

Vertex AI Agent Engine is flexible enough to allow you to manage your own connection to the Gemini backend, giving you complete control over streaming mechanics. This makes it ideal for highly bespoke or experimental agents. Alternatively, you can use the Agent Development Kit with an Agent Engine template to abstract the entire agentic and streaming lifecycle. This allows you to focus purely on your agent’s logic (its instructions and tools) rather than its infrastructure.

In both cases, it is important to keep in mind a few things when using the bidirectional streaming API with your agents:

  • Deployment: Deploying an agent with bidirectional methods is supported through the Google Gen AI SDK, which automatically sets the EXPERIMENTAL agent server mode.
  • Timeout: The maximum timeout for a bidi streaming query is 10 minutes. For longer processes, you should break the task into smaller chunks and use session or memory to maintain state.
  • Backpressure: You must manage the rate of data flow in both directions. If your agent processes incoming data too slowly, input can back up and cause latency or memory pressure; likewise, you may need to throttle content generation if your agent produces data faster than the client can consume it. A minimal pacing sketch follows this list.
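
As one illustration, the sketch below shows a generic way to bound how far generation can run ahead of consumption inside your own bidi_stream_query, using a fixed-size asyncio.Queue. The helper and its names are purely illustrative and not part of the Agent Engine API.

import asyncio
from typing import Any, AsyncIterator, Dict

async def paced_output(
    chunks: AsyncIterator[bytes],
    max_buffered: int = 8
) -> AsyncIterator[Dict[str, Any]]:
    # A bounded queue blocks the producer whenever the consumer falls behind,
    # capping memory usage instead of letting unconsumed chunks pile up.
    buffer: asyncio.Queue = asyncio.Queue(maxsize=max_buffered)

    async def produce() -> None:
        async for chunk in chunks:
            await buffer.put(chunk)   # waits here when the buffer is full
        await buffer.put(None)        # sentinel: no more chunks

    producer = asyncio.create_task(produce())
    while True:
        chunk = await buffer.get()
        if chunk is None:
            break
        # When used inside bidi_stream_query, each yielded dict is streamed
        # back toward the client.
        yield {"audio_chunk": chunk}
    await producer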

In the end, Vertex AI Agent Engine provides the managed, serverless platform to run these persistent connections at scale, freeing you from building and maintaining your own WebSocket infrastructure for your agents.

What’s next

Here’s how you can start using this integration:

  • Explore the Full Tutorial Notebook: The best way to learn is by doing. This tutorial notebook contains all the code for the Echo Agent, the manual LiveAudioAgent, and the ADK Weather Assistant covered in this post. Run the code, see it work, and start tinkering.

  • Refer to the Official Documentation: Dive deeper into the concepts, limitations, and advanced configurations by reading the official documentation for the Live API and Vertex AI Agent Engine.

As always, if you have thoughts and feedback, I’d love to hear them. Connect with me on LinkedIn or X/Twitter to share what you’re working on and your ideas.
