Using A2A Extensions for Injecting Session State into ADK Agents exposed as A2A servers


More in my A2A Deep Dive Series:

The A2A 1.0 Milestone: Ensuring and Testing Backward Compatibility

Running and Debugging A2A Agents in Gemini Enterprise


The software industry learned hard lessons moving from monolithic applications to microservices, and we are now seeing a similar evolution in AI. Teams are shifting from building monolithic multi-agent applications to orchestrating fully distributed multi-agent systems. It is not easy, and there are more questions than answers. Many of these questions concern state management and sharing.

Recently, one of my customers brought up a classic challenge in this space:

“Lukasz, we have ADK Agents — all works, but when we package and deploy them as A2A Agents, we don’t know how to pass session state to these ADK Agents using A2A endpoints.”

To understand the problem, we first need to look at how the Agent Development Kit (ADK) handles context. By design, every interaction with an ADK agent happens within a strictly isolated session.

Under the hood, ADK relies on the SessionService to manage these sessions. While the default InMemorySessionService is useful for local development, production environments typically utilize persistent backend implementations, such as remote databases. Regardless of the storage backend, ADK enforces strict session isolation. Every session is assigned a unique ID and is strictly scoped per user and per agent application, ensuring no data leaks or shared state between different users.
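
The isolation rule can be pictured with a small stand-in. The sketch below is not ADK's SessionService. It is a hypothetical in-memory store (ToySessionStore and ToySession are invented names) that mirrors only one idea from the paragraph above: a session is found by app name, user ID, and session ID together, so a different user never sees it.

```python
import uuid
from dataclasses import dataclass, field


@dataclass
class ToySession:
    """Hypothetical stand-in for an ADK session (not the real class)."""
    app_name: str
    user_id: str
    id: str
    state: dict = field(default_factory=dict)


class ToySessionStore:
    """Keeps sessions keyed by (app_name, user_id, session_id)."""

    def __init__(self) -> None:
        self._sessions = {}

    def create_session(self, app_name, user_id, state=None, session_id=None):
        session_id = session_id or str(uuid.uuid4())
        session = ToySession(app_name, user_id, session_id, dict(state or {}))
        self._sessions[(app_name, user_id, session_id)] = session
        return session

    def get_session(self, app_name, user_id, session_id):
        # A lookup with a different user or app never finds the session.
        return self._sessions.get((app_name, user_id, session_id))


store = ToySessionStore()
alice = store.create_session("myagentapp", "alice", state={"plan": "pro"})
same_user = store.get_session("myagentapp", "alice", alice.id)
other_user = store.get_session("myagentapp", "bob", alice.id)
```

Here same_user is the session that was just created, while other_user is None even though the session ID matches.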

Every session maintains a dedicated working memory for key-value variables, known as session state.

Session state has several key uses in AI applications:

  • Dynamic Prompting: It can be used to populate agent prompt templates that use session state variables as placeholders, allowing instructions to adapt based on the values injected when calling the agent.

  • Short-Term Memory: It works as a temporary data store. While the agent runs, it can update the state to save relevant details or outputs from earlier steps.

  • Multi-Agent Coordination: In multi-agent setups, session state acts as a shared workspace. Downstream agents can easily access variables set by earlier agents to handle complex, multi-step tasks, all while keeping individual user sessions strictly isolated.

Let’s focus on that first application. Here is a sample single-agent ADK application that uses the session state variable user_info as a placeholder within the agent’s system instructions:

from google.adk.agents import Agent

from dotenv import load_dotenv
load_dotenv()

instruction = """
You are a helpful AI assistant.
Here are details about the user:
<UserContext>
{user_info}
</UserContext>

Answer the user's questions with professional depth and technical accuracy.
"""

root_agent = Agent(
    model="gemini-2.5-flash",
    name="specialist_agent",
    instruction=instruction
)
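
To make the placeholder mechanism concrete, here is a minimal sketch of what happens to {user_info} at run time. It is not ADK's templating code: render_instruction is a hypothetical helper, and serializing non-string values as JSON is an assumption made for the illustration. It also shows what a missing variable looks like.

```python
import json
import re

INSTRUCTION = """Here are details about user:
<UserContext>
{user_info}
</UserContext>"""


def render_instruction(template: str, state: dict) -> str:
    """Replace each {name} placeholder with the matching state value."""

    def substitute(match):
        key = match.group(1)
        if key not in state:
            # A placeholder without a matching state variable is an error.
            raise KeyError(f"Context variable not found: `{key}`.")
        value = state[key]
        # Assumption: non-string values are rendered as JSON text.
        return value if isinstance(value, str) else json.dumps(value)

    return re.sub(r"\{(\w+)\}", substitute, template)


state = {"user_info": {"name": "Lukasz Olejniczak", "role": "AI Specialist"}}
rendered = render_instruction(INSTRUCTION, state)

try:
    render_instruction(INSTRUCTION, {})
    missing_error = None
except KeyError as exc:
    missing_error = exc.args[0]
```

With a populated state the placeholder is replaced by the user details. With an empty state the call fails, which is the situation a blank session puts the agent in.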

We can now use the ADK runner to execute this agent. To do this, we will create a new session and initialize the session state with a JSON object containing the user_info details:

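# Note: the top-level await further down assumes a notebook or another async context
from google.adk.artifacts import InMemoryArtifactService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

session_service = InMemorySessionService()
artifact_service = InMemoryArtifactService()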

AGENT_APP_NAME = "myagentapp"

runner = Runner(app_name=AGENT_APP_NAME, 
            agent=root_agent, 
            artifact_service=artifact_service,
            session_service=session_service
)


_state = {
        "user_info": {
          "name": "Lukasz Olejniczak",
          "gender": "male",
          "role": "AI Specialist",
          "email": "lolejniczak@google.com",
          "phone": "1234567890"
        },
        "time": "2024-10-23T00:48:45.640722"
    }

session = await session_service.create_session(
                            app_name = AGENT_APP_NAME, 
                            user_id = "testuser", 
                            state = _state
)



content = types.Content(role='user', 
    parts=[types.Part(text="What is my name?")]
)
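events = runner.run(
      user_id=session.user_id,
      session_id=session.id,
      new_message=content
)

# runner.run returns a generator, so iterate over it to actually run the agent
for event in events:
    if event.is_final_response() and event.content and event.content.parts:
        print(event.content.parts[0].text)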

Alternatively, we could deploy our AI Agent to Agent Engine or Cloud Run to run it as an AI microservice. While creating a session and calling the agent via a REST API more closely resembles a true production setup, we won’t explore that path here. I want to keep things simple and focus strictly on how to pass session state variables when packaging our ADK agent as an A2A Server. But as you can see, session state is a highly effective way to inject runtime details — like user profiles, product specs, or event data — directly into your AI Agent.

Now, let’s say you want to expose your ADK Agent as an A2A Agent to the broader ecosystem. To do this we need to wrap it inside an A2A AgentExecutor. Think of this executor as a universal translator. It listens for standardized A2A requests coming from other agents and translates them into the specific format that our internal ADK Agent understands.

To build this translator, our class only needs to implement two standard methods:

  • execute: The core engine where we translate the incoming A2A request and hand it off to our ADK Agent to perform the actual work.

  • cancel: A method to stop a running process mid-flight (to keep things simple, we will completely neglect this one for our example).

Once we wrap our agent in this executor, we spin it up as an A2A server. This server automatically exposes two essential endpoints that the outside world uses to interact with our agent:

  • Messaging Endpoint: The active communication line. This is where other agents send their messages.

  • Agent Card Endpoint: The discovery channel. Think of it as an ID badge that provides metadata about our agent, explaining what it is and what capabilities it has.

With the A2A foundations out of the way, let’s dive into the code and build our AgentExecutor:

import json
import logging
import sys
from typing import Any, Union
from uuid import uuid4

from a2a.helpers.proto_helpers import new_task_from_user_message
from a2a.server.agent_execution import AgentExecutor, RequestContext
from a2a.server.events.event_queue import EventQueue
from a2a.server.tasks import TaskUpdater
from a2a.types import Artifact, Message, Part, Role, Task, TaskState, TaskStatus

from google.adk.artifacts import InMemoryArtifactService
from google.adk.memory import InMemoryMemoryService
from google.adk.runners import Runner
from google.adk.sessions import InMemorySessionService
from google.genai import types

# Module-level logger used by the executor below
logger = logging.getLogger(__name__)



class ADKA2AExecutorWithRunner(AgentExecutor):
    def __init__(self, agent: Any):
        self.agent = agent
        self.runner = None

    def _init_adk(self):
        """Standard ADK Runner initialization."""
        if not self.runner:
            self.runner = Runner(
                app_name=self.agent.name,
                agent=self.agent,
                artifact_service=InMemoryArtifactService(),
                session_service=InMemorySessionService(),
                memory_service=InMemoryMemoryService(),
            )

    async def cancel(self, context: RequestContext, event_queue: EventQueue) -> None:
        # Cancellation is only logged in this example
        logger.info(f"Cancellation requested for task {context.task_id}")
            
    async def execute(self, context: RequestContext, event_queue: EventQueue) -> None:
        self._init_adk()

        # 1. Identity Management
        user_id = "a2a_user" ## For simplicity 
        
        # 2. Task Lifecycle Management
        task = new_task_from_user_message(context.message)
        await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)

        working_message = updater.new_agent_message(
            parts=[Part(text='Processing your question...')]
        )
        await updater.start_work(message=working_message)

        # 3. Prepare ADK Input
        query = context.get_user_input()
        content = types.Content(role='user', parts=[types.Part(text=query)])

        try:
            # 4. Session Retrieval/Creation
            session = await self.runner.session_service.get_session(
                app_name=self.runner.app_name,
                user_id=user_id,
                session_id=context.context_id
            )
            
            if not session:
                logger.info(f"Creating fresh session for {user_id}")
                session = await self.runner.session_service.create_session(
                    app_name=self.runner.app_name,
                    user_id=user_id,
                    session_id=context.context_id
                )

            # 5. ADK Execution Loop
            final_event = None
            async for event in self.runner.run_async(
                session_id=session.id,
                user_id=user_id,
                new_message=content
            ):
                if event.is_final_response():
                    final_event = event

            # 6. Response Processing
            if final_event and final_event.content and final_event.content.parts:
                response_text = "".join(
                    part.text for part in final_event.content.parts if hasattr(part, 'text') and part.text
                )
                
                if response_text:
                    updated_session = await self.runner.session_service.get_session(
                        app_name = self.runner.app_name, 
                        user_id = user_id, 
                        session_id = context.context_id
                    )
                    
                    await updater.add_artifact(
                        [Part(text=response_text)],
                        name='result'
                    )   
                    await updater.complete()
                    return

            # Fallback if no response generated
            msg = Message(
                role=Role.ROLE_AGENT, 
                message_id=str(uuid4()), 
                parts=[Part(text="Agent failed to produce a final response.")]
            )
            await updater.update_status(
                TaskState.TASK_STATE_FAILED,
                message=msg
            )

        except Exception as e:
            logger.exception(f"Critical error in ADK Execution: {e}")
            msg = Message(
                role=Role.ROLE_AGENT, 
                message_id=str(uuid4()), 
                parts=[Part(text=f"Critical error in ADK Execution: {e}")]
            )
            await updater.update_status(
                TaskState.TASK_STATE_FAILED,
                message=msg
            )

While the code might look a bit long at first glance, it mostly reuses the standard ADK runner snippets we have already covered.

The main question you might have looking at the execute method is: how do we actually extract the user’s prompt from the incoming A2A request?

The answer lies in the RequestContext object. We can grab the payload using its built-in method:

query = context.get_user_input()

Once we have that query, we package it into a standard Content object…

content = types.Content(role='user', parts=[types.Part(text=query)])

…and pass it directly to our ADK runner to do the heavy lifting:

self.runner.run_async(
     session_id=session.id,
     user_id=user_id,
     new_message=content
)

Finally, when the ADK Agent generates its response, we have to translate it back into “A2A language.” We do this by packaging the answer as an A2A artifact and signaling that the task is finished, which dispatches the final response back to the A2A client:

await updater.add_artifact(
      [Part(text=response_text)],
      name='result'
)   
await updater.complete()

Now that our AgentExecutor is ready, we need to spin up the actual server to host it. Here is the code to run an A2A Server that utilizes the executor we just defined:

import uvicorn
import logging
from fastapi import FastAPI

from executor import ADKA2AExecutorWithRunner

from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.types import AgentCard, AgentCapabilities, AgentInterface, AgentSkill
from agent import root_agent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("a2a.server")

agent_executor = ADKA2AExecutorWithRunner(agent=root_agent)

# 1. Define the Agent Card
agent_card = AgentCard(
    name='ADK Specialist Agent',
    description='Professional Agent with State Injection',
    supported_interfaces=[
        AgentInterface(
            url="http://localhost:9999/a2a/jsonrpc", 
            protocol_binding="JSONRPC"
        )
    ],
    capabilities=AgentCapabilities(
        extensions=[]
    ),
    skills=[AgentSkill(id='specialist', name='AI Specialist Support')],
    version="1.0.0",
)

# 2. Setup Request Handler
request_handler = DefaultRequestHandler(
    agent_card=agent_card,
    agent_executor=agent_executor,
    task_store=InMemoryTaskStore(),
)

# 3. Initialize FastAPI and mount routes
app = FastAPI(title="A2A ADK Specialist Server")

app.routes.extend(create_agent_card_routes(agent_card))
app.routes.extend(create_jsonrpc_routes(request_handler, '/a2a/jsonrpc'))

if __name__ == "__main__":
    logger.info("Starting A2A Server on port 9999...")
    uvicorn.run(app, host="0.0.0.0", port=9999)

Notice how we define the AgentCard here? This is that “ID badge” we mentioned earlier, which tells the A2A ecosystem exactly how to interact with our agent. In this example, we are using JSON-RPC for communication, exposing our agent on port 9999.

Building the A2A Client

To test this, we need an A2A Client to send a message to our newly minted server. Here is a simple script to do just that:

import asyncio
import logging
import sys
from typing import Any
from uuid import uuid4

import httpx
from a2a.client.client import ClientCallContext
from a2a.client.client_factory import ClientConfig, ClientFactory
from a2a.client.interceptors import AfterArgs, BeforeArgs, ClientCallInterceptor
from a2a.client.service_parameters import (
    ServiceParametersFactory,
    with_a2a_extensions,
)
from a2a.types import Message, Part, Role, SendMessageRequest, StreamResponse, Task

logging.basicConfig(level=logging.DEBUG)


async def run_client():
    async with httpx.AsyncClient() as httpx_client:
        client = await ClientFactory(
            ClientConfig(httpx_client=httpx_client)
        ).create_from_url(
            url='http://localhost:9999',
            interceptors=[]
        )

        msg = Message(
            role=Role.ROLE_USER, 
            message_id=str(uuid4()), 
            parts=[Part(text="Who am I and what is my email?")]
        )
        
        ## send message
        events = [
            event async for event in client.send_message(request=SendMessageRequest(message=msg))
        ]
        response = events[0]
        print(response)

if __name__ == "__main__":
    asyncio.run(run_client())

The Moment of Truth

Now, let’s put it all together! First, run your A2A server script. Once it is listening on port 9999, execute the A2A client script. The client will send an A2A request with the query “Who am I and what is my email?” directly to the server.

But if you look at the response, you will immediately spot a problem: the agent failed to generate an answer!

Why did this happen? When we ran the ADK Agent locally earlier, we explicitly created a session and injected a rich JSON object containing user_info before we even asked the question.

However, when we wrapped our agent in the A2A AgentExecutor, we hit a wall. Look closely at what our A2A client actually sends: it only sends the raw text message (“Who am I and what is my email?”). There is nothing in that standard A2A request payload that includes our session state variables. Our execute method grabs the user query, creates a completely blank session, and runs the agent.

This is exactly what we get from the A2A Agent:

status {
    state: TASK_STATE_FAILED
    message {
      message_id: "773f60a7-e3e5-4eb3-a284-676ee7332908"
      role: ROLE_AGENT
      parts {
        text: "Critical error in ADK Execution: \'Context variable not found: `user_info`.\'"
      }
    }
}

So, if the standard payload doesn’t support it, how do we actually pass that session state from the A2A Client straight through to our ADK Agent? Figuring out the answer to that exact puzzle is the reason I wrote this post in the first place. Here is how we do it.

With this simple example, we have successfully reproduced a common pain point developers face when breaking a monolithic multi-agent application into a distributed architecture. Our A2A Client only passed the raw user query, leaving us without any obvious mechanism to instantiate the underlying ADK Agent’s session state.

So, let’s pause and consider what we actually need to make this work. Two fundamental questions emerge:

  1. Schema Discovery: Session state is a flexible key-value store, and the Agent Designer decides which variables are required. But how does the A2A Server communicate the expected JSON schema to potential consumers (A2A Clients) so they know exactly what data to provide?

  2. Data Transmission: The A2A Client needs a way to send that JSON object alongside the user query. How do we pass both the conversational prompt and the background session state within a single A2A request?

Enter A2A Extensions

While there are many creative ways to approach this, the most robust solution is to use A2A Extensions. This is a highly flexible mechanism designed specifically to expand the standard A2A protocol.

Typically, developers leverage A2A Extensions for scenarios such as:

  • Data-Only Extensions: Exposing new, structured information in the Agent Card (e.g., regulatory compliance badges) without altering the standard request-response flow.

  • Profile Extensions: Enforcing additional structure and state change requirements on core messages.

  • Method Extensions (Extended Skills): Adding entirely new RPC methods beyond the protocol’s core set.

  • State Machine Extensions: Introducing new states or transitions to the task lifecycle.

This list isn’t exhaustive. As the A2A documentation notes, “the exact set of possible ways to use extensions is intentionally broad, facilitating the ability to expand A2A beyond known use cases.”

Our Approach

For our specific session state challenge, here is the approach we will take:

  • Schema via Agent Card: We will publish the expected JSON schema for the session state directly inside the Agent Card. This allows A2A Clients to dynamically discover and format the required data before sending a request.

  • State via Message Metadata: The A2A Client will transmit the actual session state JSON embedded within the message metadata.

  • Custom Extension Helpers: We will build a custom A2A Extension containing dedicated helper functions. These helpers will:

1/ Allow the Client to verify if the Server supports the extension and fetch the expected JSON schema.

2/ Allow the Server to detect whether the Client is requesting the extension, extract the session state JSON from the incoming message, and validate it against the schema.

Here is the step-by-step breakdown of how we will implement this across both the A2A Client and the A2A Server:

On the A2A Client side (the left column), the client proactively uses the Agent Card to discover the exact JSON schema that the session state must follow. Crucially, it then packages the formatted state as a JSON object directly inside the message metadata before dispatching the user query.
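
The client-side steps can be sketched with plain dictionaries. This is an illustration only, not the A2A SDK: the card and the message are reduced to hypothetical dicts, and discover_state_schema and build_message are invented helper names. The URI and metadata key follow the scheme used by the extension class built later in this post.

```python
from typing import Optional

CORE_PATH = "github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1"
URI = f"https://{CORE_PATH}"
STATE_FIELD = f"{CORE_PATH}/state"

# Hypothetical Agent Card, reduced to the fields this sketch needs.
agent_card = {
    "capabilities": {
        "extensions": [
            {
                "uri": URI,
                "required": True,
                "params": {
                    "state_schema": {"type": "object", "required": ["user_info"]}
                },
            }
        ]
    }
}


def discover_state_schema(card: dict) -> Optional[dict]:
    """Return the published state schema, or None if the extension is absent."""
    for ext in card.get("capabilities", {}).get("extensions", []):
        if ext.get("uri") == URI:
            return ext.get("params", {}).get("state_schema")
    return None


def build_message(text: str, state: dict) -> dict:
    """Attach the session state to the message metadata under STATE_FIELD."""
    return {
        "role": "user",
        "parts": [{"text": text}],
        "metadata": {STATE_FIELD: state},
    }


schema = discover_state_schema(agent_card)
state = {
    "user_info": {
        "name": "Lukasz Olejniczak",
        "role": "AI Specialist",
        "email": "lolejniczak@google.com",
    }
}
message = build_message("Who am I and what is my email?", state)
```

The point of the sketch is the ordering: the schema is read from the card first, and only then is the state placed next to the user query in a single message.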

On the A2A Server side (the right column), the server performs a validation handshake using our extension helpers: it checks for the requested extension, identifies the state key, and validates the incoming state JSON against the defined schema. Once validated, the server moves to the light-blue, ADK-internal step: it creates the ADK session and initializes it with that validated state. This is the key moment that was previously missing.

After the ADK agent executes, the server takes a final snapshot of the updated session state and adds it to the response metadata under the state key before sending the response back to the client, so the state persists across the entire distributed interaction.
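
The server-side sequence can be sketched the same way. Everything here is a simplified stand-in: handle_request, is_valid, and run_agent are hypothetical functions, validation is reduced to a single key check, and the "agent" just records the query in the state so there is something to return.

```python
import copy

URI = "https://github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1"
STATE_FIELD = "github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1/state"


def is_valid(state) -> bool:
    """Placeholder for real schema validation (checks one required key only)."""
    return isinstance(state, dict) and isinstance(state.get("user_info"), dict)


def run_agent(session_state: dict, query: str) -> str:
    """Hypothetical agent call that also updates the session state."""
    session_state["last_query"] = query
    return f"Hello {session_state['user_info'].get('name', 'there')}"


def handle_request(requested_extensions: set, metadata: dict, query: str) -> dict:
    # 1. Did the client request the extension?
    if URI not in requested_extensions:
        return {"error": "state injection extension not requested"}
    # 2. Is there state under the agreed metadata key?
    if STATE_FIELD not in metadata:
        return {"error": "no state in message metadata"}
    # 3. Does the state pass validation?
    incoming = metadata[STATE_FIELD]
    if not is_valid(incoming):
        return {"error": "state failed validation"}
    # 4. Initialize the session with the validated state and run the agent.
    session_state = copy.deepcopy(incoming)
    answer = run_agent(session_state, query)
    # 5. Return a snapshot of the updated state in the response metadata.
    return {"answer": answer, "metadata": {STATE_FIELD: session_state}}


incoming_metadata = {STATE_FIELD: {"user_info": {"name": "Lukasz"}}}
ok = handle_request({URI}, incoming_metadata, "Who am I?")
rejected = handle_request(set(), incoming_metadata, "Who am I?")
```

The deep copy keeps the incoming message untouched, so the returned snapshot is clearly the server's view of the state after the run.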

Building the StateInjectionExtension

Let’s dive into the code and build our extension class, StateInjectionExtension. We will break it down piece by piece.

First, let’s look at the constructor and the core identifiers:

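import json
import logging
from typing import Union

import jsonschema
from google.protobuf.json_format import MessageToDict

# Import path assumed here; check where your a2a-sdk version exposes this helper
from a2a.extensions.common import find_extension_by_uri
from a2a.server.agent_execution import RequestContext
from a2a.types import AgentCard, AgentExtension, Artifact, Message

logger = logging.getLogger(__name__)


class StateInjectionExtension:
        def __init__(self, state_schema: str | dict | None = None):
            self.CORE_PATH = 'github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1'
            self.URI = f'https://{self.CORE_PATH}'
            self.STATE_FIELD = f'{self.CORE_PATH}/state'

            # Store as dict for internal validation speed, but accept string for flexibility
            if isinstance(state_schema, str):
                try:
                    self.state_schema = json.loads(state_schema)
                except json.JSONDecodeError as e:
                    logger.error(f"Failed to parse state_schema JSON: {e}")
                    self.state_schema = None
            else:
                self.state_schema = state_schema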

The constructor accepts the expected session state JSON schema as either a string or a dictionary (storing it internally as a dictionary for faster validation).

More importantly, we initialize two critical fields:

  • URI: [https://github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1](https://github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1)

  • STATE_FIELD: github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1/state (the same path plus /state, built without the https:// scheme)

Why a GitHub URI? If you are new to A2A Extensions, you might wonder: Why does the URI point to GitHub? Does our agent now depend on GitHub to run?

The short answer is no. The URI acts primarily as a globally unique namespace identifier for our extension. In some official A2A sample extensions (like the timestamp extension), the URI points to a Markdown specification file that explains the extension to clients, but simple A2A clients don’t strictly require this. However, if you plan to publish extensions that many clients will use, it is good practice to document them, and a spec.md file is a great way to do it. If an extension registry appears in the future, a standardized name for the file that documents an extension will also make it easy to import that documentation into the registry. So there is no dependency on GitHub. For our purposes, the URI is purely an ID, and the STATE_FIELD derived from it determines the exact metadata key where our session state JSON will live.
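
A quick way to see that these are just strings is to derive them outside the class. The sketch below repeats the constructor's string formatting in a hypothetical helper, extension_ids. The v2 path is invented purely to show that a new version would get its own identifier and its own metadata key.

```python
def extension_ids(core_path: str) -> tuple:
    """Derive the extension URI and the metadata key from one core path."""
    uri = f"https://{core_path}"
    state_field = f"{core_path}/state"
    return uri, state_field


CORE_PATH = "github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1"
URI, STATE_FIELD = extension_ids(CORE_PATH)

# Hypothetical v2 path, used only to show that versions do not collide.
URI_V2, STATE_FIELD_V2 = extension_ids(CORE_PATH.replace("/v1", "/v2"))

print(URI)
print(STATE_FIELD)
```

Nothing in this derivation touches the network, which is why the agent has no runtime dependency on GitHub.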

1. Checking for Extension Support

First, our extension needs a way to review an AgentCard and check if our specific URI is already on its list of supported capabilities. Instead of writing custom parsing logic, we can utilize a handy built-in A2A SDK utility function called find_extension_by_uri:

        def is_supported(self, card: AgentCard | None) -> bool:
            """Returns whether this extension is supported by the AgentCard."""
            return find_extension_by_uri(card, self.URI) is not None if card else False

2. Defining the Extension Details

Next, we need a helper function that defines exactly how our extension should be represented when it is registered. We will pass along our unique URI, a brief description, and the expected state_schema. The schema will be published through the extension’s params attribute.

        def agent_extension(self) -> AgentExtension:
            """Get the AgentExtension representing this extension."""
            return AgentExtension(
                uri=self.URI,
                description='Injects state into message metadata for ADK state synchronization.',
                params={"state_schema": self.state_schema} if self.state_schema else {},
                required=True
            )

Notice the required=True flag here. This is crucial: it strictly enforces that any A2A Client trying to communicate with our ADK Agent must use this extension. If they don’t, the request will fail. It ensures our ADK Agent will never be executed without the required session state.

3. Registering the Extension on the Card

Finally, we need a method to actually inject this definition into the Agent Card’s payload. This method ensures the capabilities list exists, verifies that our extension isn’t already attached (using our is_supported method), and then safely appends it.

        def add_to_card(self, card: AgentCard) -> AgentCard:
            """Add this extension to an AgentCard."""
            if not card.capabilities.extensions:
                card.capabilities.extensions = []

            if not self.is_supported(card):
                card.capabilities.extensions.append(self.agent_extension())
                logger.info(f"Extension {self.URI} successfully added to AgentCard.")
            return card

4. Verifying the Client Request

When the A2A Server receives an incoming task, it needs a quick way to verify if the client explicitly asked to use our extension. We can check this by inspecting the requested_extensions list attached to the RequestContext:

        def is_requested(self, context: RequestContext) -> bool:
            requested = self.URI in context.requested_extensions
            if requested:
                print(f"Extension {self.URI} requested for request {getattr(context, 'request_id', 'N/A')}")
            else:
                print(f"There is no {self.URI} on the list")
            return requested

5. Detecting State in the Payload

Once we know the extension is requested, we need to check if the incoming Message (or Artifact) actually contains our custom state payload. This helper simply looks inside the metadata dictionary for our specific STATE_FIELD key:

        def has_state(self, o: Message | Artifact) -> bool:
            """Returns whether a message or artifact has state."""
            return bool(o.metadata and self.STATE_FIELD in o.metadata)

6. Validating the State JSON Schema

This next step is crucial for reliability. Before we blindly pass the incoming JSON into our ADK Agent’s session state, we must ensure it matches the exact schema we expect.

This helper grabs the raw state from the metadata, converts it to a standard dictionary using MessageToDict, and validates it using the jsonschema library. If the client sends missing fields or the wrong data types, this method catches it immediately:

        def is_valid_schema(self, o: Message | Artifact) -> bool:
            """
            Validates the state object within metadata against the extension's schema.
            """
            if not self.has_state(o):
                return False
            
            if not self.state_schema:
                logger.warning(f"Validation skipped for {self.URI}: No schema defined in extension.")
                return True
                
            try:
                raw_state = o.metadata[self.STATE_FIELD]
                state_data = MessageToDict(raw_state)
                jsonschema.validate(instance=state_data, schema=self.state_schema)
                return True
            except jsonschema.ValidationError as e:
                logger.error(f"Schema validation failed for {self.URI}: {e.message} at {list(e.path)}")
                return False
            except Exception as e:
                logger.exception(f"Unexpected error during schema validation: {e}")
                return False

7. Injecting and Extracting the State Payload

The final two methods are our practical getters and setters. They handle the actual injection of the state into our messages and the extraction of the state when we need to read it.

The add_state method safely checks if a metadata dictionary exists (creating one if it doesn’t) and injects our state payload. It also includes a safety check to ensure we don’t accidentally overwrite state that has already been injected:

        def add_state(self, o: Message | Artifact, state_dict: dict | None = None) -> None:
            """Injects the state into the metadata field."""
            if o.metadata is None:
                o.metadata = {}
            if self.has_state(o):
                logger.debug(f"State already exists on {type(o).__name__}, skipping injection.")
                return
            # Default is None rather than {} to avoid a shared mutable default argument
            o.metadata[self.STATE_FIELD] = state_dict if state_dict is not None else {}
            logger.info(f"Successfully injected state into {type(o).__name__} metadata.")
        

Finally, the get_state method does the reverse. If the state exists, it pulls it out of the metadata and neatly converts it back into a standard Python dictionary using MessageToDict so our executor can easily use it:

        def get_state(self, o: Union[Message, Artifact]) -> dict:
            """Helper to extract the state if it exists."""
            if self.has_state(o):
                val = o.metadata[self.STATE_FIELD]
                m = MessageToDict(val)
                if isinstance(m, dict):
                    return m
                logger.warning(f"Expected dict in {self.STATE_FIELD}, found {type(val).__name__}.")
            return {}
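
To see how the getter and setter behave together, here is a reduced round trip. It is a sketch under simplifying assumptions: FakeMessage is a hypothetical stand-in that models only the metadata field as a plain dict, so there is no MessageToDict conversion, and the helpers are module-level copies of the method logic rather than the class itself.

```python
from dataclasses import dataclass
from typing import Optional

STATE_FIELD = "github.com/lolejniczak-shared/a2a-samples/extensions/state_injection/v1/state"


@dataclass
class FakeMessage:
    """Hypothetical stand-in for an A2A Message; only metadata is modelled."""
    metadata: Optional[dict] = None


def has_state(o: FakeMessage) -> bool:
    return bool(o.metadata and STATE_FIELD in o.metadata)


def add_state(o: FakeMessage, state: Optional[dict] = None) -> None:
    if o.metadata is None:
        o.metadata = {}
    if has_state(o):
        return  # never overwrite state that is already present
    o.metadata[STATE_FIELD] = state if state is not None else {}


def get_state(o: FakeMessage) -> dict:
    value = o.metadata[STATE_FIELD] if has_state(o) else {}
    return value if isinstance(value, dict) else {}


msg = FakeMessage()
add_state(msg, {"user_info": {"name": "Lukasz Olejniczak"}})
add_state(msg, {"user_info": {"name": "Someone Else"}})  # ignored: state exists
first_state = get_state(msg)
empty_state = get_state(FakeMessage())
```

The second add_state call is a no-op, so first_state still holds the original user, and a message without metadata yields an empty dict instead of an error.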

Wiring Up the A2A Server

Now that our StateInjectionExtension class is ready, we need to apply it to both our A2A Server and A2A Client. Let’s start with the Server.

Our Server needs to do two things: announce that it supports this state injection extension, and publish the exact JSON schema it expects for the session state. Both of these details will be shared with the outside world via the Agent Card.

Our underlying ADK Agent expects a user_info object containing a name, role, and email. All three attributes are required. Here is the corresponding JSON schema for that structure:

USER_INFO_SCHEMA = {
    "type": "object",
    "properties": {
        "user_info": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "role": {"type": "string"},
                "email": {"type": "string", "format": "email"}
            },
            "required": ["name", "role", "email"],
            "additionalProperties": False
        }
    },
    "required": ["user_info"]
}
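
It helps to try a few payloads against this schema before wiring it in. The checker below is a deliberately tiny stand-in for jsonschema (the check function is hypothetical and covers only object and string types, required, and additionalProperties). It ignores "format": "email"; note that jsonschema itself only enforces formats when a format checker is supplied.

```python
USER_INFO_SCHEMA = {
    "type": "object",
    "properties": {
        "user_info": {
            "type": "object",
            "properties": {
                "name": {"type": "string"},
                "role": {"type": "string"},
                "email": {"type": "string", "format": "email"},
            },
            "required": ["name", "role", "email"],
            "additionalProperties": False,
        }
    },
    "required": ["user_info"],
}


def check(instance, schema, path="$"):
    """Return a list of problems; an empty list means the instance passes."""
    if schema.get("type") == "string":
        return [] if isinstance(instance, str) else [f"{path}: expected string"]
    if schema.get("type") != "object":
        return []
    if not isinstance(instance, dict):
        return [f"{path}: expected object"]
    errors = []
    props = schema.get("properties", {})
    for key in schema.get("required", []):
        if key not in instance:
            errors.append(f"{path}: missing required property '{key}'")
    if schema.get("additionalProperties") is False:
        for key in instance:
            if key not in props:
                errors.append(f"{path}: unexpected property '{key}'")
    for key, sub_schema in props.items():
        if key in instance:
            errors.extend(check(instance[key], sub_schema, f"{path}.{key}"))
    return errors


good = {"user_info": {"name": "Lukasz Olejniczak", "role": "AI Specialist",
                      "email": "lolejniczak@google.com"}}
# Same shape as the state used with the local runner earlier in this post.
with_extras = {"user_info": {"name": "Lukasz Olejniczak", "gender": "male",
                             "role": "AI Specialist",
                             "email": "lolejniczak@google.com",
                             "phone": "1234567890"},
               "time": "2024-10-23T00:48:45.640722"}

good_errors = check(good, USER_INFO_SCHEMA)
extra_errors = check(with_extras, USER_INFO_SCHEMA)
empty_errors = check({}, USER_INFO_SCHEMA)
```

One consequence worth noticing: the state from the local runner example would be rejected, because additionalProperties is False inside user_info and that payload also carries gender and phone. The top-level time key is accepted, since the outer object does not restrict additional properties.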

With our schema defined, we can initialize our extension and inject it into our server setup. Here is what our updated server startup script looks like:

import uvicorn
import logging
from fastapi import FastAPI

from executor import ADKA2AExecutorWithRunner
from extensions.state_injection import StateInjectionExtension
from a2a.server.request_handlers import DefaultRequestHandler
from a2a.server.tasks import InMemoryTaskStore
from a2a.server.routes import create_agent_card_routes, create_jsonrpc_routes
from a2a.types import AgentCard, AgentCapabilities, AgentInterface, AgentSkill
from agent import root_agent

logging.basicConfig(level=logging.INFO)
logger = logging.getLogger("a2a.server")

# 1. Initialize our extension with the defined schema
state_ext = StateInjectionExtension(state_schema=USER_INFO_SCHEMA)

# 2. Pass the extension instance into our custom executor
agent_executor = ADKA2AExecutorWithRunner(agent=root_agent, state_ext=state_ext)

# 3. Add the extension to the Agent Card
agent_card = AgentCard(
    name='ADK Specialist Agent',
    description='Professional Agent with State Injection',
    supported_interfaces=[
        AgentInterface(
            url="http://localhost:9999/a2a/jsonrpc", 
            protocol_binding="JSONRPC"
        )
    ],
    capabilities=AgentCapabilities(
        extensions=[state_ext.agent_extension()]  # <-- advertise the extension on the Agent Card
    ),
    skills=[AgentSkill(id='specialist', name='AI Specialist Support')],
    version="1.0.0",
)

# 4. The rest of the setup remains identical
request_handler = DefaultRequestHandler(
    agent_card=agent_card,
    agent_executor=agent_executor,
    task_store=InMemoryTaskStore(),
)

app = FastAPI(title="A2A ADK Specialist Server")

app.routes.extend(create_agent_card_routes(agent_card))
app.routes.extend(create_jsonrpc_routes(request_handler, '/a2a/jsonrpc'))

if __name__ == "__main__":
    logger.info("Starting A2A Server on port 9999...")
    uvicorn.run(app, host="0.0.0.0", port=9999)

In our case, this Agent Card resolves to the following JSON:

What Changed?

While most of the server code stayed the same, we made three critical updates:

  1. Instantiated the Extension: We created state_ext and loaded it with our USER_INFO_SCHEMA.

  2. Updated the Agent Card: We added state_ext.agent_extension() to the card’s capabilities. Now, any A2A Client that queries this endpoint will immediately know exactly how to format the session state.

  3. Injected into the Executor: We passed the state_ext instance directly into our ADKA2AExecutorWithRunner. This is a vital change, as our executor will need access to those helper methods (like validation and metadata extraction) to process incoming client messages.
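As a rough illustration of what "advertising" the extension means, the sketch below builds an extension entry with the four fields an A2A extension declaration carries (uri, description, required, params) and shows how a client could discover the schema from the card. The URI, the state_schema key inside params, and both helper names are assumptions made for this example; the actual output of state_ext.agent_extension() may differ:

```python
import json

# Hypothetical URI: the real value is whatever StateInjectionExtension.URI defines.
EXTENSION_URI = "https://example.com/a2a/extensions/state-injection/v1"


def build_extension_declaration(state_schema: dict) -> dict:
    """Build an Agent Card extension entry (uri, description, required, params)."""
    return {
        "uri": EXTENSION_URI,
        "description": "Accepts initial session state in message metadata.",
        "required": False,
        # Assumed layout: the schema is published under a 'state_schema' key.
        "params": {"state_schema": state_schema},
    }


def find_state_schema(agent_card: dict, uri: str):
    """Client-side discovery: return the published schema, or None if absent."""
    extensions = agent_card.get("capabilities", {}).get("extensions", [])
    for ext in extensions:
        if ext.get("uri") == uri:
            return ext.get("params", {}).get("state_schema")
    return None


schema = {"type": "object", "required": ["user_info"]}
card = {"capabilities": {"extensions": [build_extension_declaration(schema)]}}

print(json.dumps(card, indent=2))
print(find_state_schema(card, EXTENSION_URI) == schema)
```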

Updating the Agent Executor

Now, let’s “double click” on the specific changes needed inside our custom AgentExecutor.

First, as we established in the server setup, we must modify the constructor so it can receive the instance of our StateInjectionExtension:

class ADKA2AExecutorWithRunner(AgentExecutor):
    def __init__(self, agent: Any, state_ext: StateInjectionExtension):
        self.agent = agent
        self.state_ext = state_ext
        self.runner = None

Now comes the magic. We need to update our execute method to actually use the helper functions we built into our extension. Before we create a new ADK session, we need to verify if the extension was requested, check if the message metadata contains the state, and—most importantly—validate that the provided JSON matches our schema.

Here is how the first half of our updated execute method looks:

        injected_state = {}

        logger.info("Checking if client requested state injection extension")
        if self.state_ext.is_requested(context):
            logger.info("Checking if message includes state")
            if self.state_ext.has_state(context.message):
                logger.info("Checking if state JSON has valid schema")
                if self.state_ext.is_valid_schema(context.message):
                    injected_state = self.state_ext.get_state(context.message)
                else:
                    logger.error("Metadata state failed schema validation.")
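
The three checks above can also be read as a chain of guard clauses. The following self-contained sketch uses stand-in objects rather than the real SDK classes: FakeContext, resolve_injected_state, has_user_info, the URI, and the STATE_FIELD value are all hypothetical names chosen for illustration:

```python
from dataclasses import dataclass, field
from typing import Any, Callable

# Hypothetical values standing in for StateInjectionExtension.URI / STATE_FIELD.
URI = "https://example.com/a2a/extensions/state-injection/v1"
STATE_FIELD = "session_state"


@dataclass
class FakeContext:
    """Stand-in for the request context: requested extensions plus message metadata."""
    requested_extensions: set = field(default_factory=set)
    metadata: dict = field(default_factory=dict)


def resolve_injected_state(ctx: FakeContext, is_valid: Callable[[Any], bool]) -> dict:
    """Return the state to inject, or {} if any of the three checks fails."""
    if URI not in ctx.requested_extensions:  # 1. control signal missing
        return {}
    state = ctx.metadata.get(STATE_FIELD)
    if state is None:  # 2. no payload in the metadata
        return {}
    if not is_valid(state):  # 3. payload rejected by the validator
        return {}
    return state


def has_user_info(state: Any) -> bool:
    """Toy validator; a real one would check the full JSON schema."""
    return isinstance(state, dict) and "user_info" in state


payload = {"user_info": {"name": "Lukasz"}}
not_requested = FakeContext(metadata={STATE_FIELD: payload})
requested = FakeContext({URI}, {STATE_FIELD: payload})
invalid = FakeContext({URI}, {STATE_FIELD: {"oops": 1}})

print(resolve_injected_state(not_requested, has_user_info))
print(resolve_injected_state(requested, has_user_info))
print(resolve_injected_state(invalid, has_user_info))
```

Only the second context yields a non-empty state. In the other two cases the agent would start with an empty session state, so whether to fail the request instead of silently continuing is a design decision worth making explicitly.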

In the A2A protocol, communication is designed to handle complex, long-running agentic workflows. When you ask an AI Agent a question, it rarely has an instant answer. It might need to query a database, call external APIs, or run through multiple reasoning steps. If we relied on a simple, synchronous request-response model, the connection would just hang and potentially time out.

To solve this, A2A models interactions as Tasks.

A Task represents the complete lifecycle of a single unit of work requested by the client. As the next step, we therefore create a new task for the client request, enqueue it, and create a TaskUpdater object. This allows us to communicate with the A2A client and send initial progress updates (such as a friendly “Processing your question…” message) so the user knows the agent is actively working on their request:

        task = new_task_from_user_message(context.message)
        await event_queue.enqueue_event(task)

        updater = TaskUpdater(event_queue, context.task_id, context.context_id)
        working_message = updater.new_agent_message(
            parts=[Part(text='Processing your question...')]
        )
        await updater.start_work(message=working_message)
        print(f"Current task: {context.current_task}")

Technically, by enqueuing a task and using a TaskUpdater, we shift to an asynchronous model. This is powerful because it allows our server to stream intermediate updates (like our “Processing your question…” message or partial artifacts) back to the client in real time, keeping the user informed while the underlying ADK agent works in the background.
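
To illustrate the producer/consumer shape of this model, here is a toy sketch built on asyncio.Queue. It does not use the A2A SDK: the event dictionaries and the run_task and collect_events helpers are simplified stand-ins invented for this example, not the protocol's real event types:

```python
import asyncio


async def run_task(queue: asyncio.Queue, question: str) -> None:
    """Producer: publish task events as the work progresses."""
    await queue.put({"kind": "status", "state": "submitted"})
    await queue.put({"kind": "status", "state": "working",
                     "message": "Processing your question..."})
    await asyncio.sleep(0.01)  # stands in for the slow agent call
    await queue.put({"kind": "artifact", "text": f"Answer to: {question}"})
    await queue.put({"kind": "status", "state": "completed"})


async def collect_events(question: str) -> list:
    """Consumer: read events as they arrive until the task completes."""
    queue: asyncio.Queue = asyncio.Queue()
    producer = asyncio.create_task(run_task(queue, question))
    events = []
    while True:
        event = await queue.get()
        events.append(event)
        if event.get("state") == "completed":
            break
    await producer  # surface any exception raised by the producer
    return events


events = asyncio.run(collect_events("Who am I?"))
for event in events:
    print(event["kind"], event.get("state", ""))
```

The consumer sees the "working" update before the answer exists, which is exactly what lets a client show progress during a long-running call.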

Injecting the State into the ADK Session

With injected_state securely extracted and validated, we can now fix the exact problem that initiated this entire journey: initializing the ADK Agent’s session state.

We simply pass our injected_state variable directly into the create_session method of the ADK runner’s session service:

        logger.info(f"Creating fresh session for {user_id}")
        session = await self.runner.session_service.create_session(
            app_name=self.runner.app_name,
            user_id=user_id,
            session_id=context.context_id,
            state=injected_state,  # <--- THE MISSING LINK SOLVED!
        )

By adding those few lines of validation and passing the extracted state into create_session, our A2A Server is now fully capable of receiving out-of-band context and bridging it into the ADK agent.
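
Because the A2A context_id doubles as the ADK session ID, follow-up messages in the same context will map to the same session. A minimal sketch of a get-or-create step, using an in-memory stub instead of the real ADK session service, might look like this. StubSession, StubSessionService, and get_or_create are hypothetical names, and the reuse-on-second-turn behavior is an assumption about how one might structure the executor, not something shown in the code above:

```python
import asyncio
from dataclasses import dataclass, field
from typing import Optional


@dataclass
class StubSession:
    app_name: str
    user_id: str
    id: str
    state: dict = field(default_factory=dict)


class StubSessionService:
    """In-memory stand-in for a session service, keyed by (app, user, session)."""

    def __init__(self) -> None:
        self._sessions: dict = {}

    async def get_session(self, *, app_name: str, user_id: str, session_id: str):
        return self._sessions.get((app_name, user_id, session_id))

    async def create_session(self, *, app_name: str, user_id: str,
                             session_id: str, state: Optional[dict] = None) -> StubSession:
        session = StubSession(app_name, user_id, session_id, dict(state or {}))
        self._sessions[(app_name, user_id, session_id)] = session
        return session


async def get_or_create(service, app_name, user_id, session_id, injected_state):
    """Seed state only when the session is new; later turns reuse what is stored."""
    existing = await service.get_session(
        app_name=app_name, user_id=user_id, session_id=session_id)
    if existing is not None:
        return existing
    return await service.create_session(
        app_name=app_name, user_id=user_id, session_id=session_id, state=injected_state)


async def demo():
    service = StubSessionService()
    first = await get_or_create(service, "specialist", "u1", "ctx-1",
                                {"user_info": {"name": "Lukasz"}})
    second = await get_or_create(service, "specialist", "u1", "ctx-1", {})
    return first, second


first, second = asyncio.run(demo())
print(second.state)
print(first is second)
```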

Sending the Final Response

The very last step for our AgentExecutor is to send the completed response back to the A2A Client.

Here, we package the ADK Agent’s textual answer as an artifact. However, we also need to explicitly acknowledge that we successfully processed the out-of-band data. We let the A2A Client know that we activated and utilized their requested extension by passing our extension’s URI into the extensions list when creating the artifact:

        await updater.add_artifact(
            [Part(text=response_text)],
            name='result',
            metadata=resp_metadata,
            extensions=[self.state_ext.URI],
        )
        await updater.complete()

By including self.state_ext.URI, we close the loop. The client asked for state injection, the server successfully handled it, and the server explicitly confirms it in the final receipt.

With the server fully equipped to advertise, accept, validate, and inject session state, we just need to update our A2A Client to actually send it. Let’s look at how to build the client-side of this handshake.

The Naive Client Approach (And Why It Fails)

At first glance, you might think all we need to do on the client side is simply create our state dictionary, assign it to a new key in the Message metadata, and send it off.

It seems logical. Here is what that naive implementation looks like:

user_state = {"user_info": {"name": "Lukasz", "role": "AI Specialist", "email": "lolejniczak@google.com"}}

ext = StateInjectionExtension()

client = await ClientFactory(
    ClientConfig(httpx_client=httpx_client)
).create_from_url(
    url='http://localhost:9999'
)

# Construct the Message
msg = Message(
    role=Role.ROLE_USER,
    message_id=str(uuid4()),
    parts=[Part(text="Who am I and what is my email?")]
)

# Attach the state to the Message metadata
msg.metadata[ext.STATE_FIELD] = user_state

# Send the Message
events = [
    event async for event in client.send_message(request=SendMessageRequest(message=msg))
]

response = events[0]

In theory, we should expect the final print statement to output our extension’s URI, proving that the A2A Server understood the metadata and activated the state injection to generate the response:

print(response.task.artifacts[0].extensions)

However — when you run it exactly like this, you are going to hit a wall. Take a look at the terminal logs below (client on the left, server on the right) to see exactly what happens when we try this:

You quickly learn a critical rule of the A2A protocol: simply stuffing data into a Message’s metadata does not automatically request the activation of an A2A extension on the server side!

Why Wasn’t the Metadata Enough?

Let’s look back at how our AgentExecutor actually processes incoming requests. We specifically added a step to check if the client explicitly requested the server to use our extension, utilizing one of our handy helper functions:

        def is_requested(self, context: RequestContext) -> bool:
            requested = self.URI in context.requested_extensions
            if requested:
                print(f"Extension {self.URI} requested for request {getattr(context, 'request_id', 'N/A')}")
            else:
                print(f"There is no {self.URI} on the list")
            return requested

Notice what this function is actually doing. It doesn’t look at the message metadata right away. Instead, it checks the RequestContext object to see if its requested_extensions collection includes the unique URI of our state injection extension.

If we pause the execution inside our AgentExecutor and print out the state of the RequestContext object for that incoming request, the problem becomes instantly clear:

As you can see, the requested_extensions set is completely empty! Even though we manually attached the JSON payload to the message envelope, we failed to properly notify the A2A protocol layer that it needed to activate the extension to process it. We need a way to properly signal this request from the client side.

It turns out that Message metadata is merely the transport layer—it is simply how we ship the additional data payload across the wire. However, to actually trigger the protocol-level behavior, we must modify our A2A Client to formally request that the server activate that specific extension.

Think of this as a classic separation of concerns in distributed system design: the metadata is our data plane, while the extension request acts as our control plane.
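
The split can be sketched at the wire level. The example below assumes an HTTP-based binding in which the list of requested extensions travels as a request header, here named A2A-Extensions; treat the header name, the URI, the metadata key, and the simplified body shape as assumptions to verify against the protocol version you target. The two helpers are hypothetical:

```python
import json

EXT_URI = "https://example.com/a2a/extensions/state-injection/v1"  # hypothetical URI
STATE_FIELD = "session_state"  # hypothetical metadata key
EXTENSIONS_HEADER = "A2A-Extensions"  # assumed header name for HTTP-based bindings


def build_request(text: str, state: dict, request_extension: bool):
    """Return (headers, body): the header is the control plane, the metadata the data plane."""
    headers = {"Content-Type": "application/json"}
    if request_extension:
        headers[EXTENSIONS_HEADER] = EXT_URI
    body = {"message": {"parts": [{"text": text}], "metadata": {STATE_FIELD: state}}}
    return headers, json.dumps(body)


def requested_extensions(headers: dict) -> set:
    """Server side: the requested set is derived from the header, never from the body."""
    raw = headers.get(EXTENSIONS_HEADER, "")
    return {uri.strip() for uri in raw.split(",") if uri.strip()}


state = {"user_info": {"name": "Lukasz"}}

naive_headers, naive_body = build_request("Who am I?", state, request_extension=False)
print(requested_extensions(naive_headers))           # empty, although the body carries state
print(STATE_FIELD in json.loads(naive_body)["message"]["metadata"])

fixed_headers, _ = build_request("Who am I?", state, request_extension=True)
print(requested_extensions(fixed_headers))
```

In the naive case the payload arrives intact, but the server has no signal telling it to look at it.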

There are two primary ways an A2A Client can formalize this request:

  1. By injecting Service Parameters directly when setting up the client call.

  2. By injecting Service Parameters to all client calls using an Interceptor.

Method 1: Explicitly Passing Service Parameters

The most direct way to request an extension is by attaching Service Parameters to a specific client call. We can do this using the A2A SDK’s ServiceParametersFactory and with_a2a_extensions utilities.

Here, we define the parameters, wrap them in a ClientCallContext, and explicitly pass that context when we send our message:

from a2a.client.service_parameters import (
    ServiceParametersFactory,
    with_a2a_extensions,
)

ext = StateInjectionExtension() 

extensions = [ext.URI]

## <-- THE CRITICAL ADDITION
service_params = ServiceParametersFactory.create(
            [with_a2a_extensions(extensions)] 
)
## <-- THE CRITICAL ADDITION

context = ClientCallContext(service_parameters=service_params) ## <-- THE CRITICAL ADDITION


client = await ClientFactory(
            ClientConfig(httpx_client=httpx_client)
        ).create_from_url(
            url='http://localhost:9999'
        )

# 4. construct Message
msg = Message(
            role=Role.ROLE_USER, 
            message_id=str(uuid4()), 
            parts=[Part(text="Who am I and what is my email?")]
)
msg.metadata[ext.STATE_FIELD] = user_state
        
events = [
    event async for event in client.send_message(
        request=SendMessageRequest(message=msg),
        context=context,  ## <-- THE CRITICAL ADDITION
    )
]
response = events[0]

This approach works perfectly if you only want to request the extension for one specific A2A Client call. But what if your application requires the session state to be injected with every single message? Passing the context manually every time quickly becomes repetitive and error-prone.

Method 2: The Interceptor Pattern

If you want to ensure the extension is requested automatically on every interaction, you should use Interceptors.

Think of an interceptor as a piece of middleware — a block of code injected on the client side that seamlessly catches and modifies requests before they leave the client, and catches responses before they reach the end user. If you are familiar with the ADK, this concept is very similar to ADK callbacks.

To implement this, we need to create our own custom class that extends the base ClientCallInterceptor. We will be asked to implement two core methods:

  • before: This allows us to intercept and modify the outgoing request (like silently adding our extension request and injecting the state JSON) right before it hits the network.

  • after: This allows us to inspect or modify the incoming response from the server before our application processes it.
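
Before looking at the SDK-specific version, it may help to see the pattern in isolation. The sketch below is a generic, standard-library-only illustration of a before/after interceptor chain; OutgoingCall, Interceptor, TagRequest, CountArtifacts, send, and fake_transport are hypothetical names and do not mirror the SDK's actual classes or signatures:

```python
import asyncio
from dataclasses import dataclass, field


@dataclass
class OutgoingCall:
    """Toy request: service parameters (control plane) and message metadata (data plane)."""
    service_params: dict = field(default_factory=dict)
    metadata: dict = field(default_factory=dict)


class Interceptor:
    async def before(self, call: OutgoingCall) -> None:
        """Runs before the request is sent; may mutate the call."""

    async def after(self, result: dict) -> dict:
        """Runs after the response arrives; may replace the result."""
        return result


class TagRequest(Interceptor):
    async def before(self, call: OutgoingCall) -> None:
        # Hypothetical URI, added to every outgoing call.
        call.service_params["extensions"] = ["https://example.com/ext/v1"]


class CountArtifacts(Interceptor):
    async def after(self, result: dict) -> dict:
        return {**result, "artifact_count": len(result.get("artifacts", []))}


async def send(call: OutgoingCall, interceptors: list, transport) -> dict:
    """Run before-hooks in order, call the transport, then after-hooks in reverse."""
    for interceptor in interceptors:
        await interceptor.before(call)
    result = await transport(call)
    for interceptor in reversed(interceptors):
        result = await interceptor.after(result)
    return result


async def fake_transport(call: OutgoingCall) -> dict:
    """Pretend server: echoes which extensions were requested."""
    return {"artifacts": ["result"],
            "seen_extensions": call.service_params.get("extensions", [])}


result = asyncio.run(send(OutgoingCall(), [TagRequest(), CountArtifacts()], fake_transport))
print(result)
```

The calling code never mentions the extension, yet the transport sees it on every call, which is the property we want for state injection.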

Building the Custom Interceptor

To make our state injection automatic, we will create StateInjectionClientInterceptor by extending the base ClientCallInterceptor.

Our interceptor needs to do two things before the request leaves the client:

  1. Inject the Control Signal: Add the extension URI to the service parameters so the server knows to activate it.

  2. Inject the Payload: Attach the actual session state JSON to the message metadata.

Here is what that looks like in code:

class StateInjectionClientInterceptor(ClientCallInterceptor):
    def __init__(self, extension: Any, state_data: dict):
        self.extension = extension
        self.state_data = state_data

    async def before(self, args: BeforeArgs) -> Tuple[Any, Any]:
        uri = self.extension.URI
        field = self.extension.STATE_FIELD
    
        logger.info(f"Interceptor activating extension via context: {uri}")

        ## Adding service parameters
        extensions = [uri]
        service_params = ServiceParametersFactory.create(
            [with_a2a_extensions(extensions)]
        )
        ctx = ClientCallContext(service_parameters=service_params) 
        args.context = ctx


        request = None
        if isinstance(args.input, dict):
            request = args.input.get('request')
        elif hasattr(args.input, 'message'):
            request = args.input
        
        if request and hasattr(request, 'message') and request.message:
            msg = request.message
            if msg.metadata is None:
                msg.metadata = {}
            ## Adding metadata 
            msg.metadata[field] = self.state_data
            logger.debug(f"Injected state into message metadata: {field}")
        else:
            logger.warning("Interceptor could not find message in args.input to inject state.")

        # Return the modified input and early_return per 1.0-dev dataclass
        return args.input, args.context

    async def after(self, args: AfterArgs) -> Tuple[Any, bool]:
        """Satisfy abstract method."""
        return args.result, args.context

The Simplified Client

Because our interceptor now handles both the formal extension request and the metadata injection, our actual client code becomes incredibly clean.

We simply pass the interceptor to the ClientFactory during setup. From that point on, we just write our standard A2A message logic. The interceptor acts as invisible middleware, handling the complex state handshake in the background on every single call:

    async with httpx.AsyncClient() as httpx_client:
        # 1. Setup Data
        user_state = {"user_info": {"name": "Lukasz", "role": "AI Specialist", "email": "lolejniczak@google.com"}}
        
        # 2. Setup Extension and Interceptor
        ext = StateInjectionExtension() 
        state_interceptor = StateInjectionClientInterceptor(ext, user_state)

        # 3. Create the Client with the Interceptor registered
        client = await ClientFactory(
            ClientConfig(httpx_client=httpx_client)
        ).create_from_url(
            url='http://localhost:9999',
            interceptors=[state_interceptor] # <--- Magic happens here
        )

        # 4. Send Message
        msg = Message(
            role=Role.ROLE_USER, 
            message_id=str(uuid4()), 
            parts=[Part(text="Who am I and what is my email?")]
        )

        events = [
            event async for event in client.send_message(
                  request=SendMessageRequest(message=msg)
            ) 
        ]
        response = events[0]

Now, when we run our fully updated setup and debug the RequestContext inside the Server’s AgentExecutor, we finally see what we’ve been looking for. The set is no longer empty; our extension is proudly sitting on the list of requested_extensions:

Because the server sees the formal request, it safely extracts our metadata, validates the schema, initializes the ADK session, and executes the agent flawlessly!

Summary:

Breaking a monolithic multi-agent application into a distributed, A2A-driven architecture introduces design challenges. Standard A2A conversational payloads are designed for request-response dialogue; however, A2A extensions provide a mechanism for also passing the out-of-band initialization data (such as user profiles or session context) that the underlying ADK agents require.

Here is the architectural breakdown of what we built, why it matters, and how we implemented it:

The Core Challenge (The “Why”): You cannot simply append a JSON object to a distributed agent’s prompt. To initialize an ADK Agent’s session state safely and predictably, the server needs to enforce a data schema, and the client needs a standardized way to transmit that data alongside the primary query.

The Solution Architecture (The “How”): We utilized A2A Extensions to create a custom, out-of-band data channel. This required separating our solution into a data plane (the actual session state JSON) and a control plane (the formal protocol request to activate the extension).

The Implementation (The “What”)

  • Server-Side Schema & Validation: We built a custom StateInjectionExtension that publishes our required JSON schema directly to the Agent Card. This allows clients to dynamically discover the required state structure.

  • Execution Hooking: We modified the AgentExecutor to check whether the extension was requested, extract the metadata payload, and validate it against our schema. Once validated, the payload is passed to the session service’s create_session method, so the ADK session state is initialized before the agent begins its work.

  • Client-Side Interceptors: We implemented the Interceptor Pattern. By extending ClientCallInterceptor, we built middleware that automatically injects both the formal extension request (via Service Parameters) and the data payload (via Message metadata) into every outbound request.

Code:

Code covered in this blog post is available in my git repository: lolejniczak-shared/a2a-state-extension on GitHub

This article is authored by Lukasz Olejniczak, Customer Engineer at Google Cloud. The views expressed are those of the author and don’t necessarily reflect those of Google.
