Slowly Changing Dimensions (SCD) are the most commonly used advanced technique in dimensional data warehouses. They are used when you want to capture data changes (CDC) within a dimension over time. There are two typical SCD scenarios:
Type 1: simply overwrite the existing data values with the new values.
Type 2: the most commonly used type; add a new record encompassing the change and mark the old record as inactive.
In this blog, we describe how to implement SCD Type 1 and SCD Type 2 with Azure Databricks. SCD Type 1 and 2 are newly supported by Databricks Delta; please see the official documentation link for the MERGE command.
Data Preparation
Original Records: target.csv
ID | Name | Owner | Description |
G87D744D-345T-46AD-BD9D-B18CB66345YT | Product0 | user0@dummy.com | Product0 Desc |
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1 | Product1 | user1@dummy.com | Product1 Desc |
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3 | Product2 | user2@dummy.com | Product2 Desc |
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256 | Product3 | user3@dummy.com | Product3 Desc |
Update Records: source.csv
ID | Name | Owner | Description |
D87D7FFD-E03B-46AD-BD9D-B18CB6632DC1 | Product1 | user1@dummy.com | Product1 Description Changed Only |
FF7D7FFD-E03B-46AD-BD9D-B18CB6632WW3 | Product2 | userchanged@dummy.com | Product2 Owner and Description Changed |
TT7D7FFD-E03B-46AD-BD9D-B18CB6632256 | Product3 | user3@dummy.com | Product3 Desc |
2A6CE7F2-4C6F-41DF-9819-235021DC1226 | Product4 | user4@dummy.com | New Product |
ID: Business Key column; Name: SCD Type 2 column; Owner: SCD Type 2 column; Description: SCD Type 1 column.
Expected result:
The Product0 row will remain the same; for Product1, only the description will be updated; the original Product2 row will become inactive and a new Product2 row will be added; Product3 will remain the same; a new Product4 row will be added.
Data Load and Transformation
Upload target.csv and source.csv to cloud storage, e.g. Azure Blob Storage or DBFS. Then load the two data files into two DataFrames.
# Load Data
target_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/target.csv")
source_df = spark.read.option("sep", ",").option("header", "true").option("inferSchema", "true").csv("/mnt/vpa-raw-data-dev/POC/source.csv")
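To confirm the files loaded as expected, here is a quick optional check:
# Optional sanity check: inspect the schema and rows of the loaded DataFrames
target_df.printSchema()
target_df.show(truncate=False)
source_df.show(truncate=False)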
In order to use the MERGE command, we need to save the two DataFrames as Delta tables. For more info about Delta tables, please see the link here.
# Delta tables
spark.sql("DROP TABLE IF EXISTS source")
spark.sql("CREATE TABLE source (ID STRING, Name STRING, Owner STRING, Description STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/source'")
spark.sql("TRUNCATE TABLE source")
source_df.write.format("delta").mode("overwrite").option("mergeSchema", "true").save('/mnt/vpa-raw-data-dev/POC/source')
from pyspark.sql.functions import *
from pyspark.sql.types import *
spark.sql("DROP TABLE IF EXISTS target")
spark.sql("CREATE TABLE target (ID STRING, Name STRING, Owner STRING, Description STRING, RowStatus STRING) USING DELTA LOCATION '/mnt/vpa-raw-data-dev/POC/target'")
spark.sql("TRUNCATE TABLE target")
target_df = target_df.withColumn('RowStatus', lit(None).cast(StringType()))
target_df.write.mode("overwrite").format("delta").save("/mnt/vpa-raw-data-dev/POC/target")
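At this point both Delta tables are created and populated. As a quick sanity check (optional sketch):
# Optional: confirm the Delta tables are registered and populated
spark.sql("SELECT COUNT(*) AS cnt FROM source").show()
spark.sql("SELECT COUNT(*) AS cnt FROM target").show()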
SCD Merge
We have to use two MERGE commands to cover the four scenarios (SCD Type 1, SCD Type 2, new rows, and unchanged rows), as a single MERGE command supports only one UPDATE and one INSERT clause.

%sql
-- Update SCD Type 2 rows (RowStatus = 2) and insert unmatched rows (RowStatus = 3)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (NOT(source.Owner <=> target.Owner) OR NOT(source.Name <=> target.Name)) THEN
UPDATE SET
target.RowStatus = 2,
target.Owner = source.Owner,
target.Name = source.Name,
target.Description = source.Description
WHEN NOT MATCHED
THEN INSERT (ID, Name, Owner, Description, RowStatus) VALUES (source.ID, source.Name, source.Owner, source.Description, 3);
-- Merge SCD Type 1 update (RowStatus = 1)
MERGE INTO target
USING source
ON target.ID = source.ID
WHEN MATCHED AND (target.RowStatus IS NULL) AND (NOT(source.Description <=> target.Description)) THEN
UPDATE SET
target.RowStatus = 1,
target.Description = source.Description
Check the merge result in the target table to confirm that both the SCD Type 1 and Type 2 rows were updated as expected.
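For example, one quick way to inspect the outcome (a sketch; RowStatus follows the convention above: 1 = SCD Type 1 update, 2 = SCD Type 2 update, 3 = new row, NULL = unchanged):
# Count rows per RowStatus value to confirm the merge behaved as expected
spark.sql("SELECT RowStatus, COUNT(*) AS cnt FROM target GROUP BY RowStatus ORDER BY RowStatus").show()
spark.sql("SELECT * FROM target ORDER BY Name").show(truncate=False)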

Update and Insert
Once the merged data set is in the target table, we need to use it to update the dimension table, which typically lives in a traditional database (SQL Server, MySQL, …). Here we use Azure SQL Database as an example.

Spark JDBC doesn't support the UPDATE command. One workaround is to use the Spark Connector. To achieve this, we first create a temporary table to store the SCD Type 1 and Type 2 rows, and then update the dimension table from that temporary table through the Spark Connector.
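The JDBC snippets below assume that jdbcUrl and connectionProperties have already been defined; a minimal sketch with placeholder values might look like this:
# Hypothetical JDBC connection setup: replace the placeholder values with your own
jdbcUrl = "jdbc:sqlserver://dummyserver.database.windows.net:1433;database=dummydb"
connectionProperties = {
  "user": "dummy user",
  "password": "dummy pwd",
  "driver": "com.microsoft.sqlserver.jdbc.SQLServerDriver"
}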
# Filter out the SCD Type 1 and Type 2 rows from the target Delta table, and save them into a temp table in Azure SQL
scd12_df = spark.sql("SELECT ID, Name, Owner, Description, RowStatus FROM target WHERE ( RowStatus = 2 OR RowStatus = 1)")
scd12_df.write.mode("overwrite").jdbc(url = jdbcUrl, table = "Scd_tmp", properties = connectionProperties)
%scala
import org.apache.spark.sql.SQLContext
import com.microsoft.azure.sqldb.spark.config.Config
import com.microsoft.azure.sqldb.spark.query._
import com.microsoft.azure.sqldb.spark.connect._
// Update column values for rows with SCD Type 1 changes only
val scd1_query = """
|UPDATE Scd
|SET Scd.Description = Scd_tmp.Description
|FROM Scd
|INNER JOIN Scd_tmp
|ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '1';
""".stripMargin
val scd1_config = Config(Map(
"url" -> "dummy url",
"databaseName" -> "dummy databaseName",
"user" -> "dummy user",
"password" -> "dummy pwd",
"queryCustom" -> scd1_query
))
sqlContext.sqlDBQuery(scd1_config)
// Update SCD Type 2 rows: set Active_Record to 0 and Record_EndDate to the current datetime
val scd2_query2 = """
|UPDATE Scd
|SET Scd.Active_Record = '0', Scd.Record_EndDate = GETDATE()
|FROM Scd
|INNER JOIN Scd_tmp
|ON Scd.ID = Scd_tmp.ID AND Scd_tmp.RowStatus = '2';
""".stripMargin
val scd2_config = Config(Map(
"url" -> "dummy url",
"databaseName" -> "dummy databaseName",
"user" -> "dummy user",
"password" -> "dummy pwd",
"queryCustom" -> scd1_query
))
sqlContext.sqlDBQuery(scd2_config)
Finally, insert the new rows from Spark into the SQL table through the JDBC API.
newinserted_df = spark.sql("SELECT ID, Name, Owner, Description FROM target WHERE RowStatus = '3'")
newinserted_df.write.mode("append").jdbc(url = jdbcUrl, table = "Scd", properties = connectionProperties)
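To verify the end-to-end result, the dimension table can be read back over the same JDBC connection (optional check):
# Optional: read the Scd table back to confirm the updates and inserts landed
scd_check_df = spark.read.jdbc(url = jdbcUrl, table = "Scd", properties = connectionProperties)
scd_check_df.show(truncate=False)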