Tag Archives: Azure Databricks

Mount/Unmount SASURL with Databricks File System

When we develop data analytics solution, data preparation and data load are the steps that we cannot skip. Azure Databricks supports both native file system Databricks File System (DBFS) and external storage. For external storage, we can access directly or mount it into Databricks File System. This article explains how to mount and unmount blog storage into DBFS.

The code from Azure Databricks official document.

#  Mount an Azure Blob storage container
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})
# Unmount a mount point

Normally in our data pipeline, we have the logic like this: 1) Check if the path is mounted or not. 2) If it is not mounted yet, mount the path. 3) If it is already mounted, either ignore the mount logic use the existing mounting point, or unmount it and mounting it again.

def mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, unmount_if_exists = True):
  if([item.mountPoint for item in dbutils.fs.mounts()].count(mount_path) > 0):
    if unmount_if_exists:
        print('Mount point already taken - unmounting: '+mount_path)
        print('Mount point already taken - ignoring: '+mount_path)
  print('Mounting external storage in: '+mount_path)
    source = "wasbs://{0}@{1}.blob.core.windows.net".format(container_name, storage_account_name),
    mount_point = mount_path,
    extra_configs = {"fs.azure.sas.{0}.{1}.blob.core.windows.net".format(container_name, storage_account_name): sas_token }) 

When blob storage is shared using SASURL instead of blob details information, we can parse the blob information from SASURL as below:

def get_detail_info_from_url(str):
  array_1=str.split('//', 1)
  array_2=array_1[1].split('.', 2)
  storageaccoutname = array_2[0]
  array_3=array_2[2].split('/', 1)
  array_4=array_3[1].split('?', 1)
  sas='?' + array_4[1]
  array_5=array_4[0].split('/', 1)
  return (storageaccoutname, contianer, sas)
sas_url = dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")
storage_account_name, container_name, sas_token = get_detail_info_from_url(sas_url)
mount_path = "/mnt/path1"
mount_blob_storage_form_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, True)

We can integrate our Databricks tasks into Azure Data Factory with other activities to build one end to end data pipeline. Suggest that this mount/unmounting activity is designed as one prerequisite step for other notebooks tasks, see one example diagram in Azure Data Factory:

Recommender System with Azure Databricks

Recommender system are among the most well known, widely used and highest-value use cases for applying machine learning. This article describes how to build a movie recommender model based on the MovieLens dataset with Azure Databricks and other services in Azure platform.

There are quite many frameworks or tools that can be used for recommender system, e.g. Apache Spark ML or Mllib; Surprise;Tensorflow. Azure Databricks supports these popular frameworks. First we will start from Apache Spark framework to see how to build one basic recommender model.

Data Loading and Processing

Let’s use the MovieLens dataset to build a movie recommender using collaborative filtering with Spark’s Alternating Least Saqures implementation.  Download the data from the MovieLens dataset. Upload the data file into DBFS or Azure blob storage. If you store the data in blob storage, we need mount the blob container path in DBFS path.

datasets_path = "dbfs:/mnt/dbscontainer/MovieLens/Latest/"
small_ratings_file = "dbfs:/mnt/dbscontainer/MovieLens/Latest/ml-latest-small/ratings.csv"
complete_ratings_file = "dbfs:/mnt/dbscontainer/MovieLens/Latest/ml-latest/ratings.csv"

Please note that there are two datasets: small and full. Here we just use the full dataset, which contains 24404096 ratings and 668953 tag applications across 40110 movies. These data were created by 259137 users between January 09, 1995 and October 17, 2016. This dataset was generated on October 18, 2016. Let’s start loading the ratings data and splitting into training/test.

COL_USER = "userId"
COL_ITEM = "movieId"
COL_RATING = "rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"

ratingSchema = StructType(
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType())

full_rating_raw_df = spark.read.option("sep", ",").option("header", "true").schema(ratingSchema).csv(complete_ratings_file)
training, test = full_rating_raw_df.randomSplit([0.8, 0.2], seed=0)
print("training data count: %s" % training.count())
print("test data count: %s" % test.count())

Collaborative Filtering

Collaborative filtering is commonly used for recommender systems. In Collaborative filtering we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if a user A has the same opinion as a user B on an issue, A is more likely to have B’s opinion on a different issue x than to have the opinion on x of a user chosen randomly.

There are two types of library in Spark library for Collaborative Filtering: spark.ml and spark.mllib. The major difference is that spark.ml is DataFrame-based API, while spark.mllib is RDD-based API. Here we will use spark.ml for ALS model, as RDD-based API is now in maintenance mode.

The implementation in spark.ml has the following parameters:

  • numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
  • rank is the number of latent factors in the model (defaults to 10).
  • maxIter is the maximum number of iterations to run (defaults to 10).
  • regParam specifies the regularization parameter in ALS (defaults to 1.0).
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
  • nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).
# Use the complete dataset to build the final model
# Note that parameters rank, maxIter,regParam...are hyper parameters.

import time
header = {
    "userCol": "userId",
    "itemCol": "movieId",
    "ratingCol": "rating",

als = ALS(
start_time = time.time()
model = als.fit(training)
train_time = time.time() - start_time
print("Took {} seconds for training.".format(train_time))
# Evaluate the model by computing the RMSE on the test data

from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Making Recommendations

We now have our recommender model ready, we can give it a try providing some movie recommendations. 

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = full_rating_raw_df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = full_rating_raw_df.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)

Persisting the Model

Sometimes we might want to persist the base model for later use in our on-line recommendations. Although a new model is updated every time we have new user ratings, it might be worth storing the current one, in order to save time when starting up the server.

# Save and load model

from pyspark.ml.recommendation import *
model_path = "dbfs:/mnt/dbscontainer/MovieLens/Latest/models/movie_lens_als"
sameModel = ALSModel.load(model_path)

Feature Engineering with PySpark in Azure Databricks

Azure Databricks is a very cool easy to use platform for both analytics engineers and machine learning developers. I would like to use this post to summarize basic APIs and tricks in feature engineering with Azure Databricks. Previously I was using Jupyter notebook or PyCharm to develop or practice some machine learning cases. Compared to Jupyter Notebook, Azure Databricks provides similar Notebook, which adds some additional features. In the future I will describe how to use Azure Databricks for production.

Feature engineering is the preprocessing phase in machine learning, also needs huge effort from developers to get data ready for modeling and training. Here I list some basic feature engineering scenarios with PySpark in Azure Databricks.

If you want to run code snippet below in normal Jupyter Notebook, you need add Spark initialization code as below. (Not necessary in Azure Databricks, as it is already for use.)

spark = SparkSession\

Continuous Data

from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
(0, 1.1),
(1, 8.5),
(2, 5.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Assign buckets per the splits boundary
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
df = df.repartition(1)

# Divide into 3 buckets as quantile distribution
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
discretizerModel = discretizer.fit(df)
result = discretizerModel.transform(df)
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Calculate Max value model
scalerModel = scaler.fit(dataFrame)

# Transform with scale model, so that values are scaled between [-1.0, 1.0]
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
#Standard scaler
from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)

# Calculate mean and variance
scalerModel = scaler.fit(dataFrame)

# Standardize
scaledData = scalerModel.transform(dataFrame)
#Polynomial expansion
from pyspark.ml.feature import PolynomialExpansion

df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)


Discrete Data

# StringIndexer and One-hot encoder
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)


#Use Stop Words
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
# Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])

cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
# NGram
from pyspark.ml.feature import NGram

#Hanmeimei loves LiLei
#LiLei loves Hanmeimei

wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)


# SQL Transformer
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])

sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
# R formula transform
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])

formula = RFormula(
formula="clicked ~ country + hour",

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

Note that: There are many feature engineering APIs are not mentioned here. Please see more info from latest pyppark.ml.feature package.



Batch Data Ingest with Azure Databricks

In the previous blog we have introduced basic steps of data ingest of streaming data with Azure Databricks. Now we are going to describe how to do batch ingest with Azure Data Bricks. Most of the steps are similar, just the spark DataFrame API are different.

Data Source

  "your blob account key here.blob.core.windows.net",
  "your blob account key value here")
val df = spark.read.json("wasbs://yout container@your blob account.blob.core.windows.net/veracity_ai_prod_41f11b1f17bb43d49ba51beabf2dd0a9/Availability/2018-06-04/06")
val availabilitySchema = df.schema

Please note that we need to df.schema to get availability log’s schema, similar for other log formar, e.g. page views, event. It is very hard to program the schema manually.

Data Transform

Use spark.read to get the Data Frame from input data. Please pay attention to input file format. inputPath/2018-05-30/09 should be matched like (inputPath + “/*/*/”)

val availabilityDF = spark.read.schema(availabilitySchema)
.option("maxFilesTrigger", 1).json(inputPath + "/*/*/")

Use Spark SQL to do the transform, much easier than transform with Data Frame API.

val sqlDF = spark
.sql("SELECT internal.data.id as ID, availability[0].testTimestamp as timestamp, availability[0].testName as name, availability[0].runLocation as location, availability[0].message as message, availability[0].durationMetric.value as duration FROM logs")

Data Sink

Here we use Azure SQL Server as the persistence of transformed data.

First define the database configuration

val jdbcUsername = "Your SQL Server username"
val jdbcPassword = "Your SQL Server password"
val jdbcHostname = "Your SQL Server name.database.windows.net" //typically, this is in the form or servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="Availability"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};user=${jdbcUsername};password=${jdbcPassword}"

Setup connection properties for JDBC

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

Finally use write.jdbc api to persistence transformed data into Azure SQL Server, quite cool.

sqlDF.write.jdbc(jdbcUrl, "aiavailability3", connectionProperties)


Recently I am working on platform analytics solution for veracity.com, the open industry data platform from DNV GL. Data Ingest is one essential part for the whole data analytics end to end flow.

Application Insights(AI) logs are one of the key data source, hence we need build streaming analytics solution for such real time data source. Here we take availability logs data as example, you can also use other  AI data sources (PageViews….).

Export the data

There is a nice document for enabling the continues export from Application Insights.

Create a storage account

When you are creating it, I suggest to use “Standard-LRS“ with Cold tier. This is simply because we use storage account as a temporary storage place. (We will move into VERACITY container for storage in production)

Configure the export settings

In this case, we are exporting one type:

  • Availability (One way for in production monitoring data)

Once it is done and the telemetry data started exporting, you should be able to see folders in the blob container in storage account. One folder is mapping to one data type in above export setting.

Take a closer look at the folder content. Below screenshot shows the PageViews data between 12am to 1pm on 28th of Sept.

Please note that:

  • The date and time are UTC and are when the telemetry was deposited in the store – not the time it was generated.
  • The folders under Availability are stored in yyyy-mm-dd/hh/ structure.
  • Each blob is a text file that contains multiple ‘\n’-separated rows.
  • It contains the telemetry processed over a time period of roughly half a minute. It means that each folder will have about 100 to 120 files.
  • Each row represents a telemetry data point such as a request or page view.
  • Each row is an JSON document. The detailed data structure can be found at here.

With the nice Azure Storage Account Explorer, it is pretty easy to check the content of the blob file.

Please note that Application Insights also implemented the same logic as IP Anonymizationin Google Analytics. For all IP address that is collected by Application Insights, last octagon is anonymized to 0 (you can see the highlighted in above screenshot).

Read the data from blob

They are many ways to transfer the data out from storage account. Here we are going to use Azure Databricks.

Add one Azure Databricks service following here.

Add one cluster, choose the basic one following here.

Add one notebook:  named for example: Streaming Data Ingest

In order Azure Databricks can read data from blob storage, there are two ways: Databricks directly read blob storage through HDFS API; Or mount blob storage container into Databricks file system. See more information about how to access Blob Storage as here. We choose the second one.

Mount the storage folder into Databricks file system(DBFS), to mount a Blob storage container or a folder inside a container, use the following command:

  source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>",
  mountPoint = "/mnt/export",
  extraConfigs = Map("<conf-key>" -> "<conf-value>"))

Then you can check the folder data with command:

%fs ls /mnt

Initialize SparkSession:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession

import spark.implicits._

In Spark 2.0+, we prefer use Structured Streaming(DataFrame /DataSet API) in, rather than Spark Core API, but when we see the Availability log data, it is XML like format, with several hierarchy. This is not easy to programming define the Structure type. The tricks here is that we first read one blob file to get Schema, then use this Schema when we create the structured streaming.

"<your key>")
val df = spark.read.json("wasbs://<your container>@<your-storage-account-name>.core.windows.net/veracity_ai_prod_41f11b1f17bb43d49ba51beabf2dd0a9/Availability/2018-06-04/06")
val availabilitySchema = df.schema

Using structured streaming here to access the files in blob folder, we are going to emulate a stream from them by reading one file at a time, in the chronological order they were created.

//Use file sink to store output data into blob storage again.
val inputPath = "/mnt/export"
val outputPath = "/mnt/output"
val streamingInput = spark.readStream.schema(availabilitySchema).option("maxFilesTrigger", 1).json(inputPath + "/*/*/")

Transform the data

Note that we do not need all availability data fields for analytics. Here we only pickup the fields that you need, as below (Needed fields are highlighted)

We use SQL to do the transform, then convert the DataFrame into DataSet, suggested if you need to do more analytics afterwards. In Structured Streaming, query is a handle to the streaming query that is running in the background.

val sqlDF = spark.sql("SELECT internal.data.id as ID, availability[0].testTimestamp as timestamp, availability[0].testName as name, availability[0].runLocation as location, availability[0].message as message, availability[0].durationMetric.value as duration FROM logs")
val sqlDS = sqlDF.as[(String, String, String, String, String, Double)]
val query2 = 
.option("path", outputPath)
.option("checkpointLocation", outputPath + "/checkpoint/")

Check the output folder in blob, to make sure that files are handled one by one continously (in micro-batch).

Note that: By the time writing this article, Structured Streaming does not support JDBC sink natively. Hence we need to extend the JDBCSink by ourselves.