DEV Community

Dublin Anondson
Dublin Anondson

Posted on

Getting Eventful With Azure Event Hubs: Part Two

In part one of this series, I went over creating an event hub in azure, connecting it up to my spring boot web service, then send messages to it. Now I'm going to go over reading those messages from event hub and processing them.

Like in part one, the completed example can be found here.

Creating a Storage Account in Azure

Why do we need a storage account?

We need a storage account setup so we can store partition leases and checkpoints.

Head back to the azure portal and click on storage accounts.

Storage Account

Then click the Add(+) button and you'll be taken to the storage account creation screen.

Storage Account Create

Fill that out and click 'Review + Create' at the bottom. We should get the OK from Microsoft, if something did go wrong, make the needed corrections and try it again.

Storage Account Review Passed

Once the OK is gotten, we can click 'Create' and azure will spin up our storage account.

Getting Access Key Info

Once the storage account is created, navigate to it and select the 'Access Key' option in the side menu and copy over either of the connection strings.

Creating an Event Processor Host

In our EventHub config class, I've created an EventProcessorHost bean.


import com.microsoft.azure.eventhubs.EventHubClient;
import com.microsoft.azure.eventhubs.EventHubException;
import com.microsoft.azure.eventprocessorhost.EventProcessorHost;
import org.springframework.beans.factory.annotation.Value;
import org.springframework.context.annotation.Bean;
import org.springframework.context.annotation.Configuration;

import java.io.IOException;
import java.util.concurrent.Executors;

@Configuration
public class EventHubConfig {

    @Value("${eventHub.connectionString}")
    private String connectionString;

    @Value("${eventHub.name}")
    private String eventHubName;

    @Value("${eventHub.storage.consumerGroupName}")
    private String consumerGroupName;

    @Value("${eventHub.storage.hostNamePrefix}")
    private String hostNamePrefix;

    @Value("${eventHub.storage.storageConnectionString}")
    private String storageConnectionString;

    @Value("${eventHub.storage.storageContainerName}")
    private String storageContainerName;

    @Bean
    public EventHubClient setupEventHubConnection() throws IOException, EventHubException {
        return EventHubClient.createFromConnectionStringSync(connectionString, Executors.newScheduledThreadPool(1));
    }

    @Bean
    public EventProcessorHost createEventHubProcessorHost() {
        return EventProcessorHost.EventProcessorHostBuilder
                .newBuilder(EventProcessorHost.createHostName(hostNamePrefix), consumerGroupName)
                .useAzureStorageCheckpointLeaseManager(storageConnectionString, storageContainerName, null)
                .useEventHubConnectionString(connectionString, eventHubName)
                .build();
    }
}
Enter fullscreen mode Exit fullscreen mode

For this example I'm using the EventProcessorHost builder and passing in the needed config for it. I've defined the config in my application.yml below.


eventHub:
  connectionString: 'connections string here' #Event Hub connection string goes here
  name: 'event hub name here' # The name of the event hub
  storage:
    consumerGroupName: $Default #This is the default consumer group for event hub, but it can be customized
    storageConnectionString: 'storage connection string here'
    storageContainerName: 'storage container name here'
    hostNamePrefix: 'some unique prefix' # Identifies the instance of the EventProcessorHost

Enter fullscreen mode Exit fullscreen mode

Now that we have an EventProcessorHost bean, we can inject it into our EventProcessorHostService.

import com.dublin.eventhub.demo.exception.ErrorNotificationHandler;
import com.dublin.eventhub.demo.processor.EventProcessor;
import com.microsoft.azure.eventprocessorhost.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.springframework.beans.factory.annotation.Autowired;
import org.springframework.stereotype.Component;

import javax.annotation.PostConstruct;
import java.util.concurrent.ExecutionException;


@Component
public class EventProcessorHostService {

    private final EventProcessorHost eventProcessorHost;
    private Logger log = LoggerFactory.getLogger(EventProcessorHostService.class);

    @Autowired
    public EventProcessorHostService(EventProcessorHost eventProcessorHost) {
        this.eventProcessorHost = eventProcessorHost;
    }

    @PostConstruct
    public void run() throws ExecutionException, InterruptedException {
        log.info("Setting up event hub {}", eventProcessorHost.getHostName());
        EventProcessorOptions options = new EventProcessorOptions();
        options.setExceptionNotification(new ErrorNotificationHandler());
        eventProcessorHost.registerEventProcessor(EventProcessor.class, options).get();
    }
}
Enter fullscreen mode Exit fullscreen mode

There's a little to unpack here, so let's go over what's going on.

  • I'm using constructor injection to pass in the EventProcessorHost bean I defined earlier.
  • I've defined a basic EventNotificationHandler class that handles errors not related to specific partitions (such as initialization failures). I pass that to the EventProcessorHost by wrapping it in an EventProcessorOptions object.
  • I'm registering the EventProcessor, this registers the host with the event hub and obtains leases on some of the partitions so we can start processing messages. For each partition lease, an instance of EventProcessor is created for that partition.
  • Finally, I use @PostConstruct to initialize the connection to event hub.

Below is the ErrorNotificationHandler.

import java.util.function.Consumer;

import com.microsoft.azure.eventprocessorhost.ExceptionReceivedEventArgs;
import lombok.extern.slf4j.Slf4j;

@Slf4j
public class ErrorNotificationHandler implements Consumer<ExceptionReceivedEventArgs> {

    @Override
    public void accept(ExceptionReceivedEventArgs t) {
        log.error("SAMPLE: Host " + t.getHostname() + " received general error notification during " + t.getAction() + ": " + t.getException().toString());
    }
}
Enter fullscreen mode Exit fullscreen mode

We now have our EventProcessorHost configured and ready to run, but in order for us to process messages, we also have to define an event processor class that reads off the event hub and handles the checkpointing.

We need to extend the IEventProcessor interface, which gives us access to the PartitionContext for checkpointing and an Iterable of the EventData. There's a couple things going on here.

  • I initialize the EventPayloadProcessor class in the onOpen method.
  • In the onEvents method, I've created a for loop to iterate through the events, and a try catch in the loop body, so if an event errors out while processing, I'm able to process the remaining events.
  • If each event processes successfully, I checkpoint right after processing.

NOTE: Since checkpointing is asynchronous and not the fastest, I recommend checkpointing after some number of events processed. Since this example is small I'm not bothering to.

import com.dublin.eventhub.demo.model.EventPayload;
import com.microsoft.azure.eventhubs.EventData;
import com.microsoft.azure.eventprocessorhost.CloseReason;
import com.microsoft.azure.eventprocessorhost.IEventProcessor;
import com.microsoft.azure.eventprocessorhost.PartitionContext;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;
import org.springframework.util.SerializationUtils;

@Slf4j
@NoArgsConstructor
@Service
public class EventProcessor implements IEventProcessor {

    private EventPayloadProcessor eventPayloadProcessor;

    @Override
    public void onOpen(PartitionContext partitionContext) {
        eventPayloadProcessor = new EventPayloadProcessor();
    }

    @Override
    public void onClose(PartitionContext partitionContext, CloseReason closeReason) {

    }

    @Override
    public void onEvents(PartitionContext partitionContext, Iterable<EventData> iterable) {
        for(EventData event: iterable) {
            try{
                EventPayload eventPayload = (EventPayload) SerializationUtils.deserialize(event.getBytes());
                eventPayloadProcessor.process(eventPayload);
                partitionContext.checkpoint(event);
            } catch (Exception e) {
                log.error("An Error occured when processing event data, exception: ", e);
            }

        }
    }

    @Override
    public void onError(PartitionContext partitionContext, Throwable throwable) {
    }
}
Enter fullscreen mode Exit fullscreen mode

And finally, I've defined the EventPayloadProcessor below.

import com.dublin.eventhub.demo.model.EventPayload;
import lombok.NoArgsConstructor;
import lombok.extern.slf4j.Slf4j;
import org.springframework.stereotype.Service;

@Slf4j
@NoArgsConstructor
@Service
class EventPayloadProcessor {

    void process(EventPayload eventPayload) {
        log.info("Hello! My name is {} and my favorite food is {}", eventPayload.getFirstName(), eventPayload.getFavoriteFood());
    }
}
Enter fullscreen mode Exit fullscreen mode

And that's it. Everything should be ready to roll. 🤠

Running the Application

If you've been following along since part one. There should be some messages out in event hub for us to process. So we should be able to start up the app and have the payloads get processed.

2019-10-06 08:12:18.242  INFO 42015 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:12:18.247  INFO 42015 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService       : Sending message to the event hub event-hub-test
2019-10-06 08:12:18.253  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender   : clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], path[event-hub-test], operationTimeout[PT1M], creating a send link
2019-10-06 08:12:18.258  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory    : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.258  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.259  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkLocalOpen senderName[cbs], linkName[cbs:sender], localTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.259  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.ReceiveLinkHandler  : onLinkLocalOpen receiverName[cbs], linkName[cbs:receiver], localSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.327  INFO 42015 --- [pool-1-thread-2] c.m.azure.eventhubs.impl.SessionHandler  : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[cbs-session], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.328  INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkRemoteOpen senderName[cbs], linkName[cbs:sender], remoteTarget[Target{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.328  INFO 42015 --- [pool-1-thread-2] c.m.a.eventhubs.impl.ReceiveLinkHandler  : onLinkRemoteOpen receiverName[cbs], linkName[cbs:receiver], remoteSource[Source{address='$cbs', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, distributionMode=null, filter=null, defaultOutcome=null, outcomes=null, capabilities=null}]
2019-10-06 08:12:18.329  INFO 42015 --- [pool-1-thread-2] c.m.a.e.impl.RequestResponseOpener       : requestResponseChannel.onOpen complete clientId[MF_41070c_1570367523209], session[cbs-session], link[cbs], endpoint[$cbs]
2019-10-06 08:12:18.398  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.MessagingFactory    : messagingFactory[MF_41070c_1570367523209], hostName[dublin-rest-demo.servicebus.windows.net], getting a session.
2019-10-06 08:12:18.398  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler  : onSessionLocalOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], condition[Error{condition=null, description='null', info=null}]
2019-10-06 08:12:18.464  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.SessionHandler  : onSessionRemoteOpen connectionId[MF_41070c_1570367523209], entityName[event-hub-test], sessionIncCapacity[0], sessionOutgoingWindow[2147483647]
2019-10-06 08:12:18.464  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkLocalOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], localTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531  INFO 42015 --- [pool-1-thread-4] c.m.a.eventhubs.impl.SendLinkHandler     : onLinkRemoteOpen senderName[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], linkName[LN_ddb0ee_1570367538464_5324_G9], remoteTarget[Target{address='event-hub-test', durable=NONE, expiryPolicy=SESSION_END, timeout=0, dynamic=false, dynamicNodeProperties=null, capabilities=null}]
2019-10-06 08:12:18.531  INFO 42015 --- [pool-1-thread-4] c.m.azure.eventhubs.impl.MessageSender   : onOpenComplete - clientId[EC_493aee_1570367538249_MF_41070c_1570367523209-InternalSender], sendPath[event-hub-test], linkName[LN_ddb0ee_1570367538464_5324_G9]
2019-10-06 08:12:18.678  INFO 42015 --- [b4f183ddd]-1-14] c.d.e.d.processor.EventPayloadProcessor  : Hello! My name is Dublin and my favorite food is Nanas

Enter fullscreen mode Exit fullscreen mode

But gosh, that's a lot of noise in the logs, don't you think? Let's fix that by setting the log level of the event hub packages to ERROR.

logging:
  level:
    com.microsoft.azure.*: ERROR
Enter fullscreen mode Exit fullscreen mode

And try it again, much cleaner don't you think?

2019-10-06 08:55:48.241  INFO 47531 --- [           main] o.s.b.w.embedded.tomcat.TomcatWebServer  : Tomcat started on port(s): 8080 (http) with context path ''
2019-10-06 08:55:48.246  INFO 47531 --- [           main] com.dublin.eventhub.demo.Application     : Started Application in 5.37 seconds (JVM running for 5.792)
2019-10-06 08:56:01.700  INFO 47531 --- [nio-8080-exec-1] o.a.c.c.C.[Tomcat].[localhost].[/]       : Initializing Spring DispatcherServlet 'dispatcherServlet'
2019-10-06 08:56:01.700  INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Initializing Servlet 'dispatcherServlet'
2019-10-06 08:56:01.709  INFO 47531 --- [nio-8080-exec-1] o.s.web.servlet.DispatcherServlet        : Completed initialization in 9 ms
2019-10-06 08:56:01.817  INFO 47531 --- [nio-8080-exec-1] c.d.eventhub.demo.controller.Controller  : Eventhub send endpoint called, sending EventPayload(firstName=Dublin, lastName=Anondson, email=null, favoriteFood=Nanas) to event hub..
2019-10-06 08:56:01.822  INFO 47531 --- [nio-8080-exec-1] c.d.e.demo.service.EventHubService       : Sending message to the event hub event-hub-test
2019-10-06 08:56:19.581  INFO 47531 --- [352fbae26]-1-14] c.d.e.d.processor.EventPayloadProcessor  : Hello! My name is Dublin and my favorite food is Nanas

Enter fullscreen mode Exit fullscreen mode

And that's it, we've connected to event hub and read off our message!

Final Thoughts

In this tutorial we've gone through creating an azure storage account, setting up a connection to event hub in our spring boot service, and reading off the messages we've previously sent to event hub. I hope this guide was helpful, let me know what you think in the comments.

Happy coding!

Top comments (0)