Async and await
In the previous lesson, we added a topic guardrail in front of the FAQ agent, but our runner code is sequential.
The runner checks the guardrail before it starts the FAQ agent:
async def run(self, question: str) -> str:
    for guardrail in self.input_guardrails:
        decision = await guardrail.check_input(question)
        if decision.fail:
            return f"[INPUT BLOCKED] {decision.reasoning}"
    return await self.agent.run(question)
First we run the guardrail, then we run the FAQ agent.
For some applications, that's okay: the code is simple, and the agent never starts until the guardrail passes.
But this adds latency, because the guardrail makes one model call and the FAQ agent makes another. When we run them one after another, the user waits for both calls.
It gets worse when we add more checks. If several checks run one after another, the user waits for every check before the agent even starts.
We want to run the guardrail and the FAQ agent in parallel:
- Start the guardrail check.
- Start the FAQ agent.
- If the guardrail blocks the request, cancel the agent task.
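We'll use async/await for that. Everything still runs on one thread; the speedup comes from overlapping the time each call spends waiting for the API. First we await one async call, then we start multiple calls at the same time with asyncio.create_task.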
Fake agent
Before we change the real class, we'll use smaller functions to explain the async pattern.
We create two fake async functions:
- fake_agent - for the agent
- fake_guardrail - for the guardrail
First create the fake agent:
import asyncio

async def fake_agent(question: str) -> str:
    print("[agent] start")
    await asyncio.sleep(2)
    print("[agent] done")
    return f"answer for: {question}"
We use asyncio.sleep(2) to fake a slow model call. Later we'll replace
that wait with a real OpenAI API call.
Async and await
Call the fake agent:
call = fake_agent("hello")
call
Because fake_agent is defined with async def, calling it doesn't run the function body. The call returns a coroutine object instead.
A coroutine object stores an async call that we can run later. It keeps the function and the arguments.
The function body isn't running yet, so to run it we await the coroutine:
answer = await call
answer
Now the function body runs, and await waits until it finishes and gives us the return value.
If you create a coroutine and never await it, Python may show this warning:
RuntimeWarning: coroutine 'fake_agent' was never awaited
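To see that calling an async function only builds a coroutine, we can inspect the returned object before awaiting it. In this small sketch, traced_agent is a hypothetical variant of fake_agent that records when its body starts, and close() discards a coroutine we decide not to run, which also avoids the warning above.

```python
import asyncio

events: list[str] = []

async def traced_agent(question: str) -> str:
    events.append("body started")  # recorded only when the body actually runs
    await asyncio.sleep(2)
    return f"answer for: {question}"

call = traced_agent("hello")
is_coroutine = asyncio.iscoroutine(call)
print(is_coroutine)  # True: calling the function returned a coroutine object
print(events)        # []: the body hasn't started

call.close()  # discard the coroutine without running it, so no "never awaited" warning
```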
In practice, we usually create and await the coroutine in one line:
result = await fake_agent("hello")
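Top-level await works here because Jupyter already runs an event loop for the notebook. In a plain Python script, await is only allowed inside async def, so a script usually hands its entry coroutine to asyncio.run, which starts an event loop, runs the coroutine to completion, and closes the loop. A sketch of the same call as a script (main is just an illustrative name):

```python
import asyncio

async def fake_agent(question: str) -> str:
    print("[agent] start")
    await asyncio.sleep(2)  # stands in for a slow model call
    print("[agent] done")
    return f"answer for: {question}"

async def main() -> str:
    # Inside async def, await works the same way as in the notebook
    return await fake_agent("hello")

result = asyncio.run(main())  # starts an event loop, runs main(), closes the loop
print(result)
```

Inside the notebook, keep using await: asyncio.run raises RuntimeError when it's called while an event loop is already running.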
Event loop
When we await, the current async task reaches a waiting point. Python
needs to remember that task, run other ready tasks, and come back when
the result arrives.
Internally, Python uses the event loop, which is the scheduler for async tasks.
It keeps track of all async tasks:
- It maintains a queue of tasks that need to run.
- Each task is a coroutine in one state: ready to run, running, or waiting.
At a high level, it does this:
- Pick a ready task.
- Run it until it finishes or reaches an await where it has to wait.
- If it reaches such an await, mark that task as waiting.
- Pick another ready task from the queue.
- If no task is ready, wait until one becomes ready again (for example, when a sleep ends or an API response arrives).
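To watch the loop switch tasks at each await, here's a small sketch with two hypothetical worker tasks. asyncio.sleep(0) pauses without really waiting, which is enough to hand control back to the event loop; asyncio.create_task (covered later in this lesson) puts each coroutine in the loop's queue. The log shows the two tasks taking turns:

```python
import asyncio

async def worker(name: str, log: list[str], steps: int) -> None:
    for step in range(steps):
        log.append(f"{name} step {step}")
        await asyncio.sleep(0)  # yield to the event loop so another ready task can run

async def main() -> list[str]:
    log: list[str] = []
    task_a = asyncio.create_task(worker("a", log, 2))
    task_b = asyncio.create_task(worker("b", log, 2))
    await task_a
    await task_b
    return log

log = asyncio.run(main())
print(log)  # ['a step 0', 'b step 0', 'a step 1', 'b step 1']
```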
In our fake agent, the wait happens at await asyncio.sleep(2):
async def fake_agent(question: str) -> str:
    print("[agent] start")
    await asyncio.sleep(2)
    print("[agent] done")
    return f"answer for: {question}"
During that sleep, the code isn't doing useful work, so the event loop can run another task instead of sitting idle.
In the real FAQ agent, the waits are the OpenAI API calls: both the guardrail and the agent wait for the API to return a response.
During that wait, the event loop can run something else. For example, while the guardrail waits for its LLM response, the event loop can run the main agent.
But if we used time.sleep(2) instead of asyncio.sleep(2), Python would block the whole thread. The task would never reach an await, so it wouldn't change status to "waiting", and the event loop couldn't pick anything else. Everything would stall until the sleep finishes.
That's why we use the async OpenAI client: while one OpenAI request is waiting, we can run another request.
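A quick timing sketch makes the difference visible. Two tasks each wait 0.2 seconds (scaled down from the lesson's sleeps); async_wait, blocking_wait, and run_two are illustrative names. With asyncio.sleep the waits overlap; with time.sleep the second task can't start until the first one's sleep ends. Exact timings vary by machine:

```python
import asyncio
import time
from collections.abc import Callable, Coroutine
from typing import Any

async def async_wait() -> None:
    await asyncio.sleep(0.2)  # suspends this task; the loop can run other tasks

async def blocking_wait() -> None:
    time.sleep(0.2)  # blocks the whole thread; the loop can't switch tasks

async def run_two(make_coro: Callable[[], Coroutine[Any, Any, None]]) -> float:
    start = time.perf_counter()
    tasks = [asyncio.create_task(make_coro()) for _ in range(2)]
    for task in tasks:
        await task
    return time.perf_counter() - start

async_elapsed = asyncio.run(run_two(async_wait))
blocking_elapsed = asyncio.run(run_two(blocking_wait))
print(f"asyncio.sleep: {async_elapsed:.2f}s")  # roughly 0.2 s: the waits overlap
print(f"time.sleep: {blocking_elapsed:.2f}s")  # roughly 0.4 s: the waits add up
```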
Guardrail
We discussed async/await, so now let's add the second fake routine: the guardrail.
In the real code, this would be guardrail.check_input(question).
We'll use the same GuardrailDecision return type:
async def fake_guardrail(question: str) -> GuardrailDecision:
    print("[guardrail] start")
    await asyncio.sleep(1)
    print("[guardrail] pass")
    return GuardrailDecision(
        reasoning="The question is allowed.",
        fail=False,
    )
It's similar to our fake agent, but faster: we sleep only for 1 second.
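The fake guardrail needs GuardrailDecision from the previous lesson. If that class isn't in scope (for example, in a fresh notebook), a minimal stand-in like this one is enough for the fake examples. It assumes only the reasoning and fail fields used in this lesson; the real class may be defined differently (for example, as a Pydantic model).

```python
from dataclasses import dataclass

# Hypothetical stand-in for the previous lesson's GuardrailDecision,
# with only the two fields this lesson uses
@dataclass
class GuardrailDecision:
    reasoning: str
    fail: bool

decision = GuardrailDecision(reasoning="The question is allowed.", fail=False)
print(decision.fail)  # False
```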
Run sequentially
Wrap this version in a function:
async def run_sequentially(question: str) -> str:
    decision = await fake_guardrail(question)
    if decision.fail:
        return f"[INPUT BLOCKED] {decision.reasoning}"
    answer = await fake_agent(question)
    return answer
Run it:
await run_sequentially("hello")
The code takes about three seconds. The guardrail waits for one second, then the agent waits for two more seconds.
This matches the real runner: one guardrail call and one agent call. The second call still starts only after the first one finishes.
Start both tasks
With await, Python starts the coroutine and waits for the result before
moving to the next line:
decision = await fake_guardrail("hello")
Remember that without await, this only creates a coroutine object and
won't run the function body.
So with only await, we can't run two model calls in parallel. We need a
way to start the guardrail and the agent at the same time.
We use asyncio.create_task for that:
question = "hello"
guardrail_task = asyncio.create_task(fake_guardrail(question))
agent_task = asyncio.create_task(fake_agent(question))
A task is a coroutine scheduled on the event loop. When we create a task, we add it to the queue, so the event loop can pick it up.
After we create both tasks, the flow is:
- The event loop runs the guardrail until it reaches await.
- While the guardrail is waiting, the event loop picks the next task.
- The next task is the agent, which also runs until it reaches await.
- Now both tasks are waiting, so the event loop waits too.
- The guardrail finishes waiting first because it sleeps only for 1 second.
- The event loop resumes the guardrail and gets the decision.
- Later, the agent finishes waiting after 2 seconds.
- The event loop resumes the agent and gets the answer.
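The order of steps above can be checked by recording events instead of printing them. Here's a sketch with scaled-down sleeps (0.1 s and 0.2 s); logged_guardrail and logged_agent are hypothetical copies of the fake functions, and the guardrail returns a plain bool instead of GuardrailDecision so the sketch runs on its own:

```python
import asyncio

log: list[str] = []

async def logged_guardrail(question: str) -> bool:
    log.append("[guardrail] start")
    await asyncio.sleep(0.1)  # scaled down from 1 s
    log.append("[guardrail] pass")
    return True  # True means the question is allowed

async def logged_agent(question: str) -> str:
    log.append("[agent] start")
    await asyncio.sleep(0.2)  # scaled down from 2 s
    log.append("[agent] done")
    return f"answer for: {question}"

async def main() -> tuple[bool, str]:
    guardrail_task = asyncio.create_task(logged_guardrail("hello"))
    agent_task = asyncio.create_task(logged_agent("hello"))
    allowed = await guardrail_task
    answer = await agent_task
    return allowed, answer

allowed, answer = asyncio.run(main())
print(log)
# ['[guardrail] start', '[agent] start', '[guardrail] pass', '[agent] done']
```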
Await task results
Notice the call to create_task:
question = "hello"
guardrail_task = asyncio.create_task(fake_guardrail(question))
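We pass the coroutine itself to create_task; we don't await it first. Awaiting would run the guardrail to completion before create_task is even called, and create_task would then receive a GuardrailDecision instead of a coroutine and raise TypeError: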
# this won't work
guardrail_task = asyncio.create_task(await fake_guardrail("hello"))
create_task schedules the coroutine and returns an asyncio.Task object.
The task isn't the final result yet; it's a wrapper around the coroutine that lets Python track whether the coroutine is still running, finished, failed, or cancelled.
When we create a task, the event loop can pick it up and start working on it as soon as our code reaches an await. Unlike a plain coroutine, a task doesn't need await to start.
We use await later to wait for the task and get its return value.
Here's how it looks:
question = "hello"
guardrail_task = asyncio.create_task(fake_guardrail(question))
agent_task = asyncio.create_task(fake_agent(question))
decision = await guardrail_task
answer = await agent_task
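To see what the Task wrapper tracks, this sketch inspects a task before and after awaiting it, using Task.done() and Task.result(). It also runs the "this won't work" variant: awaiting first returns a plain value, and create_task rejects anything that isn't a coroutine with TypeError. quick_guardrail is a hypothetical stand-in that returns a string.

```python
import asyncio

async def quick_guardrail(question: str) -> str:
    await asyncio.sleep(0.1)  # stands in for the guardrail's model call
    return "allowed"

async def main() -> dict[str, object]:
    info: dict[str, object] = {}
    task = asyncio.create_task(quick_guardrail("hello"))
    info["done_before_await"] = task.done()  # False: scheduled, not finished
    info["awaited"] = await task             # await returns the task's result
    info["done_after_await"] = task.done()   # True: the coroutine finished
    info["result"] = task.result()           # the same value, read from the task

    try:
        # Awaiting first runs the guardrail to completion, so create_task gets a str
        asyncio.create_task(await quick_guardrail("hello"))
    except TypeError:
        info["type_error"] = True
    return info

info = asyncio.run(main())
print(info)
```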
Parallel flow
Now we can put the full parallel runner into a function:
async def run_in_parallel(question: str) -> str:
    guardrail_task = asyncio.create_task(fake_guardrail(question))
    agent_task = asyncio.create_task(fake_agent(question))
    decision = await guardrail_task
    if decision.fail:
        return f"[INPUT BLOCKED] {decision.reasoning}"
    answer = await agent_task
    return answer
Call it:
await run_in_parallel("hello")
The output shows that both tasks start before either one finishes: [guardrail] start, [agent] start, [guardrail] pass, [agent] done.
This code takes about two seconds because the one-second guardrail and the two-second agent overlap.
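To compare the two runners directly, here's a timing sketch with the delays scaled down to 0.1 s (guardrail) and 0.2 s (agent). quick_guardrail, quick_agent, sequential, and parallel are hypothetical names, and the guardrail returns a bool (True means block) instead of GuardrailDecision so the script runs on its own:

```python
import asyncio
import time
from collections.abc import Awaitable, Callable

async def quick_guardrail(question: str) -> bool:
    await asyncio.sleep(0.1)  # stands in for the guardrail's model call
    return False  # False means "don't block"

async def quick_agent(question: str) -> str:
    await asyncio.sleep(0.2)  # stands in for the agent's model call
    return f"answer for: {question}"

async def sequential(question: str) -> str:
    if await quick_guardrail(question):
        return "[INPUT BLOCKED]"
    return await quick_agent(question)  # starts only after the guardrail finishes

async def parallel(question: str) -> str:
    guardrail_task = asyncio.create_task(quick_guardrail(question))
    agent_task = asyncio.create_task(quick_agent(question))  # starts right away
    if await guardrail_task:
        return "[INPUT BLOCKED]"
    return await agent_task

async def timed(runner: Callable[[str], Awaitable[str]]) -> tuple[str, float]:
    start = time.perf_counter()
    answer = await runner("hello")
    return answer, time.perf_counter() - start

seq_answer, seq_elapsed = asyncio.run(timed(sequential))
par_answer, par_elapsed = asyncio.run(timed(parallel))
print(f"sequential: {seq_elapsed:.2f}s")
print(f"parallel:   {par_elapsed:.2f}s")
```

The sequential time comes out close to the sum of the two delays and the parallel time close to the longer one, the same pattern as the three-second and two-second runs above.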
Guardrail blocks
There's still one problem with our code.
If the guardrail blocks the request, run_in_parallel returns the block
message. But the agent task will keep running even though we won't use
its answer.
So we need to cancel the agent task when the guardrail blocks.
Let's create a different fake guardrail:
async def fake_blocking_guardrail(question: str) -> GuardrailDecision:
    print("[guardrail] start")
    await asyncio.sleep(1)
    print("[guardrail] block")
    return GuardrailDecision(
        reasoning="The question is off topic.",
        fail=True,
    )
We'll use the same code, but switch to the blocking guardrail:
async def run_in_parallel_blocking(question: str) -> str:
    guardrail_task = asyncio.create_task(fake_blocking_guardrail(question))
    agent_task = asyncio.create_task(fake_agent(question))
    decision = await guardrail_task
    if decision.fail:
        return f"[INPUT BLOCKED] {decision.reasoning}"
    answer = await agent_task
    return answer
Run it:
await run_in_parallel_blocking("hello")
Even though the guardrail blocks the request, the agent task keeps running in the background and prints [agent] done about a second later.
Task cancellation
To stop the agent task, we use the cancel() method on the task:
guardrail_task = asyncio.create_task(fake_blocking_guardrail("hello"))
agent_task = asyncio.create_task(fake_agent("hello"))
decision = await guardrail_task
if decision.fail:
    agent_task.cancel()
    print("[agent] cancellation requested")
cancel() only requests cancellation; it doesn't stop the task on the spot.
Only one task runs at a time, so when our runner calls cancel(), the agent task is paused at an await (or hasn't started yet, in which case its body never runs). The next time the event loop resumes it, Python raises asyncio.CancelledError at that await, so the task can stop there.
Python never interrupts code in the middle of running: cancellation is delivered only at an await.
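Cancelling doesn't throw the task away. The event loop resumes the task one more time to deliver asyncio.CancelledError, which also gives its try/finally blocks a chance to clean up. After cancelling, we await the task so we know the cancellation has actually finished. Awaiting a cancelled task raises asyncio.CancelledError, so we catch it: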
guardrail_task = asyncio.create_task(fake_blocking_guardrail("hello"))
agent_task = asyncio.create_task(fake_agent("hello"))
decision = await guardrail_task
if decision.fail:
    agent_task.cancel()
    print("[agent] cancellation requested")
    try:
        await agent_task
    except asyncio.CancelledError:
        print("[agent] cancelled")
In our fake agent, the flow is:
- The agent task starts running.
- It reaches await asyncio.sleep(2) and moves to the waiting state.
- The guardrail is triggered, so we call agent_task.cancel().
- The sleep is interrupted.
- The event loop resumes the task to deliver the exception.
- Python raises asyncio.CancelledError at that await.
- The agent task moves to the cancelled state.
- await agent_task raises asyncio.CancelledError in our runner.
- We catch the exception and print [agent] cancelled.
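The cancellation steps above can be traced with a small sketch. cancellable_agent is a hypothetical variant of fake_agent with a try/finally block, and a 0.1 s sleep in main stands in for a guardrail that blocks. The log shows that the finally block still runs when the event loop resumes the task to deliver asyncio.CancelledError, while [agent] done is never reached:

```python
import asyncio

log: list[str] = []

async def cancellable_agent(question: str) -> str:
    log.append("[agent] start")
    try:
        await asyncio.sleep(0.2)    # CancelledError is raised here after cancel()
        log.append("[agent] done")  # never reached when the task is cancelled
        return f"answer for: {question}"
    finally:
        log.append("[agent] cleanup")  # runs when the loop resumes the task one last time

async def main() -> bool:
    agent_task = asyncio.create_task(cancellable_agent("hello"))
    await asyncio.sleep(0.1)  # stands in for a guardrail that blocks after 0.1 s
    agent_task.cancel()
    try:
        await agent_task
    except asyncio.CancelledError:
        log.append("[agent] cancelled")
    return agent_task.cancelled()

was_cancelled = asyncio.run(main())
print(log)  # ['[agent] start', '[agent] cleanup', '[agent] cancelled']
```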
Run guardrail with cancellation
Put this together in a function:
async def run_with_cancellation(question: str) -> str:
    guardrail_task = asyncio.create_task(fake_blocking_guardrail(question))
    agent_task = asyncio.create_task(fake_agent(question))
    decision = await guardrail_task
    if decision.fail:
        agent_task.cancel()
        try:
            await agent_task
        except asyncio.CancelledError:
            print("[agent] cancelled")
        return f"[INPUT BLOCKED] {decision.reasoning}"
    answer = await agent_task
    return answer
Run it:
await run_with_cancellation("hello")
We now have the cancellation behavior we want.
The guardrail decision stops the FAQ agent before it finishes work the user will never see.
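With cancellation in place, a blocked request should come back as soon as the guardrail decides, not when the agent would have finished. Here's a timing sketch with assumed, scaled-down delays (a 0.1 s guardrail and a 1 s agent); blocking_guardrail, slow_agent, and guarded_run are hypothetical names, and the guardrail returns a bool instead of GuardrailDecision:

```python
import asyncio
import time

async def blocking_guardrail(question: str) -> bool:
    await asyncio.sleep(0.1)  # stands in for the guardrail's model call
    return True  # True means "block this question"

async def slow_agent(question: str) -> str:
    await asyncio.sleep(1.0)  # stands in for a slow agent call
    return f"answer for: {question}"

async def guarded_run(question: str) -> tuple[str, bool]:
    guardrail_task = asyncio.create_task(blocking_guardrail(question))
    agent_task = asyncio.create_task(slow_agent(question))
    if await guardrail_task:
        agent_task.cancel()
        try:
            await agent_task
        except asyncio.CancelledError:
            pass
        return "[INPUT BLOCKED]", agent_task.cancelled()
    return await agent_task, agent_task.cancelled()

start = time.perf_counter()
message, agent_cancelled = asyncio.run(guarded_run("hello"))
elapsed = time.perf_counter() - start
print(message, agent_cancelled, f"{elapsed:.2f}s")  # returns after roughly 0.1 s, not 1 s
```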
Update the runner
Use the cancellation logic in the guarded agent class.
First test the concurrent wrapper in the notebook:
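import asyncio

from guardrails import InputGuardrail

# RunnableAgent isn't imported here; it's assumed to be defined in the notebook already

class GuardedAgent(RunnableAgent):
    def __init__(
        self,
        agent: RunnableAgent,
        input_guardrails: list[InputGuardrail] | None = None,
    ):
        self.agent = agent
        self.input_guardrails = input_guardrails or []

    async def run(self, question: str) -> str:
        # Start every input guardrail and the FAQ agent at the same time
        guardrail_tasks = [
            asyncio.create_task(guardrail.check_input(question))
            for guardrail in self.input_guardrails
        ]
        agent_task = asyncio.create_task(self.agent.run(question))

        # Check guardrail decisions in the order they finish
        for task in asyncio.as_completed(guardrail_tasks):
            decision = await task
            if decision.fail:
                # Stop the agent and any guardrails that are still running
                agent_task.cancel()
                for other in guardrail_tasks:
                    other.cancel()  # no-op for guardrails that already finished
                try:
                    await agent_task
                except asyncio.CancelledError:
                    pass
                return f"[INPUT BLOCKED] {decision.reasoning}"

        # Every guardrail passed, so return the agent's answer
        return await agent_task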
Create the concurrent guarded agent:
# Uses the concurrent GuardedAgent defined in the cell above;
# guardrails.py still contains the sequential version at this point
guarded_agent = GuardedAgent(
    agent=agent,
    input_guardrails=[topic_guardrail],
)
The guarded agent starts the FAQ agent and topic guardrail together. If the guardrail passes, it returns the agent answer. If the guardrail blocks, it cancels the agent task and returns the block message.
After this works, replace the sequential GuardedAgent in
guardrails.py with this concurrent version and add asyncio to that
file's imports. Then import the class back into the notebook. Autoreload
picks up later edits to the file.
Exercise
Run the concurrent guarded runner with two questions:
- How do I set up Docker for the course?
- Can you recommend a pizza recipe?
For the off-topic question, confirm that the runner returns the block message.
Next we add output guardrails. Those checks run after the FAQ agent answers, before the user sees the response.