Jinwook Baek
Migrate log analysis from AWS ElasticSearch to BigQuery

Original blog post

Introduction

I have been using AWS Elasticsearch for near-real-time analysis of API server logs. AWS provides a built-in Elasticsearch subscription filter for CloudWatch, so with less than an hour of effort I could spin up an Elasticsearch cluster to visualize and analyze server logs. However, AWS Elasticsearch is not cheap in a production setup, and as logs accumulate it needs some maintenance (lifecycle policies, JVM pressure, etc.). The dev team was also not fully utilizing the Elastic stack. I had been meaning to decommission Elasticsearch and find an alternative for several months, and I finally decided it was time to migrate to another solution for the following reason.

Amazon: NOT OK - why we had to change Elastic licensing

After some research, I listed the requirements for the substitute:

  • Near-real-time analysis (no batch loads)
  • Simple filtering and aggregation by time range, API status, etc.
  • Access management
  • Support for a visualization tool (to replace Kibana)
  • No extra setup in server code (no Splunk, Fluentd, etc.)

Given that CloudWatch Logs is the inception point of all the logs (nginx, API logs, etc.), there were a couple of alternatives I could think of.

  • Vanilla CloudWatch (manually searching through logs, with no visualization or query support → no need to implement anything else)
  • CloudWatch → S3(firehose) → Athena
  • CloudWatch → S3(firehose) → Redshift Spectrum
  • CloudWatch → Kinesis Stream → Kinesis Analysis
  • CloudWatch → Lambda → BigQuery

I was initially inclined to choose something from AWS, since I prefer fewer moving pieces and less coding. However, I already use BigQuery as a data warehouse, and I concluded that if I was going to stash data somewhere, I might as well centralize all of it in a single place. So I chose to go with Lambda + BigQuery. For this post I will use nginx logs as the example, since the format is familiar to most people.

Load vs. Stream

Before going into the actual setup, I should clarify a couple of things, such as the distinction between loading and streaming. A load job ingests a batch of data once or on a recurring schedule; in BigQuery, load jobs are free and you only pay for storage. Streaming, however, incurs a charge of $0.010 per 200 MB inserted.
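For a rough sense of scale: at $0.010 per 200 MB, streaming about 10 GB of log rows in a month works out to roughly $0.50, so for typical API log volumes the streaming charge is small compared to the rest of the pipeline.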

Prerequisite

Setup

I am going to cover the following components:

  • Inspect CloudWatch logs
  • BigQuery table
  • Lambda code
  • CloudWatch subscription
  • BigQuery + Data Studio


BigQuery Table and Schema

First of all, I need to create a BigQuery table to store the nginx logs. In order to create the correct schema, let's take a look at how the logs will be sent from CloudWatch to the Lambda function through the subscription filter.

Nginx log

The default nginx log format looks like this. (My log sample has the remote IP appended at the end.)

log_format combined '$remote_addr - $remote_user [$time_local] '
                    '"$request" $status $body_bytes_sent '
                    '"$http_referer" "$http_user_agent"';

//sample
172.11.1.1111 - - [28/Jan/2021:16:24:03 +0000] "GET /api/healthcheck/ HTTP/1.1" 200 2 "-" "CRAZY_USER_AGENT/2.0" "111.111.111.111"

I need to transform this log into a structured format before inserting it into the BigQuery table. Thankfully, CloudWatch subscription filters provide filter patterns.

Filter and Pattern Syntax

If I provide a filter pattern like [internal_ip, identity, auth_user, date, request, status, bytes, referer, useragent, remote_addr], the subscription filter automatically breaks the log down into JSON as extractedFields. You can even select only certain logs with a filter such as [internal_ip, identity, auth_user, date, request != "*/api/healthcheck/*", status, bytes, referer, useragent, remote_addr].

Now that we know the format the logs will arrive in, I will create a table with the following schema.

id:string,
timestamp:timestamp,
remote_addr:string,
useragent:string,
referer:string,
bytes:numeric,
status:string,
request:string,
auth_user:string,
date:string,
internal_ip:string,
identity:string

If I need more fields later, I will add them. Make sure the table is partitioned by timestamp for query performance, and consider expiring partitions after a certain time to reduce cost. (BigQuery automatically moves data that hasn't been modified for 90 days to cheaper long-term storage, so you may also just want to keep it.)
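If you prefer to set this up from Python instead of the console or bq CLI (shown in the next section), here is a minimal sketch using the BigQuery client library; the 90-day partition expiration is my own assumption, so adjust the table ID and retention to your setup.

from google.cloud import bigquery

client = bigquery.Client()

schema = [
    bigquery.SchemaField("id", "STRING"),
    bigquery.SchemaField("timestamp", "TIMESTAMP"),
    bigquery.SchemaField("remote_addr", "STRING"),
    bigquery.SchemaField("useragent", "STRING"),
    bigquery.SchemaField("referer", "STRING"),
    bigquery.SchemaField("bytes", "NUMERIC"),
    bigquery.SchemaField("status", "STRING"),
    bigquery.SchemaField("request", "STRING"),
    bigquery.SchemaField("auth_user", "STRING"),
    bigquery.SchemaField("date", "STRING"),
    bigquery.SchemaField("internal_ip", "STRING"),
    bigquery.SchemaField("identity", "STRING"),
]

table = bigquery.Table("<project_id>.<dataset>.<table>", schema=schema)
# Partition by the timestamp column and drop partitions after 90 days.
table.time_partitioning = bigquery.TimePartitioning(
    type_=bigquery.TimePartitioningType.DAY,
    field="timestamp",
    expiration_ms=90 * 24 * 60 * 60 * 1000,
)
client.create_table(table)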

Create BigQuery table

Let's go to the GCP console and create a table with the following schema.


You can use Cloud Shell if you prefer the CLI.

bq mk \
--table \
--description description \
--time_partitioning_field timestamp \
--time_partitioning_type DAY \
--label key:value --label key:value \
<project_id>:<dataset>.<table> \
id:string,timestamp:timestamp,remote_addr:string,\
useragent:string,referer:string,bytes:numeric,status:string,\
request:string,auth_user:string,date:string,internal_ip:string,\
identity:string

Once the table is created, copy the full table ID from the details tab: <project_id>.<dataset>.<table-name>

Service Account

Before going back to AWS to create a Lambda function, we need to create a service account so the Lambda has permission to insert rows into BQ.

Go to Service Accounts under the IAM & Admin menu and click + Create Service Account.

Google Cloud Platform

Service account name: lambda_bq_stream

Permissions: BigQuery Data Editor

You can also define a custom role restricted to only the permissions you need:

  • bigquery.tables.list
  • bigquery.tables.get
  • bigquery.tables.updateData

Once you have created the service account, download a JSON key: click Add key → Create new key. Download it and stash it somewhere safe; we will use this key later.

JSON key sample

{
  "type": "service_account",
  "project_id": "****",
  "private_key_id": "****",
  "private_key": "****",
  "client_email": "****",
  "client_id": "****",
  "auth_uri": "https://accounts.google.com/o/oauth2/auth",
  "token_uri": "https://oauth2.googleapis.com/token",
  "auth_provider_x509_cert_url": "https://www.googleapis.com/oauth2/v1/certs",
  "client_x509_cert_url": "https://www.googleapis.com/robot/v1/metadata/x509/lambda-bigquery-stream%40hangfive-26bb4.iam.gserviceaccount.com"
}

I will use SAM for the Lambda deployment; please refer to my previous post for the SAM setup.

Migrate python async worker to asynchrounous Lambda

Lambda Function

OK, now let's focus on the code. This function will receive a stream of CloudWatch logs and stream-insert rows into the BQ table using the Google BigQuery Python SDK. I will break the code down into pieces to explain it.

Library

All you need is a single pip library.

google-cloud-bigquery

Authentication

The client requires authentication, and there are two ways to authenticate: from a file or from a JSON dictionary. There are a couple of options for doing this.
google-auth

  • JSON file
    • upload the file to S3 and load it from Lambda
    • package the JSON with the Lambda code → (x)

google-auth

  • Dictionary
    • environment variables → I will use this for simplicity
    • load the key info from Secrets Manager or Parameter Store

Environment Variable

Although it is better to manage the key in a secrets manager, I will use environment variables for the simplicity of this exercise. I will supply the following environment variables from the JSON key downloaded in the previous section.

  "project_id":
  "private_key_id":
  "private_key":
  "client_email": 
  "client_id": 
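Here is a minimal sketch of how the Lambda can rebuild the credentials from these environment variables. The variable names mirror the JSON key fields above; restoring newlines in private_key is an assumption about how the key ends up stored in the Lambda configuration.

import os
from google.oauth2 import service_account
from google.cloud import bigquery

# Rebuild the service-account info from environment variables.
service_account_info = {
    "type": "service_account",
    "project_id": os.environ["project_id"],
    "private_key_id": os.environ["private_key_id"],
    # The PEM key loses its real newlines when stored as an env var,
    # so restore them before handing it to google-auth.
    "private_key": os.environ["private_key"].replace("\\n", "\n"),
    "client_email": os.environ["client_email"],
    "client_id": os.environ["client_id"],
    "token_uri": "https://oauth2.googleapis.com/token",
}

credentials = service_account.Credentials.from_service_account_info(service_account_info)
client = bigquery.Client(credentials=credentials, project=credentials.project_id)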

Decompress and decode CloudWatch logs

When you add a Lambda subscription filter to CloudWatch, the log events are delivered as gzipped, base64-encoded data. Here is an example of what the event looks like. (Don't try to decode the sample data; it's broken.)

[Image: sample CloudWatch Logs event payload]

You can decompress and decode the message with the following code.

import json, zlib
from base64 import b64decode

compressed_payload = b64decode(data)  # data = event['awslogs']['data']
cloudwatch_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)
json_payload = json.loads(cloudwatch_payload)

The resulting message is decoded into the following format. Notice that logEvents holds the list of logs, and that message is the raw log line, broken down into extractedFields according to the filter supplied: [internal_ip, identity, auth_user, date, request, status, bytes, referer, useragent, remote_addr].

{
    'messageType': 'DATA_MESSAGE',
    'owner': '12341244',
    'logGroup': '/ecs/nginx',
    'logStream': 'ecs/nginx/65366ed9299f4554a2cf0dbd4c49ee08',
    'subscriptionFilters': ['nginx_filter_sample'],
    'logEvents': [{
        'id': '35945479414329140852051181308733124824760966176424001536',
        'timestamp': 1611851043287,
        'message': '172.11.1.1111 - - [28/Jan/2021:16:24:03 +0000] "GET /api/healthcheck/ HTTP/1.1" 200 2 "-" "CRAZY_USER_AGENT/2.0" "111.111.111.111"',
        'extractedFields': {
            'date': '28/Jan/2021:16:24:03 +0000',
            'request': 'GET /api/healthcheck/ HTTP/1.1',
            'referer': '-',
            'remote_addr': '-',
            'bytes': '2',
            'ip': '172.11.1.111',
            'useragent': 'ELB-HealthChecker/2.0',
            'identity': '-',
            'auth_user': '-',
            'status': '200'
        }
    }]
}

SKIP CONTROL MESSAGE

If a control message is passed, we skip the insert.

if json_payload['messageType'] == 'CONTROL_MESSAGE':
    return

Create payload

We will create the payload for the nginx log. Notice that I divide the timestamp by 1000: CloudWatch reports timestamps in milliseconds, while BigQuery expects TIMESTAMP values in streaming inserts as epoch seconds (TIMESTAMP columns support up to microsecond resolution).
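For example, the sample event's timestamp 1611851043287 (milliseconds) becomes 1611851043.287 seconds, which BigQuery interprets as the TIMESTAMP 2021-01-28 16:24:03.287 UTC, matching the date in the original log line.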

rows_to_insert = []

for row in json_payload['logEvents']:
    item = {}
    item['id'] = row['id']
    item['timestamp'] = row['timestamp'] / 1000
    if 'extractedFields' in row:
        for k, v in row['extractedFields'].items():
            item[k] = v

    rows_to_insert.append(item)

Inserting rows

Before inserting into BQ, let's take a quick look at the description of the insert_rows_json method.

  • insert_rows_json method description. Make sure to read the row_ids parameter: it supplies a unique identifier per row that BigQuery uses for best-effort deduplication.
"""Insert rows into a table without applying local type conversions.

        See
        https://cloud.google.com/bigquery/docs/reference/rest/v2/tabledata/insertAll

        Args:
            table (Union[ \
                google.cloud.bigquery.table.Table \
                google.cloud.bigquery.table.TableReference, \
                str \
            ]):
                The destination table for the row data, or a reference to it.
            json_rows (Sequence[Dict]):
                Row data to be inserted. Keys must match the table schema fields
                and values must be JSON-compatible representations.
            row_ids (Optional[Sequence[Optional[str]]]):
                Unique IDs, one per row being inserted. An ID can also be
                ``None``, indicating that an explicit insert ID should **not**
                be used for that row. If the argument is omitted altogether,
                unique IDs are created automatically.
            skip_invalid_rows (Optional[bool]):
                Insert all valid rows of a request, even if invalid rows exist.
                The default value is ``False``, which causes the entire request
                to fail if any invalid rows exist.
            ignore_unknown_values (Optional[bool]):
                Accept rows that contain values that do not match the schema.
                The unknown values are ignored. Default is ``False``, which
                treats unknown values as errors.
            template_suffix (Optional[str]):
                Treat ``name`` as a template table and provide a suffix.
                BigQuery will create the table ``<name> + <template_suffix>``
                based on the schema of the template table. See
                https://cloud.google.com/bigquery/streaming-data-into-bigquery#template-tables
            retry (Optional[google.api_core.retry.Retry]):
                How to retry the RPC.
            timeout (Optional[float]):
                The number of seconds to wait for the underlying HTTP transport
                before using ``retry``.

        Returns:
            Sequence[Mappings]:
                One mapping per row with insert errors: the "index" key
                identifies the row, and the "errors" key contains a list of
                the mappings describing one or more problems with the row.

        Raises:
            TypeError: if `json_rows` is not a `Sequence`.
        """

The following code inserts the rows into the BQ table.

table_id = '<project_id>.<dataset>.<table-name>'
errors = client.insert_rows_json(table_id, rows_to_insert)  # Make an API request.
if errors == []:
    print("New rows have been added.")
else:
    print("Encountered errors while inserting rows: {}".format(errors))
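Since CloudWatch Logs invokes the Lambda asynchronously and failed batches can be retried, it is worth passing the log event ids as insert ids. This is an optional tweak on top of the call above, not part of the original snippet:

# Pass the CloudWatch log event ids as insert ids so retried invocations
# do not produce duplicate rows (BigQuery deduplication is best-effort).
errors = client.insert_rows_json(
    table_id,
    rows_to_insert,
    row_ids=[row["id"] for row in rows_to_insert],
)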

Full Lambda Code

Now that we have covered all the parts, let's combine them into a single function.

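Putting the pieces together, a minimal version of the handler looks roughly like this. The BQ_TABLE_ID environment variable and the row_ids argument are my additions; everything else follows the snippets above, so adjust names to your own setup.

import json
import os
import zlib
from base64 import b64decode

from google.cloud import bigquery
from google.oauth2 import service_account

# Hypothetical env var holding the destination table; adjust to your setup.
TABLE_ID = os.environ.get("BQ_TABLE_ID", "<project_id>.<dataset>.<table-name>")

# Build credentials from the environment variables described earlier.
credentials = service_account.Credentials.from_service_account_info({
    "type": "service_account",
    "project_id": os.environ["project_id"],
    "private_key_id": os.environ["private_key_id"],
    "private_key": os.environ["private_key"].replace("\\n", "\n"),
    "client_email": os.environ["client_email"],
    "client_id": os.environ["client_id"],
    "token_uri": "https://oauth2.googleapis.com/token",
})
client = bigquery.Client(credentials=credentials, project=credentials.project_id)


def lambda_handler(event, context):
    # CloudWatch Logs delivers the payload gzipped and base64 encoded.
    compressed_payload = b64decode(event["awslogs"]["data"])
    cloudwatch_payload = zlib.decompress(compressed_payload, 16 + zlib.MAX_WBITS)
    json_payload = json.loads(cloudwatch_payload)

    # Skip the test message CloudWatch sends when the subscription is created.
    if json_payload["messageType"] == "CONTROL_MESSAGE":
        return

    rows_to_insert = []
    for row in json_payload["logEvents"]:
        item = {"id": row["id"], "timestamp": row["timestamp"] / 1000}
        for k, v in row.get("extractedFields", {}).items():
            item[k] = v
        rows_to_insert.append(item)

    # Use the log event id as the insert id for best-effort deduplication.
    errors = client.insert_rows_json(
        TABLE_ID,
        rows_to_insert,
        row_ids=[r["id"] for r in rows_to_insert],
    )
    if errors:
        print("Encountered errors while inserting rows: {}".format(errors))
    else:
        print("New rows have been added.")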

Test the function with sample events

Let's test the code with a sample event payload (a JSON file whose top-level awslogs.data field holds the base64-encoded, gzipped CloudWatch payload). As a reminder, you can invoke the function locally with the following command:

sam local invoke --env-vars env.json -e events/payload.json

Once you see a success message like the one below, go to the BigQuery table and check whether the data was inserted successfully.
[Image: successful local invocation output]
Successfully streamed to the table!
[Image: inserted rows in the BigQuery console]

CloudWatch Subscription filter

It's time to attach the Lambda subscription to the log group. Use the following CLI command or the console. (If you use the CLI, note that CloudWatch Logs also needs permission to invoke the function, which the console wizard grants automatically.)

put-subscription-filter - AWS CLI 1.18.221 Command Reference

aws logs put-subscription-filter \
--log-group-name <value> \
--filter-name nginxToBQ \
--filter-pattern "[internal_ip, identity, auth_user, date, request, status, bytes, referer, useragent, remote_addr]" \
--destination-arn <lambdaFUNCTIONARN>


Check Lambda Log

Let's make sure everything is working as expected before setting it on cruise control. Check the CloudWatch logs of the Lambda function behind the subscription filter; if you don't see any error logs, it's all set!

If you are not certain that the logs will keep being delivered as expected, there are a couple of ways to get notified of abnormal errors (for example, a CloudWatch alarm on the function's Errors metric).

Data in BigQuery

After a while, if everything is working as expected, you will see a stream of data in your table.

It's time to build some visualizations with the data we have. I will make a heat map of user requests. First, using aggregation, create a query to count requests for each valid remote address.

SELECT remote_addr ip, count(*) c
FROM `<project_id>.<dataset>.<table>` 
WHERE DATE(timestamp) = "2021-01-29" and remote_addr not like "-" 
group by 1

Then join this result with the GeoLite2 table. (Since this query scans the large GeoLite2 table, it will incur cost.)

WITH source_of_ip_addresses AS (

    SELECT remote_addr ip, count(*) c
    FROM `hangfive-26bb4.cloudwatch.hangfive-nginx` 
    WHERE DATE(timestamp) = "2021-01-29" and remote_addr not  like "-" 
    group by 1
)
SELECT city_name, SUM(c) c, ST_GeogPoint(AVG(longitude), AVG(latitude)) point
FROM (
  SELECT ip, city_name, c, latitude, longitude, geoname_id
  FROM (
    SELECT *, NET.SAFE_IP_FROM_STRING(ip) & NET.IP_NET_MASK(4, mask) network_bin
    FROM source_of_ip_addresses, UNNEST(GENERATE_ARRAY(9,32)) mask
    WHERE BYTE_LENGTH(NET.SAFE_IP_FROM_STRING(ip)) = 4
  )
  JOIN `fh-bigquery.geocode.201806_geolite2_city_ipv4_locs`  
  USING (network_bin, mask)
)
WHERE city_name  IS NOT null
GROUP BY city_name, geoname_id
ORDER BY c DESC
LIMIT 5000

This query returns the count of users in each city.

Visualize

Since we no longer have Kibana, I will visualize the data with Data Studio.

Data Studio

Visualize it with Data Studio or your favorite BI tool.

Conclusion

We have looked at a bunch of different services to get here. At the core, as long as you know how to write a Lambda function, you can stream and ingest application logs anywhere, regardless of cloud platform. It's a good feeling to know that your pipeline can span cloud platforms. I hope this post helped you, and thank you for reading.

Extra

Concurrency

After letting the Lambda run for a while, my DevOps Guru setup actually kicked in, warning me of concurrency issues. If traffic spikes, the Lambda is bound to be throttled without a concurrency configuration.

{
    'AccountId': '1111111111',
    'Region': 'us-east-1',
    'MessageType': 'NEW_RECOMMENDATION',
    'InsightId': 'AGp3byn_PgRryjAItOHdAlcAAAAAAAAAAV8rYO_CKM4aynEOZWZiTd2_OV_0bsqS',
    'Recommendations': [{
        'Name': 'Troubleshoot errors and set up automatic retries in AWS Lambda',
        'Description': 'Your Lambda function is throwing a high number of errors. To learn about common Lambda errors, their causes, and mitigation strategies, see this link.',
        'Reason': 'The Errors metric in AWS::Lambda::FunctionName breached a high threshold. ',
        'Link': 'https://docs.aws.amazon.com/lambda/latest/dg/invocation-retries.html',
        'RelatedEvents': [],
        'RelatedAnomalies': [{
            'SourceDetails': {
                'CloudWatchMetrics': [{
                    'MetricName': 'Errors',
                    'Namespace': 'AWS::Lambda::FunctionName'
                }]
            },
            'Resources': [{
                'Name': 'CloudwatchToBQ',
                'Type': 'AWS::Lambda::FunctionName'
            }]
        }]
    }, {
        'Name': 'Configure provisioned concurrency for AWS Lambda',
        'Description': 'Your Lambda function is having trouble scaling. To learn how to enable provisioned concurrency, which allows your function to scale without fluctuations in latency, see this link.',
        'Reason': 'The Duration metric in AWS::Lambda::FunctionName breached a high threshold. ',
        'Link': 'https://docs.aws.amazon.com/lambda/latest/dg/configuration-concurrency.html#configuration-concurrency-provisioned',
        'RelatedEvents': [],
        'RelatedAnomalies': [{
            'SourceDetails': {
                'CloudWatchMetrics': [{
                    'MetricName': 'Duration',
                    'Namespace': 'AWS::Lambda::FunctionName'
                }]
            },
            'Resources': [{
                'Name': 'CloudwatchToBQ',
                'Type': 'AWS::Lambda::FunctionName'
            }]
        }]
    }]
}


Managing concurrency for a Lambda function

As a remedial action, I inspected the Lambda metrics and updated the reserved concurrency.


You should inspect your own Lambda metrics and update the concurrency accordingly. If that is not enough, I recommend adding a queue or pub/sub service such as SQS to buffer the traffic. A small scripted example follows.
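If you want to script the concurrency change rather than use the console, reserved concurrency can be set with a single API call; here is a sketch using boto3, where the function name comes from the DevOps Guru insight above and the limit is a placeholder:

import boto3

lambda_client = boto3.client("lambda")

# Reserve a slice of account concurrency for the log-streaming function so a
# traffic spike cannot throttle it into dropping log batches.
lambda_client.put_function_concurrency(
    FunctionName="CloudwatchToBQ",      # function name from the insight above
    ReservedConcurrentExecutions=50,    # placeholder; size to your peak log volume
)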

Streaming Quota

I should also remind you that BigQuery has streaming quotas (up to 500,000 rows per second per project in the US and EU multi-regions, lower in other regions).

Quotas and limits | BigQuery | Google Cloud

Reference

Streaming data into BigQuery | Google Cloud

Streaming insert | BigQuery | Google Cloud

google-cloud-bigquery

google-auth

Geolocation with BigQuery: De-identify 76 million IP addresses in 20 seconds | Google Cloud Blog
