DEV Community

Sateesh Madagoni
Sateesh Madagoni

Posted on • Edited on

Orchestrate AWS Lambdas using MongoDB - Part 2

Continuation to the first part. This post assumes you are a developer with working knowledge on AWS, Lambda, EventBridge, MongoDB, NodeJs.

Technical Implementation:

1. Design State-Machine
This step you need to figure out, how do you want to orchestrate your jobs and create an object like below.

const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      steps: {
        `thinking`: {
          jobs: ['buy_fishing_gear'],
        },
        buy_fishing_gear: {
          jobs: ['rent_a_boat'],
        },
        rent_a_boat: {
          jobs: ['go_to_fishing_ground'],
        },
        go_to_fishing_ground: {
          jobs: ['cast_fishing_pole', 'drink_beer', 'catch_fish'],
        },
        catch_fish: {
          jobs: ['go_home'],
        },
      },
      finished: [],
      next: 'end',
      waitFor: [
        'buy_fishing_gear',
        'rent_a_boat',
        'go_to_fishing_ground',
        'cast_fishing_pole',
        'drink_beer',
        'catch_fish',
        'go_home',
      ],
    },
  },
};

Enter fullscreen mode Exit fullscreen mode

In the above example we are running buy_fishing_gear after thinking job is finished in a sequential manner, but we run cast_fishing_pole, drink_beer and catch_fish in parallel after go_to_fishing_ground is successful.

2. Start the Process:
Creation of the statemachine could be anything from API to a cron job. Lets take API as an example as POST /process/catch-the-fish.

router.post('/process/catch-the-fish', async (req, res, next) => {
  const state = {}; // above mentioned state
  // Insert into state-machines
  const createdState = await db.collection('state-machines').insertOne(state);
  // Start the process by sending first event. As thinking is success we start the process of catching fish
  await db.collection('statuses').insertOne({
    stateId: createdState._id,
    status: 'success',
    job: 'thinking',
    date: new Date(),
    // additional parameters as needed
  });
});

Enter fullscreen mode Exit fullscreen mode

3. State Machine Job:

exports.handler = async (event, ctx, callback) => {
  // Mongodb Document from the trigger
  const document = event.detail.fullDocument;
  // Find the state machine
  const state = await db
    .collection('state-machines')
    .findOne({ _id: document.stateId });
  // if its ended do nothing
  if (state?.currentPhase === 'end') {
    callback();
  }
  // Update the job as finished for the state machine
  const currentPhase = state.phases[state.currentPhase];
  currentPhase.finished.push(document.job);
  await db
    .collection('state-machines')
    .findOneAndUpdate(
      { _id: document.stateId },
      { $set: { [`phases.${state.currentPhase}`]: currentPhase } }
    );
  const jobsRemaining = currentPhase.waitFor.filter(
    (s) => !currentPhase.finished.includes(s)
  );
  // If there are no remaining jobs in the current phase
  if (jobsRemaining.length === 0) {
    // Update the currentPhase to be the next Phase.
    await db.collection('state-machines').findOneAndUpdate(
      { _id: document.stateId },
      {
        $set: {
          currentPhase: currentPhase.next,
        },
      }
    );
    // If the next phase is not end trigger the phase.
    if (currentPhase.next !== 'end') {
      await db.collection('statuses').insertOne({
        ...document,
        job: `${currentPhase.next}Start`,
        status: 'success',
      });
    }
  }
};

Enter fullscreen mode Exit fullscreen mode

Above, whenever a job succeeds this will update the finished jobs and check if the phase is finished and move onto next phase until it meets the end phase.
4. Orchestrator Job:

exports.handler = async (event, ctx, callback) => {
  ctx.callbackWaitsForEmptyEventLoop = false;
  const document = event.detail.fullDocument;
  const state = await db
    .collection('pipeline-state-machines')
    .findOne({ _id: document.stateId });
  const currentPhase = state[state.currentPhase];
  if (!document.stateId) {
    console.log('Cannot execute the pipeline without the state id:', state);
    return { success: false };
  }
  if (state.currentPhase === 'end') {
    console.log('Cannot execute the pipeline without the state id:', state);
    return { success: false };
  }
  if (!currentPhase) {
    console.log('Cannot execute the pipeline without the state id:', state);
    return { success: false };
  }
  const step = currentPhase.steps[document.job];
  if (step && step.jobs && step.jobs.length > 0) {
    await Promise.all(
      step.jobs.map(async (job) => {
        const payload = {}
        const event = {
          FunctionName: job,
          InvocationType: 'Event',
          LogType: 'Tail',
          Payload: JSON.stringify(payload),
        };
        return lambda.invoke(event);
      })
    );
  }
  callback();
};
Enter fullscreen mode Exit fullscreen mode

The above job receives the success event and finds the next jobs to trigger and invoke them.
5. Notifications Job:
As above the job receives all the events so we can use the data and structure the message to send.

What if there are multiple phases:
Then we just have to add another phase to existing state machine configuration.

const stateMachine = {
  id: '1',
  purposeId: 'catch_the_fish',
  currentPhase: 'thinking',
  phases: {
    thinking: {
      // Prev things
    },
    cooking: {
      steps: {
        cookingStart: {
          jobs: ['clean'],
        },
        clean: {
          jobs: ['cook'],
        },
      },
      waitFor: ['cook', 'clean'],
      finished: [],
      next: 'end',
    },
  },
};

Enter fullscreen mode Exit fullscreen mode

I hope the above code and explanation gives you a way to implement your own solutions. For anymore details please do comment, I would be happy to help. Thanks.

Top comments (0)

Postmark Image

Speedy emails, satisfied customers

Are delayed transactional emails costing you user satisfaction? Postmark delivers your emails almost instantly, keeping your customers happy and connected.

Sign up