Pre-requisites
To understand this post, you will need to understand the differences between publisher-subscriber and producer-consumer pattern.
If you are not familiar with it, worry not!
I got you covered, read this post first.
All of the code is open-sourced at: https://github.com/dsysd-dev/hake
Let’s Dive in
As we discussed before, we can build a very scalable alternative to Kafka on AWS using SNS + SQS combination.
In this setting, a topic is equivalent to SNS topic while consumer groups are equivalent to SQS queues.
Server Struct
// Server is hake server side code
// Server is hake server side code
type Server struct {
// nc is sns client to interact with aws sns
nc *sns.SNS
// qc is sqs client to interact with aws sns
qc *sqs.SQS
// tc is sts client to fetch account details for formulating ARNs
tc *sts.STS
region AwsRegion
mu sync.Mutex // for concurrent access to the vars below
account AwsAccount
accessKey AwsAccessKey
secretKey AwsSecretKey
}
The Server struct is a type that represents a server-side component of a system that interacts with AWS services such as SNS (Simple Notification Service), SQS (Simple Queue Service), and STS (Security Token Service).
The Server struct has the following fields:
-
nc
: An SNS client that is used to interact with AWS SNS. SNS is a messaging service that enables you to send push notifications to mobile devices, email addresses, and other endpoints. -
qc
: An SQS client that is used to interact with AWS SQS. SQS is a message queue service that enables you to decouple and scale microservices, distributed systems, and serverless applications. -
tc
: An STS client that is used to fetch account details for formulating ARNs (Amazon Resource Names). ARNs are unique identifiers for AWS resources, such as SNS topics and SQS queues. -
region
: An AwsRegion type that represents the AWS region where the Server instance is deployed. -
mu
: A sync.Mutex field that is used for concurrent access to the account, accessKey, and secretKey fields. -
account
: An AwsAccount type that represents the AWS account associated with the Server instance. -
accessKey
: An AwsAccessKey type that represents the access key associated with the Server instance. -
secretKey
: An AwsSecretKey type that represents the secret key associated with the Server instance.
The Server struct is used to encapsulate the logic for interacting with AWS services in a single component, which can then be easily reused and extended in other parts of the system.
The nc
, qc
, and tc
fields are clients for interacting with specific AWS services, while the region
, account
, accessKey
, and secretKey
fields represent the configuration for the AWS services.
The mu
field is used to ensure that the account field is accessed in a thread-safe manner.
NewServer
Function
// creates a new hake server instance
func NewServer(opts ...ServerOption) *Server {
// create clients
s := &Server{region: "ap-south-1"}
for _, opt := range opts {
opt(s)
}
credsConfig := aws.NewConfig()
if s.accessKey != "" && s.secretKey != "" {
credsConfig = credsConfig.WithCredentials(
credentials.NewStaticCredentials(
s.accessKey.String(),
s.secretKey.String(),
"",
),
)
}
sess, err := session.NewSession(
aws.NewConfig().WithRegion(string(s.region)),
credsConfig,
)
s.secretKey = AwsSecretKey("") // remove from memory and let GC do its work
if err != nil {
panic(err)
}
s.nc = sns.New(sess)
s.qc = sqs.New(sess)
s.tc = sts.New(sess)
return s
}
The NewServer function creates a new Server instance with the specified ServerOption
arguments. Here's a step-by-step breakdown of what's happening in the function:
The function starts by creating a new Server instance with the default region set to
ap-south-1
. This region can be overridden by passing an appropriateServerOption
argument.Next, the function loops through the
ServerOption
arguments passed to the function and applies each one to the Server instance using the opt(s) syntax. These options can be used to further configure the Server instance beyond the default settings.After applying the options, the function creates a new
aws.Config
instance with the appropriate region and credentials. If theaccessKey
andsecretKey
fields of the Server instance are set, the function creates a newcredentials.NewStaticCredentials
instance with those credentials and adds it to theaws.Config
instance. This allows the Session to authenticate with AWS using the specified access and secret keys.The function then creates a new Session instance using the
aws.Config
object created in the previous step. The Session object is the main entry point for interacting with AWS services in Go.If there was an error creating the Session, the function panics with the error.
Otherwise, the function creates new sns, sqs, and sts clients using the Session object, and sets them on the Server instance. These clients are used to interact with the SNS, SQS, and STS services respectively.
Finally, the function sets the
AwsSecretKey
field of the Server instance to an empty string, which effectively removes the secret key from memory and lets the garbage collector free up the memory. This is a security measure to prevent sensitive data from lingering in memory unnecessarily.
CreateTopic
Method
// Create a new kafka like topic
func (s *Server) CreateTopic(topic Topic) (topicARN string, err error) {
// Todo: check if this is an idempotent operation or not ..
out, err := s.nc.CreateTopic(&sns.CreateTopicInput{Name: aws.String(topic.String())})
if err != nil {
return "", err
}
return *out.TopicArn, nil
}
The CreateTopic
method takes a Topic struct as input and creates an SNS topic with the given topic name.
It returns the topic’s Amazon Resource Name (ARN), which is a unique identifier for the topic in AWS.
CreateSubscriberQueue Method
// Creates subscriber to a topic with correct policies
func (s *Server) CreateSubscriberQueue(topic Topic, queueName string) (err error) {
// 1. create queue with correct policy
_, err = s.qc.CreateQueue(&sqs.CreateQueueInput{
QueueName: aws.String(queueName),
Attributes: map[string]*string{
"DelaySeconds": aws.String("0"),
"VisibilityTimeout": aws.String("120"), // 2min
"Policy": aws.String(s.Policy(topic, queueName)),
},
})
if err != nil {
return err
}
// 2. create subscription to the topic
_, err = s.nc.Subscribe(&sns.SubscribeInput{
Endpoint: aws.String(s.Arn(Sqs, queueName)),
Protocol: aws.String(Sqs),
TopicArn: aws.String(s.Arn(Sns, topic.String())),
Attributes: map[string]*string{
"RawMessageDelivery": aws.String("true"),
},
})
if err != nil {
log.Printf("err while creating subscription: %s\n", err)
return
}
return nil
}
The CreateSubscriberQueue
method takes a Topic struct and a queueName string as input, and creates an SQS queue with the given queue name.
It then creates a subscription to the SNS topic identified by the Topic struct, with the SQS queue as the endpoint for the subscription.
This means that any messages published to the SNS topic will be delivered to the SQS queue.
The method also sets the appropriate queue attributes and policies to ensure that messages are delivered correctly.
SendMessageOnTopic
Method
func (s *Server) SendMessageOnTopic(topic Topic, reader io.Reader) (string, error) {
// create topic arn
topicArn := s.Arn(Sns, topic.String())
var buffer [10 * 1024 * 1024]byte // Todo: reduce the size of this buffer
n, err := reader.Read(buffer[:])
if err != nil {
return "", err
}
out, err := s.nc.Publish(&sns.PublishInput{
Message: aws.String(string(buffer[:n])),
TopicArn: &topicArn,
})
if err != nil {
return "", err
}
return *out.MessageId, nil
}
This is a method defined on the Server struct. The purpose of this method is to send a message on a specified topic in AWS SNS (Simple Notification Service).
The method takes two arguments:
- topic of type Topic: specifies the topic to which the message needs to be sent.
- reader of type
io.Reader
: a reader that provides the message to be sent.
The method returns two values:
-
string
: the unique message ID assigned by AWS. -
error
: if there is any error encountered during the message-sending process.
The method first creates the topic ARN (Amazon Resource Name) by calling the Arn()
method on the Server struct with arguments Sns and topic.String()
.
It then reads the message from the reader provided and stores it in a buffer of size 10MB.
Finally, it calls the Publish()
method on the nc
(SNS client) field of the Server struct, passing the message to be sent and the topic ARN. If the message is successfully published, it returns the message ID, otherwise, it returns an error.
This is it, there are some helper functions but it is left as an exercise for the reader to understand them.
Also, there are still many features under development, refer to the git repository.
Claps Please!
If you found this article helpful I would appreciate some claps 👏👏👏👏, it motivates me to write more such useful articles in the future.
Follow me for regular awesome content and insights.
Subscribe to my Newsletter
If you like my content, then consider subscribing to my free newsletter, to get exclusive, educational, technical, interesting and career related content directly delivered to your inbox
Important Links
Thanks for reading the post, be sure to follow the links below for even more awesome content in the future.
Twitter: https://twitter.com/dsysd_dev
Youtube: https://www.youtube.com/@dsysd-dev
Github: https://github.com/dsysd-dev
Medium: https://medium.com/@dsysd-dev
Email: dsysd.mail@gmail.com
Linkedin: https://www.linkedin.com/in/dsysd-dev/
Newsletter: https://dsysd.beehiiv.com/subscribe
Gumroad: https://dsysd.gumroad.com/
Dev.to: https://dev.to/dsysd_dev/
Top comments (2)
The Search for an Alternative - After thorough research and testing by Nth Number of Golang developer, We came across an excellent alternative called NATS Streaming. NATS Streaming is an open-source, high-performance messaging system that provides reliable message streaming for distributed systems. It offers many of the same features as Kafka, such as publish-subscribe messaging and fault-tolerant data replication, but without the dependency on AWS.
Implementation with Golang - Implementing NATS Streaming in Golang was a breeze. The NATS Streaming client library for Golang provides a simple and intuitive API that allowed me to quickly integrate it into my existing codebase. With just a few lines of code, I was able to establish connections, publish messages, and consume streams of data.
dev.to/dsysd_dev/deploying-a-go-ap...
I wrote a blog on that too, was referring to a more
AWS
alternative