Skip to main content
Back to Blog

Agent Evaluation and Testing: From Development to Production

A comprehensive guide to evaluating AI agents—task success metrics, trajectory analysis, tool use correctness, sandboxing, and building robust testing pipelines for production agent systems.

11 min read
Share:

The Agent Evaluation Challenge

Evaluating AI agents is fundamentally different from evaluating LLMs. A language model produces text; an agent produces actions—and those actions have real-world consequences. Your agent might successfully generate correct SQL but fail because it queried the wrong database. It might use the right tools but in the wrong order. It might complete a task but take 50 steps when 5 would suffice.

This guide covers the complete agent evaluation stack: metrics that matter, testing frameworks, trajectory analysis, sandboxing for safe evaluation, and production monitoring.

Prerequisites:

  • Familiarity with building agentic AI systems
  • Understanding of LLM evaluation basics
  • Experience deploying ML systems

What you'll learn:

  • Agent-specific evaluation metrics beyond LLM benchmarks
  • Building evaluation datasets and test suites
  • Trajectory analysis and debugging
  • Sandboxed execution environments
  • Production monitoring and alerting
  • A/B testing for agents

Why Agent Evaluation Is Different

Traditional LLM evaluation asks: "Is this output correct?" Agent evaluation must ask:

DimensionLLM EvaluationAgent Evaluation
OutputText qualityTask completion
ProcessN/ATool selection, ordering, efficiency
SafetyContent filteringAction consequences
CostTokensTokens + API calls + compute
LatencyTime to first tokenEnd-to-end task time
StateStatelessMulti-step with memory

An agent that produces beautiful reasoning but calls the wrong API is worse than one with terse reasoning that completes the task.

Core Agent Metrics

1. Task Success Rate

The most important metric: did the agent complete the task correctly?

Evaluating this is tricky because "correct" isn't always binary. Consider an agent asked to summarize a document: there's no single correct answer, just degrees of quality. We use LLM-as-judge to evaluate semantic similarity between actual and expected answers, with a threshold (0.8) to determine success.

Why task success evaluation is harder than LLM evaluation: With LLMs, you compare generated text to reference text. With agents, you must evaluate whether a sequence of actions achieved a goal. The agent might take an unexpected path but still succeed. Or it might execute the expected plan but fail due to external factors (API down, data missing). You need to separate "agent behavior quality" from "task outcome."

The three evaluation strategies and when to use each:

  1. Exact match - Use for deterministic tasks with one correct answer. Example: "What is the capital of France?" Expected: "Paris". Actual must match exactly (after normalization). Fast, cheap, but only works for factual queries.

  2. LLM-as-judge semantic comparison - Use for generation tasks with multiple valid answers. Example: "Summarize this document in 3 sentences." The LLM judge compares semantic similarity between expected and actual summaries. Flexible, handles paraphrasing, but costs tokens and can be inconsistent.

  3. Custom callable criteria - Use for programmatic validation. Example: "Generate valid JSON with these fields." Check if output parses and contains required keys. Deterministic, fast, domain-specific.

The TaskResult data structure is the agent's report card: It captures not just success/failure but the full story: how many steps were taken (efficiency), which tools were called (strategy), errors encountered (failure modes), tokens used (cost), and latency (performance). This rich signal enables diagnosing WHY an agent succeeded or failed, not just THAT it did.

The TaskEvaluator implementation below shows production-grade evaluation:

Key design decisions:

  • Multiple outcome types beyond binary success/failure (partial success, error, timeout) - real agents fail in many ways
  • Verified success - The agent reported success, but was it actually correct? Cross-check against ground truth
  • Efficiency metrics tracked alongside correctness - A slow, expensive success is still a problem
  • LLM judge with fallback - If judge unavailable, fall back to exact match rather than failing evaluation
  • Partial completion estimation - For debugging: if agent failed, how far did it get?

The _compare_answers method implements the LLM-as-judge pattern correctly: clear evaluation criteria (semantic similarity, key facts, meaning equivalence), request just a number (not verbose explanation), handle parsing failures gracefully. The prompt is deliberately simple—complex scoring rubrics increase judge variance.

Python
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum

class TaskOutcome(Enum):
    SUCCESS = "success"
    PARTIAL_SUCCESS = "partial_success"
    FAILURE = "failure"
    ERROR = "error"
    TIMEOUT = "timeout"

@dataclass
class TaskResult:
    task_id: str
    outcome: TaskOutcome
    final_answer: Optional[str]
    expected_answer: Optional[str]
    steps_taken: int
    total_tokens: int
    latency_ms: float
    tool_calls: list[dict]
    errors: list[str]

class TaskEvaluator:
    """Evaluate task completion."""

    def __init__(self, judge_llm=None):
        self.judge_llm = judge_llm

    def evaluate(
        self,
        task: str,
        result: TaskResult,
        ground_truth: Optional[str] = None,
        success_criteria: Optional[Callable[[str], bool]] = None
    ) -> dict:
        """Evaluate a task result."""

        evaluation = {
            "task_id": result.task_id,
            "outcome": result.outcome.value,
            "metrics": {}
        }

        # Outcome-based evaluation
        if result.outcome == TaskOutcome.SUCCESS:
            if ground_truth:
                # Compare against ground truth
                match_score = self._compare_answers(
                    result.final_answer,
                    ground_truth
                )
                evaluation["metrics"]["answer_match"] = match_score
                evaluation["verified_success"] = match_score > 0.8

            if success_criteria:
                # Apply custom success criteria
                evaluation["verified_success"] = success_criteria(result.final_answer)

        elif result.outcome == TaskOutcome.PARTIAL_SUCCESS:
            # Evaluate partial completion
            evaluation["metrics"]["completion_percentage"] = self._estimate_completion(
                task, result
            )

        # Efficiency metrics
        evaluation["metrics"]["steps"] = result.steps_taken
        evaluation["metrics"]["tokens"] = result.total_tokens
        evaluation["metrics"]["latency_ms"] = result.latency_ms
        evaluation["metrics"]["tool_calls"] = len(result.tool_calls)

        return evaluation

    def _compare_answers(self, actual: str, expected: str) -> float:
        """Compare actual answer to expected using LLM judge."""
        if not self.judge_llm:
            # Fall back to exact match
            return 1.0 if actual.strip().lower() == expected.strip().lower() else 0.0

        prompt = f"""Compare these two answers and rate their semantic similarity from 0.0 to 1.0.

Expected answer: {expected}

Actual answer: {actual}

Consider:
- Do they convey the same information?
- Are key facts correct?
- Is the meaning equivalent even if wording differs?

Score (just the number):"""

        response = self.judge_llm.chat([{"role": "user", "content": prompt}])
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5

    def _estimate_completion(self, task: str, result: TaskResult) -> float:
        """Estimate partial task completion percentage."""
        if not self.judge_llm:
            return 0.5

        prompt = f"""Estimate what percentage of this task was completed.

Task: {task}

Agent's work:
- Steps taken: {result.steps_taken}
- Tool calls: {len(result.tool_calls)}
- Final output: {result.final_answer[:500] if result.final_answer else 'None'}
- Errors: {result.errors}

Completion percentage (0-100):"""

        response = self.judge_llm.chat([{"role": "user", "content": prompt}])
        try:
            return float(response.content.strip()) / 100
        except ValueError:
            return 0.5

2. Tool Use Correctness

Evaluate whether the agent used the right tools correctly. This goes beyond "did tools succeed?"—we measure:

  • Selection: Did the agent pick appropriate tools for the task?
  • Arguments: Were tool arguments valid and sensible?
  • Efficiency: Did the agent make redundant or unnecessary calls?
  • Coverage: Were all necessary tools used?

Why tool evaluation is often more informative than task success: A task can succeed despite poor tool use (lucky path), or fail despite excellent tool use (the task was impossible). Tool evaluation isolates agent behavior from task difficulty. If an agent makes 50 redundant API calls before succeeding, that's valuable signal even though task success is 100%. Conversely, if an agent uses tools perfectly but fails because the information wasn't in your database, that's not an agent problem—it's a data problem.

The taxonomy of tool failures: Not all tool failures are equal. Selection failures (choosing wrong tool) indicate reasoning problems. Argument failures (right tool, wrong inputs) indicate schema understanding problems. Redundant calls indicate state management problems—the agent forgot it already made this call. Missing calls indicate planning problems—the agent didn't realize a step was needed. Each failure type suggests different fixes.

The _detect_redundant_calls method catches a common failure mode: agents retrying the same call repeatedly (often from confusion or hallucinated errors). The time_window parameter groups calls close in time—if the same tool with same args is called within 5 seconds, it's likely a bug.

Python
@dataclass
class ToolCall:
    tool_name: str
    arguments: dict
    result: str
    timestamp: float
    success: bool

@dataclass
class ToolUseMetrics:
    total_calls: int
    successful_calls: int
    failed_calls: int
    redundant_calls: int
    missing_calls: list[str]
    incorrect_args: list[dict]
    tool_selection_score: float

class ToolUseEvaluator:
    """Evaluate tool use patterns."""

    def __init__(self, judge_llm=None):
        self.judge_llm = judge_llm

    def evaluate(
        self,
        task: str,
        tool_calls: list[ToolCall],
        expected_tools: Optional[list[str]] = None,
        tool_schemas: Optional[dict] = None
    ) -> ToolUseMetrics:
        """Evaluate tool use for a task."""

        total = len(tool_calls)
        successful = sum(1 for tc in tool_calls if tc.success)
        failed = total - successful

        # Detect redundant calls (same tool, same args, close in time)
        redundant = self._detect_redundant_calls(tool_calls)

        # Check for missing expected tools
        missing = []
        if expected_tools:
            used_tools = set(tc.tool_name for tc in tool_calls)
            missing = [t for t in expected_tools if t not in used_tools]

        # Validate arguments against schemas
        incorrect_args = []
        if tool_schemas:
            incorrect_args = self._validate_arguments(tool_calls, tool_schemas)

        # Overall tool selection score
        selection_score = self._evaluate_tool_selection(task, tool_calls)

        return ToolUseMetrics(
            total_calls=total,
            successful_calls=successful,
            failed_calls=failed,
            redundant_calls=redundant,
            missing_calls=missing,
            incorrect_args=incorrect_args,
            tool_selection_score=selection_score
        )

    def _detect_redundant_calls(
        self,
        tool_calls: list[ToolCall],
        time_window: float = 5.0
    ) -> int:
        """Detect redundant tool calls."""
        redundant = 0

        for i, call in enumerate(tool_calls[1:], 1):
            for prev_call in tool_calls[:i]:
                if (
                    call.tool_name == prev_call.tool_name and
                    call.arguments == prev_call.arguments and
                    call.timestamp - prev_call.timestamp < time_window
                ):
                    redundant += 1
                    break

        return redundant

    def _validate_arguments(
        self,
        tool_calls: list[ToolCall],
        schemas: dict
    ) -> list[dict]:
        """Validate tool arguments against schemas."""
        from jsonschema import validate, ValidationError

        incorrect = []

        for call in tool_calls:
            if call.tool_name in schemas:
                schema = schemas[call.tool_name]
                try:
                    validate(instance=call.arguments, schema=schema)
                except ValidationError as e:
                    incorrect.append({
                        "tool": call.tool_name,
                        "arguments": call.arguments,
                        "error": str(e)
                    })

        return incorrect

    def _evaluate_tool_selection(
        self,
        task: str,
        tool_calls: list[ToolCall]
    ) -> float:
        """Evaluate whether correct tools were selected for the task."""
        if not self.judge_llm or not tool_calls:
            return 0.5

        tool_sequence = " -> ".join(tc.tool_name for tc in tool_calls)

        prompt = f"""Evaluate the tool selection for this task.

Task: {task}

Tools used (in order): {tool_sequence}

Rate from 0.0 to 1.0:
- Were the right tools selected?
- Were they used in a logical order?
- Were any unnecessary tools called?
- Were any necessary tools missing?

Score (just the number):"""

        response = self.judge_llm.chat([{"role": "user", "content": prompt}])
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5

3. Reasoning Quality

Evaluate the quality of the agent's reasoning.

Why reasoning quality matters beyond outcomes: An agent that arrives at the correct answer through flawed reasoning is a liability—it will fail unpredictably on similar problems. Evaluating reasoning quality catches "right answer, wrong process" situations. This is especially important for high-stakes applications where you need to trust not just the answer, but the path that led there.

The four dimensions of reasoning quality: We evaluate coherence (does each step follow logically from the previous?), relevance (is the reasoning focused on the task or wandering?), efficiency (is this a reasonable path or needlessly convoluted?), and error recovery (when something goes wrong, does the agent adapt?). Different agents fail on different dimensions—some are coherent but inefficient, others efficient but easily derailed by errors.

Using LLM-as-judge for reasoning evaluation: Reasoning quality is inherently subjective—humans disagree about what constitutes "good" reasoning. LLM judges provide consistent (if imperfect) evaluation at scale. The key is well-designed rubrics: rather than asking "is this good reasoning?" (too vague), we ask specific questions about logical flow, contradiction detection, and goal relevance.

Python
@dataclass
class ReasoningMetrics:
    coherence_score: float
    relevance_score: float
    efficiency_score: float
    error_recovery_score: float
    overall_score: float
    issues: list[str]

class ReasoningEvaluator:
    """Evaluate agent reasoning quality."""

    def __init__(self, judge_llm):
        self.judge_llm = judge_llm

    def evaluate(
        self,
        task: str,
        reasoning_trace: list[str],
        tool_calls: list[ToolCall],
        final_answer: str
    ) -> ReasoningMetrics:
        """Evaluate reasoning quality."""

        # Evaluate different dimensions
        coherence = self._evaluate_coherence(reasoning_trace)
        relevance = self._evaluate_relevance(task, reasoning_trace)
        efficiency = self._evaluate_efficiency(reasoning_trace, tool_calls)
        recovery = self._evaluate_error_recovery(reasoning_trace, tool_calls)

        # Identify specific issues
        issues = self._identify_issues(task, reasoning_trace, tool_calls, final_answer)

        overall = (coherence + relevance + efficiency + recovery) / 4

        return ReasoningMetrics(
            coherence_score=coherence,
            relevance_score=relevance,
            efficiency_score=efficiency,
            error_recovery_score=recovery,
            overall_score=overall,
            issues=issues
        )

    def _evaluate_coherence(self, reasoning_trace: list[str]) -> float:
        """Evaluate logical coherence of reasoning."""
        if len(reasoning_trace) < 2:
            return 1.0

        trace_text = "\n".join(reasoning_trace)

        prompt = f"""Evaluate the logical coherence of this reasoning trace.

Reasoning:
{trace_text}

Consider:
- Does each step follow logically from the previous?
- Are there contradictions?
- Is the reasoning internally consistent?

Score from 0.0 (incoherent) to 1.0 (perfectly coherent):"""

        response = self.judge_llm.chat([{"role": "user", "content": prompt}])
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5

    def _evaluate_relevance(self, task: str, reasoning_trace: list[str]) -> float:
        """Evaluate relevance of reasoning to task."""
        trace_text = "\n".join(reasoning_trace)

        prompt = f"""Evaluate how relevant this reasoning is to the task.

Task: {task}

Reasoning:
{trace_text}

Consider:
- Does the reasoning address the task directly?
- Are there irrelevant tangents?
- Does it stay focused on the goal?

Score from 0.0 (irrelevant) to 1.0 (highly relevant):"""

        response = self.judge_llm.chat([{"role": "user", "content": prompt}])
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5

    def _evaluate_efficiency(
        self,
        reasoning_trace: list[str],
        tool_calls: list[ToolCall]
    ) -> float:
        """Evaluate reasoning efficiency."""
        # Heuristic: penalize excessive steps
        step_count = len(reasoning_trace)
        tool_count = len(tool_calls)

        # Baseline expectations (adjust based on your domain)
        expected_steps = 5
        expected_tools = 3

        step_efficiency = min(1.0, expected_steps / max(step_count, 1))
        tool_efficiency = min(1.0, expected_tools / max(tool_count, 1))

        return (step_efficiency + tool_efficiency) / 2

    def _evaluate_error_recovery(
        self,
        reasoning_trace: list[str],
        tool_calls: list[ToolCall]
    ) -> float:
        """Evaluate how well the agent recovered from errors."""
        failed_calls = [tc for tc in tool_calls if not tc.success]

        if not failed_calls:
            return 1.0  # No errors to recover from

        # Check if agent adapted after failures
        trace_text = "\n".join(reasoning_trace)

        prompt = f"""Evaluate how well the agent recovered from errors.

Failed tool calls: {len(failed_calls)}
Reasoning trace:
{trace_text}

Consider:
- Did the agent acknowledge errors?
- Did it try alternative approaches?
- Did it eventually succeed despite failures?

Score from 0.0 (no recovery) to 1.0 (excellent recovery):"""

        response = self.judge_llm.chat([{"role": "user", "content": prompt}])
        try:
            return float(response.content.strip())
        except ValueError:
            return 0.5

    def _identify_issues(
        self,
        task: str,
        reasoning_trace: list[str],
        tool_calls: list[ToolCall],
        final_answer: str
    ) -> list[str]:
        """Identify specific issues in reasoning."""
        issues = []

        # Check for common issues
        trace_text = " ".join(reasoning_trace).lower()

        # Hallucination indicators
        if "i don't have" in trace_text or "i cannot" in trace_text:
            if any(tc.success for tc in tool_calls):
                issues.append("Potential hallucination: claimed inability despite successful tool use")

        # Loop indicators
        if self._detect_reasoning_loop(reasoning_trace):
            issues.append("Reasoning loop detected")

        # Off-topic indicators
        if self._detect_off_topic(task, reasoning_trace):
            issues.append("Off-topic reasoning detected")

        # Ignored error indicators
        failed_calls = [tc for tc in tool_calls if not tc.success]
        if failed_calls and "error" not in trace_text:
            issues.append("Tool errors may have been ignored")

        return issues

    def _detect_reasoning_loop(self, reasoning_trace: list[str], window: int = 3) -> bool:
        """Detect if reasoning is stuck in a loop."""
        if len(reasoning_trace) < window * 2:
            return False

        # Check for repeated patterns
        for i in range(len(reasoning_trace) - window):
            pattern = reasoning_trace[i:i+window]
            for j in range(i + window, len(reasoning_trace) - window + 1):
                if reasoning_trace[j:j+window] == pattern:
                    return True

        return False

    def _detect_off_topic(self, task: str, reasoning_trace: list[str]) -> bool:
        """Detect if reasoning went off-topic."""
        # Simple heuristic: check if key task terms appear in reasoning
        task_terms = set(task.lower().split())
        reasoning_text = " ".join(reasoning_trace).lower()

        common_terms = {"the", "a", "an", "is", "are", "to", "for", "and", "or"}
        task_terms -= common_terms

        if not task_terms:
            return False

        mentioned = sum(1 for term in task_terms if term in reasoning_text)
        return mentioned / len(task_terms) < 0.3

4. Cost Efficiency

Track and evaluate resource usage:

Why cost metrics are critical for production agents: Agents can be expensive. A naive agent might retry failed operations indefinitely, rack up thousands of unnecessary tool calls, or use GPT-4 when GPT-4o-mini would suffice. Without cost monitoring, you'll get surprise bills. With it, you can identify inefficient behaviors and optimize.

The hidden costs beyond token usage: Tokens are obvious, but agents incur costs you might miss: (1) API calls to tools—each database query, web search, or external API has a cost, (2) Compute time—long-running agents tie up resources and hurt user experience, (3) Opportunity cost—slow agents mean fewer requests served per dollar of infrastructure.

Efficiency metrics require a baseline: Saying "this agent used 10K tokens" is meaningless without context. Is that good? The CostEvaluator compares against a baseline (previous version, competitor, or theoretical minimum) to compute efficiency scores. If your new agent uses 2x the tokens for the same success rate, that's a red flag—investigate before deploying.

The token breakdown insight: Not all tokens are equal. Input tokens (prompts, context) are typically 3-10x cheaper than output tokens (generation). An agent that uses 5K input + 1K output costs less than one using 2K input + 2K output, despite using more total tokens. Track them separately to optimize correctly.

Model pricing tiers as a first-class concern: The PRICING dictionary in the code reflects real-world costs as of January 2025. Notice the massive range: GPT-4o-mini is 16x cheaper than GPT-4o for input, 16x for output. For many tasks, the cheaper model works fine. Cost evaluation should drive model selection: measure success rate by model, choose the cheapest model that meets your quality threshold.

The cost-per-task metric is your north star: Total spend is less actionable than per-task costs. "500/month"doesnttellyouifyoureefficient."500/month" doesn't tell you if you're efficient. "0.15 per customer support query" does—you can compare to human support costs ($5-15 per query), see trends over time, and set cost budgets per task type.

Python
@dataclass
class CostMetrics:
    total_tokens: int
    input_tokens: int
    output_tokens: int
    tool_call_count: int
    api_calls: int
    compute_time_ms: float
    estimated_cost_usd: float
    cost_per_task: float
    efficiency_score: float

class CostEvaluator:
    """Evaluate agent cost efficiency."""

    # Pricing per 1K tokens (example rates)
    PRICING = {
        "gpt-4o": {"input": 0.0025, "output": 0.01},
        "gpt-4o-mini": {"input": 0.00015, "output": 0.0006},
        "claude-3-5-sonnet": {"input": 0.003, "output": 0.015},
        "claude-3-5-haiku": {"input": 0.0008, "output": 0.004},
    }

    def __init__(self, model: str = "gpt-4o"):
        self.model = model
        self.pricing = self.PRICING.get(model, {"input": 0.01, "output": 0.03})

    def evaluate(
        self,
        task_results: list[TaskResult],
        baseline_metrics: Optional[dict] = None
    ) -> CostMetrics:
        """Evaluate cost efficiency across tasks."""

        total_input = sum(r.total_tokens * 0.3 for r in task_results)  # Estimate
        total_output = sum(r.total_tokens * 0.7 for r in task_results)
        total_tokens = sum(r.total_tokens for r in task_results)
        total_tool_calls = sum(len(r.tool_calls) for r in task_results)
        total_time = sum(r.latency_ms for r in task_results)

        estimated_cost = (
            (total_input / 1000) * self.pricing["input"] +
            (total_output / 1000) * self.pricing["output"]
        )

        cost_per_task = estimated_cost / len(task_results) if task_results else 0

        # Calculate efficiency score
        efficiency = self._calculate_efficiency(
            task_results, baseline_metrics
        )

        return CostMetrics(
            total_tokens=int(total_tokens),
            input_tokens=int(total_input),
            output_tokens=int(total_output),
            tool_call_count=total_tool_calls,
            api_calls=len(task_results),
            compute_time_ms=total_time,
            estimated_cost_usd=estimated_cost,
            cost_per_task=cost_per_task,
            efficiency_score=efficiency
        )

    def _calculate_efficiency(
        self,
        results: list[TaskResult],
        baseline: Optional[dict]
    ) -> float:
        """Calculate efficiency relative to baseline."""
        if not baseline:
            return 0.5

        # Compare to baseline metrics
        avg_tokens = sum(r.total_tokens for r in results) / len(results)
        avg_steps = sum(r.steps_taken for r in results) / len(results)

        baseline_tokens = baseline.get("avg_tokens", avg_tokens)
        baseline_steps = baseline.get("avg_steps", avg_steps)

        token_efficiency = min(1.0, baseline_tokens / max(avg_tokens, 1))
        step_efficiency = min(1.0, baseline_steps / max(avg_steps, 1))

        return (token_efficiency + step_efficiency) / 2

Building Evaluation Datasets

Task Dataset Structure

Why evaluation datasets are the foundation of agent quality: You can't improve what you don't measure, and you can't measure without representative test cases. A good evaluation dataset is your agent's report card—it tells you what works, what breaks, and where to focus development effort. Without one, you're flying blind, deploying changes based on vibes instead of data.

The challenge of creating agent evaluation datasets: Unlike LLM benchmarks (where you just need question-answer pairs), agent evaluation requires specifying the task, the expected outcome, acceptable tool usage patterns, and success criteria that may be programmatic (output must be valid JSON) or semantic (answer must address the key points). You're not just evaluating text quality—you're evaluating task completion behavior.

Dataset structure principles:

  1. Rich metadata - Each task needs more than just description + expected output. You need category (for filtering), difficulty (for stratified evaluation), expected tools (to catch tool selection bugs), timeout (to catch infinite loops), and tags (for slicing results).

  2. Versioned and reproducible - Include version numbers. As your agent improves, earlier benchmarks become easier. Track version-over-version progress. Lock random seeds for sampling to ensure reproducibility.

  3. Hierarchical organization - Organize by category and difficulty. This enables targeted evaluation: "How does my agent perform on data analysis vs code generation?" or "What's the success rate on hard tasks?"

  4. Multiple success criteria types - Some tasks have exact answers (factual queries), some have semantic answers (summaries), some have programmatic validation (code that passes tests). Support all three via expected_output (for exact/semantic) and success_criteria (for programmatic).

The YAML format for agent test suites: YAML is human-editable and version-control friendly. Engineers can write tests, domain experts can review them, and you can diff changes easily. The structure maps directly to the Python dataclasses, making loading trivial.

Why filtering and sampling matter: You won't run your full benchmark on every change. For rapid iteration, you'll sample (e.g., 20 random tasks for quick checks). For debugging specific issues, you'll filter by category ("show me all failed database query tasks"). The dataset API must support these workflows efficiently.

Example YAML structure for reference:

YAML
name: "agent-eval-v1"
version: "1.0"
description: "Comprehensive agent evaluation dataset"
tasks:
  - id: "sql-001"
    description: "Generate SQL query for customer count by region"
    category: "database"
    difficulty: "easy"
    expected_output: "SELECT region, COUNT(*) FROM customers GROUP BY region"
    expected_tools: ["sql_generator", "sql_validator"]
    timeout_seconds: 30
    tags: ["sql", "aggregation"]
Python
from dataclasses import dataclass, field
from typing import Optional, Any
import json
import yaml

@dataclass
class EvalTask:
    id: str
    description: str
    category: str
    difficulty: str  # easy, medium, hard
    expected_output: Optional[str] = None
    expected_tools: list[str] = field(default_factory=list)
    success_criteria: Optional[str] = None  # Python expression
    timeout_seconds: int = 120
    tags: list[str] = field(default_factory=list)
    metadata: dict = field(default_factory=dict)

@dataclass
class EvalDataset:
    name: str
    version: str
    tasks: list[EvalTask]
    categories: list[str]
    description: str = ""

    def filter_by_category(self, category: str) -> "EvalDataset":
        """Filter tasks by category."""
        filtered = [t for t in self.tasks if t.category == category]
        return EvalDataset(
            name=f"{self.name}:{category}",
            version=self.version,
            tasks=filtered,
            categories=[category],
            description=self.description
        )

    def filter_by_difficulty(self, difficulty: str) -> "EvalDataset":
        """Filter tasks by difficulty."""
        filtered = [t for t in self.tasks if t.difficulty == difficulty]
        return EvalDataset(
            name=f"{self.name}:{difficulty}",
            version=self.version,
            tasks=filtered,
            categories=self.categories,
            description=self.description
        )

    def sample(self, n: int, seed: int = 42) -> "EvalDataset":
        """Sample n tasks randomly."""
        import random
        random.seed(seed)
        sampled = random.sample(self.tasks, min(n, len(self.tasks)))
        return EvalDataset(
            name=f"{self.name}:sample_{n}",
            version=self.version,
            tasks=sampled,
            categories=self.categories,
            description=self.description
        )

    @classmethod
    def from_yaml(cls, path: str) -> "EvalDataset":
        """Load dataset from YAML file."""
        with open(path) as f:
            data = yaml.safe_load(f)

        tasks = [EvalTask(**t) for t in data.get("tasks", [])]
        return cls(
            name=data.get("name", "unnamed"),
            version=data.get("version", "1.0"),
            tasks=tasks,
            categories=list(set(t.category for t in tasks)),
            description=data.get("description", "")
        )

    def to_yaml(self, path: str):
        """Save dataset to YAML file."""
        data = {
            "name": self.name,
            "version": self.version,
            "description": self.description,
            "tasks": [
                {
                    "id": t.id,
                    "description": t.description,
                    "category": t.category,
                    "difficulty": t.difficulty,
                    "expected_output": t.expected_output,
                    "expected_tools": t.expected_tools,
                    "success_criteria": t.success_criteria,
                    "timeout_seconds": t.timeout_seconds,
                    "tags": t.tags,
                    "metadata": t.metadata
                }
                for t in self.tasks
            ]
        }
        with open(path, "w") as f:
            yaml.dump(data, f, default_flow_style=False)

Example Dataset

YAML
# eval_dataset.yaml
name: "agent-eval-v1"
version: "1.0.0"
description: "Comprehensive agent evaluation dataset"

tasks:
  # Information Retrieval
  - id: "info-001"
    description: "Find the current population of Tokyo"
    category: "information_retrieval"
    difficulty: "easy"
    expected_output: "approximately 14 million"
    expected_tools: ["web_search"]
    tags: ["search", "factual"]

  - id: "info-002"
    description: "Find the CEOs of the top 3 tech companies by market cap and their tenure"
    category: "information_retrieval"
    difficulty: "medium"
    expected_tools: ["web_search"]
    tags: ["search", "multi-step", "synthesis"]

  # Data Analysis
  - id: "data-001"
    description: "Query the users table and find the top 5 users by order count"
    category: "data_analysis"
    difficulty: "easy"
    expected_tools: ["query_database"]
    success_criteria: "len(result.get('users', [])) == 5"
    tags: ["sql", "aggregation"]

  - id: "data-002"
    description: "Analyze user growth over the past 12 months and identify seasonal patterns"
    category: "data_analysis"
    difficulty: "hard"
    expected_tools: ["query_database", "calculate"]
    tags: ["sql", "analysis", "multi-step"]

  # Code Tasks
  - id: "code-001"
    description: "Read the main.py file and identify any potential security issues"
    category: "code_analysis"
    difficulty: "medium"
    expected_tools: ["read_file"]
    tags: ["security", "code-review"]

  - id: "code-002"
    description: "Find all TODO comments in the codebase and summarize them"
    category: "code_analysis"
    difficulty: "easy"
    expected_tools: ["search_code", "read_file"]
    tags: ["search", "summarization"]

  # Multi-Tool Tasks
  - id: "multi-001"
    description: "Research competitor pricing, calculate the average, and store results"
    category: "multi_tool"
    difficulty: "hard"
    expected_tools: ["web_search", "calculate", "write_file"]
    timeout_seconds: 180
    tags: ["research", "calculation", "storage"]

  # Error Recovery
  - id: "recovery-001"
    description: "Query user data from the users table (note: table might be named 'customers')"
    category: "error_recovery"
    difficulty: "medium"
    expected_tools: ["query_database"]
    metadata:
      expected_error: "table not found"
      recovery_expected: true
    tags: ["error-handling", "adaptation"]

Programmatic Dataset Generation

Why you can't manually create enough test cases: A good evaluation dataset needs hundreds of tasks to cover the agent's capability space. Writing them by hand is tedious and introduces bias—you'll test what you think of, not what users actually do. Programmatic generation scales test creation and finds edge cases you'd never manually write.

The three types of programmatic generation:

  1. Variations from templates - Take a base task ("Find population of Tokyo") and generate variants ("Find population of Paris/London/Berlin..."). Same capability tested, different surface forms. Catches agents that memorize rather than generalize.

  2. Adversarial generation - Deliberately create tasks designed to break your agent. Common adversarial patterns: tasks with ambiguous phrasing, missing information, conflicting requirements, or red herrings. If you only test "happy path" tasks, you'll miss failure modes.

  3. Domain-specific synthesis - Use domain knowledge to generate realistic tasks. For SQL agents: enumerate all SQL operations (SELECT, JOIN, GROUP BY, etc.) × different data types × different table schemas. For web search agents: categories × information types × temporal requirements.

Why LLM-generated test cases need human review: LLMs can generate test cases faster than humans, but quality varies. Common LLM generation failures: duplicate tasks (same semantic content, different wording), trivial variations (changing one number), invalid expected outputs (the LLM hallucinates), or unclear success criteria. Human-in-the-loop: LLM generates candidates → human reviews/edits → approved tests enter dataset.

The variation generation strategy: The generate_variations method creates semantically equivalent but syntactically different tasks. Example: "Find restaurants in New York" → variants: "Show me places to eat in NYC", "What are the top restaurants in Manhattan", "I'm looking for dining options in New York City". This tests robustness to query phrasing—critical for production where users don't follow templates.

Tagging generated tests: Notice the ["generated"] tag added to variations. This enables analysis: do generated tasks have different passing rates than hand-written ones? If generated tasks are systematically harder, that signals a distribution shift problem—your hand-written tests aren't representative of production.

Python
class DatasetGenerator:
    """Generate evaluation tasks programmatically."""

    def __init__(self, llm):
        self.llm = llm

    def generate_variations(
        self,
        base_task: EvalTask,
        num_variations: int = 5
    ) -> list[EvalTask]:
        """Generate variations of a base task."""
        prompt = f"""Generate {num_variations} variations of this task.

Original task: {base_task.description}
Category: {base_task.category}
Difficulty: {base_task.difficulty}

Create variations that:
1. Test the same capability
2. Have similar difficulty
3. Use different specific values/entities
4. Are clearly distinct from each other

Format each as:
TASK: [description]
---"""

        response = self.llm.chat([{"role": "user", "content": prompt}])

        variations = []
        for i, task_text in enumerate(response.content.split("---")):
            if "TASK:" in task_text:
                description = task_text.split("TASK:")[1].strip()
                variations.append(EvalTask(
                    id=f"{base_task.id}-var{i+1}",
                    description=description,
                    category=base_task.category,
                    difficulty=base_task.difficulty,
                    expected_tools=base_task.expected_tools,
                    tags=base_task.tags + ["generated"]
                ))

        return variations

    def generate_adversarial_tasks(
        self,
        base_task: EvalTask,
        num_tasks: int = 3
    ) -> list[EvalTask]:
        """Generate adversarial versions of a task."""
        prompt = f"""Generate {num_tasks} adversarial versions of this task that might trip up an AI agent.

Original task: {base_task.description}

Create versions that:
1. Have ambiguous requirements
2. Include misleading context
3. Require careful interpretation
4. Test edge cases

Format each as:
TASK: [description]
TRAP: [what makes this adversarial]
---"""

        response = self.llm.chat([{"role": "user", "content": prompt}])

        adversarial = []
        for i, section in enumerate(response.content.split("---")):
            if "TASK:" in section and "TRAP:" in section:
                task_text = section.split("TASK:")[1].split("TRAP:")[0].strip()
                trap_text = section.split("TRAP:")[1].strip()

                adversarial.append(EvalTask(
                    id=f"{base_task.id}-adv{i+1}",
                    description=task_text,
                    category=base_task.category,
                    difficulty="hard",
                    expected_tools=base_task.expected_tools,
                    tags=base_task.tags + ["adversarial"],
                    metadata={"trap": trap_text}
                ))

        return adversarial

Trajectory Analysis

Understanding how an agent arrived at its answer is as important as the answer itself.

Trajectory Capture

Python
from dataclasses import dataclass, field
from typing import Optional, Any
from datetime import datetime
import json

@dataclass
class TrajectoryStep:
    step_number: int
    timestamp: datetime
    step_type: str  # "reasoning", "tool_call", "tool_result", "answer"
    content: str
    metadata: dict = field(default_factory=dict)

@dataclass
class Trajectory:
    task_id: str
    task_description: str
    steps: list[TrajectoryStep] = field(default_factory=list)
    final_answer: Optional[str] = None
    outcome: Optional[TaskOutcome] = None
    total_tokens: int = 0
    start_time: Optional[datetime] = None
    end_time: Optional[datetime] = None

    def add_reasoning(self, content: str, metadata: dict = None):
        """Add a reasoning step."""
        self.steps.append(TrajectoryStep(
            step_number=len(self.steps) + 1,
            timestamp=datetime.now(),
            step_type="reasoning",
            content=content,
            metadata=metadata or {}
        ))

    def add_tool_call(self, tool_name: str, arguments: dict, metadata: dict = None):
        """Add a tool call step."""
        self.steps.append(TrajectoryStep(
            step_number=len(self.steps) + 1,
            timestamp=datetime.now(),
            step_type="tool_call",
            content=json.dumps({"tool": tool_name, "arguments": arguments}),
            metadata=metadata or {}
        ))

    def add_tool_result(self, tool_name: str, result: str, success: bool, metadata: dict = None):
        """Add a tool result step."""
        self.steps.append(TrajectoryStep(
            step_number=len(self.steps) + 1,
            timestamp=datetime.now(),
            step_type="tool_result",
            content=json.dumps({"tool": tool_name, "result": result[:1000], "success": success}),
            metadata=metadata or {}
        ))

    def set_answer(self, answer: str, outcome: TaskOutcome):
        """Set the final answer."""
        self.final_answer = answer
        self.outcome = outcome
        self.end_time = datetime.now()
        self.steps.append(TrajectoryStep(
            step_number=len(self.steps) + 1,
            timestamp=datetime.now(),
            step_type="answer",
            content=answer
        ))

    def to_dict(self) -> dict:
        """Convert to dictionary for serialization."""
        return {
            "task_id": self.task_id,
            "task_description": self.task_description,
            "steps": [
                {
                    "step_number": s.step_number,
                    "timestamp": s.timestamp.isoformat(),
                    "step_type": s.step_type,
                    "content": s.content,
                    "metadata": s.metadata
                }
                for s in self.steps
            ],
            "final_answer": self.final_answer,
            "outcome": self.outcome.value if self.outcome else None,
            "total_tokens": self.total_tokens,
            "duration_ms": (self.end_time - self.start_time).total_seconds() * 1000 if self.end_time and self.start_time else None
        }

    def get_tool_calls(self) -> list[dict]:
        """Extract tool calls from trajectory."""
        return [
            json.loads(s.content)
            for s in self.steps
            if s.step_type == "tool_call"
        ]

    def get_reasoning_steps(self) -> list[str]:
        """Extract reasoning steps."""
        return [
            s.content
            for s in self.steps
            if s.step_type == "reasoning"
        ]

Trajectory Analyzer

Python
class TrajectoryAnalyzer:
    """Analyze agent trajectories for patterns and issues."""

    def __init__(self, judge_llm=None):
        self.judge_llm = judge_llm

    def analyze(self, trajectory: Trajectory) -> dict:
        """Comprehensive trajectory analysis."""
        analysis = {
            "summary": self._generate_summary(trajectory),
            "patterns": self._identify_patterns(trajectory),
            "issues": self._identify_issues(trajectory),
            "metrics": self._compute_metrics(trajectory),
            "recommendations": self._generate_recommendations(trajectory)
        }

        return analysis

    def _generate_summary(self, trajectory: Trajectory) -> dict:
        """Generate trajectory summary."""
        tool_calls = trajectory.get_tool_calls()
        reasoning = trajectory.get_reasoning_steps()

        return {
            "total_steps": len(trajectory.steps),
            "reasoning_steps": len(reasoning),
            "tool_calls": len(tool_calls),
            "unique_tools": list(set(tc["tool"] for tc in tool_calls)),
            "outcome": trajectory.outcome.value if trajectory.outcome else "unknown",
            "duration_ms": (trajectory.end_time - trajectory.start_time).total_seconds() * 1000 if trajectory.end_time and trajectory.start_time else None
        }

    def _identify_patterns(self, trajectory: Trajectory) -> list[dict]:
        """Identify behavioral patterns in trajectory."""
        patterns = []
        tool_calls = trajectory.get_tool_calls()

        # Pattern: Search-then-verify
        if len(tool_calls) >= 2:
            for i in range(len(tool_calls) - 1):
                if tool_calls[i]["tool"] == "web_search" and tool_calls[i+1]["tool"] == "web_search":
                    patterns.append({
                        "type": "verification_search",
                        "description": "Agent performed follow-up search to verify information"
                    })

        # Pattern: Error retry
        tool_results = [s for s in trajectory.steps if s.step_type == "tool_result"]
        for i, result in enumerate(tool_results[:-1]):
            result_data = json.loads(result.content)
            if not result_data.get("success"):
                next_result = json.loads(tool_results[i+1].content)
                if result_data.get("tool") == next_result.get("tool"):
                    patterns.append({
                        "type": "error_retry",
                        "description": f"Agent retried {result_data['tool']} after failure"
                    })

        # Pattern: Tool chain
        if len(tool_calls) >= 3:
            tool_sequence = [tc["tool"] for tc in tool_calls]
            patterns.append({
                "type": "tool_chain",
                "description": f"Tool sequence: {' -> '.join(tool_sequence)}"
            })

        return patterns

    def _identify_issues(self, trajectory: Trajectory) -> list[dict]:
        """Identify issues in trajectory."""
        issues = []
        tool_calls = trajectory.get_tool_calls()
        reasoning = trajectory.get_reasoning_steps()

        # Issue: Excessive steps
        if len(trajectory.steps) > 20:
            issues.append({
                "type": "excessive_steps",
                "severity": "warning",
                "description": f"Trajectory has {len(trajectory.steps)} steps, which may indicate inefficiency"
            })

        # Issue: Repeated tool calls
        tool_signatures = [json.dumps(tc, sort_keys=True) for tc in tool_calls]
        repeated = len(tool_signatures) - len(set(tool_signatures))
        if repeated > 0:
            issues.append({
                "type": "repeated_calls",
                "severity": "warning",
                "description": f"Found {repeated} repeated tool calls with identical arguments"
            })

        # Issue: No tool use
        if len(tool_calls) == 0 and trajectory.outcome != TaskOutcome.SUCCESS:
            issues.append({
                "type": "no_tools_used",
                "severity": "error",
                "description": "Agent failed without attempting any tool use"
            })

        # Issue: Ignored errors
        tool_results = [s for s in trajectory.steps if s.step_type == "tool_result"]
        failed_results = [json.loads(r.content) for r in tool_results if not json.loads(r.content).get("success")]
        if failed_results:
            reasoning_text = " ".join(reasoning).lower()
            if "error" not in reasoning_text and "fail" not in reasoning_text:
                issues.append({
                    "type": "ignored_errors",
                    "severity": "error",
                    "description": f"Agent ignored {len(failed_results)} tool failures in reasoning"
                })

        return issues

    def _compute_metrics(self, trajectory: Trajectory) -> dict:
        """Compute trajectory metrics."""
        tool_calls = trajectory.get_tool_calls()
        tool_results = [s for s in trajectory.steps if s.step_type == "tool_result"]

        successful_calls = sum(
            1 for r in tool_results
            if json.loads(r.content).get("success")
        )

        return {
            "steps_per_tool_call": len(trajectory.steps) / max(len(tool_calls), 1),
            "tool_success_rate": successful_calls / max(len(tool_results), 1),
            "reasoning_density": len(trajectory.get_reasoning_steps()) / max(len(trajectory.steps), 1),
            "tokens_per_step": trajectory.total_tokens / max(len(trajectory.steps), 1)
        }

    def _generate_recommendations(self, trajectory: Trajectory) -> list[str]:
        """Generate improvement recommendations."""
        recommendations = []
        issues = self._identify_issues(trajectory)
        metrics = self._compute_metrics(trajectory)

        if any(i["type"] == "excessive_steps" for i in issues):
            recommendations.append("Consider adding planning phase to reduce step count")

        if any(i["type"] == "repeated_calls" for i in issues):
            recommendations.append("Implement caching or state tracking to avoid redundant tool calls")

        if metrics["tool_success_rate"] < 0.7:
            recommendations.append("Improve tool error handling and retry logic")

        if metrics["reasoning_density"] < 0.2:
            recommendations.append("Agent may benefit from more explicit reasoning steps")

        return recommendations

    def compare_trajectories(
        self,
        trajectory_a: Trajectory,
        trajectory_b: Trajectory
    ) -> dict:
        """Compare two trajectories for the same task."""
        metrics_a = self._compute_metrics(trajectory_a)
        metrics_b = self._compute_metrics(trajectory_b)

        return {
            "step_count": {
                "a": len(trajectory_a.steps),
                "b": len(trajectory_b.steps),
                "winner": "a" if len(trajectory_a.steps) < len(trajectory_b.steps) else "b"
            },
            "tool_success_rate": {
                "a": metrics_a["tool_success_rate"],
                "b": metrics_b["tool_success_rate"],
                "winner": "a" if metrics_a["tool_success_rate"] > metrics_b["tool_success_rate"] else "b"
            },
            "outcome": {
                "a": trajectory_a.outcome.value if trajectory_a.outcome else "unknown",
                "b": trajectory_b.outcome.value if trajectory_b.outcome else "unknown"
            }
        }

Sandboxed Execution

Safe evaluation requires isolating agent actions from production systems.

Docker-Based Sandbox

Python
import docker
import tempfile
import os
from typing import Optional

class DockerSandbox:
    """Docker-based sandbox for safe agent evaluation."""

    def __init__(
        self,
        image: str = "python:3.11-slim",
        memory_limit: str = "512m",
        cpu_limit: float = 1.0,
        network_mode: str = "none",
        timeout: int = 120
    ):
        self.image = image
        self.memory_limit = memory_limit
        self.cpu_limit = cpu_limit
        self.network_mode = network_mode
        self.timeout = timeout
        self.client = docker.from_env()

    def run_agent(
        self,
        agent_code: str,
        task: str,
        tools_config: dict,
        environment: Optional[dict] = None
    ) -> dict:
        """Run agent in sandbox."""
        with tempfile.TemporaryDirectory() as tmpdir:
            # Write agent code
            agent_path = os.path.join(tmpdir, "agent.py")
            with open(agent_path, "w") as f:
                f.write(agent_code)

            # Write task
            task_path = os.path.join(tmpdir, "task.json")
            with open(task_path, "w") as f:
                json.dump({"task": task, "tools": tools_config}, f)

            # Write runner script
            runner_path = os.path.join(tmpdir, "run.py")
            with open(runner_path, "w") as f:
                f.write(self._generate_runner())

            try:
                container = self.client.containers.run(
                    self.image,
                    command=["python", "/workspace/run.py"],
                    volumes={tmpdir: {"bind": "/workspace", "mode": "rw"}},
                    mem_limit=self.memory_limit,
                    cpu_quota=int(self.cpu_limit * 100000),
                    network_mode=self.network_mode,
                    environment=environment or {},
                    detach=True,
                    remove=False
                )

                # Wait for completion with timeout
                result = container.wait(timeout=self.timeout)

                # Get logs
                logs = container.logs().decode("utf-8")

                # Read output
                output_path = os.path.join(tmpdir, "output.json")
                if os.path.exists(output_path):
                    with open(output_path) as f:
                        output = json.load(f)
                else:
                    output = {"error": "No output file generated"}

                return {
                    "exit_code": result["StatusCode"],
                    "logs": logs,
                    "output": output
                }

            except docker.errors.ContainerError as e:
                return {"error": f"Container error: {e}"}
            except Exception as e:
                return {"error": f"Sandbox error: {e}"}
            finally:
                try:
                    container.remove(force=True)
                except:
                    pass

    def _generate_runner(self) -> str:
        """Generate the runner script."""
        return '''
import json
import sys
import traceback

def main():
    # Load task
    with open("/workspace/task.json") as f:
        config = json.load(f)

    task = config["task"]
    tools_config = config["tools"]

    # Import agent
    sys.path.insert(0, "/workspace")
    from agent import Agent

    # Create mock tools
    tools = create_mock_tools(tools_config)

    # Run agent
    agent = Agent(tools=tools)

    try:
        result = agent.run(task)
        output = {
            "success": True,
            "result": result,
            "trajectory": agent.get_trajectory() if hasattr(agent, "get_trajectory") else []
        }
    except Exception as e:
        output = {
            "success": False,
            "error": str(e),
            "traceback": traceback.format_exc()
        }

    # Write output
    with open("/workspace/output.json", "w") as f:
        json.dump(output, f)

def create_mock_tools(config):
    """Create mock tools based on config."""
    tools = {}
    for name, spec in config.items():
        tools[name] = MockTool(name, spec)
    return tools

class MockTool:
    def __init__(self, name, spec):
        self.name = name
        self.spec = spec

    def execute(self, **kwargs):
        # Return mock response or error based on spec
        if "mock_response" in self.spec:
            return self.spec["mock_response"]
        return {"mock": True, "tool": self.name, "args": kwargs}

if __name__ == "__main__":
    main()
'''

    def create_test_environment(
        self,
        database_fixture: Optional[str] = None,
        file_fixtures: Optional[dict] = None
    ) -> str:
        """Create a test environment with fixtures."""
        env_id = f"sandbox-{os.urandom(4).hex()}"

        # Create network for this environment
        network = self.client.networks.create(env_id, driver="bridge")

        # Optionally spin up database
        if database_fixture:
            db_container = self.client.containers.run(
                "postgres:15",
                name=f"{env_id}-db",
                environment={
                    "POSTGRES_PASSWORD": "test",
                    "POSTGRES_DB": "testdb"
                },
                network=env_id,
                detach=True
            )

            # Load fixture
            # ... load SQL fixture ...

        return env_id

    def cleanup_environment(self, env_id: str):
        """Clean up test environment."""
        # Remove containers
        for container in self.client.containers.list(filters={"name": env_id}):
            container.remove(force=True)

        # Remove network
        try:
            network = self.client.networks.get(env_id)
            network.remove()
        except:
            pass

Mock Tool System

Python
from dataclasses import dataclass
from typing import Callable, Any, Optional
import re

@dataclass
class MockResponse:
    content: Any
    success: bool = True
    latency_ms: float = 100

class MockToolSystem:
    """System for creating mock tools for evaluation."""

    def __init__(self):
        self.tools: dict[str, "MockTool"] = {}
        self.call_log: list[dict] = []

    def register_tool(
        self,
        name: str,
        response_generator: Callable[[dict], MockResponse],
        schema: Optional[dict] = None
    ):
        """Register a mock tool."""
        self.tools[name] = MockTool(
            name=name,
            response_generator=response_generator,
            schema=schema
        )

    def call_tool(self, name: str, arguments: dict) -> dict:
        """Call a mock tool."""
        if name not in self.tools:
            return {"error": f"Unknown tool: {name}"}

        tool = self.tools[name]
        response = tool.call(arguments)

        self.call_log.append({
            "tool": name,
            "arguments": arguments,
            "response": response,
            "timestamp": datetime.now().isoformat()
        })

        return response

    def get_call_log(self) -> list[dict]:
        """Get log of all tool calls."""
        return self.call_log

    def reset(self):
        """Reset call log."""
        self.call_log = []

class MockTool:
    """A mock tool for evaluation."""

    def __init__(
        self,
        name: str,
        response_generator: Callable[[dict], MockResponse],
        schema: Optional[dict] = None
    ):
        self.name = name
        self.response_generator = response_generator
        self.schema = schema

    def call(self, arguments: dict) -> dict:
        """Call the mock tool."""
        response = self.response_generator(arguments)
        return {
            "success": response.success,
            "content": response.content,
            "latency_ms": response.latency_ms
        }

# Pre-built mock generators
def create_web_search_mock(knowledge_base: dict) -> Callable:
    """Create a mock web search that uses a knowledge base."""
    def generator(args: dict) -> MockResponse:
        query = args.get("query", "").lower()

        # Search knowledge base
        results = []
        for topic, info in knowledge_base.items():
            if any(term in topic.lower() for term in query.split()):
                results.append({
                    "title": topic,
                    "snippet": info[:200],
                    "url": f"https://example.com/{topic.replace(' ', '-')}"
                })

        if results:
            return MockResponse(content={"results": results[:5]})
        else:
            return MockResponse(content={"results": [], "message": "No results found"})

    return generator

def create_database_mock(tables: dict) -> Callable:
    """Create a mock database that responds to SQL queries."""
    def generator(args: dict) -> MockResponse:
        query = args.get("query", "").upper()

        # Very simple SQL parsing
        if "SELECT" in query:
            # Extract table name
            match = re.search(r"FROM\s+(\w+)", query, re.IGNORECASE)
            if match:
                table = match.group(1).lower()
                if table in tables:
                    return MockResponse(content={"rows": tables[table][:100]})
                else:
                    return MockResponse(
                        content={"error": f"Table '{table}' not found"},
                        success=False
                    )

        return MockResponse(
            content={"error": "Only SELECT queries supported in mock"},
            success=False
        )

    return generator

# Usage example
mock_system = MockToolSystem()

mock_system.register_tool(
    "web_search",
    create_web_search_mock({
        "Tokyo population": "Tokyo has a population of approximately 14 million people in the city proper.",
        "Python programming": "Python is a high-level programming language known for its simplicity.",
    })
)

mock_system.register_tool(
    "query_database",
    create_database_mock({
        "users": [
            {"id": 1, "name": "Alice", "email": "alice@example.com"},
            {"id": 2, "name": "Bob", "email": "bob@example.com"},
        ],
        "orders": [
            {"id": 1, "user_id": 1, "total": 99.99},
            {"id": 2, "user_id": 2, "total": 149.99},
        ]
    })
)

Evaluation Pipeline

Complete Evaluation Runner

Python
import asyncio
from dataclasses import dataclass
from typing import Optional
import json
from datetime import datetime

@dataclass
class EvaluationConfig:
    dataset: EvalDataset
    agent_factory: Callable[[], Any]  # Factory to create agent instances
    mock_tools: MockToolSystem
    judge_llm: Any
    max_concurrent: int = 5
    timeout_seconds: int = 120
    save_trajectories: bool = True
    output_dir: str = "./eval_results"

@dataclass
class EvaluationRun:
    run_id: str
    config_name: str
    start_time: datetime
    end_time: Optional[datetime] = None
    results: list[dict] = None
    summary: dict = None

class EvaluationPipeline:
    """Complete evaluation pipeline for agents."""

    def __init__(self, config: EvaluationConfig):
        self.config = config
        self.task_evaluator = TaskEvaluator(config.judge_llm)
        self.tool_evaluator = ToolUseEvaluator(config.judge_llm)
        self.reasoning_evaluator = ReasoningEvaluator(config.judge_llm)
        self.trajectory_analyzer = TrajectoryAnalyzer(config.judge_llm)

    async def run(self) -> EvaluationRun:
        """Run complete evaluation."""
        run_id = f"eval-{datetime.now().strftime('%Y%m%d-%H%M%S')}"
        run = EvaluationRun(
            run_id=run_id,
            config_name=self.config.dataset.name,
            start_time=datetime.now(),
            results=[]
        )

        # Create output directory
        os.makedirs(self.config.output_dir, exist_ok=True)

        # Run evaluations with concurrency limit
        semaphore = asyncio.Semaphore(self.config.max_concurrent)

        async def evaluate_task(task: EvalTask):
            async with semaphore:
                return await self._evaluate_single_task(task)

        tasks = [evaluate_task(task) for task in self.config.dataset.tasks]
        results = await asyncio.gather(*tasks, return_exceptions=True)

        # Process results
        for task, result in zip(self.config.dataset.tasks, results):
            if isinstance(result, Exception):
                run.results.append({
                    "task_id": task.id,
                    "error": str(result),
                    "outcome": "error"
                })
            else:
                run.results.append(result)

        run.end_time = datetime.now()
        run.summary = self._compute_summary(run.results)

        # Save results
        self._save_results(run)

        return run

    async def _evaluate_single_task(self, task: EvalTask) -> dict:
        """Evaluate a single task."""
        # Create fresh agent instance
        agent = self.config.agent_factory()

        # Reset mock tools
        self.config.mock_tools.reset()

        # Create trajectory tracker
        trajectory = Trajectory(
            task_id=task.id,
            task_description=task.description,
            start_time=datetime.now()
        )

        # Run agent with timeout
        try:
            async with asyncio.timeout(task.timeout_seconds):
                result = await self._run_agent_with_tracking(
                    agent, task, trajectory
                )
        except asyncio.TimeoutError:
            trajectory.set_answer("", TaskOutcome.TIMEOUT)
            result = TaskResult(
                task_id=task.id,
                outcome=TaskOutcome.TIMEOUT,
                final_answer=None,
                expected_answer=task.expected_output,
                steps_taken=len(trajectory.steps),
                total_tokens=trajectory.total_tokens,
                latency_ms=(datetime.now() - trajectory.start_time).total_seconds() * 1000,
                tool_calls=trajectory.get_tool_calls(),
                errors=["Task timed out"]
            )

        # Evaluate result
        task_eval = self.task_evaluator.evaluate(
            task.description,
            result,
            ground_truth=task.expected_output,
            success_criteria=self._parse_success_criteria(task.success_criteria)
        )

        # Evaluate tool use
        tool_eval = self.tool_evaluator.evaluate(
            task.description,
            [ToolCall(
                tool_name=tc["tool"],
                arguments=tc["arguments"],
                result="",
                timestamp=0,
                success=True
            ) for tc in trajectory.get_tool_calls()],
            expected_tools=task.expected_tools
        )

        # Evaluate reasoning
        reasoning_eval = self.reasoning_evaluator.evaluate(
            task.description,
            trajectory.get_reasoning_steps(),
            [ToolCall(
                tool_name=tc["tool"],
                arguments=tc["arguments"],
                result="",
                timestamp=0,
                success=True
            ) for tc in trajectory.get_tool_calls()],
            result.final_answer or ""
        )

        # Analyze trajectory
        trajectory_analysis = self.trajectory_analyzer.analyze(trajectory)

        return {
            "task_id": task.id,
            "task_description": task.description,
            "category": task.category,
            "difficulty": task.difficulty,
            "outcome": result.outcome.value,
            "task_evaluation": task_eval,
            "tool_evaluation": {
                "total_calls": tool_eval.total_calls,
                "successful_calls": tool_eval.successful_calls,
                "selection_score": tool_eval.tool_selection_score
            },
            "reasoning_evaluation": {
                "coherence": reasoning_eval.coherence_score,
                "relevance": reasoning_eval.relevance_score,
                "efficiency": reasoning_eval.efficiency_score,
                "overall": reasoning_eval.overall_score
            },
            "trajectory_analysis": trajectory_analysis,
            "trajectory": trajectory.to_dict() if self.config.save_trajectories else None
        }

    async def _run_agent_with_tracking(
        self,
        agent,
        task: EvalTask,
        trajectory: Trajectory
    ) -> TaskResult:
        """Run agent and track trajectory."""
        # Hook into agent to capture trajectory
        # This depends on your agent implementation

        try:
            final_answer = await agent.run(task.description)
            outcome = TaskOutcome.SUCCESS
        except Exception as e:
            final_answer = str(e)
            outcome = TaskOutcome.ERROR
            trajectory.steps.append(TrajectoryStep(
                step_number=len(trajectory.steps) + 1,
                timestamp=datetime.now(),
                step_type="error",
                content=str(e)
            ))

        trajectory.set_answer(final_answer, outcome)

        return TaskResult(
            task_id=task.id,
            outcome=outcome,
            final_answer=final_answer,
            expected_answer=task.expected_output,
            steps_taken=len(trajectory.steps),
            total_tokens=trajectory.total_tokens,
            latency_ms=(trajectory.end_time - trajectory.start_time).total_seconds() * 1000,
            tool_calls=trajectory.get_tool_calls(),
            errors=[]
        )

    def _parse_success_criteria(self, criteria: Optional[str]) -> Optional[Callable]:
        """Parse success criteria string into function."""
        if not criteria:
            return None

        def evaluator(result: str) -> bool:
            try:
                # Safe evaluation of criteria
                return eval(criteria, {"result": result, "len": len, "str": str})
            except:
                return False

        return evaluator

    def _compute_summary(self, results: list[dict]) -> dict:
        """Compute summary statistics."""
        total = len(results)
        outcomes = [r.get("outcome", "error") for r in results]

        by_category = {}
        by_difficulty = {}

        for r in results:
            cat = r.get("category", "unknown")
            diff = r.get("difficulty", "unknown")

            if cat not in by_category:
                by_category[cat] = {"total": 0, "success": 0}
            by_category[cat]["total"] += 1
            if r.get("outcome") == "success":
                by_category[cat]["success"] += 1

            if diff not in by_difficulty:
                by_difficulty[diff] = {"total": 0, "success": 0}
            by_difficulty[diff]["total"] += 1
            if r.get("outcome") == "success":
                by_difficulty[diff]["success"] += 1

        return {
            "total_tasks": total,
            "success_count": outcomes.count("success"),
            "failure_count": outcomes.count("failure"),
            "error_count": outcomes.count("error"),
            "timeout_count": outcomes.count("timeout"),
            "success_rate": outcomes.count("success") / total if total > 0 else 0,
            "by_category": {
                k: {"success_rate": v["success"] / v["total"] if v["total"] > 0 else 0, **v}
                for k, v in by_category.items()
            },
            "by_difficulty": {
                k: {"success_rate": v["success"] / v["total"] if v["total"] > 0 else 0, **v}
                for k, v in by_difficulty.items()
            },
            "avg_reasoning_score": sum(
                r.get("reasoning_evaluation", {}).get("overall", 0)
                for r in results
            ) / total if total > 0 else 0,
            "avg_tool_selection_score": sum(
                r.get("tool_evaluation", {}).get("selection_score", 0)
                for r in results
            ) / total if total > 0 else 0
        }

    def _save_results(self, run: EvaluationRun):
        """Save evaluation results."""
        output_path = os.path.join(
            self.config.output_dir,
            f"{run.run_id}.json"
        )

        with open(output_path, "w") as f:
            json.dump({
                "run_id": run.run_id,
                "config_name": run.config_name,
                "start_time": run.start_time.isoformat(),
                "end_time": run.end_time.isoformat() if run.end_time else None,
                "summary": run.summary,
                "results": run.results
            }, f, indent=2)

        print(f"Results saved to {output_path}")

Production Monitoring

Real-Time Agent Monitoring

Python
from dataclasses import dataclass
from typing import Optional
import time
from collections import deque
import threading

@dataclass
class AgentMetrics:
    timestamp: float
    task_id: str
    outcome: str
    latency_ms: float
    tokens: int
    tool_calls: int
    errors: int

class AgentMonitor:
    """Real-time monitoring for production agents."""

    def __init__(
        self,
        window_size: int = 1000,
        alert_threshold_error_rate: float = 0.1,
        alert_threshold_latency_ms: float = 5000
    ):
        self.metrics: deque[AgentMetrics] = deque(maxlen=window_size)
        self.alert_callbacks: list[Callable] = []
        self.error_threshold = alert_threshold_error_rate
        self.latency_threshold = alert_threshold_latency_ms
        self._lock = threading.Lock()

    def record(self, metrics: AgentMetrics):
        """Record agent metrics."""
        with self._lock:
            self.metrics.append(metrics)

        # Check for alerts
        self._check_alerts()

    def on_alert(self, callback: Callable[[str, dict], None]):
        """Register alert callback."""
        self.alert_callbacks.append(callback)

    def _check_alerts(self):
        """Check if any alert thresholds are exceeded."""
        if len(self.metrics) < 10:
            return

        recent = list(self.metrics)[-100:]

        # Error rate alert
        error_count = sum(1 for m in recent if m.outcome == "error")
        error_rate = error_count / len(recent)

        if error_rate > self.error_threshold:
            self._trigger_alert("high_error_rate", {
                "error_rate": error_rate,
                "threshold": self.error_threshold,
                "sample_size": len(recent)
            })

        # Latency alert
        avg_latency = sum(m.latency_ms for m in recent) / len(recent)

        if avg_latency > self.latency_threshold:
            self._trigger_alert("high_latency", {
                "avg_latency_ms": avg_latency,
                "threshold": self.latency_threshold,
                "sample_size": len(recent)
            })

    def _trigger_alert(self, alert_type: str, details: dict):
        """Trigger alert callbacks."""
        for callback in self.alert_callbacks:
            try:
                callback(alert_type, details)
            except Exception as e:
                print(f"Alert callback error: {e}")

    def get_dashboard_data(self) -> dict:
        """Get data for monitoring dashboard."""
        with self._lock:
            recent = list(self.metrics)

        if not recent:
            return {"status": "no_data"}

        # Compute statistics
        outcomes = [m.outcome for m in recent]
        latencies = [m.latency_ms for m in recent]
        tokens = [m.tokens for m in recent]

        return {
            "total_requests": len(recent),
            "success_rate": outcomes.count("success") / len(outcomes),
            "error_rate": outcomes.count("error") / len(outcomes),
            "timeout_rate": outcomes.count("timeout") / len(outcomes),
            "latency": {
                "p50": sorted(latencies)[len(latencies) // 2],
                "p90": sorted(latencies)[int(len(latencies) * 0.9)],
                "p99": sorted(latencies)[int(len(latencies) * 0.99)],
                "avg": sum(latencies) / len(latencies)
            },
            "tokens": {
                "avg": sum(tokens) / len(tokens),
                "total": sum(tokens)
            },
            "time_range": {
                "start": recent[0].timestamp,
                "end": recent[-1].timestamp
            }
        }

# Prometheus metrics integration
try:
    from prometheus_client import Counter, Histogram, Gauge

    AGENT_REQUESTS = Counter(
        "agent_requests_total",
        "Total agent requests",
        ["outcome"]
    )

    AGENT_LATENCY = Histogram(
        "agent_latency_seconds",
        "Agent request latency",
        buckets=[0.1, 0.5, 1.0, 2.0, 5.0, 10.0, 30.0]
    )

    AGENT_TOKENS = Counter(
        "agent_tokens_total",
        "Total tokens used"
    )

    AGENT_TOOL_CALLS = Counter(
        "agent_tool_calls_total",
        "Total tool calls",
        ["tool_name"]
    )

    class PrometheusAgentMonitor(AgentMonitor):
        """Agent monitor with Prometheus metrics."""

        def record(self, metrics: AgentMetrics):
            super().record(metrics)

            # Update Prometheus metrics
            AGENT_REQUESTS.labels(outcome=metrics.outcome).inc()
            AGENT_LATENCY.observe(metrics.latency_ms / 1000)
            AGENT_TOKENS.inc(metrics.tokens)

except ImportError:
    PrometheusAgentMonitor = AgentMonitor

A/B Testing Framework

Python
import random
from dataclasses import dataclass
from typing import Optional
import hashlib

@dataclass
class Variant:
    name: str
    weight: float
    agent_factory: Callable

@dataclass
class ABTestConfig:
    test_id: str
    variants: list[Variant]
    start_time: datetime
    end_time: Optional[datetime] = None

class ABTestingFramework:
    """A/B testing framework for agents."""

    def __init__(self):
        self.active_tests: dict[str, ABTestConfig] = {}
        self.results: dict[str, list[dict]] = {}

    def create_test(
        self,
        test_id: str,
        variants: list[Variant],
        duration_hours: Optional[int] = None
    ):
        """Create a new A/B test."""
        # Normalize weights
        total_weight = sum(v.weight for v in variants)
        for v in variants:
            v.weight /= total_weight

        end_time = None
        if duration_hours:
            end_time = datetime.now() + timedelta(hours=duration_hours)

        self.active_tests[test_id] = ABTestConfig(
            test_id=test_id,
            variants=variants,
            start_time=datetime.now(),
            end_time=end_time
        )
        self.results[test_id] = []

    def get_variant(self, test_id: str, user_id: str) -> Variant:
        """Get variant for a user (deterministic assignment)."""
        if test_id not in self.active_tests:
            raise ValueError(f"Unknown test: {test_id}")

        test = self.active_tests[test_id]

        # Deterministic assignment based on user_id
        hash_input = f"{test_id}:{user_id}"
        hash_value = int(hashlib.md5(hash_input.encode()).hexdigest(), 16)
        normalized = (hash_value % 10000) / 10000

        cumulative = 0
        for variant in test.variants:
            cumulative += variant.weight
            if normalized < cumulative:
                return variant

        return test.variants[-1]  # Fallback

    def record_result(
        self,
        test_id: str,
        variant_name: str,
        metrics: dict
    ):
        """Record a result for an A/B test."""
        if test_id not in self.results:
            self.results[test_id] = []

        self.results[test_id].append({
            "variant": variant_name,
            "metrics": metrics,
            "timestamp": datetime.now().isoformat()
        })

    def analyze_test(self, test_id: str) -> dict:
        """Analyze A/B test results."""
        if test_id not in self.results:
            return {"error": "No results for test"}

        results = self.results[test_id]

        # Group by variant
        by_variant = {}
        for r in results:
            variant = r["variant"]
            if variant not in by_variant:
                by_variant[variant] = []
            by_variant[variant].append(r["metrics"])

        # Compute statistics for each variant
        analysis = {}
        for variant, metrics_list in by_variant.items():
            success_rate = sum(
                1 for m in metrics_list if m.get("outcome") == "success"
            ) / len(metrics_list)

            avg_latency = sum(
                m.get("latency_ms", 0) for m in metrics_list
            ) / len(metrics_list)

            analysis[variant] = {
                "sample_size": len(metrics_list),
                "success_rate": success_rate,
                "avg_latency_ms": avg_latency
            }

        # Statistical significance (simplified)
        if len(by_variant) == 2:
            variants = list(by_variant.keys())
            n1, n2 = len(by_variant[variants[0]]), len(by_variant[variants[1]])
            p1 = analysis[variants[0]]["success_rate"]
            p2 = analysis[variants[1]]["success_rate"]

            # Pooled proportion
            p_pool = (p1 * n1 + p2 * n2) / (n1 + n2)
            se = (p_pool * (1 - p_pool) * (1/n1 + 1/n2)) ** 0.5

            if se > 0:
                z_score = (p1 - p2) / se
                # Rough p-value estimate
                is_significant = abs(z_score) > 1.96  # 95% confidence

                analysis["comparison"] = {
                    "z_score": z_score,
                    "is_significant": is_significant,
                    "winner": variants[0] if p1 > p2 else variants[1] if p2 > p1 else "tie"
                }

        return analysis

CI/CD Integration

GitHub Actions for Agent Evaluation

YAML
# .github/workflows/agent-eval.yml
name: Agent Evaluation

on:
  pull_request:
    paths:
      - 'agent/**'
      - 'prompts/**'
      - 'tools/**'
  schedule:
    - cron: '0 2 * * *'  # Nightly full eval

jobs:
  quick-eval:
    name: Quick Evaluation (PR Check)
    runs-on: ubuntu-latest
    if: github.event_name == 'pull_request'

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: |
          pip install -r requirements.txt
          pip install pytest pytest-json-report

      - name: Run quick evaluation
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python -m pytest tests/eval/quick/ \
            --json-report \
            --json-report-file=eval-results.json

      - name: Check evaluation thresholds
        run: |
          python scripts/check_eval_thresholds.py eval-results.json \
            --min-success-rate 0.85 \
            --max-avg-latency 5000

      - name: Upload evaluation report
        uses: actions/upload-artifact@v4
        with:
          name: eval-report
          path: eval-results.json

  full-eval:
    name: Full Evaluation (Nightly)
    runs-on: ubuntu-latest
    if: github.event_name == 'schedule'

    steps:
      - uses: actions/checkout@v4

      - name: Set up Python
        uses: actions/setup-python@v5
        with:
          python-version: '3.11'

      - name: Install dependencies
        run: pip install -r requirements.txt

      - name: Run full evaluation
        env:
          OPENAI_API_KEY: ${{ secrets.OPENAI_API_KEY }}
        run: |
          python scripts/run_full_eval.py \
            --dataset eval/full_dataset.yaml \
            --output-dir results/ \
            --parallel 5

      - name: Generate report
        run: python scripts/generate_eval_report.py results/

      - name: Upload to dashboard
        run: |
          python scripts/upload_to_dashboard.py results/ \
            --dashboard-url ${{ secrets.DASHBOARD_URL }}

      - name: Notify on regression
        if: failure()
        uses: slackapi/slack-github-action@v1
        with:
          payload: |
            {
              "text": "Agent evaluation regression detected",
              "blocks": [
                {
                  "type": "section",
                  "text": {
                    "type": "mrkdwn",
                    "text": "*Agent Evaluation Failed*\nCheck the <${{ github.server_url }}/${{ github.repository }}/actions/runs/${{ github.run_id }}|workflow run> for details."
                  }
                }
              ]
            }
        env:
          SLACK_WEBHOOK_URL: ${{ secrets.SLACK_WEBHOOK }}

Threshold Checker Script

Python
# scripts/check_eval_thresholds.py
import json
import sys
import argparse

def check_thresholds(results_file: str, min_success_rate: float, max_avg_latency: float):
    """Check evaluation results against thresholds."""

    with open(results_file) as f:
        results = json.load(f)

    # Calculate metrics
    tests = results.get("tests", [])
    passed = sum(1 for t in tests if t.get("outcome") == "passed")
    total = len(tests)
    success_rate = passed / total if total > 0 else 0

    latencies = [
        t.get("metadata", {}).get("latency_ms", 0)
        for t in tests
    ]
    avg_latency = sum(latencies) / len(latencies) if latencies else 0

    print(f"Success rate: {success_rate:.2%} (threshold: {min_success_rate:.2%})")
    print(f"Avg latency: {avg_latency:.0f}ms (threshold: {max_avg_latency:.0f}ms)")

    # Check thresholds
    failures = []

    if success_rate < min_success_rate:
        failures.append(
            f"Success rate {success_rate:.2%} below threshold {min_success_rate:.2%}"
        )

    if avg_latency > max_avg_latency:
        failures.append(
            f"Avg latency {avg_latency:.0f}ms above threshold {max_avg_latency:.0f}ms"
        )

    if failures:
        print("\nThreshold violations:")
        for f in failures:
            print(f"  - {f}")
        sys.exit(1)

    print("\nAll thresholds passed!")

if __name__ == "__main__":
    parser = argparse.ArgumentParser()
    parser.add_argument("results_file")
    parser.add_argument("--min-success-rate", type=float, default=0.85)
    parser.add_argument("--max-avg-latency", type=float, default=5000)
    args = parser.parse_args()

    check_thresholds(args.results_file, args.min_success_rate, args.max_avg_latency)

Regression Testing Framework

Python
from dataclasses import dataclass
from typing import Optional
import json
from datetime import datetime

@dataclass
class RegressionTest:
    task_id: str
    expected_outcome: str
    expected_tools: list[str]
    max_steps: int
    max_latency_ms: float
    baseline_success_rate: float

class RegressionTestSuite:
    """Track regressions against baseline performance."""

    def __init__(self, baseline_file: str):
        self.baseline = self._load_baseline(baseline_file)
        self.current_results = []

    def _load_baseline(self, path: str) -> dict:
        """Load baseline metrics."""
        with open(path) as f:
            return json.load(f)

    def add_result(self, task_id: str, result: dict):
        """Add a test result."""
        self.current_results.append({
            "task_id": task_id,
            "result": result,
            "timestamp": datetime.now().isoformat()
        })

    def check_regressions(self) -> dict:
        """Compare current results against baseline."""
        regressions = []
        improvements = []

        for result in self.current_results:
            task_id = result["task_id"]
            baseline = self.baseline.get(task_id)

            if not baseline:
                continue

            current = result["result"]

            # Check success rate regression
            baseline_success = baseline.get("success_rate", 0)
            current_success = 1.0 if current.get("outcome") == "success" else 0.0

            if current_success < baseline_success - 0.1:  # 10% tolerance
                regressions.append({
                    "task_id": task_id,
                    "metric": "success_rate",
                    "baseline": baseline_success,
                    "current": current_success
                })
            elif current_success > baseline_success + 0.1:
                improvements.append({
                    "task_id": task_id,
                    "metric": "success_rate",
                    "baseline": baseline_success,
                    "current": current_success
                })

            # Check latency regression
            baseline_latency = baseline.get("avg_latency_ms", 0)
            current_latency = current.get("latency_ms", 0)

            if current_latency > baseline_latency * 1.5:  # 50% tolerance
                regressions.append({
                    "task_id": task_id,
                    "metric": "latency_ms",
                    "baseline": baseline_latency,
                    "current": current_latency
                })

        return {
            "regressions": regressions,
            "improvements": improvements,
            "total_tests": len(self.current_results),
            "regression_count": len(regressions),
            "improvement_count": len(improvements)
        }

    def update_baseline(self, output_path: str):
        """Update baseline with current results."""
        new_baseline = {}

        for result in self.current_results:
            task_id = result["task_id"]
            current = result["result"]

            new_baseline[task_id] = {
                "success_rate": 1.0 if current.get("outcome") == "success" else 0.0,
                "avg_latency_ms": current.get("latency_ms", 0),
                "updated_at": datetime.now().isoformat()
            }

        with open(output_path, "w") as f:
            json.dump(new_baseline, f, indent=2)

Human Evaluation Framework

Structured Human Review

Python
from dataclasses import dataclass, field
from typing import Optional, Literal
from datetime import datetime
import uuid

@dataclass
class HumanEvaluation:
    eval_id: str
    task_id: str
    evaluator_id: str
    timestamp: datetime

    # Core ratings (1-5 scale)
    task_completion: int  # Did the agent complete the task?
    answer_quality: int   # How good was the final answer?
    reasoning_quality: int  # How good was the reasoning?
    efficiency: int       # Was the approach efficient?

    # Binary flags
    factually_correct: bool
    used_appropriate_tools: bool
    handled_errors_well: bool
    would_trust_in_production: bool

    # Open feedback
    strengths: list[str] = field(default_factory=list)
    weaknesses: list[str] = field(default_factory=list)
    suggestions: str = ""

    # Metadata
    time_to_evaluate_seconds: int = 0

class HumanEvaluationManager:
    """Manage human evaluation workflow."""

    def __init__(self, storage_backend):
        self.storage = storage_backend
        self.active_reviews = {}

    def create_review_batch(
        self,
        trajectory_ids: list[str],
        evaluator_ids: list[str],
        reviews_per_trajectory: int = 2
    ) -> str:
        """Create a batch of reviews to be completed."""
        batch_id = str(uuid.uuid4())

        assignments = []
        for traj_id in trajectory_ids:
            # Assign multiple evaluators per trajectory
            assigned_evaluators = evaluator_ids[:reviews_per_trajectory]
            for eval_id in assigned_evaluators:
                assignments.append({
                    "trajectory_id": traj_id,
                    "evaluator_id": eval_id,
                    "status": "pending"
                })

        self.storage.create_batch(batch_id, assignments)
        return batch_id

    def get_next_review(self, evaluator_id: str) -> Optional[dict]:
        """Get the next trajectory for an evaluator to review."""
        pending = self.storage.get_pending_reviews(evaluator_id)
        if not pending:
            return None

        assignment = pending[0]
        trajectory = self.storage.get_trajectory(assignment["trajectory_id"])

        return {
            "assignment_id": assignment["id"],
            "trajectory": trajectory,
            "task": trajectory["task"],
            "steps": trajectory["steps"],
            "final_answer": trajectory["final_answer"]
        }

    def submit_review(
        self,
        assignment_id: str,
        evaluation: HumanEvaluation
    ):
        """Submit a completed review."""
        self.storage.save_evaluation(assignment_id, evaluation)
        self.storage.mark_complete(assignment_id)

    def compute_inter_rater_reliability(self, batch_id: str) -> dict:
        """Compute agreement between evaluators."""
        evaluations = self.storage.get_batch_evaluations(batch_id)

        # Group by trajectory
        by_trajectory = {}
        for eval in evaluations:
            traj_id = eval["trajectory_id"]
            if traj_id not in by_trajectory:
                by_trajectory[traj_id] = []
            by_trajectory[traj_id].append(eval)

        # Calculate agreement metrics
        agreements = {
            "task_completion": [],
            "answer_quality": [],
            "factually_correct": []
        }

        for traj_id, evals in by_trajectory.items():
            if len(evals) < 2:
                continue

            # Pairwise agreement
            for i, eval1 in enumerate(evals):
                for eval2 in evals[i+1:]:
                    # Task completion agreement (within 1 point)
                    tc_agree = abs(eval1["task_completion"] - eval2["task_completion"]) <= 1
                    agreements["task_completion"].append(tc_agree)

                    # Answer quality agreement
                    aq_agree = abs(eval1["answer_quality"] - eval2["answer_quality"]) <= 1
                    agreements["answer_quality"].append(aq_agree)

                    # Factual correctness agreement (exact)
                    fc_agree = eval1["factually_correct"] == eval2["factually_correct"]
                    agreements["factually_correct"].append(fc_agree)

        return {
            metric: sum(values) / len(values) if values else 0
            for metric, values in agreements.items()
        }

    def generate_calibration_report(self, evaluator_id: str) -> dict:
        """Generate calibration report for an evaluator."""
        evaluator_evals = self.storage.get_evaluator_history(evaluator_id)
        all_evals = self.storage.get_all_evaluations()

        # Compare evaluator to population
        metrics = ["task_completion", "answer_quality", "reasoning_quality"]

        calibration = {}
        for metric in metrics:
            evaluator_avg = sum(e[metric] for e in evaluator_evals) / len(evaluator_evals)
            population_avg = sum(e[metric] for e in all_evals) / len(all_evals)

            calibration[metric] = {
                "evaluator_avg": evaluator_avg,
                "population_avg": population_avg,
                "bias": evaluator_avg - population_avg,
                "bias_direction": "lenient" if evaluator_avg > population_avg else "strict"
            }

        return calibration

Evaluation UI Component

Python
# Flask routes for evaluation UI
from flask import Flask, render_template, request, jsonify

app = Flask(__name__)
eval_manager = HumanEvaluationManager(storage)

@app.route('/evaluate')
def evaluation_page():
    """Render evaluation interface."""
    evaluator_id = request.args.get('evaluator_id')
    review = eval_manager.get_next_review(evaluator_id)

    if not review:
        return render_template('no_reviews.html')

    return render_template('evaluate.html', review=review)

@app.route('/api/submit_evaluation', methods=['POST'])
def submit_evaluation():
    """Submit evaluation via API."""
    data = request.json

    evaluation = HumanEvaluation(
        eval_id=str(uuid.uuid4()),
        task_id=data['task_id'],
        evaluator_id=data['evaluator_id'],
        timestamp=datetime.now(),
        task_completion=data['task_completion'],
        answer_quality=data['answer_quality'],
        reasoning_quality=data['reasoning_quality'],
        efficiency=data['efficiency'],
        factually_correct=data['factually_correct'],
        used_appropriate_tools=data['used_appropriate_tools'],
        handled_errors_well=data['handled_errors_well'],
        would_trust_in_production=data['would_trust_in_production'],
        strengths=data.get('strengths', []),
        weaknesses=data.get('weaknesses', []),
        suggestions=data.get('suggestions', ''),
        time_to_evaluate_seconds=data.get('time_seconds', 0)
    )

    eval_manager.submit_review(data['assignment_id'], evaluation)

    return jsonify({'success': True})

@app.route('/api/dashboard/metrics')
def dashboard_metrics():
    """Get metrics for evaluation dashboard."""
    return jsonify({
        'total_evaluations': eval_manager.storage.count_evaluations(),
        'pending_reviews': eval_manager.storage.count_pending(),
        'avg_task_completion': eval_manager.storage.avg_metric('task_completion'),
        'avg_answer_quality': eval_manager.storage.avg_metric('answer_quality'),
        'trust_rate': eval_manager.storage.percentage_true('would_trust_in_production')
    })

Benchmark Suites

Standard Agent Benchmarks

Python
class AgentBenchmarkSuite:
    """Standard benchmark suite for agent evaluation."""

    BENCHMARKS = {
        "information_retrieval": {
            "description": "Test agent's ability to find and synthesize information",
            "tasks": [
                {
                    "id": "ir-001",
                    "task": "Find the current CEO of OpenAI and their background",
                    "expected_tools": ["web_search"],
                    "verification": "factual_check"
                },
                {
                    "id": "ir-002",
                    "task": "What are the top 3 Python web frameworks by GitHub stars?",
                    "expected_tools": ["web_search", "github_api"],
                    "verification": "list_comparison"
                }
            ]
        },
        "data_analysis": {
            "description": "Test agent's ability to analyze data",
            "tasks": [
                {
                    "id": "da-001",
                    "task": "Calculate the average and standard deviation of sales in the sales table",
                    "expected_tools": ["query_database"],
                    "verification": "numeric_check"
                },
                {
                    "id": "da-002",
                    "task": "Find the top 5 customers by total order value",
                    "expected_tools": ["query_database"],
                    "verification": "query_result_check"
                }
            ]
        },
        "multi_step_reasoning": {
            "description": "Test agent's ability to chain multiple steps",
            "tasks": [
                {
                    "id": "ms-001",
                    "task": "Find all users who signed up last month, get their order history, and identify who hasn't made a purchase",
                    "expected_tools": ["query_database"],
                    "min_steps": 2,
                    "verification": "multi_query_check"
                }
            ]
        },
        "error_recovery": {
            "description": "Test agent's ability to recover from errors",
            "tasks": [
                {
                    "id": "er-001",
                    "task": "Query the users table (note: might be named 'customers')",
                    "expected_behavior": "retry_with_correction",
                    "mock_error": {"table": "users", "error": "relation does not exist"}
                }
            ]
        }
    }

    def __init__(self, agent, mock_tools: dict = None):
        self.agent = agent
        self.mock_tools = mock_tools or {}

    async def run_benchmark(self, benchmark_name: str) -> dict:
        """Run a specific benchmark suite."""
        if benchmark_name not in self.BENCHMARKS:
            raise ValueError(f"Unknown benchmark: {benchmark_name}")

        benchmark = self.BENCHMARKS[benchmark_name]
        results = []

        for task in benchmark["tasks"]:
            result = await self._run_task(task)
            results.append(result)

        return {
            "benchmark": benchmark_name,
            "description": benchmark["description"],
            "results": results,
            "summary": self._summarize_results(results)
        }

    async def run_all_benchmarks(self) -> dict:
        """Run all benchmark suites."""
        all_results = {}

        for name in self.BENCHMARKS:
            all_results[name] = await self.run_benchmark(name)

        return {
            "benchmarks": all_results,
            "overall_summary": self._overall_summary(all_results)
        }

    async def _run_task(self, task: dict) -> dict:
        """Run a single benchmark task."""
        import time
        start = time.time()

        try:
            # Run agent
            result = await self.agent.run(task["task"])

            # Verify result
            verification = await self._verify_result(
                task,
                result,
                task.get("verification")
            )

            return {
                "task_id": task["id"],
                "success": verification["passed"],
                "result": result,
                "verification": verification,
                "latency_ms": (time.time() - start) * 1000
            }

        except Exception as e:
            return {
                "task_id": task["id"],
                "success": False,
                "error": str(e),
                "latency_ms": (time.time() - start) * 1000
            }

    async def _verify_result(self, task: dict, result: dict, verification_type: str) -> dict:
        """Verify task result."""
        if verification_type == "factual_check":
            # Use LLM to verify factual accuracy
            return await self._factual_verification(task, result)

        elif verification_type == "numeric_check":
            # Check numeric results
            return self._numeric_verification(task, result)

        elif verification_type == "list_comparison":
            # Compare list results
            return self._list_verification(task, result)

        else:
            # Default: check if task completed without error
            return {"passed": result.get("outcome") == "success"}

    def _summarize_results(self, results: list) -> dict:
        """Summarize benchmark results."""
        passed = sum(1 for r in results if r.get("success"))
        total = len(results)

        latencies = [r.get("latency_ms", 0) for r in results]

        return {
            "passed": passed,
            "total": total,
            "success_rate": passed / total if total > 0 else 0,
            "avg_latency_ms": sum(latencies) / len(latencies) if latencies else 0
        }

    def _overall_summary(self, all_results: dict) -> dict:
        """Generate overall benchmark summary."""
        total_passed = 0
        total_tasks = 0

        for benchmark in all_results.values():
            summary = benchmark.get("summary", {})
            total_passed += summary.get("passed", 0)
            total_tasks += summary.get("total", 0)

        return {
            "total_passed": total_passed,
            "total_tasks": total_tasks,
            "overall_success_rate": total_passed / total_tasks if total_tasks > 0 else 0
        }

Conclusion

Agent evaluation requires a multi-dimensional approach:

  1. Task success is the primary metric, but it's not enough alone
  2. Tool use correctness reveals whether agents are reasoning properly about capabilities
  3. Trajectory analysis helps understand and debug agent behavior
  4. Sandboxed execution enables safe evaluation without production risk
  5. Production monitoring catches regressions and issues in real-time
  6. A/B testing validates improvements before full rollout

Start with a small, well-curated evaluation dataset. Add metrics and analysis capabilities incrementally. The investment in evaluation infrastructure pays dividends in agent reliability and user trust.

Frequently Asked Questions

Enrico Piovano, PhD

Co-founder & CTO at Goji AI. Former Applied Scientist at Amazon (Alexa & AGI), focused on Agentic AI and LLMs. PhD in Electrical Engineering from Imperial College London. Gold Medalist at the National Mathematical Olympiad.

Related Articles