Kaggle Allstate Competition in Azure Databricks

In this post, we describe how to work through a Kaggle competition with Azure Databricks. Compared with running the training and tuning phases on a local machine or a single server, training the model on Azure Databricks with Spark can be much faster.

Kaggle Allstate Claims Severity

When you’ve been devastated by a serious car accident, your focus is on the things that matter the most: family, friends, and other loved ones. Pushing paper with your insurance agent is the last place you want your time or mental energy spent. This is why Allstate, a personal insurer in the United States, is continually seeking fresh ideas to improve their claims service for the over 16 million households they protect.

Allstate is currently developing automated methods of predicting the cost, and hence severity, of claims. In this recruitment challenge, Kagglers are invited to show off their creativity and flex their technical chops by creating an algorithm which accurately predicts claims severity. Aspiring competitors will demonstrate insight into better ways to predict claims severity for the chance to be part of Allstate’s efforts to ensure a worry-free customer experience.

Data and Preparation

Download the data from https://www.kaggle.com/c/allstate-claims-severity/data. Upload the data files to DBFS or Azure Blob Storage, then read the training data and test data into DataFrames.

import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RegressionMetrics
print("Read and load data started...")
trainInput = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("dbfs:/yourpath/train.csv")
.cache())

testInput = (spark.read
.option("header", "true")
.option("inferSchema", "true")
.csv("dbfs:/yourpath/test.csv")
.cache())
print("Read and load data completed...")
data = trainInput.withColumnRenamed("loss", "label")
[trainingData, validationData] = data.randomSplit([0.7, 0.3])
trainingData.cache()
validationData.cache()
testData = testInput.cache()

Note that you should replace the dbfs paths above with your own. Spark DataFrames, Datasets and RDDs that will be reused are worth caching. Here we cache trainingData, validationData and testData.

Modeling and Training
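
The pipeline code that follows relies on a few small column-name rules: columns starting with "cat" are indexed into "idx_" columns, a handful of high-cardinality categorical columns are dropped, and "id" and "label" are excluded from the features. As a minimal, Spark-free sketch of those rules, the snippet below applies the same patterns to a short, made-up list of column names. The list `sample_columns` and the snake_case function names are illustrative assumptions, not part of the original notebook.

```python
import re

# Hypothetical sample of column names; the real train.csv has many more columns.
sample_columns = ["id", "cat1", "cat2", "cat109", "cat116", "cont1", "cont2", "label"]


def is_categ(c):
    # Categorical columns are recognised purely by their name prefix.
    return c.startswith("cat")


def categ_new_col(c):
    # Indexed categorical columns get an "idx_" prefix; other names pass through.
    return "idx_{0}".format(c) if is_categ(c) else c


def remove_too_many_categs(c):
    # Drop the categorical columns that the post excludes by name.
    return not re.match(r"cat(109$|110$|112$|113$|116$)", c)


def only_feature_cols(c):
    # Exclude the row identifier and the target column.
    return not re.match(r"id|label", c)


# list(...) materialises the result. In Python 3 a bare filter/map object
# can be iterated only once and has no append().
categorical_cols = list(filter(is_categ, sample_columns))
feature_cols = [categ_new_col(c) for c in sample_columns
                if only_feature_cols(c) and remove_too_many_categs(c)]

print(categorical_cols)
print(feature_cols)
```

Note that a StringIndexer is still fitted for every "cat" column, including the ones that are later left out of the feature vector. Only the assembled feature list skips them.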

print("Feature engineering...")
print("Handle categories data...")
# Use StringIndexer or OneHotEncoder for categories columns
isCateg = lambda c: c.startswith("cat")
categNewCol = lambda c: "idx_{0}".format(c) if (isCateg(c)) else c

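# Fit each StringIndexer on train + test so categories that appear only in test are indexed too.
# list(...) is required in Python 3, where map() returns a one-shot iterator without append().
stringIndexerStages = list(map(lambda c: StringIndexer(inputCol=c, outputCol=categNewCol(c))
    .fit(trainInput.select(c).union(testInput.select(c))), filter(isCateg, trainingData.columns)))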

removeTooManyCategs = lambda c: not re.match(r"cat(109$|110$|112$|113$|116$)", c)

# Keep those feature columns only
onlyFeatureCols = lambda c: not re.match(r"id|label", c)

# Materialise as a list: VectorAssembler needs a list, and featureCols is reused later
featureCols = list(map(categNewCol,
    filter(onlyFeatureCols,
        filter(removeTooManyCategs,
            trainingData.columns))))

# Assemble features
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
print("Features generation and assembly completed...")
print("Building Random Forest for regression..")
algo = RandomForestRegressor(featuresCol="features", labelCol="label")

stages = stringIndexerStages
stages.append(assembler)
stages.append(algo)

#Build pipeline
pipeline = Pipeline(stages=stages)
print("K fold cross validation...")
numTrees = [5, 20]
maxDepth = [4, 6]
maxBins = [32]
numFolds = 3

paramGrid = (ParamGridBuilder()
.addGrid(algo.numTrees, numTrees)
.addGrid(algo.maxDepth, maxDepth)
.addGrid(algo.maxBins, maxBins)
.build())

cv = CrossValidator(estimator=pipeline,
evaluator=RegressionEvaluator(),
estimatorParamMaps=paramGrid,
numFolds=numFolds)

cvModel = cv.fit(trainingData)
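
To get a feel for how much work the cross-validation step does, it helps to count the model fits. The sketch below is plain Python rather than Spark code. It enumerates the same parameter values used above and counts one fit per parameter combination per fold, plus the final refit of the best combination on the full training set that CrossValidator performs. The variable names are illustrative.

```python
from itertools import product

# Same candidate values as in the grid above.
num_trees = [5, 20]
max_depth = [4, 6]
max_bins = [32]
num_folds = 3

# Every combination of the candidate values forms one entry of the grid.
grid = [{"numTrees": t, "maxDepth": d, "maxBins": b}
        for t, d, b in product(num_trees, max_depth, max_bins)]

# Each grid entry is trained once per fold during cross-validation.
fits_during_cv = len(grid) * num_folds

# The best entry is then refitted once on the whole training set.
total_fits = fits_during_cv + 1

print("grid size:", len(grid))
print("fits during CV:", fits_during_cv)
print("total fits:", total_fits)
```

Adding more values to any parameter list multiplies the grid size, so the training time grows quickly as the grid is extended.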

Results Metrics and Prediction

# RegressionMetrics expects (prediction, observation) pairs, in that order
trainPredictionsAndLabels = cvModel.transform(trainingData).select("prediction", "label").rdd

validPredictionsAndLabels = cvModel.transform(validationData).select("prediction", "label").rdd

trainRegressionMetrics = RegressionMetrics(trainPredictionsAndLabels)
validRegressionMetrics = RegressionMetrics(validPredictionsAndLabels)

bestModel = cvModel.bestModel
featureImportances = bestModel.stages[-1].featureImportances.toArray()

print("TrainingData count: {0}".format(trainingData.count()))
print("ValidationData count: {0}".format(validationData.count()))
print("TestData count: {0}".format(testData.count()))
print("=====================================================================")
print("Param algoNumTrees = {0}".format(",".join(map(lambda x:str(x), numTrees))))
print("Param algoMaxDepth = {0}".format(",".join(map(lambda x:str(x), maxDepth))))
print("Param algoMaxBins = {0}".format(",".join(map(lambda x:str(x), maxBins))))
print("Param numFolds = {0}".format(numFolds))
print("=====================================================================\n")
print("Training data MSE = {0}".format(trainRegressionMetrics.meanSquaredError))
print("Training data RMSE = {0}".format(trainRegressionMetrics.rootMeanSquaredError))
print("Training data R-squared = {0}".format(trainRegressionMetrics.r2))
print("Training data MAE = {0}".format(trainRegressionMetrics.meanAbsoluteError))
print("Training data Explained variance = {0}".format(trainRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Validation data MSE = {0}".format(validRegressionMetrics.meanSquaredError))
print("Validation data RMSE = {0}".format(validRegressionMetrics.rootMeanSquaredError))
print("Validation data R-squared = {0}".format(validRegressionMetrics.r2))
print("Validation data MAE = {0}".format(validRegressionMetrics.meanAbsoluteError))
print("Validation data Explained variance = {0}".format(validRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Feature importances:\n{0}\n".format("\n".join(map(lambda z: "{0} = {1}".format(str(z[0]),str(z[1])), zip(featureCols, featureImportances)))))
cvModel.transform(testData)\
.select("id", "prediction")\
.withColumnRenamed("prediction", "loss")\
.coalesce(1)\
.write.format("csv")\
.option("header", "true")\
.save("dbfs:/yourpath/rf_sub.csv")

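Finally, if you want to submit your results, you can upload the result file to Kaggle. Note that Spark saves to a directory named rf_sub.csv rather than to a single file. Because of coalesce(1), that directory contains one part file, which is the file to upload.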
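
When reading the metrics printed in the results section above, keep in mind that Spark's RegressionMetrics treats each pair as (prediction, observation). MAE, MSE and RMSE do not depend on that order, but R-squared does. The competition itself is scored on MAE. The following is a small plain-Python sketch, with made-up numbers and a hypothetical helper `regression_summary`, that shows the effect. It is not Spark's implementation.

```python
import math


def regression_summary(predictions, labels):
    """Compute MAE, RMSE and R-squared for paired predictions and labels."""
    n = len(labels)
    errors = [p - y for p, y in zip(predictions, labels)]
    mae = sum(abs(e) for e in errors) / n
    mse = sum(e * e for e in errors) / n
    mean_label = sum(labels) / n
    # Total sum of squares is measured around the mean of the labels.
    ss_tot = sum((y - mean_label) ** 2 for y in labels)
    ss_res = sum(e * e for e in errors)
    return {"mae": mae, "rmse": math.sqrt(mse), "r2": 1.0 - ss_res / ss_tot}


# Made-up values for illustration only.
labels = [1.0, 2.0, 3.0, 4.0]
predictions = [2.0, 2.0, 3.0, 3.0]

correct = regression_summary(predictions, labels)
swapped = regression_summary(labels, predictions)  # arguments in the wrong order

print("correct order:", correct)
print("swapped order:", swapped)
```

With these numbers the MAE and RMSE are identical in both calls, while R-squared changes sign, so a swapped column order can go unnoticed if only the error metrics are checked.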