DEV Community

Dsysd Dev

How I Implemented a Cost-Effective and Scalable AWS Alternative to Kafka in Golang

Pre-requisites

To follow this post, you need to understand the difference between the publisher-subscriber and producer-consumer patterns.

If you are not familiar with it, worry not!

I got you covered, read this post first.

All of the code is open-sourced at: https://github.com/dsysd-dev/hake

Let’s Dive in

As we discussed before, we can build a very scalable alternative to Kafka on AWS by combining SNS and SQS.

In this setup, a Kafka topic maps to an SNS topic, and each consumer group maps to an SQS queue subscribed to that topic.
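
To make the mapping concrete, here is a minimal in-memory sketch of the fan-out behaviour. It does not call AWS: TopicModel and Queue are hypothetical stand-ins for an SNS topic and an SQS queue, and the model ignores real-world details such as at-least-once delivery and visibility timeouts.

```go
package main

import "fmt"

// Queue is a hypothetical stand-in for an SQS queue (one per consumer group).
type Queue struct {
	Name     string
	messages []string
}

// Receive pops the oldest message. Workers that share a queue compete for
// messages, so in this simplified model each message is handed out once.
func (q *Queue) Receive() (string, bool) {
	if len(q.messages) == 0 {
		return "", false
	}
	msg := q.messages[0]
	q.messages = q.messages[1:]
	return msg, true
}

// TopicModel is a hypothetical stand-in for an SNS topic.
type TopicModel struct {
	Name        string
	subscribers []*Queue
}

// Subscribe attaches a queue (consumer group) to the topic.
func (t *TopicModel) Subscribe(q *Queue) {
	t.subscribers = append(t.subscribers, q)
}

// Publish copies the message into every subscribed queue (fan-out).
func (t *TopicModel) Publish(msg string) {
	for _, q := range t.subscribers {
		q.messages = append(q.messages, msg)
	}
}

func main() {
	orders := &TopicModel{Name: "orders"}
	billing := &Queue{Name: "billing"}
	shipping := &Queue{Name: "shipping"}
	orders.Subscribe(billing)
	orders.Subscribe(shipping)

	orders.Publish("order-1")

	// Each consumer group receives its own copy of the message.
	fromBilling, _ := billing.Receive()
	fromShipping, _ := shipping.Receive()
	fmt.Println(fromBilling, fromShipping)

	// Within a group, the message is gone once one worker has taken it.
	_, ok := billing.Receive()
	fmt.Println(ok)
}
```

Every queue gets a copy of each message (publish-subscribe across groups), while workers reading the same queue split the messages between them (producer-consumer within a group).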

Server Struct

// Server is hake server side code
type Server struct {
 // nc is sns client to interact with aws sns
 nc *sns.SNS

 // qc is sqs client to interact with aws sqs
 qc *sqs.SQS

 // tc is sts client to fetch account details for formulating ARNs
 tc *sts.STS

 region AwsRegion

 mu      sync.Mutex // for concurrent access to the vars below
 account AwsAccount

 accessKey AwsAccessKey
 secretKey AwsSecretKey
}

The Server struct represents the server-side component of the system. It bundles the clients for the AWS services it talks to: SNS (Simple Notification Service), SQS (Simple Queue Service), and STS (Security Token Service).

The Server struct has the following fields:

  1. nc: An SNS client used to interact with AWS SNS. SNS is a publish-subscribe messaging service that delivers each published message to every subscriber of a topic, such as SQS queues, HTTP endpoints, email addresses, and mobile push endpoints.
  2. qc: An SQS client used to interact with AWS SQS. SQS is a message queue service that lets you decouple and scale microservices, distributed systems, and serverless applications.
  3. tc: An STS client used to fetch account details for formulating ARNs (Amazon Resource Names). ARNs are unique identifiers for AWS resources, such as SNS topics and SQS queues.
  4. region: An AwsRegion value naming the AWS region that the clients connect to.
  5. mu: A sync.Mutex that guards concurrent access to the account, accessKey, and secretKey fields declared below it.
  6. account: An AwsAccount type that represents the AWS account associated with the Server instance.
  7. accessKey: An AwsAccessKey type that represents the access key associated with the Server instance.
  8. secretKey: An AwsSecretKey type that represents the secret key associated with the Server instance.

The Server struct is used to encapsulate the logic for interacting with AWS services in a single component, which can then be easily reused and extended in other parts of the system.

The nc, qc, and tc fields are clients for interacting with specific AWS services, while the region, account, accessKey, and secretKey fields represent the configuration for the AWS services.

The mu field ensures that the fields declared after it (account, accessKey, and secretKey) are accessed in a thread-safe manner.
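
The post does not show how the account field gets populated, so the following is only a sketch of one way a mutex-guarded field like this is commonly used: the value is fetched once and then cached. The accountCache type and its fetch function are hypothetical; in the real server the lookup would presumably go through the STS client.

```go
package main

import (
	"fmt"
	"sync"
)

// accountCache mirrors the mu/account pair from Server.
// It is a hypothetical illustration, not code from the hake repository.
type accountCache struct {
	mu      sync.Mutex
	account string

	// fetch stands in for a remote lookup (for example an STS call).
	fetch func() (string, error)
}

// Account returns the cached account ID, calling fetch only on first use.
func (c *accountCache) Account() (string, error) {
	c.mu.Lock()
	defer c.mu.Unlock()

	if c.account != "" {
		return c.account, nil
	}
	id, err := c.fetch()
	if err != nil {
		return "", err
	}
	c.account = id
	return id, nil
}

func main() {
	calls := 0
	c := &accountCache{fetch: func() (string, error) {
		calls++ // only ever runs while c.mu is held
		return "123456789012", nil
	}}

	// Several goroutines ask for the account at the same time.
	var wg sync.WaitGroup
	for i := 0; i < 8; i++ {
		wg.Add(1)
		go func() {
			defer wg.Done()
			if _, err := c.Account(); err != nil {
				panic(err)
			}
		}()
	}
	wg.Wait()

	id, _ := c.Account()
	fmt.Println(id, calls) // fetch ran once despite concurrent callers
}
```

Holding the lock across the fetch keeps the sketch simple and guarantees a single lookup, at the cost of blocking other callers while the first lookup is in flight.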

NewServer Function

// creates a new hake server instance
func NewServer(opts ...ServerOption) *Server {
 // create clients
 s := &Server{region: "ap-south-1"}

 for _, opt := range opts {
  opt(s)
 }

 credsConfig := aws.NewConfig()
 if s.accessKey != "" && s.secretKey != "" {
  credsConfig = credsConfig.WithCredentials(
   credentials.NewStaticCredentials(
    s.accessKey.String(),
    s.secretKey.String(),
    "",
   ),
  )
 }

 sess, err := session.NewSession(
  aws.NewConfig().WithRegion(string(s.region)),
  credsConfig,
 )
 s.secretKey = AwsSecretKey("") // drop the Server's copy; the session's credentials provider still holds the key

 if err != nil {
  panic(err)
 }

 s.nc = sns.New(sess)
 s.qc = sqs.New(sess)
 s.tc = sts.New(sess)

 return s
}

The NewServer function creates a new Server instance with the specified ServerOption arguments. Here's a step-by-step breakdown of what's happening in the function:

  • The function starts by creating a new Server instance with the default region set to ap-south-1. This region can be overridden by passing an appropriate ServerOption argument.

  • Next, the function loops through the ServerOption arguments and applies each one to the Server instance by calling opt(s). These options configure the Server instance beyond the default settings.

  • After applying the options, the function builds two aws.Config values: one carrying the region and one carrying credentials. If both the accessKey and secretKey fields are set, the credentials config is given static credentials created with credentials.NewStaticCredentials. If they are not set, the SDK falls back to its default credential chain (environment variables, the shared credentials file, or an attached IAM role).

  • The function then calls session.NewSession with both configs, which merges them into a single Session. The Session holds the shared configuration from which the service clients are created.

  • Right after the session is created, the function sets the secretKey field of the Server instance to an empty string. This drops the Server's own reference to the secret so that it no longer lingers on the struct. It does not wipe the key from memory: the session's credentials provider still holds it in order to sign requests.

  • If there was an error creating the Session, the function panics with the error.

  • Otherwise, the function creates the sns, sqs, and sts clients from the Session and sets them on the Server instance. These clients are used to interact with the SNS, SQS, and STS services respectively.
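
The option handling described above follows the functional options pattern. The sketch below shows the pattern in isolation with a stripped-down config type instead of the real Server. The option names WithRegion and WithStaticCredentials are hypothetical; the actual options in the repository may be named and shaped differently.

```go
package main

import "fmt"

// config is a stripped-down, hypothetical stand-in for Server.
type config struct {
	region    string
	accessKey string
	secretKey string
}

// Option has the same shape as ServerOption: a function that mutates
// the instance being built.
type Option func(*config)

// WithRegion overrides the default region (hypothetical option name).
func WithRegion(region string) Option {
	return func(c *config) { c.region = region }
}

// WithStaticCredentials sets an access/secret key pair (hypothetical option name).
func WithStaticCredentials(accessKey, secretKey string) Option {
	return func(c *config) {
		c.accessKey = accessKey
		c.secretKey = secretKey
	}
}

// newConfig starts from defaults and applies the options in order,
// the same way NewServer does.
func newConfig(opts ...Option) *config {
	c := &config{region: "ap-south-1"}
	for _, opt := range opts {
		opt(c)
	}
	return c
}

func main() {
	fmt.Println(newConfig().region)                        // default region
	fmt.Println(newConfig(WithRegion("eu-west-1")).region) // overridden region
}
```

Because options are applied in order, a later option overrides an earlier one that touches the same field, and callers that pass no options get the defaults.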

CreateTopic Method

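// CreateTopic creates a new Kafka-like topic backed by an SNS topic.
// The SNS CreateTopic call is idempotent: if the account already owns a
// topic with this name, the existing topic's ARN is returned.
func (s *Server) CreateTopic(topic Topic) (topicARN string, err error) {
 out, err := s.nc.CreateTopic(&sns.CreateTopicInput{Name: aws.String(topic.String())})
 if err != nil {
  return "", err
 }

 // aws.StringValue returns "" instead of panicking if TopicArn is nil.
 return aws.StringValue(out.TopicArn), nil
}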

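The CreateTopic method takes a Topic and creates an SNS topic with the given name. The underlying SNS call is idempotent: if the account already owns a topic with that name, the existing topic's ARN is returned.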

It returns the topic’s Amazon Resource Name (ARN), which is a unique identifier for the topic in AWS.
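
For reference, SNS and SQS ARNs share the layout arn:partition:service:region:account-id:resource-name. The sketch below builds ARNs of that shape. The buildArn function is hypothetical and is not the repository's Arn helper; it also assumes the standard "aws" partition, which does not hold for the China or GovCloud regions.

```go
package main

import "fmt"

// buildArn formats an ARN for a named SNS or SQS resource.
// Hypothetical helper for illustration; assumes the "aws" partition.
func buildArn(service, region, account, name string) string {
	return fmt.Sprintf("arn:aws:%s:%s:%s:%s", service, region, account, name)
}

func main() {
	// The account ID and resource names below are made-up examples.
	fmt.Println(buildArn("sns", "ap-south-1", "123456789012", "orders"))
	fmt.Println(buildArn("sqs", "ap-south-1", "123456789012", "orders-billing"))
}
```

This is why the server needs the account ID from STS: with the region, account, and name known, an ARN can be computed locally without an extra lookup call.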

CreateSubscriberQueue Method

// Creates subscriber to a topic with correct policies
func (s *Server) CreateSubscriberQueue(topic Topic, queueName string) (err error) {
 // 1. create queue with correct policy
 _, err = s.qc.CreateQueue(&sqs.CreateQueueInput{
  QueueName: aws.String(queueName),
  Attributes: map[string]*string{
   "DelaySeconds":      aws.String("0"),
   "VisibilityTimeout": aws.String("120"), // 2min
   "Policy":            aws.String(s.Policy(topic, queueName)),
  },
 })

 if err != nil {
  return err
 }

 // 2. create subscription to the topic
 _, err = s.nc.Subscribe(&sns.SubscribeInput{
  Endpoint: aws.String(s.Arn(Sqs, queueName)),
  Protocol: aws.String(Sqs),
  TopicArn: aws.String(s.Arn(Sns, topic.String())),
  Attributes: map[string]*string{
   "RawMessageDelivery": aws.String("true"),
  },
 })
 if err != nil {
  log.Printf("err while creating subscription: %s\n", err)
  return err
 }

 return nil

}

The CreateSubscriberQueue method takes a Topic and a queueName string as input, and creates an SQS queue with the given queue name.

It then creates a subscription to the SNS topic identified by the Topic struct, with the SQS queue as the endpoint for the subscription.

This means that any messages published to the SNS topic will be delivered to the SQS queue.

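The queue is created with no delivery delay, a 120-second visibility timeout, and an access policy that allows the topic to send messages to it. The subscription enables raw message delivery, so consumers receive the published payload as-is rather than wrapped in the SNS JSON envelope.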
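
The Policy helper is not shown in this post. As a rough sketch of what such a queue policy usually looks like, the code below generates a policy document that lets the SNS service send messages to one queue, restricted to one source topic. The queuePolicy function and its types are hypothetical, and the repository's Policy method may produce a different document.

```go
package main

import (
	"encoding/json"
	"fmt"
)

// statement and policy model the parts of an IAM policy document used here.
type statement struct {
	Effect    string                       `json:"Effect"`
	Principal map[string]string            `json:"Principal"`
	Action    string                       `json:"Action"`
	Resource  string                       `json:"Resource"`
	Condition map[string]map[string]string `json:"Condition"`
}

type policy struct {
	Version   string      `json:"Version"`
	Statement []statement `json:"Statement"`
}

// queuePolicy returns a JSON policy allowing SNS to call sqs:SendMessage on
// queueArn, but only for messages that originate from topicArn.
// Hypothetical helper for illustration.
func queuePolicy(topicArn, queueArn string) (string, error) {
	p := policy{
		Version: "2012-10-17",
		Statement: []statement{{
			Effect:    "Allow",
			Principal: map[string]string{"Service": "sns.amazonaws.com"},
			Action:    "sqs:SendMessage",
			Resource:  queueArn,
			Condition: map[string]map[string]string{
				"ArnEquals": {"aws:SourceArn": topicArn},
			},
		}},
	}
	b, err := json.Marshal(p)
	if err != nil {
		return "", err
	}
	return string(b), nil
}

func main() {
	// The ARNs below are made-up examples.
	doc, err := queuePolicy(
		"arn:aws:sns:ap-south-1:123456789012:orders",
		"arn:aws:sqs:ap-south-1:123456789012:orders-billing",
	)
	if err != nil {
		panic(err)
	}
	fmt.Println(doc)
}
```

Without a policy along these lines, the subscription can be created successfully but SNS is not permitted to deliver into the queue, so messages never arrive.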

SendMessageOnTopic Method

// maxMessageBytes is the largest message SNS accepts in one publish call (256 KB).
const maxMessageBytes = 256 * 1024

func (s *Server) SendMessageOnTopic(topic Topic, reader io.Reader) (string, error) {

 // create topic arn
 topicArn := s.Arn(Sns, topic.String())

 // Read the whole payload (io.ReadAll needs Go 1.16+); a single Read call
 // may return only part of it. Reading one byte past the limit lets us
 // detect oversized payloads without buffering them entirely.
 body, err := io.ReadAll(io.LimitReader(reader, maxMessageBytes+1))
 if err != nil {
  return "", err
 }
 if len(body) > maxMessageBytes {
  return "", fmt.Errorf("message exceeds %d bytes", maxMessageBytes)
 }

 out, err := s.nc.Publish(&sns.PublishInput{
  Message:  aws.String(string(body)),
  TopicArn: aws.String(topicArn),
 })
 if err != nil {
  return "", err
 }

 return aws.StringValue(out.MessageId), nil
}

SendMessageOnTopic is a method on the Server struct that publishes a message to a given topic in AWS SNS (Simple Notification Service).

The method takes two arguments:

  • topic of type Topic: the topic to publish the message to.
  • reader of type io.Reader: the source of the message body.

The method returns two values:

  • string: the unique message ID assigned by AWS.
  • error: any error encountered while reading or publishing the message.

The method first builds the topic ARN (Amazon Resource Name) by calling the Arn() method on the Server struct with the arguments Sns and topic.String().

It then reads the full message from the reader. The read is capped at 256 KB, the maximum size SNS accepts for a single message, and larger payloads are rejected before any call to AWS is made.

Finally, it calls the Publish() method on the nc (SNS client) field of the Server struct, passing the message and the topic ARN. If the message is published successfully, the method returns the message ID; otherwise, it returns the error.

That's it. There are a few helper functions as well (such as Arn and Policy), but understanding them is left as an exercise for the reader.

Many features are still under development; refer to the Git repository for the current state.

Claps Please!

If you found this article helpful, I would appreciate some claps 👏👏👏👏. It motivates me to write more such useful articles in the future.

Follow me for regular awesome content and insights.

Subscribe to my Newsletter

If you like my content, consider subscribing to my free newsletter to get exclusive educational, technical, and career-related content delivered directly to your inbox.

https://dsysd.beehiiv.com/subscribe

Important Links

Thanks for reading the post, be sure to follow the links below for even more awesome content in the future.

Twitter: https://twitter.com/dsysd_dev
Youtube: https://www.youtube.com/@dsysd-dev
Github: https://github.com/dsysd-dev
Medium: https://medium.com/@dsysd-dev
Email: dsysd.mail@gmail.com
Linkedin: https://www.linkedin.com/in/dsysd-dev/
Newsletter: https://dsysd.beehiiv.com/subscribe
Gumroad: https://dsysd.gumroad.com/
Dev.to: https://dev.to/dsysd_dev/

Top comments (2)

Albert Rio

The Search for an Alternative - After thorough research and testing by a number of Golang developers, we came across an excellent alternative called NATS Streaming. NATS Streaming is an open-source, high-performance messaging system that provides reliable message streaming for distributed systems. It offers many of the same features as Kafka, such as publish-subscribe messaging and fault-tolerant data replication, but without the dependency on AWS.

Implementation with Golang - Implementing NATS Streaming in Golang was a breeze. The NATS Streaming client library for Golang provides a simple and intuitive API that allowed me to quickly integrate it into my existing codebase. With just a few lines of code, I was able to establish connections, publish messages, and consume streams of data.

Dsysd Dev

dev.to/dsysd_dev/deploying-a-go-ap...

I wrote a blog post on that too. This post was about a more AWS-native alternative.