Badrinarayanan Ravi

Over-engineering analytics for my personal website

Working with AWS can be really overwhelming, especially when it offers a gazillion services. As a developer, I like to try out various services and check whether they could be useful to my organization. It also feels nice to add those swanky keywords to a resume, hoping the ATS picks it up when applying for a new job.

I have been programming professionally since 2014 and to be honest never felt the need to have a personal landing page or a blog. I occupy myself with other hobbies, but I felt it was about time I had some kind of presence on the Internet apart from the social media profiles.

While working on my landing page, I was wondering what to use for analytics. I need some data on how many users are visiting my site and from where. I didn’t want to integrate Google Analytics and surely was not going to look for a paid solution. Then I had an epiphany: why not just glue together a couple of AWS services and make it work? I don’t need a ton of functionality, just a user count and where the users are from.

So, here’s the gist: I decided to host the site on CloudFront. CloudFront stores access logs in an S3 bucket. The access logs contain IP addresses, and I can get a location from an IP address.

Let's Build.

[Image: architecture diagram]

In the above architecture diagram, we get a glimpse of the AWS services used and how the system actually works.

AWS services used:

  1. CloudFront: The landing page is hosted on CloudFront.
  2. S3: To store CloudFront access logs and the custom JSON files which we create via code.
  3. Athena: To query the log files and the JSON files.
  4. EventBridge: To trigger Step Functions at a particular time.
  5. Step Functions: To run the serverless workflow.
  6. Lambda: To query Athena and put the result back in an S3 bucket.
  7. DynamoDB: Not shown in the above diagram; I use it to store the aggregate result from Athena.

How does the system work?

When a user visits https://www.badriravi.com/, the request is served by CloudFront, which writes the access log to an S3 bucket. That bucket is configured at the time of creating the CloudFront distribution.

Note: CloudFront does not write access logs by default. The setting has to be enabled.
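To make the contents of those logs concrete, here is a minimal sketch of reading one line of a CloudFront standard log by hand. Standard logs are tab-separated text with header lines that start with "#". The field positions match the column indices used by the Lambda code later in this post (0 date, 1 time, 4 client IP, 7 URI). The names LogEntry and parseLogLine and the sample line are made up for illustration; Athena does this parsing for you once the table exists.

```go
package main

import (
    "errors"
    "fmt"
    "strings"
)

// LogEntry holds the handful of fields this post cares about (hypothetical type).
type LogEntry struct {
    Date, Time, ClientIP, Method, URI, Status string
}

// parseLogLine splits one tab-separated line of a CloudFront standard log.
// Empty lines and header lines starting with '#' are skipped (ok == false).
func parseLogLine(line string) (entry LogEntry, ok bool, err error) {
    if line == "" || strings.HasPrefix(line, "#") {
        return LogEntry{}, false, nil
    }
    f := strings.Split(line, "\t")
    if len(f) < 9 {
        return LogEntry{}, false, errors.New("too few fields in log line")
    }
    // Field order: date, time, x-edge-location, sc-bytes, c-ip,
    // cs-method, cs(Host), cs-uri-stem, sc-status, ...
    return LogEntry{
        Date: f[0], Time: f[1], ClientIP: f[4],
        Method: f[5], URI: f[7], Status: f[8],
    }, true, nil
}

func main() {
    // Made-up sample line; 203.0.113.7 is a documentation-only address.
    line := "2023-05-06\t10:15:00\tFRA2\t1045\t203.0.113.7\tGET\texample.cloudfront.net\t/\t200\t-\tMozilla/5.0"
    entry, ok, err := parseLogLine(line)
    fmt.Println(entry, ok, err)
}
```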

Now that access logs have been configured, we need to create an Athena table to query those logs. The schema for the table can be found here

Once that Athena table has been created, the logs can be queried. The site visitor location can be fetched from the "ip_address" field by using a third-party service. I am using http://ip-api.com/json/.

Let's write code to query Athena and get the location of the site visitor. Athena queries are asynchronous: starting a query returns immediately, and the result only becomes available some time later, so we need to "wait" for the query to complete. This could be done by polling the query status in code, but I decided to be creative and use Step Functions. I created two Lambda functions: the first runs the query in Athena and the second parses the result.
Step Functions orchestrates the serverless workflow, and a "Wait" state gives the Athena query time to complete.

Step Functions JSON

{
  "Comment": "<comment>",
  "StartAt": "Run Athena Query",
  "States": {
    "Run Athena Query": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$",
        "FunctionName": "<arn of Lambda function>"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "Next": "Wait"
    },
    "Wait": {
      "Type": "Wait",
      "Seconds": 3,
      "Next": "Write to DB"
    },
    "Write to DB": {
      "Type": "Task",
      "Resource": "arn:aws:states:::lambda:invoke",
      "Parameters": {
        "Payload.$": "$.Payload",
        "FunctionName": "<arn of Lambda function>"
      },
      "Retry": [
        {
          "ErrorEquals": [
            "Lambda.ServiceException",
            "Lambda.AWSLambdaException",
            "Lambda.SdkClientException",
            "Lambda.TooManyRequestsException"
          ],
          "IntervalSeconds": 2,
          "MaxAttempts": 6,
          "BackoffRate": 2
        }
      ],
      "End": true
    }
  }
}

In our Step Functions config, we have 3 states:

  1. Run Athena Query
  2. Wait
  3. Write to DB
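Both task states carry the same Retry block (IntervalSeconds 2, MaxAttempts 6, BackoffRate 2). As a rough sketch of what that means in practice, the waits before each retry can be computed as below. The helper name retryDelays is made up for illustration, and the calculation assumes the documented behaviour that each retry interval is the previous one multiplied by the backoff rate.

```go
package main

import "fmt"

// retryDelays returns the wait before each retry attempt, in seconds.
// The first retry waits intervalSeconds; every later retry waits
// backoffRate times longer than the one before it.
func retryDelays(intervalSeconds, backoffRate float64, maxAttempts int) []float64 {
    delays := make([]float64, 0, maxAttempts)
    d := intervalSeconds
    for i := 0; i < maxAttempts; i++ {
        delays = append(delays, d)
        d *= backoffRate
    }
    return delays
}

func main() {
    // Values taken from the Retry block in the state machine definition.
    fmt.Println(retryDelays(2, 2, 6)) // prints [2 4 8 16 32 64]
}
```

So a Lambda invocation that keeps hitting one of the listed service errors is retried for a little over two minutes in total before the state fails.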

Run Athena Query

The first state invokes a Lambda function, which runs the Athena query. I used the AWS SDK for Go to do this.

package main

import (
    "context"
    "fmt" // needed for the fmt.Println calls in the handler

    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/service/athena"
    "github.com/aws/aws-sdk-go-v2/service/athena/types"
)

type MyEvent struct {
    QueryDateStart string `json:"queryDateStart"`
    QueryDateEnd string `json:"queryDateEnd"`
}

type QueryExecution struct {
    QueryExecutionId string `json:"queryId"`
}

const REGION = "<region>"
const OUTPUT_BUCKET = "s3://<bucketname>/" // Athena expects an S3 URI as the output location

func handler(ctx context.Context, event MyEvent) (QueryExecution, error) {

    QUERY := "select * from tablename where method = 'GET' and status = 200 and uri = '/' and date < DATE('"+event.QueryDateEnd+"') and date >= DATE('"+event.QueryDateStart+"')  order by date desc limit 1000"
    cfg, err := config.LoadDefaultConfig(context.TODO(), func(o *config.LoadOptions) error {
        o.Region = REGION
        return nil
    })

    if err != nil {
        fmt.Println(err)
        return QueryExecution{}, err
    }

    client := athena.NewFromConfig(cfg)

    resultConfig := &types.ResultConfiguration{
        OutputLocation: aws.String(OUTPUT_BUCKET),
    }

    executeParams := &athena.StartQueryExecutionInput{
        QueryString:         aws.String(QUERY),
        ResultConfiguration: resultConfig,
    }

    // Start Query Execution
    athenaExecution, err := client.StartQueryExecution(context.TODO(), executeParams)

    if err != nil {
        fmt.Println(err)
        return QueryExecution{}, err
    }
    executionId := *athenaExecution.QueryExecutionId

    return QueryExecution{
        QueryExecutionId: executionId,
    }, nil
}

func main() {
    lambda.Start(handler)
}

When the state machine is triggered, it receives a start date and an end date ('queryDateStart' and 'queryDateEnd'), and the query covers the logs in that window. Once the query execution starts, the function returns the 'QueryExecutionId', which is passed to the subsequent state.
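Because the two dates end up inside the SQL text, it can be worth checking them before building the query. The following is only a sketch of one way to do that, not what the function above does: buildQuery is a hypothetical helper that parses both dates and rejects anything that is not a plain YYYY-MM-DD value. The table name is assumed to be a trusted constant.

```go
package main

import (
    "fmt"
    "time"
)

const dateLayout = "2006-01-02"

// buildQuery validates both dates before placing them in the SQL text,
// so malformed input is rejected instead of being sent to Athena.
func buildQuery(table, start, end string) (string, error) {
    s, err := time.Parse(dateLayout, start)
    if err != nil {
        return "", fmt.Errorf("invalid queryDateStart %q: %w", start, err)
    }
    e, err := time.Parse(dateLayout, end)
    if err != nil {
        return "", fmt.Errorf("invalid queryDateEnd %q: %w", end, err)
    }
    if !s.Before(e) {
        return "", fmt.Errorf("queryDateStart %s must be before queryDateEnd %s", start, end)
    }
    return fmt.Sprintf(
        "select * from %s where method = 'GET' and status = 200 and uri = '/' "+
            "and date >= DATE('%s') and date < DATE('%s') order by date desc limit 1000",
        table, s.Format(dateLayout), e.Format(dateLayout)), nil
}

func main() {
    q, err := buildQuery("tablename", "2023-05-05", "2023-05-06")
    fmt.Println(q, err)
}
```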

Wait State

Now the state machine waits for the time specified in the config (3 seconds here) before proceeding.
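A fixed wait works as long as the query finishes within that time. For comparison, here is a minimal sketch of the polling alternative: keep checking the query state until it is no longer QUEUED or RUNNING. The function waitForQuery and the injected check callback are made up for illustration; in a real function the callback would wrap Athena's GetQueryExecution call. Inside a state machine, a similar loop is commonly built with a Choice state that routes back to the Wait state.

```go
package main

import (
    "errors"
    "fmt"
    "time"
)

// waitForQuery calls check until it reports a terminal state
// or the attempts run out.
func waitForQuery(check func() (string, error), interval time.Duration, maxAttempts int) (string, error) {
    for attempt := 1; attempt <= maxAttempts; attempt++ {
        state, err := check()
        if err != nil {
            return "", err
        }
        // QUEUED and RUNNING are the non-terminal Athena query states.
        if state != "QUEUED" && state != "RUNNING" {
            return state, nil
        }
        if attempt < maxAttempts {
            time.Sleep(interval)
        }
    }
    return "", errors.New("query did not finish in time")
}

func main() {
    // Fake status source standing in for a real Athena status call.
    states := []string{"QUEUED", "RUNNING", "SUCCEEDED"}
    i := 0
    check := func() (string, error) {
        s := states[i]
        i++
        return s, nil
    }
    state, err := waitForQuery(check, 10*time.Millisecond, 5)
    fmt.Println(state, err)
}
```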

Write to DB

The final state invokes another Lambda function, which retrieves the query result from the S3 bucket, transforms the data into a JSON object and writes that JSON object to another S3 bucket.
Here's the code:

package main

import (
    "bytes"
    "context"
    "encoding/csv"
    "encoding/json"
    "io"
    "net/http"

    "fmt"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/s3"
)

type QueryExecution struct {
    QueryExecutionId string `json:"queryId"`
}

type IpInfo struct {
    Query       string  `json:"query"`
    Status      string  `json:"status"`
    Country     string  `json:"country"`
    CountryCode string  `json:"countryCode"`
    Region      string  `json:"region"`
    RegionName  string  `json:"regionName"`
    City        string  `json:"city"`
    Zip         string  `json:"zip"`
    Lat         float64 `json:"lat"`
    Lon         float64 `json:"lon"`
    Timezone    string  `json:"timezone"`
    Isp         string  `json:"isp"`
    Org         string  `json:"org"`
    As          string  `json:"as"`
    Uri         string  `json:"uri"`
    RequestDate string  `json:"requestDate"`
    RequestTime string  `json:"requestTime"`
    UserAgent   string  `json:"userAgent"`
    Referer     string  `json:"referer"`
}

func handler(ctx context.Context, event QueryExecution) error {
    sess, err := session.NewSession(&aws.Config{
        Region: aws.String(""), // replace with your desired region
    })
    if err != nil {
        return fmt.Errorf("creating session: %w", err)
    }
    // Create a new S3 service client
    svc := s3.New(sess)

    // Athena stores each query result as <QueryExecutionId>.csv in its output location
    params := &s3.GetObjectInput{
        Bucket: aws.String(""), // replace with your S3 bucket name
        Key:    aws.String(event.QueryExecutionId + ".csv"),
    }

    resp, err := svc.GetObject(params)
    if err != nil {
        return fmt.Errorf("retrieving object: %w", err)
    }
    defer resp.Body.Close()

    records, err := csv.NewReader(resp.Body).ReadAll()
    if err != nil {
        return fmt.Errorf("reading CSV: %w", err)
    }
    // Start at 1 to skip the CSV header row
    for i := 1; i < len(records); i++ {
        row := records[i]

        // Look up the visitor's location from the IP address in column 4
        ipResp, err := http.Get("http://ip-api.com/json/" + row[4])
        if err != nil {
            return fmt.Errorf("looking up IP: %w", err)
        }
        body, err := io.ReadAll(ipResp.Body)
        // Close right away; a defer inside the loop would only run when the handler returns
        ipResp.Body.Close()
        if err != nil {
            return fmt.Errorf("reading lookup response: %w", err)
        }
        ipInfo := &IpInfo{}
        if err := json.Unmarshal(body, ipInfo); err != nil {
            return fmt.Errorf("decoding lookup response: %w", err)
        }
        // Copy the request details from the access log row
        ipInfo.Uri = row[7]
        ipInfo.UserAgent = row[10]
        ipInfo.Referer = row[9]
        ipInfo.RequestDate = row[0]
        ipInfo.RequestTime = row[1]

        jsonData, err := json.Marshal(ipInfo)
        if err != nil {
            return fmt.Errorf("encoding record: %w", err)
        }
        // Write one JSON file per log row
        _, err = svc.PutObject(&s3.PutObjectInput{
            Body:   aws.ReadSeekCloser(bytes.NewReader(jsonData)),
            Bucket: aws.String("bucketname"),
            Key:    aws.String("refined-logs-" + row[0] + row[1] + ".json"),
        })
        if err != nil {
            return fmt.Errorf("writing record: %w", err)
        }
    }
    return nil
}

func main() {
    lambda.Start(handler)
}


Here, the Athena execution ID is passed in via the Step Functions state and used to locate the query result. For each row, we call the third-party API to fetch the location from the IP address, form a JSON object and write it back to S3.
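The lookup service reports failures inside the response body through its "status" field, so a lookup for a private or reserved address still decodes cleanly but carries no location. A minimal sketch of checking for that is shown below. The names geoResponse and decodeGeo are hypothetical, only a few of the response fields are included, and the sample bodies are hand-written rather than real API output.

```go
package main

import (
    "encoding/json"
    "fmt"
)

// geoResponse is a trimmed-down view of the lookup response (hypothetical type).
type geoResponse struct {
    Status  string `json:"status"`
    Message string `json:"message"`
    Country string `json:"country"`
    City    string `json:"city"`
    Query   string `json:"query"`
}

// decodeGeo decodes a response body and treats any status other than
// "success" as an error, so failed lookups are not stored as visits.
func decodeGeo(body []byte) (geoResponse, error) {
    var g geoResponse
    if err := json.Unmarshal(body, &g); err != nil {
        return geoResponse{}, fmt.Errorf("decoding response: %w", err)
    }
    if g.Status != "success" {
        return geoResponse{}, fmt.Errorf("lookup failed for %q: %s", g.Query, g.Message)
    }
    return g, nil
}

func main() {
    // Hand-written sample bodies, not real API output.
    good := []byte(`{"status":"success","country":"Germany","city":"Frankfurt","query":"203.0.113.7"}`)
    bad := []byte(`{"status":"fail","message":"private range","query":"192.168.1.1"}`)

    g, err := decodeGeo(good)
    fmt.Println(g.Country, g.City, err)

    _, err = decodeGeo(bad)
    fmt.Println(err)
}
```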

Now another Lambda function is needed, which will be triggered by EventBridge at a particular time. This Lambda function starts the Step Functions execution and passes in the date range to query: in the code below, today's date (UTC) as the start and tomorrow's date as the end. Here's the code:

package main

import (
    "encoding/json"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go/aws"
    "github.com/aws/aws-sdk-go/aws/session"
    "github.com/aws/aws-sdk-go/service/sfn"
    "time"
)

const REGION = "<region>"

func handler() error {

    mySession := session.Must(session.NewSession())
    client := sfn.New(mySession, aws.NewConfig().WithRegion(REGION))
    layout := "2006-01-02"
    now := time.Now().UTC()
    tmw := now.AddDate(0, 0, 1)
    reqBody, err := json.Marshal(map[string]interface{}{
        "queryDateStart": now.Format(layout),
        "queryDateEnd":   tmw.Format(layout),
    })
    if err != nil {
        return err
    }
    input := string(reqBody)
    machineArn := "<arn-of-step-function>"
    _, err = client.StartExecution(&sfn.StartExecutionInput{
        Input:           &input,
        StateMachineArn: &machineArn,
    })
    if err != nil {
        return err
    }
    return nil
}

func main() {
    lambda.Start(handler)
}
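If the job should cover a different day, for example the previous day once all of its logs have been delivered, the window can be computed with a small helper. This is a sketch only: queryWindow and exampleNow are made-up names, and the fixed timestamp is there just to make the example reproducible.

```go
package main

import (
    "fmt"
    "time"
)

// exampleNow is a fixed timestamp used only for this illustration.
var exampleNow = time.Date(2023, time.May, 6, 23, 30, 0, 0, time.UTC)

// queryWindow returns a one-day window [start, end) in UTC.
// daysAgo = 0 gives today and tomorrow; daysAgo = 1 gives yesterday and today.
func queryWindow(now time.Time, daysAgo int) (start, end string) {
    const layout = "2006-01-02"
    day := now.UTC().AddDate(0, 0, -daysAgo)
    return day.Format(layout), day.AddDate(0, 0, 1).Format(layout)
}

func main() {
    start, end := queryWindow(exampleNow, 1)
    fmt.Println(start, end) // prints 2023-05-05 2023-05-06
}
```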

Now what to do with the JSON files in S3? Simple: create another Athena table and query the records you want.

Here's the CREATE TABLE query:

CREATE EXTERNAL TABLE `<tablename>`(
  `query` string COMMENT 'from deserializer', 
  `country` string COMMENT 'from deserializer', 
  `countrycode` string COMMENT 'from deserializer', 
  `city` string COMMENT 'from deserializer', 
  `uri` string COMMENT 'from deserializer', 
  `requestdate` date COMMENT 'from deserializer', 
  `requesttime` string COMMENT 'from deserializer', 
  `useragent` string COMMENT 'from deserializer', 
  `referer` string COMMENT 'from deserializer')
ROW FORMAT SERDE 
  'org.openx.data.jsonserde.JsonSerDe' 
WITH SERDEPROPERTIES ( 
  'case.insensitive'='TRUE', 
  'dots.in.keys'='FALSE', 
  'ignore.malformed.json'='FALSE', 
  'mapping'='TRUE') 
STORED AS INPUTFORMAT 
  'org.apache.hadoop.mapred.TextInputFormat' 
OUTPUTFORMAT 
  'org.apache.hadoop.hive.ql.io.HiveIgnoreKeyTextOutputFormat'
LOCATION
  's3://<bucket-name>/'
TBLPROPERTIES (
  'classification'='json', 
  'transient_lastDdlTime'='1683369051')

Now I can query the site data by visitor country and city and get the total visitor count. I wrote another Lambda function and scheduled it with EventBridge. This Lambda function queries the newly created Athena table and puts the results in a DynamoDB table.
Here is the code I use to fetch total users and users by country.

package main

import (
    "context"
    "encoding/json"
    "github.com/aws/aws-lambda-go/lambda"
    "github.com/aws/aws-sdk-go-v2/aws"
    "github.com/aws/aws-sdk-go-v2/config"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/attributevalue"
    "github.com/aws/aws-sdk-go-v2/feature/dynamodb/expression"
    "github.com/aws/aws-sdk-go-v2/service/athena"
    "github.com/aws/aws-sdk-go-v2/service/athena/types"
    "github.com/aws/aws-sdk-go-v2/service/dynamodb"
    "log"
    "strconv"
    "time"
)

const REGION = "<region>"
const OUTPUT_BUCKET = "s3://<bucket-name>/"
const COUNTRY_USERS_QUERY = "SELECT country, count(*) as hits FROM <tablename>  group by country order by hits desc"
const REQUEST_DATE_QUERY = "select requestdate, count(*) as users from <tablename>  where requestdate > current_date - interval '30' day group by requestdate order by requestdate asc"

func extractData(athenaExecution *athena.StartQueryExecutionOutput,
    client *athena.Client,
    dynamodbClient *dynamodb.Client,
    calculateTotalUsers bool) {
    executionId := *athenaExecution.QueryExecutionId
    var qrop *athena.GetQueryExecutionOutput
    var err error
    for {
        qrop, err = client.GetQueryExecution(context.TODO(), &athena.GetQueryExecutionInput{
            QueryExecutionId: aws.String(executionId),
        })
        if err != nil {
            log.Print(err)
            return
        }
        if qrop.QueryExecution.Status.State != "RUNNING" && qrop.QueryExecution.Status.State != "QUEUED" {
            break
        }
        time.Sleep(2 * time.Second)
    }
    if qrop.QueryExecution.Status.State == "SUCCEEDED" {
        data, err := client.GetQueryResults(context.TODO(), &athena.GetQueryResultsInput{
            QueryExecutionId: aws.String(executionId),
        })
        if err != nil {
            log.Print(err)
            return
        }
        athenaData := make(map[string]string, 0)
        totalUsers := 0
        for _, item := range data.ResultSet.Rows[1:] {
            athenaData[*item.Data[0].VarCharValue] = *item.Data[1].VarCharValue
            userCountInt, _ := strconv.Atoi(*item.Data[1].VarCharValue)
            totalUsers += userCountInt
        }
        tableName := "<tablename>"
        key, err := attributevalue.MarshalMap(map[string]string{
            "_id": tableName,
        })
        if err != nil {
            log.Print(err)
        }
        jsonData, err := json.Marshal(athenaData)
        if err != nil {
            log.Print(err)
        }
        var upd expression.UpdateBuilder
        location, _ := time.LoadLocation("<timezone>") // placeholder: an IANA time zone name
        now := time.Now().In(location)
        layout := "2006-01-02 15:04:05"
        if calculateTotalUsers {
            upd = expression.Set(expression.Name("totalUsers"), expression.Value(totalUsers)).
                Set(expression.Name("usersByCountry"), expression.Value(string(jsonData))).
                Set(expression.Name("last_updated"), expression.Value(now.Format(layout)))
        } else {
            upd = expression.Set(expression.Name("monthlyUsers"), expression.Value(string(jsonData))).
                Set(expression.Name("last_updated"), expression.Value(now.Format(layout)))
        }
        expr, err := expression.NewBuilder().WithUpdate(upd).Build()
        if err != nil {
            log.Print(err)
        }
        _, err = dynamodbClient.UpdateItem(context.TODO(), &dynamodb.UpdateItemInput{
            Key:                       key,
            TableName:                 aws.String(tableName),
            ExpressionAttributeNames:  expr.Names(),
            ExpressionAttributeValues: expr.Values(),
            UpdateExpression:          expr.Update(),
        })
        if err != nil {
            log.Print(err)
        }
        return
    } else {
        log.Print(qrop.QueryExecution.Status.State)
    }
}

func handler() {
    cfg, err := config.LoadDefaultConfig(context.TODO(), func(o *config.LoadOptions) error {
        o.Region = REGION
        return nil
    })
    if err != nil {
        log.Print(err)
        return
    }
    client := athena.NewFromConfig(cfg)
    dynamodbClient := dynamodb.NewFromConfig(cfg)

    resultConfig := &types.ResultConfiguration{
        OutputLocation: aws.String(OUTPUT_BUCKET),
    }

    executeCountryUsersParams := &athena.StartQueryExecutionInput{
        QueryString:         aws.String(COUNTRY_USERS_QUERY),
        ResultConfiguration: resultConfig,
    }

    executeMonthlyUsersParams := &athena.StartQueryExecutionInput{
        QueryString:         aws.String(REQUEST_DATE_QUERY),
        ResultConfiguration: resultConfig,
    }

    // Start Query Execution
    athenaCountryUsersExecution, err := client.StartQueryExecution(context.TODO(), executeCountryUsersParams)
    if err != nil {
        log.Print(err)
        return
    }
    athenaMonthlyUsersExecution, err := client.StartQueryExecution(context.TODO(), executeMonthlyUsersParams)

    if err != nil {
        log.Print(err)
        return
    }
    extractData(athenaCountryUsersExecution, client, dynamodbClient, true)
    extractData(athenaMonthlyUsersExecution, client, dynamodbClient, false)

}

func main() {
    lambda.Start(handler)
}
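The aggregation step inside extractData can also be pulled out into a small pure function, which makes it easy to check without touching Athena or DynamoDB. The sketch below is one possible shape for it, not the code used above: sumCounts is a hypothetical helper, the rows are made up, and unlike the handler it reports a count that is not a number instead of silently treating it as zero.

```go
package main

import (
    "fmt"
    "strconv"
)

// sumCounts turns (label, count) rows into a map and a grand total.
// The header row is assumed to have been removed already.
func sumCounts(rows [][2]string) (map[string]int, int, error) {
    byLabel := make(map[string]int, len(rows))
    total := 0
    for _, r := range rows {
        n, err := strconv.Atoi(r[1])
        if err != nil {
            return nil, 0, fmt.Errorf("bad count %q for %q: %w", r[1], r[0], err)
        }
        byLabel[r[0]] += n
        total += n
    }
    return byLabel, total, nil
}

func main() {
    // Made-up rows shaped like the result of the country query.
    rows := [][2]string{{"India", "12"}, {"Germany", "5"}, {"Canada", "3"}}
    byCountry, total, err := sumCounts(rows)
    fmt.Println(byCountry, total, err) // prints map[Canada:3 Germany:5 India:12] 20 <nil>
}
```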


Now that the aggregate result is in DynamoDB, I plan to create a simple dashboard using Next.js and Tailwind to display the data.
In this little endeavour I used 7 AWS services. Overkill? Maybe, but it is a good way to learn about AWS services.
