Architecture
Event format
The Fluent Bit wire protocol represents an Event as a two-element array with a nested array as the first element:
[[TIMESTAMP, METADATA], MESSAGE]
- TIMESTAMP is a timestamp in seconds as an integer or floating point value (not a string).
- METADATA is an object containing event metadata, and might be empty.
- MESSAGE is an object containing the event body.
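For example, a minimal sketch of a single event with illustrative values (the timestamp and body here are made up):
event = [
    [1761000000.123456, {}],  # TIMESTAMP: seconds as int/float (never a string); METADATA: empty object
    {"log": "pipeline started", "level": "info"},  # MESSAGE: the event body
]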
record_accessor syntax
If you need the log group or stream name to be based on the contents of the log record itself, use record_accessor syntax, as in the sketch below.
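A sketch of one way to do this with the cloudwatch_logs output's log_group_template and log_stream_template options (the group/stream names here are made up; this assumes the kubernetes filter has already enriched each record, and the plugin falls back to log_group_name / log_stream_prefix if a template can't be resolved):
[OUTPUT]
    Name                cloudwatch_logs
    Match               application.*
    region              ap-northeast-2
    log_group_name      fallback-group
    log_group_template  /eks/$kubernetes['namespace_name']
    log_stream_prefix   fallback-
    log_stream_template $kubernetes['pod_name'].$kubernetes['container_name']
    auto_create_group   true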
Fluent Bit on CloudWatch
- If you don't already have a namespace called amazon-cloudwatch, create one by entering the following command:
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/cloudwatch-namespace.yaml
# create amazon-cloudwatch namespace
apiVersion: v1
kind: Namespace
metadata:
  name: amazon-cloudwatch
  labels:
    name: amazon-cloudwatch
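Optionally, verify the namespace was created:
kubectl get namespace amazon-cloudwatch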
- Run the following commands to create a ConfigMap named fluent-bit-cluster-info with the cluster name and the Region to send logs to. Replace the ClusterName and RegionName values with your cluster's name and Region.
ClusterName=dev            # replace with your cluster name
RegionName=ap-northeast-2  # replace with your Region
FluentBitHttpPort=''
FluentBitReadFromHead='Off'
[[ ${FluentBitReadFromHead} = 'On' ]] && FluentBitReadFromTail='Off' || FluentBitReadFromTail='On'
[[ -z ${FluentBitHttpPort} ]] && FluentBitHttpServer='Off' || FluentBitHttpServer='On'
kubectl create configmap fluent-bit-cluster-info \
--from-literal=cluster.name=${ClusterName} \
--from-literal=http.server=${FluentBitHttpServer} \
--from-literal=http.port=${FluentBitHttpPort} \
--from-literal=read.head=${FluentBitReadFromHead} \
--from-literal=read.tail=${FluentBitReadFromTail} \
--from-literal=logs.region=${RegionName} -n amazon-cloudwatch
In this command, the FluentBitHttpServer for monitoring plugin metrics is on by default; setting FluentBitHttpPort='' (an empty string), as in the third line above, turns it off.
Also by default, Fluent Bit reads log files from the tail and captures only logs written after it is deployed. If you want the opposite, set FluentBitReadFromHead='On' and it will collect all existing logs in the file system.
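Before moving on, you can confirm the values landed correctly; the key names must match the configMapKeyRef entries in the DaemonSet below:
kubectl describe configmap fluent-bit-cluster-info -n amazon-cloudwatch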
- Apply the Fluent Bit DaemonSet to the cluster:
kubectl apply -f https://raw.githubusercontent.com/aws-samples/amazon-cloudwatch-container-insights/latest/k8s-deployment-manifest-templates/deployment-mode/daemonset/container-insights-monitoring/fluent-bit/fluent-bit.yaml
apiVersion: v1
kind: ServiceAccount
metadata:
  name: fluent-bit
  namespace: amazon-cloudwatch
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRole
metadata:
  name: fluent-bit-role
rules:
  - nonResourceURLs:
      - /metrics
    verbs:
      - get
  - apiGroups: [""]
    resources:
      - namespaces
      - pods
      - pods/logs
      - nodes
      - nodes/proxy
    verbs: ["get", "list", "watch"]
---
apiVersion: rbac.authorization.k8s.io/v1
kind: ClusterRoleBinding
metadata:
  name: fluent-bit-role-binding
roleRef:
  apiGroup: rbac.authorization.k8s.io
  kind: ClusterRole
  name: fluent-bit-role
subjects:
  - kind: ServiceAccount
    name: fluent-bit
    namespace: amazon-cloudwatch
---
apiVersion: v1
kind: ConfigMap
metadata:
  name: fluent-bit-config
  namespace: amazon-cloudwatch
  labels:
    k8s-app: fluent-bit
data:
  fluent-bit.conf: |
    [SERVICE]
        Flush                       5
        Grace                       30
        Log_Level                   error
        Daemon                      off
        Parsers_File                parsers.conf
        HTTP_Server                 ${HTTP_SERVER}
        HTTP_Listen                 0.0.0.0
        HTTP_Port                   ${HTTP_PORT}
        storage.path                /var/fluent-bit/state/flb-storage/
        storage.sync                normal
        storage.checksum            off
        storage.backlog.mem_limit   5M

    @INCLUDE application-log.conf
    @INCLUDE dataplane-log.conf
    @INCLUDE host-log.conf
  application-log.conf: |
    [INPUT]
        Name                tail
        Tag                 application.*
        Exclude_Path        /var/log/containers/cloudwatch-agent*, /var/log/containers/fluent-bit*, /var/log/containers/aws-node*, /var/log/containers/kube-proxy*, /var/log/containers/fluentd*
        Path                /var/log/containers/*.log
        multiline.parser    docker, cri
        DB                  /var/fluent-bit/state/flb_container.db
        Mem_Buf_Limit       50MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Rotate_Wait         30
        storage.type        filesystem
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                kubernetes
        Match               application.*
        Kube_URL            https://kubernetes.default.svc:443
        Kube_Tag_Prefix     application.var.log.containers.
        Merge_Log           On
        K8S-Logging.Parser  On
        K8S-Logging.Exclude On
        Labels              Off
        Annotations         On
        Use_Kubelet         On
        Kubelet_Port        10250
        Buffer_Size         0

    [FILTER]
        Name                grep
        Match               application.*
        Regex               $kubernetes['annotations']['fluentbit/enabled'] true

    [FILTER]
        Name                grep
        Match               application.*
        Regex               level .

    [FILTER]
        Name                nest
        Match               *
        Operation           lift
        Nested_under        kubernetes
        Add_prefix          k8s_

    [FILTER]
        Name                record_modifier
        Match               *
        Remove_key          stream
        Remove_key          _p
        Remove_key          log
        Remove_key          k8s_namespace_name
        Remove_key          k8s_pod_ip
        Remove_key          k8s_pod_name
        Remove_key          k8s_container_hash
        Remove_key          k8s_docker_id
        Remove_key          k8s_container_image
        Remove_key          k8s_container_name
        Remove_key          k8s_annotations
        Remove_key          kubernetes

    [FILTER]
        Name                nest
        Match               *
        Operation           nest
        Wildcard            k8s_*
        Nest_under          kubernetes
        Remove_prefix       k8s_

    [OUTPUT]
        Name                cloudwatch_logs
        Match               application.*
        region              ${AWS_REGION}
        log_group_name      /eks/${CLUSTER_NAME}/pipeline
        log_stream_prefix   ${HOST_NAME}-
        auto_create_group   true
        extra_user_agent    container-insights
  dataplane-log.conf: |
    [INPUT]
        Name                systemd
        Tag                 dataplane.systemd.*
        Systemd_Filter      _SYSTEMD_UNIT=docker.service
        Systemd_Filter      _SYSTEMD_UNIT=containerd.service
        Systemd_Filter      _SYSTEMD_UNIT=kubelet.service
        DB                  /var/fluent-bit/state/systemd.db
        Path                /var/log/journal
        Read_From_Tail      ${READ_FROM_TAIL}

    [INPUT]
        Name                tail
        Tag                 dataplane.tail.*
        Path                /var/log/containers/aws-node*, /var/log/containers/kube-proxy*
        multiline.parser    docker, cri
        DB                  /var/fluent-bit/state/flb_dataplane_tail.db
        Mem_Buf_Limit       50MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Rotate_Wait         30
        storage.type        filesystem
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                modify
        Match               dataplane.systemd.*
        Rename              _HOSTNAME hostname
        Rename              _SYSTEMD_UNIT systemd_unit
        Rename              MESSAGE message
        Remove_regex        ^((?!hostname|systemd_unit|message).)*$

    [FILTER]
        Name                aws
        Match               dataplane.*
        imds_version        v2

    [OUTPUT]
        Name                cloudwatch_logs
        Match               dataplane.*
        region              ${AWS_REGION}
        log_group_name      /aws/containerinsights/${CLUSTER_NAME}/dataplane
        log_stream_prefix   ${HOST_NAME}-
        auto_create_group   true
        extra_user_agent    container-insights
  host-log.conf: |
    [INPUT]
        Name                tail
        Tag                 host.dmesg
        Path                /var/log/dmesg
        Key                 message
        DB                  /var/fluent-bit/state/flb_dmesg.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [INPUT]
        Name                tail
        Tag                 host.messages
        Path                /var/log/messages
        Parser              syslog
        DB                  /var/fluent-bit/state/flb_messages.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [INPUT]
        Name                tail
        Tag                 host.secure
        Path                /var/log/secure
        Parser              syslog
        DB                  /var/fluent-bit/state/flb_secure.db
        Mem_Buf_Limit       5MB
        Skip_Long_Lines     On
        Refresh_Interval    10
        Read_from_Head      ${READ_FROM_HEAD}

    [FILTER]
        Name                aws
        Match               host.*
        imds_version        v2

    [OUTPUT]
        Name                cloudwatch_logs
        Match               host.*
        region              ${AWS_REGION}
        log_group_name      /aws/containerinsights/${CLUSTER_NAME}/host
        log_stream_prefix   ${HOST_NAME}.
        auto_create_group   true
        extra_user_agent    container-insights
  parsers.conf: |
    [PARSER]
        Name                syslog
        Format              regex
        Regex               ^(?<time>[^ ]* {1,2}[^ ]* [^ ]*) (?<host>[^ ]*) (?<ident>[a-zA-Z0-9_\/\.\-]*)(?:\[(?<pid>[0-9]+)\])?(?:[^\:]*\:)? *(?<message>.*)$
        Time_Key            time
        Time_Format         %b %d %H:%M:%S

    [PARSER]
        Name                container_firstline
        Format              regex
        Regex               (?<log>(?<="log":")\S(?!\.).*?)(?<!\\)".*(?<stream>(?<="stream":").*?)".*(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\w*).*(?=})
        Time_Key            time
        Time_Format         %Y-%m-%dT%H:%M:%S.%LZ

    [PARSER]
        Name                cwagent_firstline
        Format              regex
        Regex               (?<log>(?<="log":")\d{4}[\/-]\d{1,2}[\/-]\d{1,2}[ T]\d{2}:\d{2}:\d{2}(?!\.).*?)(?<!\\)".*(?<stream>(?<="stream":").*?)".*(?<time>\d{4}-\d{1,2}-\d{1,2}T\d{2}:\d{2}:\d{2}\.\w*).*(?=})
        Time_Key            time
        Time_Format         %Y-%m-%dT%H:%M:%S.%LZ
---
apiVersion: apps/v1
kind: DaemonSet
metadata:
  name: fluent-bit
  namespace: amazon-cloudwatch
  labels:
    k8s-app: fluent-bit
    version: v1
    kubernetes.io/cluster-service: "true"
spec:
  selector:
    matchLabels:
      k8s-app: fluent-bit
  template:
    metadata:
      labels:
        k8s-app: fluent-bit
        version: v1
        kubernetes.io/cluster-service: "true"
    spec:
      containers:
        - name: fluent-bit
          # https://gallery.ecr.aws/aws-observability/aws-for-fluent-bit
          # Fluent Bit v4.1.1
          image: public.ecr.aws/aws-observability/aws-for-fluent-bit:3.0.0
          imagePullPolicy: Always
          env:
            - name: AWS_REGION
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: logs.region
            - name: CLUSTER_NAME
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: cluster.name
            - name: HTTP_SERVER
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: http.server
            - name: HTTP_PORT
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: http.port
            - name: READ_FROM_HEAD
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: read.head
            - name: READ_FROM_TAIL
              valueFrom:
                configMapKeyRef:
                  name: fluent-bit-cluster-info
                  key: read.tail
            - name: HOST_NAME
              valueFrom:
                fieldRef:
                  fieldPath: spec.nodeName
            - name: HOSTNAME
              valueFrom:
                fieldRef:
                  apiVersion: v1
                  fieldPath: metadata.name
            - name: CI_VERSION
              value: "k8s/1.3.37"
          resources:
            limits:
              memory: 200Mi
            requests:
              cpu: 100m
              memory: 100Mi
          volumeMounts:
            # Please don't change below read-only permissions
            - name: fluentbitstate
              mountPath: /var/fluent-bit/state
            - name: varlog
              mountPath: /var/log
              readOnly: true
            - name: varlibdockercontainers
              mountPath: /var/lib/docker/containers
              readOnly: true
            - name: fluent-bit-config
              mountPath: /fluent-bit/etc/
            - name: runlogjournal
              mountPath: /run/log/journal
              readOnly: true
            - name: dmesg
              mountPath: /var/log/dmesg
              readOnly: true
      terminationGracePeriodSeconds: 10
      hostNetwork: true
      dnsPolicy: ClusterFirstWithHostNet
      volumes:
        - name: fluentbitstate
          hostPath:
            path: /var/fluent-bit/state
        - name: varlog
          hostPath:
            path: /var/log
        - name: varlibdockercontainers
          hostPath:
            path: /var/lib/docker/containers
        - name: fluent-bit-config
          configMap:
            name: fluent-bit-config
        - name: runlogjournal
          hostPath:
            path: /run/log/journal
        - name: dmesg
          hostPath:
            path: /var/log/dmesg
      serviceAccountName: fluent-bit
      nodeSelector:
        kubernetes.io/os: linux
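Once applied, a quick way to confirm the rollout (pod names and counts vary by cluster):
kubectl get daemonset fluent-bit -n amazon-cloudwatch
kubectl get pods -n amazon-cloudwatch -l k8s-app=fluent-bit -o wide
kubectl logs -n amazon-cloudwatch daemonset/fluent-bit --tail=20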
Python custom logger
"""
minwook 2025-10
Format logger for kubernetes
"""
import logging
import json
import sys
import atexit
class PipelineLogger:
_instance = None
def __new__(cls):
if cls._instance is None:
cls._instance = super().__new__(cls)
cls._instance._init()
return cls._instance
def _init(self):
handler = logging.StreamHandler(sys.stdout)
handler.setFormatter(_JsonFormatter())
self.logger = logging.getLogger("perception") # logger name "perception"
self.logger.propagate = False
self.logger.setLevel(logging.INFO)
self.logger.addHandler(handler)
self._context = {}
# flush when exit
atexit.register(self.flush)
def flush(self):
for handler in self.logger.handlers:
try:
handler.flush()
except Exception:
pass
try:
sys.stdout.flush()
except Exception:
pass
def set_context(self, module: str, site: str, date: str, sync=None, cam_num=None):
self._context = {"ai_module": module, "site": site, "date": date}
if sync:
self._context["sync"] = sync
if cam_num:
self._context["cam_num"] = cam_num
def info(self, msg, *args, **kwargs):
kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
self.logger.info(msg, *args, **kwargs)
def warning(self, msg, *args, **kwargs):
kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
self.logger.warning(msg, *args, **kwargs)
def error(self, msg, *args, **kwargs):
kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
self.logger.error(msg, *args, **kwargs)
def exception(self, msg, *args, exc_info=True, **kwargs):
kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
self.logger.exception(msg, *args, exc_info=exc_info, **kwargs)
def critical(self, msg, *args, **kwargs):
kwargs["extra"] = {**kwargs.pop("extra", {}), **self._context}
self.logger.critical(msg, *args, **kwargs)
class _JsonFormatter(logging.Formatter):
def format(self, record) -> str:
log_data = {
"level": record.levelname,
"message": record.getMessage(),
}
for key, value in record.__dict__.items():
if key not in [
"name",
"msg",
"args",
"created",
"filename",
"funcName",
"levelname",
"levelno",
"lineno",
"module",
"msecs",
"message",
"pathname",
"process",
"processName",
"relativeCreated",
"thread",
"threadName",
"exc_info",
"exc_text",
"stack_info",
]:
log_data[key] = value
return json.dumps(log_data)
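For instance (hypothetical values), a call like the following emits a single JSON line on stdout, which the tail input in application-log.conf then picks up:
logger = PipelineLogger()
logger.set_context(module="pose", site="siteA", date="2025-10-01")
logger.info("frame processed", extra={"frame_id": 42})
# stdout (roughly):
# {"level": "INFO", "message": "frame processed", "frame_id": 42, "ai_module": "pose", "site": "siteA", "date": "2025-10-01"}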
Usage
import os
import signal
import sys


def graceful_shutdown(signum, frame):
    global messages, main_pid, logger, is_processing
    cur_pid = os.getpid()
    if cur_pid != main_pid:
        return
    if is_processing:
        logger.warning("Process is busy, but SIGTERM occurred from main process.")
    logger.info(f"Received signal: {signum}. Shutting down gracefully.")
    for msg in messages[message_idx:]:
        reset_visibility(msg)
    logger.flush()
    sys.exit(0)

signal.signal(signal.SIGTERM, graceful_shutdown)
...

def main():
    global messages, message_idx, is_processing
    MaxNumberOfMessages = 3
    VisibilityTimeout = 720 * MaxNumberOfMessages
    while True:
        try:
            # Receive a message from the SQS queue
            response = sqs.receive_message(
                QueueUrl=queue_url,
                MaxNumberOfMessages=MaxNumberOfMessages,
                WaitTimeSeconds=10,  # Long polling
                VisibilityTimeout=VisibilityTimeout,
            )
            messages = response.get("Messages", [])
            for message_idx, message in enumerate(messages):
                ...
                logger.set_context(
                    module="pose",
                    site=site,
                    date=date,
                    sync=sync_nnn,
                    cam_num=cam_num,
                )
        except Exception:
            # error handling elided in the original
            logger.exception("Failed to process SQS batch.")

if __name__ == "__main__":
    logger = PipelineLogger()
    main()
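reset_visibility isn't shown above; a minimal sketch using boto3's change_message_visibility, reusing the script's sqs client and queue_url, could look like this:
def reset_visibility(message):
    # Make the unprocessed message visible to other consumers again right away.
    sqs.change_message_visibility(
        QueueUrl=queue_url,
        ReceiptHandle=message["ReceiptHandle"],
        VisibilityTimeout=0,
    )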
K8s annotation
The grep filter in application-log.conf above keeps only records whose pod carries the fluentbit/enabled annotation (exposed on each record by the kubernetes filter, since Annotations is On). To opt a workload in, set the annotation on its pod template, for example in a KEDA ScaledJob:
apiVersion: keda.sh/v1alpha1
kind: ScaledJob
metadata:
  name: ...
spec:
  jobTargetRef:
    template:
      metadata:
        annotations:
          fluentbit/enabled: "true"