DEV Community

Dr. Malte Polley
Dr. Malte Polley

Posted on

Exploring the Matillion API in Data Productivity Cloud (DPC): Generic Pipeline Monitoring as well as Execution

Handling pipeline monitoring can often feel like a daunting task, especially when it comes to managing pipeline dependencies. For small teams, gaining visibility into pipeline executions is essential, yet challenging. Having worked with Matillion for over a year, I’ve encountered a persistent issue: obtaining comprehensive logs with minimal effort.

While you can utilize SNS components for logging each step, this approach demands numerous components for precise logging. Alternatively, custom scripts can be created to connect with third-party solutions. However, the absence of a built-in generic logging notification option in Matillion DPC has always been a drawback.

In this article, I will demonstrate how to leverage the DPC API to initiate and track pipelines, paving the way for event-based child pipeline executions.

APIs are there to rule the IT World – also within Matillion

APIs have become a cornerstone of modern IT solutions, and fortunately, Matillion offers a robust API (LINK) that allows you to initiate pipeline executions seamlessly. As we delve deeper into the API documentation, we discover the necessity of creating a technical user. This user will receive a CLIENT_ID and a CLIENT_SECRET, which are pivotal before you can start working with the DPC. To kick off our journey, we need to obtain a BEARER token:

curl --location 'https://id.core.matillion.com/oauth/dpc/token' \
--header 'Content-Type: application/x-www-form-urlencoded' \
--data-urlencode 'grant_type=client_credentials' \
--data-urlencode 'client_id=<CLIENT_ID>' \
--data-urlencode 'client_secret=<CLIENT_SECRET>' \
--data-urlencode 'audience=https://api.matillion.com'
Enter fullscreen mode Exit fullscreen mode

The response from the DPC API provides the much-needed BEARER token:

{
  "access_token": "eyJhbGciOiJSUzI1NiIsInR5cCIgOiAiSldUIiwia2lkIiA6ICJYTzUtTWtvM0hPYWtJRkdIeXNCSFp2RnQ5SElYRzcxWmhudlJjVnc4UEtvIn0...",
  "expires_in": 1800,
  "refresh_expires_in": 0,
  "token_type": "Bearer",
  "not-before-policy": 0,
  "scope": "pipeline-execution"
}
Enter fullscreen mode Exit fullscreen mode

Matillion DPC provides extensive API documentation, including a comprehensive tutorial for executing and tracking pipeline executions: Executing and managing a pipeline - Matillion Docs.

Translation into Python Scripts

Armed with the tutorial, we can create a Python class that streamlines our interaction with the Matillion API. This class will encompass several key tasks:

  • Authentication and Interaction: Manage authentication and API interactions.
  • Initialization: Accept client ID and secret as arguments to set up URLs for token retrieval and API access.
  • Token Request: Implement a method to obtain a Bearer token for API access.
  • Project Data Retrieval: Create a method to fetch project data from Matillion, crucial for tracking pipeline status later on. Pipeline Execution: Trigger the execution of a specified pipeline, updating the project data with the execution ID and status.
  • Pipeline Status Check: Monitor the status of the pipeline execution and update the project data accordingly.

As we examine the code, we notice that the script iterates through responses extensively. This can be streamlined in your projects. I utilize these scripts in conjunction with AWS CDK and AWS Lambda, which necessitates multiple iterations to pinpoint the correct pipeline.

Ultimately, we aim to receive notifications about errors for every pipeline execution. Thus, we’ve integrated Microsoft Teams to facilitate error logging and provide a direct link to the relevant Matillion Dashboard:

  • Notification Method: Sends a message regarding the pipeline execution status to a designated Teams channel via a webhook.
class MatillionCheck(requests.auth.AuthBase):
    """Matillion DPC API class.

    Args:
        requests (Object): requests.auth.AuthBase class
    """

    def __init__(self, client_id: str, client_secret: str):
        """Initiate Matillion DPC API class.

        Args:
            client_id (str): Matillion client id
            client_secret (str): Matillion client secret
        """
        self.client_id = client_id
        self.client_secret = client_secret
        self.acces_token_url = "https://id.core.matillion.com/oauth/dpc/token"
        self.base_url = "https://eu1.api.matillion.com/dpc"

    def request_api_access_token(self, verify_ssl: bool):
        """Request a Bearer token from Matillion API.

        Args:
            verify_ssl (bool): secure transport flag

        Returns:
            access_token: Bearer token
        """
        headers = {"Content-Type": "application/x-www-form-urlencoded"}
        data = {
            "grant_type": "client_credentials",
            "client_id": self.client_id,
            "client_secret": self.client_secret,
            "audience": "https://api.matillion.com",
        }

        response = requests.post(
            self.acces_token_url, headers=headers, data=data, verify=verify_ssl
        )
        return response.json()["access_token"]

    def get_project_data(
        self,
        access_token: str,
        verify_ssl: bool,
        target_project_name: str,
        target_environment_name: str,
        target_pipeline_name: str,
    ):
        """Get project data form Matillion API.

        Args:
            access_token (str): Bearer token
            verify_ssl (bool): secure transport flag
            target_environment_name (str): name of the environment
            target_pipeline_name (str): name of the pipeline name

        Returns:
            project_data: data collection
        """
        url_projects_ids = self.base_url + "/v1/projects"
        url_environments = self.base_url + "/v1/projects/{projectId}/environments"
        url_published_pipelines = (
            self.base_url + "/v1/projects/{projectId}/published-pipelines"
        )
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        }
        params = {"size": 100, "page": 0}
        projects_left = True

        while projects_left:
            try:
                projects = requests.get(
                    url_projects_ids, headers=headers, params=params, verify=verify_ssl
                )
                logging.info(f"Check for API errors: {projects.raise_for_status()}")
                projects_json = projects.json()
                if len(projects_json["results"]) > 0:
                    logging.info(f"Found {len(projects_json['results'])} projects")
                    for i in projects_json["results"]:
                        if target_project_name == i["name"]:
                            logging.info(
                                f"Gathering information on project name {i['name']} with id {i['id']}"
                            )

                            environments = requests.get(
                                url_environments.replace("{projectId}", i["id"]),
                                headers=headers,
                                verify=verify_ssl,
                            )
                            logging.info(
                                f"Check for API errors: {environments.raise_for_status()}"
                            )
                            environments_json = environments.json()
                            for n in environments_json["results"]:
                                if target_environment_name in n["name"]:
                                    environment_name = n["name"]
                                    logging.info(
                                        f"Found target environment {n['name']}"
                                    )
                            params["environmentName"] = environment_name
                            published_pipelines = requests.get(
                                url_published_pipelines.replace("{projectId}", i["id"]),
                                params=params,
                                headers=headers,
                                verify=verify_ssl,
                            )
                            logging.info(
                                f"Check for API errors: {published_pipelines.raise_for_status()}"
                            )
                            published_pipelines_json = published_pipelines.json()
                            for p in published_pipelines_json["results"]:
                                if target_pipeline_name in p["name"]:
                                    published_pipeline_name = p["name"]
                                    logging.info(
                                        f"Found target pipeline {published_pipeline_name}"
                                    )

                            project_data = {
                                "project_id": i["id"],
                                "project_name": i["name"],
                                "environment_name": environment_name,
                                "agent_id": n["defaultAgentId"],
                                "agent_name": n["defaultAgentName"],
                                "published_pipeline_name": published_pipeline_name,
                            }
                            return project_data
                    params["page"] = params["page"] + 1
                else:
                    break
            except Exception as err:
                logging.exception(f"Error: {err}")
                raise
        return True

    def execute_pipeline(self, project_data: dict, access_token: str, verify_ssl: bool, log_group_name: str, log_stream: str):
        """Execute a Matillion pipeline via API.

        Args:
            access_token (str): Bearer token
            verify_ssl (bool): secure transport flag
            project_data (dict): data collection

        Returns:
            project_data: extended data collection
        """
        url_execution = self.base_url + "/v1/projects/{projectId}/pipeline-executions"
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        }

        body = {
            "pipelineName": project_data["published_pipeline_name"],
            "environmentName": project_data["environment_name"],
        }
        try:
            execution = requests.post(
                url_execution.replace("{projectId}", project_data["project_id"]),
                json=body,
                headers=headers,
                verify=verify_ssl,
            )
            logging.info(f"Check for API errors: {execution.raise_for_status()}")
            execution_json = execution.json()
            logging.info(execution_json)
            execution_id = execution_json["pipelineExecutionId"]
            project_data["last_execution_id"] = execution_id
            project_data["execution_status"] = "RUNNING"
        except Exception as err:
            logging.exception(f"Error: {err}")

        return project_data

    def get_pipeline_status(
        self,
        project_data: dict,
        access_token: str,
        verify_ssl: bool,
    ):
        """Get Matillion pipeline execution status.

        Args:
            project_data: data collection
            access_token (str): Bearer token
            verify_ssl (bool): secure transport flag

        Returns:
            project_data: data collection
        """
        url_execution_status = (
            self.base_url
            + "/v1/projects/{projectId}/pipeline-executions/{pipelineExecutionId}"
        )
        url_execution_status_steps = (
            self.base_url
            + "/v1/projects/{projectId}/pipeline-executions/{pipelineExecutionId}/steps"
        )
        headers = {
            "Authorization": f"Bearer {access_token}",
            "Accept": "application/json",
        }
        pipeline_execution_id = project_data["last_execution_id"]
        project_id = project_data["project_id"]
        body = {
            "pipelineName": project_data["published_pipeline_name"],
            "environmentName": project_data["environment_name"],
        }
        try:
            response = requests.get(
                url_execution_status.replace("{projectId}", project_id).replace(
                    "{pipelineExecutionId}", pipeline_execution_id
                ),
                json=body,
                headers=headers,
                verify=verify_ssl,
            )
            logging.info(f"Check for API errors: {response.raise_for_status()}")
            response_json = response.json()
            logging.info("Logging response pipeline status")
            logging.info(response_json)
            if response_json["result"]["status"] == "RUNNING":
                project_data["execution_status"] = response_json["result"]["status"]
            elif response_json["result"]["status"] == "FAILED":
                project_data["execution_status"] = response_json["result"]["status"]
                try:
                    project_data["execution_message"] = response_json["result"][
                        "message"
                    ]
                except KeyError as err:
                    logging.exception(err)
                    response = requests.get(
                        url_execution_status_steps.replace(
                            "{projectId}", project_id
                        ).replace("{pipelineExecutionId}", pipeline_execution_id),
                        params={"size": 100, "page": 0},
                        json=body,
                        headers=headers,
                        verify=verify_ssl,
                    )
                    logging.info(f"Check for API errors: {response.raise_for_status()}")
                    response_json = response.json()
                    logging.info("Logging response pipeline status on steps")
                    logging.info(response_json)
                    for i in response_json["results"]:
                        if i["result"]["status"] == "FAILED":
                            try:
                                project_data[
                                    "execution_message"
                                ] = f"""Step {i["name"]} failed with error {i["result"]["message"]}"""
                            except KeyError:
                                break
                        else:
                            project_data[
                                "execution_message"
                            ] = "Unknown error. Look into Matillion Dashboard."
            else:
                project_data["execution_status"] = response_json["result"]["status"]
        except Exception as err:
            logging.exception(f"Error: {err}")
            raise
        return project_data

    def send_notifcation(self, event: dict, log_group_name: str, log_stream: str):
        """Send a message about the current rotation status.

        Args:
            event (dict): Message dict to be sent via Teams
            log_group_name (str): Name of the CWL Log Group
            log_stream (str): Name of the Log Stream
        """
        try:
            project_name = event["project_name"]
            published_pipeline_name = event["published_pipeline_name"]
            last_execution_id = event["last_execution_id"]
            execution_message = event["execution_message"]
        except KeyError as e:
            logging.exception(e)
            raise

        logging.info("Starting webhook message to Teams")
        try:
            msg = {
                "type": "adaptiveCard",
                "attachments": [
                    {
                        "contentType": "application/vnd.microsoft.card.adaptive",
                        "content": {
                            "$schema": "http://adaptivecards.io/schemas/adaptive-card.json",
                            "type": "AdaptiveCard",
                            "version": "1.5",
                            "body": [
                                {
                                    "type": "Image",
                                    "url": "https://docs.matillion.com/assets/mtln-docs-logo-white.png",
                                    "altText": "Matillion",
                                },
                                {
                                    "type": "Container",
                                    "style": "attention",
                                    "bleed": True,
                                    "spacing": "None",
                                    "items": [
                                        {
                                            "type": "TextBlock",
                                            "text": f"**Abbruch im Projekt: {project_name}**",
                                        }
                                    ],
                                },
                                {
                                    "type": "Container",
                                    "style": "emphasis",
                                    "items": [
                                        {
                                            "type": "TextBlock",
                                            "text": f"**Umgebung**: {published_pipeline_name}",
                                        },
                                        {
                                            "type": "TextBlock",
                                            "text": f"**Fehler**: {execution_message}",
                                        },
                                    ],
                                },
                            ],
                            "actions": [
                                {
                                    "type": "Action.OpenUrl",
                                    "title": "Matillion Dashboard zur Pipeline öffnen",
                                    "url": f"https://mrht.observability-dashboard.eu1.matillion.com/pipeline/{last_execution_id}",
                                }
                            ],
                        },
                    }
                ],
            }
            execution = requests.post(
                web_hook,
                json=msg,
                verify=True,
            )
            logging.info(f"Check for API errors: {execution.raise_for_status()}")
        except Exception as err:
            logging.exception(f"Error: {err}")
            raise
        logging.info("Sent message to Teams")
Enter fullscreen mode Exit fullscreen mode

In the event of an error within Matillion, you’ll receive a message in Teams that looks something like this:

Teams Notification Example

The button “Matillion Dashboard zur Pipeline öffnen” will direct you straight to the corresponding execution dashboard within DPC.

Using AWS to Execute Our Matillion Monitoring Handler

With our code in place, we can leverage an AWS Lambda function in conjunction with AWS Step Functions to execute and monitor the status of a Matillion pipeline execution. AWS Lambda serves as a code execution service, while AWS Step Functions act as a serverless workflow manager and state machine.
The workflow for our DPC Pipeline Execution and monitoring service is depicted below:

Workflow Diagram within AWS Step Functions

This workflow, managed by AWS Step Functions, begins with invoking a Lambda function named "SubmitJob-matrix-42-it-reporting." Following this, a wait state called "Wait30Seconds-matrix-42-it-reporting" pauses execution for 30 seconds. Subsequently, another Lambda function, "GetStatus-matrix-42-it-reporting," checks the job's status. A choice state, "JobComplete-matrix-42-it-reporting," evaluates the execution status. If the status is "FAILED," the workflow transitions to a fail state; if "SUCCESS," it moves to a succeed state, marking the completion of the workflow.

We can utilize the same Lambda function to execute the states “SubmitJob” and “GetStatus.” By using environment variables, we can create generic code to identify the correct Matillion resource for this workflow. Here’s a snippet of the generic Lambda:

def main(event, context, secretsmanager=secretsmanager_client):
    """Orchestrate all other functions.

    Args:
        event (object): Lambda event object
        context (object): Lambda context object
        secretsmanager (boto3, optional): secretsmanager boto3 client object. Defaults to secretsmanager_client.

    Returns:
        project_data (dict): data collection
    """
    logging.info("Starting Matillion Execution Management")
    logging.info(event)
    logging.info(context)
    try:
        secret_arn = os.environ["CLIENT_SECRET_ARN"]
        target_pipeline_name = os.environ["TARGET_PIPELINE_NAME"]
        target_environment_name = os.environ["TARGET_ENVIRONMENT_NAME"]
        target_project_name = os.environ["TARGET_PROJECT_NAME"]
    except KeyError as e:
        logging.exception(e)
        lambda_notifcation(message=e, project_name=target_project_name, log_group_name=log_group_name, log_stream=log_stream)
        raise e

    try:
        response = secretsmanager.get_secret_value(SecretId=secret_arn)
        secret_dict = json.loads(response["SecretString"])
    except Exception as err:
        logging.exception(err)
        lambda_notifcation(message=err, project_name=target_project_name)
        raise err

    matillion = MatillionCheck(
        client_id=secret_dict["client_id"], client_secret=secret_dict["client_secret"]
    )

    logging.info("Getting Bearer Token")
    access_token = matillion.request_api_access_token(verify_ssl=True)
    logging.info("Got Bearer Token")

    try:
        execution_status = event["execution_status"]
        last_execution_id = event["last_execution_id"]
        published_pipeline_name = event["published_pipeline_name"]
        environment_name = event["environment_name"]
        project_data = event
    except KeyError:
        logging.info("Starting new pipeline execution")
        logging.info("Searching for project data ...")
        project_data = matillion.get_project_data(
            access_token=access_token,
            verify_ssl=True,
            target_project_name=target_project_name,
            target_environment_name=target_environment_name,
            target_pipeline_name=target_pipeline_name,
        )
        if project_data is not True:
            logging.info(f"Found project: {project_data} ...")

            logging.info("Executing pipelines ...")
            project_data = matillion.execute_pipeline(
                project_data=project_data, 
                access_token=access_token, 
                verify_ssl=True,
            )
            logging.info(f"Executed pipeline: {project_data} ...")

            published_pipeline_name = project_data["published_pipeline_name"]
            environment_name = project_data["environment_name"]
            last_execution_id = project_data["last_execution_id"]
            execution_status = project_data["execution_status"]

    logging.info(
        f"Checking pipeline status in project {target_project_name} for pipeline {published_pipeline_name} in environment {environment_name}"
    )
    logging.info(f"Last execution: id {last_execution_id} - status {execution_status}")
    project_data = matillion.get_pipeline_status(
        project_data=project_data,
        access_token=access_token,
        verify_ssl=True,
    )

    if project_data["execution_status"] == "FAILED":
        logging.exception(project_data["execution_message"])
        matillion.send_notifcation(
            event=project_data,
        )
    logging.info("Checked Pipeline status ...")

    logging.info("Finished Matillion Execution Management")

    return project_data
Enter fullscreen mode Exit fullscreen mode

This function orchestrates the lifecycle of a Matillion pipeline execution, managing both new executions and status checks for existing ones.

Handling Matillion Child Pipelines in an Event-Based Way

By implementing Lambda and Step Functions, you can listen within AWS for a successful event of the Step Functions execution. To achieve this, connect Step Functions via Amazon EventBridge Rules. The rule requires a pattern to filter relevant API calls:

{
  "detail-type": ["Step Functions Execution Status Change"],
  "detail": {
    "stateMachineArn": ["arn:aws:states:eu-central-1:ACCOUNT_ID:stateMachine:StateMachinematrix42etl1788E6C1-afV30G0wnBsD"],
    "status": ["SUCCEEDED"]
  },
  "source": ["aws.states"]
}
Enter fullscreen mode Exit fullscreen mode

In this setup, if the status of a specific Step Function is “SUCCEEDED,” a target Step Function will be initiated, allowing for seamless event-driven execution.

Target Step Functions for relevant API calls

Summary

In conclusion, the generic pipeline monitoring feature is essential for Matillion DPC. It simplifies processes for all customers and, on the flip side, I am thrilled about the API's capabilities, which empower us to create remarkable solutions and event-based pipeline dependencies—an invaluable asset.

While I haven’t delved into Infrastructure as Code (IaC) in this blog post, it's certainly a great idea to incorporate it. With the Cloud Development Kit, you can leverage multiple Lambdas and Step Functions to automate your deployment process.

On the Microsoft Teams side, all you need is access to Teams workflows to allow incoming webhook workflows, whether for a channel or group chat. And there you have it—a robust Microsoft Teams and AWS Matillion monitoring integration!

Happy Coding!

Top comments (0)