Rory Warin for Bloomreach

Discovery | Crawling Billions of Pages: Building Large Scale Crawling Cluster (Pt 1)

Blog written by: Chou-han Yang from Bloomreach, 2015

Introduction

At Bloomreach, we constantly crawl our customers’ websites to ensure their quality and to obtain the information we need to run our marketing applications. It is fairly easy to build a prototype with a few lines of script, and there are plenty of open source tools available for the job, such as Apache Nutch. After evaluating several open source options, we chose to build our own web crawling cluster. Our requirements were:

  • Handle billions of pages (in about a week).
  • Use Hadoop (with Amazon EMR) to parse those pages efficiently.
  • Maintain a constant QPS (queries per second) for each website.
  • Have multiple task queues per website with different priorities.
  • Handle long latency for slow websites.
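
To make the constant-QPS requirement concrete, here is a minimal sketch of per-website pacing. It is not the scheduler described in this series; the class name `HostPacer` and its `reserve` method are hypothetical, and the caller is assumed to supply the current time so the logic stays easy to test.

```java
import java.util.HashMap;
import java.util.Map;

// Hypothetical helper: spaces requests to each host at a fixed interval.
public class HostPacer {

  private final long intervalNanos;
  // Earliest time (in nanoseconds) at which the next request to each host may go out.
  private final Map<String, Long> nextSlot = new HashMap<>();

  public HostPacer(double qpsPerHost) {
    this.intervalNanos = (long) (1_000_000_000L / qpsPerHost);
  }

  // Reserves the next send slot for `host` and returns how many nanoseconds
  // the caller should wait, starting from `nowNanos`, before sending.
  public synchronized long reserve(String host, long nowNanos) {
    long slot = Math.max(nowNanos, nextSlot.getOrDefault(host, nowNanos));
    nextSlot.put(host, slot + intervalNanos);
    return slot - nowNanos;
  }
}
```

With `new HostPacer(2.0)`, back-to-back calls to `reserve` for the same host return waits that grow in 500 ms steps, while a different host gets its own independent schedule.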

A very typical architecture for web crawling clusters includes three main components:

  1. A fetcher that sends HTTP requests and reads content.
  2. A centralized queuing system for job management and distribution.
  3. A backend pipeline to parse and post-process pages.

For part one of this series, I would like to focus on the fetcher that we use to crawl our customers’ pages. I’ll cover other components separately in future posts.

First attempt: single process loop

To kick-start the discussion, here is a short code snippet that demonstrates a very simple fetcher:

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.List;

public class Crawler {

  // Fetches each URL in turn; the loop blocks until the current page is fully read.
  public static void crawl(List<String> urls) throws IOException {
    for (String urlStr : urls) {
      URL url = new URL(urlStr);

      // try-with-resources closes the stream even if processHTML throws.
      try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
        processHTML(in);
      }
    }
  }

  public static void processHTML(BufferedReader in) {
    // ...
  }

}

This is a very straightforward implementation of the fetcher with several potential scaling issues:

  • It uses a single thread, so each process can fetch only one page at a time. To fetch from more than one website concurrently, the fetcher needs multiple processes.
  • If a single page takes a long time to process, or even worse, the server times out without any response, the whole process will be stuck.
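
The second issue can be softened, though not removed, by putting explicit timeouts on each request. The following is a minimal sketch using the standard `java.net.URLConnection` API. The class name `TimeoutFetcher`, the `fetch` helper, and the choice of UTF-8 decoding are assumptions made for illustration.

```java
import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.net.URLConnection;
import java.nio.charset.StandardCharsets;

// Hypothetical helper: a blocking fetch that gives up instead of hanging forever.
public class TimeoutFetcher {

  public static String fetch(String urlStr, int connectTimeoutMs, int readTimeoutMs)
      throws IOException {
    URLConnection conn = new URL(urlStr).openConnection();
    // Fail if the TCP connection cannot be established in time.
    conn.setConnectTimeout(connectTimeoutMs);
    // Fail if the server goes silent for this long while we wait for data.
    conn.setReadTimeout(readTimeoutMs);

    StringBuilder body = new StringBuilder();
    // Assumes the page is UTF-8; a real crawler would honor the declared charset.
    try (BufferedReader in = new BufferedReader(
        new InputStreamReader(conn.getInputStream(), StandardCharsets.UTF_8))) {
      String line;
      while ((line = in.readLine()) != null) {
        body.append(line).append('\n');
      }
    }
    return body.toString();
  }
}
```

When a timeout expires, the call throws `java.net.SocketTimeoutException`, so the loop can skip the page and move on. The thread is still blocked for the full timeout, which is why this only reduces the problem.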

As straightforward as it is, this approach won’t go very far before some operational headaches set in. Running many processes works around the first issue, but the memory overhead of each process will quickly consume all your memory space. So naturally, a better approach would be to use multiple threads in a single process.

Second attempt: multithreaded HTTP client

package scratch;

import java.io.BufferedReader;
import java.io.IOException;
import java.io.InputStreamReader;
import java.net.URL;
import java.util.List;

public class Crawler implements Runnable {

  private final String urlStr;

  public Crawler(String urlStr) {
    this.urlStr = urlStr;
  }

  public static void crawl(List<String> urls) {
    for (String urlStr : urls) {
      // start() runs the fetch on a new thread; calling run() directly would
      // execute it on the current thread and block the loop.
      new Thread(new Crawler(urlStr)).start();
    }
  }

  @Override
  public void run() {
    try {
      URL url = new URL(urlStr);

      // try-with-resources closes the stream even if processHTML throws.
      try (BufferedReader in = new BufferedReader(new InputStreamReader(url.openStream()))) {
        processHTML(in);
      }
    } catch (IOException e) {
      // Deal with exception.
    }
  }

  public static void processHTML(BufferedReader in) {
    // ...
  }

}

This version seems more modern, and it removes the requirement to run more than one process on a single machine. But a slow or unresponsive page still blocks the thread that fetches it. Compared with multiple processes, multiple threads are more memory efficient, but this approach reaches its limit at around 400 to 500 threads on a quad-core machine.
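
One common way to stay under such a thread limit is to cap the number of threads with a fixed-size pool instead of starting one thread per URL. The sketch below uses the standard `java.util.concurrent` executors. The class name `PooledCrawler` and the `fetcher` parameter are hypothetical; the fetch logic is passed in as a function so the pooling can be shown on its own.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.concurrent.Callable;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;
import java.util.concurrent.Future;
import java.util.function.Function;

// Hypothetical helper: fetches URLs on a bounded number of threads.
public class PooledCrawler {

  public static Map<String, String> crawl(
      List<String> urls, int maxThreads, Function<String, String> fetcher)
      throws InterruptedException, ExecutionException {
    // At most maxThreads fetches run at once; the rest wait in the pool's queue.
    ExecutorService pool = Executors.newFixedThreadPool(maxThreads);
    try {
      List<Future<String>> futures = new ArrayList<>();
      for (String url : urls) {
        Callable<String> task = () -> fetcher.apply(url);
        futures.add(pool.submit(task));
      }

      // Collect results in the same order as the input list.
      Map<String, String> pages = new LinkedHashMap<>();
      for (int i = 0; i < urls.size(); i++) {
        pages.put(urls.get(i), futures.get(i).get());
      }
      return pages;
    } finally {
      pool.shutdown();
    }
  }
}
```

A pool keeps memory bounded, but every in-flight request still occupies a thread while it waits on the network, so the ceiling on concurrent connections remains the pool size.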

Third attempt: asynchronous HTTP (Windows style)

To solve the problem of blocking threads for each website loop, people long ago developed solutions for Windows. An experienced Windows IIS programmer would be very familiar with the event-driven programming paradigm. Coming up with the same code in Java isn’t easy, but it might look something like:

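package scratch;

// Assumes the Ning async-http-client 1.x library, whose class names this sketch matches.
import com.ning.http.client.AsyncHandler;
import com.ning.http.client.AsyncHttpClient;
import com.ning.http.client.HttpResponseBodyPart;
import com.ning.http.client.HttpResponseHeaders;
import com.ning.http.client.HttpResponseStatus;

import java.io.IOException;
import java.util.List;

public class Crawler {

  public static void crawl(List<String> urls) throws IOException {
    // One client is shared by all requests.
    AsyncHttpClient client = new AsyncHttpClient();

    for (String urlStr : urls) {
      // execute() returns immediately; the callbacks fire as data arrives.
      client.prepareGet(urlStr).execute(new AsyncHandler<Void>() {

        @Override
        public void onThrowable(Throwable t) {
          // Deal with exception.
        }

        @Override
        public STATE onBodyPartReceived(HttpResponseBodyPart bodyPart) throws Exception {
          processHTML(bodyPart);
          return STATE.CONTINUE;
        }

        @Override
        public STATE onStatusReceived(HttpResponseStatus responseStatus) throws Exception {
          return STATE.CONTINUE;
        }

        @Override
        public STATE onHeadersReceived(HttpResponseHeaders headers) throws Exception {
          return STATE.CONTINUE;
        }

        @Override
        public Void onCompleted() throws Exception {
          return null;
        }
      });
    }
  }

  public static void processHTML(HttpResponseBodyPart bodyPart) {
    // ...
  }

}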

Windows usually uses a single thread to process all events, but you can allow multiple threads by changing the settings of the IIS web server. The Windows operating system can dispatch different events to different window handlers, so you can handle all asynchronous HTTP calls efficiently. For a very long time, people weren’t able to do this on Linux-based operating systems because the underlying socket library contained a potential bottleneck.

Fourth attempt: HTTP client with asynchronous I/O

The potential bottleneck was removed in Linux kernel 2.5.44 with the introduction of the epoll system call. This allows a process to monitor a huge number of TCP connections without polling each connection one by one. It also triggered the creation of a series of non-blocking libraries such as Java NIO.
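
To show what "monitoring many connections from one process" looks like in code, here is a minimal sketch using plain `java.nio`, whose `Selector` is typically backed by epoll on Linux. It is not the fetcher described in this post. The class name `SelectorFetcher`, the `fetchAll` method, and the assumption that each server closes the connection after replying are all illustrative.

```java
import java.io.IOException;
import java.net.InetSocketAddress;
import java.nio.ByteBuffer;
import java.nio.channels.SelectionKey;
import java.nio.channels.Selector;
import java.nio.channels.SocketChannel;
import java.nio.charset.StandardCharsets;
import java.util.HashMap;
import java.util.Iterator;
import java.util.List;
import java.util.Map;

// Hypothetical helper: one thread and one Selector drive every connection.
public class SelectorFetcher {

  // Sends `request` to each address and returns what each server sent back
  // before closing the connection. Failed connections are left out of the result.
  public static Map<InetSocketAddress, String> fetchAll(
      List<InetSocketAddress> addresses, String request) throws IOException {
    Map<InetSocketAddress, String> results = new HashMap<>();
    try (Selector selector = Selector.open()) {
      for (InetSocketAddress address : addresses) {
        SocketChannel channel = SocketChannel.open();
        channel.configureBlocking(false);
        Pending pending = new Pending(address);
        if (channel.connect(address)) {
          // Local connections may complete immediately.
          send(channel, request);
          channel.register(selector, SelectionKey.OP_READ, pending);
        } else {
          channel.register(selector, SelectionKey.OP_CONNECT, pending);
        }
      }

      int open = addresses.size();
      ByteBuffer buffer = ByteBuffer.allocate(8192);
      while (open > 0) {
        selector.select(); // Sleeps until at least one channel is ready.
        Iterator<SelectionKey> keys = selector.selectedKeys().iterator();
        while (keys.hasNext()) {
          SelectionKey key = keys.next();
          keys.remove();
          SocketChannel channel = (SocketChannel) key.channel();
          Pending pending = (Pending) key.attachment();
          try {
            if (key.isConnectable() && channel.finishConnect()) {
              send(channel, request);
              key.interestOps(SelectionKey.OP_READ);
            } else if (key.isReadable()) {
              buffer.clear();
              int n = channel.read(buffer);
              if (n < 0) { // End of stream: the server closed the connection.
                results.put(pending.address, pending.body.toString());
                channel.close();
                open--;
              } else {
                pending.body.append(
                    new String(buffer.array(), 0, n, StandardCharsets.ISO_8859_1));
              }
            }
          } catch (IOException e) {
            channel.close();
            open--;
          }
        }
      }
    }
    return results;
  }

  // Assumes the request is small enough to fit in the socket send buffer quickly.
  private static void send(SocketChannel channel, String request) throws IOException {
    ByteBuffer out = ByteBuffer.wrap(request.getBytes(StandardCharsets.US_ASCII));
    while (out.hasRemaining()) {
      channel.write(out);
    }
  }

  // Per-connection state attached to each SelectionKey.
  private static final class Pending {
    final InetSocketAddress address;
    final StringBuilder body = new StringBuilder();

    Pending(InetSocketAddress address) {
      this.address = address;
    }
  }
}
```

The key point is the single `selector.select()` call: the thread sleeps until the kernel reports that some connection has data, rather than checking each connection in turn.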

Network libraries based on Java NIO have the benefit of easily scaling from a few thousand to tens of thousands of TCP connections per machine. The CPU no longer spends time in a waiting state or context switching between a huge number of threads. Therefore, performance and throughput both increase.

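import java.net.*;
import java.util.List;
import java.util.concurrent.Executors;

import org.jboss.netty.bootstrap.ClientBootstrap;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.channel.*;
import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
import org.jboss.netty.handler.codec.http.*;

import static org.jboss.netty.channel.Channels.pipeline;

public class Crawler extends SimpleChannelUpstreamHandler {

  public static void crawl(List<String> urls) throws URISyntaxException {
    for (String urlStr : urls) {
      new Crawler().asyncRead(urlStr);
    }
  }

  public void asyncRead(String urlStr) throws URISyntaxException {
    URI uri = new URI(urlStr);
    // URI.getPort() returns -1 when the URL has no explicit port; assume plain HTTP.
    int port = uri.getPort() == -1 ? 80 : uri.getPort();
    // An empty path is not a valid request target.
    String path = uri.getRawPath().isEmpty() ? "/" : uri.getRawPath();

    // Configure the client with NIO-based boss and worker thread pools.
    ClientBootstrap bootstrap = new ClientBootstrap(
        new NioClientSocketChannelFactory(
            Executors.newCachedThreadPool(), Executors.newCachedThreadPool()));

    final SimpleChannelUpstreamHandler handler = this;

    // Set up the event pipeline factory.
    bootstrap.setPipelineFactory(new ChannelPipelineFactory() {

      @Override
      public ChannelPipeline getPipeline() throws Exception {
        ChannelPipeline pipeline = pipeline();
        // Encode outgoing HTTP requests and decode incoming responses.
        pipeline.addLast("codec", new HttpClientCodec());
        // Merge chunked responses into one HttpResponse (10 MB cap, an illustrative limit).
        pipeline.addLast("aggregator", new HttpChunkAggregator(10 * 1024 * 1024));
        pipeline.addLast("handler", handler);
        return pipeline;
      }
    });

    // Start the connection attempt.
    ChannelFuture future = bootstrap.connect(new InetSocketAddress(uri.getHost(), port));

    // Wait until the connection attempt succeeds or fails.
    Channel channel = future.awaitUninterruptibly().getChannel();
    if (!future.isSuccess()) {
      future.getCause().printStackTrace();
      bootstrap.releaseExternalResources();
      return;
    }

    // Prepare the HTTP request. HTTP/1.1 requires a Host header.
    HttpRequest request = new DefaultHttpRequest(HttpVersion.HTTP_1_1, HttpMethod.GET, path);
    request.setHeader(HttpHeaders.Names.HOST, uri.getHost());
    // Ask the server to close the connection once the response is sent.
    request.setHeader(HttpHeaders.Names.CONNECTION, HttpHeaders.Values.CLOSE);

    // Send the HTTP request.
    channel.write(request);

    // Wait for the server to close the connection. This blocking wait keeps the
    // example short; a production fetcher would attach listeners instead.
    channel.getCloseFuture().awaitUninterruptibly();

    // Shut down executor threads to exit.
    bootstrap.releaseExternalResources();
  }

  @Override
  public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
    HttpResponse response = (HttpResponse) e.getMessage();
    processHTML(response.getContent());
  }

  public void processHTML(ChannelBuffer content) {
    // ...
  }

}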

We use Netty to build our crawler, not only because it uses Java NIO, but also because it provides a good pipeline abstraction over the network stack. It is easy to insert a handler for HTTPS, compression, or timeouts without compromising the code structure.

Conclusion

Based on our stress tests, each node with a quad-core CPU can go up to 600 queries per second, at which point it reaches the maximum network bandwidth with an average HTML page size of 400K bytes. With a six-node cluster, we can crawl at 3,600 QPS, which is about 311 million pages a day, or 1.2 billion pages in four days.
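
These figures can be checked with a few lines of arithmetic. The sketch below is only a back-of-the-envelope calculation; the class name `CrawlCapacity` is hypothetical, and "400K bytes" is assumed to mean 400,000 bytes per page.

```java
// Hypothetical helper: capacity arithmetic for a crawling cluster.
public class CrawlCapacity {

  private static final long SECONDS_PER_DAY = 86_400L;

  // Pages fetched per day by `nodes` machines, each sustaining `qpsPerNode`.
  public static long pagesPerDay(int nodes, int qpsPerNode) {
    return (long) nodes * qpsPerNode * SECONDS_PER_DAY;
  }

  // Inbound bandwidth in gigabits per second for one node.
  public static double gigabitsPerSecond(int qps, long bytesPerPage) {
    return qps * bytesPerPage * 8 / 1e9;
  }
}
```

`pagesPerDay(6, 600)` gives 311,040,000 pages a day, and four times that is 1,244,160,000. Under the page-size assumption above, `gigabitsPerSecond(600, 400_000)` comes to about 1.92 Gbit/s of inbound traffic per node.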

Next time, we will talk about how to store tasks for a very long list of URLs and how to queue and schedule them efficiently.
