DEV Community

Cover image for Advanced Job Scheduling System - Part 2
ZeeshanAli-0704
ZeeshanAli-0704

Posted on

Advanced Job Scheduling System - Part 2

System Design Blog: Advanced Job Scheduling System - Part 2: Implementation and Advanced Features

1. Introduction

In Part 1 of this series, we explored the fundamental concepts, requirements, high-level design, and operational flow of a general-purpose Job Scheduling System. We discussed how such a system manages job submission, prioritization, execution, and failure handling in various computing environments. Now, in Part 2, we dive into the practical implementation of this system using Java as the programming language and Redis as the underlying storage and priority queuing mechanism. This blog focuses on code examples, concurrency control with locking, and advanced features like delayed job processing, retry mechanisms with backoff policies, and fairness through aging. Our goal is to provide a robust, scalable solution that aligns with enterprise best practices for reliability, performance, and security.

2. Technology Stack and Rationale

To implement the Job Scheduling System, we’ve chosen the following technologies:

  • Java: A widely-used, platform-independent language with strong support for concurrency, scalability, and enterprise applications. It aligns well with modular design and robust error handling.
  • Redis: An in-memory data store that supports key-value storage, Sorted Sets for priority queuing, and atomic operations for concurrency control. Redis offers low-latency access (O(log n) for Sorted Set operations) and persistence capabilities, making it ideal for job scheduling.
  • Jedis: A Java client for Redis, providing a simple and efficient way to interact with Redis from Java code.

Why Redis?

  • Performance: Redis Sorted Sets enable fast priority-based job retrieval with O(log n) complexity.
  • Atomic Operations: Built-in commands like SETNX and Lua scripting support atomic updates, critical for concurrency control.
  • Persistence: Redis can persist data to disk, ensuring job metadata isn’t lost during failures.
  • Scalability: Redis supports clustering and replication, allowing the system to handle large job volumes and high worker concurrency.

3. Schema Design

Before diving into the code, let’s define the data structures used in Redis to store and manage jobs. These structures map to the high-level design components (Persistent Job Store and Priority Index) from Part 1.

Job Metadata (Redis Hash):

  • Key: job:<jobId>
  • Fields:
    • ownerId: String (identifies the job owner for tracking or isolation)
    • priority: Integer (base priority for execution order)
    • effectivePriority: Float (adjusted priority with aging to prevent starvation)
    • status: String (PENDING, RUNNING, COMPLETED, FAILED, RETRYING)
    • attempts: Integer (number of execution attempts made)
    • maxAttempts: Integer (maximum retries before marking as failed)
    • delayUntil: Long (timestamp for delayed execution)
    • createdAt: Long (timestamp of job creation)
    • updatedAt: Long (timestamp of last update)
    • visibilityTimeout: Long (timestamp until which job is invisible to other workers)
    • jobDetails: String (JSON payload, should be encrypted in production)
    • idempotencyKey: String (optional, for exactly-once delivery deduplication)

Redis Sorted Sets for Priority Indexing:

  • jobs:ready: Score = effectivePriority, Member = jobId (jobs ready for immediate execution)
  • jobs:delayed: Score = delayUntil, Member = jobId (jobs scheduled for future execution)
  • jobs:retry: Score = retryAt, Member = jobId (jobs awaiting retry after failure)

4. Concurrency and Locking Concepts

Implementing a distributed Job Scheduling System requires careful handling of concurrency to prevent issues like race conditions and duplicate job processing. Below are the key challenges and solutions:

Concurrency Challenges:

  • Race Conditions: Multiple workers may attempt to reserve the same job simultaneously, leading to duplicate execution.
  • Consistency Issues: Updates to job status or priority must be atomic to maintain consistency between metadata and priority queues.
  • Scalability with Contention: High contention during job reservation can degrade performance under heavy load.
  • Deadlocks and Lock Loss: Worker failures mid-processing can orphan locks, blocking jobs from being re-assigned.

Locking Mechanisms:

  • Redis Atomic Operations: Use commands like SETNX (set if not exists) for lightweight distributed locking and ZREM for atomic removal from queues.
  • Visibility Timeout: Reserve jobs for a fixed duration (e.g., 30 seconds); if a worker fails to acknowledge completion, the job becomes visible again to others.
  • Distributed Locks: For stronger guarantees, use the Redlock algorithm across multiple Redis instances (conceptual here, not implemented in code for simplicity).
  • Lua Scripts: Execute complex operations (e.g., fetch-and-reserve) atomically using Redis Lua scripts to avoid race conditions (noted as a future enhancement).

5. Implementation Details with Code

Below is a Java implementation of the Job Scheduling System using the Jedis client for Redis interactions. The code focuses on core operations like job submission, reservation, retry handling, and fairness, with detailed comments for clarity.

Dependencies:
Add Jedis to your project (Maven dependency):

<dependency>
    <groupId>redis.clients</groupId>
    <artifactId>jedis</artifactId>
    <version>4.4.3</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

JobScheduler Class (Core Implementation):

package org.example.scheduler;

import redis.clients.jedis.Jedis;
import redis.clients.jedis.JedisPool;
import redis.clients.jedis.JedisPoolConfig;
import java.time.Instant;
import java.util.*;

/**
 * JobScheduler manages the lifecycle of jobs in a distributed scheduling system.
 * It uses Redis for persistent storage and priority queuing, ensuring safe concurrent access
 * through locking and visibility timeouts.
 */
public class JobScheduler {
    private final JedisPool jedisPool; // Connection pool for Redis to manage resources efficiently
    private final long visibilityTimeoutSeconds = 30; // Duration a job is invisible after reservation
    private final String READY_QUEUE = "jobs:ready"; // Sorted Set for ready jobs
    private final String DELAYED_QUEUE = "jobs:delayed"; // Sorted Set for delayed jobs
    private final String RETRY_QUEUE = "jobs:retry"; // Sorted Set for retry jobs
    private final String JOB_PREFIX = "job:"; // Prefix for job metadata keys
    private final String LOCK_PREFIX = "lock:job:"; // Prefix for lock keys
    private final long LOCK_TIMEOUT_SECONDS = 10; // Timeout for locks to prevent permanent holding
    private final double AGE_FACTOR = 0.1; // Priority boost per second of waiting

    /**
     * Constructor initializes the Redis connection pool with configurable host and port.
     * @param redisHost Redis server hostname
     * @param redisPort Redis server port
     */
    public JobScheduler(String redisHost, int redisPort) {
        JedisPoolConfig config = new JedisPoolConfig();
        config.setMaxTotal(128); // Maximum number of connections
        config.setMaxIdle(16); // Maximum idle connections
        config.setMinIdle(8); // Minimum idle connections
        this.jedisPool = new JedisPool(config, redisHost, redisPort);
    }

    /**
     * Submits a new job to the system with specified details and optional delay.
     * @param jobId Unique identifier for the job
     * @param ownerId Owner of the job for tracking
     * @param priority Base priority for execution ordering
     * @param delayUntilSeconds Timestamp (epoch seconds) when the job should become ready
     */
    public void submitJob(String jobId, String ownerId, int priority, long delayUntilSeconds) {
        try (Jedis jedis = jedisPool.getResource()) {
            long now = Instant.now().getEpochSecond(); // Current timestamp
            Map<String, String> jobData = new HashMap<>();
            jobData.put("ownerId", ownerId);
            jobData.put("priority", String.valueOf(priority));
            jobData.put("effectivePriority", String.valueOf(priority)); // Initially same as base
            jobData.put("status", "PENDING"); // Initial state
            jobData.put("attempts", "0"); // No attempts yet
            jobData.put("maxAttempts", "3"); // Default max retries
            jobData.put("delayUntil", String.valueOf(delayUntilSeconds));
            jobData.put("createdAt", String.valueOf(now));
            jobData.put("updatedAt", String.valueOf(now));
            jobData.put("visibilityTimeout", "0"); // Not reserved
            jobData.put("jobDetails", "{}"); // Placeholder (encrypt in production)
            jedis.hset(JOB_PREFIX + jobId, jobData); // Store metadata
            if (delayUntilSeconds > now) {
                jedis.zadd(DELAYED_QUEUE, delayUntilSeconds, jobId); // Enqueue as delayed
            } else {
                jedis.zadd(READY_QUEUE, priority, jobId); // Enqueue as ready
            }
        } catch (Exception e) {
            System.err.println("Error submitting job " + jobId + ": " + e.getMessage());
            throw new RuntimeException("Failed to submit job", e);
        }
    }

    /**
     * Background process to move delayed jobs to ready queue when delay expires.
     */
    public void processDelayedJobs() {
        try (Jedis jedis = jedisPool.getResource()) {
            long now = Instant.now().getEpochSecond();
            Set<String> delayedJobs = jedis.zrangeByScore(DELAYED_QUEUE, 0, now);
            for (String jobId : delayedJobs) {
                jedis.zrem(DELAYED_QUEUE, jobId);
                String priorityStr = jedis.hget(JOB_PREFIX + jobId, "priority");
                if (priorityStr != null) {
                    double priority = Double.parseDouble(priorityStr);
                    jedis.zadd(READY_QUEUE, priority, jobId);
                }
            }
        } catch (Exception e) {
            System.err.println("Error processing delayed jobs: " + e.getMessage());
        }
    }

    /**
     * Fetches jobs for workers, ensuring atomic reservation with locks.
     * @param limit Maximum number of jobs to fetch
     * @return List of job metadata maps for reserved jobs
     */
    public List<Map<String, String>> fetchJobs(int limit) {
        try (Jedis jedis = jedisPool.getResource()) {
            List<Map<String, String>> reservedJobs = new ArrayList<>();
            Set<String> topJobs = jedis.zrevrange(READY_QUEUE, 0, limit - 1); // Highest priority first
            long now = Instant.now().getEpochSecond();
            long timeout = now + visibilityTimeoutSeconds;
            for (String jobId : topJobs) {
                String lockKey = LOCK_PREFIX + jobId;
                if (jedis.setnx(lockKey, String.valueOf(now)) == 1) { // Attempt lock
                    jedis.expire(lockKey, LOCK_TIMEOUT_SECONDS); // Set lock expiration
                    String status = jedis.hget(JOB_PREFIX + jobId, "status");
                    if ("PENDING".equals(status)) {
                        jedis.hset(JOB_PREFIX + jobId, "status", "RUNNING");
                        jedis.hset(JOB_PREFIX + jobId, "visibilityTimeout", String.valueOf(timeout));
                        jedis.zrem(READY_QUEUE, jobId); // Remove from queue
                        reservedJobs.add(jedis.hgetAll(JOB_PREFIX + jobId));
                    }
                    jedis.del(lockKey); // Release lock
                    if (reservedJobs.size() >= limit) break;
                }
            }
            return reservedJobs;
        } catch (Exception e) {
            System.err.println("Error fetching jobs: " + e.getMessage());
            return Collections.emptyList();
        }
    }

    /**
     * Acknowledges job completion or failure after processing.
     * @param jobId Unique identifier of the job
     * @param success True if completed successfully, false if failed
     */
    public void acknowledgeJob(String jobId, boolean success) {
        try (Jedis jedis = jedisPool.getResource()) {
            if (success) {
                jedis.hset(JOB_PREFIX + jobId, "status", "COMPLETED");
                jedis.hset(JOB_PREFIX + jobId, "visibilityTimeout", "0");
            } else {
                handleFailure(jedis, jobId);
            }
        } catch (Exception e) {
            System.err.println("Error acknowledging job " + jobId + ": " + e.getMessage());
        }
    }

    /**
     * Handles job failure with retry logic and exponential backoff.
     * @param jedis Redis connection instance
     * @param jobId Unique identifier of the failed job
     */
    private void handleFailure(Jedis jedis, String jobId) {
        String attemptsStr = jedis.hget(JOB_PREFIX + jobId, "attempts");
        String maxAttemptsStr = jedis.hget(JOB_PREFIX + jobId, "maxAttempts");
        int attempts = Integer.parseInt(attemptsStr);
        int maxAttempts = Integer.parseInt(maxAttemptsStr);
        attempts++;
        jedis.hset(JOB_PREFIX + jobId, "attempts", String.valueOf(attempts));
        jedis.hset(JOB_PREFIX + jobId, "visibilityTimeout", "0");
        if (attempts < maxAttempts) {
            long baseDelay = 5; // Base delay of 5 seconds
            long retryDelay = baseDelay * (1 << attempts); // Exponential backoff
            long retryAt = Instant.now().getEpochSecond() + retryDelay;
            jedis.hset(JOB_PREFIX + jobId, "status", "RETRYING");
            jedis.zadd(RETRY_QUEUE, retryAt, jobId);
        } else {
            jedis.hset(JOB_PREFIX + jobId, "status", "FAILED");
            jedis.sadd("jobs:dlq", jobId); // Move to Dead Letter Queue
        }
    }

    /**
     * Background process to move retry jobs to ready queue when delay expires.
     */
    public void processRetries() {
        try (Jedis jedis = jedisPool.getResource()) {
            long now = Instant.now().getEpochSecond();
            Set<String> retryJobs = jedis.zrangeByScore(RETRY_QUEUE, 0, now);
            for (String jobId : retryJobs) {
                jedis.zrem(RETRY_QUEUE, jobId);
                jedis.hset(JOB_PREFIX + jobId, "status", "PENDING");
                String priorityStr = jedis.hget(JOB_PREFIX + jobId, "priority");
                if (priorityStr != null) {
                    double priority = Double.parseDouble(priorityStr);
                    jedis.zadd(READY_QUEUE, priority, jobId);
                }
            }
        } catch (Exception e) {
            System.err.println("Error processing retries: " + e.getMessage());
        }
    }

    /**
     * Background process to apply aging, increasing priority over time.
     */
    public void applyAging() {
        try (Jedis jedis = jedisPool.getResource()) {
            long now = Instant.now().getEpochSecond();
            Set<String> pendingJobs = jedis.zrange(READY_QUEUE, 0, -1);
            for (String jobId : pendingJobs) {
                String createdAtStr = jedis.hget(JOB_PREFIX + jobId, "createdAt");
                String priorityStr = jedis.hget(JOB_PREFIX + jobId, "priority");
                if (createdAtStr != null && priorityStr != null) {
                    long createdAt = Long.parseLong(createdAtStr);
                    double basePriority = Double.parseDouble(priorityStr);
                    long waitSeconds = now - createdAt;
                    double effectivePriority = basePriority + AGE_FACTOR * waitSeconds;
                    jedis.hset(JOB_PREFIX + jobId, "effectivePriority", String.valueOf(effectivePriority));
                    jedis.zadd(READY_QUEUE, effectivePriority, jobId);
                }
            }
        } catch (Exception e) {
            System.err.println("Error applying aging: " + e.getMessage());
        }
    }

    /**
     * Background process to re-queue jobs with expired visibility timeouts.
     */
    public void processExpiredTimeouts() {
        try (Jedis jedis = jedisPool.getResource()) {
            long now = Instant.now().getEpochSecond();
            Set<String> allJobs = jedis.keys(JOB_PREFIX + "*");
            for (String jobKey : allJobs) {
                String status = jedis.hget(jobKey, "status");
                String timeoutStr = jedis.hget(jobKey, "visibilityTimeout");
                if ("RUNNING".equals(status) && timeoutStr != null) {
                    long timeout = Long.parseLong(timeoutStr);
                    if (timeout > 0 && timeout < now) {
                        jedis.hset(jobKey, "status", "PENDING");
                        jedis.hset(jobKey, "visibilityTimeout", "0");
                        String jobId = jobKey.substring(JOB_PREFIX.length());
                        String priorityStr = jedis.hget(jobKey, "effectivePriority");
                        if (priorityStr != null) {
                            double priority = Double.parseDouble(priorityStr);
                            jedis.zadd(READY_QUEUE, priority, jobId);
                        }
                    }
                }
            }
        } catch (Exception e) {
            System.err.println("Error processing expired timeouts: " + e.getMessage());
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

Usage Example:

public class JobSchedulerDemo {
    public static void main(String[] args) {
        JobScheduler scheduler = new JobScheduler("localhost", 6379);
        // Submit a job for immediate execution
        scheduler.submitJob("job1", "user1", 10, Instant.now().getEpochSecond());
        // Submit a delayed job (10 seconds delay)
        scheduler.submitJob("job2", "user2", 5, Instant.now().getEpochSecond() + 10);
        // Worker fetching jobs
        List<Map<String, String>> jobs = scheduler.fetchJobs(1);
        for (Map<String, String> job : jobs) {
            System.out.println("Processing job: " + job.get("jobId"));
            // Simulate success
            scheduler.acknowledgeJob(job.get("jobId"), true);
        }
    }
}
Enter fullscreen mode Exit fullscreen mode

6. Implementation Notes

  • Concurrency Control: The fetchJobs method uses SETNX for distributed locking, ensuring only one worker reserves a job. Locks expire automatically (LOCK_TIMEOUT_SECONDS) to prevent deadlocks from worker crashes.
  • Visibility Timeout: The processExpiredTimeouts method re-queues jobs if workers fail to acknowledge completion within the timeout, maintaining reliability.
  • Retry with Backoff: Failed jobs are retried with exponential backoff (baseDelay * 2^attempts) in handleFailure, spacing out retries to avoid system overload.
  • Aging for Fairness: The applyAging method boosts effectivePriority based on wait time, preventing starvation of low-priority jobs.
  • Error Handling: Basic try-catch blocks handle Redis connection or parsing errors. In production, integrate a proper logging framework (e.g., SLF4J) and metrics for monitoring.

7. Advanced Features and Challenges

  • Delayed Jobs: Managed via a separate jobs:delayed Sorted Set and moved to jobs:ready by processDelayedJobs when the delay expires, ensuring efficient handling without frequent polling.
  • Retry Overload: Exponential backoff with a cap on attempts (maxAttempts) prevents retry storms. Failed jobs move to a Dead Letter Queue (jobs:dlq) for analysis.
  • Lock Contention: Short lock timeouts and potential retry backoff (not shown) mitigate contention. Future enhancements could use Lua scripts for fully atomic operations.
  • Scalability: Redis clustering and replication can scale the system for millions of jobs. Background tasks (processRetries, applyAging) should run on separate threads or nodes in production.

8. Security Considerations

  • Encryption: Sensitive data (jobDetails) should be encrypted before storage using AES-256 or similar standards. Key management should follow enterprise guidelines.
  • Secure Connections: Use TLS for Redis connections and Jedis configuration in production to protect data in transit.
  • Input Validation: Sanitize inputs like jobId and ownerId to prevent injection or key collisions (not shown in code for brevity).
  • Access Control: Implement tenant isolation by prefixing keys with ownerId and restrict access via Redis authentication or IAM policies.

9. Monitoring and Maintenance

  • Metrics: Track job submission rate, reservation latency, retry rate, and queue backlog using tools like Prometheus.
  • Alerts: Set thresholds for queue length or failure rates to detect issues early.
  • Logging: Centralize logs for debugging and compliance using frameworks like ELK Stack.
  • Background Tasks: Schedule methods like processDelayedJobs, processRetries, and applyAging via cron jobs or a task scheduler to maintain system health.

10. Conclusion

This Part 2 of the Job Scheduling System blog has provided a detailed implementation using Java and Redis, translating the conceptual design from Part 1 into a working solution. By leveraging Redis Sorted Sets for priority queuing and atomic operations for concurrency control, we’ve built a system that supports concurrent workers, delayed jobs, retries with backoff, and fairness through aging. Security and scalability considerations ensure the system aligns with enterprise standards. Future enhancements could include full Lua script integration for atomicity, Redlock for distributed locking, or AI-based priority optimization. Together, Parts 1 and 2 offer a comprehensive guide to designing and implementing a robust Job Scheduling System for general-purpose use.


More Details:

Get all articles related to system design
Hastag: SystemDesignWithZeeshanAli

systemdesignwithzeeshanali

Git: https://github.com/ZeeshanAli-0704/SystemDesignWithZeeshanAli

Top comments (0)