Part 4: Temporal motivation

The notebook ingestion loop now has several places that can fail:

  • downloading the podcast YAML over the network,
  • checking whether an Elasticsearch document exists,
  • fetching subtitles through YouTube and a proxy,
  • writing the final document to Elasticsearch.

The workshop uses Temporal because these are workflow steps that need retries and durable execution. A normal script exits when an exception escapes the loop. With Temporal, each external operation becomes an activity, and the workflow can retry failed activities while preserving progress.

Install Temporal

Install the Temporal CLI. On Linux, use:

mkdir -p ~/bin
echo 'export PATH="${PATH}:$HOME/bin"' >> ~/.bashrc
source ~/.bashrc

wget 'https://temporal.download/cli/archive/latest?platform=linux&arch=amd64' -O temporal.tar.gz
tar -xzf temporal.tar.gz
mv temporal ~/bin/
rm temporal.tar.gz LICENSE

For macOS or Windows, change the platform query parameter to darwin or windows. For Arm machines, use arch=arm64.

Check the version:

temporal -v

Any recent version is fine. For example:

$ temporal -v
temporal version 1.5.1 (Server 1.29.1, UI 2.42.1)

Start the local development server:

temporal server start-dev

Open the UI:

http://localhost:8233

start-dev uses an in-memory database, so all state is lost when the server stops. If you want state to survive restarts, point it at a database file:

temporal server start-dev --db-filename temporal.db

You can also run the dev server through Docker:

docker run --rm -p 7233:7233 -p 8233:8233 temporalio/temporal server start-dev --ip 0.0.0.0

Add the Temporal Python SDK to flow/:

uv add temporalio

Convert the notebook into Python

Before creating a Temporal workflow, turn the notebook into a script:

uv run jupyter nbconvert --to=script notebook.ipynb

The generated script is messy because notebooks accumulate exploration code. Clean it into three files:

  • create_index.py for the Elasticsearch index creation.
  • activities.py for the operations Temporal can retry.
  • workflow.py for the durable orchestration.

The create_index.py file is the index creation code from Part 2: Run Elasticsearch. This version deletes an existing podcasts index and recreates it:

from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

index_name = "podcasts"

# index_settings (and the stopwords list it uses) come from the notebook;
# paste those definitions above this point.

if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)

es.indices.create(index=index_name, body=index_settings)
print(f"Index '{index_name}' created successfully")

Keep the full stopwords and index_settings from the notebook in that file. We separate it because index setup is not part of every workflow run.

Extract activities

Temporal activities are the operations that can talk to external systems and fail. Start activities.py with the imports:

import os

import yaml
import requests

from elasticsearch import Elasticsearch
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.proxies import GenericProxyConfig

from temporalio import activity

Move proxy creation into a helper. The workshop code uses the misspelled name cteate_proxy_config; you can keep it for parity or rename it to create_proxy_config in your own code:

def cteate_proxy_config():
    proxy_user = os.environ['PROXY_USER']
    proxy_password = os.environ['PROXY_PASSWORD']
    proxy_base_url = os.environ['PROXY_BASE_URL']

    proxy_url = f'http://{proxy_user}:{proxy_password}@{proxy_base_url}'

    return GenericProxyConfig(
        http_url=proxy_url,
        https_url=proxy_url,
    )
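The helper above reads three environment variables. Set them before starting the worker; the values below are placeholders, not real credentials:

```shell
# Placeholder values -- substitute your own proxy credentials.
export PROXY_USER='myuser'
export PROXY_PASSWORD='mypassword'
export PROXY_BASE_URL='proxy.example.com:8080'
```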

Keep the subtitle formatting helpers from the notebook:

def format_timestamp(seconds: float) -> str:
    total_seconds = int(seconds)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours == 0:
        return f"{minutes}:{secs:02}"
    return f"{hours}:{minutes:02}:{secs:02}"

def make_subtitles(transcript) -> str:
    lines = []

    for entry in transcript:
        ts = format_timestamp(entry.start)
        text = entry.text.replace('\n', ' ')
        lines.append(ts + ' ' + text)

    return '\n'.join(lines)
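To see what these helpers produce, here is a self-contained sketch that feeds make_subtitles a few fake transcript entries (SimpleNamespace stands in for the objects the transcript API returns, which expose .start and .text attributes):

```python
from types import SimpleNamespace

def format_timestamp(seconds: float) -> str:
    total_seconds = int(seconds)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours == 0:
        return f"{minutes}:{secs:02}"
    return f"{hours}:{minutes:02}:{secs:02}"

def make_subtitles(transcript) -> str:
    lines = []
    for entry in transcript:
        ts = format_timestamp(entry.start)
        text = entry.text.replace('\n', ' ')
        lines.append(ts + ' ' + text)
    return '\n'.join(lines)

# Fake entries with the .start/.text attributes the real transcript has
transcript = [
    SimpleNamespace(start=0.0, text='welcome to the show'),
    SimpleNamespace(start=75.2, text='first question'),
    SimpleNamespace(start=3661.9, text='wrapping up'),
]

print(make_subtitles(transcript))
# 0:00 welcome to the show
# 1:15 first question
# 1:01:01 wrapping up
```

Timestamps under an hour drop the hours field, matching how YouTube displays them.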

The YouTube activity is a class because it owns proxy configuration. The worker will create one instance and register its method as an activity:

class YouTubeActivities:
    def __init__(self, use_proxy: bool = True):
        if use_proxy:
            self.proxy_config = cteate_proxy_config()
        else:
            self.proxy_config = None

    @activity.defn
    def fetch_subtitles(self, video_id):
        ytt_api = YouTubeTranscriptApi(proxy_config=self.proxy_config)
        transcript = ytt_api.fetch(video_id)
        subtitles = make_subtitles(transcript)
        return subtitles

Elasticsearch also becomes a class because both activities share the same client dependency:

class ElasticsearchActivities:
    def __init__(self, es_address: str | None = None):
        if es_address is None:
            es_address = os.getenv('ELASTICSEARCH_ADDRESS', 'http://localhost:9200')
        self.es = Elasticsearch(es_address)

    @activity.defn
    def video_exists(self, video_id):
        resp = self.es.exists(index="podcasts", id=video_id)
        return resp.body

Add the indexing activity to the same class:

    @activity.defn
    def index_video(self, video, subtitles):
        video_id = video['video_id']
        video_title = video['title']

        doc = {
            "video_id": video_id,
            "title": video_title,
            "subtitles": subtitles
        }

        self.es.index(index="podcasts", id=video_id, document=doc)

The podcast discovery function is stateless, so it can be a plain activity:

@activity.defn
def find_podcast_videos(commit_id):
    events_url = f'https://raw.githubusercontent.com/DataTalksClub/datatalksclub.github.io/{commit_id}/_data/events.yaml'

    raw_yaml = requests.get(events_url).content
    events_data = yaml.load(raw_yaml, yaml.CSafeLoader)

    podcasts = [d for d in events_data if (d.get('type') == 'podcast') and (d.get('youtube'))]

    videos = []
    for podcast in podcasts:
        _, video_id = podcast['youtube'].split('watch?v=')
        if video_id in ['FRi0SUtxdMw', 's8kyzy8V5b8']:
            continue
        videos.append({'title': podcast['title'], 'video_id': video_id})

    return videos
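To see what the watch?v= split produces, here is a minimal sketch with a hypothetical entry in the same shape as the events.yaml records:

```python
# Hypothetical event entry mirroring the fields used above.
podcast = {
    'title': 'Some Episode',
    'youtube': 'https://www.youtube.com/watch?v=abc123XYZ',
}

# Everything before 'watch?v=' is discarded; the remainder is the video id.
_, video_id = podcast['youtube'].split('watch?v=')
print(video_id)  # abc123XYZ
```

Note this assumes each URL contains 'watch?v=' exactly once; a URL without it would raise a ValueError from the two-element unpacking.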

Now the external work is isolated. The next file defines the workflow and worker that execute these activities in Temporal.
