DEV Community

Luan Barbosa Ramalho
Luan Barbosa Ramalho

Posted on

MongoDB Aggregation Pipeline Stages: Using addFields stage with Java

Hey there! What's up? I would like to talk about the stage $addFields, responsible for adding new fields to documents in MongoDB.

With $addFields, we can run an aggregate pipeline and return documents with new fields, but we can do computations with these new fields as well.

Let's imagine that we have an application responsible for sending reports to many clients and these clients can configure the time in minutes that they would like to receive these reports and the application runs every five minutes, for example.

We have this collection with the client report configuration:

{
 "name":"Politics CDE News Inc.",
 "lastExecution":"2023-11-02T05:35:00", 
 "waitingTimeInMinutes":"5",
 "active":"true"
}
Enter fullscreen mode Exit fullscreen mode

I did a sample Rest API using Java and Quarkus to do the tests. In my code, the class that represents the collection is:

package br.com.luanbrdev;

import java.time.LocalDateTime;
import java.util.Objects;

public class ClientReportConfig {
    private String id;
    private String name;
    private LocalDateTime lastExecution;
    private int waitingTimeInMinutes;
    private boolean active;

    public ClientReportConfig() {
    }

    //gets, setters, equals and hashCode ...
}

Enter fullscreen mode Exit fullscreen mode

I wrote a resource class, with the methods list responsible for listing all regardless of configuration, add responsible for adding new configurations, and listReportsScheduled responsible for listing the configurations based on the schedule.

package br.com.luanbrdev;

import jakarta.inject.Inject;
import jakarta.ws.rs.GET;
import jakarta.ws.rs.POST;
import jakarta.ws.rs.Path;

import java.util.List;

@Path("/report/configs")
public class ClientReportResource {
    @Inject
    ClientReportService service;

    @GET
    public List<ClientReportConfig> list() {
        return service.list();
    }

    @POST
    public List<ClientReportConfig> add(ClientReportConfig clientReportConfig) {
        service.add(clientReportConfig);
        return list();
    }

    @GET
    @Path("scheduled")
    public List<ClientReportConfig> listReportsScheduled() {
        return service.listByConfiguredTime();
    }
}
Enter fullscreen mode Exit fullscreen mode

And a service class responsible for communicating with Mongo:

package br.com.luanbrdev;

import com.mongodb.client.MongoClient;
import com.mongodb.client.MongoCollection;
import com.mongodb.client.MongoCursor;
import jakarta.enterprise.context.ApplicationScoped;
import jakarta.inject.Inject;
import org.bson.Document;

import java.time.LocalDateTime;
import java.time.ZoneId;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;

@ApplicationScoped
public class ClientReportService {
    @Inject
    MongoClient mongoClient;

    public List<ClientReportConfig> list() {
        List<ClientReportConfig> list = new ArrayList<>();

        try (MongoCursor<Document> cursor = getCollection().find().iterator()) {
            while (cursor.hasNext()) {
                Document document = cursor.next();
                buildResult(list, document);
            }
        }
        return list;
    }

    public void add(ClientReportConfig reportConfig) {
        Document document = new Document()
                .append("name", reportConfig.getName())
                .append("lastExecution", reportConfig.getLastExecution())
                .append("waitingTimeInMinutes", reportConfig.getWaitingTimeInMinutes())
                .append("active", reportConfig.isActive());

        getCollection().insertOne(document);
    }

    public List<ClientReportConfig> listByConfiguredTime() {
        List<ClientReportConfig> list = new ArrayList<>();
        var now = LocalDateTime.now(ZoneId.of("UTC"));

        var dateNow = new Document("dateNow", now);
        var timeInMillisecondsAggregate = new Document("timeInMillisecondsAggregate",
                new Document("$multiply", Arrays.asList("$waitingTimeInMinutes", 60000L)));
        var addedDateAggregate = new Document("addedDateAggregate",
                new Document("$add", Arrays.asList("$lastExecution", "$timeInMillisecondsAggregate")));

        var shouldExecuteAggregate = new Document("shouldExecuteAggregate",
                new Document("$and", Arrays.asList(
                        new Document("$gt", Arrays.asList("$dateNow", "$addedDateAggregate")),
                        new Document("$eq", Arrays.asList("$active", true))
                )));
        var match = new Document("shouldExecuteAggregate", true);

        var iterator = getCollection().aggregate(Arrays.asList(
                new Document("$addFields", dateNow),
                new Document("$addFields", timeInMillisecondsAggregate),
                new Document("$addFields", addedDateAggregate),
                new Document("$addFields", shouldExecuteAggregate),
                new Document("$match", match))).iterator();

        try (MongoCursor<Document> cursor = iterator) {
            while (cursor.hasNext()) {
                Document document = cursor.next();
                buildResult(list, document);
            }
        }
        return list;
    }

    private static void buildResult(List<ClientReportConfig> list, Document document) {
        ClientReportConfig reportConfig = new ClientReportConfig();

        reportConfig.setId(document.get("_id").toString());
        reportConfig.setName(document.getString("name"));
        reportConfig.setLastExecution(document.getDate("lastExecution")
                .toInstant()
                .atZone(ZoneId.systemDefault())
                .toLocalDateTime());
        reportConfig.setWaitingTimeInMinutes(document.getInteger("waitingTimeInMinutes"));
        reportConfig.setActive(document.getBoolean("active"));
        list.add(reportConfig);
    }

    private MongoCollection getCollection() {
        return mongoClient.getDatabase("subscribers").getCollection("clientReportCollection");
    }
}

Enter fullscreen mode Exit fullscreen mode

Let's understand the method listByConfiguredTime that is responsible for listing the configurations based on the schedule:

    public List<ClientReportConfig> listByConfiguredTime() {
        List<ClientReportConfig> list = new ArrayList<>();
        var now = LocalDateTime.now(ZoneId.of("UTC"));

        var dateNow = new Document("dateNow", now);
        var timeInMillisecondsAggregate = new Document("timeInMillisecondsAggregate",
                new Document("$multiply", Arrays.asList("$waitingTimeInMinutes", 60000L)));
        var addedDateAggregate = new Document("addedDateAggregate",
                new Document("$add", Arrays.asList("$lastExecution", "$timeInMillisecondsAggregate")));

        var shouldExecuteAggregate = new Document("shouldExecuteAggregate",
                new Document("$and", Arrays.asList(
                        new Document("$gt", Arrays.asList("$dateNow", "$addedDateAggregate")),
                        new Document("$eq", Arrays.asList("$active", true))
                )));
        var match = new Document("shouldExecuteAggregate", true);

        var iterator = getCollection().aggregate(Arrays.asList(
                new Document("$addFields", dateNow),
                new Document("$addFields", timeInMillisecondsAggregate),
                new Document("$addFields", addedDateAggregate),
                new Document("$addFields", shouldExecuteAggregate),
                new Document("$match", match))).iterator();

        try (MongoCursor<Document> cursor = iterator) {
            while (cursor.hasNext()) {
                Document document = cursor.next();
                buildResult(list, document);
            }
        }
        return list;
    }

Enter fullscreen mode Exit fullscreen mode
  1. I created a new document called dateNow with the current date time;
  2. I created a new document called timeInMillisecondsAggregate with the waiting time in minutes configured by the client in milliseconds;
  3. I created a new document called addedDateAggregate with the date of the last execution added to the time in milliseconds;
  4. I created a new document called shouldExecuteAggregate with a logic operation where if dateNow is greater than addedDateAggregate, and the active field is true then return a boolean true;
  5. We run a getCollection().aggregate with a list of addFields stages and at the end a stage $match that will filter our results based on the field shouldExecuteAggregate when true;

So, if we have a last execution added to the time configured, less than the date time of the current execution the results will be listed. It's important to say that dates are saved in UTC in the MongoDB.

If you would like to see the sample code you can find it here https://github.com/luanbrdev/sample-mongodb-addfields

If this content helped you in any way, let me know in the comments or if something was not clear, let me know as well. See you guys!

Top comments (1)

Collapse
 
andrebuarque profile image
André Buarque

Great content! Thank you for share 👏👏👏