Let’s talk about getting things done in the background. In any application, there are tasks you don’t want to handle right away—sending emails, processing uploads, generating reports. Making users wait for these is a poor experience. Instead, you schedule the work to happen later, smoothly and reliably. That’s where task scheduling and job queues come in.
I want to build a system in Go that can manage these background jobs efficiently. It needs to be fast, handle a lot of work, and never lose track of a task, even if the server restarts. It also needs to work across multiple servers, not just one. Let’s build that system, piece by piece.
At its heart, the system has a few key jobs: decide when to run a task (scheduling), decide which task to run next (queueing), actually run the task (workers), and remember the task so it’s not lost (storage). We’ll build a JobScheduler to orchestrate all of this.
First, we define what a job is. It’s more than just a function; it needs an identity, some data, and instructions on how to behave.
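type Job struct {
	ID          string
	Type        string                 // Handler name, e.g. "process_data"
	Data        map[string]interface{} // Job payload
	Priority    int                    // Higher number = higher priority
	Timeout     time.Duration          // Per-job execution limit
	RetryCount  int
	RetryAt     time.Time
	ScheduledAt time.Time
	// Execute is the work itself. encoding/json cannot encode functions,
	// so the field is excluded from serialization.
	Execute func(context.Context) error `json:"-"`
}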
The Execute function is the core—it’s the actual work. The Priority field is crucial. Not all jobs are equal. A password reset email is urgent; a weekly analytics digest can wait. Our queue needs to understand this.
This is where a priority queue shines. Think of it like a hospital emergency room. Patients are sorted by the severity of their condition, not by arrival time. We’ll implement this as a binary heap ordered by priority first and scheduled time second, so the most urgent job is always at the root.
type ScheduledJob struct {
	*Job
	Deadline time.Time
	AddedAt  time.Time
}
type PriorityQueue struct {
	mu    sync.RWMutex
	jobs  []*ScheduledJob
	index map[string]int
}
The index map records each job’s position in the slice, so we can find a specific job quickly. The heapifyUp and heapifyDown methods (not shown here) maintain the heap property and must update the index whenever they swap two entries. When we add a job, we place it at the end and bubble it up. When we remove one, we take the root and sink the last element down.
func (pq *PriorityQueue) Push(job *ScheduledJob) {
	pq.mu.Lock()
	defer pq.mu.Unlock()
	pq.jobs = append(pq.jobs, job)
	pq.index[job.ID] = len(pq.jobs) - 1
	pq.heapifyUp(len(pq.jobs) - 1)
}
func (pq *PriorityQueue) less(i, j int) bool {
	if pq.jobs[i].Priority != pq.jobs[j].Priority {
		return pq.jobs[i].Priority > pq.jobs[j].Priority // Higher number = higher priority
	}
	return pq.jobs[i].ScheduledAt.Before(pq.jobs[j].ScheduledAt)
}
This less function is the sorting rule. First, we compare priority. A job with priority 10 runs before a job with priority 1. If two jobs have the same priority, the one scheduled earlier runs first.
Now we have an ordered list. But jobs just sitting in a slice in memory are fragile. If our program crashes, they vanish. We need persistence. I’ll use a two-layer approach: a PostgreSQL database for permanent, queryable storage, and Redis for fast, in-memory caching and cross-server communication.
type JobStore struct {
	db    *sql.DB
	redis *redis.Client
	cache *JobCache
}
func (js *JobStore) SaveJob(job *Job) error {
	data, err := json.Marshal(job.Data)
	if err != nil {
		return err
	}
	_, err = js.db.Exec(
		`INSERT INTO jobs (id, type, data, priority, retry_count, created_at, scheduled_at)
		VALUES ($1, $2, $3, $4, $5, $6, $7)`,
		job.ID, job.Type, data, job.Priority, job.RetryCount, time.Now(), job.ScheduledAt,
	)
	if err != nil {
		return err
	}
	// Cache the payload for fast lookups. The database is the source of
	// truth, so a failed cache write is deliberately ignored here.
	ctx := context.Background()
	key := fmt.Sprintf("job:%s", job.ID)
	js.redis.Set(ctx, key, data, 24*time.Hour)
	return nil
}
The database is our source of truth. Redis acts as a fast lookup layer and, as we’ll see later, a messaging bus between servers.
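On the read side, the two layers usually combine into a read-through lookup: try the cache, fall back to the database, and refill the cache on a miss. Here is a minimal sketch of that flow. Everything in it is an assumption for illustration: memStore is an in-memory stand-in for both Redis and PostgreSQL, and loadJob is a hypothetical helper, not a method from the code above.

```go
package main

import (
	"errors"
	"fmt"
)

var errNotFound = errors.New("job not found")

// memStore is a hypothetical in-memory stand-in for a backing store.
// It counts reads so we can see which layer served a request.
type memStore struct {
	data  map[string]string
	reads int
}

func newMemStore() *memStore { return &memStore{data: map[string]string{}} }

func (m *memStore) get(key string) (string, bool) {
	m.reads++
	v, ok := m.data[key]
	return v, ok
}

func (m *memStore) set(key, val string) { m.data[key] = val }

// loadJob checks the cache first, falls back to the database,
// and refills the cache after a miss.
func loadJob(cache, db *memStore, id string) (string, error) {
	key := "job:" + id
	if v, ok := cache.get(key); ok {
		return v, nil
	}
	v, ok := db.get(key)
	if !ok {
		return "", errNotFound
	}
	cache.set(key, v)
	return v, nil
}

func main() {
	cache, db := newMemStore(), newMemStore()
	db.set("job:42", `{"index":42}`)
	for i := 0; i < 2; i++ {
		v, err := loadJob(cache, db, "42")
		fmt.Println(v, err, "database reads so far:", db.reads)
	}
}
```

The second lookup is served from the cache, so the database read count stays at one.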
With jobs stored and ordered, we need muscle to process them. Enter the worker pool. We don’t want to run an unlimited number of jobs simultaneously; that could crash the server. A worker pool limits concurrency.
type WorkerPool struct {
	workers    int
	jobChan    chan *Job
	resultChan chan *JobResult
	semaphore  chan struct{}
}
func (wp *WorkerPool) Start(ctx context.Context, config SchedulerConfig) {
	for i := 0; i < wp.workers; i++ {
		go wp.worker(ctx, i, config)
	}
}
func (wp *WorkerPool) worker(ctx context.Context, id int, config SchedulerConfig) {
	for {
		select {
		case job := <-wp.jobChan:
			wp.semaphore <- struct{}{} // Acquire a slot
			result := wp.executeJob(ctx, job, config)
			<-wp.semaphore // Release the slot
			wp.resultChan <- result
		case <-ctx.Done():
			return
		}
	}
}
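The semaphore channel is a simple concurrency guard. Its capacity is the number of workers. To start work, a worker sends a value into it (blocking if it’s full). When done, it receives a value out, freeing a slot. With a fixed set of worker goroutines that each run one job at a time, the send never actually blocks, so here the semaphore mostly serves as a live count of busy workers. It becomes the real limit if you ever start one goroutine per job.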
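The executeJob method is where the user’s defined function runs. We wrap it with a timeout using context.WithTimeout. Cancellation is cooperative: the job’s function has to watch ctx.Done() and return, otherwise it keeps occupying its worker after the deadline. A job that honors the context cannot hold up the pool indefinitely.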
func (wp *WorkerPool) executeJob(ctx context.Context, job *Job, config SchedulerConfig) *JobResult {
	start := time.Now()
	// Prefer the job's own timeout; fall back to the scheduler-wide default.
	timeout := job.Timeout
	if timeout <= 0 {
		timeout = config.Timeout
	}
	ctx, cancel := context.WithTimeout(ctx, timeout)
	defer cancel()
	err := job.Execute(ctx)
	result := &JobResult{
		JobID:     job.ID,
		Success:   err == nil,
		Error:     err,
		Duration:  time.Since(start),
		Completed: time.Now(),
	}
	if err != nil && job.RetryCount < config.MaxRetries {
		result.Retry = true
		job.RetryCount++
		// Linear backoff: the delay grows by RetryDelay on each attempt.
		job.RetryAt = time.Now().Add(config.RetryDelay * time.Duration(job.RetryCount))
	}
	return result
}
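Notice the retry logic. Network blips happen. A third-party API might be briefly down. We don’t give up immediately. We increment a counter and schedule the job for a later retry with a growing delay. The code above grows the delay linearly (RetryDelay × RetryCount); exponential backoff, where the delay doubles on each attempt, is a common alternative.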
Now, let’s wire these components together in the main JobScheduler. Its Start method kicks off several important loops.
func (js *JobScheduler) Start(ctx context.Context) {
	go js.workers.Start(ctx, js.config)
	go js.processQueue(ctx)
	go js.processResults(ctx)
	go js.handleDeadLetters(ctx)
}
The processQueue method is the brain’s timing loop. It wakes up periodically (every 100 milliseconds in our example) and calls dispatchJobs, which hands jobs that are ready to run to available workers.
func (js *JobScheduler) dispatchJobs() {
	// This pass holds queue.mu throughout, so Pop must not lock it again
	// (sync.RWMutex is not reentrant). If Pop locks the way Push does,
	// call a non-locking internal variant here instead.
	js.queue.mu.Lock()
	defer js.queue.mu.Unlock()
	now := time.Now()
	dispatched := 0
	for len(js.queue.jobs) > 0 && dispatched < js.workers.workers {
		job := js.queue.jobs[0]
		if job.ScheduledAt.After(now) {
			break // Not time yet
		}
		if !job.Deadline.IsZero() && job.Deadline.Before(now) {
			js.queue.Pop()
			js.handleExpiredJob(job) // Too late, mark as failed
			continue
		}
		select {
		case js.workers.jobChan <- job.Job:
			js.queue.Pop()
			dispatched++
		default:
			return // Workers are busy, try next tick
		}
	}
}
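This is careful work. We check whether the job at the front of the queue is due. We also check whether it has passed its deadline, which prevents running obsolete jobs. Then we try to send the job to the worker channel. If the channel is full (workers are busy), we stop and wait for the next cycle. This back-pressure is essential for stability. One caveat: because the heap orders by priority first, a high-priority job scheduled for the future sits at the front and holds back lower-priority jobs that are already due, so delayed jobs are better kept out of this queue until their time arrives.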
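The back-pressure comes from one small idiom: a select with a default case turns a channel send into a non-blocking attempt. Here it is in isolation, with a hypothetical tryDispatch helper and string IDs standing in for jobs.

```go
package main

import "fmt"

// tryDispatch attempts to hand a job to the workers without blocking.
// It reports false when the channel buffer is full.
func tryDispatch(jobs chan<- string, id string) bool {
	select {
	case jobs <- id:
		return true
	default:
		return false
	}
}

func main() {
	jobs := make(chan string, 2) // Room for two pending jobs

	fmt.Println(tryDispatch(jobs, "a")) // prints "true"
	fmt.Println(tryDispatch(jobs, "b")) // prints "true"
	fmt.Println(tryDispatch(jobs, "c")) // prints "false": buffer is full

	<-jobs // A worker picks up one job
	fmt.Println(tryDispatch(jobs, "c")) // prints "true": a slot opened up
}
```

A job that could not be sent simply stays in the queue and gets another chance on the next tick.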
So far, this works on one machine. Modern applications run on many. We need distributed coordination. We use Redis Pub/Sub for this. When any scheduler instance adds a job, it publishes a message.
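func (js *JobScheduler) publishJob(job *Job) error {
	ctx := context.Background()
	// json.Marshal returns an error for function-typed fields, so
	// Job.Execute must be excluded from encoding (tag it `json:"-"`).
	data, err := json.Marshal(job)
	if err != nil {
		return err
	}
	return js.redisClient.Publish(ctx, "jobs", data).Err()
}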
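Every other instance is subscribed to that channel. When a message arrives, it deserializes the job and adds it to its own local priority queue. Two details matter here. The Execute function cannot travel over the wire, so the receiving instance has to re-attach the work based on the job’s Type. And the receiving path must not publish the job again, or the instances will echo it to each other forever.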
func (js *JobScheduler) subscribeToJobs(ctx context.Context) {
	pubsub := js.redisClient.Subscribe(ctx, "jobs")
	defer pubsub.Close()
	ch := pubsub.Channel()
	for msg := range ch {
		var job Job
		if err := json.Unmarshal([]byte(msg.Payload), &job); err != nil {
			log.Printf("skipping malformed job message: %v", err)
			continue
		}
		js.ScheduleJob(&job) // Adds to queue and storage
	}
}
This means the job is duplicated in every scheduler’s memory. That is safe only if exactly one instance is allowed to execute it, and local dispatch alone does not guarantee that, because each instance has its own queue and its own workers. Before running a job, an instance must atomically claim it in shared state, for example with an ‘in-progress’ lock in Redis or a status column in the shared database. Instances that lose the claim drop their local copy.
What happens after a job runs? The processResults loop listens on the resultChan.
func (js *JobScheduler) handleJobResult(result *JobResult) {
	if result.Success {
		js.store.DeleteJob(result.JobID) // Clean up
	} else if result.Retry {
		js.retryJob(result.JobID) // Re-schedule with delay
	} else {
		js.moveToDeadLetter(result.JobID, result.Error) // Manual inspection needed
	}
}
Failed jobs that have exhausted their retries are moved to a dead-letter queue. This is a safety net. You can inspect these later to see what went wrong—maybe a bug in the job code or a permanently unavailable service.
We cannot manage what we cannot measure. So, we gather statistics atomically.
type SchedulerStats struct {
	JobsScheduled  uint64
	JobsDispatched uint64
	JobsCompleted  uint64
	JobsFailed     uint64
	JobsRetried    uint64
	QueueLength    uint64
	WorkersActive  uint64
}
func (js *JobScheduler) GetStats() SchedulerStats {
	// The queue slice is guarded by its mutex; read its length under a read lock.
	js.queue.mu.RLock()
	queueLen := len(js.queue.jobs)
	js.queue.mu.RUnlock()
	return SchedulerStats{
		JobsScheduled: atomic.LoadUint64(&js.stats.JobsScheduled),
		QueueLength:   uint64(queueLen),
		WorkersActive: uint64(len(js.workers.semaphore)),
		// ... other fields
	}
}
These metrics let you see the health of the system at a glance. Is the queue growing faster than workers can handle? You might need to add more workers. Is the failure rate high? There might be a problem with a service your jobs depend on.
Let’s see how to initialize the whole system.
func main() {
	scheduler, err := NewJobScheduler(
		"localhost:6379",                      // Redis
		"postgres://user:pass@localhost/jobs", // PostgreSQL
		10,                                    // Number of workers
	)
	if err != nil {
		log.Fatal(err)
	}
	ctx := context.Background()
	scheduler.Start(ctx)
	// Schedule 100 sample jobs
	for i := 0; i < 100; i++ {
		job := &Job{
			ID:       fmt.Sprintf("job-%d", i),
			Type:     "process_data",
			Data:     map[string]interface{}{"index": i},
			Priority: i % 10, // Varying priorities
			Timeout:  30 * time.Second,
			Execute: func(ctx context.Context) error {
				select {
				case <-time.After(100 * time.Millisecond): // Simulate work
					return nil
				case <-ctx.Done(): // Stop early on timeout or shutdown
					return ctx.Err()
				}
			},
		}
		scheduler.ScheduleJob(job)
	}
	// Monitor every 5 seconds
	go func() {
		ticker := time.NewTicker(5 * time.Second)
		defer ticker.Stop()
		for range ticker.C {
			stats := scheduler.GetStats()
			fmt.Printf("Stats: %d scheduled, %d completed, %d in queue\n",
				stats.JobsScheduled, stats.JobsCompleted, stats.QueueLength)
		}
	}()
	select {} // Run forever
}
In a production system, you’d run this on multiple servers. They’d all connect to the same Redis and PostgreSQL instances, forming a cluster.
There are always considerations for production. You need to add thorough logging, so you can trace a job’s journey. You should implement idempotency—ensuring that running the same job twice doesn’t cause problems. You might also consider job dependencies (run Job B only after Job A succeeds).
This architecture keeps the scheduling logic simple and fast. The priority queue operations are efficient. The worker pool prevents resource overload. Persistence lets jobs survive a restart. The distributed layer, while adding some complexity, provides the scalability needed for modern applications. It’s a balance of speed, reliability, and clarity. By building it this way, you create a robust backbone for all the asynchronous work your application needs to do, letting the main part of your app stay fast and responsive for users.