Putting it All Together
It's time to connect all the dots and build the final tool. The most important components were already covered in the previous posts, but one piece is still missing: the initial HTTP request used to download the gzip file.
Let's work on that first, and then we can put everything together.
Minimum Requirements
All the code relevant to this post is on GitHub; feel free to explore it for more details. The following is the minimum required for running the example:
- Go 1.14
- PostgreSQL 12.3: in theory any recent version should work; the README.md includes specific instructions for running it with Docker.
Downloading the file
To download the file via HTTP we can rely on the standard library, specifically the types net/http.Client and compress/gzip.Reader (the latter because the file we expect to download is gzip-compressed), plus a bufio.Reader to consume the decompressed stream line by line.
The following short snippet covers all of that:
// XXX omitting error handling to keep code short
req, _ := http.NewRequest(http.MethodGet, "https://datasets.imdbws.com/name.basics.tsv.gz", nil)
client := &http.Client{
	Timeout: 10 * time.Minute, // XXX: use something reasonable
}
resp, _ := client.Do(req)
defer resp.Body.Close()

// decompress the response body on the fly
gr, _ := gzip.NewReader(resp.Body)
defer gr.Close()

// read the decompressed stream line by line
br := bufio.NewReader(gr)
for {
	line, err := br.ReadString('\n')
	if err == io.EOF {
		return
	}
	// XXX: do something with the read value!
	_ = line
}
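For reference, here is a minimal, self-contained sketch of how that snippet could be wrapped into a function that streams every line to a channel. The function name downloadLines and its channel-based signature are illustrative assumptions on my part, not necessarily what the repository uses:

package main

import (
	"bufio"
	"compress/gzip"
	"fmt"
	"io"
	"net/http"
	"time"
)

// downloadLines downloads a gzipped file and streams each decompressed
// line into the returned channel, which is closed when the file ends.
// Name and signature are illustrative assumptions.
func downloadLines(url string) (<-chan string, error) {
	client := &http.Client{Timeout: 10 * time.Minute}
	resp, err := client.Get(url)
	if err != nil {
		return nil, err
	}
	linesC := make(chan string)
	go func() {
		defer close(linesC)
		defer resp.Body.Close()
		gr, err := gzip.NewReader(resp.Body)
		if err != nil {
			return // XXX: surface this error in real code
		}
		defer gr.Close()
		br := bufio.NewReader(gr)
		for {
			line, err := br.ReadString('\n')
			if err == io.EOF {
				return
			}
			if err != nil {
				return // XXX: surface this error in real code
			}
			linesC <- line
		}
	}()
	return linesC, nil
}

func main() {
	linesC, err := downloadLines("https://datasets.imdbws.com/name.basics.tsv.gz")
	if err != nil {
		panic(err)
	}
	for line := range linesC {
		fmt.Print(line) // each value is one raw TSV line
	}
}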
Connecting all the dots
The biggest and most important thing to consider in this integration (of all the previous posts, that is) is how we should handle downstream errors coming from PostgreSQL. Concretely, this means a change in batcher, which now consolidates a type introduced in part 2 called copyFromSourceMediator, with the idea of handling errors more closely.
The reason for this change is the delay between our code and the actual pgx calls (and therefore PostgreSQL); in practice it means we need a sync.Mutex to synchronize the two goroutines involved: the one sending values to PostgreSQL and the one receiving values from upstream.
See:
func (b *batcher) Copy(ctx context.Context, namesC <-chan name) <-chan error {
	outErrC := make(chan error)

	// copyFromErr is written by the CopyFrom goroutine and read by the
	// dispatching goroutine below, hence the mutex.
	var mutex sync.Mutex
	var copyFromErr error

	// copyFrom runs a single CopyFrom call for one batch; the returned
	// channel is closed once that call finishes, so receiving from it
	// acts as a join point.
	copyFrom := func(batchNamesC <-chan name, batchErrC <-chan error) <-chan error {
		cpOutErrorC := make(chan error)
		go func() {
			defer close(cpOutErrorC)
			copier := newCopyFromSource(batchNamesC, batchErrC)
			_, err := b.conn.CopyFrom(ctx,
				pgx.Identifier{"names"},
				[]string{
					"nconst",
					"primary_name",
					"birth_year",
					"death_year",
					"primary_professions",
					"known_for_titles",
				},
				copier)
			if err != nil {
				// never send on cpOutErrorC here: nobody may be
				// receiving, so record the error instead.
				mutex.Lock()
				copyFromErr = err
				mutex.Unlock()
			}
		}()
		return cpOutErrorC
	}

	go func() {
		defer close(outErrC)

		batchErrC := make(chan error)
		batchNameC := make(chan name)
		cpOutErrorC := copyFrom(batchNameC, batchErrC)

		// closeBatch closes the channels feeding the in-flight CopyFrom
		// call, waits for it to finish, and reports its error, if any.
		closeBatch := func() error {
			close(batchErrC)
			close(batchNameC)
			<-cpOutErrorC
			mutex.Lock()
			defer mutex.Unlock()
			return copyFromErr
		}

		var index int64
		for {
			select {
			case n, open := <-namesC:
				if !open {
					// upstream is done: flush the final, possibly
					// partial, batch before returning.
					if err := closeBatch(); err != nil {
						outErrC <- err
					}
					return
				}

				// bail out early if a previous CopyFrom call failed
				mutex.Lock()
				if copyFromErr != nil {
					mutex.Unlock()
					outErrC <- copyFromErr
					return
				}
				mutex.Unlock()

				batchNameC <- n
				index++
				if index == b.size {
					if err := closeBatch(); err != nil {
						outErrC <- err
						return
					}
					// start the next batch
					batchErrC = make(chan error)
					batchNameC = make(chan name)
					cpOutErrorC = copyFrom(batchNameC, batchErrC)
					index = 0
				}
			case <-ctx.Done():
				if err := ctx.Err(); err != nil {
					batchErrC <- err
					outErrC <- err
					return
				}
			}
		}
	}()

	return outErrC
}
The code looks like a lot, but the important bits are in the copyFrom function variable, which still uses copyFromSource for dealing with events coming from upstream, and in the mutex used to record errors coming from pgx's CopyFrom.
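As a refresher, here is a minimal sketch of what a channel-backed copyFromSource could look like; it satisfies pgx's CopyFromSource interface (Next, Values, Err). The name struct fields are illustrative assumptions derived from the column list above, and the real implementation from part 2 may differ:

// name mirrors the columns of the names table; field names are assumptions.
type name struct {
	NConst             string
	PrimaryName        string
	BirthYear          string
	DeathYear          string
	PrimaryProfessions string
	KnownForTitles     string
}

// copyFromSource is a sketch of a channel-backed pgx.CopyFromSource.
type copyFromSource struct {
	namesC  <-chan name
	errC    <-chan error
	current name
	err     error
}

func newCopyFromSource(namesC <-chan name, errC <-chan error) *copyFromSource {
	return &copyFromSource{namesC: namesC, errC: errC}
}

// Next blocks until a new value arrives, the batch channels are closed,
// or an error is received from upstream.
func (c *copyFromSource) Next() bool {
	select {
	case n, open := <-c.namesC:
		if !open {
			return false
		}
		c.current = n
		return true
	case err, open := <-c.errC:
		if open {
			c.err = err
		}
		return false
	}
}

// Values returns the current row in the same order as the columns
// passed to CopyFrom.
func (c *copyFromSource) Values() ([]interface{}, error) {
	return []interface{}{
		c.current.NConst,
		c.current.PrimaryName,
		c.current.BirthYear,
		c.current.DeathYear,
		c.current.PrimaryProfessions,
		c.current.KnownForTitles,
	}, nil
}

// Err reports any error received from upstream so pgx can abort the COPY.
func (c *copyFromSource) Err() error {
	return c.err
}

One way to read this design: CopyFrom's error is never sent through cpOutErrorC because nobody may be left to receive it, which would leak the goroutine; recording it in the shared copyFromErr variable under the mutex guarantees the CopyFrom goroutine never blocks on its way out.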
What's next?
This is the last post of the series, but I don't think this is the end; I will follow up and improve the existing code in the future. I will answer (at least) the following two questions:
- How to provide processing status? How many events are left? How many have been processed?
- How to resume processing events in case of failure?
I will keep improving this implementation, so stay tuned.