Structured Outputs and Tool Use: Patterns for Reliable AI Applications
Master structured output generation and tool use patterns—JSON mode, schema enforcement, Instructor library, function calling best practices, error handling, and production patterns for reliable AI applications.
The Reliability Problem
LLMs are probabilistic text generators. They're incredibly powerful, but getting them to produce exactly the output format you need—every time, without fail—requires deliberate engineering. A chatbot can tolerate occasional formatting quirks. An agentic system that parses LLM output to make API calls cannot.
2025: The year structured outputs became production-ready: Both OpenAI and Anthropic now offer native structured output APIs that guarantee schema compliance. OpenAI reports that structured outputs with gpt-4o-2024-08-06 score 100% on its complex JSON schema-following evals. Anthropic's structured outputs (beta header structured-outputs-2025-11-13) bring the same guarantees to Claude models.
Best practice: Always prefer structured outputs over JSON mode. According to production LLM guides, structured outputs are "the evolution of JSON mode" because they strictly enforce adherence to a specified schema, ensuring consistent, valid, and type-safe responses. This eliminates the need for fragile post-processing or regex hacks.
Why structured outputs are the foundation of agentic AI: Agents work by parsing LLM output to determine what action to take. If the model returns {"action": "search", "query": "latest news"}, your code can execute that action. If it instead returns free-form prose like "The action I'll take is to search for the latest news", your parsing breaks. Structured outputs transform LLMs from conversational interfaces into programmable APIs. This is the bridge between "AI that talks" and "AI that acts."
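To make that concrete, here is a minimal sketch of the dispatch code that structured output enables; the search_web helper and the single-entry handler registry are hypothetical stand-ins for real tools:
import json
def search_web(query: str) -> str:
    # Hypothetical tool implementation
    return f"Results for: {query}"
HANDLERS = {"search": lambda args: search_web(args["query"])}
def dispatch(llm_output: str) -> str:
    action = json.loads(llm_output)        # fails loudly on free-form prose
    handler = HANDLERS[action["action"]]   # fails loudly on unknown actions
    return handler(action)
print(dispatch('{"action": "search", "query": "latest news"}'))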
The reliability pyramid: There's a hierarchy of approaches, each trading flexibility for reliability. Prompt engineering is most flexible but least reliable. JSON mode guarantees valid JSON but not schema compliance. Structured outputs guarantee schema compliance but limit schema complexity. Instructor/Outlines add retry logic and validation. Understanding this pyramid helps you choose the right tool for each use case.
This guide covers everything you need to build reliable structured output pipelines: native JSON modes, schema enforcement with Instructor, function calling patterns, error handling, and production-ready implementations.
Prerequisites:
- Familiarity with building agentic AI systems
- Basic understanding of JSON Schema
- Python experience
What you'll learn:
- Native JSON mode vs. structured output APIs
- Schema enforcement with Instructor and Pydantic
- Function calling patterns and best practices
- Error handling and retry strategies
- Production patterns for reliable extraction
The Structured Output Landscape
Different approaches to getting structured output from LLMs:
| Approach | Reliability | Flexibility | Latency | Provider Support |
|---|---|---|---|---|
| Prompt engineering | Low | High | Low | All |
| JSON mode | Medium | Medium | Low | OpenAI, Anthropic, Google |
| Structured outputs | High | Medium | Low | OpenAI, Anthropic |
| Function calling | High | Medium | Low | OpenAI, Anthropic, Google |
| Instructor/Outlines | Very High | High | Medium | All (via wrapper) |
Native JSON Mode
The simplest approach: tell the model to output JSON. JSON mode tells the model "output must be valid JSON" but doesn't enforce a specific schema. It's a lightweight solution that works across providers, but requires additional validation in your code.
JSON mode is best for simple extraction tasks where the schema is straightforward and you're okay handling occasional format variations. For critical applications, use structured outputs (next section) which guarantee schema compliance.
OpenAI JSON Mode
OpenAI's JSON mode is enabled with response_format={"type": "json_object"}. The model will always output parseable JSON, but the schema depends entirely on your prompt. Be explicit about the exact keys and types you expect.
from openai import OpenAI
client = OpenAI()
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Extract user information. Output valid JSON with keys: name, email, age"
},
{
"role": "user",
"content": "John Smith is 32 years old and can be reached at john@example.com"
}
],
response_format={"type": "json_object"}
)
import json
data = json.loads(response.choices[0].message.content)
print(data)
# {"name": "John Smith", "email": "john@example.com", "age": 32}
Anthropic JSON Mode
import anthropic
import json
client = anthropic.Anthropic()
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{
"role": "user",
"content": """Extract user information from this text and return as JSON:
John Smith is 32 years old and can be reached at john@example.com
Return JSON with keys: name, email, age"""
}
]
)
# Claude often wraps JSON in markdown code blocks
content = response.content[0].text
if "```json" in content:
content = content.split("```json")[1].split("```")[0]
elif "```" in content:
content = content.split("```")[1].split("```")[0]
data = json.loads(content.strip())
Limitations of Basic JSON Mode
JSON mode guarantees valid JSON but not valid schema:
# You asked for: {"name": str, "email": str, "age": int}
# You might get:
{"name": "John Smith", "email": "john@example.com", "age": "32"} # age is string
{"name": "John", "surname": "Smith", "email": "john@example.com"} # missing age
{"user": {"name": "John Smith"}, "contact": "john@example.com"} # different structure
OpenAI Structured Outputs
OpenAI's structured outputs API guarantees schema compliance:
from openai import OpenAI
from pydantic import BaseModel
client = OpenAI()
class UserInfo(BaseModel):
name: str
email: str
age: int
response = client.beta.chat.completions.parse(
model="gpt-4o-2024-08-06",
messages=[
{
"role": "system",
"content": "Extract user information from the text."
},
{
"role": "user",
"content": "John Smith is 32 years old and can be reached at john@example.com"
}
],
response_format=UserInfo
)
user = response.choices[0].message.parsed
print(user.name) # "John Smith"
print(user.email) # "john@example.com"
print(user.age) # 32 (guaranteed to be int)
Complex Schemas
from pydantic import BaseModel, Field
from typing import Optional, Literal
from enum import Enum
class Priority(str, Enum):
LOW = "low"
MEDIUM = "medium"
HIGH = "high"
CRITICAL = "critical"
class Task(BaseModel):
title: str = Field(..., description="Brief task title")
description: str = Field(..., description="Detailed task description")
priority: Priority
estimated_hours: Optional[float] = Field(None, ge=0, le=100)
tags: list[str] = Field(default_factory=list)
class ProjectPlan(BaseModel):
project_name: str
goal: str
tasks: list[Task]
total_estimated_hours: float
response = client.beta.chat.completions.parse(
model="gpt-4o-2024-08-06",
messages=[
{
"role": "user",
"content": """Create a project plan for building a REST API:
The API should handle user authentication, product catalog,
and order management. Estimate 2 weeks of work."""
}
],
response_format=ProjectPlan
)
plan = response.choices[0].message.parsed
for task in plan.tasks:
print(f"[{task.priority.value}] {task.title}: {task.estimated_hours}h")
Handling Refusals
Structured outputs can still refuse to answer:
response = client.beta.chat.completions.parse(
model="gpt-4o-2024-08-06",
messages=[...],
response_format=UserInfo
)
if response.choices[0].message.refusal:
print(f"Model refused: {response.choices[0].message.refusal}")
else:
user = response.choices[0].message.parsed
# Use parsed data
Instructor: The Structured Output Library
Instructor is the go-to library for production structured outputs. It wraps LLM clients (OpenAI, Anthropic, etc.) and adds: (1) automatic retries when validation fails, (2) Pydantic model integration for type safety, (3) streaming partial objects, and (4) multiple extraction modes.
The key insight: when the model outputs invalid data, Instructor feeds the validation error back to the model and asks it to try again. This "retry with feedback" loop dramatically improves reliability. Instead of writing complex error handling, you define your schema with Pydantic validators, and Instructor handles the rest.
Basic Usage
The instructor.from_openai() function patches the OpenAI client to support a response_model parameter. Your Pydantic model defines the expected output schema, and Instructor ensures the response matches it.
pip install instructor
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator
# Patch the client
client = instructor.from_openai(OpenAI())
class UserInfo(BaseModel):
name: str
email: str
age: int = Field(..., ge=0, le=150)
@field_validator("email")
@classmethod
def validate_email(cls, v):
if "@" not in v:
raise ValueError("Invalid email format")
return v
# Get structured output with automatic retries on validation failure
user = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "John Smith, 32, john@example.com"}
],
response_model=UserInfo,
max_retries=3 # Retry if validation fails
)
print(user) # UserInfo(name='John Smith', email='john@example.com', age=32)
With Anthropic
import instructor
from anthropic import Anthropic
client = instructor.from_anthropic(Anthropic())
user = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
messages=[
{"role": "user", "content": "John Smith, 32, john@example.com"}
],
response_model=UserInfo
)
Complex Extraction
from pydantic import BaseModel, Field
from typing import Optional
from datetime import date
class Address(BaseModel):
street: str
city: str
state: str
zip_code: str
country: str = "USA"
class Employment(BaseModel):
company: str
title: str
start_date: date
end_date: Optional[date] = None
current: bool = False
class Person(BaseModel):
name: str
email: Optional[str] = None
phone: Optional[str] = None
address: Optional[Address] = None
employment_history: list[Employment] = Field(default_factory=list)
skills: list[str] = Field(default_factory=list)
# Extract from unstructured resume text
resume_text = """
JOHN SMITH
john.smith@email.com | (555) 123-4567
123 Main St, San Francisco, CA 94102
EXPERIENCE
Senior Software Engineer
TechCorp Inc. | Jan 2020 - Present
- Led development of microservices architecture
- Managed team of 5 engineers
Software Engineer
StartupXYZ | Jun 2017 - Dec 2019
- Built REST APIs and frontend applications
SKILLS
Python, JavaScript, AWS, Docker, Kubernetes
"""
person = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": "Extract structured information from this resume."
},
{"role": "user", "content": resume_text}
],
response_model=Person
)
print(f"Name: {person.name}")
print(f"Skills: {', '.join(person.skills)}")
for job in person.employment_history:
status = "(current)" if job.current else ""
print(f" {job.title} at {job.company} {status}")
Streaming Structured Outputs
from instructor import Partial
from typing import Literal
class Article(BaseModel):
title: str
summary: str
key_points: list[str]
sentiment: Literal["positive", "negative", "neutral"]
# Stream partial results as they're generated
for partial_article in client.chat.completions.create_partial(
model="gpt-4o",
messages=[
{"role": "user", "content": "Analyze this article: [article text]"}
],
response_model=Article
):
# partial_article has fields populated as they're generated
if partial_article.title:
print(f"Title: {partial_article.title}")
if partial_article.key_points:
print(f"Points so far: {len(partial_article.key_points)}")
Validation and Retries
Pydantic validators are your first line of defense against bad LLM output. Field validators check individual values; model validators check relationships between fields. When validation fails, Instructor shows the error to the model and retries.
This is powerful: you can encode domain rules (no SQL keywords, valid date ranges, consistent references) as validators. The model learns from its mistakes and usually succeeds within 2-3 retries. Validation errors become training signals, not crash causes.
from pydantic import BaseModel, Field, field_validator, model_validator
class SearchQuery(BaseModel):
query: str = Field(..., min_length=3, max_length=200)
filters: dict[str, str] = Field(default_factory=dict)
limit: int = Field(default=10, ge=1, le=100)
@field_validator("query")
@classmethod
def clean_query(cls, v):
# Remove potential SQL injection
dangerous = ["DROP", "DELETE", "INSERT", "--", ";"]
for d in dangerous:
if d.lower() in v.lower():
raise ValueError(f"Query contains forbidden term: {d}")
return v.strip()
@model_validator(mode="after")
def validate_filters(self):
allowed_filter_keys = {"category", "date_range", "author", "status"}
invalid_keys = set(self.filters.keys()) - allowed_filter_keys
if invalid_keys:
raise ValueError(f"Invalid filter keys: {invalid_keys}")
return self
# Instructor will retry with the validation error message
# helping the model correct its output
query = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "Search for python tutorials from last month, limit 25"}
],
response_model=SearchQuery,
max_retries=3
)
Custom Retry Logic
from instructor import Maybe, Mode
from tenacity import Retrying, stop_after_attempt, wait_exponential
class ExtractedData(BaseModel):
value: float
unit: str
confidence: float = Field(..., ge=0, le=1)
# Maybe wrapper for graceful failures
class MaybeExtractedData(BaseModel):
result: Optional[ExtractedData] = None
error: Optional[str] = None
# Custom retry configuration
client = instructor.from_openai(
OpenAI(),
mode=Mode.TOOLS # Use function calling mode
)
def extract_with_custom_retry(text: str) -> MaybeExtractedData:
try:
for attempt in Retrying(
stop=stop_after_attempt(3),
wait=wait_exponential(multiplier=1, min=1, max=10)
):
with attempt:
result = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": f"Extract measurement: {text}"}
],
response_model=ExtractedData
)
return MaybeExtractedData(result=result)
except Exception as e:
return MaybeExtractedData(error=str(e))
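Callers then branch on the wrapper instead of catching exceptions (the input string here is purely illustrative):
outcome = extract_with_custom_retry("The sample weighed 4.2 kg")
if outcome.result:
    print(f"{outcome.result.value} {outcome.result.unit} (confidence {outcome.result.confidence:.0%})")
else:
    print(f"Extraction failed: {outcome.error}")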
Function Calling (Tool Use)
Function calling lets LLMs trigger actions in your application. Instead of generating text that you parse, the model outputs structured function calls: "call get_weather with location='San Francisco'". Your code executes the function and returns results to the model.
This is the foundation of agentic AI. The model decides which tool to use based on context, generates valid arguments, and interprets results. Function calling is more reliable than parsing free-form text because the model's output is constrained to valid function signatures.
OpenAI Function Calling
Define tools as JSON schemas. The description field is critical—it's what the model reads to decide when to use each tool. Be specific about capabilities and when the tool is appropriate.
from openai import OpenAI
import json
client = OpenAI()
# Define tools
tools = [
{
"type": "function",
"function": {
"name": "get_weather",
"description": "Get current weather for a location. Use this when user asks about weather.",
"parameters": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City and state, e.g., 'San Francisco, CA'"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"],
"description": "Temperature unit"
}
},
"required": ["location"]
}
}
},
{
"type": "function",
"function": {
"name": "search_web",
"description": "Search the web for information. Use for current events or facts.",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query"
},
"num_results": {
"type": "integer",
"description": "Number of results (1-10)",
"default": 5
}
},
"required": ["query"]
}
}
}
]
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "What's the weather like in Tokyo?"}
],
tools=tools,
tool_choice="auto" # Let model decide
)
# Check if model wants to call a function
message = response.choices[0].message
if message.tool_calls:
for tool_call in message.tool_calls:
function_name = tool_call.function.name
arguments = json.loads(tool_call.function.arguments)
print(f"Calling: {function_name}")
print(f"Arguments: {arguments}")
# Execute the function
if function_name == "get_weather":
result = get_weather(**arguments)
elif function_name == "search_web":
result = search_web(**arguments)
# Continue conversation with result
messages = [
{"role": "user", "content": "What's the weather like in Tokyo?"},
message,
{
"role": "tool",
"tool_call_id": tool_call.id,
"content": json.dumps(result)
}
]
final_response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools
)
print(final_response.choices[0].message.content)
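As the number of tools grows, the if/elif dispatch above gets unwieldy. One common refinement is a name-to-callable mapping; get_weather and search_web here stand in for your real implementations:
TOOL_HANDLERS = {
    "get_weather": get_weather,
    "search_web": search_web,
}
def execute_tool(tool_call) -> str:
    handler = TOOL_HANDLERS.get(tool_call.function.name)
    if handler is None:
        return json.dumps({"error": f"Unknown tool: {tool_call.function.name}"})
    args = json.loads(tool_call.function.arguments)
    return json.dumps(handler(**args))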
Anthropic Tool Use
import anthropic
import json
client = anthropic.Anthropic()
tools = [
{
"name": "get_weather",
"description": "Get current weather for a location",
"input_schema": {
"type": "object",
"properties": {
"location": {
"type": "string",
"description": "City and state"
},
"unit": {
"type": "string",
"enum": ["celsius", "fahrenheit"]
}
},
"required": ["location"]
}
}
]
response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "What's the weather in Tokyo?"}
]
)
# Handle tool use
for content in response.content:
if content.type == "tool_use":
tool_name = content.name
tool_input = content.input
tool_use_id = content.id
print(f"Tool: {tool_name}, Input: {tool_input}")
# Execute tool and continue
result = get_weather(**tool_input)
# Send result back
final_response = client.messages.create(
model="claude-3-5-sonnet-20241022",
max_tokens=1024,
tools=tools,
messages=[
{"role": "user", "content": "What's the weather in Tokyo?"},
{"role": "assistant", "content": response.content},
{
"role": "user",
"content": [
{
"type": "tool_result",
"tool_use_id": tool_use_id,
"content": json.dumps(result)
}
]
}
]
)
Parallel Tool Calls
Models can request multiple tool calls simultaneously:
response = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "What's the weather in Tokyo and New York?"}
],
tools=tools,
parallel_tool_calls=True
)
message = response.choices[0].message
if message.tool_calls:
# Execute all tool calls in parallel
import asyncio
async def execute_tools(tool_calls):
tasks = []
for tc in tool_calls:
func_name = tc.function.name
args = json.loads(tc.function.arguments)
tasks.append(execute_tool_async(func_name, args))
return await asyncio.gather(*tasks)
results = asyncio.run(execute_tools(message.tool_calls))
# Send all results back
tool_messages = [
{
"role": "tool",
"tool_call_id": tc.id,
"content": json.dumps(result)
}
for tc, result in zip(message.tool_calls, results)
]
messages = [
{"role": "user", "content": "What's the weather in Tokyo and New York?"},
message,
*tool_messages
]
final_response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
tools=tools
)
Tool Definition Best Practices
Descriptive Tool Definitions
# BAD: Vague description
{
"name": "search",
"description": "Search for things",
"parameters": {
"type": "object",
"properties": {
"q": {"type": "string"}
}
}
}
# GOOD: Detailed description with examples
{
"name": "search_products",
"description": """Search the product catalog.
Use this tool when the user wants to:
- Find products by name, category, or description
- Browse available items
- Check if a specific product exists
DO NOT use for:
- Checking inventory/stock levels (use check_inventory instead)
- Price comparisons (use compare_prices instead)
- Order history (use get_orders instead)
Examples of good queries:
- "wireless headphones under $100"
- "blue running shoes size 10"
- "laptop with 16GB RAM"
""",
"parameters": {
"type": "object",
"properties": {
"query": {
"type": "string",
"description": "Search query. Can include product names, categories, attributes, or natural language descriptions."
},
"category": {
"type": "string",
"enum": ["electronics", "clothing", "home", "sports", "books"],
"description": "Optional: Filter by product category"
},
"price_max": {
"type": "number",
"description": "Optional: Maximum price in USD"
},
"in_stock_only": {
"type": "boolean",
"default": True,
"description": "Only return products currently in stock"
}
},
"required": ["query"]
}
}
Schema Design Patterns
from pydantic import BaseModel, Field
from typing import Optional, Literal, Annotated
from datetime import datetime
from enum import Enum
# Pattern 1: Use enums for constrained values
class OrderStatus(str, Enum):
PENDING = "pending"
PROCESSING = "processing"
SHIPPED = "shipped"
DELIVERED = "delivered"
CANCELLED = "cancelled"
# Pattern 2: Use Literal for small fixed sets
class CreateOrder(BaseModel):
product_id: str
quantity: int = Field(..., ge=1, le=100)
shipping_method: Literal["standard", "express", "overnight"]
gift_wrap: bool = False
# Pattern 3: Nested objects for complex data
class Address(BaseModel):
street: str
city: str
state: str = Field(..., min_length=2, max_length=2)
zip_code: str = Field(..., pattern=r"^\d{5}(-\d{4})?$")
class Customer(BaseModel):
name: str
email: str
shipping_address: Address
billing_address: Optional[Address] = None # Defaults to shipping
# Pattern 4: Discriminated unions for polymorphic data
class TextContent(BaseModel):
type: Literal["text"] = "text"
text: str
class ImageContent(BaseModel):
type: Literal["image"] = "image"
url: str
alt_text: Optional[str] = None
class CodeContent(BaseModel):
type: Literal["code"] = "code"
language: str
code: str
Content = Annotated[
TextContent | ImageContent | CodeContent,
Field(discriminator="type")
]
class Message(BaseModel):
role: Literal["user", "assistant"]
content: list[Content]
# Pattern 5: Self-referential schemas (with care)
class TreeNode(BaseModel):
value: str
children: list["TreeNode"] = Field(default_factory=list)
TreeNode.model_rebuild() # Required for forward references
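A quick sanity check of Pattern 4: when validating raw data, Pydantic reads the type discriminator and instantiates the matching content model automatically:
msg = Message.model_validate({
    "role": "user",
    "content": [
        {"type": "text", "text": "What does this function do?"},
        {"type": "code", "language": "python", "code": "def add(a, b): return a + b"},
    ],
})
print(type(msg.content[0]).__name__)  # TextContent
print(type(msg.content[1]).__name__)  # CodeContent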
Converting Pydantic to JSON Schema
from pydantic import BaseModel, Field
from typing import get_type_hints
import json
def pydantic_to_openai_tool(model: type[BaseModel], name: str, description: str) -> dict:
"""Convert Pydantic model to OpenAI tool definition."""
schema = model.model_json_schema()
# Remove $defs if present (OpenAI doesn't support references well)
if "$defs" in schema:
schema = _resolve_refs(schema)
return {
"type": "function",
"function": {
"name": name,
"description": description,
"parameters": schema
}
}
def _resolve_refs(schema: dict) -> dict:
"""Inline $ref references in JSON schema."""
defs = schema.pop("$defs", {})
def resolve(obj):
if isinstance(obj, dict):
if "$ref" in obj:
ref_path = obj["$ref"].split("/")[-1]
return resolve(defs.get(ref_path, {}))
return {k: resolve(v) for k, v in obj.items()}
elif isinstance(obj, list):
return [resolve(item) for item in obj]
return obj
return resolve(schema)
# Usage
class SearchParams(BaseModel):
query: str = Field(..., description="Search query")
filters: dict[str, str] = Field(default_factory=dict)
limit: int = Field(default=10, ge=1, le=100)
tool = pydantic_to_openai_tool(
SearchParams,
name="search",
description="Search the database"
)
print(json.dumps(tool, indent=2))
Error Handling Patterns
Graceful Degradation
from pydantic import BaseModel, ValidationError
from typing import Optional, TypeVar, Generic
import json
T = TypeVar("T", bound=BaseModel)
class ExtractionResult(BaseModel, Generic[T]):
"""Wrapper for extraction results with error handling."""
success: bool
data: Optional[T] = None
raw_output: Optional[str] = None
error: Optional[str] = None
retries: int = 0
def extract_with_fallback(
client,
messages: list[dict],
response_model: type[T],
max_retries: int = 3,
fallback_model: str = "gpt-4o-mini"
) -> ExtractionResult[T]:
"""Extract structured data with graceful fallback."""
models = ["gpt-4o", fallback_model]
last_error = None
for model in models:
for attempt in range(max_retries):
try:
result = client.chat.completions.create(
model=model,
messages=messages,
response_model=response_model
)
return ExtractionResult(
success=True,
data=result,
retries=attempt
)
except ValidationError as e:
last_error = str(e)
# Add error context to messages for retry
messages = messages + [
{
"role": "assistant",
"content": f"Previous attempt failed validation: {e}"
},
{
"role": "user",
"content": "Please fix the errors and try again."
}
]
except Exception as e:
last_error = str(e)
break # Try next model
return ExtractionResult(
success=False,
error=last_error,
retries=max_retries * len(models)
)
Partial Extraction
from pydantic import BaseModel, Field, model_validator
from typing import Optional
class PartialUserInfo(BaseModel):
"""User info with optional fields for partial extraction."""
name: Optional[str] = None
email: Optional[str] = None
phone: Optional[str] = None
address: Optional[str] = None
age: Optional[int] = None
@model_validator(mode="after")
def at_least_one_field(self):
"""Ensure at least one field was extracted."""
if all(v is None for v in [self.name, self.email, self.phone, self.address, self.age]):
raise ValueError("Must extract at least one field")
return self
def completeness_score(self) -> float:
"""Calculate how complete the extraction is."""
fields = [self.name, self.email, self.phone, self.address, self.age]
return sum(1 for f in fields if f is not None) / len(fields)
# Extract whatever we can
user = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": "Contact John at john@example.com" # Limited info
}
],
response_model=PartialUserInfo
)
print(f"Extracted: {user}")
print(f"Completeness: {user.completeness_score():.0%}")
Schema Evolution
from pydantic import BaseModel, Field
from typing import Optional, Any
class UserInfoV1(BaseModel):
"""Original schema."""
name: str
email: str
class UserInfoV2(BaseModel):
"""Updated schema with new fields."""
name: str
email: str
phone: Optional[str] = None # New in v2
preferences: dict[str, Any] = Field(default_factory=dict) # New in v2
def migrate_v1_to_v2(v1: UserInfoV1) -> UserInfoV2:
"""Migrate from v1 to v2 schema."""
return UserInfoV2(
name=v1.name,
email=v1.email,
phone=None,
preferences={}
)
class SchemaVersionHandler:
"""Handle multiple schema versions."""
def __init__(self):
self.schemas = {
"v1": UserInfoV1,
"v2": UserInfoV2
}
self.migrations = {
("v1", "v2"): migrate_v1_to_v2
}
self.current_version = "v2"
def extract(self, client, messages: list[dict], version: str = None):
"""Extract using specified or current schema version."""
version = version or self.current_version
schema = self.schemas[version]
result = client.chat.completions.create(
model="gpt-4o",
messages=messages,
response_model=schema
)
# Migrate to current version if needed
if version != self.current_version:
migration_key = (version, self.current_version)
if migration_key in self.migrations:
result = self.migrations[migration_key](result)
return result
Production Patterns
Caching Structured Outputs
import hashlib
import json
import time
from typing import Any, Optional, TypeVar, Type
from pydantic import BaseModel
T = TypeVar("T", bound=BaseModel)
class StructuredOutputCache:
"""Cache for structured output extractions."""
def __init__(self, ttl_seconds: int = 3600):
self.cache: dict[str, tuple[float, Any]] = {}
self.ttl = ttl_seconds
def _hash_key(self, model: str, messages: list[dict], response_model: type) -> str:
"""Generate cache key."""
content = json.dumps({
"model": model,
"messages": messages,
"schema": response_model.model_json_schema()
}, sort_keys=True)
return hashlib.sha256(content.encode()).hexdigest()
def get(self, key: str) -> Optional[BaseModel]:
"""Get cached result."""
if key in self.cache:
timestamp, value = self.cache[key]
if time.time() - timestamp < self.ttl:
return value
del self.cache[key]
return None
def set(self, key: str, value: BaseModel):
"""Cache a result."""
self.cache[key] = (time.time(), value)
cache = StructuredOutputCache()
def cached_extract(
client,
model: str,
messages: list[dict],
response_model: Type[T]
) -> T:
"""Extract with caching."""
key = cache._hash_key(model, messages, response_model)
cached = cache.get(key)
if cached:
return cached
result = client.chat.completions.create(
model=model,
messages=messages,
response_model=response_model
)
cache.set(key, result)
return result
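Usage looks like a normal extraction call with the Instructor-patched client from earlier; an identical request within the TTL is served from memory rather than hitting the API:
user = cached_extract(
    client,
    model="gpt-4o",
    messages=[{"role": "user", "content": "John Smith, 32, john@example.com"}],
    response_model=UserInfo
)
repeat = cached_extract(  # identical request: returned from cache, no API call
    client,
    model="gpt-4o",
    messages=[{"role": "user", "content": "John Smith, 32, john@example.com"}],
    response_model=UserInfo
)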
Batch Processing
import asyncio
from dataclasses import dataclass
from typing import Optional, Type, TypeVar, Generic
from pydantic import BaseModel
T = TypeVar("T", bound=BaseModel)
@dataclass
class BatchResult(Generic[T]):
index: int
success: bool
result: Optional[T] = None
error: Optional[str] = None
async def batch_extract(
client,
items: list[str],
response_model: Type[T],
system_prompt: str,
max_concurrent: int = 10
) -> list[BatchResult[T]]:
"""Extract structured data from multiple items concurrently."""
semaphore = asyncio.Semaphore(max_concurrent)
async def process_item(index: int, item: str) -> BatchResult[T]:
async with semaphore:
try:
result = await asyncio.to_thread(
client.chat.completions.create,
model="gpt-4o",
messages=[
{"role": "system", "content": system_prompt},
{"role": "user", "content": item}
],
response_model=response_model
)
return BatchResult(index=index, success=True, result=result)
except Exception as e:
return BatchResult(index=index, success=False, error=str(e))
tasks = [process_item(i, item) for i, item in enumerate(items)]
results = await asyncio.gather(*tasks)
return sorted(results, key=lambda r: r.index)
# Usage
class ProductInfo(BaseModel):
name: str
price: float
category: str
descriptions = [
"iPhone 15 Pro - $999, Electronics",
"Nike Air Max - $150, Shoes",
"The Great Gatsby - $15, Books"
]
results = asyncio.run(batch_extract(
client,
descriptions,
ProductInfo,
"Extract product information."
))
for r in results:
if r.success:
print(f"{r.result.name}: ${r.result.price}")
else:
print(f"Error at index {r.index}: {r.error}")
Monitoring and Observability
import time
from dataclasses import dataclass, field
from typing import Optional, Type, TypeVar
from collections import defaultdict
from pydantic import BaseModel
T = TypeVar("T", bound=BaseModel)
@dataclass
class ExtractionMetrics:
model: str
schema_name: str
success: bool
latency_ms: float
tokens_used: int
retries: int
validation_errors: list[str] = field(default_factory=list)
class ExtractionMonitor:
"""Monitor structured output extractions."""
def __init__(self):
self.metrics: list[ExtractionMetrics] = []
def record(self, metrics: ExtractionMetrics):
"""Record extraction metrics."""
self.metrics.append(metrics)
def get_stats(self, schema_name: Optional[str] = None) -> dict:
"""Get aggregate statistics."""
filtered = self.metrics
if schema_name:
filtered = [m for m in self.metrics if m.schema_name == schema_name]
if not filtered:
return {"error": "No data"}
success_count = sum(1 for m in filtered if m.success)
latencies = [m.latency_ms for m in filtered]
return {
"total_extractions": len(filtered),
"success_rate": success_count / len(filtered),
"avg_latency_ms": sum(latencies) / len(latencies),
"p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
"avg_retries": sum(m.retries for m in filtered) / len(filtered),
"common_errors": self._get_common_errors(filtered)
}
def _get_common_errors(self, metrics: list[ExtractionMetrics]) -> list[tuple[str, int]]:
"""Get most common validation errors."""
error_counts = defaultdict(int)
for m in metrics:
for error in m.validation_errors:
error_counts[error] += 1
return sorted(error_counts.items(), key=lambda x: -x[1])[:5]
monitor = ExtractionMonitor()
def monitored_extract(
client,
model: str,
messages: list[dict],
response_model: Type[T]
) -> T:
"""Extract with monitoring."""
start_time = time.time()
retries = 0
validation_errors = []
try:
result = client.chat.completions.create(
model=model,
messages=messages,
response_model=response_model,
max_retries=3
)
success = True
except Exception as e:
success = False
validation_errors.append(str(e))
raise
finally:
monitor.record(ExtractionMetrics(
model=model,
schema_name=response_model.__name__,
success=success,
latency_ms=(time.time() - start_time) * 1000,
tokens_used=0, # Would need to track from response
retries=retries,
validation_errors=validation_errors
))
return result
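Once extractions flow through monitored_extract, the aggregate view is a plain dictionary you can log or push to your metrics backend:
stats = monitor.get_stats(schema_name="UserInfo")
if "error" not in stats:
    print(f"Success rate: {stats['success_rate']:.1%}")
    print(f"p95 latency: {stats['p95_latency_ms']:.0f} ms")
    for error, count in stats["common_errors"]:
        print(f"  {count}x {error[:80]}")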
Testing Structured Outputs
import pytest
from pydantic import BaseModel, ValidationError
class UserInfo(BaseModel):
name: str
email: str
age: int
@pytest.fixture
def mock_llm_response():
"""Mock LLM responses for testing."""
responses = {
"valid": '{"name": "John", "email": "john@example.com", "age": 30}',
"invalid_type": '{"name": "John", "email": "john@example.com", "age": "thirty"}',
"missing_field": '{"name": "John", "email": "john@example.com"}',
"extra_field": '{"name": "John", "email": "john@example.com", "age": 30, "phone": "555-1234"}',
}
return responses
class TestStructuredOutputs:
def test_valid_extraction(self, mock_llm_response):
"""Test extraction with valid response."""
data = UserInfo.model_validate_json(mock_llm_response["valid"])
assert data.name == "John"
assert data.age == 30
def test_invalid_type_raises(self, mock_llm_response):
"""Test that invalid types raise ValidationError."""
with pytest.raises(ValidationError) as exc_info:
UserInfo.model_validate_json(mock_llm_response["invalid_type"])
assert "age" in str(exc_info.value)
def test_missing_field_raises(self, mock_llm_response):
"""Test that missing required fields raise ValidationError."""
with pytest.raises(ValidationError):
UserInfo.model_validate_json(mock_llm_response["missing_field"])
def test_extra_fields_ignored(self, mock_llm_response):
"""Test that extra fields are ignored by default."""
data = UserInfo.model_validate_json(mock_llm_response["extra_field"])
assert data.name == "John"
assert not hasattr(data, "phone")
def test_schema_generation(self):
"""Test JSON schema generation."""
schema = UserInfo.model_json_schema()
assert "properties" in schema
assert "name" in schema["properties"]
assert schema["properties"]["age"]["type"] == "integer"
def test_round_trip(self):
"""Test serialization round-trip."""
original = UserInfo(name="John", email="john@example.com", age=30)
json_str = original.model_dump_json()
restored = UserInfo.model_validate_json(json_str)
assert original == restored
Advanced Schema Patterns
Recursive and Self-Referential Schemas
For tree structures, graphs, and nested hierarchies:
from pydantic import BaseModel, Field, model_validator
from typing import Literal, Optional
# Pattern 1: Simple recursion (file system)
class FileNode(BaseModel):
name: str
type: Literal["file", "directory"]
size_bytes: Optional[int] = None # Only for files
children: list["FileNode"] = Field(default_factory=list) # Only for directories
@model_validator(mode="after")
def validate_structure(self):
if self.type == "file" and self.children:
raise ValueError("Files cannot have children")
if self.type == "directory" and self.size_bytes is not None:
raise ValueError("Directories don't have direct size")
return self
FileNode.model_rebuild()
# Pattern 2: Organizational hierarchy
class Employee(BaseModel):
id: str
name: str
title: str
department: str
direct_reports: list["Employee"] = Field(default_factory=list)
def total_reports(self) -> int:
"""Recursively count all reports."""
return len(self.direct_reports) + sum(
r.total_reports() for r in self.direct_reports
)
Employee.model_rebuild()
# Pattern 3: Expression trees (for calculators, query builders)
class Expression(BaseModel):
type: Literal["number", "variable", "binary_op", "function_call"]
class NumberExpr(Expression):
type: Literal["number"] = "number"
value: float
class VariableExpr(Expression):
type: Literal["variable"] = "variable"
name: str
class BinaryOpExpr(Expression):
type: Literal["binary_op"] = "binary_op"
operator: Literal["+", "-", "*", "/", "^"]
left: "AnyExpression"
right: "AnyExpression"
class FunctionCallExpr(Expression):
type: Literal["function_call"] = "function_call"
function: str
arguments: list["AnyExpression"]
AnyExpression = NumberExpr | VariableExpr | BinaryOpExpr | FunctionCallExpr
BinaryOpExpr.model_rebuild()
FunctionCallExpr.model_rebuild()  # Resolve the "AnyExpression" forward references
# Usage: Parse mathematical expressions
expression = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "user",
"content": "Parse this expression into a tree: (x + 2) * sin(y)"
}
],
response_model=AnyExpression
)
Dynamic Schema Generation
Generate schemas at runtime based on configuration:
from pydantic import BaseModel, create_model, Field
from typing import Any, Optional, Type
def create_extraction_schema(
fields: dict[str, dict],
schema_name: str = "DynamicSchema"
) -> Type[BaseModel]:
"""
Create a Pydantic model from a field configuration.
fields = {
"name": {"type": "str", "required": True, "description": "Person's name"},
"age": {"type": "int", "required": False, "default": None},
"tags": {"type": "list[str]", "required": False, "default_factory": list}
}
"""
type_mapping = {
"str": str,
"int": int,
"float": float,
"bool": bool,
"list[str]": list[str],
"list[int]": list[int],
"dict": dict[str, Any],
}
field_definitions = {}
for field_name, config in fields.items():
field_type = type_mapping.get(config["type"], str)
if config.get("required", True):
field_definitions[field_name] = (
field_type,
Field(..., description=config.get("description", ""))
)
else:
default = config.get("default")
if "default_factory" in config:
field_definitions[field_name] = (
Optional[field_type],
Field(default_factory=config["default_factory"])
)
else:
field_definitions[field_name] = (
Optional[field_type],
Field(default=default, description=config.get("description", ""))
)
return create_model(schema_name, **field_definitions)
# Usage: Create schema from user configuration
user_config = {
"company_name": {"type": "str", "required": True, "description": "Company name"},
"revenue": {"type": "float", "required": False, "description": "Annual revenue in USD"},
"employees": {"type": "int", "required": False, "default": None},
"industries": {"type": "list[str]", "required": False, "default_factory": list}
}
CompanySchema = create_extraction_schema(user_config, "CompanyInfo")
# Now use it for extraction
company = client.chat.completions.create(
model="gpt-4o",
messages=[
{"role": "user", "content": "Acme Corp reported $50M revenue with 200 employees in tech and finance."}
],
response_model=CompanySchema
)
Context-Aware Schema Selection
Choose schemas based on input analysis:
from pydantic import BaseModel, Field
from typing import Any, Literal, Optional
from enum import Enum
class DocumentType(str, Enum):
INVOICE = "invoice"
RECEIPT = "receipt"
CONTRACT = "contract"
RESUME = "resume"
EMAIL = "email"
# Different schemas for different document types
class Invoice(BaseModel):
document_type: Literal["invoice"] = "invoice"
invoice_number: str
vendor: str
customer: str
line_items: list[dict[str, Any]]
subtotal: float
tax: float
total: float
due_date: Optional[str] = None
class Receipt(BaseModel):
document_type: Literal["receipt"] = "receipt"
merchant: str
items: list[dict[str, Any]]
subtotal: float
tax: float
total: float
payment_method: str
transaction_date: str
class Resume(BaseModel):
document_type: Literal["resume"] = "resume"
name: str
contact: dict[str, str]
summary: Optional[str] = None
experience: list[dict[str, Any]]
education: list[dict[str, Any]]
skills: list[str]
class DocumentClassification(BaseModel):
document_type: DocumentType
confidence: float = Field(..., ge=0, le=1)
reasoning: str
class AdaptiveDocumentExtractor:
"""Extract structured data with automatic schema selection."""
def __init__(self, client):
self.client = client
self.schemas = {
DocumentType.INVOICE: Invoice,
DocumentType.RECEIPT: Receipt,
DocumentType.RESUME: Resume,
}
def extract(self, document_text: str) -> BaseModel:
# Step 1: Classify document
classification = self.client.chat.completions.create(
model="gpt-4o-mini", # Fast model for classification
messages=[
{
"role": "system",
"content": "Classify this document type and explain your reasoning."
},
{"role": "user", "content": document_text[:2000]} # First 2000 chars
],
response_model=DocumentClassification
)
# Step 2: Extract with appropriate schema
schema = self.schemas.get(classification.document_type)
if not schema:
raise ValueError(f"No schema for document type: {classification.document_type}")
result = self.client.chat.completions.create(
model="gpt-4o", # Better model for extraction
messages=[
{
"role": "system",
"content": f"Extract {classification.document_type.value} information."
},
{"role": "user", "content": document_text}
],
response_model=schema
)
return result
# Usage
extractor = AdaptiveDocumentExtractor(client)
result = extractor.extract(document_text)
print(f"Extracted {result.document_type}: {result}")
Union Types with Discriminators
Handle polymorphic data cleanly:
from pydantic import BaseModel, Field
from typing import Annotated, Union, Literal
# Different action types an agent might take
class SearchAction(BaseModel):
action_type: Literal["search"] = "search"
query: str
filters: dict[str, str] = Field(default_factory=dict)
max_results: int = Field(default=10, ge=1, le=100)
class CalculateAction(BaseModel):
action_type: Literal["calculate"] = "calculate"
expression: str
precision: int = Field(default=2, ge=0, le=10)
class EmailAction(BaseModel):
action_type: Literal["email"] = "email"
to: list[str]
subject: str
body: str
cc: list[str] = Field(default_factory=list)
priority: Literal["low", "normal", "high"] = "normal"
class CodeAction(BaseModel):
action_type: Literal["code"] = "code"
language: str
code: str
description: str
class RespondAction(BaseModel):
action_type: Literal["respond"] = "respond"
message: str
citations: list[str] = Field(default_factory=list)
# Discriminated union - Pydantic will use action_type to determine which model
AgentAction = Annotated[
Union[SearchAction, CalculateAction, EmailAction, CodeAction, RespondAction],
Field(discriminator="action_type")
]
class AgentPlan(BaseModel):
"""A plan consisting of multiple actions."""
reasoning: str = Field(..., description="Explanation of the plan")
actions: list[AgentAction] = Field(..., description="Ordered list of actions to take")
estimated_steps: int
# Usage
plan = client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """You are a planning agent. Create action plans to accomplish user goals.
Available actions: search, calculate, email, code, respond"""
},
{
"role": "user",
"content": "Find the current Bitcoin price, convert it to EUR, and email the result to finance@company.com"
}
],
response_model=AgentPlan
)
for i, action in enumerate(plan.actions, 1):
print(f"Step {i}: {action.action_type}")
if action.action_type == "search":
print(f" Query: {action.query}")
elif action.action_type == "email":
print(f" To: {action.to}")
print(f" Subject: {action.subject}")
Multi-Tool Orchestration
Building systems where LLMs coordinate multiple tools requires careful schema design and execution patterns.
Tool Registry Pattern
from pydantic import BaseModel, Field
from typing import Callable, Any, TypeVar, get_type_hints
from dataclasses import dataclass
import asyncio
T = TypeVar("T", bound=BaseModel)
@dataclass
class Tool:
name: str
description: str
parameters_schema: type[BaseModel]
handler: Callable
requires_confirmation: bool = False
timeout_seconds: int = 30
class ToolRegistry:
"""Registry for managing available tools."""
def __init__(self):
self.tools: dict[str, Tool] = {}
def register(
self,
name: str = None,
description: str = None,
requires_confirmation: bool = False,
timeout_seconds: int = 30
):
"""Decorator to register a tool."""
def decorator(func: Callable) -> Callable:
# Extract parameter schema from type hints
hints = get_type_hints(func)
params_type = hints.get("params")
if not params_type or not issubclass(params_type, BaseModel):
raise ValueError(f"Tool {func.__name__} must have a 'params' argument with Pydantic type")
tool_name = name or func.__name__
tool_description = description or func.__doc__ or ""
self.tools[tool_name] = Tool(
name=tool_name,
description=tool_description,
parameters_schema=params_type,
handler=func,
requires_confirmation=requires_confirmation,
timeout_seconds=timeout_seconds
)
return func
return decorator
def get_tool_definitions(self) -> list[dict]:
"""Get OpenAI-compatible tool definitions."""
definitions = []
for tool in self.tools.values():
definitions.append({
"type": "function",
"function": {
"name": tool.name,
"description": tool.description,
"parameters": tool.parameters_schema.model_json_schema()
}
})
return definitions
async def execute(self, tool_name: str, params: dict) -> Any:
"""Execute a tool with given parameters."""
if tool_name not in self.tools:
raise ValueError(f"Unknown tool: {tool_name}")
tool = self.tools[tool_name]
validated_params = tool.parameters_schema.model_validate(params)
if asyncio.iscoroutinefunction(tool.handler):
return await asyncio.wait_for(
tool.handler(validated_params),
timeout=tool.timeout_seconds
)
else:
return await asyncio.to_thread(tool.handler, validated_params)
# Usage
registry = ToolRegistry()
class SearchParams(BaseModel):
query: str
max_results: int = 10
@registry.register(
name="web_search",
description="Search the web for information",
timeout_seconds=15
)
def search_web(params: SearchParams) -> list[dict]:
# Implementation
return [{"title": "Result", "url": "https://example.com"}]
class DatabaseQueryParams(BaseModel):
table: str
conditions: dict[str, Any]
limit: int = 100
@registry.register(
name="query_database",
description="Query the internal database",
requires_confirmation=True # Sensitive operation
)
async def query_database(params: DatabaseQueryParams) -> list[dict]:
# Implementation
return [{"id": 1, "data": "..."}]
Intelligent Tool Selection
Let the LLM choose and chain tools:
from pydantic import BaseModel, Field
from typing import Any, Optional
import asyncio
import json
class ToolSelectionReasoning(BaseModel):
"""Model's reasoning about which tools to use."""
user_intent: str = Field(..., description="What the user is trying to accomplish")
required_information: list[str] = Field(..., description="What information is needed")
selected_tools: list[str] = Field(..., description="Tools to use, in order")
tool_reasoning: dict[str, str] = Field(
...,
description="Why each tool was selected"
)
parallel_execution: list[list[str]] = Field(
default_factory=list,
description="Groups of tools that can run in parallel"
)
class ToolOrchestrator:
"""Orchestrates multi-tool execution with intelligent planning."""
def __init__(self, client, registry: ToolRegistry):
self.client = client
self.registry = registry
async def plan_and_execute(self, user_request: str) -> dict:
# Step 1: Plan tool usage
tool_descriptions = "\n".join([
f"- {t.name}: {t.description}"
for t in self.registry.tools.values()
])
plan = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": f"""You are a tool orchestration planner.
Available tools:
{tool_descriptions}
Analyze the user request and plan which tools to use."""
},
{"role": "user", "content": user_request}
],
response_model=ToolSelectionReasoning
)
# Step 2: Execute tools based on plan
results = {}
# Execute parallel groups first
for parallel_group in plan.parallel_execution:
group_tasks = []
for tool_name in parallel_group:
if tool_name in plan.selected_tools:
# Get parameters from LLM
params = await self._get_tool_params(tool_name, user_request, results)
group_tasks.append(self._execute_with_name(tool_name, params))
            group_results = await asyncio.gather(*group_tasks, return_exceptions=True)
            for item in group_results:
                if isinstance(item, Exception):
                    continue  # A failed tool simply has no entry in results
                tool_name, result = item
                results[tool_name] = result
# Execute remaining tools sequentially
executed = set(t for group in plan.parallel_execution for t in group)
for tool_name in plan.selected_tools:
if tool_name not in executed:
params = await self._get_tool_params(tool_name, user_request, results)
results[tool_name] = await self.registry.execute(tool_name, params)
return {
"plan": plan.model_dump(),
"results": results
}
async def _get_tool_params(
self,
tool_name: str,
user_request: str,
prior_results: dict
) -> dict:
"""Use LLM to generate tool parameters."""
tool = self.registry.tools[tool_name]
context = f"User request: {user_request}"
if prior_results:
context += f"\n\nPrior tool results: {json.dumps(prior_results, default=str)}"
params = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": f"Generate parameters for the '{tool_name}' tool based on the context."
},
{"role": "user", "content": context}
],
response_model=tool.parameters_schema
)
return params.model_dump()
    async def _execute_with_name(self, tool_name: str, params: dict) -> tuple[str, Any]:
        result = await self.registry.execute(tool_name, params)
        return tool_name, result
Tool Chaining with Dependencies
Handle tools that depend on outputs from other tools:
from pydantic import BaseModel, Field
from typing import Optional, Any
from dataclasses import dataclass
import networkx as nx
@dataclass
class ToolDependency:
tool_name: str
depends_on: list[str] # Tools that must complete first
output_mapping: dict[str, str] # How to map outputs to inputs
class ToolChainDefinition(BaseModel):
"""Definition of a tool chain extracted by LLM."""
steps: list["ToolStep"]
final_output_tool: str
class ToolStep(BaseModel):
tool_name: str
input_mappings: dict[str, str] = Field(
default_factory=dict,
description="Map from this tool's param to 'tool_name.output_field'"
)
static_inputs: dict[str, Any] = Field(
default_factory=dict,
description="Static values for parameters"
    )
ToolChainDefinition.model_rebuild()  # Resolve the "ToolStep" forward reference
class DependencyAwareExecutor:
"""Execute tool chains respecting dependencies."""
def __init__(self, registry: ToolRegistry):
self.registry = registry
def build_execution_graph(self, chain: ToolChainDefinition) -> nx.DiGraph:
"""Build a DAG of tool dependencies."""
graph = nx.DiGraph()
for step in chain.steps:
graph.add_node(step.tool_name, step=step)
# Add edges for dependencies
for param, source in step.input_mappings.items():
if "." in source:
source_tool = source.split(".")[0]
graph.add_edge(source_tool, step.tool_name)
if not nx.is_directed_acyclic_graph(graph):
raise ValueError("Tool chain contains cycles")
return graph
async def execute_chain(self, chain: ToolChainDefinition) -> dict[str, Any]:
"""Execute tools in dependency order."""
graph = self.build_execution_graph(chain)
results = {}
# Process in topological order
for tool_name in nx.topological_sort(graph):
step = graph.nodes[tool_name].get("step")
if not step:
continue
# Build parameters
params = dict(step.static_inputs)
for param, source in step.input_mappings.items():
if "." in source:
source_tool, source_field = source.split(".", 1)
source_result = results.get(source_tool, {})
params[param] = self._get_nested_value(source_result, source_field)
else:
params[param] = source
# Execute
results[tool_name] = await self.registry.execute(tool_name, params)
return results
def _get_nested_value(self, obj: Any, path: str) -> Any:
"""Get nested value from dict using dot notation."""
for key in path.split("."):
if isinstance(obj, dict):
obj = obj.get(key)
elif hasattr(obj, key):
obj = getattr(obj, key)
else:
return None
return obj
# Example usage
chain = ToolChainDefinition(
steps=[
ToolStep(
tool_name="web_search",
static_inputs={"query": "Bitcoin current price USD"}
),
ToolStep(
tool_name="extract_number",
input_mappings={"text": "web_search.results[0].snippet"}
),
ToolStep(
tool_name="currency_convert",
input_mappings={"amount": "extract_number.value"},
static_inputs={"from_currency": "USD", "to_currency": "EUR"}
),
ToolStep(
tool_name="send_email",
input_mappings={"body": "currency_convert.formatted_result"},
static_inputs={
"to": ["finance@company.com"],
"subject": "Bitcoin Price Update"
}
)
],
final_output_tool="send_email"
)
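Executing the chain is then one call to the executor. This assumes extract_number, currency_convert, and send_email have been registered in the registry alongside web_search; only web_search was registered in the example above:
import asyncio
executor = DependencyAwareExecutor(registry)
results = asyncio.run(executor.execute_chain(chain))
print(results.get("currency_convert"))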
Tool Result Aggregation
Combine results from multiple tools:
from pydantic import BaseModel, Field
from typing import Any
import json
class AggregatedResult(BaseModel):
"""Aggregated and synthesized result from multiple tools."""
summary: str = Field(..., description="Natural language summary of findings")
sources: list[str] = Field(..., description="Tools that provided data")
key_facts: list[str] = Field(..., description="Most important facts discovered")
confidence: float = Field(..., ge=0, le=1, description="Confidence in the aggregation")
conflicts: list[str] = Field(
default_factory=list,
description="Any conflicting information found"
)
raw_data: dict[str, Any] = Field(
default_factory=dict,
description="Raw data from each tool"
)
class ResultAggregator:
"""Aggregate and synthesize results from multiple tools."""
def __init__(self, client):
self.client = client
def aggregate(
self,
tool_results: dict[str, Any],
user_question: str
) -> AggregatedResult:
"""Use LLM to aggregate tool results into a coherent response."""
# Format results for the LLM
formatted_results = []
for tool_name, result in tool_results.items():
formatted_results.append(f"## {tool_name}\n{json.dumps(result, default=str, indent=2)}")
results_text = "\n\n".join(formatted_results)
aggregation = self.client.chat.completions.create(
model="gpt-4o",
messages=[
{
"role": "system",
"content": """You are a result aggregation specialist.
Synthesize information from multiple tool outputs into a coherent response.
Identify key facts, note any conflicts, and assess confidence."""
},
{
"role": "user",
"content": f"""User question: {user_question}
Tool results:
{results_text}
Aggregate these results into a comprehensive response."""
}
],
response_model=AggregatedResult
)
aggregation.raw_data = tool_results
return aggregation
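Typical usage hands the aggregator the per-tool results from the orchestrator along with the original question; the sample result dict here is illustrative:
aggregator = ResultAggregator(client)
aggregated = aggregator.aggregate(
    tool_results={"web_search": [{"title": "BTC price", "snippet": "$64,200"}]},
    user_question="What is the current Bitcoin price?"
)
print(aggregated.summary)
print(f"Confidence: {aggregated.confidence:.0%}")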
Advanced Streaming Patterns
Streaming with Real-Time Validation
from pydantic import BaseModel, ValidationError
from typing import Generator, Optional
import json
class StreamingValidator:
"""Validate streaming JSON as it arrives."""
def __init__(self, schema: type[BaseModel]):
self.schema = schema
self.buffer = ""
self.partial_object = {}
self.completed_fields = set()
def process_chunk(self, chunk: str) -> tuple[dict, list[str]]:
"""
Process a chunk of streaming JSON.
Returns (partial_object, newly_completed_fields).
"""
self.buffer += chunk
newly_completed = []
# Try to parse as much as possible
try:
# Attempt to parse complete JSON
parsed = json.loads(self.buffer)
self._update_partial(parsed, newly_completed)
except json.JSONDecodeError:
# Try to extract completed fields from partial JSON
self._extract_partial_fields(newly_completed)
return self.partial_object, newly_completed
def _update_partial(self, parsed: dict, newly_completed: list[str]):
"""Update partial object with parsed data."""
for key, value in parsed.items():
if key not in self.completed_fields:
# Validate individual field
try:
field_info = self.schema.model_fields.get(key)
if field_info:
# Field exists in schema
self.partial_object[key] = value
self.completed_fields.add(key)
newly_completed.append(key)
except Exception:
pass
def _extract_partial_fields(self, newly_completed: list[str]):
"""Extract fields from incomplete JSON."""
# Simple heuristic: look for completed key-value pairs
import re
# Match completed string fields: "key": "value"
string_pattern = r'"(\w+)":\s*"([^"]*)"(?:,|\s*})'
for match in re.finditer(string_pattern, self.buffer):
key, value = match.groups()
if key not in self.completed_fields:
self.partial_object[key] = value
self.completed_fields.add(key)
newly_completed.append(key)
# Match completed number fields: "key": 123
number_pattern = r'"(\w+)":\s*(\d+(?:\.\d+)?)(?:,|\s*})'
for match in re.finditer(number_pattern, self.buffer):
key, value = match.groups()
if key not in self.completed_fields:
self.partial_object[key] = float(value) if "." in value else int(value)
self.completed_fields.add(key)
newly_completed.append(key)
def get_validated(self) -> Optional[BaseModel]:
"""Try to validate the partial object against the schema."""
try:
return self.schema.model_validate(self.partial_object)
except ValidationError:
return None
# Usage with streaming response
def stream_with_validation(
client,
messages: list[dict],
response_model: type[BaseModel]
) -> Generator[tuple[dict, list[str]], None, BaseModel]:
"""Stream structured output with real-time validation."""
validator = StreamingValidator(response_model)
response = client.chat.completions.create(
model="gpt-4o",
messages=messages,
response_format={"type": "json_object"},
stream=True
)
for chunk in response:
if chunk.choices[0].delta.content:
partial, new_fields = validator.process_chunk(
chunk.choices[0].delta.content
)
if new_fields:
yield partial, new_fields
# Final validation
final = validator.get_validated()
if final:
return final
else:
        raise ValueError("Failed to validate complete response")
Progressive UI Updates
Pattern for updating UI as structured data streams in:
from pydantic import BaseModel, Field
from typing import Callable, Any, Literal
from dataclasses import dataclass
import asyncio
@dataclass
class FieldUpdate:
field_name: str
value: Any
is_complete: bool
timestamp: float
class ProgressiveOutputHandler:
"""Handle streaming structured outputs with progressive UI updates."""
def __init__(self):
self.field_callbacks: dict[str, list[Callable]] = {}
self.completion_callbacks: list[Callable] = []
def on_field(self, field_name: str, callback: Callable[[Any, bool], None]):
"""Register callback for when a specific field updates."""
if field_name not in self.field_callbacks:
self.field_callbacks[field_name] = []
self.field_callbacks[field_name].append(callback)
def on_complete(self, callback: Callable[[BaseModel], None]):
"""Register callback for when extraction completes."""
self.completion_callbacks.append(callback)
async def stream_extract(
self,
client,
messages: list[dict],
response_model: type[BaseModel]
):
"""Stream extraction with callbacks."""
async for partial_result in client.chat.completions.create_partial(
model="gpt-4o",
messages=messages,
response_model=response_model
):
# Check each field for updates
for field_name in response_model.model_fields:
value = getattr(partial_result, field_name, None)
if value is not None and field_name in self.field_callbacks:
for callback in self.field_callbacks[field_name]:
await self._safe_callback(callback, value, False)
# Final complete object
for callback in self.completion_callbacks:
await self._safe_callback(callback, partial_result)
async def _safe_callback(self, callback: Callable, *args):
"""Safely execute callback."""
try:
if asyncio.iscoroutinefunction(callback):
await callback(*args)
else:
callback(*args)
except Exception as e:
print(f"Callback error: {e}")
# Example: Real-time document analysis UI
class DocumentAnalysis(BaseModel):
title: str
summary: str
key_points: list[str]
sentiment: Literal["positive", "negative", "neutral"]
entities: list[dict[str, str]]
word_count: int
async def analyze_document_with_ui(document_text: str):
handler = ProgressiveOutputHandler()
# UI callbacks
def update_title(title: str, complete: bool):
print(f"📄 Title: {title}")
def update_summary(summary: str, complete: bool):
if complete:
print(f"📝 Summary: {summary}")
else:
print(f"📝 Summary (partial): {summary[:100]}...")
def update_sentiment(sentiment: str, complete: bool):
emoji = {"positive": "😊", "negative": "😔", "neutral": "😐"}
print(f"Sentiment: {emoji.get(sentiment, '❓')} {sentiment}")
def update_key_points(points: list[str], complete: bool):
print(f"Key points ({len(points)}):")
for point in points[-3:]: # Show last 3
print(f" • {point}")
handler.on_field("title", update_title)
handler.on_field("summary", update_summary)
handler.on_field("sentiment", update_sentiment)
handler.on_field("key_points", update_key_points)
handler.on_complete(lambda result: print(f"\n✅ Analysis complete! Word count: {result.word_count}"))
await handler.stream_extract(
client,
messages=[
{"role": "system", "content": "Analyze this document."},
{"role": "user", "content": document_text}
],
response_model=DocumentAnalysis
)
Streaming with Cancellation
Handle user cancellation during streaming:
import asyncio
from typing import Callable, Optional
from contextlib import asynccontextmanager
from pydantic import BaseModel
class CancellableExtraction:
"""Structured extraction that can be cancelled mid-stream."""
def __init__(self, client):
self.client = client
self._cancel_event: Optional[asyncio.Event] = None
self._current_task: Optional[asyncio.Task] = None
@asynccontextmanager
async def extraction_context(self):
"""Context manager for cancellable extraction."""
self._cancel_event = asyncio.Event()
try:
yield self
finally:
if self._current_task and not self._current_task.done():
self._current_task.cancel()
self._cancel_event = None
def cancel(self):
"""Cancel the current extraction."""
if self._cancel_event:
self._cancel_event.set()
async def extract_with_progress(
self,
messages: list[dict],
response_model: type[BaseModel],
        on_progress: Optional[Callable[[dict, float], None]] = None
) -> Optional[BaseModel]:
"""Extract with progress updates and cancellation support."""
        partial_result: Optional[BaseModel] = None
total_fields = len(response_model.model_fields)
try:
async for partial in self.client.chat.completions.create_partial(
model="gpt-4o",
messages=messages,
response_model=response_model
):
# Check for cancellation
if self._cancel_event and self._cancel_event.is_set():
return None
# Update progress
completed_fields = sum(
1 for f in response_model.model_fields
if getattr(partial, f, None) is not None
)
progress = completed_fields / total_fields
if on_progress:
on_progress(partial.model_dump(exclude_unset=True), progress)
partial_result = partial
return partial_result
except asyncio.CancelledError:
return None
# Usage
async def main():
extractor = CancellableExtraction(client)
async with extractor.extraction_context():
# Start extraction in background
task = asyncio.create_task(
extractor.extract_with_progress(
messages=[{"role": "user", "content": "Analyze this long document..."}],
response_model=DocumentAnalysis,
on_progress=lambda p, pct: print(f"Progress: {pct:.0%}")
)
)
# Simulate user cancellation after 2 seconds
await asyncio.sleep(2)
extractor.cancel()
result = await task
if result is None:
print("Extraction was cancelled")
Provider-Specific Patterns
OpenAI-Specific Optimizations
from openai import OpenAI
class OpenAIStructuredOutputs:
"""OpenAI-specific structured output patterns."""
def __init__(self):
self.client = OpenAI()
def extract_with_strict_mode(
self,
messages: list[dict],
response_model: type[BaseModel]
) -> BaseModel:
"""Use OpenAI's strict structured output mode."""
# Convert Pydantic to JSON Schema
schema = response_model.model_json_schema()
        # OpenAI strict mode requires additionalProperties: false on every object
        # (and expects every property to be listed in "required")
        def make_strict(s: dict) -> dict:
            if s.get("type") == "object":
                s["additionalProperties"] = False
            if "properties" in s:
                for prop in s["properties"].values():
                    make_strict(prop)
            if "items" in s:
                make_strict(s["items"])
            # Nested Pydantic models land in $defs and need the same treatment
            for definition in s.get("$defs", {}).values():
                make_strict(definition)
            return s
        strict_schema = make_strict(schema)
response = self.client.chat.completions.create(
model="gpt-4o-2024-08-06",
messages=messages,
response_format={
"type": "json_schema",
"json_schema": {
"name": response_model.__name__,
"strict": True,
"schema": strict_schema
}
}
)
return response_model.model_validate_json(
response.choices[0].message.content
)
def batch_extract(
self,
items: list[str],
response_model: type[BaseModel],
system_prompt: str
    ) -> str:
        """Submit a large-scale extraction job via the OpenAI Batch API; returns the batch ID to poll."""
import json
import tempfile
# Create batch file
requests = []
for i, item in enumerate(items):
requests.append({
"custom_id": f"item-{i}",
"method": "POST",
"url": "/v1/chat/completions",
"body": {
"model": "gpt-4o-mini",
"messages": [
{"role": "system", "content": system_prompt},
{"role": "user", "content": item}
],
"response_format": {"type": "json_object"}
}
})
# Write to temp file
with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
for req in requests:
f.write(json.dumps(req) + '\n')
batch_file = f.name
# Upload and create batch
with open(batch_file, 'rb') as f:
file = self.client.files.create(file=f, purpose="batch")
batch = self.client.batches.create(
input_file_id=file.id,
endpoint="/v1/chat/completions",
completion_window="24h"
)
return batch.id # Poll for completion separately
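The batch call above only submits the job. A sketch of how collecting results might look once the batch finishes, assuming the system prompt told the model to emit JSON matching your schema (field names follow OpenAI's batch output format; verify against the current docs):
import json
import time
from pydantic import BaseModel

def collect_batch_results(
    client,
    batch_id: str,
    response_model: type[BaseModel]
) -> dict[str, BaseModel]:
    """Poll until the batch finishes, then validate each output line."""
    while True:
        batch = client.batches.retrieve(batch_id)
        if batch.status in ("completed", "failed", "expired", "cancelled"):
            break
        time.sleep(30)  # batches can take minutes to hours

    if batch.status != "completed":
        raise RuntimeError(f"Batch ended with status: {batch.status}")

    results: dict[str, BaseModel] = {}
    output_text = client.files.content(batch.output_file_id).text
    for line in output_text.splitlines():
        record = json.loads(line)
        content = record["response"]["body"]["choices"][0]["message"]["content"]
        results[record["custom_id"]] = response_model.model_validate_json(content)
    return results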
Anthropic-Specific Patterns
import json
import anthropic
class AnthropicStructuredOutputs:
"""Anthropic-specific structured output patterns."""
def __init__(self):
self.client = anthropic.Anthropic()
def extract_with_tool_use(
self,
messages: list[dict],
response_model: type[BaseModel]
) -> BaseModel:
"""Use Anthropic's tool use for structured extraction."""
# Define extraction as a tool
tool = {
"name": "extract_data",
"description": f"Extract and return structured data as {response_model.__name__}",
"input_schema": response_model.model_json_schema()
}
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
tools=[tool],
tool_choice={"type": "tool", "name": "extract_data"},
messages=messages
)
# Extract tool use result
for content in response.content:
if content.type == "tool_use" and content.name == "extract_data":
return response_model.model_validate(content.input)
raise ValueError("No tool use in response")
def extract_with_prefill(
self,
user_message: str,
response_model: type[BaseModel]
) -> BaseModel:
"""Use Claude's prefill feature for JSON extraction."""
schema_hint = json.dumps(response_model.model_json_schema(), indent=2)
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=4096,
messages=[
{
"role": "user",
"content": f"""Extract structured data from this text.
Schema:
```json
{schema_hint}
Text: {user_message}
Return only valid JSON matching the schema.""" }, { "role": "assistant", "content": "{" # Prefill to force JSON output } ] )
# Complete the JSON
json_str = "{" + response.content[0].text
return response_model.model_validate_json(json_str)
def extract_with_thinking(
self,
messages: list[dict],
response_model: type[BaseModel]
) -> tuple[BaseModel, str]:
"""Use extended thinking for complex extraction."""
response = self.client.messages.create(
model="claude-sonnet-4-20250514",
max_tokens=16000,
thinking={
"type": "enabled",
"budget_tokens": 10000
},
messages=messages + [
{
"role": "user",
"content": f"Extract data matching this schema: {response_model.model_json_schema()}"
}
]
)
thinking_content = ""
result_content = ""
for block in response.content:
if block.type == "thinking":
thinking_content = block.thinking
elif block.type == "text":
result_content = block.text
# Parse JSON from result
if "```json" in result_content:
result_content = result_content.split("```json")[1].split("```")[0]
return response_model.model_validate_json(result_content), thinking_content
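A quick usage sketch for the tool-use extractor, with a hypothetical Invoice schema standing in for your own:
from pydantic import BaseModel

class Invoice(BaseModel):
    vendor: str
    total: float
    currency: str

extractor = AnthropicStructuredOutputs()
invoice = extractor.extract_with_tool_use(
    messages=[{
        "role": "user",
        "content": "Invoice from Acme Corp. Total due: 1,250.00 USD."
    }],
    response_model=Invoice
)
print(invoice.model_dump())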
Cost Optimization Strategies
Model Selection Based on Complexity
from pydantic import BaseModel, Field
from enum import Enum
class ComplexityLevel(str, Enum):
SIMPLE = "simple" # Basic extraction, few fields
MODERATE = "moderate" # Multiple fields, some nesting
COMPLEX = "complex" # Deep nesting, validation, reasoning required
class SchemaComplexityAnalyzer:
"""Analyze schema complexity to choose appropriate model."""
@staticmethod
def analyze(schema: type[BaseModel]) -> ComplexityLevel:
json_schema = schema.model_json_schema()
# Count metrics
num_properties = len(json_schema.get("properties", {}))
max_depth = SchemaComplexityAnalyzer._get_depth(json_schema)
has_refs = "$defs" in json_schema
num_required = len(json_schema.get("required", []))
# Determine complexity
if max_depth <= 1 and num_properties <= 5 and not has_refs:
return ComplexityLevel.SIMPLE
elif max_depth <= 2 and num_properties <= 15:
return ComplexityLevel.MODERATE
else:
return ComplexityLevel.COMPLEX
@staticmethod
def _get_depth(schema: dict, current: int = 0) -> int:
max_depth = current
if "properties" in schema:
for prop in schema["properties"].values():
max_depth = max(max_depth, SchemaComplexityAnalyzer._get_depth(prop, current + 1))
if "items" in schema:
max_depth = max(max_depth, SchemaComplexityAnalyzer._get_depth(schema["items"], current + 1))
return max_depth
class CostOptimizedExtractor:
"""Extract with automatic model selection based on complexity."""
MODEL_MAP = {
ComplexityLevel.SIMPLE: "gpt-4o-mini",
ComplexityLevel.MODERATE: "gpt-4o-mini",
ComplexityLevel.COMPLEX: "gpt-4o"
}
# Approximate costs per 1M tokens
COST_MAP = {
"gpt-4o-mini": {"input": 0.15, "output": 0.60},
"gpt-4o": {"input": 2.50, "output": 10.00}
}
def __init__(self, client):
self.client = client
self.total_cost = 0.0
def extract(
self,
messages: list[dict],
response_model: type[BaseModel],
force_model: str = None
) -> tuple[BaseModel, dict]:
"""Extract with cost tracking."""
# Select model
if force_model:
model = force_model
else:
complexity = SchemaComplexityAnalyzer.analyze(response_model)
model = self.MODEL_MAP[complexity]
# Extract
response = self.client.chat.completions.create(
model=model,
messages=messages,
response_model=response_model
)
# Track cost (approximate)
# Note: actual token counts would come from response.usage
input_tokens = sum(len(m.get("content", "")) / 4 for m in messages)
output_tokens = 500 # Estimate
costs = self.COST_MAP[model]
cost = (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
self.total_cost += cost
return response, {
"model": model,
"estimated_cost": cost,
"total_session_cost": self.total_cost
}
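Usage sketch: because extract() forwards response_model, this assumes an Instructor-patched client; ProductReview is a hypothetical schema for illustration:
import instructor
from openai import OpenAI
from pydantic import BaseModel

class ProductReview(BaseModel):
    product: str
    rating: int
    summary: str

client = instructor.from_openai(OpenAI())
extractor = CostOptimizedExtractor(client)

review, meta = extractor.extract(
    messages=[{"role": "user", "content": "Great headphones, 5/5. Battery lasts all week."}],
    response_model=ProductReview
)
print(meta)  # model chosen, estimated cost, running session cost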
Caching with Semantic Similarity
from pydantic import BaseModel
import numpy as np
from typing import Optional, TypeVar
T = TypeVar("T", bound=BaseModel)
class SemanticCache:
"""Cache structured outputs with semantic similarity matching."""
def __init__(self, embedding_client, similarity_threshold: float = 0.95):
self.embedding_client = embedding_client
self.threshold = similarity_threshold
self.cache: list[tuple[np.ndarray, str, BaseModel]] = [] # (embedding, schema_name, result)
def _get_embedding(self, text: str) -> np.ndarray:
"""Get embedding for text."""
response = self.embedding_client.embeddings.create(
model="text-embedding-3-small",
input=text
)
return np.array(response.data[0].embedding)
def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
"""Calculate cosine similarity."""
return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))
def get(self, query: str, schema: type[T]) -> Optional[T]:
"""Check cache for similar query."""
query_embedding = self._get_embedding(query)
schema_name = schema.__name__
for cached_embedding, cached_schema, cached_result in self.cache:
if cached_schema != schema_name:
continue
similarity = self._cosine_similarity(query_embedding, cached_embedding)
if similarity >= self.threshold:
# Validate cached result still matches schema
try:
return schema.model_validate(cached_result.model_dump())
                except Exception:
continue
return None
def set(self, query: str, schema: type[BaseModel], result: BaseModel):
"""Add result to cache."""
embedding = self._get_embedding(query)
self.cache.append((embedding, schema.__name__, result))
# Limit cache size
if len(self.cache) > 1000:
self.cache = self.cache[-500:]
class CachedExtractor:
"""Extractor with semantic caching."""
def __init__(self, client, embedding_client):
self.client = client
self.cache = SemanticCache(embedding_client)
self.cache_hits = 0
self.cache_misses = 0
def extract(
self,
query: str,
response_model: type[T],
use_cache: bool = True
) -> tuple[T, bool]:
"""Extract with caching. Returns (result, was_cached)."""
if use_cache:
cached = self.cache.get(query, response_model)
if cached:
self.cache_hits += 1
return cached, True
self.cache_misses += 1
result = self.client.chat.completions.create(
model="gpt-4o",
messages=[{"role": "user", "content": query}],
response_model=response_model
)
if use_cache:
self.cache.set(query, response_model, result)
return result, False
def get_stats(self) -> dict:
"""Get cache statistics."""
total = self.cache_hits + self.cache_misses
return {
"cache_hits": self.cache_hits,
"cache_misses": self.cache_misses,
"hit_rate": self.cache_hits / total if total > 0 else 0,
"cache_size": len(self.cache.cache)
}
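Usage sketch, assuming an Instructor-patched client for extraction and a plain OpenAI client for embeddings, reusing the DocumentAnalysis schema defined earlier:
import instructor
from openai import OpenAI

openai_client = OpenAI()
extractor = CachedExtractor(
    client=instructor.from_openai(openai_client),
    embedding_client=openai_client
)

result, was_cached = extractor.extract(
    "Analyze: The meeting covered Q3 revenue, hiring plans, and the roadmap.",
    response_model=DocumentAnalysis
)
print(was_cached, extractor.get_stats())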
Common Pitfalls and Solutions
Pitfall 1: Overly Complex Schemas
# BAD: Deeply nested schema that confuses the model
class BadSchema(BaseModel):
data: dict[str, dict[str, list[dict[str, Any]]]]
# GOOD: Flatten and simplify
class Item(BaseModel):
    id: str
    name: str
    value: float

class GoodSchema(BaseModel):
    items: list[Item]
Pitfall 2: Ambiguous Field Names
# BAD: Ambiguous field name
class BadSchema(BaseModel):
date: str # What format? Created date? Due date?
# GOOD: Specific field name with description
class GoodSchema(BaseModel):
created_at: str = Field(
...,
description="Creation timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)"
)
Pitfall 3: Not Handling Empty/Null Cases
# BAD: Assumes data always exists
class BadSchema(BaseModel):
items: list[str] # What if no items found?
# GOOD: Handle empty cases explicitly
class GoodSchema(BaseModel):
items: list[str] = Field(default_factory=list)
no_results_reason: Optional[str] = Field(
None,
description="If no items found, explain why"
)
Pitfall 4: Inconsistent Enum Values
# BAD: Free-form string that will vary
class BadSchema(BaseModel):
status: str # "Active", "active", "ACTIVE", "enabled"...
# GOOD: Constrained enum
class Status(str, Enum):
ACTIVE = "active"
INACTIVE = "inactive"
PENDING = "pending"
class GoodSchema(BaseModel):
status: Status
Conclusion
Structured outputs transform LLMs from text generators into reliable data extraction and processing tools. The key principles:
- Use the strongest guarantees available: Native structured outputs > Instructor > JSON mode > prompting
- Design schemas for clarity: Good descriptions, specific field names, appropriate types
- Handle errors gracefully: Retries, fallbacks, partial extraction
- Monitor in production: Track success rates, latencies, common failures
- Test thoroughly: Schema validation, edge cases, round-trip serialization
Start with Instructor for most use cases—it provides the best balance of reliability, flexibility, and provider support.
Related Articles
Building Agentic AI Systems: A Complete Implementation Guide
A comprehensive guide to building AI agents—tool use, ReAct pattern, planning, memory, context management, MCP integration, and multi-agent orchestration. With full prompt examples and production patterns.
Building MCP Servers: Custom Tool Integrations for AI Agents
A comprehensive guide to building Model Context Protocol (MCP) servers—from basic tool exposure to production-grade integrations with authentication, streaming, and error handling.
LLM Evaluation in Production: Beyond Benchmarks
How to evaluate LLM performance in real-world applications, where academic benchmarks often fail to capture what matters.