
Kaggle Allstate Competition in Azure Databricks

In this post we describe how to work through a Kaggle competition in Azure Databricks. Compared to running the training and tuning phases on a local machine or a single server, training the model on Azure Databricks with Spark is considerably faster.

Kaggle Allstate Claims Severity

When you’ve been devastated by a serious car accident, your focus is on the things that matter the most: family, friends, and other loved ones. Pushing paper with your insurance agent is the last place you want your time or mental energy spent. This is why Allstate, a personal insurer in the United States, is continually seeking fresh ideas to improve their claims service for the over 16 million households they protect.

Allstate is currently developing automated methods of predicting the cost, and hence severity, of claims. In this recruitment challenge, Kagglers are invited to show off their creativity and flex their technical chops by creating an algorithm which accurately predicts claims severity. Aspiring competitors will demonstrate insight into better ways to predict claims severity for the chance to be part of Allstate’s efforts to ensure a worry-free customer experience.

Data and Preparation

Download the data from https://www.kaggle.com/c/allstate-claims-severity/data. Upload the data files to DBFS or Azure Blob Storage, then read the training and test data into DataFrames.

import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RegressionMetrics
print("Read and load data started...")
trainInput = (spark.read
              .option("header", "true")
              .option("inferSchema", "true")
              .csv("dbfs:/yourpath/train.csv")
              .cache())

testInput = (spark.read
             .option("header", "true")
             .option("inferSchema", "true")
             .csv("dbfs:/yourpath/test.csv")
             .cache())
print("Read and load data completed...")
data = trainInput.withColumnRenamed("loss", "label")
[trainingData, validationData] = data.randomSplit([0.7, 0.3])
trainingData.cache()
validationData.cache()
testData = testInput.cache()

Please note that you should replace the DBFS paths above with your own. For Spark DataFrames/Datasets/RDDs that will be reused, it is better to cache them; here we cache trainingData, validationData and testData.
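A minimal sketch of the cache lifecycle, reusing the DataFrames defined above: .cache() is lazy, the first action materializes the data in memory, and .unpersist() releases it once it is no longer needed.

# .cache() (already called above) only marks the data; the first action materializes it
trainingData.count()

# Once modeling and prediction are finished, the cached data can be released:
#   trainingData.unpersist()
#   validationData.unpersist()
#   testData.unpersist()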

Modeling and Training

print("Feature engineering...")
print("Handle categories data...")
# Use StringIndexer or OneHotEncoder for categories columns
isCateg = lambda c: c.startswith("cat")
categNewCol = lambda c: "idx_{0}".format(c) if (isCateg(c)) else c

# Index each categorical column on the union of train and test values
# (list(...) is needed in Python 3, where map/filter return lazy iterators)
stringIndexerStages = list(map(lambda c: StringIndexer(inputCol=c, outputCol=categNewCol(c))
                               .fit(trainInput.select(c).union(testInput.select(c))),
                               filter(isCateg, trainingData.columns)))

# Drop categorical columns with too many distinct values
removeTooManyCategs = lambda c: not re.match(r"cat(109$|110$|112$|113$|116$)", c)

# Keep feature columns only (drop id and label)
onlyFeatureCols = lambda c: not re.match(r"id|label", c)

featureCols = list(map(categNewCol,
                       filter(onlyFeatureCols,
                              filter(removeTooManyCategs,
                                     trainingData.columns))))

# Assemble features into a single vector column
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
print("Features generation and assembly completed...")
print("Building Random Forest for regression..")
algo = RandomForestRegressor(featuresCol="features", labelCol="label")

stages = stringIndexerStages
stages.append(assembler)
stages.append(algo)

#Build pipeline
pipeline = Pipeline(stages=stages)
print("K fold cross validation...")
numTrees = [5, 20]
maxDepth = [4, 6]
maxBins = [32]
numFolds = 3

paramGrid = (ParamGridBuilder()
             .addGrid(algo.numTrees, numTrees)
             .addGrid(algo.maxDepth, maxDepth)
             .addGrid(algo.maxBins, maxBins)
             .build())

cv = CrossValidator(estimator=pipeline,
                    evaluator=RegressionEvaluator(),
                    estimatorParamMaps=paramGrid,
                    numFolds=numFolds)

cvModel = cv.fit(trainingData)

Results Metrics and Prediction

# RegressionMetrics expects an RDD of (prediction, observation) pairs
trainPredictionsAndLabels = cvModel.transform(trainingData)\
    .select("prediction", "label").rdd.map(lambda row: (row[0], row[1]))

validPredictionsAndLabels = cvModel.transform(validationData)\
    .select("prediction", "label").rdd.map(lambda row: (row[0], row[1]))

trainRegressionMetrics = RegressionMetrics(trainPredictionsAndLabels)
validRegressionMetrics = RegressionMetrics(validPredictionsAndLabels)

bestModel = cvModel.bestModel
featureImportances = bestModel.stages[-1].featureImportances.toArray()

print("TrainingData count: {0}".format(trainingData.count()))
print("ValidationData count: {0}".format(validationData.count()))
print("TestData count: {0}".format(testData.count()))
print("=====================================================================")
print("Param algoNumTrees = {0}".format(",".join(map(lambda x:str(x), numTrees))))
print("Param algoMaxDepth = {0}".format(",".join(map(lambda x:str(x), maxDepth))))
print("Param algoMaxBins = {0}".format(",".join(map(lambda x:str(x), maxBins))))
print("Param numFolds = {0}".format(numFolds))
print("=====================================================================\n")
print("Training data MSE = {0}".format(trainRegressionMetrics.meanSquaredError))
print("Training data RMSE = {0}".format(trainRegressionMetrics.rootMeanSquaredError))
print("Training data R-squared = {0}".format(trainRegressionMetrics.r2))
print("Training data MAE = {0}".format(trainRegressionMetrics.meanAbsoluteError))
print("Training data Explained variance = {0}".format(trainRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Validation data MSE = {0}".format(validRegressionMetrics.meanSquaredError))
print("Validation data RMSE = {0}".format(validRegressionMetrics.rootMeanSquaredError))
print("Validation data R-squared = {0}".format(validRegressionMetrics.r2))
print("Validation data MAE = {0}".format(validRegressionMetrics.meanAbsoluteError))
print("Validation data Explained variance = {0}".format(validRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Feature importances:\n{0}\n".format("\n".join(map(lambda z: "{0} = {1}".format(str(z[0]),str(z[1])), zip(featureCols, featureImportances)))))
cvModel.transform(testData)\
    .select("id", "prediction")\
    .withColumnRenamed("prediction", "loss")\
    .coalesce(1)\
    .write.format("csv")\
    .option("header", "true")\
    .save("dbfs:/yourpath/rf_sub.csv")

Finally, you can upload the result file to Kaggle if you want to submit your predictions.
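Note that Spark's save() above writes a directory containing a part file rather than a single CSV, so you may want to copy that part file out before downloading it and uploading it to Kaggle. A minimal sketch using Databricks file utilities, assuming the same placeholder output path as above (the destination name rf_submission.csv is just an example):

# Find the single CSV part file inside the output directory and copy it to a standalone file
part_path = [f.path for f in dbutils.fs.ls("dbfs:/yourpath/rf_sub.csv")
             if f.name.startswith("part-")][0]
dbutils.fs.cp(part_path, "dbfs:/yourpath/rf_submission.csv")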


Feature Engineering with PySpark in Azure Databricks

Azure Databricks is an easy-to-use platform for both analytics engineers and machine learning developers. In this post I would like to summarize the basic APIs and tricks for feature engineering with Azure Databricks. Previously I used Jupyter Notebook or PyCharm to develop and practice machine learning cases. Azure Databricks provides a similar notebook experience, with some additional features. In a future post I will describe how to use Azure Databricks in production.

Feature engineering is the preprocessing phase of machine learning, and it requires considerable effort from developers to get the data ready for modeling and training. Here I list some basic feature engineering scenarios with PySpark in Azure Databricks.

If you want to run the code snippets below in a regular Jupyter Notebook, you need to add the Spark initialization code below first. (This is not necessary in Azure Databricks, where the SparkSession is already available as spark.)

from pyspark.sql import SparkSession

spark = SparkSession\
    .builder\
    .appName("AppName")\
    .getOrCreate()

Continuous Data

#Binarizer
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame([
    (0, 1.1),
    (1, 8.5),
    (2, 5.2)
], ["id", "feature"])

binarizer = Binarizer(threshold=5.1, inputCol="feature", outputCol="binarized_feature")

binarizedDataFrame = binarizer.transform(continuousDataFrame)

print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
#Bucketizer
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, ["features"])

bucketizer = Bucketizer(splits=splits, inputCol="features", outputCol="bucketedFeatures")

# Assign buckets per the splits boundary
bucketedData = bucketizer.transform(dataFrame)

print("Bucketizer output with %d buckets" % (len(bucketizer.getSplits())-1))
bucketedData.show()
#QuantileDiscretizer
from pyspark.ml.feature import QuantileDiscretizer

data = [(0, 18.0), (1, 19.0), (2, 8.0), (3, 5.0), (4, 2.2), (5, 9.2), (6, 14.4)]
df = spark.createDataFrame(data, ["id", "hour"])
df = df.repartition(1)

# Divide into 3 buckets as quantile distribution
discretizer = QuantileDiscretizer(numBuckets=3, inputCol="hour", outputCol="result")
discretizerModel = discretizer.fit(df)
result = discretizerModel.transform(df)
result.show()
#MaxAbsScaler
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -8.0]),),
    (1, Vectors.dense([2.0, 1.0, -4.0]),),
    (2, Vectors.dense([4.0, 10.0, 8.0]),)
], ["id", "features"])

scaler = MaxAbsScaler(inputCol="features", outputCol="scaledFeatures")

# Calculate Max value model
scalerModel = scaler.fit(dataFrame)

# Transform with scale model, so that values are scaled between [-1.0, 1.0]
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
#Standard scaler
from pyspark.ml.feature import StandardScaler

dataFrame = spark.read.format("libsvm").load("data/mllib/sample_libsvm_data.txt")
scaler = StandardScaler(inputCol="features", outputCol="scaledFeatures",
                        withStd=True, withMean=False)

# Calculate mean and variance
scalerModel = scaler.fit(dataFrame)

# Standardize
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
#Polynomial expansion
from pyspark.ml.feature import PolynomialExpansion
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

polyDF.show(truncate=False)

Discrete Data

# StringIndexer and One-hot encoder
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
encoded = encoder.transform(indexed)
encoded.show()

Text

#Use Stop Words
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame([
    (0, ["I", "saw", "the", "red", "balloon"]),
    (1, ["Mary", "had", "a", "little", "lamb"])
], ["id", "raw"])

remover = StopWordsRemover(inputCol="raw", outputCol="filtered")
remover.transform(sentenceData).show(truncate=False)
# Tokenizer
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame([
    (0, "Hi I heard about Spark"),
    (1, "I wish Java could use case classes"),
    (2, "Logistic,regression,models,are,neat")
], ["id", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")

regexTokenizer = RegexTokenizer(inputCol="sentence", outputCol="words", pattern="\\W")

countTokens = udf(lambda words: len(words), IntegerType())

tokenized = tokenizer.transform(sentenceDataFrame)
tokenized.select("sentence", "words")\
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)

regexTokenized = regexTokenizer.transform(sentenceDataFrame)
regexTokenized.select("sentence", "words") \
.withColumn("tokens", countTokens(col("words"))).show(truncate=False)
# Count Vectorizer
from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame([
    (0, "a b c".split(" ")),
    (1, "a b b c a".split(" "))
], ["id", "words"])

cv = CountVectorizer(inputCol="words", outputCol="features", vocabSize=3, minDF=2.0)
model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
# TF-IDF
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame([
    (0.0, "Hi I heard about Spark"),
    (0.0, "I wish Java could use case classes"),
    (1.0, "Logistic regression models are neat")
], ["label", "sentence"])

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
wordsData = tokenizer.transform(sentenceData)

hashingTF = HashingTF(inputCol="words", outputCol="rawFeatures", numFeatures=20)
featurizedData = hashingTF.transform(wordsData)

idf = IDF(inputCol="rawFeatures", outputCol="features")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
# NGram
from pyspark.ml.feature import NGram


wordDataFrame = spark.createDataFrame([
    (0, ["Hi", "I", "heard", "about", "Spark"]),
    (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
    (2, ["Logistic", "regression", "models", "are", "neat"])
], ["id", "words"])

ngram = NGram(n=2, inputCol="words", outputCol="ngrams")

ngramDataFrame = ngram.transform(wordDataFrame)
ngramDataFrame.select("ngrams").show(truncate=False)

Others

# SQL Transformer
from pyspark.ml.feature import SQLTransformer
df = spark.createDataFrame([
    (0, 1.0, 3.0),
    (2, 2.0, 5.0)
], ["id", "v1", "v2"])

sqlTrans = SQLTransformer(
statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")
sqlTrans.transform(df).show()
# R formula transform
from pyspark.ml.feature import RFormula
dataset = spark.createDataFrame(
    [(7, "US", 18, 1.0),
     (8, "CA", 12, 0.0),
     (9, "NZ", 15, 0.0)],
    ["id", "country", "hour", "clicked"])

formula = RFormula(
    formula="clicked ~ country + hour",
    featuresCol="features",
    labelCol="label")

output = formula.fit(dataset).transform(dataset)
output.select("features", "label").show()

Note that there are many feature engineering APIs not covered here. Please see the latest pyspark.ml.feature package documentation for more information:

https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature
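As one example of a transformer not covered above, MinMaxScaler rescales each feature to the [0.0, 1.0] range. A minimal sketch along the lines of the official Spark example:

from pyspark.ml.feature import MinMaxScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame([
    (0, Vectors.dense([1.0, 0.1, -1.0]),),
    (1, Vectors.dense([2.0, 1.1, 1.0]),),
    (2, Vectors.dense([3.0, 10.1, 3.0]),)
], ["id", "features"])

scaler = MinMaxScaler(inputCol="features", outputCol="scaledFeatures")

# Compute the per-feature min and max, then rescale each feature to [0.0, 1.0]
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()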