Team Execution and Monitoring¶
LLMling provides two main classes for team operations:
- Team: A group of agents that can work together
- TeamRun: A sequential execution pipeline of agents
This guide focuses on execution monitoring and progress tracking.
Basic Team Operation¶
Teams can be created and connected in different ways:
# Create team from agents
team = Team([agent1, agent2, agent3])
# Create team through pool
team = pool.create_team(["analyzer", "planner", "executor"])
# Connect team to target
team >> target_agent # All team members forward to target
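Putting these pieces together, here is a minimal sketch of the fan-out pattern. The "reporter" agent and its prompt are illustrative assumptions, and the pool is assumed to already know the named team members (e.g. from a manifest); the calls themselves are the ones shown above.
from llmling_agent import AgentPool

async def fan_out_example():
    async with AgentPool() as pool:
        # assumes "analyzer", "planner" and "executor" are configured in the pool
        team = pool.create_team(["analyzer", "planner", "executor"])
        reporter = await pool.add_agent(
            "reporter",  # hypothetical target agent for illustration
            system_prompt="You compile reports from incoming messages.",
        )
        team >> reporter  # every team member forwards its output to the reporter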
Sequential Execution with TeamRun¶
For monitored sequential processing, use TeamRun:
# Create execution pipeline
run = agent1 | agent2 | agent3 # Direct creation
# or
run = pool.create_team_run(["analyzer", "planner", "executor"])
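For completeness, a minimal sketch of starting a pipeline and collecting its result, using the background-run API described in the next section; the prompt string is illustrative:
run = pool.create_team_run(["analyzer", "planner", "executor"])
stats = await run.run_in_background("Summarize the quarterly report")
result = await run.wait()  # final result once the pipeline completes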
Monitoring Execution¶
TeamRun provides built-in monitoring through its background execution:
import asyncio

# Start execution in background
stats = await run.run_in_background("Task to execute")
# Check current status while running
while run.is_running:
    print(f"Active connections: {len(stats)}")
    print(f"Messages processed: {len(stats.messages)}")
    print(f"Errors: {len(stats.error_log)}")
    await asyncio.sleep(0.5)
# Wait for completion and get results
result = await run.wait()
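The polling loop above can be factored into a small reusable helper. This is only a sketch built on the attributes documented in this guide (is_running, stats, wait); the monitor name and interval parameter are not part of the library:
import asyncio

async def monitor(run, interval: float = 0.5):
    """Poll a running TeamRun, print progress, and return the final result."""
    while run.is_running:
        stats = run.stats
        print(f"messages={stats.message_count} errors={len(stats.error_log)}")
        await asyncio.sleep(interval)
    return await run.wait()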
The Stats Object¶
The AggregatedTalkStats object provides execution metrics and history:
# Access during or after run
stats = run.stats
# Message information
stats.messages # All messages exchanged
stats.message_count # Total message count
stats.tool_calls # Tool calls made
# Usage and costs
stats.token_count # Total tokens used
stats.total_cost # Total cost in USD
stats.byte_count # Raw data transferred
# Agent information
stats.source_names # Agents that sent messages
stats.target_names # Agents that received messages
# Additional data
stats.error_log # Any errors that occurred
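As a quick illustration, the attributes above can be combined into a compact progress line; format_stats is a hypothetical helper, not a library function:
def format_stats(stats) -> str:
    """Summarize an AggregatedTalkStats object in one line."""
    return (
        f"{stats.message_count} messages, "
        f"{stats.token_count} tokens, "
        f"${stats.total_cost:.4f}, "
        f"{len(stats.error_log)} errors"
    )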
Example: Monitored Execution¶
import asyncio
from llmling_agent import AgentPool

async def main():
    async with AgentPool() as pool:
        # Create three agents with different roles
        agent1 = await pool.add_agent(
            "analyzer",
            system_prompt="You analyze text and find key points.",
        )
        agent2 = await pool.add_agent(
            "summarizer",
            system_prompt="You create concise summaries.",
        )
        agent3 = await pool.add_agent(
            "critic",
            system_prompt="You evaluate and critique summaries.",
        )
        # Create execution pipeline
        run = agent1 | agent2 | agent3
        # Start run and get stats object
        stats = await run.run_in_background("Process this text...")
        # Monitor progress
        while run.is_running:
            print("\nCurrent status:")
            print(f"Processed messages: {stats.message_count}")
            print(f"Active connections: {len(stats)}")
            print(f"Errors: {len(stats.error_log)}")
            await asyncio.sleep(0.5)
        # Wait for completion
        result = await run.wait()
        # Final statistics
        print("\nExecution complete:")
        print(f"Final result: {result}")
        print(f"Total cost: ${stats.total_cost:.4f}")
        print(f"Total tokens: {stats.token_count}")

if __name__ == "__main__":
    asyncio.run(main())
Resource Management¶
Both Team and TeamRun handle cleanup automatically (see the sketch after this list):
- Background tasks are tracked and cleaned up
- Connections are properly closed
- Resources are released on completion
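In practice this means it is usually enough to scope work to the pool's async context manager. A minimal sketch, assuming the pool is configured with the named agents:
from llmling_agent import AgentPool

async def process(text: str):
    async with AgentPool() as pool:
        # assumes these agents are configured in the pool (e.g. via a manifest)
        run = pool.create_team_run(["analyzer", "planner", "executor"])
        await run.run_in_background(text)
        result = await run.wait()
    # leaving the context: background tasks are cleaned up, connections closed
    return result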
Summary¶
The current monitoring system provides:
- Real-time execution tracking
- Comprehensive statistics
- Error logging
- Cost tracking
- Resource management
All while maintaining clean separation between team organization (Team) and sequential execution (TeamRun).