
Minwook Je


Fluent Bit on CloudWatch

Architecture

Event format

The Fluent Bit wire protocol represents an Event as a two-element array with a nested array as the first element:

[[TIMESTAMP, METADATA], MESSAGE]
  • TIMESTAMP is a timestamp in seconds as an integer or floating point value (not a string).
  • METADATA is an object containing event metadata, and might be empty.
  • MESSAGE is an object containing the event body.
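
As a rough illustration of that layout, the Python sketch below builds one event as nested lists and unpacks it again. The helper names make_event and split_event and the sample values are hypothetical; Fluent Bit encodes real events itself, so this only mirrors the shape described above.

```python
import time
from typing import Any, Optional


def make_event(message: dict, metadata: Optional[dict] = None,
               timestamp: Optional[float] = None) -> list:
    """Build [[TIMESTAMP, METADATA], MESSAGE] (hypothetical helper)."""
    # TIMESTAMP is a number of seconds (int or float), never a string.
    ts = time.time() if timestamp is None else timestamp
    # METADATA may be empty; MESSAGE carries the event body.
    return [[ts, metadata or {}], message]


def split_event(event: list) -> tuple:
    """Unpack an event into (timestamp, metadata, message)."""
    (timestamp, metadata), message = event
    return timestamp, metadata, message


# Sample values are made up for the illustration.
event = make_event({"level": "INFO", "message": "job started"},
                   timestamp=1700000000.5)
timestamp, metadata, message = split_event(event)
print(event)
print(timestamp, metadata, message)
```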

record_accessor syntax

If you need the log group or stream name to be based on the contents of the log record itself, use record_accessor syntax.
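
To make the idea concrete, here is a toy Python resolver for accessor strings such as $kubernetes['annotations']['fluentbit/enabled'] (the one used by the grep filter in the config below). It is only a sketch of how such a path selects a nested value, not Fluent Bit's implementation; the resolve function, the sample record and the "pipeline" namespace value are assumptions made for the example.

```python
import re
from typing import Any

# Leading key ($kubernetes) and each bracketed subkey (['annotations']).
_HEAD = re.compile(r"^\$([A-Za-z0-9_.\-/]+)")
_SUBKEY = re.compile(r"\['([^']+)'\]")


def resolve(accessor: str, record: dict) -> Any:
    """Toy lookup of a record_accessor-style path in a nested dict.

    Returns None when any key along the path is missing.
    """
    head = _HEAD.match(accessor)
    if head is None:
        raise ValueError(f"not a record accessor: {accessor!r}")
    keys = [head.group(1)] + _SUBKEY.findall(accessor[head.end():])
    value: Any = record
    for key in keys:
        if not isinstance(value, dict) or key not in value:
            return None
        value = value[key]
    return value


# Hypothetical record, shaped like one enriched by the kubernetes filter.
record = {
    "level": "INFO",
    "kubernetes": {
        "namespace_name": "pipeline",
        "annotations": {"fluentbit/enabled": "true"},
    },
}

print(resolve("$kubernetes['annotations']['fluentbit/enabled']", record))
print(resolve("$kubernetes['namespace_name']", record))
print(resolve("$kubernetes['labels']['app']", record))
```

The last lookup returns None because the sample record has no labels key.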

Fluent Bit on CloudWatch

  1. If you don't already have a namespace called amazon-cloudwatch, create one by entering the following command:
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml
# create amazon-cloudwatch namespace
apiVersion: v1
kind: Namespace
metadata:
  name: amazon-cloudwatch
  labels:
    name: amazon-cloudwatch
  2. Run the following command to create a ConfigMap named fluent-bit-cluster-info with the cluster name and the Region to send logs to. Replace the ClusterName and RegionName values with your cluster's name and Region.
ClusterName=dev               # replace with your cluster name
RegionName=ap-northeast-2     # replace with your cluster's Region
FluentBitHttpPort=''
FluentBitReadFromHead='Off'
[[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off' || FluentBitReadFromTail='On'
[[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'

kubectl create configmap fluent-bit-cluster-info \
--from-literal=cluster.name=${ClusterName} \
--from-literal=http.server=${FluentBitHttpServer} \
--from-literal=http.port=${FluentBitHttpPort} \
--from-literal=read.head=${FluentBitReadFromHead} \
--from-literal=read.tail=${FluentBitReadFromTail} \
--from-literal=logs.region=${RegionName} -n amazon-cloudwatch

In the AWS sample command, the Fluent Bit HTTP server for monitoring plugin metrics is on by default. The command above turns it off by setting FluentBitHttpPort='' (empty string) on the third line; set a port number there to turn it back on.
Also by default, Fluent Bit reads log files from the tail, and will capture only new logs after it is deployed. If you want the opposite, set FluentBitReadFromHead='On' and it will collect all logs in the file system.
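
The two shell one-liners that derive the on/off flags are easy to misread, so here is the same logic restated as a small Python sketch. The function name derive_settings is hypothetical, and the port value "2020" in the second call is only an example value, not something the command above sets.

```python
def derive_settings(http_port: str, read_from_head: str) -> dict:
    """Mirror the shell logic that fills the ConfigMap values."""
    # [[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'
    http_server = "Off" if http_port == "" else "On"
    # [[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off' || FluentBitReadFromTail='On'
    read_from_tail = "Off" if read_from_head == "On" else "On"
    return {
        "http.server": http_server,
        "http.port": http_port,
        "read.head": read_from_head,
        "read.tail": read_from_tail,
    }


# Values used in this post: no HTTP server, read only new log lines.
print(derive_settings("", "Off"))
# Example of the opposite choice (port number is an assumed value).
print(derive_settings("2020", "On"))
```

An empty port always means the HTTP server is off, and read.tail is always the inverse of read.head.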

  3. Apply the Fluent Bit DaemonSet to the cluster:
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluent-bit/fluent-bit.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: amazon-cloudwatch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-role
rules:
  - nonResourceURLs:
      - /metrics
    verbs:
      - get
  - apiGroups: [""]
    resources:
      - namespaces
      - pods
      - pods/logs
      - nodes
      - nodes/proxy
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-role
subjects:
  - kind: ServiceAccount
    name: fluent-bit
    namespace: amazon-cloudwatch
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: amazon-cloudwatch
  labels:
    k8s-app: fluent-bit
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush                     5
        Grace                     30
        Log_Level                 error
        Daemon                    off
        Parsers_File              parsers.conf
        HTTP_Server               ${HTTP_SERVER}
        HTTP_Listen               0.0.0.0
        HTTP_Port                 ${HTTP_PORT}
        storage.path              /var/fluent-bit/state/flb-storage/
        storage.sync              normal
        storage.checksum          off
        storage.backlog.mem_limit 5M

    @INCLUDE application-log.conf
    @INCLUDE dataplane-log.conf
    @INCLUDE host-log.conf

  application-log.conf: |
    [INPUT]
        Name                tail
        Tag                 application.*
        Exclude_Path        /var/log/containers/cloudwatch-agent*, /var/log/containers/fluent-bit*, /var/log/containers/aws-node*, /var/log/containers/kube-proxy*, /var/log/containers/fluentd*
        Path                /var/log/containers/*.log
        multiline.parser    docker, cri
        DB                  /var/fluent-bit/state/flb_container.db
        Mem_Buf_Limit       50MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Rotate_Wait         30
        storage.type        filesystem
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                kubernetes
        Match               application.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_Tag_Prefix     application.var.log.containers.
        Merge_Log           On
        K8S-Logging.Parser  On
        K8S-Logging.Exclude On
        Labels              Off
        Annotations         On
        Use_Kubelet         On
        Kubelet_Port        10250
        Buffer_Size         0

    [FILTER]
        Name                grep
        Match               application.*
        Regex               $kubernetes['annotations']['fluentbit/enabled'] true

    [FILTER]
        Name                grep
        Match               application.*
        Regex               level .

    [FILTER]
        Name                nest
        Match               *
        Operation           lift
        Nested_under        kubernetes
        Add_prefix          k8s_

    [FILTER]
        Name                record_modifier
        Match               *
        Remove_key          stream
        Remove_key          _p
        Remove_key          log
        Remove_key          k8s_namespace_name
        Remove_key          k8s_pod_ip
        Remove_key          k8s_pod_name
        Remove_key          k8s_container_hash
        Remove_key          k8s_docker_id
        Remove_key          k8s_container_image
        Remove_key          k8s_container_name
        Remove_key          k8s_annotations
        Remove_key          kubernetes

    [FILTER]
        Name                nest
        Match               *
        Operation           nest
        Wildcard            k8s_*
        Nest_under          kubernetes
        Remove_prefix       k8s_

    [OUTPUT]
        Name                cloudwatch_logs
        Match               application.*
        region              ${AWS_REGION}
        log_group_name      /eks/${CLUSTER_NAME}/pipeline
        log_stream_prefix   ${HOST_NAME}-
        auto_create_group   true
        extra_user_agent    container-insights

  dataplane-log.conf: |
    [INPUT]
        Name                systemd
        Tag                 dataplane.systemd.*
        Systemd_Filter      _SYSTEMD_UNIT=docker.service
        Systemd_Filter      _SYSTEMD_UNIT=containerd.service
        Systemd_Filter      _SYSTEMD_UNIT=kubelet.service
        DB                  /var/fluent-bit/state/systemd.db
        Path                /var/log/journal
        Read_From_Tail      ${READ_FROM_TAIL}

    [INPUT]
        Name                tail
        Tag                 dataplane.tail.*
        Path                /var/log/containers/aws-node*, /var/log/containers/kube-proxy*
        multiline.parser    docker, cri
        DB                  /var/fluent-bit/state/flb_dataplane_tail.db
        Mem_Buf_Limit       50MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Rotate_Wait         30
        storage.type        filesystem
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                modify
        Match               dataplane.systemd.*
        Rename              _HOSTNAME                   hostname
        Rename              _SYSTEMD_UNIT               systemd_unit
        Rename              MESSAGE                     message
        Remove_regex        ^((?!hostname|systemd_unit|message).)*$

    [FILTER]
        Name                aws
        Match               dataplane.*
        imds_version        v2

    [OUTPUT]
        Name                cloudwatch_logs
        Match               dataplane.*
        region              ${AWS_REGION}
        log_group_name      /aws/containerinsights/${CLUSTER_NAME}/dataplane
        log_stream_prefix   ${HOST_NAME}-
        auto_create_group   true
        extra_user_agent    container-insights

  host-log.conf: |
    [INPUT]
        Name                tail
        Tag                 host.dmesg
        Path                /var/log/dmesg
        Key                 message
        DB                  /var/fluent-bit/state/flb_dmesg.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [INPUT]
        Name                tail
        Tag                 host.messages
        Path                /var/log/messages
        Parser              syslog
        DB                  /var/fluent-bit/state/flb_messages.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [INPUT]
        Name                tail
        Tag                 host.secure
        Path                /var/log/secure
        Parser              syslog
        DB                  /var/fluent-bit/state/flb_secure.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                aws
        Match               host.*
        imds_version        v2

    [OUTPUT]
        Name                cloudwatch_logs
        Match               host.*
        region              ${AWS_REGION}
        log_group_name      /aws/containerinsights/${CLUSTER_NAME}/host
        log_stream_prefix   ${HOST_NAME}.
        auto_create_group   true
        extra_user_agent    container-insights

  parsers.conf: |
    [PARSER]
        Name                syslog
        Format              regex
        Regex               ^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key            time
        Time_Format         %b %d %H:%M:%S

    [PARSER]
        Name                container_firstline
        Format              regex
        Regex               (?<log>(?<="log":")\S(?!\.).*?)(?<!\\)".*(?<stream>(?<="stream":").*?)".*(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\w*).*(?=})
        Time_Key            time
        Time_Format         %Y-%m-%dT%H:%M:%S.%LZ

    [PARSER]
        Name                cwagent_firstline
        Format              regex
        Regex               (?<log>(?<="log":")\d{4}[\/-]\d{1,2}[\/-]\d{1,2}[ T]\d{2}:\d{2}:\d{2}(?!\.).*?)(?<!\\)".*(?<stream>(?<="stream":").*?)".*(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\w*).*(?=})
        Time_Key            time
        Time_Format         %Y-%m-%dT%H:%M:%S.%LZ

---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: amazon-cloudwatch
  labels:
    k8s-app: fluent-bit
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluent-bit
  template:
    metadata:
      labels:
        k8s-app: fluent-bit
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      containers:
      - name: fluent-bit
        # https://gallery.ecr.aws/aws-observability/aws-for-fluent-bit
        # Fluent Bit v4.1.1
        image: public.ecr.aws/aws-observability/aws-for-fluent-bit:3.0.0
        imagePullPolicy: Always
        env:
            - name: AWS_REGION
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: logs.region
            - name: CLUSTER_NAME
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: cluster.name
            - name: HTTP_SERVER
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: http.server
            - name: HTTP_PORT
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: http.port
            - name: READ_FROM_HEAD
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: read.head
            - name: READ_FROM_TAIL
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: read.tail
            - name: HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: CI_VERSION
              value: "k8s/1.3.37"
        resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 100Mi
        volumeMounts:
        # Please don't change below read-only permissions
        - name: fluentbitstate
          mountPath: /var/fluent-bit/state
        - name: varlog
          mountPath: /var/log
          readOnly: true
        - name: varlibdockercontainers
          mountPath: /var/lib/docker/containers
          readOnly: true
        - name: fluent-bit-config
          mountPath: /fluent-bit/etc/
        - name: runlogjournal
          mountPath: /run/log/journal
          readOnly: true
        - name: dmesg
          mountPath: /var/log/dmesg
          readOnly: true
      terminationGracePeriodSeconds: 10
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      volumes:
      - name: fluentbitstate
        hostPath:
          path: /var/fluent-bit/state
      - name: varlog
        hostPath:
          path: /var/log
      - name: varlibdockercontainers
        hostPath:
          path: /var/lib/docker/containers
      - name: fluent-bit-config
        configMap:
          name: fluent-bit-config
      - name: runlogjournal
        hostPath:
          path: /run/log/journal
      - name: dmesg
        hostPath:
          path: /var/log/dmesg
      serviceAccountName: fluent-bit
      nodeSelector:
        kubernetes.io/os: linux

Python custom logger

"""
minwook 2025-10
Format logger for kubernetes
"""

import logging
import json
import sys
import atexit


class PipelineLogger:
    _instance = None

    def __new__(cls):
        if cls._instance is None:
            cls._instance = super().__new__(cls)
            cls._instance._init()
        return cls._instance

    def _init(self):
        handler = logging.StreamHandler(sys.stdout)
        handler.setFormatter(_JsonFormatter())

        self.logger = logging.getLogger("perception")  # logger name "perception"
        self.logger.propagate = False
        self.logger.setLevel(logging.INFO)
        self.logger.addHandler(handler)
        self._context = {}

        # flush when exit
        atexit.register(self.flush)

    def flush(self):
        for handler in self.logger.handlers:
            try:
                handler.flush()
            except Exception:
                pass
        try:
            sys.stdout.flush()
        except Exception:
            pass

    def set_context(self, module: str, site: str, date: str, sync=None, cam_num=None):
        self._context = {"ai_module": module, "site": site, "date": date}
        # Compare against None so falsy-but-valid values (e.g. camera 0) are kept
        if sync is not None:
            self._context["sync"] = sync

        if cam_num is not None:
            self._context["cam_num"] = cam_num

    def info(self, msg, *args, **kwargs):
        kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
        self.logger.info(msg, *args, **kwargs)

    def warning(self, msg, *args, **kwargs):
        kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
        self.logger.warning(msg, *args, **kwargs)

    def error(self, msg, *args, **kwargs):
        kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
        self.logger.error(msg, *args, **kwargs)

    def exception(self, msg, *args, exc_info=True, **kwargs):
        kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
        self.logger.exception(msg, *args, exc_info=exc_info, **kwargs)

    def critical(self, msg, *args, **kwargs):
        kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
        self.logger.critical(msg, *args, **kwargs)


class _JsonFormatter(logging.Formatter):
    def format(self, record) -> str:
        log_data = {
            "level": record.levelname,
            "message": record.getMessage(),
        }

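        # Copy only the fields passed through `extra`; skip the standard
        # LogRecord attributes.
        for key, value in record.__dict__.items():
            if key not in [
                "name",
                "msg",
                "args",
                "created",
                "filename",
                "funcName",
                "levelname",
                "levelno",
                "lineno",
                "module",
                "msecs",
                "message",
                "pathname",
                "process",
                "processName",
                "relativeCreated",
                "thread",
                "threadName",
                "exc_info",
                "exc_text",
                "stack_info",
                "taskName",  # added to LogRecord in Python 3.12
            ]:
                log_data[key] = value

        # default=str keeps a non-JSON-serializable extra from raising here
        return json.dumps(log_data, default=str)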

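
To see what one emitted line looks like, the following standalone sketch uses a cut-down formatter in the same spirit and writes to an in-memory stream instead of stdout. MiniJsonFormatter, the "demo" logger name and the context values are hypothetical. Note that the logging module refuses extra keys that collide with existing LogRecord attributes such as module, which is presumably why the context key above is ai_module.

```python
import io
import json
import logging


class MiniJsonFormatter(logging.Formatter):
    """Cut-down stand-in: level, message, plus whatever came via `extra`."""

    # Attributes every LogRecord carries; anything else arrived through `extra`.
    _STANDARD = set(logging.makeLogRecord({}).__dict__) | {"message", "asctime"}

    def format(self, record: logging.LogRecord) -> str:
        data = {"level": record.levelname, "message": record.getMessage()}
        for key, value in record.__dict__.items():
            if key not in self._STANDARD:
                data[key] = value
        return json.dumps(data, default=str)


stream = io.StringIO()  # stands in for sys.stdout so the line can be inspected
handler = logging.StreamHandler(stream)
handler.setFormatter(MiniJsonFormatter())

demo = logging.getLogger("demo")  # hypothetical logger name
demo.propagate = False
demo.setLevel(logging.INFO)
demo.addHandler(handler)

# Hypothetical context, shaped like what set_context() stores.
context = {"ai_module": "pose", "site": "site-a", "date": "2025-10-01"}
demo.info("processing %d frames", 42, extra=context)

line = stream.getvalue().strip()
print(line)
```

Each log call produces a single JSON object per line with a top-level level key, which is the field the second grep filter in application-log.conf matches on.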

Usage

def graceful_shutdown(signum, frame):
    global messages, main_pid, logger, is_processing

    cur_pid = os.getpid()
    if cur_pid != main_pid:
        return

    if is_processing:
        logger.warning("Process is busy, but SIGTERM was received in the main process.")

    logger.info(f"Received signal: {signum}. Shutting down gracefully.")
    for msg in messages[message_idx:]:
        reset_visibility(msg)

    logger.flush()
    sys.exit(0)


signal.signal(signal.SIGTERM, graceful_shutdown)

...

def main():
    global messages, message_idx, is_processing
    MaxNumberOfMessages = 3
    VisibilityTimeout = 720 * MaxNumberOfMessages
    while True:
        try:
            # Receive a message from the SQS queue
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MaxNumberOfMessages,
                WaitTimeSeconds=10,  # Long polling
                VisibilityTimeout=VisibilityTimeout,
            )
            messages = response.get("Messages", [])
            for message_idx, message in enumerate(messages):
                    ...
                    logger.set_context(
                        module="pose",
                        site=site,
                        date=date,
                        sync=sync_nnn,
                        cam_num=cam_num,
                    )



if __name__ == "__main__":
    logger = PipelineLogger()

K8s annotation

apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: ...
spec:
  jobTargetRef:
    template:
      metadata:
        annotations:
          fluentbit/enabled: "true"
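
The annotation matters because of the two grep filters in application-log.conf: a record is kept only if the pod carries fluentbit/enabled: "true" and the record has a non-empty level field. The Python sketch below approximates that decision on already-enriched records. It is a simplification under assumptions: passes_filters and the sample records are hypothetical, and it presumes Merge_Log has already lifted the JSON keys of the log line to the top level of the record.

```python
import re


def passes_filters(record: dict) -> bool:
    """Approximate the two grep filters in application-log.conf."""
    annotations = record.get("kubernetes", {}).get("annotations", {})
    # Regex $kubernetes['annotations']['fluentbit/enabled'] true
    enabled = annotations.get("fluentbit/enabled")
    if not isinstance(enabled, str) or re.search(r"true", enabled) is None:
        return False
    # Regex level .  -> the record needs a non-empty "level" field
    level = record.get("level")
    return isinstance(level, str) and re.search(r".", level) is not None


# Hypothetical records for the three interesting cases.
annotated_json = {
    "level": "INFO",
    "message": "job started",
    "kubernetes": {"annotations": {"fluentbit/enabled": "true"}},
}
not_annotated = {"level": "INFO", "message": "job started", "kubernetes": {}}
plain_text = {
    "log": "plain text line",
    "kubernetes": {"annotations": {"fluentbit/enabled": "true"}},
}

for name, rec in [
    ("annotated_json", annotated_json),
    ("not_annotated", not_annotated),
    ("plain_text", plain_text),
]:
    print(name, passes_filters(rec))
```

Only the annotated pod that logs JSON with a level key gets through; plain-text lines from the same pod are dropped because they have no level field.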
