DEV Community

Salad Lam

Reactor Netty: UDP DNS client example

This example demonstrates how to

  • send and receive DNS UDP packets
  • add netty's built-in ChannelHandlers
  • use Reactor Netty's interface to build the send and receive actions
  • terminate the connection

The netty code is here. This example uses the following libraries:

  • netty 4.1.107.Final
  • Project Reactor 3.6.3
  • Reactor Netty 1.1.16

Code

package example;

import io.netty.buffer.ByteBufUtil;
import io.netty.handler.codec.dns.*;
import io.netty.util.NetUtil;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import reactor.netty.Connection;
import reactor.netty.udp.UdpClient;

import java.net.InetSocketAddress;

public class DnsUdpClient {

    private static final String SERVER_HOST = "8.8.8.8";
    private static final int SERVER_PORT = 53;
    private static final Logger LOGGER = LoggerFactory.getLogger(DnsUdpClient.class);

    // (1)
    private static void handleQueryResp(DatagramDnsResponse msg) {
        if (msg.count(DnsSection.QUESTION) > 0) {
            DnsQuestion question = msg.recordAt(DnsSection.QUESTION, 0);
            LOGGER.info("name: {}", question.name());
        }
        for (int i = 0, count = msg.count(DnsSection.ANSWER); i < count; i++) {
            DnsRecord r = msg.recordAt(DnsSection.ANSWER, i);
            if (r.type() == DnsRecordType.A) {
                //just print the IP after query
                DnsRawRecord raw = (DnsRawRecord) r;
                LOGGER.info("{}", NetUtil.bytesToIpAddress(ByteBufUtil.getBytes(raw.content())));
            }
        }
    }

    public static void main(String[] args) {
        UdpClient client = UdpClient.create()
                .host(SERVER_HOST).port(SERVER_PORT)    // (2)
                .wiretap(true)  // (3)
                /*
                // (4)
                .doOnChannelInit((observer, channel, remoteAddress) -> {
                    Connection c = Connection.from(channel);
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", channel.pipeline());
                });
                */
                // (5)
                .doOnConnected(c -> {
                    c.addHandlerLast(new DatagramDnsQueryEncoder());
                    c.addHandlerLast(new DatagramDnsResponseDecoder());
                    LOGGER.info("pipeline={}", c.channel().pipeline());
                });

        Connection conn = client.connectNow();  // (6)

        // (7)
        conn.inbound().receiveObject()
                .doOnNext(obj -> {
                    DatagramDnsResponse r = (DatagramDnsResponse) obj;
                    LOGGER.info("response={}", obj);
                    handleQueryResp(r);
                })
                // pass the Throwable itself so that the stack trace is logged
                .doOnError(err -> LOGGER.error("Failed to receive DNS response", err))
                .subscribe();

        // (8)
        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b7)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("www.google.com", DnsRecordType.A)
                        ))
                .then().subscribe();

        conn.outbound()
                .sendObject(new DatagramDnsQuery(null, new InetSocketAddress(SERVER_HOST, SERVER_PORT), 0x42b8)
                        .setRecord(
                                DnsSection.QUESTION,
                                new DefaultDnsQuestion("projectreactor.io", DnsRecordType.A)
                        ))
                .then().subscribe();

        // (9)
        conn.onReadIdle(5000, () -> {
            LOGGER.error("Request time out");
            conn.disposeNow();
        });

        // (10)
        conn.onDispose().block();
    }

}

Explanation

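(1): handles a decoded DNS reply. It logs the name in the question section and the IP address of every A record in the answer section.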
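To make the A record step concrete: the payload of an A record is the four raw bytes of an IPv4 address, and handleQueryResp() turns them into dotted-decimal text. The sketch below performs the same conversion with the JDK only. The class and method names (ARecordText, toIpv4Text) are made up for this illustration and are not part of netty.

```java
import java.net.InetAddress;
import java.net.UnknownHostException;

final class ARecordText {

    // Hypothetical helper: converts the 4-byte payload of an A record to dotted-decimal text.
    static String toIpv4Text(byte[] rdata) {
        if (rdata.length != 4) {
            throw new IllegalArgumentException("An A record carries exactly 4 bytes, got " + rdata.length);
        }
        try {
            // getByAddress() only wraps the raw bytes; it performs no DNS lookup.
            return InetAddress.getByAddress(rdata).getHostAddress();
        } catch (UnknownHostException e) {
            // Cannot happen for a 4-byte array, but the checked exception must be handled.
            throw new IllegalStateException(e);
        }
    }

    public static void main(String[] args) {
        System.out.println(toIpv4Text(new byte[] {8, 8, 8, 8})); // prints 8.8.8.8
    }
}
```

Java bytes are signed, so an octet above 127 has to be written with a cast, for example (byte) 192.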

(2): the destination IP address and port number are required when creating netty's io.netty.channel.Channel.

(3): adds a debug ChannelHandler at the start of the Channel's pipeline. It logs all events related to the Channel at DEBUG level.

(4): doOnChannelInit() accepts a reactor.netty.ChannelPipelineConfigurer, which configures the channel pipeline while the channel is being initialized.

This ChannelPipelineConfigurer is called by reactor.netty.transport.TransportConfig.TransportChannelInitializer#initChannel:

@Override
protected void initChannel(Channel channel) {
    ChannelPipeline pipeline = channel.pipeline();

    if (config.metricsRecorder != null) {
        //...
    }

    if (config.loggingHandler != null) {
        pipeline.addFirst(NettyPipeline.LoggingHandler, config.loggingHandler);
    }

    ChannelOperations.addReactiveBridge(channel, config.channelOperationsProvider(), connectionObserver);

    config.defaultOnChannelInit()
          .then(config.doOnChannelInit)
          .onChannelInit(connectionObserver, channel, remoteAddress);   // <- HERE

    pipeline.remove(this);

    if (log.isDebugEnabled()) {
        log.debug(format(channel, "Initialized pipeline {}"), pipeline.toString());
    }
}

To obtain a reactor.netty.Connection instance from netty's Channel instance, use the following static method:

Connection c = Connection.from(channel);

When the function starts executing, the pipeline contains:

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}

When the function finishes executing, the pipeline contains:

pipeline=DefaultChannelPipeline{(reactor.left.loggingHandler = reactor.netty.transport.logging.ReactorNettyLoggingHandler), (TransportConfig$TransportChannelInitializer#0 = reactor.netty.transport.TransportConfig$TransportChannelInitializer), (DatagramDnsQueryEncoder = io.netty.handler.codec.dns.DatagramDnsQueryEncoder), (DatagramDnsResponseDecoder = io.netty.handler.codec.dns.DatagramDnsResponseDecoder), (reactor.right.reactiveBridge = reactor.netty.channel.ChannelOperationsHandler)}

reactor.netty.channel.ChannelOperationsHandler is the bridge that connects netty and the Reactor Netty library. It must stay at the end of the pipeline.

DatagramDnsQueryEncoder and DatagramDnsResponseDecoder are netty's built-in ChannelHandlers. They are responsible for encoding DNS query objects into raw ByteBuf and decoding raw ByteBuf into DNS response objects.
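For a feel of what an encoder like DatagramDnsQueryEncoder has to produce, here is a minimal JDK-only sketch of a single-question query in the RFC 1035 wire format. It is not netty's implementation, and the bytes netty emits may differ in detail (for example in the flags). The names DnsQueryBytes and encodeQuery are hypothetical.

```java
import java.io.ByteArrayOutputStream;
import java.nio.charset.StandardCharsets;

final class DnsQueryBytes {

    static final int TYPE_A = 1;   // QTYPE: IPv4 address record
    static final int CLASS_IN = 1; // QCLASS: Internet

    // Hypothetical helper: builds a one-question DNS query for an A record.
    static byte[] encodeQuery(int id, String name, boolean recursionDesired) {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        // 12-byte header
        writeShort(out, id);                            // transaction ID
        writeShort(out, recursionDesired ? 0x0100 : 0); // flags: only the RD bit is ever set here
        writeShort(out, 1);                             // QDCOUNT: one question
        writeShort(out, 0);                             // ANCOUNT
        writeShort(out, 0);                             // NSCOUNT
        writeShort(out, 0);                             // ARCOUNT
        // QNAME: each label is prefixed with its length
        for (String label : name.split("\\.")) {
            byte[] bytes = label.getBytes(StandardCharsets.US_ASCII);
            if (bytes.length == 0 || bytes.length > 63) {
                throw new IllegalArgumentException("Invalid label in name: " + name);
            }
            out.write(bytes.length);
            out.write(bytes, 0, bytes.length);
        }
        out.write(0);                                   // zero-length root label ends the name
        writeShort(out, TYPE_A);
        writeShort(out, CLASS_IN);
        return out.toByteArray();
    }

    // Writes a 16-bit value in network byte order (big-endian).
    private static void writeShort(ByteArrayOutputStream out, int value) {
        out.write((value >>> 8) & 0xFF);
        out.write(value & 0xFF);
    }

    public static void main(String[] args) {
        byte[] query = encodeQuery(0x42b7, "www.google.com", true);
        System.out.println(query.length + " bytes"); // prints 32 bytes
    }
}
```

The 32 bytes are the 12-byte header, 16 bytes for the name (three length-prefixed labels plus the terminating zero), and 2 bytes each for the type and the class.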

(5): doOnConnected() is another way to configure the channel pipeline. It does exactly the same job as (4), but it runs after the Channel has been connected.

(6): connectNow() blocks while the connection is set up and returns once it is ready. UDP itself is connectionless, so no handshake takes place.

(7): defines the action to run when a packet is received. inbound().receiveObject() returns a Flux. Remember to call subscribe() at the end; otherwise nothing is received.

(8): there are two statements, one for each request sent. then() returns a Mono that emits a complete signal when the packet has been sent. It must be subscribed as well.

(9): terminates the connection if no packet is received for 5 seconds.
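The idea behind a read-idle timeout can be sketched with the JDK alone: a timer is armed, every read re-arms it, and the callback fires only when the timer expires without a read. The class below is a hypothetical illustration (ReadIdleWatchdog is not a Reactor Netty or netty type) and does not reflect how onReadIdle() is implemented internally. It fires at most once per idle period.

```java
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.ScheduledFuture;
import java.util.concurrent.TimeUnit;

final class ReadIdleWatchdog implements AutoCloseable {

    // Single daemon thread, so the watchdog never keeps the JVM alive on its own.
    private final ScheduledExecutorService scheduler =
            Executors.newSingleThreadScheduledExecutor(r -> {
                Thread t = new Thread(r, "read-idle-watchdog");
                t.setDaemon(true);
                return t;
            });
    private final long idleMillis;
    private final Runnable onIdle;
    private ScheduledFuture<?> pending;

    ReadIdleWatchdog(long idleMillis, Runnable onIdle) {
        this.idleMillis = idleMillis;
        this.onIdle = onIdle;
        restart();
    }

    // Call this whenever a packet is read: the pending timeout is replaced by a fresh one.
    void onRead() {
        restart();
    }

    private synchronized void restart() {
        if (pending != null) {
            pending.cancel(false);
        }
        pending = scheduler.schedule(onIdle, idleMillis, TimeUnit.MILLISECONDS);
    }

    // Stops the timer; onRead() must not be called after close().
    @Override
    public void close() {
        scheduler.shutdownNow();
    }
}
```

Usage would look like new ReadIdleWatchdog(5000, () -> System.err.println("Request time out")), with onRead() called from the receive path.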

(10): onDispose() returns a Mono that emits a complete signal when the connection has been shut down. block() keeps the main thread waiting on it, so that the main thread is the last one to end.
