
Generate Recommendations with Amazon SageMaker and Apache Spark

Salah Elhossiny
ML engineer || AWS Certified MLS || AWS Community Builders member || Fullstack developer

Amazon SageMaker supports serverless Apache Spark (both Python and Scala) through SageMaker Processing Jobs.

We will use Processing Jobs throughout the book to perform data quality checks and feature transformations.

However, in this section we will generate recommendations using SageMaker Processing Jobs with Apache Spark ML’s collaborative filtering algorithm called Alternating Least Squares (ALS).

We would use this algorithm if we already have a Spark-based data pipeline and want to generate recommendations using that pipeline.
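
For background, ALS factorizes the sparse user-item rating matrix into low-rank user factors and item factors, alternating between solving for one set while holding the other fixed. A standard form of the regularized least-squares objective it minimizes (the λ here corresponds to the regParam argument in the code below) is:

\min_{U,V} \sum_{(u,i)\,\text{observed}} \left( r_{ui} - \mathbf{u}_u^{\top} \mathbf{v}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2 \right)

Because each alternating step is an ordinary regularized least-squares solve, ALS parallelizes well across a Spark cluster.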

Here is the train_spark_als.py file that generates recommendations with Apache Spark ML and ALS:


import argparse

from pyspark.sql import SparkSession, Row
from pyspark.sql.functions import col, to_json
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.recommendation import ALS


def main():
    # Parse the S3 paths passed in as Processing Job arguments
    parser = argparse.ArgumentParser()
    parser.add_argument('--s3_input_data', type=str)
    parser.add_argument('--s3_output_data', type=str)
    args = parser.parse_args()

    spark = SparkSession.builder.appName('spark-als').getOrCreate()

    # Parse the "::"-delimited ratings file into
    # (userId, movieId, rating, timestamp) rows
    lines = spark.read.text(args.s3_input_data).rdd
    parts = lines.map(lambda row: row.value.split("::"))
    ratingsRDD = parts.map(lambda p: Row(userId=int(p[0]),
                                         movieId=int(p[1]),
                                         rating=float(p[2]),
                                         timestamp=int(p[3])))
    ratings = spark.createDataFrame(ratingsRDD)

    (training, test) = ratings.randomSplit([0.8, 0.2])

    # Build the recommendation model using ALS on the training data
    als = ALS(maxIter=5, regParam=0.01, userCol="userId",
              itemCol="movieId", ratingCol="rating",
              coldStartStrategy="drop")
    model = als.fit(training)

    # Evaluate the model by computing the RMSE on the test data
    predictions = model.transform(test)
    evaluator = RegressionEvaluator(metricName="rmse",
                                    labelCol="rating",
                                    predictionCol="prediction")
    rmse = evaluator.evaluate(predictions)
    print(f"Root-mean-square error = {rmse}")

    # Generate top 10 recommendations for each user
    userRecs = model.recommendForAllUsers(10)
    userRecs.show()

    # Write top 10 recommendations for each user; CSV cannot store the
    # nested array of (movieId, rating) structs, so serialize it to JSON first
    userRecs.withColumn("recommendations", to_json(col("recommendations"))) \
        .repartition(1) \
        .write.mode("overwrite") \
        .option("header", True) \
        .option("delimiter", "\t") \
        .csv(f"{args.s3_output_data}/recommendations")


if __name__ == "__main__":
    main()



Now let’s launch the PySpark script within a serverless Apache Spark environment running as a SageMaker Processing Job:


from sagemaker.spark.processing import PySparkProcessor

processor = PySparkProcessor(base_job_name='spark-als',
                             role=role,
                             instance_count=1,
                             instance_type='ml.r5.2xlarge',
                             max_runtime_in_seconds=1200)

processor.run(submit_app='train_spark_als.py',
              arguments=['--s3_input_data', s3_input_data,
                         '--s3_output_data', s3_output_data],
              logs=True,
              wait=False)
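
Since wait=False returns immediately, the cell finishes while the Spark job keeps running in the background. A minimal sketch for blocking on completion and checking the final status afterwards, using the SDK's latest_job handle that run() populates:

# Block until the Processing Job finishes (logs=False keeps the cell quiet),
# then check the final status. processor.latest_job is set by run().
processor.latest_job.wait(logs=False)
print(processor.latest_job.describe()['ProcessingJobStatus'])  # e.g. 'Completed'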




The output shows each user ID with a list of [movieId, predicted rating] pairs, sorted from most to least recommended:


+------+--------------------+
|userId|     recommendations|
+------+--------------------+
|    12|[[46, 6.146928], ...|
|     1|[[46, 4.963598], ...|
|     6|[[25, 4.5243497],...|
+------+--------------------+
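
To use these recommendations downstream (for example, joining against a movie catalog), it helps to flatten the nested column into one row per user-movie pair. A small sketch of that post-processing step, not part of the original script:

from pyspark.sql.functions import col, explode

# Explode the array of (movieId, rating) structs into one row per
# recommendation, then promote the struct fields to top-level columns.
flat = (userRecs
        .select("userId", explode("recommendations").alias("rec"))
        .select("userId",
                col("rec.movieId").alias("movieId"),
                col("rec.rating").alias("predictedRating")))
flat.show(5)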


References:

Data Science on AWS by Chris Fregly and Antje Barth (O'Reilly, 2021)
