Team Runs¶
What is a TeamRun?¶
A TeamRun represents a sequential execution pipeline of agents. Unlike Teams which define agent groups, TeamRuns manage how agents process data in sequence, with each agent receiving the output of the previous one. Optionally, a validator can be specified to create a final, structured output.
The key aspects are:
- Sequential processing order
- Message flow between agents
- Optional validation/structured output
- Execution monitoring
- Resource management
Creating TeamRuns¶
From Pool¶
# Create execution pipeline from agent names
run = pool.create_team_run(["analyzer", "planner", "executor"])
# With validator for structured output
run = pool.create_team_run(
["analyzer", "planner"],
validator=conclusion_writer.to_structured(AnalysisReport)
)
Using Pipeline Operator¶
# Create sequential pipeline from agents
pipeline = analyzer | planner | executor
def clean_data(msg: str) -> str:
return f"Cleaned: {msg}"
# With function transformers
pipeline = analyzer | clean_data | planner | execute_plan
# Mixing teams
pipeline = analysis_team | planning_team | execution_team
Running a TeamRun¶
Direct Execution¶
Simple run that waits for completion:
# Run and wait for results
results = await run.execute("Process this task")
# Access results
for response in results:
print(f"{response.agent_name}: {response.message.content}")
Monitored Execution¶
Get statistics while the run executes in background:
# Start run and get stats object
stats = await run.run_in_background("Process this task")
# Wait for completion when needed
results = await run.wait()
For detailed monitoring capabilities, see Task Monitoring.
Iterating Over Execution¶
For fine-grained control, you can use execute_iter()
which yields:
- Connection objects (
Talk
) before they're used - Responses (
AgentResponse
) after each agent executes
This allows you to:
- Configure routing before messages flow
- Monitor individual results
- Handle errors per agent
- Transform messages between agents
Example:
async for item in run.execute_iter("analyze this"):
match item:
case Talk():
print(f"Next: {item.source.name} -> {item.target.name}")
# Configure connection if needed
item.transform = lambda msg: f"Previous: {msg.content}"
case AgentResponse():
if item.success:
print(f"✅ {item.agent_name}: {item.message.content}")
else:
print(f"❌ {item.agent_name}: {item.error}")
break
Resource Management¶
TeamRuns handle cleanup automatically:
- Background tasks are tracked
- Connections are properly closed
- Resources are released on completion
You can also explicitly control the run:
# Cancel execution
await run.cancel()
# Check status
if run.is_running:
print("Still processing...")
Content Distribution¶
The distribute()
method allows sharing content and capabilities across all team members:
# Share knowledge with all agents
await run.distribute(
"Context: This is background information all agents should know.",
tools=["search_docs", "analyze_data"], # Share specific tools
resources=["knowledge_base", "guidelines"], # Share resources
)
Properties and Status¶
# Access team members
run.agents # List of agents in the pipeline
run.name # Team name (defaults to concatenated agent names)
# Check execution state
run.is_running # Whether execution is active
run.stats # Current execution statistics
Team Statistics¶
The stats
property provides aggregated information about the execution:
stats = run.stats
# Basic information
print(f"Team: {stats.source_names} → {stats.target_names}")
print(f"Active connections: {stats.num_connections}")
print(f"Messages: {stats.message_count}")
# Cost tracking
print(f"Total tokens: {stats.token_count}")
print(f"Total cost: ${stats.total_cost:.4f}")
print(f"Bytes transferred: {stats.byte_count}")
# Message access
for msg in stats.messages:
print(f"{msg.name}: {msg.content}")
# Error tracking
for agent, error, time in stats.errors:
print(f"{agent} failed at {time}: {error}")
Running in Background¶
The base class provides background execution support:
# Start execution
stats = await run.run_in_background("Process this")
# Wait for completion later
results = await run.wait()
# Or cancel if needed
await run.cancel()
Resource Management¶
The base class handles:
- Task tracking and cleanup
- Connection management
- Error propagation
- Background task cancellation
This ensures proper cleanup even in error cases:
try:
stats = await run.run_in_background("Process this")
# ... do other things ...
results = await run.wait()
except Exception:
# Background tasks are automatically cleaned up
# Connections are properly closed
await run.cancel() # Explicit cancellation if needed
The combination of sequential processing and monitoring capabilities makes TeamRuns suitable for both simple pipelines and complex multi-agent workflows.