This article shows you how to implement a stateful multi-agent system using Google’s Agent Development Kit (ADK). We’ll walk through:
- Session state management with FunctionTool and ToolContext
- A custom agent (StateFlowAgent) that inherits BaseAgent and orchestrates sub-agents
- Dynamic toolsets: StateManagementToolset is always available, while TaskProcessingToolset adapts to the current workflow
- A Coordinator → Onboarding/Support/Completion sub-agent architecture
- An interactive demo powered by InMemorySessionService and Runner

Implementation Overview
1. Custom Agent
- Create StateFlowAgent by extending BaseAgent
- Implement its own _run_async_impl to route through the coordinator and sub-agents
2. State Management Tools
- Expose eight functions (the set_*/get_* functions plus update_user_data and apply_state_delta) as tools to read/write session.state. These tools allow agents to control the conversation flow.
3. Toolsets
- StateManagementToolset always provides the eight state-management tools
- TaskProcessingToolset provides the onboarding tool when current_workflow == "onboarding", the support tool when current_workflow == "support", and both tools when the workflow is undefined or has any other value
4. Sub-Agents
- Coordinator: decides which workflow to run and calls set_workflow_phase
- OnboardingAgent: guides new-user setup
- SupportAgent: handles troubleshooting
- CompletionAgent: summarizes and closes out
5. Orchestration Logic
The _run_async_impl method orchestrates the workflow within a single turn.
    async def _run_async_impl(self, ctx):
        cw = ctx.session.state.get("current_workflow", "none")
        cs = ctx.session.state.get("current_stage", "start")

        # (1) If first time or start, run Coordinator
        if cw == "none" or cs == "start":
            async for ev in self.coordinator_agent.run_async(ctx):
                yield ev
            cw = ctx.session.state.get("current_workflow", "none")

        # (2) Route to the appropriate sub-agent
        if cw == "onboarding":
            async for ev in self.onboarding_agent.run_async(ctx):
                yield ev
        elif cw == "support":
            async for ev in self.support_agent.run_async(ctx):
                yield ev
        elif cw == "completion":
            async for ev in self.completion_agent.run_async(ctx):
                yield ev
        else:
            logger.warning(f"Unknown workflow: {cw}")
6. Interactive Demo
- Use InMemorySessionService to keep session state in memory
- Use Runner.run(user_id, session_id, new_message) to drive the agent
- run_demo_interaction() prints before/after state and agent responses
1. State Management Tools (FunctionTool)
1.1 set_workflow_state / get_workflow_state
- Store and retrieve current_workflow & current_stage in session.state.
- Let any LLM sub-agent call these to signal “what step we’re on.”
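The set/get pair can be tried without ADK installed. The sketch below is a trimmed version of the two functions, and it assumes the tools only touch a dict-like state attribute on the context; SimpleNamespace is a stand-in for ToolContext, not the real class.

```python
from types import SimpleNamespace


def set_workflow_state(workflow_name, stage, tool_context):
    """Record which workflow and stage the conversation is in."""
    tool_context.state["current_workflow"] = workflow_name
    tool_context.state["current_stage"] = stage
    return {
        "status": "success",
        "message": f"Workflow set to {workflow_name}, stage: {stage}",
    }


def get_workflow_state(tool_context):
    """Read the workflow and stage back, defaulting to 'none' when unset."""
    return {
        "status": "success",
        "current_workflow": tool_context.state.get("current_workflow", "none"),
        "current_stage": tool_context.state.get("current_stage", "none"),
    }


# Stand-in for ToolContext (assumption: only .state is used by these tools).
fake_context = SimpleNamespace(state={})

before = get_workflow_state(fake_context)
set_workflow_state("support", "analyze_issue", fake_context)
after = get_workflow_state(fake_context)
```

Because the functions only depend on tool_context.state, the same stand-in works for unit-testing any of the eight state tools.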
1.2 update_user_data / get_user_data
- Persist arbitrary user data under keys prefixed with "user:".
- Useful for collecting profile info or user input across turns.
1.3 set_temporary_flag
- Store ephemeral flags under "temp:…". These are scratch values that don’t need to survive across sessions, so use them only for data that can safely be lost.
1.4 apply_state_delta
- Apply multiple key/value updates in one call. To increment a numerical value, pass the special string "INCREMENT:" as the value for the corresponding key.
1.5 get_workflow_phase / set_workflow_phase
- Manage a finer-grained phase within each stage, and automatically track the history of the last 10 phase transitions. This is the recommended and more powerful way to manage workflow progression, as it encapsulates workflow, stage, and phase updates in a single call.
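The two multi-key tools (apply_state_delta and set_workflow_phase) share one idea: collect several updates and write them together. The sketch below illustrates that idea on a plain dict, assuming state behaves like a dictionary. The names apply_delta and record_phase are hypothetical simplifications, not the functions from the full listing.

```python
import time

INCREMENT_MARKER = "INCREMENT:"
MAX_PHASE_HISTORY = 10


def apply_delta(state, changes):
    """Write every key in `changes` to `state`; the marker bumps a counter by 1."""
    for key, value in changes.items():
        if isinstance(value, str) and value.startswith(INCREMENT_MARKER):
            state[key] = state.get(key, 0) + 1  # missing counters start at 0
        else:
            state[key] = value
    return state


def record_phase(state, workflow, stage, phase):
    """Update workflow, stage, and phase together and keep a bounded history."""
    history = list(state.get("phase_history", []))
    history.append(
        {"workflow": workflow, "stage": stage, "phase": phase, "timestamp": time.time()}
    )
    return apply_delta(state, {
        "current_workflow": workflow,
        "current_stage": stage,
        "workflow_phase": phase,
        "phase_history": history[-MAX_PHASE_HISTORY:],  # keep only the newest 10
    })


state = {"user:login_count": 2}
apply_delta(state, {
    "user:login_count": "INCREMENT:",  # existing counter: 2 -> 3
    "visits": "INCREMENT:",            # new counter: 0 -> 1
    "task_priority": "high",           # plain assignment
})

# Twelve transitions; only the last ten should remain in the history.
for step in range(12):
    record_phase(state, "onboarding", "collect_info", f"step_{step}")
```

Slicing with history[-10:] trims the list on every write, so the history never grows past ten entries no matter how long the session runs.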
2. Toolset Definitions (BaseToolset)
2.1 StateManagementToolset
Always returns all eight state‑management tools, providing a consistent interface for all agents to manage the session state.
    class StateManagementToolset(BaseToolset):
        def __init__(self):
            self._tools = [
                FunctionTool(func=set_workflow_state),
                FunctionTool(func=get_workflow_state),
                FunctionTool(func=update_user_data),
                FunctionTool(func=get_user_data),
                FunctionTool(func=set_temporary_flag),
                FunctionTool(func=apply_state_delta),
                FunctionTool(func=get_workflow_phase),
                FunctionTool(func=set_workflow_phase),
            ]

        async def get_tools(self, readonly_context=None):
            return self._tools

        async def close(self):
            await asyncio.sleep(0)
2.2 TaskProcessingToolset
Returns tools dynamically:
- current_workflow == 'onboarding' → [process_onboarding_task]
- current_workflow == 'support' → [process_support_task]
- otherwise → [process_onboarding_task, process_support_task]
    class TaskProcessingToolset(BaseToolset):
        def __init__(self):
            self._onboarding_tool = FunctionTool(func=process_onboarding_task)
            self._support_tool = FunctionTool(func=process_support_task)

        async def get_tools(self, readonly_context=None):
            # readonly_context defaults to None, so guard before reading state
            state = readonly_context.state if readonly_context else {}
            wf = state.get("current_workflow", "none")
            if wf == "onboarding":
                return [self._onboarding_tool]
            elif wf == "support":
                return [self._support_tool]
            else:
                return [self._onboarding_tool, self._support_tool]

        async def close(self):
            await asyncio.sleep(0)
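The selection rule itself can be checked in isolation. The sketch below pulls it out into a plain function that returns tool names instead of FunctionTool objects. select_task_tools is a hypothetical helper written for this illustration, and passing None models the case where no context is available.

```python
ONBOARDING_TOOL = "process_onboarding_task"
SUPPORT_TOOL = "process_support_task"


def select_task_tools(state):
    """Return the tool names the toolset would expose for a given state dict."""
    workflow = (state or {}).get("current_workflow", "none")
    if workflow == "onboarding":
        return [ONBOARDING_TOOL]
    if workflow == "support":
        return [SUPPORT_TOOL]
    # Undefined or unrecognized workflow: expose everything.
    return [ONBOARDING_TOOL, SUPPORT_TOOL]


onboarding_tools = select_task_tools({"current_workflow": "onboarding"})
support_tools = select_task_tools({"current_workflow": "support"})
fallback_tools = select_task_tools({"current_workflow": "completion"})
no_context_tools = select_task_tools(None)
```

Keeping the rule as a pure function of state makes it easy to add a new workflow later without touching the toolset class.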
3. Sub-Agent Definitions (LlmAgent)
Coordinator
- Reads the current phase, analyzes user input, and must call set_workflow_phase(...) to correctly initialize the workflow, stage, and phase.
OnboardingAgent
- Guides users through collection, validation, setup, completion phases.
SupportAgent
- Handles “analyze_issue → provide_solution → verify_resolution → complete.”
CompletionAgent
- Summarizes and sets final state to "completion".
Each sub-agent references the shared state_toolset and, where needed, task_toolset.
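In the article's design the LLM chooses the next stage and records it through the state tools. For comparison, the SupportAgent sequence listed above can also be written as a deterministic progression. This is only an illustrative sketch: next_stage is a hypothetical helper and is not part of the article's code.

```python
SUPPORT_STAGES = ["analyze_issue", "provide_solution", "verify_resolution", "complete"]


def next_stage(current, stages=SUPPORT_STAGES):
    """Return the stage after `current`; unknown stages restart at the first one."""
    if current not in stages:
        return stages[0]
    index = stages.index(current)
    # Stay on the last stage once the sequence is finished.
    return stages[min(index + 1, len(stages) - 1)]


first = next_stage("start")
second = next_stage("analyze_issue")
last = next_stage("complete")
```

A helper like this could back a tool that advances the stage by one step, which removes the risk of the model skipping or misspelling a stage name.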
4. Custom Agent Orchestration (StateFlowAgent)
- Inherits BaseAgent and holds references to the four sub-agents.
- Implements _run_async_impl to:
  - Run the Coordinator on the first turn or when the stage is "start"
  - Immediately, within the same turn, reroute to the OnboardingAgent, SupportAgent, or CompletionAgent based on the current_workflow state set by the Coordinator.
This approach makes the entire system “stateful”: every decision is driven by stored session data.
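The coordinator-then-reroute pattern can be exercised without ADK or a model. In the sketch below, StubAgent is a hypothetical stand-in for an LlmAgent: it yields one string instead of an Event and writes to state directly instead of through tool calls. The context is a SimpleNamespace that only mimics ctx.session.state.

```python
import asyncio
from types import SimpleNamespace


class StubAgent:
    """Hypothetical stand-in for a sub-agent: edits state, then yields one event."""

    def __init__(self, name, state_update=None):
        self.name = name
        self.state_update = state_update or {}

    async def run_async(self, ctx):
        ctx.session.state.update(self.state_update)
        yield f"{self.name}: done"


async def orchestrate(ctx, coordinator, routes):
    """Run the coordinator when needed, then delegate within the same turn."""
    state = ctx.session.state
    if state.get("current_workflow", "none") == "none" or state.get("current_stage", "start") == "start":
        async for event in coordinator.run_async(ctx):
            yield event
    # Re-read the workflow: the coordinator may have just set it.
    target = routes.get(state.get("current_workflow", "none"))
    if target is not None:
        async for event in target.run_async(ctx):
            yield event


async def collect(ctx, coordinator, routes):
    return [event async for event in orchestrate(ctx, coordinator, routes)]


coordinator = StubAgent("Coordinator", {"current_workflow": "support", "current_stage": "analyze_issue"})
routes = {"onboarding": StubAgent("OnboardingAgent"), "support": StubAgent("SupportAgent")}
ctx = SimpleNamespace(session=SimpleNamespace(state={}))

first_turn = asyncio.run(collect(ctx, coordinator, routes))
second_turn = asyncio.run(collect(ctx, coordinator, routes))
```

On the first turn both the coordinator and the chosen sub-agent run. On the second turn the stored state already names a workflow and a stage other than "start", so the coordinator is skipped.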
5. Session Management & Interactive Demo
    # Create session in memory
    await session_service.create_session(
        app_name=APP_NAME,
        user_id=USER_ID,
        session_id=SESSION_ID,
        state={"app:name": "StateFlowDemo", "app:version": "1.0.0"},
    )

    # Read multiple user lines, then demo each:
    print("Enter your messages (blank line to finish):")
    lines = []
    while True:
        line = input("> ").strip()
        if not line:
            break
        lines.append(line)

    for msg in lines:
        await run_demo_interaction(msg)
        input("Press Enter …")
- run_demo_interaction() prints the state before/after and the agent’s response.
- Each call to runner.run(...) loads the latest session, executes _run_async_impl, and applies state deltas.
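Printing the full state before and after each turn gets noisy as the state grows. One option is to print only what changed. diff_state below is a hypothetical helper, not part of the article's code, and it assumes both snapshots are plain dicts; a key stored with the value None is treated the same as a missing key.

```python
def diff_state(before, after):
    """Map each changed key to its (old, new) pair; missing keys show as None."""
    changes = {}
    for key in sorted(set(before) | set(after)):
        old, new = before.get(key), after.get(key)
        if old != new:
            changes[key] = (old, new)
    return changes


before = {"app:name": "StateFlowDemo", "current_stage": "start"}
after = {
    "app:name": "StateFlowDemo",
    "current_stage": "collect_info",
    "current_workflow": "onboarding",
}
changes = diff_state(before, after)
```

In the demo, the two snapshots would be the session state read before and after the run.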
6. Full Code Listing
    """Custom Agent with State Management Tools and Flow Switching

    This implementation demonstrates:
    1. State management through function calling tools
    2. Flow switching based on state values
    3. Custom toolset implementation
    4. Dynamic agent behavior based on state
    """

    import asyncio
    import json
    import logging
    import time
    from typing import Dict, Any, List, Optional, AsyncGenerator
    from typing_extensions import override

    from google.adk.agents import LlmAgent, BaseAgent
    from google.adk.agents.invocation_context import InvocationContext
    from google.adk.tools import FunctionTool, BaseTool
    from google.adk.tools.base_toolset import BaseToolset
    from google.adk.tools.tool_context import ToolContext
    from google.adk.agents.readonly_context import ReadonlyContext
    from google.genai import types
    from google.adk.sessions import InMemorySessionService
    from google.adk.runners import Runner
    from google.adk.events import Event

    # --- Constants ---
    APP_NAME = "state_flow_app"
    USER_ID = "user123"
    SESSION_ID = "session456"
    GEMINI_2_FLASH = "gemini-2.0-flash"

    # --- Configure Logging ---
    logging.basicConfig(level=logging.INFO)
    logger = logging.getLogger(__name__)


    # --- State Management Tools ---
    def set_workflow_state(workflow_name: str, stage: str, tool_context: ToolContext) -> Dict[str, Any]:
        """
        Sets the current workflow and stage in session state.

        Args:
            workflow_name: Name of the workflow (e.g., 'onboarding', 'task_processing', 'support')
            stage: Current stage within the workflow (e.g., 'collect_info', 'process', 'complete')
            tool_context: Tool execution context

        Returns:
            Status of the operation
        """
        logger.info(f"Setting workflow state: {workflow_name} -> {stage}")

        tool_context.state["current_workflow"] = workflow_name
        tool_context.state["current_stage"] = stage
        tool_context.state["last_state_update"] = "set_workflow_state"

        return {
            "status": "success",
            "message": f"Workflow set to {workflow_name}, stage: {stage}"
        }


    def get_workflow_state(tool_context: ToolContext) -> Dict[str, Any]:
        """
        Gets the current workflow state from session.

        Args:
            tool_context: Tool execution context

        Returns:
            Current workflow state information
        """
        current_workflow = tool_context.state.get("current_workflow", "none")
        current_stage = tool_context.state.get("current_stage", "none")

        logger.info(f"Retrieved workflow state: {current_workflow} -> {current_stage}")

        return {
            "status": "success",
            "current_workflow": current_workflow,
            "current_stage": current_stage,
            "available_workflows": ["onboarding", "task_processing", "support", "completion"]
        }


    def update_user_data(key: str, value: str, tool_context: ToolContext) -> Dict[str, Any]:
        """
        Updates user-specific data in the session state.

        Args:
            key: The key to store the data under (will be prefixed with 'user:')
            value: The value to store
            tool_context: Tool execution context

        Returns:
            Status of the operation
        """
        user_key = f"user:{key}"
        tool_context.state[user_key] = value
        tool_context.state["last_data_update"] = key

        logger.info(f"Updated user data: {user_key} = {value}")

        return {
            "status": "success",
            "message": f"Updated user data: {key} = {value}"
        }


    def get_user_data(key: str, tool_context: ToolContext) -> Dict[str, Any]:
        """
        Retrieves user-specific data from the session state.

        Args:
            key: The key to retrieve (will be prefixed with 'user:')
            tool_context: Tool execution context

        Returns:
            The requested user data or None if not found
        """
        user_key = f"user:{key}"
        value = tool_context.state.get(user_key)

        logger.info(f"Retrieved user data: {user_key} = {value}")

        return {
            "status": "success",
            "key": key,
            "value": value,
            "found": value is not None
        }


    def set_temporary_flag(flag_name: str, flag_value: bool, tool_context: ToolContext) -> Dict[str, Any]:
        """
        Sets a temporary flag that won't persist across sessions.

        Args:
            flag_name: Name of the flag
            flag_value: Boolean value for the flag
            tool_context: Tool execution context

        Returns:
            Status of the operation
        """
        temp_key = f"temp:{flag_name}"
        tool_context.state[temp_key] = flag_value

        logger.info(f"Set temporary flag: {temp_key} = {flag_value}")

        return {
            "status": "success",
            "message": f"Temporary flag {flag_name} set to {flag_value}"
        }


    def apply_state_delta(state_changes: Dict[str, Any], tool_context: ToolContext) -> Dict[str, Any]:
        """
        Apply multiple state changes at once using a state delta pattern.
        This demonstrates the EventActions.state_delta approach from the documentation.

        Args:
            state_changes: Dictionary of state changes to apply
            tool_context: Tool execution context

        Returns:
            Status and summary of the operation
        """
        logger.info(f"Applying state delta with {len(state_changes)} changes")

        # Apply all state changes through the tool context
        changes_applied = []
        for key, value in state_changes.items():
            # Handle special case for incrementing counters
            if isinstance(value, str) and value.startswith("INCREMENT:"):
                current_value = tool_context.state.get(key, 0)
                new_value = current_value + 1
                tool_context.state[key] = new_value
                changes_applied.append(f"{key}: {current_value} -> {new_value}")
            else:
                tool_context.state[key] = value
                changes_applied.append(f"{key}: {value}")

        # Add metadata about the operation
        tool_context.state["last_delta_timestamp"] = time.time()
        tool_context.state["temp:last_delta_operation"] = "apply_state_delta"

        logger.info(f"Applied state delta: {changes_applied}")

        return {
            "status": "success",
            "message": f"Applied {len(state_changes)} state changes",
            "changes_applied": changes_applied,
            "operation_timestamp": tool_context.state["last_delta_timestamp"]
        }


    def get_workflow_phase(tool_context: ToolContext) -> Dict[str, Any]:
        """
        Gets the current workflow phase with enhanced tracking.
        Phase represents the detailed step within a workflow stage.

        Args:
            tool_context: Tool execution context

        Returns:
            Current workflow phase information
        """
        current_workflow = tool_context.state.get("current_workflow", "none")
        current_stage = tool_context.state.get("current_stage", "none")
        current_phase = tool_context.state.get("workflow_phase", "start")

        # Get phase history if available
        phase_history = tool_context.state.get("phase_history", [])

        logger.info(f"Retrieved workflow phase: {current_workflow}.{current_stage}.{current_phase}")

        return {
            "status": "success",
            "current_workflow": current_workflow,
            "current_stage": current_stage,
            "current_phase": current_phase,
            "phase_history": phase_history,
            "available_phases": {
                "onboarding": ["start", "collect_info", "validate", "setup", "complete"],
                "support": ["start", "analyze", "diagnose", "resolve", "verify", "complete"],
                "task_processing": ["start", "prepare", "execute", "review", "complete"],
                "completion": ["start", "summarize", "finalize", "complete"]
            }
        }


    def set_workflow_phase(workflow_name: str, stage: str, phase: str, tool_context: ToolContext) -> Dict[str, Any]:
        """
        Sets the current workflow, stage, and phase with history tracking.
        Uses state delta pattern internally for multiple updates.

        Args:
            workflow_name: Name of the workflow
            stage: Current stage within the workflow
            phase: Detailed phase within the stage
            tool_context: Tool execution context

        Returns:
            Status of the operation
        """
        current_time = time.time()

        # Get current phase history
        phase_history = tool_context.state.get("phase_history", [])

        # Create state delta for multiple coordinated updates
        state_delta = {
            "current_workflow": workflow_name,
            "current_stage": stage,
            "workflow_phase": phase,
            "last_phase_update": current_time,
            "temp:phase_transition": f"{stage}.{phase}",
        }

        # Add to phase history
        phase_entry = {
            "workflow": workflow_name,
            "stage": stage,
            "phase": phase,
            "timestamp": current_time
        }
        phase_history.append(phase_entry)

        # Keep only last 10 phase transitions
        if len(phase_history) > 10:
            phase_history = phase_history[-10:]

        state_delta["phase_history"] = phase_history

        # Apply all changes through tool context
        for key, value in state_delta.items():
            tool_context.state[key] = value

        logger.info(f"Set workflow phase: {workflow_name}.{stage}.{phase}")

        return {
            "status": "success",
            "message": f"Workflow phase set to {workflow_name}.{stage}.{phase}",
            "previous_phases": len(phase_history) - 1,
            "transition_id": f"{workflow_name}_{stage}_{phase}_{int(current_time)}"
        }


    # --- State Management Toolset ---
    class StateManagementToolset(BaseToolset):
        """Toolset for managing session state and workflow flow."""

        def __init__(self, prefix: str = "state_"):
            self.prefix = prefix

            # Create FunctionTool instances (FunctionTool uses function name by default)
            self._set_workflow_tool = FunctionTool(func=set_workflow_state)
            self._get_workflow_tool = FunctionTool(func=get_workflow_state)
            self._update_user_data_tool = FunctionTool(func=update_user_data)
            self._get_user_data_tool = FunctionTool(func=get_user_data)
            self._set_flag_tool = FunctionTool(func=set_temporary_flag)

            # New enhanced phase and delta tools
            self._apply_delta_tool = FunctionTool(func=apply_state_delta)
            self._get_phase_tool = FunctionTool(func=get_workflow_phase)
            self._set_phase_tool = FunctionTool(func=set_workflow_phase)

            logger.info(f"StateManagementToolset initialized with prefix '{self.prefix}'")

        async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> List[BaseTool]:
            """Return all state management tools."""
            tools = [
                self._set_workflow_tool,
                self._get_workflow_tool,
                self._update_user_data_tool,
                self._get_user_data_tool,
                self._set_flag_tool,
                # Enhanced phase and delta tools
                self._apply_delta_tool,
                self._get_phase_tool,
                self._set_phase_tool
            ]
            logger.info(f"StateManagementToolset providing {len(tools)} tools")
            return tools

        async def close(self) -> None:
            """Clean up resources."""
            logger.info("StateManagementToolset.close() called")
            await asyncio.sleep(0)


    # --- Workflow Processing Functions ---
    def process_onboarding_task(task_details: str, tool_context: ToolContext) -> Dict[str, Any]:
        """Process an onboarding-related task."""
        logger.info(f"Processing onboarding task: {task_details}")

        # Simulate onboarding logic
        tool_context.state["temp:onboarding_task"] = task_details
        tool_context.state["onboarding_progress"] = "in_progress"

        return {
            "status": "success",
            "message": f"Onboarding task started: {task_details}",
            "next_action": "collect_user_information"
        }


    def process_support_task(issue_type: str, description: str, tool_context: ToolContext) -> Dict[str, Any]:
        """Process a support-related task."""
        logger.info(f"Processing support task: {issue_type} - {description}")

        # Simulate support logic
        tool_context.state["temp:support_issue"] = {
            "type": issue_type,
            "description": description
        }
        tool_context.state["support_status"] = "analyzing"

        return {
            "status": "success",
            "message": f"Support ticket created for {issue_type}",
            "ticket_id": f"TICKET_{hash(description) % 10000}",
            "next_action": "analyze_issue"
        }


    # --- Task Processing Toolset ---
    class TaskProcessingToolset(BaseToolset):
        """Toolset for processing different types of tasks based on workflow state."""

        def __init__(self, prefix: str = "task_"):
            self.prefix = prefix

            # Create FunctionTool instances (FunctionTool uses function name by default)
            self._onboarding_tool = FunctionTool(func=process_onboarding_task)
            self._support_tool = FunctionTool(func=process_support_task)

            logger.info(f"TaskProcessingToolset initialized with prefix '{self.prefix}'")

        async def get_tools(self, readonly_context: Optional[ReadonlyContext] = None) -> List[BaseTool]:
            """Return tools based on current workflow state."""
            tools = []
            workflow_label = "no_context"

            if readonly_context:
                current_workflow = readonly_context.state.get("current_workflow", "none")
                workflow_label = readonly_context.state.get("current_workflow", "unknown")

                if current_workflow == "onboarding":
                    tools.append(self._onboarding_tool)
                elif current_workflow == "support":
                    tools.append(self._support_tool)
                else:
                    # Default: provide both tools
                    tools.extend([self._onboarding_tool, self._support_tool])
            else:
                # No context available, provide all tools
                tools.extend([self._onboarding_tool, self._support_tool])

            logger.info(f"TaskProcessingToolset providing {len(tools)} tools for workflow: {workflow_label}")
            return tools

        async def close(self) -> None:
            """Clean up resources."""
            logger.info("TaskProcessingToolset.close() called")
            await asyncio.sleep(0)


    # --- Custom State-Aware Agent ---
    class StateFlowAgent(BaseAgent):
        """
        Custom agent that switches behavior based on session state.

        This agent demonstrates:
        - State-driven flow control
        - Dynamic tool selection
        - Workflow orchestration
        """

        # Field declarations for Pydantic
        coordinator_agent: LlmAgent
        onboarding_agent: LlmAgent
        support_agent: LlmAgent
        completion_agent: LlmAgent

        model_config = {"arbitrary_types_allowed": True}

        def __init__(
            self,
            name: str,
            coordinator_agent: LlmAgent,
            onboarding_agent: LlmAgent,
            support_agent: LlmAgent,
            completion_agent: LlmAgent,
        ):
            """Initialize the StateFlowAgent with specialized sub-agents."""
            sub_agents_list = [
                coordinator_agent,
                onboarding_agent,
                support_agent,
                completion_agent
            ]

            super().__init__(
                name=name,
                coordinator_agent=coordinator_agent,
                onboarding_agent=onboarding_agent,
                support_agent=support_agent,
                completion_agent=completion_agent,
                sub_agents=sub_agents_list
            )

        @override
        async def _run_async_impl(self, ctx: InvocationContext) -> AsyncGenerator[Event, None]:
            """
            Implement custom orchestration logic based on session state.
            """
            logger.info(f"[{self.name}] Starting state-driven workflow")

            # Get current workflow state
            current_workflow = ctx.session.state.get("current_workflow", "none")
            current_stage = ctx.session.state.get("current_stage", "start")

            logger.info(f"[{self.name}] Current workflow: {current_workflow}, stage: {current_stage}")

            # Always start with coordinator to determine flow
            if current_workflow == "none" or current_stage == "start":
                logger.info(f"[{self.name}] Running coordinator to determine workflow")
                async for event in self.coordinator_agent.run_async(ctx):
                    yield event

                # Re-read state after coordinator
                current_workflow = ctx.session.state.get("current_workflow", "none")
                current_stage = ctx.session.state.get("current_stage", "start")
                logger.info(f"[{self.name}] After coordinator - workflow: {current_workflow}, stage: {current_stage}")

            # Route to appropriate agent based on workflow
            if current_workflow == "onboarding":
                logger.info(f"[{self.name}] Executing onboarding workflow")
                async for event in self.onboarding_agent.run_async(ctx):
                    yield event
            elif current_workflow == "support":
                logger.info(f"[{self.name}] Executing support workflow")
                async for event in self.support_agent.run_async(ctx):
                    yield event
            elif current_workflow == "completion":
                logger.info(f"[{self.name}] Executing completion workflow")
                async for event in self.completion_agent.run_async(ctx):
                    yield event
            else:
                logger.warning(f"[{self.name}] Unknown workflow: {current_workflow}")

            # Check if workflow should continue or is complete
            final_stage = ctx.session.state.get("current_stage", "none")
            if final_stage != "complete":
                logger.info(f"[{self.name}] Workflow in progress, stage: {final_stage}")
            else:
                logger.info(f"[{self.name}] Workflow completed")
                # Set workflow to completion if not already there.
                # Note: this writes to ctx.session.state directly; depending on the
                # session service, such writes may not be persisted unless they
                # travel in an event's state_delta.
                if current_workflow != "completion":
                    ctx.session.state["current_workflow"] = "completion"
                    ctx.session.state["current_stage"] = "start"


    # --- Create Tool Instances ---
    state_toolset = StateManagementToolset()
    task_toolset = TaskProcessingToolset()

    # --- Define Specialized Agents ---
    coordinator_agent = LlmAgent(
        name="Coordinator",
        model=GEMINI_2_FLASH,
        instruction="""You are a workflow coordinator. Based on the user's request, determine the appropriate workflow and set the initial state.

    Use the state management tools to:
    1. First, get the current workflow phase with get_workflow_phase
    2. Analyze the user's request to determine the appropriate workflow:
       - 'onboarding' for new user setup, account creation, getting started
       - 'support' for help, issues, problems, troubleshooting
       - 'task_processing' for general tasks, work items, processing requests
       - 'completion' for finishing up, wrapping up, final steps
    3. Set the workflow using set_workflow_phase with the determined workflow, stage, and phase
    4. You can also use apply_state_delta to make multiple coordinated state changes
    5. If relevant, update user data with update_user_data

    Always explain your decision and what workflow you're setting up.""",
        tools=[state_toolset]
    )

    onboarding_agent = LlmAgent(
        name="OnboardingAgent",
        model=GEMINI_2_FLASH,
        instruction="""You are an onboarding specialist. Help new users get started.

    Your workflow phases:
    - collect_info: start -> validate -> setup -> complete
    - setup_account: start -> configure -> verify -> complete
    - provide_tutorial: start -> explain -> practice -> complete

    Use state tools to:
    - Get current workflow phase with get_workflow_phase
    - Update workflow phase with set_workflow_phase as you progress
    - Use apply_state_delta for coordinated state updates
    - Store user information with update_user_data
    - Use process_onboarding_task to process onboarding tasks

    Current workflow phase: {workflow_phase}
    User data collected: Check with get_user_data for 'name', 'role', 'experience_level'

    Guide users through onboarding with clear phase progression.""",
        tools=[state_toolset, task_toolset]
    )

    support_agent = LlmAgent(
        name="SupportAgent",
        model=GEMINI_2_FLASH,
        instruction="""You are a support specialist. Help users with issues and problems.

    Your workflow stages:
    1. 'analyze_issue' - Understand the problem
    2. 'provide_solution' - Offer solutions and help
    3. 'verify_resolution' - Confirm the issue is resolved
    4. 'complete' - Close the support case

    Use state tools to:
    - Get current workflow state with get_workflow_state
    - Update workflow stage with set_workflow_state as you progress
    - Use process_support_task to process support requests
    - Set temporary flags with set_temporary_flag for tracking issue resolution

    Current workflow stage: {current_stage}
    Support case details: Check temp state for 'support_issue'

    Provide helpful, step-by-step assistance to resolve user issues.""",
        tools=[state_toolset, task_toolset]
    )

    completion_agent = LlmAgent(
        name="CompletionAgent",
        model=GEMINI_2_FLASH,
        instruction="""You are a completion specialist. Help users wrap up and finish their sessions.

    Your role:
    - Summarize what was accomplished
    - Provide next steps or recommendations
    - Offer additional help if needed
    - Set workflow to 'none' when truly finished

    Use state tools to:
    - Get current workflow state and user data with get_workflow_state and get_user_data
    - Set final state with set_workflow_state('completion', 'finished') when done
    - Summarize the session based on the state information

    Current session state shows: {current_workflow} workflow was completed.
    Check user data and previous actions to provide a helpful summary.""",
        tools=[state_toolset]
    )

    # --- Create the Main Agent ---
    state_flow_agent = StateFlowAgent(
        name="StateFlowAgent",
        coordinator_agent=coordinator_agent,
        onboarding_agent=onboarding_agent,
        support_agent=support_agent,
        completion_agent=completion_agent
    )

    # --- Setup Session and Runner ---
    session_service = InMemorySessionService()
    initial_state = {
        "app:name": "StateFlowDemo",
        "app:version": "1.0.0"
    }

    # Session will be created in the main function
    runner = Runner(
        agent=state_flow_agent,
        app_name=APP_NAME,
        session_service=session_service
    )


    # --- Demo Function ---
    async def run_demo_interaction(user_input: str):
        """
        Run a demo interaction with the state flow agent.

        Args:
            user_input: The user's input message
        """
        print(f"\n{'='*50}")
        print(f"USER INPUT: {user_input}")
        print(f"{'='*50}")

        # Get current state before interaction
        current_session = await session_service.get_session(
            app_name=APP_NAME,
            user_id=USER_ID,
            session_id=SESSION_ID
        )
        print(f"State before: {json.dumps(current_session.state, indent=2)}")

        # Create user message and run agent
        content = types.Content(
            role='user',
            parts=[types.Part(text=user_input)]
        )

        # runner.run is the synchronous entry point; it blocks while events arrive
        events = runner.run(
            user_id=USER_ID,
            session_id=SESSION_ID,
            new_message=content
        )

        # Process events and capture final response
        final_response = "No response captured"
        for event in events:
            if event.is_final_response() and event.content and event.content.parts:
                final_response = event.content.parts[0].text
                break

        # Get final state
        final_session = await session_service.get_session(
            app_name=APP_NAME,
            user_id=USER_ID,
            session_id=SESSION_ID
        )

        print(f"\nAGENT RESPONSE: {final_response}")
        print(f"\nState after: {json.dumps(final_session.state, indent=2)}")
        print(f"{'='*50}\n")


    # --- State Delta Demo Function ---
    async def demo_state_delta_functions():
        """
        Demonstrate the state delta functions without requiring API credentials.
        This shows how the functions work with direct ToolContext simulation.
        """
        print("=== State Delta Functions Demo ===\n")

        # Create a mock ToolContext for demonstration
        class MockToolContext:
            def __init__(self):
                self.state = {
                    "app:name": "StateFlowDemo",
                    "app:version": "1.0.0",
                    "current_workflow": "none",
                    "current_stage": "start"
                }

        mock_context = MockToolContext()

        print("1. Initial State:")
        print(json.dumps(mock_context.state, indent=2))

        print("\n2. Setting workflow phase...")
        result = set_workflow_phase("onboarding", "collect_info", "start", mock_context)
        print(f"Result: {result}")
        print("State after phase set:")
        print(json.dumps(mock_context.state, indent=2))

        print("\n3. Applying state delta...")
        delta_changes = {
            "user:name": "Alice Johnson",
            "user:role": "developer",
            "user:login_count": "INCREMENT:",  # Special increment syntax
            "task_priority": "high",
            "temp:demo_flag": True
        }
        result = apply_state_delta(delta_changes, mock_context)
        print(f"Result: {result}")
        print("State after delta:")
        print(json.dumps(mock_context.state, indent=2))

        print("\n4. Getting workflow phase...")
        result = get_workflow_phase(mock_context)
        print(f"Phase info: {result}")

        print("\n5. Progressing to next phase...")
        result = set_workflow_phase("onboarding", "collect_info", "validate", mock_context)
        print(f"Result: {result}")
        print("Final state:")
        print(json.dumps(mock_context.state, indent=2))

        print("\n=== Demo completed! ===")


    # --- Main Demo ---
    async def main():
        print("State Flow Agent with Delta Functions")
        print("=====================================\n")

        # Delta Function Demo
        await demo_state_delta_functions()

        print("\nState delta functions are working perfectly!")
        print("The full agent demo below requires API credentials.")
        print("This demonstrates the core state management functionality.")

        # Full Agent Demo (Interactive)
        print("\nStarting State Flow Agent Demo")
        print("This demonstrates state-driven workflow management\n")

        # Create the session
        await session_service.create_session(
            app_name=APP_NAME,
            user_id=USER_ID,
            session_id=SESSION_ID,
            state=initial_state
        )

        # Get messages from the user line by line
        print("Please enter the messages to use for the interaction in order.")
        print("Press Enter on an empty line to finish.\n")

        demo_interactions = []
        idx = 1
        while True:
            msg = input(f"{idx}. Message > ").strip()
            if not msg:
                break
            demo_interactions.append(msg)
            idx += 1

        # Confirm and display the entered messages
        try:
            from tabulate import tabulate
            table = [[i + 1, m] for i, m in enumerate(demo_interactions)]
            print("\nEntered Messages:")
            print(tabulate(table, headers=["#", "Message"], tablefmt="grid"))
        except ImportError:
            print("\nEntered Messages:")
            for i, m in enumerate(demo_interactions, 1):
                print(f"{i}. {m}")

        input("\nPress Enter to run with these messages...")

        # Run the interaction for each message
        for i, interaction in enumerate(demo_interactions, 1):
            print(f"\n--- Interaction {i}/{len(demo_interactions)} ---")
            await run_demo_interaction(interaction)
            if i < len(demo_interactions):
                input("Press Enter to continue to next interaction…")

        print("\n--- All interactions completed ---")

        # Continue with additional interactions?
        print("\nDo you want to continue the conversation?")
        while True:
            additional_msg = input("Additional message (or press Enter to exit): ").strip()
            if not additional_msg:
                break
            await run_demo_interaction(additional_msg)

        print("Demo completed!")
        print("Thank you for trying the State Flow Agent with Delta Functions!")


    if __name__ == "__main__":
        asyncio.run(main())