Asadullah Al Galib

Posted on • Originally published at aws.plainenglish.io

Integrate CRUD APIs with Amazon OpenSearch

Learn how to create CRUD APIs using API Gateway, Lambda, and OpenSearch.


We pick up where we left off in the previous post, Create a knowledge base using Amazon OpenSearch. There, we created the SAM template containing the OpenSearch, API Gateway, and Lambda configuration for the CRUD APIs. In this post, we will implement those APIs and then deploy the stack to AWS. By the end of this tutorial, we will have a working knowledge base.

This post consists of three parts, outlined as follows:

  1. Explanation of the Lambda function configuration for each API in the template.yaml file.
  2. Implementation code for each of the Lambda functions.
  3. Deployment of the stack to AWS.

Let’s go! 🔥


Part 1:

Refer to the previous post for the SAM configuration of the APIs. As an example, let’s look at the definition of the Create Document API.

  CreateDocumentFunction:
    Type: AWS::Serverless::Function
    Properties:
      CodeUri: lambdas/
      Handler: create_doc.lambda_handler
      Role: !GetAtt CustomMainRole.Arn
      Environment:
        Variables:
          OPEN_SEARCH_SECRET: !Ref OpenSearchSecret
          OPEN_SEARCH_DOMAIN_ENDPOINT: !GetAtt OpenSearchServiceDomain.DomainEndpoint
      Events:
        PingRootEvent:
          Type: Api
          Properties:
            Path: /{index_name}/kb-docs
            Method: post
            RestApiId: !Ref DefaultApi

So, what’s happening here? I will explain the most important parts.

  1. CodeUri: This denotes the directory where the handler module is located for this Lambda function. In our case, it is the lambdas/ directory.
  2. Handler: This tells Lambda which module and function to call when the function is triggered. In our case, it is the function called lambda_handler inside the create_doc.py module.
  3. Environment: Here we pass the environment variables that we would like to access from within our handler module, i.e. create_doc.py.
  4. Type: This specifies the kind of event that triggers this Lambda. In our case, we want to run this Lambda when an API endpoint is hit, so the value is set to Api.
  5. Path: This denotes the API endpoint associated with this Lambda. The base URL of the API Gateway will be prepended to this path. The parts inside {} denote path parameters, i.e. values that users provide dynamically. In our case, it is the name of the OpenSearch index.
  6. Method: This is the HTTP method that will be accessible for this endpoint. In our case, this endpoint will only allow POST requests.
  7. RestApiId: The API Gateway this Lambda is associated with. If no value is provided for this property, a default Gateway will be created.
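
To make the Path and Method settings more concrete, here is a minimal sketch of the kind of event the handler receives for a POST to /knowledgebase/kb-docs. The event below is a hand-written, trimmed-down example (real proxy events carry many more keys), and read_request is a hypothetical helper that is not part of the project.

```python
import json

# Hand-written, trimmed-down example of an API Gateway proxy event.
# Real events contain many more keys (headers, requestContext, ...).
sample_event = {
    "httpMethod": "POST",
    "path": "/knowledgebase/kb-docs",
    "pathParameters": {"index_name": "knowledgebase"},
    "body": json.dumps({"title": "How to fix ABC error"}),
}


def read_request(event):
    """Hypothetical helper: extract the path parameter and the JSON body."""
    index_name = event["pathParameters"]["index_name"]
    # The body arrives as a JSON string, or None when the request has no body.
    body = json.loads(event["body"]) if event.get("body") else {}
    return index_name, body


index_name, body = read_request(sample_event)
print(index_name, body["title"])
```

The value captured by {index_name} in the Path shows up under pathParameters, which is exactly where the handlers in Part 2 read it from.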

Wheeeeeeeeh! Now comes the fun part. 🏝️


Part 2:

In this section, we are going to describe the handler code for each of the Lambda functions.

At the end of this section, your project hierarchy will look something like this:

Project directory hierarchy

But first, we need to do a couple of things to make our lives easier later.

  1. As we will be using an external Python library to interact with the OpenSearch instance, we need to list the required package in a requirements.txt file inside the /lambdas directory. Paste the following line into this file.

    opensearch-py
    
  2. Create another Python file called utils.py inside the /lambdas directory. As the name suggests, this file contains helper functions, custom exceptions, and the initialization code for OpenSearch. The content of this file is as follows.

    import json
    import os
    import boto3
    from opensearchpy import OpenSearch
    
    # Env params
    os_secret = os.environ.get("OPEN_SEARCH_SECRET")
    os_domain_endpoint = os.environ.get("OPEN_SEARCH_DOMAIN_ENDPOINT")
    
    # Initialization
    sm_client = boto3.client("secretsmanager")
    sm_response = sm_client.get_secret_value(SecretId=os_secret)
    secret = json.loads(sm_response["SecretString"])
    auth_pass = (secret.get("username"), secret.get("password"))
    os_domain_port = {"host": os_domain_endpoint, "port": 443}
    
    os_client = OpenSearch(
        hosts=[os_domain_port],
        http_compress=True,
        http_auth=auth_pass,
        use_ssl=True,
        verify_certs=True,
    )
    
    # Util functions
    def check_index_exists(index_name):
        if not os_client.indices.exists(index=index_name):
            raise IndexNotFoundException("Invalid parameter!")
    
    def get_response(status=400, message="", data=None):
        headers = {
            "Content-Type": "application/json",
            "Access-Control-Allow-Origin": "*",
            "Access-Control-Allow-Methods": "*",
            "Access-Control-Allow-Headers": "*",
        }
        return {
            "statusCode": status,
            "headers": headers,
            "body": json.dumps({"message": message, "data": data}, default=str),
        }
    
    # Exceptions
    class ValidationError(Exception):
        pass
    
    class IndexNotFoundException(Exception):
        pass
    
    

All you need to know here is this:

  • We will import this file in each of our Lambda handlers and get ready-made access to the OpenSearch instance through the library we installed earlier.
  • This helper takes care of authenticating to OpenSearch with the credentials stored in Secrets Manager. That is why we passed those two environment variables in the template file. 😎
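
Every handler returns its result through get_response, so it helps to see what that helper produces. The snippet below copies the function from utils.py so that it runs on its own, without AWS credentials; the message and the doc_id value are made-up sample data.

```python
import json


def get_response(status=400, message="", data=None):
    # Same shape as the helper in utils.py: status code, CORS headers, JSON body.
    headers = {
        "Content-Type": "application/json",
        "Access-Control-Allow-Origin": "*",
        "Access-Control-Allow-Methods": "*",
        "Access-Control-Allow-Headers": "*",
    }
    return {
        "statusCode": status,
        "headers": headers,
        # The body must be a string, so the payload is serialized here.
        "body": json.dumps({"message": message, "data": data}, default=str),
    }


ok = get_response(status=200, message="Document created", data={"doc_id": "abc123"})
print(ok["statusCode"])
print(json.loads(ok["body"]))
```

Note that the default status is 400, so a handler has to pass status=200 explicitly on success.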

Now, we will describe the code for each Lambda handler in detail. Create the specified files and copy the code for each handler module as shown here.

1. Create Document (create_doc.py): This creates a new document and sends it to OpenSearch for indexing.
import time
import json
from utils import os_client, get_response, check_index_exists, ValidationError, IndexNotFoundException


def lambda_handler(event, context):
    try:
        print(event)
        index_name = event["pathParameters"]["index_name"]
        parsed_body = json.loads(event["body"])

        validate_payload(parsed_body)
        check_index_exists(index_name)

        created_by = parsed_body["created_by"]
        created_at_ms = int(time.time() * 1000)

        form_data = {
            "category": parsed_body["category"],
            "title": parsed_body["title"],
            "tags": parsed_body["tags"],
            "md_content": parsed_body["md_content"],
            "created_by": created_by,
            "created_at_ms": created_at_ms,
            "last_updated_by": created_by,
            "last_updated_at_ms": created_at_ms,
        }

        response = os_client.index(
            index=index_name,
            body=form_data,
            refresh=True,
        )
        print(response)

        return get_response(
            status=200,
            message=f"Document (ID: {response.get('_id')}) created successfully",
            data="",
        )
    except ValidationError as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except IndexNotFoundException as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except Exception as e:
        print(e)
        return get_response(
            status=400,
            message="error",
        )

def validate_payload(body):
    is_payload_valid = True
    is_payload_valid = is_payload_valid and "category" in body and body["category"]
    is_payload_valid = is_payload_valid and "title" in body and body["title"]
    is_payload_valid = is_payload_valid and "tags" in body and body["tags"]
    is_payload_valid = is_payload_valid and "md_content" in body and body["md_content"]
    is_payload_valid = is_payload_valid and "created_by" in body and body["created_by"]

    if not is_payload_valid:
        raise ValidationError("Must provide all values!")
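
The validation above repeats the same check once per field. As an optional alternative, here is a minimal sketch that expresses the same rule (every field must be present and non-empty) with a list of field names. The names missing_fields, validate_required, and CREATE_FIELDS are hypothetical and do not exist in the project; the extended error message is also an assumption.

```python
class ValidationError(Exception):
    pass


# Fields the Create Document handler expects in the request body.
CREATE_FIELDS = ("category", "title", "tags", "md_content", "created_by")


def missing_fields(body, required_fields):
    """Return the required fields that are absent or empty in body."""
    return [field for field in required_fields if not body.get(field)]


def validate_required(body, required_fields):
    """Raise ValidationError when at least one required field is missing."""
    missing = missing_fields(body, required_fields)
    if missing:
        raise ValidationError("Must provide all values! Missing: " + ", ".join(missing))


complete = {
    "category": "Admin",
    "title": "How to fix ABC error",
    "tags": "Error, Issue",
    "md_content": "**Need to manually update the field**",
    "created_by": "Test User",
}
validate_required(complete, CREATE_FIELDS)  # passes silently
print(missing_fields({"title": "Only a title"}, CREATE_FIELDS))
```

The same helper could serve the update handler by passing a tuple that contains last_updated_by instead of created_by.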

2. Update Document (update_doc.py): This updates a document that is already indexed in OpenSearch.
import time
import json
from utils import os_client, get_response, check_index_exists, ValidationError, IndexNotFoundException
from opensearchpy import NotFoundError


def lambda_handler(event, context):
    try:
        print(event)
        index_name = event["pathParameters"]["index_name"]
        doc_id = event["pathParameters"]["doc_id"]

        parsed_body = json.loads(event["body"])
        validate_payload(parsed_body)
        check_index_exists(index_name)

        last_updated_by = parsed_body["last_updated_by"]
        last_updated_at_ms = int(time.time() * 1000)
        form_data = {
            "category": parsed_body["category"],
            "title": parsed_body["title"],
            "tags": parsed_body["tags"],
            "md_content": parsed_body["md_content"],
            "last_updated_by": last_updated_by,
            "last_updated_at_ms": last_updated_at_ms,
        }

        body = {"doc": form_data}
        response = os_client.update(
            index=index_name,
            id=doc_id,
            body=body,
            _source=True,
        )
        print(response)

        return get_response(
            status=200,
            message=f"Document (ID: {response.get('_id')}) updated successfully",
            data="",
        )
    except ValidationError as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except IndexNotFoundException as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except NotFoundError as e:
        print(e)
        return get_response(
            status=400,
            message="Document not found!",
        )
    except Exception as e:
        print(e)
        return get_response(
            status=400,
            message="error",
        )

def validate_payload(body):
    is_payload_valid = True
    is_payload_valid = is_payload_valid and "category" in body and body["category"]
    is_payload_valid = is_payload_valid and "title" in body and body["title"]
    is_payload_valid = is_payload_valid and "tags" in body and body["tags"]
    is_payload_valid = is_payload_valid and "md_content" in body and body["md_content"]
    is_payload_valid = is_payload_valid and "last_updated_by" in body and body["last_updated_by"]

    if not is_payload_valid:
        raise ValidationError("Must provide all values!")
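
Because the request body wraps the fields in "doc", the update call performs a partial update: fields that are not listed, such as created_by and created_at_ms, keep their stored values. The snippet below is only a local illustration of that merge for flat documents using plain dictionaries. It does not call OpenSearch, apply_partial_update is a hypothetical name, and the sample values are made up.

```python
def apply_partial_update(stored_doc, partial_doc):
    """Local illustration only: overlay the partial fields on the stored document."""
    merged = dict(stored_doc)   # copy so the stored document is left untouched
    merged.update(partial_doc)  # fields present in partial_doc win
    return merged


stored = {
    "title": "How to fix ABC error",
    "created_by": "Test User",
    "created_at_ms": 1700000000000,
    "last_updated_by": "Test User",
    "last_updated_at_ms": 1700000000000,
}
partial = {
    "title": "How to fix ABC error (revised)",
    "last_updated_by": "Another User",
    "last_updated_at_ms": 1700000500000,
}

updated = apply_partial_update(stored, partial)
print(updated["created_by"], "|", updated["last_updated_by"])
```

This is why the update handler never needs to resend the creation fields.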

3. Get Document (get_doc.py): As the name suggests, this fetches a particular document from OpenSearch by its document ID.
from utils import os_client, get_response, check_index_exists, IndexNotFoundException
from opensearchpy import NotFoundError


def lambda_handler(event, context):
    try:
        print(event)
        index_name = event["pathParameters"]["index_name"]
        doc_id = event["pathParameters"]["doc_id"]

        check_index_exists(index_name)

        response = os_client.get(
            index=index_name,
            id=doc_id,
        )
        print(response)
        temp = response["_source"]
        temp["doc_id"] = response["_id"]

        return get_response(
            status=200,
            message="",
            data=temp,
        )
    except IndexNotFoundException as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except NotFoundError as e:
        print(e)
        return get_response(
            status=400,
            message="Document not found!",
        )
    except Exception as e:
        print(e)
        return get_response(
            status=400,
            message="error",
        )
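
The handler flattens the OpenSearch response by copying _source and attaching the document ID as doc_id. A small sketch of that step in isolation follows; format_hit is a hypothetical helper name, and the sample response is hand-written with a made-up ID.

```python
def format_hit(hit):
    """Hypothetical helper: return _source with the document ID added as doc_id."""
    doc = dict(hit["_source"])  # copy so the original response is not mutated
    doc["doc_id"] = hit["_id"]
    return doc


# Hand-written sample; a real response carries more metadata fields.
sample_hit = {
    "_index": "knowledgebase",
    "_id": "abc123",
    "_source": {"title": "How to fix ABC error", "category": "Admin"},
}

print(format_hit(sample_hit))
```

Unlike the handler code, this version copies _source first, so the raw response stays unchanged if it is logged afterwards.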
4. Search Documents (search_docs.py): This takes the search text (and an optional category) from the request body and uses OpenSearch’s full-text search to retrieve the most relevant documents.
import json
from utils import os_client, get_response, check_index_exists, ValidationError, IndexNotFoundException


def lambda_handler(event, context):
    try:
        print(event)
        index_name = event["pathParameters"]["index_name"]
        parsed_body = json.loads(event["body"])

        validate_payload(parsed_body)
        check_index_exists(index_name)

        query_dict = {
          "query": {
            "bool": {
                "minimum_should_match": 1,
                "should": [
                {
                  "multi_match": {
                    "query": parsed_body["text"],
                    "fields": ["title^5", "tags^4", "md_content^3", "created_by^2", "last_updated_by^2"],
                    "fuzziness": "AUTO"
                  }
                }
              ],
            }
          }
        }
        if parsed_body.get("category") and parsed_body.get("category").lower() != "all":
            query_dict["query"]["bool"]["filter"] = [
                {"term": { "category": parsed_body.get("category")}}
            ]

        response = os_client.search(
            body=query_dict, index=index_name
        )
        print(response)
        formatted_docs = []
        for doc in response["hits"]["hits"]:
            temp = doc["_source"]
            temp["doc_id"] = doc["_id"]
            formatted_docs.append(temp)

        result = {
            "count": response["hits"]["total"]["value"],
            "documents": formatted_docs
        }
        return get_response(
            status=200,
            message="",
            data=result,
        )
    except ValidationError as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except IndexNotFoundException as e:
        print(e)
        return get_response(
            status=400,
            message=str(e),
        )
    except Exception as e:
        print(e)
        return get_response(
            status=400,
            message="error",
        )

def validate_payload(body):
    is_payload_valid = True
    is_payload_valid = is_payload_valid and "text" in body and body["text"]

    if not is_payload_valid:
        raise ValidationError("Must provide search text!")
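
The query is the heart of this handler, so here it is pulled out into a standalone function that can be inspected without a cluster. This is a sketch that mirrors the handler above; build_search_query is a hypothetical name. The ^N suffix boosts a field's weight in scoring, "fuzziness": "AUTO" tolerates small typos, and the category filter narrows the results without affecting the score.

```python
def build_search_query(text, category=None):
    """Build the same bool query as the handler, with an optional category filter."""
    query = {
        "query": {
            "bool": {
                "minimum_should_match": 1,
                "should": [
                    {
                        "multi_match": {
                            "query": text,
                            # Higher boost means the field counts more toward the score.
                            "fields": [
                                "title^5",
                                "tags^4",
                                "md_content^3",
                                "created_by^2",
                                "last_updated_by^2",
                            ],
                            "fuzziness": "AUTO",
                        }
                    }
                ],
            }
        }
    }
    # "all" (in any letter case) or a missing category means no filtering.
    if category and category.lower() != "all":
        query["query"]["bool"]["filter"] = [{"term": {"category": category}}]
    return query


print(build_search_query("ABC error", "Admin")["query"]["bool"]["filter"])
print("filter" in build_search_query("ABC error", "All")["query"]["bool"])
```

Since category is mapped as a keyword, the term filter matches the stored value exactly, including letter case.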

We are DONE! 🎆🎆🎆

You can compare the project we have built from scratch with the working version in this repo. If you face any issues during deployment to AWS, compare your template and code with that version. 🙌

Part 3:

Now, we will deploy our completed project to AWS using SAM CLI.

Follow the steps below to do just that:

  • Open a terminal.
  • Change the current working directory to the project’s root directory (where the template.yaml file is located): cd DIRECTORY_NAME
  • Run sam build to prepare the project for deployment. It creates a hidden .aws-sam folder in the root directory of the project.
  • Run sam deploy --guided. Use the guided option ONLY if you are deploying the template for the very first time. For all subsequent deployments, simply use sam deploy.
  • Follow the on-screen instructions and accept the default options. Sample configuration settings are provided below:
$> sam deploy --guided
Stack Name [sam-app]: opensearch-knowledge-base
AWS Region [us-east-1]: 
Parameter AllPrefix [knowledge-base]: 
Parameter StageName [dev]: 
Confirm changes before deploy [y/N]: y
Allow SAM CLI IAM role creation [Y/n]: y
Disable rollback [y/N]: y
CreateDocumentFunction has no authentication. Is this okay? [y/N]: y
UpdateDocumentFunction has no authentication. Is this okay? [y/N]: y
GetDocumentFunction has no authentication. Is this okay? [y/N]: y
SearchDocumentsFunction has no authentication. Is this okay? [y/N]: y
Save arguments to configuration file [Y/n]: y
SAM configuration file [samconfig.toml]: 
SAM configuration environment [default]:

Output of 'sam deploy'


Test API:

Now that our project is deployed, we can call the APIs to interact with OpenSearch. Let’s create a document using curl by calling the endpoint https://aaaa1111bbbb.execute-api.us-east-1.amazonaws.com/dev/{index_name}/kb-docs. Replace {index_name} with knowledgebase.

curl -X POST -H "Content-Type: application/json" -d '{"title":"How to fix ABC error", "category":"Admin", "created_by":"Test User", "tags":"Error, Issue", "md_content":"**Need to manually update the field**"}' https://aaaa1111bbbb.execute-api.us-east-1.amazonaws.com/dev/knowledgebase/kb-docs
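
If you prefer Python over curl, the same request can be sketched with the standard library. BASE_URL is the placeholder URL from this post and must be replaced with your own API Gateway URL; build_create_request is a hypothetical helper. The snippet only builds the request, and the lines that send it are left commented out because they need a deployed stack.

```python
import json
import urllib.request

# Placeholder from the article; replace with your own API Gateway base URL.
BASE_URL = "https://aaaa1111bbbb.execute-api.us-east-1.amazonaws.com/dev"

document = {
    "title": "How to fix ABC error",
    "category": "Admin",
    "created_by": "Test User",
    "tags": "Error, Issue",
    "md_content": "**Need to manually update the field**",
}


def build_create_request(base_url, index_name, doc):
    """Hypothetical helper: build the POST request for the Create Document API."""
    url = f"{base_url.rstrip('/')}/{index_name}/kb-docs"
    return urllib.request.Request(
        url,
        data=json.dumps(doc).encode("utf-8"),
        headers={"Content-Type": "application/json"},
        method="POST",
    )


request = build_create_request(BASE_URL, "knowledgebase", document)
print(request.get_method(), request.full_url)

# To actually send it (requires the deployed stack):
# Note that urlopen raises urllib.error.HTTPError for 4xx/5xx responses.
# with urllib.request.urlopen(request) as response:
#     print(response.read().decode("utf-8"))
```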

Aaaaand, we get an error: {"message": "Invalid parameter!", "data": null}. 🧐

What did we miss?! 🤯

No need to panic! We simply forgot to create, in OpenSearch, the index that we named in the API endpoint (knowledgebase). 🤓
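
The message comes from check_index_exists in utils.py, which raises IndexNotFoundException before any document is written. The following sketch reproduces that path locally. FakeIndices is a made-up stand-in for os_client.indices, and handle is a simplified, hypothetical version of a handler, so nothing here talks to a real cluster.

```python
import json


class IndexNotFoundException(Exception):
    pass


class FakeIndices:
    """Made-up stand-in for os_client.indices with a fixed set of index names."""

    def __init__(self, existing):
        self._existing = set(existing)

    def exists(self, index):
        return index in self._existing


def check_index_exists(indices, index_name):
    # Same check as in utils.py, with the indices API passed in for the demo.
    if not indices.exists(index=index_name):
        raise IndexNotFoundException("Invalid parameter!")


def handle(indices, index_name):
    """Simplified handler: only the index check and the error mapping."""
    try:
        check_index_exists(indices, index_name)
        return {"statusCode": 200, "body": json.dumps({"message": "ok", "data": None})}
    except IndexNotFoundException as e:
        return {"statusCode": 400, "body": json.dumps({"message": str(e), "data": None})}


print(handle(FakeIndices([]), "knowledgebase")["body"])
# prints {"message": "Invalid parameter!", "data": null}
```

Once the index exists, the same call passes the check and the handler carries on.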

Create OpenSearch Index:

  1. Log in to the AWS Console.

AWS Console

  2. Go to the OpenSearch console and click on the OpenSearch instance under the Domain section. Click on the link under “OpenSearch Dashboards URL (IPv4)”. Wait for a login page to appear.

OpenSearch Dashboard

Dashboard Login URL

Dashboard Login Page

  3. Go to the Secrets Manager console and click on the secret created as part of our project. On the details page, locate the “Secret Value” section and click “Retrieve secret value”. This shows the username and password for OpenSearch Dashboards. Use these to log in to OpenSearch.

Secrets Manager Console

Retrieve Secrets Manager


OpenSearch Login

  4. Select Explore on my own and then, when prompted, select Global. From the Navigation menu, go to Management > Dev Tools.

OpenSearch Dev Tools

  5. We can see an editor section on our left. We can directly interact with our OpenSearch instance by entering various commands here.

Dev Tools Editor

Paste the following block of code into the editor and press Run (shown above).

PUT /knowledgebase
{
  "mappings": {
    "properties": {
      "category": {
        "type": "keyword"
      },
      "title": {
        "type": "text",
        "analyzer": "english",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "tags": {
        "type": "text",
        "analyzer": "english",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "md_content": {
        "type": "text",
        "analyzer": "english"
      },
      "created_by": {
        "type": "text",
        "analyzer": "english",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "created_at_ms": {
        "type": "date",
        "format": "epoch_millis"
      },
      "last_updated_by": {
        "type": "text",
        "analyzer": "english",
        "fields": {
          "keyword": {
            "type": "keyword",
            "ignore_above": 256
          }
        }
      },
      "last_updated_at_ms": {
        "type": "date",
        "format": "epoch_millis"
      }
    }
  }
}
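
If you would rather create the index from Python than from Dev Tools, opensearch-py exposes indices.create. Here is a sketch under the assumption that you have a client configured like os_client in utils.py. The names text_with_keyword, build_mapping, and create_index are hypothetical, and the client is passed in as an argument so the snippet itself needs no connection.

```python
def text_with_keyword():
    """Analyzed text field with an exact-match keyword sub-field."""
    return {
        "type": "text",
        "analyzer": "english",
        "fields": {"keyword": {"type": "keyword", "ignore_above": 256}},
    }


def build_mapping():
    """Build the same mapping as the Dev Tools request above."""
    return {
        "mappings": {
            "properties": {
                "category": {"type": "keyword"},
                "title": text_with_keyword(),
                "tags": text_with_keyword(),
                "md_content": {"type": "text", "analyzer": "english"},
                "created_by": text_with_keyword(),
                "created_at_ms": {"type": "date", "format": "epoch_millis"},
                "last_updated_by": text_with_keyword(),
                "last_updated_at_ms": {"type": "date", "format": "epoch_millis"},
            }
        }
    }


def create_index(client, index_name="knowledgebase"):
    # client is assumed to be an opensearchpy.OpenSearch instance,
    # for example os_client from utils.py.
    return client.indices.create(index=index_name, body=build_mapping())


print(sorted(build_mapping()["mappings"]["properties"]))
```

Calling create_index on an index that already exists raises an error from the client, so it is meant as a one-time setup step.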

Now that we have successfully created the knowledgebase index, run the curl command from above again, and voilà! The document is created.

We have successfully deployed a knowledge base backend built using Amazon OpenSearch, API Gateway, and Lambda. 🎉🎉

The entire project can be found in this repository.

If you found this post useful, please give it a 👏🏽 and follow me on Medium. Let’s get connected on LinkedIn.
