This is a submission for the Redis AI Challenge: Real-Time AI Innovators.
What I Built
The Challenge: Milliseconds Matter in E-commerce
Picture this: A user lands on your e-commerce site, browses through a few products, adds something to cart, then hesitates. In those crucial seconds, your recommendation engine needs to:
- Process their real-time behavior
- Analyze their browsing patterns
- Serve personalized recommendations
- Do all of this in under 100ms
Traditional recommendation systems using batch processing and SQL databases? They're great for Netflix (where you can wait a few seconds), but in e-commerce, every millisecond of delay costs conversions.
PickPerfect, as the name suggests, picks the perfect product to recommend to the user based on the user's activity, such as viewing a product page or adding a product to the cart.
Tech Stack
- Frontend - Vite, React, TypeScript, Tailwind CSS
- Backend - FastAPI
- Database - Redis
- AI engine - RedisAI
Demo
The screenshots walk through the main flows:
- Home page: geolocation-based search powers the Featured Deals section, and vector search powers Recommended For You.
- Product info page.
- Search results: vector search on the input query.
- Filters: range-based search on numeric fields, plus exact-match TAG filters combined with the OR operator.
- Cart page: whenever a product is added to the cart, the event is tracked.
How I Used Redis 8
Redis is used almost everywhere in the application, both as the real-time data layer and as the permanent storage database. The individual steps where it is used are as follows:
Products Indexing
The textual parts of a product (title, description, etc.) are concatenated into a single string. This string is sent to OpenAI for embedding, and the result forms the product's feature vector. The vector is indexed, alongside the other fields, in a Redis full-text search index over the product JSON documents.
```python
redis_client.execute_command(
    f"""
    FT.CREATE {SEARCH_INDEX_NAME} ON JSON PREFIX 1 product: SCHEMA
        $.name AS name TEXT
        $.description AS description TEXT
        $.brand AS brand TAG
        $.price AS price NUMERIC
        $.rating AS rating NUMERIC
        $.reviews AS reviews NUMERIC
        $.category AS category TAG
        $.inStock AS inStock TAG
        $.features[*] AS features TEXT
        $.warehouse_geolocation AS warehouse_location GEO
        $.image AS image TEXT NOINDEX
        $.embedding AS embedding VECTOR FLAT 6 TYPE FLOAT32 DIM {EMBED_DIM} DISTANCE_METRIC {VECTOR_ALGORITHM}
    """
)
```
Full Text Search
Products can be filtered on any combination of their attributes through a single search query.
```python
def multi_parameter_search(
    self,
    text_query: Optional[str] = None,
    price_min: Optional[float] = None,
    price_max: Optional[float] = None,
    category: Optional[str] = None,
    brand: Optional[str] = None,
    rating: Optional[float] = None,
    in_stock: Optional[bool] = None,
    geo_location: Optional[Tuple[float, float]] = None,
    geo_radius_km: float = 50,
) -> List[Product]:
    """Search products with multiple parameters."""
    query_parts = []
    query_params = {}
    if text_query:
        query_parts.append(f"(@name|@description|@brand|@features:{text_query})")
    if price_min is not None or price_max is not None:
        min_val = price_min if price_min is not None else "-inf"
        max_val = price_max if price_max is not None else "+inf"
        query_parts.append(f"@price:[{min_val} {max_val}]")
    if category:
        # TAG fields are matched with curly braces: @category:{value}
        query_parts.append(f"@category:{{{category}}}")
    if brand:
        query_parts.append(f"@brand:{{{brand}}}")
    if rating is not None:
        query_parts.append(f"@rating:[{rating} 5]")
    if in_stock is not None:
        query_parts.append(f"@inStock:{{{str(in_stock).lower()}}}")
    if geo_location and geo_radius_km:
        lon, lat = geo_location
        query_parts.append(f"@warehouse_location:[{lon} {lat} {geo_radius_km} km]")
    base_query = " ".join(query_parts) if query_parts else "*"
    query = Query(base_query).return_fields("id").dialect(2)
    query = query.paging(0, 20)
    try:
        result = self.redis_client.ft(settings.search_index_name).search(
            query, query_params=query_params
        )
        filtered_products = []
        for doc in result.docs:
            data = self.redis_client.json().get(getattr(doc, "id"))
            # Drop the heavy embedding field before returning the product
            if "embedding" in data:
                del data["embedding"]
            filtered_products.append(Product(**data))
        return filtered_products
    except Exception as e:
        print(f"Multi-parameter search error: {e}")
        return []
```
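The filter-to-query translation can be exercised in isolation. Combining a price range, a TAG filter, and a geo filter produces one query string (a minimal sketch using the field names from the schema above):

```python
def build_query(price_min=None, price_max=None, category=None, geo=None, radius_km=50):
    """Assemble a RediSearch query string from optional filters."""
    parts = []
    if price_min is not None or price_max is not None:
        min_val = price_min if price_min is not None else "-inf"
        max_val = price_max if price_max is not None else "+inf"
        parts.append(f"@price:[{min_val} {max_val}]")
    if category:
        parts.append(f"@category:{{{category}}}")  # TAG fields use curly braces
    if geo:
        lon, lat = geo
        parts.append(f"@warehouse_location:[{lon} {lat} {radius_km} km]")
    # No filters at all falls back to the match-everything query
    return " ".join(parts) if parts else "*"

q = build_query(price_min=10, category="shoes", geo=(77.59, 12.97))
# q == "@price:[10 +inf] @category:{shoes} @warehouse_location:[77.59 12.97 50 km]"
```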
Vector search
Search queries are converted to embeddings using OpenAI, and the top-K nearest neighbors are retrieved via Redis full-text search.
```python
def vector_search_embed(self, embedding: List[float], k: int = 10) -> List[Product]:
    """Search products using vector similarity."""
    vector_bytes = np.array(embedding).astype(np.float32).tobytes()
    base_query = f"*=>[KNN {k} @embedding $vec AS vector_score]"
    query = (
        Query(base_query)
        .return_fields("id", "vector_score")
        .sort_by("vector_score")
        .paging(0, k)
        .dialect(2)  # KNN syntax requires query dialect 2
    )
    params_dict = {"vec": vector_bytes}
    results = self.redis_client.ft(settings.search_index_name).search(
        query, query_params=params_dict
    )
    filtered_products = []
    for doc in results.docs:
        data = self.redis_client.json().get(getattr(doc, "id"))
        if "embedding" in data:
            del data["embedding"]
        filtered_products.append(Product(**data))
    return filtered_products
```
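The one subtle part is the binary packing of the query vector: Redis expects raw FLOAT32 bytes, matching `TYPE FLOAT32` in the schema. A quick round-trip check (numpy only):

```python
import numpy as np

embedding = [0.1, 0.2, 0.3]
# Pack the vector as raw little-endian FLOAT32 bytes for the $vec query param
vector_bytes = np.array(embedding, dtype=np.float32).tobytes()
assert len(vector_bytes) == 4 * len(embedding)  # 4 bytes per FLOAT32 component
# The same bytes decode back to the original vector
restored = np.frombuffer(vector_bytes, dtype=np.float32)
```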
Semantic Cache
The Redis semantic cache module of LangChain uses RedisVL internally to match prompts semantically, so repeated or near-duplicate queries can skip the LLM call. The code for the LangChain semantic cache can be found in the LangChain docs.
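Wiring it up is only a few lines (a sketch assuming the `langchain-redis` package and an OpenAI embeddings object; parameter names may differ across LangChain versions, so check the docs for your version):

```python
from langchain.globals import set_llm_cache
from langchain_openai import OpenAIEmbeddings
from langchain_redis import RedisSemanticCache

# Prompts whose embeddings fall within the distance threshold
# reuse the cached answer instead of calling the LLM again.
set_llm_cache(
    RedisSemanticCache(
        embeddings=OpenAIEmbeddings(),
        redis_url="redis://localhost:6379",
        distance_threshold=0.1,
    )
)
```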
Geolocation based search
Products in warehouses near the user's location can be delivered faster, so giving them extra discounts or newer deals becomes a crucial part of the experience. This is performed with the full-text search GEO data type, reusing the full-text search code above.
Personalized recommendation
Whenever a user visits a product page or adds a product to the cart, an event is stored in Redis with the userId, productId, and timestamp. During recommendation, a sliding window of the latest 10 events is retrieved, the embedding vectors of those products are averaged, and a vector search is performed on the index with the resulting preference vector. Personalized recommendations require the user to be logged in.
As user interests keep changing, a time-decay method gives more importance to the latest events than to older ones.
```python
def get_personalized_recommendations(self, user_email: str) -> List[Product]:
    """Get personalized recommendations for a user."""
    user_events = self.redis_client.lrange(f"user_events:{user_email}", 0, 100)
    if not user_events:
        return product_service.get_trending_products(10)
    # Score each product by event weight * time decay
    product_scores = {}
    for event_str in user_events:
        if isinstance(event_str, bytes):
            event_str = event_str.decode("utf-8")
        event_data = json.loads(event_str)
        product_id = event_data["product_id"]
        event_type = event_data["event_type"]
        timestamp = event_data["timestamp"]
        time_decay = self.calculate_time_decay(timestamp)
        base_score = self.event_weights.get(event_type, 1)
        final_score = base_score * time_decay
        product_scores[product_id] = product_scores.get(product_id, 0) + final_score
    if not product_scores:
        return product_service.get_trending_products(10)
    # Create the user preference vector as a score-weighted mean of embeddings
    weighted_vectors = []
    for product_id, score in product_scores.items():
        embedding = self.redis_client.json().get(
            f"product:{product_id}", "$.embedding"
        )
        if embedding:
            embedding = np.array(embedding).astype(np.float32)
            weighted_vectors.append(embedding * score)
    if weighted_vectors:
        user_preference_vector = np.mean(weighted_vectors, axis=0)
        # Over-fetch so results remain after filtering out seen products
        initial_limit = min(10 * 3, 100)
        results = product_service.vector_search_embed(
            user_preference_vector, initial_limit
        )
        recommendations = []
        for res in results:
            # Skip products the user has already interacted with
            if res.id in product_scores:
                continue
            recommendations.append(res)
        return recommendations[:10]
    return []
```
Trending products and categories
Trending products and categories fall out of the same user event tracking. Redis sorted sets store the product view counts, so reading the top N is a single sorted-set range query.
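In Redis this is one ZINCRBY per view and a ZREVRANGE to read the top N; the logic is equivalent to this in-memory sketch (product ids are hypothetical):

```python
from collections import Counter

view_counts: Counter = Counter()

def track_view(product_id: str) -> None:
    """Mirrors ZINCRBY trending:products 1 <product_id>."""
    view_counts[product_id] += 1

def trending(n: int) -> list:
    """Mirrors ZREVRANGE trending:products 0 n-1 (highest score first)."""
    return [pid for pid, _ in view_counts.most_common(n)]

for pid in ["p1", "p2", "p1", "p3", "p1", "p2"]:
    track_view(pid)
top = trending(2)
# top == ["p1", "p2"]
```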