Agent Evaluation and Testing: From Development to Production
A comprehensive guide to evaluating AI agents—task success metrics, trajectory analysis, tool use correctness, sandboxing, and building robust testing pipelines for production agent systems.
The Agent Evaluation Challenge
Evaluating AI agents is fundamentally different from evaluating LLMs. A language model produces text; an agent produces actions—and those actions have real-world consequences. Your agent might successfully generate correct SQL but fail because it queried the wrong database. It might use the right tools but in the wrong order. It might complete a task but take 50 steps when 5 would suffice.
This guide covers the complete agent evaluation stack: metrics that matter, testing frameworks, trajectory analysis, sandboxing for safe evaluation, and production monitoring.
Prerequisites:
- Familiarity with building agentic AI systems
- Understanding of LLM evaluation basics
- Experience deploying ML systems
What you'll learn:
- Agent-specific evaluation metrics beyond LLM benchmarks
- Building evaluation datasets and test suites
- Trajectory analysis and debugging
- Sandboxed execution environments
- Production monitoring and alerting
- A/B testing for agents
Why Agent Evaluation Is Different
Traditional LLM evaluation asks: "Is this output correct?" Agent evaluation must ask:
| Dimension | LLM Evaluation | Agent Evaluation |
|---|---|---|
| Output | Text quality | Task completion |
| Process | N/A | Tool selection, ordering, efficiency |
| Safety | Content filtering | Action consequences |
| Cost | Tokens | Tokens + API calls + compute |
| Latency | Time to first token | End-to-end task time |
| State | Stateless | Multi-step with memory |
An agent that produces beautiful reasoning but calls the wrong API is worse than one with terse reasoning that completes the task.
Core Agent Metrics
1. Task Success Rate
The most important metric: did the agent complete the task correctly?
Evaluating this is tricky because "correct" isn't always binary. Consider an agent asked to summarize a document: there's no single correct answer, just degrees of quality. We use LLM-as-judge to evaluate semantic similarity between actual and expected answers, with a threshold (0.8) to determine success.
Why task success evaluation is harder than LLM evaluation: With LLMs, you compare generated text to reference text. With agents, you must evaluate whether a sequence of actions achieved a goal. The agent might take an unexpected path but still succeed. Or it might execute the expected plan but fail due to external factors (API down, data missing). You need to separate "agent behavior quality" from "task outcome."
The three evaluation strategies and when to use each:
- Exact match - Use for deterministic tasks with one correct answer. Example: "What is the capital of France?" Expected: "Paris". Actual must match exactly (after normalization). Fast, cheap, but only works for factual queries.
- LLM-as-judge semantic comparison - Use for generation tasks with multiple valid answers. Example: "Summarize this document in 3 sentences." The LLM judge compares semantic similarity between expected and actual summaries. Flexible, handles paraphrasing, but costs tokens and can be inconsistent.
- Custom callable criteria - Use for programmatic validation. Example: "Generate valid JSON with these fields." Check if output parses and contains required keys. Deterministic, fast, domain-specific (see the sketch below).
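To make the third strategy concrete, here is a minimal sketch of a custom callable criterion. The helper name valid_json_with_fields is illustrative (not part of the evaluator code below); the returned function can be passed as success_criteria to the evaluator defined later in this section.
import json

def valid_json_with_fields(required: set[str]):
    """Build a success criterion: output must parse as JSON and contain the required keys."""
    def criterion(output: str) -> bool:
        try:
            data = json.loads(output)
        except (json.JSONDecodeError, TypeError):
            return False
        return isinstance(data, dict) and required.issubset(data.keys())
    return criterion

# Deterministic and fast: no LLM judge needed
check = valid_json_with_fields({"name", "email"})
assert check('{"name": "Ada", "email": "ada@example.com"}')
assert not check("not json")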
The TaskResult data structure is the agent's report card. It captures not just success/failure but the full story: how many steps were taken (efficiency), which tools were called (strategy), errors encountered (failure modes), tokens used (cost), and latency (performance). This rich signal lets you diagnose WHY an agent succeeded or failed, not just THAT it did.
The TaskEvaluator implementation below shows production-grade evaluation:
Key design decisions:
- Multiple outcome types beyond binary success/failure (partial success, error, timeout) - real agents fail in many ways
- Verified success - The agent reported success, but was it actually correct? Cross-check against ground truth
- Efficiency metrics tracked alongside correctness - A slow, expensive success is still a problem
- LLM judge with fallback - If judge unavailable, fall back to exact match rather than failing evaluation
- Partial completion estimation - For debugging: if agent failed, how far did it get?
The _compare_answers method implements the LLM-as-judge pattern correctly: clear evaluation criteria (semantic similarity, key facts, meaning equivalence), request just a number (not verbose explanation), handle parsing failures gracefully. The prompt is deliberately simple—complex scoring rubrics increase judge variance.
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum
class TaskOutcome(Enum):
SUCCESS = "success"
PARTIAL_SUCCESS = "partial_success"
FAILURE = "failure"
ERROR = "error"
TIMEOUT = "timeout"
@dataclass
class TaskResult:
task_id: str
outcome: TaskOutcome
final_answer: Optional[str]
expected_answer: Optional[str]
steps_taken: int
total_tokens: int
latency_ms: float
tool_calls: list[dict]
errors: list[str]
class TaskEvaluator:
"""Evaluate task completion."""
def __init__(self, judge_llm=None):
self.judge_llm = judge_llm
def evaluate(
self,
task: str,
result: TaskResult,
ground_truth: Optional[str] = None,
success_criteria: Optional[Callable[[str], bool]] = None
) -> dict:
"""Evaluate a task result."""
evaluation = {
"task_id": result.task_id,
"outcome": result.outcome.value,
"metrics": {}
}
# Outcome-based evaluation
if result.outcome == TaskOutcome.SUCCESS:
if ground_truth:
# Compare against ground truth
match_score = self._compare_answers(
result.final_answer,
ground_truth
)
evaluation["metrics"]["answer_match"] = match_score
evaluation["verified_success"] = match_score > 0.8
if success_criteria:
# Apply custom success criteria
evaluation["verified_success"] = success_criteria(result.final_answer)
elif result.outcome == TaskOutcome.PARTIAL_SUCCESS:
# Evaluate partial completion
evaluation["metrics"]["completion_percentage"] = self._estimate_completion(
task, result
)
# Efficiency metrics
evaluation["metrics"]["steps"] = result.steps_taken
evaluation["metrics"]["tokens"] = result.total_tokens
evaluation["metrics"]["latency_ms"] = result.latency_ms
evaluation["metrics"]["tool_calls"] = len(result.tool_calls)
return evaluation
def _compare_answers(self, actual: str, expected: str) -> float:
"""Compare actual answer to expected using LLM judge."""
if not self.judge_llm:
# Fall back to exact match
return 1.0 if actual.strip().lower() == expected.strip().lower() else 0.0
prompt = f"""Compare these two answers and rate their semantic similarity from 0.0 to 1.0.
Expected answer: {expected}
Actual answer: {actual}
Consider:
- Do they convey the same information?
- Are key facts correct?
- Is the meaning equivalent even if wording differs?
Score (just the number):"""
response = self.judge_llm.chat([{"role": "user", "content": prompt}])
try:
return float(response.content.strip())
except ValueError:
return 0.5
def _estimate_completion(self, task: str, result: TaskResult) -> float:
"""Estimate partial task completion percentage."""
if not self.judge_llm:
return 0.5
prompt = f"""Estimate what percentage of this task was completed.
Task: {task}
Agent's work:
- Steps taken: {result.steps_taken}
- Tool calls: {len(result.tool_calls)}
- Final output: {result.final_answer[:500] if result.final_answer else 'None'}
- Errors: {result.errors}
Completion percentage (0-100):"""
response = self.judge_llm.chat([{"role": "user", "content": prompt}])
try:
return float(response.content.strip()) / 100
except ValueError:
return 0.5
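A quick usage sketch of the evaluator above. The TaskResult values are illustrative, and the judge is omitted here so the comparison falls back to exact match; pass a judge LLM client (with the chat interface assumed above) for semantic scoring.
result = TaskResult(
    task_id="info-001",
    outcome=TaskOutcome.SUCCESS,
    final_answer="Tokyo has roughly 14 million residents.",
    expected_answer="approximately 14 million",
    steps_taken=4,
    total_tokens=1850,
    latency_ms=3200.0,
    tool_calls=[{"tool": "web_search", "arguments": {"query": "Tokyo population"}}],
    errors=[]
)

evaluator = TaskEvaluator(judge_llm=None)  # None falls back to exact match
report = evaluator.evaluate(
    task="Find the current population of Tokyo",
    result=result,
    ground_truth="approximately 14 million"
)
print(report["metrics"])  # steps, tokens, latency, tool_calls, answer_match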
2. Tool Use Correctness
Evaluate whether the agent used the right tools correctly. This goes beyond "did tools succeed?"—we measure:
- Selection: Did the agent pick appropriate tools for the task?
- Arguments: Were tool arguments valid and sensible?
- Efficiency: Did the agent make redundant or unnecessary calls?
- Coverage: Were all necessary tools used?
Why tool evaluation is often more informative than task success: A task can succeed despite poor tool use (lucky path), or fail despite excellent tool use (the task was impossible). Tool evaluation isolates agent behavior from task difficulty. If an agent makes 50 redundant API calls before succeeding, that's valuable signal even though task success is 100%. Conversely, if an agent uses tools perfectly but fails because the information wasn't in your database, that's not an agent problem—it's a data problem.
The taxonomy of tool failures: Not all tool failures are equal. Selection failures (choosing wrong tool) indicate reasoning problems. Argument failures (right tool, wrong inputs) indicate schema understanding problems. Redundant calls indicate state management problems—the agent forgot it already made this call. Missing calls indicate planning problems—the agent didn't realize a step was needed. Each failure type suggests different fixes.
The _detect_redundant_calls method catches a common failure mode: agents retrying the same call repeatedly (often from confusion or hallucinated errors). The time_window parameter groups calls close in time—if the same tool with same args is called within 5 seconds, it's likely a bug.
@dataclass
class ToolCall:
tool_name: str
arguments: dict
result: str
timestamp: float
success: bool
@dataclass
class ToolUseMetrics:
total_calls: int
successful_calls: int
failed_calls: int
redundant_calls: int
missing_calls: list[str]
incorrect_args: list[dict]
tool_selection_score: float
class ToolUseEvaluator:
"""Evaluate tool use patterns."""
def __init__(self, judge_llm=None):
self.judge_llm = judge_llm
def evaluate(
self,
task: str,
tool_calls: list[ToolCall],
expected_tools: Optional[list[str]] = None,
tool_schemas: Optional[dict] = None
) -> ToolUseMetrics:
"""Evaluate tool use for a task."""
total = len(tool_calls)
successful = sum(1 for tc in tool_calls if tc.success)
failed = total - successful
# Detect redundant calls (same tool, same args, close in time)
redundant = self._detect_redundant_calls(tool_calls)
# Check for missing expected tools
missing = []
if expected_tools:
used_tools = set(tc.tool_name for tc in tool_calls)
missing = [t for t in expected_tools if t not in used_tools]
# Validate arguments against schemas
incorrect_args = []
if tool_schemas:
incorrect_args = self._validate_arguments(tool_calls, tool_schemas)
# Overall tool selection score
selection_score = self._evaluate_tool_selection(task, tool_calls)
return ToolUseMetrics(
total_calls=total,
successful_calls=successful,
failed_calls=failed,
redundant_calls=redundant,
missing_calls=missing,
incorrect_args=incorrect_args,
tool_selection_score=selection_score
)
def _detect_redundant_calls(
self,
tool_calls: list[ToolCall],
time_window: float = 5.0
) -> int:
"""Detect redundant tool calls."""
redundant = 0
for i, call in enumerate(tool_calls[1:], 1):
for prev_call in tool_calls[:i]:
if (
call.tool_name == prev_call.tool_name and
call.arguments == prev_call.arguments and
call.timestamp - prev_call.timestamp < time_window
):
redundant += 1
break
return redundant
def _validate_arguments(
self,
tool_calls: list[ToolCall],
schemas: dict
) -> list[dict]:
"""Validate tool arguments against schemas."""
from jsonschema import validate, ValidationError
incorrect = []
for call in tool_calls:
if call.tool_name in schemas:
schema = schemas[call.tool_name]
try:
validate(instance=call.arguments, schema=schema)
except ValidationError as e:
incorrect.append({
"tool": call.tool_name,
"arguments": call.arguments,
"error": str(e)
})
return incorrect
def _evaluate_tool_selection(
self,
task: str,
tool_calls: list[ToolCall]
) -> float:
"""Evaluate whether correct tools were selected for the task."""
if not self.judge_llm or not tool_calls:
return 0.5
tool_sequence = " -> ".join(tc.tool_name for tc in tool_calls)
prompt = f"""Evaluate the tool selection for this task.
Task: {task}
Tools used (in order): {tool_sequence}
Rate from 0.0 to 1.0:
- Were the right tools selected?
- Were they used in a logical order?
- Were any unnecessary tools called?
- Were any necessary tools missing?
Score (just the number):"""
response = self.judge_llm.chat([{"role": "user", "content": prompt}])
try:
return float(response.content.strip())
except ValueError:
return 0.5
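A small usage sketch of the tool-use evaluator, assuming a couple of recorded ToolCall objects and a JSON Schema for the search tool (all values illustrative; schema validation relies on the jsonschema package used by the evaluator above).
calls = [
    ToolCall("web_search", {"query": "Tokyo population"}, "14 million", timestamp=0.0, success=True),
    ToolCall("web_search", {"query": "Tokyo population"}, "14 million", timestamp=2.0, success=True),
]
schemas = {
    "web_search": {
        "type": "object",
        "properties": {"query": {"type": "string"}},
        "required": ["query"],
    }
}

metrics = ToolUseEvaluator(judge_llm=None).evaluate(
    task="Find the current population of Tokyo",
    tool_calls=calls,
    expected_tools=["web_search"],
    tool_schemas=schemas,
)
print(metrics.redundant_calls)  # 1 -- identical call repeated within the 5s window
print(metrics.missing_calls)    # [] -- the expected tool was used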
3. Reasoning Quality
Evaluate the quality of the agent's reasoning.
Why reasoning quality matters beyond outcomes: An agent that arrives at the correct answer through flawed reasoning is a liability—it will fail unpredictably on similar problems. Evaluating reasoning quality catches "right answer, wrong process" situations. This is especially important for high-stakes applications where you need to trust not just the answer, but the path that led there.
The four dimensions of reasoning quality: We evaluate coherence (does each step follow logically from the previous?), relevance (is the reasoning focused on the task or wandering?), efficiency (is this a reasonable path or needlessly convoluted?), and error recovery (when something goes wrong, does the agent adapt?). Different agents fail on different dimensions—some are coherent but inefficient, others efficient but easily derailed by errors.
Using LLM-as-judge for reasoning evaluation: Reasoning quality is inherently subjective—humans disagree about what constitutes "good" reasoning. LLM judges provide consistent (if imperfect) evaluation at scale. The key is well-designed rubrics: rather than asking "is this good reasoning?" (too vague), we ask specific questions about logical flow, contradiction detection, and goal relevance.
@dataclass
class ReasoningMetrics:
coherence_score: float
relevance_score: float
efficiency_score: float
error_recovery_score: float
overall_score: float
issues: list[str]
class ReasoningEvaluator:
"""Evaluate agent reasoning quality."""
def __init__(self, judge_llm):
self.judge_llm = judge_llm
def evaluate(
self,
task: str,
reasoning_trace: list[str],
tool_calls: list[ToolCall],
final_answer: str
) -> ReasoningMetrics:
"""Evaluate reasoning quality."""
# Evaluate different dimensions
coherence = self._evaluate_coherence(reasoning_trace)
relevance = self._evaluate_relevance(task, reasoning_trace)
efficiency = self._evaluate_efficiency(reasoning_trace, tool_calls)
recovery = self._evaluate_error_recovery(reasoning_trace, tool_calls)
# Identify specific issues
issues = self._identify_issues(task, reasoning_trace, tool_calls, final_answer)
overall = (coherence + relevance + efficiency + recovery) / 4
return ReasoningMetrics(
coherence_score=coherence,
relevance_score=relevance,
efficiency_score=efficiency,
error_recovery_score=recovery,
overall_score=overall,
issues=issues
)
def _evaluate_coherence(self, reasoning_trace: list[str]) -> float:
"""Evaluate logical coherence of reasoning."""
if len(reasoning_trace) < 2:
return 1.0
trace_text = "\n".join(reasoning_trace)
prompt = f"""Evaluate the logical coherence of this reasoning trace.
Reasoning:
{trace_text}
Consider:
- Does each step follow logically from the previous?
- Are there contradictions?
- Is the reasoning internally consistent?
Score from 0.0 (incoherent) to 1.0 (perfectly coherent):"""
response = self.judge_llm.chat([{"role": "user", "content": prompt}])
try:
return float(response.content.strip())
except ValueError:
return 0.5
def _evaluate_relevance(self, task: str, reasoning_trace: list[str]) -> float:
"""Evaluate relevance of reasoning to task."""
trace_text = "\n".join(reasoning_trace)
prompt = f"""Evaluate how relevant this reasoning is to the task.
Task: {task}
Reasoning:
{trace_text}
Consider:
- Does the reasoning address the task directly?
- Are there irrelevant tangents?
- Does it stay focused on the goal?
Score from 0.0 (irrelevant) to 1.0 (highly relevant):"""
response = self.judge_llm.chat([{"role": "user", "content": prompt}])
try:
return float(response.content.strip())
except ValueError:
return 0.5
def _evaluate_efficiency(
self,
reasoning_trace: list[str],
tool_calls: list[ToolCall]
) -> float:
"""Evaluate reasoning efficiency."""
# Heuristic: penalize excessive steps
step_count = len(reasoning_trace)
tool_count = len(tool_calls)
# Baseline expectations (adjust based on your domain)
expected_steps = 5
expected_tools = 3
step_efficiency = min(1.0, expected_steps / max(step_count, 1))
tool_efficiency = min(1.0, expected_tools / max(tool_count, 1))
return (step_efficiency + tool_efficiency) / 2
def _evaluate_error_recovery(
self,
reasoning_trace: list[str],
tool_calls: list[ToolCall]
) -> float:
"""Evaluate how well the agent recovered from errors."""
failed_calls = [tc for tc in tool_calls if not tc.success]
if not failed_calls:
return 1.0 # No errors to recover from
# Check if agent adapted after failures
trace_text = "\n".join(reasoning_trace)
prompt = f"""Evaluate how well the agent recovered from errors.
Failed tool calls: {len(failed_calls)}
Reasoning trace:
{trace_text}
Consider:
- Did the agent acknowledge errors?
- Did it try alternative approaches?
- Did it eventually succeed despite failures?
Score from 0.0 (no recovery) to 1.0 (excellent recovery):"""
response = self.judge_llm.chat([{"role": "user", "content": prompt}])
try:
return float(response.content.strip())
except ValueError:
return 0.5
def _identify_issues(
self,
task: str,
reasoning_trace: list[str],
tool_calls: list[ToolCall],
final_answer: str
) -> list[str]:
"""Identify specific issues in reasoning."""
issues = []
# Check for common issues
trace_text = " ".join(reasoning_trace).lower()
# Hallucination indicators
if "i don't have" in trace_text or "i cannot" in trace_text:
if any(tc.success for tc in tool_calls):
issues.append("Potential hallucination: claimed inability despite successful tool use")
# Loop indicators
if self._detect_reasoning_loop(reasoning_trace):
issues.append("Reasoning loop detected")
# Off-topic indicators
if self._detect_off_topic(task, reasoning_trace):
issues.append("Off-topic reasoning detected")
# Ignored error indicators
failed_calls = [tc for tc in tool_calls if not tc.success]
if failed_calls and "error" not in trace_text:
issues.append("Tool errors may have been ignored")
return issues
def _detect_reasoning_loop(self, reasoning_trace: list[str], window: int = 3) -> bool:
"""Detect if reasoning is stuck in a loop."""
if len(reasoning_trace) < window * 2:
return False
# Check for repeated patterns
for i in range(len(reasoning_trace) - window):
pattern = reasoning_trace[i:i+window]
for j in range(i + window, len(reasoning_trace) - window + 1):
if reasoning_trace[j:j+window] == pattern:
return True
return False
def _detect_off_topic(self, task: str, reasoning_trace: list[str]) -> bool:
"""Detect if reasoning went off-topic."""
# Simple heuristic: check if key task terms appear in reasoning
task_terms = set(task.lower().split())
reasoning_text = " ".join(reasoning_trace).lower()
common_terms = {"the", "a", "an", "is", "are", "to", "for", "and", "or"}
task_terms -= common_terms
if not task_terms:
return False
mentioned = sum(1 for term in task_terms if term in reasoning_text)
return mentioned / len(task_terms) < 0.3
4. Cost Efficiency
Track and evaluate resource usage:
Why cost metrics are critical for production agents: Agents can be expensive. A naive agent might retry failed operations indefinitely, rack up thousands of unnecessary tool calls, or use GPT-4 when GPT-4o-mini would suffice. Without cost monitoring, you'll get surprise bills. With it, you can identify inefficient behaviors and optimize.
The hidden costs beyond token usage: Tokens are obvious, but agents incur costs you might miss: (1) API calls to tools—each database query, web search, or external API has a cost, (2) Compute time—long-running agents tie up resources and hurt user experience, (3) Opportunity cost—slow agents mean fewer requests served per dollar of infrastructure.
Efficiency metrics require a baseline: Saying "this agent used 10K tokens" is meaningless without context. Is that good? The CostEvaluator compares against a baseline (previous version, competitor, or theoretical minimum) to compute efficiency scores. If your new agent uses 2x the tokens for the same success rate, that's a red flag—investigate before deploying.
The token breakdown insight: Not all tokens are equal. Input tokens (prompts, context) are typically 3-10x cheaper than output tokens (generation). An agent that uses 5K input + 1K output costs less than one using 2K input + 2K output, despite using more total tokens. Track them separately to optimize correctly.
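A quick worked example of why tracking input and output tokens separately matters, using the GPT-4o example rates from the PRICING table below:
# GPT-4o example rates per 1K tokens (see PRICING below)
input_rate, output_rate = 0.0025, 0.01

agent_a = (5000 / 1000) * input_rate + (1000 / 1000) * output_rate  # 6K total tokens
agent_b = (2000 / 1000) * input_rate + (2000 / 1000) * output_rate  # 4K total tokens

print(f"Agent A: ${agent_a:.4f}")  # $0.0225 -- more tokens, lower cost
print(f"Agent B: ${agent_b:.4f}")  # $0.0250 -- fewer tokens, higher cost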
Model pricing tiers as a first-class concern: The PRICING dictionary in the code reflects real-world costs as of January 2025. Notice the massive range: GPT-4o-mini is 16x cheaper than GPT-4o for input, 16x for output. For many tasks, the cheaper model works fine. Cost evaluation should drive model selection: measure success rate by model, choose the cheapest model that meets your quality threshold.
The cost-per-task metric is your north star: Total spend is less actionable than per-task costs. "$0.15 per customer support query" is a number you can act on—you can compare it to human support costs ($5-15 per query), see trends over time, and set cost budgets per task type.
@dataclass
class CostMetrics:
total_tokens: int
input_tokens: int
output_tokens: int
tool_call_count: int
api_calls: int
compute_time_ms: float
estimated_cost_usd: float
cost_per_task: float
efficiency_score: float
class CostEvaluator:
"""Evaluate agent cost efficiency."""
# Pricing per 1K tokens (example rates)
PRICING = {
"gpt-4o": {"input": 0.0025, "output": 0.01},
"gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
"claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
"claude-3-5-haiku": {"input": 0.0008, "output": 0.004},
}
def __init__(self, model: str = "gpt-4o"):
self.model = model
self.pricing = self.PRICING.get(model, {"input": 0.01, "output": 0.03})
def evaluate(
self,
task_results: list[TaskResult],
baseline_metrics: Optional[dict] = None
) -> CostMetrics:
"""Evaluate cost efficiency across tasks."""
total_input = sum(r.total_tokens * 0.3 for r in task_results) # Estimate
total_output = sum(r.total_tokens * 0.7 for r in task_results)
total_tokens = sum(r.total_tokens for r in task_results)
total_tool_calls = sum(len(r.tool_calls) for r in task_results)
total_time = sum(r.latency_ms for r in task_results)
estimated_cost = (
(total_input / 1000) * self.pricing["input"] +
(total_output / 1000) * self.pricing["output"]
)
cost_per_task = estimated_cost / len(task_results) if task_results else 0
# Calculate efficiency score
efficiency = self._calculate_efficiency(
task_results, baseline_metrics
)
return CostMetrics(
total_tokens=int(total_tokens),
input_tokens=int(total_input),
output_tokens=int(total_output),
tool_call_count=total_tool_calls,
api_calls=len(task_results),
compute_time_ms=total_time,
estimated_cost_usd=estimated_cost,
cost_per_task=cost_per_task,
efficiency_score=efficiency
)
def _calculate_efficiency(
self,
results: list[TaskResult],
baseline: Optional[dict]
) -> float:
"""Calculate efficiency relative to baseline."""
if not baseline:
return 0.5
# Compare to baseline metrics
avg_tokens = sum(r.total_tokens for r in results) / len(results)
avg_steps = sum(r.steps_taken for r in results) / len(results)
baseline_tokens = baseline.get("avg_tokens", avg_tokens)
baseline_steps = baseline.get("avg_steps", avg_steps)
token_efficiency = min(1.0, baseline_tokens / max(avg_tokens, 1))
step_efficiency = min(1.0, baseline_steps / max(avg_steps, 1))
return (token_efficiency + step_efficiency) / 2
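Usage sketch for the cost evaluator, assuming a list of TaskResult objects from an evaluation run (the results variable) and baseline averages from the previous agent version; the specific numbers are illustrative.
cost_eval = CostEvaluator(model="gpt-4o-mini")
cost_metrics = cost_eval.evaluate(
    task_results=results,  # list[TaskResult] from your eval run
    baseline_metrics={"avg_tokens": 4200, "avg_steps": 6},  # previous version's averages
)
print(f"${cost_metrics.cost_per_task:.4f} per task, "
      f"efficiency vs baseline: {cost_metrics.efficiency_score:.2f}")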
Building Evaluation Datasets
Task Dataset Structure
Why evaluation datasets are the foundation of agent quality: You can't improve what you don't measure, and you can't measure without representative test cases. A good evaluation dataset is your agent's report card—it tells you what works, what breaks, and where to focus development effort. Without one, you're flying blind, deploying changes based on vibes instead of data.
The challenge of creating agent evaluation datasets: Unlike LLM benchmarks (where you just need question-answer pairs), agent evaluation requires specifying the task, the expected outcome, acceptable tool usage patterns, and success criteria that may be programmatic (output must be valid JSON) or semantic (answer must address the key points). You're not just evaluating text quality—you're evaluating task completion behavior.
Dataset structure principles:
- Rich metadata - Each task needs more than just description + expected output. You need category (for filtering), difficulty (for stratified evaluation), expected tools (to catch tool selection bugs), timeout (to catch infinite loops), and tags (for slicing results).
- Versioned and reproducible - Include version numbers. As your agent improves, earlier benchmarks become easier. Track version-over-version progress. Lock random seeds for sampling to ensure reproducibility.
- Hierarchical organization - Organize by category and difficulty. This enables targeted evaluation: "How does my agent perform on data analysis vs code generation?" or "What's the success rate on hard tasks?"
- Multiple success criteria types - Some tasks have exact answers (factual queries), some have semantic answers (summaries), some have programmatic validation (code that passes tests). Support all three via expected_output (for exact/semantic) and success_criteria (for programmatic).
The YAML format for agent test suites: YAML is human-editable and version-control friendly. Engineers can write tests, domain experts can review them, and you can diff changes easily. The structure maps directly to the Python dataclasses, making loading trivial.
Why filtering and sampling matter: You won't run your full benchmark on every change. For rapid iteration, you'll sample (e.g., 20 random tasks for quick checks). For debugging specific issues, you'll filter by category ("show me all failed database query tasks"). The dataset API must support these workflows efficiently.
Example YAML structure for reference:
name: "agent-eval-v1"
version: "1.0"
description: "Comprehensive agent evaluation dataset"
tasks:
- id: "sql-001"
description: "Generate SQL query for customer count by region"
category: "database"
difficulty: "easy"
expected_output: "SELECT region, COUNT(*) FROM customers GROUP BY region"
expected_tools: ["sql_generator", "sql_validator"]
timeout_seconds: 30
tags: ["sql", "aggregation"]
from dataclasses import dataclass, field
from typing import Optional, Any
import json
import yaml
@dataclass
class EvalTask:
id: str
description: str
category: str
difficulty: str # easy, medium, hard
expected_output: Optional[str] = None
expected_tools: list[str] = field(default_factory=list)
success_criteria: Optional[str] = None # Python expression
timeout_seconds: int = 120
tags: list[str] = field(default_factory=list)
metadata: dict = field(default_factory=dict)
@dataclass
class EvalDataset:
name: str
version: str
tasks: list[EvalTask]
categories: list[str]
description: str = ""
def filter_by_category(self, category: str) -> "EvalDataset":
"""Filter tasks by category."""
filtered = [t for t in self.tasks if t.category == category]
return EvalDataset(
name=f"{self.name}:{category}",
version=self.version,
tasks=filtered,
categories=[category],
description=self.description
)
def filter_by_difficulty(self, difficulty: str) -> "EvalDataset":
"""Filter tasks by difficulty."""
filtered = [t for t in self.tasks if t.difficulty == difficulty]
return EvalDataset(
name=f"{self.name}:{difficulty}",
version=self.version,
tasks=filtered,
categories=self.categories,
description=self.description
)
def sample(self, n: int, seed: int = 42) -> "EvalDataset":
"""Sample n tasks randomly."""
import random
random.seed(seed)
sampled = random.sample(self.tasks, min(n, len(self.tasks)))
return EvalDataset(
name=f"{self.name}:sample_{n}",
version=self.version,
tasks=sampled,
categories=self.categories,
description=self.description
)
@classmethod
def from_yaml(cls, path: str) -> "EvalDataset":
"""Load dataset from YAML file."""
with open(path) as f:
data = yaml.safe_load(f)
tasks = [EvalTask(**t) for t in data.get("tasks", [])]
return cls(
name=data.get("name", "unnamed"),
version=data.get("version", "1.0"),
tasks=tasks,
categories=list(set(t.category for t in tasks)),
description=data.get("description", "")
)
def to_yaml(self, path: str):
"""Save dataset to YAML file."""
data = {
"name": self.name,
"version": self.version,
"description": self.description,
"tasks": [
{
"id": t.id,
"description": t.description,
"category": t.category,
"difficulty": t.difficulty,
"expected_output": t.expected_output,
"expected_tools": t.expected_tools,
"success_criteria": t.success_criteria,
"timeout_seconds": t.timeout_seconds,
"tags": t.tags,
"metadata": t.metadata
}
for t in self.tasks
]
}
with open(path, "w") as f:
yaml.dump(data, f, default_flow_style=False)
Example Dataset
# eval_dataset.yaml
name: "agent-eval-v1"
version: "1.0.0"
description: "Comprehensive agent evaluation dataset"
tasks:
# Information Retrieval
- id: "info-001"
description: "Find the current population of Tokyo"
category: "information_retrieval"
difficulty: "easy"
expected_output: "approximately 14 million"
expected_tools: ["web_search"]
tags: ["search", "factual"]
- id: "info-002"
description: "Find the CEOs of the top 3 tech companies by market cap and their tenure"
category: "information_retrieval"
difficulty: "medium"
expected_tools: ["web_search"]
tags: ["search", "multi-step", "synthesis"]
# Data Analysis
- id: "data-001"
description: "Query the users table and find the top 5 users by order count"
category: "data_analysis"
difficulty: "easy"
expected_tools: ["query_database"]
success_criteria: "len(result.get('users', [])) == 5"
tags: ["sql", "aggregation"]
- id: "data-002"
description: "Analyze user growth over the past 12 months and identify seasonal patterns"
category: "data_analysis"
difficulty: "hard"
expected_tools: ["query_database", "calculate"]
tags: ["sql", "analysis", "multi-step"]
# Code Tasks
- id: "code-001"
description: "Read the main.py file and identify any potential security issues"
category: "code_analysis"
difficulty: "medium"
expected_tools: ["read_file"]
tags: ["security", "code-review"]
- id: "code-002"
description: "Find all TODO comments in the codebase and summarize them"
category: "code_analysis"
difficulty: "easy"
expected_tools: ["search_code", "read_file"]
tags: ["search", "summarization"]
# Multi-Tool Tasks
- id: "multi-001"
description: "Research competitor pricing, calculate the average, and store results"
category: "multi_tool"
difficulty: "hard"
expected_tools: ["web_search", "calculate", "write_file"]
timeout_seconds: 180
tags: ["research", "calculation", "storage"]
# Error Recovery
- id: "recovery-001"
description: "Query user data from the users table (note: table might be named 'customers')"
category: "error_recovery"
difficulty: "medium"
expected_tools: ["query_database"]
metadata:
expected_error: "table not found"
recovery_expected: true
tags: ["error-handling", "adaptation"]
Programmatic Dataset Generation
Why you can't manually create enough test cases: A good evaluation dataset needs hundreds of tasks to cover the agent's capability space. Writing them by hand is tedious and introduces bias—you'll test what you think of, not what users actually do. Programmatic generation scales test creation and finds edge cases you'd never manually write.
The three types of programmatic generation:
- Variations from templates - Take a base task ("Find population of Tokyo") and generate variants ("Find population of Paris/London/Berlin..."). Same capability tested, different surface forms. Catches agents that memorize rather than generalize.
- Adversarial generation - Deliberately create tasks designed to break your agent. Common adversarial patterns: tasks with ambiguous phrasing, missing information, conflicting requirements, or red herrings. If you only test "happy path" tasks, you'll miss failure modes.
- Domain-specific synthesis - Use domain knowledge to generate realistic tasks. For SQL agents: enumerate all SQL operations (SELECT, JOIN, GROUP BY, etc.) × different data types × different table schemas. For web search agents: categories × information types × temporal requirements.
Why LLM-generated test cases need human review: LLMs can generate test cases faster than humans, but quality varies. Common LLM generation failures: duplicate tasks (same semantic content, different wording), trivial variations (changing one number), invalid expected outputs (the LLM hallucinates), or unclear success criteria. Human-in-the-loop: LLM generates candidates → human reviews/edits → approved tests enter dataset.
The variation generation strategy: The generate_variations method creates semantically equivalent but syntactically different tasks. Example: "Find restaurants in New York" → variants: "Show me places to eat in NYC", "What are the top restaurants in Manhattan", "I'm looking for dining options in New York City". This tests robustness to query phrasing—critical for production where users don't follow templates.
Tagging generated tests: Notice the ["generated"] tag added to variations. This enables analysis: do generated tasks have different passing rates than hand-written ones? If generated tasks are systematically harder, that signals a distribution shift problem—your hand-written tests aren't representative of production.
class DatasetGenerator:
"""Generate evaluation tasks programmatically."""
def __init__(self, llm):
self.llm = llm
def generate_variations(
self,
base_task: EvalTask,
num_variations: int = 5
) -> list[EvalTask]:
"""Generate variations of a base task."""
prompt = f"""Generate {num_variations} variations of this task.
Original task: {base_task.description}
Category: {base_task.category}
Difficulty: {base_task.difficulty}
Create variations that:
1. Test the same capability
2. Have similar difficulty
3. Use different specific values/entities
4. Are clearly distinct from each other
Format each as:
TASK: [description]
---"""
response = self.llm.chat([{"role": "user", "content": prompt}])
variations = []
for i, task_text in enumerate(response.content.split("---")):
if "TASK:" in task_text:
description = task_text.split("TASK:")[1].strip()
variations.append(EvalTask(
id=f"{base_task.id}-var{i+1}",
description=description,
category=base_task.category,
difficulty=base_task.difficulty,
expected_tools=base_task.expected_tools,
tags=base_task.tags + ["generated"]
))
return variations
def generate_adversarial_tasks(
self,
base_task: EvalTask,
num_tasks: int = 3
) -> list[EvalTask]:
"""Generate adversarial versions of a task."""
prompt = f"""Generate {num_tasks} adversarial versions of this task that might trip up an AI agent.
Original task: {base_task.description}
Create versions that:
1. Have ambiguous requirements
2. Include misleading context
3. Require careful interpretation
4. Test edge cases
Format each as:
TASK: [description]
TRAP: [what makes this adversarial]
---"""
response = self.llm.chat([{"role": "user", "content": prompt}])
adversarial = []
for i, section in enumerate(response.content.split("---")):
if "TASK:" in section and "TRAP:" in section:
task_text = section.split("TASK:")[1].split("TRAP:")[0].strip()
trap_text = section.split("TRAP:")[1].strip()
adversarial.append(EvalTask(
id=f"{base_task.id}-adv{i+1}",
description=task_text,
category=base_task.category,
difficulty="hard",
expected_tools=base_task.expected_tools,
tags=base_task.tags + ["adversarial"],
metadata={"trap": trap_text}
))
return adversarial
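Usage sketch for the generator, assuming an llm client with the same chat interface used throughout and a human review step (the review() function is a placeholder for your approval workflow, not a real API).
generator = DatasetGenerator(llm)  # any client exposing .chat([...])
base = dataset.tasks[0]

candidates = generator.generate_variations(base, num_variations=5)
candidates += generator.generate_adversarial_tasks(base, num_tasks=3)

# Human-in-the-loop: review candidates before they enter the dataset
approved = [t for t in candidates if review(t)]  # review() is your approval step (hypothetical)
dataset.tasks.extend(approved)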
Trajectory Analysis
Understanding how an agent arrived at its answer is as important as the answer itself.
Trajectory Capture
from dataclasses import dataclass, field
from typing import Optional, Any
from datetime import datetime
import json
@dataclass
class TrajectoryStep:
step_number: int
timestamp: datetime
step_type: str # "reasoning", "tool_call", "tool_result", "answer"
content: str
metadata: dict = field(default_factory=dict)
@dataclass
class Trajectory:
task_id: str
task_description: str
steps: list[TrajectoryStep] = field(default_factory=list)
final_answer: Optional[str] = None
outcome: Optional[TaskOutcome] = None
total_tokens: int = 0
start_time: Optional[datetime] = None
end_time: Optional[datetime] = None
def add_reasoning(self, content: str, metadata: dict = None):
"""Add a reasoning step."""
self.steps.append(TrajectoryStep(
step_number=len(self.steps) + 1,
timestamp=datetime.now(),
step_type="reasoning",
content=content,
metadata=metadata or {}
))
def add_tool_call(self, tool_name: str, arguments: dict, metadata: dict = None):
"""Add a tool call step."""
self.steps.append(TrajectoryStep(
step_number=len(self.steps) + 1,
timestamp=datetime.now(),
step_type="tool_call",
content=json.dumps({"tool": tool_name, "arguments": arguments}),
metadata=metadata or {}
))
def add_tool_result(self, tool_name: str, result: str, success: bool, metadata: dict = None):
"""Add a tool result step."""
self.steps.append(TrajectoryStep(
step_number=len(self.steps) + 1,
timestamp=datetime.now(),
step_type="tool_result",
content=json.dumps({"tool": tool_name, "result": result[:1000], "success": success}),
metadata=metadata or {}
))
def set_answer(self, answer: str, outcome: TaskOutcome):
"""Set the final answer."""
self.final_answer = answer
self.outcome = outcome
self.end_time = datetime.now()
self.steps.append(TrajectoryStep(
step_number=len(self.steps) + 1,
timestamp=datetime.now(),
step_type="answer",
content=answer
))
def to_dict(self) -> dict:
"""Convert to dictionary for serialization."""
return {
"task_id": self.task_id,
"task_description": self.task_description,
"steps": [
{
"step_number": s.step_number,
"timestamp": s.timestamp.isoformat(),
"step_type": s.step_type,
"content": s.content,
"metadata": s.metadata
}
for s in self.steps
],
"final_answer": self.final_answer,
"outcome": self.outcome.value if self.outcome else None,
"total_tokens": self.total_tokens,
"duration_ms": (self.end_time - self.start_time).total_seconds() * 1000 if self.end_time and self.start_time else None
}
def get_tool_calls(self) -> list[dict]:
"""Extract tool calls from trajectory."""
return [
json.loads(s.content)
for s in self.steps
if s.step_type == "tool_call"
]
def get_reasoning_steps(self) -> list[str]:
"""Extract reasoning steps."""
return [
s.content
for s in self.steps
if s.step_type == "reasoning"
]
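How an agent loop might populate a Trajectory—a sketch of the capture pattern under the data structures above, not a specific framework API.
trajectory = Trajectory(
    task_id="info-001",
    task_description="Find the current population of Tokyo",
    start_time=datetime.now(),
)

trajectory.add_reasoning("I should search the web for Tokyo's current population.")
trajectory.add_tool_call("web_search", {"query": "Tokyo population 2024"})
trajectory.add_tool_result("web_search", "Tokyo: ~14 million residents", success=True)
trajectory.add_reasoning("The search result answers the question directly.")
trajectory.set_answer("Tokyo has approximately 14 million residents.", TaskOutcome.SUCCESS)

print(json.dumps(trajectory.to_dict(), indent=2)[:200])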
Trajectory Analyzer
class TrajectoryAnalyzer:
"""Analyze agent trajectories for patterns and issues."""
def __init__(self, judge_llm=None):
self.judge_llm = judge_llm
def analyze(self, trajectory: Trajectory) -> dict:
"""Comprehensive trajectory analysis."""
analysis = {
"summary": self._generate_summary(trajectory),
"patterns": self._identify_patterns(trajectory),
"issues": self._identify_issues(trajectory),
"metrics": self._compute_metrics(trajectory),
"recommendations": self._generate_recommendations(trajectory)
}
return analysis
def _generate_summary(self, trajectory: Trajectory) -> dict:
"""Generate trajectory summary."""
tool_calls = trajectory.get_tool_calls()
reasoning = trajectory.get_reasoning_steps()
return {
"total_steps": len(trajectory.steps),
"reasoning_steps": len(reasoning),
"tool_calls": len(tool_calls),
"unique_tools": list(set(tc["tool"] for tc in tool_calls)),
"outcome": trajectory.outcome.value if trajectory.outcome else "unknown",
"duration_ms": (trajectory.end_time - trajectory.start_time).total_seconds() * 1000 if trajectory.end_time and trajectory.start_time else None
}
def _identify_patterns(self, trajectory: Trajectory) -> list[dict]:
"""Identify behavioral patterns in trajectory."""
patterns = []
tool_calls = trajectory.get_tool_calls()
# Pattern: Search-then-verify
if len(tool_calls) >= 2:
for i in range(len(tool_calls) - 1):
if tool_calls[i]["tool"] == "web_search" and tool_calls[i+1]["tool"] == "web_search":
patterns.append({
"type": "verification_search",
"description": "Agent performed follow-up search to verify information"
})
# Pattern: Error retry
tool_results = [s for s in trajectory.steps if s.step_type == "tool_result"]
for i, result in enumerate(tool_results[:-1]):
result_data = json.loads(result.content)
if not result_data.get("success"):
next_result = json.loads(tool_results[i+1].content)
if result_data.get("tool") == next_result.get("tool"):
patterns.append({
"type": "error_retry",
"description": f"Agent retried {result_data['tool']} after failure"
})
# Pattern: Tool chain
if len(tool_calls) >= 3:
tool_sequence = [tc["tool"] for tc in tool_calls]
patterns.append({
"type": "tool_chain",
"description": f"Tool sequence: {' -> '.join(tool_sequence)}"
})
return patterns
def _identify_issues(self, trajectory: Trajectory) -> list[dict]:
"""Identify issues in trajectory."""
issues = []
tool_calls = trajectory.get_tool_calls()
reasoning = trajectory.get_reasoning_steps()
# Issue: Excessive steps
if len(trajectory.steps) > 20:
issues.append({
"type": "excessive_steps",
"severity": "warning",
"description": f"Trajectory has {len(trajectory.steps)} steps, which may indicate inefficiency"
})
# Issue: Repeated tool calls
tool_signatures = [json.dumps(tc, sort_keys=True) for tc in tool_calls]
repeated = len(tool_signatures) - len(set(tool_signatures))
if repeated > 0:
issues.append({
"type": "repeated_calls",
"severity": "warning",
"description": f"Found {repeated} repeated tool calls with identical arguments"
})
# Issue: No tool use
if len(tool_calls) == 0 and trajectory.outcome != TaskOutcome.SUCCESS:
issues.append({
"type": "no_tools_used",
"severity": "error",
"description": "Agent failed without attempting any tool use"
})
# Issue: Ignored errors
tool_results = [s for s in trajectory.steps if s.step_type == "tool_result"]
failed_results = [json.loads(r.content) for r in tool_results if not json.loads(r.content).get("success")]
if failed_results:
reasoning_text = " ".join(reasoning).lower()
if "error" not in reasoning_text and "fail" not in reasoning_text:
issues.append({
"type": "ignored_errors",
"severity": "error",
"description": f"Agent ignored {len(failed_results)} tool failures in reasoning"
})
return issues
def _compute_metrics(self, trajectory: Trajectory) -> dict:
"""Compute trajectory metrics."""
tool_calls = trajectory.get_tool_calls()
tool_results = [s for s in trajectory.steps if s.step_type == "tool_result"]
successful_calls = sum(
1 for r in tool_results
if json.loads(r.content).get("success")
)
return {
"steps_per_tool_call": len(trajectory.steps) / max(len(tool_calls), 1),
"tool_success_rate": successful_calls / max(len(tool_results), 1),
"reasoning_density": len(trajectory.get_reasoning_steps()) / max(len(trajectory.steps), 1),
"tokens_per_step": trajectory.total_tokens / max(len(trajectory.steps), 1)
}
def _generate_recommendations(self, trajectory: Trajectory) -> list[str]:
"""Generate improvement recommendations."""
recommendations = []
issues = self._identify_issues(trajectory)
metrics = self._compute_metrics(trajectory)
if any(i["type"] == "excessive_steps" for i in issues):
recommendations.append("Consider adding planning phase to reduce step count")
if any(i["type"] == "repeated_calls" for i in issues):
recommendations.append("Implement caching or state tracking to avoid redundant tool calls")
if metrics["tool_success_rate"] < 0.7:
recommendations.append("Improve tool error handling and retry logic")
if metrics["reasoning_density"] < 0.2:
recommendations.append("Agent may benefit from more explicit reasoning steps")
return recommendations
def compare_trajectories(
self,
trajectory_a: Trajectory,
trajectory_b: Trajectory
) -> dict:
"""Compare two trajectories for the same task."""
metrics_a = self._compute_metrics(trajectory_a)
metrics_b = self._compute_metrics(trajectory_b)
return {
"step_count": {
"a": len(trajectory_a.steps),
"b": len(trajectory_b.steps),
"winner": "a" if len(trajectory_a.steps) < len(trajectory_b.steps) else "b"
},
"tool_success_rate": {
"a": metrics_a["tool_success_rate"],
"b": metrics_b["tool_success_rate"],
"winner": "a" if metrics_a["tool_success_rate"] > metrics_b["tool_success_rate"] else "b"
},
"outcome": {
"a": trajectory_a.outcome.value if trajectory_a.outcome else "unknown",
"b": trajectory_b.outcome.value if trajectory_b.outcome else "unknown"
}
}
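Usage sketch: analyzing a captured trajectory and comparing two agent versions on the same task (trajectory_v2 is assumed to be a run from the new version).
analyzer = TrajectoryAnalyzer(judge_llm=None)

analysis = analyzer.analyze(trajectory)
print(analysis["summary"]["unique_tools"])
for issue in analysis["issues"]:
    print(f"[{issue['severity']}] {issue['description']}")

# A/B comparison of the same task under two agent versions
diff = analyzer.compare_trajectories(trajectory, trajectory_v2)
print(diff["step_count"]["winner"], diff["outcome"])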
Sandboxed Execution
Safe evaluation requires isolating agent actions from production systems.
Docker-Based Sandbox
import docker
import json
import tempfile
import os
from typing import Optional
class DockerSandbox:
"""Docker-based sandbox for safe agent evaluation."""
def __init__(
self,
image: str = "python:3.11-slim",
memory_limit: str = "512m",
cpu_limit: float = 1.0,
network_mode: str = "none",
timeout: int = 120
):
self.image = image
self.memory_limit = memory_limit
self.cpu_limit = cpu_limit
self.network_mode = network_mode
self.timeout = timeout
self.client = docker.from_env()
def run_agent(
self,
agent_code: str,
task: str,
tools_config: dict,
environment: Optional[dict] = None
) -> dict:
"""Run agent in sandbox."""
with tempfile.TemporaryDirectory() as tmpdir:
# Write agent code
agent_path = os.path.join(tmpdir, "agent.py")
with open(agent_path, "w") as f:
f.write(agent_code)
# Write task
task_path = os.path.join(tmpdir, "task.json")
with open(task_path, "w") as f:
json.dump({"task": task, "tools": tools_config}, f)
# Write runner script
runner_path = os.path.join(tmpdir, "run.py")
with open(runner_path, "w") as f:
f.write(self._generate_runner())
try:
container = self.client.containers.run(
self.image,
command=["python", "/workspace/run.py"],
volumes={tmpdir: {"bind": "/workspace", "mode": "rw"}},
mem_limit=self.memory_limit,
cpu_quota=int(self.cpu_limit * 100000),
network_mode=self.network_mode,
environment=environment or {},
detach=True,
remove=False
)
# Wait for completion with timeout
result = container.wait(timeout=self.timeout)
# Get logs
logs = container.logs().decode("utf-8")
# Read output
output_path = os.path.join(tmpdir, "output.json")
if os.path.exists(output_path):
with open(output_path) as f:
output = json.load(f)
else:
output = {"error": "No output file generated"}
return {
"exit_code": result["StatusCode"],
"logs": logs,
"output": output
}
except docker.errors.ContainerError as e:
return {"error": f"Container error: {e}"}
except Exception as e:
return {"error": f"Sandbox error: {e}"}
finally:
try:
container.remove(force=True)
except:
pass
def _generate_runner(self) -> str:
"""Generate the runner script."""
return '''
import json
import sys
import traceback
def main():
# Load task
with open("/workspace/task.json") as f:
config = json.load(f)
task = config["task"]
tools_config = config["tools"]
# Import agent
sys.path.insert(0, "/workspace")
from agent import Agent
# Create mock tools
tools = create_mock_tools(tools_config)
# Run agent
agent = Agent(tools=tools)
try:
result = agent.run(task)
output = {
"success": True,
"result": result,
"trajectory": agent.get_trajectory() if hasattr(agent, "get_trajectory") else []
}
except Exception as e:
output = {
"success": False,
"error": str(e),
"traceback": traceback.format_exc()
}
# Write output
with open("/workspace/output.json", "w") as f:
json.dump(output, f)
def create_mock_tools(config):
"""Create mock tools based on config."""
tools = {}
for name, spec in config.items():
tools[name] = MockTool(name, spec)
return tools
class MockTool:
def __init__(self, name, spec):
self.name = name
self.spec = spec
def execute(self, **kwargs):
# Return mock response or error based on spec
if "mock_response" in self.spec:
return self.spec["mock_response"]
return {"mock": True, "tool": self.name, "args": kwargs}
if __name__ == "__main__":
main()
'''
def create_test_environment(
self,
database_fixture: Optional[str] = None,
file_fixtures: Optional[dict] = None
) -> str:
"""Create a test environment with fixtures."""
env_id = f"sandbox-{os.urandom(4).hex()}"
# Create network for this environment
network = self.client.networks.create(env_id, driver="bridge")
# Optionally spin up database
if database_fixture:
db_container = self.client.containers.run(
"postgres:15",
name=f"{env_id}-db",
environment={
"POSTGRES_PASSWORD": "test",
"POSTGRES_DB": "testdb"
},
network=env_id,
detach=True
)
# Load fixture
# ... load SQL fixture ...
return env_id
def cleanup_environment(self, env_id: str):
"""Clean up test environment."""
# Remove containers
for container in self.client.containers.list(filters={"name": env_id}):
container.remove(force=True)
# Remove network
try:
network = self.client.networks.get(env_id)
network.remove()
except:
pass
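Usage sketch for the sandbox, assuming a local Docker daemon and an agent.py that defines an Agent class compatible with the runner script above (paths and config values are illustrative).
sandbox = DockerSandbox(network_mode="none", timeout=60)  # no network: tools are mocked

with open("agent.py") as f:
    agent_code = f.read()

outcome = sandbox.run_agent(
    agent_code=agent_code,
    task="Find the current population of Tokyo",
    tools_config={"web_search": {"mock_response": {"results": [{"snippet": "~14 million"}]}}},
)
print(outcome.get("exit_code"), outcome.get("output"))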
Mock Tool System
from dataclasses import dataclass
from datetime import datetime
from typing import Callable, Any, Optional
import re
@dataclass
class MockResponse:
content: Any
success: bool = True
latency_ms: float = 100
class MockToolSystem:
"""System for creating mock tools for evaluation."""
def __init__(self):
self.tools: dict[str, "MockTool"] = {}
self.call_log: list[dict] = []
def register_tool(
self,
name: str,
response_generator: Callable[[dict], MockResponse],
schema: Optional[dict] = None
):
"""Register a mock tool."""
self.tools[name] = MockTool(
name=name,
response_generator=response_generator,
schema=schema
)
def call_tool(self, name: str, arguments: dict) -> dict:
"""Call a mock tool."""
if name not in self.tools:
return {"error": f"Unknown tool: {name}"}
tool = self.tools[name]
response = tool.call(arguments)
self.call_log.append({
"tool": name,
"arguments": arguments,
"response": response,
"timestamp": datetime.now().isoformat()
})
return response
def get_call_log(self) -> list[dict]:
"""Get log of all tool calls."""
return self.call_log
def reset(self):
"""Reset call log."""
self.call_log = []
class MockTool:
"""A mock tool for evaluation."""
def __init__(
self,
name: str,
response_generator: Callable[[dict], MockResponse],
schema: Optional[dict] = None
):
self.name = name
self.response_generator = response_generator
self.schema = schema
def call(self, arguments: dict) -> dict:
"""Call the mock tool."""
response = self.response_generator(arguments)
return {
"success": response.success,
"content": response.content,
"latency_ms": response.latency_ms
}
# Pre-built mock generators
def create_web_search_mock(knowledge_base: dict) -> Callable:
"""Create a mock web search that uses a knowledge base."""
def generator(args: dict) -> MockResponse:
query = args.get("query", "").lower()
# Search knowledge base
results = []
for topic, info in knowledge_base.items():
if any(term in topic.lower() for term in query.split()):
results.append({
"title": topic,
"snippet": info[:200],
"url": f"https://example.com/{topic.replace(' ', '-')}"
})
if results:
return MockResponse(content={"results": results[:5]})
else:
return MockResponse(content={"results": [], "message": "No results found"})
return generator
def create_database_mock(tables: dict) -> Callable:
"""Create a mock database that responds to SQL queries."""
def generator(args: dict) -> MockResponse:
query = args.get("query", "").upper()
# Very simple SQL parsing
if "SELECT" in query:
# Extract table name
match = re.search(r"FROM\s+(\w+)", query, re.IGNORECASE)
if match:
table = match.group(1).lower()
if table in tables:
return MockResponse(content={"rows": tables[table][:100]})
else:
return MockResponse(
content={"error": f"Table '{table}' not found"},
success=False
)
return MockResponse(
content={"error": "Only SELECT queries supported in mock"},
success=False
)
return generator
# Usage example
mock_system = MockToolSystem()
mock_system.register_tool(
"web_search",
create_web_search_mock({
"Tokyo population": "Tokyo has a population of approximately 14 million people in the city proper.",
"Python programming": "Python is a high-level programming language known for its simplicity.",
})
)
mock_system.register_tool(
"query_database",
create_database_mock({
"users": [
{"id": 1, "name": "Alice", "email": "alice@example.com"},
{"id": 2, "name": "Bob", "email": "bob@example.com"},
],
"orders": [
{"id": 1, "user_id": 1, "total": 99.99},
{"id": 2, "user_id": 2, "total": 149.99},
]
})
)
Evaluation Pipeline
Complete Evaluation Runner
import asyncio
import json
import os
from dataclasses import dataclass
from datetime import datetime
from typing import Any, Callable, Optional
@dataclass
class EvaluationConfig:
dataset: EvalDataset
agent_factory: Callable[[], Any] # Factory to create agent instances
mock_tools: MockToolSystem
judge_llm: Any
max_concurrent: int = 5
timeout_seconds: int = 120
save_trajectories: bool = True
output_dir: str = "./eval_results"
@dataclass
class EvaluationRun:
run_id: str
config_name: str
start_time: datetime
end_time: Optional[datetime] = None
results: list[dict] = None
summary: dict = None
class EvaluationPipeline:
"""Complete evaluation pipeline for agents."""
def __init__(self, config: EvaluationConfig):
self.config = config
self.task_evaluator = TaskEvaluator(config.judge_llm)
self.tool_evaluator = ToolUseEvaluator(config.judge_llm)
self.reasoning_evaluator = ReasoningEvaluator(config.judge_llm)
self.trajectory_analyzer = TrajectoryAnalyzer(config.judge_llm)
async def run(self) -> EvaluationRun:
"""Run complete evaluation."""
run_id = f"eval-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
run = EvaluationRun(
run_id=run_id,
config_name=self.config.dataset.name,
start_time=datetime.now(),
results=[]
)
# Create output directory
os.makedirs(self.config.output_dir, exist_ok=True)
# Run evaluations with concurrency limit
semaphore = asyncio.Semaphore(self.config.max_concurrent)
async def evaluate_task(task: EvalTask):
async with semaphore:
return await self._evaluate_single_task(task)
tasks = [evaluate_task(task) for task in self.config.dataset.tasks]
results = await asyncio.gather(*tasks, return_exceptions=True)
# Process results
for task, result in zip(self.config.dataset.tasks, results):
if isinstance(result, Exception):
run.results.append({
"task_id": task.id,
"error": str(result),
"outcome": "error"
})
else:
run.results.append(result)
run.end_time = datetime.now()
run.summary = self._compute_summary(run.results)
# Save results
self._save_results(run)
return run
async def _evaluate_single_task(self, task: EvalTask) -> dict:
"""Evaluate a single task."""
# Create fresh agent instance
agent = self.config.agent_factory()
# Reset mock tools
self.config.mock_tools.reset()
# Create trajectory tracker
trajectory = Trajectory(
task_id=task.id,
task_description=task.description,
start_time=datetime.now()
)
# Run agent with timeout
try:
async with asyncio.timeout(task.timeout_seconds):
result = await self._run_agent_with_tracking(
agent, task, trajectory
)
except asyncio.TimeoutError:
trajectory.set_answer("", TaskOutcome.TIMEOUT)
result = TaskResult(
task_id=task.id,
outcome=TaskOutcome.TIMEOUT,
final_answer=None,
expected_answer=task.expected_output,
steps_taken=len(trajectory.steps),
total_tokens=trajectory.total_tokens,
latency_ms=(datetime.now() - trajectory.start_time).total_seconds() * 1000,
tool_calls=trajectory.get_tool_calls(),
errors=["Task timed out"]
)
# Evaluate result
task_eval = self.task_evaluator.evaluate(
task.description,
result,
ground_truth=task.expected_output,
success_criteria=self._parse_success_criteria(task.success_criteria)
)
# Evaluate tool use
tool_eval = self.tool_evaluator.evaluate(
task.description,
[ToolCall(
tool_name=tc["tool"],
arguments=tc["arguments"],
result="",
timestamp=0,
success=True
) for tc in trajectory.get_tool_calls()],
expected_tools=task.expected_tools
)
# Evaluate reasoning
reasoning_eval = self.reasoning_evaluator.evaluate(
task.description,
trajectory.get_reasoning_steps(),
[ToolCall(
tool_name=tc["tool"],
arguments=tc["arguments"],
result="",
timestamp=0,
success=True
) for tc in trajectory.get_tool_calls()],
result.final_answer or ""
)
# Analyze trajectory
trajectory_analysis = self.trajectory_analyzer.analyze(trajectory)
return {
"task_id": task.id,
"task_description": task.description,
"category": task.category,
"difficulty": task.difficulty,
"outcome": result.outcome.value,
"task_evaluation": task_eval,
"tool_evaluation": {
"total_calls": tool_eval.total_calls,
"successful_calls": tool_eval.successful_calls,
"selection_score": tool_eval.tool_selection_score
},
"reasoning_evaluation": {
"coherence": reasoning_eval.coherence_score,
"relevance": reasoning_eval.relevance_score,
"efficiency": reasoning_eval.efficiency_score,
"overall": reasoning_eval.overall_score
},
"trajectory_analysis": trajectory_analysis,
"trajectory": trajectory.to_dict() if self.config.save_trajectories else None
}
async def _run_agent_with_tracking(
self,
agent,
task: EvalTask,
trajectory: Trajectory
) -> TaskResult:
"""Run agent and track trajectory."""
# Hook into agent to capture trajectory
# This depends on your agent implementation
try:
final_answer = await agent.run(task.description)
outcome = TaskOutcome.SUCCESS
except Exception as e:
final_answer = str(e)
outcome = TaskOutcome.ERROR
trajectory.steps.append(TrajectoryStep(
step_number=len(trajectory.steps) + 1,
timestamp=datetime.now(),
step_type="error",
content=str(e)
))
trajectory.set_answer(final_answer, outcome)
return TaskResult(
task_id=task.id,
outcome=outcome,
final_answer=final_answer,
expected_answer=task.expected_output,
steps_taken=len(trajectory.steps),
total_tokens=trajectory.total_tokens,
latency_ms=(trajectory.end_time - trajectory.start_time).total_seconds() * 1000,
tool_calls=trajectory.get_tool_calls(),
errors=[]
)
    def _parse_success_criteria(self, criteria: Optional[str]) -> Optional[Callable]:
        """Parse a success-criteria expression into a callable check."""
        if not criteria:
            return None

        def evaluator(result: str) -> bool:
            try:
                # Evaluate the expression with a restricted namespace.
                # Note: eval() is never fully safe -- only run criteria strings you authored yourself.
                namespace = {"__builtins__": {}, "result": result, "len": len, "str": str}
                return bool(eval(criteria, namespace))
            except Exception:
                return False

        return evaluator
def _compute_summary(self, results: list[dict]) -> dict:
"""Compute summary statistics."""
total = len(results)
outcomes = [r.get("outcome", "error") for r in results]
by_category = {}
by_difficulty = {}
for r in results:
cat = r.get("category", "unknown")
diff = r.get("difficulty", "unknown")
if cat not in by_category:
by_category[cat] = {"total": 0, "success": 0}
by_category[cat]["total"] += 1
if r.get("outcome") == "success":
by_category[cat]["success"] += 1
if diff not in by_difficulty:
by_difficulty[diff] = {"total": 0, "success": 0}
by_difficulty[diff]["total"] += 1
if r.get("outcome") == "success":
by_difficulty[diff]["success"] += 1
return {
"total_tasks": total,
"success_count": outcomes.count("success"),
"failure_count": outcomes.count("failure"),
"error_count": outcomes.count("error"),
"timeout_count": outcomes.count("timeout"),
"success_rate": outcomes.count("success") / total if total > 0 else 0,
"by_category": {
k: {"success_rate": v["success"] / v["total"] if v["total"] > 0 else 0, **v}
for k, v in by_category.items()
},
"by_difficulty": {
k: {"success_rate": v["success"] / v["total"] if v["total"] > 0 else 0, **v}
for k, v in by_difficulty.items()
},
"avg_reasoning_score": sum(
r.get("reasoning_evaluation", {}).get("overall", 0)
for r in results
) / total if total > 0 else 0,
"avg_tool_selection_score": sum(
r.get("tool_evaluation", {}).get("selection_score", 0)
for r in results
) / total if total > 0 else 0
}
def _save_results(self, run: EvaluationRun):
"""Save evaluation results."""
output_path = os.path.join(
self.config.output_dir,
f"{run.run_id}.json"
)
with open(output_path, "w") as f:
json.dump({
"run_id": run.run_id,
"config_name": run.config_name,
"start_time": run.start_time.isoformat(),
"end_time": run.end_time.isoformat() if run.end_time else None,
"summary": run.summary,
"results": run.results
}, f, indent=2)
print(f"Results saved to {output_path}")
Production Monitoring
Real-Time Agent Monitoring
from dataclasses import dataclass
from typing import Callable, Optional
import time
from collections import deque
import threading
@dataclass
class AgentMetrics:
timestamp: float
task_id: str
outcome: str
latency_ms: float
tokens: int
tool_calls: int
errors: int
class AgentMonitor:
"""Real-time monitoring for production agents."""
def __init__(
self,
window_size: int = 1000,
alert_threshold_error_rate: float = 0.1,
alert_threshold_latency_ms: float = 5000
):
self.metrics: deque[AgentMetrics] = deque(maxlen=window_size)
self.alert_callbacks: list[Callable] = []
self.error_threshold = alert_threshold_error_rate
self.latency_threshold = alert_threshold_latency_ms
self._lock = threading.Lock()
def record(self, metrics: AgentMetrics):
"""Record agent metrics."""
with self._lock:
self.metrics.append(metrics)
# Check for alerts
self._check_alerts()
def on_alert(self, callback: Callable[[str, dict], None]):
"""Register alert callback."""
self.alert_callbacks.append(callback)
def _check_alerts(self):
"""Check if any alert thresholds are exceeded."""
if len(self.metrics) < 10:
return
recent = list(self.metrics)[-100:]
# Error rate alert
error_count = sum(1 for m in recent if m.outcome == "error")
error_rate = error_count / len(recent)
if error_rate > self.error_threshold:
self._trigger_alert("high_error_rate", {
"error_rate": error_rate,
"threshold": self.error_threshold,
"sample_size": len(recent)
})
# Latency alert
avg_latency = sum(m.latency_ms for m in recent) / len(recent)
if avg_latency > self.latency_threshold:
self._trigger_alert("high_latency", {
"avg_latency_ms": avg_latency,
"threshold": self.latency_threshold,
"sample_size": len(recent)
})
def _trigger_alert(self, alert_type: str, details: dict):
"""Trigger alert callbacks."""
for callback in self.alert_callbacks:
try:
callback(alert_type, details)
except Exception as e:
print(f"Alert callback error: {e}")
def get_dashboard_data(self) -> dict:
"""Get data for monitoring dashboard."""
with self._lock:
recent = list(self.metrics)
if not recent:
return {"status": "no_data"}
# Compute statistics
outcomes = [m.outcome for m in recent]
latencies = [m.latency_ms for m in recent]
tokens = [m.tokens for m in recent]
return {
"total_requests": len(recent),
"success_rate": outcomes.count("success") / len(outcomes),
"error_rate": outcomes.count("error") / len(outcomes),
"timeout_rate": outcomes.count("timeout") / len(outcomes),
"latency": {
"p50": sorted(latencies)[len(latencies) // 2],
"p90": sorted(latencies)[int(len(latencies) * 0.9)],
"p99": sorted(latencies)[int(len(latencies) * 0.99)],
"avg": sum(latencies) / len(latencies)
},
"tokens": {
"avg": sum(tokens) / len(tokens),
"total": sum(tokens)
},
"time_range": {
"start": recent[0].timestamp,
"end": recent[-1].timestamp
}
}
# Prometheus metrics integration
try:
from prometheus_client import Counter, Histogram, Gauge
AGENT_REQUESTS = Counter(
"agent_requests_total",
"Total agent requests",
["outcome"]
)
AGENT_LATENCY = Histogram(
"agent_latency_seconds",
"Agent request latency",
buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
)
AGENT_TOKENS = Counter(
"agent_tokens_total",
"Total tokens used"
)
AGENT_TOOL_CALLS = Counter(
"agent_tool_calls_total",
"Total tool calls",
["tool_name"]
)
class PrometheusAgentMonitor(AgentMonitor):
"""Agent monitor with Prometheus metrics."""
def record(self, metrics: AgentMetrics):
super().record(metrics)
# Update Prometheus metrics
AGENT_REQUESTS.labels(outcome=metrics.outcome).inc()
AGENT_LATENCY.observe(metrics.latency_ms / 1000)
AGENT_TOKENS.inc(metrics.tokens)
except ImportError:
PrometheusAgentMonitor = AgentMonitor
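A brief sketch of wiring the monitor into a production request path. The alert callback here just prints; in practice it would post to Slack, PagerDuty, or a similar channel:
import time

monitor = PrometheusAgentMonitor(window_size=1000)

def send_alert(alert_type: str, details: dict):
    # Replace with Slack, PagerDuty, etc. in production
    print(f"ALERT [{alert_type}]: {details}")

monitor.on_alert(send_alert)

# Record metrics after each agent request completes
monitor.record(AgentMetrics(
    timestamp=time.time(),
    task_id="task-123",
    outcome="success",
    latency_ms=1840.0,
    tokens=2300,
    tool_calls=3,
    errors=0,
))

print(monitor.get_dashboard_data())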
A/B Testing Framework
from dataclasses import dataclass
from datetime import datetime, timedelta
from typing import Callable, Optional
import hashlib
@dataclass
class Variant:
name: str
weight: float
agent_factory: Callable
@dataclass
class ABTestConfig:
test_id: str
variants: list[Variant]
start_time: datetime
end_time: Optional[datetime] = None
class ABTestingFramework:
"""A/B testing framework for agents."""
def __init__(self):
self.active_tests: dict[str, ABTestConfig] = {}
self.results: dict[str, list[dict]] = {}
def create_test(
self,
test_id: str,
variants: list[Variant],
duration_hours: Optional[int] = None
):
"""Create a new A/B test."""
# Normalize weights
total_weight = sum(v.weight for v in variants)
for v in variants:
v.weight /= total_weight
end_time = None
if duration_hours:
end_time = datetime.now() + timedelta(hours=duration_hours)
self.active_tests[test_id] = ABTestConfig(
test_id=test_id,
variants=variants,
start_time=datetime.now(),
end_time=end_time
)
self.results[test_id] = []
def get_variant(self, test_id: str, user_id: str) -> Variant:
"""Get variant for a user (deterministic assignment)."""
if test_id not in self.active_tests:
raise ValueError(f"Unknown test: {test_id}")
test = self.active_tests[test_id]
# Deterministic assignment based on user_id
hash_input = f"{test_id}:{user_id}"
hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
normalized = (hash_value % 10000) / 10000
cumulative = 0
for variant in test.variants:
cumulative += variant.weight
if normalized < cumulative:
return variant
return test.variants[-1] # Fallback
def record_result(
self,
test_id: str,
variant_name: str,
metrics: dict
):
"""Record a result for an A/B test."""
if test_id not in self.results:
self.results[test_id] = []
self.results[test_id].append({
"variant": variant_name,
"metrics": metrics,
"timestamp": datetime.now().isoformat()
})
def analyze_test(self, test_id: str) -> dict:
"""Analyze A/B test results."""
if test_id not in self.results:
return {"error": "No results for test"}
results = self.results[test_id]
# Group by variant
by_variant = {}
for r in results:
variant = r["variant"]
if variant not in by_variant:
by_variant[variant] = []
by_variant[variant].append(r["metrics"])
# Compute statistics for each variant
analysis = {}
for variant, metrics_list in by_variant.items():
success_rate = sum(
1 for m in metrics_list if m.get("outcome") == "success"
) / len(metrics_list)
avg_latency = sum(
m.get("latency_ms", 0) for m in metrics_list
) / len(metrics_list)
analysis[variant] = {
"sample_size": len(metrics_list),
"success_rate": success_rate,
"avg_latency_ms": avg_latency
}
# Statistical significance (simplified)
if len(by_variant) == 2:
variants = list(by_variant.keys())
n1, n2 = len(by_variant[variants[0]]), len(by_variant[variants[1]])
p1 = analysis[variants[0]]["success_rate"]
p2 = analysis[variants[1]]["success_rate"]
# Pooled proportion
p_pool = (p1 * n1 + p2 * n2) / (n1 + n2)
se = (p_pool * (1 - p_pool) * (1/n1 + 1/n2)) ** 0.5
if se > 0:
z_score = (p1 - p2) / se
                # Two-sided z-test: |z| > 1.96 corresponds to the 95% confidence level
                is_significant = abs(z_score) > 1.96
analysis["comparison"] = {
"z_score": z_score,
"is_significant": is_significant,
"winner": variants[0] if p1 > p2 else variants[1] if p2 > p1 else "tie"
}
return analysis
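A minimal sketch of routing live traffic through the A/B framework. The agent factories and the captured metrics are placeholders:
ab = ABTestingFramework()
ab.create_test(
    test_id="prompt-v2-rollout",
    variants=[
        Variant(name="control", weight=0.5, agent_factory=build_agent_v1),
        Variant(name="candidate", weight=0.5, agent_factory=build_agent_v2),
    ],
    duration_hours=72,
)

async def handle_request(user_id: str, task: str) -> str:
    variant = ab.get_variant("prompt-v2-rollout", user_id)   # deterministic per user
    agent = variant.agent_factory()
    answer = await agent.run(task)
    ab.record_result(
        "prompt-v2-rollout",
        variant.name,
        {"outcome": "success", "latency_ms": 1200},          # capture real metrics here
    )
    return answer

# Later: per-variant success rates plus a simple significance check
print(ab.analyze_test("prompt-v2-rollout"))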
CI/CD Integration
GitHub Actions for Agent Evaluation
# .github/workflows/agent-eval.yml
name: Agent Evaluation
on:
pull_request:
paths:
- 'agent/**'
- 'prompts/**'
- 'tools/**'
schedule:
- cron: '0 2 * * *' # Nightly full eval
jobs:
quick-eval:
name: Quick Evaluation (PR Check)
runs-on: ubuntu-latest
if: github.event_name == 'pull_request'
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: |
pip install -r requirements.txt
pip install pytest pytest-json-report
- name: Run quick evaluation
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python -m pytest tests/eval/quick/ \
--json-report \
--json-report-file=eval-results.json
- name: Check evaluation thresholds
run: |
python scripts/check_eval_thresholds.py eval-results.json \
--min-success-rate 0.85 \
--max-avg-latency 5000
- name: Upload evaluation report
uses: actions/upload-artifact@v4
with:
name: eval-report
path: eval-results.json
full-eval:
name: Full Evaluation (Nightly)
runs-on: ubuntu-latest
if: github.event_name == 'schedule'
steps:
- uses: actions/checkout@v4
- name: Set up Python
uses: actions/setup-python@v5
with:
python-version: '3.11'
- name: Install dependencies
run: pip install -r requirements.txt
- name: Run full evaluation
env:
OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
run: |
python scripts/run_full_eval.py \
--dataset eval/full_dataset.yaml \
--output-dir results/ \
--parallel 5
- name: Generate report
run: python scripts/generate_eval_report.py results/
- name: Upload to dashboard
run: |
python scripts/upload_to_dashboard.py results/ \
--dashboard-url ${{ secrets.DASHBOARD_URL }}
- name: Notify on regression
if: failure()
uses: slackapi/slack-github-action@v1
with:
payload: |
{
"text": "Agent evaluation regression detected",
"blocks": [
{
"type": "section",
"text": {
"type": "mrkdwn",
"text": "*Agent Evaluation Failed*\nCheck the <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow run> for details."
}
}
]
}
env:
SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}
Threshold Checker Script
# scripts/check_eval_thresholds.py
import json
import sys
import argparse
def check_thresholds(results_file: str, min_success_rate: float, max_avg_latency: float):
"""Check evaluation results against thresholds."""
with open(results_file) as f:
results = json.load(f)
# Calculate metrics
tests = results.get("tests", [])
passed = sum(1 for t in tests if t.get("outcome") == "passed")
total = len(tests)
success_rate = passed / total if total > 0 else 0
latencies = [
t.get("metadata", {}).get("latency_ms", 0)
for t in tests
]
avg_latency = sum(latencies) / len(latencies) if latencies else 0
print(f"Success rate: {success_rate:.2%} (threshold: {min_success_rate:.2%})")
print(f"Avg latency: {avg_latency:.0f}ms (threshold: {max_avg_latency:.0f}ms)")
# Check thresholds
failures = []
if success_rate < min_success_rate:
failures.append(
f"Success rate {success_rate:.2%} below threshold {min_success_rate:.2%}"
)
if avg_latency > max_avg_latency:
failures.append(
f"Avg latency {avg_latency:.0f}ms above threshold {max_avg_latency:.0f}ms"
)
if failures:
print("\nThreshold violations:")
for f in failures:
print(f" - {f}")
sys.exit(1)
print("\nAll thresholds passed!")
if __name__ == "__main__":
parser = argparse.ArgumentParser()
parser.add_argument("results_file")
parser.add_argument("--min-success-rate", type=float, default=0.85)
parser.add_argument("--max-avg-latency", type=float, default=5000)
args = parser.parse_args()
check_thresholds(args.results_file, args.min_success_rate, args.max_avg_latency)
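For reference, a quick-eval test might look like the sketch below. It assumes pytest-asyncio and pytest-json-report are installed (the json_metadata fixture attaches latency_ms to each test's report entry, which check_eval_thresholds.py reads); the agent import and filename are hypothetical:
# tests/eval/quick/test_capital_lookup.py (illustrative)
import time

import pytest

from agent import build_agent  # hypothetical factory for your agent


@pytest.mark.asyncio
async def test_capital_lookup(json_metadata):
    """Quick deterministic check, with latency recorded for the threshold script."""
    agent = build_agent()
    start = time.time()
    answer = await agent.run("What is the capital of France?")
    json_metadata["latency_ms"] = (time.time() - start) * 1000
    assert "Paris" in answer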
Regression Testing Framework
from dataclasses import dataclass
from typing import Optional
import json
from datetime import datetime
@dataclass
class RegressionTest:
task_id: str
expected_outcome: str
expected_tools: list[str]
max_steps: int
max_latency_ms: float
baseline_success_rate: float
class RegressionTestSuite:
"""Track regressions against baseline performance."""
def __init__(self, baseline_file: str):
self.baseline = self._load_baseline(baseline_file)
self.current_results = []
def _load_baseline(self, path: str) -> dict:
"""Load baseline metrics."""
with open(path) as f:
return json.load(f)
def add_result(self, task_id: str, result: dict):
"""Add a test result."""
self.current_results.append({
"task_id": task_id,
"result": result,
"timestamp": datetime.now().isoformat()
})
def check_regressions(self) -> dict:
"""Compare current results against baseline."""
regressions = []
improvements = []
for result in self.current_results:
task_id = result["task_id"]
baseline = self.baseline.get(task_id)
if not baseline:
continue
current = result["result"]
# Check success rate regression
baseline_success = baseline.get("success_rate", 0)
current_success = 1.0 if current.get("outcome") == "success" else 0.0
if current_success < baseline_success - 0.1: # 10% tolerance
regressions.append({
"task_id": task_id,
"metric": "success_rate",
"baseline": baseline_success,
"current": current_success
})
elif current_success > baseline_success + 0.1:
improvements.append({
"task_id": task_id,
"metric": "success_rate",
"baseline": baseline_success,
"current": current_success
})
# Check latency regression
baseline_latency = baseline.get("avg_latency_ms", 0)
current_latency = current.get("latency_ms", 0)
if current_latency > baseline_latency * 1.5: # 50% tolerance
regressions.append({
"task_id": task_id,
"metric": "latency_ms",
"baseline": baseline_latency,
"current": current_latency
})
return {
"regressions": regressions,
"improvements": improvements,
"total_tests": len(self.current_results),
"regression_count": len(regressions),
"improvement_count": len(improvements)
}
def update_baseline(self, output_path: str):
"""Update baseline with current results."""
new_baseline = {}
for result in self.current_results:
task_id = result["task_id"]
current = result["result"]
new_baseline[task_id] = {
"success_rate": 1.0 if current.get("outcome") == "success" else 0.0,
"avg_latency_ms": current.get("latency_ms", 0),
"updated_at": datetime.now().isoformat()
}
with open(output_path, "w") as f:
json.dump(new_baseline, f, indent=2)
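A sketch of using the regression suite in a nightly job: compare the latest results against the stored baseline and fail the build if anything regressed. The baseline path and the nightly_results variable are illustrative:
import sys

suite = RegressionTestSuite("eval/baseline.json")    # illustrative path
for r in nightly_results:                            # per-task dicts with outcome / latency_ms
    suite.add_result(r["task_id"], r)

report = suite.check_regressions()
if report["regression_count"] > 0:
    print(f"{report['regression_count']} regressions detected:")
    for reg in report["regressions"]:
        print(f"  {reg['task_id']}: {reg['metric']} {reg['baseline']} -> {reg['current']}")
    sys.exit(1)

# Promote the baseline only after a reviewed, intentional improvement:
# suite.update_baseline("eval/baseline.json")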
Human Evaluation Framework
Structured Human Review
from dataclasses import dataclass, field
from typing import Optional, Literal
from datetime import datetime
import uuid
@dataclass
class HumanEvaluation:
eval_id: str
task_id: str
evaluator_id: str
timestamp: datetime
# Core ratings (1-5 scale)
task_completion: int # Did the agent complete the task?
answer_quality: int # How good was the final answer?
reasoning_quality: int # How good was the reasoning?
efficiency: int # Was the approach efficient?
# Binary flags
factually_correct: bool
used_appropriate_tools: bool
handled_errors_well: bool
would_trust_in_production: bool
# Open feedback
strengths: list[str] = field(default_factory=list)
weaknesses: list[str] = field(default_factory=list)
suggestions: str = ""
# Metadata
time_to_evaluate_seconds: int = 0
class HumanEvaluationManager:
"""Manage human evaluation workflow."""
def __init__(self, storage_backend):
self.storage = storage_backend
self.active_reviews = {}
def create_review_batch(
self,
trajectory_ids: list[str],
evaluator_ids: list[str],
reviews_per_trajectory: int = 2
) -> str:
"""Create a batch of reviews to be completed."""
batch_id = str(uuid.uuid4())
assignments = []
for traj_id in trajectory_ids:
# Assign multiple evaluators per trajectory
assigned_evaluators = evaluator_ids[:reviews_per_trajectory]
for eval_id in assigned_evaluators:
assignments.append({
"trajectory_id": traj_id,
"evaluator_id": eval_id,
"status": "pending"
})
self.storage.create_batch(batch_id, assignments)
return batch_id
def get_next_review(self, evaluator_id: str) -> Optional[dict]:
"""Get the next trajectory for an evaluator to review."""
pending = self.storage.get_pending_reviews(evaluator_id)
if not pending:
return None
assignment = pending[0]
trajectory = self.storage.get_trajectory(assignment["trajectory_id"])
return {
"assignment_id": assignment["id"],
"trajectory": trajectory,
"task": trajectory["task"],
"steps": trajectory["steps"],
"final_answer": trajectory["final_answer"]
}
def submit_review(
self,
assignment_id: str,
evaluation: HumanEvaluation
):
"""Submit a completed review."""
self.storage.save_evaluation(assignment_id, evaluation)
self.storage.mark_complete(assignment_id)
def compute_inter_rater_reliability(self, batch_id: str) -> dict:
"""Compute agreement between evaluators."""
evaluations = self.storage.get_batch_evaluations(batch_id)
# Group by trajectory
by_trajectory = {}
        for evaluation in evaluations:
            traj_id = evaluation["trajectory_id"]
            if traj_id not in by_trajectory:
                by_trajectory[traj_id] = []
            by_trajectory[traj_id].append(evaluation)
# Calculate agreement metrics
agreements = {
"task_completion": [],
"answer_quality": [],
"factually_correct": []
}
for traj_id, evals in by_trajectory.items():
if len(evals) < 2:
continue
# Pairwise agreement
for i, eval1 in enumerate(evals):
for eval2 in evals[i+1:]:
# Task completion agreement (within 1 point)
tc_agree = abs(eval1["task_completion"] - eval2["task_completion"]) <= 1
agreements["task_completion"].append(tc_agree)
# Answer quality agreement
aq_agree = abs(eval1["answer_quality"] - eval2["answer_quality"]) <= 1
agreements["answer_quality"].append(aq_agree)
# Factual correctness agreement (exact)
fc_agree = eval1["factually_correct"] == eval2["factually_correct"]
agreements["factually_correct"].append(fc_agree)
return {
metric: sum(values) / len(values) if values else 0
for metric, values in agreements.items()
}
def generate_calibration_report(self, evaluator_id: str) -> dict:
"""Generate calibration report for an evaluator."""
        evaluator_evals = self.storage.get_evaluator_history(evaluator_id)
        all_evals = self.storage.get_all_evaluations()
        # Guard against empty histories to avoid division by zero below
        if not evaluator_evals or not all_evals:
            return {}
# Compare evaluator to population
metrics = ["task_completion", "answer_quality", "reasoning_quality"]
calibration = {}
for metric in metrics:
evaluator_avg = sum(e[metric] for e in evaluator_evals) / len(evaluator_evals)
population_avg = sum(e[metric] for e in all_evals) / len(all_evals)
calibration[metric] = {
"evaluator_avg": evaluator_avg,
"population_avg": population_avg,
"bias": evaluator_avg - population_avg,
"bias_direction": "lenient" if evaluator_avg > population_avg else "strict"
}
return calibration
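A short sketch of the review workflow around this manager, assuming a storage backend and a sample of trajectory IDs chosen elsewhere:
manager = HumanEvaluationManager(storage_backend)    # storage backend is a placeholder

# Assign two reviewers to each sampled trajectory
batch_id = manager.create_review_batch(
    trajectory_ids=sampled_trajectory_ids,           # chosen elsewhere, e.g. stratified by outcome
    evaluator_ids=["alice", "bob", "carol"],
    reviews_per_trajectory=2,
)

# Once reviews are submitted, check whether reviewers actually agree
agreement = manager.compute_inter_rater_reliability(batch_id)
if agreement["task_completion"] < 0.7:
    print("Low inter-rater agreement -- revisit the rubric before trusting the scores")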
Evaluation UI Component
# Flask routes for evaluation UI
import uuid
from datetime import datetime

from flask import Flask, render_template, request, jsonify

app = Flask(__name__)
eval_manager = HumanEvaluationManager(storage)  # storage backend instance defined elsewhere
@app.route('/evaluate')
def evaluation_page():
"""Render evaluation interface."""
evaluator_id = request.args.get('evaluator_id')
review = eval_manager.get_next_review(evaluator_id)
if not review:
return render_template('no_reviews.html')
return render_template('evaluate.html', review=review)
@app.route('/api/submit_evaluation', methods=['POST'])
def submit_evaluation():
"""Submit evaluation via API."""
data = request.json
evaluation = HumanEvaluation(
eval_id=str(uuid.uuid4()),
task_id=data['task_id'],
evaluator_id=data['evaluator_id'],
timestamp=datetime.now(),
task_completion=data['task_completion'],
answer_quality=data['answer_quality'],
reasoning_quality=data['reasoning_quality'],
efficiency=data['efficiency'],
factually_correct=data['factually_correct'],
used_appropriate_tools=data['used_appropriate_tools'],
handled_errors_well=data['handled_errors_well'],
would_trust_in_production=data['would_trust_in_production'],
strengths=data.get('strengths', []),
weaknesses=data.get('weaknesses', []),
suggestions=data.get('suggestions', ''),
time_to_evaluate_seconds=data.get('time_seconds', 0)
)
eval_manager.submit_review(data['assignment_id'], evaluation)
return jsonify({'success': True})
@app.route('/api/dashboard/metrics')
def dashboard_metrics():
"""Get metrics for evaluation dashboard."""
return jsonify({
'total_evaluations': eval_manager.storage.count_evaluations(),
'pending_reviews': eval_manager.storage.count_pending(),
'avg_task_completion': eval_manager.storage.avg_metric('task_completion'),
'avg_answer_quality': eval_manager.storage.avg_metric('answer_quality'),
'trust_rate': eval_manager.storage.percentage_true('would_trust_in_production')
})
Benchmark Suites
Standard Agent Benchmarks
class AgentBenchmarkSuite:
"""Standard benchmark suite for agent evaluation."""
BENCHMARKS = {
"information_retrieval": {
"description": "Test agent's ability to find and synthesize information",
"tasks": [
{
"id": "ir-001",
"task": "Find the current CEO of OpenAI and their background",
"expected_tools": ["web_search"],
"verification": "factual_check"
},
{
"id": "ir-002",
"task": "What are the top 3 Python web frameworks by GitHub stars?",
"expected_tools": ["web_search", "github_api"],
"verification": "list_comparison"
}
]
},
"data_analysis": {
"description": "Test agent's ability to analyze data",
"tasks": [
{
"id": "da-001",
"task": "Calculate the average and standard deviation of sales in the sales table",
"expected_tools": ["query_database"],
"verification": "numeric_check"
},
{
"id": "da-002",
"task": "Find the top 5 customers by total order value",
"expected_tools": ["query_database"],
"verification": "query_result_check"
}
]
},
"multi_step_reasoning": {
"description": "Test agent's ability to chain multiple steps",
"tasks": [
{
"id": "ms-001",
"task": "Find all users who signed up last month, get their order history, and identify who hasn't made a purchase",
"expected_tools": ["query_database"],
"min_steps": 2,
"verification": "multi_query_check"
}
]
},
"error_recovery": {
"description": "Test agent's ability to recover from errors",
"tasks": [
{
"id": "er-001",
"task": "Query the users table (note: might be named 'customers')",
"expected_behavior": "retry_with_correction",
"mock_error": {"table": "users", "error": "relation does not exist"}
}
]
}
}
def __init__(self, agent, mock_tools: dict = None):
self.agent = agent
self.mock_tools = mock_tools or {}
async def run_benchmark(self, benchmark_name: str) -> dict:
"""Run a specific benchmark suite."""
if benchmark_name not in self.BENCHMARKS:
raise ValueError(f"Unknown benchmark: {benchmark_name}")
benchmark = self.BENCHMARKS[benchmark_name]
results = []
for task in benchmark["tasks"]:
result = await self._run_task(task)
results.append(result)
return {
"benchmark": benchmark_name,
"description": benchmark["description"],
"results": results,
"summary": self._summarize_results(results)
}
async def run_all_benchmarks(self) -> dict:
"""Run all benchmark suites."""
all_results = {}
for name in self.BENCHMARKS:
all_results[name] = await self.run_benchmark(name)
return {
"benchmarks": all_results,
"overall_summary": self._overall_summary(all_results)
}
async def _run_task(self, task: dict) -> dict:
"""Run a single benchmark task."""
import time
start = time.time()
try:
# Run agent
result = await self.agent.run(task["task"])
# Verify result
verification = await self._verify_result(
task,
result,
task.get("verification")
)
return {
"task_id": task["id"],
"success": verification["passed"],
"result": result,
"verification": verification,
"latency_ms": (time.time() - start) * 1000
}
except Exception as e:
return {
"task_id": task["id"],
"success": False,
"error": str(e),
"latency_ms": (time.time() - start) * 1000
}
async def _verify_result(self, task: dict, result: dict, verification_type: str) -> dict:
"""Verify task result."""
if verification_type == "factual_check":
# Use LLM to verify factual accuracy
return await self._factual_verification(task, result)
elif verification_type == "numeric_check":
# Check numeric results
return self._numeric_verification(task, result)
elif verification_type == "list_comparison":
# Compare list results
return self._list_verification(task, result)
else:
# Default: check if task completed without error
return {"passed": result.get("outcome") == "success"}
def _summarize_results(self, results: list) -> dict:
"""Summarize benchmark results."""
passed = sum(1 for r in results if r.get("success"))
total = len(results)
latencies = [r.get("latency_ms", 0) for r in results]
return {
"passed": passed,
"total": total,
"success_rate": passed / total if total > 0 else 0,
"avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0
}
def _overall_summary(self, all_results: dict) -> dict:
"""Generate overall benchmark summary."""
total_passed = 0
total_tasks = 0
for benchmark in all_results.values():
summary = benchmark.get("summary", {})
total_passed += summary.get("passed", 0)
total_tasks += summary.get("total", 0)
return {
"total_passed": total_passed,
"total_tasks": total_tasks,
"overall_success_rate": total_passed / total_tasks if total_tasks > 0 else 0
}
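Running the suite is a one-liner once an agent instance exists; my_agent is a placeholder here:
import asyncio

suite = AgentBenchmarkSuite(agent=my_agent)          # my_agent is a placeholder
report = asyncio.run(suite.run_all_benchmarks())

print(f"Overall: {report['overall_summary']['overall_success_rate']:.1%}")
for name, bench in report["benchmarks"].items():
    s = bench["summary"]
    print(f"  {name}: {s['passed']}/{s['total']} passed, avg {s['avg_latency_ms']:.0f}ms")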
Conclusion
Agent evaluation requires a multi-dimensional approach:
- Task success is the primary metric, but it's not enough alone
- Tool use correctness reveals whether agents are reasoning properly about capabilities
- Trajectory analysis helps understand and debug agent behavior
- Sandboxed execution enables safe evaluation without production risk
- Production monitoring catches regressions and issues in real-time
- A/B testing validates improvements before full rollout
Start with a small, well-curated evaluation dataset. Add metrics and analysis capabilities incrementally. The investment in evaluation infrastructure pays dividends in agent reliability and user trust.