DEV Community

Dr. Malte Polley
Dr. Malte Polley

Posted on

ELT as Compliance Enabler: Running Steampipe with Matillion Data Productivity Cloud

This blog post is the second part of my series about the Modern Data Platform (MDP) of MRH Trowe, a German insurance broker. The first part was about the Zero-ETL integration of Snowflake and S3. This part describes our data loading process using the example of Azure AD reporting with the help of steampipe and Matillion Data Productivity Cloud. Every blog post tells its dedicated story.

TLDR;

With steampipe, data engineers can easily query via SQL. In addition to compliance reporting, many other cases can be mapped. With the Data Productivity Cloud, Matillion, on the other hand, offers a low-code platform that can be deeply integrated into corporate networks via custom agents. This gives the customer various options for control mechanisms regarding data flows and network segmentation.

Compliance Requirement: Know your Azure AD Users

For this blog post we use a simple use case. We need a daily snapshot of our users in Azure AD. The list of users should be made available in a table in Snowflake so that it can be further processed from there. The idea is to continue working with login data to identify inactive users and then delete them from Azure AD.

Use Case: Azure AD Identity Reporting

With this reporting use case in hand, there are various options for implementation. MRH Trowe relies heavily on standardization and automation in all its projects. Accordingly, our Azure AD queries must run automatically every day and, in the best case, be easily adaptable for new use cases.

Azure offers its own SDK for various languages for Azure AD queries. This is also easy to use but requires the data engineer to know a programming language. While looking for another option for querying Software as a Service (SaaS) platforms, we at MRH Trowe came across steampipe.

Intro Steampipe

Steampipe is an open-source project from the company turbot. The claim is very catchy:

select * from cloud; Instantly query your cloud, code, logs & more with SQL. Build on thousands of open-source benchmarks & dashboards for security & insights.

With an SQL interface, steampipe is exactly what a data engineer loves. At the same time, the variety of options available from SaaS platforms is great. Finally, steampipe is relatively fast in the query - in any case fast enough for automation in daily batch runs.

In the background, steampipe was created with Go. The developer guide contains various examples of how you can not only access data, but also manipulate data.

Steampipe offers a plugin for Azure AD and many more. In the first, step we only need the azuread_user table. In addition to this, steampipe offers 14 more, so that many use cases should be able to be represented with this set alone. Ultimately, the documentation also offers a mapping between the plugin's tables and possible compliance guidelines that could be implemented in the organization according to a certain standard.

Matillion Data Productivity Cloud

Before starting the MDP, MRH Trowe didn't have much of a reporting perspective on the existing data. PowerBI was primarily used as a one-size-fits-all tool. PowerBI in combination with an SMB file share is problematic purely from a performance perspective. After Snowflake was chosen as the data warehouse and the data integration between the respective Snowflake DWH and S3 was clarified, the question of data manipulation arose. This is where the Matillion Data Productivity Cloud comes into play, which offers us two key advantages from the automation and integration perspective. On the one hand, it is possible to set up all possible communication paths between Snowflake and the source using ready-made components (SMB, SFTP and APIs, etc.) and Matillion also acts as a scheduler for the data transformation and loading processes. The Matillion jobs themselves are created as low code objects and compile both SQL and can also execute Python and Bash scripts.

The Ease of Access: Meet the (Team) Requirements

In addition to a technological analysis of what needs MDP should be met by MRH Trowe, conceptual requirements also had to be clarified. The two essential ones were the integration into the existing network infrastructure but also the easy, seamless access to data transformations. Matillion meets both needs.

As a SaaS product, Matillion offered the possibility of network integration through custom agents. They periodically poll the Matillion API and execute new jobs, if available. This gives MRH Trowe the opportunity to have precise control over the agents' access and to control them. This scenario is based on hybrid data architecture, where resources can be distributed across all network segments in the cloud or on-premises.

Thanks to the low-code approach, everyone - including IT-savvy department users - can understand how the jobs run or design them themselves. However, Matillion also offers high code users the opportunity to solve complex scenarios via SQL, Python and Bash scripts. In any case, what the flow is doing remains clear.

The Architecture: S3 Buckets, Matillion Custom Agents with Fargate and Snowflake Private Link

The data flow can also be private via the custom agents and in conjunction with the Zero-ETL approach via S3 and Snowflake. What is important here to MRH Trowe is to avoid data transport via the custom agents into the Matillion Cloud.

The diagram shows the AWS architecture. For reasons of clarity, not all options are shown. Matillion can be deployed with high availability in an ECS cluster in private subnets behind NAT gateways (green call direction). This means that the custom agents are protected from external access. The agents' security only requires outbound rules to ensure communication with the Matillion API, the Snowflake Private Link and the source and target systems.

To remain private, we need a gateway or interface endpoints for S3 (red call direction). For Snowflake we need the Business Critical service level in the tenant. The AWS PrivateLink Service can then be created afterwards (blue access direction).

AWS Network Integration

The Dockerfile: Taking Ownership

The integration with the custom agents requires a Docker image, which by default does not have to be touched by the Matillion customer because it does run patch and update cycles itself. However, the own care and maintenance of the Docker image offers the opportunity to carry out configurations yourself. In addition, the release cycle can also be controlled.

Regarding our use case, there is no possibility of installing steampipe from Matillion because the user has no permissions in the Docker container. You could now install an execution resource in addition to the custom agents, or you could configure steampipe yourself in the Docker image.

Below you will find an adaptation of the Matillion image for our use case. It is important that the user in the Docker Container does not have root rights, so paths and folders in particular must be made available to the user agentuser.

ARG matillion_image_tage

FROM public.ecr.aws/matillion/etl-agent:$matillion_image_tage

USER root
# Make important paths available
RUN chown -R agentuser /usr/local/bin
RUN chown -R agentuser /bin/uname
USER agentuser

# Install steampipe as non-root
RUN /bin/sh -c "$(curl -fsSL https://steampipe.io/install/steampipe.sh)"
# Install steampipe plugin(s)
RUN steampipe plugin install azuread

# Extend path with important paths
RUN PATH=$PATH:~/usr/local/bin/steampipe
RUN export PATH=$PATH:~/bin/uname
Enter fullscreen mode Exit fullscreen mode

To be able to communicate with Azure AD, tokens must be available (TenantID, ClientID and ClientSecret). These are made available to steampipe via the pipeline or project's environment variables.

The Pipeline: Keep things simple

In Matillion we use an orchestration pipeline. A bash component creates a steampipe logging and starts a limited query in advance as a DB cache update. In the next step, a Python script component carries out the actual query using subprocess and loads the result into an S3 bucket using pandas. In Snowflake, the bucket is known as the External Stage which will be updated via ALTER STAGE command. The existing table in Snowflake is emptied via TRUNCATE and filled with the current Azure AD user list via COPY INTO command. This ends the pipeline. We use SNS topics for the corresponding error logging to be informed about pipeline interruptions.

Matillion Pipeline: Running Steampipe

The Python script can run steampipe via the subprocess library and with the help of pandas manipulation of the data as well as the cross-account data loads can be carried out.

# Snippet
import pandas as pd
import subprocess

def upload_file(
    query: str,
    directory: str,
    env_vars: dict,
    s3_bucket: str,
    credentials: dict,
):
    """Query data from Azure AD and copy as file to s3.

    Args:
        query (str): steam pipe querey
        directory (str): directory for S3
        env_vars (dict): dictionary with Azure AD tokens
        s3_bucket (str): S3 bucket name
        credentials (dict): AWS credentials

    Returns:
        bool: indicates success
    """
    now = datetime.now()
    year = now.strftime("%Y")
    date = now.strftime("%Y-%m-%d")

    file_name = f"{date}_{directory}.csv"
    access_key = credentials["Credentials"]["AccessKeyId"]
    secret_access_key = credentials["Credentials"]["SecretAccessKey"]
    sessions_token = credentials["Credentials"]["SessionToken"]

    logging.info("Running steampipe query")
    logging.info(f'steampipe query {query} --output=json')
    result = subprocess.run(
        [f"steampipe query \"{query}\" --output=json"],
        env=env_vars,
        shell=True,
        capture_output=True,
        text=True
    )
    logging.info(result.stderr)
    user_dict = json.loads(result.stdout)
    df = pd.DataFrame.from_dict(user_dict) 
    df["Upload Date"] = date

    df.to_csv(
        f"s3://{s3_bucket}/{directory}/{year}/{file_name}",
        quoting=csv.QUOTE_ALL,
        quotechar='"',
        sep=";",
        index=False,
        storage_options={
            "key": access_key,
            "secret": secret_access_key,
            "token": sessions_token,
        },
    )
    logging.info(
        f"Uploaded {file_name} to s3://{s3_bucket}/{directory}/{year}/{file_name}"
    )
    return True
Enter fullscreen mode Exit fullscreen mode

Closing Words

This blog post shows how to use steampipe with Matillion and Snowflake to meet compliance reporting requirements. However, the way there wasn't easy either. The integration of subprocess was not trivial because the Docker Image would have to be modified, the PATH variable for subprocess would have to be known and all environment variables would also have to be passed to the subprocess.run() call.

The first versions of this pipeline showed that subprocess works significantly differently than executing the same command in Bash itself. The problem was solved by having the Bash component of Matillion upfront to execute the same, but limited query.

Due to the database in the background, it is recommended to execute steampipe queries in Matillion - especially complex ones - not in parallel, but sequentially. Unfortunately, it is (still) not possible to host the steampipe database on RDS. Although there is a service mode that enables the integration of external databases. The service mode does not seem to be an appropriate approach as in ECS IPs are changing constantly. But in the end, with Snowflake in the background we can grab flat tables and perform complex transformations later on.

Overall, the interaction between steampipe and Matillion is very valuable for MRH Trowe - and not just for compliance reporting.

Top comments (0)