YASSINE SABRI
How to synchronize MySQL Database with ElasticSearch And perform data querying in a Spring Boot Application

The need for an advanced search mechanism such as ElasticSearch is often not apparent at the beginning of a project, but as the project grows over time, it comes to the surface.

To satisfy this technical need without altering the existing project's architecture, we can implement logic that synchronizes the database data into an ElasticSearch index, where we can perform complex querying without impacting the performance of the production database.

Let’s dive into it and explore how we can implement this …

Disclaimer: I will only share the important code snippets to keep the reading quick; you can find the link to the dedicated GitHub repo for this article at the end.


Setting up the project

First of all, let's create a simple Spring Boot application with a MySQL database.

Tip: you can use Spring Initializr to set up the application's skeleton structure

Project configuration

In order to interface with ElasticSearch, we need to do the following:

  1. Add the data-elasticsearch dependency to the pom.xml
<dependency>
    <groupId>org.springframework.boot</groupId>
    <artifactId>spring-boot-starter-data-elasticsearch</artifactId>
</dependency>

Tip: the ElasticSearch engine version to use depends on the spring-data-elasticsearch version pulled in by the project (see the compatibility matrix)

2. Add the following properties to the application.properties

#MySQL data source
...
#elastic config
spring.data.elasticsearch.cluster-name=${CLUSTER_NAME:elastic}
spring.data.elasticsearch.cluster-nodes=${CLUSTER_NODES:localhost:9300}
spring.data.elasticsearch.repositories.enabled=true
  • spring.data.elasticsearch.cluster-name: the cluster name; CLUSTER_NAME is an environment variable that we use when running the application in a Docker container
  • spring.data.elasticsearch.cluster-nodes: the cluster nodes
  • spring.data.elasticsearch.repositories.enabled: enables Elasticsearch repositories so we can use Spring Data repository interfaces with ElasticSearch.

3. Define the package where the ElasticSearch repositories reside

@SpringBootApplication
@EnableElasticsearchRepositories("com.example.springmysqlelastic.repo.elastic")
public class SpringMysqlElasticApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringMysqlElasticApplication.class, args);
    }
}

In our example it's "com.example.springmysqlelastic.repo.elastic".

Project structure

For demonstration purposes, we're going to create a single table in our database and wrap it in its own index. In a real project, we would have a much more complex schema and more indexes to create.

  • Create a User entity with first name and last name attributes
@Entity
public class User {

    @Id
    @GeneratedValue(strategy = GenerationType.IDENTITY)
    private Long id;

    private String firstName;
    private String lastName;

    // getters and setters
}
  • Create a model class to represent our User index within ElasticSearch

@Document(indexName = "user")
public class UserModel {

    @Id
    private Long id;

    private String firstName;
    private String lastName;

    // getters and setters
}

  • Create an Elasticsearch repository for the UserModel

@Repository
public interface IUserESRepo extends ElasticsearchRepository<UserModel, Long> {
}
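
The synchronizer we'll build later converts User entities into UserModel documents through a mapper (it calls userMapper.toUserModel). The repo has its own implementation, which may differ; a minimal sketch of such a mapper, assuming the getters and setters shown above, could look like this:

// Hypothetical mapper sketch; the actual implementation in the repo may differ
@Component
public class UserMapper {

    // Maps the JPA entity to the ElasticSearch document model
    public UserModel toUserModel(User user) {
        UserModel model = new UserModel();
        model.setId(user.getId());
        model.setFirstName(user.getFirstName());
        model.setLastName(user.getLastName());
        return model;
    }
}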

We're done setting up the main components for our app.


Synchronize data with ElasticSearch

In order to synchronize our data from MySQL with the user index in ElasticSearch, we're going to use a scheduler in which we will implement the synchronization logic.

First of all, we need to enable Spring's scheduled task execution capability by adding the @EnableScheduling annotation:

@SpringBootApplication
@EnableElasticsearchRepositories("com.example.springmysqlelastic.repo.elastic")
@EnableScheduling
public class SpringMysqlElasticApplication {

    public static void main(String[] args) {
        SpringApplication.run(SpringMysqlElasticApplication.class, args);
    }
}

After that, since we can't sync all the records from the database each time the scheduler runs, we will add a modification date attribute to our User entity and annotate it with @UpdateTimestamp so that Hibernate updates its value every time the entity changes.

@UpdateTimestamp
private Date modificationDate;

Next, we need to create our scheduler service.

@Service
public class ElasticSynchronizer {

    // Fields
    // Constructor

    @Scheduled(cron = "0 */3 * * * *")
    @Transactional
    public void sync() {
        LOG.info("Start Syncing - {}", LocalDateTime.now());
        this.syncUsers();
        LOG.info("End Syncing - {}", LocalDateTime.now());
    }

    private void syncUsers() {
        // implemented below
    }

    private static Predicate getModificationDatePredicate(CriteriaBuilder cb, Root<?> root) {
        // implemented below
    }
}
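
The // Fields and // Constructor placeholders stand for the injected dependencies. A minimal sketch, assuming the names used in the snippets below (IUserDAO is a hypothetical name for the JPA repository; the repo may name things differently), might be:

// Hypothetical fields/constructor sketch for ElasticSynchronizer
private static final Logger LOG = LoggerFactory.getLogger(ElasticSynchronizer.class);

private final IUserDAO userDAO;        // JPA repository for the User entity
private final IUserESRepo userESRepo;  // ElasticSearch repository for UserModel
private final UserMapper userMapper;   // entity-to-document mapper

@Autowired
public ElasticSynchronizer(IUserDAO userDAO, IUserESRepo userESRepo, UserMapper userMapper) {
    this.userDAO = userDAO;
    this.userESRepo = userESRepo;
    this.userMapper = userMapper;
}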

In this example, the scheduler runs every 3 minutes; this period will vary from one project to another depending on the importance of the synced data.

Each time the main method of the scheduler is executed, we will sync our users.

private void syncUsers() {
    Specification<User> userSpecification = (root, criteriaQuery, criteriaBuilder) ->
            getModificationDatePredicate(criteriaBuilder, root);
    List<User> userList;
    if (userESRepo.count() == 0) {
        userList = userDAO.findAll();
    } else {
        userList = userDAO.findAll(userSpecification);
    }
    for (User user : userList) {
        LOG.info("Syncing User - {}", user.getId());
        userESRepo.save(this.userMapper.toUserModel(user));
    }
}
private static Predicate getModificationDatePredicate(CriteriaBuilder cb, Root<?> root) {
    Expression<Timestamp> currentTime = cb.currentTimestamp();
    Expression<Timestamp> currentTimeMinus = cb.literal(
            new Timestamp(System.currentTimeMillis() - Constants.INTERVAL_IN_MILLISECONDE));
    return cb.between(root.<Date>get(Constants.MODIFICATION_DATE), currentTimeMinus, currentTime);
}

In our syncUsers() method, we have two scenarios:

  • When the index is empty at the first launch of the application, we send all the user records to ElasticSearch
  • Otherwise, we retrieve only the records that changed within the last INTERVAL_IN_MILLISECONDE milliseconds before the scheduler's execution time (as implemented in the getModificationDatePredicate method above; the assumed constants are sketched below).
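
Two details worth noting: userDAO.findAll(userSpecification) only works if the JPA repository supports Specifications, and the predicate relies on two constants. A minimal sketch, assuming a hypothetical IUserDAO name and illustrative constant values (the actual Constants class in the repo contains more entries), might be:

// Hypothetical JPA repository; JpaSpecificationExecutor enables findAll(Specification)
@Repository
public interface IUserDAO extends JpaRepository<User, Long>, JpaSpecificationExecutor<User> {
}

// Illustrative values only; the real Constants class may differ
public final class Constants {
    // Field name of the @UpdateTimestamp attribute on the User entity
    public static final String MODIFICATION_DATE = "modificationDate";
    // Look-back window matching the scheduler period (3 minutes here, as an example)
    public static final long INTERVAL_IN_MILLISECONDE = 3 * 60 * 1000L;
}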

We're done with the synchronization, let's move on to the next part.


Querying data from ElasticSearch

We will start by adding a new REST endpoint that takes the search query as input and returns the result as a ResultQuery object that we will create later on.

@RestController
@RequestMapping(PathResources.SEARCH_CONTROLLER)
public class SearchController {

    private ISearchService searchService;

    @Autowired
    public SearchController(ISearchService searchService) {
        this.searchService = searchService;
    }

    @GetMapping(Constants.SEARCH_QUERY + "/{" + Constants.QUERY + "}")
    public ResponseEntity<ResultQuery> searchQuery(@PathVariable String query) throws IOException {
        return new ResponseEntity<>(
                searchService.searchFromQuery(query.trim().toLowerCase()), HttpStatus.OK);
    }
}
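
The controller injects an ISearchService interface that isn't shown in the snippets (PathResources.SEARCH_CONTROLLER and the path-related constants are assumed to be plain string constants defined in the repo). Based on how the controller uses it, a minimal sketch of the interface could be:

public interface ISearchService {

    // Builds the ElasticSearch request from the raw query and returns the wrapped result
    ResultQuery searchFromQuery(String query) throws IOException;
}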

After that, we will create a search service where we will build our custom request and retrieve data from ElasticSearch.

@Service
public class SearchService implements ISearchService {

    @Value("${api.elasticsearch.uri}")
    private String elasticSearchUri;

    @Value("${api.elasticsearch.search}")
    private String elasticSearchSearchPrefix;

    private static final Logger LOGGER = LoggerFactory.getLogger(SearchService.class);

    @Override
    public ResultQuery searchFromQuery(String query) throws IOException {
        String body = HelperFunctions.buildMultiIndexMatchBody(query);
        return executeHttpRequest(body);
    }

    private ResultQuery executeHttpRequest(String body) throws IOException {
        // implemented below
    }
}

In the service's main method, searchFromQuery(), we do the following:

  • First, we build the body of the request; in our case, we create a multi-index search using the query_string function and add the list of fields where we want the engine to look for the given query.

The body of the request will look something like this:

{
  "from": 0,
  "size": 100,
  "track_total_hits": true,
  "sort": {
    "id": {
      "order": "asc"
    }
  },
  "query": {
    "query_string": {
      "query": "*test*",
      "fields": [
        "firstName",
        "lastName"
      ],
      "default_operator": "AND"
    }
  },
  "highlight": {
    "fields": {
      "*": {}
    },
    "require_field_match": true
  }
}

The query_string fields are a static list in the code, but they could be parameterized in the application for more control and flexibility (a sketch of a possible body-building helper follows the request code below).

  • Second, we execute the POST request with the body we created earlier and wrap the response from ElasticSearch in a ResultQuery
private ResultQuery executeHttpRequest(String body) throws IOException {
    try (CloseableHttpClient httpClient = HttpClients.createDefault()) {
        ResultQuery resultQuery = new ResultQuery();
        HttpPost httpPost = new HttpPost(HelperFunctions.buildSearchUri(elasticSearchUri, "", elasticSearchSearchPrefix));
        httpPost.setHeader(Constants.CONTENT_ACCEPT, Constants.APP_TYPE);
        httpPost.setHeader(Constants.CONTENT_TYPE, Constants.APP_TYPE);
        try {
            httpPost.setEntity(new StringEntity(body, Constants.ENCODING_UTF8));
            HttpResponse response = httpClient.execute(httpPost);
            String message = EntityUtils.toString(response.getEntity());
            JSONObject myObject = new JSONObject(message);
            if (myObject.getJSONObject(Constants.HITS).getInt(Constants.TOTAL_HITS) != 0) {
                resultQuery.setElements(myObject
                        .getJSONObject(Constants.HITS)
                        .getJSONArray(Constants.HITS)
                        .toString());
                resultQuery.setNumberOfResults(myObject
                        .getJSONObject(Constants.HITS)
                        .getInt(Constants.TOTAL_HITS));
                resultQuery.setTimeTook((float) ((double) myObject.getInt(Constants.TOOK) / Constants.TO_MS));
            } else {
                resultQuery.setElements(null);
                resultQuery.setNumberOfResults(0);
                resultQuery.setTimeTook((float) ((double) myObject.getInt(Constants.TOOK) / Constants.TO_MS));
            }
        } catch (IOException | JSONException e) {
            LOGGER.error("Error while connecting to elastic engine --> {}", e.getMessage());
            resultQuery.setNumberOfResults(0);
        }
        return resultQuery;
    }
}
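
The HelperFunctions used above aren't shown in the article. A hedged sketch of what buildMultiIndexMatchBody and buildSearchUri might look like, producing the request body shown earlier and a standard _search URL (the exact implementation in the repo may differ, and the URI/prefix values come from the api.elasticsearch.* properties), is:

// Hypothetical helper sketch; the repo's actual HelperFunctions may differ
public final class HelperFunctions {

    // Builds the multi-index query_string body shown earlier for the given search term
    public static String buildMultiIndexMatchBody(String query) {
        return "{\n" +
                "  \"from\": 0,\n" +
                "  \"size\": 100,\n" +
                "  \"track_total_hits\": true,\n" +
                "  \"sort\": { \"id\": { \"order\": \"asc\" } },\n" +
                "  \"query\": {\n" +
                "    \"query_string\": {\n" +
                "      \"query\": \"*" + query + "*\",\n" +
                "      \"fields\": [\"firstName\", \"lastName\"],\n" +
                "      \"default_operator\": \"AND\"\n" +
                "    }\n" +
                "  },\n" +
                "  \"highlight\": { \"fields\": { \"*\": {} }, \"require_field_match\": true }\n" +
                "}";
    }

    // Concatenates the base URI, an optional index name and the search suffix,
    // e.g. "http://localhost:9200" + "" + "/_search" (illustrative values)
    public static String buildSearchUri(String elasticSearchUri, String index, String searchPrefix) {
        return elasticSearchUri + index + searchPrefix;
    }
}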

Our ResultQuery class contains three main attributes:

public class ResultQuery {

    private Float timeTook;
    private Integer numberOfResults;
    private String elements;

    // getters and setters
}
  • timeTook: the ElasticSearch response time, in seconds
  • numberOfResults: the total number of elements retrieved
  • elements: a stringified JSON array that represents the hits found.

Testing the application

You can run the application either locally or in a Docker container (see the README file for more info).

I added some records to the database and synced them to the user index.

User index’s records

Tip: you can visualize the index content using the ElasticSearch Head extension in Chrome/Firefox

As an example, let's look for the users whose first or last names contain "est":

Query response

Voilà, we've found 2 records that satisfy our criteria in 0.017 seconds.

That's all folks!


Learn more by:

  • Visiting the GitHub repo to see the full code of the application.
  • Visiting the ElasticSearch documentation for more complex querying syntax.
  • For Medium fans, you can read the article there.
