Part 4: Temporal motivation
The notebook ingestion loop now has several places that can fail:
- downloading the podcast YAML over the network,
- checking whether an Elasticsearch document exists,
- fetching subtitles through YouTube and a proxy,
- writing the final document to Elasticsearch.
The workshop uses Temporal because these are workflow steps that need retries and durable execution. A normal script exits when an exception escapes the loop. With Temporal, each external operation becomes an activity, and the workflow can retry failed activities while preserving progress.
Install Temporal
Install the Temporal CLI. On Linux, use:
mkdir -p ~/bin
echo 'export PATH="${PATH}:${HOME}/bin"' >> ~/.bashrc
source ~/.bashrc
wget 'https://temporal.download/cli/archive/latest?platform=linux&arch=amd64' -O temporal.tar.gz
tar -xzf temporal.tar.gz
mv temporal ~/bin/
rm temporal.tar.gz LICENSE
For macOS or Windows, change the platform query parameter to darwin or
windows. For Arm machines, use arch=arm64.
Check the version:
temporal -v
For example, a version in this family is fine:
$ temporal -v
temporal version 1.5.1 (Server 1.29.1, UI 2.42.1)
Start the local development server:
temporal server start-dev
Open the UI:
http://localhost:8233
start-dev uses an in-memory database. If you want local persistence, start
it with a file:
temporal server start-dev --db-filename temporal.db
You can also run the dev server through Docker:
docker run --rm -p 7233:7233 -p 8233:8233 temporalio/temporal server start-dev --ip 0.0.0.0
Add the Temporal Python SDK to flow/:
uv add temporalio
Convert the notebook into Python
Before creating a Temporal workflow, turn the notebook into a script:
uv run jupyter nbconvert --to=script notebook.ipynb
The generated script is messy because notebooks accumulate exploration code. Clean it into three files:
- create_index.py for the Elasticsearch index creation.
- activities.py for the operations Temporal can retry.
- workflow.py for the durable orchestration.
The create_index.py file is the index creation code from
Part 2: Run Elasticsearch. This version deletes
an existing podcasts index and recreates it:
from elasticsearch import Elasticsearch

es = Elasticsearch("http://localhost:9200")

index_name = "podcasts"

if es.indices.exists(index=index_name):
    es.indices.delete(index=index_name)

es.indices.create(index=index_name, body=index_settings)
print(f"Index '{index_name}' created successfully")
Keep the full stopwords and index_settings from the notebook in that
file. We separate it because index setup is not part of every workflow run.
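For orientation, `index_settings` follows the usual Elasticsearch shape of `settings` plus `mappings`. The sketch below is an illustrative stand-in only, not the notebook's actual configuration, which also defines a custom analyzer with the full stopword list:

```python
# Illustrative stand-in: the real index_settings from the notebook
# additionally configures a custom analyzer and the stopword list.
index_settings = {
    "settings": {
        "number_of_shards": 1,
        "number_of_replicas": 0,
    },
    "mappings": {
        "properties": {
            "video_id": {"type": "keyword"},  # exact-match document id
            "title": {"type": "text"},        # analyzed for search
            "subtitles": {"type": "text"},    # analyzed for search
        }
    },
}
```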
Extract activities
Temporal activities are the operations that can talk to external systems and
fail. Start activities.py with the imports:
import os
import yaml
import requests
from elasticsearch import Elasticsearch
from youtube_transcript_api import YouTubeTranscriptApi
from youtube_transcript_api.proxies import GenericProxyConfig
from temporalio import activity
Move proxy creation into a helper. The workshop code uses the misspelled name
cteate_proxy_config; you can keep it for parity or rename it to
create_proxy_config in your own code:
def cteate_proxy_config():
    proxy_user = os.environ['PROXY_USER']
    proxy_password = os.environ['PROXY_PASSWORD']
    proxy_base_url = os.environ['PROXY_BASE_URL']

    proxy_url = f'http://{proxy_user}:{proxy_password}@{proxy_base_url}'

    return GenericProxyConfig(
        http_url=proxy_url,
        https_url=proxy_url,
    )
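The helper only assembles a standard `user:password@host` proxy URL from three environment variables. A self-contained check of that string format, using hypothetical placeholder credentials and no real proxy:

```python
import os

# Hypothetical placeholder values, for illustration only
os.environ['PROXY_USER'] = 'user'
os.environ['PROXY_PASSWORD'] = 'secret'
os.environ['PROXY_BASE_URL'] = 'proxy.example.com:8080'

# Same f-string the helper uses
proxy_url = (
    f"http://{os.environ['PROXY_USER']}:{os.environ['PROXY_PASSWORD']}"
    f"@{os.environ['PROXY_BASE_URL']}"
)
print(proxy_url)  # http://user:secret@proxy.example.com:8080
```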
Keep the subtitle formatting helpers from the notebook:
def format_timestamp(seconds: float) -> str:
    total_seconds = int(seconds)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)

    if hours == 0:
        return f"{minutes}:{secs:02}"

    return f"{hours}:{minutes:02}:{secs:02}"

def make_subtitles(transcript) -> str:
    lines = []
    for entry in transcript:
        ts = format_timestamp(entry.start)
        text = entry.text.replace('\n', ' ')
        lines.append(ts + ' ' + text)
    return '\n'.join(lines)
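Both helpers are pure functions, so they are easy to sanity-check in isolation. The snippet below redefines them so it runs on its own; the fake entries only mimic the `.start` and `.text` attributes that real transcript entries expose:

```python
from types import SimpleNamespace

def format_timestamp(seconds: float) -> str:
    total_seconds = int(seconds)
    hours, remainder = divmod(total_seconds, 3600)
    minutes, secs = divmod(remainder, 60)
    if hours == 0:
        return f"{minutes}:{secs:02}"
    return f"{hours}:{minutes:02}:{secs:02}"

def make_subtitles(transcript) -> str:
    lines = []
    for entry in transcript:
        ts = format_timestamp(entry.start)
        text = entry.text.replace('\n', ' ')
        lines.append(ts + ' ' + text)
    return '\n'.join(lines)

# Fake transcript entries standing in for the real API objects
fake = [
    SimpleNamespace(start=0.0, text='hello\nworld'),
    SimpleNamespace(start=75.4, text='second line'),
    SimpleNamespace(start=3661.0, text='an hour in'),
]
print(make_subtitles(fake))
# 0:00 hello world
# 1:15 second line
# 1:01:01 an hour in
```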
The YouTube activity is a class because it owns proxy configuration. The worker will create one instance and register its method as an activity:
class YouTubeActivities:
    def __init__(self, use_proxy: bool = True):
        if use_proxy:
            self.proxy_config = cteate_proxy_config()
        else:
            self.proxy_config = None

    @activity.defn
    def fetch_subtitles(self, video_id):
        ytt_api = YouTubeTranscriptApi(proxy_config=self.proxy_config)
        transcript = ytt_api.fetch(video_id)
        subtitles = make_subtitles(transcript)
        return subtitles
Elasticsearch also becomes a class because both activities share the same client dependency:
class ElasticsearchActivities:
    def __init__(self, es_address: str | None = None):
        if es_address is None:
            es_address = os.getenv('ELASTICSEARCH_ADDRESS', 'http://localhost:9200')
        self.es = Elasticsearch(es_address)

    @activity.defn
    def video_exists(self, video_id):
        resp = self.es.exists(index="podcasts", id=video_id)
        return resp.body
Add the indexing activity to the same class:
    @activity.defn
    def index_video(self, video, subtitles):
        video_id = video['video_id']
        video_title = video['title']

        doc = {
            "video_id": video_id,
            "title": video_title,
            "subtitles": subtitles
        }

        self.es.index(index="podcasts", id=video_id, document=doc)
The podcast discovery function is stateless, so it can be a plain activity:
@activity.defn
def find_podcast_videos(commit_id):
    events_url = f'https://raw.githubusercontent.com/DataTalksClub/datatalksclub.github.io/{commit_id}/_data/events.yaml'
    raw_yaml = requests.get(events_url).content
    events_data = yaml.load(raw_yaml, yaml.CSafeLoader)

    podcasts = [d for d in events_data if (d.get('type') == 'podcast') and (d.get('youtube'))]

    videos = []
    for podcast in podcasts:
        _, video_id = podcast['youtube'].split('watch?v=')
        if video_id in ['FRi0SUtxdMw', 's8kyzy8V5b8']:
            continue
        videos.append({'title': podcast['title'], 'video_id': video_id})

    return videos
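The `split('watch?v=')` parsing assumes each `youtube` field is a full watch URL containing `watch?v=` exactly once; a quick offline check of that assumption with a hypothetical URL:

```python
url = 'https://www.youtube.com/watch?v=dQw4w9WgXcQ'  # hypothetical example URL

# Everything after 'watch?v=' is treated as the video id
_, video_id = url.split('watch?v=')
print(video_id)  # dQw4w9WgXcQ
```

Note that a URL with trailing query parameters (say, `&t=120`) would leak them into `video_id`; the events data presumably contains bare watch URLs.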
Now the external work is isolated. The next file defines the workflow and worker that execute these activities in Temporal.