In part one of this series, I went over creating an event hub in Azure, connecting it to my Spring Boot web service, and sending messages to it. Now I'm going to go over reading those messages from the event hub and processing them.
Like in part one, the completed example can be found here.
Creating a Storage Account in Azure
Why do we need a storage account?
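We need a storage account set up so the event processor host can store partition leases and checkpoints. Leases record which host instance currently owns which partition, and checkpoints record how far each partition has been read.
Head back to the Azure portal and click on Storage accounts.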
Then click the Add(+) button and you'll be taken to the storage account creation screen.
Fill that out and click 'Review + Create' at the bottom. We should get the OK from Microsoft. If something did go wrong, make the needed corrections and try again.
Once we get the OK, we can click 'Create' and Azure will spin up our storage account.
Getting Access Key Info
Once the storage account is created, navigate to it, select the 'Access keys' option in the side menu, and copy either of the connection strings.
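Before pasting the connection string into the config, a quick sanity check can catch copy/paste mistakes. Below is a minimal sketch, assuming the usual storage connection string shape of semicolon-separated key=value pairs containing AccountName and AccountKey. The ConnectionStringCheck class and its methods are hypothetical helpers of my own, not part of any Azure SDK.

```java
import java.util.LinkedHashMap;
import java.util.Map;

// Hypothetical helper, not part of any Azure SDK.
final class ConnectionStringCheck {

    // Splits "Key1=Value1;Key2=Value2" into a map. Only the first '=' in each
    // segment is treated as the separator, because base64 account keys
    // usually end with '=' padding.
    static Map<String, String> parse(String connectionString) {
        Map<String, String> parts = new LinkedHashMap<>();
        for (String segment : connectionString.split(";")) {
            if (segment.trim().isEmpty()) {
                continue; // tolerate empty segments such as a doubled ';'
            }
            int idx = segment.indexOf('=');
            if (idx <= 0) {
                // Deliberately does not echo the segment, it may contain a secret.
                throw new IllegalArgumentException("Malformed segment in connection string");
            }
            parts.put(segment.substring(0, idx).trim(), segment.substring(idx + 1).trim());
        }
        return parts;
    }

    // Assumption: a storage connection string carries at least these two keys.
    static boolean looksLikeStorageConnectionString(String connectionString) {
        Map<String, String> parts = parse(connectionString);
        return parts.containsKey("AccountName") && parts.containsKey("AccountKey");
    }

    public static void main(String[] args) {
        String sample = "DefaultEndpointsProtocol=https;AccountName=demo;"
                + "AccountKey=abc123==;EndpointSuffix=core.windows.net";
        System.out.println(parse(sample).get("AccountName"));
        System.out.println(looksLikeStorageConnectionString(sample));
    }
}
```

This only checks the shape of the string. It says nothing about whether the key is valid, so a failed connection at startup is still possible.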
Creating an Event Processor Host
In our EventHub config class, I've created an EventProcessorHost bean.
import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.Executors;

@Configuration
public class EventHubConfig {

    @Value("${eventHub.connectionString}")
    private String connectionString;

    @Value("${eventHub.name}")
    private String eventHubName;

    @Value("${eventHub.storage.consumerGroupName}")
    private String consumerGroupName;

    @Value("${eventHub.storage.hostNamePrefix}")
    private String hostNamePrefix;

    @Value("${eventHub.storage.storageConnectionString}")
    private String storageConnectionString;

    @Value("${eventHub.storage.storageContainerName}")
    private String storageContainerName;

    // Client used for sending messages (from part one)
    @Bean
    public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
        return EventHubClient.createFromConnectionStringSync(connectionString, Executors.newScheduledThreadPool(1));
    }

    // Host used for reading messages; leases and checkpoints go to the storage account
    @Bean
    public EventProcessorHost createEventHubProcessorHost() {
        return EventProcessorHost.EventProcessorHostBuilder
                .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
                .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
                .useEventHubConnectionString(connectionString, eventHubName)
                .build();
    }
}
For this example I'm using the EventProcessorHost builder and passing in the needed config for it. I've defined the config in my application.yml below.
eventHub:
  connectionString: 'connection string here' # Event Hub connection string goes here
  name: 'event hub name here' # The name of the event hub
  storage:
    consumerGroupName: $Default # This is the default consumer group for event hub, but it can be customized
    storageConnectionString: 'storage connection string here'
    storageContainerName: 'storage container name here'
    hostNamePrefix: 'some unique prefix' # Identifies the instance of the EventProcessorHost
Now that we have an EventProcessorHost bean, we can inject it into our EventProcessorHostService.
import com.dublin.eventhub.demo.exception.ErrorNotificationHandler;
import com.dublin.eventhub.demo.processor.EventProcessor;
import com.microsoft.azure.eventprocessorhost.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;

@Component
public class EventProcessorHostService {

    private static final Logger log = LoggerFactory.getLogger(EventProcessorHostService.class);

    private final EventProcessorHost eventProcessorHost;

    @Autowired
    public EventProcessorHostService(EventProcessorHost eventProcessorHost) {
        this.eventProcessorHost = eventProcessorHost;
    }

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {
        log.info("Setting up event hub {}", eventProcessorHost.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());
        // Block until registration has completed
        eventProcessorHost.registerEventProcessor(EventProcessor.class, options).get();
    }
}
There's a little to unpack here, so let's go over what's going on.
- I'm using constructor injection to pass in the EventProcessorHost bean I defined earlier.
- I've defined a basic ErrorNotificationHandler class that handles errors not related to specific partitions (such as initialization failures). I pass that to the EventProcessorHost by wrapping it in an EventProcessorOptions object.
- I'm registering the EventProcessor. This registers the host with the event hub and obtains leases on some of the partitions so we can start processing messages. For each partition lease, an instance of EventProcessor is created for that partition.
- Finally, I use @PostConstruct to initialize the connection to event hub.
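The "one EventProcessor instance per partition lease" idea from the list above can be pictured with a small model. Because we hand the host a class rather than an instance, the host has to build the instances itself, which, as far as I can tell, is why the processor class needs a no-argument constructor. The sketch below is a simplified, stdlib-only illustration of that behavior and not the library's actual implementation. PartitionProcessorRegistry, DemoProcessor, onLeaseAcquired, and onLeaseLost are all hypothetical names.

```java
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;

// Simplified, hypothetical model of "one processor instance per leased partition".
final class PartitionProcessorRegistry<T> {
    private final Class<T> processorType;
    private final Map<String, T> processorsByPartition = new ConcurrentHashMap<>();

    PartitionProcessorRegistry(Class<T> processorType) {
        this.processorType = processorType;
    }

    // Called when a lease for partitionId is acquired: creates an instance
    // through the no-argument constructor and remembers it for that partition.
    T onLeaseAcquired(String partitionId) {
        return processorsByPartition.computeIfAbsent(partitionId, id -> {
            try {
                return processorType.getDeclaredConstructor().newInstance();
            } catch (ReflectiveOperationException e) {
                throw new IllegalStateException("Processor needs a no-arg constructor", e);
            }
        });
    }

    // Called when the lease is lost or released.
    void onLeaseLost(String partitionId) {
        processorsByPartition.remove(partitionId);
    }

    int activeProcessors() {
        return processorsByPartition.size();
    }
}

// Stand-in for the real event processor class.
final class DemoProcessor {
    DemoProcessor() { }
}
```

One practical consequence: any state kept in fields of the processor is per partition, not shared across the whole event hub.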
Below is the ErrorNotificationHandler.
import java.util.function.Consumer;

import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs> {

    @Override
    public void accept(ExceptionReceivedEventArgs t) {
        log.error("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
    }
}
We now have our EventProcessorHost configured and ready to run, but in order for us to process messages, we also have to define an event processor class that reads off the event hub and handles the checkpointing.
We need to implement the IEventProcessor interface, which gives us access to the PartitionContext for checkpointing and an Iterable of the EventData. There are a couple of things going on here.
- I initialize the EventPayloadProcessor class in the onOpen method.
- In the onEvents method, I've created a for loop to iterate through the events, with a try/catch in the loop body, so if one event errors out while processing, I'm still able to process the remaining events.
- When an event processes successfully, I checkpoint right after processing it.
NOTE: Since checkpointing is an asynchronous call out to the storage account and not the fastest, I recommend checkpointing once every some number of processed events instead of after each one. Since this example is small, I'm not bothering to.
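To make the note above concrete, here is a minimal sketch of checkpointing every N events. CheckpointBatcher is a hypothetical helper of my own, not something from the Event Hubs library. In a real processor the callback would wrap something like partitionContext.checkpoint(event); here a plain Consumer stands in for it so the sketch runs on its own.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.function.Consumer;

// Hypothetical helper: fires the checkpoint action once every `batchSize`
// successfully processed events instead of after every single one.
final class CheckpointBatcher<E> {
    private final int batchSize;
    private final Consumer<E> checkpointAction;
    private int sinceLastCheckpoint = 0;

    CheckpointBatcher(int batchSize, Consumer<E> checkpointAction) {
        if (batchSize < 1) {
            throw new IllegalArgumentException("batchSize must be >= 1");
        }
        this.batchSize = batchSize;
        this.checkpointAction = checkpointAction;
    }

    // Call after an event was processed successfully.
    // Returns true when this call triggered a checkpoint.
    boolean onProcessed(E event) {
        sinceLastCheckpoint++;
        if (sinceLastCheckpoint >= batchSize) {
            checkpointAction.accept(event);
            sinceLastCheckpoint = 0;
            return true;
        }
        return false;
    }

    public static void main(String[] args) {
        List<Integer> checkpointed = new ArrayList<>();
        CheckpointBatcher<Integer> batcher = new CheckpointBatcher<>(3, checkpointed::add);
        for (int i = 1; i <= 7; i++) {
            batcher.onProcessed(i);
        }
        System.out.println(checkpointed); // prints [3, 6]
    }
}
```

The trade-off is that after a restart, up to batchSize - 1 events since the last checkpoint may be delivered again, so the processing step should tolerate duplicates. The counter is not synchronized, which assumes one batcher per processor instance with events for a partition handled one at a time.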
import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;

@Slf4j
@NoArgsConstructor
@Service
public class EventProcessor implements IEventProcessor {

    private EventPayloadProcessor eventPayloadProcessor;

    @Override
    public void onOpen(PartitionContext partitionContext) {
        eventPayloadProcessor = new EventPayloadProcessor();
    }

    @Override
    public void onClose(PartitionContext partitionContext, CloseReason closeReason) {
    }

    @Override
    public void onEvents(PartitionContext partitionContext, Iterable<EventData> iterable) {
        for (EventData event : iterable) {
            // try/catch per event so one bad event doesn't stop the rest of the batch
            try {
                EventPayload eventPayload = (EventPayload) SerializationUtils.deserialize(event.getBytes());
                eventPayloadProcessor.process(eventPayload);
                // Checkpoint only after the event was processed successfully
                partitionContext.checkpoint(event);
            } catch (Exception e) {
                log.error("An error occurred when processing event data, exception: ", e);
            }
        }
    }

    @Override
    public void onError(PartitionContext partitionContext, Throwable throwable) {
    }
}
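A quick aside on the deserialize call in onEvents: Spring's SerializationUtils relies on standard Java serialization, so the bytes on the event hub must have been written from a Serializable class that the consumer also has on its classpath. Here is a minimal sketch of that round trip using only the JDK. DemoPayload is a hypothetical stand-in for EventPayload, and SerializationRoundTrip is just an illustration, not the code Spring runs.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.ObjectInputStream;
import java.io.ObjectOutputStream;
import java.io.Serializable;
import java.io.UncheckedIOException;

// Hypothetical stand-in for the EventPayload model class.
final class DemoPayload implements Serializable {
    private static final long serialVersionUID = 1L;

    final String firstName;
    final String favoriteFood;

    DemoPayload(String firstName, String favoriteFood) {
        this.firstName = firstName;
        this.favoriteFood = favoriteFood;
    }
}

final class SerializationRoundTrip {

    // What the sending side would do before putting bytes on the event hub.
    static byte[] serialize(Serializable value) {
        ByteArrayOutputStream bytes = new ByteArrayOutputStream();
        try (ObjectOutputStream out = new ObjectOutputStream(bytes)) {
            out.writeObject(value);
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
        return bytes.toByteArray();
    }

    // What the receiving side does with the event body.
    static Object deserialize(byte[] data) {
        try (ObjectInputStream in = new ObjectInputStream(new ByteArrayInputStream(data))) {
            return in.readObject();
        } catch (IOException | ClassNotFoundException e) {
            throw new IllegalStateException("Could not deserialize payload", e);
        }
    }

    public static void main(String[] args) {
        byte[] bytes = serialize(new DemoPayload("Dublin", "Nanas"));
        DemoPayload copy = (DemoPayload) deserialize(bytes);
        System.out.println(copy.firstName + " likes " + copy.favoriteFood);
    }
}
```

This works well when the same codebase produces and consumes the events, as in this demo. If other producers can write to the hub, keep in mind that Java deserialization of untrusted bytes is risky, and a text format such as JSON may be a safer fit.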
And finally, I've defined the EventPayloadProcessor below.
import com.dublin.eventhub.demo.model.EventPayload;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@NoArgsConstructor
@Service
class EventPayloadProcessor {

    void process(EventPayload eventPayload) {
        log.info("Hello! My name is {} and my favorite food is {}", eventPayload.getFirstName(), eventPayload.getFavoriteFood());
    }
}
And that's it. Everything should be ready to roll. 🤠
Running the Application
If you've been following along since part one, there should be some messages out in event hub for us to process, so we should be able to start up the app and have the payloads get processed.
2019-10-06 08:12:18.242 INFO 42015 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:12:18.247 INFO 42015 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService : Sending message to the event hub event-hub-test
2019-10-06 08:12:18.253 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender : clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], path[event-hub-test], operationTimeout[PT1M], creating a send link
2019-10-06 08:12:18.258 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.258 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.259 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkLocalOpen senderName[cbs], linkName[cbs:sender], localTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.259 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ReceiveLinkHandler : onLinkLocalOpen receiverName[cbs], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.327 INFO 42015 --- [pool-1-thread-2] c.m.azure.eventhubs.impl.SessionHandler : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.328 INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.SendLinkHandler : onLinkRemoteOpen senderName[cbs], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.328 INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.ReceiveLinkHandler : onLinkRemoteOpen receiverName[cbs], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.329 INFO 42015 --- [pool-1-thread-2] c.m.a.e.impl.RequestResponseOpener : requestResponseChannel.onOpen complete clientId[MF_41070c_1570367523209], session[cbs-session], link[cbs], endpoint[$cbs]
2019-10-06 08:12:18.398 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.398 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.464 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.464 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkLocalOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], localTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531 INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler : onLinkRemoteOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], remoteTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531 INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender : onOpenComplete - clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], sendPath[event-hub-test], linkName[LN_ddb0ee_1570367538464_5324_G9]
2019-10-06 08:12:18.678 INFO 42015 --- [b4f183ddd]-1-14] c.d.e.d.processor.EventPayloadProcessor : Hello! My name is Dublin and my favorite food is Nanas
But gosh, that's a lot of noise in the logs, don't you think? Let's fix that by setting the log level of the event hub packages to ERROR.
logging:
  level:
    com.microsoft.azure.*: ERROR
Let's try it again. Much cleaner, don't you think?
2019-10-06 08:55:48.241 INFO 47531 --- [ main] o.s.b.w.embedded.tomcat.TomcatWebServer : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-06 08:55:48.246 INFO 47531 --- [ main] com.dublin.eventhub.demo.Application : Started Application in 5.37 seconds (JVM running for 5.792)
2019-10-06 08:56:01.700 INFO 47531 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/] : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-10-06 08:56:01.700 INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Initializing Servlet 'dispatcherServlet'
2019-10-06 08:56:01.709 INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet : Completed initialization in 9 ms
2019-10-06 08:56:01.817 INFO 47531 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:56:01.822 INFO 47531 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService : Sending message to the event hub event-hub-test
2019-10-06 08:56:19.581 INFO 47531 --- [352fbae26]-1-14] c.d.e.d.processor.EventPayloadProcessor : Hello! My name is Dublin and my favorite food is Nanas
And that's it, we've connected to event hub and read off our message!
Final Thoughts
In this tutorial we've gone through creating an Azure storage account, setting up a connection to event hub in our Spring Boot service, and reading off the messages we previously sent to event hub. I hope this guide was helpful. Let me know what you think in the comments.
Happy coding!