DEV Community

Sergey Telpuk

Send `Job` from Node-app to PHP-app via Queue.

Hello guys!

In this article, I want to share an interesting and fast way to execute jobs on the PHP side. Spiral Framework makes working with jobs easy. The complete repository is here.

In my contrived app, Node.js (NestJS) sends the job to PHP.
Here is the Node.js job:

import {IJob} from "../interfaces/job.interface";

export class Ping implements IJob {
    readonly JOB = 'App.Job.Amqp.Ping'; //namespace(path) to job on PHP side

    constructor(
        private readonly value: string
    ) {

    }

}

job.interface


export interface IJob {
    JOB: string;
}

This DTO is sent to the PHP app via the queue (RabbitMQ).
Here is JobPusherService:

import {Injectable} from "@nestjs/common";
import {v4 as uuid4} from "uuid"; // the "uuid/v4" deep import is deprecated
import {IJob} from "./interfaces/job.interface";
import amqp from "amqplib";

@Injectable()
export class JobPusherService {
    readonly CONNECT = 'amqp://rabbit:rabbit@rabbitmq:5672';

    async send(job: IJob) {
        const server = await amqp.connect(this.CONNECT);
        const channel = await server.createChannel();

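        // JOB travels in the "rr-job" header; the remaining fields are the payload.
        // Destructuring avoids mutating the caller's job object.
        const {JOB: jobExecuting, ...payload} = job;

        channel.sendToQueue(
            'contrived_queue',
            Buffer.from(
                JSON.stringify(payload)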
            ),
            {
                headers: {
                    "rr-id": uuid4(),
                    "rr-job": jobExecuting,
                    "rr-attempt": 1000_000_000_000,
                    "rr-maxAttempts": 1000_000_000_000,
                    "rr-timeout": 1000_000_000_000,
                    "rr-delay": 1000_000_000_000,
                    "rr-retryDelay": 1000_000_000_000,
                }
            }
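        );

        // Close the channel and the connection so they are not leaked on every send.
        await channel.close();
        await server.close();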
    }
}

The service wraps the DTO with the necessary headers and pushes it into the queue.

Now move on to the PHP app. Under the hood, Spiral Framework uses RoadRunner. All the necessary configuration lives in .rr.yml:
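To make the split between headers and payload easier to see, here is a minimal sketch of just the message-building step, with no RabbitMQ connection involved. `buildMessage` and `QueueMessage` are hypothetical names that are not part of the repository, and only two of the `rr-*` headers from the service are included for brevity. The sketch also uses `randomUUID` from Node's `crypto` module instead of the `uuid` package, which is an assumption about the runtime (a recent Node.js version).

```typescript
import { randomUUID } from "node:crypto";

// Same shape as the article's IJob interface.
interface IJob {
    JOB: string;
}

// Hypothetical type: what would be handed to channel.sendToQueue().
interface QueueMessage {
    body: Buffer;
    headers: Record<string, string | number>;
}

// Hypothetical helper: splits a job into a JSON payload and "rr-*" headers
// without mutating the job that was passed in.
function buildMessage(job: IJob): QueueMessage {
    // JOB goes into the "rr-job" header; every other field becomes the payload.
    const { JOB, ...payload } = job;

    return {
        body: Buffer.from(JSON.stringify(payload)),
        headers: {
            "rr-id": randomUUID(),
            "rr-job": JOB,
        },
    };
}

// Example job mirroring the article's Ping DTO.
class Ping implements IJob {
    readonly JOB = "App.Job.Amqp.Ping";

    constructor(readonly value: string) {}
}

const ping = new Ping("hello");
const message = buildMessage(ping);
```

Because the job object is not modified, the same `Ping` instance can be sent more than once and still carries its `JOB` name. The payload keys (here `value`) are what the PHP job later receives as arguments.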

# http service configuration.
http:
  address: 0.0.0.0:8080

  http2:
    # enable HTTP/2, only with TLS, enabled by default
    enabled: true

    # max transfer channels, default 128
    maxConcurrentStreams: 128

  workers:
    command: "php app.php"

  ssl:
    # force redirect to https connection
    redirect: true

    # custom https port (default 443)
    port:  443

    # ssl cert
    cert: ./certs/server.crt

    # ssl private key
    key: ./certs/server.key

# queue and jobs
jobs:
  amqp:
    addr: amqp://rabbit:rabbit@rabbitmq:5672/

  dispatch:
    app-job-amqp*.pipeline: amqp

  pipelines:
    amqp:
      broker: amqp
      queue:  contrived_queue

  consume: ["amqp"]

  workers:
    command: "php app.php"
    pool.numWorkers: 10

# serve static files
static:
  dir:    "public"
  forbid: [".php", ".htaccess"]

# control the max memory usage
limit:
  services:
    http.maxMemory: 100
    jobs.maxMemory: 100

rpc:
  listen: tcp://php-app:6001 #for watcher

For more details, see this doc.

The job looks like this:

namespace App\Job\Amqp;

use Spiral\Jobs\AbstractJob;

/**
 * (QueueInterface)->push(new Ping(["value"=>"my value"]));
 */
class Ping extends AbstractJob
{
    /**
     * @param string $id
     * @param string $value
     */
    public function do(string $id, string $value)
    {

        // do something
        error_log("pong by {$id}, {$value}");
    }
}
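The comment in the Node.js job says that the `JOB` value is the namespace path of the job on the PHP side. A minimal sketch of that naming convention follows, assuming each dot maps to a namespace separator and each segment starts with a capital letter. `toPhpClassName` is a hypothetical helper written for illustration; it is not how Spiral itself resolves job names.

```typescript
// Hypothetical helper: converts a dotted job name into a PHP class name.
function toPhpClassName(jobName: string): string {
    return jobName
        .split(".")
        // Ignore empty segments caused by stray dots.
        .filter((segment) => segment.length > 0)
        // Capitalize the first letter of every segment.
        .map((segment) => segment.charAt(0).toUpperCase() + segment.slice(1))
        // PHP namespaces are separated by a backslash.
        .join("\\");
}

const className = toPhpClassName("App.Job.Amqp.Ping");
console.log(className); // App\Job\Amqp\Ping
```

A check like this on the Node.js side can catch a mistyped job name before the message reaches the queue.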

As you can see, executing jobs this way is easy and quite fast.

To be continued...
