# Building AI Coding Agents: From Code Understanding to Autonomous Development
A comprehensive guide to building AI coding agents—code understanding, edit planning, test generation, iterative debugging, sandboxed execution, and production patterns for autonomous software development.
## The Coding Agent Revolution
AI coding assistants have evolved from autocomplete tools to autonomous agents capable of understanding codebases, planning multi-file changes, writing tests, and iteratively debugging until code works. Claude Code, Cursor, GitHub Copilot Workspace, and Devin represent different points on this spectrum.
This guide covers how to build coding agents: from foundational code understanding to sophisticated multi-step development workflows.
Prerequisites:
- Familiarity with building agentic AI systems
- Understanding of structured outputs and tool use
- Experience with software development workflows
What you'll learn:
- Code understanding and retrieval strategies
- Edit planning and multi-file coordination
- Test generation and validation
- Iterative debugging loops
- Sandboxed execution environments
- Production safety patterns
## Coding Agent Architecture
A coding agent needs several interconnected capabilities:

```
┌─────────────────────────────────────────────────────────────┐
│ Coding Agent │
├─────────────────────────────────────────────────────────────┤
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Code │ │ Edit │ │ Execution │ │
│ │Understanding│─▶│ Planning │─▶│ & Validation │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │ │ │
│ ▼ ▼ ▼ │
│ ┌─────────────┐ ┌─────────────┐ ┌─────────────────────┐ │
│ │ Retrieval │ │ File │ │ Sandboxed │ │
│ │ (Search, │ │ Editing │ │ Environment │ │
│ │ Symbols) │ │ Tools │ │ (Tests, Run) │ │
│ └─────────────┘ └─────────────┘ └─────────────────────┘ │
│ │ │
│ ▼ │
│ ┌─────────────────────┐ │
│ │ Iterative Debug │ │
│ │ Loop (Fix Errors) │ │
│ └─────────────────────┘ │
└─────────────────────────────────────────────────────────────┘
```

## Code Understanding
Before an agent can modify code, it must understand what exists. This is the critical first step that separates toy demos from production-ready coding agents. Without proper code understanding, agents make changes that break existing functionality, miss context that leads to inconsistent code style, or simply can't find the right place to make edits.
Code understanding involves three key capabilities:
- Structural indexing: Know what files, classes, and functions exist
- Semantic search: Find code by what it does, not just its name
- Dependency tracking: Understand how pieces of code relate to each other
### Codebase Indexing
The foundation of code understanding is a fast, comprehensive index of the codebase. This index serves multiple purposes: it lets the agent quickly navigate to any file, search for symbols by name, and understand the structure of the project without repeatedly reading files from disk.
Why not just read files on demand? For small projects, reading files on demand works. But for larger codebases (thousands of files), the latency adds up quickly, and you can't efficiently search across all files. An index lets you answer queries like "find all functions named authenticate" in milliseconds.
The evolution of code understanding in 2025: Modern AI coding agents use AST (Abstract Syntax Tree) parsing combined with semantic search rather than naive text search. According to recent research (cAST paper, June 2025), AST-based chunking yields more self-contained code units that improve both retrieval and generation. Tree-sitter has become the de facto standard for multi-language parsing because it preserves exact syntax and positions while being extremely fast (its parser core is written in C, with bindings for many languages).
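To make the AST-chunking idea concrete, here is a minimal sketch of function-level extraction with tree-sitter. It assumes the third-party `tree_sitter_languages` package (which bundles prebuilt grammars); the exact binding API varies by version, so treat this as illustrative rather than canonical.

```python
# Hypothetical sketch: AST-aware chunking with tree-sitter.
# Assumes the third-party `tree_sitter_languages` package; adapt to your bindings.
from tree_sitter_languages import get_parser

def function_chunks(source: str) -> list[dict]:
    """Return one chunk per top-level Python function, preserving exact positions."""
    parser = get_parser("python")
    tree = parser.parse(source.encode("utf-8"))
    chunks = []
    for node in tree.root_node.children:
        if node.type == "function_definition":
            name_node = node.child_by_field_name("name")
            chunks.append({
                "name": name_node.text.decode() if name_node else "<anonymous>",
                "start_line": node.start_point[0] + 1,  # tree-sitter rows are 0-based
                "end_line": node.end_point[0] + 1,
                "code": node.text.decode(),
            })
    return chunks

print(function_chunks("def add(a, b):\n    return a + b\n"))
```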
The dual-index architecture is critical for production: The implementation below uses two complementary indexes: (1) a symbol index mapping symbol names to locations (fast lookup by name), and (2) a file hash index detecting when files have changed (for incremental re-indexing). Without hashing, you'd re-index the entire codebase on every change—wasteful for large repos. The hash lets you detect changes in O(1) time.
Why we extract both symbols AND imports/exports: Symbols tell you what exists in a file. Imports/exports tell you what that file depends on and what it provides to others. This dependency information is crucial for edit planning—if you change a function signature, the agent needs to know which other files import that function and might break.
Multi-language support is non-negotiable: Real codebases are polyglot—Python backend, TypeScript frontend, Go services, SQL queries. A production coding agent must handle at least 3-5 languages. The LANGUAGE_EXTENSIONS mapping provides language detection by file extension, and the _extract_symbols method dispatches to language-specific parsers. This design makes adding new languages straightforward—just add a new parser method.
The signature extraction insight: Notice we extract function signatures with type annotations (def foo(x: int) -> str). This is critical for the agent to understand how to use functions correctly. Without type information, the agent must guess argument types from names/docstrings—error-prone. Modern Python uses type hints, modern TypeScript has native types, and extracting them elevates the agent from "blind code manipulation" to "type-aware refactoring."
Best practices from Anthropic's Claude Code team:
- Research-plan-implement workflow: Always have agents research (read files, search symbols) before writing code. Agents that jump straight to coding produce significantly more bugs.
- Incremental indexing: Only re-index changed files. Use file hashes for change detection.
- Structured retrieval: Don't pass raw file content to LLMs—pass structured symbol information with signatures and docstrings.
The data model design:
The CodeFile dataclass is the unit of indexing. It stores content (for semantic search), hash (for change detection), language (for syntax highlighting and parsing), and extracted metadata (symbols, imports, exports). This rich metadata enables the agent to answer questions like "where is function X defined?" or "what does this file export?" without LLM inference.
The CodeSymbol dataclass captures everything the agent needs to understand and use a symbol: its name, kind (function/class/method), exact location (file, line range), signature (how to call it), docstring (what it does), and references (what uses it). This is the semantic code search foundation—embeddings are created from docstrings + signatures, not raw code.
```python
import os
import ast  # used by _get_function_signature (ast.unparse) below
import hashlib
from dataclasses import dataclass, field
from typing import Optional
@dataclass
class CodeFile:
path: str
content: str
language: str
hash: str
symbols: list["CodeSymbol"] = field(default_factory=list)
imports: list[str] = field(default_factory=list)
exports: list[str] = field(default_factory=list)
@dataclass
class CodeSymbol:
name: str
kind: str # function, class, method, variable, type
file_path: str
line_start: int
line_end: int
signature: Optional[str] = None
docstring: Optional[str] = None
references: list[str] = field(default_factory=list)
class CodebaseIndex:
"""Index a codebase for fast retrieval."""
LANGUAGE_EXTENSIONS = {
".py": "python",
".js": "javascript",
".ts": "typescript",
".tsx": "typescript",
".jsx": "javascript",
".go": "go",
".rs": "rust",
".java": "java",
".cpp": "cpp",
".c": "c",
".rb": "ruby",
}
def __init__(self, root_path: str):
self.root_path = root_path
self.files: dict[str, CodeFile] = {}
self.symbols: dict[str, CodeSymbol] = {}
self.symbol_index: dict[str, list[str]] = {} # name -> [file paths]
def index(self, ignore_patterns: list[str] = None):
"""Index the entire codebase."""
ignore_patterns = ignore_patterns or [
"node_modules", "__pycache__", ".git", "venv", ".venv",
"dist", "build", ".next", "target"
]
for root, dirs, files in os.walk(self.root_path):
# Filter ignored directories
dirs[:] = [d for d in dirs if d not in ignore_patterns]
for file in files:
ext = os.path.splitext(file)[1]
if ext in self.LANGUAGE_EXTENSIONS:
file_path = os.path.join(root, file)
self._index_file(file_path)
def _index_file(self, file_path: str):
"""Index a single file."""
try:
with open(file_path, "r", encoding="utf-8") as f:
content = f.read()
except (UnicodeDecodeError, IOError):
return
ext = os.path.splitext(file_path)[1]
language = self.LANGUAGE_EXTENSIONS.get(ext, "unknown")
code_file = CodeFile(
path=file_path,
content=content,
language=language,
hash=hashlib.md5(content.encode()).hexdigest()
)
# Extract symbols based on language
symbols = self._extract_symbols(content, language, file_path)
code_file.symbols = symbols
# Extract imports/exports
code_file.imports = self._extract_imports(content, language)
code_file.exports = self._extract_exports(content, language)
self.files[file_path] = code_file
# Index symbols
for symbol in symbols:
self.symbols[f"{file_path}:{symbol.name}"] = symbol
if symbol.name not in self.symbol_index:
self.symbol_index[symbol.name] = []
self.symbol_index[symbol.name].append(file_path)
def _extract_symbols(
self,
content: str,
language: str,
file_path: str
) -> list[CodeSymbol]:
"""Extract symbols from source code."""
symbols = []
if language == "python":
symbols = self._extract_python_symbols(content, file_path)
elif language in ["javascript", "typescript"]:
symbols = self._extract_js_symbols(content, file_path)
# Add more language parsers as needed
return symbols
def _extract_python_symbols(self, content: str, file_path: str) -> list[CodeSymbol]:
"""Extract symbols from Python code."""
import ast
symbols = []
try:
tree = ast.parse(content)
except SyntaxError:
return symbols
        # Walk top-level definitions only; ast.walk would also visit methods
        # and double-count them as standalone functions.
        for node in tree.body:
            if isinstance(node, (ast.FunctionDef, ast.AsyncFunctionDef)):
                symbols.append(CodeSymbol(
                    name=node.name,
                    kind="function",
                    file_path=file_path,
                    line_start=node.lineno,
                    line_end=node.end_lineno or node.lineno,
                    signature=self._get_function_signature(node),
                    docstring=ast.get_docstring(node)
                ))
            elif isinstance(node, ast.ClassDef):
                symbols.append(CodeSymbol(
                    name=node.name,
                    kind="class",
                    file_path=file_path,
                    line_start=node.lineno,
                    line_end=node.end_lineno or node.lineno,
                    docstring=ast.get_docstring(node)
                ))
                # Also extract methods, stored as ClassName.method_name
                for item in node.body:
                    if isinstance(item, (ast.FunctionDef, ast.AsyncFunctionDef)):
                        symbols.append(CodeSymbol(
                            name=f"{node.name}.{item.name}",
                            kind="method",
                            file_path=file_path,
                            line_start=item.lineno,
                            line_end=item.end_lineno or item.lineno,
                            signature=self._get_function_signature(item),
                            docstring=ast.get_docstring(item)
                        ))
return symbols
def _get_function_signature(self, node: "ast.FunctionDef") -> str:
"""Get function signature string."""
args = []
for arg in node.args.args:
arg_str = arg.arg
if arg.annotation:
arg_str += f": {ast.unparse(arg.annotation)}"
args.append(arg_str)
returns = ""
if node.returns:
returns = f" -> {ast.unparse(node.returns)}"
return f"def {node.name}({', '.join(args)}){returns}"
def _extract_imports(self, content: str, language: str) -> list[str]:
"""Extract import statements."""
imports = []
if language == "python":
import ast
try:
tree = ast.parse(content)
for node in ast.walk(tree):
if isinstance(node, ast.Import):
for alias in node.names:
imports.append(alias.name)
elif isinstance(node, ast.ImportFrom):
imports.append(node.module or "")
except SyntaxError:
pass
return imports
def _extract_exports(self, content: str, language: str) -> list[str]:
"""Extract exported symbols."""
# For Python, top-level functions and classes are exports
# For JS/TS, look for export statements
return [] # Simplified
def search_symbols(self, query: str) -> list[CodeSymbol]:
"""Search for symbols by name."""
results = []
query_lower = query.lower()
for name, paths in self.symbol_index.items():
if query_lower in name.lower():
for path in paths:
key = f"{path}:{name}"
if key in self.symbols:
results.append(self.symbols[key])
return results
def get_file_context(self, file_path: str, line: int, context_lines: int = 10) -> str:
"""Get code context around a specific line."""
if file_path not in self.files:
return ""
lines = self.files[file_path].content.split("\n")
start = max(0, line - context_lines)
end = min(len(lines), line + context_lines)
        output_lines = []  # renamed so it does not shadow the context_lines parameter
        for i in range(start, end):
            marker = ">>>" if i == line - 1 else "   "
            output_lines.append(f"{marker} {i+1}: {lines[i]}")
        return "\n".join(output_lines)
def find_references(self, symbol_name: str) -> list[tuple[str, int]]:
"""Find all references to a symbol."""
references = []
for path, code_file in self.files.items():
lines = code_file.content.split("\n")
for i, line in enumerate(lines):
if symbol_name in line:
references.append((path, i + 1))
return references
```

Key design decisions in this indexer:
- **Content hashing**: The MD5 hash of each file lets you detect changes efficiently. On re-index, only files with changed hashes need re-processing.
- **Symbol naming convention**: Methods are stored as `ClassName.method_name` to distinguish them from standalone functions and allow precise lookups.
- **Lazy import extraction**: Imports are extracted per-file rather than resolved globally. Resolving imports to actual files is expensive and often unnecessary.
- **Line number tracking**: Both `line_start` and `line_end` are stored, enabling precise context retrieval for any symbol.
When to rebuild the index: The index should be rebuilt when the agent starts (or on demand). For long-running agents, consider watching the filesystem for changes and incrementally updating affected files. The hash comparison makes incremental updates cheap.
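As a sketch of that incremental path, the helper below (a hypothetical `reindex_changed` function, not part of the class above) compares on-disk hashes against the stored ones and re-indexes only the files that differ. A production version would also prune deleted files and purge stale `symbol_index` entries for re-indexed files.

```python
import hashlib
import os

def reindex_changed(index: CodebaseIndex) -> list[str]:
    """Re-index only files whose content hash no longer matches the stored one."""
    changed = []
    for path, code_file in list(index.files.items()):
        if not os.path.exists(path):
            continue  # deleted files could be removed from the index here
        try:
            with open(path, "r", encoding="utf-8") as f:
                content = f.read()
        except (UnicodeDecodeError, IOError):
            continue
        if hashlib.md5(content.encode()).hexdigest() != code_file.hash:
            index._index_file(path)  # reuses the per-file indexing path
            changed.append(path)
    return changed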
### Semantic Code Search
While the structural index handles exact name lookups, real coding tasks often need fuzzy search: "find the function that handles user authentication" or "where is database connection setup?". Semantic search uses embeddings to find code by meaning, not just text matching.
Why semantic search is transformative for coding agents: According to recent work on semantic code indexing, agents using semantic search can find relevant code even when the user's query uses completely different terminology than the code. Example: user asks "where do we check passwords?", semantic search finds verify_authentication() even though the words "check" and "passwords" never appear in that function name.
The fundamental trade-off: structured vs semantic retrieval:
- Structured retrieval (AST-based symbol lookup): Fast, precise, but requires knowing the exact symbol name. Perfect for "go to definition" or "find all references."
- Semantic retrieval (embedding-based): Slower, fuzzy, but handles natural language queries. Perfect for "find code that does X."
Production agents need both. The agent should use structured retrieval when it knows the symbol name, and semantic retrieval when it's exploring or the user provides a natural language description.
The chunking strategy that actually works: Research from cAST (2025) shows that AST-aware chunking (one chunk per function/class) dramatically outperforms fixed-size chunking. Why? Fixed-size chunks split code mid-function, creating meaningless fragments. AST-aware chunks preserve complete, self-contained units of meaning.
Embedding model selection for code: The implementation below uses all-MiniLM-L6-v2, which is fast and decent for code. But specialized code embedding models perform better:
- microsoft/codebert-base: Pre-trained on 6.4M code-comment pairs, understands code structure
- OpenAI text-embedding-3-large: General-purpose but works well on code with proper context
- Voyage Code-2: Specialized for code search (2025), optimized for multi-language retrieval
The enrichment technique that doubles search quality: The key innovation in the _create_chunks method below is augmenting code with metadata. Instead of embedding raw code (def foo(x):\n return x + 1), we embed:
```
function: foo
signature: def foo(x: int) -> int
docstring: Adds 1 to the input value
[actual code]
```
This structured representation helps the embedding model understand both what the code does (from the docstring) and how to use it (from the signature). Queries like "function to increment a number" will match even if the function is named foo.
The approach is to:
- Split code into meaningful chunks (typically one chunk per function/class)
- Embed each chunk using a sentence transformer model
- At query time, embed the query and find the most similar chunks
This works surprisingly well because modern embedding models understand code semantics—a query like "parse JSON response" will match functions that decode JSON even if they use different variable names:
```python
from sentence_transformers import SentenceTransformer
import numpy as np
class SemanticCodeSearch:
"""Semantic search over code using embeddings."""
def __init__(self, model_name: str = "all-MiniLM-L6-v2"):
self.model = SentenceTransformer(model_name)
self.embeddings: dict[str, np.ndarray] = {}
self.chunks: dict[str, str] = {}
def index_codebase(self, index: CodebaseIndex):
"""Create embeddings for code chunks."""
for path, code_file in index.files.items():
# Create chunks from the file
chunks = self._create_chunks(code_file)
for i, chunk in enumerate(chunks):
chunk_id = f"{path}:{i}"
self.chunks[chunk_id] = chunk
self.embeddings[chunk_id] = self.model.encode(chunk)
def _create_chunks(self, code_file: CodeFile) -> list[str]:
"""Create searchable chunks from a code file."""
chunks = []
# Chunk by function/class
for symbol in code_file.symbols:
lines = code_file.content.split("\n")
chunk_lines = lines[symbol.line_start - 1:symbol.line_end]
chunk = "\n".join(chunk_lines)
# Include signature and docstring for better search
search_text = f"{symbol.kind}: {symbol.name}"
if symbol.signature:
search_text += f"\n{symbol.signature}"
if symbol.docstring:
search_text += f"\n{symbol.docstring}"
search_text += f"\n{chunk}"
chunks.append(search_text)
# Also add file-level chunk for imports and structure
file_summary = f"File: {code_file.path}\nLanguage: {code_file.language}\n"
file_summary += f"Imports: {', '.join(code_file.imports[:10])}\n"
file_summary += f"Symbols: {', '.join(s.name for s in code_file.symbols[:20])}"
chunks.append(file_summary)
return chunks
def search(self, query: str, top_k: int = 10) -> list[tuple[str, float, str]]:
"""Search for relevant code chunks."""
query_embedding = self.model.encode(query)
scores = []
for chunk_id, embedding in self.embeddings.items():
similarity = np.dot(query_embedding, embedding) / (
np.linalg.norm(query_embedding) * np.linalg.norm(embedding)
)
scores.append((chunk_id, similarity, self.chunks[chunk_id]))
scores.sort(key=lambda x: -x[1])
return scores[:top_k]
```

Chunking strategy matters: The `_create_chunks` method creates chunks at the function/class level rather than arbitrary line splits. This is crucial—embedding a random 50-line chunk that starts in the middle of a function produces poor results. By chunking at semantic boundaries (functions, classes), each chunk is self-contained and meaningful.
Embedding enrichment: Notice how each chunk includes not just the raw code but also the symbol's kind ("function", "class"), name, signature, and docstring. This enriched text helps the embedding model understand the code's purpose, making search more accurate.
Model choice: all-MiniLM-L6-v2 is a good default for code search—it's fast and works well on technical text. For larger codebases or more precise results, consider code-specific models like microsoft/codebert-base or OpenAI's embedding models.
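Putting the two retrieval layers together, a short usage sketch (the `./my_project` path is just an example):

```python
# Build the structural index, then the semantic index on top of it.
index = CodebaseIndex("./my_project")
index.index()

search = SemanticCodeSearch()
search.index_codebase(index)

# Exact lookup when the name is known...
for sym in index.search_symbols("authenticate"):
    print(f"{sym.kind} {sym.name} at {sym.file_path}:{sym.line_start}")

# ...and fuzzy lookup when it isn't.
for chunk_id, score, chunk in search.search("where do we check passwords?", top_k=3):
    print(f"{score:.3f}  {chunk_id}")
```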
### Code Understanding Tools
With indexing and search in place, we need to expose these capabilities as tools the LLM agent can call. The tool abstraction wraps each capability with a description (so the LLM knows when to use it) and a JSON schema for parameters (so the LLM can call it correctly).
Why wrap everything as tools? This is the key to agentic behavior. Instead of hardcoding a fixed sequence of operations, we give the agent a toolkit and let it decide which tools to use based on the task. A simple task might only need read_file, while a complex refactoring might require search_code → find_symbol → find_references → read_file for each file.
The tools below cover the essential code understanding operations. Each tool is designed to be self-contained—it takes specific parameters and returns useful output that the LLM can reason about:
```python
import os
import subprocess
from dataclasses import dataclass
from typing import Optional
@dataclass
class Tool:
name: str
description: str
parameters: dict
function: callable
class CodingTools:
"""Tools for coding agents."""
def __init__(self, codebase_index: CodebaseIndex, semantic_search: SemanticCodeSearch):
self.index = codebase_index
self.search = semantic_search
def get_tools(self) -> list[Tool]:
"""Get all available coding tools."""
return [
Tool(
name="read_file",
description="Read the contents of a file. Use this to understand existing code.",
parameters={
"type": "object",
"properties": {
"path": {"type": "string", "description": "Path to the file"},
"start_line": {"type": "integer", "description": "Starting line (optional)"},
"end_line": {"type": "integer", "description": "Ending line (optional)"}
},
"required": ["path"]
},
function=self.read_file
),
Tool(
name="search_code",
description="Search for code by description or functionality. Use natural language queries.",
parameters={
"type": "object",
"properties": {
"query": {"type": "string", "description": "Natural language search query"},
"top_k": {"type": "integer", "description": "Number of results", "default": 5}
},
"required": ["query"]
},
function=self.search_code
),
Tool(
name="find_symbol",
description="Find a function, class, or variable by name.",
parameters={
"type": "object",
"properties": {
"name": {"type": "string", "description": "Symbol name to find"}
},
"required": ["name"]
},
function=self.find_symbol
),
Tool(
name="find_references",
description="Find all places where a symbol is used.",
parameters={
"type": "object",
"properties": {
"symbol": {"type": "string", "description": "Symbol name"}
},
"required": ["symbol"]
},
function=self.find_references
),
Tool(
name="list_directory",
description="List files and directories at a path.",
parameters={
"type": "object",
"properties": {
"path": {"type": "string", "description": "Directory path"}
},
"required": ["path"]
},
function=self.list_directory
),
Tool(
name="grep_code",
description="Search for exact text patterns in code.",
parameters={
"type": "object",
"properties": {
"pattern": {"type": "string", "description": "Text pattern to search"},
"file_pattern": {"type": "string", "description": "File glob pattern (e.g., '*.py')"}
},
"required": ["pattern"]
},
function=self.grep_code
)
]
def read_file(self, path: str, start_line: int = None, end_line: int = None) -> str:
"""Read file contents."""
try:
with open(path, "r") as f:
lines = f.readlines()
if start_line is not None or end_line is not None:
start = (start_line or 1) - 1
end = end_line or len(lines)
lines = lines[start:end]
# Add line numbers
numbered = []
start_num = start_line or 1
for i, line in enumerate(lines):
numbered.append(f"{start_num + i}: {line.rstrip()}")
return "\n".join(numbered)
except Exception as e:
return f"Error reading file: {e}"
def search_code(self, query: str, top_k: int = 5) -> str:
"""Semantic code search."""
results = self.search.search(query, top_k)
output = []
for chunk_id, score, content in results:
path = chunk_id.rsplit(":", 1)[0]
output.append(f"--- {path} (relevance: {score:.2f}) ---")
output.append(content[:500] + "..." if len(content) > 500 else content)
output.append("")
return "\n".join(output)
def find_symbol(self, name: str) -> str:
"""Find symbol definitions."""
symbols = self.index.search_symbols(name)
if not symbols:
return f"No symbols found matching '{name}'"
output = []
for symbol in symbols[:10]:
output.append(f"{symbol.kind}: {symbol.name}")
output.append(f" File: {symbol.file_path}")
output.append(f" Lines: {symbol.line_start}-{symbol.line_end}")
if symbol.signature:
output.append(f" Signature: {symbol.signature}")
if symbol.docstring:
output.append(f" Docstring: {symbol.docstring[:100]}...")
output.append("")
return "\n".join(output)
def find_references(self, symbol: str) -> str:
"""Find symbol references."""
refs = self.index.find_references(symbol)
if not refs:
return f"No references found for '{symbol}'"
output = [f"Found {len(refs)} references to '{symbol}':"]
for path, line in refs[:20]:
context = self.index.get_file_context(path, line, context_lines=1)
output.append(f"\n{path}:{line}")
output.append(context)
if len(refs) > 20:
output.append(f"\n... and {len(refs) - 20} more")
return "\n".join(output)
def list_directory(self, path: str) -> str:
"""List directory contents."""
try:
entries = os.listdir(path)
dirs = []
files = []
for entry in sorted(entries):
full_path = os.path.join(path, entry)
if os.path.isdir(full_path):
dirs.append(f"📁 {entry}/")
else:
size = os.path.getsize(full_path)
files.append(f"📄 {entry} ({size} bytes)")
return "\n".join(dirs + files)
except Exception as e:
return f"Error listing directory: {e}"
def grep_code(self, pattern: str, file_pattern: str = None) -> str:
"""Search for text patterns."""
cmd = ["grep", "-rn", pattern, self.index.root_path]
if file_pattern:
cmd.extend(["--include", file_pattern])
try:
result = subprocess.run(
cmd,
capture_output=True,
text=True,
timeout=30
)
output = result.stdout
if len(output) > 5000:
output = output[:5000] + "\n... (truncated)"
return output or "No matches found"
except Exception as e:
return f"Error running grep: {e}"
```

Tool design principles:
- **Clear descriptions**: Each tool has a description that tells the LLM when to use it. "Search for code by description or functionality" is much more useful than just "search_code".
- **Structured output**: Tools return formatted text that's easy for the LLM to parse. Line numbers, file paths, and relevance scores help the agent make informed decisions.
- **Graceful errors**: Every tool returns meaningful error messages rather than crashing. "File not found: /foo/bar.py" is actionable; an exception traceback is not.
- **Output truncation**: Notice how `grep_code` truncates output over 5000 characters. Large tool outputs consume context tokens and can confuse the LLM. Truncation with a "... (truncated)" marker preserves usability.
Complementary tools: The six tools here serve different purposes:
- `read_file`: Get exact content when you know the file
- `search_code`: Find code when you don't know where it is
- `find_symbol`: Locate definitions by name
- `find_references`: See where something is used
- `list_directory`: Navigate project structure
- `grep_code`: Find exact text patterns
Most coding tasks can be accomplished with combinations of these tools. More complex agents might add tools for running tests, checking git status, or querying documentation.
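How these tools reach the model depends on your LLM provider. As a rough sketch, assuming an OpenAI-style tool-use API, the `Tool` dataclasses above can be converted into tool definitions and dispatched like this (`to_openai_tools` and `dispatch_tool_call` are illustrative helpers, not part of the classes above):

```python
import json

def to_openai_tools(tools: list[Tool]) -> list[dict]:
    """Convert Tool dataclasses into OpenAI-style function-calling definitions."""
    return [
        {
            "type": "function",
            "function": {
                "name": t.name,
                "description": t.description,
                "parameters": t.parameters,
            },
        }
        for t in tools
    ]

def dispatch_tool_call(tools: list[Tool], name: str, arguments_json: str) -> str:
    """Invoke the named tool with the JSON arguments supplied by the model."""
    registry = {t.name: t.function for t in tools}
    if name not in registry:
        return f"Unknown tool: {name}"
    return registry[name](**json.loads(arguments_json))
```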
## Edit Planning
Before making changes, the agent should plan what to modify. Jumping straight from task description to file editing is a recipe for inconsistent, incomplete, or conflicting changes. Planning creates a checkpoint where the agent (or a human) can review the proposed approach before any files are touched.
Why edit planning is the difference between toy demos and production agents: According to Anthropic's best practices for Claude Code, the research-plan-implement workflow is critical—"asking AI agents to research and plan first significantly improves performance for problems requiring deeper thinking upfront, rather than letting them jump straight to coding." Anthropic's engineers report "big drops in re-work when these initial steps are never skipped."
The compounding cost of unplanned edits: When an agent edits files without planning:
- It modifies File A, creating a new function signature
- It doesn't realize File B imports that function
- File B now has broken imports
- Tests fail with cryptic errors
- The agent tries to fix File B but doesn't understand the original goal anymore
- After 5 iterations, the codebase is in a worse state than before
With planning, the agent identifies all affected files upfront, sequences edits correctly (File A first, then File B), and anticipates test failures.
Why planning matters:
- Prevents partial implementations: Without a plan, the agent might modify one file but miss related files that also need changes
- Enables review: A plan can be shown to the user for approval before any edits happen
- Handles dependencies: Some edits must happen in a specific order (e.g., create the interface before implementing it)
- Estimates risk: Complex plans with many file edits are inherently riskier than simple single-file changes
The review checkpoint is essential for production: Many teams require human approval before agents execute plans. The plan becomes a contract: "I will make these changes for these reasons." The human reviews, suggests modifications, or approves. Only then does execution begin. This human-in-the-loop approach, now standard practice in enterprise AI adoption, prevents agents from making large, irreversible changes without oversight.
Multi-step plans require dependency ordering: Consider "Add user authentication." This might require:
1. Create `auth.py` with authentication logic (no dependencies)
2. Modify `models.py` to add a User model (depends on auth types)
3. Update `api.py` to use authentication (depends on both)
4. Write tests in `test_auth.py` (depends on all of the above)
The dependencies list in the EditPlan captures this ordering. Executing out-of-order breaks things. The plan makes dependencies explicit.
Risk estimation helps with deployment decisions: A plan to rename a variable used in 2 files is low risk. A plan to refactor the entire database layer touching 50 files is high risk. The estimated_risk field lets systems route appropriately:
- Low risk: Execute automatically
- Medium risk: Show summary to user, require approval
- High risk: Require detailed review + manual testing
### Edit Plan Structure
```python
from dataclasses import dataclass, field
from typing import Optional, Literal
from enum import Enum
class EditType(str, Enum):
CREATE = "create"
MODIFY = "modify"
DELETE = "delete"
RENAME = "rename"
@dataclass
class FileEdit:
file_path: str
edit_type: EditType
description: str
old_content: Optional[str] = None # For modify/delete
new_content: Optional[str] = None # For create/modify
line_start: Optional[int] = None
line_end: Optional[int] = None
@dataclass
class EditPlan:
goal: str
reasoning: str
edits: list[FileEdit]
dependencies: list[str] = field(default_factory=list) # Order of edits
tests_to_run: list[str] = field(default_factory=list)
estimated_risk: Literal["low", "medium", "high"] = "medium"
def validate(self) -> tuple[bool, list[str]]:
"""Validate the edit plan."""
issues = []
# Check for conflicting edits to same file
file_edits = {}
for edit in self.edits:
if edit.file_path in file_edits:
issues.append(f"Multiple edits to {edit.file_path} - may conflict")
file_edits[edit.file_path] = edit
# Check for missing content
for edit in self.edits:
if edit.edit_type == EditType.CREATE and not edit.new_content:
issues.append(f"CREATE edit for {edit.file_path} missing new_content")
if edit.edit_type == EditType.MODIFY and not edit.new_content:
issues.append(f"MODIFY edit for {edit.file_path} missing new_content")
return len(issues) == 0, issues
```

The `EditType` enum captures the four fundamental file operations: create, modify, delete, and rename. The `FileEdit` dataclass captures everything needed to perform a single edit, including line ranges for surgical modifications. The `EditPlan` ties multiple edits together with ordering dependencies and test requirements.
The validation method is crucial: Before executing any plan, validate() checks for obvious problems like conflicting edits to the same file or missing content. Catching these issues at plan time is much better than discovering them mid-execution when some files have already been modified.
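A quick usage sketch of these structures (file paths and contents are illustrative):

```python
plan = EditPlan(
    goal="Add a format_name helper",
    reasoning="A single new function keeps the change low risk.",
    edits=[
        FileEdit(
            file_path="utils/format.py",
            edit_type=EditType.CREATE,
            description="Create the format_name helper",
            new_content="def format_name(first: str, last: str) -> str:\n"
                        "    return f'{first} {last}'.strip()\n",
        )
    ],
    tests_to_run=["tests/test_format.py"],
    estimated_risk="low",
)

ok, issues = plan.validate()
print(ok, issues)  # True, [] for a well-formed plan
```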
### Edit Planning Agent
The edit planner uses the LLM to create structured plans from natural language tasks. The key is providing enough context (relevant code snippets) for the LLM to make informed decisions about what needs to change.
This implementation uses Pydantic models with response_model for structured output. The LLM returns a properly typed EditPlanResponse rather than free-form text that would need parsing:
```python
from pydantic import BaseModel, Field
from typing import Literal, Optional
class PlannedEdit(BaseModel):
file_path: str = Field(..., description="Path to the file to edit")
edit_type: Literal["create", "modify", "delete"] = Field(..., description="Type of edit")
description: str = Field(..., description="What this edit accomplishes")
changes_summary: str = Field(..., description="Summary of specific changes")
class EditPlanResponse(BaseModel):
goal_understanding: str = Field(..., description="Your understanding of what needs to be done")
approach: str = Field(..., description="High-level approach to accomplish the goal")
edits: list[PlannedEdit] = Field(..., description="List of file edits needed")
execution_order: list[str] = Field(..., description="Order to execute edits (file paths)")
tests_needed: list[str] = Field(..., description="Tests that should be run/created")
potential_risks: list[str] = Field(default_factory=list, description="Potential issues to watch for")
class EditPlanner:
"""Plan code edits before execution."""
def __init__(self, client, codebase_index: CodebaseIndex):
self.client = client
self.index = codebase_index
def create_plan(
self,
task: str,
relevant_context: str,
constraints: list[str] = None
) -> EditPlanResponse:
"""Create an edit plan for a task."""
system_prompt = """You are a senior software engineer planning code changes.
Given a task and relevant code context, create a detailed edit plan.
Guidelines:
1. Understand the existing code structure before proposing changes
2. Make minimal, focused changes - don't refactor unrelated code
3. Consider the impact on other parts of the codebase
4. Plan for testability - changes should be verifiable
5. Order edits to avoid breaking intermediate states
Be specific about what changes are needed in each file."""
user_prompt = f"""Task: {task}
Relevant Code Context:
{relevant_context}
{f"Constraints: {chr(10).join(constraints)}" if constraints else ""}
Create a detailed edit plan."""
plan = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
response_model=EditPlanResponse
)
return plan
def refine_plan(
self,
plan: EditPlanResponse,
feedback: str
) -> EditPlanResponse:
"""Refine a plan based on feedback."""
prompt = f"""Previous plan:
{plan.model_dump_json(indent=2)}
Feedback:
{feedback}
Create an improved edit plan addressing the feedback."""
refined = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": prompt}
],
response_model=EditPlanResponse
)
return refined
```

System prompt design: The system prompt for the planner emphasizes key principles: understand before changing, minimize changes, consider impact, and order edits correctly. These guidelines help the LLM produce plans that are more likely to succeed and less likely to introduce regressions.
Iterative refinement: The refine_plan method allows for human-in-the-loop iteration. A user might review a plan and say "don't modify the database schema" or "add a test file too"—the planner can incorporate this feedback into an updated plan.
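A usage sketch of the planner. The `response_model` argument above implies an instructor-patched client, so that is what this example assumes; the index and repository path are illustrative:

```python
import instructor
from openai import OpenAI

client = instructor.from_openai(OpenAI())

index = CodebaseIndex("./my_project")
index.index()

planner = EditPlanner(client, index)
plan = planner.create_plan(
    task="Add input validation to the signup endpoint",
    relevant_context="\n".join(
        f"{s.kind} {s.name} ({s.file_path})" for s in index.search_symbols("signup")
    ),
    constraints=["Do not change the database schema"],
)

print(plan.approach)
for edit in plan.edits:
    print(f"- {edit.edit_type}: {edit.file_path}: {edit.description}")
```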
## Code Editing
With a plan in hand, the agent needs to actually modify files. This is where things get dangerous—file edits are persistent and potentially destructive. A well-designed file editor must handle failures gracefully and provide rollback capabilities.
### Safe File Editing
The FileEditor class implements safe file editing with automatic backups and rollback support. Before any edit, the original file is copied to a backup location. If an edit fails or causes problems, the agent (or user) can restore the original state.
Why backup before editing? Even with perfect planning, edits can fail. The file might have changed since it was read, permissions might prevent writing, or the edit might introduce syntax errors. Backups provide an escape hatch.
The key methods are:
- `edit_file`: Complete file replacement with automatic backup
- `apply_patch`: Search-and-replace for surgical edits (safer than full replacement)
- `rollback_last` / `rollback_all`: Restore previous state
```python
import os
import difflib
import shutil
from datetime import datetime
class FileEditor:
"""Safe file editing with backup and rollback."""
def __init__(self, backup_dir: str = ".code_agent_backups"):
self.backup_dir = backup_dir
self.edit_history: list[dict] = []
def edit_file(
self,
file_path: str,
new_content: str,
create_if_missing: bool = False
) -> dict:
"""Edit a file with backup."""
# Create backup
backup_path = None
original_content = None
if os.path.exists(file_path):
original_content = self._read_file(file_path)
backup_path = self._create_backup(file_path)
elif not create_if_missing:
return {"success": False, "error": f"File not found: {file_path}"}
# Write new content
try:
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w") as f:
f.write(new_content)
edit_record = {
"file_path": file_path,
"backup_path": backup_path,
"original_content": original_content,
"new_content": new_content,
"timestamp": datetime.now().isoformat()
}
self.edit_history.append(edit_record)
# Generate diff for logging
diff = self._generate_diff(original_content or "", new_content, file_path)
return {
"success": True,
"diff": diff,
"backup_path": backup_path
}
except Exception as e:
# Rollback on failure
if backup_path and os.path.exists(backup_path):
shutil.copy(backup_path, file_path)
return {"success": False, "error": str(e)}
def apply_patch(self, file_path: str, search: str, replace: str) -> dict:
"""Apply a search-and-replace patch."""
if not os.path.exists(file_path):
return {"success": False, "error": f"File not found: {file_path}"}
content = self._read_file(file_path)
if search not in content:
return {
"success": False,
"error": f"Search text not found in {file_path}",
"hint": "The code may have changed. Re-read the file and try again."
}
# Count occurrences
occurrences = content.count(search)
if occurrences > 1:
return {
"success": False,
"error": f"Search text found {occurrences} times - ambiguous",
"hint": "Include more context to make the search unique."
}
new_content = content.replace(search, replace, 1)
return self.edit_file(file_path, new_content)
def rollback_last(self) -> dict:
"""Rollback the last edit."""
if not self.edit_history:
return {"success": False, "error": "No edits to rollback"}
last_edit = self.edit_history.pop()
if last_edit["backup_path"] and os.path.exists(last_edit["backup_path"]):
shutil.copy(last_edit["backup_path"], last_edit["file_path"])
return {"success": True, "rolled_back": last_edit["file_path"]}
elif last_edit["original_content"] is None:
# File was created, delete it
os.remove(last_edit["file_path"])
return {"success": True, "deleted": last_edit["file_path"]}
else:
return {"success": False, "error": "Backup not found"}
def rollback_all(self) -> dict:
"""Rollback all edits in reverse order."""
results = []
while self.edit_history:
result = self.rollback_last()
results.append(result)
return {"success": True, "rollbacks": results}
def _read_file(self, path: str) -> str:
with open(path, "r") as f:
return f.read()
def _create_backup(self, file_path: str) -> str:
os.makedirs(self.backup_dir, exist_ok=True)
timestamp = datetime.now().strftime("%Y%m%d_%H%M%S")
backup_name = f"{os.path.basename(file_path)}.{timestamp}.bak"
backup_path = os.path.join(self.backup_dir, backup_name)
shutil.copy(file_path, backup_path)
return backup_path
def _generate_diff(self, old: str, new: str, file_path: str) -> str:
diff = difflib.unified_diff(
old.splitlines(keepends=True),
new.splitlines(keepends=True),
fromfile=f"a/{file_path}",
tofile=f"b/{file_path}"
)
return "".join(diff)
```

The `apply_patch` method is safer than full file replacement. When you use search-and-replace, you only change what you intend to change. If the search text isn't found (file was modified externally) or is found multiple times (ambiguous), the patch fails gracefully rather than overwriting the wrong content.
Diff generation: Every edit records a unified diff. This serves two purposes: logging (you can see exactly what changed) and debugging (if something breaks, the diff shows what the agent did).
Edit history: The edit_history list tracks all changes in order, enabling selective rollback. For complex multi-file changes, you might want to rollback just the last edit while keeping earlier changes.
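A short usage sketch of the editor (the config file path and patch contents are illustrative):

```python
editor = FileEditor()

result = editor.apply_patch(
    "app/config.py",
    search="DEBUG = True",
    replace="DEBUG = False",
)

if result["success"]:
    print(result["diff"])  # unified diff of what changed
else:
    print(result["error"], result.get("hint", ""))

# If a later step goes wrong, restore the previous state.
editor.rollback_last()
```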
### Code Generation
The code generator is responsible for producing new code from descriptions. Unlike editing, which modifies existing code, generation creates code from scratch (or fills in function bodies from signatures).
The key challenge with code generation is matching context. Generated code should match the project's style, use the right imports, and follow established patterns. We address this by passing surrounding code context to the LLM:
````python
class CodeGenerator:
"""Generate code using LLM."""
def __init__(self, client):
self.client = client
def generate_function(
self,
description: str,
signature: str,
context: str,
language: str = "python"
) -> str:
"""Generate a function implementation."""
system_prompt = f"""You are an expert {language} developer.
Generate a complete, working function implementation.
Guidelines:
1. Follow the exact signature provided
2. Include appropriate error handling
3. Add type hints (for Python/TypeScript)
4. Keep the implementation focused and minimal
5. Match the style of the surrounding code context
Return ONLY the function code, no explanations."""
prompt = f"""Function signature:
{signature}
Description:
{description}
Surrounding code context:
{context}
Generate the function implementation:"""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
]
)
code = response.choices[0].message.content
# Extract code from markdown if present
if "```" in code:
code = code.split("```")[1]
if code.startswith(language):
code = code[len(language):]
code = code.strip()
return code
def generate_tests(
self,
code: str,
function_name: str,
language: str = "python"
) -> str:
"""Generate tests for a function."""
test_framework = {
"python": "pytest",
"javascript": "jest",
"typescript": "jest"
}.get(language, "generic")
system_prompt = f"""You are a test engineer specializing in {language}.
Generate comprehensive tests using {test_framework}.
Guidelines:
1. Test happy path scenarios
2. Test edge cases (empty inputs, large inputs, etc.)
3. Test error conditions
4. Use descriptive test names
5. Keep tests independent and focused
Return ONLY the test code."""
prompt = f"""Generate tests for this function:
{code}
Function to test: {function_name}"""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": prompt}
]
)
return self._extract_code(response.choices[0].message.content, language)
def fix_code(
self,
code: str,
error_message: str,
language: str = "python"
) -> str:
"""Fix code based on error message."""
prompt = f"""This {language} code has an error:
```{language}
{code}
```
Error message: {error_message}
Fix the code. Return ONLY the corrected code, no explanations."""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": prompt}
]
)
return self._extract_code(response.choices[0].message.content, language)
def _extract_code(self, text: str, language: str) -> str:
"""Extract code from markdown."""
if "```" in text:
parts = text.split("```")
for part in parts[1::2]: # Every other part starting from 1
if part.startswith(language):
return part[len(language):].strip()
elif not any(part.startswith(lang) for lang in ["python", "javascript", "typescript"]):
return part.strip()
return text.strip()
````

**Three core generation methods:**
1. **`generate_function`**: Given a signature and description, generates a complete function implementation. The system prompt emphasizes matching the surrounding code style.
2. **`generate_tests`**: Creates test cases for existing code. The prompt specifies the testing framework (pytest, jest) and requests coverage of happy paths, edge cases, and error conditions.
3. **`fix_code`**: Takes broken code and an error message, produces corrected code. This is used in the debugging loop when tests fail.
**Code extraction from markdown:** LLM responses often include markdown code blocks. The `_extract_code` method strips the markdown syntax to get clean code. It handles multiple code blocks and language identifiers.
**Why specify "Return ONLY the code"?** Without this instruction, LLMs tend to add explanations before and after code. These explanations are helpful for humans but problematic when the output goes directly into a file.
---
## Test Generation and Validation
Tests are the foundation of reliable code changes. Without tests, you can't know if your changes work or if they broke something else. For coding agents, tests serve as automated verification—the agent can run tests after each change to check its work.
### Test Runner
The test runner abstracts away framework-specific details. Whether you're running pytest, jest, or go test, the runner provides a consistent interface: run tests, capture output, and parse results into a structured format.
**Why parse test output?** Raw test output is hard for LLMs to reason about. By extracting passed/failed test names and structured error information, we give the LLM actionable data for debugging:
```python
import os
import subprocess
import tempfile
from dataclasses import dataclass
from typing import Optional
@dataclass
class TestResult:
passed: bool
output: str
error_output: str
duration_ms: float
failed_tests: list[str]
passed_tests: list[str]
class TestRunner:
"""Run tests and capture results."""
def __init__(self, project_root: str):
self.project_root = project_root
def run_pytest(
self,
test_path: str = None,
test_pattern: str = None,
timeout: int = 300
) -> TestResult:
"""Run pytest tests."""
cmd = ["python", "-m", "pytest", "-v"]
if test_path:
cmd.append(test_path)
if test_pattern:
cmd.extend(["-k", test_pattern])
cmd.append("--tb=short") # Shorter tracebacks
return self._run_command(cmd, timeout)
def run_jest(
self,
test_path: str = None,
timeout: int = 300
) -> TestResult:
"""Run Jest tests."""
cmd = ["npx", "jest", "--verbose"]
if test_path:
cmd.append(test_path)
return self._run_command(cmd, timeout)
def run_single_test(
self,
test_code: str,
language: str = "python",
timeout: int = 60
) -> TestResult:
"""Run a single test in isolation."""
with tempfile.NamedTemporaryFile(
mode="w",
suffix=f".{'py' if language == 'python' else 'js'}",
delete=False
) as f:
f.write(test_code)
test_file = f.name
try:
if language == "python":
return self.run_pytest(test_file, timeout=timeout)
else:
return self.run_jest(test_file, timeout=timeout)
finally:
os.unlink(test_file)
def _run_command(self, cmd: list[str], timeout: int) -> TestResult:
"""Run a command and parse results."""
import time
start = time.time()
try:
result = subprocess.run(
cmd,
cwd=self.project_root,
capture_output=True,
text=True,
timeout=timeout
)
duration = (time.time() - start) * 1000
# Parse test results
passed_tests, failed_tests = self._parse_test_output(
result.stdout + result.stderr
)
return TestResult(
passed=result.returncode == 0,
output=result.stdout,
error_output=result.stderr,
duration_ms=duration,
failed_tests=failed_tests,
passed_tests=passed_tests
)
except subprocess.TimeoutExpired:
return TestResult(
passed=False,
output="",
error_output="Test timed out",
duration_ms=timeout * 1000,
failed_tests=["TIMEOUT"],
passed_tests=[]
)
def _parse_test_output(self, output: str) -> tuple[list[str], list[str]]:
"""Parse test output to extract passed/failed tests."""
passed = []
failed = []
for line in output.split("\n"):
# pytest format
if "PASSED" in line:
test_name = line.split("::")[1].split()[0] if "::" in line else line
passed.append(test_name.strip())
elif "FAILED" in line:
test_name = line.split("::")[1].split()[0] if "::" in line else line
failed.append(test_name.strip())
# jest format
elif "✓" in line:
passed.append(line.strip())
elif "✕" in line:
failed.append(line.strip())
return passed, failed
```

Framework-specific methods: `run_pytest` and `run_jest` handle the differences between Python and JavaScript testing. The methods add appropriate flags (`-v` for verbose output, `--tb=short` for concise tracebacks) that make the output more useful.
Timeout handling: Tests can hang or run forever (infinite loops, deadlocks). The timeout parameter prevents the agent from getting stuck. When a timeout occurs, we return a special "TIMEOUT" failed test result so the agent knows what happened.
Output parsing: The _parse_test_output method looks for framework-specific patterns (PASSED/FAILED for pytest, checkmarks for jest) to extract individual test results. This lets the agent know exactly which tests failed, not just that something failed.
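String-scraping verbose output is fragile across pytest versions and plugins. A sturdier option, sketched below, is to ask pytest for a JUnit XML report and parse that instead (the report filename is arbitrary; skipped tests are ignored here for brevity):

```python
import os
import subprocess
import xml.etree.ElementTree as ET

def run_pytest_junit(project_root: str, report_name: str = "pytest-report.xml"):
    """Run pytest with a JUnit XML report and return (passed, failed) test ids."""
    subprocess.run(
        ["python", "-m", "pytest", "-q", f"--junitxml={report_name}"],
        cwd=project_root,
        capture_output=True,
        text=True,
        timeout=300,
    )
    passed, failed = [], []
    root = ET.parse(os.path.join(project_root, report_name)).getroot()
    for case in root.iter("testcase"):
        test_id = f"{case.get('classname')}::{case.get('name')}"
        if case.find("failure") is not None or case.find("error") is not None:
            failed.append(test_id)
        else:
            passed.append(test_id)
    return passed, failed
```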
### Test-Driven Development Loop
TDD (Test-Driven Development) is a natural fit for coding agents. The loop is:
- Write tests that define the expected behavior
- Run tests (they should fail—the code doesn't exist yet)
- Write implementation to make tests pass
- Run tests again
- If tests fail, fix the implementation and repeat
Why TDD for agents? Tests provide an unambiguous success criterion. Instead of asking "is this implementation correct?", we ask "do tests pass?". This binary feedback is much easier for agents to work with than subjective quality judgments.
````python
class TDDLoop:
"""Test-driven development loop for coding agents."""
def __init__(
self,
client,
code_generator: CodeGenerator,
test_runner: TestRunner,
file_editor: FileEditor,
max_iterations: int = 5
):
self.client = client
self.generator = code_generator
self.runner = test_runner
self.editor = file_editor
self.max_iterations = max_iterations
def implement_with_tests(
self,
task: str,
target_file: str,
test_file: str,
context: str = ""
) -> dict:
"""Implement a feature using TDD."""
# Step 1: Generate tests first
test_code = self._generate_tests_for_task(task, context)
# Save tests
self.editor.edit_file(test_file, test_code, create_if_missing=True)
# Step 2: Run tests (should fail)
initial_result = self.runner.run_pytest(test_file)
if initial_result.passed:
return {
"success": True,
"message": "Tests already pass - feature may already exist",
"iterations": 0
}
# Step 3: Implement to make tests pass
for iteration in range(self.max_iterations):
# Generate implementation
implementation = self._generate_implementation(
task,
context,
test_code,
initial_result.error_output if iteration == 0 else test_result.error_output
)
# Save implementation
self.editor.edit_file(target_file, implementation, create_if_missing=True)
# Run tests
test_result = self.runner.run_pytest(test_file)
if test_result.passed:
return {
"success": True,
"message": "All tests pass",
"iterations": iteration + 1,
"implementation": implementation,
"tests": test_code
}
# If tests still fail, try to fix
context += f"\n\nPrevious attempt failed:\n{test_result.error_output}"
# Max iterations reached
return {
"success": False,
"message": f"Failed to pass tests after {self.max_iterations} iterations",
"last_error": test_result.error_output,
"implementation": implementation,
"tests": test_code
}
def _generate_tests_for_task(self, task: str, context: str) -> str:
"""Generate tests for a task."""
prompt = f"""Generate pytest tests for this task:
Task: {task}
Context:
{context}
Write comprehensive tests that:
1. Define the expected behavior
2. Cover edge cases
3. Are specific and verifiable
Return only the test code with necessary imports."""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return self.generator._extract_code(
response.choices[0].message.content,
"python"
)
def _generate_implementation(
self,
task: str,
context: str,
test_code: str,
error_output: str
) -> str:
"""Generate implementation to pass tests."""
prompt = f"""Implement code to pass these tests:
Task: {task}
Tests:
```python
{test_code}
```
{f"Current error output:{chr(10)}{error_output}" if error_output else ""}
Context: {context}
Write the implementation that makes all tests pass. Return only the implementation code with necessary imports."""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return self.generator._extract_code(
response.choices[0].message.content,
"python"
)
````

**The TDD loop flow:**
1. **Generate tests first**: The `_generate_tests_for_task` method creates tests from the task description. These tests define what "success" looks like.
2. **Initial test run**: We run tests expecting them to fail. If they pass, the feature may already exist (no work needed).
3. **Iterative implementation**: For up to `max_iterations`, we generate/refine the implementation. Each iteration includes the previous error output, so the LLM learns from its mistakes.
4. **Context accumulation**: Failed attempts are added to the context: "Previous attempt failed: [error]". This prevents the LLM from repeating the same mistake.
**Why limit iterations?** Without a limit, the agent could loop forever on an impossible task. The default of 5 iterations usually suffices—if the agent can't fix the code in 5 tries, human intervention is likely needed.
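Wiring the pieces together, a usage sketch of the loop (the task and file paths are illustrative; any client compatible with the chat-completions calls above will do):

```python
from openai import OpenAI

client = OpenAI()

loop = TDDLoop(
    client=client,
    code_generator=CodeGenerator(client),
    test_runner=TestRunner(project_root="."),
    file_editor=FileEditor(),
)

outcome = loop.implement_with_tests(
    task="Add a slugify(text) helper that lowercases text and replaces spaces with hyphens",
    target_file="utils/slugify.py",
    test_file="tests/test_slugify.py",
)
print(outcome["message"], "-", outcome.get("iterations", 0), "iteration(s)")
```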
---
## Iterative Debugging
When tests fail, the agent needs to debug. Unlike humans who can reason about code holistically, LLM agents work best with a structured debugging process: analyze error → form hypothesis → apply fix → test → repeat.
### Debug Loop
The `IterativeDebugger` class implements a systematic debugging approach. Each iteration records what was tried and what happened, building up a history that helps avoid repeating failed approaches:
```python
@dataclass
class DebugAttempt:
iteration: int
error_type: str
error_message: str
hypothesis: str
fix_applied: str
result: str
class IterativeDebugger:
"""Iteratively debug code until it works."""
def __init__(
self,
client,
code_generator: CodeGenerator,
test_runner: TestRunner,
file_editor: FileEditor,
max_iterations: int = 10
):
self.client = client
self.generator = code_generator
self.runner = test_runner
self.editor = file_editor
self.max_iterations = max_iterations
def debug_until_passing(
self,
file_path: str,
test_command: str = None,
run_command: str = None
) -> dict:
"""Debug code until tests pass or command succeeds."""
attempts = []
for iteration in range(self.max_iterations):
# Run tests/command
if test_command:
result = self._run_command(test_command)
elif run_command:
result = self._run_command(run_command)
            else:
                # run_pytest returns a TestResult dataclass, so normalize it to the
                # dict shape produced by _run_command before the checks below.
                test_result = self.runner.run_pytest()
                result = {
                    "success": test_result.passed,
                    "output": test_result.output,
                    "error": test_result.error_output or test_result.output,
                }
if result["success"]:
return {
"success": True,
"iterations": iteration,
"attempts": attempts
}
# Analyze error
error_analysis = self._analyze_error(
file_path,
result["error"],
attempts
)
# Generate fix
fix = self._generate_fix(
file_path,
error_analysis,
attempts
)
# Apply fix
edit_result = self.editor.apply_patch(
file_path,
fix["search"],
fix["replace"]
)
if not edit_result["success"]:
# Try full file rewrite if patch fails
current_content = self._read_file(file_path)
fixed_code = self.generator.fix_code(
current_content,
result["error"]
)
edit_result = self.editor.edit_file(file_path, fixed_code)
attempts.append(DebugAttempt(
iteration=iteration,
error_type=error_analysis["error_type"],
error_message=result["error"][:500],
hypothesis=error_analysis["hypothesis"],
fix_applied=fix.get("description", "Full rewrite"),
result="pending"
))
return {
"success": False,
"iterations": self.max_iterations,
"attempts": attempts,
"last_error": result["error"]
}
def _analyze_error(
self,
file_path: str,
error: str,
previous_attempts: list[DebugAttempt]
) -> dict:
"""Analyze an error to understand the root cause."""
code = self._read_file(file_path)
previous_context = ""
if previous_attempts:
previous_context = "\n\nPrevious attempts:\n"
for attempt in previous_attempts[-3:]:
previous_context += f"- {attempt.hypothesis} -> {attempt.fix_applied}\n"
prompt = f"""Analyze this error and identify the root cause.
Code:
{code}
Error:
{error}
{previous_context}
Respond with:
1. error_type: The category of error (syntax, type, logic, import, etc.)
2. hypothesis: Your theory about what's causing the error
3. location: Where in the code the problem likely is
4. suggested_fix: What should be changed"""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
# Parse response (simplified)
content = response.choices[0].message.content
return {
"error_type": self._extract_field(content, "error_type") or "unknown",
"hypothesis": self._extract_field(content, "hypothesis") or error[:200],
"location": self._extract_field(content, "location"),
"suggested_fix": self._extract_field(content, "suggested_fix")
}
def _generate_fix(
self,
file_path: str,
error_analysis: dict,
previous_attempts: list[DebugAttempt]
) -> dict:
"""Generate a code fix."""
code = self._read_file(file_path)
# Build context from previous attempts
avoid_list = []
for attempt in previous_attempts:
avoid_list.append(attempt.fix_applied)
prompt = f"""Generate a fix for this code.
Code:
{code}
Error analysis:
- Type: {error_analysis["error_type"]}
- Hypothesis: {error_analysis["hypothesis"]}
- Location: {error_analysis["location"]}
- Suggested fix: {error_analysis["suggested_fix"]}
{"Avoid these fixes (already tried): " + str(avoid_list) if avoid_list else ""}
Respond with:
1. search: The exact text to find (must match exactly)
2. replace: The text to replace it with
3. description: Brief description of the fix"""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
content = response.choices[0].message.content
return {
"search": self._extract_field(content, "search") or "",
"replace": self._extract_field(content, "replace") or "",
"description": self._extract_field(content, "description") or ""
}
def _run_command(self, command: str) -> dict:
"""Run a shell command."""
try:
result = subprocess.run(
command,
shell=True,
capture_output=True,
text=True,
timeout=60
)
return {
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr or result.stdout
}
except Exception as e:
return {
"success": False,
"output": "",
"error": str(e)
}
def _read_file(self, path: str) -> str:
with open(path, "r") as f:
return f.read()
def _extract_field(self, text: str, field: str) -> Optional[str]:
"""Extract a field from LLM response."""
# Simple extraction - would be better with structured output
for line in text.split("\n"):
if line.lower().startswith(f"{field}:"):
return line.split(":", 1)[1].strip()
if line.lower().startswith(f"- {field}:"):
return line.split(":", 1)[1].strip()
return None
```

**The debugging cycle:**

1. **Error analysis**: The `_analyze_error` method asks the LLM to categorize the error (syntax, type, logic, import) and form a hypothesis about the root cause. This structured analysis is more effective than just passing raw errors.

2. **Fix generation**: Based on the analysis, `_generate_fix` creates a search-and-replace patch. The prompt includes previous attempts to avoid repeating failed fixes.

3. **Patch application**: We prefer `apply_patch` over a full file rewrite. If the patch fails (search text not found), we fall back to `fix_code` for a complete rewrite.

4. **Attempt tracking**: Each iteration is recorded as a `DebugAttempt`. This history is shown to the LLM in subsequent iterations: "Avoid these fixes (already tried): [...]".
**Why track previous attempts?** LLMs can get stuck in loops, proposing the same fix repeatedly. By explicitly showing what was already tried, we guide the model toward novel approaches.
**Fallback strategy:** When patches fail, we try a full file rewrite. This is more expensive (regenerates the entire file) but sometimes necessary when the code has drifted significantly from what the agent expects.
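For concreteness, here is a minimal usage sketch of the debugger. The file path and test command are placeholders, and `code_generator`, `test_runner`, and `file_editor` stand in for the components built in earlier sections:

```python
# Minimal wiring sketch; collaborators and paths are illustrative placeholders.
debugger = IterativeDebugger(
    client,
    code_generator=code_generator,
    test_runner=test_runner,
    file_editor=file_editor,
    max_iterations=5,
)

outcome = debugger.debug_until_passing(
    "src/parser.py",
    test_command="python -m pytest tests/test_parser.py -q",
)

if outcome["success"]:
    print(f"Fixed after {outcome['iterations']} iteration(s)")
else:
    print("Escalating to a human:", outcome["last_error"][:200])
```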
---
## Sandboxed Execution
Running agent-generated code is risky. The code might have bugs that cause infinite loops, consume excessive memory, or worse—contain security vulnerabilities. Sandboxed execution isolates code in a controlled environment where it can't harm the host system.
### Docker Sandbox for Code Execution
Docker containers provide excellent isolation. Each code execution happens in a fresh container with:
- Limited memory and CPU
- No network access (can't phone home)
- Ephemeral filesystem (changes don't persist)
- Automatic cleanup (container is removed after execution)
```python
import docker
import io
import os
import tarfile
class CodeSandbox:
"""Sandboxed environment for running code."""
def __init__(
self,
image: str = "python:3.11-slim",
memory_limit: str = "512m",
cpu_limit: float = 1.0,
timeout: int = 60
):
self.image = image
self.memory_limit = memory_limit
self.cpu_limit = cpu_limit
self.timeout = timeout
self.client = docker.from_env()
def run_code(
self,
code: str,
language: str = "python",
requirements: list[str] = None
) -> dict:
"""Run code in sandbox."""
# Create container
container = self.client.containers.create(
self.image,
command="sleep infinity",
mem_limit=self.memory_limit,
cpu_quota=int(self.cpu_limit * 100000),
network_mode="none", # No network access
detach=True
)
try:
container.start()
# Install requirements if any
if requirements:
req_cmd = f"pip install {' '.join(requirements)}"
exit_code, output = container.exec_run(req_cmd)
if exit_code != 0:
return {
"success": False,
"error": f"Failed to install requirements: {output.decode()}"
}
# Copy code to container
self._copy_to_container(container, code, "/app/main.py")
# Run code
if language == "python":
cmd = "python /app/main.py"
elif language == "javascript":
cmd = "node /app/main.js"
else:
cmd = f"python /app/main.py"
exit_code, output = container.exec_run(
cmd,
workdir="/app",
demux=True
)
stdout = output[0].decode() if output[0] else ""
stderr = output[1].decode() if output[1] else ""
return {
"success": exit_code == 0,
"exit_code": exit_code,
"stdout": stdout,
"stderr": stderr
}
finally:
container.stop(timeout=1)
container.remove(force=True)
def run_tests(
self,
code: str,
test_code: str,
requirements: list[str] = None
) -> dict:
"""Run tests in sandbox."""
requirements = (requirements or []) + ["pytest"]
container = self.client.containers.create(
self.image,
command="sleep infinity",
mem_limit=self.memory_limit,
network_mode="none",
detach=True
)
try:
container.start()
# Install requirements
req_cmd = f"pip install {' '.join(requirements)}"
container.exec_run(req_cmd)
# Copy code files
self._copy_to_container(container, code, "/app/main.py")
self._copy_to_container(container, test_code, "/app/test_main.py")
# Run pytest
exit_code, output = container.exec_run(
"python -m pytest /app/test_main.py -v",
workdir="/app",
demux=True
)
stdout = output[0].decode() if output[0] else ""
stderr = output[1].decode() if output[1] else ""
return {
"success": exit_code == 0,
"output": stdout + stderr,
"passed": exit_code == 0
}
finally:
container.stop(timeout=1)
container.remove(force=True)
def _copy_to_container(self, container, content: str, path: str):
"""Copy content to container as a file."""
# Create tar archive
tar_stream = io.BytesIO()
with tarfile.open(fileobj=tar_stream, mode='w') as tar:
data = content.encode('utf-8')
tarinfo = tarfile.TarInfo(name=os.path.basename(path))
tarinfo.size = len(data)
tar.addfile(tarinfo, io.BytesIO(data))
tar_stream.seek(0)
container.put_archive(os.path.dirname(path), tar_stream)
```

**Security constraints explained:**

- `network_mode="none"`: The container has no network access. Generated code can't make HTTP requests, exfiltrate data, or download malicious payloads.
- `mem_limit="512m"`: Memory is capped at 512MB. A runaway program can't consume all system memory.
- `cpu_quota`: CPU is limited to prevent the container from monopolizing compute resources.
**The `run_code` method lifecycle:**
- Create a container (but don't start it yet)
- Start the container in "sleep mode" (keeps it alive)
- Install any required packages
- Copy code into the container via tar archive
- Execute the code and capture output
- Stop and remove the container (always, even on error)
**Why a tar archive for file copy?** Docker's `put_archive` API expects tar format. The `_copy_to_container` method creates an in-memory tar archive, which avoids creating temporary files on the host.
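A minimal usage sketch, assuming a local Docker daemon and the `python:3.11-slim` image are available:

```python
sandbox = CodeSandbox(memory_limit="256m", timeout=30)

# Run a one-off snippet with no network and a strict memory cap
result = sandbox.run_code("print(sum(range(10)))")
print(result["success"], result["stdout"].strip())  # True 45

# Run code plus its tests under pytest inside the container
tests = sandbox.run_tests(
    code="def add(a, b):\n    return a + b\n",
    test_code="from main import add\n\ndef test_add():\n    assert add(2, 3) == 5\n",
)
print(tests["passed"])
```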
**Alternative sandboxing approaches:**
- gVisor/Firecracker: Lighter-weight isolation than full containers (a sketch of pointing Docker at gVisor's `runsc` runtime follows after this list)
- WebAssembly: Run code in WASM runtime (sandboxed by design)
- Separate VMs: Maximum isolation but highest overhead
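As a hedged sketch of the gVisor option: if the `runsc` runtime is installed and registered with your Docker daemon, the sandbox container can be created under gVisor instead of the default runc by passing `runtime="runsc"` to `containers.create`:

```python
import docker

client = docker.from_env()

# Assumes gVisor's runsc runtime is registered with the Docker daemon
# (e.g. in /etc/docker/daemon.json); otherwise this call raises an APIError.
container = client.containers.create(
    "python:3.11-slim",
    command="sleep infinity",
    mem_limit="512m",
    network_mode="none",
    runtime="runsc",  # gVisor's user-space kernel intercepts syscalls
    detach=True,
)
```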
---
## Complete Coding Agent
Now we bring all the components together into a complete coding agent. The agent orchestrates the entire workflow: understanding → planning → implementing → testing → debugging, progressing through states until the task succeeds or fails.
### Putting It All Together
The `CodingAgent` class is a state machine that moves through defined states. Each state has a handler method that performs work and determines the next state:
```python
from dataclasses import dataclass
from typing import Optional, Callable
from enum import Enum
class AgentState(Enum):
UNDERSTANDING = "understanding"
PLANNING = "planning"
IMPLEMENTING = "implementing"
TESTING = "testing"
DEBUGGING = "debugging"
COMPLETE = "complete"
FAILED = "failed"
@dataclass
class AgentContext:
task: str
state: AgentState
plan: Optional[EditPlanResponse] = None
edits_made: list[dict] = None
    test_results: list[TestResult] = None
    relevant_context: Optional[str] = None
    iterations: int = 0
    max_iterations: int = 10
class CodingAgent:
"""Complete coding agent implementation."""
def __init__(
self,
client,
project_root: str,
on_state_change: Callable[[AgentState, str], None] = None
):
self.client = client
self.project_root = project_root
self.on_state_change = on_state_change
# Initialize components
self.index = CodebaseIndex(project_root)
self.index.index()
self.semantic_search = SemanticCodeSearch()
self.semantic_search.index_codebase(self.index)
self.tools = CodingTools(self.index, self.semantic_search)
self.planner = EditPlanner(client, self.index)
self.generator = CodeGenerator(client)
self.editor = FileEditor()
self.runner = TestRunner(project_root)
self.debugger = IterativeDebugger(
client, self.generator, self.runner, self.editor
)
def execute_task(self, task: str) -> dict:
"""Execute a coding task end-to-end."""
context = AgentContext(
task=task,
state=AgentState.UNDERSTANDING,
edits_made=[],
test_results=[]
)
while context.state not in [AgentState.COMPLETE, AgentState.FAILED]:
context = self._step(context)
context.iterations += 1
if context.iterations >= context.max_iterations:
context.state = AgentState.FAILED
break
return {
"success": context.state == AgentState.COMPLETE,
"state": context.state.value,
"iterations": context.iterations,
"plan": context.plan.model_dump() if context.plan else None,
"edits": context.edits_made,
"test_results": [
{"passed": r.passed, "output": r.output[:500]}
for r in (context.test_results or [])
]
}
def _step(self, context: AgentContext) -> AgentContext:
"""Execute one step of the agent."""
self._notify_state(context.state, f"Processing: {context.task[:50]}...")
if context.state == AgentState.UNDERSTANDING:
return self._understand(context)
elif context.state == AgentState.PLANNING:
return self._plan(context)
elif context.state == AgentState.IMPLEMENTING:
return self._implement(context)
elif context.state == AgentState.TESTING:
return self._test(context)
elif context.state == AgentState.DEBUGGING:
return self._debug(context)
return context
def _understand(self, context: AgentContext) -> AgentContext:
"""Understand the task and gather context."""
# Search for relevant code
search_results = self.tools.search_code(context.task, top_k=10)
# Store context for planning
context.relevant_context = search_results
context.state = AgentState.PLANNING
return context
def _plan(self, context: AgentContext) -> AgentContext:
"""Create an edit plan."""
plan = self.planner.create_plan(
context.task,
context.relevant_context
)
# Validate plan
if not plan.edits:
context.state = AgentState.FAILED
return context
context.plan = plan
context.state = AgentState.IMPLEMENTING
return context
def _implement(self, context: AgentContext) -> AgentContext:
"""Implement the planned changes."""
for edit in context.plan.edits:
if edit.edit_type == "create":
# Generate new file content
content = self.generator.generate_function(
edit.description,
"",
context.relevant_context
)
result = self.editor.edit_file(
edit.file_path,
content,
create_if_missing=True
)
elif edit.edit_type == "modify":
# Read existing content and modify
current = self._read_file(edit.file_path)
modified = self._apply_modification(
current,
edit.description,
context.relevant_context
)
                result = self.editor.edit_file(edit.file_path, modified)
            else:
                # Edit types we don't handle here (e.g. deletes) are recorded as skipped
                result = {"success": False, "error": f"Unsupported edit type: {edit.edit_type}"}
context.edits_made.append({
"file": edit.file_path,
"type": edit.edit_type,
"success": result.get("success", False)
})
context.state = AgentState.TESTING
return context
def _test(self, context: AgentContext) -> AgentContext:
"""Run tests to validate changes."""
# Run specified tests or all tests
if context.plan.tests_to_run:
for test_path in context.plan.tests_to_run:
result = self.runner.run_pytest(test_path)
context.test_results.append(result)
else:
result = self.runner.run_pytest()
context.test_results.append(result)
# Check if all tests pass
all_passed = all(r.passed for r in context.test_results)
if all_passed:
context.state = AgentState.COMPLETE
else:
context.state = AgentState.DEBUGGING
return context
def _debug(self, context: AgentContext) -> AgentContext:
"""Debug failing tests."""
# Get the failing test info
last_result = context.test_results[-1]
if last_result.failed_tests:
# Try to fix the code
for edit in context.edits_made:
if edit["success"]:
debug_result = self.debugger.debug_until_passing(
edit["file"]
)
if debug_result["success"]:
context.state = AgentState.TESTING
return context
# If debugging failed too many times
if context.iterations > 5:
context.state = AgentState.FAILED
else:
context.state = AgentState.TESTING
return context
def _apply_modification(
self,
current: str,
description: str,
context: str
) -> str:
"""Apply a modification to existing code."""
prompt = f"""Modify this code according to the description.
Current code:
{current}
Modification needed:
{description}
Context:
{context}
Return the complete modified code."""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}]
)
return self.generator._extract_code(
response.choices[0].message.content,
"python"
)
def _read_file(self, path: str) -> str:
with open(path, "r") as f:
return f.read()
def _notify_state(self, state: AgentState, message: str):
if self.on_state_change:
self.on_state_change(state, message)
```

**The state machine pattern explained:**

1. **UNDERSTANDING**: Search the codebase for relevant context. This ensures the agent knows what code already exists before proposing changes.

2. **PLANNING**: Create an edit plan based on the task and context. The plan specifies which files to modify and how.

3. **IMPLEMENTING**: Execute the plan by generating/modifying files. Each edit is tracked for potential rollback.

4. **TESTING**: Run tests to verify changes work. Success → COMPLETE, failure → DEBUGGING.

5. **DEBUGGING**: Fix failing tests. After debugging, return to TESTING to verify the fix.

6. **COMPLETE/FAILED**: Terminal states. The agent reports success or failure with details.
**Why a state machine?** The explicit states make the agent's behavior transparent and debuggable. You can pause at any state for human review, resume from a checkpoint, or customize behavior per state.
**The `on_state_change` callback:** This optional callback is called whenever the agent transitions states. Use it for logging, progress UI, or triggering external systems (e.g., notify Slack when implementation starts).
**Maximum iterations:** The `max_iterations` limit prevents infinite loops. If the agent cycles through TESTING → DEBUGGING → TESTING too many times, it gives up. This is essential for production reliability.
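A rough usage sketch tying these pieces together. The project path and task are placeholders, and `client` is whichever LLM client the earlier components were built around:

```python
def log_state(state: AgentState, message: str) -> None:
    # Simple progress hook; could post to Slack or drive a UI instead
    print(f"[{state.value}] {message}")

agent = CodingAgent(client, project_root="./my_project", on_state_change=log_state)
result = agent.execute_task("Add input validation to the signup endpoint")

print(result["success"], result["iterations"])
for edit in result["edits"]:
    print(f"  {edit['type']}: {edit['file']} (ok={edit['success']})")
```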
---
## Code Review Agent
Beyond writing code, agents can review code. A code review agent examines changes for bugs, security issues, style problems, and maintainability concerns. This is valuable for both agent-generated code (self-review) and human code (automated PR review).
Building an agent that reviews code changes automatically.
### Code Review Framework
```python
import re

from pydantic import BaseModel, Field
from typing import Literal, Optional
from enum import Enum
class SeverityLevel(str, Enum):
CRITICAL = "critical" # Security issues, data loss risks
HIGH = "high" # Bugs, performance problems
MEDIUM = "medium" # Code quality, maintainability
LOW = "low" # Style, suggestions
INFO = "info" # Informational comments
class ReviewComment(BaseModel):
file_path: str
line_start: int
line_end: Optional[int] = None
severity: SeverityLevel
category: Literal["security", "bug", "performance", "style", "maintainability", "testing"]
title: str
description: str
suggested_fix: Optional[str] = None
confidence: float = Field(..., ge=0, le=1)
class CodeReviewResult(BaseModel):
summary: str
overall_quality: Literal["excellent", "good", "acceptable", "needs_work", "reject"]
comments: list[ReviewComment]
suggested_improvements: list[str]
security_concerns: list[str]
test_coverage_assessment: str
class CodeReviewAgent:
"""Automated code review agent."""
def __init__(self, client, codebase_index: CodebaseIndex):
self.client = client
self.index = codebase_index
def review_diff(self, diff: str, context: str = "") -> CodeReviewResult:
"""Review a git diff."""
system_prompt = """You are a senior software engineer conducting a code review.
Review the provided diff carefully, looking for:
1. **Security issues**: SQL injection, XSS, command injection, hardcoded secrets, insecure crypto
2. **Bugs**: Logic errors, off-by-one errors, null pointer issues, race conditions
3. **Performance**: N+1 queries, unnecessary allocations, blocking operations, inefficient algorithms
4. **Code quality**: DRY violations, unclear naming, missing error handling, tight coupling
5. **Testing**: Missing tests, inadequate coverage, flaky test patterns
6. **Documentation**: Missing docstrings, outdated comments, unclear intent
Be specific and actionable. Reference exact line numbers. Suggest fixes when possible."""
user_prompt = f"""Review this code change:
```diff
{diff}
{f"Additional context: {context}" if context else ""}
Provide a thorough code review."""
review = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
response_model=CodeReviewResult
)
return review
def review_file(self, file_path: str) -> CodeReviewResult:
"""Review an entire file."""
if file_path not in self.index.files:
raise ValueError(f"File not found: {file_path}")
code_file = self.index.files[file_path]
system_prompt = f"""You are a senior {code_file.language} engineer reviewing code.
Analyze the code for:
- Security vulnerabilities
- Potential bugs
- Performance issues
- Code quality and maintainability
- Missing tests or documentation
Be constructive and specific."""
user_prompt = f"""Review this {code_file.language} file:
File: {file_path}
{code_file.content}
```"""
return self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
response_model=CodeReviewResult
)
def review_pull_request(
self,
pr_diff: str,
pr_description: str,
changed_files: list[str]
) -> CodeReviewResult:
"""Comprehensive PR review."""
# Gather context for changed files
context_parts = []
for file_path in changed_files[:10]: # Limit to prevent context overflow
if file_path in self.index.files:
file = self.index.files[file_path]
imports = ", ".join(file.imports[:10])
symbols = ", ".join(s.name for s in file.symbols[:10])
context_parts.append(
f"File: {file_path}\nImports: {imports}\nSymbols: {symbols}"
)
context = "\n\n".join(context_parts)
system_prompt = """You are a senior engineer reviewing a pull request.
Consider:
1. Does the change accomplish its stated goal?
2. Are there any security, performance, or reliability concerns?
3. Is the code well-tested?
4. Does it follow project conventions?
5. Are there edge cases not handled?
6. Is the change scope appropriate (not too large)?
Provide actionable feedback."""
user_prompt = f"""Review this pull request:
## PR Description
{pr_description}
## Changed Files Context
{context}
## Diff
```diff
{pr_diff}
Provide a comprehensive review."""
return self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
],
response_model=CodeReviewResult
)
def check_security(self, code: str, language: str) -> list[ReviewComment]:
"""Focused security review."""
security_patterns = {
"python": [
("SQL injection", r"execute\s*\(.*%.*\)", "Use parameterized queries"),
("Command injection", r"os\.system|subprocess\.call.*shell=True", "Use subprocess with shell=False"),
("Hardcoded secret", r"(password|api_key|secret)\s*=\s*['\"][^'\"]+['\"]", "Use environment variables"),
("Pickle deserialization", r"pickle\.loads?", "Avoid pickle for untrusted data"),
("Eval usage", r"\beval\s*\(", "Avoid eval, use ast.literal_eval if needed"),
],
"javascript": [
("XSS", r"innerHTML\s*=|dangerouslySetInnerHTML", "Sanitize HTML or use textContent"),
("Eval usage", r"\beval\s*\(", "Avoid eval"),
("Hardcoded secret", r"(password|apiKey|secret)\s*[:=]\s*['\"][^'\"]+['\"]", "Use environment variables"),
("SQL injection", r"query\s*\(.*\$\{", "Use parameterized queries"),
]
}
comments = []
patterns = security_patterns.get(language, [])
for i, line in enumerate(code.split("\n"), 1):
for name, pattern, fix in patterns:
if re.search(pattern, line, re.IGNORECASE):
comments.append(ReviewComment(
file_path="<inline>",
line_start=i,
severity=SeverityLevel.CRITICAL,
category="security",
title=f"Potential {name}",
description=f"Line contains pattern that may indicate {name.lower()}",
suggested_fix=fix,
confidence=0.7
))
return comments
class IncrementalReviewer:
    """Review code incrementally as it's written."""
def __init__(self, review_agent: CodeReviewAgent):
self.agent = review_agent
self.previous_content: dict[str, str] = {}
self.pending_comments: list[ReviewComment] = []
def on_file_change(self, file_path: str, new_content: str) -> list[ReviewComment]:
"""Called when a file changes. Returns new comments."""
old_content = self.previous_content.get(file_path, "")
self.previous_content[file_path] = new_content
if not old_content:
# New file - full review
result = self.agent.review_file(file_path)
return result.comments
# Generate diff
diff = self._generate_diff(old_content, new_content, file_path)
if not diff.strip():
return []
# Review just the changes
result = self.agent.review_diff(diff)
return result.comments
def _generate_diff(self, old: str, new: str, file_path: str) -> str:
import difflib
diff = difflib.unified_diff(
old.splitlines(keepends=True),
new.splitlines(keepends=True),
fromfile=f"a/{file_path}",
tofile=f"b/{file_path}"
)
return "".join(diff)
**Severity levels guide action:**
- **CRITICAL**: Must be fixed before merge (security vulnerabilities, data loss risks)
- **HIGH**: Should be fixed (bugs, performance problems)
- **MEDIUM**: Worth discussing (code quality, maintainability)
- **LOW**: Nice to have (style suggestions)
- **INFO**: FYI only (informational comments)
**Structured output for reviews:** The `CodeReviewResult` model ensures reviews are consistent and machine-parseable. This enables automation: auto-approve if all issues are LOW/INFO, require changes if any CRITICAL/HIGH issues exist.
**Security pattern matching:** The `check_security` method uses regex patterns to catch common vulnerabilities. This is a quick first pass—obvious issues like `eval()` or hardcoded passwords are caught immediately. The LLM review catches more subtle issues.
**Incremental reviewing:** The `IncrementalReviewer` class reviews changes as they happen, not just at commit time. This is useful for IDE integrations where you want real-time feedback. It tracks previous file content to generate diffs and only reviews the changed parts.
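Following the automation point above, here is a hedged sketch of a merge gate driven by the structured review. The thresholds are illustrative policy choices, not part of the review agent itself:

```python
BLOCKING = {SeverityLevel.CRITICAL, SeverityLevel.HIGH}

def review_gate(result: CodeReviewResult) -> str:
    """Map a structured review onto a merge decision."""
    if any(c.severity in BLOCKING for c in result.comments):
        return "request_changes"
    if all(c.severity in {SeverityLevel.LOW, SeverityLevel.INFO} for c in result.comments):
        return "approve"
    return "comment"
```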
### Automated PR Comments
The code review becomes truly useful when integrated with GitHub. The `GitHubIntegration` class fetches PR details, runs the review, and posts structured feedback as GitHub review comments:
```python
class GitHubIntegration:
"""Integrate code review with GitHub PRs."""
def __init__(self, github_token: str, review_agent: CodeReviewAgent):
self.token = github_token
self.agent = review_agent
self.headers = {
"Authorization": f"token {github_token}",
"Accept": "application/vnd.github.v3+json"
}
async def review_pr(self, owner: str, repo: str, pr_number: int):
"""Review a PR and post comments."""
import aiohttp
async with aiohttp.ClientSession(headers=self.headers) as session:
# Get PR details
pr_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}"
async with session.get(pr_url) as resp:
pr_data = await resp.json()
            # Get the diff via the REST API's diff media type
            diff_headers = {**self.headers, "Accept": "application/vnd.github.v3.diff"}
            async with session.get(pr_url, headers=diff_headers) as resp:
                diff = await resp.text()
# Get changed files
files_url = f"{pr_url}/files"
async with session.get(files_url) as resp:
files_data = await resp.json()
changed_files = [f["filename"] for f in files_data]
# Run review
review = self.agent.review_pull_request(
pr_diff=diff,
pr_description=pr_data.get("body", ""),
changed_files=changed_files
)
# Post review
await self._post_review(session, owner, repo, pr_number, review)
async def _post_review(
self,
session: "aiohttp.ClientSession",
owner: str,
repo: str,
pr_number: int,
review: CodeReviewResult
):
"""Post review comments to GitHub."""
# Build review body
body = f"## Automated Code Review\n\n{review.summary}\n\n"
body += f"**Overall Quality**: {review.overall_quality}\n\n"
if review.security_concerns:
body += "### Security Concerns\n"
for concern in review.security_concerns:
body += f"- ⚠️ {concern}\n"
body += "\n"
if review.suggested_improvements:
body += "### Suggested Improvements\n"
for improvement in review.suggested_improvements:
body += f"- {improvement}\n"
# Determine review action
event = "COMMENT"
if review.overall_quality == "reject":
event = "REQUEST_CHANGES"
elif review.overall_quality in ["excellent", "good"]:
event = "APPROVE"
# Build inline comments
comments = []
for comment in review.comments:
if comment.severity in [SeverityLevel.CRITICAL, SeverityLevel.HIGH]:
comments.append({
"path": comment.file_path,
"line": comment.line_start,
"body": f"**{comment.severity.value.upper()}**: {comment.title}\n\n{comment.description}"
+ (f"\n\n**Suggested fix**: {comment.suggested_fix}" if comment.suggested_fix else "")
})
# Submit review
review_url = f"https://api.github.com/repos/{owner}/{repo}/pulls/{pr_number}/reviews"
payload = {
"body": body,
"event": event,
"comments": comments[:50] # GitHub limit
}
async with session.post(review_url, json=payload) as resp:
return await resp.json()
```

**The GitHub workflow:**

1. **Fetch PR data**: Get the PR description, diff, and list of changed files using GitHub's API.

2. **Build context**: For each changed file, gather relevant metadata (imports, symbols) to help the reviewer understand the changes.

3. **Run review**: Pass the diff and context to the code review agent. The agent returns structured feedback.

4. **Post review**: Convert the `CodeReviewResult` into GitHub's review format—a main body with inline comments on specific lines.

**Review actions:** The code maps overall quality to GitHub review events:

- `excellent` / `good` → `APPROVE`
- `acceptable` / `needs_work` → `COMMENT`
- `reject` → `REQUEST_CHANGES`

**Comment limit:** GitHub limits inline comments to 50 per review. For PRs with many issues, we prioritize CRITICAL and HIGH severity comments.
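A minimal wiring sketch; the owner, repository, and PR number are placeholders, `review_agent` is the `CodeReviewAgent` built above, and the token needs permission to post reviews:

```python
import asyncio
import os

integration = GitHubIntegration(os.environ["GITHUB_TOKEN"], review_agent)
asyncio.run(integration.review_pr("acme-corp", "billing-service", pr_number=42))
```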
---
## Pull Request Generation
Coding agents can go beyond editing files—they can create complete pull requests with appropriate titles, descriptions, and commit messages. This automates the entire "task → PR" workflow:
Automatically generate PRs from task descriptions.
### PR Generator
```python
import os
import re
import subprocess
from dataclasses import dataclass
from typing import Optional
@dataclass
class PRContent:
title: str
body: str
branch_name: str
files_changed: list[str]
commits: list[str]
class PRGenerator:
"""Generate pull requests from code changes."""
def __init__(self, client, project_root: str):
self.client = client
self.project_root = project_root
def generate_pr(
self,
task: str,
changes: list[dict], # [{file_path, old_content, new_content}]
base_branch: str = "main"
) -> PRContent:
"""Generate a complete PR from changes."""
# Create branch name
branch_name = self._generate_branch_name(task)
# Generate commit messages
commits = self._generate_commits(changes)
# Generate PR title and body
title, body = self._generate_pr_content(task, changes, commits)
return PRContent(
title=title,
body=body,
branch_name=branch_name,
files_changed=[c["file_path"] for c in changes],
commits=commits
)
def _generate_branch_name(self, task: str) -> str:
"""Generate a descriptive branch name."""
prompt = f"""Generate a git branch name for this task:
Task: {task}
Rules:
- Use lowercase with hyphens
- Max 50 characters
- Format: type/short-description (e.g., feature/add-auth, fix/login-bug)
- No special characters
Return only the branch name."""
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
max_tokens=50
)
branch = response.choices[0].message.content.strip()
# Sanitize
branch = re.sub(r"[^a-z0-9/-]", "-", branch.lower())
return branch[:50]
def _generate_commits(self, changes: list[dict]) -> list[str]:
"""Generate commit messages for changes."""
commits = []
for change in changes:
diff = self._generate_diff(
change.get("old_content", ""),
change["new_content"],
change["file_path"]
)
prompt = f"""Generate a git commit message for this change:
File: {change['file_path']}
Diff:
```diff
{diff[:2000]}
Rules:
- Use conventional commit format: type(scope): description
- Types: feat, fix, refactor, docs, test, chore
- Keep under 72 characters
- Be specific about what changed
Return only the commit message."""
response = self.client.chat.completions.create(
model="gpt-4o-mini",
messages=[{"role": "user", "content": prompt}],
max_tokens=100
)
commits.append(response.choices[0].message.content.strip())
return commits
def _generate_pr_content(
self,
task: str,
changes: list[dict],
commits: list[str]
) -> tuple[str, str]:
"""Generate PR title and body."""
changes_summary = []
for change in changes:
diff = self._generate_diff(
change.get("old_content", ""),
change["new_content"],
change["file_path"]
)
changes_summary.append(f"### {change['file_path']}\n```diff\n{diff[:500]}\n```")
changes_text = "\n\n".join(changes_summary)
commits_text = "\n".join(f"- {c}" for c in commits)
prompt = f"""Generate a pull request title and description.
Task: {task}
Commits: {commits_text}
Changes Summary: {changes_text}
Generate:
- A clear, concise PR title (max 72 chars)
- A detailed PR description with:
- Summary of changes
- Motivation/context
- Testing done (suggest tests if none exist)
- Any breaking changes or migration notes
Format: TITLE:
BODY:
""" response = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": prompt}],
max_tokens=1000
)
content = response.choices[0].message.content
# Parse response
title_match = re.search(r"TITLE:\s*(.+)", content)
title = title_match.group(1).strip() if title_match else task[:72]
body_match = re.search(r"BODY:\s*(.+)", content, re.DOTALL)
body = body_match.group(1).strip() if body_match else ""
return title, body
def _generate_diff(self, old: str, new: str, file_path: str) -> str:
import difflib
diff = difflib.unified_diff(
old.splitlines(keepends=True),
new.splitlines(keepends=True),
fromfile=f"a/{file_path}",
tofile=f"b/{file_path}"
)
return "".join(diff)
def create_and_push_pr(
self,
pr_content: PRContent,
changes: list[dict],
base_branch: str = "main"
) -> str:
"""Create branch, commit changes, push, and create PR."""
# Create and checkout branch
self._run_git(["checkout", "-b", pr_content.branch_name])
try:
# Apply changes and commit
for change, commit_msg in zip(changes, pr_content.commits):
# Write file
file_path = os.path.join(self.project_root, change["file_path"])
os.makedirs(os.path.dirname(file_path), exist_ok=True)
with open(file_path, "w") as f:
f.write(change["new_content"])
# Stage and commit
self._run_git(["add", change["file_path"]])
self._run_git(["commit", "-m", commit_msg])
# Push branch
self._run_git(["push", "-u", "origin", pr_content.branch_name])
# Create PR using gh CLI
result = subprocess.run(
[
"gh", "pr", "create",
"--title", pr_content.title,
"--body", pr_content.body,
"--base", base_branch
],
cwd=self.project_root,
capture_output=True,
text=True
)
if result.returncode == 0:
# Extract PR URL from output
return result.stdout.strip()
else:
raise Exception(f"Failed to create PR: {result.stderr}")
finally:
# Return to original branch
self._run_git(["checkout", base_branch])
def _run_git(self, args: list[str]) -> str:
result = subprocess.run(
["git"] + args,
cwd=self.project_root,
capture_output=True,
text=True
)
if result.returncode != 0:
raise Exception(f"Git command failed: {result.stderr}")
        return result.stdout
```
**PR content generation:**
1. **Branch name**: Generated from the task description using conventional prefixes (`feature/`, `fix/`, `refactor/`). Sanitized to remove special characters.
2. **Commit messages**: Each file change gets its own commit message in conventional commit format. The LLM analyzes the diff to determine the appropriate type and description.
3. **PR title and body**: The title summarizes the change in 72 characters. The body includes motivation, change summary, and testing notes.
**The `create_and_push_pr` workflow:**
1. Create a new branch from the task name
2. For each change, write the file, stage it, and commit
3. Push the branch with tracking (`-u origin`)
4. Use the `gh` CLI to create the actual PR
5. Return to the original branch (clean up)
**Error handling:** The `finally` block ensures we return to the original branch even if something fails. This prevents leaving the repository in a dirty state.
**Why use `gh` CLI?** GitHub's CLI handles authentication and API details. It's simpler than making direct API calls and respects existing GitHub credentials.
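A hypothetical end-to-end run. The file path and contents (`old_source`, `new_source`) are placeholders, and the repository must already have `git` and the `gh` CLI configured:

```python
changes = [{
    "file_path": "api/middleware.py",
    "old_content": old_source,  # previous file content (placeholder)
    "new_content": new_source,  # agent-generated content (placeholder)
}]

pr_gen = PRGenerator(client, project_root=".")
pr = pr_gen.generate_pr(task="Add rate limiting to the public API", changes=changes)
print(pr.branch_name, pr.title)

pr_url = pr_gen.create_and_push_pr(pr, changes)
print("Opened:", pr_url)
```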
---
## Multi-Language Support
Real-world projects use multiple languages. A Python backend might have a TypeScript frontend, tests in both languages, and shell scripts for deployment. A production coding agent needs to handle this diversity.
Handling different programming languages effectively.
### Language-Specific Parsers
```python
import os
import subprocess
from abc import ABC, abstractmethod
from dataclasses import dataclass
from typing import Optional
@dataclass
class LanguageFeatures:
name: str
extensions: list[str]
comment_single: str
comment_multi: tuple[str, str]
string_delimiters: list[str]
has_types: bool
test_framework: str
package_manager: str
LANGUAGE_CONFIGS = {
"python": LanguageFeatures(
name="Python",
extensions=[".py", ".pyi"],
comment_single="#",
comment_multi=('"""', '"""'),
string_delimiters=['"', "'", '"""', "'''"],
has_types=True, # Optional typing
test_framework="pytest",
package_manager="pip"
),
"typescript": LanguageFeatures(
name="TypeScript",
extensions=[".ts", ".tsx"],
comment_single="//",
comment_multi=("/*", "*/"),
string_delimiters=['"', "'", "`"],
has_types=True,
test_framework="jest",
package_manager="npm"
),
"javascript": LanguageFeatures(
name="JavaScript",
extensions=[".js", ".jsx", ".mjs"],
comment_single="//",
comment_multi=("/*", "*/"),
string_delimiters=['"', "'", "`"],
has_types=False,
test_framework="jest",
package_manager="npm"
),
"go": LanguageFeatures(
name="Go",
extensions=[".go"],
comment_single="//",
comment_multi=("/*", "*/"),
string_delimiters=['"', "`"],
has_types=True,
test_framework="go test",
package_manager="go mod"
),
"rust": LanguageFeatures(
name="Rust",
extensions=[".rs"],
comment_single="//",
comment_multi=("/*", "*/"),
string_delimiters=['"'],
has_types=True,
test_framework="cargo test",
package_manager="cargo"
),
"java": LanguageFeatures(
name="Java",
extensions=[".java"],
comment_single="//",
comment_multi=("/*", "*/"),
string_delimiters=['"'],
has_types=True,
test_framework="junit",
package_manager="maven"
),
}
class LanguageParser(ABC):
"""Abstract base for language-specific parsing."""
@abstractmethod
def extract_symbols(self, content: str, file_path: str) -> list[CodeSymbol]:
pass
@abstractmethod
def extract_imports(self, content: str) -> list[str]:
pass
@abstractmethod
def get_function_at_line(self, content: str, line: int) -> Optional[CodeSymbol]:
pass
class TreeSitterParser(LanguageParser):
"""Universal parser using tree-sitter."""
def __init__(self, language: str):
self.language = language
self._setup_parser()
def _setup_parser(self):
"""Initialize tree-sitter parser for language."""
try:
import tree_sitter_languages
self.parser = tree_sitter_languages.get_parser(self.language)
self.tree_language = tree_sitter_languages.get_language(self.language)
except ImportError:
self.parser = None
def extract_symbols(self, content: str, file_path: str) -> list[CodeSymbol]:
if not self.parser:
return []
tree = self.parser.parse(bytes(content, "utf8"))
symbols = []
# Language-specific queries
queries = self._get_symbol_queries()
for query_name, query_string in queries.items():
try:
query = self.tree_language.query(query_string)
captures = query.captures(tree.root_node)
for node, capture_name in captures:
if capture_name == "name":
symbols.append(CodeSymbol(
name=content[node.start_byte:node.end_byte],
kind=query_name,
file_path=file_path,
line_start=node.start_point[0] + 1,
line_end=node.end_point[0] + 1
))
except Exception:
continue
return symbols
def _get_symbol_queries(self) -> dict[str, str]:
"""Get tree-sitter queries for symbol extraction."""
queries = {
"python": {
"function": "(function_definition name: (identifier) @name)",
"class": "(class_definition name: (identifier) @name)",
"method": "(function_definition name: (identifier) @name)",
},
"typescript": {
"function": "(function_declaration name: (identifier) @name)",
"class": "(class_declaration name: (type_identifier) @name)",
"method": "(method_definition name: (property_identifier) @name)",
"interface": "(interface_declaration name: (type_identifier) @name)",
},
"go": {
"function": "(function_declaration name: (identifier) @name)",
"method": "(method_declaration name: (field_identifier) @name)",
"type": "(type_declaration (type_spec name: (type_identifier) @name))",
},
"rust": {
"function": "(function_item name: (identifier) @name)",
"struct": "(struct_item name: (type_identifier) @name)",
"impl": "(impl_item type: (type_identifier) @name)",
"trait": "(trait_item name: (type_identifier) @name)",
},
}
return queries.get(self.language, {})
def extract_imports(self, content: str) -> list[str]:
if not self.parser:
return []
tree = self.parser.parse(bytes(content, "utf8"))
imports = []
import_queries = {
"python": "(import_statement) @import (import_from_statement) @import",
"typescript": "(import_statement) @import",
"javascript": "(import_statement) @import",
"go": "(import_declaration) @import",
"rust": "(use_declaration) @import",
}
query_string = import_queries.get(self.language)
if not query_string:
return []
try:
query = self.tree_language.query(query_string)
captures = query.captures(tree.root_node)
for node, _ in captures:
imports.append(content[node.start_byte:node.end_byte])
except Exception:
pass
return imports
def get_function_at_line(self, content: str, line: int) -> Optional[CodeSymbol]:
symbols = self.extract_symbols(content, "<inline>")
for symbol in symbols:
if symbol.line_start <= line <= symbol.line_end:
return symbol
return None
class MultiLanguageCodeAgent:
"""Coding agent with multi-language support."""
def __init__(self, client, project_root: str):
self.client = client
self.project_root = project_root
self.parsers: dict[str, LanguageParser] = {}
# Initialize parsers for detected languages
self._detect_and_setup_languages()
def _detect_and_setup_languages(self):
"""Detect project languages and set up parsers."""
detected = set()
for root, _, files in os.walk(self.project_root):
if any(ignore in root for ignore in ["node_modules", ".git", "venv"]):
continue
for file in files:
ext = os.path.splitext(file)[1]
for lang, config in LANGUAGE_CONFIGS.items():
if ext in config.extensions:
detected.add(lang)
for lang in detected:
self.parsers[lang] = TreeSitterParser(lang)
def get_language(self, file_path: str) -> Optional[str]:
"""Determine language from file path."""
ext = os.path.splitext(file_path)[1]
for lang, config in LANGUAGE_CONFIGS.items():
if ext in config.extensions:
return lang
return None
def generate_code(
self,
task: str,
language: str,
context: str = ""
) -> str:
"""Generate code in the specified language."""
config = LANGUAGE_CONFIGS.get(language)
if not config:
raise ValueError(f"Unsupported language: {language}")
system_prompt = f"""You are an expert {config.name} developer.
Language-specific guidelines:
- Use {config.name} idioms and best practices
- {"Include type annotations" if config.has_types else "Use JSDoc for documentation"}
- Follow the project's existing style
- Handle errors appropriately for {config.name}
- Write tests using {config.test_framework}
Return only the code, no explanations."""
user_prompt = f"""Write {config.name} code for this task:
Task: {task}
Context:
{context}
Generate clean, production-ready code."""
response = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": user_prompt}
]
)
return self._extract_code(response.choices[0].message.content, language)
def run_tests(self, language: str, test_path: str = None) -> dict:
"""Run tests for a specific language."""
config = LANGUAGE_CONFIGS.get(language)
if not config:
return {"success": False, "error": f"Unknown language: {language}"}
commands = {
"python": ["python", "-m", "pytest", "-v"],
"typescript": ["npx", "jest", "--verbose"],
"javascript": ["npx", "jest", "--verbose"],
"go": ["go", "test", "-v", "./..."],
"rust": ["cargo", "test"],
"java": ["mvn", "test"],
}
cmd = commands.get(language, [])
if not cmd:
return {"success": False, "error": f"No test command for {language}"}
if test_path:
cmd.append(test_path)
try:
result = subprocess.run(
cmd,
cwd=self.project_root,
capture_output=True,
text=True,
timeout=300
)
return {
"success": result.returncode == 0,
"output": result.stdout,
"error": result.stderr
}
except Exception as e:
return {"success": False, "error": str(e)}
def install_dependencies(self, language: str, packages: list[str]) -> dict:
"""Install dependencies for a language."""
config = LANGUAGE_CONFIGS.get(language)
if not config:
return {"success": False, "error": f"Unknown language: {language}"}
commands = {
"python": ["pip", "install"] + packages,
"typescript": ["npm", "install"] + packages,
"javascript": ["npm", "install"] + packages,
"go": ["go", "get"] + packages,
"rust": ["cargo", "add"] + packages,
}
cmd = commands.get(language)
if not cmd:
return {"success": False, "error": f"No package manager for {language}"}
try:
result = subprocess.run(
cmd,
cwd=self.project_root,
capture_output=True,
text=True,
timeout=120
)
return {
"success": result.returncode == 0,
"output": result.stdout
}
except Exception as e:
return {"success": False, "error": str(e)}
def _extract_code(self, text: str, language: str) -> str:
"""Extract code from markdown response."""
if "```" in text:
parts = text.split("```")
for i, part in enumerate(parts):
if i % 2 == 1: # Code block
# Remove language identifier
lines = part.split("\n")
if lines[0].strip().lower() in [language, "ts", "js", "py"]:
return "\n".join(lines[1:]).strip()
return part.strip()
return text.strip()
```

**Language configurations:** The `LANGUAGE_CONFIGS` dictionary captures essential metadata for each language: file extensions, comment syntax, string delimiters, type system presence, test framework, and package manager. This lets the agent adapt its behavior automatically.

**Tree-sitter for parsing:** Tree-sitter is a parser generator that creates fast, incremental parsers. The `TreeSitterParser` class uses tree-sitter to extract symbols from any supported language using declarative queries. This is more robust than regex-based parsing.

**Query-based symbol extraction:** Each language has queries that match its syntax:

- Python: `(function_definition name: (identifier) @name)`
- TypeScript: `(function_declaration name: (identifier) @name)`
- Rust: `(function_item name: (identifier) @name)`

The query syntax is consistent, but the node types vary by language grammar.

**The `MultiLanguageCodeAgent`:** This class detects which languages are present in a project and sets up appropriate parsers. It provides unified methods for generating code, running tests, and installing dependencies that dispatch to the right language-specific tools.
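A rough usage sketch; the project path, file name, and context string are placeholders:

```python
ml_agent = MultiLanguageCodeAgent(client, project_root="./my_project")

lang = ml_agent.get_language("src/api/userHandler.ts")  # -> "typescript"
code = ml_agent.generate_code(
    task="Add a retry wrapper around the fetchUser call",
    language=lang,
    context="Existing helper: async function fetchUser(id: string): Promise<User>",
)

test_run = ml_agent.run_tests(lang)  # dispatches to `npx jest --verbose`
print(test_run["success"])
```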
### Language-Aware Context Building
When building context for code generation, we need to understand cross-file relationships. The `LanguageAwareContext` class resolves imports to actual files, finds related code, and includes relevant configuration files:
```python
import os

class LanguageAwareContext:
"""Build context that respects language-specific patterns."""
def __init__(self, index: CodebaseIndex, parsers: dict[str, LanguageParser]):
self.index = index
self.parsers = parsers
def build_context_for_task(
self,
task: str,
target_file: str,
max_tokens: int = 8000
) -> str:
"""Build relevant context for a coding task."""
language = self._get_language(target_file)
if not language:
return ""
context_parts = []
current_tokens = 0
# 1. Get the target file content
if target_file in self.index.files:
file_content = self.index.files[target_file].content
context_parts.append(f"=== Target File: {target_file} ===\n{file_content}")
current_tokens += len(file_content) // 4
# 2. Find related files by imports
related_by_imports = self._find_related_by_imports(target_file, language)
for related_file in related_by_imports[:5]:
if current_tokens >= max_tokens:
break
if related_file in self.index.files:
content = self.index.files[related_file].content
context_parts.append(f"=== Related (import): {related_file} ===\n{content[:2000]}")
current_tokens += len(content[:2000]) // 4
# 3. Find files with similar symbols
if target_file in self.index.files:
symbols = self.index.files[target_file].symbols
for symbol in symbols[:3]:
refs = self.index.find_references(symbol.name)
for ref_file, _ in refs[:3]:
if ref_file != target_file and current_tokens < max_tokens:
if ref_file in self.index.files:
content = self.index.files[ref_file].content
context_parts.append(
f"=== Uses {symbol.name}: {ref_file} ===\n{content[:1500]}"
)
current_tokens += len(content[:1500]) // 4
# 4. Add project configuration context
config_files = self._get_project_configs(language)
for config_file in config_files:
if current_tokens >= max_tokens:
break
full_path = os.path.join(self.index.root_path, config_file)
if os.path.exists(full_path):
try:
with open(full_path) as f:
content = f.read()
context_parts.append(f"=== Config: {config_file} ===\n{content[:1000]}")
current_tokens += len(content[:1000]) // 4
except:
pass
return "\n\n".join(context_parts)
def _find_related_by_imports(self, file_path: str, language: str) -> list[str]:
"""Find files that import or are imported by target."""
related = []
if file_path not in self.index.files:
return related
file = self.index.files[file_path]
# Files this file imports
for imp in file.imports:
# Resolve import to file path
resolved = self._resolve_import(imp, language, file_path)
if resolved:
related.append(resolved)
# Files that import this file
target_module = self._file_to_module(file_path, language)
for other_path, other_file in self.index.files.items():
if target_module in other_file.imports:
related.append(other_path)
return list(set(related))
def _resolve_import(self, import_path: str, language: str, from_file: str) -> Optional[str]:
"""Resolve an import to a file path."""
# Simplified - would need language-specific logic
base_dir = os.path.dirname(from_file)
candidates = []
if language == "python":
# Try relative import
parts = import_path.split(".")
candidates.append(os.path.join(base_dir, *parts) + ".py")
candidates.append(os.path.join(self.index.root_path, *parts) + ".py")
elif language in ["typescript", "javascript"]:
candidates.append(os.path.join(base_dir, import_path) + ".ts")
candidates.append(os.path.join(base_dir, import_path) + ".tsx")
candidates.append(os.path.join(base_dir, import_path, "index.ts"))
for candidate in candidates:
if candidate in self.index.files:
return candidate
return None
def _file_to_module(self, file_path: str, language: str) -> str:
"""Convert file path to module name."""
relative = os.path.relpath(file_path, self.index.root_path)
if language == "python":
return relative.replace("/", ".").replace(".py", "")
return relative
def _get_language(self, file_path: str) -> Optional[str]:
ext = os.path.splitext(file_path)[1]
for lang, config in LANGUAGE_CONFIGS.items():
if ext in config.extensions:
return lang
return None
def _get_project_configs(self, language: str) -> list[str]:
"""Get relevant config files for a language."""
configs = {
"python": ["pyproject.toml", "setup.py", "requirements.txt", "setup.cfg"],
"typescript": ["package.json", "tsconfig.json", ".eslintrc.js"],
"javascript": ["package.json", ".eslintrc.js", "babel.config.js"],
"go": ["go.mod", "go.sum"],
"rust": ["Cargo.toml"],
"java": ["pom.xml", "build.gradle"],
}
return configs.get(language, [])
```

**Context building strategy:**

1. **Target file first**: Always include the file being modified. This is the primary context.

2. **Import resolution**: Find files that the target imports or that import the target. These are directly related and likely to be affected by changes.

3. **Symbol references**: For each symbol in the target file, find where else it's used. Changes to a function signature need to update all callers.

4. **Configuration files**: Include relevant config files (package.json, tsconfig.json, etc.) so the agent understands project settings.

**Token budget management:** The `max_tokens` parameter limits context size. We prioritize the most relevant context (target file, imports) and truncate less important context to fit within the budget.

**Import resolution is language-specific:** Python imports like `from utils.helpers import foo` resolve differently than TypeScript imports like `import { foo } from './utils/helpers'`. The `_resolve_import` method handles these differences.
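A short usage sketch; `index` and `parsers` are the components built earlier, and the task and file names are placeholders:

```python
ctx_builder = LanguageAwareContext(index, parsers)

context = ctx_builder.build_context_for_task(
    task="Add caching to the user lookup",
    target_file="services/user_service.py",
    max_tokens=6000,  # rough budget, estimated at ~4 characters per token
)
print(context[:500])
```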
---
## Production Safety
Coding agents that modify files are inherently risky. A bug could delete important code, introduce security vulnerabilities, or corrupt the repository. Production safety is about minimizing these risks through checks, limits, and human oversight.
### Safety Checks
The `SafetyChecker` class implements pattern-based detection of dangerous code and sensitive file paths. It's a defense-in-depth measure—even if the LLM produces unsafe code, the checker blocks it before execution:
```python
import os
import re

class SafetyChecker:
"""Safety checks for coding agent operations."""
DANGEROUS_PATTERNS = [
r"os\.system\(",
r"subprocess\.call\(",
r"eval\(",
r"exec\(",
r"__import__\(",
r"rm\s+-rf",
r"sudo\s+",
r"chmod\s+777",
r"curl\s+.*\|\s*sh",
r"wget\s+.*\|\s*sh",
]
SENSITIVE_PATHS = [
"/etc/",
"/root/",
"~/.ssh/",
".env",
"credentials",
"secrets",
".git/config",
]
def check_code(self, code: str) -> tuple[bool, list[str]]:
"""Check code for dangerous patterns."""
issues = []
for pattern in self.DANGEROUS_PATTERNS:
if re.search(pattern, code, re.IGNORECASE):
issues.append(f"Dangerous pattern detected: {pattern}")
return len(issues) == 0, issues
def check_file_access(self, path: str) -> tuple[bool, str]:
"""Check if file access is safe."""
path = os.path.abspath(path)
for sensitive in self.SENSITIVE_PATHS:
if sensitive in path:
return False, f"Access to sensitive path blocked: {path}"
return True, "OK"
def check_edit(self, file_path: str, new_content: str) -> tuple[bool, list[str]]:
"""Check if an edit is safe."""
issues = []
# Check path
path_safe, path_msg = self.check_file_access(file_path)
if not path_safe:
issues.append(path_msg)
# Check content
code_safe, code_issues = self.check_code(new_content)
issues.extend(code_issues)
return len(issues) == 0, issues
```

**Dangerous patterns blocked:**

- Shell execution: `os.system`, `subprocess.call` with `shell=True` can run arbitrary commands
- Code execution: `eval`, `exec`, `__import__` can execute arbitrary Python
- Destructive commands: `rm -rf`, `sudo`, `chmod 777` can damage the system
- Remote code execution: `curl | sh`, `wget | sh` download and run unknown code

**Sensitive paths protected:**

- System directories (`/etc/`, `/root/`)
- SSH keys (`~/.ssh/`)
- Environment files (`.env`)
- Git configuration (`.git/config`)

**Layered checking:** The `check_edit` method combines path checking and code checking. Both must pass for an edit to proceed. This prevents attacks that try to write dangerous code to a safe path or safe code to a dangerous path.
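A minimal usage sketch; `generated_code` stands in for whatever the agent produced:

```python
checker = SafetyChecker()

ok, issues = checker.check_edit("src/app/config_loader.py", generated_code)
if not ok:
    # Hard block: refuse the edit rather than asking the model to be careful
    raise PermissionError(f"Edit blocked by safety checks: {issues}")
```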
### Human-in-the-Loop
Even with safety checks, some operations should require human approval. The `HumanApproval` class provides a framework for gating sensitive operations:
```python
import re
from typing import Callable

class HumanApproval:
"""Require human approval for sensitive operations."""
def __init__(self, approval_callback: Callable[[str, str], bool]):
self.approval_callback = approval_callback
self.auto_approve_patterns = []
def request_approval(
self,
operation: str,
details: str,
risk_level: str = "medium"
) -> bool:
"""Request human approval for an operation."""
# Auto-approve low-risk operations
if risk_level == "low":
return True
# Check auto-approve patterns
for pattern in self.auto_approve_patterns:
if re.match(pattern, operation):
return True
# Request human approval
return self.approval_callback(operation, details)
def add_auto_approve(self, pattern: str):
"""Add a pattern for auto-approval."""
        self.auto_approve_patterns.append(pattern)
```
**Risk-based approval:** Operations are classified by risk level:
- Low risk: Auto-approved (e.g., reading files, running tests)
- Medium risk: May be auto-approved if matching trusted patterns
- High risk: Always requires human approval (e.g., deleting files, pushing to main)
**The approval callback:** The `approval_callback` is a function you provide that presents the operation to the user and returns `True` or `False`. This could be:
- A CLI prompt asking yes/no
- A Slack message awaiting reaction
- A web UI with approve/reject buttons
- An API call to an approval system
**Auto-approve patterns:** For workflows where certain operations are pre-approved, you can add patterns. For example, `add_auto_approve(r"edit:.*/tests/.*")` would auto-approve edits to test files.
**Combining safety layers:** In a production system, you'd use both `SafetyChecker` (hard blocks on known-dangerous patterns) and `HumanApproval` (soft gates for sensitive-but-legitimate operations). The checker prevents obvious mistakes; the approval system handles nuanced decisions.
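A minimal CLI-based wiring sketch; the operation strings and file paths are illustrative:

```python
def cli_approval(operation: str, details: str) -> bool:
    print(f"\nOperation: {operation}\n{details}")
    return input("Approve? [y/N] ").strip().lower() == "y"

approvals = HumanApproval(cli_approval)
approvals.add_auto_approve(r"edit:.*/tests/.*")  # pre-approve edits to test files

if approvals.request_approval(
    operation="edit:src/payments/charge.py",
    details="Rewrite retry logic around the charge call",
    risk_level="high",
):
    print("Proceeding with edit")
```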
---
## Conclusion
Building AI coding agents requires orchestrating multiple capabilities:
- Code understanding: Index, search, and analyze existing code
- Edit planning: Plan changes before making them
- Safe editing: Make changes with backups and rollback capability
- Test generation: Create tests to validate changes
- Iterative debugging: Fix issues until code works
- Sandboxed execution: Run untrusted code safely
- Safety checks: Prevent dangerous operations
Start simple—a basic agent that can read, search, and make single-file edits. Add complexity (multi-file coordination, TDD loops, debugging) as you validate the core functionality works reliably.