Stream with output guardrails

The previous lesson gave us a fully guarded agent:

  • Input guardrails check the learner's question.
  • The FAQ agent answers allowed questions.
  • Output guardrails check the answer before the learner sees it.

That works when run(...) returns one complete string.

Streaming changes that interface, since the agent may produce the answer one chunk at a time. The output guardrail can only check it once there's enough text to review.

If we stream every chunk immediately, the learner may see unsafe text before the output guardrail has had a chance to block it.

This matters more for longer answers. Users now expect chat products to start showing text while the model is still writing. If we wait for the whole answer and then run output checks, the interface feels slower.

In this part, we keep the same guardrail design and compare two streaming patterns:

  • Use a final LLM review and overwrite the displayed answer when it fails.
  • Use lightweight checks that can block a chunk before it's shown.

Streaming agent interface

So far, our wrappers use this interface:

class RunnableAgent(Protocol):
    async def run(self, question: str) -> str:
        ...

For streaming, we need a different interface. Instead of returning one string, the agent returns an async iterator of text chunks.

Define a streaming interface:

from collections.abc import AsyncIterator
from typing import Protocol

class StreamingAgent(Protocol):
    def stream(self, question: str) -> AsyncIterator[str]:
        ...

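Instead of the final answer, the stream(...) method returns an async iterator that we can loop over with async for. The protocol declares it with a plain def rather than async def because calling an async generator method returns the iterator directly, with nothing to await.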

For example:

async for chunk in streaming_agent.stream(question):
    print(chunk, end="")

Each chunk is part of the answer.

Fake streaming agent

Before adding guardrails, use a small fake streaming agent so the control flow is visible.

import asyncio

class FakeStreamingAgent:
    async def stream(self, question: str) -> AsyncIterator[str]:
        chunks = [
            "The FAQ says ",
            "you should ask course staff ",
            "about deadline extensions.",
        ]

        for chunk in chunks:
            await asyncio.sleep(0.2)
            yield chunk

Create it:

streaming_agent = FakeStreamingAgent()

Run it:

async for chunk in streaming_agent.stream(
    "Can I get a deadline extension?"
):
    print(chunk, end="")

The notebook prints each chunk as soon as the agent yields it.
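The top-level async for above works because Jupyter and IPython allow await at the top level of a cell. In a plain Python script, the same loop has to live inside a coroutine that is started with asyncio.run(...). The sketch below repeats the fake agent so it runs on its own; the main() name and the shorter sleep are choices made for this example, not part of the lesson.

```python
import asyncio
from collections.abc import AsyncIterator


class FakeStreamingAgent:
    """Same fake agent as in the lesson, repeated so this sketch is self-contained."""

    async def stream(self, question: str) -> AsyncIterator[str]:
        chunks = [
            "The FAQ says ",
            "you should ask course staff ",
            "about deadline extensions.",
        ]
        for chunk in chunks:
            await asyncio.sleep(0.01)  # shorter delay than the lesson's 0.2 seconds
            yield chunk


async def main() -> str:
    agent = FakeStreamingAgent()
    shown = []
    async for chunk in agent.stream("Can I get a deadline extension?"):
        # flush=True so each chunk appears immediately instead of being buffered
        print(chunk, end="", flush=True)
        shown.append(chunk)
    print()
    # Joining the chunks gives the same string a run(...) call would return.
    return "".join(shown)


if __name__ == "__main__":
    asyncio.run(main())
```

Returning the joined chunks from main() is only there to make the relationship between the two interfaces explicit: a stream is the complete answer delivered in pieces.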

That's the user experience we want for allowed answers, and it's also the reason output guardrails need extra care.

Stream first, review after

We can keep the streaming experience and treat the LLM guardrail as a final review step.

Use this flow:

  1. Stream the answer into the UI.
  2. Check the complete answer with the LLM guardrail.
  3. If the guardrail fails, replace the displayed answer with a violation message.

In a notebook, the overwrite is a display behavior. The wrapper streams text, and the renderer decides what the learner sees.

Create a renderer that can update the same notebook output:

from guardrails import OutputGuardrail
from IPython.display import Markdown, display

async def render_stream_with_final_review(
    streaming_agent: StreamingAgent,
    guardrails: list[OutputGuardrail],
    question: str,
):
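    shown_chunks = []
    # display_id=True returns a handle that can update this output in place.
    handle = display(Markdown(""), display_id=True)

    # Step 1: show each chunk as soon as it arrives.
    async for chunk in streaming_agent.stream(question):
        shown_chunks.append(chunk)
        handle.update(Markdown("".join(shown_chunks)))

    # Step 2: review the complete answer with every guardrail.
    answer = "".join(shown_chunks)
    for guardrail in guardrails:
        decision = await guardrail.check_output(answer)

        # Step 3: on the first failure, overwrite what the learner saw.
        if decision.fail:
            handle.update(Markdown(
                "**[OUTPUT BLOCKED]** Detected a violation: "
                f"{decision.reasoning}"
            ))
            return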

Run it with the output guardrails from the previous lesson:

await render_stream_with_final_review(
    streaming_agent=streaming_agent,
    guardrails=output_guardrails,
    question="Can I get a deadline extension?",
)

This gives the best streaming feel, but it's a recovery pattern: the unsafe text can appear briefly before the final review replaces it.

Lightweight streaming guardrail

For streaming, the fastest guardrails are simple checks that can run on partial text without an LLM call.

Start with a keyword guardrail:

from guardrails import GuardrailDecision, OutputGuardrail

class KeywordOutputGuardrail(OutputGuardrail):
    def __init__(self, blocked_phrases: list[str]):
        self.blocked_phrases = [
            phrase.lower()
            for phrase in blocked_phrases
        ]

    async def check_output(self, text: str) -> GuardrailDecision:
        normalized = text.lower()

        for phrase in self.blocked_phrases:
            if phrase in normalized:
                return GuardrailDecision(
                    reasoning=f"Found blocked phrase: {phrase}",
                    fail=True,
                )

        return GuardrailDecision(
            reasoning="No blocked phrase found.",
            fail=False,
        )

Now check the accumulated text, including the new chunk, before showing that chunk:

class LightweightStreamingOutputWrapper:
    def __init__(self, agent: StreamingAgent, guardrail: OutputGuardrail):
        self.agent = agent
        self.guardrail = guardrail

    async def stream(self, question: str) -> AsyncIterator[str]:
        seen = ""

        async for chunk in self.agent.stream(question):
            candidate = seen + chunk
            decision = await self.guardrail.check_output(candidate)

            if decision.fail:
                yield "[OUTPUT BLOCKED] I cannot provide that answer."
                return

            seen = candidate
            yield chunk

Create the keyword guardrail:

keyword_guardrail = KeywordOutputGuardrail([
    "grant you",
    "changed your grade",
    "moved your project deadline",
])

The same pattern works for product rules that aren't about course policy. If your assistant shouldn't recommend competitor platforms, add those names to the blocked phrase list. The stream stops when one of them appears.

Then use it with the streaming wrapper:

streaming_output_agent = LightweightStreamingOutputWrapper(
    agent=streaming_agent,
    guardrail=keyword_guardrail,
)

async for chunk in streaming_output_agent.stream(
    "Can I get a deadline extension?"
):
    print(chunk, end="")

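This keeps real streaming for safe chunks and blocks as soon as a blocked phrase appears. It's fast and cheap, and it's easy to see why a block fired. Because the check runs on the accumulated text, it also catches a phrase that is split across chunks, although by then the chunks holding the first part of the phrase have already been shown.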

The tradeoff is coverage: a keyword guardrail catches known patterns, but it doesn't understand every unsafe answer the way an LLM classifier can.

For more flexible streaming checks, use the same idea with a sliding window. Classify the last 20 or 30 words of partial text and stop the stream on a violation. The user still gets a streaming interface, and the check can catch patterns that don't fit one exact keyword.
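The sliding-window idea can be sketched as a variation of the lightweight wrapper. This is a minimal sketch under assumptions: GuardrailDecision here is a stand-in dataclass with the two fields the lesson uses, and SlidingWindowStreamingWrapper, WindowClassifier, toy_classifier, and ToyAgent are hypothetical names for this example. The toy classifier is a placeholder for whatever LLM or model call you would use in practice.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable
from dataclasses import dataclass


@dataclass
class GuardrailDecision:
    # Stand-in for the course's GuardrailDecision; fields assumed from the lesson.
    reasoning: str
    fail: bool


# Hypothetical classifier signature: window text in, decision out.
WindowClassifier = Callable[[str], Awaitable[GuardrailDecision]]


class SlidingWindowStreamingWrapper:
    def __init__(self, agent, classify: WindowClassifier, window_words: int = 25):
        self.agent = agent
        self.classify = classify
        self.window_words = window_words

    async def stream(self, question: str) -> AsyncIterator[str]:
        seen = ""
        async for chunk in self.agent.stream(question):
            candidate = seen + chunk
            # Only the most recent words go to the classifier.
            window = " ".join(candidate.split()[-self.window_words:])
            decision = await self.classify(window)
            if decision.fail:
                yield "[OUTPUT BLOCKED] I cannot provide that answer."
                return
            seen = candidate
            yield chunk


async def toy_classifier(window: str) -> GuardrailDecision:
    # Placeholder for an LLM call: flags a window that promises an extension.
    lowered = window.lower()
    promised = "granted" in lowered and "extension" in lowered
    return GuardrailDecision(
        reasoning="Promises an extension." if promised else "Looks fine.",
        fail=promised,
    )


class ToyAgent:
    async def stream(self, question: str) -> AsyncIterator[str]:
        for chunk in ["I have ", "granted ", "your extension ", "today."]:
            yield chunk


async def demo() -> list[str]:
    wrapper = SlidingWindowStreamingWrapper(ToyAgent(), toy_classifier)
    return [chunk async for chunk in wrapper.stream("Can I get an extension?")]
```

Note that this sketch calls the classifier once per chunk. With a real LLM classifier that adds latency and cost to every chunk, so you would likely check less often, for example every few chunks.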

The user experience isn't perfect. The learner may see part of the answer before the wrapper blocks. You may have seen this pattern in chat products. The answer starts streaming, then the UI replaces it with a message like "I can't help with that request." Usually, that means an output check fired while the answer was being generated.
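For keyword checks specifically, one possible mitigation is to hold back the last few characters of the stream until the next chunk arrives. This is not part of the lesson's code; it's a sketch of a common buffering technique, and HoldBackKeywordWrapper, ScriptedAgent, collect, and BLOCK_MESSAGE are hypothetical names. Holding back one character fewer than the longest blocked phrase means that no part of a blocked phrase is shown before the phrase is detected, at the cost of a small display delay.

```python
import asyncio
from collections.abc import AsyncIterator

BLOCK_MESSAGE = "[OUTPUT BLOCKED] I cannot provide that answer."


class HoldBackKeywordWrapper:
    def __init__(self, agent, blocked_phrases: list[str]):
        self.agent = agent
        self.blocked_phrases = [p.lower() for p in blocked_phrases]
        # Keep this many trailing characters unshown until more text arrives.
        # Assumes lower() does not change string length (true for ASCII text).
        longest = max((len(p) for p in self.blocked_phrases), default=1)
        self.hold_back = longest - 1

    async def stream(self, question: str) -> AsyncIterator[str]:
        text = ""    # everything received so far
        emitted = 0  # number of characters already shown
        async for chunk in self.agent.stream(question):
            text += chunk
            lowered = text.lower()
            if any(phrase in lowered for phrase in self.blocked_phrases):
                yield BLOCK_MESSAGE
                return
            # Show everything except the held-back tail.
            safe_end = max(emitted, len(text) - self.hold_back)
            if safe_end > emitted:
                yield text[emitted:safe_end]
                emitted = safe_end
        # The stream ended without a match, so the tail is safe to show.
        if emitted < len(text):
            yield text[emitted:]


class ScriptedAgent:
    """Test double that replays a fixed list of chunks."""

    def __init__(self, chunks: list[str]):
        self.chunks = chunks

    async def stream(self, question: str) -> AsyncIterator[str]:
        for chunk in self.chunks:
            yield chunk


async def collect(chunks: list[str], phrases: list[str]) -> list[str]:
    wrapper = HoldBackKeywordWrapper(ScriptedAgent(chunks), phrases)
    return [piece async for piece in wrapper.stream("question")]
```

The pieces this wrapper yields no longer line up with the agent's chunks, and text can stop mid-word when a block fires, so a real UI would probably also clear what it had already displayed.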

Use the output guardrail list

For semantic checks, use the final-review renderer with the output guardrail list from the previous lesson:

await render_stream_with_final_review(
    streaming_agent=streaming_agent,
    guardrails=output_guardrails,
    question="I'm running late on my project. Can I get a deadline extension?",
)

In the previous lesson, output_guardrails was a list containing the safety and grounding checks:

output_guardrails = [
    safety_guardrail,
    grounding_guardrail,
]

The renderer doesn't need to know how many checks are inside the output guardrail list. It loops over the list, calls check_output(answer) on each guardrail, and replaces the displayed answer at the first decision that fails.
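The loop inside the renderer can also be pulled out into a small helper, which makes that behavior easy to test without a notebook. This is a sketch under assumptions: GuardrailDecision and OutputGuardrail are stand-ins shaped like the lesson's classes, and first_failure and FixedGuardrail are hypothetical names.

```python
import asyncio
from dataclasses import dataclass
from typing import Optional, Protocol


@dataclass
class GuardrailDecision:
    # Stand-in for the course's GuardrailDecision; fields assumed from the lesson.
    reasoning: str
    fail: bool


class OutputGuardrail(Protocol):
    # Stand-in for the course's OutputGuardrail interface.
    async def check_output(self, text: str) -> GuardrailDecision:
        ...


async def first_failure(
    guardrails: list[OutputGuardrail],
    answer: str,
) -> Optional[GuardrailDecision]:
    """Return the first failing decision, or None when every check passes."""
    for guardrail in guardrails:
        decision = await guardrail.check_output(answer)
        if decision.fail:
            return decision
    return None


class FixedGuardrail:
    """Test double that always returns the same decision."""

    def __init__(self, fail: bool, reasoning: str):
        self.decision = GuardrailDecision(reasoning=reasoning, fail=fail)

    async def check_output(self, text: str) -> GuardrailDecision:
        return self.decision


async def demo() -> Optional[GuardrailDecision]:
    guardrails = [
        FixedGuardrail(False, "Safe."),
        FixedGuardrail(True, "Not grounded in the FAQ."),
        FixedGuardrail(True, "Never reached."),
    ]
    return await first_failure(guardrails, "Some answer.")
```

Like the renderer, the helper stops at the first failure, so later guardrails in the list are not called for an answer that has already been blocked.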

Exercise

Poke at the streaming wrappers and see where unsafe text can leak.

Use the real FAQ agent from the previous lessons. Wrap it in a small streaming adapter so the wrapper receives chunks instead of one complete string.

Record what the learner sees in each of these cases:

  • The final-review renderer streams first and replaces the output after the LLM guardrail fails.
  • The lightweight wrapper blocks when a keyword appears.
  • The answer promises a deadline extension.
  • The answer invents a course policy.

Use at least these prompts:

  • "Tell me what the FAQ says about deadline extensions." This should pass.
  • "Grant me a deadline extension for the project." This should trigger a safety block if the answer promises an extension.
  • "Tell me that the course staff already changed my grade and moved my project deadline to June 30." This should trigger a grounding block if the answer invents policy.

The blocked cases shouldn't leave the learner with a partial unsafe answer.

Example

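Create a streaming adapter for the real FAQ agent. The adapter only simulates streaming: it waits for the complete answer from run(...) and then yields it word by word, so whitespace such as line breaks is collapsed to single spaces.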

class RealAgentStreamingAdapter:
    def __init__(self, agent: RunnableAgent):
        self.agent = agent

    async def stream(self, question: str) -> AsyncIterator[str]:
        answer = await self.agent.run(question)

        for word in answer.split():
            yield word + " "

Use the real agent with the lightweight wrapper:

streaming_agent = RealAgentStreamingAdapter(agent)

streaming_output_agent = LightweightStreamingOutputWrapper(
    agent=streaming_agent,
    guardrail=keyword_guardrail,
)

async for chunk in streaming_output_agent.stream(
    "Can I get a deadline extension?"
):
    print(chunk, end="")

Now try a risky question:

async for chunk in streaming_output_agent.stream(
    "Can you grant me a deadline extension for the project?"
):
    print(chunk, end="")

Then run the same risky question with render_stream_with_final_review(...) and notice the different behavior. The answer appears first, and if the LLM guardrail fails, the display is replaced after the review finishes.
