Have you ever wished some queued jobs would be processed well ahead of others? For example, if you want your VIP clients' transactions to be completed before everyone else's, this article might help you.
Simple approach ✨
If you have only a few tiers of clients, you might want to consider assigning them to a different queue so that VIP clients won't be blocked by other customers:
<?php

namespace App\Jobs;

use App\Models\Customer; // Assumed location of the Customer model
use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;
use Illuminate\Queue\InteractsWithQueue;
use Illuminate\Queue\SerializesModels;

class ProcessJob implements ShouldQueue
{
    use Dispatchable, InteractsWithQueue, Queueable, SerializesModels;

    public function __construct(private Customer $customer)
    {
        $this->onQueue($this->determineQueue());
    }

    /**
     * Determine which queue to use based on client type.
     */
    private function determineQueue(): string
    {
        return $this->customer->is_vip ? 'vip' : 'default';
    }

    public function handle(): void
    {
        // Job logic
    }
}
Then run a separate worker for each queue:
php artisan queue:work --queue=default
php artisan queue:work --queue=vip
✅ This is really easy and will work with any queue driver, like Redis, SQS, Database, etc.
❌ Unfortunately, this is not really scalable if you wish to have more control over the order of queued jobs or have many different tiers of users.
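To make the limitation concrete, here is a rough sketch of what determineQueue() tends to grow into once more tiers appear. The function name and the tier names are hypothetical and are not part of the original example.

```php
<?php
// Hypothetical tier names; the article itself only distinguishes VIP from everyone else.
function queueForTier(string $tier): string
{
    return match ($tier) {
        'vip'      => 'vip',
        'business' => 'business',
        'partner'  => 'partner',
        default    => 'default',
    };
}

// Every distinct queue name returned here needs a worker listening on it.
echo queueForTier('business'), PHP_EOL; // business
echo queueForTier('trial'), PHP_EOL;    // default
```

Each new tier means another queue name and another worker to operate. A single worker can also drain several queues in order with php artisan queue:work --queue=vip,default, but the ordering is still per queue, not per job.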
Queue prioritization using a custom queue driver 👨🏻‍💻
If you wish to control the order of transactions in a more sophisticated way, for example based on the transaction amount or any other factor, consider creating a custom queue driver.
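As an illustration of "based on a transaction amount", a priority could be derived with a small helper like the one below. This is only a sketch: the function name, the thresholds, and the VIP bonus are assumptions made up for the example.

```php
<?php
// Hypothetical scoring rule: thresholds and the VIP bonus are illustrative only.
function priorityFor(float $amount, bool $isVip = false): int
{
    $priority = match (true) {
        $amount >= 10_000 => 3,
        $amount >= 1_000  => 2,
        $amount >= 100    => 1,
        default           => 0,
    };

    // With a bonus of 10, VIP customers rank above every non-VIP bracket.
    return $isVip ? $priority + 10 : $priority;
}

echo priorityFor(1500.0), PHP_EOL;     // 2
echo priorityFor(50.0, true), PHP_EOL; // 10
```

The resulting integer is what would end up in the priority column described in the following steps.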
1. Add a priority column to the jobs table
To start, add a priority column to your jobs table. You can do this by creating a migration:
php artisan make:migration add_priority_to_jobs_table
Update the migration file:
public function up()
{
    Schema::table('jobs', function (Blueprint $table) {
        $table->unsignedInteger('priority')->default(0); // Default priority is 0
    });
}

public function down()
{
    Schema::table('jobs', function (Blueprint $table) {
        $table->dropColumn('priority');
    });
}
Run the migration:
php artisan migrate
2. Create a Custom Queue Class
Create a custom queue class that extends Laravel's DatabaseQueue. In this class, you'll override the push and pop methods to handle job priority:
<?php

namespace App\Queue;

use Illuminate\Queue\DatabaseQueue;
use Illuminate\Queue\Jobs\DatabaseJobRecord;

class PriorityDatabaseQueue extends DatabaseQueue
{
    /**
     * Push a new job onto the queue.
     *
     * @param  object|string  $job
     * @param  mixed  $data
     * @param  string|null  $queue
     * @return mixed
     */
    public function push($job, $data = '', $queue = null)
    {
        $queue = $this->getQueue($queue);
        $payload = $this->createPayload($job, $queue, $data);

        return $this->database->table($this->table)->insertGetId([
            'queue' => $queue,
            'payload' => $payload,
            'attempts' => 0,
            'reserved_at' => null,
            'available_at' => $this->availableAt(),
            'created_at' => $this->currentTime(),
            // createPayload() returns a JSON string, so read the priority
            // from the job object itself.
            'priority' => is_object($job) ? (int) ($job->priority ?? 0) : 0,
        ]);
    }

    /**
     * Pop the next job off of the queue with prioritization.
     *
     * @param  string|null  $queue
     * @return \Illuminate\Contracts\Queue\Job|null
     */
    public function pop($queue = null)
    {
        $queue = $this->getQueue($queue);

        // Select and reserve the job in one transaction so that two workers
        // cannot pick up the same row.
        return $this->database->transaction(function () use ($queue) {
            $job = $this->database->table($this->table)
                ->where('queue', $queue)
                ->where(function ($query) {
                    // Same availability rules as the parent DatabaseQueue.
                    $this->isAvailable($query);
                    $this->isReservedButExpired($query);
                })
                ->orderBy('priority', 'desc') // Highest priority first
                ->orderBy('id', 'asc') // Oldest first within a priority
                ->lockForUpdate()
                ->first();

            if (is_null($job)) {
                return null;
            }

            // marshalJob() marks the row as reserved and wraps it in a DatabaseJob.
            return $this->marshalJob($queue, new DatabaseJobRecord((object) $job));
        });
    }
}
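The ordering that pop() asks the database for (priority descending, then id ascending) can be tried out without Laravel. The snippet below is a plain PHP sketch with made-up rows standing in for the jobs table.

```php
<?php
// In-memory stand-in for rows in the jobs table (ids and priorities are made up).
$jobs = [
    ['id' => 1, 'priority' => 0],
    ['id' => 2, 'priority' => 10],
    ['id' => 3, 'priority' => 5],
    ['id' => 4, 'priority' => 10],
];

// Equivalent of ORDER BY priority DESC, id ASC.
usort($jobs, fn (array $a, array $b): int =>
    [$b['priority'], $a['id']] <=> [$a['priority'], $b['id']]
);

$order = array_column($jobs, 'id');
echo implode(', ', $order), PHP_EOL; // 2, 4, 3, 1
```

Jobs 2 and 4 share the highest priority, so the older one (the lower id) runs first. Job 1 was queued earliest but has the lowest priority and runs last.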
3. Create a Custom Connector Class
Create a custom connector class to register the custom queue connector:
<?php

namespace App\Queue\Connectors;

use Illuminate\Queue\Connectors\DatabaseConnector;
use App\Queue\PriorityDatabaseQueue;

class PriorityDatabaseConnector extends DatabaseConnector
{
    public function connect(array $config)
    {
        return new PriorityDatabaseQueue(
            $this->connections->connection($config['connection'] ?? null),
            $config['table'],
            $config['queue'],
            $config['retry_after'] ?? 60
        );
    }
}
4. Register the Custom Connector
Register your custom connector in a service provider.
use Illuminate\Support\ServiceProvider;
use App\Queue\Connectors\PriorityDatabaseConnector;

class AppServiceProvider extends ServiceProvider
{
    public function register(): void
    {
        $this->app->resolving('queue', function ($queue) {
            $queue->addConnector('prioritydatabase', function () {
                return new PriorityDatabaseConnector($this->app['db']);
            });
        });
    }
}
From now on, any connection that uses the prioritydatabase driver will use our custom logic to push jobs to the queue and fetch them based on their priority.
5. Configure Queue Connection
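Add a connection that uses the new driver to config/queue.php. For jobs to reach it, either make it the default connection (for example, QUEUE_CONNECTION=prioritydatabase in .env) or select it per job with onConnection('prioritydatabase'):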
'connections' => [
    'prioritydatabase' => [
        'driver' => 'prioritydatabase',
        'table' => 'jobs',
        'queue' => 'priority',
        'retry_after' => 60,
    ],
],
6. Set Job Priority When Dispatching Jobs
When dispatching jobs, set the priority level. You can do this by adding a public $priority property to the job class.
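use Illuminate\Bus\Queueable;
use Illuminate\Contracts\Queue\ShouldQueue;
use Illuminate\Foundation\Bus\Dispatchable;

class SomeJob implements ShouldQueue
{
    use Dispatchable, Queueable; // Queueable provides onQueue() and onConnection()

    public function __construct(public int $priority = 0)
    {
    }

    public function handle(): void
    {
        // Job logic
    }
}

// Dispatching the job with priority
$job = new SomeJob(10); // Priority 10
dispatch($job)->onQueue('priority');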
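When a job class is dispatched, the queue's push() method typically receives the job object itself, so the priority can be read from its public property. Because the migration defines priority as an unsigned integer, it may be worth normalizing the value first. The helper and the ExampleJob class below are hypothetical and only sketch that idea.

```php
<?php
// Hypothetical helper: reads an optional public $priority property from a job
// and clamps it to a non-negative integer that fits the unsigned column.
function resolvePriority(mixed $job): int
{
    if (! is_object($job)) {
        return 0; // A job given as a class name string carries no priority.
    }

    return max(0, (int) ($job->priority ?? 0));
}

// Minimal stand-in for a job class with a public priority.
final class ExampleJob
{
    public function __construct(public int $priority = 0)
    {
    }
}

echo resolvePriority(new ExampleJob(10)), PHP_EOL; // 10
echo resolvePriority(new ExampleJob(-5)), PHP_EOL; // 0
```

Jobs without a priority property fall back to 0, which matches the column default from the migration.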
With these steps, you will have a priority queue set up in Laravel, where jobs are stored with a priority column in the jobs table. The custom queue driver ensures that jobs are popped from the queue based on their priority, so higher-priority jobs are processed first.
Feel free to customize the way you pop the jobs based on your needs.
✅ More control over the job processing order.
❌ Changing the queue driver requires a major refactor to adapt the popping logic. Not all drivers might be supported.
Summary
🕵🏻‍♂️ Digging deep into Laravel queue connectors can be fun, and it's interesting to learn how they work internally so that we can customize them to our needs.
While the database driver is easy to understand and customize, I strongly suggest not using it in production. If you wish to maintain scalability and avoid lock issues, I recommend creating a similar connector based on Redis, using ZADD and ZRANGE/ZREM to handle similar logic instead.
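To get a feel for the score-ordered behavior such a connector would rely on, the sketch below uses PHP's built-in SplPriorityQueue as an in-memory stand-in. It is not Redis code, and the $enqueue helper and job names are hypothetical. It only shows "highest priority first, oldest first among equals".

```php
<?php
// In-memory stand-in for a score-ordered structure; this is NOT Redis code.
$queue = new SplPriorityQueue();
$sequence = 0;

// Hypothetical helper: higher priority first, FIFO among equal priorities.
$enqueue = function (string $jobId, int $priority) use ($queue, &$sequence): void {
    // The composite priority is compared element by element; the negated
    // sequence number makes earlier jobs win ties.
    $queue->insert($jobId, [$priority, -$sequence++]);
};

$enqueue('job-a', 0);
$enqueue('job-b', 10);
$enqueue('job-c', 10);
$enqueue('job-d', 5);

$order = [];
while (! $queue->isEmpty()) {
    $order[] = $queue->extract();
}

echo implode(', ', $order), PHP_EOL; // job-b, job-c, job-d, job-a
```

In a Redis-based connector, the sorted set's score would play the role of this composite priority, and the pop step would need to read and remove the member atomically.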