DEV Community

Harish


How a 500 MB Buffer Killed Our Archival Job — And Why Streaming Fixed It

The job that kept dying

We run a nightly archival job that exports a few large Postgres tables to S3 as gzipped JSONL. On paper it's a humble piece of glue: read rows, serialize, compress, upload. In practice, it was getting OOM-killed by Kubernetes most nights on the larger tenants.

The pod's memory limit was 1 GiB. The biggest table being archived had ~466K rows. At peak we measured the Go heap pushing past 700 MB before the kernel reaped us.

That ratio — 466K small rows producing hundreds of megabytes of heap — was the smell. None of those rows is large. Something in the pipeline was hoarding all of them at once.

What the original code looked like

Roughly, the archival path was this:

rows, _ := q.GetFetchQueueForArchive(ctx, domainID)  // []Row, materialized

var buf bytes.Buffer
gz := gzip.NewWriter(&buf)
for _, r := range rows {
    line, _ := json.Marshal(r)
    gz.Write(line)
    gz.Write([]byte("\n"))
}
gz.Close()

s3.PutObject(ctx, &s3.PutObjectInput{
    Bucket: &bucket,
    Key:    &key,
    Body:   bytes.NewReader(buf.Bytes()),
})

Read that with a memory profiler in your head and the bug is obvious:

  1. GetFetchQueueForArchive is a SQLC :many query. It loops over rows.Next() internally, scans every row into a struct, and appends to a slice. The whole result set lands in the heap before the function returns. For 466K rows of ~1 KB each, that's about 500 MB.
  2. Then we compress into bytes.Buffer — another ~100 MB of gzipped bytes, in memory.
  3. s3.PutObject requires either a Content-Length header or a seekable body. We satisfy that by handing it the fully-buffered bytes. So the whole compressed payload has to coexist in RAM with the source slice until at least the slice goes out of scope.

Peak heap ≈ rows slice + gzip buffer ≈ 600 MB. Add the transient copies made as bytes.Buffer grows by reallocating, garbage-collector headroom, and the rest of the process (DB driver, AWS SDK, goroutine stacks), and we cross the 1 GiB ceiling. The pod dies. K8s restarts it. It dies again on the same table.

The crucial observation: memory usage scales linearly with table size, even though we never need more than one row at a time to do the work. That's a design bug, not a tuning problem. No amount of bumping the memory limit fixes it — the next tenant with twice the rows blows past whatever ceiling you pick.

The shape of the fix

The goal: never hold more than one row plus a few fixed-size buffers in Go memory. The pipeline should look like a hose, not a tank. Bytes flow Postgres → JSON → gzip → S3 in a single producer/consumer pipeline, where each stage processes one small unit at a time and exerts backpressure on the previous stage.

Concretely:

pgx.Rows (server-side cursor)
   ↓  rows.Next() → scan one row → ~1 KB
json.Marshal(row)
   ↓                              ~1 KB line
gzip.Writer                       ~32 KB internal compression window
   ↓
io.Pipe (synchronous, zero-buffer)
   ↓
s3manager.Uploader                 5 MB per multipart part
   ↓
S3 (multipart upload)

Peak memory: 1 row + 32 KB gzip window + 5 MB part buffer × upload concurrency ≈ ~25 MB, regardless of whether the table has 1K rows or 100M rows.
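The two estimates can be put side by side as plain arithmetic. This is a rough sketch using only the figures quoted in this post (~1 KB rows, ~100 MB of gzipped output, a 32 KB gzip window, 5 MB parts, concurrency 5); the function names are made up for illustration.

```go
package main

import "fmt"

const (
	kb = 1 << 10
	mb = 1 << 20
)

// bufferedBytes estimates peak heap when every row and the whole gzip
// output are held at once. It grows linearly with rowCount.
func bufferedBytes(rowCount, rowBytes, gzipBytes int) int {
	return rowCount*rowBytes + gzipBytes
}

// streamedBytes estimates peak heap for the streaming pipeline: one row,
// the gzip window, and one part buffer per concurrent upload.
// rowCount does not appear in the formula at all.
func streamedBytes(rowBytes, gzipWindow, partSize, concurrency int) int {
	return rowBytes + gzipWindow + partSize*concurrency
}

func main() {
	fmt.Printf("buffered: %d MiB\n", bufferedBytes(466_000, 1*kb, 100*mb)/mb)
	fmt.Printf("streamed: %d MiB\n", streamedBytes(1*kb, 32*kb, 5*mb, 5)/mb)
}
```

The point is the shape of the two formulas rather than the exact numbers: the first has rowCount as a factor, the second does not.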

The four primitives that make this work

1. pgx.Rows — a server-side cursor

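pool.Query() returns a Rows handle that behaves like a cursor from the Go side: rows.Next() advances one row at a time, and only the row currently being scanned lives in Go memory. (Strictly speaking, pgx does not declare a server-side cursor for a plain Query; it reads the result set off the connection incrementally as you iterate, so unread rows wait in socket buffers or back on the server rather than on the Go heap.) This is fundamentally different from what SQLC's :many generates, which loops and appends every row into a slice before returning. The streaming path bypasses SQLC and talks to the pool directly with the raw SQL string.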

2. io.Pipe — a synchronous in-memory pipe

pr, pw := io.Pipe()

pr is a Reader, pw is a Writer. Bytes written to pw become readable from pr — with no internal buffer. It's a synchronization point, not a queue. Write blocks until something reads pr, and Read blocks until something writes pw. Producer and consumer rate-match each other naturally. If S3 upload stalls, our DB iteration stalls too, and memory stays flat.
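To see the "synchronization point, not a queue" behavior without Postgres or S3, here is a small standard-library sketch. The helper name maxWriterLead is invented for this example: it counts how many writes have completed each time a read returns, and the writer should never be ahead.

```go
package main

import (
	"fmt"
	"io"
	"sync/atomic"
)

// maxWriterLead pushes n small chunks through an io.Pipe and reports the
// largest value of (writes completed - reads completed) that the reader
// observes. Because the pipe has no internal buffer, this is never > 0.
func maxWriterLead(n int) int {
	pr, pw := io.Pipe()
	var written atomic.Int64

	go func() {
		defer pw.Close()
		for i := 0; i < n; i++ {
			// Blocks until the reader has consumed these bytes.
			if _, err := pw.Write([]byte("row\n")); err != nil {
				return
			}
			written.Add(1)
		}
	}()

	lead := -n
	buf := make([]byte, 64)
	for reads := 1; ; reads++ {
		if _, err := pr.Read(buf); err != nil {
			break // io.EOF once the writer closes
		}
		if d := int(written.Load()) - reads; d > lead {
			lead = d
		}
	}
	return lead
}

func main() {
	fmt.Println("max writer lead:", maxWriterLead(1000))
}
```

A buffered channel or a bytes.Buffer in the same position would let the writer run ahead by however much the buffer holds, which is exactly the memory we are trying not to spend.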

3. compress/gzip.Writer — streaming compression

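gzip.NewWriter(w io.Writer) returns a writer that compresses on the fly. As you Write() to it, it accumulates input in a small, fixed-size internal state (DEFLATE's sliding window is 32 KB) and emits compressed bytes to the underlying writer as blocks fill up. Whatever is still buffered, plus the gzip trailer, is only written on Close(), so Close is not optional. You never have to hold the whole input or output in memory.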

4. s3manager.Uploader — multipart upload from an io.Reader

The naive s3.PutObject(Body: io.Reader) is a trap: it needs a Content-Length or seek support to compute the body size, neither of which a true stream provides. s3manager.Uploader reads the body in 5 MB chunks (configurable), uploads each as a multipart "part", and stitches them together with CompleteMultipartUpload. Memory bound: 5 MB × concurrency (default 5) ≈ ~25 MB, well below pod limits.

The producer/consumer pipeline

func (a *Archiver) streamJSONL(
    ctx context.Context,
    s3Key string,
    sqlText string,
    args []any,
    scanFn func(rows pgx.Rows) (any, error),
) (int, error) {
    pr, pw := io.Pipe()

    // Producer-owned results. Only the goroutine writes them, and the
    // caller reads them only after <-done, so no path races on them.
    var (
        rowCount int
        prodErr  error
    )
    done := make(chan struct{})

    go func() {
        defer close(done)

        // On exit, close pw with the producer's error (or nil) so the
        // reader side learns the outcome.
        defer func() {
            if prodErr != nil {
                pw.CloseWithError(prodErr)
            } else {
                pw.Close()
            }
        }()

        gz := gzip.NewWriter(pw)
        // Registered after the pipe-close defer, so it runs first: the
        // gzip trailer is flushed BEFORE the pipe closes. A failed Close
        // counts as a producer error too.
        defer func() {
            if cErr := gz.Close(); cErr != nil && prodErr == nil {
                prodErr = cErr
            }
        }()

        rows, qErr := a.pool.Query(ctx, sqlText, args...)
        if qErr != nil {
            prodErr = qErr
            return
        }
        defer rows.Close()

        for rows.Next() {
            row, sErr := scanFn(rows)
            if sErr != nil {
                prodErr = sErr
                return
            }
            line, mErr := json.Marshal(row)
            if mErr != nil {
                prodErr = mErr
                return
            }
            if _, wErr := gz.Write(line); wErr != nil {
                prodErr = wErr
                return
            }
            if _, wErr := gz.Write([]byte("\n")); wErr != nil {
                prodErr = wErr
                return
            }
            rowCount++
        }
        if rErr := rows.Err(); rErr != nil {
            prodErr = rErr
        }
    }()

    // Hand pr to s3manager. Blocks until the producer finishes AND all
    // parts are uploaded.
    upErr := a.s3Uploader.UploadStream(ctx, s3Key, pr, "application/gzip",
        map[string]string{"content-encoding": "gzip"})
    if upErr != nil {
        pr.CloseWithError(upErr) // unblock the producer if it is still writing
    }
    <-done // wait for the producer to exit before reading its results
    if upErr != nil {
        return 0, fmt.Errorf("s3 upload failed: %w", upErr)
    }
    return rowCount, prodErr
}

A few subtleties that look small but matter:

  • The deferred gz.Close() is registered after the deferred pw close, so it runs first (deferred calls run in last-in, first-out order). Gzip needs to flush its trailer bytes through pw. If the pipe closed first, the gzip stream on the consumer side would be truncated and unreadable.
  • CloseWithError on both ends. If the producer hits a DB error mid-iteration, it closes pw with that error. The consumer reading pr gets that same error back from Read (not a generic ErrClosedPipe), and s3manager aborts the multipart upload by sending AbortMultipartUpload to S3, which is its default behavior unless LeavePartsOnError is set. No orphaned partial uploads. The same trick goes the other way: if the S3 upload fails, we pr.CloseWithError(upErr) so the producer's next write into the pipe returns that error and the goroutine exits instead of hanging.
  • No mutex needed. io.Pipe is its own synchronization primitive; the producer's writes serialize naturally against the consumer's reads.
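  • Reading rowCount after the call is safe on the success path because UploadStream only returns after pr reaches EOF, and the reader only sees EOF after the producer goroutine has closed pw. By then the producer is done writing to rowCount. On the failure path the goroutine may still be winding down when UploadStream returns, which is why we return 0 there rather than reading rowCount.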

What the numbers looked like after

Same 466K-row table, same pod, same 1 GiB memory limit:

  • Before: heap peaked around 700 MB; OOM-killed roughly 4 nights out of 5.
  • After: heap held flat at ~30 MB through the whole archive. Zero OOM kills since deploy.

Wall-clock time went up slightly (a couple of percent) because we no longer issue one fat PutObject — but that's a fair trade for not getting murdered by the kernel.

What streaming doesn't fix

Worth being honest about the limits:

  • DB-side memory. Some of the queries powering these archives do non-trivial joins. That's Postgres' problem, not the pod's. Streaming doesn't make the planner cheaper.
  • Retry cost. If the upload fails 90% of the way through, we re-iterate from row zero on retry. For 50M-row tables that's slow but still memory-flat. Idempotent multipart resumption is a follow-up, not a blocker.
  • In-memory tree builds elsewhere. Other parts of the system still build whole trees in memory before serializing. They're small enough today that it's fine, but it's the same anti-pattern waiting to bite.

The general lesson

Whenever your data pipeline's memory usage scales with input size and you don't actually need random access to the data, you have a streaming bug waiting to happen. The fix is almost always the same set of primitives: a cursor on the source, an io.Pipe for backpressure, a streaming codec in the middle, and a chunked uploader at the sink. Bound the working set to a few megabytes and the pipeline stops caring whether you throw a thousand rows at it or a billion.
