Looking for a simple abstraction to help you run and debug your LLM data processing pipeline without losing your mind? Look no further!
While working on Open Recommender, my open source, LLM-powered recommendation system for YouTube videos, I quickly ran into issues due to the incredibly slow development loop.
The data pipeline so far. Read more about the project here or watch my video
A full run of the pipeline takes up to 40 minutes to complete for a single user due to the large number of GPT-4 calls involved. When I encountered crashes and bugs I would have to re-run the entire pipeline from scratch without any guarantee of reproducing the error due to the non-determinism inherent in LLM applications. As you can imagine, this became incredibly tiring and annoying.
The Solution
The solution I came up with was to add individual parts of the data processing chain into a Pipeline class and use this to automatically save the inputs and outputs of each stage while the pipeline is running. This all gets saved to disk in run-id.json files. If a pipeline crashes, I can look at the error, add debugger statements or logging, and restore the failed run from a checkpoint, allowing me to immediately debug and fix the failure without running the entire pipeline again from scratch.
How it Works
const pipeline = new Pipeline(opts)
  .addStage(validateArgs)
  .addStage(getTweets)
  .addStage(createQueries)
  .addStage(searchForVideos)
  .addStage(filterSearchResults)
  .addStage(downloadTranscripts)
  .addStage(appraiseTranscripts)
  .addStage(chunkTranscripts);

const recommendations = await pipeline.execute();
Simplified from src/pipeline/main.ts
Each stage in the pipeline is just a function with a name and description that takes the previous stage's output as input.
export const getTweets = {
  name: "get-tweets",
  description: "Get tweets from Twitter user",
  run: async function (
    args: GetTweetsStageArgs
  ): Promise<Success<CreateQueriesArgs> | Failure> {
    const { user } = args;
    const tweets = await twitter.tweets.fetch({
      user,
      n_tweets: 30,
    });
    if (!tweets.length) {
      return failure("No tweets found");
    } else {
      return success({ ...args, tweets });
    }
  },
};
The getTweets stage. Simplified from src/pipeline/stages
Since errors can occur at each stage in the pipeline, each PipelineFunction is modelled as a function that can either succeed with a value of type T or fail.
export type Success<T> = {
  success: true;
  result: T;
};

export type Failure = {
  success: false;
  result: any;
};

export type PipelineFunction<T, U> = (
  input: T
) => Promise<Success<U> | Failure>;

export type PipelineStage<T, U> = {
  name: string;
  description: string;
  run: PipelineFunction<T, U>;
};
Simplified from src/pipeline/stages
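The getTweets stage calls success and failure helpers that aren't shown in this post. Here is a minimal sketch of how they might look, assuming they do nothing more than wrap a value in the Success and Failure shapes above. The summarize function is a hypothetical name added only to show how the success flag lets TypeScript narrow the union.

```typescript
type Success<T> = { success: true; result: T };
type Failure = { success: false; result: any };

// Assumed helper: wrap a value in a Success result.
function success<T>(result: T): Success<T> {
  return { success: true, result };
}

// Assumed helper: wrap an error message or object in a Failure result.
function failure(result?: any): Failure {
  return { success: false, result };
}

// Hypothetical consumer: the `success` flag discriminates the union,
// so `outcome.result` is typed as number in the first branch.
function summarize(outcome: Success<number> | Failure): string {
  return outcome.success
    ? `ok: ${outcome.result + 1}`
    : `failed: ${String(outcome.result)}`;
}
```

Because every stage returns the same two shapes, the code that runs the stages only needs to check one boolean to decide whether to continue.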
When I execute the pipeline, the Pipeline class iterates over each of the stages, executing them in turn and passing the result of the prior stage as input to the next. All intermediate results are saved into a run-id.json file so checkpoints can be restored later if required.
private saveStage(
  stage: PipelineStage<any, any>,
  result: Success<any> | Failure
) {
  const run = getRunById(this.initialValue.runId);
  run.stages.push({
    name: stage.name,
    result,
  });
  saveRun(run);
}
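To make the execution loop concrete, here is a rough sketch of the idea rather than the real Pipeline class. SketchPipeline is a hypothetical name, checkpoints go into an in-memory array instead of a run-id.json file, and stopping at the first failed stage (including treating a thrown exception as a failure) is my assumption about sensible behaviour.

```typescript
type Success<T> = { success: true; result: T };
type Failure = { success: false; result: any };
type PipelineStage<T, U> = {
  name: string;
  description: string;
  run: (input: T) => Promise<Success<U> | Failure>;
};
type SavedStage = { name: string; result: Success<any> | Failure };

// Hypothetical, in-memory stand-in for the Pipeline class.
class SketchPipeline {
  private stages: PipelineStage<any, any>[] = [];
  private initialValue: unknown;
  // Stand-in for the run-id.json file: one entry per executed stage.
  readonly saved: SavedStage[] = [];

  constructor(initialValue: unknown) {
    this.initialValue = initialValue;
  }

  addStage(stage: PipelineStage<any, any>): this {
    this.stages.push(stage);
    return this; // returning `this` allows chained .addStage() calls
  }

  async execute(): Promise<Success<any> | Failure> {
    let current: Success<any> | Failure = {
      success: true,
      result: this.initialValue,
    };
    for (const stage of this.stages) {
      try {
        // The previous stage's result becomes this stage's input.
        current = await stage.run(current.result);
      } catch (err) {
        // Assumption: a thrown exception is recorded like a returned Failure.
        current = { success: false, result: String(err) };
      }
      this.saved.push({ name: stage.name, result: current }); // checkpoint
      if (!current.success) break; // assumption: stop at the first failure
    }
    return current;
  }
}
```

Saving after every stage, including the failed one, is what makes the run restorable later: the file always holds the last good output plus the error that followed it.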
Now when I encounter unexpected errors, I can add any debugger and logging statements required to understand the issue and re-run the pipeline from the beginning of the failed stage.
yarn main --cloneRunID="<run-id>" --stage="<name>"
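Restoring a checkpoint boils down to finding the right input for the stage you want to re-run. The sketch below shows one way that lookup could work. The SavedRun shape (in particular the initialArgs field), the stageOrder parameter, and the inputForStage function are all hypothetical and only illustrate the idea; the real project may resolve --cloneRunID and --stage differently.

```typescript
type StageResult = { success: boolean; result: any };
type SavedRun = {
  runId: string;
  initialArgs: unknown; // hypothetical: the arguments the run started with
  stages: { name: string; result: StageResult }[];
};

// Return the input that should be fed to `stageName` when resuming `run`.
// `stageOrder` lists the stage names in the order the pipeline runs them.
function inputForStage(
  run: SavedRun,
  stageName: string,
  stageOrder: string[]
): unknown {
  const index = stageOrder.indexOf(stageName);
  if (index === -1) throw new Error(`Unknown stage: ${stageName}`);
  // The first stage has no predecessor, so it restarts from the original args.
  if (index === 0) return run.initialArgs;

  const previousName = stageOrder[index - 1];
  const previous = run.stages.find((s) => s.name === previousName);
  if (!previous || !previous.result.success) {
    throw new Error(`No successful checkpoint before stage: ${stageName}`);
  }
  // The previous stage's saved output is the input to the requested stage.
  return previous.result.result;
}
```

With a lookup like this, re-running a failed stage skips every GPT-4 call made by the stages before it.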
This has improved the developer experience 10,000x compared to how it was before!
Improvements
Here are some improvements I'll consider making to the pipeline in the future, especially once I get it running in production.
- Create different base classes for different kinds of errors, e.g. RetryableError for errors that the pipeline can recover from.
- It's common for parts of a stage to be able to proceed without waiting around for the rest of the stage to complete. Forcing stages to run synchronously means the pipeline runs slower than it could.
- Maybe it's possible to support more concurrency but still have checkpoints to save the results of a complete stage.
- I should probably break up some of the stages a little bit more, e.g. the chunkTranscripts stage is remarkably slow, and if it crashes in the middle it can still take 10 minutes to re-run.
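As a rough idea of what the RetryableError improvement could look like, here is a sketch that doesn't exist in the project yet. The withRetries wrapper, its maxAttempts parameter, and the retryable marker property are all hypothetical. The marker is checked instead of instanceof because instanceof can be unreliable for classes that extend Error when compiling to ES5.

```typescript
// Hypothetical error class: marks failures that are worth retrying.
class RetryableError extends Error {
  readonly retryable = true;
  constructor(message: string) {
    super(message);
    this.name = "RetryableError";
  }
}

// True only for errors carrying the `retryable` marker.
function isRetryable(err: unknown): boolean {
  return (
    typeof err === "object" &&
    err !== null &&
    (err as { retryable?: unknown }).retryable === true
  );
}

// Call `fn` up to `maxAttempts` times, retrying only on retryable errors.
async function withRetries<T>(
  fn: () => Promise<T>,
  maxAttempts = 3
): Promise<T> {
  for (let attempt = 1; ; attempt++) {
    try {
      return await fn();
    } catch (err) {
      const canRetry = isRetryable(err) && attempt < maxAttempts;
      if (!canRetry) throw err; // non-retryable, or out of attempts
    }
  }
}
```

A stage could wrap a flaky call like this, e.g. withRetries(() => someApiCall()), and throw RetryableError for things like rate limits while letting genuine bugs fail immediately.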