Part 8: Wrap the agent in Temporal
The research agent now works as a normal Python process. It can still fail mid-run if the OpenAI API times out, Elasticsearch is briefly unavailable, or the process restarts. Temporal is useful here for the same reason it helped the ingestion pipeline. A long-running workflow should preserve state and retry work instead of starting from scratch.
Pydantic AI includes a Temporal integration.
Add Temporal to the agent/ project:
uv add temporalio
Put this code in agent.py.
Start with these imports:
import uuid
from typing import Any
from pydantic import TypeAdapter
from pydantic_ai import Agent, RunContext
from pydantic_ai.messages import FunctionToolCallEvent, ModelMessage
from pydantic_ai.durable_exec.temporal import (
TemporalAgent,
PydanticAIPlugin,
PydanticAIWorkflow,
TemporalRunContext
)
Import Temporal and pass through modules that create external clients:
from temporalio import workflow
from temporalio.client import Client
from temporalio.worker import Worker
with workflow.unsafe.imports_passed_through():
from tools import SearchTools, SummarizationTools
from elasticsearch import Elasticsearch
Keep the NamedCallback, summarization_instructions,
create_summarization_agent, and research_instructions from the notebook.
Keep them in agent.py so the final file can create the whole agent from one
function.
Create a TemporalAgent
First create the normal Pydantic AI agent:
def create_agent() -> Agent:
es_client = Elasticsearch("http://localhost:9200")
index_name = "podcasts"
search_tools = SearchTools(
es_client=es_client,
index_name=index_name
)
summarization_agent = create_summarization_agent()
Create the summarization tools and the main agent:
summarization_tools = SummarizationTools(
search_tools=search_tools,
summarization_agent=summarization_agent
)
agent = Agent(
name='research_agent',
instructions=research_instructions,
model='openai:gpt-4o-mini',
tools=[search_tools.search_videos, summarization_tools.summarize]
)
At first it looks like we can return TemporalAgent(agent). There is one more
issue: summarize uses RunContext, and that context is not fully
serializable in the form Temporal needs. We need a custom run context adapter.
Serialize run context
Add a nested AppRunContext class inside create_agent.
It keeps the messages and resets retries:
class AppRunContext(TemporalRunContext):
@classmethod
def serialize_run_context(cls, ctx: RunContext) -> dict:
return {
'messages': ctx.messages,
'retries': {},
}
Deserialize messages back into Pydantic AI model message objects:
@classmethod
def deserialize_run_context(cls, serialized: dict, deps: Any) -> 'AppRunContext':
serialized.pop('deps', None)
if 'messages' in serialized:
serialized_messages = TypeAdapter(list[ModelMessage]).validate_python(
serialized['messages']
)
serialized['messages'] = serialized_messages
return cls(**serialized, deps=deps)
Return the Temporal-wrapped agent:
return TemporalAgent(agent, run_context_type=AppRunContext)
The custom context is what lets the summarization tool read prior messages inside a durable Temporal execution.
Define the workflow
Create a module-level temporal_agent.
The worker needs this object later so it can register the generated Pydantic AI activities.
Create it at module scope:
temporal_agent = create_agent()
Define the Temporal workflow:
@workflow.defn
class ResearchWorkflow(PydanticAIWorkflow):
@workflow.run
async def run(self, prompt: str) -> str:
result = await temporal_agent.run(prompt)
return result.output
The workflow body is short because Pydantic AI turns model calls and tool calls into Temporal-compatible operations.
Run the worker and workflow together
Connect to Temporal with the Pydantic AI plugin:
async def run():
client = await Client.connect(
'localhost:7233',
plugins=[PydanticAIPlugin()],
)
prompt = "how do I get started with machine learning?"
activities = temporal_agent.temporal_activities
Start a worker and submit a workflow execution:
async with Worker(
client,
task_queue='research',
workflows=[ResearchWorkflow],
activities=activities,
):
output = await client.execute_workflow(
ResearchWorkflow.run,
args=(prompt,),
id=f'research-{uuid.uuid4()}',
task_queue='research',
)
print("FINAL OUTPUT:")
print(output)
Add the entry point:
if __name__ == "__main__":
import asyncio
asyncio.run(run())
Run the agent workflow:
uv run python agent.py
Open Temporal UI again:
http://localhost:8233
When this runs, the workflow sends a request and invokes tools. Then it sends another request and invokes another tool. The final output can answer how to get started with machine learning. It still needs stronger references. That becomes a future improvement: use structured output to force source objects instead of trusting text instructions alone.