
Mount/Unmount SASURL with Databricks File System

When we develop a data analytics solution, data preparation and data loading are steps we cannot skip. Azure Databricks supports both its native Databricks File System (DBFS) and external storage. External storage can be accessed directly or mounted into DBFS. This article explains how to mount and unmount blob storage in DBFS.

The following code is from the official Azure Databricks documentation.

#  Mount an Azure Blob storage container
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})
# Unmount a mount point
dbutils.fs.unmount("/mnt/<mount-name>")

Normally our data pipeline has logic like this: 1) Check whether the path is already mounted. 2) If it is not mounted yet, mount the path. 3) If it is already mounted, either skip the mount logic and use the existing mount point, or unmount it and mount it again.

def mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, unmount_if_exists = True):
  if [item.mountPoint for item in dbutils.fs.mounts()].count(mount_path) > 0:
    if unmount_if_exists:
      print('Mount point already taken - unmounting: ' + mount_path)
      dbutils.fs.unmount(mount_path)
    else:
      print('Mount point already taken - ignoring: ' + mount_path)
      return
  print('Mounting external storage in: ' + mount_path)
  dbutils.fs.mount(
    source = "wasbs://{0}@{1}.blob.core.windows.net".format(container_name, storage_account_name),
    mount_point = mount_path,
    extra_configs = {"fs.azure.sas.{0}.{1}.blob.core.windows.net".format(container_name, storage_account_name): sas_token})

When blob storage is shared as a SAS URL instead of with the individual blob details, we can parse the storage account, container, and SAS token out of the SAS URL as below:

def get_detail_info_from_url(url):
  # Expected form: https://<account>.<type>.core.windows.net/<container>/<path>?<sas-token>
  array_1 = url.split('//', 1)
  array_2 = array_1[1].split('.', 2)
  storage_account_name = array_2[0]
  storage_type = array_2[1]            # e.g. 'blob'
  array_3 = array_2[2].split('/', 1)
  array_4 = array_3[1].split('?', 1)
  sas = '?' + array_4[1]
  array_5 = array_4[0].split('/', 1)
  container = array_5[0]
  return (storage_account_name, container, sas)
sas_url = dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")
storage_account_name, container_name, sas_token = get_detail_info_from_url(sas_url)
mount_path = "/mnt/path1"
mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, True)

We can integrate our Databricks tasks into Azure Data Factory together with other activities to build an end-to-end data pipeline. Suppose this mount/unmount activity is designed as a prerequisite step for other notebook tasks, as shown in an example Azure Data Factory pipeline diagram.
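
One way to wire this up (a sketch, assuming the widget names below are the base parameters configured on the ADF Notebook activity; they are illustrative, not fixed names) is to have the mount notebook read its secret scope, secret key, and mount path from widgets, perform the mount, and exit with a status for downstream activities:

# Hypothetical base parameters passed from the ADF Notebook activity.
dbutils.widgets.text("secret_scope", "")
dbutils.widgets.text("secret_key", "")
dbutils.widgets.text("mount_path", "/mnt/path1")

# Resolve the SAS URL from the secret scope and mount the container.
sas_url = dbutils.secrets.get(scope=dbutils.widgets.get("secret_scope"),
                              key=dbutils.widgets.get("secret_key"))
storage_account_name, container_name, sas_token = get_detail_info_from_url(sas_url)
mount_blob_storage_from_sas(dbutils, storage_account_name, container_name,
                            dbutils.widgets.get("mount_path"), sas_token, True)

# Signal success to downstream activities in the pipeline.
dbutils.notebook.exit("mounted")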

Recommender System with Azure Databricks

Recommender systems are among the most well-known, widely used, and highest-value use cases for applying machine learning. This article describes how to build a movie recommender model based on the MovieLens dataset with Azure Databricks and other services on the Azure platform.

There are quite a few frameworks and tools that can be used for recommender systems, e.g. Apache Spark ML or MLlib, Surprise, and TensorFlow. Azure Databricks supports these popular frameworks. We will start with the Apache Spark framework to see how to build a basic recommender model.

Data Loading and Processing

Let’s use the MovieLens dataset to build a movie recommender using collaborative filtering with Spark’s Alternating Least Squares (ALS) implementation. Download the data from the MovieLens dataset page and upload the data files into DBFS or Azure blob storage. If you store the data in blob storage, you need to mount the blob container into a DBFS path first.

datasets_path = "dbfs:/mnt/dbscontainer/MovieLens/Latest/"
small_ratings_file = "dbfs:/mnt/dbscontainer/MovieLens/Latest/ml-latest-small/ratings.csv"
complete_ratings_file = "dbfs:/mnt/dbscontainer/MovieLens/Latest/ml-latest/ratings.csv"
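
Before loading the data, an optional sanity check is to list the mounted directory and confirm the extracted files are in place (a minimal check, assuming the mount from the previous article is active):

# List the MovieLens folders under the mounted path to confirm the upload/mount worked.
display(dbutils.fs.ls(datasets_path))
display(dbutils.fs.ls(datasets_path + "ml-latest/"))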

Please note that there are two datasets: small and full. Here we use the full dataset, which contains 24404096 ratings and 668953 tag applications across 40110 movies. These data were created by 259137 users between January 09, 1995 and October 17, 2016; the dataset was generated on October 18, 2016. Let’s start by loading the ratings data and splitting it into training and test sets.

COL_USER = "userId"
COL_ITEM = "movieId"
COL_RATING = "rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"

# Spark SQL type classes used to define the explicit ratings schema
from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType

ratingSchema = StructType(
    [
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType())
    ]
)

full_rating_raw_df = spark.read.option("sep", ",").option("header", "true").schema(ratingSchema).csv(complete_ratings_file)
display(full_rating_raw_df)
training, test = full_rating_raw_df.randomSplit([0.8, 0.2], seed=0)
print("training data count: %s" % training.count())
print("test data count: %s" % test.count())

Collaborative Filtering

Collaborative filtering is commonly used for recommender systems. In collaborative filtering, we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B’s opinion on a different issue x than to share the opinion on x of a randomly chosen user.

Spark offers two libraries for collaborative filtering: spark.ml and spark.mllib. The major difference is that spark.ml is a DataFrame-based API, while spark.mllib is an RDD-based API. Here we will use spark.ml for the ALS model, as the RDD-based API is now in maintenance mode.

The implementation in spark.ml has the following parameters:

  • numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
  • rank is the number of latent factors in the model (defaults to 10).
  • maxIter is the maximum number of iterations to run (defaults to 10).
  • regParam specifies the regularization parameter in ALS (defaults to 1.0).
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
  • nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).
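
For intuition, the explicit-feedback ALS variant minimizes a regularized squared error over the observed ratings (written here in generic textbook form; Spark's implementation additionally scales the regularization term by the number of ratings per user or item):

\min_{U,V} \sum_{(u,i)\,\text{observed}} \left( r_{ui} - \mathbf{u}_u^{\top}\mathbf{v}_i \right)^2 + \lambda \left( \sum_u \lVert \mathbf{u}_u \rVert^2 + \sum_i \lVert \mathbf{v}_i \rVert^2 \right)

Here \mathbf{u}_u and \mathbf{v}_i are the user and item latent-factor vectors of length rank, and \lambda corresponds to regParam; ALS alternates between solving for the user factors with the item factors fixed and vice versa, each step being an ordinary least-squares problem.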
# Use the complete dataset to build the final model
# Note that rank, maxIter, regParam, ... are hyperparameters.

import time
from pyspark.ml.recommendation import ALS
header = {
    "userCol": "userId",
    "itemCol": "movieId",
    "ratingCol": "rating",
}

als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)
start_time = time.time()
model = als.fit(training)
train_time = time.time() - start_time
print("Took {} seconds for training.".format(train_time))
# Evaluate the model by computing the RMSE on the test data

from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Making Recommendations

Now that our recommender model is ready, we can give it a try and generate some movie recommendations.

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = full_rating_raw_df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = full_rating_raw_df.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
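
The recommendations above only contain movie IDs. To make them readable, we can join them with the movies file; the path below assumes movies.csv sits next to ratings.csv in the full MovieLens download (an illustrative sketch, not part of the original code):

# movies.csv ships alongside ratings.csv in the full MovieLens dataset.
from pyspark.sql import functions as F

complete_movies_file = datasets_path + "ml-latest/movies.csv"
movies_df = (spark.read.option("header", "true").csv(complete_movies_file)
             .withColumn("movieId", F.col("movieId").cast("int")))

# Explode the top-10 recommendation structs and attach movie titles for readability.
readable_recs = (userRecs
    .withColumn("rec", F.explode("recommendations"))
    .select("userId", F.col("rec.movieId").alias("movieId"), F.col("rec.rating").alias("score"))
    .join(movies_df.select("movieId", "title"), on="movieId"))
display(readable_recs.orderBy("userId", F.col("score").desc()).limit(30))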

Persisting the Model

Sometimes we might want to persist the base model for later use in our online recommendations. Although a new model is trained every time we have new user ratings, it might be worth storing the current one in order to save time when starting up the server.

# Save and load model

from pyspark.ml.recommendation import ALSModel
model_path = "dbfs:/mnt/dbscontainer/MovieLens/Latest/models/movie_lens_als"
model.save(model_path)
sameModel = ALSModel.load(model_path)
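
As a quick check (just a sketch), the reloaded model behaves like the in-memory one and can serve recommendations directly:

# The reloaded model exposes the same recommendation methods as the original.
reloadedRecs = sameModel.recommendForAllUsers(10)
display(reloadedRecs.limit(5))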