Este projeto é um teste que fiz para verificar a performance do uso de banco de dados NoSQL de chave/valor como cache de busca de dados em comparação com um banco SQLite3 local. Os resultados em um ambiente local não distribuído me impressionaram devido a um recurso de cache do SQLite.
O projeto criado para testes foi um webservice Rest JSON com Python + Flask. O serviço possui um endpoint para encurtar uma URL e outro para acessar o link original de uma URL curta. O cache criado em questão é para a leitura da URL previamente encurtada. O servidor web pode responder de maneira muito mais eficiente se o banco de dados que armazena a URL de forma estruturada não precisar buscar em arquivos o dado toda a vez que é requisitado. O banco de dados de chave-valor Valkey, que é, basicamente, uma versão open source do Redis, foi utilizado para o cache em questão.
O código a seguir trata-se das configurações das rotas (endpoints, ou controllers) do servidor:
from flask import Flask, request
from business.link_shortener import generate_new_link, get_link_by_hash_string
app = Flask(__name__)
@app.route("/heart-beat")
def hello_world():
return {'OK': 'true'}
@app.route("/link", methods=['POST'])
def link():
data = request.get_json()
new_link = generate_new_link(data['url'])
return {'short_link': new_link}
@app.route("/link/<short_link>")
def get_link(short_link):
link = get_link_by_hash_string(short_link)
return {'link': link}
O core da aplicação de exemplo está no arquivo que se segue:
import sqlite3
import random
import string
import threading
import valkey
import json
# Thread-local storage for database connections
_thread_local = threading.local()
# Flag that activate the key value database cache
kv_cache = True
# Create a global connection pool for Valkey
kv_pool = valkey.ConnectionPool(
host='localhost',
port=6379,
db=0,
max_connections=50, # Match the number of concurrent threads
socket_keepalive=True,
socket_connect_timeout=5,
retry_on_timeout=True,
decode_responses=True # Automatically decode responses to strings
)
# Initialize database schema
conn = sqlite3.connect('database.db')
cur = conn.cursor()
cur.execute('''
CREATE TABLE IF NOT EXISTS links (
id INTEGER PRIMARY KEY,
link_hash TEXT,
link_target TEXT
)
''')
cur.execute('CREATE INDEX IF NOT EXISTS idx_links_id ON links (id)')
cur.execute('CREATE INDEX IF NOT EXISTS idx_links_link_hash ON links (link_hash)')
conn.commit()
conn.close()
def get_db_connection():
if not hasattr(_thread_local, 'conn'):
_thread_local.conn = sqlite3.connect('database.db', check_same_thread=False)
_thread_local.conn.execute('PRAGMA cache_size = 0') # Disable page cache
_thread_local.conn.execute('PRAGMA temp_store = 0') # Use disk for temp storage
return _thread_local.conn
def get_kv_db():
if not hasattr(_thread_local, 'kv_db'):
_thread_local.kv_db = valkey.Valkey(connection_pool=kv_pool)
return _thread_local.kv_db
def generate_hash():
return ''.join(random.choices(string.ascii_letters + string.digits, k=10))
def create(link):
conn = get_db_connection()
cur = conn.cursor()
link_hash = generate_hash()
cur.execute('INSERT INTO links (link_hash, link_target) VALUES (?, ?)', (link_hash, link))
conn.commit()
return link_hash
def get_link_by_hash(hash):
if kv_cache:
cached = get_kv_db().get(hash)
if cached:
return cached # Already decoded due to decode_responses=True
conn = get_db_connection()
cur = conn.cursor()
cur.execute('SELECT link_target FROM links WHERE link_hash = ?', (hash,))
result = cur.fetchone()
if result[0]:
if kv_cache:
get_kv_db().set(hash, result[0])
return result[0]
else:
return None
Na linha 48 se encontra uma variável que configura se o cache Redis será ativo ou não. Note que nas linhas 80 e 81 existem configurações que desabilitam o cache do próprio SQlite3.
Eu solicitei a um agente de I.A. que gerasse um teste de stress para este servidor. Com algumas modificações, ele ficou pronto para disparar requisições pesadas por meio de várias threads. Segue o script para tal ação:
import time
import requests
from concurrent.futures import ThreadPoolExecutor, as_completed
from statistics import mean, median
# Configuration
BASE_URL = "http://localhost:5000"
NUM_TASKS = 100
NUM_THREADS = 30
NUM_LINK_GETS = 80
def create_test_link():
try:
response = requests.post(f"{BASE_URL}/link", json={'url': 'https://example.com'})
if response.status_code == 200:
data = response.json()
return data.get('short_link')
except Exception as e:
print(f"Error creating test link: {e}")
return None
def test_link_retrieval():
start = time.time()
short_link = create_test_link()
results = []
for _ in range(NUM_LINK_GETS):
try:
response = requests.get(f"{BASE_URL}/link/{short_link}")
elapsed = time.time() - start
results.append({
'success': response.status_code == 200,
'time': elapsed,
'status_code': response.status_code
})
except Exception as e:
results.append({
'success': False,
'time': elapsed,
'error': str(e)
})
return results
def run_stress_test():
print(f"Total tasks: {NUM_TASKS}")
print(f"Concurrent threads: {NUM_THREADS}\n")
results = []
start_time = time.time()
with ThreadPoolExecutor(max_workers=NUM_THREADS) as executor:
futures = []
for _ in range(NUM_TASKS):
future = executor.submit(test_link_retrieval)
futures.append(future)
for i, future in enumerate(as_completed(futures), 1):
subresults = future.result()
results.extend(subresults)
if i % 100 == 0:
print(f"Completed: {i}/{NUM_TASKS}")
total_time = time.time() - start_time
# Analysis
successful = [r for r in results if r['success']]
failed = [r for r in results if not r['success']]
response_times = [r['time'] for r in successful]
print("\n" + "="*60)
print("STRESS TEST RESULTS")
print("="*60)
print(f"Total tasks: {NUM_TASKS}")
print(f"Successful: {len(successful)}")
print(f"Failed: {len(failed)}")
print(f"Total time: {total_time:.2f}s")
print(f"Tasks/second: {NUM_TASKS/total_time:.2f}")
if response_times:
print(f"\nResponse times:")
print(f" Min: {min(response_times)*1000:.2f}ms")
print(f" Max: {max(response_times)*1000:.2f}ms")
print(f" Mean: {mean(response_times)*1000:.2f}ms")
print(f" Median: {median(response_times)*1000:.2f}ms")
if failed:
print(f"\nErrors:")
for r in failed[:5]: # Show first 5 errors
print(f" - {r.get('error', 'Unknown error')}")
print("="*60)
run_stress_test()
Nos testes em ambiente local em um servidor único, não foram inseridas as configurações de desabilitar o cache de memória do SQLite3 e os resultados foram impressionantes! O banco de dados SQL respondia praticamente no mesmo tempo com ou sem o auxílio do Valkey (Redis).
Resultados com o SQLite3, SEM o cache do Valkey (Redis)
============================================================
STRESS TEST RESULTS
============================================================
Total tasks: 100
Successful: 8000
Failed: 0
Total time: 4.57s
Tasks/second: 21.89
Response times:
Min: 10.92ms
Max: 1534.86ms
Mean: 664.45ms
Median: 627.76ms
============================================================
Resultados com o SQLite3, COM o cache do Valkey (Redis)
============================================================
STRESS TEST RESULTS
============================================================
Total tasks: 100
Successful: 8000
Failed: 0
Total time: 5.24s
Tasks/second: 19.08
Response times:
Min: 25.95ms
Max: 1896.03ms
Mean: 783.95ms
Median: 747.06ms
============================================================
Mesmo levando-se em conta o overhead de rede para acessar o Valkey (Redis), a velocidade do SQLite ainda impressiona!
O banco de dados SQLite3 não é utilizado para aplicações em larga escala, por isso, o teste feito localmente não representa um ambiente real sem o recurso de cache do próprio banco. Mesmo desabilitando o cache do SQLite, o próprio sistema operacional faz cache do sistema de arquivos em memória. Estes recursos chamaram mais a minha atenção do que a própria Prova de Conceito do uso do Valkey para cache.
O código fonte completo pode ser encontrado em: memcache-poc
Top comments (1)
Muito boa análise, parabéns!