Part 5: Define the workflow

The activities from Part 4: Temporal motivation do the external work. The workflow decides the order: discover videos, skip the ones that are already indexed, fetch subtitles, and index each new video.

Create workflow.py:

from datetime import timedelta
from temporalio import workflow

with workflow.unsafe.imports_passed_through():
    from activities import (
        YouTubeActivities,
        ElasticsearchActivities,
        find_podcast_videos,
    )

Use the imports_passed_through block because Temporal's Python sandbox is strict about imports inside workflow code. Our activity module imports clients such as Elasticsearch and the YouTube Transcript API. Wrapping the import this way tells Temporal to trust those modules, while the workflow itself still keeps all external work inside activities.

Define the workflow class:

@workflow.defn
class PodcastTranscriptWorkflow:

    @workflow.run
    async def run(self, commit_id: str) -> dict:
        workflow.logger.info(f"Finding podcast videos from commit {commit_id}...")

        videos = await workflow.execute_activity(
            activity=find_podcast_videos,
            args=(commit_id,),
            start_to_close_timeout=timedelta(minutes=1),
        )

Loop over the videos and skip already indexed documents:

        for video in videos:
            video_id = video['video_id']

            if await workflow.execute_activity(
                activity=ElasticsearchActivities.video_exists,
                args=(video_id,),
                start_to_close_timeout=timedelta(seconds=10),
            ):
                workflow.logger.info(f"already processed {video_id}")
                continue

Fetch subtitles and index the video:

            subtitles = await workflow.execute_activity(
                activity=YouTubeActivities.fetch_subtitles,
                args=(video_id,),
                start_to_close_timeout=timedelta(minutes=1),
            )

            await workflow.execute_activity(
                activity=ElasticsearchActivities.index_video,
                args=(video, subtitles),
                start_to_close_timeout=timedelta(seconds=30),
            )

        return {
            "status": "completed",
            "processed_videos": len(videos),
        }

The timeout on each activity is part of the design: a network call should not hang forever. When an activity fails or times out, Temporal retries it under the default retry policy, and every attempt stays visible in the workflow history in the UI.
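If the defaults don't fit, a retry policy can be set per activity. A minimal sketch, reusing the fetch_subtitles call from above; the interval and attempt values here are illustrative, not tuned:

from temporalio.common import RetryPolicy  # add at the top of workflow.py

            subtitles = await workflow.execute_activity(
                activity=YouTubeActivities.fetch_subtitles,
                args=(video_id,),
                start_to_close_timeout=timedelta(minutes=1),
                retry_policy=RetryPolicy(
                    initial_interval=timedelta(seconds=2),  # wait 2s before the first retry
                    backoff_coefficient=2.0,  # double the wait after each failure
                    maximum_attempts=5,  # then fail the activity for good
                ),
            )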

Start a workflow run

The same workflow.py file can include a small runner. It connects to the local Temporal server and submits a workflow execution to a task queue:

import asyncio

from temporalio.client import Client

async def run_workflow():
    client = await Client.connect("localhost:7233")

    commit_id = '187b7d056a36d5af6ac33e4c8096c52d13a078a7'

    result = await client.execute_workflow(
        PodcastTranscriptWorkflow.run,
        args=(commit_id,),
        id="podcast_transcript_workflow",
        task_queue="podcast_transcript_task_queue",
    )

    print("Workflow completed! Result:", result)

if __name__ == "__main__":
    asyncio.run(run_workflow())

Run it:

uv run python workflow.py

You will see a workflow in the Temporal UI, but it will not make progress until a worker is running. Temporal persists the workflow task on the queue; a worker is the process that polls that task queue and executes the workflow and activity code.
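Note that execute_workflow blocks until the run finishes, so the script itself also waits for a worker to complete the work. To submit without blocking, start_workflow returns a handle that can be awaited later; a sketch against the same client and task queue:

    handle = await client.start_workflow(
        PodcastTranscriptWorkflow.run,
        args=(commit_id,),
        id="podcast_transcript_workflow",
        task_queue="podcast_transcript_task_queue",
    )
    # do other work, then collect the result when needed
    result = await handle.result()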

Create the worker

Create worker.py:

import asyncio
from concurrent.futures import ThreadPoolExecutor

from temporalio.worker import Worker
from temporalio.client import Client

from workflow import PodcastTranscriptWorkflow

from activities import (
    YouTubeActivities,
    ElasticsearchActivities,
    find_podcast_videos,
)

Connect to Temporal and create concrete activity instances:

async def run_worker():
    client = await Client.connect("localhost:7233")

    executor = ThreadPoolExecutor(max_workers=10)

    yt_activities = YouTubeActivities()
    es_activities = ElasticsearchActivities()

Register the workflow class and all activity callables on the task queue:

    worker = Worker(
        client,
        task_queue="podcast_transcript_task_queue",
        workflows=[PodcastTranscriptWorkflow],
        activities=[
            find_podcast_videos,
            yt_activities.fetch_subtitles,
            es_activities.video_exists,
            es_activities.index_video,
        ],
        activity_executor=executor,
    )

    await worker.run()

if __name__ == "__main__":
    asyncio.run(run_worker())

The ThreadPoolExecutor is needed because these activities are synchronous functions. Without it, the worker fails at startup with an error saying a synchronous activity needs an activity executor.
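The executor is only required for synchronous activities. The alternative is to define activities as async def coroutines, which run on the worker's event loop and need no activity_executor at all. A sketch of that variant, assuming a hypothetical AsyncElasticsearch client and index name (the tutorial's code uses the synchronous client):

from elasticsearch import AsyncElasticsearch
from temporalio import activity

class AsyncElasticsearchActivities:
    def __init__(self) -> None:
        # hypothetical async client; the tutorial uses the sync one
        self.client = AsyncElasticsearch("http://localhost:9200")

    @activity.defn
    async def video_exists(self, video_id: str) -> bool:
        # an async activity runs on the event loop, so the worker
        # accepts it without an activity_executor
        return bool(await self.client.exists(index="podcasts", id=video_id))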

Run the worker in one terminal:

uv run python worker.py

Run the workflow in another terminal:

uv run python workflow.py

Open the Temporal UI and look at the execution:

http://localhost:8233

You should see the workflow checking documents, fetching subtitles, and indexing videos. If a proxy request fails with an SSL error, Temporal retries the activity instead of losing the whole run.

Note: this workflow processes videos sequentially. A useful improvement is to batch five or ten videos and run each batch in parallel, as sketched below. The sequential version stays easier to read while the activity split is still fresh.
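A sketch of that batched variant inside the run method, assuming the per-video steps (exists check, fetch, index) are pulled into a hypothetical process_video helper coroutine; asyncio.gather is safe here because Temporal's workflow event loop schedules coroutines deterministically:

import asyncio  # add at the top of workflow.py

        for i in range(0, len(videos), 5):
            batch = videos[i:i + 5]
            # process_video awaits the three activities for one video;
            # gather runs the whole batch concurrently
            await asyncio.gather(*(self.process_video(video) for video in batch))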

Once the podcast archive is indexed, the data engineering part is done. In Part 6: Create the agent project we create the research agent that uses the index.
