DEV Community

flash-prog
flash-prog

Posted on

InfluxDB ORM API Wrapper

InfluxDB is an open-source, distributed time-series database for storing and retrieving metrics and other timestamped data. It has a custom high-performance storage engine with timestamp indexing and a SQL-like query language.
influxdb-client-java is the official Java client, but its API is not very convenient to use, so I wrapped it in a small utility layer to make it easier to work with.

github source

dependencies

<!-- https://mvnrepository.com/artifact/com.influxdb/influxdb-client-java -->
<dependency>
    <groupId>com.influxdb</groupId>
    <artifactId>influxdb-client-java</artifactId>
    <version>6.3.0</version>
</dependency>
Enter fullscreen mode Exit fullscreen mode

Configuration

Before you can use InfluxDB, you need to create a bucket in InfluxDB and then create a token to access that bucket.

# Connection settings read by InfluxDbConfig through @Value("${spring.influx.*}").
spring:
  influx:
    # Passed as the url argument of InfluxDBClientFactory.create(...)
    url: http://localhost:8086
    # Organization name passed to the client factory
    org: xxx-org
    # Access token; converted to a char[] before being handed to the client factory
    token: xxx
    # Bucket passed to the client factory
    bucket: xxx-bucket
Enter fullscreen mode Exit fullscreen mode

Inject InfluxDBClient

import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.InfluxDBClientFactory;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

/**
 * Spring configuration that exposes an {@link InfluxDBClient} bean built from the
 * {@code spring.influx.*} properties.
 *
 * <p>Every property falls back to an empty string when it is not configured. An empty
 * default is written {@code ${key:}}; writing {@code ${key:''}} would inject the literal
 * two-character text {@code ''}, because placeholder defaults are plain text rather than
 * SpEL expressions.
 */
@Configuration
public class InfluxDbConfig {

    /** Value of {@code spring.influx.url}, passed as the server URL to the client factory. */
    @Value("${spring.influx.url:}")
    private String url;

    /** Value of {@code spring.influx.org}. */
    @Value("${spring.influx.org:}")
    private String org;

    /** Value of {@code spring.influx.token}. */
    @Value("${spring.influx.token:}")
    private String token;

    /** Value of {@code spring.influx.bucket}. */
    @Value("${spring.influx.bucket:}")
    private String bucket;

    /**
     * Creates the client from the injected url, token, org and bucket.
     *
     * @return the client instance registered as a Spring bean
     */
    @Bean
    public InfluxDBClient influxDB() {
        return InfluxDBClientFactory.create(url, token.toCharArray(), org, bucket);
    }
}
Enter fullscreen mode Exit fullscreen mode

Annotation

To tell apart different instances of the same metric, such as cpu0 and cpu1, a prefix and/or suffix has to be added to the measurement name, for example cpu_measure_0 and cpu_measure_1.
To automate this, mark the POJO fields that hold the prefix and the suffix with the following annotations:


import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;

/**
 * Marker for the POJO field whose value is used as the measurement-name prefix.
 *
 * <p>Retained at runtime so it can be discovered through reflection; applicable to
 * fields only.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MeasurementPrefix {
}
Enter fullscreen mode Exit fullscreen mode
import java.lang.annotation.ElementType;
import java.lang.annotation.Retention;
import java.lang.annotation.RetentionPolicy;
import java.lang.annotation.Target;


/**
 * Marker for the POJO field whose value is used as the measurement-name suffix.
 *
 * <p>Retained at runtime so it can be discovered through reflection; applicable to
 * fields only.
 */
@Target(ElementType.FIELD)
@Retention(RetentionPolicy.RUNTIME)
public @interface MeasurementSuffix {
}
Enter fullscreen mode Exit fullscreen mode

ORM Tools

The utility class below converts between POJOs and InfluxDB points/records, including splicing the prefix and suffix into the measurement name, and wraps commonly used query, write and delete operations.

import com.influxdb.annotations.Column;
import com.influxdb.annotations.Measurement;
import com.influxdb.client.InfluxDBClient;
import com.influxdb.client.WriteApiBlocking;
import com.influxdb.client.domain.WritePrecision;
import com.influxdb.client.write.Point;
import com.influxdb.query.FluxRecord;
import com.influxdb.query.FluxTable;
import com.sme.smebackend.influx.MeasurementPrefix;
import com.sme.smebackend.influx.MeasurementSuffix;
import lombok.SneakyThrows;
import lombok.extern.slf4j.Slf4j;
import org.jetbrains.annotations.NotNull;
import org.jetbrains.annotations.Nullable;

import java.lang.annotation.Annotation;
import java.lang.reflect.Field;
import java.time.Instant;
import java.time.OffsetDateTime;
import java.util.ArrayList;
import java.util.List;
import java.util.Map;
import java.util.Objects;

/**
 * Reflection-based helpers that turn annotated POJOs into InfluxDB {@link Point}s, map Flux
 * query results back onto POJOs, and wrap a few common write / query / delete calls.
 *
 * <p>Mapping rules shared by the write and the read path:
 * <ul>
 *   <li>Only fields carrying {@link Column} are mapped; fields carrying
 *       {@link MeasurementPrefix} or {@link MeasurementSuffix} are never mapped.</li>
 *   <li>The column name is {@code @Column.name()} when non-empty, otherwise the Java field
 *       name.</li>
 *   <li>A column is the timestamp when {@code @Column.timestamp()} is set or its column name
 *       is {@code "time"}.</li>
 * </ul>
 */
@Slf4j
public class InfluxDBUtil {

    /** Column name that marks the timestamp field of a POJO. */
    private static final String TIME_COLUMN = "time";

    /**
     * Builds the effective measurement name of a POJO:
     * {@code [prefix_]<@Measurement.name>[_suffix]}.
     *
     * @param o instance of a class annotated with {@link Measurement}
     * @return the measurement name, or {@code null} when the class has no {@link Measurement}
     */
    public static String measurementName(Object o) {
        Measurement meaAnno = o.getClass().getAnnotation(Measurement.class);
        if (null == meaAnno) {
            return null;
        }

        String measurement = meaAnno.name();

        String prefix = getFieldValueByAnno(o, MeasurementPrefix.class);
        String suffix = getFieldValueByAnno(o, MeasurementSuffix.class);

        if (null != prefix) {
            measurement = prefix + "_" + measurement;
        }

        if (null != suffix) {
            measurement = measurement + "_" + suffix;
        }

        return measurement;
    }

    /**
     * Reads the value of the declared field annotated with {@code annoClazz}, as text.
     *
     * <p>Non-String values (for example an {@code int} index) are converted with
     * {@link String#valueOf(Object)}. When several fields carry the annotation, the last
     * declared one wins.
     *
     * @return the field value as a string, or {@code null} when there is no such field, its
     *         value is {@code null}, or it cannot be read
     */
    @Nullable
    private static String getFieldValueByAnno(Object o, Class<? extends Annotation> annoClazz) {
        String val = null;

        for (Field field : o.getClass().getDeclaredFields()) {
            if (!field.isAnnotationPresent(annoClazz)) {
                continue;
            }
            try {
                field.setAccessible(true);
                Object raw = field.get(o);
                val = (null == raw) ? null : String.valueOf(raw);
            } catch (IllegalAccessException e) {
                log.error("failed to get value of field: {}", field.getName(), e);
            }
        }

        return val;
    }

    /**
     * Returns the {@link Column} annotation of a field that takes part in the mapping, or
     * {@code null} when the field is unmapped (no {@code @Column}, or it is the measurement
     * prefix / suffix holder).
     */
    @Nullable
    private static Column mappedColumn(Field field) {
        if (field.isAnnotationPresent(MeasurementPrefix.class)
                || field.isAnnotationPresent(MeasurementSuffix.class)) {
            return null;
        }
        return field.getAnnotation(Column.class);
    }

    /** Resolves the column name: the annotation name when set, else the Java field name. */
    private static String columnName(Field field, Column colAnno) {
        String name = colAnno.name();
        return (null == name || name.isEmpty()) ? field.getName() : name;
    }

    /** Tells whether the column holds the point timestamp. */
    private static boolean isTimeColumn(Column colAnno, String columnName) {
        return colAnno.timestamp() || Objects.equals(TIME_COLUMN, columnName);
    }

    /**
     * Converts an annotated POJO into a {@link Point}.
     *
     * <p>Tag values are written as text. Field values may be any {@link Number},
     * {@link Boolean} or {@link String}; {@code null} values are skipped and other types are
     * skipped with a warning. The timestamp field must be an {@link Instant} and is written
     * with millisecond precision.
     *
     * @param o instance of a class annotated with {@link Measurement}
     * @return the point, or {@code null} when the class has no {@link Measurement}
     */
    public static Point measurement2Point(Object o) {
        String measurement = measurementName(o);
        if (null == measurement) {
            return null;
        }

        Point point = Point.measurement(measurement);
        for (Field field : o.getClass().getDeclaredFields()) {
            Column colAnno = mappedColumn(field);
            if (null == colAnno) {
                continue;
            }

            try {
                field.setAccessible(true);

                // Use the same name the read path matches on, so written data can be read back.
                String name = columnName(field, colAnno);
                Object value = field.get(o);

                if (colAnno.tag()) {
                    point.addTag(name, (null == value) ? null : String.valueOf(value));
                } else if (isTimeColumn(colAnno, name)) {
                    point.time((Instant) value, WritePrecision.MS);
                } else {
                    addFieldValue(point, name, value);
                }
            } catch (IllegalAccessException e) {
                log.error("failed to get value of field: {}", field.getName(), e);
            }
        }

        return point;
    }

    /** Adds one field to the point, choosing the {@code addField} overload by runtime type. */
    private static void addFieldValue(Point point, String name, Object value) {
        if (null == value) {
            return;
        }
        if (value instanceof Number) {
            point.addField(name, (Number) value);
        } else if (value instanceof Boolean) {
            point.addField(name, ((Boolean) value).booleanValue());
        } else if (value instanceof String) {
            point.addField(name, (String) value);
        } else {
            log.warn("unsupported type {} for field: {}, skipped", value.getClass().getName(), name);
        }
    }

    /**
     * Writes one annotated POJO through the blocking write API. Does nothing when the POJO's
     * class has no {@link Measurement}.
     */
    public static void write(InfluxDBClient influxDB, Object measurement) {
        Point point = measurement2Point(measurement);
        if (null == point) {
            return;
        }

        WriteApiBlocking w = influxDB.getWriteApiBlocking();
        w.writePoint(point);
    }

    /** Queries the last day of a measurement, all fields, without aggregation. */
    public static <T> List<T> query1d(InfluxDBClient influxDB,
                                      String bucket,
                                      String measurement, Class<T> clazz) {
        return query1d(influxDB, bucket, measurement, null, clazz);
    }

    /**
     * Queries the last day of a measurement without aggregation.
     *
     * @param fields field names to keep; {@code null} or empty keeps all fields
     */
    public static <T> List<T> query1d(InfluxDBClient influxDB,
                                      String bucket,
                                      String measurement, List<String> fields, Class<T> clazz) {
        String query = baseQ(bucket, measurement, fields, "-1d");
        return doQuery(influxDB, clazz, query);
    }

    /** Runs a Flux query and maps the resulting tables onto instances of {@code clazz}. */
    private static <T> List<T> doQuery(InfluxDBClient influxDB, Class<T> clazz, String query) {
        List<FluxTable> tables = influxDB.getQueryApi().query(query);
        return fluxTable2Pojo(tables, clazz);
    }

    /**
     * Builds {@code from |> range |> filter(_measurement) [|> filter(_field ...)]}.
     *
     * <p>Bucket, measurement and field names are escaped as Flux string literals.
     * {@code start} is inserted verbatim because it is a duration / time expression, not a
     * string.
     *
     * @param fields field names to keep; {@code null} or empty adds no field filter
     * @param start  range start, e.g. {@code -1d}
     */
    @NotNull
    private static String baseQ(String bucket, String measurement, List<String> fields, String start) {
        StringBuilder query = new StringBuilder()
                .append("from(bucket: \"").append(escapeFlux(bucket)).append("\")")
                .append(" |> range(start: ").append(start).append(")")
                .append(" |> filter(fn: (r) => r._measurement == \"")
                .append(escapeFlux(measurement)).append("\")");

        if (null != fields && !fields.isEmpty()) {
            query.append(" |> filter(fn: (r) => ");
            for (int i = 0; i < fields.size(); i++) {
                if (i > 0) {
                    query.append(" or ");
                }
                query.append("r._field == \"").append(escapeFlux(fields.get(i))).append("\"");
            }
            query.append(")");
        }
        return query.toString();
    }

    /**
     * Escapes backslash, double quote and the interpolation opener so the text is a plain
     * Flux string literal.
     */
    private static String escapeFlux(String raw) {
        return String.valueOf(raw)
                .replace("\\", "\\\\")
                .replace("\"", "\\\"")
                .replace("${", "\\${");
    }

    /**
     * Builds the base query followed by a mean {@code aggregateWindow}.
     *
     * @param start range start, e.g. {@code -1d}
     * @param every window size, e.g. {@code 5m}
     * @return the Flux query text
     */
    public static String aggrQ(String bucket, String measurement, List<String> fields,
                               String start, String every) {
        return baseQ(bucket, measurement, fields, start)
                + " |> aggregateWindow(every: " + every + ", fn: mean, createEmpty: true)";
    }

    /** Runs a windowed-mean query over all fields. */
    public static <T> List<T> queryAggr(InfluxDBClient influxDB,
                                        String bucket, String measurement, Class<T> clazz,
                                        String start, String every) {
        return queryAggr(influxDB, bucket, measurement, null, clazz, start, every);
    }

    /** Runs a windowed-mean query over the given fields ({@code null} or empty means all). */
    public static <T> List<T> queryAggr(InfluxDBClient influxDB,
                                        String bucket, String measurement, List<String> fields, Class<T> clazz,
                                        String start, String every) {
        String query = aggrQ(bucket, measurement, fields, start, every);
        return doQuery(influxDB, clazz, query);
    }

    /** Last day, 5-minute means, all fields. */
    public static <T> List<T> queryBy5Min(InfluxDBClient influxDB,
                                          String bucket,
                                          String measurement, Class<T> clazz) {
        return queryBy5Min(influxDB, bucket, measurement, null, clazz);
    }

    /** Last day, 5-minute means, given fields. */
    public static <T> List<T> queryBy5Min(InfluxDBClient influxDB,
                                          String bucket,
                                          String measurement, List<String> fields, Class<T> clazz) {
        return queryAggr(influxDB, bucket, measurement, fields, clazz, "-1d", "5m");
    }

    /** Last day, 1-minute means, all fields. */
    public static <T> List<T> queryBy1Min(InfluxDBClient influxDB,
                                          String bucket,
                                          String measurement, Class<T> clazz) {
        return queryBy1Min(influxDB, bucket, measurement, null, clazz);
    }

    /** Last day, 1-minute means, given fields. */
    public static <T> List<T> queryBy1Min(InfluxDBClient influxDB,
                                          String bucket,
                                          String measurement, List<String> fields, Class<T> clazz) {
        return queryAggr(influxDB, bucket, measurement, fields, clazz, "-1d", "1m");
    }

    /** Last 7 days, hourly means, all fields. */
    public static <T> List<T> queryByHour(InfluxDBClient influxDB,
                                          String bucket,
                                          String measurement, Class<T> clazz) {
        return queryByHour(influxDB, bucket, measurement, null, clazz);
    }

    /** Last 7 days, hourly means, given fields. */
    public static <T> List<T> queryByHour(InfluxDBClient influxDB,
                                          String bucket,
                                          String measurement, List<String> fields, Class<T> clazz) {
        return queryAggr(influxDB, bucket, measurement, fields, clazz, "-7d", "1h");
    }

    /** Last month, daily means, all fields. */
    public static <T> List<T> queryByDay(InfluxDBClient influxDB,
                                         String bucket,
                                         String measurement, Class<T> clazz) {
        return queryByDay(influxDB, bucket, measurement, null, clazz);
    }

    /** Last month, daily means, given fields. */
    public static <T> List<T> queryByDay(InfluxDBClient influxDB,
                                         String bucket,
                                         String measurement, List<String> fields, Class<T> clazz) {
        // Flux spells the month unit "mo"; "M" is not a duration unit.
        return queryAggr(influxDB, bucket, measurement, fields, clazz, "-1mo", "1d");
    }

    /**
     * Deletes the points of a measurement whose time lies between two years ago and one month
     * ago (both relative to now).
     */
    public static void deleteBeforeOneMonth(InfluxDBClient influxDB, String bucket, String org, String measurement) {
        OffsetDateTime now = OffsetDateTime.now();

        influxDB.getDeleteApi().delete(
                now.minusYears(2), now.minusMonths(1),
                String.format("_measurement=\"%s\"", measurement),
                bucket, org
        );
    }

    /** Deletes the points of a measurement whose time lies within the last 100 years. */
    public static void deleteAll(InfluxDBClient influxDB, String bucket, String org, String measurement) {
        OffsetDateTime now = OffsetDateTime.now();

        influxDB.getDeleteApi().delete(
                now.minusYears(100), now,
                String.format("_measurement=\"%s\"", measurement),
                bucket, org
        );
    }

    /**
     * Merges Flux tables into POJOs by row index: record {@code i} of every table is applied
     * to the {@code i}-th POJO, so each POJO collects one value per table.
     *
     * <p>NOTE(review): this presumes every table describes the same series with rows aligned
     * on the same timestamps (e.g. one table per field after {@code aggregateWindow}); verify
     * before using it on results that contain several tag sets.
     *
     * @param clazz POJO type; needs an accessible no-argument constructor
     * @return one POJO per row index, in row order
     */
    @SneakyThrows
    public static <T> List<T> fluxTable2Pojo(List<FluxTable> tables, Class<T> clazz) {
        List<T> pojos = new ArrayList<>();

        for (FluxTable table : tables) {
            List<FluxRecord> records = table.getRecords();
            for (int i = 0; i < records.size(); i++) {
                T target;
                if (i >= pojos.size()) {
                    target = clazz.getDeclaredConstructor().newInstance();
                    pojos.add(target);
                } else {
                    target = pojos.get(i);
                }

                record2Pojo(records.get(i), target);
            }
        }

        return pojos;
    }

    /**
     * Copies the data of one Flux record onto an existing POJO.
     *
     * @return {@code t}, or {@code null} when the mapping failed (the failure is logged)
     */
    public static <T> T record2Pojo(FluxRecord record, T t) {
        return populate(record, t);
    }

    /**
     * Creates a new {@code clazz} instance and copies the data of one Flux record onto it.
     *
     * @param clazz POJO type; needs an accessible no-argument constructor
     * @return the populated instance, or {@code null} when instantiation or mapping failed
     *         (the failure is logged)
     */
    public static <T> T record2Pojo(FluxRecord record, Class<T> clazz) {
        T target;
        try {
            target = clazz.getDeclaredConstructor().newInstance();
        } catch (ReflectiveOperationException | RuntimeException e) {
            log.error("failed to instantiate {}", clazz, e);
            return null;
        }
        return populate(record, target);
    }

    /**
     * Shared record-to-POJO mapping used by both {@code record2Pojo} overloads.
     *
     * <p>For every mapped field: the timestamp column receives {@code record.getTime()}; the
     * column named like the record's {@code _field} receives the record value (skipped when
     * {@code null}); any other column is looked up in the record's value map by column name,
     * then by Java field name, and set when present.
     */
    private static <T> T populate(FluxRecord record, T target) {
        try {
            Map<String, Object> recordValues = record.getValues();
            String recordField = record.getField();
            Object value = record.getValue();

            for (Field field : target.getClass().getDeclaredFields()) {
                Column colAnno = mappedColumn(field);
                if (null == colAnno) {
                    continue;
                }

                field.setAccessible(true);
                String colName = columnName(field, colAnno);

                if (isTimeColumn(colAnno, colName)) {
                    field.set(target, record.getTime());
                } else if (Objects.equals(colName, recordField)) {
                    if (null != value) {
                        field.set(target, coerce(field.getType(), value));
                    }
                } else {
                    Object other = recordValues.get(colName);
                    if (null == other) {
                        other = recordValues.get(field.getName());
                    }
                    if (null != other) {
                        field.set(target, coerce(field.getType(), other));
                    }
                }
            }

            return target;
        } catch (IllegalAccessException | RuntimeException e) {
            log.error("failed to map flux record onto {}",
                    (null == target) ? null : target.getClass().getName(), e);
        }

        return null;
    }

    /**
     * Narrows or widens a numeric value to the numeric type of the target field. Needed
     * because aggregated values arrive as {@code Double} even for integer fields. Non-numeric
     * values and non-numeric target types are returned unchanged.
     */
    private static Object coerce(Class<?> type, Object value) {
        if (!(value instanceof Number)) {
            return value;
        }
        Number number = (Number) value;
        if (type == long.class || type == Long.class) {
            return number.longValue();
        }
        if (type == int.class || type == Integer.class) {
            return number.intValue();
        }
        if (type == double.class || type == Double.class) {
            return number.doubleValue();
        }
        if (type == float.class || type == Float.class) {
            return number.floatValue();
        }
        if (type == short.class || type == Short.class) {
            return number.shortValue();
        }
        if (type == byte.class || type == Byte.class) {
            return number.byteValue();
        }
        return value;
    }
}
Enter fullscreen mode Exit fullscreen mode

The tools above are the common methods I put together while using InfluxDB; they can be used directly and greatly simplify working with it.

Top comments (0)