Background
This example demonstrates how to implement graceful shutdown for an asynchronous process that handles heavy email delivery tasks using SQS.
For an HTTP server, you can simply rely on the standard library's Server.Shutdown() method to stop the server safely. However, since this is an asynchronous queue processor (not an HTTP server), we can't use that mechanism, so we need to implement graceful shutdown manually.
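For contrast, here is a minimal sketch of what the HTTP case looks like with net/http. The helper name stoppedCleanly, the random loopback port, and the 5-second timeout are illustrative assumptions, not part of the SQS worker described in this post.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// stoppedCleanly (hypothetical helper) starts an HTTP server on a random
// loopback port, stops it with Server.Shutdown, and reports whether Serve
// returned http.ErrServerClosed, which is what a graceful stop produces.
func stoppedCleanly() bool {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}

	serveErr := make(chan error, 1)
	go func() { serveErr <- srv.Serve(ln) }()

	// Shutdown stops accepting new connections and waits for active
	// requests to finish, up to the context deadline.
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		return false
	}
	return errors.Is(<-serveErr, http.ErrServerClosed)
}

func main() {
	fmt.Println("server stopped cleanly:", stoppedCleanly())
}
```

A queue worker has no equivalent built-in call, which is why the rest of this post tracks in-flight work by hand.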
Why graceful shutdown is necessary
When a container is terminated, we want to ensure that some ongoing processes finish properly before the container actually stops.
SQS workers often execute long-running jobs. Some may take several minutes, or even close to an hour.
Without graceful shutdown, if a deployment happens while a worker is still processing a message, the process might be killed in the middle of the execution.
How to implement
We can handle graceful shutdown by responding to system signals.
- SIGTERM - sent when the container is terminated (e.g., during a deployment)
- SIGINT - sent when the process is manually stopped (e.g., Ctrl+C)
If you handle these signals, then no matter when a deployment happens, the process always waits for all ongoing tasks to finish before exiting, allowing the system to shut down safely and gracefully.
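As a small illustration of reacting to these signals, the sketch below uses signal.NotifyContext (available since Go 1.16) to turn SIGINT/SIGTERM into a context cancellation. This is a variation on the channel-based signal.Notify used in the example later in this post, not a replacement for it. The helper name signalCancelsContext is hypothetical, and sending SIGTERM to the current process assumes a Unix-like OS.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// signalCancelsContext (hypothetical helper) registers SIGINT/SIGTERM
// handling, sends SIGTERM to the current process, and reports whether the
// context was cancelled in response.
func signalCancelsContext() bool {
	// Register the handler first, so the signal cancels ctx instead of
	// killing the process.
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop()

	// Stand-in for the container runtime terminating this process.
	proc, err := os.FindProcess(os.Getpid())
	if err != nil {
		return false
	}
	if err := proc.Signal(syscall.SIGTERM); err != nil {
		return false
	}

	select {
	case <-ctx.Done():
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

func main() {
	fmt.Println("context cancelled by SIGTERM:", signalCancelsContext())
}
```

In a real worker, the signal would come from the orchestrator or from Ctrl+C rather than from the process itself.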
Example Implementation
I will show the flow and the code below.
Startup
↓
Create SQS session
↓
ReceiveMessage in a for loop
↓
Process each message in a goroutine
↓
SIGINT / SIGTERM received → cancel()
↓
Exit loop & wait for all wg to finish
↓
Log output and perform graceful shutdown
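The flow above can be tried without AWS access. The following is a minimal sketch under stated assumptions: a buffered channel stands in for SQS ReceiveMessage, a 20 ms sleep stands in for a long-running job, and cancel() is called directly where a real worker would receive SIGINT or SIGTERM. The function name runUntilCancelled is hypothetical.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runUntilCancelled (hypothetical helper) starts `jobs` simulated messages,
// cancels the context while they are still running, waits for them, and
// returns how many finished before the function returned.
func runUntilCancelled(jobs int) int {
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	// Stand-in for the SQS queue.
	queue := make(chan struct{}, jobs)
	for i := 0; i < jobs; i++ {
		queue <- struct{}{}
	}

	var wg sync.WaitGroup
	var finished int64

	for {
		// Stop receiving once the context is cancelled, then wait for
		// every in-flight job before returning.
		select {
		case <-ctx.Done():
			wg.Wait()
			return int(atomic.LoadInt64(&finished))
		default:
		}

		select {
		case <-queue:
			wg.Add(1)
			go func() {
				defer wg.Done()
				time.Sleep(20 * time.Millisecond) // simulated long-running job
				atomic.AddInt64(&finished, 1)
			}()
		default:
			// Queue drained: simulate a shutdown signal arriving while
			// jobs are still in flight.
			cancel()
		}
	}
}

func main() {
	fmt.Println("jobs finished before exit:", runUntilCancelled(5))
}
```

The key point is the ordering: cancellation only stops new work from being picked up, while wg.Wait() is what keeps the process alive until started jobs complete.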
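// main.go
package main

import (
	"context"
	"log"
	"os"
	"os/signal"
	"sync"
	"syscall"
	"time"

	"github.com/aws/aws-sdk-go/aws"
	"github.com/aws/aws-sdk-go/aws/session"
	"github.com/aws/aws-sdk-go/service/sqs"
	"github.com/pkg/errors"
)

func main() {
	if err := run(); err != nil {
		log.Fatal(err)
	}
}

// run holds the worker loop (the function name is illustrative).
func run() error {
	sigChan := make(chan os.Signal, 1)
	signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)

	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	var wg sync.WaitGroup

	// Listen for signals in a goroutine; cancelling ctx tells the receive loop to stop
	go func() {
		<-sigChan
		cancel()
	}()

	// Initialize AWS session
	sess, err := session.NewSession(&aws.Config{
		Region: aws.String("your-region"),
	})
	if err != nil {
		return errors.Wrap(err, "session.NewSession")
	}
	client := sqs.New(sess)

	// Define SQS receive message configuration (xxx, yyy, zzz are placeholders)
	receiveMessageInput := &sqs.ReceiveMessageInput{
		QueueUrl:              xxx,
		MaxNumberOfMessages:   yyy,
		MessageAttributeNames: zzz,
	}

	for {
		select {
		case <-ctx.Done():
			// Exit the loop if context is cancelled, but only after all
			// in-flight messages have finished processing
			wg.Wait()
			log.Println("all in-flight messages finished; shutting down")
			return nil
		default:
			result, err := client.ReceiveMessage(receiveMessageInput)
			if err != nil {
				time.Sleep(1 * time.Second) // Retry after a short delay
				continue
			}
			if len(result.Messages) == 0 {
				continue
			}
			for _, message := range result.Messages {
				wg.Add(1)
				go func(msg *sqs.Message) {
					defer wg.Done() // Mark this message as finished, even on panic or early return
					// Process received messages
				}(message)
			}
		}
	}
}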