I am working on a project that uses Spring Integration to encrypt PHI files.
The system scans a folder for input files, encrypts them, and uploads the output to another folder.
The input and output folders are remote, and the system connects to them via SFTP.
With Spring Integration, you don't need to write code to interact with the SFTP servers. All you need is some configuration, either XML-based or annotation-based.
That way, you can focus on the main task: encryption.
The workflow has these main steps:
- Scan for input files in the input SFTP folder using an InboundChannelAdapter with the SFTP protocol. The output of this step is a Message on a channel (named processingChannel).
- Consume messages from that channel, encrypt the files, then build another message with the output and push it to another channel (named deliveringChannel).
- Finally, an OutboundChannelAdapter consumes messages from deliveringChannel and uploads the files to the output SFTP folder.
All three steps are easy to configure.
After that, the only thing left to do is implement the main business logic: encrypting the files.
You can see that our encryption service is actually the consumer of processingChannel. We can scale this consumer by choosing a different kind of channel: DirectChannel for point-to-point consumption on the sender's thread; ExecutorChannel for handing each message to a thread from a thread pool, so that several messages are processed concurrently; or PublishSubscribeChannel for broadcasting each message to multiple consumers.
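To make the difference between these dispatch styles concrete, here is a plain-JDK sketch that mimics them without Spring. It is only an analogy: `DispatchSketch`, `directSend`, `executorSend`, and `publish` are hypothetical names, not Spring Integration APIs, and the real channels add error handling, load balancing, and interceptors on top.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

class DispatchSketch {

    // Direct style: the handler runs on the sender's own thread.
    public static <T, R> R directSend(T message, Function<T, R> handler) {
        return handler.apply(message);
    }

    // Executor style: each message is handed to ONE pool thread,
    // so several messages can be in progress at the same time.
    public static <T, R> Future<R> executorSend(ExecutorService pool, T message,
                                                Function<T, R> handler) {
        return pool.submit(() -> handler.apply(message));
    }

    // Publish-subscribe style: every subscriber sees the same message.
    public static <T, R> List<R> publish(T message, List<Function<T, R>> subscribers) {
        List<R> results = new ArrayList<>();
        for (Function<T, R> subscriber : subscribers) {
            results.add(subscriber.apply(message));
        }
        return results;
    }

    public static void main(String[] args) throws Exception {
        Function<String, String> handler =
                file -> file + " handled on " + Thread.currentThread().getName();

        // Runs on the main thread.
        System.out.println(directSend("a.txt", handler));

        ExecutorService pool = Executors.newFixedThreadPool(2);
        try {
            // Both submissions return immediately; the work happens on pool threads.
            Future<String> first = executorSend(pool, "b.txt", handler);
            Future<String> second = executorSend(pool, "c.txt", handler);
            System.out.println(first.get());
            System.out.println(second.get());
        } finally {
            pool.shutdown();
        }
    }
}
```

The thread names printed by `main` show the point: the direct call stays on the caller's thread, while the executor-backed calls land on pool threads.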
Let's configure the channels:
ChannelConfig.java
@Configuration
@EnableIntegration
public class ChannelConfig {

    // Messages sent here are handed to one of 10 worker threads.
    @Bean
    public MessageChannel processingChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10));
    }

    // A separate pool, so uploads do not compete with encryption for threads.
    @Bean
    public MessageChannel deliveringChannel() {
        return new ExecutorChannel(Executors.newFixedThreadPool(10));
    }
}
Both channels are ExecutorChannel instances, each backed by its own thread pool so that messages are handled concurrently.
Here is the configuration for the Inbound Channel Adapter.
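One thing to watch with `Executors.newFixedThreadPool(10)` created inside a `@Bean` method: the pool itself is not a bean, so as far as I can tell nothing shuts it down when the application context closes. A possible alternative, sketched here on the assumption that a Spring-managed `ThreadPoolTaskExecutor` is acceptable, is to expose the pool as its own bean. The class name `ManagedExecutorChannelConfig`, the bean name `processingExecutor`, and the thread-name prefix are illustrative choices, not part of the original setup, and this definition would replace the `processingChannel` bean in `ChannelConfig` rather than sit next to it.

```java
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;
import org.springframework.integration.channel.ExecutorChannel;
import org.springframework.messaging.MessageChannel;
import org.springframework.scheduling.concurrent.ThreadPoolTaskExecutor;

@Configuration
public class ManagedExecutorChannelConfig {

    // As a bean, the executor is initialized by Spring and shut down with the context.
    @Bean
    public ThreadPoolTaskExecutor processingExecutor() {
        ThreadPoolTaskExecutor executor = new ThreadPoolTaskExecutor();
        executor.setCorePoolSize(10);
        executor.setMaxPoolSize(10);
        executor.setThreadNamePrefix("processing-"); // hypothetical prefix, eases log reading
        return executor;
    }

    // Same channel type as before, now backed by the managed executor.
    @Bean
    public MessageChannel processingChannel(ThreadPoolTaskExecutor processingExecutor) {
        return new ExecutorChannel(processingExecutor);
    }
}
```

The same pattern would apply to `deliveringChannel` with a second executor bean.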
SftpInboundConfig.java
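@Configuration
@EnableIntegration
public class SftpInboundConfig {

    // host, port, user, password, remoteDir and localDir are assumed to be
    // fields of this class, populated elsewhere (for example from properties).

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpSessionFactory() {
        // true = share a single session between callers
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        // Accepts any host key: convenient for a demo, worth tightening for real PHI traffic.
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    @Bean
    public IntegrationFlow sftpInboundFlow() {
        return IntegrationFlow
                .from(
                        // Copies remote files into localDir and emits each one as a message.
                        Sftp.inboundAdapter(sftpSessionFactory())
                                .remoteDirectory(remoteDir)
                                .localDirectory(new File(localDir)),
                        e -> e.id("sftpInboundAdapter")
                                .autoStartup(true)
                                .poller(Pollers.fixedDelay(1000)) // poll every second
                )
                .channel("processingChannel")
                .get();
    }
}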
We define an SFTP session factory for creating sessions (connections) to the input SFTP server. The sftpInboundFlow uses this factory to create a session, download files from the input SFTP server into the local directory, and send each file as a message to processingChannel.
And here is the configuration for the Outbound Channel Adapter.
SftpOutboundConfig.java
@Configuration
@EnableIntegration
public class SftpOutboundConfig {

    // host, port, user, password and remoteDir are assumed to be fields of
    // this class, pointing at the output SFTP server.

    @Bean
    public SessionFactory<SftpClient.DirEntry> sftpOutboundSessionFactory() {
        DefaultSftpSessionFactory factory = new DefaultSftpSessionFactory(true);
        factory.setHost(host);
        factory.setPort(port);
        factory.setUser(user);
        factory.setPassword(password);
        factory.setAllowUnknownKeys(true);
        return factory;
    }

    @Bean
    public IntegrationFlow outboundFlow() {
        return IntegrationFlow.from("deliveringChannel")
                // Uploads each message payload to remoteDir on the output server.
                .handle(Sftp.outboundAdapter(sftpOutboundSessionFactory())
                        .remoteDirectory(remoteDir)
                        .autoCreateDirectory(true)
                )
                .get();
    }
}
We also define an SFTP session factory for the output SFTP server. The outboundFlow uses this factory to create a session and upload the encrypted files to the output SFTP server.
Wiring with our main business
Finally, we need to wire our EncryptionService to processingChannel and deliveringChannel. This service consumes messages from processingChannel, encrypts the files, and pushes new messages to deliveringChannel.
ProcessingFlowConfig.java
@Configuration
@EnableIntegration
public class ProcessingFlowConfig {

    @Bean
    public IntegrationFlow processFlow() {
        return IntegrationFlow.from("processingChannel")
                // Calls the encrypt method on the bean named "encryptionService".
                .handle("encryptionService", "encrypt")
                .channel("deliveringChannel")
                .get();
    }
}
We use IntegrationFlow to consume messages from processingChannel and call the encryptionService.encrypt method; the output messages are then pushed to deliveringChannel.
Let's pretend we have a simple EncryptionService like this:
EncryptionService.java
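@Service
public class EncryptionService {

    public Message<File> encrypt(Message<File> message) {
        // doSomeEncryption is a placeholder for the real encryption logic.
        File encrypted = doSomeEncryption(message.getPayload());
        // Wrap the encrypted file in a new message for deliveringChannel.
        return MessageBuilder.withPayload(encrypted).build();
    }
}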
Just like that. We're done.
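The post leaves `doSomeEncryption` undefined on purpose. As one possible shape for it, here is a minimal sketch that assumes AES-GCM from the JDK's `javax.crypto` package and a key supplied by the caller. The class name `AesGcmFileSketch`, the IV-then-ciphertext file layout, and the whole-file-in-memory approach are assumptions made for illustration, not the project's actual scheme; key management is out of scope here.

```java
import java.io.IOException;
import java.nio.ByteBuffer;
import java.nio.charset.StandardCharsets;
import java.nio.file.Files;
import java.nio.file.Path;
import java.security.GeneralSecurityException;
import java.security.SecureRandom;
import javax.crypto.Cipher;
import javax.crypto.KeyGenerator;
import javax.crypto.SecretKey;
import javax.crypto.spec.GCMParameterSpec;

class AesGcmFileSketch {

    private static final int IV_BYTES = 12;   // commonly recommended IV size for GCM
    private static final int TAG_BITS = 128;  // authentication tag length

    // Encrypts the whole file in memory and writes IV + ciphertext to target.
    public static Path encrypt(Path source, Path target, SecretKey key)
            throws IOException, GeneralSecurityException {
        byte[] iv = new byte[IV_BYTES];
        new SecureRandom().nextBytes(iv); // a fresh IV for every file

        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.ENCRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, iv));
        byte[] cipherText = cipher.doFinal(Files.readAllBytes(source));

        byte[] out = ByteBuffer.allocate(iv.length + cipherText.length)
                .put(iv).put(cipherText).array();
        return Files.write(target, out);
    }

    // Reverses encrypt(): reads the IV prefix, then decrypts and verifies the tag.
    public static byte[] decrypt(Path source, SecretKey key)
            throws IOException, GeneralSecurityException {
        byte[] in = Files.readAllBytes(source);
        Cipher cipher = Cipher.getInstance("AES/GCM/NoPadding");
        cipher.init(Cipher.DECRYPT_MODE, key, new GCMParameterSpec(TAG_BITS, in, 0, IV_BYTES));
        return cipher.doFinal(in, IV_BYTES, in.length - IV_BYTES);
    }

    public static void main(String[] args) throws Exception {
        KeyGenerator generator = KeyGenerator.getInstance("AES");
        generator.init(256);
        SecretKey key = generator.generateKey(); // demo key only

        Path plain = Files.createTempFile("phi", ".txt");
        Files.write(plain, "patient record".getBytes(StandardCharsets.UTF_8));
        Path encrypted = encrypt(plain, Files.createTempFile("phi", ".enc"), key);

        // Round trip: decrypting should give back the original text.
        System.out.println(new String(decrypt(encrypted, key), StandardCharsets.UTF_8));
    }
}
```

Inside EncryptionService, `doSomeEncryption(File)` could then delegate to something like `encrypt(file.toPath(), targetPath, key)` and return `targetPath.toFile()`. For large files, a streaming variant would be a better fit than reading everything into memory.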
Conclusion
With Spring Integration, we can seamlessly connect systems, services, and components by using adapters, channels, and integration flows. This allows us to shift our focus away from the plumbing of connectivity and instead concentrate on our core business — processing and transforming messages and data across multiple systems within our enterprise and integrating with external services from partner organizations.