While the standard database/sql
package and sqlx
are great general-purpose database tools, pgx
offers PostgreSQL-specific features that can significantly improve your application's performance and developer experience. Let's explore how to leverage pgx's powerful capabilities.
Why Choose pgx?
pgx offers several PostgreSQL-specific advantages:
- Better performance than
lib/pq
- Native support for PostgreSQL data types
- Automatic prepared statement caching
- Connection pooling with
pgxpool
- COPY protocol support
- Listen/Notify support
- Custom type mapping
- Batch operations
Getting Started
First, let's set up a new project:
mkdir go-pgx-demo
cd go-pgx-demo
go mod init go-pgx-demo
go get github.com/jackc/pgx/v5
Basic Setup and Connection
package main
import (
"context"
"fmt"
"os"
"github.com/jackc/pgx/v5"
"github.com/jackc/pgx/v5/pgxpool"
)
type Book struct {
ID int
Title string
AuthorID int
PublishedYear int
Data pgx.JSONB // Native PostgreSQL JSONB support
}
func main() {
// Connection URL with additional options
dbURL := "postgres://username:password@localhost:5432/bookstore?sslmode=disable&pool_max_conns=10"
// Create a connection pool
config, err := pgxpool.ParseConfig(dbURL)
if err != nil {
fmt.Fprintf(os.Stderr, "Failed to parse config: %v\n", err)
os.Exit(1)
}
// Custom pool configuration
config.MaxConns = 10
config.MinConns = 2
pool, err := pgxpool.NewWithConfig(context.Background(), config)
if err != nil {
fmt.Fprintf(os.Stderr, "Unable to connect to database: %v\n", err)
os.Exit(1)
}
defer pool.Close()
}
Advanced Features and Examples
1. Batch Operations
func BatchInsertBooks(ctx context.Context, pool *pgxpool.Pool, books []Book) error {
batch := &pgx.Batch{}
for _, book := range books {
batch.Queue(`
INSERT INTO books (title, author_id, published_year, data)
VALUES ($1, $2, $3, $4)`,
book.Title, book.AuthorID, book.PublishedYear, book.Data)
}
br := pool.SendBatch(ctx, batch)
defer br.Close()
return br.Close()
}
2. COPY Protocol
func BulkImportBooks(ctx context.Context, pool *pgxpool.Pool, books []Book) error {
_, err := pool.Exec(ctx, `
CREATE TEMPORARY TABLE temp_books (
title TEXT,
author_id INTEGER,
published_year INTEGER,
data JSONB
)`)
if err != nil {
return fmt.Errorf("failed to create temp table: %v", err)
}
copyCount, err := pool.CopyFrom(
ctx,
pgx.Identifier{"temp_books"},
[]string{"title", "author_id", "published_year", "data"},
pgx.CopyFromSlice(len(books), func(i int) ([]interface{}, error) {
return []interface{}{
books[i].Title,
books[i].AuthorID,
books[i].PublishedYear,
books[i].Data,
}, nil
}),
)
if err != nil {
return fmt.Errorf("copy failed: %v", err)
}
fmt.Printf("Copied %d records\n", copyCount)
return nil
}
3. Listen/Notify for Real-time Updates
func ListenForBookUpdates(ctx context.Context, pool *pgxpool.Pool) error {
conn, err := pool.Acquire(ctx)
if err != nil {
return err
}
defer conn.Release()
_, err = conn.Exec(ctx, "LISTEN book_updates")
if err != nil {
return err
}
for {
notification, err := conn.Conn().WaitForNotification(ctx)
if err != nil {
return err
}
fmt.Printf("Received notification on channel %s: %s\n",
notification.Channel, notification.Payload)
}
}
// Trigger notification
func NotifyBookUpdate(ctx context.Context, pool *pgxpool.Pool, bookID int) error {
_, err := pool.Exec(ctx,
"SELECT pg_notify('book_updates', $1)",
fmt.Sprintf("Book %d updated", bookID))
return err
}
4. Custom Type Handling
type ISBN string
func (isbn *ISBN) ScanISBN(v interface{}) error {
if v == nil {
*isbn = ""
return nil
}
switch v := v.(type) {
case string:
*isbn = ISBN(v)
return nil
case []byte:
*isbn = ISBN(string(v))
return nil
}
return fmt.Errorf("cannot scan %T into ISBN", v)
}
type BookWithISBN struct {
ID int
Title string
ISBN ISBN
}
func RegisterCustomTypes(conn *pgx.Conn) error {
isbn := pgtype.NewTextCodec(nil)
conn.TypeMap().RegisterType(&pgtype.Type{
Name: "isbn",
OID: pgtype.TextOID,
Codec: isbn,
})
return nil
}
5. Transaction Management
func TransferBooks(ctx context.Context, pool *pgxpool.Pool, fromAuthorID, toAuthorID int) error {
return pool.BeginTxFunc(ctx, pgx.TxOptions{
IsoLevel: pgx.Serializable,
AccessMode: pgx.ReadWrite,
}, func(tx pgx.Tx) error {
// Update books
_, err := tx.Exec(ctx, `
UPDATE books
SET author_id = $1
WHERE author_id = $2`,
toAuthorID, fromAuthorID)
if err != nil {
return err
}
// Update author statistics
_, err = tx.Exec(ctx, `
UPDATE author_stats
SET book_count = (
SELECT COUNT(*)
FROM books
WHERE author_id = $1
)
WHERE author_id IN ($1, $2)`,
toAuthorID, fromAuthorID)
return err
})
}
6. Query Row Scanning
func GetBookDetails(ctx context.Context, pool *pgxpool.Pool, bookID int) (*Book, error) {
var book Book
err := pool.QueryRow(ctx, `
SELECT id, title, author_id, published_year, data
FROM books
WHERE id = $1`, bookID).Scan(
&book.ID,
&book.Title,
&book.AuthorID,
&book.PublishedYear,
&book.Data,
)
if err == pgx.ErrNoRows {
return nil, fmt.Errorf("book not found")
}
return &book, err
}
Performance Optimization
1. Connection Pool Tuning
func ConfigurePool(config *pgxpool.Config) {
config.MaxConns = 20
config.MinConns = 5
config.MaxConnLifetime = time.Hour
config.MaxConnIdleTime = 30 * time.Minute
config.HealthCheckPeriod = time.Minute
// Custom connection configuration
config.ConnConfig.RuntimeParams["application_name"] = "MyApp"
config.ConnConfig.PreferSimpleProtocol = true
}
2. Prepared Statement Cache
func ConfigurePreparedStatements(config *pgxpool.Config) {
// Disable prepared statement caching if needed
config.ConnConfig.PreferSimpleProtocol = true
// Or configure cache size
config.ConnConfig.StatementCacheCapacity = 512
}
Best Practices
- Use Connection Pooling
// Don't create individual connections
pool, err := pgxpool.New(context.Background(), dbURL)
- Handle Context Properly
ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
defer cancel()
- Use Transactions for Multiple Operations
err := pool.BeginTxFunc(ctx, pgx.TxOptions{}, func(tx pgx.Tx) error {
// Multiple operations here
return nil
})
- Proper Resource Cleanup
rows, err := pool.Query(ctx, "SELECT * FROM books")
if err != nil {
return err
}
defer rows.Close()
Common Pitfalls and Solutions
- Connection Leaks: Always release acquired connections
- Transaction Management: Use BeginTxFunc for automatic cleanup
- Resource Exhaustion: Configure pool sizes appropriately
- Error Handling: Check for specific error types
Monitoring and Debugging
type QueryTracer struct {
logger *log.Logger
}
func (qt *QueryTracer) TraceQueryStart(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryStartData) context.Context {
qt.logger.Printf("Query started: %s", data.SQL)
return ctx
}
func (qt *QueryTracer) TraceQueryEnd(ctx context.Context, conn *pgx.Conn, data pgx.TraceQueryEndData) {
qt.logger.Printf("Query completed in %s", data.Duration)
}
Integration with Other Tools
pgx works well with:
- sqlc for query generation
- migrate for database migrations
- postgresql-contrib for additional features
Conclusion
pgx is the go-to choice for PostgreSQL applications in Go when you need:
- Maximum performance
- PostgreSQL-specific features
- Advanced connection pooling
- Real-time notifications
- Batch operations
Resources
Happy coding with pgx! 🚀
Top comments (0)