DEV Community

Geazi Anc
Geazi Anc

Posted on

Engenharia de Dados com Scala: aprenda a fazer webscraping dos filmes mais assistidos da Netflix em cada país

No vasto universo do streaming, a Netflix reina soberana em seu trono, proporcionando entretenimento a milhões de espectadores em todo o globo. Enquanto os usuários mergulham nos extensos catálogos de filmes, séries e documentários, uma pergunta intrigante surge: quais são os filmes mais assistidos em diferentes partes do mundo? Em busca dessa resposta, embarcamos em uma jornada de engenharia de dados, utilizando a poderosa linguagem Scala para extrair e analisar os top 10 filmes mais assistidos de cada país na plataforma.

A essência deste projeto reside na aplicação de webscraping para desvendar os segredos guardados nas páginas da Netflix, revelando não apenas os filmes mais populares, mas também proporcionando insights valiosos sobre as nuances culturais que moldam o gosto cinematográfico em diferentes nacionalidades.

Para isso, desenvolveremos uma pipeline de dados com a linguagem Scala para a extração, transformação e carregamento desses dados, com todo o desenvolvimento baseado no paradigma da programação funcional. Além disso, como era de se imaginar, iremos utilizar algumas bibliotecas não nativas da linguagem durante esse processo, tais como STTP para solicitações HTTP, uPickle para a leitura e criação de objetos json e Scala XML para a manipulação de conteúdos XML.

Portanto, se você se interessou em saber sobre os top 10 filmes mais assistidos na Netflix em cada país ao redor do mundo, junte-se a mim nessa emocionante jornada de exploração de dados! Todo o código do projeto está disponível em meu GitHub 💚.

Índice

1. Análise de requisitos

No site oficial da Netflix é possível visualizar as séries e filmes mais assistidos do mundo, de forma global. Logo mais abaixo encontram-se uma lista de países com seus respectivos filmes mais assistidos da semana, como, por exemplo, do nosso querido Brasil. O leitor perspicaz irá notar que logo embaixo da tabela há alguns arquivos no formato TSV disponíveis para baixarmos e fazermos nossas análises. Contudo, o objetivo desse artigo é a extração desses dados por meio de webscraping e, portanto, deixaremos esses arquivos de lado por hora 😉.

Nossa tarefa aqui é bem simples, a princípio. Precisamos desenvolver uma pipeline de dados que realiza a extração de todos os top 10 filmes mais assistidos da semana em todos os países disponíveis na Netflix e, posteriormente, salvar esses dados localmente no formato json. Os dados devem consistir nesses filmes coletados para cada país da plataforma, assim como o nome do país, a URL direcionando para a página onde esses dados foram extraídos e a data de quando a extração foi realizada.

2. Configuração do ambiente de desenvolvimento

A abordagem mais tradicional para o desenvolvimento dessa pipeline seria construir um projeto com base na arquitetura de código padrão gerada pelo SBT. Porém, como você e eu prezamos muito pela simplicidade e organização do código, optaremos no desenvolvimento da pipeline por meio de scripts Scala, utilizando uma das melhores ferramentas desenvolvidas pela comunidade de todos os tempos, o Scala CLI.

A escolha do Scala CLI ao invés do tradicional SBT se dá pela simplicidade em executar código Scala sem a necessidade de montar todo um projeto para isso. O Scala CLI também permite testar rapidamente um trecho de código adicionando todas as dependências necessárias e, o melhor de tudo, possui uma integração simples com a biblioteca toolkit, uma biblioteca que provê um conjunto de dependências bem úteis para prototipagem. Caso você tenha instalado o Scala por meio do Coursier, o Scala CLI já vem instalado por padrão. Caso não tenha o Scala ou o Scala CLI instalado em sua máquina, confira os passos na própria documentação.

$ cs version
2.1.8
$ scala --version
Scala code runner version 3.3.1 -- Copyright 2002-2023, LAMP/EPFL
$ java -version
openjdk version "11.0.20.1" 2023-08-24
OpenJDK Runtime Environment (build 11.0.20.1+1-post-Ubuntu-0ubuntu122.04)
OpenJDK 64-Bit Server VM (build 11.0.20.1+1-post-Ubuntu-0ubuntu122.04, mixed mode, sharing)
$ scala-cli --version
Scala CLI version: 1.0.6
Scala version (default): 3.3.1
Enter fullscreen mode Exit fullscreen mode

Para o desenvolvimento do projeto, iremos instalar as seguintes dependências:

  • Toolkit: um conjunto de bibliotecas projetadas para executar tarefas de programação comuns. Ele inclui ferramentas para trabalhar com arquivos e processos, analisar JSON, enviar solicitações HTTP e desenvolver testes unitários.
  • STTP: uma biblioteca de código aberto a qual provê uma API simples e amigável para descrever solicitações HTTP e a manipulação de responses.
  • Scala XML: a biblioteca padrão da linguagem Scala para a manipulação e criação de dados no formato XML.
  • uPickle: uma biblioteca leve para a serialização e deserialização de JSON para a linguagem Scala.
  • Scribe: uma biblioteca que provê um modo rápido e efetivo de realizar logs em Scala, sem a necessidade de arquivos de configuração ou dependências adicionais.
  • OS-Lib: uma interface simples para a manipulação de arquivos e subprocessos em Scala.

Mãos na massa! Antes de tudo, crie um diretório chamado netflix-scraper e, em seguida, um subdiretório chamado src.

$ mkdir -vp netflix-scraper/src
mkdir: created directory 'netflix-scraper/src'
$ cd Netflix-scraper
netflix-scraper
Enter fullscreen mode Exit fullscreen mode

Agora, iremos criar um arquivo chamado Scraper.sc no diretório src. Esse será o arquivo principal (não único) de nossa pipeline. Observe que o arquivo está com a extensão .sc ao invés de .scala. A extensão .sc é um formato comum quando falamos do desenvolvimento de scripts Scala 🤖.

$ touch src/Scraper.sc
src/Scraper.sc
Enter fullscreen mode Exit fullscreen mode

E, por fim, iremos adicionar essas três primeiras linhas:

//> using toolkit "latest"
//> using dep org.scala-lang.modules::scala-xml:2.2.0
//> using dep com.outr::scribe:3.12.2
Enter fullscreen mode Exit fullscreen mode

Sim, é isso mesmo que você leu. Toda a configuração de nossa pipeline é feita através de comentários no topo de nosso script, também conhecidos como diretivas (directives) 🎉. Logo, não é necessário explicitamente declarar as dependências em um arquivo build.sbt, como é feito tradicionalmente. Como eu disse, isso torna a experiência de desenvolvimento bem mais simples e rápida. Recomendo a leitura da documentação do Scala CLI, onde há muito mais exemplos de configuração através das directives.

  • Linha 1: estamos dizendo ao compilador para utilizar a versão mais recente da biblioteca toolkit. Por padrão, ele instala automaticamente as bibliotecas STTP, uPickle, OsLib e MUnit.
  • Linha 2: instalamos a biblioteca scala-xml. Observe que o formato do artefato não difere muito daquilo que escrevemos dentro dos arquivos build.sbt, substituindo, apenas, os sinais de $ (sifrão) por : (dois pontos).
  • Linha 3: novamente a instalação de uma dependência, dessa vez o Scribe. Nada de novo por aqui!

Podemos instalar essas dependências apenas compilando todo o projeto através do Scala CLI:

$ scala-cli compile .
Compiling project (Scala 3.3.1, JVM (11))
Compiled project (Scala 3.3.1, JVM (11))
Enter fullscreen mode Exit fullscreen mode

E aí, instalou? Como posso testar isso? É simples, jovem padawan! É só iniciar uma sessão do console do scala no diretório atual, que todas as dependências instaladas serão automaticamente carregadas para nós:

$ scala-cli repl .
           Welcome to Scala 3.3.1 (11.0.20.1, Java OpenJDK 64-Bit Server VM).
Type in expressions for evaluation. Or try :help.
       Scala>
scala> import upickle.default.*
scala> write(ujson.Obj("hello" -> "world!"))
val res0: String = {"hello":"world!"}
scala> import sttp.client4.quick.*
scala> quickRequest.get(uri"https://www.netflix.com").send().code
val res1: sttp.model.StatusCode = 200
scala> os.pwd
val res2: os.Path = /home/geazi/repos/netflix-scraper
scala> import scala.xml.*
scala> <p>Hello, readers!</p>
val res3: scala.xml.Elem = <p>Hello, readers!</p>
Enter fullscreen mode Exit fullscreen mode

Sim, aparentemente todas as dependências estão instaladas, conforme o esperado. Agora, vamos dar uma olhada de perto de como será a arquitetura geral de nossa pipeline!

3. Arquitetura geral

A pipeline de dados irá consistir em cinco componentes principais:

  • Extractor: módulo responsável por fazer a extração dos dados nas páginas web da Netflix. Irá extrair todos os países disponíveis na plataforma e também extrair os dados dos filmes mais assistidos da semana de cada um dos países obtidos. Centralizado no pacote service.
  • Transformer: módulo responsável por fazer todas as transformações de dados necessárias para que possamos extrair os dados das páginas web. Irá consistir, em grande parte, em métodos para a manipulação de conteúdos XML. Centralizado no pacote service.
  • Loader: módulo que centraliza os métodos para carregar o conteúdo final em um determinado destino. Como vimos nos requisitos do projeto, os dados serão salvos no formato json no diretório local. Centralizado no pacote service.
  • Model: pacote que reúne os modelos para a estruturação dos dados coletados. Mais detalhes na próxima sessão.
  • Core: onde irá ficar todo o código para a extração, transformação e carregamento dos dados. Em termos práticos, nosso arquivo src/Scraper.sc.

Vamos criar a estrutura de diretórios para esses componentes. A princípio iremos criar os módulos vazios e posteriormente preenchelos com os códigos necessários.

$ mkdir -v src/model
mkdir: created directory 'src/model'
$ mkdir -v src/service
mkdir: created directory 'src/service'
$ touch src/service/Extractor.scala
src/service/Extractor.scala
$ touch src/service/Transformer.scala
src/service/Transformer.scala
$ touch src/service/Loader.scala
src/service/Loader.scala
Enter fullscreen mode Exit fullscreen mode

Sim, eu imagino que você deve estar se perguntando: por que agora os arquivos estão no formato .scala? Porque são módulos da nossa pipeline, não o script principal, como ocorre com o Scraper.sc. Caso tenha mais de um arquivo .sc no mesmo diretório, você deve especificar para o compilador qual deles deseja executar, já que ele reconhece ambos como arquivos que contém a classe Main. Fora que, ao escrever nossos componentes em arquivos .scala, torna-se mais fácil de migrá-los posteriormente para um projeto SBT, caso haja a necessidade.

Agora iremos fazer a modelagem de nossos dados 🎲.

4. Modelagem de dados

Como foi visto nos requisitos do projeto, precisamos extrair todos os filmes mais assistidos da semana na Netflix para cada país disponível na plataforma. Os dados devem consistir nesses filmes coletados, assim como o nome do país, a URL direcionando para a página dos filmes e a data em que a extração foi realizada.

Portanto, podemos estruturar esses dados em dois modelos:

  • Film: modelo que representa individualmente cada filme mais assistido da semana em cada um dos países. Como vimos na página da Netflix, a tabela consiste em três colunas: rank, title e weeksInTop10, as quais se tornarão atributos no modelo.
  • Top10FilmsByCountry: modelo que representa os filmes mais assistidos da semana em cada um dos países. Ele consiste no nome do país, a URL direcionando para a página onde esses dados foram coletados, a data em que a extração ocorreu e os filmes propriamente ditos.

Mãos na massa! Vamos criar esses modelos no pacote model de nosso projeto. Não podemos esquecer que esses dados posteriormente serão convertidos para o formato json, então em nossos modelos já iremos herdar a classe ReadWriter da biblioteca uPickle. Para mais detalhes sobre a conversão de case classes em objetos json com uPickle, recomendo a leitura da documentação, já que está fora do escopo desse artigo explicar essas nuances com detalhes.

$ touch src/model/Film.scala
src/model/Film.scala
$ touch src/model/Top10FilmsByCountry.scala
src/model/Top10FilmsByCountry.scala
Enter fullscreen mode Exit fullscreen mode
// src/model/Film.scala
package io.scraper.netflix.model

import upickle.default.*

final case class Film(rank: Int, title: String, weeksInTop10: Int)
    derives ReadWriter

Enter fullscreen mode Exit fullscreen mode

Agora vamos criar o modelo Top10FilmsByCountry.

// src/model/Top10FilmsByCountry.scala
package io.scraper.netflix.model

import java.time.LocalDate
import java.time.format.DateTimeFormatter

import upickle.default.*

final case class Top10FilmsByCountry(
  countryName: String,
  url: String,
  weekOf: LocalDate,
  films: Seq[Film],
)

Enter fullscreen mode Exit fullscreen mode

Esse modelo, como pôde observar, possui uma peculiaridade do anterior, visto que possui um atributo weekOf do tipo LocalDate. Por conta disso, não podemos derivar diretamente a classe ReadWriter do uPickle, visto que daria erro de conversão na hora de transformar o modelo em um objeto json, já que o uPickle trabalha nativamente apenas com tipos primitivos da linguagem Scala.

Para solucionar esse problema, portanto, precisaremos customizar o serializer e o deserializer do uPickle, convertendo manualmente o tipo LocalDate para String durante a conversão para objeto json e convertendo a String weekOf para o tipo LocalDate durante a conversão da String json para a case class Top10FilmsByCountry. A documentação possui um ótimo exemplo para criar seus próprios serializers e deserializers. Recomendo a leitura caso não entenda o que esteja ocorrendo no código.

Para isso, vamos criar um companion object da case class com os serializers e deserializers.

object Top10FilmsByCountry:
  private val format = DateTimeFormatter.ofPattern("yyyy-MM-dd")

  private val serializer: Top10FilmsByCountry => ujson.Obj     = x =>
    ujson.Obj(
      "countryName" -> x.countryName,
      "url"         -> x.url,
      "weekOf"      -> x.weekOf.toString,
      "films"       -> writeJs(x.films),
    )

  private val deserializer: ujson.Value => Top10FilmsByCountry = json =>
    new Top10FilmsByCountry(
      countryName = json("countryName").str,
      url = json("url").str,
      weekOf = LocalDate.parse(json("weekOf").str, format),
      films = read[Seq[Film]](json("films")),
    )

  implicit val rw: ReadWriter[Top10FilmsByCountry] =
    readwriter[ujson.Value].bimap[Top10FilmsByCountry](
      serializer,
      deserializer,
    )

Enter fullscreen mode Exit fullscreen mode

5. Desenvolvimento dos componentes

Feito! Agora todos os modelos de nossa pipeline já foram criados. Hora de começar a desenvolver o Extractor, Transformer e Loader, com todo o código utilizando conceitos do paradigma funcional.

5.1. Extração de dados

O Extractor, como dito anteriormente, irá consistir em um módulo a parte com um objeto singleton localizado no pacote io.scraper.netflix.service. Irá ser composto por dois métodos, sendo eles:

  • getCountries: retorna a página HTML em formato String contendo todos os países disponíveis na Netflix
  • getMostPopularFilmsFrom: retorna a página HTML em formato String do país especificado com os top 10 filmes mais assistidos da semana
package io.scraper.netflix.service

import sttp.client4.quick.*
import sttp.model.Uri

import scala.util.{Failure, Success, Try}

object Extractor:
  def getMostPopularFilmsFrom(url: Uri): Try[String] =
    val response   = quickRequest.get(url).send()
    val statusCode = response.code.toString.toInt

    if statusCode == 200 then Success(response.body)
    else Failure(new Exception(s"Failed to access $url, status $statusCode"))

  def getCountries: Try[String] =
    val url        = uri"https://www.netflix.com/tudum/top10/united-states"
    val response   = quickRequest.get(url).send()
    val statusCode = response.code.toString.toInt

    if statusCode == 200 then Success(response.body)
    else Failure(new Exception(s"Failed to get countries from $url, status $statusCode"))

Enter fullscreen mode Exit fullscreen mode

Como pôde observar, esses dois métodos retorna uma instância da classe Try[String]. Caso o status do retorno da solicitação HTTP seja igual a 200, retorna uma instância da classe Success com o conteúdo do body. Caso contrário, retorna uma instância da classe Failure com uma exception genérica dizendo que não foi possível processar a solicitação, e o código do erro.

5.2. Transformação de dados

Já o Transformer consiste em um módulo a parte com um objeto singleton localizado no pacote io.scraper.netflix.service. Será composto por cinco métodos, sendo eles:

  • removeIllegalTagsOf: recebe como argumento a página HTML dos filmes mais assistidos da semana de um determinado país no formato String e retorna uma String da página HTML sem duas tags HTML que não fazem ser possível a conversão da String em um objeto do tipo Elem da biblioteca Scala XML.
  • loadXmlFrom: recebe como argumento uma página HTML no formato String e retorna a página HTML estruturada no formato XML com o tipo Elem da biblioteca Scala XML.
  • getFilmsFromTable: recebe como argumento a tabela XML do tipo NodeSeq contendo os filmes mais assistidos da semana e retorna uma instância da classe Try[Seq[Film]] com os dados estruturados em uma sequência de instâncias da classe Film.
  • getCountries: recebe como argumento a página HTML de todos os países disponíveis na Netflix estruturada no formato XML do tipo Elem e retorna uma instância da classe Try[Set[(String, String)]] contendo um Set de tuplas a qual é composta pelo nome do país e a URL direcionando para a página de filmes mais assistidos da semana daquele país, respectivamente.
  • getDataFrom: um método privado que recebe como argumento a tabela de filmes mais assistidos da semana do tipo NodeSeq e retorna um objeto do tipo Seq[Seq[String]], sendo a sequência interna uma sequência de três elementos contendo o rank do filme, o título do filme e a quantidade de semanas que está no top 10, respectivamente.
package io.scraper.netflix.service

import io.scraper.netflix.model.Film

import scala.util.Try
import scala.xml.*

object Transformer:
  def removeIllegalTagsOf(xmlContent: String): Try[String] = Try(
    xmlContent
      .replace("<!DOCTYPE html>", "")
      .replace("<br>", ""),
  )

  def loadXmlFrom(xmlString: String): Try[Elem] =
    Try(XML.loadString(xmlString))

  def getFilmsFromTable(table: NodeSeq): Try[Seq[Film]] = Try(
    getDataFrom(table).map {
      case List(rank: String, title: String, weeksInTop10: String) => new Film(rank.toInt, title, weeksInTop10.toInt)
      case _ => throw new IllegalArgumentException("Is not matchable with class List(String, String, String)")
    },
  )

  def getCountries(content: Elem): Try[Set[(String, String)]] = Try(
    (content \\ "a")
      .filter(t => (t \ "@class").text.contains("country-pill") && !t.text.startsWith("#"))
      .map(a => (a.text, (a \ "@href").text))
      .toSet,
  )

  private def getDataFrom(table: NodeSeq): Seq[Seq[String]] =
    table.map(r => (r \ "td").map(c => c.text))

Enter fullscreen mode Exit fullscreen mode

5.3. Carregamento de dados

Por fim, o Loader consiste em um módulo a parte com um objeto singleton localizado no pacote io.scraper.netflix.service. Será composto por um único método, sendo ele:

  • saveTop10FilmsByCountryAsJson: recebe dois argumentos, sendo o primeiro um objeto do tipo String | os.Path com o caminho completo onde os dados no formato json serão salvos, e o segundo um objeto do tipo Seq[Top10FilmsByCountry] com os dados estruturados para serem salvos.
package io.scraper.netflix.service

import io.scraper.netflix.model.Top10FilmsByCountry

import upickle.default.*

import scala.util.Try

object Loader:
  def saveTop10FilmsByCountryAsJson(
    filePath: String | os.Path,
    top10FilmsByCountry: Seq[Top10FilmsByCountry],
  ): Try[Unit] = filePath match
    case s: String  => Try(os.write.over(os.Path(s), write(top10FilmsByCountry, indent = 2)))
    case p: os.Path => Try(os.write.over(p, write(top10FilmsByCountry, indent = 2)))

Enter fullscreen mode Exit fullscreen mode

5.4. Core

Agora chegamos no desenvolvimento principal de nossa pipeline de dados. Hora de colocar todos esses métodos que foram desenvolvidos para trabalhar!

Antes de tudo, pricisamos fazer todos os imports necessários

// src/Scraper.sc
import java.time.LocalDate

import io.scraper.netflix.model.*
import io.scraper.netflix.service.*

import scribe.format.*
import sttp.model.Uri
import sttp.model.Uri.UriContext
import upickle.default.*

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
import scala.xml.*

Enter fullscreen mode Exit fullscreen mode

Depois disso, definimos todas as constantes antes do código em si, para que fique mais legível e fácil de localizar:

// default variables
val baseUrl      = "https://www.netflix.com"
val jsonFilePath = os.pwd / "top10_films_of_netflix_by_country.json"
val customFormat = formatter"$date $level   $messages$newLine"
val weekOf       = LocalDate.now()

Enter fullscreen mode Exit fullscreen mode
  • baseUrl: a URL principal da Netflix. Com base nessa URL que vamos concatenar a URL da página de cada um dos países com os filmes mais assistidos da semana, visto que essa URL é parcial, não completa.
  • jsonFilePath: o caminho onde os dados no formato json serão salvos.
  • customFormat: um formatador customizado da biblioteca Scribe de nossos logs. O log será composto da data e hora, o nível do log e a mensagem do log.
  • weekOf: a data em que a extração está sendo executada. Posteriormente essa constante será utilizada como atributo de nosso modelo Top10FilmsByCountry.

Feito isso, precisamos configurar a biblioteca Scribe com o formatador de logs customizado que acabamos de criar:

// main
scribe.Logger.root
  .clearHandlers()
  .clearModifiers()
  .withHandler(formatter = customFormat)
  .replace()

Enter fullscreen mode Exit fullscreen mode

A próxima etapa de nossa pipeline será realizar a extração de todos os países disponíveis na Netflix. Como foi visto, todos os métodos do Extractor e Transformer retornam uma instância da classe Try[T]. Portanto, para que possamos manipular esses conteúdos retornados, vamos utilizar uma for comprehension, a qual por padrão chama o método .map da classe Try[T].

As etapas da extração dos países consiste em:

  1. Extração da página HTML de todos os países disponíveis na Netflix.
  2. Remoção das tags HTML que não permitem que convertemos diretamente a String em um objeto Elem da biblioteca Scala XML.
  3. Conversão da página HTML no formato String para uma estrutura XML do tipo Elem.
  4. Obtenção da lista dos nomes e das URLs dos países através da estrutura XML
  5. Retorno do Set de tuplas contendo o nome do país e a URL da página dos filmes mais assistidos da semana
  6. Verificação se todas essas operações foram bem-sucedidas. Caso não tenham, imprimir no log o erro que ocorreu e levantar uma exception, interrompendo a execução da pipeline.
scribe.info("Netflix Scraper started")
scribe.info("Extracting countries available at Netflix")

val extractedCountries = for
  response  <- Extractor.getCountries
  xmlString <- Transformer.removeIllegalTagsOf(response)
  content   <- Transformer.loadXmlFrom(xmlString)
  countries <- Transformer.getCountries(content)
yield countries

extractedCountries match
  case Success(r) => scribe.info(s"${r.size} countries were successfully extracted")
  case Failure(e) =>
    scribe.error("Failed to extract countries available at Netflix")
    throw e

Enter fullscreen mode Exit fullscreen mode

A próxima etapa é extrair os filmes mais assistidos da semana para cada um dos países disponíveis na Netflix.

  1. Desencapsula o Set de tuplas dos países da classe Try[Set[(String, String)]]. Depois, cria uma nova lista de tuplas contendo o nome do país e a URL completa para a página dos filmes mais assistidos da semana, concatenando a base URL com a URL parcial anteriormente obtida.
  2. Cria uma função lambda extractAndTransformFilmsFrom que recebe como argumento a URL da página de filmes mais assistidos da semana de um país e retorna uma instância da classe Future[Try[Seq[Film]]]. No corpo dessa função utilizamos outro for comprehension para encadear a pipeline de extração e transformação dos dados, sendo as etapas compostas por:
    1. Extrai a página HTML em formato String dos filmes mais assistidos da semana de acordo com a URL passada como argumento da função.
    2. Remove as tags que impedem que façamos a conversão da página HTML para uma estrutura XML.
    3. Converte a página HTML no formato String para uma estrutura XML.
    4. Obtém a tabela de filmes da estrutura XML, retornando uma nova estrutura XML.
    5. Obtém todos os filmes mais assistidos da semana da tabela XML e cria uma instância da classe Film para cada filme na tabela.
  3. Percorre a lista de países, criando uma nova lista de tuplas contendo o nome do país, a URL da página dos filmes mais assistidos da semana e uma instância da classe Future com a operação de extração e transformação dos dados.
  4. Verifica se a operação assíncrona da função lambda extractAndTransformFilmsFrom foi concluída. Depois disso, retorna uma tupla contendo o nome do país, a URL da página dos filmes mais assistidos da semana e uma instância da classe Try[Top10FilmsByCountry], com o resultado da operação.
  5. Percorre a lista de tuplas. Verifica se a operação de extração e transformação dos filmes mais assistidos de um determinado país foi bem-sucedida. Caso tenha sido, exibe o log com a mensagem que a extração dos filmes mais assistidos da semana foi bem-sucedida para aquele país. Caso não tenha sido, imprime o log com a mensagem dizendo que não foi possível fazer a extração dos filmes daquele determinado país, e o erro da operação.
  6. Agrupa os dados por Success e Failure.
  7. Faz a contagem de quantos países foi possível extrair os filmes mais assistidos da semana e quantos países que a operação de extração falhou.

Mãos a obra! São muitas etapas, eu sei, mas veja que o código é bem simples.

val countries =
  extractedCountries.get.toList.map((countryName, url) => (countryName, baseUrl + url))

scribe.info("Starting to extract most popular films for each countries")

val extractAndTransformFilmsFrom: Uri => Future[Try[Seq[Film]]] = url =>
  Future {
    for
      response  <- Extractor.getMostPopularFilmsFrom(url)
      xmlString <- Transformer.removeIllegalTagsOf(response)
      content   <- Transformer.loadXmlFrom(xmlString)
      filmTable <- Try(content \\ "tbody" \ "tr")
      films     <- Transformer.getFilmsFromTable(filmTable)
    yield films
  }

val top10FilmsByCountries = countries
  .map((countryName, url) => (countryName, url, extractAndTransformFilmsFrom(uri"$url")))
  .map { (countryName, url, future) =>
    val films               = Await.result(future, 3.seconds)
    val top10FilmsByCountry =
      for films <- films
      yield Top10FilmsByCountry(countryName, url, weekOf, films)

    (countryName, url, top10FilmsByCountry)
  }

val results = top10FilmsByCountries.map { (countryName, url, top10FilmsByCountry) =>
  top10FilmsByCountry match
    case Success(r) =>
      scribe.info(s"Succeed to extract top10 films of $countryName: $url")
      Success(r)
    case Failure(e) =>
      scribe.error(s"Failed to extract top10 films of $countryName. Reason: $e")
      Failure(e)
}.groupBy {
  case Success(r) => "success"
  case Failure(e) => "failure"
}

val (succeed, failed) = (results("success").length, results("failure").length)

scribe.info(s"Extracted top10 films from $succeed countries with success and $failed with failure")

Enter fullscreen mode Exit fullscreen mode

E, por fim:

  1. Verificamos se houve ao menos uma extração bem-sucedida. Caso não haja nenhuma operação bem-sucedida, imprime o log com a mensagem de erro e levanta uma exception, interrompendo a execução da pipeline.
  2. Salva os dados no diretório local no formato json.
  3. Verifica se houve algum erro durante o salvamento dos dados. Caso tenha tido algum erro, levanta uma Exception e interrompe a execução da pipeline. Caso não tenha tido nenhum problema, imprime o log com a operação bem-sucedida e finaliza a pipeline com sucesso.
scribe.info(s"Saving json data at $jsonFilePath")

if succeed == 0 then throw new NoSuchElementException("Zero extraction was succeed. Nothing to save")
val top10FilmsByCountryList = results("success").map(_.get).sortBy(_.countryName)

Loader.saveTop10FilmsByCountryAsJson(jsonFilePath, top10FilmsByCountryList) match
  case Success(r) => scribe.info(s"Saved at $jsonFilePath")
  case Failure(e) => throw e

scribe.info("Netflix Scraper successfully finished")

Enter fullscreen mode Exit fullscreen mode

Portanto, o código completo da nossa pipeline de dados ficou dessa forma:

//> using toolkit "latest"
//> using dep org.scala-lang.modules::scala-xml:2.2.0
//> using dep com.outr::scribe:3.12.2

import java.time.LocalDate

import io.scraper.netflix.model.*
import io.scraper.netflix.service.*

import scribe.format.*
import sttp.model.Uri
import sttp.model.Uri.UriContext
import upickle.default.*

import scala.concurrent.{Await, Future}
import scala.concurrent.duration.*
import scala.concurrent.ExecutionContext.Implicits.global
import scala.util.{Failure, Success, Try}
import scala.xml.*

// default variables
val baseUrl      = "https://www.netflix.com"
val jsonFilePath = os.pwd / "top10_films_of_netflix_by_country.json"
val customFormat = formatter"$date $level   $messages$newLine"
val weekOf       = LocalDate.now()

// main
scribe.Logger.root
  .clearHandlers()
  .clearModifiers()
  .withHandler(formatter = customFormat)
  .replace()

scribe.info("Netflix Scraper started")
scribe.info("Extracting countries available at Netflix")

val extractedCountries = for
  response  <- Extractor.getCountries
  xmlString <- Transformer.removeIllegalTagsOf(response)
  content   <- Transformer.loadXmlFrom(xmlString)
  countries <- Transformer.getCountries(content)
yield countries

extractedCountries match
  case Success(r) => scribe.info(s"${r.size} countries were successfully extracted")
  case Failure(e) =>
    scribe.error("Failed to extract countries available at Netflix")
    throw e

val countries =
  extractedCountries.get.toList.map((countryName, url) => (countryName, baseUrl + url))

scribe.info("Starting to extract most popular films for each countries")

val extractAndTransformFilmsFrom: Uri => Future[Try[Seq[Film]]] = url =>
  Future {
    for
      response  <- Extractor.getMostPopularFilmsFrom(url)
      xmlString <- Transformer.removeIllegalTagsOf(response)
      content   <- Transformer.loadXmlFrom(xmlString)
      filmTable <- Try(content \\ "tbody" \ "tr")
      films     <- Transformer.getFilmsFromTable(filmTable)
    yield films
  }

val top10FilmsByCountries = countries
  .map((countryName, url) => (countryName, url, extractAndTransformFilmsFrom(uri"$url")))
  .map { (countryName, url, future) =>
    val films               = Await.result(future, 3.seconds)
    val top10FilmsByCountry =
      for films <- films
      yield Top10FilmsByCountry(countryName, url, weekOf, films)

    (countryName, url, top10FilmsByCountry)
  }

val results = top10FilmsByCountries.map { (countryName, url, top10FilmsByCountry) =>
  top10FilmsByCountry match
    case Success(r) =>
      scribe.info(s"Succeed to extract top10 films of $countryName: $url")
      Success(r)
    case Failure(e) =>
      scribe.error(s"Failed to extract top10 films of $countryName. Reason: $e")
      Failure(e)
}.groupBy {
  case Success(r) => "success"
  case Failure(e) => "failure"
}

val (succeed, failed) = (results("success").length, results("failure").length)

scribe.info(s"Extracted top10 films from $succeed countries with success and $failed with failure")
scribe.info(s"Saving json data at $jsonFilePath")

if succeed == 0 then throw new NoSuchElementException("Zero extraction was succeed. Nothing to save")
val top10FilmsByCountryList = results("success").map(_.get).sortBy(_.countryName)

Loader.saveTop10FilmsByCountryAsJson(jsonFilePath, top10FilmsByCountryList) match
  case Success(r) => scribe.info(s"Saved at $jsonFilePath")
  case Failure(e) => throw e

scribe.info("Netflix Scraper successfully finished")

Enter fullscreen mode Exit fullscreen mode

6. Execução da pipeline e análise dos resultados

Voilà! Pipeline completamente desenvolvida! Vamos executá-la e, além disso, medir o tempo de execução.

Para executar nossa pipeline, apenas executamos o comando scala-cli . na raís do projeto. Porém, como também queremos medir o tempo de execução, vamos acrescentar o comando time, que no fim da execução nos diz quanto tempo um determinado comando levou para ser executado.

O output da execução será exibido parcialmente por questão de brevidade.

$ time scala-cli .
2023.11.26 22:53:05 INFO   Netflix Scraper started
2023.11.26 22:53:05 INFO   Extracting countries available at Netflix
2023.11.26 22:53:07 INFO   89 countries were successfully extracted
2023.11.26 22:53:07 INFO   Starting to extract most popular films for each countries
2023.11.26 22:53:08 INFO   Succeed to extract top10 films of Uruguay: https://www.netflix.com/tudum/top10/uruguay?week=2023-11-19
2023.11.26 22:53:08 INFO   Succeed to extract top10 films of Canada: https://www.netflix.com/tudum/top10/canada?week=2023-11-19
2023.11.26 22:53:08 ERROR   Failed to extract top10 films of Bolivia. Reason: java.lang.NumberFormatException: For input string: "42❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚❚"
2023.11.26 22:53:08 INFO   Succeed to extract top10 films of Greece: https://www.netflix.com/tudum/top10/greece?week=2023-11-19
...
2023.11.26 22:53:08 INFO   Succeed to extract top10 films of Finland: https://www.netflix.com/tudum/top10/finland?week=2023-11-19
2023.11.26 22:53:08 INFO   Extracted top10 films from 84 countries with success and 5 with failure
2023.11.26 22:53:08 INFO   Saving json data at /home/geazi/repos/netflix-scraper/top10_films_of_netflix_by_country.json
2023.11.26 22:53:08 INFO   Saved at /home/geazi/repos/netflix-scraper/top10_films_of_netflix_by_country.json
2023.11.26 22:53:08 INFO   Netflix Scraper successfully finished
real    0m5.113s
user    0m7.967s
sys     0m0.816s
Enter fullscreen mode Exit fullscreen mode

Pipeline finalizada com sucesso! Tivemos 84 países com a extração bem-sucedida e 5 com falha, em um tempo de execução de 5 segundos.

Vamos dar uma olhada nos dados coletados? Novamente, o output será exibido parcialmente por questão de brevidade.

$ cat top10_films_of_netflix_by_country.json
[
  {
    "countryName": "Argentina",
    "url": "https://www.netflix.com/tudum/top10/argentina?week=2023-11-19",
    "weekOf": "2023-11-26",
    "films": [
      {
        "rank": 1,
        "title": "The Killer",
        "weeksInTop10": 2
      },
      {
        "rank": 2,
        "title": "Mamá se fue de viaje",
        "weeksInTop10": 2
      },
      {
        "rank": 3,
        "title": "Monster Hunter",
        "weeksInTop10": 1
      },
    ...
  {
    "countryName": "Brazil",
    "url": "https://www.netflix.com/tudum/top10/brazil?week=2023-11-19",
    "weekOf": "2023-11-26",
    "films": [
      {
        "rank": 1,
        "title": "Monster Hunter",
        "weeksInTop10": 1
      },
      {
        "rank": 2,
        "title": "The Killer",
        "weeksInTop10": 2
      },
      {
        "rank": 3,
        "title": "Best. Christmas. Ever!",
        "weeksInTop10": 1
      },
    ...
    "countryName": "Italy",
    "url": "https://www.netflix.com/tudum/top10/italy?week=2023-11-19",
    "weekOf": "2023-11-26",
    "films": [
      {
        "rank": 1,
        "title": "The Killer",
        "weeksInTop10": 2
      },
      {
        "rank": 2,
        "title": "Best. Christmas. Ever!",
        "weeksInTop10": 1
      },
      {
        "rank": 3,
        "title": "10 giorni con Babbo Natale",
        "weeksInTop10": 1
      },
    ...
Enter fullscreen mode Exit fullscreen mode

7. Considerações finais

Chegamos ao fim desse artigo, caro leitor! Agradeço muito por ter me acompanhado até aqui 💚.

Nesse artigo aprendemos como fazer a extração dos top 10 filmes mais assistidos da semana para cada um dos países disponíveis na Netflix por meio de webscraping com a incrível linguagem Scala. Trabalhamos alguns conceitos do paradigma da programação funcional; utilizamos o Scala CLI para desenvolver um projeto minimalista de uma forma rápida e simples; utilizamos várias bibliotecas para realizar solicitações HTTP, fazer manipulações de json e XML; e fazer o logging de nossa pipeline com Scribe.

Por favor, não esqueça de reagir a esse artigo e deixar um comentário, de modo a ganhar relevância no Dev Community e para que mais pessoas tenham acesso ao conteúdo.

Até a próxima!

Top comments (2)

Collapse
 
raulferreirasilva profile image
Raul Ferreira

Cara que artigo fenomenal, não conhecia a linguagem Scale achei ela bem intrigante e diferente de tudo que tenho contato (sou dev front-end), mas deu vontade de tentar replicar seu código só por curiosidade e praticar uma linguagem nova KKKK muito obrigado por compartilhar seu conhecimento 🦤.

Collapse
 
geazi_anc profile image
Geazi Anc

Opa, amigo, eu que lhe agradeço pela leitura! Um forte abraço!