Spanner Change Streams was recently released by GA, then here is a summary of what we found out about it.
What is Spanner Change Streams?
Change Streams is a mechanism to obtain the contents of data changes (INSERT, UPDATE, DELETE) made to Spanner in near real-time.
- Replication of changes to BigQuery, etc. for analysis
- Messaging to Pub/Sub, etc. triggered by data changes
- Save changes to GCS and use as audit logs
Since Spanner did not have this kind of change detection functionality until now, it was impossible to implement the Transaction log tailing pattern when sending events triggered by data updates, for example, and instead it was necessary to implement the Transactional outbox pattern on the application side.
Delivery Integrity
The writes to the internal table for delivery by Change Streams are done in the same transaction as the application writes, so delivery is guaranteed as soon as the commit is successful.
The details are described below, but since the data is localized so that writing to the internal table does not cause delays, the impact on performance is probably not a concern.
The delivered values include table name, column name, column data type, timestamp, transaction ID, and other metadata, as well as modified values and PK. If you want to retrieve the entire record, you must use Spanner's Stale Read from the PK and commit timestamp.
Scalability of Change Streams
Spanner stores real data in units of splits on Colossus (distributed storage), and when the number of splits exceeds a certain size, the data is automatically divided into smaller units to achieve scalability.
Change Streams also follows the same mechanism, whereby a Change Stream Partition is tied to a single Split, and this Partition is linked to the increase or decrease of the Split to ensure scalability. In addition, the partitions are arranged to be localized and tied to the Split, like data interleaving, for the fastest possible processing.
picture from here: https://cloud.google.com/spanner/docs/change-streams/details#change_stream_partitions
Enabling
Putting in the settings itself is relatively simple and can be done by flowing DDL similar to indexes and unique constraints.
The scope is
- database as a whole
- per table
- per column
For example, the following is a sample configuration for each scope.
CREATE TABLE Users (
    UserID STRING(MAX) NOT NULL,
    Name STRING(MAX) NOT NULL,
    Age INT64 NOT NULL,
) PRIMARY KEY (UserID);
CREATE TABLE Items (
    ItemID STRING(MAX) NOT NULL,
    Name STRING(MAX) NOT NULL,
) PRIMARY KEY (ItemID);
CREATE CHANGE STREAM EverythingStream FOR ALL;
CREATE CHANGE STREAM UsersTableStream FOR Users;
CREATE CHANGE STREAM UsersNameAndItemsNameColumnsStream FOR Users(Name), Items(Name);
Usage Methods
The official documentation shows how to use Dataflow and how to use Spanner directly by calling the Spanner API, but it seems that Dataflow is the most important one, and several templates are provided that seem to be practical.
However, Dataflow seems to be the real deal, and there are several templates that seem to be practical. For example, when the aforementioned partitioning of a partition occurs, a record notifying that fact is returned, so it may be necessary to start a new process based on the contents of that notification to acquire the stream from another table. Conversely, when a merge of partitions (=merge of splits) occurs, it may be necessary to delete unnecessary processes and stop receiving streams.
Naturally, Dataflow templates have all of this covered and are readily available for correct operation, so it would be wise to consider using Dataflow unless there is a good reason to do so.
When I first saw the news, I was expecting to be able to easily realize transactional log tailing from Go, but it was not that simple.
I touched it a little bit.
However, I tried to touch it a little from Go, so I'm pasting the code as a memorial service. It took some trial and error, but I was able to run it from Go's client library without directly touching the API.
This code is just a simple like "I got something from Change Streams! Yeahh 🥰". This code is not a reference code at all, because it is not useful at all, and it has more holes than anthill in the savanna.
The schema is the same as the one I used in the "Enable" section.
package main
import (
    "context"
    "fmt"
    "os"
    "time"
    "cloud.google.com/go/spanner"
    "google.golang.org/api/iterator"
)
func main() {
    if err := run(); err != nil {
        fmt.Fprintln(os.Stderr, err)
    }
}
func run() error {
    ctx, cancel := context.WithTimeout(context.Background(), 3*time.Second)
    defer cancel()
    dsn := fmt.Sprintf("projects/%s/instances/%s/databases/%s", os.Getenv("SPANNER_PROJECT_ID"), os.Getenv("SPANNER_INSTANCE_ID"), os.Getenv("SPANNER_DATABASE_ID"))
    cli, err := spanner.NewClient(ctx, dsn)
    if err != nil {
        return fmt.Errorf("failed to init spanner client: %w, dsn: %s", err, dsn)
    }
    for {
        select {
        case <-ctx.Done():
            return ctx.Err()
        default:
            now := time.Now().UTC().Format(time.RFC3339)
            stmt := spanner.NewStatement(
                `SELECT ChangeRecord FROM READ_EverythingStream ( ` +
                    `  start_timestamp => "` + now + `", ` +
                    `  end_timestamp => NULL, ` +
                    `  partition_token => NULL, ` +
                    `  heartbeat_milliseconds => 1000` +
                    `)`,
            )
            iter := cli.Single().Query(ctx, stmt)
            defer iter.Stop()
            for {
                row, err := iter.Next()
                if err != nil {
                    if err == iterator.Done {
                        <-time.After(1 * time.Second)
                        break
                    }
                    return err
                }
                fmt.Println(row.String())
            }
        }
    }
}
$ go run main.go 
{fields: [name:"ChangeRecord" type:{code:ARRAY array_element_type:{code:STRUCT struct_type:{fields:{name:"data_change_record" type:{code:ARRAY array_element_type:{code:STRUCT struct_type:{fields:{name:"commit_timestamp" type:{code:TIMESTAMP}} fields:{name:"record_sequence" type:{code:STRING}} fields:{name:"server_trans
(略)
context deadline exceeded
 
 
              

 
    
Top comments (0)