
Structured Outputs and Tool Use: Patterns for Reliable AI Applications

Master structured output generation and tool use patterns—JSON mode, schema enforcement, Instructor library, function calling best practices, error handling, and production patterns for reliable AI applications.

8 min read

The Reliability Problem

LLMs are probabilistic text generators. They're incredibly powerful, but getting them to produce exactly the output format you need—every time, without fail—requires deliberate engineering. A chatbot can tolerate occasional formatting quirks. An agentic system that parses LLM output to make API calls cannot.

2025: The year structured outputs became production-ready: Both OpenAI and Anthropic now offer native structured output APIs that guarantee schema compliance. OpenAI's structured outputs with gpt-4o-2024-08-06 scores a perfect 100% on complex JSON schema following. Anthropic's structured outputs (beta header structured-outputs-2025-11-13) brings the same guarantees to Claude models.

Best practice: Always prefer structured outputs over JSON mode. According to production LLM guides, structured outputs are "the evolution of JSON mode" because they strictly enforce adherence to a specified schema, ensuring consistent, valid, and type-safe responses. This eliminates the need for fragile post-processing or regex hacks.

Why structured outputs are the foundation of agentic AI: Agents work by parsing LLM output to determine what action to take. If the model returns {"action": "search", "query": "latest news"}, your code can execute that action. If the model returns The action I'll take is to search for "latest news", your parsing breaks. Structured outputs transform LLMs from conversational interfaces into programmable APIs. This is the bridge between "AI that talks" and "AI that acts."

The reliability pyramid: There's a hierarchy of approaches, each trading flexibility for reliability. Prompt engineering is most flexible but least reliable. JSON mode guarantees valid JSON but not schema compliance. Structured outputs guarantee schema compliance but limit schema complexity. Instructor/Outlines add retry logic and validation. Understanding this pyramid helps you choose the right tool for each use case.

This guide covers everything you need to build reliable structured output pipelines: native JSON modes, schema enforcement with Instructor, function calling patterns, error handling, and production-ready implementations.

Prerequisites:

  • Familiarity with building agentic AI systems
  • Basic understanding of JSON Schema
  • Python experience

What you'll learn:

  • Native JSON mode vs. structured output APIs
  • Schema enforcement with Instructor and Pydantic
  • Function calling patterns and best practices
  • Error handling and retry strategies
  • Production patterns for reliable extraction

The Structured Output Landscape

Different approaches to getting structured output from LLMs:

Approach              Reliability   Flexibility   Latency   Provider Support
Prompt engineering    Low           High          Low       All
JSON mode             Medium        Medium        Low       OpenAI, Anthropic, Google
Structured outputs    High          Medium        Low       OpenAI
Function calling      High          Medium        Low       OpenAI, Anthropic, Google
Instructor/Outlines   Very High     High          Medium    All (via wrapper)

Native JSON Mode

The simplest approach: tell the model to output JSON. JSON mode tells the model "output must be valid JSON" but doesn't enforce a specific schema. It's a lightweight solution that works across providers, but requires additional validation in your code.

JSON mode is best for simple extraction tasks where the schema is straightforward and you're okay handling occasional format variations. For critical applications, use structured outputs (next section) which guarantee schema compliance.

OpenAI JSON Mode

OpenAI's JSON mode is enabled with response_format={"type": "json_object"}. The model will always output parseable JSON, but the schema depends entirely on your prompt. Be explicit about the exact keys and types you expect.

Python
from openai import OpenAI

client = OpenAI()

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "system",
            "content": "Extract user information. Output valid JSON with keys: name, email, age"
        },
        {
            "role": "user",
            "content": "John Smith is 32 years old and can be reached at john@example.com"
        }
    ],
    response_format={"type": "json_object"}
)

import json
data = json.loads(response.choices[0].message.content)
print(data)
# {"name": "John Smith", "email": "john@example.com", "age": 32}

Anthropic JSON Mode

Anthropic has no response_format flag for plain JSON mode, so you request JSON in the prompt and strip any markdown fences from the response:

Python
import anthropic
import json

client = anthropic.Anthropic()

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {
            "role": "user",
            "content": """Extract user information from this text and return as JSON:

John Smith is 32 years old and can be reached at john@example.com

Return JSON with keys: name, email, age"""
        }
    ]
)

# Claude often wraps JSON in markdown code blocks
content = response.content[0].text
if "```json" in content:
    content = content.split("```json")[1].split("```")[0]
elif "```" in content:
    content = content.split("```")[1].split("```")[0]

data = json.loads(content.strip())
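
The split-based stripping above is brittle when the model adds prose around the JSON. A slightly more defensive helper (a sketch, not part of any SDK) pulls out the first JSON object it can find:

Python
import json
import re

def extract_json(text: str) -> dict:
    """Best-effort extraction of a JSON object from model output."""
    # Prefer a fenced ```json block if one is present
    fence = re.search(r"```(?:json)?\s*(\{.*\})\s*```", text, re.DOTALL)
    if fence:
        return json.loads(fence.group(1))
    # Otherwise fall back to the outermost braces in the raw text
    start, end = text.find("{"), text.rfind("}")
    if start == -1 or end == -1:
        raise ValueError("No JSON object found in model output")
    return json.loads(text[start:end + 1])

data = extract_json(response.content[0].text)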

Limitations of Basic JSON Mode

JSON mode guarantees valid JSON but not valid schema:

Python
# You asked for: {"name": str, "email": str, "age": int}
# You might get:
{"name": "John Smith", "email": "john@example.com", "age": "32"}  # age is string
{"name": "John", "surname": "Smith", "email": "john@example.com"}  # missing age
{"user": {"name": "John Smith"}, "contact": "john@example.com"}  # different structure

OpenAI Structured Outputs

OpenAI's structured outputs API guarantees schema compliance:

Python
from openai import OpenAI
from pydantic import BaseModel

client = OpenAI()

class UserInfo(BaseModel):
    name: str
    email: str
    age: int

response = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",
    messages=[
        {
            "role": "system",
            "content": "Extract user information from the text."
        },
        {
            "role": "user",
            "content": "John Smith is 32 years old and can be reached at john@example.com"
        }
    ],
    response_format=UserInfo
)

user = response.choices[0].message.parsed
print(user.name)   # "John Smith"
print(user.email)  # "john@example.com"
print(user.age)    # 32 (guaranteed to be int)

Complex Schemas

Python
from pydantic import BaseModel, Field
from typing import Optional, Literal
from enum import Enum

class Priority(str, Enum):
    LOW = "low"
    MEDIUM = "medium"
    HIGH = "high"
    CRITICAL = "critical"

class Task(BaseModel):
    title: str = Field(..., description="Brief task title")
    description: str = Field(..., description="Detailed task description")
    priority: Priority
    estimated_hours: Optional[float] = Field(None, ge=0, le=100)
    tags: list[str] = Field(default_factory=list)

class ProjectPlan(BaseModel):
    project_name: str
    goal: str
    tasks: list[Task]
    total_estimated_hours: float

response = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",
    messages=[
        {
            "role": "user",
            "content": """Create a project plan for building a REST API:

            The API should handle user authentication, product catalog,
            and order management. Estimate 2 weeks of work."""
        }
    ],
    response_format=ProjectPlan
)

plan = response.choices[0].message.parsed
for task in plan.tasks:
    print(f"[{task.priority.value}] {task.title}: {task.estimated_hours}h")

Handling Refusals

Structured outputs can still refuse to answer:

Python
response = client.beta.chat.completions.parse(
    model="gpt-4o-2024-08-06",
    messages=[...],
    response_format=UserInfo
)

if response.choices[0].message.refusal:
    print(f"Model refused: {response.choices[0].message.refusal}")
else:
    user = response.choices[0].message.parsed
    # Use parsed data

Instructor: The Structured Output Library

Instructor is the go-to library for production structured outputs. It wraps LLM clients (OpenAI, Anthropic, etc.) and adds: (1) automatic retries when validation fails, (2) Pydantic model integration for type safety, (3) streaming partial objects, and (4) multiple extraction modes.

The key insight: when the model outputs invalid data, Instructor feeds the validation error back to the model and asks it to try again. This "retry with feedback" loop dramatically improves reliability. Instead of writing complex error handling, you define your schema with Pydantic validators, and Instructor handles the rest.

Basic Usage

The instructor.from_openai() function patches the OpenAI client to support a response_model parameter. Your Pydantic model defines the expected output schema, and Instructor ensures the response matches it.

Bash
pip install instructor
Python
import instructor
from openai import OpenAI
from pydantic import BaseModel, Field, field_validator

# Patch the client
client = instructor.from_openai(OpenAI())

class UserInfo(BaseModel):
    name: str
    email: str
    age: int = Field(..., ge=0, le=150)

    @field_validator("email")
    @classmethod
    def validate_email(cls, v):
        if "@" not in v:
            raise ValueError("Invalid email format")
        return v

# Get structured output with automatic retries on validation failure
user = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "John Smith, 32, john@example.com"}
    ],
    response_model=UserInfo,
    max_retries=3  # Retry if validation fails
)

print(user)  # UserInfo(name='John Smith', email='john@example.com', age=32)

With Anthropic

Python
import instructor
from anthropic import Anthropic

client = instructor.from_anthropic(Anthropic())

user = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    messages=[
        {"role": "user", "content": "John Smith, 32, john@example.com"}
    ],
    response_model=UserInfo
)

Complex Extraction

Python
from pydantic import BaseModel, Field
from typing import Optional
from datetime import date

class Address(BaseModel):
    street: str
    city: str
    state: str
    zip_code: str
    country: str = "USA"

class Employment(BaseModel):
    company: str
    title: str
    start_date: date
    end_date: Optional[date] = None
    current: bool = False

class Person(BaseModel):
    name: str
    email: Optional[str] = None
    phone: Optional[str] = None
    address: Optional[Address] = None
    employment_history: list[Employment] = Field(default_factory=list)
    skills: list[str] = Field(default_factory=list)

# Extract from unstructured resume text
resume_text = """
JOHN SMITH
john.smith@email.com | (555) 123-4567
123 Main St, San Francisco, CA 94102

EXPERIENCE

Senior Software Engineer
TechCorp Inc. | Jan 2020 - Present
- Led development of microservices architecture
- Managed team of 5 engineers

Software Engineer
StartupXYZ | Jun 2017 - Dec 2019
- Built REST APIs and frontend applications

SKILLS
Python, JavaScript, AWS, Docker, Kubernetes
"""

person = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "system",
            "content": "Extract structured information from this resume."
        },
        {"role": "user", "content": resume_text}
    ],
    response_model=Person
)

print(f"Name: {person.name}")
print(f"Skills: {', '.join(person.skills)}")
for job in person.employment_history:
    status = "(current)" if job.current else ""
    print(f"  {job.title} at {job.company} {status}")

Streaming Structured Outputs

Python
from typing import Literal

class Article(BaseModel):
    title: str
    summary: str
    key_points: list[str]
    sentiment: Literal["positive", "negative", "neutral"]

# Stream partial results as they're generated
for partial_article in client.chat.completions.create_partial(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Analyze this article: [article text]"}
    ],
    response_model=Article
):
    # partial_article has fields populated as they're generated
    if partial_article.title:
        print(f"Title: {partial_article.title}")
    if partial_article.key_points:
        print(f"Points so far: {len(partial_article.key_points)}")

Validation and Retries

Pydantic validators are your first line of defense against bad LLM output. Field validators check individual values; model validators check relationships between fields. When validation fails, Instructor shows the error to the model and retries.

This is powerful: you can encode domain rules (no SQL keywords, valid date ranges, consistent references) as validators. The model learns from its mistakes and usually succeeds within 2-3 retries. Validation errors become training signals, not crash causes.

Python
from pydantic import BaseModel, Field, field_validator, model_validator

class SearchQuery(BaseModel):
    query: str = Field(..., min_length=3, max_length=200)
    filters: dict[str, str] = Field(default_factory=dict)
    limit: int = Field(default=10, ge=1, le=100)

    @field_validator("query")
    @classmethod
    def clean_query(cls, v):
        # Remove potential SQL injection
        dangerous = ["DROP", "DELETE", "INSERT", "--", ";"]
        for d in dangerous:
            if d.lower() in v.lower():
                raise ValueError(f"Query contains forbidden term: {d}")
        return v.strip()

    @model_validator(mode="after")
    def validate_filters(self):
        allowed_filter_keys = {"category", "date_range", "author", "status"}
        invalid_keys = set(self.filters.keys()) - allowed_filter_keys
        if invalid_keys:
            raise ValueError(f"Invalid filter keys: {invalid_keys}")
        return self

# Instructor will retry with the validation error message
# helping the model correct its output
query = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Search for python tutorials from last month, limit 25"}
    ],
    response_model=SearchQuery,
    max_retries=3
)

Custom Retry Logic

Python
from instructor import Mode
from tenacity import Retrying, stop_after_attempt, wait_exponential

class ExtractedData(BaseModel):
    value: float
    unit: str
    confidence: float = Field(..., ge=0, le=1)

# Maybe wrapper for graceful failures
class MaybeExtractedData(BaseModel):
    result: Optional[ExtractedData] = None
    error: Optional[str] = None

# Custom retry configuration
client = instructor.from_openai(
    OpenAI(),
    mode=Mode.TOOLS  # Use function calling mode
)

def extract_with_custom_retry(text: str) -> MaybeExtractedData:
    try:
        for attempt in Retrying(
            stop=stop_after_attempt(3),
            wait=wait_exponential(multiplier=1, min=1, max=10)
        ):
            with attempt:
                result = client.chat.completions.create(
                    model="gpt-4o",
                    messages=[
                        {"role": "user", "content": f"Extract measurement: {text}"}
                    ],
                    response_model=ExtractedData
                )
                return MaybeExtractedData(result=result)
    except Exception as e:
        return MaybeExtractedData(error=str(e))

Function Calling (Tool Use)

Function calling lets LLMs trigger actions in your application. Instead of generating text that you parse, the model outputs structured function calls: "call get_weather with location='San Francisco'". Your code executes the function and returns results to the model.

This is the foundation of agentic AI. The model decides which tool to use based on context, generates valid arguments, and interprets results. Function calling is more reliable than parsing free-form text because the model's output is constrained to valid function signatures.

OpenAI Function Calling

Define tools as JSON schemas. The description field is critical—it's what the model reads to decide when to use each tool. Be specific about capabilities and when the tool is appropriate.

Python
from openai import OpenAI
import json

client = OpenAI()

# Define tools
tools = [
    {
        "type": "function",
        "function": {
            "name": "get_weather",
            "description": "Get current weather for a location. Use this when user asks about weather.",
            "parameters": {
                "type": "object",
                "properties": {
                    "location": {
                        "type": "string",
                        "description": "City and state, e.g., 'San Francisco, CA'"
                    },
                    "unit": {
                        "type": "string",
                        "enum": ["celsius", "fahrenheit"],
                        "description": "Temperature unit"
                    }
                },
                "required": ["location"]
            }
        }
    },
    {
        "type": "function",
        "function": {
            "name": "search_web",
            "description": "Search the web for information. Use for current events or facts.",
            "parameters": {
                "type": "object",
                "properties": {
                    "query": {
                        "type": "string",
                        "description": "Search query"
                    },
                    "num_results": {
                        "type": "integer",
                        "description": "Number of results (1-10)",
                        "default": 5
                    }
                },
                "required": ["query"]
            }
        }
    }
]
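
# Placeholder implementations so this example runs end to end (hypothetical stubs;
# a real application would call an actual weather API and search backend).
def get_weather(location: str, unit: str = "celsius") -> dict:
    return {"location": location, "temperature": 22, "unit": unit, "condition": "sunny"}

def search_web(query: str, num_results: int = 5) -> list[dict]:
    return [{"title": f"Result for: {query}", "url": "https://example.com"}][:num_results]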

response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "What's the weather like in Tokyo?"}
    ],
    tools=tools,
    tool_choice="auto"  # Let model decide
)

# Check if model wants to call a function
message = response.choices[0].message

if message.tool_calls:
    for tool_call in message.tool_calls:
        function_name = tool_call.function.name
        arguments = json.loads(tool_call.function.arguments)

        print(f"Calling: {function_name}")
        print(f"Arguments: {arguments}")

        # Execute the function
        if function_name == "get_weather":
            result = get_weather(**arguments)
        elif function_name == "search_web":
            result = search_web(**arguments)

        # Continue conversation with result
        messages = [
            {"role": "user", "content": "What's the weather like in Tokyo?"},
            message,
            {
                "role": "tool",
                "tool_call_id": tool_call.id,
                "content": json.dumps(result)
            }
        ]

        final_response = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            tools=tools
        )
        print(final_response.choices[0].message.content)

Anthropic Tool Use

Python
import anthropic
import json

client = anthropic.Anthropic()

tools = [
    {
        "name": "get_weather",
        "description": "Get current weather for a location",
        "input_schema": {
            "type": "object",
            "properties": {
                "location": {
                    "type": "string",
                    "description": "City and state"
                },
                "unit": {
                    "type": "string",
                    "enum": ["celsius", "fahrenheit"]
                }
            },
            "required": ["location"]
        }
    }
]

response = client.messages.create(
    model="claude-3-5-sonnet-20241022",
    max_tokens=1024,
    tools=tools,
    messages=[
        {"role": "user", "content": "What's the weather in Tokyo?"}
    ]
)

# Handle tool use
for content in response.content:
    if content.type == "tool_use":
        tool_name = content.name
        tool_input = content.input
        tool_use_id = content.id

        print(f"Tool: {tool_name}, Input: {tool_input}")

        # Execute tool and continue
        result = get_weather(**tool_input)

        # Send result back
        final_response = client.messages.create(
            model="claude-3-5-sonnet-20241022",
            max_tokens=1024,
            tools=tools,
            messages=[
                {"role": "user", "content": "What's the weather in Tokyo?"},
                {"role": "assistant", "content": response.content},
                {
                    "role": "user",
                    "content": [
                        {
                            "type": "tool_result",
                            "tool_use_id": tool_use_id,
                            "content": json.dumps(result)
                        }
                    ]
                }
            ]
        )

Parallel Tool Calls

Models can request multiple tool calls simultaneously:

Python
response = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "What's the weather in Tokyo and New York?"}
    ],
    tools=tools,
    parallel_tool_calls=True
)

message = response.choices[0].message
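
# Hypothetical async dispatcher assumed by the code below; route each tool name
# to its real implementation (get_weather, search_web, ...) in your application.
async def execute_tool_async(func_name: str, args: dict):
    if func_name == "get_weather":
        return get_weather(**args)
    raise ValueError(f"Unknown tool: {func_name}")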

if message.tool_calls:
    # Execute all tool calls in parallel
    import asyncio

    async def execute_tools(tool_calls):
        tasks = []
        for tc in tool_calls:
            func_name = tc.function.name
            args = json.loads(tc.function.arguments)
            tasks.append(execute_tool_async(func_name, args))
        return await asyncio.gather(*tasks)

    results = asyncio.run(execute_tools(message.tool_calls))

    # Send all results back
    tool_messages = [
        {
            "role": "tool",
            "tool_call_id": tc.id,
            "content": json.dumps(result)
        }
        for tc, result in zip(message.tool_calls, results)
    ]

    messages = [
        {"role": "user", "content": "What's the weather in Tokyo and New York?"},
        message,
        *tool_messages
    ]

    final_response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        tools=tools
    )

Tool Definition Best Practices

Descriptive Tool Definitions

Python
# BAD: Vague description
{
    "name": "search",
    "description": "Search for things",
    "parameters": {
        "type": "object",
        "properties": {
            "q": {"type": "string"}
        }
    }
}

# GOOD: Detailed description with examples
{
    "name": "search_products",
    "description": """Search the product catalog.

Use this tool when the user wants to:
- Find products by name, category, or description
- Browse available items
- Check if a specific product exists

DO NOT use for:
- Checking inventory/stock levels (use check_inventory instead)
- Price comparisons (use compare_prices instead)
- Order history (use get_orders instead)

Examples of good queries:
- "wireless headphones under $100"
- "blue running shoes size 10"
- "laptop with 16GB RAM"
""",
    "parameters": {
        "type": "object",
        "properties": {
            "query": {
                "type": "string",
                "description": "Search query. Can include product names, categories, attributes, or natural language descriptions."
            },
            "category": {
                "type": "string",
                "enum": ["electronics", "clothing", "home", "sports", "books"],
                "description": "Optional: Filter by product category"
            },
            "price_max": {
                "type": "number",
                "description": "Optional: Maximum price in USD"
            },
            "in_stock_only": {
                "type": "boolean",
                "default": True,
                "description": "Only return products currently in stock"
            }
        },
        "required": ["query"]
    }
}

Schema Design Patterns

Python
from pydantic import BaseModel, Field
from typing import Optional, Literal, Annotated
from enum import Enum

# Pattern 1: Use enums for constrained values
class OrderStatus(str, Enum):
    PENDING = "pending"
    PROCESSING = "processing"
    SHIPPED = "shipped"
    DELIVERED = "delivered"
    CANCELLED = "cancelled"

# Pattern 2: Use Literal for small fixed sets
class CreateOrder(BaseModel):
    product_id: str
    quantity: int = Field(..., ge=1, le=100)
    shipping_method: Literal["standard", "express", "overnight"]
    gift_wrap: bool = False

# Pattern 3: Nested objects for complex data
class Address(BaseModel):
    street: str
    city: str
    state: str = Field(..., min_length=2, max_length=2)
    zip_code: str = Field(..., pattern=r"^\d{5}(-\d{4})?$")

class Customer(BaseModel):
    name: str
    email: str
    shipping_address: Address
    billing_address: Optional[Address] = None  # Defaults to shipping

# Pattern 4: Discriminated unions for polymorphic data
class TextContent(BaseModel):
    type: Literal["text"] = "text"
    text: str

class ImageContent(BaseModel):
    type: Literal["image"] = "image"
    url: str
    alt_text: Optional[str] = None

class CodeContent(BaseModel):
    type: Literal["code"] = "code"
    language: str
    code: str

Content = Annotated[
    TextContent | ImageContent | CodeContent,
    Field(discriminator="type")
]

class Message(BaseModel):
    role: Literal["user", "assistant"]
    content: list[Content]

# Pattern 5: Self-referential schemas (with care)
class TreeNode(BaseModel):
    value: str
    children: list["TreeNode"] = Field(default_factory=list)

TreeNode.model_rebuild()  # Required for forward references
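
To see the discriminator in action, validate a mixed content list directly; Pydantic routes each item to the right model based on its type field:

Python
msg = Message.model_validate({
    "role": "assistant",
    "content": [
        {"type": "text", "text": "Here is the function:"},
        {"type": "code", "language": "python", "code": "def add(a, b): return a + b"},
    ],
})

for item in msg.content:
    print(type(item).__name__)  # TextContent, CodeContent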

Converting Pydantic to JSON Schema

Python
from pydantic import BaseModel, Field
from typing import get_type_hints
import json

def pydantic_to_openai_tool(model: type[BaseModel], name: str, description: str) -> dict:
    """Convert Pydantic model to OpenAI tool definition."""
    schema = model.model_json_schema()

    # Remove $defs if present (OpenAI doesn't support references well)
    if "$defs" in schema:
        schema = _resolve_refs(schema)

    return {
        "type": "function",
        "function": {
            "name": name,
            "description": description,
            "parameters": schema
        }
    }

def _resolve_refs(schema: dict) -> dict:
    """Inline $ref references in JSON schema."""
    defs = schema.pop("$defs", {})

    def resolve(obj):
        if isinstance(obj, dict):
            if "$ref" in obj:
                ref_path = obj["$ref"].split("/")[-1]
                return resolve(defs.get(ref_path, {}))
            return {k: resolve(v) for k, v in obj.items()}
        elif isinstance(obj, list):
            return [resolve(item) for item in obj]
        return obj

    return resolve(schema)

# Usage
class SearchParams(BaseModel):
    query: str = Field(..., description="Search query")
    filters: dict[str, str] = Field(default_factory=dict)
    limit: int = Field(default=10, ge=1, le=100)

tool = pydantic_to_openai_tool(
    SearchParams,
    name="search",
    description="Search the database"
)
print(json.dumps(tool, indent=2))

Error Handling Patterns

Graceful Degradation

Python
from pydantic import BaseModel, ValidationError
from typing import Optional, TypeVar, Generic
import json

T = TypeVar("T", bound=BaseModel)

class ExtractionResult(BaseModel, Generic[T]):
    """Wrapper for extraction results with error handling."""
    success: bool
    data: Optional[T] = None
    raw_output: Optional[str] = None
    error: Optional[str] = None
    retries: int = 0

def extract_with_fallback(
    client,
    messages: list[dict],
    response_model: type[T],
    max_retries: int = 3,
    fallback_model: str = "gpt-4o-mini"
) -> ExtractionResult[T]:
    """Extract structured data with graceful fallback."""

    models = ["gpt-4o", fallback_model]
    last_error = None

    for model in models:
        for attempt in range(max_retries):
            try:
                result = client.chat.completions.create(
                    model=model,
                    messages=messages,
                    response_model=response_model
                )
                return ExtractionResult(
                    success=True,
                    data=result,
                    retries=attempt
                )
            except ValidationError as e:
                last_error = str(e)
                # Add error context to messages for retry
                messages = messages + [
                    {
                        "role": "assistant",
                        "content": f"Previous attempt failed validation: {e}"
                    },
                    {
                        "role": "user",
                        "content": "Please fix the errors and try again."
                    }
                ]
            except Exception as e:
                last_error = str(e)
                break  # Try next model

    return ExtractionResult(
        success=False,
        error=last_error,
        retries=max_retries * len(models)
    )

Partial Extraction

Python
from pydantic import BaseModel, Field, model_validator
from typing import Optional

class PartialUserInfo(BaseModel):
    """User info with optional fields for partial extraction."""
    name: Optional[str] = None
    email: Optional[str] = None
    phone: Optional[str] = None
    address: Optional[str] = None
    age: Optional[int] = None

    @model_validator(mode="after")
    def at_least_one_field(self):
        """Ensure at least one field was extracted."""
        if all(v is None for v in [self.name, self.email, self.phone, self.address, self.age]):
            raise ValueError("Must extract at least one field")
        return self

    def completeness_score(self) -> float:
        """Calculate how complete the extraction is."""
        fields = [self.name, self.email, self.phone, self.address, self.age]
        return sum(1 for f in fields if f is not None) / len(fields)

# Extract whatever we can
user = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": "Contact John at john@example.com"  # Limited info
        }
    ],
    response_model=PartialUserInfo
)

print(f"Extracted: {user}")
print(f"Completeness: {user.completeness_score():.0%}")

Schema Evolution

Python
from pydantic import BaseModel, Field
from typing import Optional, Any

class UserInfoV1(BaseModel):
    """Original schema."""
    name: str
    email: str

class UserInfoV2(BaseModel):
    """Updated schema with new fields."""
    name: str
    email: str
    phone: Optional[str] = None  # New in v2
    preferences: dict[str, Any] = Field(default_factory=dict)  # New in v2

def migrate_v1_to_v2(v1: UserInfoV1) -> UserInfoV2:
    """Migrate from v1 to v2 schema."""
    return UserInfoV2(
        name=v1.name,
        email=v1.email,
        phone=None,
        preferences={}
    )

class SchemaVersionHandler:
    """Handle multiple schema versions."""

    def __init__(self):
        self.schemas = {
            "v1": UserInfoV1,
            "v2": UserInfoV2
        }
        self.migrations = {
            ("v1", "v2"): migrate_v1_to_v2
        }
        self.current_version = "v2"

    def extract(self, client, messages: list[dict], version: str = None):
        """Extract using specified or current schema version."""
        version = version or self.current_version
        schema = self.schemas[version]

        result = client.chat.completions.create(
            model="gpt-4o",
            messages=messages,
            response_model=schema
        )

        # Migrate to current version if needed
        if version != self.current_version:
            migration_key = (version, self.current_version)
            if migration_key in self.migrations:
                result = self.migrations[migration_key](result)

        return result
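
Usage looks like a normal extraction call; the handler picks the schema version and migrates older results forward. A sketch, assuming the Instructor-patched client from earlier:

Python
handler = SchemaVersionHandler()

# Extract with the current (v2) schema
user = handler.extract(
    client,
    messages=[{"role": "user", "content": "John Smith, john@example.com"}]
)

# Extract with the legacy v1 schema, then migrate the result to v2
legacy_user = handler.extract(
    client,
    messages=[{"role": "user", "content": "Jane Doe, jane@example.com"}],
    version="v1"
)
print(type(legacy_user).__name__)  # UserInfoV2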

Production Patterns

Caching Structured Outputs

Python
import hashlib
import json
import time
from typing import Any, Optional, Type, TypeVar
from pydantic import BaseModel

T = TypeVar("T", bound=BaseModel)

class StructuredOutputCache:
    """Cache for structured output extractions."""

    def __init__(self, ttl_seconds: int = 3600):
        self.cache: dict[str, tuple[float, Any]] = {}
        self.ttl = ttl_seconds

    def _hash_key(self, model: str, messages: list[dict], response_model: type) -> str:
        """Generate cache key."""
        content = json.dumps({
            "model": model,
            "messages": messages,
            "schema": response_model.model_json_schema()
        }, sort_keys=True)
        return hashlib.sha256(content.encode()).hexdigest()

    def get(self, key: str) -> Optional[BaseModel]:
        """Get cached result."""
        if key in self.cache:
            timestamp, value = self.cache[key]
            if time.time() - timestamp < self.ttl:
                return value
            del self.cache[key]
        return None

    def set(self, key: str, value: BaseModel):
        """Cache a result."""
        self.cache[key] = (time.time(), value)

cache = StructuredOutputCache()

def cached_extract(
    client,
    model: str,
    messages: list[dict],
    response_model: Type[T]
) -> T:
    """Extract with caching."""
    key = cache._hash_key(model, messages, response_model)

    cached = cache.get(key)
    if cached:
        return cached

    result = client.chat.completions.create(
        model=model,
        messages=messages,
        response_model=response_model
    )

    cache.set(key, result)
    return result
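
Repeated requests with the same model, messages, and schema are then served from the cache instead of hitting the API. A usage sketch with the Instructor-patched client and the UserInfo model from earlier:

Python
messages = [{"role": "user", "content": "John Smith, 32, john@example.com"}]

first = cached_extract(client, "gpt-4o", messages, UserInfo)   # calls the API
second = cached_extract(client, "gpt-4o", messages, UserInfo)  # served from cache
assert first == second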

Batch Processing

Python
import asyncio
from dataclasses import dataclass
from typing import Generic, Optional, Type, TypeVar

T = TypeVar("T", bound=BaseModel)

@dataclass
class BatchResult(Generic[T]):
    index: int
    success: bool
    result: Optional[T] = None
    error: Optional[str] = None

async def batch_extract(
    client,
    items: list[str],
    response_model: Type[T],
    system_prompt: str,
    max_concurrent: int = 10
) -> list[BatchResult[T]]:
    """Extract structured data from multiple items concurrently."""

    semaphore = asyncio.Semaphore(max_concurrent)

    async def process_item(index: int, item: str) -> BatchResult[T]:
        async with semaphore:
            try:
                result = await asyncio.to_thread(
                    client.chat.completions.create,
                    model="gpt-4o",
                    messages=[
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": item}
                    ],
                    response_model=response_model
                )
                return BatchResult(index=index, success=True, result=result)
            except Exception as e:
                return BatchResult(index=index, success=False, error=str(e))

    tasks = [process_item(i, item) for i, item in enumerate(items)]
    results = await asyncio.gather(*tasks)

    return sorted(results, key=lambda r: r.index)

# Usage
class ProductInfo(BaseModel):
    name: str
    price: float
    category: str

descriptions = [
    "iPhone 15 Pro - $999, Electronics",
    "Nike Air Max - $150, Shoes",
    "The Great Gatsby - $15, Books"
]

results = asyncio.run(batch_extract(
    client,
    descriptions,
    ProductInfo,
    "Extract product information."
))

for r in results:
    if r.success:
        print(f"{r.result.name}: ${r.result.price}")
    else:
        print(f"Error at index {r.index}: {r.error}")

Monitoring and Observability

Python
import time
from dataclasses import dataclass, field
from typing import Optional, Type
from collections import defaultdict

@dataclass
class ExtractionMetrics:
    model: str
    schema_name: str
    success: bool
    latency_ms: float
    tokens_used: int
    retries: int
    validation_errors: list[str] = field(default_factory=list)

class ExtractionMonitor:
    """Monitor structured output extractions."""

    def __init__(self):
        self.metrics: list[ExtractionMetrics] = []

    def record(self, metrics: ExtractionMetrics):
        """Record extraction metrics."""
        self.metrics.append(metrics)

    def get_stats(self, schema_name: Optional[str] = None) -> dict:
        """Get aggregate statistics."""
        filtered = self.metrics
        if schema_name:
            filtered = [m for m in self.metrics if m.schema_name == schema_name]

        if not filtered:
            return {"error": "No data"}

        success_count = sum(1 for m in filtered if m.success)
        latencies = [m.latency_ms for m in filtered]

        return {
            "total_extractions": len(filtered),
            "success_rate": success_count / len(filtered),
            "avg_latency_ms": sum(latencies) / len(latencies),
            "p95_latency_ms": sorted(latencies)[int(len(latencies) * 0.95)],
            "avg_retries": sum(m.retries for m in filtered) / len(filtered),
            "common_errors": self._get_common_errors(filtered)
        }

    def _get_common_errors(self, metrics: list[ExtractionMetrics]) -> list[tuple[str, int]]:
        """Get most common validation errors."""
        error_counts = defaultdict(int)
        for m in metrics:
            for error in m.validation_errors:
                error_counts[error] += 1
        return sorted(error_counts.items(), key=lambda x: -x[1])[:5]

monitor = ExtractionMonitor()

def monitored_extract(
    client,
    model: str,
    messages: list[dict],
    response_model: Type[T]
) -> T:
    """Extract with monitoring."""
    start_time = time.time()
    retries = 0
    validation_errors = []

    try:
        result = client.chat.completions.create(
            model=model,
            messages=messages,
            response_model=response_model,
            max_retries=3
        )
        success = True
    except Exception as e:
        success = False
        validation_errors.append(str(e))
        raise
    finally:
        monitor.record(ExtractionMetrics(
            model=model,
            schema_name=response_model.__name__,
            success=success,
            latency_ms=(time.time() - start_time) * 1000,
            tokens_used=0,  # Would need to track from response
            retries=retries,
            validation_errors=validation_errors
        ))

    return result
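
After a batch of calls, pull aggregate statistics per schema. A sketch, assuming the Instructor-patched client and the UserInfo model from earlier:

Python
user = monitored_extract(
    client,
    "gpt-4o",
    [{"role": "user", "content": "John Smith, 32, john@example.com"}],
    UserInfo
)

stats = monitor.get_stats(schema_name="UserInfo")
print(f"Success rate: {stats['success_rate']:.0%}")
print(f"p95 latency: {stats['p95_latency_ms']:.0f} ms")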

Testing Structured Outputs

Python
import pytest
from pydantic import BaseModel, ValidationError

class UserInfo(BaseModel):
    name: str
    email: str
    age: int

@pytest.fixture
def mock_llm_response():
    """Mock LLM responses for testing."""
    responses = {
        "valid": '{"name": "John", "email": "john@example.com", "age": 30}',
        "invalid_type": '{"name": "John", "email": "john@example.com", "age": "thirty"}',
        "missing_field": '{"name": "John", "email": "john@example.com"}',
        "extra_field": '{"name": "John", "email": "john@example.com", "age": 30, "phone": "555-1234"}',
    }
    return responses

class TestStructuredOutputs:
    def test_valid_extraction(self, mock_llm_response):
        """Test extraction with valid response."""
        data = UserInfo.model_validate_json(mock_llm_response["valid"])
        assert data.name == "John"
        assert data.age == 30

    def test_invalid_type_raises(self, mock_llm_response):
        """Test that invalid types raise ValidationError."""
        with pytest.raises(ValidationError) as exc_info:
            UserInfo.model_validate_json(mock_llm_response["invalid_type"])
        assert "age" in str(exc_info.value)

    def test_missing_field_raises(self, mock_llm_response):
        """Test that missing required fields raise ValidationError."""
        with pytest.raises(ValidationError):
            UserInfo.model_validate_json(mock_llm_response["missing_field"])

    def test_extra_fields_ignored(self, mock_llm_response):
        """Test that extra fields are ignored by default."""
        data = UserInfo.model_validate_json(mock_llm_response["extra_field"])
        assert data.name == "John"
        assert not hasattr(data, "phone")

    def test_schema_generation(self):
        """Test JSON schema generation."""
        schema = UserInfo.model_json_schema()
        assert "properties" in schema
        assert "name" in schema["properties"]
        assert schema["properties"]["age"]["type"] == "integer"

    def test_round_trip(self):
        """Test serialization round-trip."""
        original = UserInfo(name="John", email="john@example.com", age=30)
        json_str = original.model_dump_json()
        restored = UserInfo.model_validate_json(json_str)
        assert original == restored

Advanced Schema Patterns

Recursive and Self-Referential Schemas

For tree structures, graphs, and nested hierarchies:

Python
from pydantic import BaseModel, Field, model_validator
from typing import Literal, Optional

# Pattern 1: Simple recursion (file system)
class FileNode(BaseModel):
    name: str
    type: Literal["file", "directory"]
    size_bytes: Optional[int] = None  # Only for files
    children: list["FileNode"] = Field(default_factory=list)  # Only for directories

    @model_validator(mode="after")
    def validate_structure(self):
        if self.type == "file" and self.children:
            raise ValueError("Files cannot have children")
        if self.type == "directory" and self.size_bytes is not None:
            raise ValueError("Directories don't have direct size")
        return self

FileNode.model_rebuild()

# Pattern 2: Organizational hierarchy
class Employee(BaseModel):
    id: str
    name: str
    title: str
    department: str
    direct_reports: list["Employee"] = Field(default_factory=list)

    def total_reports(self) -> int:
        """Recursively count all reports."""
        return len(self.direct_reports) + sum(
            r.total_reports() for r in self.direct_reports
        )

Employee.model_rebuild()

# Pattern 3: Expression trees (for calculators, query builders)
class Expression(BaseModel):
    type: Literal["number", "variable", "binary_op", "function_call"]

class NumberExpr(Expression):
    type: Literal["number"] = "number"
    value: float

class VariableExpr(Expression):
    type: Literal["variable"] = "variable"
    name: str

class BinaryOpExpr(Expression):
    type: Literal["binary_op"] = "binary_op"
    operator: Literal["+", "-", "*", "/", "^"]
    left: "AnyExpression"
    right: "AnyExpression"

class FunctionCallExpr(Expression):
    type: Literal["function_call"] = "function_call"
    function: str
    arguments: list["AnyExpression"]

AnyExpression = NumberExpr | VariableExpr | BinaryOpExpr | FunctionCallExpr

# Resolve the "AnyExpression" forward references used above
BinaryOpExpr.model_rebuild()
FunctionCallExpr.model_rebuild()

# Usage: Parse mathematical expressions
expression = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "user",
            "content": "Parse this expression into a tree: (x + 2) * sin(y)"
        }
    ],
    response_model=AnyExpression
)

Dynamic Schema Generation

Generate schemas at runtime based on configuration:

Python
from pydantic import BaseModel, create_model, Field
from typing import Any, Optional, Type

def create_extraction_schema(
    fields: dict[str, dict],
    schema_name: str = "DynamicSchema"
) -> Type[BaseModel]:
    """
    Create a Pydantic model from a field configuration.

    fields = {
        "name": {"type": "str", "required": True, "description": "Person's name"},
        "age": {"type": "int", "required": False, "default": None},
        "tags": {"type": "list[str]", "required": False, "default_factory": list}
    }
    """
    type_mapping = {
        "str": str,
        "int": int,
        "float": float,
        "bool": bool,
        "list[str]": list[str],
        "list[int]": list[int],
        "dict": dict[str, Any],
    }

    field_definitions = {}

    for field_name, config in fields.items():
        field_type = type_mapping.get(config["type"], str)

        if config.get("required", True):
            field_definitions[field_name] = (
                field_type,
                Field(..., description=config.get("description", ""))
            )
        else:
            default = config.get("default")
            if "default_factory" in config:
                field_definitions[field_name] = (
                    Optional[field_type],
                    Field(default_factory=config["default_factory"])
                )
            else:
                field_definitions[field_name] = (
                    Optional[field_type],
                    Field(default=default, description=config.get("description", ""))
                )

    return create_model(schema_name, **field_definitions)

# Usage: Create schema from user configuration
user_config = {
    "company_name": {"type": "str", "required": True, "description": "Company name"},
    "revenue": {"type": "float", "required": False, "description": "Annual revenue in USD"},
    "employees": {"type": "int", "required": False, "default": None},
    "industries": {"type": "list[str]", "required": False, "default_factory": list}
}

CompanySchema = create_extraction_schema(user_config, "CompanyInfo")

# Now use it for extraction
company = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {"role": "user", "content": "Acme Corp reported $50M revenue with 200 employees in tech and finance."}
    ],
    response_model=CompanySchema
)

Context-Aware Schema Selection

Choose schemas based on input analysis:

Python
from pydantic import BaseModel, Field
from typing import Any, Literal, Optional, Union
from enum import Enum

class DocumentType(str, Enum):
    INVOICE = "invoice"
    RECEIPT = "receipt"
    CONTRACT = "contract"
    RESUME = "resume"
    EMAIL = "email"

# Different schemas for different document types
class Invoice(BaseModel):
    document_type: Literal["invoice"] = "invoice"
    invoice_number: str
    vendor: str
    customer: str
    line_items: list[dict[str, Any]]
    subtotal: float
    tax: float
    total: float
    due_date: Optional[str] = None

class Receipt(BaseModel):
    document_type: Literal["receipt"] = "receipt"
    merchant: str
    items: list[dict[str, Any]]
    subtotal: float
    tax: float
    total: float
    payment_method: str
    transaction_date: str

class Resume(BaseModel):
    document_type: Literal["resume"] = "resume"
    name: str
    contact: dict[str, str]
    summary: Optional[str] = None
    experience: list[dict[str, Any]]
    education: list[dict[str, Any]]
    skills: list[str]

class DocumentClassification(BaseModel):
    document_type: DocumentType
    confidence: float = Field(..., ge=0, le=1)
    reasoning: str

class AdaptiveDocumentExtractor:
    """Extract structured data with automatic schema selection."""

    def __init__(self, client):
        self.client = client
        self.schemas = {
            DocumentType.INVOICE: Invoice,
            DocumentType.RECEIPT: Receipt,
            DocumentType.RESUME: Resume,
        }

    def extract(self, document_text: str) -> BaseModel:
        # Step 1: Classify document
        classification = self.client.chat.completions.create(
            model="gpt-4o-mini",  # Fast model for classification
            messages=[
                {
                    "role": "system",
                    "content": "Classify this document type and explain your reasoning."
                },
                {"role": "user", "content": document_text[:2000]}  # First 2000 chars
            ],
            response_model=DocumentClassification
        )

        # Step 2: Extract with appropriate schema
        schema = self.schemas.get(classification.document_type)
        if not schema:
            raise ValueError(f"No schema for document type: {classification.document_type}")

        result = self.client.chat.completions.create(
            model="gpt-4o",  # Better model for extraction
            messages=[
                {
                    "role": "system",
                    "content": f"Extract {classification.document_type.value} information."
                },
                {"role": "user", "content": document_text}
            ],
            response_model=schema
        )

        return result

# Usage
extractor = AdaptiveDocumentExtractor(client)
result = extractor.extract(document_text)
print(f"Extracted {result.document_type}: {result}")

Union Types with Discriminators

Handle polymorphic data cleanly:

Python
from pydantic import BaseModel, Field
from typing import Annotated, Union, Literal

# Different action types an agent might take
class SearchAction(BaseModel):
    action_type: Literal["search"] = "search"
    query: str
    filters: dict[str, str] = Field(default_factory=dict)
    max_results: int = Field(default=10, ge=1, le=100)

class CalculateAction(BaseModel):
    action_type: Literal["calculate"] = "calculate"
    expression: str
    precision: int = Field(default=2, ge=0, le=10)

class EmailAction(BaseModel):
    action_type: Literal["email"] = "email"
    to: list[str]
    subject: str
    body: str
    cc: list[str] = Field(default_factory=list)
    priority: Literal["low", "normal", "high"] = "normal"

class CodeAction(BaseModel):
    action_type: Literal["code"] = "code"
    language: str
    code: str
    description: str

class RespondAction(BaseModel):
    action_type: Literal["respond"] = "respond"
    message: str
    citations: list[str] = Field(default_factory=list)

# Discriminated union - Pydantic will use action_type to determine which model
AgentAction = Annotated[
    Union[SearchAction, CalculateAction, EmailAction, CodeAction, RespondAction],
    Field(discriminator="action_type")
]

class AgentPlan(BaseModel):
    """A plan consisting of multiple actions."""
    reasoning: str = Field(..., description="Explanation of the plan")
    actions: list[AgentAction] = Field(..., description="Ordered list of actions to take")
    estimated_steps: int

# Usage
plan = client.chat.completions.create(
    model="gpt-4o",
    messages=[
        {
            "role": "system",
            "content": """You are a planning agent. Create action plans to accomplish user goals.
            Available actions: search, calculate, email, code, respond"""
        },
        {
            "role": "user",
            "content": "Find the current Bitcoin price, convert it to EUR, and email the result to finance@company.com"
        }
    ],
    response_model=AgentPlan
)

for i, action in enumerate(plan.actions, 1):
    print(f"Step {i}: {action.action_type}")
    if action.action_type == "search":
        print(f"  Query: {action.query}")
    elif action.action_type == "email":
        print(f"  To: {action.to}")
        print(f"  Subject: {action.subject}")

Multi-Tool Orchestration

Building systems where LLMs coordinate multiple tools requires careful schema design and execution patterns.

Tool Registry Pattern

Python
import asyncio
from pydantic import BaseModel, Field
from typing import Any, Callable, TypeVar, get_type_hints
from dataclasses import dataclass

T = TypeVar("T", bound=BaseModel)

@dataclass
class Tool:
    name: str
    description: str
    parameters_schema: type[BaseModel]
    handler: Callable
    requires_confirmation: bool = False
    timeout_seconds: int = 30

class ToolRegistry:
    """Registry for managing available tools."""

    def __init__(self):
        self.tools: dict[str, Tool] = {}

    def register(
        self,
        name: str = None,
        description: str = None,
        requires_confirmation: bool = False,
        timeout_seconds: int = 30
    ):
        """Decorator to register a tool."""
        def decorator(func: Callable) -> Callable:
            # Extract parameter schema from type hints
            hints = get_type_hints(func)
            params_type = hints.get("params")

            if not params_type or not issubclass(params_type, BaseModel):
                raise ValueError(f"Tool {func.__name__} must have a 'params' argument with Pydantic type")

            tool_name = name or func.__name__
            tool_description = description or func.__doc__ or ""

            self.tools[tool_name] = Tool(
                name=tool_name,
                description=tool_description,
                parameters_schema=params_type,
                handler=func,
                requires_confirmation=requires_confirmation,
                timeout_seconds=timeout_seconds
            )

            return func
        return decorator

    def get_tool_definitions(self) -> list[dict]:
        """Get OpenAI-compatible tool definitions."""
        definitions = []
        for tool in self.tools.values():
            definitions.append({
                "type": "function",
                "function": {
                    "name": tool.name,
                    "description": tool.description,
                    "parameters": tool.parameters_schema.model_json_schema()
                }
            })
        return definitions

    async def execute(self, tool_name: str, params: dict) -> Any:
        """Execute a tool with given parameters."""
        if tool_name not in self.tools:
            raise ValueError(f"Unknown tool: {tool_name}")

        tool = self.tools[tool_name]
        validated_params = tool.parameters_schema.model_validate(params)

        if asyncio.iscoroutinefunction(tool.handler):
            return await asyncio.wait_for(
                tool.handler(validated_params),
                timeout=tool.timeout_seconds
            )
        else:
            return await asyncio.to_thread(tool.handler, validated_params)

# Usage
registry = ToolRegistry()

class SearchParams(BaseModel):
    query: str
    max_results: int = 10

@registry.register(
    name="web_search",
    description="Search the web for information",
    timeout_seconds=15
)
def search_web(params: SearchParams) -> list[dict]:
    # Implementation
    return [{"title": "Result", "url": "https://example.com"}]

class DatabaseQueryParams(BaseModel):
    table: str
    conditions: dict[str, Any]
    limit: int = 100

@registry.register(
    name="query_database",
    description="Query the internal database",
    requires_confirmation=True  # Sensitive operation
)
async def query_database(params: DatabaseQueryParams) -> list[dict]:
    # Implementation
    return [{"id": 1, "data": "..."}]
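
The registry plugs straight into the function-calling loop: pass its tool definitions to the chat API and route each tool call back through execute(). A minimal sketch with a plain (unpatched) OpenAI client:

Python
import asyncio
import json
from openai import OpenAI

openai_client = OpenAI()

response = openai_client.chat.completions.create(
    model="gpt-4o",
    messages=[{"role": "user", "content": "Search the web for Pydantic v2 migration tips"}],
    tools=registry.get_tool_definitions(),
)

message = response.choices[0].message
if message.tool_calls:
    for tool_call in message.tool_calls:
        args = json.loads(tool_call.function.arguments)
        result = asyncio.run(registry.execute(tool_call.function.name, args))
        print(f"{tool_call.function.name} -> {result}")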

Intelligent Tool Selection

Let the LLM choose and chain tools:

Python
from pydantic import BaseModel, Field
from typing import Optional
from enum import Enum

class ToolSelectionReasoning(BaseModel):
    """Model's reasoning about which tools to use."""
    user_intent: str = Field(..., description="What the user is trying to accomplish")
    required_information: list[str] = Field(..., description="What information is needed")
    selected_tools: list[str] = Field(..., description="Tools to use, in order")
    tool_reasoning: dict[str, str] = Field(
        ...,
        description="Why each tool was selected"
    )
    parallel_execution: list[list[str]] = Field(
        default_factory=list,
        description="Groups of tools that can run in parallel"
    )

class ToolOrchestrator:
    """Orchestrates multi-tool execution with intelligent planning."""

    def __init__(self, client, registry: ToolRegistry):
        self.client = client
        self.registry = registry

    async def plan_and_execute(self, user_request: str) -> dict:
        # Step 1: Plan tool usage
        tool_descriptions = "\n".join([
            f"- {t.name}: {t.description}"
            for t in self.registry.tools.values()
        ])

        plan = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": f"""You are a tool orchestration planner.

Available tools:
{tool_descriptions}

Analyze the user request and plan which tools to use."""
                },
                {"role": "user", "content": user_request}
            ],
            response_model=ToolSelectionReasoning
        )

        # Step 2: Execute tools based on plan
        results = {}

        # Execute parallel groups first
        for parallel_group in plan.parallel_execution:
            group_tasks = []
            group_names = []  # track which tools were actually scheduled
            for tool_name in parallel_group:
                if tool_name in plan.selected_tools:
                    # Get parameters from LLM
                    params = await self._get_tool_params(tool_name, user_request, results)
                    group_tasks.append(self._execute_with_name(tool_name, params))
                    group_names.append(tool_name)

            group_results = await asyncio.gather(*group_tasks, return_exceptions=True)
            for tool_name, result in zip(group_names, group_results):
                results[tool_name] = result

        # Execute remaining tools sequentially
        executed = set(t for group in plan.parallel_execution for t in group)
        for tool_name in plan.selected_tools:
            if tool_name not in executed:
                params = await self._get_tool_params(tool_name, user_request, results)
                results[tool_name] = await self.registry.execute(tool_name, params)

        return {
            "plan": plan.model_dump(),
            "results": results
        }

    async def _get_tool_params(
        self,
        tool_name: str,
        user_request: str,
        prior_results: dict
    ) -> dict:
        """Use LLM to generate tool parameters."""
        tool = self.registry.tools[tool_name]

        context = f"User request: {user_request}"
        if prior_results:
            context += f"\n\nPrior tool results: {json.dumps(prior_results, default=str)}"

        params = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": f"Generate parameters for the '{tool_name}' tool based on the context."
                },
                {"role": "user", "content": context}
            ],
            response_model=tool.parameters_schema
        )

        return params.model_dump()

    async def _execute_with_name(self, tool_name: str, params: dict) -> Any:
        """Thin wrapper so the parallel execution loop above reads clearly."""
        return await self.registry.execute(tool_name, params)

Tool Chaining with Dependencies

Handle tools that depend on outputs from other tools:

Python
from pydantic import BaseModel, Field
from typing import Optional, Any
from dataclasses import dataclass
import networkx as nx

@dataclass
class ToolDependency:
    tool_name: str
    depends_on: list[str]  # Tools that must complete first
    output_mapping: dict[str, str]  # How to map outputs to inputs

class ToolChainDefinition(BaseModel):
    """Definition of a tool chain extracted by LLM."""
    steps: list["ToolStep"]
    final_output_tool: str

class ToolStep(BaseModel):
    tool_name: str
    input_mappings: dict[str, str] = Field(
        default_factory=dict,
        description="Map from this tool's param to 'tool_name.output_field'"
    )
    static_inputs: dict[str, Any] = Field(
        default_factory=dict,
        description="Static values for parameters"
    )

class DependencyAwareExecutor:
    """Execute tool chains respecting dependencies."""

    def __init__(self, registry: ToolRegistry):
        self.registry = registry

    def build_execution_graph(self, chain: ToolChainDefinition) -> nx.DiGraph:
        """Build a DAG of tool dependencies."""
        graph = nx.DiGraph()

        for step in chain.steps:
            graph.add_node(step.tool_name, step=step)

            # Add edges for dependencies
            for param, source in step.input_mappings.items():
                if "." in source:
                    source_tool = source.split(".")[0]
                    graph.add_edge(source_tool, step.tool_name)

        if not nx.is_directed_acyclic_graph(graph):
            raise ValueError("Tool chain contains cycles")

        return graph

    async def execute_chain(self, chain: ToolChainDefinition) -> dict[str, Any]:
        """Execute tools in dependency order."""
        graph = self.build_execution_graph(chain)
        results = {}

        # Process in topological order
        for tool_name in nx.topological_sort(graph):
            step = graph.nodes[tool_name].get("step")
            if not step:
                continue

            # Build parameters
            params = dict(step.static_inputs)

            for param, source in step.input_mappings.items():
                if "." in source:
                    source_tool, source_field = source.split(".", 1)
                    source_result = results.get(source_tool, {})
                    params[param] = self._get_nested_value(source_result, source_field)
                else:
                    params[param] = source

            # Execute
            results[tool_name] = await self.registry.execute(tool_name, params)

        return results

    def _get_nested_value(self, obj: Any, path: str) -> Any:
        """Get nested value using dot notation, with optional [i] list indexing."""
        import re
        for raw_key in path.split("."):
            # Support list indexing such as "results[0]"
            match = re.match(r"^(\w+)\[(\d+)\]$", raw_key)
            key, index = (match.group(1), int(match.group(2))) if match else (raw_key, None)
            if isinstance(obj, dict):
                obj = obj.get(key)
            elif hasattr(obj, key):
                obj = getattr(obj, key)
            else:
                return None
            if index is not None:
                if isinstance(obj, (list, tuple)) and index < len(obj):
                    obj = obj[index]
                else:
                    return None
        return obj

# Example usage
chain = ToolChainDefinition(
    steps=[
        ToolStep(
            tool_name="web_search",
            static_inputs={"query": "Bitcoin current price USD"}
        ),
        ToolStep(
            tool_name="extract_number",
            input_mappings={"text": "web_search.results[0].snippet"}
        ),
        ToolStep(
            tool_name="currency_convert",
            input_mappings={"amount": "extract_number.value"},
            static_inputs={"from_currency": "USD", "to_currency": "EUR"}
        ),
        ToolStep(
            tool_name="send_email",
            input_mappings={"body": "currency_convert.formatted_result"},
            static_inputs={
                "to": ["finance@company.com"],
                "subject": "Bitcoin Price Update"
            }
        )
    ],
    final_output_tool="send_email"
)
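
Running the chain is then a single call to the executor. The sketch below assumes the referenced tools (`extract_number`, `currency_convert`, `send_email`) have been registered on the same `registry` as the earlier examples:

Python
import asyncio

async def run_chain():
    executor = DependencyAwareExecutor(registry)
    # Tools execute in topological order: search -> extract -> convert -> email
    results = await executor.execute_chain(chain)
    return results[chain.final_output_tool]

final_result = asyncio.run(run_chain())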

Tool Result Aggregation

Combine results from multiple tools:

Python
import json

from pydantic import BaseModel, Field
from typing import Any

class AggregatedResult(BaseModel):
    """Aggregated and synthesized result from multiple tools."""
    summary: str = Field(..., description="Natural language summary of findings")
    sources: list[str] = Field(..., description="Tools that provided data")
    key_facts: list[str] = Field(..., description="Most important facts discovered")
    confidence: float = Field(..., ge=0, le=1, description="Confidence in the aggregation")
    conflicts: list[str] = Field(
        default_factory=list,
        description="Any conflicting information found"
    )
    raw_data: dict[str, Any] = Field(
        default_factory=dict,
        description="Raw data from each tool"
    )

class ResultAggregator:
    """Aggregate and synthesize results from multiple tools."""

    def __init__(self, client):
        self.client = client

    def aggregate(
        self,
        tool_results: dict[str, Any],
        user_question: str
    ) -> AggregatedResult:
        """Use LLM to aggregate tool results into a coherent response."""

        # Format results for the LLM
        formatted_results = []
        for tool_name, result in tool_results.items():
            formatted_results.append(f"## {tool_name}\n{json.dumps(result, default=str, indent=2)}")

        results_text = "\n\n".join(formatted_results)

        aggregation = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[
                {
                    "role": "system",
                    "content": """You are a result aggregation specialist.
                    Synthesize information from multiple tool outputs into a coherent response.
                    Identify key facts, note any conflicts, and assess confidence."""
                },
                {
                    "role": "user",
                    "content": f"""User question: {user_question}

Tool results:
{results_text}

Aggregate these results into a comprehensive response."""
                }
            ],
            response_model=AggregatedResult
        )

        aggregation.raw_data = tool_results
        return aggregation
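
A short usage sketch: the tool outputs below are hypothetical, and `client` is assumed to be an Instructor-patched OpenAI client (the `response_model` keyword in `aggregate` implies Instructor):

Python
tool_results = {
    "web_search": [{"title": "BTC price today", "snippet": "Bitcoin trades near $97,000"}],
    "query_database": [{"id": 1, "last_known_price_usd": 96500}],
}

aggregator = ResultAggregator(client)
answer = aggregator.aggregate(tool_results, "What is the current Bitcoin price?")

print(answer.summary)    # synthesized narrative answer
print(answer.key_facts)  # the most important facts found
print(answer.conflicts)  # e.g. the two sources quoting slightly different prices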

Advanced Streaming Patterns

Streaming with Real-Time Validation

Python
from pydantic import BaseModel, ValidationError
from typing import Generator, Optional
import json

class StreamingValidator:
    """Validate streaming JSON as it arrives."""

    def __init__(self, schema: type[BaseModel]):
        self.schema = schema
        self.buffer = ""
        self.partial_object = {}
        self.completed_fields = set()

    def process_chunk(self, chunk: str) -> tuple[dict, list[str]]:
        """
        Process a chunk of streaming JSON.
        Returns (partial_object, newly_completed_fields).
        """
        self.buffer += chunk
        newly_completed = []

        # Try to parse as much as possible
        try:
            # Attempt to parse complete JSON
            parsed = json.loads(self.buffer)
            self._update_partial(parsed, newly_completed)
        except json.JSONDecodeError:
            # Try to extract completed fields from partial JSON
            self._extract_partial_fields(newly_completed)

        return self.partial_object, newly_completed

    def _update_partial(self, parsed: dict, newly_completed: list[str]):
        """Update partial object with parsed data."""
        for key, value in parsed.items():
            if key not in self.completed_fields:
                # Validate individual field
                try:
                    field_info = self.schema.model_fields.get(key)
                    if field_info:
                        # Field exists in schema
                        self.partial_object[key] = value
                        self.completed_fields.add(key)
                        newly_completed.append(key)
                except Exception:
                    pass

    def _extract_partial_fields(self, newly_completed: list[str]):
        """Extract fields from incomplete JSON."""
        # Simple heuristic: look for completed key-value pairs
        import re

        # Match completed string fields: "key": "value"
        string_pattern = r'"(\w+)":\s*"([^"]*)"(?:,|\s*})'
        for match in re.finditer(string_pattern, self.buffer):
            key, value = match.groups()
            if key not in self.completed_fields:
                self.partial_object[key] = value
                self.completed_fields.add(key)
                newly_completed.append(key)

        # Match completed number fields: "key": 123
        number_pattern = r'"(\w+)":\s*(\d+(?:\.\d+)?)(?:,|\s*})'
        for match in re.finditer(number_pattern, self.buffer):
            key, value = match.groups()
            if key not in self.completed_fields:
                self.partial_object[key] = float(value) if "." in value else int(value)
                self.completed_fields.add(key)
                newly_completed.append(key)

    def get_validated(self) -> Optional[BaseModel]:
        """Try to validate the partial object against the schema."""
        try:
            return self.schema.model_validate(self.partial_object)
        except ValidationError:
            return None

# Usage with streaming response
def stream_with_validation(
    client,
    messages: list[dict],
    response_model: type[BaseModel]
) -> Generator[tuple[dict, list[str]], None, BaseModel]:
    """Stream structured output with real-time validation."""

    validator = StreamingValidator(response_model)

    response = client.chat.completions.create(
        model="gpt-4o",
        messages=messages,
        response_format={"type": "json_object"},
        stream=True
    )

    for chunk in response:
        if chunk.choices[0].delta.content:
            partial, new_fields = validator.process_chunk(
                chunk.choices[0].delta.content
            )
            if new_fields:
                yield partial, new_fields

    # Final validation
    final = validator.get_validated()
    if final is not None:
        return final
    raise ValueError("Failed to validate complete response against the schema")
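
Consuming this generator is slightly unusual because the fully validated object arrives as the generator's return value (via StopIteration.value), so a manual next() loop is one way to capture both the progressive updates and the final result. The ContactCard model and the input text are illustrative, and `client` is assumed to be a plain OpenAI client:

Python
from pydantic import BaseModel

class ContactCard(BaseModel):
    name: str
    email: str
    company: str

gen = stream_with_validation(
    client,
    messages=[{"role": "user", "content": "Extract the contact details from: ..."}],
    response_model=ContactCard
)

final_card = None
while True:
    try:
        partial, new_fields = next(gen)
        print(f"Completed so far: {new_fields} -> {partial}")
    except StopIteration as stop:
        final_card = stop.value  # the fully validated ContactCard
        break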

Progressive UI Updates

Pattern for updating UI as structured data streams in:

Python
from pydantic import BaseModel, Field
from typing import Any, Callable, Literal
from dataclasses import dataclass
import asyncio

@dataclass
class FieldUpdate:
    field_name: str
    value: Any
    is_complete: bool
    timestamp: float

class ProgressiveOutputHandler:
    """Handle streaming structured outputs with progressive UI updates."""

    def __init__(self):
        self.field_callbacks: dict[str, list[Callable]] = {}
        self.completion_callbacks: list[Callable] = []

    def on_field(self, field_name: str, callback: Callable[[Any, bool], None]):
        """Register callback for when a specific field updates."""
        if field_name not in self.field_callbacks:
            self.field_callbacks[field_name] = []
        self.field_callbacks[field_name].append(callback)

    def on_complete(self, callback: Callable[[BaseModel], None]):
        """Register callback for when extraction completes."""
        self.completion_callbacks.append(callback)

    async def stream_extract(
        self,
        client,
        messages: list[dict],
        response_model: type[BaseModel]
    ):
        """Stream extraction with callbacks."""

        async for partial_result in client.chat.completions.create_partial(
            model="gpt-4o",
            messages=messages,
            response_model=response_model
        ):
            # Check each field for updates
            for field_name in response_model.model_fields:
                value = getattr(partial_result, field_name, None)
                if value is not None and field_name in self.field_callbacks:
                    for callback in self.field_callbacks[field_name]:
                        await self._safe_callback(callback, value, False)

        # Final complete object
        for callback in self.completion_callbacks:
            await self._safe_callback(callback, partial_result)

    async def _safe_callback(self, callback: Callable, *args):
        """Safely execute callback."""
        try:
            if asyncio.iscoroutinefunction(callback):
                await callback(*args)
            else:
                callback(*args)
        except Exception as e:
            print(f"Callback error: {e}")

# Example: Real-time document analysis UI
class DocumentAnalysis(BaseModel):
    title: str
    summary: str
    key_points: list[str]
    sentiment: Literal["positive", "negative", "neutral"]
    entities: list[dict[str, str]]
    word_count: int

async def analyze_document_with_ui(document_text: str):
    handler = ProgressiveOutputHandler()

    # UI callbacks
    def update_title(title: str, complete: bool):
        print(f"📄 Title: {title}")

    def update_summary(summary: str, complete: bool):
        if complete:
            print(f"📝 Summary: {summary}")
        else:
            print(f"📝 Summary (partial): {summary[:100]}...")

    def update_sentiment(sentiment: str, complete: bool):
        emoji = {"positive": "😊", "negative": "😔", "neutral": "😐"}
        print(f"Sentiment: {emoji.get(sentiment, '❓')} {sentiment}")

    def update_key_points(points: list[str], complete: bool):
        print(f"Key points ({len(points)}):")
        for point in points[-3:]:  # Show last 3
            print(f"  • {point}")

    handler.on_field("title", update_title)
    handler.on_field("summary", update_summary)
    handler.on_field("sentiment", update_sentiment)
    handler.on_field("key_points", update_key_points)

    handler.on_complete(lambda result: print(f"\n✅ Analysis complete! Word count: {result.word_count}"))

    await handler.stream_extract(
        client,
        messages=[
            {"role": "system", "content": "Analyze this document."},
            {"role": "user", "content": document_text}
        ],
        response_model=DocumentAnalysis
    )

Streaming with Cancellation

Handle user cancellation during streaming:

Python
import asyncio
from typing import Optional
from contextlib import asynccontextmanager

class CancellableExtraction:
    """Structured extraction that can be cancelled mid-stream."""

    def __init__(self, client):
        self.client = client
        self._cancel_event: Optional[asyncio.Event] = None
        self._current_task: Optional[asyncio.Task] = None

    @asynccontextmanager
    async def extraction_context(self):
        """Context manager for cancellable extraction."""
        self._cancel_event = asyncio.Event()
        try:
            yield self
        finally:
            if self._current_task and not self._current_task.done():
                self._current_task.cancel()
            self._cancel_event = None

    def cancel(self):
        """Cancel the current extraction."""
        if self._cancel_event:
            self._cancel_event.set()

    async def extract_with_progress(
        self,
        messages: list[dict],
        response_model: type[BaseModel],
        on_progress: Optional[Callable[[dict, float], None]] = None
    ) -> Optional[BaseModel]:
        """Extract with progress updates and cancellation support."""

        partial_result = {}
        total_fields = len(response_model.model_fields)

        try:
            async for partial in self.client.chat.completions.create_partial(
                model="gpt-4o",
                messages=messages,
                response_model=response_model
            ):
                # Check for cancellation
                if self._cancel_event and self._cancel_event.is_set():
                    return None

                # Update progress
                completed_fields = sum(
                    1 for f in response_model.model_fields
                    if getattr(partial, f, None) is not None
                )
                progress = completed_fields / total_fields

                if on_progress:
                    on_progress(partial.model_dump(exclude_unset=True), progress)

                partial_result = partial

            return partial_result

        except asyncio.CancelledError:
            return None

# Usage
async def main():
    extractor = CancellableExtraction(client)

    async with extractor.extraction_context():
        # Start extraction in background
        task = asyncio.create_task(
            extractor.extract_with_progress(
                messages=[{"role": "user", "content": "Analyze this long document..."}],
                response_model=DocumentAnalysis,
                on_progress=lambda p, pct: print(f"Progress: {pct:.0%}")
            )
        )

        # Simulate user cancellation after 2 seconds
        await asyncio.sleep(2)
        extractor.cancel()

        result = await task
        if result is None:
            print("Extraction was cancelled")

Provider-Specific Patterns

OpenAI-Specific Optimizations

Python
from openai import OpenAI
from pydantic import BaseModel

class OpenAIStructuredOutputs:
    """OpenAI-specific structured output patterns."""

    def __init__(self):
        self.client = OpenAI()

    def extract_with_strict_mode(
        self,
        messages: list[dict],
        response_model: type[BaseModel]
    ) -> BaseModel:
        """Use OpenAI's strict structured output mode."""

        # Convert Pydantic to JSON Schema
        schema = response_model.model_json_schema()

        # Strict mode requires additionalProperties: false on every object and
        # every property listed in "required" (optionality is expressed via nullable types)
        def make_strict(s: dict) -> dict:
            if s.get("type") == "object":
                s["additionalProperties"] = False
                if "properties" in s:
                    s["required"] = list(s["properties"].keys())
                    for prop in s["properties"].values():
                        make_strict(prop)
            if "items" in s:
                make_strict(s["items"])
            # Recurse into nested model definitions produced by Pydantic
            for definition in s.get("$defs", {}).values():
                make_strict(definition)
            return s

        strict_schema = make_strict(schema.copy())

        response = self.client.chat.completions.create(
            model="gpt-4o-2024-08-06",
            messages=messages,
            response_format={
                "type": "json_schema",
                "json_schema": {
                    "name": response_model.__name__,
                    "strict": True,
                    "schema": strict_schema
                }
            }
        )

        return response_model.model_validate_json(
            response.choices[0].message.content
        )

    def batch_extract(
        self,
        items: list[str],
        response_model: type[BaseModel],
        system_prompt: str
    ) -> str:
        """Submit a large-scale extraction job via the OpenAI Batch API.

        Returns the batch ID; poll for completion and fetch results separately.
        """
        import json
        import tempfile

        # Create batch file
        requests = []
        for i, item in enumerate(items):
            requests.append({
                "custom_id": f"item-{i}",
                "method": "POST",
                "url": "/v1/chat/completions",
                "body": {
                    "model": "gpt-4o-mini",
                    "messages": [
                        {"role": "system", "content": system_prompt},
                        {"role": "user", "content": item}
                    ],
                    "response_format": {"type": "json_object"}
                }
            })

        # Write to temp file
        with tempfile.NamedTemporaryFile(mode='w', suffix='.jsonl', delete=False) as f:
            for req in requests:
                f.write(json.dumps(req) + '\n')
            batch_file = f.name

        # Upload and create batch
        with open(batch_file, 'rb') as f:
            file = self.client.files.create(file=f, purpose="batch")

        batch = self.client.batches.create(
            input_file_id=file.id,
            endpoint="/v1/chat/completions",
            completion_window="24h"
        )

        return batch.id  # Poll for completion separately
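
A brief usage sketch of the strict-mode path; the Invoice model and the sample text are hypothetical:

Python
from pydantic import BaseModel

class Invoice(BaseModel):
    invoice_number: str
    total_usd: float
    line_item_count: int

extractor = OpenAIStructuredOutputs()
invoice = extractor.extract_with_strict_mode(
    messages=[
        {"role": "system", "content": "Extract the invoice fields from the user's text."},
        {"role": "user", "content": "Invoice INV-1042 totals $1,250.00 across 3 line items."}
    ],
    response_model=Invoice
)
print(invoice.model_dump())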

Anthropic-Specific Patterns

Python
import json

import anthropic
from pydantic import BaseModel

class AnthropicStructuredOutputs:
    """Anthropic-specific structured output patterns."""

    def __init__(self):
        self.client = anthropic.Anthropic()

    def extract_with_tool_use(
        self,
        messages: list[dict],
        response_model: type[BaseModel]
    ) -> BaseModel:
        """Use Anthropic's tool use for structured extraction."""

        # Define extraction as a tool
        tool = {
            "name": "extract_data",
            "description": f"Extract and return structured data as {response_model.__name__}",
            "input_schema": response_model.model_json_schema()
        }

        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            tools=[tool],
            tool_choice={"type": "tool", "name": "extract_data"},
            messages=messages
        )

        # Extract tool use result
        for content in response.content:
            if content.type == "tool_use" and content.name == "extract_data":
                return response_model.model_validate(content.input)

        raise ValueError("No tool use in response")

    def extract_with_prefill(
        self,
        user_message: str,
        response_model: type[BaseModel]
    ) -> BaseModel:
        """Use Claude's prefill feature for JSON extraction."""

        schema_hint = json.dumps(response_model.model_json_schema(), indent=2)

        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=4096,
            messages=[
                {
                    "role": "user",
                    "content": f"""Extract structured data from this text.

Schema:
{schema_hint}

Text: {user_message}

Return only valid JSON matching the schema."""
                },
                {
                    "role": "assistant",
                    "content": "{"  # Prefill to force JSON output
                }
            ]
        )

        # Complete the JSON (the prefilled opening brace is not echoed back)
        json_str = "{" + response.content[0].text
        return response_model.model_validate_json(json_str)

    def extract_with_thinking(
        self,
        messages: list[dict],
        response_model: type[BaseModel]
    ) -> tuple[BaseModel, str]:
        """Use extended thinking for complex extraction."""

        response = self.client.messages.create(
            model="claude-sonnet-4-20250514",
            max_tokens=16000,
            thinking={
                "type": "enabled",
                "budget_tokens": 10000
            },
            messages=messages + [
                {
                    "role": "user",
                    "content": f"Extract data matching this schema: {response_model.model_json_schema()}"
                }
            ]
        )

        thinking_content = ""
        result_content = ""

        for block in response.content:
            if block.type == "thinking":
                thinking_content = block.thinking
            elif block.type == "text":
                result_content = block.text

        # Strip a markdown fence if the model wrapped the JSON in one
        if "```json" in result_content:
            result_content = result_content.split("```json")[1].split("```")[0]

        return response_model.model_validate_json(result_content), thinking_content
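
Usage mirrors the OpenAI path. The MeetingNotes model and the input text below are hypothetical; forcing tool choice makes the model respond through the extraction tool:

Python
from pydantic import BaseModel

class MeetingNotes(BaseModel):
    attendees: list[str]
    decisions: list[str]
    follow_ups: list[str]

anthropic_extractor = AnthropicStructuredOutputs()
notes = anthropic_extractor.extract_with_tool_use(
    messages=[{
        "role": "user",
        "content": "Extract notes: Ana and Raj met, agreed to ship v2 on Friday, and Raj will draft the changelog."
    }],
    response_model=MeetingNotes
)
print(notes.decisions)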

Cost Optimization Strategies

Model Selection Based on Complexity

Python
from pydantic import BaseModel, Field
from enum import Enum

class ComplexityLevel(str, Enum):
    SIMPLE = "simple"      # Basic extraction, few fields
    MODERATE = "moderate"  # Multiple fields, some nesting
    COMPLEX = "complex"    # Deep nesting, validation, reasoning required

class SchemaComplexityAnalyzer:
    """Analyze schema complexity to choose appropriate model."""

    @staticmethod
    def analyze(schema: type[BaseModel]) -> ComplexityLevel:
        json_schema = schema.model_json_schema()

        # Count metrics
        num_properties = len(json_schema.get("properties", {}))
        max_depth = SchemaComplexityAnalyzer._get_depth(json_schema)
        has_refs = "$defs" in json_schema
        num_required = len(json_schema.get("required", []))

        # Determine complexity
        if max_depth <= 1 and num_properties <= 5 and not has_refs:
            return ComplexityLevel.SIMPLE
        elif max_depth <= 2 and num_properties <= 15:
            return ComplexityLevel.MODERATE
        else:
            return ComplexityLevel.COMPLEX

    @staticmethod
    def _get_depth(schema: dict, current: int = 0) -> int:
        max_depth = current

        if "properties" in schema:
            for prop in schema["properties"].values():
                max_depth = max(max_depth, SchemaComplexityAnalyzer._get_depth(prop, current + 1))

        if "items" in schema:
            max_depth = max(max_depth, SchemaComplexityAnalyzer._get_depth(schema["items"], current + 1))

        return max_depth

class CostOptimizedExtractor:
    """Extract with automatic model selection based on complexity."""

    MODEL_MAP = {
        ComplexityLevel.SIMPLE: "gpt-4o-mini",
        ComplexityLevel.MODERATE: "gpt-4o-mini",
        ComplexityLevel.COMPLEX: "gpt-4o"
    }

    # Approximate costs per 1M tokens
    COST_MAP = {
        "gpt-4o-mini": {"input": 0.15, "output": 0.60},
        "gpt-4o": {"input": 2.50, "output": 10.00}
    }

    def __init__(self, client):
        self.client = client
        self.total_cost = 0.0

    def extract(
        self,
        messages: list[dict],
        response_model: type[BaseModel],
        force_model: str | None = None
    ) -> tuple[BaseModel, dict]:
        """Extract with cost tracking."""

        # Select model
        if force_model:
            model = force_model
        else:
            complexity = SchemaComplexityAnalyzer.analyze(response_model)
            model = self.MODEL_MAP[complexity]

        # Extract
        response = self.client.chat.completions.create(
            model=model,
            messages=messages,
            response_model=response_model
        )

        # Track cost (approximate)
        # Note: actual token counts would come from response.usage
        input_tokens = sum(len(m.get("content", "")) / 4 for m in messages)
        output_tokens = 500  # Estimate

        costs = self.COST_MAP[model]
        cost = (input_tokens * costs["input"] + output_tokens * costs["output"]) / 1_000_000
        self.total_cost += cost

        return response, {
            "model": model,
            "estimated_cost": cost,
            "total_session_cost": self.total_cost
        }
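
A quick sketch of how the selector behaves; QuickFact is a hypothetical two-field schema and `client` is assumed to be an Instructor-patched OpenAI client:

Python
from pydantic import BaseModel

class QuickFact(BaseModel):
    subject: str
    fact: str

print(SchemaComplexityAnalyzer.analyze(QuickFact))  # SIMPLE -> routed to gpt-4o-mini

extractor = CostOptimizedExtractor(client)
result, meta = extractor.extract(
    messages=[{"role": "user", "content": "The Eiffel Tower is 330 metres tall."}],
    response_model=QuickFact
)
print(meta["model"], f"${meta['estimated_cost']:.6f}")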

Caching with Semantic Similarity

Python
from pydantic import BaseModel
import numpy as np
from typing import Optional, TypeVar

T = TypeVar("T", bound=BaseModel)

class SemanticCache:
    """Cache structured outputs with semantic similarity matching."""

    def __init__(self, embedding_client, similarity_threshold: float = 0.95):
        self.embedding_client = embedding_client
        self.threshold = similarity_threshold
        self.cache: list[tuple[np.ndarray, str, BaseModel]] = []  # (embedding, schema_name, result)

    def _get_embedding(self, text: str) -> np.ndarray:
        """Get embedding for text."""
        response = self.embedding_client.embeddings.create(
            model="text-embedding-3-small",
            input=text
        )
        return np.array(response.data[0].embedding)

    def _cosine_similarity(self, a: np.ndarray, b: np.ndarray) -> float:
        """Calculate cosine similarity."""
        return np.dot(a, b) / (np.linalg.norm(a) * np.linalg.norm(b))

    def get(self, query: str, schema: type[T]) -> Optional[T]:
        """Check cache for similar query."""
        query_embedding = self._get_embedding(query)
        schema_name = schema.__name__

        for cached_embedding, cached_schema, cached_result in self.cache:
            if cached_schema != schema_name:
                continue

            similarity = self._cosine_similarity(query_embedding, cached_embedding)
            if similarity >= self.threshold:
                # Validate cached result still matches schema
                try:
                    return schema.model_validate(cached_result.model_dump())
                except Exception:
                    continue

        return None

    def set(self, query: str, schema: type[BaseModel], result: BaseModel):
        """Add result to cache."""
        embedding = self._get_embedding(query)
        self.cache.append((embedding, schema.__name__, result))

        # Limit cache size
        if len(self.cache) > 1000:
            self.cache = self.cache[-500:]

class CachedExtractor:
    """Extractor with semantic caching."""

    def __init__(self, client, embedding_client):
        self.client = client
        self.cache = SemanticCache(embedding_client)
        self.cache_hits = 0
        self.cache_misses = 0

    def extract(
        self,
        query: str,
        response_model: type[T],
        use_cache: bool = True
    ) -> tuple[T, bool]:
        """Extract with caching. Returns (result, was_cached)."""

        if use_cache:
            cached = self.cache.get(query, response_model)
            if cached:
                self.cache_hits += 1
                return cached, True

        self.cache_misses += 1

        result = self.client.chat.completions.create(
            model="gpt-4o",
            messages=[{"role": "user", "content": query}],
            response_model=response_model
        )

        if use_cache:
            self.cache.set(query, response_model, result)

        return result, False

    def get_stats(self) -> dict:
        """Get cache statistics."""
        total = self.cache_hits + self.cache_misses
        return {
            "cache_hits": self.cache_hits,
            "cache_misses": self.cache_misses,
            "hit_rate": self.cache_hits / total if total > 0 else 0,
            "cache_size": len(self.cache.cache)
        }
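
The payoff shows up when users ask near-duplicate questions. In the sketch below (both clients are hypothetical; `client` is assumed to be Instructor-patched), the second call should be served from cache if its similarity to the first clears the 0.95 threshold:

Python
from pydantic import BaseModel

class CompanyProfile(BaseModel):
    name: str
    industry: str
    founded_year: int

cached = CachedExtractor(client, embedding_client)

profile_1, was_cached_1 = cached.extract(
    "Summarize Acme Corp: a robotics company founded in 2012.",
    CompanyProfile
)
profile_2, was_cached_2 = cached.extract(
    "Give me a summary of Acme Corp, the robotics firm started in 2012.",
    CompanyProfile
)

print(was_cached_1, was_cached_2)  # typically False, then True
print(cached.get_stats())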

Common Pitfalls and Solutions

Pitfall 1: Overly Complex Schemas

Python
# BAD: Deeply nested schema that confuses the model
class BadSchema(BaseModel):
    data: dict[str, dict[str, list[dict[str, Any]]]]

# GOOD: Flatten and simplify
class GoodSchema(BaseModel):
    items: list["Item"]

class Item(BaseModel):
    id: str
    name: str
    value: float

Pitfall 2: Ambiguous Field Names

Python
# BAD: Ambiguous field name
class BadSchema(BaseModel):
    date: str  # What format? Created date? Due date?

# GOOD: Specific field name with description
class GoodSchema(BaseModel):
    created_at: str = Field(
        ...,
        description="Creation timestamp in ISO 8601 format (YYYY-MM-DDTHH:MM:SSZ)"
    )

Pitfall 3: Not Handling Empty/Null Cases

Python
# BAD: Assumes data always exists
class BadSchema(BaseModel):
    items: list[str]  # What if no items found?

# GOOD: Handle empty cases explicitly
class GoodSchema(BaseModel):
    items: list[str] = Field(default_factory=list)
    no_results_reason: Optional[str] = Field(
        None,
        description="If no items found, explain why"
    )

Pitfall 4: Inconsistent Enum Values

Python
# BAD: Free-form string that will vary
class BadSchema(BaseModel):
    status: str  # "Active", "active", "ACTIVE", "enabled"...

# GOOD: Constrained enum
class Status(str, Enum):
    ACTIVE = "active"
    INACTIVE = "inactive"
    PENDING = "pending"

class GoodSchema(BaseModel):
    status: Status

Conclusion

Structured outputs transform LLMs from text generators into reliable data extraction and processing tools. The key principles:

  1. Use the strongest guarantees available: Native structured outputs > Instructor > JSON mode > prompting
  2. Design schemas for clarity: Good descriptions, specific field names, appropriate types
  3. Handle errors gracefully: Retries, fallbacks, partial extraction
  4. Monitor in production: Track success rates, latencies, common failures
  5. Test thoroughly: Schema validation, edge cases, round-trip serialization (a minimal round-trip check is sketched below)
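
To make principle 5 concrete, here is a minimal round-trip check you could drop into a test suite; it reuses the DocumentAnalysis model from the streaming section purely for illustration:

Python
def test_document_analysis_round_trip():
    original = DocumentAnalysis(
        title="Q3 Report",
        summary="Revenue grew 12% quarter over quarter.",
        key_points=["Revenue up 12%", "Churn flat"],
        sentiment="positive",
        entities=[{"name": "Acme Corp", "type": "organization"}],
        word_count=842,
    )

    # Dump to JSON and re-validate: nothing should be lost or coerced unexpectedly
    restored = DocumentAnalysis.model_validate_json(original.model_dump_json())

    assert restored == original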

Start with Instructor for most use cases—it provides the best balance of reliability, flexibility, and provider support.


Enrico Piovano, PhD

Co-founder & CTO at Goji AI. Former Applied Scientist at Amazon (Alexa & AGI), focused on Agentic AI and LLMs. PhD in Electrical Engineering from Imperial College London. Gold Medalist at the National Mathematical Olympiad.
