Eons ago I had a requirement to ingest large sets of data into our relational database (MySQL). This is how I approached the problem and optimized the solution, thought about sharing it in case someone needs something similiar.
**There are plenty of tools which do a much better job at loading data.
Case Study - Optimizing Large-Scale MySQL Data Ingestion
- Ingest (Text/CSV)encrypted data file 18 MB to 100 MB large (around 80K to 1200K lines of data set) from a SFTP Server.
- The data format was predefined and positional.
- Poll and load data from the latest files from SFTP
- Load only the Delta from the files (new changes only)
- Update existing records with Unique key.
- The Tech stack was Java/Spring Boot, MySQL
** Initial Solution (Procedural) **
My initial solution was to write simple Java (with Distributed Transaction, Buffered Input, Local Caching etc...).
- Poll the SFTP for the latest file
public void connect(String host, String port, String username, String password)
throws JSchException {
JSch jsch = new JSch();
// Uncomment the line below if the FTP server requires certificate
// jsch.addIdentity("private-key-path");
// Uncomment the two lines below if the FTP server requires password
session = jsch.getSession(username, host, Integer.valueOf(port));
session.setPassword(password);
session.connect();
}
public List<String> lsStar(String path, String searchForFileName)
throws JSchException, SftpException {
ChannelSftp sftpChannel = null;
Channel channel = session.openChannel("sftp");
channel.connect();
sftpChannel = (ChannelSftp) channel;
Vector<LsEntry> vector = sftpChannel.ls(path);
List<String> latestFile = new ArrayList<>();
String nextName = null;
LsEntry lsEntry = null;
for (Object sftpFile : vector) {
lsEntry = (ChannelSftp.LsEntry) sftpFile;
nextName = lsEntry.getFilename();
// Add to the list if name matched the pattern
if (nextName.startsWith(searchForFileName))
latestFile.add(nextName);
}
sftpChannel.exit();
return latestFile;
}
- Download the file and copy it in S3, decrypt the file and copy a temp file for processing(destroy the file after use).
- Read the file from S3 line by line and compare using a unique Key
- If new record insert
- If existing record and compare it with existing value if changed, update the new value.
- Even with caching, distributed transactions and throttling ran into Lock tables issues, high latency and scalability issue.
This was pretty rudimentary and non-optimized solution and took any where betwenn 80 sec to 200 sec to complete on a AWS-M3-Medium Server with around 8 GB RAM and it worked fine for couple of client, but then when we scaled from 5 to 30 clients, the scheduling and table lock extended way too long and some files would fail under load.
We were boot strapping so we could not vertically scale our server, also the database bottle neck would still persists unless we could implement database sharding.
Processing Large Data Set (Optimized)
The first part remains the same. The data loading was optimized.
- Define a async Transaction Executor process.
- Create a TEMP table, the column name and count should match the original table.
- Download the file content from S3 and create a temp local file.
- Follow the below step.
- Before every load Truncate all the data from the TEMP table.
//Step 1 - Truncate The existing data in the TEMP Table
public void truncateTempTableForFileUpload() {
_bigDataRepository.truncateTempTableFoFileUpload();
}
- Using a java.util.function.Supplier interface wrapper ensures that each major step (Load, Update, Insert) runs in its own clean transaction, preventing long-running locks.
@Transactional(propagation = Propagation.REQUIRES_NEW)
public <T> T executeInNewTx(Supplier<T> action) {
return action.get();
}
- Instead of parsing the CSV in Java, we use LOAD DATA LOCAL INFILE. This is the fastest way to move data into MySQL because it bypasses the overhead of the query parser and logs for individual rows.
//Step 2 - Read the Content from the Local File and Upload into the TEMP table
txExecutor.executeInNewTx(() -> bulkFileUpload(localFilePath));
public void bulkFileUpload(String localFilePath) {
_bigDataRepository.bulkFileUpload(localFilePath);
}
- call async INSERTDATA AND UPDATEDATA
int insertedRec =txExecutor.executeInNewTx(() -> insertTempDataIntoBigData());
int updatedRec =txExecutor.executeInNewTx(() -> updateTempDataIntoBigData());
Delete Local file for garbage collection.
Files.deleteIfExists(Path.of(localFilePath));
Repository Code with SQL
// REPO Code
public interface BigDataRepository extends JpaRepository<TempBigData, Long> {
/**
* The path is hard coded, the first is for local env, for all other env this path should work
* for local testing use the C:\\ Local path
* @param fileLocation
*/
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Modifying
// @Query(
// value = "LOAD DATA LOCAL INFILE
// 'C:\\/Users\\/eslk\\/git\\/bigdata/bigdataset_file.csv' INTO TABLE
// temp_big_data_for_loading FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'",
// nativeQuery = true)
@Query(
value = "LOAD DATA LOCAL INFILE '/var/app/current/bigdataset_file.csv' INTO TABLE "
+ " temp_big_data_for_loading " + " FIELDS TERMINATED BY ',' LINES TERMINATED BY '\\n'",
nativeQuery = true)
public void bulkFileUpload(String fileLocation);
@Modifying
@Transactional
@Query(value = "COMMIT", nativeQuery = true)
void commit();
@Modifying
@Transactional
@Query(value = "delete from temp_big_data_for_loading where id > 0", nativeQuery = true)
void truncateTempTableFoFileUpload();
@Modifying
@Transactional(propagation = Propagation.REQUIRES_NEW)
@Query(value = "update big_data a ,temp_big_data_for_loading b SET "
+ " a.last_name = b.last_name,a.first_name= b.first_name,a.address= b.address_line1,"
+ " a.city = b.city, "
+ " a.last_updated_on=b.last_updated_on " + " where "
+ " a.broker_id = b.broker_id and a.organization_id=b.org_id and a.uniq_id = b.uniq_id "
+ " and a.id > 1 ", nativeQuery = true)
public int updateTempDataIntoBigData();
@Modifying
@Transactional
@Query(
value = "insert into big_data (broker_id,sd_organization_id,uniq_id,last_name,first_name,address, "
+ " city,state,zip_code,email,gender,salary,date_of_birth, "
+ " b.created_on ,b.last_updated_on " + " from temp_big_data_for_loading b "
+ " LEFT JOIN big_data a " + " ON a.uniq_id = b.uniq_id "
+ " AND a.broker_id = b.broker_id " + " AND a.organization_id = b.org_id "
+ " where a.uniq_id IS NULL ",
nativeQuery = true)
public int insertTempDataIntoBigData();
Key Takeaways
Staging is Key: Using a TEMP (or staging) table allows you to perform complex "Upsert" logic using standard JOINs, which the database engine can optimize far better than application code.
Avoid Row-Level Bloat: Java is great for orchestration, but SQL is built for set manipulation. Whenever you see a loop containing a database query, look for a set-based alternative.
Local Infile Security: Note that LOAD DATA LOCAL requires the allowLoadLocalInfile=true flag in your MySQL connection string and server configuration for security reasons.
Final Result: Processing time dropped from ~180s down to 3s. This allowed the system to handle the scale of 30+ concurrent clients without the database bottlenecking.
Top comments (0)