DEV Community

Jamesb
Jamesb

Posted on • Updated on

Managing Long-Running LLM Data Processing Pipelines

Looking for a simple abstraction to help you run and debug your LLM data processing pipeline without losing your mind? Look no further!

While working on Open Recommender, my open source, LLM-powered recommendation system for YouTube videos, I quickly ran into issues due to the incredibly slow development loop.

Data Pipeline Image
The data pipeline so far. Read more about the project here or watch my video

A full run of the pipeline takes up to 40 minutes to complete for a single user due to the large number of GPT-4 calls involved. When I encountered crashes and bugs I would have to re-run the entire pipeline from scratch without any guarantee of reproducing the error due to the non-determinism inherent in LLM applications. As you can imagine, this became incredibly tiring and annoying.

Me when it crashes
me.png

The Solution

The solution I came up with was to add individual parts of the data processing chain into a Pipeline class and use this to automatically save the inputs and outputs of each stage while the pipeline is running. This all gets saved to disk in run-id.json files. If a pipeline crashes, I can look at the error, add debugger statements or logging and restore the failed run from a checkpoint, allowing me to immediately debug and fix the failure without running the entire pipeline again from scratch.

How it Works

const pipeline = new Pipeline(opts)
  .addStage(validateArgs)
  .addStage(getTweets)
  .addStage(createQueries)
  .addStage(searchForVideos)
  .addStage(filterSearchResults)
  .addStage(downloadTranscripts)
  .addStage(appraiseTranscripts)
  .addStage(chunkTranscripts);
const recommendations = await pipeline.execute();
Enter fullscreen mode Exit fullscreen mode

simplified from src/pipeline/main.ts

Each stage in the pipeline is just a function with a name and description that takes the previous stage's output as input.

export const getTweets = {
  name: "get-tweets",
  description: "Get tweets from Twitter user",
  run: async function (
    args: GetTweetsStageArgs
  ): Promise<Success<CreateQueriesArgs> | Failure> {
    const { user } = args;
    const tweets = await twitter.tweets.fetch({
      user,
      n_tweets: 30,
    });
    if (!tweets.length) {
      return failure("No tweets found");
    } else {
      return success({ ...args, tweets });
    }
  },
};
Enter fullscreen mode Exit fullscreen mode

The getTweets stage. Simplified from src/pipeline/stages

Since errors can occur at each stage in the pipeline, each PipelineFunction is modelled as function which can either succeed with a value of type T or fail.

export type Success<T> = {
  success: true;
  result: T;
};

export type Failure = {
  success: false;
  result: any;
};

export type PipelineFunction<T, U> = (
  input: T
) => Promise<Success<U> | Failure>;

export type PipelineStage<T, U> = {
  name: string;
  description: string;
  run: PipelineFunction<T, U>;
};
Enter fullscreen mode Exit fullscreen mode

Simplified from src/pipeline/stages

When I execute the pipeline, the Pipeline class iterates over each of the stages, executing them in turn and passing the result of the prior stage as input to the next. All intermediate results are saved into a run-id.json file so checkpoints can be restored later if required.

private saveStage(
  stage: PipelineStage<any, any>,
  result: Success<any> | Failure
) {
  const run = getRunById(this.initialValue.runId)
  run.stages.push({
    name: stage.name,
    result,
  });
  saveRun(run);
}
Enter fullscreen mode Exit fullscreen mode

Now when I encounter unexpected errors, I can add any debugger and logging statements required to understand the issue and re-run the pipeline from the beginning of the failed stage.

yarn main --cloneRunID="<run-id>" --stage="<name>"
Enter fullscreen mode Exit fullscreen mode

This has improved the developer experience 10,000x compared to how it was before!

Improvements

Here are some improvements I'll consider making to the pipeline in the future, especially once I get it running in production.

  • Create different base classes for different kinds of errors. Eg. RetryableError for errors that the pipeline can recover from.
  • It's common for parts of a stage to be able to proceed without waiting around for the rest of the stage to complete. Forcing stages to run synchronously means the pipeline runs slower than it could.
    • Maybe it's possible to support more concurrency but still have checkpoints to save the results of a complete stage.
  • I should probably break up some of the stages a little bit more, eg. the chunkTranscripts stage is remarkably slow and if it crashes in the middle it can still take 10 mins to re-run.

Top comments (2)

Collapse
 
fk profile image
Fırat Küçük

Hey James, some images are blank.

Collapse
 
experilearning profile image
Jamesb

How strange, they all seem to be showing up for me...