TL;DR
Implement a long-running job as a single long-running activity that records its
progress with heartbeats, so a retried attempt can resume where the previous one
stopped.
What I needed to do
We had to backfill data from one service's database into another service's
database. About 5.5M DB entities needed to be copied, so we expected the job to
run for several hours.
We wanted the job to keep some kind of progress record, i.e. a "cursor", so that
if the job failed at any point, a retry could continue where it left off.
First instinct: Parameterized activity
After reading the Temporal docs, my first instinct was to implement one workflow
with one parameterized activity, which the workflow would call in a loop (one
call per batch).
This idea was dismissed after talking to my colleagues, because each activity
invocation adds events to the workflow's event history. Temporal works under the
assumption that a workflow has relatively few events, not tens of thousands. We
didn't want to risk exhausting Temporal's disk space.
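A back-of-the-envelope estimate shows how fast the history would grow. This sketch assumes the batch size of 100 we ended up using and counts only the three events each activity call adds (scheduled, started, completed); the workflow-task events recorded between calls would add more, so the result is a lower bound. The function name is illustrative.

```go
package main

import "fmt"

// estimateActivityEvents returns a lower bound on the history events produced
// by calling one activity per batch: ceil(entities / batchSize) calls, each
// adding at least eventsPerCall events.
func estimateActivityEvents(entities, batchSize, eventsPerCall int) int {
	calls := (entities + batchSize - 1) / batchSize // ceiling division
	return calls * eventsPerCall
}

func main() {
	// 5.5M entities / 100 per batch = 55,000 calls, 3 events each.
	fmt.Println(estimateActivityEvents(5_500_000, 100, 3)) // 165000
}
```

That is more than three times Temporal's default limit of 51,200 events per workflow history, before counting any workflow-task events.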
The good solution: One long-running Activity with Heartbeats
I'll just show you some code here.
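At the beginning of the activity we use GetHeartbeatDetails to get the cursor
that was recorded by a previous, unsuccessful attempt. After each batch we record
the cursor via RecordHeartbeat. This relies on Temporal retrying the failed
activity, which it does by default. Setting a HeartbeatTimeout on the activity
lets Temporal notice a crashed worker quickly instead of waiting for the
StartToCloseTimeout, which must then cover the whole multi-hour run.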
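The resume-on-retry behavior can be simulated without a Temporal server. The stdlib-only sketch below is not the Temporal SDK: runWithRetries and backfill are hypothetical helpers that only model its semantics. Each new attempt receives the cursor from the most recent heartbeat, and an injected failure kills the first attempt after it has loaded a batch but before it has heartbeated. The in-flight batch is therefore loaded twice, which is why the load step should be idempotent (for example an upsert keyed by primary key). In the real SDK, heartbeats are also throttled, so the last cursor the server saw can lag behind and a retry may repeat more than one batch.

```go
package main

import (
	"errors"
	"fmt"
)

// errCrash simulates a worker dying in the middle of an attempt.
var errCrash = errors.New("simulated crash")

// attemptFunc is one activity attempt. It receives the cursor from the last
// recorded heartbeat ("" on the first attempt) and a callback to record a new one.
type attemptFunc func(lastCursor string, heartbeat func(cursor string)) error

// runWithRetries is a hypothetical stand-in for Temporal's activity retries:
// every new attempt starts from the most recently recorded heartbeat.
func runWithRetries(maxAttempts int, attempt attemptFunc) error {
	lastCursor := ""
	var err error
	for i := 0; i < maxAttempts; i++ {
		err = attempt(lastCursor, func(c string) { lastCursor = c })
		if err == nil {
			return nil
		}
	}
	return err
}

// backfill copies sorted keys in batches and crashes once right after loading
// the batch that starts with crashAt (hypothetical fault injection). It returns
// how many times each key was loaded into the target.
func backfill(source []string, batchSize int, crashAt string) (map[string]int, error) {
	loads := map[string]int{}
	crashed := false
	err := runWithRetries(3, func(cursor string, heartbeat func(string)) error {
		for {
			// Select up to batchSize keys greater than cursor (source is sorted).
			var batch []string
			for _, k := range source {
				if len(batch) == batchSize {
					break
				}
				if k > cursor {
					batch = append(batch, k)
				}
			}
			for _, k := range batch {
				loads[k]++ // a real load step would upsert by primary key
			}
			if !crashed && len(batch) > 0 && batch[0] == crashAt {
				crashed = true
				return errCrash // dies after loading, before heartbeating
			}
			if len(batch) < batchSize {
				return nil // short batch: everything has been copied
			}
			cursor = batch[len(batch)-1]
			heartbeat(cursor)
		}
	})
	return loads, err
}

func main() {
	loads, err := backfill([]string{"a", "b", "c", "d", "e", "f", "g"}, 2, "e")
	fmt.Println(err, len(loads), loads["e"], loads["a"]) // <nil> 7 2 1
}
```

Every key ends up in the target, and only the batch that was in flight during the crash ("e", "f") is loaded twice.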
You'll want to do some info logging throughout the activity execution. Errors
don't need to be logged: just return them to the Temporal runtime, which will
surface them.
As you can see, the batch size is configurable. We ran it with a size of 100 and
that worked well.
package workflows

import (
	"context"
	"fmt"

	"go.temporal.io/sdk/activity"
)

type BackfillingActivities struct {
	BatchSize int
}

// Heartbeat holds the progress recorded after each batch.
type Heartbeat struct {
	Cursor string
}

// CommentsBackfillingActivity copies all entities in batches. On a retry it
// resumes from the cursor stored in the last recorded heartbeat.
func (a *BackfillingActivities) CommentsBackfillingActivity(ctx context.Context) error {
	cursor := ""
	if activity.HasHeartbeatDetails(ctx) {
		var heartbeat Heartbeat
		if err := activity.GetHeartbeatDetails(ctx, &heartbeat); err != nil {
			return fmt.Errorf("error getting heartbeat: %w", err)
		}
		cursor = heartbeat.Cursor
	}
	for {
		next, err := a.processBatch(ctx, cursor, a.BatchSize)
		if err != nil {
			return err
		}
		if next == "" {
			return nil // a short batch means everything has been copied
		}
		cursor = next
		// Record progress; a retried attempt will resume from this cursor.
		activity.RecordHeartbeat(ctx, Heartbeat{Cursor: cursor})
	}
}

// processBatch processes one batch starting after cursor and returns the next
// cursor, or "" when there is nothing left to process.
func (a *BackfillingActivities) processBatch(ctx context.Context, cursor string, batchSize int) (string, error) {
	logger := activity.GetLogger(ctx)
	logger.Info("Starting to process batch", "cursor", cursor)
	//-> batch := SELECT FROM WHERE primary_key > {cursor} ORDER BY primary_key ASC LIMIT {batchSize}
	logger.Info("Found items", "count", len(batch))
	//-> Transform into target data structure
	//-> Load into target DB
	if len(batch) < batchSize {
		return "", nil
	}
	lastItem := batch[len(batch)-1]
	return lastItem.Id, nil
}
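The query in processBatch is keyset pagination: order by the primary key and fetch the rows after the cursor. The stdlib-only sketch below models that query on an in-memory, ascending slice of keys; nextBatch is a hypothetical helper, not part of the code above, and it uses the same "" convention for both "start" and "done". It shows the termination rule: when the number of rows is an exact multiple of the batch size, the loop makes one extra query that returns an empty batch, which is harmless.

```go
package main

import (
	"fmt"
	"sort"
)

// nextBatch mimics the processBatch query on an ascending slice of primary
// keys: it returns up to batchSize keys greater than cursor, plus the cursor
// for the next call ("" once a short batch signals the end).
func nextBatch(keys []string, cursor string, batchSize int) ([]string, string) {
	start := 0
	if cursor != "" {
		// SearchStrings finds the first key >= cursor; skip it if it equals cursor.
		start = sort.SearchStrings(keys, cursor)
		if start < len(keys) && keys[start] == cursor {
			start++
		}
	}
	end := start + batchSize
	if end > len(keys) {
		end = len(keys)
	}
	batch := keys[start:end]
	if len(batch) < batchSize {
		return batch, ""
	}
	return batch, batch[len(batch)-1]
}

func main() {
	keys := []string{"a", "b", "c", "d", "e", "f"}
	cursor, calls := "", 0
	for {
		batch, next := nextBatch(keys, cursor, 3)
		calls++
		fmt.Println(batch)
		if next == "" {
			break
		}
		cursor = next
	}
	fmt.Println("queries:", calls) // 3: the last one returns an empty batch
}
```

In a real database the same query stays fast on every batch as long as primary_key is indexed, unlike OFFSET-based paging, which scans past all skipped rows.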