
Mount/Unmount SASURL with Databricks File System

When we develop a data analytics solution, data preparation and data load are steps we cannot skip. Azure Databricks supports both its native file system, the Databricks File System (DBFS), and external storage. External storage can be accessed directly or mounted into DBFS. This article explains how to mount and unmount blob storage in DBFS.

The code below is from the official Azure Databricks documentation.

#  Mount an Azure Blob storage container
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})
# Unmount a mount point
dbutils.fs.unmount("/mnt/<mount-name>")

Normally our data pipelines follow this logic: 1) check whether the path is already mounted; 2) if it is not mounted yet, mount it; 3) if it is already mounted, either skip the mount logic and use the existing mount point, or unmount it and mount it again.

def mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, unmount_if_exists = True):
  if([item.mountPoint for item in dbutils.fs.mounts()].count(mount_path) > 0):
    if unmount_if_exists:
        print('Mount point already taken - unmounting: '+mount_path)
        dbutils.fs.unmount(mount_path)
    else:
        print('Mount point already taken - ignoring: '+mount_path)
        return
  print('Mounting external storage in: '+mount_path)
  dbutils.fs.mount(
    source = "wasbs://{0}@{1}.blob.core.windows.net".format(container_name, storage_account_name),
    mount_point = mount_path,
    extra_configs = {"fs.azure.sas.{0}.{1}.blob.core.windows.net".format(container_name, storage_account_name): sas_token }) 

When blob storage is shared as a SAS URL instead of individual blob account details, we can parse the account name, container name and SAS token from the SAS URL as below:

def get_detail_info_from_url(sas_url):
  # Parse the storage account name, container name and SAS token from a SAS URL
  array_1 = sas_url.split('//', 1)
  array_2 = array_1[1].split('.', 2)
  storage_account_name = array_2[0]
  storage_type = array_2[1]
  array_3 = array_2[2].split('/', 1)
  array_4 = array_3[1].split('?', 1)
  sas_token = '?' + array_4[1]
  array_5 = array_4[0].split('/', 1)
  container_name = array_5[0]
  return (storage_account_name, container_name, sas_token)

sas_url = dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")
storage_account_name, container_name, sas_token = get_detail_info_from_url(sas_url)
mount_path = "/mnt/path1"
mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, True)
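
For illustration, a hypothetical SAS URL (not a real account or signature) parses as follows:

# Hypothetical example only - not a real account or signature
example_url = "https://myaccount.blob.core.windows.net/mycontainer/data?sv=2019-12-12&sig=abc123"
print(get_detail_info_from_url(example_url))
# ('myaccount', 'mycontainer', '?sv=2019-12-12&sig=abc123')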

We can integrate our Databricks tasks with other activities in Azure Data Factory to build an end-to-end data pipeline. A typical design is to make this mount/unmount activity a prerequisite step for the other notebook tasks in the Azure Data Factory pipeline.

Recommender System with Azure Databricks

Recommender systems are among the best known, most widely used and highest-value use cases for applying machine learning. This article describes how to build a movie recommender model based on the MovieLens dataset with Azure Databricks and other services on the Azure platform.

There are many frameworks and tools that can be used for recommender systems, e.g. Apache Spark ML or MLlib, Surprise, and TensorFlow. Azure Databricks supports these popular frameworks. We will start with the Apache Spark framework to see how to build a basic recommender model.

Data Loading and Processing

Let’s use the MovieLens dataset to build a movie recommender using collaborative filtering with Spark’s Alternating Least Squares (ALS) implementation. Download the data from the MovieLens site and upload the data files into DBFS or Azure blob storage. If you store the data in blob storage, you need to mount the blob container into a DBFS path.
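
If you only want to experiment, a small sketch for pulling the small MovieLens archive straight into the mounted DBFS folder could look like this (the download URL and target path are assumptions):

import urllib.request, zipfile

# Sketch only: download the small MovieLens archive and unzip it under the
# mounted DBFS folder used below (URL and paths are assumptions)
local_zip = "/tmp/ml-latest-small.zip"
urllib.request.urlretrieve("http://files.grouplens.org/datasets/movielens/ml-latest-small.zip", local_zip)
with zipfile.ZipFile(local_zip) as z:
    z.extractall("/dbfs/mnt/dbscontainer/MovieLens/Latest/")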

datasets_path = "dbfs:/mnt/dbscontainer/MovieLens/Latest/"
small_ratings_file = "dbfs:/mnt/dbscontainer/MovieLens/Latest/ml-latest-small/ratings.csv"
complete_ratings_file = "dbfs:/mnt/dbscontainer/MovieLens/Latest/ml-latest/ratings.csv"

Please note that there are two datasets: small and full. Here we use the full dataset, which contains 24404096 ratings and 668953 tag applications across 40110 movies, created by 259137 users between January 09, 1995 and October 17, 2016 (the dataset was generated on October 18, 2016). Let’s start by loading the ratings data and splitting it into training and test sets.

COL_USER = "userId"
COL_ITEM = "movieId"
COL_RATING = "rating"
COL_PREDICTION = "prediction"
COL_TIMESTAMP = "timestamp"

from pyspark.sql.types import StructType, StructField, IntegerType, FloatType, LongType

ratingSchema = StructType(
    [
        StructField(COL_USER, IntegerType()),
        StructField(COL_ITEM, IntegerType()),
        StructField(COL_RATING, FloatType()),
        StructField(COL_TIMESTAMP, LongType())
    ]
)

full_rating_raw_df = spark.read.option("sep", ",").option("header", "true").schema(ratingSchema).csv(complete_ratings_file)
display(full_rating_raw_df)
training, test = full_rating_raw_df.randomSplit([0.8, 0.2], seed=0)
print("training data count: %s" % training.count())
print("test data count: %s" % test.count())

Collaborative Filtering

Collaborative filtering is commonly used for recommender systems. In collaborative filtering, we make predictions (filtering) about the interests of a user by collecting preferences or taste information from many users (collaborating). The underlying assumption is that if user A has the same opinion as user B on one issue, A is more likely to share B’s opinion on a different issue x than to share the opinion of a randomly chosen user.

Spark provides two collaborative filtering libraries: spark.ml and spark.mllib. The major difference is that spark.ml is the DataFrame-based API, while spark.mllib is the RDD-based API. Here we use spark.ml for the ALS model, as the RDD-based API is now in maintenance mode.

The implementation in spark.ml has the following parameters:

  • numBlocks is the number of blocks the users and items will be partitioned into in order to parallelize computation (defaults to 10).
  • rank is the number of latent factors in the model (defaults to 10).
  • maxIter is the maximum number of iterations to run (defaults to 10).
  • regParam specifies the regularization parameter in ALS (defaults to 1.0).
  • implicitPrefs specifies whether to use the explicit feedback ALS variant or one adapted for implicit feedback data (defaults to false which means using explicit feedback).
  • alpha is a parameter applicable to the implicit feedback variant of ALS that governs the baseline confidence in preference observations (defaults to 1.0).
  • nonnegative specifies whether or not to use nonnegative constraints for least squares (defaults to false).

# Use the complete dataset to build the final model.
# Note that rank, maxIter, regParam, ... are hyper parameters.

import time
from pyspark.ml.recommendation import ALS
header = {
    "userCol": "userId",
    "itemCol": "movieId",
    "ratingCol": "rating",
}

als = ALS(
    rank=10,
    maxIter=15,
    implicitPrefs=False,
    regParam=0.05,
    coldStartStrategy='drop',
    nonnegative=False,
    seed=42,
    **header
)
start_time = time.time()
model = als.fit(training)
train_time = time.time() - start_time
print("Took {} seconds for training.".format(train_time))
# Evaluate the model by computing the RMSE on the test data

from pyspark.ml.evaluation import RegressionEvaluator
predictions = model.transform(test)
evaluator = RegressionEvaluator(metricName="rmse", labelCol="rating",
                                predictionCol="prediction")
rmse = evaluator.evaluate(predictions)
print("Root-mean-square error = " + str(rmse))

Making Recommendations

Now that our recommender model is ready, we can try it out by generating some movie recommendations.

# Generate top 10 movie recommendations for each user
userRecs = model.recommendForAllUsers(10)
# Generate top 10 user recommendations for each movie
movieRecs = model.recommendForAllItems(10)

# Generate top 10 movie recommendations for a specified set of users
users = full_rating_raw_df.select(als.getUserCol()).distinct().limit(3)
userSubsetRecs = model.recommendForUserSubset(users, 10)
# Generate top 10 user recommendations for a specified set of movies
movies = full_rating_raw_df.select(als.getItemCol()).distinct().limit(3)
movieSubSetRecs = model.recommendForItemSubset(movies, 10)
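
The recommendations come back as a nested array column; a small sketch to flatten them for easier inspection:

from pyspark.sql.functions import explode, col

# Flatten the array<struct<movieId,rating>> column into one row per recommendation
userRecsFlat = (userRecs
    .withColumn("rec", explode("recommendations"))
    .select("userId", col("rec.movieId").alias("movieId"), col("rec.rating").alias("predictedRating")))
display(userRecsFlat)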

Persisting the Model

Sometimes we might want to persist the base model for later use in our online recommendations. Although a new model is trained every time we receive new user ratings, it is worth storing the current one in order to save time when starting up the server.

# Save and load model

from pyspark.ml.recommendation import *
model_path = "dbfs:/mnt/dbscontainer/MovieLens/Latest/models/movie_lens_als"
model.save(model_path)
sameModel = ALSModel.load(model_path)

NCFM on Azure Databricks

This article shows how to run a deep learning model for an image classification task. We take the Kaggle NCFM competition as the playground project.

Data and Preparation

Download the data from https://www.kaggle.com/c/the-nature-conservancy-fisheries-monitoring/data. Unzip and upload the data files into DBFS or Azure blob storage. This is a typical single-label image classification problem covering 8 classes (7 fish classes and 1 non-fish class). The training set is rather small, only 3777 images, with an extra 1000 for testing. The images are challenging since noise and background dominate the whole picture. To prepare for model training, we split the labeled data into training and validation sets, usually 80% for training and 20% for validation (see the sketch below). After the split there are two separate folders: train_split and val_split.
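
The competition data does not come pre-split, so the split has to be done once up front. A minimal sketch, assuming the labeled images sit under a train/<class>/ folder inside the mounted path (paths and the seed are assumptions):

import os, random, shutil

# Minimal sketch of an 80/20 split of the labeled images into
# train_split/ and val_split/ folders (paths are assumptions)
random.seed(2016)
root = '/dbfs/mnt/vpa-raw-data-dev/POC'
src_dir = os.path.join(root, 'train')          # original labeled images: train/<class>/*.jpg
train_dir = os.path.join(root, 'train_split')
val_dir = os.path.join(root, 'val_split')

for cls in os.listdir(src_dir):
    images = os.listdir(os.path.join(src_dir, cls))
    random.shuffle(images)
    n_train = int(len(images) * 0.8)
    for subset_dir, subset in [(train_dir, images[:n_train]), (val_dir, images[n_train:])]:
        os.makedirs(os.path.join(subset_dir, cls), exist_ok=True)
        for name in subset:
            shutil.copy(os.path.join(src_dir, cls, name),
                        os.path.join(subset_dir, cls, name))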

Modeling and Training

A small training set carries a risk of overfitting during model training. One solution is transfer learning: fine-tuning the weights of pre-trained networks. Pre-trained models are trained across multiple GPUs on ImageNet; ConvNet features are more generic in the early layers and more original-dataset-specific in the later layers; a small learning rate is used for fine-tuning; and fine-tuning usually begins with the later layers.

Here we use Inception-V3 model with ImageNet Pretrained weights. The pre-trained Inception-v3 model achieves state-of-the-art accuracy for recognizing general objects with 1000 classes, like “Zebra”, “Dalmatian”, and “Dishwasher”. The model extracts general features from input images in the first part and classifies them based on those features in the second part.

import os
from keras.applications.inception_v3 import InceptionV3
from keras.layers import Flatten, Dense, AveragePooling2D
from keras.models import Model
from keras.optimizers import RMSprop, SGD
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import ImageDataGenerator

learning_rate = 0.001
img_width = 299
img_height = 299
nbr_train_samples = 3019
nbr_validation_samples = 758
nbr_epochs = 20
batch_size = 32
train_data_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/train_split'
val_data_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/val_split'

FishNames = ['ALB', 'BET', 'DOL', 'LAG', 'NoF', 'OTHER', 'SHARK', 'YFT']

print('Loading InceptionV3 Weights ...')
InceptionV3_notop = InceptionV3(include_top=False, weights='imagenet',
                    input_tensor=None, input_shape=(299, 299, 3))

print('Adding Average Pooling Layer and Softmax Output Layer ...')
output = InceptionV3_notop.get_layer(index = -1).output  # Shape: (8, 8, 2048)
output = AveragePooling2D((8, 8), strides=(8, 8), name='avg_pool')(output)
output = Flatten(name='flatten')(output)
output = Dense(8, activation='softmax', name='predictions')(output)

InceptionV3_model = Model(InceptionV3_notop.input, output)
InceptionV3_model.summary()

optimizer = SGD(lr = learning_rate, momentum = 0.9, decay = 0.0, nesterov = True)
InceptionV3_model.compile(loss='categorical_crossentropy', optimizer = optimizer, metrics = ['accuracy'])

# autosave best Model
best_model_file = "/dbfs/mnt/vpa-raw-data-dev/POC/weights.h5"
best_model = ModelCheckpoint(best_model_file, monitor='val_acc', verbose = 1, save_best_only = True)

To reduce overfitting and improve our ranking, we use data augmentation for the training images.

# this is the augmentation configuration we will use for training
train_datagen = ImageDataGenerator(
        rescale=1./255,
        shear_range=0.1,
        zoom_range=0.1,
        rotation_range=10.,
        width_shift_range=0.1,
        height_shift_range=0.1,
        horizontal_flip=True)

# this is the augmentation configuration we will use for validation, only rescaling
val_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size = (img_width, img_height),
        batch_size = batch_size,
        shuffle = True,
        classes = FishNames,
        class_mode = 'categorical')

validation_generator = val_datagen.flow_from_directory(
        val_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        shuffle = True,
        #save_to_dir = '/Users/Sandy/Repo/Kaggle_NCFM/visulization',
        #save_prefix = 'aug',
        classes = FishNames,
        class_mode = 'categorical')

InceptionV3_model.fit_generator(
        train_generator,
        steps_per_epoch = 94,
        epochs = nbr_epochs,
        validation_data = validation_generator,
        validation_steps = 23,
        callbacks = [best_model])

Prediction

import os
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.models import load_model

img_width = 299
img_height = 299
batch_size = 32
nbr_test_samples = 1000

FishNames = ['ALB', 'BET', 'DOL', 'LAG', 'NoF', 'OTHER', 'SHARK', 'YFT']

root_path = '/dbfs/mnt/vpa-raw-data-dev/POC/'
weights_path = os.path.join(root_path, 'weights.h5')
print(weights_path)
test_data_dir = os.path.join(root_path, 'test_stg1/')
print(test_data_dir)

test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
        test_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        shuffle = False, # Important !!!
        classes = None,
        class_mode = None)

test_image_list = test_generator.filenames

print('Loading model and weights from training process ...')
InceptionV3_model = load_model(weights_path)
print('Begin to predict for testing data ...')
predictions = InceptionV3_model.predict_generator(test_generator, nbr_test_samples)
np.savetxt(os.path.join(root_path, 'predictions.txt'), predictions)

print('Begin to write submission file ..')
f_submit = open(os.path.join(root_path, 'submit.csv'), 'w')
f_submit.write('image,ALB,BET,DOL,LAG,NoF,OTHER,SHARK,YFT\n')
for i, image_name in enumerate(test_image_list):
    pred = ['%.6f' % p for p in predictions[i, :]]
    if i % 100 == 0:
        print('{} / {}'.format(i, nbr_test_samples))
    f_submit.write('%s,%s\n' % (os.path.basename(image_name), ','.join(pred)))
f_submit.close()
print('Submission file successfully generated!')

Practical Tricks

  1. When we use a ConvNet for an image classification task and the training set is quite small, we can pick a SOTA (state-of-the-art) ConvNet architecture, e.g. InceptionV3, ResNet, Inception-ResNet, DenseNet, etc., with weights pre-trained on ImageNet to speed up convergence.
  2. Fine-tune with a small learning rate. I have tried learning rates of 0.001 and 0.0001; training with the smaller learning rate is quite slow, but gains better validation accuracy.
  3. Use data augmentation to reduce overfitting.
  4. Split the data into training and local validation sets.
  5. Ensemble models might help, but I haven’t tried it yet while writing this (a simple averaging sketch follows this list).
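
As mentioned in item 5, one simple way to ensemble is to average the class probabilities written out by several training or prediction runs. A hedged sketch (the file names are hypothetical):

import numpy as np

# Average class probabilities from several prediction files (hypothetical names)
prediction_files = ['predictions_run1.txt', 'predictions_run2.txt']
ensembled = np.mean([np.loadtxt(f) for f in prediction_files], axis=0)
np.savetxt('predictions_ensemble.txt', ensembled)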

Finally, special thanks to pengpaiSH, whose code sample this work references: https://github.com/pengpaiSH/Kaggle_NCFM

SCD Implementation with Databricks Delta

Slowly Changing Dimensions (SCD) are the most commonly used advanced dimensional technique in dimensional data warehouses. Slowly changing dimensions are used when you want to capture data changes (CDC) within a dimension over time. There are two typical SCD scenarios: SCD Type 1 and SCD Type 2. Type 1: simply overwrite the existing data values with the new values. Type 2: the most commonly used type; add a new record containing the change and mark the old record as inactive. In this blog we describe how to implement SCD Type 1 and SCD Type 2 with Azure Databricks. SCD Type 1 and 2 merges are newly supported by Databricks Delta; please see the official documentation for the MERGE command.

Data Preparation

Original Records: target.csv

ID | Name | Owner | Description
G87D744D-345T-46AD-BD9D-B18CB66345YT | Product0 | user0@dummy.com | Product0 Desc
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1 | Product1 | user1@dummy.com | Product1 Desc
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3 | Product2 | user2@dummy.com | Product2 Desc
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256 | Product3 | user3@dummy.com | Product3 Desc

Update Records: source.csv

ID | Name | Owner | Description
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1 | Product1 | user1@dummy.com | Product1 Description Changed Only
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3 | Product2 | userchanged@dummy.com | Product2 Owner and Description Changed
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256 | Product3 | user3@dummy.com | Product3 Description
2A6CE7F2-4C6F-41DF-9819-235021DC1226 | Product4 | user4@dummy.com | New Product

ID: business key column; Name: SCD Type 2 column; Owner: SCD Type 2 column; Description: SCD Type 1 column.

Expected result:

The Product0 row will remain the same; in the Product1 row only the description will be updated; the original Product2 row will become inactive and a new Product2 row will be added; Product3 will remain the same; a new Product4 row will be added.

Data Load and Transformation

Upload target.csv and source.csv into cloud storage, e.g. Azure blob storage or DBFS, then load the two data files into two DataFrames.

# Load Data
target_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/target.csv")
source_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/source.csv")

In order to use the MERGE command, we need to save the two DataFrames as Delta tables. For more information about Delta tables, please see the documentation link here.

# Delta tables
spark.sql("DROP TABLE IF EXISTS source")
spark.sql("CREATE TABLE source (ID STRING, Name STRING, Owner STRING, Description STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/source'")
spark.sql("TRUNCATE TABLE source")
source_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save('/mnt/vpa-raw-data-dev/POC/source')

from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.sql("DROP TABLE IF EXISTS target")
spark.sql("CREATE TABLE target (ID STRING, Name STRING, Owner STRING, Description STRING, RowStatus STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/target'")
spark.sql("TRUNCATE TABLE target")
target_df = target_df.withColumn('RowStatus', lit(None).cast(StringType()))
target_df.write.mode("overwrite").format("delta").save("/mnt/vpa-raw-data-dev/POC/target")

SCD Merge

We have to use two MERGE commands to cover the four scenarios (SCD Type 1, SCD Type 2, new rows, and unchanged rows), as one MERGE command only supports one UPDATE and one INSERT clause.

%sql
-- Update SCD Type 2 rows (RowStatus = 2) and Insert Not Match rows (RowStatus = 3)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (NOT(source.Owner <=> target.Owner) OR NOT(source.Name <=> target.Name)) THEN
  UPDATE SET
    target.RowStatus = 2,
    target.Owner = source.Owner,
    target.Name = source.Name,
    target.Description = source.Description  
WHEN NOT MATCHED
  THEN INSERT (ID,Name,Owner,Description,RowStatus) VALUES (source.ID,source.Name,source.Owner,source.Description,3)
  
-- Merge SCD Type 1 update (RowStatus = 1)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (target.RowStatus IS NULL) AND (NOT(source.Description <=> target.Description)) THEN
  UPDATE SET
    target.RowStatus = 1,
    target.Description = source.Description

Check the merge result in the target table to verify that both the SCD Type 1 and Type 2 updates behave as expected.
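
For a quick sanity check, the merged table can be inspected right in the notebook, for example:

# Inspect the merged result, including the RowStatus flag set by the two MERGEs
display(spark.sql("SELECT ID, Name, Owner, Description, RowStatus FROM target ORDER BY ID"))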

Update and Insert

Once the merged data set is in the target table, we need to use it to update the dimension tables, which typically live in a traditional database (SQL Server, MySQL, …). Here we use Azure SQL Database as an example.

Spark JDBC doesn’t support the UPDATE command. One workaround is to use the Spark Connector. To achieve this, we first create a temporary table to store the SCD Type 1 and Type 2 rows, then update the dimension table from the temporary table through the Spark Connector.

#Filter out SCD Type 1 and 2 rows from target Delta table, and save into one temp table in Azure SQL
scd12_df = spark.sql("SELECT ID, Name, Owner, Description, RowStatus FROM target WHERE ( RowStatus = 2 OR RowStatus = 1)")
scd12_df.write.mode("overwrite").jdbc(url = jdbcUrl, table = "Scd_tmp", properties = connectionProperties)
%scala
import org.apache.spark.sql.SQLContext
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
import com.microsoft.azure.sqldb.spark.connect._

//Update column values for SCD Type 1 (description-only) changes
val scd1_query = """
              |UPDATE Scd
              |SET Scd.Description = Scd_tmp.Description
              |FROM Scd
              |INNER JOIN Scd_tmp
              |ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '1';
            """.stripMargin

val scd1_config = Config(Map(
  "url"            -> "dummy url",
  "databaseName"   -> "dummy databaseName",
  "user"           -> "dummy user",
  "password"       -> "dummy pwd",
  "queryCustom"    -> scd1_query
))
sqlContext.sqlDBQuery(scd1_config)

//Update SCD Type 2 rows: set Active_Record to 0 and Record_EndDate to the current datetime
val scd2_query2 = """
              |UPDATE Scd
              |SET Scd.Active_Record = '0', Scd.Record_EndDate = GETDATE()
              |FROM Scd
              |INNER JOIN Scd_tmp
              |ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '2';
            """.stripMargin

val scd2_config = Config(Map(
  "url"            -> "dummy url",
  "databaseName"   -> "dummy databaseName",
  "user"           -> "dummy user",
  "password"       -> "dummy pwd",
  "queryCustom"    -> scd1_query
))
sqlContext.sqlDBQuery(scd2_config)

Finally, insert the new rows from Spark into the SQL table through the JDBC API.

newinserted_df = spark.sql("SELECT ID, Name, Owner, Description FROM target WHERE RowStatus = '3'")
newinserted_df.write.mode("append").jdbc(url = jdbcUrl, table = "Scd", properties = connectionProperties)

Handling Excel Data in Azure Databricks

As of now, there is no built-in support for loading Excel data with Spark in the cloud (here we take Azure Databricks as the example). Based on research, the following links are helpful.

https://stackoverflow.com/questions/44196741/how-to-construct-dataframe-from-a-excel-xls-xlsx-file-in-scala-spark

https://stackoverflow.com/questions/16888888/how-to-read-a-xlsx-file-using-the-pandas-library-in-ipython

This article describes some practices based on our recent projects.

Data Preparation

The sample MS Excel file has two sheets, Companies and Users. The Companies sheet looks like this:

CompanyID | CompanyName | Description
100 | Comp1 | Desc1
101 | Comp2 | Desc2
102 | Comp3 | Desc2

Solution 1: Read the Excel data using Pandas, then convert the Pandas DataFrame into a Spark DataFrame.

from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

filepath = '/dbfs/xxx/xxx/Samples.xlsx'
companies_pd_df = pd.read_excel(filepath, sheet_name='Companies')
companiesSchema = StructType([
  StructField("CompanyID", IntegerType(), False),
  StructField("CompanyName", StringType(), False),
  StructField("Description", StringType(), False)])
companies_df = spark.createDataFrame(companies_pd_df, schema=companiesSchema)

Note that:

  1. When converting a Pandas DataFrame into a Spark DataFrame, we need to define the schema manually, otherwise the conversion will probably fail.
  2. In some cases, the created Spark DataFrame may contain dummy data or additional unnecessary rows. In that case we can filter out those rows, e.g. companies_df = companies_df.filter(isnan("CompanyID") != True)

Solution 2: Use Spark Excel.

More detailed documentation can be found here. Code sample in Azure Databricks:

%scala

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import com.crealytics.spark.excel._

val companiesSchema = StructType(Array(
  StructField("CompanyID", IntegerType, nullable = true),
  StructField("CompanyName", StringType, nullable = true),
  StructField("Description", StringType, nullable = true)))
val filepath = "/mnt/xxx/xxx/Samples.xlsx"
val df = spark.read
    .format("com.crealytics.spark.excel")
    .option("sheetName", "Companies")
    .option("useHeader", "true")
    .schema(companiesSchema)
    .load(filepath)
df.show()

Note that:

  1. Currently the Spark Excel plugin is only available for Scala, not yet for Python. Hence we have to use the %scala magic command in a Python notebook.
  2. Errors/exceptions may happen with some versions. I have tested this successfully with version com.crealytics:spark-excel_2.11:0.12.0

Google Analytics Raw Data Ingest

Google Analytics is a very popular tool for those customers that want analytics insights for their website or apps, without building their own data analytics system or big data platform.

In some cases, the Google Analytics (GA) reports/dashboards or its Reporting API do not match our needs perfectly, hence we did some research to investigate how to ingest GA raw data.

Solution 1:  Google Analytics 360

GA 360 supports exporting session and hit data into Google BigQuery, but the biggest challenge is that GA 360 pricing starts at US$150,000 per year. This may be a concern for small or startup companies.

https://support.google.com/analytics/answer/3437618?hl=en

Solution 2: Third-party tools.

SCITYLANA: https://www.google.com/analytics/partners/company/5177885753081856/gadp/5629499534213120/app/5707702298738688/listing/5757334940811264

Snowplow: https://snowplowanalytics.com/blog/2018/02/08/warehousing-google-analytics-data-api-vs-hit-level-data

Solution 3: Custom code to export GA raw data into BigQuery. Some examples we found:

http://dmitriilin.com/exporting-data-google-analytics-google-bigquery/

http://daynebatten.com/2015/07/raw-data-google-analytics/

Inspired by the above articles, we are able to send the raw hit-level data to GA and to another destination (e.g. BigQuery) at the same time, then continuously copy the data from BigQuery into Blob Storage through Azure Data Factory.

Kaggle AllState Competition in Azure Databricks

In this post we describe how to work through one Kaggle competition with Azure Databricks. Compared to running the training and tuning phases on a local machine or a single server, training our model on Azure Databricks with Spark is quite fast.

Kaggle Allstate Claims Severity

When you’ve been devastated by a serious car accident, your focus is on the things that matter the most: family, friends, and other loved ones. Pushing paper with your insurance agent is the last place you want your time or mental energy spent. This is why Allstate, a personal insurer in the United States, is continually seeking fresh ideas to improve their claims service for the over 16 million households they protect.

Allstate is currently developing automated methods of predicting the cost, and hence severity, of claims. In this recruitment challenge, Kagglers are invited to show off their creativity and flex their technical chops by creating an algorithm which accurately predicts claims severity. Aspiring competitors will demonstrate insight into better ways to predict claims severity for the chance to be part of Allstate’s efforts to ensure a worry-free customer experience.

Data and Preparation

Download the data from https://www.kaggle.com/c/allstate-claims-severity/data. Upload the data files into DBFS or Azure blob storage, then read the train and test data into DataFrames.

import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RegressionMetrics
print("Read and load data started...")
trainInput = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("dbfs:/yourpath/train.csv")
.cache())

testInput = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("dbfs:/yourpath/test.csv")
.cache())
print("Read and load data completed...")
data = trainInput.withColumnRenamed("loss", "label")
[trainingData, validationData] = data.randomSplit([0.7, 0.3])
trainingData.cache()
validationData.cache()
testData = testInput.cache()

Please note that you should replace the above dbfs paths with your own. For Spark DataFrames/Datasets/RDDs that are reused, it is better to cache them; here we cache trainingData, validationData and testData.

Modeling and Training

print("Feature engineering...")
print("Handle categories data...")
# Use StringIndexer or OneHotEncoder for categories columns
isCateg = lambda c: c.startswith("cat")
categNewCol = lambda c: "idx_{0}".format(c) if (isCateg(c)) else c

stringIndexerStages = list(map(lambda c: StringIndexer(inputCol=c, outputCol=categNewCol(c))
.fit(trainInput.select(c).union(testInput.select(c))), filter(isCateg, trainingData.columns)))

removeTooManyCategs = lambda c: not re.match(r"cat(109$|110$|112$|113$|116$)", c)

# Keep those feature columns only
onlyFeatureCols = lambda c: not re.match(r"id|label", c)

featureCols = list(map(categNewCol,
filter(onlyFeatureCols,
filter(removeTooManyCategs,
trainingData.columns))))

# Assemble features
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
print("Features generation and assembly completed...")
print("Building Random Forest for regression..")
algo = RandomForestRegressor(featuresCol="features", labelCol="label")

stages = stringIndexerStages
stages.append(assembler)
stages.append(algo)

#Build pipeline
pipeline = Pipeline(stages=stages)
print("K fold cross validation...")
numTrees = [5, 20]
maxDepth = [4, 6]
maxBins = [32]
numFolds = 3

paramGrid = (ParamGridBuilder()
.addGrid(algo.numTrees, numTrees)
.addGrid(algo.maxDepth, maxDepth)
.addGrid(algo.maxBins, maxBins)
.build())

cv = CrossValidator(estimator=pipeline,
evaluator=RegressionEvaluator(),
estimatorParamMaps=paramGrid,
numFolds=numFolds)

cvModel = cv.fit(trainingData)

Results Metrics and Prediction

trainPredictionsAndLabels = cvModel.transform(trainingData).select("label", "prediction").rdd

validPredictionsAndLabels = cvModel.transform(validationData).select("label", "prediction").rdd

trainRegressionMetrics = RegressionMetrics(trainPredictionsAndLabels)
validRegressionMetrics = RegressionMetrics(validPredictionsAndLabels)

bestModel = cvModel.bestModel
featureImportances = bestModel.stages[-1].featureImportances.toArray()

print("TrainingData count: {0}".format(trainingData.count()))
print("ValidationData count: {0}".format(validationData.count()))
print("TestData count: {0}".format(testData.count()))
print("=====================================================================")
print("Param algoNumTrees = {0}".format(",".join(map(lambda x:str(x), numTrees))))
print("Param algoMaxDepth = {0}".format(",".join(map(lambda x:str(x), maxDepth))))
print("Param algoMaxBins = {0}".format(",".join(map(lambda x:str(x), maxBins))))
print("Param numFolds = {0}".format(numFolds))
print("=====================================================================\n")
print("Training data MSE = {0}".format(trainRegressionMetrics.meanSquaredError))
print("Training data RMSE = {0}".format(trainRegressionMetrics.rootMeanSquaredError))
print("Training data R-squared = {0}".format(trainRegressionMetrics.r2))
print("Training data MAE = {0}".format(trainRegressionMetrics.meanAbsoluteError))
print("Training data Explained variance = {0}".format(trainRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Validation data MSE = {0}".format(validRegressionMetrics.meanSquaredError))
print("Validation data RMSE = {0}".format(validRegressionMetrics.rootMeanSquaredError))
print("Validation data R-squared = {0}".format(validRegressionMetrics.r2))
print("Validation data MAE = {0}".format(validRegressionMetrics.meanAbsoluteError))
print("Validation data Explained variance = {0}".format(validRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Feature importances:\n{0}\n".format("\n".join(map(lambda z: "{0} = {1}".format(str(z[0]),str(z[1])), zip(featureCols, featureImportances)))))
cvModel.transform(testData)\
.select("id", "prediction")\
.withColumnRenamed("prediction", "loss")\
.coalesce(1)\
.write.format("csv")\
.option("header", "true")\
.save("dbfs:/yourpath/rf_sub.csv"

Finally, you can upload the result file to Kaggle if you want to submit your results.
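
Note that because of coalesce(1) the save above actually produces a folder named rf_sub.csv containing a single part file; a small sketch to locate that file before downloading it (the path placeholder matches the one above):

# Find the single CSV part file produced inside the output folder
part_file = [f.path for f in dbutils.fs.ls("dbfs:/yourpath/rf_sub.csv/") if f.name.startswith("part-")][0]
print(part_file)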


Feature Engineering with PySpark in Azure Databricks

Azure Databricks is a very cool, easy-to-use platform for both analytics engineers and machine learning developers. I would like to use this post to summarize basic APIs and tricks for feature engineering with Azure Databricks. Previously I was using Jupyter Notebook or PyCharm to develop or practice some machine learning cases. Compared to Jupyter Notebook, Azure Databricks provides a similar notebook experience with some additional features. In a future post I will describe how to use Azure Databricks in production.

Feature engineering is the preprocessing phase of machine learning, and it requires a huge effort from developers to get the data ready for modeling and training. Here I list some basic feature engineering scenarios with PySpark in Azure Databricks.

If you want to run the code snippets below in a normal Jupyter Notebook, you need to add the Spark initialization code below. (This is not necessary in Azure Databricks, as the SparkSession is already available.)

from pyspark.sql import SparkSession

spark = SparkSession\
.builder\
.appName("AppName")\
.getOrCreate()

Continuous Data

#Binarizer
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
(0, 1.1),
(1, 8.5),
(2, 5.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
#Bucketizer
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Assign buckets per the splits boundary
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()
#QuantileDiscretizer
from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
df = df.repartition(1)

# Divide into 3 buckets as quantile distribution
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
discretizerModel = discretizer.fit(df)
result = discretizerModel.transform(df)
result.show()
#MaxAbsScaler
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
(0, Vectors.dense([1.0, 0.1, -8.0]),),
(1, Vectors.dense([2.0, 1.0, -4.0]),),
(2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Calculate Max value model
scalerModel = scaler.fit(dataFrame)

# Transform with scale model, so that values are scaled between [-1.0, 1.0]
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
#Standard scaler
from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
withStd=True, withMean=False)

# Calculate mean and variance
scalerModel = scaler.fit(dataFrame)

# Standardize
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
#Polynomial expansion
from pyspark.ml.feature import PolynomialExpansion

df = spark.createDataFrame([
(Vectors.dense([2.0, 1.0]),),
(Vectors.dense([0.0, 0.0]),),
(Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

Discrete Data

# StringIndexer and One-hot encoder
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
(0, "a"),
(1, "b"),
(2, "c"),
(3, "a"),
(4, "a"),
(5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()

Text

#Use Stop Words
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
(0, ["I", "saw", "the", "red", "balloon"]),
(1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
# Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
(0, "Hi I heard about Spark"),
(1, "I wish Java could use case classes"),
(2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame([
(0, "a b c".split(" ")),
(1, "a b b c a".split(" "))
], ["id", "words"])

cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
# TF-IDF
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
(0.0, "Hi I heard about Spark"),
(0.0, "I wish Java could use case classes"),
(1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
# NGram
from pyspark.ml.feature import NGram


wordDataFrame = spark.createDataFrame([
(0, ["Hi", "I", "heard", "about", "Spark"]),
(1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
(2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

Others

# SQL Transformer
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
(0, 1.0, 3.0),
(2, 2.0, 5.0)
], ["id", "v1", "v2"])

sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
# R formula transform
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
[(7, "US", 18, 1.0),
(8, "CA", 12, 0.0),
(9, "NZ", 15, 0.0)],
["id", "country", "hour", "clicked"])

formula = RFormula(
formula="clicked ~ country + hour",
featuresCol="features",
labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

Note that there are many feature engineering APIs not mentioned here. Please see more info in the latest pyspark.ml.feature package:

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature


Batch Data Ingest with Azure Databricks

In the previous blog we introduced the basic steps of ingesting streaming data with Azure Databricks. Now we are going to describe how to do batch ingest with Azure Databricks. Most of the steps are similar; just the Spark DataFrame APIs are different.

Data Source

spark.conf.set(
  "fs.azure.account.key.your blob account.blob.core.windows.net",
  "your blob account key value here")
val df = spark.read.json("wasbs://your container@your blob account.blob.core.windows.net/veracity_ai_prod_41f11b1f17bb43d49ba51beabf2dd0a9/Availability/2018-06-04/06")
val availabilitySchema = df.schema
print(availabilitySchema)

Please note that we use df.schema to get the availability log’s schema; the same approach works for other log formats, e.g. page views and events. It would be very hard to write the schema by hand.

Data Transform

Use spark.read to get a DataFrame from the input data. Please pay attention to the input path format: a path like inputPath/2018-05-30/09 should be matched with a pattern like (inputPath + "/*/*/").

val availabilityDF = spark.read.schema(availabilitySchema)
.option("maxFilesTrigger", 1).json(inputPath + "/*/*/")

Use Spark SQL to do the transform; it is much easier than transforming with the DataFrame API.

availabilityDF.createOrReplaceTempView("logs")
val sqlDF = spark
.sql("SELECT internal.data.id as ID, availability[0].testTimestamp as timestamp, availability[0].testName as name, availability[0].runLocation as location, availability[0].message as message, availability[0].durationMetric.value as duration FROM logs")
sqlDF.show()
sqlDF.printSchema()

Data Sink

Here we use Azure SQL Database as the persistence store for the transformed data.

First define the database configuration

val jdbcUsername = "Your SQL Server username"
val jdbcPassword = "Your SQL Server password"
val jdbcHostname = "Your SQL Server name.database.windows.net" //typically, this is in the form of servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="Availability"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};user=${jdbcUsername};password=${jdbcPassword}"

Setup connection properties for JDBC

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

Finally, use the write.jdbc API to persist the transformed data into Azure SQL Server, quite cool.

sqlDF.write.jdbc(jdbcUrl, "aiavailability3", connectionProperties)

Create bitcoin address

Bitcoin and blockchain are too popular to be ignored by technical people. Hence I also spent some hours reading some books and doing some coding. This blog explains the basic steps to create a bitcoin address.

Step 1: Generate private key (256 bits, 64 hex characters)

import binascii as ba
import ecdsa
import hashlib
import random

def create_priv_key():
    # 64 random hex digits = 256 bits
    digits = ['%x' % random.randrange(16) for _ in range(64)]
    return ''.join(digits)

priv_key = create_priv_key()
print('1. Private key: %s' % priv_key)

Step 2: Generate public key

def build_pub_key(priv_key):
    # Derive the uncompressed public key (0x04 prefix) on the SECP256k1 curve
    buff = bytes.fromhex(priv_key)
    sk = ecdsa.SigningKey.from_string(buff, curve=ecdsa.SECP256k1)
    pub_key = b'\x04' + sk.verifying_key.to_string()
    return ba.b2a_hex(pub_key).decode()

pub_key = build_pub_key(priv_key)
print('2. Public key: %s' % pub_key)

Step 3: Calculate sha-256

def sha256(key):
    # SHA-256 of the hex-decoded input, returned as a hex string
    sha = hashlib.sha256(bytes.fromhex(key))
    return ba.hexlify(sha.digest()).decode()

r_3 = sha256(pub_key)
print('3. Public key sha-256: %s' % r_3)

Step 4: Calculate ripemd-160

def ripemd160(key):
    # RIPEMD-160 of the hex-decoded input, returned as a hex string
    rm = hashlib.new('ripemd160')
    rm.update(bytes.fromhex(key))
    return ba.hexlify(rm.digest()).decode()

r_4 = ripemd160(r_3)
print('4. Calculate Step 3 ripemd-160: %s' % r_4)

Step 5: Prefix address with protocol version

r_5 = '00' + r_4
print('5. Prefix Bitcoin protocol version: %s' % r_5)

Step 6: Calculate sha-256 twice

r_6 = sha256(sha256(r_5))
print('6. Calculate Step 5 with sha-256 twice: %s' % r_6)

Step 7: Append checksum for validation

r_7 = r_5 + r_6[:8]
print('7. Append the first 8 hex chars of Step 6 to Step 5 for validation: %s' % r_7)

Step 8: Encode with base58

import base58

addr = base58.b58encode(bytes.fromhex(r_7))
print('8. Wallet address: %s' % addr)
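
Putting the steps together, here is a small helper that reuses the functions defined above (a sketch for experimentation, not production key management):

def create_bitcoin_address():
    # Chain steps 1-8: private key -> public key -> hash160 -> version prefix
    # -> checksum -> Base58 encoding
    priv = create_priv_key()
    pub = build_pub_key(priv)
    payload = '00' + ripemd160(sha256(pub))
    checksum = sha256(sha256(payload))[:8]
    return base58.b58encode(bytes.fromhex(payload + checksum)).decode()

print(create_bitcoin_address())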