DEV Community

Cover image for Event Based System with Localstack (Elixir Edition): Notifing to SQS when a file its uploaded
Nicol Acosta
Nicol Acosta

Posted on

2

Event Based System with Localstack (Elixir Edition): Notifing to SQS when a file its uploaded

Configure SQS

Easy! if you read the past post "Uploading files to S3 with PresignedURL's" you know how to configure in the docker-compose.yml file, so add SQS to the localstack init its simple

The git diff its this

  localstack:
    image: localstack/localstack:latest
    ports:
      - 4566:4566
    environment:
      # own env vars
      BUCKET_NAME: files
+     BUCKET_QUEUE: new_files_queue
+     QUEUES: general_events_queue,example_queue
      # service env vars
      DEFAULT_REGION: us-west-2
      AWS_ACCESS_KEY_ID: test
      AWS_SECRET_ACCESS_KEY: test
-     SERVICES: s3
+     SERVICES: s3,sqs
      DISABLE_CORS_CHECKS: 1
      PROVIDER_OVERRIDE_S3: asf
      S3_SKIP_SIGNATURE_VALIDATION: 1
    volumes:
      - ./.localstack:/var/lib/localstack
      - ./init_localstack.sh:/etc/localstack/init/ready.d/init_localstack.sh
Enter fullscreen mode Exit fullscreen mode

Full code here

And add this diff of the localstack_init.sh file

...
echo "########### Setting resource names as env variables ###########"
LOCALSTACK_DUMMY_ID=000000000000

+ guess_arn_for_sqs() {
+     local QUEUE_NAME=$1
+     echo "arn:aws:sqs:${DEFAULT_REGION}:${LOCALSTACK_DUMMY_ID}:$QUEUE_+ NAME"
+ }
+ 
+ create_queue() {
+     local QUEUE_NAME_TO_CREATE=$1
+     awslocal sqs create-queue\
+         --region $DEFAULT_REGION\
+         --queue-name $QUEUE_NAME_TO_CREATE
+ }
+ 
+ echo "########### Creating upload file event SQS ###########"
+ create_queue $BUCKET_QUEUE
+ BUCKET_QUEUE_ARN=$(guess_arn_for_sqs $BUCKET_QUEUE)
+ 
+ echo "########### Creating queues in SQS ###########"
+ IFS=','
+ read -ra Queues <<< "$QUEUES"
+ for q in "${Queues[@]}";
+ do
+   create_queue $q
+ done
+ 
echo "########### Create S3 bucket ###########"
awslocal s3api create-bucket\
    --region $DEFAULT_REGION\
...
      ]
    }'

+ 
+ echo "########### Set S3 bucket notification configurations ###########"
+ aws --endpoint-url=http://localhost:4566 s3api put-bucket-notification-configuration\
+     --bucket $BUCKET_NAME\
+     --notification-configuration  '{
+                                       "QueueConfigurations": + [
+                                          {
+                                            "QueueArn": "'"$BUCKET_QUEUE_ARN"'",
+                                            "Events": ["s3:ObjectCreated:Put"]
+                                          }
+                                        ]
+                                      }'
+ 
echo "########### List S3 bucket ###########"
awslocal s3api list-buckets
+ 
+ echo "########### Get S3 bucket notification configurations ###########"
+ aws --endpoint-url=http://localhost:4566 s3api get-bucket-+ + notification-configuration\
+     --bucket $BUCKET_NAME
Enter fullscreen mode Exit fullscreen mode

Full code here

And add this config to the config.exs

aws_host = System.get_env("AWS_HOST")
aws_region = System.get_env("AWS_REGION")
aws_port = System.get_env("AWS_PORT")

config :ex_aws, :sqs,
  scheme: "http://",
  region: aws_region,
  host: aws_host,
  port: aws_port,
  base_queue_url: "http://#{aws_host}:#{aws_port}/000000000000/",
  new_files_queue: System.get_env("AWS_SQS_NEW_FILES_QUEUE"),
  general_events_queue: System.get_env("AWS_SQS_GENERAL_EVENTS_QUEUE")
Enter fullscreen mode Exit fullscreen mode

In this config we are configuring the whole AWS part and also a personal config that we will use in next steps

Listening the Queues

Now we have SQS Configured, but, who is listening?

To listen a message broker the most used library is broadway, this library helps to create GenServer's that listens a specific queue and process message by message (or by chunks).

The fisrt queue that needs to be listened it's the new files queues.

Basically get's the file data and adds a message to the next queue, see the function BroadwayGeneralEvents.insert_message(data)

defmodule EventsArqBackend.QueueWorkers.BroadwayNewFileEvents do
  @moduledoc false
  use Broadway

  alias Broadway.Message
  alias EventsArqBackend.QueueWorkers.BroadwayGeneralEvents

  def start_link(_opts) do
    {module, opts} = producer_module()
    options = opts ++ [queue_url: queue_url()]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {module, options}
      ],
      processors: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    decoded_data =
      case Jason.decode!(data) do
        %{"Message" => message} -> Jason.decode!(message)
        message -> message
      end

    %{"Records" => records} = decoded_data

    Enum.each(records, fn
      %{
        "eventName" => event_name,
        "eventSource" => "aws:s3",
        "eventTime" => event_time,
        "s3" => %{
          "bucket" => %{
            "name" => bucket_name
          },
          "object" => %{
            "eTag" => entity_id,
            "key" => object_key,
            "size" => object_size
          }
        }
      }
      when event_name in ["ObjectCreated:Put", "ObjectCreated:Post"] ->
        data = %{
          bucket_name: bucket_name,
          entity_id: entity_id,
          object_key: object_key,
          object_size: object_size,
          inserted_at: event_time
        }

        BroadwayGeneralEvents.insert_message(data)

      _ ->
        :ok
    end)

    Message.update_data(message, fn _data -> decoded_data end)
  end

  defp queue_url,
    do:
      "#{Application.get_env(:ex_aws, :sqs)[:base_queue_url]}#{Application.get_env(:ex_aws, :sqs)[:new_files_queue]}"

  defp producer_module, do: Application.get_env(:events_arq_backend, :broadway)[:producer_module]
end
Enter fullscreen mode Exit fullscreen mode

And the other queue listener

defmodule EventsArqBackend.QueueWorkers.BroadwayGeneralEvents do
  @moduledoc false
  use Broadway

  alias Broadway.Message

  def start_link(_opts) do
    {module, opts} = producer_module()
    options = opts ++ [queue_url: queue_url()]

    Broadway.start_link(__MODULE__,
      name: __MODULE__,
      producer: [
        module: {module, options}
      ],
      processors: [
        default: []
      ]
    )
  end

  @impl true
  def handle_message(_processor, %Message{data: data} = message, _context) do
    decoded_data =
      case Jason.decode!(data) do
        %{"Message" => message} -> Jason.decode!(message)
        message -> message
      end

    IO.inspect(decoded_data, label: "*****************")

    # do something with the data
    # send notification

    Message.update_data(message, fn _data -> decoded_data end)
  end

  def insert_message(data), do: SqsClient.add_message_to_queue(queue_name(), data)

  defp queue_name, do: Application.get_env(:ex_aws, :sqs)[:general_events_queue]
  defp queue_url, do: "#{Application.get_env(:ex_aws, :sqs)[:base_queue_url]}#{queue_name()}"
  defp producer_module, do: Application.get_env(:events_arq_backend, :broadway)[:producer_module]
end
Enter fullscreen mode Exit fullscreen mode

So... 2 queues for the same event? why?

Exist a reason, normally all the queues that you use in sqs are the .fifo queues, and the queue for the s3 must be a default queue (no-.fifo-queue) and that's the reason that i created 2 queues, a regular queue for s3 events and a .fifo queue for the other ones

The full code of this changes it's here

In the next post we will:

  • Create a Channel on Phoenix
  • Integrate Phoenix Channels to a React project
  • Broadcast to all connected clients for some update

See you in the next posts

Image of Docusign

🛠️ Bring your solution into Docusign. Reach over 1.6M customers.

Docusign is now extensible. Overcome challenges with disconnected products and inaccessible data by bringing your solutions into Docusign and publishing to 1.6M customers in the App Center.

Learn more

Top comments (0)

A Workflow Copilot. Tailored to You.

Pieces.app image

Our desktop app, with its intelligent copilot, streamlines coding by generating snippets, extracting code from screenshots, and accelerating problem-solving.

Read the docs