Part 8: Wrap the agent in Temporal

The research agent now works as a normal Python process. It can still fail mid-run if the OpenAI API times out, Elasticsearch is briefly unavailable, or the process restarts. Temporal is useful here for the same reason it helped the ingestion pipeline: a long-running workflow should preserve state and retry work instead of starting from scratch.

Pydantic AI includes a Temporal integration. Add Temporal to the agent/ project:

uv add temporalio

Put this code in agent.py. Start with imports:

import uuid

from typing import Any
from pydantic import TypeAdapter
from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import FunctionToolCallEvent, ModelMessage
from pydantic_ai.durable_exec.temporal import (
    TemporalAgent,
    PydanticAIPlugin,
    PydanticAIWorkflow,
    TemporalRunContext
)

Import Temporal and pass through modules that create external clients:

from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

with workflow.unsafe.imports_passed_through():
    from tools import SearchTools, SummarizationTools
    from elasticsearch import Elasticsearch

Keep the NamedCallback, summarization_instructions, create_summarization_agent, and research_instructions from the notebook. Keep them in agent.py so the final file can create the whole agent from one function.

Create a TemporalAgent

First create the normal Pydantic AI agent:

def create_agent() -> Agent:
    es_client = Elasticsearch("http://localhost:9200")
    index_name = "podcasts"

    search_tools = SearchTools(
        es_client=es_client,
        index_name=index_name
    )

    summarization_agent = create_summarization_agent()

Create the summarization tools and the main agent:

    summarization_tools = SummarizationTools(
        search_tools=search_tools,
        summarization_agent=summarization_agent
    )

    agent = Agent(
        name='research_agent',
        instructions=research_instructions,
        model='openai:gpt-4o-mini',
        tools=[search_tools.search_videos, summarization_tools.summarize]
    )

At first it looks like we can return TemporalAgent(agent). There is one more issue: summarize uses RunContext, and that context is not fully serializable in the form Temporal needs. We need a custom run context adapter.

Serialize run context

Add a nested AppRunContext class inside create_agent. It keeps the messages and resets retries:

    class AppRunContext(TemporalRunContext):
        @classmethod
        def serialize_run_context(cls, ctx: RunContext) -> dict:
            return {
                'messages': ctx.messages,
                'retries': {},
            }

Deserialize messages back into Pydantic AI model message objects:

        @classmethod
        def deserialize_run_context(cls, serialized: dict, deps: Any) -> 'AppRunContext':
            serialized.pop('deps', None)

            if 'messages' in serialized:
                serialized_messages = TypeAdapter(list[ModelMessage]).validate_python(
                    serialized['messages']
                )
                serialized['messages'] = serialized_messages

            return cls(**serialized, deps=deps)

Return the Temporal-wrapped agent:

    return TemporalAgent(agent, run_context_type=AppRunContext)

The custom context is what lets the summarization tool read prior messages inside a durable Temporal execution.

Define the workflow

Create a module-level temporal_agent. The worker needs this object later so it can register the generated Pydantic AI activities:

temporal_agent = create_agent()

Define the workflow:

@workflow.defn
class ResearchWorkflow(PydanticAIWorkflow):

    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await temporal_agent.run(prompt)
        return result.output

The workflow body is short because Pydantic AI turns model calls and tool calls into Temporal-compatible operations.

Run the worker and workflow together

Connect to Temporal with the Pydantic AI plugin:

async def run():
    client = await Client.connect(
        'localhost:7233',
        plugins=[PydanticAIPlugin()],
    )

    prompt = "how do I get started with machine learning?"
    activities = temporal_agent.temporal_activities

Start a worker and submit a workflow execution:

    async with Worker(
        client,
        task_queue='research',
        workflows=[ResearchWorkflow],
        activities=activities,
    ):
        output = await client.execute_workflow(
            ResearchWorkflow.run,
            args=(prompt,),
            id=f'research-{uuid.uuid4()}',
            task_queue='research',
        )

    print("FINAL OUTPUT:")
    print(output)

Add the entry point:

if __name__ == "__main__":
    import asyncio
    asyncio.run(run())

Run it:

uv run python agent.py

Open Temporal UI again:

http://localhost:8233

When this runs, the workflow sends a request, invokes tools, sends another request, and invokes another tool. The final output can answer how to get started with machine learning, but it still needs stronger references. That becomes a future improvement: use structured output to force source objects instead of trusting prose instructions alone.

Questions & Answers (0)

Sign in to ask questions