Choon-Siang Lai

Originally published at kitfucoda.Medium

Video data IO through ffmpeg subprocess

As I restarted my job search (yes, I am still #OpenToWork, ping me!), one of the job applications asked me to implement a prototype that processes video data. While working through the project, I unexpectedly got a lot of help from generative AI chatbots, due to my relative inexperience in the area.

As mentioned in the title, ffmpeg was used to perform some preprocessing work. One of the goals of the project was to be able to play multiple video files one after another. While there are multiple ways to achieve that, I decided to go with the most obvious solution: concatenating them.

$ cat video1 video2 video3 | python further-work.py
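For illustration, a hypothetical further-work.py on the receiving end of that pipe could look roughly like this; it just reads the concatenated bytes from stdin in fixed-size chunks (the chunk size and the processing step are placeholders, not part of the actual project):

import sys

def main() -> None:
    # Read the concatenated video data from stdin, 64 KiB at a time.
    while chunk := sys.stdin.buffer.read(65536):
        ...  # hand each chunk to the actual processing logic

if __name__ == "__main__":
    main()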

In order to achieve that, I first had to re-encode the files into a format that allows it. After “discussing” this with Google Gemini, the chatbot recommended I go with MPEG-TS for the purpose.

MPEG Transport Stream (MPEG-TS) works by encapsulating packetized elementary streams. These streams include audio, video, and PSIP data, which are packetized into small segments. Each stream is chopped into 188-byte sections and interleaved together. This process ensures less latency and greater error resilience, making it ideal for videoconferencing where large frames may introduce audio delay.

Quoted from https://castr.com/blog/mpeg-transport-stream-mpeg-ts/

There are other file formats that could be used for the purpose, but they are irrelevant to the discussion. After the video is re-encoded into this format, the data is sent to a queue, to be consumed by other modules running in other processes.
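To illustrate why MPEG-TS is a good fit here: because the container is just a stream of small fixed-size packets, two files that have already been re-encoded to MPEG-TS can be joined by plain byte concatenation, assuming they share compatible codec settings. The file names below are placeholders, a minimal sketch only:

from pathlib import Path

# part1.ts and part2.ts are hypothetical files already re-encoded to MPEG-TS.
with Path("combined.ts").open("wb") as out:
    for name in ("part1.ts", "part2.ts"):
        out.write(Path(name).read_bytes())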

After defining both the input (a list of video files to be fetched online) and the output (re-encoded video file content), it was time to figure out how to do it. Unfortunately, ffmpeg is a complicated utility that does a great many things. There have been multiple attempts to provide a friendlier interface for it (I really wanted to try this, but it is apparently dead now). However, with how helpful generative AI is these days, getting the right command is just a few prompts away.

ffmpeg -hwaccel cuda -i pipe:0 -c:v h264_nvenc -b:v 1.5M -c:a aac -b:a 128k -f mpegts -y pipe:1

It even gave an explanation of what each of those arguments means, as shown in the screenshot below.


Gemini’s attempt at explaining the ffmpeg command

In short, the command accepts video file content through stdin and writes the re-encoded video content to stdout.
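Before wiring it into asyncio, the command can be sanity-checked with a plain, blocking call; input.mp4 and output.ts below are placeholder file names, and the entire file is held in memory, which is precisely the limitation the streaming version further down avoids:

import subprocess
from pathlib import Path

# Blocking sanity check: feed a whole file to ffmpeg's stdin, capture its stdout.
result = subprocess.run(
    [
        "ffmpeg",
        "-hwaccel", "cuda",
        "-i", "pipe:0",
        "-c:v", "h264_nvenc",
        "-b:v", "1.5M",
        "-c:a", "aac",
        "-b:a", "128k",
        "-f", "mpegts",
        "-y",
        "pipe:1",
    ],
    input=Path("input.mp4").read_bytes(),
    stdout=subprocess.PIPE,
    check=True,
)
Path("output.ts").write_bytes(result.stdout)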

Now it is time to code the implementation. Since I wanted to both read from and write to ffmpeg concurrently, this is going to be an asyncio application. The HTTP client library we are using this time is httpx, which can fetch a download in smaller chunks:

import asyncio

import httpx

client = httpx.AsyncClient()

async def write_input(
    client: httpx.AsyncClient, video_link: str, process: asyncio.subprocess.Process
) -> None:
    async with client.stream("GET", video_link) as response:
        async for chunk in response.aiter_raw(1024):
            print(chunk)  # this is the downloaded video file, in chunks

We will worry about the actual processing later; for now, the code just prints the chunks to the screen.
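To try this first version out, a throwaway driver along these lines would do; the URL is a placeholder, and None is passed for the process argument only because it is not used yet at this stage:

import asyncio

import httpx

async def main() -> None:
    async with httpx.AsyncClient() as client:
        # Placeholder link; the process argument is unused in this first version.
        await write_input(client, "https://example.com/sample.mp4", None)  # type: ignore[arg-type]

asyncio.run(main())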

Next, we write a function to call ffmpeg through asyncio.create_subprocess_exec:

async def video_send(client: httpx.AsyncClient, video_link: str) -> None:
    logger.info("DATA: Fetching video from link", link=video_link)
    process = await asyncio.create_subprocess_exec(
        "ffmpeg",
        "-hwaccel",
        "cuda",
        "-i",
        "pipe:0",
        "-c:v",
        "h264_nvenc",
        # "libx264",  # swap in this CPU encoder if NVENC is unavailable
        "-b:v",
        "1.5M",
        "-c:a",
        "aac",
        "-b:a",
        "128k",
        "-f",
        "mpegts",
        "-y",
        "pipe:1",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    asyncio.create_task(write_input(client, video_link, process))

Ideally, we would use process.communicate(file_content) here, as advised in the documentation. Unfortunately, that would require downloading the whole file first, which would inevitably delay the response.
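For comparison, a communicate()-based version would look roughly like the sketch below (ffmpeg arguments trimmed for brevity); it has to buffer the entire download in memory before ffmpeg sees a single byte:

async def video_send_buffered(client: httpx.AsyncClient, video_link: str) -> bytes:
    # Download the whole file first...
    response = await client.get(video_link)
    response.raise_for_status()

    process = await asyncio.create_subprocess_exec(
        "ffmpeg", "-i", "pipe:0", "-c:v", "h264_nvenc", "-c:a", "aac",
        "-f", "mpegts", "-y", "pipe:1",
        stdin=asyncio.subprocess.PIPE,
        stdout=asyncio.subprocess.PIPE,
    )

    # ...then hand everything to ffmpeg in one go and collect the full output.
    stdout, _ = await process.communicate(response.content)
    return stdout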

Instead, we can use process.stdin.write(). Let’s update the original write_input function:

async def write_input(
    client: httpx.AsyncClient, video_link: str, process: asyncio.subprocess.Process
) -> None:
    assert isinstance(process.stdin, asyncio.StreamWriter)

    async with client.stream("GET", video_link) as response:
        logger.info("DATA: Streaming video to queue", link=video_link)
        async for chunk in response.aiter_raw(1024):
            process.stdin.write(chunk)
            await process.stdin.drain()

        if process.stdin.can_write_eof():
            process.stdin.write_eof()

        process.stdin.close()
        await process.stdin.wait_closed()

    logger.info("DATA: Done downloading video to ffmpeg")

With every downloaded chunk,

  1. we feed it to the process through process.stdin.write(chunk), followed by await process.stdin.drain() to avoid overwhelming the pipe.
  2. Once the whole download is done, we write an EOF (process.stdin.write_eof()) to denote the end of the input,
  3. followed by a .close() (and a corresponding await .wait_closed()).

Back in the video_send function, we continue by reading from process.stdout. Being able to read and write at the same time is exactly why we are doing this through asyncio: in a synchronous setting, we could only do one after the other in a fixed order, but now we can let the event loop worry about the scheduling. The function gains the following code to read the re-encoded file content and post it to the queue:

from functools import partial
from multiprocessing import Queue  # process-safe queue, shared with the consumer processes

async def video_send(queue: Queue, client: httpx.AsyncClient, video_link: str) -> None:
    ...

    assert isinstance(process.stdout, asyncio.StreamReader)

    while True:
        chunk = await process.stdout.read(1024)

        if not chunk:
            break
        else:
            # queue.put may block, so run it in a worker thread off the event loop
            await asyncio.to_thread(partial(queue.put, chunk))

    await process.wait()
    logger.info("DATA: Done sending video to queue")

In a loop, we

  1. Fetch a chunk of data from ffmpeg’s stdout
  2. If the chunk is empty (an empty bytes object), break from the loop
  3. Otherwise, push the chunk to the queue (through asyncio.to_thread, as we are using the process-safe queue here)

After the loop exits, we wait for the command to finish gracefully through process.wait().
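Putting everything together, a driver along these lines would re-encode a list of videos one after another and push the resulting MPEG-TS stream onto a process-safe queue. The links and the consumer below are placeholders, a sketch of how the pieces could be wired up rather than the actual project code:

import asyncio
from multiprocessing import Process, Queue

import httpx

def consume(queue: Queue) -> None:
    # Placeholder consumer running in a separate process; a real one would
    # decode or play the MPEG-TS stream it receives.
    while (chunk := queue.get()) is not None:
        print(f"received {len(chunk)} bytes")

async def main(queue: Queue, links: list[str]) -> None:
    async with httpx.AsyncClient() as client:
        for link in links:
            # Sending the videos back to back yields one continuous MPEG-TS stream.
            await video_send(queue, client, link)

    queue.put(None)  # tell the consumer there is nothing more to come

if __name__ == "__main__":
    queue: Queue = Queue()
    consumer = Process(target=consume, args=(queue,))
    consumer.start()

    asyncio.run(main(queue, ["https://example.com/a.mp4", "https://example.com/b.mp4"]))
    consumer.join()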

It seems very straightforward now, but it took me the whole night to actually get this right (and I was still revising the code while writing this). Half the time I was checking the documentation to make sure I wasn’t missing anything; the other half I was getting Gemini to review my code.

Hopefully you find this useful, and that’s it for today. With luck, we will get back to the previously promised Advent of Code content next week.
