Async and await

In the previous lesson, we added a topic guardrail in front of the FAQ agent, but our runner code is sequential.

The runner checks the guardrail before it starts the FAQ agent:

async def run(self, question: str) -> str:
    for guardrail in self.input_guardrails:
        decision = await guardrail.check_input(question)

        if decision.fail:
            return f"[INPUT BLOCKED] {decision.reasoning}"

    return await self.agent.run(question)

First we run the guardrail, then we run the FAQ agent.

For some applications, that's okay: the code is simple, and the agent never starts until the guardrail passes.

But this adds latency, because the guardrail makes one model call and the FAQ agent makes another. When we run them one after another, the user waits for both calls.

It gets worse when we add more checks. If several checks run one after another, the user waits for every check before the agent even starts.

We want to run the guardrail and the FAQ agent in parallel:

  • Start the guardrail check.
  • Start the FAQ agent.
  • If the guardrail blocks the request, cancel the agent task.

We'll use async/await for that. First we await one async call, then we start multiple calls at the same time with asyncio.create_task.

Fake agent

Before we change the real class, we'll use smaller functions to explain the async pattern.

We create two fake async functions:

  • fake_agent - for the agent
  • fake_guardrail - for the guardrail

First create the fake agent:

import asyncio

async def fake_agent(question: str) -> str:
    print("[agent] start")
    await asyncio.sleep(2)
    print("[agent] done")
    return f"answer for: {question}"

We use asyncio.sleep(2) to fake a slow model call. Later we'll replace that wait with a real OpenAI API call.

Async and await

Call the fake agent:

call = fake_agent("hello")
call

Because we added the keyword async to the function definition, the function returns a coroutine object.

A coroutine object stores an async call that we can run later. It keeps the function and the arguments.

The function body isn't running yet, so to run it we await the coroutine:

answer = await call
answer

The code runs, and Python waits until it finishes and gives us the result.

If you create a coroutine and never await it, Python may show this warning:

RuntimeWarning: coroutine 'fake_agent' was never awaited

In practice, we usually create and await the coroutine in one line:

result = await fake_agent("hello")

Event loop

When we await, the current async task reaches a waiting point. Python needs to remember that task, run other ready tasks, and come back when the result arrives.

Internally, Python uses the event loop, which is the scheduler for async tasks.

It keeps track of all async tasks:

  • It maintains a queue of tasks that need to run.
  • Each task is a coroutine in one state: ready to run, running, or waiting.

At a high level, it does this:

  1. Pick a ready task.
  2. Run it until it finishes or reaches await.
  3. If it reaches await, mark that task as waiting.
  4. Pick another task from the queue.
  5. If there are no tasks, wait until there's work in the queue.

In our fake agent, the wait happens at await asyncio.sleep(2):

async def fake_agent(question: str) -> str:
    print("[agent] start")
    await asyncio.sleep(2)
    print("[agent] done")
    return f"answer for: {question}"

During that sleep, the code isn't doing useful work, so the event loop can run another task instead of sitting idle.

In the real FAQ agent, the wait is the OpenAI API call in both the guardrail function and the agent. Both wait for the API to return a response.

During that wait, the event loop can run something else. For example, if we're already running the guardrail call to the LLM, it can run the main agent.

But if we used time.sleep(2) instead of asyncio.sleep, Python would block the thread. The task wouldn't change status to "waiting", so the event loop can't pick anything else. It has to wait until the sleep finishes.

That's why we use the async OpenAI client: while one OpenAI request is waiting, we can run another request.

Guardrail

We discussed async/await, so now let's add the second fake routine: the guardrail.

In the real code, this would be guardrail.check_input(question).

We'll use the same GuardrailDecision return type:

async def fake_guardrail(question: str) -> GuardrailDecision:
    print("[guardrail] start")
    await asyncio.sleep(1)
    print("[guardrail] pass")

    return GuardrailDecision(
        reasoning="The question is allowed.",
        fail=False,
    )

It's similar to our fake agent, but faster: we sleep only for 1 second.

Run sequentially

Wrap this version in a function:

async def run_sequentially(question: str) -> str:
    decision = await fake_guardrail(question)

    if decision.fail:
        return f"[INPUT BLOCKED] {decision.reasoning}"

    answer = await fake_agent(question)
    return answer

Run it:

await run_sequentially("hello")

The code takes about three seconds. The guardrail waits for one second, then the agent waits for two more seconds.

This matches the real runner: one guardrail call and one agent call. The second call still starts only after the first one finishes.

Start both tasks

With await, Python starts the coroutine and waits for the result before moving to the next line:

decision = await fake_guardrail("hello")

Remember that without await, this only creates a coroutine object and won't run the function body.

So with only await, we can't run two model calls in parallel. We need a way to start the guardrail and the agent at the same time.

We use asyncio.create_task for that:

question = "hello"

guardrail_task = asyncio.create_task(fake_guardrail(question))
agent_task = asyncio.create_task(fake_agent(question))

A task is a coroutine scheduled on the event loop. When we create a task, we add it to the queue, so the event loop can pick it up.

After we create both tasks, the flow is:

  • The event loop runs the guardrail until it reaches await.
  • While the guardrail is waiting, the event loop picks the next task.
  • The next task is the agent, which also runs until it reaches await.
  • Now both tasks are waiting, so the event loop waits too.
  • The guardrail finishes waiting first because it sleeps only for 1 second.
  • The event loop resumes the guardrail and gets the decision.
  • Later, the agent finishes waiting after 2 seconds.
  • The event loop resumes the agent and gets the answer.

Await task results

Notice the call to create_task:

question = "hello"
guardrail_task = asyncio.create_task(fake_guardrail(question))

We pass the coroutine to create_task, we don't await:

# this won't work
guardrail_task = asyncio.create_task(await fake_guardrail("hello"))

create_task schedules the coroutine and returns an asyncio.Task object.

It's not the final result yet, it's a wrapper around a coroutine. It allows Python to know if the coroutine is running, finished, failed, or cancelled.

When we create a task, the event loop can pick it up and start working on it. We don't need to explicitly start it with await like with coroutines.

We use await later to wait for the task and get its return value.

That's how we do it:

question = "hello"

guardrail_task = asyncio.create_task(fake_guardrail(question))
agent_task = asyncio.create_task(fake_agent(question))

decision = await guardrail_task
answer = await agent_task

Parallel flow

Now we can put the full parallel runner into a function:

async def run_in_parallel(question: str) -> str:
    guardrail_task = asyncio.create_task(fake_guardrail(question))
    agent_task = asyncio.create_task(fake_agent(question))

    decision = await guardrail_task

    if decision.fail:
        return f"[INPUT BLOCKED] {decision.reasoning}"

    answer = await agent_task
    return answer

Call it:

await run_in_parallel("hello")

The full sequence looks like this:

sequenceDiagram participant Notebook participant EventLoop as Event loop participant Guardrail as guardrail task participant Agent as agent task Notebook->>EventLoop: create guardrail task Notebook->>EventLoop: create agent task Notebook->>EventLoop: await guardrail task EventLoop->>Guardrail: run until await Guardrail-->>EventLoop: waiting for 1 second EventLoop->>Agent: run until await Agent-->>EventLoop: waiting for 2 seconds Guardrail-->>EventLoop: ready EventLoop->>Guardrail: resume EventLoop-->>Notebook: guardrail result Agent-->>EventLoop: ready EventLoop->>Agent: resume EventLoop-->>Notebook: agent result

This code takes about two seconds because the one-second guardrail and the two-second agent overlap.

Guardrail blocks

There's still one problem with our code.

If the guardrail blocks the request, run_in_parallel returns the block message. But the agent task will keep running even though we won't use its answer.

So we need to cancel the agent task when the guardrail blocks.

Let's create a different fake guardrail:

async def fake_blocking_guardrail(question: str) -> GuardrailDecision:
    print("[guardrail] start")
    await asyncio.sleep(1)
    print("[guardrail] block")

    return GuardrailDecision(
        reasoning="The question is off topic.",
        fail=True,
    )

We'll use the same code, but switch to the blocking guardrail:

async def run_in_parallel_blocking(question: str) -> str:
    guardrail_task = asyncio.create_task(fake_blocking_guardrail(question))
    agent_task = asyncio.create_task(fake_agent(question))

    decision = await guardrail_task

    if decision.fail:
        return f"[INPUT BLOCKED] {decision.reasoning}"

    answer = await agent_task
    return answer

Run it:

await run_in_parallel_blocking("hello")

Even though the guardrail blocks the request, the agent task still continues and prints [agent] done.

Task cancellation

To stop the agent task, we use the cancel() method on the task:

guardrail_task = asyncio.create_task(fake_blocking_guardrail("hello"))
agent_task = asyncio.create_task(fake_agent("hello"))

decision = await guardrail_task

if decision.fail:
    agent_task.cancel()
    print("[agent] cancellation requested")

cancel() requests cancellation from the event loop.

If the task is already waiting at an await, Python raises asyncio.CancelledError, so the task can stop right away.

If the task is running, cancellation can't interrupt it. Python can raise asyncio.CancelledError only when the task reaches the next await.

When the waiting task is interrupted, Python doesn't throw it away. The event loop resumes the task only to deliver asyncio.CancelledError.

That's why, after cancelling, we still have to await on it and process the exception:

guardrail_task = asyncio.create_task(fake_blocking_guardrail("hello"))
agent_task = asyncio.create_task(fake_agent("hello"))

decision = await guardrail_task

if decision.fail:
    agent_task.cancel()
    print("[agent] cancellation requested")

    try:
        await agent_task
    except asyncio.CancelledError:
        print("[agent] cancelled")

In our fake agent, the flow is:

  1. The agent task starts running.
  2. It reaches await asyncio.sleep(2) and moves to the waiting state.
  3. The guardrail is triggered, so we call agent_task.cancel().
  4. The sleep is interrupted.
  5. The event loop resumes the task to deliver the exception.
  6. Python raises asyncio.CancelledError at that await.
  7. The agent task moves to the cancelled state.
  8. await agent_task raises asyncio.CancelledError in our runner.
  9. We catch the exception and print [agent] cancelled.

Run guardrail with cancellation

Put this together in a function:

async def run_with_cancellation(question: str) -> str:
    guardrail_task = asyncio.create_task(fake_blocking_guardrail(question))
    agent_task = asyncio.create_task(fake_agent(question))

    decision = await guardrail_task

    if decision.fail:
        agent_task.cancel()

        try:
            await agent_task
        except asyncio.CancelledError:
            print("[agent] cancelled")

        return f"[INPUT BLOCKED] {decision.reasoning}"

    answer = await agent_task
    return answer

Run it:

await run_with_cancellation("hello")

We now have the cancellation behavior we want.

The guardrail decision stops the FAQ agent before it finishes work the user will never see.

Update the runner

Use the cancellation logic in the guarded agent class.

First test the concurrent wrapper in the notebook:

import asyncio
from guardrails import InputGuardrail

class GuardedAgent(RunnableAgent):
    def __init__(
        self,
        agent: RunnableAgent,
        input_guardrails: list[InputGuardrail] | None = None,
    ):
        self.agent = agent
        self.input_guardrails = input_guardrails or []

    async def run(self, question: str) -> str:
        guardrail_tasks = [
            asyncio.create_task(guardrail.check_input(question))
            for guardrail in self.input_guardrails
        ]
        agent_task = asyncio.create_task(self.agent.run(question))

        for task in asyncio.as_completed(guardrail_tasks):
            decision = await task

            if decision.fail:
                agent_task.cancel()

                try:
                    await agent_task
                except asyncio.CancelledError:
                    pass

                return f"[INPUT BLOCKED] {decision.reasoning}"

        return await agent_task

Create the concurrent guarded agent:

from guardrails import GuardedAgent

guarded_agent = GuardedAgent(
    agent=agent,
    input_guardrails=[topic_guardrail],
)

The guarded agent starts the FAQ agent and topic guardrail together. If the guardrail passes, it returns the agent answer. If the guardrail blocks, it cancels the agent task and returns the block message.

After this works, replace the sequential GuardedAgent in guardrails.py with this concurrent version and add asyncio to that file's imports. Then import the class back into the notebook. Autoreload picks up later edits to the file.

Exercise

Run the concurrent guarded runner with two questions:

  • How do I set up Docker for the course?
  • Can you recommend a pizza recipe?

For the off-topic question, confirm that the runner returns the block message.

Next we add output guardrails. Those checks run after the FAQ agent answers, before the user sees the response.

Questions & Answers

Sign up to ask questions, track your progress, and get access to other workshops · Already have an account? Sign in