In the previous blog we introduced the basic steps of ingesting streaming data with Azure Databricks. Now we are going to describe how to do batch ingest with Azure Databricks. Most of the steps are similar; only the Spark DataFrame APIs differ.
Data Source
spark.conf.set(
  "fs.azure.account.key.your blob account.blob.core.windows.net",
  "your blob account key value here")

val df = spark.read.json("wasbs://your container@your blob account.blob.core.windows.net/veracity_ai_prod_41f11b1f17bb43d49ba51beabf2dd0a9/Availability/2018-06-04/06")
val availabilitySchema = df.schema
print(availabilitySchema)
Please note that we use df.schema to get the availability log’s schema, and the same applies to the other log formats, e.g. page views and events. It is very hard to write the schema out manually.
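Since inferring the schema means reading sample data each time, one option (not part of the original steps, just a sketch using Spark’s built-in schema JSON support) is to serialize the inferred schema once and reload it later:

import org.apache.spark.sql.types.{DataType, StructType}

// Serialize the inferred schema to a JSON string so it can be stored,
// e.g. in a notebook cell or a config file.
val schemaJson = availabilitySchema.json

// Later, rebuild the StructType from the stored JSON instead of
// re-reading sample data to infer it.
val restoredSchema = DataType.fromJson(schemaJson).asInstanceOf[StructType]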
Data Transform
Use spark.read to get the DataFrame from the input data. Pay attention to the input path layout: a path like inputPath/2018-05-30/09 should be matched with a glob such as (inputPath + "/*/*/").
val availabilityDF = spark.read
  .schema(availabilitySchema)
  .json(inputPath + "/*/*/")
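The same glob mechanism can narrow the read to a subset of the data; a minimal sketch, assuming the date-partitioned folder layout shown above (e.g. 2018-05-30/09):

// Read all hours for a single day only.
val oneDayDF = spark.read
  .schema(availabilitySchema)
  .json(inputPath + "/2018-05-30/*/")
println(oneDayDF.count())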
Use Spark SQL to do the transform; it is much easier than transforming with the DataFrame API.
availabilityDF.createOrReplaceTempView("logs")

val sqlDF = spark.sql("""
  SELECT internal.data.id as ID,
         availability[0].testTimestamp as timestamp,
         availability[0].testName as name,
         availability[0].runLocation as location,
         availability[0].message as message,
         availability[0].durationMetric.value as duration
  FROM logs""")

sqlDF.show()
sqlDF.printSchema()
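For comparison, the same flattening expressed with the DataFrame API would look roughly like this (a sketch; the nested field names are taken from the query above):

import org.apache.spark.sql.functions.col

// Equivalent transform with the DataFrame API; the SQL version above
// reads more naturally for this kind of nested-field flattening.
val dfApiDF = availabilityDF.select(
  col("internal.data.id").as("ID"),
  col("availability")(0)("testTimestamp").as("timestamp"),
  col("availability")(0)("testName").as("name"),
  col("availability")(0)("message").as("message"),
  col("availability")(0)("runLocation").as("location"),
  col("availability")(0)("durationMetric")("value").as("duration")
)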
Data Sink
Here we use Azure SQL Server to persist the transformed data.
First, define the database configuration:
val jdbcUsername = "Your SQL Server username"
val jdbcPassword = "Your SQL Server password"
val jdbcHostname = "Your SQL Server name.database.windows.net" // typically, this is in the form of servername.database.windows.net
val jdbcPort = 1433
val jdbcDatabase = "Availability"
val driver = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
val jdbcUrl = s"jdbc:sqlserver://${jdbcHostname}:${jdbcPort};database=${jdbcDatabase};user=${jdbcUsername};password=${jdbcPassword}"
Set up the connection properties for JDBC:
import java.util.Properties

val connectionProperties = new Properties()
connectionProperties.put("user", s"${jdbcUsername}")
connectionProperties.put("password", s"${jdbcPassword}")

val driverClass = "com.microsoft.sqlserver.jdbc.SQLServerDriver"
connectionProperties.setProperty("Driver", driverClass)
Finally, use the write.jdbc API to persist the transformed data into Azure SQL Server, quite cool.
sqlDF.write.jdbc(jdbcUrl, "aiavailability3", connectionProperties)
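By default, write.jdbc fails if the target table already exists. For repeated runs a save mode can be set; a minimal sketch, assuming the table may already contain data:

import org.apache.spark.sql.SaveMode

// Append to the existing table instead of failing;
// SaveMode.Overwrite would replace the table contents entirely.
sqlDF.write
  .mode(SaveMode.Append)
  .jdbc(jdbcUrl, "aiavailability3", connectionProperties)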