DEV Community

Dmitriy Zub ☀️ for SerpApi

Posted on • Originally published at serpapi.com

Making YouTube Mentions Tracker in Python

Intro

This blog post is created to show you how SerpApi was integrated into this small app with an explanation of how the app functionates and SerpApi's role in it. We'll go from top to bottom of the code shown below.

What about the demo itself, this demo project was inspired by a tool called Mention.

This project does kind of the same thing but for YouTube videos only. It transcribes videos (even if the video is without captions) and finds the target keyword and returns a URL with the timestamp.

The app doesn't scan/detect new videos all the time via cron job or something similar.

App Demo

Live demo:

Virtual Environment and Libraries Installation

If you want to follow along with your API key (instead of mine), follow these steps:

Clone repository:

$ git clone https://github.com/dimitryzub/youtube-mention-tracker.git
Enter fullscreen mode Exit fullscreen mode

Install dependencies:

$ cd youtube-mention-tracker && pip install -r requriements.txt
Enter fullscreen mode Exit fullscreen mode

Add SerpApi API key for the current shell and all processes started from the current shell:

# used to parse youtube videos, has a plan of 100 free searches
$ export SERPAPI_API_KEY=<your-api-key>
Enter fullscreen mode Exit fullscreen mode

Run the app (this will open streamlit in your browser):

$ cd youtube-tracker && streamlit run tracker.py
Enter fullscreen mode Exit fullscreen mode

Additionally, you might need to install FFmpeg. To reduce text duplication, have a look at how to Install FFmpeg on Windows, Mac, Linux Ubuntu, and Debian from VideoProc or any other resource on the internet that shows how to do it.

Full Code

from urllib.parse import (parse_qsl, urlsplit)
from serpapi import YoutubeSearch
from pytube import YouTube
import pytube.exceptions as exceptions
from queue import Queue
import streamlit as st
import whisper
import re, os, time
import subprocess
import pathlib
from stqdm import stqdm
import math, base64
import pandas as pd
from dotenv import load_dotenv, find_dotenv


load_dotenv(find_dotenv())

VIDEOS_DOWNLOAD_PATH = pathlib.Path(__file__).parent.resolve() / 'videos' 

if VIDEOS_DOWNLOAD_PATH.exists():  
    subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}']) # remove videos on each new run

# create videos folder if not exist. Temporary store videos.
if not VIDEOS_DOWNLOAD_PATH.exists():
    #TODO: change `videos/` to VIDEOS_DOWNLOAD_PATH or to subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}']) command  
    subprocess.run(['rm', '-r', 'videos/'])
    subprocess.run(['mkdir', 'videos/'])


def main():
    # TODO: add a blog post link to "SerpApi Demo Project"
    footer_modified = '''
            <style>
                #MainMenu {visibility: hidden;}
                footer {visibility: hidden;}
                .footer {
                    position: fixed;
                    left: 0;
                    bottom: 0;
                    width: 100%;
                    background-color: white;
                    color: black;
                    text-align: center;
                    font-size: 8px;
                }
            </style>
            <div class="footer">
                <p align="center" style="padding-top: 200px;">SerpApi Demo Project (<a href="https://github.com/dimitryzub/youtube-mention-tracker">repo</a>)<br>Made with <a href="https://streamlit.io/">Streamlit</a>, <a href="http://serpapi.com/">SerpApi</a>, <a href="https://github.com/pytube/pytube">PyTube</a>, <a href="https://github.com/openai/whisper">Whisper</a> 🧡</p>
            </div>
    '''
    st.markdown(footer_modified, unsafe_allow_html=True) 

    st.title('📺YouTube Videos Mention Tracker')
    st.markdown(body="This demo parses, downloads and transcribes YouTube videos to find target mention(s) inside the video(s). It's similar to [Mention](https://mention.com/en/) but for videos.")

    if 'visibility' not in st.session_state:
        st.session_state.visibility = 'visible'
        st.session_state.disabled = False

    SEARCH_QUERY: str = st.text_input(label='Search query', placeholder='Data Nerd', help='Multiple search queries is not supported')      # used to parsing youtube videos
    TARGET_KEYWORD: str = st.text_input(label='Target keyword', placeholder='SQL', help='Multiple target keywords is not supported')  # used for in the speech recognition
    NUMBER_OF_VIDEOS_TO_ANALYZE: int = st.slider(label='Number of videos to analyze', min_value=1, max_value=20, help='By default, extracted videos will be no longer than 20 minutes. Its done by filtering YouTube results with [`sp`](https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp) URL parameter.')
    SAVE_OPTION = st.selectbox(label='Choose file format to save', options=(None, 'CSV'), help='By default data wont be saved. Choose CSV format if you want to save the results to CSV file.')
    # PAGINATION = st.checkbox(label='Enable pagination') # increase amount of videos to parse

    # submit button
    col1, col2, col3 , col4, col5 = st.columns(5)
    with col1:
        pass
    with col2:
        pass
    with col4:
        pass
    with col5:
        pass
    with col3:
        submit_button_holder = st.empty()
        submit_search = submit_button_holder.button(label='Find mentions') # centered button

    if submit_search and not SEARCH_QUERY:
        st.error(body='Looks like you click a button without a search query. Please enter a search query 👆')
        st.stop()

    if submit_search and SEARCH_QUERY and NUMBER_OF_VIDEOS_TO_ANALYZE:
        search_queue = Queue()

        params = {
            'api_key': os.getenv('SERPAPI_API_KEY'),  # https://serpapi.com/manage-api-key
            'engine': 'youtube',                      # search engine
            'sp': 'EgIYAw%253D%253D',                 # filter to control length of the video: https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp
            'device': 'desktop',                      # device type
            'search_query': SEARCH_QUERY,             # search query
            'async': True                             # async batch requests
        }

        search = YoutubeSearch(params)                # where data extraction happens
        results = search.get_dict()                   # JSON -> Python dict

        search_queue.put(results)

        videos = []

        with st.spinner(text='Parsing YouTube Videos...'):
            while not search_queue.empty():
                result = search_queue.get()
                search_id = result['search_metadata']['id']

                # print(f'Get search from archive: {search_id}')
                search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed

                # print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

                if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
                    for video_result in search_archived.get('video_results', []):
                        if video_result.get('title') not in videos:
                            # can't contain emojies in the file name
                            title = video_result.get('title') \
                                .replace('|', '')\
                                .replace('/', '')\
                                .replace('?', '')\
                                .replace(':', '')\
                                .replace('<', '')\
                                .replace('>', '')\
                                .replace('\\','')\
                                .replace('', '') \
                                .replace('*', '')

                            absolute_title_path = pathlib.Path(title).absolute()
                            print(absolute_title_path)

                            videos.append({
                                'title': title,
                                'link': video_result.get('link'),
                                'file_path': f"{absolute_title_path}.mp4"
                            })

                        if len(videos) == NUMBER_OF_VIDEOS_TO_ANALYZE:
                            print(f'downloading {len(videos)} videos')
                            break

                    # increase amount of videos to parse
                    # if PAGINATION:    
                    #     if 'next' in search_archived.get('serpapi_pagination', {}):
                    #         search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))

                    #         new_page_result = search.get_dict() # new results from updated (next) page
                    #         search_queue.put(new_page_result)   # add to queue results from updated (next) page
                else:
                    # print(f'Requeue search: {search_id}')
                    search_queue.put(result)

        parsing_is_success = st.success('Done parsing 🎉')
        time.sleep(1)

        if parsing_is_success:
            parsing_is_success.empty()

            with st.spinner(text='Downloading YouTube Videos...'):
                download_info = st.info(body='Note: downloading speed depends on video length (the longer the video the more time it take to download) and your internet speed 📡')

                # https://discuss.streamlit.io/t/stqdm-a-tqdm-like-progress-bar-for-streamlit/10097
                for video in stqdm(videos):
                    youtube_downloader = YouTube(url=video['link'])
                    print(f'Downloading {video["link"]}')

                    try: 
                        # download only audio from youtube video
                        youtube_downloader.streams \
                            .filter(only_audio=True) \
                            .first() \
                            .download(
                                output_path=VIDEOS_DOWNLOAD_PATH,
                                filename=video['file_path']
                                )
                    except exceptions.LiveStreamError: 
                        print(f"Video {video['link']} is a livestream, couldn't download.")
                        pass
                    except exceptions.VideoUnavailable:
                        print(f'Video {video["link"]} unavailable, skipping.')
                        pass

            downloading_videos_success = st.success('Done downloading 🎉')

            time.sleep(5)
            downloading_videos_success.empty()
            download_info.empty()
            submit_button_holder.empty()

        # transcribe with whisper 
        transcript_data = []

        if VIDEOS_DOWNLOAD_PATH.exists():
           with st.spinner(text='Transcribing Videos...'):
                transcript_note = st.info(body='Note: it may take some time to process all audio files, especially if video is 20+ minutes long.')

                # iterate through video files to transcribe
                model = whisper.load_model('base')

                for video in videos:
                    transcribed_audio = model.transcribe(video["file_path"], fp16=False)

                    # generic check to check if transcibe text is present to do further tasks
                    # it could be [] or str: "... ... ... ... ...", if it's a song with no text
                    if transcribed_audio['text']:
                        for segment in transcribed_audio['segments']:
                            if TARGET_KEYWORD in segment['text'].lower():
                                transcript_data.append({
                                    'video_title': video['title'],
                                    'timestamp_url': f'{video["link"]}&t={math.floor(segment["start"])}', # <url>&t=488s
                                    # 'timestamp': segment['start'],
                                    'text': segment['text']
                                })
                                # print(segment)
                                # print(f'Found target keyword from {video["title"]}, timestamp: {segment["start"]}')
                                # print(f'Text: {segment["text"]}')
                    else: pass

        transcipt_success = st.success('Done transcribing 🎉')

        time.sleep(4)
        transcript_note.empty()
        transcipt_success.empty()

        transcribed_results_info = st.markdown(body='#### Transcribed results')
        TRANSCRIPT_TABLE = st.table(data=transcript_data)

        time.sleep(3)

        # start over
        with col1:
            pass
        with col2:
            pass
        with col4:
            pass
        with col5:
            pass
        with col3:
            start_over_button_holder = st.empty()
            start_over_button = st.button(label='Start over') # centered button

        if SAVE_OPTION == 'CSV' and transcript_data: 
            # save to CSV with Pandas
            save_csv(df=transcript_data) # draw a download CSV anchor tag  

            start_over_info_holder = st.empty()
            start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")

            if start_over_button:  
                subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

                TRANSCRIPT_TABLE.empty()
                transcribed_results_info.empty()
                start_over_button_holder.empty() 
                start_over_info_holder.empty()

        if SAVE_OPTION == 'CSV' and not transcript_data:
            TRANSCRIPT_TABLE.empty()
            transcribed_results_info.empty()

            no_data_holder = st.empty()
            no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')

            if start_over_button:  
                subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

                no_data_holder.empty()
                start_over_button_holder.empty()

        if SAVE_OPTION is None and transcript_data:
            start_over_info_holder = st.empty()
            start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")

            if start_over_button:  
                subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

                TRANSCRIPT_TABLE.empty()
                transcribed_results_info.empty()
                start_over_button_holder.empty() 
                start_over_info_holder.empty()

        if SAVE_OPTION is None and not transcript_data:
            TRANSCRIPT_TABLE.empty()
            transcribed_results_info.empty()

            no_data_holder = st.empty()
            no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')   

            if start_over_button:  
                subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

                TRANSCRIPT_TABLE.empty()
                no_data_holder.empty()
                transcribed_results_info.empty()
                start_over_button_holder.empty()

def save_csv(df): 
    # https://stackoverflow.com/a/68071190/15164646
    csv_file = pd.DataFrame(data=df).to_csv(index=False)
    b64_csv_file = base64.b64encode(csv_file.encode()).decode()

    href = f'<a href="data:file/csv;base64,{b64_csv_file}" download="youtube-transcript.csv" >Download CSV</a>'
    st.markdown(href, unsafe_allow_html=True)


if __name__ == '__main__':
    main()
Enter fullscreen mode Exit fullscreen mode

Code Explanation

After a bunch of imports, we need to load SerpApi API key:

load_dotenv(find_dotenv())
Enter fullscreen mode Exit fullscreen mode

The dotenv will load any globally available envs or from the .env file. In our case, we have used export SERPAPI_API_KEY=<your-api-key> to store API key in the current terminal session, meaning when the terminal will be closed, API key no longer be active.

Next, we define videos download path using pathlib. This path will be used to temporarily store downloaded videos from YouTube that further be used in the transcript:

VIDEOS_DOWNLOAD_PATH = pathlib.Path(__file__).parent.resolve() / 'videos' 
Enter fullscreen mode Exit fullscreen mode

After that, we check if videos/ path exists or not:

if VIDEOS_DOWNLOAD_PATH.exists():  
    subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}']) # remove videos on each new run

# create videos folder if not exist. Temporary store videos.
if not VIDEOS_DOWNLOAD_PATH.exists():
    # remove folder and create new one afterwards.
    # wihtout subprocess.run(['rm', '-r', 'videos/']) can't be created, saying that it already exists. 
    subprocess.run(['rm', '-r', 'videos/'])
    subprocess.run(['mkdir', 'videos/'])
Enter fullscreen mode Exit fullscreen mode

Next, we define a main function where everything happens and right away I was modifying the default streamlit footer (optional) with custom HTML that passed to st.markdown, and added <h1> title and description of the ap:

def main():
    footer_modified = '''
            <style>
                #MainMenu {visibility: hidden;}
                footer {visibility: hidden;}
                .footer {
                    position: fixed;
                    left: 0;
                    bottom: 0;
                    width: 100%;
                    background-color: white;
                    color: black;
                    text-align: center;
                    font-size: 8px;
                }
            </style>
            <div class="footer">
                <p align="center" style="padding-top: 200px;">SerpApi Demo Project (<a href="https://github.com/dimitryzub/youtube-mention-tracker">repo</a>)<br>Made with <a href="https://streamlit.io/">Streamlit</a>, <a href="http://serpapi.com/">SerpApi</a>, <a href="https://github.com/pytube/pytube">PyTube</a>, <a href="https://github.com/openai/whisper">Whisper</a> 🧡</p>
            </div>
    '''
    st.markdown(footer_modified, unsafe_allow_html=True)

    st.title('📺YouTube Videos Mention Tracker')
    st.markdown(body="This demo parses, downloads and transcribes YouTube videos to find target mention(s) inside the video(s). It's similar to [Mention](https://mention.com/en/) but for videos.") 
Enter fullscreen mode Exit fullscreen mode

Next is to define streamlit session state. I've used this to hide or unhide certain widgets:

if 'visibility' not in st.session_state:
    st.session_state.visibility = 'visible'
    st.session_state.disabled = False
Enter fullscreen mode Exit fullscreen mode

After that, I defined a few input fields, a slider, and a select box. Pagination left commented as it just takes too long to transcribe videos:

SEARCH_QUERY: str = st.text_input(label='Search query', placeholder='Data Nerd', help='Multiple search queries is not supported')      # used to parsing youtube videos
TARGET_KEYWORD: str = st.text_input(label='Target keyword', placeholder='SQL', help='Multiple target keywords is not supported')  # used for in the speech recognition
NUMBER_OF_VIDEOS_TO_ANALYZE: int = st.slider(label='Number of videos to analyze', min_value=1, max_value=20, help='By default, extracted videos will be no longer than 20 minutes. Its done by filtering YouTube results with [`sp`](https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp) URL parameter.')
SAVE_OPTION = st.selectbox(label='Choose file format to save', options=(None, 'CSV'), help='By default data wont be saved. Choose CSV format if you want to save the results to CSV file.')
# PAGINATION = st.checkbox(label='Enable pagination') # increase amount of videos to parse
Enter fullscreen mode Exit fullscreen mode

Here I was creating a centered button (with kind of weird way I found on streamlit Discourse forum)

 # submit button
col1, col2, col3 , col4, col5 = st.columns(5)
with col1:
    pass
with col2:
    pass
with col4:
    pass
with col5:
    pass
with col3:
    submit_button_holder = st.empty()
    submit_search = submit_button_holder.button(label='Find mentions') # centered button

if submit_search and not SEARCH_QUERY:
    st.error(body='Looks like you click a button without a search query. Please enter a search query 👆')
    st.stop()
Enter fullscreen mode Exit fullscreen mode
  • submit_button_holder is used to hide or unhide widget.
  • st.stop is used to stop the script if user didn't provide any search query.

If the user entered a search query and hits "Find mentions" button, the following code executes:

if submit_search and SEARCH_QUERY and NUMBER_OF_VIDEOS_TO_ANALYZE:
    search_queue = Queue()

    params = {
        'api_key': os.getenv('SERPAPI_API_KEY'),  # https://serpapi.com/manage-api-key
        'engine': 'youtube',                      # search engine
        'sp': 'EgIYAw%253D%253D',                 # filter to control length of the video: https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp
        'device': 'desktop',                      # device type
        'search_query': SEARCH_QUERY,             # search query
        'async': True                             # async batch requests
    }

    search = YoutubeSearch(params)                # where data extraction happens
    results = search.get_dict()                   # JSON -> Python dict

    search_queue.put(results)

    videos = []

    with st.spinner(text='Parsing YouTube Videos...'):
        while not search_queue.empty():
            result = search_queue.get()
            search_id = result['search_metadata']['id']

            # print(f'Get search from archive: {search_id}')
            search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed

            # print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

            if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
                for video_result in search_archived.get('video_results', []):
                    if video_result.get('title') not in videos:
                        # can't contain emojies in the file name
                        title = video_result.get('title') \
                            .replace('|', '')\
                            .replace('/', '')\
                            .replace('?', '')\
                            .replace(':', '')\
                            .replace('<', '')\
                            .replace('>', '')\
                            .replace('\\','')\
                            .replace('', '') \
                            .replace('*', '')

                        absolute_title_path = pathlib.Path(title).absolute()

                        videos.append({
                            'title': title,
                            'link': video_result.get('link'),
                            'file_path': f"{absolute_title_path}.mp4"
                        })

                    if len(videos) == NUMBER_OF_VIDEOS_TO_ANALYZE:
                        print(f'downloading {len(videos)} videos')
                        break

                # increase amount of videos to parse
                # if PAGINATION:    
                #     if 'next' in search_archived.get('serpapi_pagination', {}):
                #         search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))

                #         new_page_result = search.get_dict() # new results from updated (next) page
                #         search_queue.put(new_page_result)   # add to queue results from updated (next) page
            else:
                # print(f'Requeue search: {search_id}')
                search_queue.put(result)

    parsing_is_success = st.success('Done parsing 🎉')
    time.sleep(1)
Enter fullscreen mode Exit fullscreen mode

Quite a few things happening. First of all, this is a SerpApi part. It extracts YouTube URLs for further usage. Additionally, this example shows SerpApi's async approach instead of sync, meaning each result are not being waited to be completed which improves parsing speed quite a lot. However, this only affects when the PAGINATION option is enabled.

If you want to see a comparison, I've written a blog post specifically about it: SerpApi Async Requests with Pagination using Python. In short, I've record a 434% speed improvement when making requests with YouTube API. I believe speed could be improved further.

Here I defined a search_queue for async searches. Defined a params with all the search parameters, and at the end put() results to a search_queue.

search_queue = Queue()

params = {
    'api_key': os.getenv('SERPAPI_API_KEY'),  # https://serpapi.com/manage-api-key
    'engine': 'youtube',                      # search engine
    'sp': 'EgIYAw%253D%253D',                 # filter to control length of the video: https://serpapi.com/youtube-search-api#api-parameters-advanced-youtube-parameters-sp
    'device': 'desktop',                      # device type
    'search_query': SEARCH_QUERY,             # search query
    'async': True                             # async batch requests
}

search = YoutubeSearch(params)                # where data extraction happens
results = search.get_dict()                   # JSON -> Python dict

search_queue.put(results)
Enter fullscreen mode Exit fullscreen mode

This portion of the code below does actual parsing. We're looping through a search_queue until it's empty(). Then check if the search status is either "Cached" or "Success", and extract the title and link + creating an absolute title path from the title.

A lot of replace methods are used to remove forbidden characters for filenames. If the search status is not "Success" it will put it to the search_queue at the bottom to process it again:

videos = []

with st.spinner(text='Parsing YouTube Videos...'):
    while not search_queue.empty():
        result = search_queue.get() # access search result from the queue
        search_id = result['search_metadata']['id']

        # print(f'Get search from archive: {search_id}')
        search_archived = search.get_search_archive(search_id) # where all extracted data is stored and accessed
        # print(f"Search ID: {search_id}, Status: {search_archived['search_metadata']['status']}")

        if re.search(r'Cached|Success', search_archived['search_metadata']['status']):
            for video_result in search_archived.get('video_results', []):
                if video_result.get('title') not in videos:
                    # can't contain emojies in the file name
                    title = video_result.get('title') \
                        .replace('|', '')\
                        .replace('/', '')\
                        .replace('?', '')\
                        .replace(':', '')\
                        .replace('<', '')\
                        .replace('>', '')\
                        .replace('\\','')\
                        .replace('', '') \
                        .replace('*', '')

                    absolute_title_path = pathlib.Path(title).absolute()

                    videos.append({
                        'title': title,
                        'link': video_result.get('link'),
                        'file_path': f"{absolute_title_path}.mp4"
                    })

                if len(videos) == NUMBER_OF_VIDEOS_TO_ANALYZE:
                    print(f'downloading {len(videos)} videos')
                    break

            # increase amount of videos to parse
            # if PAGINATION:    
            #     if 'next' in search_archived.get('serpapi_pagination', {}):
            #         search.params_dict.update(dict(parse_qsl(urlsplit(search_archived['serpapi_pagination']['next']).query)))

            #         new_page_result = search.get_dict() # new results from updated (next) page
            #         search_queue.put(new_page_result)   # add to queue results from updated (next) page
        else:
            # print(f'Requeue search: {search_id}')
            search_queue.put(result)

parsing_is_success = st.success('Done parsing 🎉')
time.sleep(1)
Enter fullscreen mode Exit fullscreen mode

After all the parsing is done, we need to download videos from returned URLs using pytube. Here we only download audio with filter(only_audio=True) option as this is what we only need for transcript and defining output_path to create videos/ path with filename that was done at the parsing stage, i.e absolute filename.

if parsing_is_success:
    parsing_is_success.empty() # hides text

    with st.spinner(text='Downloading YouTube Videos...'):
        download_info = st.info(body='Note: downloading speed depends on video length (the longer the video the more time it take to download) and your internet speed 📡')

        # https://discuss.streamlit.io/t/stqdm-a-tqdm-like-progress-bar-for-streamlit/10097
        for video in stqdm(videos):
            youtube_downloader = YouTube(url=video['link'])
            print(f'Downloading {video["link"]}')

            try: 
                # download only audio from youtube video
                youtube_downloader.streams \
                    .filter(only_audio=True) \
                    .first() \
                    .download(
                        output_path=VIDEOS_DOWNLOAD_PATH,
                        filename=video['file_path']
                        )
            except exceptions.LiveStreamError: 
                print(f"Video {video['link']} is a livestream, couldn't download.")
                pass
            except exceptions.VideoUnavailable:
                print(f'Video {video["link"]} unavailable, skipping.')
                pass

    downloading_videos_success = st.success('Done downloading 🎉')

    time.sleep(5)
    # hiding all previously used widgets
    downloading_videos_success.empty()
    download_info.empty()
    submit_button_holder.empty()
Enter fullscreen mode Exit fullscreen mode

Next is the main part, audio transcription with whisper. Here I'm checking for video path existence (and not handling exceptions in any way 👀), load_model() for a transcript, and iterating over video file paths to get audio for the transcript.

If a transcribed text was found, I'm iterating over ['segments'] which contains all the needed info such as found text, and at what point it was found (really handy to generate YouTube timestamp URL):

transcript_data = []

if VIDEOS_DOWNLOAD_PATH.exists():
    with st.spinner(text='Transcribing Videos...'):
        transcript_note = st.info(body='Note: it may take some time to process all audio files, especially if video is 20+ minutes long.')

        # iterate through video files to transcribe
        model = whisper.load_model('base')

        for video in videos:
            transcribed_audio = model.transcribe(video["file_path"], fp16=False)

            # generic check to check if transcibe text is present to do further tasks
            # it could be [] or str: "... ... ... ... ...", if it's a song with no text
            if transcribed_audio['text']:
                for segment in transcribed_audio['segments']:
                    if TARGET_KEYWORD in segment['text'].lower():
                        transcript_data.append({
                            'video_title': video['title'],
                            'timestamp_url': f'{video["link"]}&t={math.floor(segment["start"])}', # <url>&t=488
                            # 'timestamp': segment['start'],
                            'text': segment['text']
                        })
            else: pass # use better exception handling than me

transcipt_success = st.success('Done transcribing 🎉')

time.sleep(4)
# hiding widgets
transcript_note.empty()
transcipt_success.empty()

transcribed_results_info = st.markdown(body='#### Transcribed results')
TRANSCRIPT_TABLE = st.table(data=transcript_data) # rendering table with found transcript results
Enter fullscreen mode Exit fullscreen mode

And almost the final chuck of the code is to reset results to start a new search. Here I'm creating a new button to start the search again. Button with label creation could be done with the custom created function, for example create_button(label: str, button_name, button_holder_name) or something similar to reduce code duplication.

Then I'm checking SAVE_OPTIONs defined when a user submitted search. If it's CSV, then I'm calling the addition function to render the download button with found data in the CSV. If nothing was found, an error appears.

When the start_over_button gets hit, all video files in the videos/ directory get deleted for a new search:

 # start over
with col1:
    pass
with col2:
    pass
with col4:
    pass
with col5:
    pass
with col3:
    start_over_button_holder = st.empty()
    start_over_button = st.button(label='Start over') # centered button

if SAVE_OPTION == 'CSV' and transcript_data: 
    # save to CSV with Pandas
    save_csv(df=transcript_data) # draw a download CSV anchor tag  

    start_over_info_holder = st.empty()
    start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")

    if start_over_button:  
        subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

        TRANSCRIPT_TABLE.empty()
        transcribed_results_info.empty()
        start_over_button_holder.empty() 
        start_over_info_holder.empty()

if SAVE_OPTION == 'CSV' and not transcript_data:
    TRANSCRIPT_TABLE.empty()
    transcribed_results_info.empty()

    no_data_holder = st.empty()
    no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')

    if start_over_button:  
        subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

        no_data_holder.empty()
        start_over_button_holder.empty()

if SAVE_OPTION is None and transcript_data:
    start_over_info_holder = st.empty()
    start_over_info_holder.error(body="To rerun the script, click on the 'Start over' button, or refresh the page.")

    if start_over_button:  
        subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

        TRANSCRIPT_TABLE.empty()
        transcribed_results_info.empty()
        start_over_button_holder.empty() 
        start_over_info_holder.empty()

if SAVE_OPTION is None and not transcript_data:
    TRANSCRIPT_TABLE.empty()
    transcribed_results_info.empty()

    no_data_holder = st.empty()
    no_data_holder.error('No target keyword found. Click *Start Over* button and try different keyword.')   

    if start_over_button:  
        subprocess.run(['rm', '-rf', f'{VIDEOS_DOWNLOAD_PATH}'])

        TRANSCRIPT_TABLE.empty()
        no_data_holder.empty()
        transcribed_results_info.empty()
        start_over_button_holder.empty()

def save_csv(df): 
    # https://stackoverflow.com/a/68071190/15164646
    csv_file = pd.DataFrame(data=df).to_csv(index=False)
    b64_csv_file = base64.b64encode(csv_file.encode()).decode()

    href = f'<a href="data:file/csv;base64,{b64_csv_file}" download="youtube-transcript.csv" >Download CSV</a>'
    st.markdown(href, unsafe_allow_html=True)
Enter fullscreen mode Exit fullscreen mode

In the end I've added a if __name__ == '__main__' idiom which protects users from accidentally invoking the script when they didn't intend to, and call the main function which will run the whole script:

if __name__ == '__main__':
    main()
Enter fullscreen mode Exit fullscreen mode

Top comments (0)