Part 5: Define the workflow
The activities from Part 4: Temporal motivation do the external work. The workflow decides the order: discover videos, skip the ones that are already indexed, fetch subtitles, and index each document.
Create workflow.py:
from datetime import timedelta
from temporalio import workflow
with workflow.unsafe.imports_passed_through():
    from activities import (
        YouTubeActivities,
        ElasticsearchActivities,
        find_podcast_videos,
    )
Use the imports_passed_through block because Temporal's Python sandbox restricts what a workflow may import. Our activity module pulls in external clients such as Elasticsearch and the YouTube Transcript API, which the sandbox would otherwise reject. The block tells Temporal these imports are safe to pass through; the workflow itself still keeps all external work inside activities.
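For context, the workflow below assumes the activity module from Part 4 exposes roughly the following interface. This is a sketch with placeholder bodies, not the real implementation; only the names and signatures matter here, and in the real module each callable is decorated with @activity.defn:

```python
# Sketch of the interface the workflow expects from activities.py.
# The real implementations (Part 4) call YouTube and Elasticsearch;
# these bodies are placeholders for illustration only.

def find_podcast_videos(commit_id: str) -> list[dict]:
    """Return video metadata dicts, each with at least a 'video_id' key."""
    return [{"video_id": "abc123", "title": "Episode 1"}]

class YouTubeActivities:
    def fetch_subtitles(self, video_id: str) -> str:
        """Download the subtitle text for one video."""
        return "subtitle text for " + video_id

class ElasticsearchActivities:
    def video_exists(self, video_id: str) -> bool:
        """Return True if the video is already indexed."""
        return False

    def index_video(self, video: dict, subtitles: str) -> None:
        """Store the video metadata and subtitles as one document."""
```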
Define the workflow class:
@workflow.defn
class PodcastTranscriptWorkflow:
    @workflow.run
    async def run(self, commit_id: str) -> dict:
        workflow.logger.info(f"Finding podcast videos from commit {commit_id}...")
        videos = await workflow.execute_activity(
            activity=find_podcast_videos,
            args=(commit_id,),
            start_to_close_timeout=timedelta(minutes=1),
        )
Loop over the videos and skip already indexed documents:
        for video in videos:
            video_id = video['video_id']
            if await workflow.execute_activity(
                activity=ElasticsearchActivities.video_exists,
                args=(video_id,),
                start_to_close_timeout=timedelta(seconds=10),
            ):
                workflow.logger.info(f'already processed {video_id}')
                continue
Fetch subtitles and index the video:
            subtitles = await workflow.execute_activity(
                activity=YouTubeActivities.fetch_subtitles,
                args=(video_id,),
                start_to_close_timeout=timedelta(minutes=1),
            )
            await workflow.execute_activity(
                activity=ElasticsearchActivities.index_video,
                args=(video, subtitles),
                start_to_close_timeout=timedelta(seconds=30),
            )
        return {
            "status": "completed",
            "processed_videos": len(videos),
        }
The timeout on each activity is part of the design: a network call should not be allowed to hang forever. When an activity fails or times out, Temporal retries it, and every attempt stays visible in the workflow history in the UI.
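Unless you attach an explicit RetryPolicy, Temporal retries activities with exponential backoff; the documented defaults are, as far as I know, a 1-second initial interval, a 2.0 backoff coefficient, and a 100-second cap. A pure-Python sketch of that schedule (no Temporal required):

```python
# Sketch of Temporal's default activity retry schedule: the wait before
# attempt n is initial * coefficient**(n - 1), capped at a maximum.
# Values mirror the documented defaults; check your retry policy config.

def retry_delays(attempts: int,
                 initial: float = 1.0,
                 coefficient: float = 2.0,
                 maximum: float = 100.0) -> list[float]:
    """Seconds to wait before each of the first `attempts` retries."""
    return [min(initial * coefficient ** (n - 1), maximum)
            for n in range(1, attempts + 1)]

print(retry_delays(8))
# → [1.0, 2.0, 4.0, 8.0, 16.0, 32.0, 64.0, 100.0]
```

With a one-minute start_to_close_timeout, a hung subtitle fetch is cut off quickly and simply lands back on this schedule.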
Start a workflow run
The same workflow.py file can include a small runner. It connects to the
local Temporal server and submits a workflow execution to a task queue:
import asyncio

from temporalio.client import Client


async def run_workflow():
    client = await Client.connect("localhost:7233")
    commit_id = '187b7d056a36d5af6ac33e4c8096c52d13a078a7'
    result = await client.execute_workflow(
        PodcastTranscriptWorkflow.run,
        args=(commit_id,),
        id="podcast_transcript_workflow",
        task_queue="podcast_transcript_task_queue",
    )
    print("Workflow completed! Result:", result)


if __name__ == "__main__":
    asyncio.run(run_workflow())
Run it:
uv run python workflow.py
You will see the workflow in the Temporal UI, but it will not make progress until a worker is running. Temporal only stores the workflow task; a worker is the process that polls the task queue and actually executes workflow and activity code.
Create the worker
Create worker.py:
import asyncio
from concurrent.futures import ThreadPoolExecutor

from temporalio.client import Client
from temporalio.worker import Worker

from activities import (
    YouTubeActivities,
    ElasticsearchActivities,
    find_podcast_videos,
)
from workflow import PodcastTranscriptWorkflow
Connect to Temporal and create concrete activity instances:
async def run_worker():
    client = await Client.connect("localhost:7233")
    executor = ThreadPoolExecutor(max_workers=10)
    yt_activities = YouTubeActivities()
    es_activities = ElasticsearchActivities()
Register the workflow class and all activity callables on the task queue:
    worker = Worker(
        client,
        task_queue="podcast_transcript_task_queue",
        workflows=[PodcastTranscriptWorkflow],
        activities=[
            find_podcast_videos,
            yt_activities.fetch_subtitles,
            es_activities.video_exists,
            es_activities.index_video,
        ],
        activity_executor=executor,
    )
    await worker.run()
if __name__ == "__main__":
    asyncio.run(run_worker())
The ThreadPoolExecutor is needed because these activities are synchronous
functions. Without it, the worker fails with an error saying a sync activity
needs an activity executor.
Run the worker in one terminal:
uv run python worker.py
Run the workflow in another terminal:
uv run python workflow.py
Open the Temporal UI and look at the execution:
http://localhost:8233
You should see the workflow checking documents, fetching subtitles, and indexing videos. If a proxy request fails with an SSL error, Temporal retries the activity instead of losing the whole run.
Note: this workflow processes videos sequentially. A useful improvement is to batch five or ten videos and run each batch in parallel. The sequential version is easier to follow while the split into activities is still new.
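A sketch of that batching idea in plain asyncio, with hypothetical names (chunked, process, run_in_batches). Inside a real workflow, the dummy process coroutine would be the fetch-subtitles and index activity calls, and asyncio.gather works the same way there:

```python
import asyncio

def chunked(items: list, size: int) -> list[list]:
    """Split items into consecutive batches of at most `size`."""
    return [items[i:i + size] for i in range(0, len(items), size)]

async def process(video_id: str) -> str:
    # Stand-in for the fetch-subtitles + index activity pair.
    await asyncio.sleep(0)
    return video_id

async def run_in_batches(video_ids: list[str], size: int = 5) -> list[str]:
    done: list[str] = []
    for batch in chunked(video_ids, size):
        # Each batch runs concurrently; batches run one after another,
        # which caps how many activities are in flight at once.
        done += await asyncio.gather(*(process(v) for v in batch))
    return done

print(asyncio.run(run_in_batches([f"vid{i}" for i in range(12)], size=5)))
```

The batch size caps concurrency, which keeps the load on YouTube and Elasticsearch bounded.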
Once the podcast archive is indexed, the data engineering part is done. In Part 6: Create the agent project we create the research agent that uses the index.