Intro
This blog post is created to show you how SerpApi was integrated into this small app with an explanation of how the app functionates and SerpApi's role in it. We'll go from top to bottom of the code shown below.
What about the demo itself, this demo project was inspired by a tool called Mention.
This project does kind of the same thing but for YouTube videos only. It transcribes videos (even if the video is without captions) and finds the target keyword and returns a URL with the timestamp.
The app doesn't scan/detect new videos all the time via cron job or something similar.
App Demo
Live demo:
Virtual Environment and Libraries Installation
If you want to follow along with your API key (instead of mine), follow these steps:
Clone repository:
$ git clone https://github.com/dimitryzub/youtube-mention-tracker.git
Install dependencies:
$ cd youtube-mention-tracker && pip install -r requriements.txt
Add SerpApi API key for the current shell and all processes started from the current shell:
# used to parse youtube videos, has a plan of 100 free searches
$ export SERPAPI_API_KEY=<your-api-key>
Run the app (this will open streamlit
in your browser):
$ cd youtube-tracker && streamlit run tracker.py
Additionally, you might need to install FFmpeg. To reduce text duplication, have a look at how to Install FFmpeg on Windows, Mac, Linux Ubuntu, and Debian from VideoProc or any other resource on the internet that shows how to do it.
Full Code
from urllib.parse import (parse_qsl, urlsplit)
from serpapi import YoutubeSearch
from pytube import YouTube
import pytube.exceptions as exceptions
from queue import Queue
import streamlit as st
import whisper
import re, os, time
import subprocess
import pathlib
from stqdm import stqdm
import math, base64
import pandas as pd
from dotenv import load_dotenv, find_dotenv
load_dotenv(find_dotenv())
VIDEOS_DOWNLOAD_PATH = pathlib.Path(__file__).parent.resolve() / 'videos'
if VIDEOS_DOWNLOAD_PATH.exists():
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}']) # remove videos on each new run
# create videos folder if not exist. Temporary store videos.
if not VIDEOS_DOWNLOAD_PATH.exists():
#TODO: change `videos/` to VIDEOS_DOWNLOAD_PATH or to subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}']) command
subprocess.run(['rm', '-r', 'videos/'])
subprocess.run(['mkdir', 'videos/'])
def main():
# TODO: add a blog post link to "SerpApi Demo Project"
footer_modified = '''
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
.footer {
position: fixed;
left: 0;
bottom: 0;
width: 100%;
background-color: white;
color: black;
text-align: center;
font-size: 8px;
}
</style>
<div class="footer">
<p align="center" style="padding-top: 200px;">SerpApi Demo Project (<a href="https://github.com/dimitryzub/youtube-mention-tracker">repo</a>)<br>Made with <a href="https://streamlit.io/">Streamlit</a>, <a href="http://serpapi.com/">SerpApi</a>, <a href="https://github.com/pytube/pytube">PyTube</a>, <a href="https://github.com/openai/whisper">Whisper</a> 🧡</p>
</div>
'''
st.markdown(footer_modified, unsafe_allow_html=True)
st.title('📺YouTube Videos Mention Tracker')
st.markdown(body="This demo parses, downloads and transcribes YouTube videos to find target mention(s) inside the video(s). It's similar to [Mention](https://mention.com/en/) but for videos.")
if 'visibility' not in st.session_state:
st.session_state.visibility = 'visible'
st.session_state.disabled = False
SEARCH_QUERY: str = st.text_input(label='Search query', placeholder='Data Nerd', help='Multiple search queries is not supported') # used to parsing youtube videos
TARGET_KEYWORD: str = st.text_input(label='Target keyword', placeholder='SQL', help='Multiple target keywords is not supported') # used for in the speech recognition
NUMBER_OF_VIDEOS_TO_ANALYZE: int = st.slider(label='Number of videos to analyze', min_value=1, max_value=20, help='By default, extracted videos will be no longer than 20 minutes. Its done by filtering YouTube results with [`sp`](https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp) URL parameter.')
SAVE_OPTION = st.selectbox(label='Choose file format to save', options=(None, 'CSV'), help='By default data wont be saved. Choose CSV format if you want to save the results to CSV file.')
# PAGINATION = st.checkbox(label='Enable pagination') # increase amount of videos to parse
# submit button
col1, col2, col3 , col4, col5 = st.columns(5)
with col1:
pass
with col2:
pass
with col4:
pass
with col5:
pass
with col3:
submit_button_holder = st.empty()
submit_search = submit_button_holder.button(label='Find mentions') # centered button
if submit_search and not SEARCH_QUERY:
st.error(body='Looks like you click a button without a search query. Please enter a search query 👆')
st.stop()
if submit_search and SEARCH_QUERY and NUMBER_OF_VIDEOS_TO_ANALYZE:
search_queue = Queue()
params = {
'api_key': os.getenv('SERPAPI_API_KEY'), # https://serpapi.com/manage-api-key
'engine': 'youtube', # search engine
'sp': 'EgIYAw%253D%253D', # filter to control length of the video: https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp
'device': 'desktop', # device type
'search_query': SEARCH_QUERY, # search query
'async': True # async batch requests
}
search = YoutubeSearch(params) # where data extraction happens
results = search.get_dict() # JSON -> Python dict
search_queue.put(results)
videos = []
with st.spinner(text='Parsing YouTube Videos...'):
while not search_queue.empty():
result = search_queue.get()
search_id = result['search_metadata']['id']
# print(f'Get search from archive: {search_id}')
search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed
# print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")
if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
for video_result in search_archived.get('video_results', []):
if video_result.get('title') not in videos:
# can't contain emojies in the file name
title = video_result.get('title') \
.replace('|', '')\
.replace('/', '')\
.replace('?', '')\
.replace(':', '')\
.replace('<', '')\
.replace('>', '')\
.replace('\\','')\
.replace('', '') \
.replace('*', '')
absolute_title_path = pathlib.Path(title).absolute()
print(absolute_title_path)
videos.append({
'title': title,
'link': video_result.get('link'),
'file_path': f"{absolute_title_path}.mp4"
})
if len(videos) == NUMBER_OF_VIDEOS_TO_ANALYZE:
print(f'downloading {len(videos)} videos')
break
# increase amount of videos to parse
# if PAGINATION:
# if 'next' in search_archived.get('serpapi_pagination', {}):
# search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))
# new_page_result = search.get_dict() # new results from updated (next) page
# search_queue.put(new_page_result) # add to queue results from updated (next) page
else:
# print(f'Requeue search: {search_id}')
search_queue.put(result)
parsing_is_success = st.success('Done parsing 🎉')
time.sleep(1)
if parsing_is_success:
parsing_is_success.empty()
with st.spinner(text='Downloading YouTube Videos...'):
download_info = st.info(body='Note: downloading speed depends on video length (the longer the video the more time it take to download) and your internet speed 📡')
# https://discuss.streamlit.io/t/stqdm-a-tqdm-like-progress-bar-for-streamlit/10097
for video in stqdm(videos):
youtube_downloader = YouTube(url=video['link'])
print(f'Downloading {video["link"]}')
try:
# download only audio from youtube video
youtube_downloader.streams \
.filter(only_audio=True) \
.first() \
.download(
output_path=VIDEOS_DOWNLOAD_PATH,
filename=video['file_path']
)
except exceptions.LiveStreamError:
print(f"Video {video['link']} is a livestream, couldn't download.")
pass
except exceptions.VideoUnavailable:
print(f'Video {video["link"]} unavailable, skipping.')
pass
downloading_videos_success = st.success('Done downloading 🎉')
time.sleep(5)
downloading_videos_success.empty()
download_info.empty()
submit_button_holder.empty()
# transcribe with whisper
transcript_data = []
if VIDEOS_DOWNLOAD_PATH.exists():
with st.spinner(text='Transcribing Videos...'):
transcript_note = st.info(body='Note: it may take some time to process all audio files, especially if video is 20+ minutes long.')
# iterate through video files to transcribe
model = whisper.load_model('base')
for video in videos:
transcribed_audio = model.transcribe(video["file_path"], fp16=False)
# generic check to check if transcibe text is present to do further tasks
# it could be [] or str: "... ... ... ... ...", if it's a song with no text
if transcribed_audio['text']:
for segment in transcribed_audio['segments']:
if TARGET_KEYWORD in segment['text'].lower():
transcript_data.append({
'video_title': video['title'],
'timestamp_url': f'{video["link"]}&t={math.floor(segment["start"])}', # <url>&t=488s
# 'timestamp': segment['start'],
'text': segment['text']
})
# print(segment)
# print(f'Found target keyword from {video["title"]}, timestamp: {segment["start"]}')
# print(f'Text: {segment["text"]}')
else: pass
transcipt_success = st.success('Done transcribing 🎉')
time.sleep(4)
transcript_note.empty()
transcipt_success.empty()
transcribed_results_info = st.markdown(body='#### Transcribed results')
TRANSCRIPT_TABLE = st.table(data=transcript_data)
time.sleep(3)
# start over
with col1:
pass
with col2:
pass
with col4:
pass
with col5:
pass
with col3:
start_over_button_holder = st.empty()
start_over_button = st.button(label='Start over') # centered button
if SAVE_OPTION == 'CSV' and transcript_data:
# save to CSV with Pandas
save_csv(df=transcript_data) # draw a download CSV anchor tag
start_over_info_holder = st.empty()
start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
start_over_button_holder.empty()
start_over_info_holder.empty()
if SAVE_OPTION == 'CSV' and not transcript_data:
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
no_data_holder = st.empty()
no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
no_data_holder.empty()
start_over_button_holder.empty()
if SAVE_OPTION is None and transcript_data:
start_over_info_holder = st.empty()
start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
start_over_button_holder.empty()
start_over_info_holder.empty()
if SAVE_OPTION is None and not transcript_data:
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
no_data_holder = st.empty()
no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
TRANSCRIPT_TABLE.empty()
no_data_holder.empty()
transcribed_results_info.empty()
start_over_button_holder.empty()
def save_csv(df):
# https://stackoverflow.com/a/68071190/15164646
csv_file = pd.DataFrame(data=df).to_csv(index=False)
b64_csv_file = base64.b64encode(csv_file.encode()).decode()
href = f'<a href="data:file/csv;base64,{b64_csv_file}" download="youtube-transcript.csv" >Download CSV</a>'
st.markdown(href, unsafe_allow_html=True)
if __name__ == '__main__':
main()
Code Explanation
After a bunch of imports, we need to load SerpApi API key:
load_dotenv(find_dotenv())
The dotenv
will load any globally available envs or from the .env
file. In our case, we have used export SERPAPI_API_KEY=<your-api-key>
to store API key in the current terminal session, meaning when the terminal will be closed, API key no longer be active.
Next, we define videos download path using pathlib
. This path will be used to temporarily store downloaded videos from YouTube that further be used in the transcript:
VIDEOS_DOWNLOAD_PATH = pathlib.Path(__file__).parent.resolve() / 'videos'
After that, we check if videos/
path exists or not:
if VIDEOS_DOWNLOAD_PATH.exists():
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}']) # remove videos on each new run
# create videos folder if not exist. Temporary store videos.
if not VIDEOS_DOWNLOAD_PATH.exists():
# remove folder and create new one afterwards.
# wihtout subprocess.run(['rm', '-r', 'videos/']) can't be created, saying that it already exists.
subprocess.run(['rm', '-r', 'videos/'])
subprocess.run(['mkdir', 'videos/'])
Next, we define a main
function where everything happens and right away I was modifying the default streamlit
footer (optional) with custom HTML that passed to st.markdown
, and added <h1>
title and description of the ap:
def main():
footer_modified = '''
<style>
#MainMenu {visibility: hidden;}
footer {visibility: hidden;}
.footer {
position: fixed;
left: 0;
bottom: 0;
width: 100%;
background-color: white;
color: black;
text-align: center;
font-size: 8px;
}
</style>
<div class="footer">
<p align="center" style="padding-top: 200px;">SerpApi Demo Project (<a href="https://github.com/dimitryzub/youtube-mention-tracker">repo</a>)<br>Made with <a href="https://streamlit.io/">Streamlit</a>, <a href="http://serpapi.com/">SerpApi</a>, <a href="https://github.com/pytube/pytube">PyTube</a>, <a href="https://github.com/openai/whisper">Whisper</a> 🧡</p>
</div>
'''
st.markdown(footer_modified, unsafe_allow_html=True)
st.title('📺YouTube Videos Mention Tracker')
st.markdown(body="This demo parses, downloads and transcribes YouTube videos to find target mention(s) inside the video(s). It's similar to [Mention](https://mention.com/en/) but for videos.")
Next is to define streamlit
session state. I've used this to hide or unhide certain widgets:
if 'visibility' not in st.session_state:
st.session_state.visibility = 'visible'
st.session_state.disabled = False
After that, I defined a few input fields, a slider, and a select box. Pagination left commented as it just takes too long to transcribe videos:
SEARCH_QUERY: str = st.text_input(label='Search query', placeholder='Data Nerd', help='Multiple search queries is not supported') # used to parsing youtube videos
TARGET_KEYWORD: str = st.text_input(label='Target keyword', placeholder='SQL', help='Multiple target keywords is not supported') # used for in the speech recognition
NUMBER_OF_VIDEOS_TO_ANALYZE: int = st.slider(label='Number of videos to analyze', min_value=1, max_value=20, help='By default, extracted videos will be no longer than 20 minutes. Its done by filtering YouTube results with [`sp`](https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp) URL parameter.')
SAVE_OPTION = st.selectbox(label='Choose file format to save', options=(None, 'CSV'), help='By default data wont be saved. Choose CSV format if you want to save the results to CSV file.')
# PAGINATION = st.checkbox(label='Enable pagination') # increase amount of videos to parse
Here I was creating a centered button (with kind of weird way I found on streamlit
Discourse forum)
# submit button
col1, col2, col3 , col4, col5 = st.columns(5)
with col1:
pass
with col2:
pass
with col4:
pass
with col5:
pass
with col3:
submit_button_holder = st.empty()
submit_search = submit_button_holder.button(label='Find mentions') # centered button
if submit_search and not SEARCH_QUERY:
st.error(body='Looks like you click a button without a search query. Please enter a search query 👆')
st.stop()
-
submit_button_holder
is used to hide or unhide widget. -
st.stop
is used to stop the script if user didn't provide any search query.
If the user entered a search query and hits "Find mentions" button, the following code executes:
if submit_search and SEARCH_QUERY and NUMBER_OF_VIDEOS_TO_ANALYZE:
search_queue = Queue()
params = {
'api_key': os.getenv('SERPAPI_API_KEY'), # https://serpapi.com/manage-api-key
'engine': 'youtube', # search engine
'sp': 'EgIYAw%253D%253D', # filter to control length of the video: https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp
'device': 'desktop', # device type
'search_query': SEARCH_QUERY, # search query
'async': True # async batch requests
}
search = YoutubeSearch(params) # where data extraction happens
results = search.get_dict() # JSON -> Python dict
search_queue.put(results)
videos = []
with st.spinner(text='Parsing YouTube Videos...'):
while not search_queue.empty():
result = search_queue.get()
search_id = result['search_metadata']['id']
# print(f'Get search from archive: {search_id}')
search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed
# print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")
if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
for video_result in search_archived.get('video_results', []):
if video_result.get('title') not in videos:
# can't contain emojies in the file name
title = video_result.get('title') \
.replace('|', '')\
.replace('/', '')\
.replace('?', '')\
.replace(':', '')\
.replace('<', '')\
.replace('>', '')\
.replace('\\','')\
.replace('', '') \
.replace('*', '')
absolute_title_path = pathlib.Path(title).absolute()
videos.append({
'title': title,
'link': video_result.get('link'),
'file_path': f"{absolute_title_path}.mp4"
})
if len(videos) == NUMBER_OF_VIDEOS_TO_ANALYZE:
print(f'downloading {len(videos)} videos')
break
# increase amount of videos to parse
# if PAGINATION:
# if 'next' in search_archived.get('serpapi_pagination', {}):
# search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))
# new_page_result = search.get_dict() # new results from updated (next) page
# search_queue.put(new_page_result) # add to queue results from updated (next) page
else:
# print(f'Requeue search: {search_id}')
search_queue.put(result)
parsing_is_success = st.success('Done parsing 🎉')
time.sleep(1)
Quite a few things happening. First of all, this is a SerpApi part. It extracts YouTube URLs for further usage. Additionally, this example shows SerpApi's async
approach instead of sync
, meaning each result are not being waited to be completed which improves parsing speed quite a lot. However, this only affects when the PAGINATION
option is enabled.
If you want to see a comparison, I've written a blog post specifically about it: SerpApi Async Requests with Pagination using Python. In short, I've record a 434% speed improvement when making requests with YouTube API. I believe speed could be improved further.
Here I defined a search_queue
for async
searches. Defined a params
with all the search parameters, and at the end put()
results to a search_queue
.
search_queue = Queue()
params = {
'api_key': os.getenv('SERPAPI_API_KEY'), # https://serpapi.com/manage-api-key
'engine': 'youtube', # search engine
'sp': 'EgIYAw%253D%253D', # filter to control length of the video: https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp
'device': 'desktop', # device type
'search_query': SEARCH_QUERY, # search query
'async': True # async batch requests
}
search = YoutubeSearch(params) # where data extraction happens
results = search.get_dict() # JSON -> Python dict
search_queue.put(results)
This portion of the code below does actual parsing. We're looping through a search_queue
until it's empty()
. Then check if the search status is either "Cached" or "Success", and extract the title and link + creating an absolute title path from the title.
A lot of replace
methods are used to remove forbidden characters for filenames. If the search status is not "Success" it will put
it to the search_queue
at the bottom to process it again:
videos = []
with st.spinner(text='Parsing YouTube Videos...'):
while not search_queue.empty():
result = search_queue.get() # access search result from the queue
search_id = result['search_metadata']['id']
# print(f'Get search from archive: {search_id}')
search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed
# print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")
if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
for video_result in search_archived.get('video_results', []):
if video_result.get('title') not in videos:
# can't contain emojies in the file name
title = video_result.get('title') \
.replace('|', '')\
.replace('/', '')\
.replace('?', '')\
.replace(':', '')\
.replace('<', '')\
.replace('>', '')\
.replace('\\','')\
.replace('', '') \
.replace('*', '')
absolute_title_path = pathlib.Path(title).absolute()
videos.append({
'title': title,
'link': video_result.get('link'),
'file_path': f"{absolute_title_path}.mp4"
})
if len(videos) == NUMBER_OF_VIDEOS_TO_ANALYZE:
print(f'downloading {len(videos)} videos')
break
# increase amount of videos to parse
# if PAGINATION:
# if 'next' in search_archived.get('serpapi_pagination', {}):
# search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))
# new_page_result = search.get_dict() # new results from updated (next) page
# search_queue.put(new_page_result) # add to queue results from updated (next) page
else:
# print(f'Requeue search: {search_id}')
search_queue.put(result)
parsing_is_success = st.success('Done parsing 🎉')
time.sleep(1)
After all the parsing is done, we need to download videos from returned URLs using pytube
. Here we only download audio with filter(only_audio=True)
option as this is what we only need for transcript and defining output_path
to create videos/
path with filename
that was done at the parsing stage, i.e absolute filename.
if parsing_is_success:
parsing_is_success.empty() # hides text
with st.spinner(text='Downloading YouTube Videos...'):
download_info = st.info(body='Note: downloading speed depends on video length (the longer the video the more time it take to download) and your internet speed 📡')
# https://discuss.streamlit.io/t/stqdm-a-tqdm-like-progress-bar-for-streamlit/10097
for video in stqdm(videos):
youtube_downloader = YouTube(url=video['link'])
print(f'Downloading {video["link"]}')
try:
# download only audio from youtube video
youtube_downloader.streams \
.filter(only_audio=True) \
.first() \
.download(
output_path=VIDEOS_DOWNLOAD_PATH,
filename=video['file_path']
)
except exceptions.LiveStreamError:
print(f"Video {video['link']} is a livestream, couldn't download.")
pass
except exceptions.VideoUnavailable:
print(f'Video {video["link"]} unavailable, skipping.')
pass
downloading_videos_success = st.success('Done downloading 🎉')
time.sleep(5)
# hiding all previously used widgets
downloading_videos_success.empty()
download_info.empty()
submit_button_holder.empty()
Next is the main part, audio transcription with whisper
. Here I'm checking for video path existence (and not handling exceptions in any way 👀), load_model()
for a transcript, and iterating over video file paths to get audio for the transcript.
If a transcribed text was found, I'm iterating over ['segments']
which contains all the needed info such as found text, and at what point it was found (really handy to generate YouTube timestamp URL):
transcript_data = []
if VIDEOS_DOWNLOAD_PATH.exists():
with st.spinner(text='Transcribing Videos...'):
transcript_note = st.info(body='Note: it may take some time to process all audio files, especially if video is 20+ minutes long.')
# iterate through video files to transcribe
model = whisper.load_model('base')
for video in videos:
transcribed_audio = model.transcribe(video["file_path"], fp16=False)
# generic check to check if transcibe text is present to do further tasks
# it could be [] or str: "... ... ... ... ...", if it's a song with no text
if transcribed_audio['text']:
for segment in transcribed_audio['segments']:
if TARGET_KEYWORD in segment['text'].lower():
transcript_data.append({
'video_title': video['title'],
'timestamp_url': f'{video["link"]}&t={math.floor(segment["start"])}', # <url>&t=488
# 'timestamp': segment['start'],
'text': segment['text']
})
else: pass # use better exception handling than me
transcipt_success = st.success('Done transcribing 🎉')
time.sleep(4)
# hiding widgets
transcript_note.empty()
transcipt_success.empty()
transcribed_results_info = st.markdown(body='#### Transcribed results')
TRANSCRIPT_TABLE = st.table(data=transcript_data) # rendering table with found transcript results
And almost the final chuck of the code is to reset results to start a new search. Here I'm creating a new button to start the search again. Button with label creation could be done with the custom created function, for example create_button(label: str, button_name, button_holder_name)
or something similar to reduce code duplication.
Then I'm checking SAVE_OPTION
s defined when a user submitted search. If it's CSV
, then I'm calling the addition function to render the download button with found data in the CSV. If nothing was found, an error appears.
When the start_over_button
gets hit, all video files in the videos/
directory get deleted for a new search:
# start over
with col1:
pass
with col2:
pass
with col4:
pass
with col5:
pass
with col3:
start_over_button_holder = st.empty()
start_over_button = st.button(label='Start over') # centered button
if SAVE_OPTION == 'CSV' and transcript_data:
# save to CSV with Pandas
save_csv(df=transcript_data) # draw a download CSV anchor tag
start_over_info_holder = st.empty()
start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
start_over_button_holder.empty()
start_over_info_holder.empty()
if SAVE_OPTION == 'CSV' and not transcript_data:
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
no_data_holder = st.empty()
no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
no_data_holder.empty()
start_over_button_holder.empty()
if SAVE_OPTION is None and transcript_data:
start_over_info_holder = st.empty()
start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
start_over_button_holder.empty()
start_over_info_holder.empty()
if SAVE_OPTION is None and not transcript_data:
TRANSCRIPT_TABLE.empty()
transcribed_results_info.empty()
no_data_holder = st.empty()
no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')
if start_over_button:
subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])
TRANSCRIPT_TABLE.empty()
no_data_holder.empty()
transcribed_results_info.empty()
start_over_button_holder.empty()
def save_csv(df):
# https://stackoverflow.com/a/68071190/15164646
csv_file = pd.DataFrame(data=df).to_csv(index=False)
b64_csv_file = base64.b64encode(csv_file.encode()).decode()
href = f'<a href="data:file/csv;base64,{b64_csv_file}" download="youtube-transcript.csv" >Download CSV</a>'
st.markdown(href, unsafe_allow_html=True)
In the end I've added a if __name__ == '__main__'
idiom which protects users from accidentally invoking the script when they didn't intend to, and call the main
function which will run the whole script:
if __name__ == '__main__':
main()
Top comments (0)