DEV Community

Florian Polster
Florian Polster

Posted on

Long-running jobs in Temporal.io

TL;DR

A long-running job should be implemented as one long-running activity that
uses heartbeat for resumption.

What I needed to do

There was a data-sync backfilling to be done where we backfill data from one
service's database into another one. It was about 5.5M DB entities that needed
to be copied, so we expected the total running time to be some number of hours.

We wanted to write a job with some kind of progress record, i.e. a "cursor".
Such that, should the job fail at any point, it could, on retry, continue where
it left off.

First instinct: Parameterized activity

After reading the Temporal docs, my first instinct was to implement one workflow
with one parameterized activity which the workflow would call in a loop.

This idea was dismissed after talking to my colleagues. Because each activity
invocation results in events being added to the Temporal Events. Temporal works
under the assumption that there will be few events per workflow, not tens of
thousands. We didn't want to risk exhausting Temporal's disk space.

The good solution: One long-running Activity with Heartbeats

I'll just show you some code here.

At the beginning of the Activity we use GetHeartbeatDetails to get the cursor
that was recorded by a previous, unsuccessful activity execution. After each
batch we record the cursor via RecordHeartbeat.

You'll want to do some info logging throughout the activity execution. Errors
don't need to be logged, just return them to the runtime, it will surface them.

As you can see, the batch size is configurable. We ran it with a size of 100 and
that worked well.

package workflows

import (
    "context"
    "fmt"

    "go.temporal.io/sdk/activity"
)

type BackfillingActivities struct {
    BatchSize int
}

type Heartbeat struct {
    Cursor string
}

func (a *BackfillingActivities) CommentsBackfillingActivity(ctx context.Context) error {
    cursor := ""
    if activity.HasHeartbeatDetails(ctx) {
        var heartbeat Heartbeat
        if err := activity.GetHeartbeatDetails(ctx, &heartbeat); err != nil {
            return fmt.Errorf("error getting heartbeat: %w", err)
        }
        cursor = heartbeat.Cursor
    }

    var err error
    for {
        cursor, err = a.processBatch(ctx, cursor, a.BatchSize)
        if err != nil {
            return err
        }
        if cursor == "" {
            break
        } else {
            activity.RecordHeartbeat(ctx, Heartbeat{cursor})
        }
    }
    return nil
}

// processBatch processes one batch starting from cursor and returns the next cursor or an error.
func (a BackfillingActivities) processBatch(ctx context.Context, cursor string, batchSize int) (string, error) {
    logger := activity.GetLogger(ctx)
    logger.Info(fmt.Sprintf("Starting processing batch with cursor \"%s\"", cursor)

    //-> batch := SELECT FROM WHERE primary_key > {cursor} ORDER BY primary_key ASC LIMIT {batchSize}

    logger.Info(fmt.Sprintf("Found %d items", len(batch))

    //-> Transform into target data structure

    //-> Load into target DB

    if len(batch) < batchSize {
        return "", nil
    }
    lastItem := batch[len(batch)-1]
    return lastItem.Id, nil
}
Enter fullscreen mode Exit fullscreen mode

Image of Quadratic

AI, code, and data connections in a familiar spreadsheet UI

Simplify data analysis by connecting directly to your database or API, writing code, and using the latest LLMs.

Try Quadratic free

Top comments (0)

Sentry image

See why 4M developers consider Sentry, “not bad.”

Fixing code doesn’t have to be the worst part of your day. Learn how Sentry can help.

Learn more

👋 Kindness is contagious

Engage with a wealth of insights in this thoughtful article, valued within the supportive DEV Community. Coders of every background are welcome to join in and add to our collective wisdom.

A sincere "thank you" often brightens someone’s day. Share your gratitude in the comments below!

On DEV, the act of sharing knowledge eases our journey and fortifies our community ties. Found value in this? A quick thank you to the author can make a significant impact.

Okay