Structured Outputs with Temporal and OpenAI

The OpenAI Responses API supports Structured Outputs, which lets you request responses that conform to a specific data structure.

In this example, we use structured outputs in a business data-cleaning scenario. Structured outputs are also commonly used for tool calling.

OpenAI usually returns the requested type, but because LLM output is non-deterministic, it occasionally does not. When the response fails to validate against the requested type, Temporal automatically retries the LLM-call Activity.
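
Before wiring this into Temporal, here is a minimal sketch of calling the Responses API parse helper directly. The CityInfo model is hypothetical and used only for illustration; the call shape (responses.parse with text_format, reading output_parsed) is the same one the Activity below relies on.

from openai import AsyncOpenAI
from pydantic import BaseModel


class CityInfo(BaseModel):
    name: str
    country: str


async def example() -> CityInfo:
    client = AsyncOpenAI()
    resp = await client.responses.parse(
        model="gpt-4o",
        input="Tell me about Paris.",
        text_format=CityInfo,  # request output conforming to CityInfo
    )
    # output_parsed holds the response already validated as a CityInfo instance
    return resp.output_parsed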

Invoke Model Activity

We create a model-calling Activity that uses the responses.parse method of the OpenAI client.

The key challenges relate to serialization:

  1. In InvokeModelRequest, the response_format field is a class reference, so we provide custom Pydantic serialization and deserialization logic.
  2. In InvokeModelResponse, the response_model must be deserialized back to the correct type, so we serialize the type in one field and the model data, as a dictionary, in another. A round-trip example follows the Activity code.

from temporalio import activity
from openai import AsyncOpenAI
from typing import Optional, List, cast, Any, TypeVar, Generic
from typing_extensions import Annotated
from pydantic import BaseModel
from pydantic.functional_validators import BeforeValidator
from pydantic.functional_serializers import PlainSerializer

import importlib

T = TypeVar("T", bound=BaseModel)


def _coerce_class(v: Any) -> type[Any]:
    """Pydantic validator: convert string path to class during deserialization."""
    if isinstance(v, str):
        mod_path, sep, qual = v.partition(":")
        if not sep:  # support "package.module.Class"
            mod_path, _, qual = v.rpartition(".")
        module = importlib.import_module(mod_path)
        obj = module
        for attr in qual.split("."):
            obj = getattr(obj, attr)
        return cast(type[Any], obj)
    elif isinstance(v, type):
        return v
    else:
        raise ValueError(f"Cannot coerce {v} to class")


def _dump_class(t: type[Any]) -> str:
    """Pydantic serializer: convert class to string path during serialization."""
    return f"{t.__module__}:{t.__qualname__}"


# Custom type that automatically handles class <-> string conversion in Pydantic serialization
ClassReference = Annotated[
    type[T],
    BeforeValidator(_coerce_class),
    PlainSerializer(_dump_class, return_type=str),
]


class InvokeModelRequest(BaseModel, Generic[T]):
    model: str
    instructions: str
    input: str
    response_format: Optional[ClassReference[T]] = None
    tools: Optional[List[dict]] = None


class InvokeModelResponse(BaseModel, Generic[T]):
    # response_format records the type of the response model
    response_format: Optional[ClassReference[T]] = None
    response_model: Any

    @property
    def response(self) -> T:
        """Reconstruct the original response type if response_format was provided."""
        if self.response_format:
            model_cls = self.response_format
            return model_cls.model_validate(self.response_model)
        return self.response_model


@activity.defn
async def invoke_model(request: InvokeModelRequest[T]) -> InvokeModelResponse[T]:
    client = AsyncOpenAI(max_retries=0)

    kwargs = {
        "model": request.model,
        "instructions": request.instructions,
        "input": request.input,
    }

    if request.response_format:
        kwargs["text_format"] = request.response_format

    if request.tools:
        kwargs["tools"] = request.tools

    # Use responses API consistently
    resp = await client.responses.parse(**kwargs)

    if request.response_format:
        # Convert structured response to dict for managed serialization.
        # This allows us to reconstruct the original response type while maintaining type safety.
        parsed_model = cast(BaseModel, resp.output_parsed)
        return InvokeModelResponse(
            response_model=parsed_model.model_dump(),
            response_format=request.response_format,
        )
    else:
        return InvokeModelResponse(
            response_model=resp.output_text, response_format=None
        )

Workflow

We define the Business class as a Pydantic model. We use Pydantic's EmailStr type for the email field. For the phone field, we use a custom validator to ensure the phone number is in E.164 format.

The validators should check only for obvious structural errors that an LLM gets wrong sporadically. If the LLM produces invalid responses consistently, Activity retries will also fail consistently. To limit the cost of such futile retries, we cap the number of retry attempts when using structured outputs.

from pydantic import BaseModel, Field, field_validator, EmailStr
from pydantic_core import PydanticCustomError
import re
from temporalio import workflow
from activities import invoke_model
from activities.invoke_model import InvokeModelRequest
from typing import List, Optional
from datetime import timedelta
from temporalio.common import RetryPolicy

class Business(BaseModel):
    name: Optional[str] = Field(
        None,
        description="The business name",
        json_schema_extra={"example": "Acme Corporation"},
    )
    email: Optional[EmailStr] = Field(
        None,
        description="Primary business email address",
        json_schema_extra={"example": "[email protected]"},
    )
    phone: Optional[str] = Field(
        None,
        description="Primary business phone number in E.164 format",
        json_schema_extra={"example": "+12025550173"},
    )
    address: Optional[str] = Field(
        None,
        description="Business mailing address",
        json_schema_extra={
            "example": "123 Business Park Dr, Suite 100, New York, NY 10001"
        },
    )
    website: Optional[str] = Field(
        None,
        description="Business website URL",
        json_schema_extra={"example": "https://www.acmecorp.com"},
    )
    industry: Optional[str] = Field(
        None,
        description="Business industry or sector",
        json_schema_extra={"example": "Technology"},
    )

    @field_validator("phone", mode="before")
    def validate_phone(cls, v):
        # Allow None values
        if v is None:
            return None

        if isinstance(v, str):
            v = v.strip()
            # Allow empty strings to be converted to None for optional fields
            if not v:
                return None

            # E.164 format: + followed by 1-9, then 9-15 more digits
            e164_pattern = r"^\+[1-9]\d{9,15}$"

            if not re.match(e164_pattern, v):
                raise PydanticCustomError(
                    "phone_format",
                    "Phone number must be in E.164 format (e.g., +12025550173)",
                    {"invalid_phone": v},
                )

        return v

    @field_validator("name", mode="before")
    def validate_name(cls, v):
        # Allow None values
        if v is None:
            return None

        if isinstance(v, str):
            v = v.strip()
            # Convert empty strings to None (this is acceptable)
            if not v:
                return None

        return v


class BusinessList(BaseModel):
    businesses: List[Business]


@workflow.defn
class CleanDataWorkflow:
    @workflow.run
    async def run(self, data: str) -> BusinessList:
        results = await workflow.execute_activity(
            invoke_model.invoke_model,
            InvokeModelRequest(
                model="gpt-4o",
                instructions=f"""Extract and clean business data with these specific rules:

1. BUSINESS NAME: Extract the main business name, normalize capitalization (Title Case for proper nouns)
2. EMAIL:
   - Extract only ONE primary email address
   - If multiple emails, choose the one marked as "primary" or the first valid one
   - Validate format (must have @ and valid domain with .)
   - Set to null if invalid (e.g., "bob@email", "NONE PROVIDED")
3. PHONE:
   - Convert to E.164 format (+1 prefix for US numbers, add if not provided)
   - Convert letters to numbers where appropriate (e.g., "1-800-FLOWERS" → "+18003569377")
   - Set to null if cannot be converted to valid E.164 format
   - Examples: "(555) 123-4567" → "+15551234567", "555 234 5678 ext 349i" → null (invalid), "5551234567" → "+15551234567"
4. ADDRESS:
   - Provide complete, standardized address
   - Set to null if vague/incomplete (e.g., "north end of main st", "unknown", "[PRIVATE]")
5. WEBSITE:
   - Standardize to https:// format
   - Remove "www." prefix, add https:// if missing
   - Set to null if broken/invalid (e.g., "broken-link.com/404", "down for maintenance")
6. INDUSTRY:
   - Use clear, professional industry categories
   - Normalize similar terms (e.g., "fix cars and trucks" → "Automotive Repair")

Return null for any field that cannot be reliably extracted or validated.""",
                input=data,
                response_format=BusinessList,
            ),
            start_to_close_timeout=timedelta(seconds=300),
            retry_policy=RetryPolicy(
                maximum_attempts=3,
            ),
            summary="Clean data",
        )
        return results.response


Running

Start the Temporal Dev Server:

temporal server start-dev

Run the worker:

uv run python -m worker
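
The worker module itself is not shown on this page. A minimal sketch of what it might look like is below; the module layout and task queue name are assumptions, and Temporal's Pydantic data converter (temporalio.contrib.pydantic) is used so the Pydantic request and response models serialize correctly.

# Hypothetical worker module (module layout and task queue name are assumptions).
import asyncio

from temporalio.client import Client
from temporalio.worker import Worker
from temporalio.contrib.pydantic import pydantic_data_converter

from activities import invoke_model
from workflows import CleanDataWorkflow  # assumed module name for the Workflow


async def main():
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )
    worker = Worker(
        client,
        task_queue="structured-outputs-task-queue",  # assumed task queue name
        workflows=[CleanDataWorkflow],
        activities=[invoke_model.invoke_model],
    )
    await worker.run()


if __name__ == "__main__":
    asyncio.run(main())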

Start execution:

uv run python -m start_workflow
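
Similarly, here is a minimal sketch of a starter script, under the same assumptions about module layout, task queue name, and data converter; the workflow ID and raw input string are illustrative placeholders.

# Hypothetical starter script (workflow ID, task queue, and input are assumptions).
import asyncio

from temporalio.client import Client
from temporalio.contrib.pydantic import pydantic_data_converter

from workflows import CleanDataWorkflow  # assumed module name for the Workflow


async def main():
    client = await Client.connect(
        "localhost:7233", data_converter=pydantic_data_converter
    )
    result = await client.execute_workflow(
        CleanDataWorkflow.run,
        "Acme Corp, contact bob@acme.com, phone (555) 123-4567",  # raw input data
        id="clean-data-workflow",
        task_queue="structured-outputs-task-queue",
    )
    print(result)


if __name__ == "__main__":
    asyncio.run(main())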
