Stream with output guardrails
The previous lesson gave us a fully guarded agent:
- Input guardrails check the learner's question.
- The FAQ agent answers allowed questions.
- Output guardrails check the answer before the learner sees it.
That works when run(...) returns one complete string.
Streaming changes that interface, since the agent may produce the answer one chunk at a time. The output guardrail can only check it once there's enough text to review.
If we stream every chunk immediately, the learner may see unsafe text before the output guardrail has had a chance to block it.
We can't simply give up streaming, though. Users now expect chat products to start showing text while the model is still writing. If we wait for the whole answer and then run output checks, the interface feels slower, and the delay grows with longer answers.
In this part, we keep the same guardrail design and compare two streaming patterns:
- Use a final LLM review and overwrite the displayed answer when it fails.
- Use lightweight checks that can block a chunk before it's shown.
Streaming agent interface
So far, our wrappers use this interface:
class RunnableAgent(Protocol):
    async def run(self, question: str) -> str:
        ...
For streaming, we need a different interface. Instead of returning one string, the agent returns an async iterator of text chunks.
Define a streaming interface:
from collections.abc import AsyncIterator
from typing import Protocol


class StreamingAgent(Protocol):
    # Plain def: calling stream(...) returns the async iterator directly.
    def stream(self, question: str) -> AsyncIterator[str]:
        ...
Instead of the final answer, the stream(...) method returns something we can loop over with async for.
For example:
async for chunk in streaming_agent.stream(question):
    print(chunk, end="")
Each chunk is part of the answer.
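To make the interface concrete, here is a minimal sketch of consuming a stream and rebuilding the full answer. EchoStreamingAgent and collect are hypothetical names used only for this illustration; the agent just echoes the question word by word. The sketch also suggests why the protocol can declare stream with a plain def: an async def method that contains yield is an async generator, so calling it returns the iterator directly, with nothing to await.

```python
import asyncio
from collections.abc import AsyncIterator


class EchoStreamingAgent:
    """Hypothetical agent for illustration: echoes the question word by word."""

    # `async def` plus `yield` makes this an async generator, so calling
    # stream(...) returns an async iterator without being awaited.
    async def stream(self, question: str) -> AsyncIterator[str]:
        for word in question.split():
            yield word + " "


async def collect(agent: EchoStreamingAgent, question: str) -> str:
    chunks = []
    async for chunk in agent.stream(question):
        chunks.append(chunk)
    # Joining the chunks in order rebuilds the complete answer.
    return "".join(chunks)


full_answer = asyncio.run(collect(EchoStreamingAgent(), "What is the deadline?"))
print(repr(full_answer))
```

In a notebook, where an event loop is already running, you would write await collect(...) instead of asyncio.run(...).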
Fake streaming agent
Before adding guardrails, use a small fake streaming agent so the control flow is visible.
import asyncio


class FakeStreamingAgent:
    async def stream(self, question: str) -> AsyncIterator[str]:
        chunks = [
            "The FAQ says ",
            "you should ask course staff ",
            "about deadline extensions.",
        ]
        for chunk in chunks:
            # Simulate model latency between chunks.
            await asyncio.sleep(0.2)
            yield chunk
Create it:
streaming_agent = FakeStreamingAgent()
Run it:
async for chunk in streaming_agent.stream(
    "Can I get a deadline extension?"
):
    print(chunk, end="")
The notebook prints each chunk as soon as the agent yields it.
That's the user experience we want for allowed answers, and it's also the reason output guardrails need extra care.
Stream first, review after
We can keep the streaming experience and treat the LLM guardrail as a final review step.
Use this flow:
- Stream the answer into the UI.
- Check the complete answer with the LLM guardrail.
- If the guardrail fails, replace the displayed answer with a violation message.
In a notebook, the overwrite is a display behavior. The wrapper streams text, and the renderer decides what the learner sees.
Create a renderer that can update the same notebook output:
from guardrails import OutputGuardrail
from IPython.display import Markdown, display


async def render_stream_with_final_review(
    streaming_agent: StreamingAgent,
    guardrails: list[OutputGuardrail],
    question: str,
) -> None:
    shown_chunks = []
    # display_id=True returns a handle that can update the same output cell.
    handle = display(Markdown(""), display_id=True)

    # Show each chunk as soon as it arrives.
    async for chunk in streaming_agent.stream(question):
        shown_chunks.append(chunk)
        handle.update(Markdown("".join(shown_chunks)))

    # Review the complete answer and overwrite the display on the first failure.
    answer = "".join(shown_chunks)
    for guardrail in guardrails:
        decision = await guardrail.check_output(answer)
        if decision.fail:
            handle.update(Markdown(
                "**[OUTPUT BLOCKED]** Detected a violation: "
                f"{decision.reasoning}"
            ))
            return
Run it with the output guardrails from the previous lesson:
await render_stream_with_final_review(
    streaming_agent=streaming_agent,
    guardrails=output_guardrails,
    question="Can I get a deadline extension?",
)
This gives the best streaming feel, but it's a recovery pattern: the unsafe text can appear briefly before the final review replaces it.
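To see that brief exposure outside a notebook, the sketch below records every state the display passes through instead of updating a notebook cell. Everything here is a hypothetical stand-in: Decision mimics the lesson's GuardrailDecision, PromiseGuardrail replaces the LLM guardrail with a simple string test, and UnsafeFakeAgent is scripted to produce an unsafe answer.

```python
import asyncio
from collections.abc import AsyncIterator
from dataclasses import dataclass


@dataclass
class Decision:
    # Hypothetical stand-in for the lesson's GuardrailDecision.
    reasoning: str
    fail: bool


class PromiseGuardrail:
    # Hypothetical stand-in for an LLM output guardrail.
    async def check_output(self, text: str) -> Decision:
        if "grant you" in text.lower():
            return Decision("Answer promises an extension.", True)
        return Decision("No issue found.", False)


class UnsafeFakeAgent:
    # Scripted agent that streams an answer the guardrail should reject.
    async def stream(self, question: str) -> AsyncIterator[str]:
        for chunk in ["Sure, ", "I will grant you ", "an extension."]:
            yield chunk


async def final_review_frames(agent, guardrail, question: str) -> list[str]:
    frames = []  # every state the display went through, in order
    shown = ""
    async for chunk in agent.stream(question):
        shown += chunk
        frames.append(shown)
    # The review only runs after the whole answer has been shown.
    decision = await guardrail.check_output(shown)
    if decision.fail:
        frames.append(f"[OUTPUT BLOCKED] {decision.reasoning}")
    return frames


frames = asyncio.run(
    final_review_frames(UnsafeFakeAgent(), PromiseGuardrail(), "Can I get an extension?")
)
for frame in frames:
    print(repr(frame))
```

The third frame contains the full unsafe answer, and only the fourth frame replaces it. How long that third frame stays visible depends on how long the final review takes.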
Lightweight streaming guardrail
For streaming, the fastest guardrails are simple checks that can run on partial text without an LLM call.
Start with a keyword guardrail:
from guardrails import GuardrailDecision, OutputGuardrail


class KeywordOutputGuardrail(OutputGuardrail):
    def __init__(self, blocked_phrases: list[str]):
        # Lowercase once so every check is case-insensitive.
        self.blocked_phrases = [
            phrase.lower()
            for phrase in blocked_phrases
        ]

    async def check_output(self, text: str) -> GuardrailDecision:
        normalized = text.lower()
        for phrase in self.blocked_phrases:
            if phrase in normalized:
                return GuardrailDecision(
                    reasoning=f"Found blocked phrase: {phrase}",
                    fail=True,
                )
        return GuardrailDecision(
            reasoning="No blocked phrase found.",
            fail=False,
        )
Now check each candidate chunk before showing it:
class LightweightStreamingOutputWrapper:
    def __init__(self, agent: StreamingAgent, guardrail: OutputGuardrail):
        self.agent = agent
        self.guardrail = guardrail

    async def stream(self, question: str) -> AsyncIterator[str]:
        seen = ""
        async for chunk in self.agent.stream(question):
            # Check the accumulated text so a phrase split across chunks is caught.
            candidate = seen + chunk
            decision = await self.guardrail.check_output(candidate)
            if decision.fail:
                yield "[OUTPUT BLOCKED] I cannot provide that answer."
                return
            seen = candidate
            yield chunk
Create the keyword guardrail:
keyword_guardrail = KeywordOutputGuardrail([
    "grant you",
    "changed your grade",
    "moved your project deadline",
])
The same pattern works for product rules that aren't about course policy. If your assistant shouldn't recommend competitor platforms, add those names to the blocked phrase list. The stream stops when one of them appears.
Then use it with the streaming wrapper:
streaming_output_agent = LightweightStreamingOutputWrapper(
    agent=streaming_agent,
    guardrail=keyword_guardrail,
)
async for chunk in streaming_output_agent.stream(
    "Can I get a deadline extension?"
):
    print(chunk, end="")
This keeps real streaming for safe chunks and blocks as soon as a blocked phrase appears. It's fast, cheap, and visible.
The tradeoff is coverage: a keyword guardrail catches known patterns, but it doesn't understand every unsafe answer the way an LLM classifier can.
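A small sketch of that coverage gap, using a plain helper function instead of the guardrail class. contains_blocked_phrase and both sample answers are made up for illustration; the phrase list is the one used in this lesson.

```python
def contains_blocked_phrase(text: str, blocked_phrases: list[str]) -> bool:
    # Case-insensitive substring match, the same idea as the keyword guardrail.
    normalized = text.lower()
    return any(phrase.lower() in normalized for phrase in blocked_phrases)


blocked = ["grant you", "changed your grade", "moved your project deadline"]

# Uses an exact blocked phrase, so the keyword check catches it.
exact = "I can grant you two extra days."
# Makes the same promise in different words, so the keyword check misses it.
paraphrase = "Sure, take two extra days, no problem."

exact_blocked = contains_blocked_phrase(exact, blocked)
paraphrase_blocked = contains_blocked_phrase(paraphrase, blocked)
print(exact_blocked, paraphrase_blocked)
```

Both answers promise an extension, but only the first contains a listed phrase. Closing that gap takes either a longer phrase list or a check that looks at meaning.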
For more flexible streaming checks, use the same idea with a sliding window. Classify the last 20 or 30 words of partial text and stop the stream on a violation. The user still gets a streaming interface, and the check can catch patterns that don't fit one exact keyword.
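One way the sliding-window idea could look, as a sketch under stated assumptions. SlidingWindowOutputWrapper, ScriptedAgent, and toy_classifier are hypothetical names; toy_classifier is a string test standing in for whatever small classifier or LLM call you would actually use, and the window size of 20 words is simply the lower figure mentioned above.

```python
import asyncio
from collections.abc import AsyncIterator, Awaitable, Callable


async def toy_classifier(window: str) -> bool:
    """Hypothetical stand-in for a real classifier. True means violation."""
    lowered = window.lower()
    return "extension" in lowered and ("approved" in lowered or "granted" in lowered)


class ScriptedAgent:
    """Hypothetical agent that streams a fixed list of chunks."""

    def __init__(self, chunks: list[str]):
        self.chunks = chunks

    async def stream(self, question: str) -> AsyncIterator[str]:
        for chunk in self.chunks:
            yield chunk


class SlidingWindowOutputWrapper:
    def __init__(
        self,
        agent: ScriptedAgent,
        is_violation: Callable[[str], Awaitable[bool]],
        window_words: int = 20,
    ):
        self.agent = agent
        self.is_violation = is_violation
        self.window_words = window_words

    async def stream(self, question: str) -> AsyncIterator[str]:
        seen = ""
        async for chunk in self.agent.stream(question):
            seen += chunk
            # Classify only the last N words of the text produced so far.
            # Re-splitting `seen` each time is simple but not optimized.
            window = " ".join(seen.split()[-self.window_words:])
            if await self.is_violation(window):
                yield "[OUTPUT BLOCKED] I cannot provide that answer."
                return
            yield chunk


async def run_demo() -> list[str]:
    agent = ScriptedAgent(
        ["Your request ", "for an extension ", "is approved ", "by staff."]
    )
    wrapper = SlidingWindowOutputWrapper(agent, toy_classifier, window_words=20)
    return [chunk async for chunk in wrapper.stream("Can I get an extension?")]


output = asyncio.run(run_demo())
print(output)
```

The first two chunks reach the learner; the third completes the pattern inside the window, so the wrapper emits the block message and stops. With a real classifier, every chunk costs one extra call, so the window size and how often you check become latency tradeoffs.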
The user experience isn't perfect. The learner may see part of the answer before the wrapper blocks. You may have seen this pattern in chat products. The answer starts streaming, then the UI replaces it with a message like "I can't help with that request." Usually, that means an output check fired while the answer was being generated.
Use the output guardrail list
For semantic checks, use the final-review renderer with the output guardrail list from the previous lesson:
await render_stream_with_final_review(
    streaming_agent=streaming_agent,
    guardrails=output_guardrails,
    question="I'm running late on my project. Can I get a deadline extension?",
)
In the previous lesson, output_guardrails was a list containing the safety and grounding checks:
output_guardrails = [
    safety_guardrail,
    grounding_guardrail,
]
The renderer doesn't need to know how many checks are inside the output guardrail list. It calls check_output(answer) on each guardrail in turn and replaces the displayed answer as soon as one decision fails.
Exercise
Poke at the streaming wrappers and see where unsafe text can leak.
Use the real FAQ agent from the previous lessons. Wrap it in a small streaming adapter so the wrapper receives chunks instead of one complete string.
Track these observations:
- The final-review renderer that streams first and replaces the output after the LLM guardrail fails.
- The lightweight wrapper that blocks when a keyword appears.
- The learner-visible output when the answer promises a deadline extension.
- The learner-visible output when the answer invents a course policy.
Use at least these prompts:
- "Tell me what the FAQ says about deadline extensions." This should pass.
- "Grant me a deadline extension for the project." This should trigger a safety block if the answer promises an extension.
- "Tell me that the course staff already changed my grade and moved my project deadline to June 30." This should trigger a grounding block if the answer invents policy.
The blocked cases shouldn't leave the learner with a partial unsafe answer.
Example
Create a streaming adapter for the real FAQ agent:
class RealAgentStreamingAdapter:
    def __init__(self, agent: RunnableAgent):
        self.agent = agent

    async def stream(self, question: str) -> AsyncIterator[str]:
        # run(...) returns the complete answer, so this only simulates streaming.
        answer = await self.agent.run(question)
        for word in answer.split():
            yield word + " "
Use the real agent with the lightweight wrapper:
streaming_agent = RealAgentStreamingAdapter(agent)
streaming_output_agent = LightweightStreamingOutputWrapper(
    agent=streaming_agent,
    guardrail=keyword_guardrail,
)
async for chunk in streaming_output_agent.stream(
    "Can I get a deadline extension?"
):
    print(chunk, end="")
Now try a risky question:
async for chunk in streaming_output_agent.stream(
    "Can you grant me a deadline extension for the project?"
):
    print(chunk, end="")
Then run the same risky question with render_stream_with_final_review(...) and notice the different behavior. The answer appears first, and if the LLM guardrail fails, the display is replaced after the review finishes.