Part 8: Wrap the agent in Temporal
The research agent now works as a normal Python process. It can still fail mid-run if the OpenAI API times out, Elasticsearch is briefly unavailable, or the process restarts. Temporal is useful here for the same reason it helped the ingestion pipeline: a long-running workflow should preserve state and retry work instead of starting from scratch.
Pydantic AI includes a Temporal integration. Add Temporal to the agent/ project:
uv add temporalio
Put this code in agent.py. Start with imports:
import uuid
from typing import Any

from pydantic import TypeAdapter
from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import FunctionToolCallEvent, ModelMessage
from pydantic_ai.durable_exec.temporal import (
    TemporalAgent,
    PydanticAIPlugin,
    PydanticAIWorkflow,
    TemporalRunContext,
)
Import Temporal and pass through modules that create external clients:
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker

with workflow.unsafe.imports_passed_through():
    from tools import SearchTools, SummarizationTools
    from elasticsearch import Elasticsearch
Keep the NamedCallback, summarization_instructions, create_summarization_agent, and research_instructions definitions from the notebook in agent.py, so the final file can create the whole agent from one function.
Create a TemporalAgent
First create the normal Pydantic AI agent:
def create_agent() -> Agent:
    es_client = Elasticsearch("http://localhost:9200")
    index_name = "podcasts"

    search_tools = SearchTools(
        es_client=es_client,
        index_name=index_name,
    )

    summarization_agent = create_summarization_agent()
Create the summarization tools and the main agent:
    summarization_tools = SummarizationTools(
        search_tools=search_tools,
        summarization_agent=summarization_agent,
    )

    agent = Agent(
        name='research_agent',
        instructions=research_instructions,
        model='openai:gpt-4o-mini',
        tools=[search_tools.search_videos, summarization_tools.summarize],
    )
At first it looks like we can simply return TemporalAgent(agent). There is one more issue: summarize uses RunContext, and that context is not fully serializable in the form Temporal needs, so we need a custom run context adapter.
Serialize run context
Add a nested AppRunContext class inside create_agent. It keeps the
messages and resets retries:
    class AppRunContext(TemporalRunContext):
        @classmethod
        def serialize_run_context(cls, ctx: RunContext) -> dict:
            return {
                'messages': ctx.messages,
                'retries': {},
            }
Deserialize messages back into Pydantic AI model message objects:
        @classmethod
        def deserialize_run_context(cls, serialized: dict, deps: Any) -> 'AppRunContext':
            serialized.pop('deps', None)
            if 'messages' in serialized:
                serialized_messages = TypeAdapter(list[ModelMessage]).validate_python(
                    serialized['messages']
                )
                serialized['messages'] = serialized_messages
            return cls(**serialized, deps=deps)
Return the Temporal-wrapped agent:
    return TemporalAgent(agent, run_context_type=AppRunContext)
The custom context is what lets the summarization tool read prior messages inside a durable Temporal execution.
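The round trip relies on pydantic's TypeAdapter: Temporal stores activity inputs as JSON, so typed message objects arrive back as plain dicts and must be rebuilt. A minimal sketch of the same pattern, using a hypothetical stand-in ChatMessage type rather than Pydantic AI's actual ModelMessage union:

```python
from pydantic import BaseModel, TypeAdapter


class ChatMessage(BaseModel):
    # Stand-in for ModelMessage, just to show the mechanism.
    role: str
    content: str


adapter = TypeAdapter(list[ChatMessage])

# Serialize: Temporal's JSON converter only ever sees plain dicts.
history = [ChatMessage(role='user', content='find ML episodes')]
payload = adapter.dump_python(history, mode='json')

# Deserialize: rebuild typed objects from the stored dicts, which is
# what deserialize_run_context does for ctx.messages.
restored = adapter.validate_python(payload)
assert restored == history
```

The same adapter handles both directions, so whatever shape serialize_run_context writes, deserialize_run_context can validate back into model objects.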
Define the workflow
Create a module-level temporal_agent. The worker needs this object later so
it can register the generated Pydantic AI activities:
temporal_agent = create_agent()
Define the workflow:
@workflow.defn
class ResearchWorkflow(PydanticAIWorkflow):
    @workflow.run
    async def run(self, prompt: str) -> str:
        result = await temporal_agent.run(prompt)
        return result.output
The workflow body is short because Pydantic AI turns model calls and tool calls into Temporal-compatible operations.
Run the worker and workflow together
Connect to Temporal with the Pydantic AI plugin:
async def run():
    client = await Client.connect(
        'localhost:7233',
        plugins=[PydanticAIPlugin()],
    )

    prompt = "how do I get started with machine learning?"
    activities = temporal_agent.temporal_activities
Start a worker and submit a workflow execution:
    async with Worker(
        client,
        task_queue='research',
        workflows=[ResearchWorkflow],
        activities=activities,
    ):
        output = await client.execute_workflow(
            ResearchWorkflow.run,
            args=(prompt,),
            id=f'research-{uuid.uuid4()}',
            task_queue='research',
        )

    print("FINAL OUTPUT:")
    print(output)
Add the entry point:
if __name__ == "__main__":
    import asyncio

    asyncio.run(run())
Run it:
uv run python agent.py
Open Temporal UI again:
http://localhost:8233
When this runs, the workflow sends a model request, invokes tools, sends another request, and invokes another tool. The final output can answer how to get started with machine learning, but it still lacks solid references. That is a future improvement: use structured output to force the model to return source objects instead of relying on prose instructions alone.
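One way to approach that improvement is to define a Pydantic model with a required, non-empty list of sources and pass it as the agent's output type. The sketch below uses hypothetical model names (ResearchAnswer, SourceRef) and exercises only the validation side, since a real run needs the OpenAI API:

```python
from pydantic import BaseModel, Field


class SourceRef(BaseModel):
    # Hypothetical source object the model would be forced to fill in.
    title: str
    video_id: str


class ResearchAnswer(BaseModel):
    # Hypothetical structured output: prose answer plus at least one source.
    answer: str
    sources: list[SourceRef] = Field(min_length=1)


# With Pydantic AI this schema would be passed to the agent, roughly:
#   Agent(..., output_type=ResearchAnswer)
# so an answer without sources fails validation instead of slipping through.

parsed = ResearchAnswer.model_validate({
    'answer': 'Start with an introductory course and a small project.',
    'sources': [{'title': 'Getting started with ML', 'video_id': 'abc123'}],
})
```

Because validation is part of the schema, a response with an empty sources list raises a ValidationError rather than producing unreferenced prose.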