Ryo Yamaoka


Summary of Spanner Change Streams

Spanner Change Streams recently reached GA (general availability), so here is a summary of what we found out about it.

What is Spanner Change Streams?

Change Streams is a mechanism for obtaining the contents of data changes (INSERT, UPDATE, DELETE) made to Spanner in near real time. Typical use cases include:

  • Replicating changes to BigQuery, etc. for analysis
  • Publishing messages to Pub/Sub, etc. triggered by data changes
  • Saving changes to GCS for use as audit logs

Until now, Spanner had no change-detection functionality of this kind, so it was impossible to implement the transaction log tailing pattern when sending events triggered by data updates, for example; instead, the transactional outbox pattern had to be implemented on the application side.
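
For context, here is a minimal sketch of that outbox approach using the Go client. The Outbox table and its columns are hypothetical, made up purely for illustration; the point is only that the business write and the event row commit in the same transaction. The Users table matches the schema shown later in this post.

package main

import (
    "context"

    "cloud.google.com/go/spanner"
)

// insertUserWithOutbox buffers the business write and the event row in one
// read-write transaction, so they commit or abort together. The Outbox table
// and its columns are hypothetical and exist only for this illustration.
func insertUserWithOutbox(ctx context.Context, cli *spanner.Client) error {
    _, err := cli.ReadWriteTransaction(ctx, func(ctx context.Context, txn *spanner.ReadWriteTransaction) error {
        return txn.BufferWrite([]*spanner.Mutation{
            spanner.Insert("Users",
                []string{"UserID", "Name", "Age"},
                []interface{}{"user-1", "Alice", int64(20)}),
            // Assumes Outbox.CreatedAt is declared with allow_commit_timestamp=true.
            spanner.Insert("Outbox",
                []string{"EventID", "Payload", "CreatedAt"},
                []interface{}{"event-1", `{"type":"user_created"}`, spanner.CommitTimestamp}),
        })
    })
    return err
}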

Delivery Guarantees

Change Streams writes its change records to an internal table in the same transaction as the application's writes, so delivery of the change record is guaranteed as soon as the commit succeeds.

The details are described below, but since the change data is kept local to the real data so that writing to the internal table does not add latency, the impact on performance is probably not a concern.

The delivered values include the table name, column names, column data types, a timestamp, the transaction ID, and other metadata, along with the modified values and the primary key (PK). If you want to retrieve the entire record, you have to perform a stale read against Spanner using the PK and the commit timestamp.
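
For example, a minimal sketch of that stale read with the Go client might look like this. The Users schema is the one from the "Enabling" section below; the PK and commit timestamp would come from the change record.

package main

import (
    "context"
    "time"

    "cloud.google.com/go/spanner"
)

// readFullRecord reads the whole row as of the change, using the PK and the
// commit timestamp taken from a change record. ReadTimestamp pins the read
// to exactly that timestamp.
func readFullRecord(ctx context.Context, cli *spanner.Client, userID string, commitTS time.Time) (*spanner.Row, error) {
    return cli.Single().
        WithTimestampBound(spanner.ReadTimestamp(commitTS)).
        ReadRow(ctx, "Users", spanner.Key{userID}, []string{"UserID", "Name", "Age"})
}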

Scalability of Change Streams

Spanner stores the actual data in units called splits on Colossus (distributed storage), and when a split exceeds a certain size it is automatically divided into smaller splits to achieve scalability.

Change Streams follows the same mechanism: a change stream partition is tied to a single split, and partitions are created and removed as splits increase or decrease, which is what makes the stream scale. In addition, each partition is kept local to its split, similar to data interleaving, so that processing is as fast as possible.

(Diagram: how change stream partitions map to splits, from https://cloud.google.com/spanner/docs/change-streams/details#change_stream_partitions)

Enabling

Enabling a change stream is relatively simple: you issue DDL, much like creating an index or a unique constraint.
The scope can be:

  • database as a whole
  • per table
  • per column

For example, the following is a sample configuration for each scope.

CREATE TABLE Users (
    UserID STRING(MAX) NOT NULL,
    Name STRING(MAX) NOT NULL,
    Age INT64 NOT NULL,
) PRIMARY KEY (UserID);

CREATE TABLE Items (
    ItemID STRING(MAX) NOT NULL,
    Name STRING(MAX) NOT NULL,
) PRIMARY KEY (ItemID);

CREATE CHANGE STREAM EverythingStream FOR ALL;

CREATE CHANGE STREAM UsersTableStream FOR Users;

CREATE CHANGE STREAM UsersNameAndItemsNameColumnsStream FOR Users(Name), Items(Name);
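
If you want to apply the DDL from Go rather than with gcloud or the Cloud Console, a minimal sketch using the database admin client would look like the following (it assumes the same environment variables as the sample code later in this post):

package main

import (
    "context"
    "fmt"
    "os"

    database "cloud.google.com/go/spanner/admin/database/apiv1"
    databasepb "google.golang.org/genproto/googleapis/spanner/admin/database/v1"
)

// createChangeStream applies the CREATE CHANGE STREAM DDL programmatically
// via the database admin client. A sketch only; gcloud or the console work
// just as well.
func createChangeStream(ctx context.Context) error {
    admin, err := database.NewDatabaseAdminClient(ctx)
    if err != nil {
        return err
    }
    defer admin.Close()

    db := fmt.Sprintf("projects/%s/instances/%s/databases/%s",
        os.Getenv("SPANNER_PROJECT_ID"), os.Getenv("SPANNER_INSTANCE_ID"), os.Getenv("SPANNER_DATABASE_ID"))
    op, err := admin.UpdateDatabaseDdl(ctx, &databasepb.UpdateDatabaseDdlRequest{
        Database:   db,
        Statements: []string{`CREATE CHANGE STREAM EverythingStream FOR ALL`},
    })
    if err != nil {
        return err
    }
    // DDL is applied asynchronously; wait for the long-running operation.
    return op.Wait(ctx)
}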

Usage Methods

The official documentation shows two ways to consume a stream: via Dataflow, or by calling the Spanner API directly. Dataflow appears to be the primary path, and several practical-looking templates are provided.

Calling the API directly, on the other hand, means managing partition lifecycles yourself. For example, when the aforementioned split of a partition occurs, a record notifying you of that fact is returned, and you may need to start a new reader based on the contents of that notification to consume the stream from the child partition. Conversely, when partitions merge (that is, when splits merge), you may need to tear down readers that are no longer needed and stop receiving those streams.
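
To make that concrete, here is a rough sketch of what such a follow-up reader might look like with the Go client, assuming you have already parsed a partition token and start timestamp out of a child partitions record (that parsing is omitted, and the names here are illustrative):

package main

import (
    "context"
    "time"

    "cloud.google.com/go/spanner"
)

// readChildPartition issues a new change stream query for one child
// partition, using the token and start timestamp taken from a
// child_partitions_record. The caller owns the returned iterator.
func readChildPartition(ctx context.Context, cli *spanner.Client, token string, start time.Time) *spanner.RowIterator {
    stmt := spanner.Statement{
        SQL: `SELECT ChangeRecord FROM READ_EverythingStream (
                start_timestamp => @start,
                end_timestamp => NULL,
                partition_token => @token,
                heartbeat_milliseconds => 1000
              )`,
        Params: map[string]interface{}{
            "start": start,
            "token": token,
        },
    }
    return cli.Single().Query(ctx, stmt)
}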

Naturally, the Dataflow templates have all of this covered and work correctly out of the box, so it would be wise to use Dataflow unless you have a good reason not to.

When I first saw the announcement, I expected to be able to implement transaction log tailing easily from Go, but it was not that simple.

Trying it out

That said, I did try it a little from Go, so I'm pasting the code here for posterity. It took some trial and error, but I was able to run it with Go's client library without calling the API directly.

This code does nothing more than celebrate "I got something from Change Streams! Yeahh 🥰". It is not reference code at all: it is not useful as-is, and it has more holes than an anthill in the savanna.

The schema is the same as the one used in the "Enabling" section.

package main

import (
    "context"
    "fmt"
    "os"
    "time"

    "cloud.google.com/go/spanner"
    "google.golang.org/api/iterator"
)

func main() {
    if err := run(); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}

func run() error {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    dsn := fmt.Sprintf("projects/%s/instances/%s/databases/%s", os.Getenv("SPANNER_PROJECT_ID"), os.Getenv("SPANNER_INSTANCE_ID"), os.Getenv("SPANNER_DATABASE_ID"))
    cli, err := spanner.NewClient(ctx, dsn)
    if err != nil {
        return fmt.Errorf("failed to init spanner client: %w, dsn: %s", err, dsn)
    }
    defer cli.Close()

    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            // Query the change stream's read function (READ_<stream name>).
            // partition_token => NULL reads from the root partition.
            now := time.Now().UTC().Format(time.RFC3339)
            stmt := spanner.NewStatement(
                `SELECT ChangeRecord FROM READ_EverythingStream ( ` +
                    `  start_timestamp => "` + now + `", ` +
                    `  end_timestamp => NULL, ` +
                    `  partition_token => NULL, ` +
                    `  heartbeat_milliseconds => 1000` +
                    `)`,
            )
            iter := cli.Single().Query(ctx, stmt)
            for {
                row, err := iter.Next()
                if err != nil {
                    if err == iterator.Done {
                        <-time.After(1 * time.Second)
                        break
                    }
                    iter.Stop()
                    return err
                }
                fmt.Println(row.String())
            }
            // Stop the iterator explicitly: a defer here would not run until
            // run() returns, leaking one iterator per loop iteration.
            iter.Stop()
        }
    }
}
$ go run main.go 
{fields: [name:"ChangeRecord" type:{code:ARRAY array_element_type:{code:STRUCT struct_type:{fields:{name:"data_change_record" type:{code:ARRAY array_element_type:{code:STRUCT struct_type:{fields:{name:"commit_timestamp" type:{code:TIMESTAMP}} fields:{name:"record_sequence" type:{code:STRING}} fields:{name:"server_trans
(omitted)
context deadline exceeded

References

  • https://cloud.google.com/spanner/docs/change-streams/details
