This blog post shows how I created a simple log-based exporter that measures the occurrence of logs in Kubernetes containers.
Why
Major cloud providers like GCP & AWS offer log-based metrics, but at scale they can become extremely cost-prohibitive. My aim was to create a cheaper solution that converts logs into Prometheus metrics out of the box, with the added benefit that they are measured in real time.
How it works
- You specify the logs you want to measure, the containers they occur in, and the namespaces where those containers run. The exporter will automatically discover all matching pods as they appear.
- The exporter measures the metrics in real time; every time Prometheus scrapes the /metrics endpoint, the gauges are reset (a minimal sketch of this mechanism follows below).
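To make that second point concrete, here is a minimal sketch of how the reset-on-scrape behaviour could be wired up with prometheus/client_golang. The metric and label names are taken from the sample output at the end of this post; the handler wrapper, function, and variable names are my own assumptions, not the exporter's actual code.

import (
	"log"
	"net/http"

	"github.com/prometheus/client_golang/prometheus"
	"github.com/prometheus/client_golang/prometheus/promhttp"
)

var logBasedMetric = prometheus.NewGaugeVec(
	prometheus.GaugeOpts{
		Name: "log_based_metric",
		Help: "Number of matching log lines seen since the last scrape",
	},
	[]string{"metric", "name", "namespace", "pod"},
)

func serveMetrics() {
	prometheus.MustRegister(logBasedMetric)
	promHandler := promhttp.Handler()
	http.Handle("/metrics", http.HandlerFunc(func(w http.ResponseWriter, r *http.Request) {
		promHandler.ServeHTTP(w, r)
		// Reset all gauges once Prometheus has scraped them
		logBasedMetric.Reset()
	}))
	log.Fatal(http.ListenAndServe(":8080", nil)) // the listen port is an assumption
}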
Generating some logs
This quick little program will spit out copious amounts of logs. I've containerized it and deployed it to a local cluster.
package main

import (
	"fmt"
	"time"
)

func main() {
	i := 0
	for {
		l := fmt.Sprintf("log entry %v", i)
		fmt.Println(l)
		if i > 10000 {
			i = 0
		}
		i += 1
		time.Sleep(time.Millisecond * 50)
	}
}
Fetching these logs using the Kubernetes API
func StreamLogs(ctx context.Context, client *kubernetes.Clientset, ns string, pod string, callback LogCallback) error {
	opts := &corev1.PodLogOptions{
		Follow: true,
	}
	req := client.CoreV1().Pods(ns).GetLogs(pod, opts)
	stream, err := req.Stream(ctx)
	if err != nil {
		return err
	}
	defer stream.Close()

	buffer := make([]byte, 4096) // Adjust buffer size as needed
	for {
		n, readErr := stream.Read(buffer)
		if readErr != nil {
			// io.EOF means the stream ended (e.g. the pod terminated);
			// context.Canceled means monitoring was stopped deliberately
			if errors.Is(readErr, io.EOF) || errors.Is(readErr, context.Canceled) {
				log.Println("stream ended or was canceled")
				break
			}
			return readErr
		}
		// Call the callback with the data chunk
		callback(ns, pod, buffer[:n])
	}
	return nil
}
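One thing the snippet doesn't show is the LogCallback type. Judging from how the callback is invoked above, a minimal definition would look like this (an assumed signature, not necessarily the exporter's exact one):

// LogCallback receives each chunk of log data read from a pod's stream,
// along with the namespace and pod it came from.
type LogCallback func(namespace string, pod string, data []byte)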
StreamLogs hands each chunk of data to a callback so it can be processed outside of the streaming loop. Here's an example implementation of the callback.
We're doing a couple of things:
- Initializing the StreamLogs function
- Defining a callback that will measure logs as they appear
- Incrementing metrics based on the outcome of the measurement
func monitorPodForRule(ctx context.Context, rule rules.Rule, ns string, pod string) {
	select {
	case <-ctx.Done():
		log.Infof("stopped monitoring %s/%s for rule %s", ns, pod, rule.Name)
		return
	default:
		callback := func(ns string, pod string, data []byte) {
			log.Debugf("namespace %s pod %s bytes processed %v", ns, pod, len(data))
			match := measureLogAgainstCondition(data, rule)
			if match {
				m.IncrementMetric(rule, ns, pod)
			} else if os.Getenv("EXPORT_ZERO") == "true" {
				m.SetMetric(rule, ns, pod, 0)
			}
		}
		client, err := kube.GenClient()
		if err != nil {
			log.Error(err)
			return
		}
		err = kube.StreamLogs(ctx, client, ns, pod, callback)
		if err != nil {
			log.Error(err)
		}
	}
}
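The m.IncrementMetric and m.SetMetric helpers aren't shown in the post. Assuming a Prometheus GaugeVec like the one sketched earlier, they could be as simple as the following; the struct and field names here are my own, not the exporter's.

// Metrics wraps the Prometheus collectors used by the exporter.
type Metrics struct {
	logBasedMetric *prometheus.GaugeVec
}

// IncrementMetric bumps the gauge for a rule/namespace/pod combination.
func (m *Metrics) IncrementMetric(rule rules.Rule, ns, pod string) {
	m.logBasedMetric.WithLabelValues(rule.Metric, rule.Name, ns, pod).Inc()
}

// SetMetric sets the gauge to an explicit value, e.g. to export zeroes
// when EXPORT_ZERO is enabled.
func (m *Metrics) SetMetric(rule rules.Rule, ns, pod string, value float64) {
	m.logBasedMetric.WithLabelValues(rule.Metric, rule.Name, ns, pod).Set(value)
}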
The measurement is pretty simple: we just iterate through each condition and check for substring matches. Every condition must appear in the chunk for it to count as a match.
func measureLogAgainstCondition(data []byte, rule rules.Rule) bool {
	for _, cond := range rule.Condition {
		if !strings.Contains(string(data), cond) {
			return false
		}
	}
	return true
}
How to dynamically monitor pods as they appear
Kubernetes has the following hierarchy: Namespace --> Pod --> Container. Since pod names are randomly generated, we need to perform a lookup to find the pods that contain the containers we wish to monitor. The following function does just that.
func findPodsForRule(ctx context.Context, rule rules.Rule) (map[string][]string, error) {
	log.Infof("finding pods for rule %s", rule.Name)
	// scan namespaces for containers
	client, err := kube.GenClient()
	if err != nil {
		return nil, err
	}
	namespaces, err := kube.ListNamespaces(ctx, client)
	if err != nil {
		return nil, err
	}
	validNamespaces := make([]string, 0)
	for _, ns := range rule.Namespace {
		for _, actualNS := range namespaces {
			if actualNS == ns {
				validNamespaces = append(validNamespaces, ns)
				break
			}
		}
	}
	if len(validNamespaces) == 0 {
		return nil, fmt.Errorf("rule %s no valid namespaces were found", rule.Name)
	}
	log.Infof("rule %s. %v of %v namespaces found", rule.Name, len(validNamespaces), len(rule.Namespace))
	matchingPods := make([]string, 0)
	result := make(map[string][]string)
	for _, ns := range validNamespaces {
		info, err := kube.ListContainers(ctx, client, ns)
		if err != nil {
			log.Errorf("could not list containers for namespace %s", ns)
			continue
		}
		pods, ok := info[ns]
		if !ok {
			log.Errorf("could not list containers for namespace %s", ns)
			continue
		}
		result[ns] = make([]string, 0)
		for podName, containers := range pods {
			for _, container := range containers {
				for _, ruleContainer := range rule.Container {
					if ruleContainer == container {
						matchingPods = append(matchingPods, podName)
						result[ns] = append(result[ns], podName)
						break
					}
				}
			}
		}
	}
	if len(matchingPods) == 0 {
		return nil, fmt.Errorf("no matching pods found for rule %s", rule.Name)
	}
	log.Infof("for rule %s found the following namespaces %v", rule.Name, validNamespaces)
	log.Infof("for rule %s found the following pods %v", rule.Name, matchingPods)
	return result, nil
}
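findPodsForRule leans on a few small helpers in the kube package (GenClient, ListNamespaces, ListContainers) that aren't shown in the post. Assuming they are thin wrappers around client-go, they might look roughly like this; the shapes are inferred from how the results are used above, so treat it as a sketch rather than the real implementation.

package kube

import (
	"context"

	metav1 "k8s.io/apimachinery/pkg/apis/meta/v1"
	"k8s.io/client-go/kubernetes"
	"k8s.io/client-go/rest"
)

// GenClient builds a Clientset from the in-cluster service account config.
func GenClient() (*kubernetes.Clientset, error) {
	cfg, err := rest.InClusterConfig()
	if err != nil {
		return nil, err
	}
	return kubernetes.NewForConfig(cfg)
}

// ListNamespaces returns the names of all namespaces in the cluster.
func ListNamespaces(ctx context.Context, client *kubernetes.Clientset) ([]string, error) {
	nsList, err := client.CoreV1().Namespaces().List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	names := make([]string, 0, len(nsList.Items))
	for _, ns := range nsList.Items {
		names = append(names, ns.Name)
	}
	return names, nil
}

// ListContainers maps namespace -> pod name -> container names for one namespace.
func ListContainers(ctx context.Context, client *kubernetes.Clientset, ns string) (map[string]map[string][]string, error) {
	pods, err := client.CoreV1().Pods(ns).List(ctx, metav1.ListOptions{})
	if err != nil {
		return nil, err
	}
	result := map[string]map[string][]string{ns: {}}
	for _, p := range pods.Items {
		containers := make([]string, 0, len(p.Spec.Containers))
		for _, c := range p.Spec.Containers {
			containers = append(containers, c.Name)
		}
		result[ns][p.Name] = containers
	}
	return result, nil
}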
The last thing we need for the exporter is a way to bring monitoring for a pod up and down; we'll use contexts to control this. Every 15 seconds we'll scan for pods that match, cancel monitoring for any pods that no longer exist, and bring up monitoring for the new pods.
func monitorRule(ctx context.Context, rule rules.Rule) {
	// refresh every 15 seconds
	ticker := time.NewTicker(15 * time.Second)
	defer ticker.Stop()
	for ; true; <-ticker.C {
		if ctx.Err() != nil {
			return
		}
		pods, err := findPodsForRule(ctx, rule)
		if err != nil {
			log.Errorf("error finding pods for rule: %v", err)
			continue
		}
		updateMonitoring(ctx, rule, pods)
	}
}
func updateMonitoring(ctx context.Context, rule rules.Rule, currentPods map[string][]string) {
	monitorState.Lock()
	defer monitorState.Unlock()
	// Create a set of current pods for quick lookup
	currentSet := make(map[string]bool)
	for ns, pods := range currentPods {
		for _, pod := range pods {
			key := podKey(ns, pod)
			currentSet[key] = true
			if _, monitored := monitorState.pods[key]; !monitored {
				// Start monitoring new pod
				podCtx, cancel := context.WithCancel(ctx)
				monitorState.pods[key] = monitoredPod{cancel: cancel}
				metrics.IncActiveMetrics()
				go monitorPodForRule(podCtx, rule, ns, pod)
			}
		}
	}
	// Check for pods that are no longer current and cancel their monitoring
	for key, mp := range monitorState.pods {
		if !currentSet[key] {
			mp.cancel()
			metrics.DecActiveMetrics()
			delete(monitorState.pods, key)
		}
	}
}
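The shared state referenced above (monitorState, monitoredPod, podKey) isn't shown in the post; a minimal version consistent with how it's used would be something like this.

// monitoredPod stores the cancel function that stops a pod's log stream.
type monitoredPod struct {
	cancel context.CancelFunc
}

// monitorState guards the set of pods currently being monitored.
var monitorState = struct {
	sync.Mutex
	pods map[string]monitoredPod
}{
	pods: make(map[string]monitoredPod),
}

// podKey builds a unique map key for a namespace/pod pair.
func podKey(ns, pod string) string {
	return ns + "/" + pod
}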
Configuration
If you remember the log-generating app from the top of the post, here I'll create a rule to measure the logs it generates. In this rule we'll record a match every time both log entry and 4 are detected in the incoming log stream.
[
  {
    "name": "log entry 400",
    "metric": "log_entry_400",
    "namespace": ["default"],
    "container": ["dummy"],
    "condition": ["4", "log entry"]
  }
]
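For reference, that JSON maps onto a rule struct along these lines; the field names are inferred from the JSON keys and from how rule.* is accessed in the code above, so the exporter's actual definition may differ.

// Rule describes one log-based metric: where to look for logs and which
// substrings must all be present in a chunk for it to count as a match.
type Rule struct {
	Name      string   `json:"name"`
	Metric    string   `json:"metric"`
	Namespace []string `json:"namespace"`
	Container []string `json:"container"`
	Condition []string `json:"condition"`
}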
Deploying to Kubernetes
Here is a full working example of a k8s deployment manifest. Once it's deployed, we can see that it's picking up the log-based metrics.
log_based_metric{metric="log_entry_400",name="log entry 4",namespace="default",pod="dummy-59b956b77c-2h78c"} 0
log_based_metric{metric="log_entry_400",name="log entry 4",namespace="default",pod="dummy-59b956b77c-99rvv"} 73
Thanks for reading!