
Mount/Unmount SASURL with Databricks File System

When we develop a data analytics solution, data preparation and data loading are steps we cannot skip. Azure Databricks supports both its native file system, the Databricks File System (DBFS), and external storage. For external storage, we can either access it directly or mount it into the Databricks File System. This article explains how to mount and unmount blob storage in DBFS.

The code below is from the official Azure Databricks documentation.

#  Mount an Azure Blob storage container
dbutils.fs.mount(
  source = "wasbs://<container-name>@<storage-account-name>.blob.core.windows.net",
  mount_point = "/mnt/<mount-name>",
  extra_configs = {"<conf-key>":dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})
# Unmount a mount point
dbutils.fs.unmount("/mnt/<mount-name>")

Normally our data pipeline has logic like this: 1) Check whether the path is already mounted. 2) If it is not mounted yet, mount the path. 3) If it is already mounted, either skip the mount logic and use the existing mount point, or unmount it and mount it again.

def mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, unmount_if_exists = True):
  if([item.mountPoint for item in dbutils.fs.mounts()].count(mount_path) > 0):
    if unmount_if_exists:
        print('Mount point already taken - unmounting: '+mount_path)
        dbutils.fs.unmount(mount_path)
    else:
        print('Mount point already taken - ignoring: '+mount_path)
        return
  print('Mounting external storage in: '+mount_path)
  dbutils.fs.mount(
    source = "wasbs://{0}@{1}.blob.core.windows.net".format(container_name, storage_account_name),
    mount_point = mount_path,
    extra_configs = {"fs.azure.sas.{0}.{1}.blob.core.windows.net".format(container_name, storage_account_name): sas_token }) 

When blob storage is shared as a SAS URL instead of as separate blob details, we can parse the storage account, container, and SAS token from the SAS URL as below:

def get_detail_info_from_url(url):
  # Expected SAS URL shape: https://<account>.blob.core.windows.net/<container>/<path>?<sas-token>
  array_1 = url.split('//', 1)
  array_2 = array_1[1].split('.', 2)
  storage_account_name = array_2[0]
  array_3 = array_2[2].split('/', 1)
  array_4 = array_3[1].split('?', 1)
  sas_token = '?' + array_4[1]
  container_name = array_4[0].split('/', 1)[0]
  return (storage_account_name, container_name, sas_token)
sas_url = dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")
storage_account_name, container_name, sas_token = get_detail_info_from_url(sas_url)
mount_path = "/mnt/path1"
mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, mount_path, sas_token, True)

We can integrate our Databricks tasks with other activities in Azure Data Factory to build an end-to-end data pipeline. We suggest designing this mount/unmount activity as a prerequisite step for the other notebook tasks, as in the example Azure Data Factory pipeline diagram.
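If the mount notebook runs as an Azure Data Factory Databricks Notebook activity, the pipeline can pass the secret scope/key and the mount path in as notebook parameters. Here is a minimal sketch, assuming hypothetical parameter names secret_scope, secret_key and mount_path supplied through the activity's baseParameters:

# Read parameters passed from the ADF Databricks Notebook activity (parameter names are assumptions)
dbutils.widgets.text("secret_scope", "")
dbutils.widgets.text("secret_key", "")
dbutils.widgets.text("mount_path", "/mnt/path1")

sas_url = dbutils.secrets.get(scope = dbutils.widgets.get("secret_scope"), key = dbutils.widgets.get("secret_key"))
storage_account_name, container_name, sas_token = get_detail_info_from_url(sas_url)
mount_blob_storage_from_sas(dbutils, storage_account_name, container_name, dbutils.widgets.get("mount_path"), sas_token, True)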

SCD Implementation with Databricks Delta

Slowly Changing Dimensions (SCD) are the most commonly used advanced technique in dimensional data warehouses. Slowly changing dimensions are used when you wish to capture data changes (CDC) within a dimension over time. There are two typical SCD scenarios: SCD Type 1 and SCD Type 2. Type 1 – for this type of slowly changing dimension you simply overwrite the existing data values with new data values. Type 2 – this is the most commonly used type of slowly changing dimension; for this type, you add a new record encompassing the change and mark the old record as inactive. In this blog, we describe how we implement SCD Type 1 and SCD Type 2 with Azure Databricks. SCD Type 1 and 2 are newly supported by Databricks Delta; please see the official document link for the MERGE command.

Data Preparation

Original Records: target.csv

ID | Name | Owner | Description
G87D744D-345T-46AD-BD9D-B18CB66345YT | Product0 | user0@dummy.com | Product0 Desc
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1 | Product1 | user1@dummy.com | Product1 Desc
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3 | Product2 | user2@dummy.com | Product2 Desc
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256 | Product3 | user3@dummy.com | Product3 Desc

Update Records: source.csv

ID | Name | Owner | Description
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1 | Product1 | user1@dummy.com | Product1 Description Changed Only
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3 | Product2 | userchanged@dummy.com | Product2 Owner and Description Changed
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256 | Product3 | user3@dummy.com | Product3 Desc
2A6CE7F2-4C6F-41DF-9819-235021DC1226 | Product4 | user4@dummy.com | New Product

ID: business key column; Name: SCD Type 2 column; Owner: SCD Type 2 column; Description: SCD Type 1 column.

Expected result:

The Product0 row will remain the same; in the Product1 row only the description will be updated; the original Product2 row will become inactive and a new Product2 row will be added; Product3 will remain the same; a new Product4 row will be added.

Data Load and Transformation

Upload target.csv and source.csv to cloud storage, e.g. Azure Blob Storage or DBFS. Then load the two data files into two DataFrames.

# Load Data
target_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/target.csv")
source_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/source.csv")

In order to use the MERGE command, we need to save the two DataFrames as Delta tables. For more info about Delta tables, please see the link here.

# Delta tables
spark.sql("DROP TABLE IF EXISTS source")
spark.sql("CREATE TABLE source (ID STRING, Name STRING, Owner STRING, Description STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/source'")
spark.sql("TRUNCATE TABLE source")
source_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save('/mnt/vpa-raw-data-dev/POC/source')

from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.sql("DROP TABLE IF EXISTS target")
spark.sql("CREATE TABLE target (ID STRING, Name STRING, Owner STRING, Description STRING, RowStatus STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/target'")
spark.sql("TRUNCATE TABLE target")
target_df = target_df.withColumn('RowStatus', lit(None).cast(StringType()))
target_df.write.mode("overwrite").format("delta").save("/mnt/vpa-raw-data-dev/POC/target")

SCD Merge

We have to use two MERGE commands to cover the four scenarios (SCD Type 1, SCD Type 2, new rows, and unchanged rows), as a single MERGE command only supports one UPDATE and one INSERT clause.

%sql
-- Update SCD Type 2 rows (RowStatus = 2) and Insert Not Match rows (RowStatus = 3)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (NOT(source.Owner <=> target.Owner) OR NOT(source.Name <=> target.Name)) THEN
  UPDATE SET
    target.RowStatus = 2,
    target.Owner = source.Owner,
    target.Name = source.Name,
    target.Description = source.Description  
WHEN NOT MATCHED
  THEN INSERT (ID,Name,Owner,Description,RowStatus) VALUES (source.ID,source.Name,source.Owner,source.Description,3)
  
-- Merge SCD Type 1 update (RowStatus = 1)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (target.RowStatus IS NULL) AND (NOT(source.Description <=> target.Description)) THEN
  UPDATE SET
    target.RowStatus = 1,
    target.Description = source.Description

Check the merge result in the target table to verify that both the SCD Type 1 and Type 2 rows are updated as expected.
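A quick way to inspect the result, using the RowStatus convention above (1 = SCD Type 1 update, 2 = SCD Type 2 update, 3 = new row, NULL = unchanged):

# Review the merged rows and their RowStatus flags
spark.sql("SELECT ID, Name, Owner, Description, RowStatus FROM target ORDER BY Name").show(truncate = False)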

Update and Insert

Once the merged data set is in the target table, we need to use it to update the dimension tables, which typically live in a traditional database (SQL Server/MySQL…). Here we use Azure SQL Database as an example.

Spark JDBC doesn’t support the UPDATE command. One workaround is to use the Spark Connector. To achieve this, we first create one temporary table to store the SCD Type 1 and Type 2 rows, then update the dimension table from that temporary table through the Spark Connector.
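The Python snippets below assume that jdbcUrl and connectionProperties for the Azure SQL database have already been defined, for example like this (all values are placeholders):

# JDBC connection details for the Azure SQL database (placeholder values)
jdbcHostname = "<your-sql-server-name>.database.windows.net"
jdbcPort = 1433
jdbcDatabase = "<your-database-name>"
jdbcUrl = "jdbc:sqlserver://{0}:{1};database={2}".format(jdbcHostname, jdbcPort, jdbcDatabase)
connectionProperties = {
  "user": "<your-sql-username>",
  "password": "<your-sql-password>",
  "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}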

#Filter out SCD Type 1 and 2 rows from target Delta table, and save into one temp table in Azure SQL
scd12_df = spark.sql("SELECT ID, Name, Owner, Description, RowStatus FROM target WHERE ( RowStatus = 2 OR RowStatus = 1)")
scd12_df.write.mode("overwrite").jdbc(url = jdbcUrl, table = "Scd_tmp", properties = connectionProperties)
%scala
import org.apache.spark.sql.SQLContext
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
import com.microsoft.azure.sqldb.spark.connect._

//Update columns value for those SCD Type 1 change only row
val scd1_query = """
              |UPDATE Scd
              |SET Scd.Description = Scd_tmp.Description
              |FROM Scd
              |INNER JOIN Scd_tmp
              |ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '1';
            """.stripMargin

val scd1_config = Config(Map(
  "url"            -> "dummy url",
  "databaseName"   -> "dummy databaseName",
  "user"           -> "dummy user",
  "password"       -> "dummy pwd",
  "queryCustom"    -> scd1_query
))
sqlContext.sqlDBQuery(scd1_config)

//Update SCD Type 2 rows: set Active_Record to 0 and Record_EndDate to the current datetime.
val scd2_query2 = """
              |UPDATE Scd
              |SET Scd.Active_Record = '0', Scd.Record_EndDate = GETDATE()
              |FROM Scd
              |INNER JOIN Scd_tmp
              |ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '2';
            """.stripMargin

val scd2_config = Config(Map(
  "url"            -> "dummy url",
  "databaseName"   -> "dummy databaseName",
  "user"           -> "dummy user",
  "password"       -> "dummy pwd",
  "queryCustom"    -> scd1_query
))
sqlContext.sqlDBQuery(scd2_config)

Finally, insert the new rows from Spark into the SQL table through the JDBC API.

newinserted_df = spark.sql("SELECT ID, Name, Owner, Description FROM target WHERE RowStatus = '3'")
newinserted_df.write.mode("append").jdbc(url = jdbcUrl, table = "Scd", properties = connectionProperties)
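Optionally, as a quick sanity check, the dimension table can be read back over the same JDBC connection:

# Read the updated dimension table back to verify the SCD changes
scd_check_df = spark.read.jdbc(url = jdbcUrl, table = "Scd", properties = connectionProperties)
scd_check_df.show(truncate = False)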

Handling Excel Data in Azure Databricks

As of now, there is no built-in support in Spark for loading Excel data in the cloud (here we take Azure Databricks as the example). Based on our research, the following links are helpful:

https://stackoverflow.com/questions/44196741/how-to-construct-dataframe-from-a-excel-xls-xlsx-file-in-scala-spark

https://stackoverflow.com/questions/16888888/how-to-read-a-xlsx-file-using-the-pandas-library-in-ipython

This article describes some practice based on our recent projects.

Data Preparation

Sample MS Excel data is shown below; the workbook has two sheets: Companies and Users.

CompanyID | CompanyName | Description
100 | Comp1 | Desc1
101 | Comp2 | Desc2
102 | Comp3 | Desc2

Solution 1: Read Excel data using Pandas, then convert Pandas DataFrame into Spark DataFrame.

from datetime import datetime
from pyspark.sql.functions import *
from pyspark.sql.types import *
import pandas as pd

filepath = '/dbfs/xxx/xxx/Samples.xlsx'
companies_pd_df = pd.read_excel(filepath, sheet_name='Companies')
companiesSchema = StructType([
  StructField("CompanyID", IntegerType(), False),
  StructField("CompanyName", StringType(), False),
  StructField("Description", StringType(), False)])
companies_df = spark.createDataFrame(companies_pd_df, schema=companiesSchema)

Note that:

  1. While converting a Pandas DataFrame into a Spark DataFrame, we need to manually define the schema, otherwise the conversion will probably fail.
  2. In some cases, the created Spark DataFrame may contain dummy data or additional unnecessary rows. In that case, we can filter out those rows, e.g. companies_df = companies_df.filter(isnan("CompanyID") != True)
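Since the sample workbook has two sheets (Companies and Users), here is a small sketch of loading both in one call with Pandas; sheet_name=None returns a dict of DataFrames keyed by sheet name (the sheet names are taken from the sample above):

# Load every sheet in the workbook at once (dict keyed by sheet name)
all_sheets = pd.read_excel(filepath, sheet_name=None)
companies_pd_df = all_sheets['Companies']
users_pd_df = all_sheets['Users']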

Solution 2: Use Spark Excel.

More detailed documentation can be found here. A code sample in Azure Databricks:

%scala

import org.apache.spark.sql._
import org.apache.spark.sql.types._
import com.crealytics.spark.excel._

val companiesSchema = StructType(Array(
  StructField("CompanyID", IntegerType, nullable = true),
  StructField("CompanyName", StringType, nullable = true),
  StructField("Description", StringType, nullable = true)))
val filepath = "/mnt/xxx/xxx/Samples.xlsx"
val df = spark.read
    .format("com.crealytics.spark.excel")
    .option("sheetName", "Companies")
    .option("useHeader", "true")
    .schema(companiesSchema)
    .load(filepath)
df.show()

Note that:

  1. Currently the Spark Excel plugin is only available for Scala, not for Python yet. Hence we have to use the %scala magic command in a Python notebook; see the sketch after this list for handing the result back to Python.
  2. Errors/exceptions may happen with some versions. I have tested this successfully with version com.crealytics:spark-excel_2.11:0.12.0
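One common way to hand the result of the %scala cell back to Python is through a temporary view. A minimal sketch, assuming the Scala cell ends with df.createOrReplaceTempView("companies_excel") (a hypothetical view name):

# Pick up the DataFrame registered by the Scala cell in a Python cell
companies_df = spark.table("companies_excel")
companies_df.show()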

Batch Data Ingest with Azure Databricks

In the previous blog we introduced the basic steps of ingesting streaming data with Azure Databricks. Now we are going to describe how to do batch ingest with Azure Databricks. Most of the steps are similar; only the Spark DataFrame APIs differ.

Data Source

spark.conf.set(
  "your blob account key here.blob.core.windows.net",
  "your blob account key value here")
val df = spark.read.json("wasbs://yout container@your blob account.blob.core.windows.net/veracity_ai_prod_41f11b1f17bb43d49ba51beabf2dd0a9/Availability/2018-06-04/06")
val availabilitySchema = df.schema
print(availabilitySchema)

Please note that we use df.schema to get the availability log’s schema; the same approach works for other log formats, e.g. page views and events. It is very hard to define the schema manually in code.

Data Transform

Use spark.read to get a DataFrame from the input data. Please pay attention to the input folder structure: a path such as inputPath/2018-05-30/09 should be matched with a pattern like (inputPath + "/*/*/").

val availabilityDF = spark.read.schema(availabilitySchema)
.option("maxFilesTrigger", 1).json(inputPath + "/*/*/")

Use Spark SQL to do the transformation; it is much easier than transforming with the DataFrame API.

availabilityDF.createOrReplaceTempView("logs")
val sqlDF = spark
.sql("SELECT internal.data.id as ID, availability[0].testTimestamp as timestamp, availability[0].testName as name, availability[0].runLocation as location, availability[0].message as message, availability[0].durationMetric.value as duration FROM logs")
sqlDF.show()
sqlDF.printSchema()

Data Sink

Here we use Azure SQL Server as the persistence store for the transformed data.

First define the database configuration

val jdbcUsername = "Your SQL Server username"
val jdbcPassword = "Your SQL Server password"
val jdbcHostname = "Your SQL Server name.database.windows.net" //typically, this is in the form of servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase ="Availability"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};user=${jdbcUsername};password=${jdbcPassword}"

Setup connection properties for JDBC

import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")
val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)

Finally, use the write.jdbc API to persist the transformed data into Azure SQL Server.

sqlDF.write.jdbc(jdbcUrl, "aiavailability3", connectionProperties)

Introduction

Recently I have been working on a platform analytics solution for veracity.com, the open industry data platform from DNV GL. Data ingest is one essential part of the whole end-to-end data analytics flow.

Application Insights (AI) logs are one of the key data sources, hence we need to build a streaming analytics solution for such real-time data sources. Here we take availability log data as the example; you can also use other AI data sources (PageViews, …).

Export the data

There is a nice document for enabling continuous export from Application Insights.

Create a storage account

When you are creating it, I suggest using "Standard-LRS" with the Cool tier. This is simply because we use the storage account as a temporary storage place. (We will move into a VERACITY container for storage in production.)

Configure the export settings

In this case, we are exporting one type:

  • Availability (one source of in-production monitoring data)

Once this is done and the telemetry data has started exporting, you should be able to see folders in the blob container of the storage account. One folder maps to one data type in the export settings above.

Take a closer look at the folder content. The screenshot below shows the PageViews data between 12am and 1pm on the 28th of September.

Please note that:

  • The date and time are UTC and are when the telemetry was deposited in the store – not the time it was generated.
  • The folders under Availability are stored in yyyy-mm-dd/hh/ structure.
  • Each blob is a text file that contains multiple ‘\n’-separated rows.
  • Each blob contains the telemetry processed over a time period of roughly half a minute, which means each folder will have about 100 to 120 files.
  • Each row represents a telemetry data point such as a request or page view.
  • Each row is a JSON document. The detailed data structure can be found here.

With the handy Azure Storage Explorer, it is pretty easy to check the content of the blob files.

Please note that Application Insights also implements the same logic as IP anonymization in Google Analytics: for all IP addresses collected by Application Insights, the last octet is anonymized to 0 (you can see this highlighted in the screenshot above).

Read the data from blob

There are many ways to transfer the data out of the storage account. Here we are going to use Azure Databricks.

Add one Azure Databricks service following here.

Add one cluster, choose the basic one following here.

Add one notebook, named for example "Streaming Data Ingest".

There are two ways for Azure Databricks to read data from blob storage: Databricks can read the blob storage directly through the HDFS API, or it can mount the blob storage container into the Databricks file system. See more information about how to access Blob Storage here. We choose the second approach.

Mount the storage folder into the Databricks File System (DBFS). To mount a Blob storage container or a folder inside a container, use the following command:

dbutils.fs.mount(
  source = "wasbs://<your-container-name>@<your-storage-account-name>.blob.core.windows.net/<your-directory-name>",
  mountPoint = "/mnt/export",
  extraConfigs = Map("<conf-key>" -> "<conf-value>"))

Then you can check the folder data with command:

%fs ls /mnt

Initialize SparkSession:

import org.apache.spark.sql.functions._
import org.apache.spark.sql.SparkSession

val spark = SparkSession
.builder
.appName("BlobStorageStreaming")
.getOrCreate()

import spark.implicits._

In Spark 2.0+ we prefer to use Structured Streaming (the DataFrame/Dataset API) rather than the Spark Core API. However, the availability log data is deeply nested JSON with several hierarchy levels, which is not easy to define programmatically as a StructType. The trick here is to first read one blob file to get the schema, then reuse this schema when we create the structured stream.

spark.conf.set(
"fs.azure.account.key.<your-storage-account-name>.blob.core.windows.net",
"<your key>")
val df = spark.read.json("wasbs://<your container>@<your-storage-account-name>.core.windows.net/veracity_ai_prod_41f11b1f17bb43d49ba51beabf2dd0a9/Availability/2018-06-04/06")
val availabilitySchema = df.schema
print(availabilitySchema)

We use Structured Streaming here to access the files in the blob folder, emulating a stream by reading one file at a time, in the chronological order in which they were created.

//Use file sink to store output data into blob storage again.
val inputPath = "/mnt/export"
val outputPath = "/mnt/output"
val streamingInput = spark.readStream.schema(availabilitySchema).option("maxFilesPerTrigger", 1).json(inputPath + "/*/*/")
streamingInput.isStreaming
streamingInput.createOrReplaceTempView("logs")

Transform the data

Note that we do not need all of the availability data fields for analytics. Here we only pick the fields that we need, as below (the needed fields are highlighted).

We use SQL to do the transformation and then convert the DataFrame into a Dataset, which is suggested if you need to do more analytics afterwards. In Structured Streaming, the query is a handle to the streaming query that is running in the background.

val sqlDF = spark.sql("SELECT internal.data.id as ID, availability[0].testTimestamp as timestamp, availability[0].testName as name, availability[0].runLocation as location, availability[0].message as message, availability[0].durationMetric.value as duration FROM logs")
val sqlDS = sqlDF.as[(String, String, String, String, String, Double)]
//sqlDF.show()
//sqlDF.printSchema()
val query2 = 
sqlDS
.writeStream
.format("csv")
.option("path", outputPath)
.option("checkpointLocation", outputPath + "/checkpoint/")
.queryName("tblAvailability2")
.start()

Check the output folder in blob storage to make sure that the files are handled one by one continuously (in micro-batches).

Note that at the time of writing this article, Structured Streaming does not support a JDBC sink natively, hence we need to implement a JDBCSink ourselves.
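As a hedged alternative sketch (not the original Scala JDBCSink approach): on newer runtimes with Spark 2.4+, the foreachBatch sink can reuse the batch write.jdbc API for each micro-batch. Here availability_stream_df is assumed to be the PySpark equivalent of the streaming Dataset sqlDS above, and the table name, jdbcUrl and connectionProperties are placeholders as in the batch ingest example:

# Write each micro-batch of the streaming query to Azure SQL over JDBC (Spark 2.4+ foreachBatch)
def write_micro_batch(batch_df, batch_id):
    batch_df.write.mode("append").jdbc(url = jdbcUrl, table = "aiavailability_stream", properties = connectionProperties)

query = (availability_stream_df.writeStream
    .foreachBatch(write_micro_batch)
    .option("checkpointLocation", outputPath + "/jdbc_checkpoint/")
    .start())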