
Pavel Zeger


How to implement GZIP decompression for incoming HTTP requests on the Netty server

Disclaimer

This article represents my perspective on this solution, thus any suggestions, fixes, or discussions will be highly appreciated.

The short story

We use Spring Boot and Spring WebFlux to run our platform with the Netty HTTP server and client. As you may know, this stack doesn't decompress incoming HTTP request bodies on the server out of the box, hence I needed to "reinvent the wheel" using the capabilities that are available.

The implementation

There are a few ways to implement decompression of incoming requests in Netty:

  • via WebFilter
  • via a custom handler added to the Netty channel pipeline
  • via the controller itself

The most convenient and flexible option is to use the WebFilter interface with a high precedence configuration, as this approach allows us to filter requests not only by their metadata but also by the request's data we receive from our clients.

According to the MDN documentation (Compression in HTTP), the server has to receive the HTTP header Content-Encoding: gzip from a client in order to start decompressing the payload. Thus, our first goal is to recognize the presence of this header in a request. Let's create a utility class, CompressionUtils, that holds a static method to verify the presence of this header and another static method that reads a GZIP input stream into a byte array so that the array can be passed further down the request flow:

import org.apache.commons.io.IOUtils;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;

import lombok.AccessLevel;
import lombok.NoArgsConstructor;
import java.io.IOException;
import java.io.InputStream;

import static java.util.Objects.nonNull;
import static org.springframework.http.HttpHeaders.ACCEPT_ENCODING;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.util.CollectionUtils.isEmpty;

// Static helpers only: the class is never instantiated, so it does not need to be a Spring bean.
@NoArgsConstructor(access = AccessLevel.PRIVATE)
public class CompressionUtils {

    public static final String GZIP = "gzip";
    public static final String UNKNOWN = "unknown";

    // Drains the (already decompressing) stream into a byte array.
    // Copying raw bytes instead of going through a String keeps binary
    // and non-UTF-8 payloads intact.
    public static byte[] getDeflatedBytes(InputStream inputStream) throws IOException {
        return IOUtils.toByteArray(inputStream);
    }

    // True when the client sent a gzip-compressed request body.
    public static boolean isGzipRequest(ServerHttpRequest serverHttpRequest) {
        return containsGzip(serverHttpRequest, CONTENT_ENCODING);
    }

    // True when the client accepts a gzip-compressed response.
    public static boolean isGzipResponseRequired(ServerHttpRequest serverHttpRequest) {
        return containsGzip(serverHttpRequest, ACCEPT_ENCODING);
    }

    private static boolean containsGzip(ServerHttpRequest serverHttpRequest, String headerName) {
        HttpHeaders headers = serverHttpRequest.getHeaders();
        if (!isEmpty(headers)) {
            String header = headers.getFirst(headerName);
            return nonNull(header) && header.contains(GZIP);
        }

        return false;
    }

}

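To convert the decompressed input stream into a byte array, I've used the IOUtils class from the Apache Commons IO project. The read itself is synchronous, so it should only be applied to data that is already in memory, as is the case once the whole request body has been collected.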
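
One detail worth checking in this conversion is whether the decompressed bytes survive unchanged. The following is a minimal JDK-only sketch, not part of the original implementation: the class and method names are made up for illustration, and it uses InputStream.readAllBytes (JDK 9+) instead of Commons IO. It suggests that a detour through a String alters payloads that are not valid UTF-8, while a byte-level copy does not.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.Arrays;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of the article's code base.
public class GzipBytesDemo {

    // Compresses a payload the way a client would before sending it.
    static byte[] gzip(byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
            gzipOut.write(payload);
        }
        return out.toByteArray();
    }

    // Byte-for-byte copy of the decompressed stream.
    static byte[] gunzipAsBytes(byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            return in.readAllBytes();
        }
    }

    // Detour through a String: byte sequences that are not valid UTF-8 get replaced.
    static byte[] gunzipViaString(byte[] compressed) throws IOException {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(compressed))) {
            String text = new String(in.readAllBytes(), StandardCharsets.UTF_8);
            return text.getBytes(StandardCharsets.UTF_8);
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] binary = {(byte) 0xFF, (byte) 0xFE, 0x00, 0x41};
        byte[] compressed = gzip(binary);
        System.out.println(Arrays.equals(binary, gunzipAsBytes(compressed)));   // true
        System.out.println(Arrays.equals(binary, gunzipViaString(compressed))); // false
    }
}
```

For plain UTF-8 JSON bodies both paths give the same result; the difference only shows up with binary or differently encoded payloads.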

The second stage of our implementation is to wrap the incoming ServerHttpRequest instance in our own implementation. This way, we retain the source data of the incoming request and change only the behavior of the getBody method. Spring WebFlux invokes this method to read the incoming data as a stream of DataBuffer instances, Spring's abstraction over byte buffers. Therefore, it's a convenient place to swap this part of the flow instead of integrating into the Netty pipeline with a custom handler. Customizing it via WebFilter is a cleaner approach: it affects only the requests we select and lets us control the decompression process (with error handling and metrics).

So, the main method of our wrapper class, GzipServerHttpRequest, is getBody, which has the following logic:

  1. Get the body of the source server HTTP request.
  2. Transform each DataBuffer chunk of the body into an InputStream.
  3. Reduce the reactive stream of input streams into a single SequenceInputStream to get the whole POST request body.
  4. Decompress the GZIP body with GZIPInputStream.
  5. Wrap the final byte array in a DataBuffer again.

import org.springframework.core.io.buffer.DataBuffer;
import org.springframework.core.io.buffer.DefaultDataBufferFactory;
import org.springframework.http.HttpCookie;
import org.springframework.http.HttpHeaders;
import org.springframework.http.HttpMethod;
import org.springframework.http.server.RequestPath;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.SslInfo;
import org.springframework.util.MultiValueMap;
import reactor.core.publisher.Flux;
import reactor.core.publisher.SynchronousSink;

import lombok.RequiredArgsConstructor;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.net.InetSocketAddress;
import java.net.URI;
import java.time.Instant;
import java.util.zip.GZIPInputStream;

@SuppressWarnings("NullableProblems")
@RequiredArgsConstructor
public class GzipServerHttpRequest implements ServerHttpRequest {

    private final ServerHttpRequest serverHttpRequest;

    @Override
    public String getId() {
        return serverHttpRequest.getId();
    }

    @Override
    public RequestPath getPath() {
        return serverHttpRequest.getPath();
    }

    @Override
    public MultiValueMap<String, String> getQueryParams() {
        return serverHttpRequest.getQueryParams();
    }

    @Override
    public MultiValueMap<String, HttpCookie> getCookies() {
        return serverHttpRequest.getCookies();
    }

    @Override
    public String getMethodValue() {
        return serverHttpRequest.getMethodValue();
    }

    @Override
    public URI getURI() {
        return serverHttpRequest.getURI();
    }

    @Override
    public HttpHeaders getHeaders() {
        return serverHttpRequest.getHeaders();
    }

    @Override
    public InetSocketAddress getRemoteAddress() {
        return serverHttpRequest.getRemoteAddress();
    }

    @Override
    public SslInfo getSslInfo() {
        return serverHttpRequest.getSslInfo();
    }

    @Override
    public HttpMethod getMethod() {
        return serverHttpRequest.getMethod();
    }

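    @Override
    public Flux<DataBuffer> getBody() {
        return serverHttpRequest.getBody()
                // Expose each chunk as a stream; "true" releases the buffer when the stream is closed.
                .map(dataBuffer -> dataBuffer.asInputStream(true))
                // Chain all chunks into one stream that holds the whole compressed body.
                .reduce(SequenceInputStream::new)
                .handle(this::decompress)
                .flux();
    }

    private void decompress(InputStream inputStream, SynchronousSink<DataBuffer> sink) {
        try (var gzipInputStream = new GZIPInputStream(inputStream)) {
            // CompressionUtils is assumed to live in the same package as this class.
            byte[] deflatedBytes = CompressionUtils.getDeflatedBytes(gzipInputStream);
            sink.next(new DefaultDataBufferFactory().wrap(deflatedBytes));
        } catch (Exception exception) {
            // Any failure (for example, a body that is not valid GZIP) ends the stream with our exception.
            sink.error(getException());
        }
    }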

    private IllegalGzipRequestException getException() {
        String exceptionMessage = String.format("Decompression of a gzip content failed, URI: [%s]", serverHttpRequest.getURI());
        return new IllegalGzipRequestException(exceptionMessage);
    }

}
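
The chunk-joining and decompression steps of getBody can also be tried without Spring or Reactor. The following is a rough JDK-only sketch under stated assumptions: ByteArrayInputStream chunks stand in for the DataBuffer chunks, a java.util.stream reduce stands in for the Flux reduce, and the class and method names are hypothetical.

```java
import java.io.ByteArrayInputStream;
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.SequenceInputStream;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
import java.util.Arrays;
import java.util.List;
import java.util.zip.GZIPInputStream;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of the article's code base.
public class ChunkedGunzipDemo {

    // Compresses a payload the way a client would before sending it.
    static byte[] gzip(byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
            gzipOut.write(payload);
        }
        return out.toByteArray();
    }

    // Splits the bytes into fixed-size chunks, standing in for the chunks of a request body.
    static List<InputStream> toChunks(byte[] bytes, int chunkSize) {
        List<InputStream> chunks = new ArrayList<>();
        for (int offset = 0; offset < bytes.length; offset += chunkSize) {
            int end = Math.min(bytes.length, offset + chunkSize);
            chunks.add(new ByteArrayInputStream(Arrays.copyOfRange(bytes, offset, end)));
        }
        return chunks;
    }

    // Chains the chunks into one stream, then decompresses the whole body.
    static byte[] decompress(List<InputStream> chunks) throws IOException {
        InputStream joined = chunks.stream()
                .reduce(SequenceInputStream::new)
                .orElseThrow(() -> new IOException("Empty request body"));
        try (GZIPInputStream gzipIn = new GZIPInputStream(joined)) {
            return gzipIn.readAllBytes();
        }
    }

    public static void main(String[] args) throws IOException {
        byte[] body = "hello gzip over several chunks".getBytes(StandardCharsets.UTF_8);
        List<InputStream> chunks = toChunks(gzip(body), 7);
        System.out.println(new String(decompress(chunks), StandardCharsets.UTF_8));
    }
}
```

The sketch also makes one property of this design visible: the whole body is held in memory before decompression starts, so very large uploads may call for a size limit.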

Once we have the main implementation, we can now add the GzipDecompressionFilter to mutate our ServerWebExchange with the new instance of a wrapped ServerHttpRequest:

import lombok.extern.slf4j.Slf4j;
import org.springframework.core.Ordered;
import org.springframework.core.annotation.Order;
import org.springframework.http.HttpHeaders;
import org.springframework.http.server.reactive.ServerHttpRequest;
import org.springframework.http.server.reactive.ServerHttpResponse;
import org.springframework.stereotype.Component;
import org.springframework.web.server.ServerWebExchange;
import org.springframework.web.server.WebFilter;
import org.springframework.web.server.WebFilterChain;
import reactor.core.publisher.Mono;

import java.time.Instant;

import static java.util.Objects.isNull;
import static org.apache.commons.lang3.exception.ExceptionUtils.getMessage;
import static org.springframework.http.HttpHeaders.CONTENT_ENCODING;
import static org.springframework.http.HttpStatus.OK;
import static org.springframework.util.CollectionUtils.isEmpty;

@Slf4j
@Component
@Order(Ordered.HIGHEST_PRECEDENCE)
public class GzipDecompressionFilter implements WebFilter {

    @SuppressWarnings("NullableProblems")
    @Override
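    public Mono<Void> filter(ServerWebExchange serverWebExchange, WebFilterChain webFilterChain) {
        // Requests without Content-Encoding: gzip pass through untouched.
        // CompressionUtils is assumed to live in the same package as this filter.
        if (!CompressionUtils.isGzipRequest(serverWebExchange.getRequest())) {
            return webFilterChain.filter(serverWebExchange);
        }

        // Swap in the wrapper so that downstream handlers read the decompressed body.
        ServerWebExchange mutatedWebExchange = getMutatedWebExchange(serverWebExchange);
        return webFilterChain
                .filter(mutatedWebExchange)
                .onErrorResume(this::logError);
    }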

    private ServerWebExchange getMutatedWebExchange(ServerWebExchange serverWebExchange) {
        ServerHttpRequest mutatedHttpRequest = new GzipServerHttpRequest(serverWebExchange.getRequest());
        return serverWebExchange
                .mutate()
                .request(mutatedHttpRequest)
                .build();
    }

    private Mono<Void> logError(Throwable exception) {
        log.error("Gzip decompressed HTTP request failed, exception: [{}]", getMessage(exception));
        return Mono.empty();
    }

}

The last class we'll add is our custom RuntimeException, IllegalGzipRequestException, to recognize errors of this type in the third-party system we use for intercepting exceptions in our services:

public class IllegalGzipRequestException extends RuntimeException {

    public IllegalGzipRequestException(String message) {
        super(message);
    }

}
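
If the error-tracking system should also see why decompression failed, the original cause can be attached to the exception. The following is a minimal JDK-only sketch, assuming a hypothetical two-argument constructor that the class above does not have; the demo class, method name, and URI are made up for illustration.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPInputStream;

// Hypothetical demo class, not part of the article's code base.
public class GzipFailureDemo {

    // Hypothetical variant of the exception that keeps the underlying cause.
    static class IllegalGzipRequestException extends RuntimeException {
        IllegalGzipRequestException(String message, Throwable cause) {
            super(message, cause);
        }
    }

    // Decompresses the body or fails with the custom exception, preserving the cause.
    static byte[] decompressOrFail(byte[] body, String uri) {
        try (GZIPInputStream in = new GZIPInputStream(new ByteArrayInputStream(body))) {
            return in.readAllBytes();
        } catch (IOException exception) {
            String message = String.format("Decompression of a gzip content failed, URI: [%s]", uri);
            throw new IllegalGzipRequestException(message, exception);
        }
    }

    public static void main(String[] args) {
        // A body that claims to be gzip but is plain text.
        byte[] notGzip = "plain text".getBytes(StandardCharsets.UTF_8);
        try {
            decompressOrFail(notGzip, "/hypothetical/endpoint");
        } catch (IllegalGzipRequestException exception) {
            System.out.println(exception.getMessage());
            System.out.println(exception.getCause().getClass().getName());
        }
    }
}
```

For a body without the GZIP magic bytes, the GZIPInputStream constructor itself throws a java.util.zip.ZipException, which ends up as the cause here.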

If our clients want to receive compressed responses, we can easily enable response compression via the application.properties file:

server.compression.enabled=true
server.compression.min-response-size=1KB
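
To exercise both directions, a client has to send a compressed body with Content-Encoding: gzip and, optionally, ask for a compressed response with Accept-Encoding: gzip. The following is a sketch of building such a request with the JDK 11+ java.net.http API; the URL, the JSON payload, and the class and method names are placeholders, not part of the original project.

```java
import java.io.ByteArrayOutputStream;
import java.io.IOException;
import java.net.URI;
import java.net.http.HttpRequest;
import java.nio.charset.StandardCharsets;
import java.util.zip.GZIPOutputStream;

// Hypothetical demo class, not part of the article's code base.
public class GzipClientRequestDemo {

    // Compresses the request body before it is sent.
    static byte[] gzip(byte[] payload) throws IOException {
        ByteArrayOutputStream out = new ByteArrayOutputStream();
        try (GZIPOutputStream gzipOut = new GZIPOutputStream(out)) {
            gzipOut.write(payload);
        }
        return out.toByteArray();
    }

    // Builds a POST request whose body is gzip-compressed JSON.
    static HttpRequest buildGzipRequest(URI uri, String json) throws IOException {
        byte[] compressed = gzip(json.getBytes(StandardCharsets.UTF_8));
        return HttpRequest.newBuilder(uri)
                .header("Content-Type", "application/json")
                .header("Content-Encoding", "gzip") // tells the server the body is compressed
                .header("Accept-Encoding", "gzip")  // asks the server for a compressed response
                .POST(HttpRequest.BodyPublishers.ofByteArray(compressed))
                .build();
    }

    public static void main(String[] args) throws IOException {
        // Placeholder endpoint; replace it with a real one before sending.
        URI uri = URI.create("http://localhost:8080/hypothetical/events");
        HttpRequest request = buildGzipRequest(uri, "{\"name\":\"test\"}");
        System.out.println(request.method() + " " + request.uri());
        System.out.println(request.headers().map());
    }
}
```

The request can then be sent with HttpClient.newHttpClient().send(request, HttpResponse.BodyHandlers.ofByteArray()). Since Accept-Encoding is set by hand here, the response body may arrive compressed and has to be decompressed by the caller.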

Resources

  1. Compression in HTTP
  2. Interface WebFilter
  3. Project Lombok
  4. Class Compression
  5. Netty HttpContentEncoder

