DEV Community

Cover image for Optimising your OpenSearch Ingestion pipeline using AWS CDK
Peter McAree for AWS Community Builders

Posted on • Originally published at petermcaree.com

Optimising your OpenSearch Ingestion pipeline using AWS CDK

In my previous post, I delved into how you can use one of the latest features that AWS announced at re:Invent 2023, combining DynamoDB and OpenSearch providing an incredibly powerful zero-ETL ingestion mechanism for driving search and analytic applications.

This integration uses OpenSearch Ingestion Service (OSIS) pipelines under the hood to consume from the DynamoDB streams. The pricing model for the ingestion pipelines is separate from the OpenSearch domains, so you will be charged additionally on top of your instance running.

Depending on your use case, and workloads, you can tweak your ingestion pipelines to optimise for cost - especially for the likes of non-production environments.

Let's take a look at how we might achieve this.

💰 Cost Optimisation

OSIS pipelines are defined in the OpenSearch pricing page under the ingestion section. The charge for majority of regions is $0.24 per OCR per hour - whilst this is a pretty competitive price, depending on your use case, data throughput, how many pipelines you require, and how many environments you are running, the cost can scale up quite quickly.

For example, if you have many pipelines that push into indexes in OpenSearch, some of these may not need their data refreshed as often as others.

An optimisation that can be made, especially for non-production environments, is to only have your pipeline(s) run as often as you need them to (up to a max of 24 hours).

This is made possible by the fact that the DynamoDB stream will retain any data for up to 24 hours and whenever the ingestion pipeline begins, it will consume all of the items on the stream.

Combine this with EventBridge Scheduler, and we have a very powerful optimisation. EventBridge Scheduler allows us to emit StartPipeline and StopPipeline events to target individual pipelines within OSIS. This means that I can have one particular pipeline only run once per day (for an hour) and others running constantly.

✍️ Define some CDK

With the introduction of EventBridge Scheduler, it allows direct integration with thousands of APIs, including the OSIS pipeline APIs.

For my non-production environments, I had no need to run the pipeline for more than once per day, simply because I didn't need the data to be updated and ingested that often.

To run my target pipelines once per day, I just created two separate EventBridge scheduler rules - one for starting and one for stopping.

IAM Role

Firstly, we'll need to create an IAM role that gets assumed by the schedules and has permissions to interface with OSIS:

const eventBridgeOsisIamRole = new cdk.aws_iam.Role(
  this,
  "eventBridgeOsisIamRole", {
    assumedBy: new cdk.aws_iam.ServicePrincipal("scheduler.amazonaws.com"),
    managedPolicies: [
      cdk.aws_iam.ManagedPolicy.fromAwsManagedPolicyName(
        "AmazonOpenSearchIngestionFullAccess",
      ),
    ],
    roleName: "eventBridgeOsisIamRole",
  },
);
Enter fullscreen mode Exit fullscreen mode

EventBridge Schedules

Next we create our two schedules which target the specific OSIS pipeline by the unique name:

const createStartSchedule = (
  pipelineName: string,
  scheduleIamRole: cdk.aws_iam.Role,
) => {
  const CRON_START_PIPELINE_SCHEDULE = "cron(0 5 ? * MON-FRI *)"; // 5am Weekdays
  const startScheduleName = `${pipelineName}-start`;

  return new cdk.aws_scheduler.CfnSchedule(this, startScheduleName, {
    flexibleTimeWindow: {
      mode: "OFF",
    },
    name: startScheduleName,
    scheduleExpression: CRON_START_PIPELINE_SCHEDULE,
    target: {
      arn: "arn:aws:scheduler:::aws-sdk:osis:startPipeline",
      roleArn: scheduleIamRole.roleArn,
      input: JSON.stringify({
        PipelineName: pipelineName,
      }),
    },
  });
};

const createStopSchedule = (
  pipelineName: string,
  scheduleIamRole: cdk.aws_iam.Role,
) => {
  const CRON_STOP_PIPELINE_SCHEDULE = "cron(0 6 ? * MON-FRI *)"; // 6am Weekdays
  const stopScheduleName = `${pipelineName}-stop`;

  return new cdk.aws_scheduler.CfnSchedule(this, stopScheduleName, {
    flexibleTimeWindow: {
      mode: "OFF",
    },
    name: stopScheduleName,
    scheduleExpression: CRON_STOP_PIPELINE_SCHEDULE,
    target: {
      arn: "arn:aws:scheduler:::aws-sdk:osis:stopPipeline",
      roleArn: scheduleIamRole.roleArn,
      input: JSON.stringify({
        PipelineName: pipelineName,
      }),
    },
  });
};
Enter fullscreen mode Exit fullscreen mode

I've abstracted these out to separate methods so that they can be reused in the event that you have multiple pipelines - define the pipeline names and then iterate over them:

["first-osis-pipeline", "second-osis-pipeline"].map((pipelineName: string) => {
  createStartSchedule(pipelineName, eventBridgeOsisIamRole);
  createStopSchedule(pipelineName, eventBridgeOsisIamRole);
});
Enter fullscreen mode Exit fullscreen mode

These constructs will create the two schedules (at 5am & 6am respectively as I've defined in the cron expression), targeting the pipelines and starts/stops using the OSIS API.

And that's it! EventBridge scheduler makes this super easy since it has a first-class integration with the ingestion pipeline API.

Conclusion

Remember to cdk destroy once you are complete to destroy the CloudFormation stack in order to avoid any unnecessary costs if you're just testing this demo out!

  • Leveraging the fact that DynamoDB streams retain their data change for 24 hours, means that we can optimise our OpenSearch ingestion rate.
  • This approach works really well for non-production environments and use cases where you don't need to have real-time (or even slightly less frequent) ingestion into OpenSearch.
  • We can utilise the power of EventBridge Scheduler and the fact that it has a first-class integration with OSIS pipelines to start & stop our pipelines on a cron schedule.
  • Whenever the pipeline runs and there are items on the DynamoDB stream that haven't been read, it begins consuming and ingesting into OpenSearch.
  • This allows us to optimise our cost and ensure that we are only consuming OCUs whenever we need to.
  • Happy optimising!

Top comments (0)