DEV Community

Cover image for Enviando documentos via Bulk Api no ElasticSearch
Leandro Ferreira
Leandro Ferreira

Posted on • Updated on <time datetime="2021-04-13T15:28:00Z" class="date-no-year">Apr 13</time>

Enviando documentos via Bulk Api no ElasticSearch

Olá, já derrubaram algo em produção hoje?

Hoje eu vou falar sobre como importar documentos para o ElasticSearch utilizando a Bulk Api.

Para quem não conhece a Bulk Api serve para enviar ou remover documentos em massa no ElasticSearch, diminuindo assim a sobrecarga do sistema e melhorando a velocidade de indexação.

Agora falando sobre caso de uso, o meu é basicamente indexar arquivos de access logs que eu recebo da minha CDN na empresa em que trabalho, hoje os logs vem nesse formato:

{ "version": "v5", "time": "2020-09-30T21:25:04+00:00", "client": "0619h", "configuration": "1570140334", "host": "xxx.xxx.xxx", "request_time": "1.140", "request_method": "GET", "upstream_cache_status": "REVALIDATED", "status": "200", "proxy_status": "-", "upstream_status": "304", "upstream_bytes_received": "298", "scheme": "https", "request_uri": "/xxx", "sent_http_content_type": "image/svg+xml", "server_protocol": "HTTP/2.0", "request_length": "58", "bytes_sent": "1699", "upstream_connect_time": "0.140", "upstream_header_time": "1.139", "upstream_response_time": "1.139", "tcpinfo_rtt": "109355", "remote_addr": "000.000.000.000", "remote_port": "37040", "country": "Brazil", "state": "Rio de Janeiro", "ssl_protocol": "TLSv1.3", "ssl_cipher": "TLS_AES_256_GCM_SHA384", "http_user_agent": "Mozilla/5.0 (Linux; Android 9; Redmi Note 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.127 Mobile Safari/537.36", "http_referrer": "https://xxx.xxx.xxx/xxx.html", "requestPath": "/home1", "requestQuery": "", "headers": "-" }

{ "version": "v5", "time": "2020-09-30T21:25:04+00:00", "client": "0619h", "configuration": "1570140334", "host": "xxx.xxx.xxx", "request_time": "1.140", "request_method": "GET", "upstream_cache_status": "REVALIDATED", "status": "200", "proxy_status": "-", "upstream_status": "304", "upstream_bytes_received": "298", "scheme": "https", "request_uri": "/files/media/design_e_tecnologia.8e655520.svg", "sent_http_content_type": "image/svg+xml", "server_protocol": "HTTP/2.0", "request_length": "58", "bytes_sent": "1699", "upstream_connect_time": "0.140", "upstream_header_time": "1.139", "upstream_response_time": "1.139", "tcpinfo_rtt": "109355", "remote_addr": "000.000.000.000", "remote_port": "37040", "country": "Brazil", "state": "Rio de Janeiro", "ssl_protocol": "TLSv1.3", "ssl_cipher": "TLS_AES_256_GCM_SHA384", "http_user_agent": "Mozilla/5.0 (Linux; Android 9; Redmi Note 8) AppleWebKit/537.36 (KHTML, like Gecko) Chrome/85.0.4183.127 Mobile Safari/537.36", "http_referrer": "https://xxx.xxx.xxx/xxx.html", "requestPath": "/home", "requestQuery": "", "headers": "-" }
Enter fullscreen mode Exit fullscreen mode

Isto é, é um json por linha, todos os jsons estão "flat" em uma linha só, agora nós precisamos criar um cabeçalho para CADA LINHA DESSE JSON, isto mesmo, api de Bulk Import do ElasticSearch espera que o arquivo siga este formato:

{ "index" : { "_index" : "NOMEDOINDICE"}
{ "exemplo1": "arquivojson"...}
Enter fullscreen mode Exit fullscreen mode

Ou seja, a primeira linha fica o cabeçalho identificando qual índice você quer colocar este documento e na linha seguinte o documento json em questão.

Para facilitar a minha vida eu criei uma função em shell script que aplica essa tag em todas as linhas de um arquivo onde cada linha é um objeto json:

function insert_tag() {
    while IFS=$'\n' read -r p || [ -n "$p" ]
    do
      printf '{ "index" : { "_index" : "NOMEDOINDICE"}\n'
      printf '%s\n' "$p"
    done < "$1"
}
Enter fullscreen mode Exit fullscreen mode

Essa função recebe como parâmetro o nome do arquivo com os objetos json e joga em stdout o arquivo já processado e com os cabeçalhos.

$ insert_tag ex.json > ex_comtags.json
Enter fullscreen mode Exit fullscreen mode

Após inserir a “tag” vamos enviar isto para o ElasticSearch (Assumindo que seu elasticsearch esteja acessível em localhost:9200, caso seu endereço seja diferente altere no curl a seguir):

$ curl -s -H "Content-Type: application/x-ndjson" -XPOST "localhost:9200/_bulk" --data-binary @ex_comtags.json
Enter fullscreen mode Exit fullscreen mode

Devemos receber um json de retorno indicando se teve sucesso ou não ao inserir o documento:

    {
      "index": {
        "_index": "azion-01102020",
        "_type": "_doc",
        "_id": "T_PB5HQBQstMer5wsUf5",
        "_version": 1,
        "result": "created",
        "_shards": {
          "total": 1,
          "successful": 1,
          "failed": 0
        },
        "_seq_no": 1110061,
        "_primary_term": 1,
        "status": 201
      }
    }
Enter fullscreen mode Exit fullscreen mode

Preste atenção no objeto "_shards" e na propriedade successful (poderia ser status não é mesmo?) o valor deve ser 1, caso esteja diferente o retorno vai trazer o motivo pelo qual o seu documento não foi indexado.

Desta forma você consegue escrever scripts para automatizar a inserção de documentos sem sobrecarregar o seu servidor.

Caso tenham problemas ao fazer isso não hesitem em perguntar nos comentários, vou continuar a escrever bastante sobre o ElasticSearch então caso tenham dúvidas ou queiram saber algo é só mandar a sugestão.

Até mais,
Leandro Ferreira

Discussion (0)