Background
💎 This example demonstrates how to implement graceful shutdown for an asynchronous process that handles heavy email delivery tasks using SQS.
💎 In the case of an HTTP server, you can simply rely on the standard library's Server.Shutdown() method to stop the server safely. However, since this is an asynchronous queue processor (not an HTTP server), we can't use that mechanism, so we need to implement graceful shutdown manually.
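For contrast, the HTTP case mentioned above can be sketched in a few lines. This is only an illustration and not part of the SQS worker: the helper name serveAndShutdown, the loopback address, and the 5-second timeout are assumptions chosen for the example.

```go
package main

import (
	"context"
	"errors"
	"fmt"
	"net"
	"net/http"
	"time"
)

// serveAndShutdown (hypothetical helper) starts an HTTP server on a free
// loopback port, stops it with Shutdown, and reports whether Serve returned
// http.ErrServerClosed, which is what a clean stop looks like.
func serveAndShutdown() (bool, error) {
	ln, err := net.Listen("tcp", "127.0.0.1:0")
	if err != nil {
		return false, err
	}
	srv := &http.Server{Handler: http.NotFoundHandler()}

	serveErr := make(chan error, 1)
	go func() { serveErr <- srv.Serve(ln) }()

	// Shutdown stops accepting new connections and waits for active
	// requests to finish, up to the context deadline (assumed: 5 seconds).
	ctx, cancel := context.WithTimeout(context.Background(), 5*time.Second)
	defer cancel()
	if err := srv.Shutdown(ctx); err != nil {
		return false, err
	}
	return errors.Is(<-serveErr, http.ErrServerClosed), nil
}

func main() {
	clean, err := serveAndShutdown()
	fmt.Println("clean shutdown:", clean, "error:", err)
}
```

A queue worker has no equivalent of Shutdown that knows about its in-flight jobs, which is why the rest of this post tracks them by hand.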
Why graceful shutdown is necessary
When a container is terminated, we want to ensure that some ongoing processes finish properly before the container actually stops.
SQS workers often execute long-running jobs. Some may take several minutes, or even close to 1 hour.
Without graceful shutdown, if a deployment happens while a worker is still processing a message, the process might be killed in the middle of the execution.
How to implement
We can handle graceful shutdown by responding to system signals.
- SIGTERM: sent when the container is terminated (e.g., during a deployment)
- SIGINT: sent when the process is manually stopped (e.g., Ctrl+C)
If you handle these signals, then no matter when a deployment happens, the process always waits for all ongoing tasks to finish before exiting, allowing the system to shut down safely and gracefully.
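As a minimal sketch of the signal handling described above, the standard library's signal.NotifyContext (Go 1.16+) can tie both signals to a context that is cancelled when either one arrives. The helper names cancelledBy and demo are hypothetical, and the process signals itself with syscall.Kill only to make the demo self-contained; that call is Unix-only.

```go
package main

import (
	"context"
	"fmt"
	"os"
	"os/signal"
	"syscall"
	"time"
)

// cancelledBy (hypothetical helper) reports whether a context created with
// signal.NotifyContext is cancelled after this process receives sig.
func cancelledBy(sig syscall.Signal) bool {
	ctx, stop := signal.NotifyContext(context.Background(), syscall.SIGINT, syscall.SIGTERM)
	defer stop() // unregister the signal handlers when done

	// Stand-in for the orchestrator (SIGTERM) or Ctrl+C (SIGINT):
	// send the signal to our own process. Unix-only.
	if err := syscall.Kill(os.Getpid(), sig); err != nil {
		return false
	}

	select {
	case <-ctx.Done():
		return true
	case <-time.After(2 * time.Second):
		return false
	}
}

// demo checks both signals in turn.
func demo() (term, intr bool) {
	return cancelledBy(syscall.SIGTERM), cancelledBy(syscall.SIGINT)
}

func main() {
	term, intr := demo()
	fmt.Println("SIGTERM cancels ctx:", term)
	fmt.Println("SIGINT cancels ctx:", intr)
}
```

Because the signals are registered before they are sent, the process is not killed by them; it only observes the cancellation and can then decide how to wind down.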
Example Implementation
I will show the flow and the code below. 🚀
Startup
↓
Create SQS session
↓
ReceiveMessage in a for loop
↓
Process each message in a goroutine
↓
SIGINT / SIGTERM received → cancel()
↓
Exit loop & wait for all wg to finish
↓
Log output and perform graceful shutdown
// main.go
// Body of a function that returns error; xxx, yyy and zzz are placeholders.
sigChan := make(chan os.Signal, 1)
signal.Notify(sigChan, syscall.SIGINT, syscall.SIGTERM)
ctx, cancel := context.WithCancel(context.Background())
defer cancel()
var wg sync.WaitGroup
// Listen for signals in a goroutine; cancelling ctx tells the receive loop to stop
go func() {
	<-sigChan
	cancel()
}()
// Initialize AWS session (named sess so it does not shadow the session package)
sess, err := session.NewSession(&aws.Config{
	Region: aws.String("your-region"),
})
if err != nil {
	return errors.Wrap(err, "session.NewSession")
}
client := sqs.New(sess)
// Define SQS receive message configuration
receiveMessageInput := &sqs.ReceiveMessageInput{
	QueueUrl:              xxx,
	MaxNumberOfMessages:   yyy,
	MessageAttributeNames: zzz,
}
for {
	select {
	case <-ctx.Done():
		// Context is cancelled: stop receiving, then wait for every
		// in-flight message to finish before returning
		wg.Wait()
		log.Println("all in-flight messages finished, shutting down")
		return nil
	default:
		result, err := client.ReceiveMessage(receiveMessageInput)
		if err != nil {
			time.Sleep(1 * time.Second) // Retry after a short delay
			continue
		}
		if len(result.Messages) == 0 {
			continue
		}
		for _, message := range result.Messages {
			wg.Add(1)
			go func(msg *sqs.Message) {
				// Without Done, wg.Wait() would block forever
				defer wg.Done()
				// Process received messages
			}(message)
		}
	}
}
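To try the same receive-and-drain pattern without an AWS account, the sketch below replaces SQS with a fake queue. Everything here is an assumption for illustration: runWorker, simulate, the receive and handle callbacks, and the millisecond timings are hypothetical stand-ins, and a timer plays the role of SIGTERM.

```go
package main

import (
	"context"
	"fmt"
	"sync"
	"sync/atomic"
	"time"
)

// runWorker polls receive until ctx is cancelled, handles each message in
// its own goroutine, and returns only after every started handler is done.
func runWorker(ctx context.Context, receive func() []string, handle func(string)) {
	var wg sync.WaitGroup
	for {
		select {
		case <-ctx.Done():
			wg.Wait() // drain in-flight messages before returning
			return
		default:
			for _, msg := range receive() {
				wg.Add(1)
				go func(m string) {
					defer wg.Done()
					handle(m)
				}(msg)
			}
		}
	}
}

// simulate runs the worker against a fake queue, cancels it while jobs are
// still running, and returns how many jobs started and how many finished.
func simulate() (int64, int64) {
	var started, finished int64
	ctx, cancel := context.WithCancel(context.Background())
	defer cancel()

	receive := func() []string {
		time.Sleep(10 * time.Millisecond) // stand-in for one poll
		return []string{"a", "b"}
	}
	handle := func(string) {
		atomic.AddInt64(&started, 1)
		time.Sleep(50 * time.Millisecond) // stand-in for a slow job
		atomic.AddInt64(&finished, 1)
	}

	// Stand-in for SIGTERM arriving mid-flight.
	time.AfterFunc(30*time.Millisecond, cancel)
	runWorker(ctx, receive, handle)
	return atomic.LoadInt64(&started), atomic.LoadInt64(&finished)
}

func main() {
	started, finished := simulate()
	fmt.Printf("started=%d finished=%d\n", started, finished)
}
```

The two counters should always match: cancellation stops new polls, but every job that was already started runs to completion before runWorker returns.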