Fabio Ghirardello for Cockroach Labs

Generate multiple, large, sorted CSV files with pseudo-random data

For this exercise, I was tasked with creating a 500-million-row dataset for an IMPORT INTO speed test.
Details about the IMPORT INTO command can be found in the CockroachDB documentation.
The requirement was to create 100 sorted CSV files, each containing 5 million rows, and save them to an S3 bucket.
The data should also be sorted across all files.

In this blog post, I show how to use pgworkload to create such a pseudo-random dataset.

Set up pgworkload and aws-cli

Provision a large machine with plenty of RAM.
The AWS instance type r7i.8xlarge, which sports 32 vCPUs and 256GB RAM, is a good candidate.
Make sure to attach an IAM Role with permissions to write to your S3 bucket.

Once ready, ssh into the box and install the tools:

sudo apt update
sudo apt install -y python3-pip unzip

pip install -U pip
pip install pgworkload

Log out and log back in so that pgworkload is on the PATH.

Now install the AWS CLI, following the official docs.

curl "https://awscli.amazonaws.com/awscli-exe-linux-x86_64.zip" -o "awscliv2.zip"
unzip awscliv2.zip
sudo ./aws/install

Confirm it's working

$ aws --version
aws-cli/2.15.17 Python/3.11.6 Linux/5.15.0-1052-aws exe/x86_64.ubuntu.20 prompt/off

# confirm you're authenticated..
$ aws s3 ls workshop-ca
                          PRE fab/

# put some files in the bucket
$ aws s3 cp s.sql 's3://workshop-ca/fab/'
upload: ./s.sql to s3://workshop-ca/fab/s.sql                  

# check the file is there
$ aws s3 ls workshop-ca/fab/
                           PRE 2024/
                           PRE metadata/
2024-02-02 19:03:10       3585 s.sql

Good, we're all set!

Generate data

The data that needs to be created must match the schema of the table into which it will be imported.

The DDL of the schema is in file s.sql.

-- file s.sql
CREATE TABLE accounts (
    acc_no STRING(20) NOT NULL,
    pos_dt DATE NOT NULL,
    ent_id DECIMAL(25) NOT NULL,
    col_00 STRING(35) NOT NULL,
    col_01 STRING(16) NULL,
    col_02 STRING(9) NOT NULL,
    col_03 STRING(30) NOT NULL,
    col_04 DECIMAL(22,6) NOT NULL DEFAULT 0:::DECIMAL,
    col_05 DECIMAL(22,6) NOT NULL DEFAULT 0:::DECIMAL,
    col_06 STRING(3) NOT NULL,
    col_07 DECIMAL(22,8) NULL,
    col_08 DECIMAL(20,3) NULL,
    col_09 DECIMAL(20,4) NULL,
    col_10 DECIMAL(20,3) NULL,
    col_11 DECIMAL(20,5) NULL,
    col_12 DECIMAL(20,5) NULL DEFAULT 0:::DECIMAL,
    col_13 DECIMAL(20,5) NULL DEFAULT 0:::DECIMAL,
    col_14 DECIMAL(20,5) NULL,
    col_15 DECIMAL(20,3) NULL,
    col_16 DECIMAL(20,3) NULL DEFAULT 0:::DECIMAL,
    col_17 DECIMAL(21,8) NULL,
    col_18 DECIMAL(21,8) NULL DEFAULT 0:::DECIMAL,
    col_19 DECIMAL(21,2) NULL DEFAULT 0:::DECIMAL,
    col_20 DECIMAL(21,2) NULL,
    col_21 DECIMAL(21,2) NULL,
    col_22 DECIMAL(21,2) NULL DEFAULT 0:::DECIMAL,
    col_23 STRING(1) NOT NULL,
    col_24 STRING(3) NULL,
    col_25 DECIMAL(22,9) NULL,
    col_26 DECIMAL(21,8) NULL,
    col_27 DECIMAL(21,8) NULL DEFAULT 0:::DECIMAL,
    col_28 DECIMAL(22,9) NULL,
    col_29 DECIMAL(20,5) NULL,
    col_30 DECIMAL(20,5) NULL,
    col_31 DECIMAL(20,5) NULL,
    col_32 DECIMAL(20,5) NULL,
    col_33 DECIMAL(20,5) NULL,
    col_34 DECIMAL(20,5) NULL,
    col_35 DECIMAL(20,2) NULL,
    col_36 DECIMAL(20,2) NULL,
    col_37 DECIMAL(20,5) NULL,
    col_38 DATE NULL,
    col_39 DATE NULL,
    col_40 DATE NULL,
    col_41 STRING(25) NULL,
    col_42 DECIMAL(21,6) NULL,
    col_43 DECIMAL(21,6) NULL,
    col_44 DECIMAL(21,6) NULL,
    CONSTRAINT pk PRIMARY KEY (acc_no ASC, pos_dt ASC, ent_id ASC)
);

Given the schema, pgworkload can generate an intermediate representation of the data to be created: a definition file in YAML syntax.

pgworkload util yaml -i s.sql

The result is a file called, by default, s.yaml. Below is a snippet of the first few lines:

# file s.yaml
accounts:
- count: 1000
  sort-by: []
  columns:
    acc_no:
      type: string
      args:
        min: 10
        max: 30
        seed: 0.7225861820526325
        null_pct: 0.0
        array: 0
    pos_dt:
      type: date
      args:
        start: '2022-01-01'
        end: '2022-12-31'
        format: '%Y-%m-%d'
        seed: 0.24769809060740589
        null_pct: 0.0
        array: 0
    ent_id:
      type: float
      args:
        max: 10000
        round: 2
        seed: 0.028215986930010706
        null_pct: 0.0
        array: 0
    col_00:
      type: string
      args:
        min: 10
        max: 30
        seed: 0.8785098436269427
        null_pct: 0.0
        array: 0
    col_01:
      type: string
      args:
        min: 10
        max: 30
        seed: 0.8561702097239098
        null_pct: 0.0
        array: 0
[...]

This is just a template; we need to configure it to our needs:

  • generate 500,000,000 rows
  • ensure the dataset is sorted as per the Primary Key
  • the 2nd column must always be the same date.

Here is the updated head of the file:

accounts:
- count: 500000000
  sort-by: ["acc_no", "pos_dt", "ent_id"]
  columns:
    acc_no:
      type: string
      args:
        min: 10
        max: 30
        seed: 0.7225861820526325
        null_pct: 0.0
        array: 0
    pos_dt:
      type: costant
      args:
        value: "2024-02-01"

At this point, we're ready to generate the data.

# check the options available for the `util csv` command
$ pgworkload util csv --help

╭─ Options ────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╮
│ *  --input         -i      FILE       Filepath to the YAML data generation file. [default: None] [required]                                  │
│    --output        -o      DIRECTORY  Output directory for the CSV files. Defaults to <input-basename>.                                      │
│    --procs         -x      INTEGER    Number of processes to spawn. Defaults to <system-cpu-count>.                                          │
│    --csv-max-rows          INTEGER    Max count of rows per resulting CSV file. [default: 100000]                                            │
│    --hostname      -n      INTEGER    The hostname of the http server that serves the CSV files.                                             │
│    --port          -p      INTEGER    The port of the http server that servers the CSV files. [default: 3000]                                │
│    --table-name    -t      TEXT       The table name used in the import statement. [default: table_name]                                     │
│    --compression   -c      TEXT       The compression format. [default: None]                                                                │
│    --delimiter     -d      TEXT       The delimeter char to use for the CSV files. Defaults to "tab".                                        │
│    --help                             Show this message and exit.                                                                            │
╰──────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────────╯

Here, the parameters of interest are --csv-max-rows and -x.

To make sure we end up with exactly 100 files, we will use 10 processes (-x), with each process generating files of 5,000,000 rows until the total reaches 500,000,000.

pgworkload util csv -i s.yaml -d "," --csv-max-rows 5000000 -x 10
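
The file count follows from simple arithmetic, sketched below in Python. The helper `plan_files` is hypothetical (it is not part of pgworkload), and it assumes that rows are split evenly across processes and that each process starts a new file every `--csv-max-rows` rows. That assumption is consistent with the file names shown further down, but it is not taken from the pgworkload source.

```python
import math


def plan_files(total_rows: int, csv_max_rows: int, procs: int) -> tuple[int, int]:
    """Return (total_files, files_per_process).

    Assumption: rows are divided evenly across processes, and every
    process writes files of at most csv_max_rows rows.
    """
    rows_per_proc = math.ceil(total_rows / procs)
    files_per_proc = math.ceil(rows_per_proc / csv_max_rows)
    return files_per_proc * procs, files_per_proc


# 10 processes x 50,000,000 rows each, 5,000,000 rows per file
print(plan_files(500_000_000, 5_000_000, 10))

# same dataset with one process per vCPU on a 32 vCPU machine
print(plan_files(500_000_000, 5_000_000, 32))
```

Under the same assumption, leaving -x at its default on a 32 vCPU machine would give each process 15,625,000 rows, which does not divide evenly into files of 5,000,000 rows. This is why the process count is set explicitly here.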

This will take time and memory: monitor memory utilization using top or free.
Once completed, you have 100 CSV files in the s directory.

$ ls -A1 s/
accounts.0_0_0.csv
accounts.0_0_1.csv
accounts.0_0_2.csv
accounts.0_0_3.csv
accounts.0_0_4.csv
accounts.0_0_5.csv
accounts.0_0_6.csv
accounts.0_0_7.csv
accounts.0_0_8.csv
accounts.0_0_9.csv
accounts.0_1_0.csv
accounts.0_1_1.csv
[...]
accounts.0_9_8.csv
accounts.0_9_9.csv

Inspecting one file, we see it's sorted:

$ head s/accounts.0_0_0.csv
abcdgzrjyphtemsbcjvt,2024-02-01,4314.45,jkiuotkttqwjnjnbxzbxhgsyke,uxejzkiirmitunpzybjnakoic,ovvqhgmsbwoajhwmiyhnugj,...
aefkzjdckjylb,2024-02-01,9375.53,bswvhyjkodukhwpcxf,uevjmwqhdfaobtlf,oahiaiztayyzftmfkyuez,qtxhjuwpfalfzaeoiiahuoxamns,...
aerkycbddriqtygvilb,2024-02-01,6150.55,mprfweeqoe,nvddlibqqzncrwdnffm,phcnnzvxrauxllj,vjnabkrzgiimmt,...
agjiqkisyshjeorqna,2024-02-01,9901.21,fipnlqezgzzdfreg,yokerzbkxcrzfdeckjkk,guaeeecdqgbbwtnzleopfznzcuv,lqglvuyetypvnovdbflbnodozfebz,...
atpkoobmhvhhxuqxceurv,2024-02-01,2455.17,cwmkzijlrqhtdcx,jtbelvfoajfdagwigpevnmameq,uedouumekwxagdgwbtivewaq,uytgiqpewfexlqkmbelpik,...
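
Eyeballing the first lines is not a full check. As a minimal sketch, the hypothetical helper below streams a file and verifies that every line is greater than or equal to the previous one. It compares whole lines as text, which is an assumption on my part about what "sorted" should mean here; a numeric comparison of the third column would need the lines to be parsed.

```python
import sys


def is_sorted(path: str) -> bool:
    """Stream the file and check that lines are in non-decreasing text order."""
    prev = None
    with open(path, "r") as f:
        for line in f:
            if prev is not None and line < prev:
                return False
            prev = line
    return True


if __name__ == "__main__":
    # usage (hypothetical file name): python3 check_sorted.py s/*.csv
    for p in sys.argv[1:]:
        print(p, is_sorted(p))
```

Only two lines are held in memory at any time, so the check works on files of any size.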

You now have the data, sorted within each file but not across files.
That is, each file is sorted as per the Primary Key, but the dataset as a whole is not yet sorted.
At the time of writing, pgworkload couldn't do this, so we have to develop our own sorted-merge script.

Note: don't try to load everything into memory, sort, and save.
A dataset of this size is unlikely to fit into the memory of a single machine.
Instead, read in chunks and write in chunks, so that the approach scales.
The script below keeps only a few lines per input file in memory, so it should work regardless of how many files you have or how large they are.

Here's my quick and dirty Python script.

UPDATE: as of version 0.1.8, the sort merge functionality has been added to pgworkload, check with pgworkload util merge --help.

# file: sort_merge.py
from io import TextIOWrapper
import sys

# input CSV files - it assumes files are already sorted
CSVs = sys.argv[1:]

CSV_MAX_ROWS = 5000000
COUNTER = 0
C = 0

source: dict[int, list] = {}
file_handlers: dict[int, TextIOWrapper] = {}


def initial_fill(csv: str, idx: int):
    """
    opens the CSV file, saves the file handler,
    read few lines into source list for the index.
    """
    f = open(csv, "r")
    file_handlers[idx] = f
    while len(source[idx]) < 5:
        line = f.readline()
        if line != "":
            source[idx].append(line)
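        else:
            # reached end of file: close it and forget the handler, so that
            # replenish_source_list() never reads from a closed file
            print(f"initial_fill: CSV file '{csv}' at source index {idx} reached EOF.")
            f.close()
            del file_handlers[idx]
            break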


def replenish_source_list(idx: int):
    """
    Refills the source list with a new value from the source file
    """
    try:
        f = file_handlers.get(idx, None)
        if not f:
            return
        line = f.readline()
        if line != "":
            source[idx].append(line)
        else:
            # reached end of file
            print(f"index {idx} reached EOF.")
            f.close()
            del file_handlers[idx]
    except Exception as e:
        print("Exception in replenish_source_list: ", e)


def write_to_csv(v):
    global C
    global output
    global COUNTER
    if C >= CSV_MAX_ROWS:
        output.close()
        COUNTER += 1
        C = 0
        output = open(f"out_{COUNTER:03d}.csv", "w")

    output.write(v)
    C += 1    

# init the source dict by opening each CSV file
# and only reading few lines.
for idx, csv in enumerate(CSVs):
    source[idx] = []

    initial_fill(csv, idx)

# the source dict now has a key for every file and a list of the first values read

l = []
# pop the first value in each source to a list `l`
# `l` will have the first values of all source CSV files
for k, v in source.items():
    try:
        l.append((v.pop(0), k))
    except IndexError as e:
        pass

first_k = None
first_v = None
output = open(f"out_{COUNTER:03d}.csv", "w")

# sort list `l`
# pop the first value (the smallest) in `first_v`
# make a note of the source of that value in `first_k`
# replenish the corresponding source
while True:
    if first_k is not None:
        try:
            replenish_source_list(first_k)
            l.append((source[first_k].pop(0), first_k))

        except IndexError as e:
            # the source list is empty
            print(f"source list {first_k} is now empty")
            first_k = None

    if l:
        l.sort(key=lambda x: x[0])
        try:
            first_v, first_k = l.pop(0)
            write_to_csv(first_v)
        except IndexError as e:
            print("Exception in main: ", e)
            output.close()
    else:
        break


output.close()

print("\ndone!")
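
For comparison, the same k-way merge can be sketched with `heapq.merge` from the Python standard library, which lazily merges already-sorted iterables and keeps only one pending line per input in memory. This is an alternative illustration, not the script above and not what pgworkload does internally. The function name `sort_merge` and the `prefix` parameter are my own, and lines are compared as whole strings, as in the script above.

```python
import heapq
import sys
from contextlib import ExitStack

CSV_MAX_ROWS = 5_000_000


def sort_merge(paths, max_rows=CSV_MAX_ROWS, prefix="out_"):
    """Merge already-sorted CSV files into sorted files of at most max_rows rows.

    Returns the list of output file names.
    """
    outputs = []
    with ExitStack() as stack:
        # all inputs stay open for the whole merge; ExitStack closes them at the end
        files = [stack.enter_context(open(p, "r")) for p in paths]
        out = None
        # heapq.merge yields lines in sorted order, reading lazily from each file
        for n, line in enumerate(heapq.merge(*files)):
            if n % max_rows == 0:
                # start a new output file every max_rows rows
                if out is not None:
                    out.close()
                name = f"{prefix}{len(outputs):03d}.csv"
                out = open(name, "w")
                outputs.append(name)
            out.write(line)
        if out is not None:
            out.close()
    return outputs


if __name__ == "__main__":
    sort_merge(sys.argv[1:])
```

Both approaches keep every input file open at once, so the number of inputs is bounded by the operating system's limit on open file descriptors.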

Run sort_merge.py against the generated files:

$ python3 sort_merge.py s/*
index 58 reached EOF.
index 82 reached EOF.
[...]
source list 26 is now empty
source list 69 is now empty

done!

Inspect the new files, also 100 in total

$ ls -A1 out_*
out_000.csv
out_001.csv
out_002.csv
[...]
out_098.csv
out_099.csv

You should see that the data is now sorted across files, too.

$ head out_000.csv 
aabrwawoedcqnosvgzcvf,2024-02-01,5285.54,...
aasobyznvehzvrppwijpbbxfrjzdj,2024-02-01,7942.57,..
abcppzyblqnksovdnf,2024-02-01,7577.34,...
abjfxjatqangpalindkcdzsmzbasfx,2024-02-01,9831.37,...
aepkvifbrl,2024-02-01,1239.02,...

$ head out_099.csv 
zxwcmhfnwuqarb,2024-02-01,8477.3,...
zyjucqqytplxf,2024-02-01,5049.06,...
zyvwzspgaxzcymlvo,2024-02-01,5590.13,...
zzmrrnytooz,2024-02-01,7936.68,...
zzqpnbksbdheo,2024-02-01,7950.73,...
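
To go beyond a visual check, the ordering across files can be verified by comparing file boundaries. The sketch below uses hypothetical helpers and assumes each file is already sorted internally. It checks that the first line of every file is not smaller than the last line of the previous file, comparing whole lines as text.

```python
import glob


def first_and_last(path):
    """Return the first and last line of a file, streaming through it once."""
    first = last = None
    with open(path, "r") as f:
        for line in f:
            if first is None:
                first = line
            last = line
    return first, last


def sorted_across(paths):
    """True if each file starts at or after the point where the previous one ended."""
    prev_last = None
    for p in paths:
        first, last = first_and_last(p)
        if first is None:
            continue  # skip empty files
        if prev_last is not None and first < prev_last:
            return False
        prev_last = last
    return True


if __name__ == "__main__":
    # zero-padded names (out_000 ... out_099) sort correctly as plain strings
    print(sorted_across(sorted(glob.glob("out_*.csv"))))
```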

You can now upload the merged out_*.csv files to your S3 bucket using the AWS CLI:

aws s3 cp . 's3://workshop-ca/sorted_across_files/' --recursive --exclude '*' --include 'out_*.csv'

At this point, you can safely terminate the AWS instance.
