Tag Archives: Spark Connector

SCD Implementation with Databricks Delta

Slowly Changing Dimensions (SCD) are the most commonly used advanced dimensional technique used in dimensional data warehouses. Slowly changing dimensions are used when you wish to capture the data changes (CDC) within the dimension over time. Two typical SCD scenarios: SCD Type 1 and SCD Type 2. Type 1 – For this type of slowly changing dimension you simply overwrite the existing data values with new data values. Type 2 – This is the most commonly used type of slowly changing dimension. For this type of slowly changing dimension, add a new record encompassing the change and mark the old record as inactive.  In this blog, we are going to describe how we implement SCD Type 1 and SCD Type 2 with Azure Databricks. SCD Type 1&2 are newly supported by Databricks Delta. Please see the office document link for the command.

Data Preparation

Original Records: target.csv

IDNameOwnerDescription
G87D744D-345T-46AD-BD9D-B18CB66345YTProduct0user0@dummy.comProduct0 Desc
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1Product1user1@dummy.comProduct1 Desc
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3Product2user2@dummy.comProduct2 Desc
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256Product3user3@dummy.comProduct3 Desc

Update Records: source.csv

IDNameOwnerDescription
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1Product1user1@dummy.comProduct1 Description Changed Only
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3Product2userchanged@dummy.comProduct2 Owner and Desciption Changed
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256Product3user3@dummy.comProduct3 Description
2A6CE7F2-4C6F-41DF-9819-235021DC1226Product4user4@dummy.comNew Product

ID: Business Key column; Name: SCD Type 2 column; Owner: SCD Type 2 column; Descriptio: SCD Type 1 column.

Expected result:

Product0 row will remain the same; Product1 row only description will be updated; Original Product2 row will be inactive, and new row of Product2 is added; Product3 will remain the same; Product4 new row will be added.

Data Load and Transformation

Upload the targer.csv and source.csv into cloud, e.g. Azure blob storage or DBFS. Then load the two data files into two DataFrames.

# Load Data
target_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/target.csv")
source_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/source.csv")

In order to use MERGE command, we need save to two DataFrames into Delta table. More info about Delta table, please see the link here.

# Delta tables
spark.sql("DROP TABLE IF EXISTS source")
spark.sql("CREATE TABLE source (ID STRING, Name STRING, Owner STRING, Description STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/source'")
spark.sql("TRUNCATE TABLE source")
source_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save('/mnt/vpa-raw-data-dev/POC/source')

from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.sql("DROP TABLE IF EXISTS target")
spark.sql("CREATE TABLE target (ID STRING, Name STRING, Owner STRING, Description STRING, RowStatus STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/target'")
spark.sql("TRUNCATE TABLE target")
target_df = target_df.withColumn('RowStatus', lit(None).cast(StringType()))
target_df.write.mode("overwrite").format("delta").save("/mnt/vpa-raw-data-dev/POC/target")

SCD Merge

We have to use two MERGE command to cover the four scenarios: SCD 1, SCD 2, New rows, and unchanged rows, as one MERGE command only support one UPDATE/INSERT within.

%sql
-- Update SCD Type 2 rows (RowStatus = 2) and Insert Not Match rows (RowStatus = 3)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (source.Owner <> target.Owner OR source.Name <> target.Name) THEN
  UPDATE SET
    target.RowStatus = 2,
    target.Owner = source.Owner,
    target.Name = source.Name,
    target.Description = source.Description  
WHEN NOT MATCHED
  THEN INSERT (ID,Name,Owner,Description,RowStatus) VALUES (source.ID,source.Name,source.Owner,source.Description,3)
  
-- Merge SCD Type 1 update (RowStatus = 1)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (target.RowStatus IS NULL) AND (source.Description <> target.Description) THEN
  UPDATE SET
    target.RowStatus = 1,
    target.Description = source.Description

Check the merge result in target table if both SCD Type 1 and 2 update as expected.

Update and Insert

When the merged data set in target table, we need use this target dataset to update the dimension tables, probably in traditional database(SQL Server/MySQL…). Here we use Azure SQL Database as an example.

Spark JDBC doesn’t support Update command. One workaround is use Spark Connector. In order to achieve this, we need create one temporary table to store those SCD Type 1 and Type 2 rows. Then update the dimension table with the temporary table through Spark Connector.

#Filter out SCD Type 1 and 2 rows from target Delta table, and save into one temp table in Azure SQL
scd12_df = spark.sql("SELECT ID, Name, Owner, Description, RowStatus FROM target WHERE ( RowStatus = 2 OR RowStatus = 1)")
scd12_df.write.mode("overwrite").jdbc(url = jdbcUrl, table = "Scd_tmp", properties = connectionProperties)
%scala
import org.apache.spark.sql.SQLContext
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
import com.microsoft.azure.sqldb.spark.connect._

//Update columns value for those SCD Type 1 change only row
val scd1_query = """
              |UPDATE Scd
              |SET Scd.Description = Scd_tmp.Description
              |FROM Scd
              |INNER JOIN Scd_tmp
              |ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '1';
            """.stripMargin

val scd1_config = Config(Map(
  "url"            -> "dummy url",
  "databaseName"   -> "dummy databaseName",
  "user"           -> "dummy user",
  "password"       -> "dummy pwd",
  "queryCustom"    -> scd1_query
))
sqlContext.sqlDBQuery(scd1_config)

//Update SCD Type 2 row: Set Active_Record as 0, and Record_EndDate as current datatime.
val scd2_query2 = """
              |UPDATE Scd
              |SET Scd.Active_Record = '0', Scd.Record_EndDate = GETDATE()
              |FROM Scd
              |INNER JOIN Scd_tmp
              |ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '2';
            """.stripMargin

val scd2_config = Config(Map(
  "url"            -> "dummy url",
  "databaseName"   -> "dummy databaseName",
  "user"           -> "dummy user",
  "password"       -> "dummy pwd",
  "queryCustom"    -> scd1_query
))
sqlContext.sqlDBQuery(scd2_config)

Finally insert the those from Spark into SQL tables through JDBC API.

newinserted_df = spark.sql("SELECT ID, Name, Owner, Description FROM target WHERE RowStatus = '3'")
newinserted_df.write.mode("append").jdbc(url = jdbcUrl, table = "Scd", properties = connectionProperties)