NCFM on Azure Databricks

This article shows how to run a deep learning model for an image classification task, using the Kaggle NCFM competition as a playground project.

Data and Preparation

Download the data from https://www.kaggle.com/c/the-nature-conservancy-fisheries-monitoring/data. Unzip and upload the data files into DBFS or Azure Blob Storage. This is a typical single-label image classification problem covering 8 classes (7 fish classes and 1 non-fish class). The training set is rather small, only 3777 images, with an extra 1000 images for testing. The images are challenging because noise and background dominate the picture. To prepare for model training, we split the labeled data into training and validation sets, usually 80% for training and 20% for validation. After the split, there are two separate folders: train_split and val_split.
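
A minimal sketch of how that split could be scripted is shown below. It assumes the unzipped Kaggle training data sits in a train folder with one subfolder per class; the source path and the use of random/shutil are just one possible approach, not the exact script used here.

import os
import random
import shutil

random.seed(2017)

# Assumed locations on the DBFS mount; adjust to your own paths
src_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/train'        # unzipped Kaggle training data, one folder per class
train_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/train_split'
val_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/val_split'
split_ratio = 0.8  # 80% training / 20% validation

for fish in os.listdir(src_dir):
    images = os.listdir(os.path.join(src_dir, fish))
    random.shuffle(images)
    n_train = int(len(images) * split_ratio)
    for target_dir, subset in [(train_dir, images[:n_train]), (val_dir, images[n_train:])]:
        os.makedirs(os.path.join(target_dir, fish), exist_ok=True)
        for img in subset:
            shutil.copy(os.path.join(src_dir, fish, img), os.path.join(target_dir, fish, img))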

Modeling and Training

The small size of the training set carries a risk of overfitting during model training. One solution is transfer learning: fine-tuning the weights of a pre-trained network. A few points to keep in mind: pre-trained models are trained across multiple GPUs on ImageNet; ConvNet features are more generic in the early layers and more specific to the original dataset in the later layers; use a small learning rate when fine-tuning; and fine-tuning usually starts from the later layers, as sketched below.
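
For illustration only (the training code later in this post fine-tunes the whole network with a small learning rate), freezing the earlier layers of the pre-trained model and updating only the later ones would look roughly like this; the cut-off index of 172 is an arbitrary example, not a value used in this project.

from keras.applications.inception_v3 import InceptionV3

base_model = InceptionV3(include_top=False, weights='imagenet', input_shape=(299, 299, 3))

# Freeze the earlier, more generic layers; only the later layers will be updated during fine-tuning.
# The cut-off index (172) is an arbitrary example.
for layer in base_model.layers[:172]:
    layer.trainable = False
for layer in base_model.layers[172:]:
    layer.trainable = True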

Here we use the Inception-V3 model with ImageNet pre-trained weights. The pre-trained Inception-V3 model achieves state-of-the-art accuracy at recognizing general objects across 1000 classes, such as “Zebra”, “Dalmatian”, and “Dishwasher”. The model extracts general features from input images in its first part and classifies them based on those features in its second part.

import os
from keras.applications.inception_v3 import InceptionV3
from keras.layers import Flatten, Dense, AveragePooling2D
from keras.models import Model
from keras.optimizers import RMSprop, SGD
from keras.callbacks import ModelCheckpoint
from keras.preprocessing.image import ImageDataGenerator

learning_rate = 0.001
img_width = 299
img_height = 299
nbr_train_samples = 3019
nbr_validation_samples = 758
nbr_epochs = 20
batch_size = 32
train_data_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/train_split'
val_data_dir = '/dbfs/mnt/vpa-raw-data-dev/POC/val_split'

FishNames = ['ALB', 'BET', 'DOL', 'LAG', 'NoF', 'OTHER', 'SHARK', 'YFT']

print('Loading InceptionV3 Weights ...')
InceptionV3_notop = InceptionV3(include_top=False, weights='imagenet',
                    input_tensor=None, input_shape=(299, 299, 3))

print('Adding Average Pooling Layer and Softmax Output Layer ...')
output = InceptionV3_notop.get_layer(index = -1).output  # Shape: (8, 8, 2048)
output = AveragePooling2D((8, 8), strides=(8, 8), name='avg_pool')(output)
output = Flatten(name='flatten')(output)
output = Dense(8, activation='softmax', name='predictions')(output)

InceptionV3_model = Model(InceptionV3_notop.input, output)
InceptionV3_model.summary()

optimizer = SGD(lr = learning_rate, momentum = 0.9, decay = 0.0, nesterov = True)
InceptionV3_model.compile(loss='categorical_crossentropy', optimizer = optimizer, metrics = ['accuracy'])

# autosave best Model
best_model_file = "/dbfs/mnt/vpa-raw-data-dev/POC/weights.h5"
best_model = ModelCheckpoint(best_model_file, monitor='val_acc', verbose = 1, save_best_only = True)

In order to improve our score, we apply data augmentation to the training images; the validation images are only rescaled.

# this is the augmentation configuration we will use for training
train_datagen = ImageDataGenerator(
        rescale=1./255,
        shear_range=0.1,
        zoom_range=0.1,
        rotation_range=10.,
        width_shift_range=0.1,
        height_shift_range=0.1,
        horizontal_flip=True)

# this is the augmentation configuration we will use for validation, only rescaling
val_datagen = ImageDataGenerator(rescale=1./255)

train_generator = train_datagen.flow_from_directory(
        train_data_dir,
        target_size = (img_width, img_height),
        batch_size = batch_size,
        shuffle = True,
        classes = FishNames,
        class_mode = 'categorical')

validation_generator = val_datagen.flow_from_directory(
        val_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        shuffle = True,
        #save_to_dir = '/Users/Sandy/Repo/Kaggle_NCFM/visulization',
        #save_prefix = 'aug',
        classes = FishNames,
        class_mode = 'categorical')

InceptionV3_model.fit_generator(
        train_generator,
        steps_per_epoch = nbr_train_samples // batch_size,        # 3019 // 32 = 94 batches per epoch
        epochs = nbr_epochs,
        validation_data = validation_generator,
        validation_steps = nbr_validation_samples // batch_size,  # 758 // 32 = 23 batches
        callbacks = [best_model])

Prediction

import os
import math
import numpy as np
from keras.preprocessing.image import ImageDataGenerator
from keras.models import load_model

img_width = 299
img_height = 299
batch_size = 32
nbr_test_samples = 1000

FishNames = ['ALB', 'BET', 'DOL', 'LAG', 'NoF', 'OTHER', 'SHARK', 'YFT']

root_path = '/dbfs/mnt/vpa-raw-data-dev/POC/'
weights_path = os.path.join(root_path, 'weights.h5')
print(weights_path)
test_data_dir = os.path.join(root_path, 'test_stg1/')
print(test_data_dir)

test_datagen = ImageDataGenerator(rescale=1./255)
test_generator = test_datagen.flow_from_directory(
        test_data_dir,
        target_size=(img_width, img_height),
        batch_size=batch_size,
        shuffle = False, # Important !!!
        classes = None,
        class_mode = None)

test_image_list = test_generator.filenames

print('Loading model and weights from training process ...')
InceptionV3_model = load_model(weights_path)
print('Begin to predict for testing data ...')
# predict_generator expects the number of batches (steps), not the number of samples
predictions = InceptionV3_model.predict_generator(
        test_generator, steps=int(math.ceil(nbr_test_samples / float(batch_size))))
np.savetxt(os.path.join(root_path, 'predictions.txt'), predictions)

print('Begin to write submission file ..')
f_submit = open(os.path.join(root_path, 'submit.csv'), 'w')
f_submit.write('image,ALB,BET,DOL,LAG,NoF,OTHER,SHARK,YFT\n')
for i, image_name in enumerate(test_image_list):
    pred = ['%.6f' % p for p in predictions[i, :]]
    if i % 100 == 0:
        print('{} / {}'.format(i, nbr_test_samples))
    f_submit.write('%s,%s\n' % (os.path.basename(image_name), ','.join(pred)))
f_submit.close()
print('Submission file successfully generated!')

Practical Tricks

  1. When we use a ConvNet for an image classification task and the training set is quite small, we can pick a state-of-the-art ConvNet architecture, e.g. InceptionV3, ResNet, Inception-ResNet, DenseNet, etc., with weights pre-trained on ImageNet to speed up convergence.
  2. Fine-tune with a small learning rate. I have tried learning rates of 0.001 and 0.0001; training with the smaller learning rate is quite slow, but it gains good validation accuracy.
  3. Use data augmentation to reduce overfitting.
  4. Split the data into training and local validation sets.
  5. Ensembling models might help, but I had not tried it yet while writing this; see the sketch after this list.
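
For the last point, one simple ensembling approach is to average the softmax probabilities predicted by several independently trained models. The sketch below is hypothetical: the model_paths checkpoints and the ensemble_predict helper are not part of this project, and this was not run in the experiments above.

import numpy as np
from keras.models import load_model

# Hypothetical checkpoints from independently trained models
model_paths = ['/dbfs/mnt/vpa-raw-data-dev/POC/weights_a.h5',
               '/dbfs/mnt/vpa-raw-data-dev/POC/weights_b.h5']

def ensemble_predict(generator, steps):
    all_preds = []
    for path in model_paths:
        model = load_model(path)
        generator.reset()  # ensure every model sees the images in the same order
        all_preds.append(model.predict_generator(generator, steps))
    return np.mean(all_preds, axis=0)  # average the class probabilities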

Finally, special thanks to pengpaiSH for his code sample, which this work builds on: https://github.com/pengpaiSH/Kaggle_NCFM

Kaggle Allstate Competition in Azure Databricks

In this post, we describe how to work through a Kaggle competition in Azure Databricks. Compared to running the training and tuning phases on a local machine or a single server, training the model in Azure Databricks with Spark is considerably faster.

Kaggle Allstate Claims Severity

When you’ve been devastated by a serious car accident, your focus is on the things that matter the most: family, friends, and other loved ones. Pushing paper with your insurance agent is the last place you want your time or mental energy spent. This is why Allstate, a personal insurer in the United States, is continually seeking fresh ideas to improve their claims service for the over 16 million households they protect.

Allstate is currently developing automated methods of predicting the cost, and hence severity, of claims. In this recruitment challenge, Kagglers are invited to show off their creativity and flex their technical chops by creating an algorithm which accurately predicts claims severity. Aspiring competitors will demonstrate insight into better ways to predict claims severity for the chance to be part of Allstate’s efforts to ensure a worry-free customer experience.

Data and Preparation

Download the data from https://www.kaggle.com/c/allstate-claims-severity/data. Upload the data files into DBFS or Azure Blob Storage, then read the train and test data into DataFrames.

import re
from pyspark.sql import SparkSession
from pyspark.ml.feature import StringIndexer, VectorAssembler
from pyspark.ml.regression import RandomForestRegressor, RandomForestRegressionModel
from pyspark.ml import Pipeline, PipelineModel
from pyspark.ml.evaluation import RegressionEvaluator
from pyspark.ml.tuning import ParamGridBuilder, CrossValidator
from pyspark.mllib.evaluation import RegressionMetrics

print("Read and load data started...")
trainInput = (spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("dbfs:/yourpath/train.csv")
        .cache())

testInput = (spark.read
        .option("header", "true")
        .option("inferSchema", "true")
        .csv("dbfs:/yourpath/test.csv")
        .cache())
print("Read and load data completed...")

data = trainInput.withColumnRenamed("loss", "label")
[trainingData, validationData] = data.randomSplit([0.7, 0.3])
trainingData.cache()
validationData.cache()
testData = testInput.cache()

Please note that you should replace the above DBFS paths with your own. For Spark DataFrames/Datasets/RDDs that will be reused, it is better to cache them; here we cache trainingData, validationData and testData.
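
If the data lives in Azure Blob Storage, one way to make it available under a dbfs:/ path is to mount the container from a Databricks notebook. The storage account, container, mount point, and secret names below are placeholders; adjust them for your environment.

# Mount an Azure Blob Storage container to DBFS (account, container and secret names are placeholders)
dbutils.fs.mount(
    source = "wasbs://<container>@<storage-account>.blob.core.windows.net",
    mount_point = "/mnt/yourpath",  # files then appear under dbfs:/mnt/yourpath/
    extra_configs = {"fs.azure.account.key.<storage-account>.blob.core.windows.net":
                     dbutils.secrets.get(scope = "<scope-name>", key = "<key-name>")})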

Modeling and Training

print("Feature engineering...")
print("Handle categorical data...")
# Use StringIndexer (or OneHotEncoder) for the categorical columns
isCateg = lambda c: c.startswith("cat")
categNewCol = lambda c: "idx_{0}".format(c) if (isCateg(c)) else c

# One StringIndexer per categorical column, fitted on the union of train and test values;
# wrapped in list() so the stages can be appended to under Python 3
stringIndexerStages = list(map(lambda c: StringIndexer(inputCol=c, outputCol=categNewCol(c))
        .fit(trainInput.select(c).union(testInput.select(c))), filter(isCateg, trainingData.columns)))

# Drop categorical columns with too many distinct values
removeTooManyCategs = lambda c: not re.match(r"cat(109$|110$|112$|113$|116$)", c)

# Keep feature columns only (drop id and label)
onlyFeatureCols = lambda c: not re.match(r"id|label", c)

featureCols = list(map(categNewCol,
        filter(onlyFeatureCols,
        filter(removeTooManyCategs,
        trainingData.columns))))

# Assemble all feature columns into a single vector column
assembler = VectorAssembler(inputCols=featureCols, outputCol="features")
print("Features generation and assembly completed...")
print("Building Random Forest for regression..")
algo = RandomForestRegressor(featuresCol="features", labelCol="label")

# Pipeline stages: string indexers, then the assembler, then the regressor
stages = stringIndexerStages
stages.append(assembler)
stages.append(algo)

# Build pipeline
pipeline = Pipeline(stages=stages)

print("K-fold cross validation...")
numTrees = [5, 20]
maxDepth = [4, 6]
maxBins = [32]
numFolds = 3

paramGrid = (ParamGridBuilder()
        .addGrid(algo.numTrees, numTrees)
        .addGrid(algo.maxDepth, maxDepth)
        .addGrid(algo.maxBins, maxBins)
        .build())

cv = CrossValidator(estimator=pipeline,
                    evaluator=RegressionEvaluator(),
                    estimatorParamMaps=paramGrid,
                    numFolds=numFolds)

cvModel = cv.fit(trainingData)

Results Metrics and Prediction

# RegressionMetrics expects an RDD of (prediction, observation) pairs, in that order
trainPredictionsAndLabels = cvModel.transform(trainingData).select("prediction", "label").rdd

validPredictionsAndLabels = cvModel.transform(validationData).select("prediction", "label").rdd

trainRegressionMetrics = RegressionMetrics(trainPredictionsAndLabels)
validRegressionMetrics = RegressionMetrics(validPredictionsAndLabels)

bestModel = cvModel.bestModel
featureImportances = bestModel.stages[-1].featureImportances.toArray()

print("TrainingData count: {0}".format(trainingData.count()))
print("ValidationData count: {0}".format(validationData.count()))
print("TestData count: {0}".format(testData.count()))
print("=====================================================================")
print("Param algoNumTrees = {0}".format(",".join(map(lambda x:str(x), numTrees))))
print("Param algoMaxDepth = {0}".format(",".join(map(lambda x:str(x), maxDepth))))
print("Param algoMaxBins = {0}".format(",".join(map(lambda x:str(x), maxBins))))
print("Param numFolds = {0}".format(numFolds))
print("=====================================================================\n")
print("Training data MSE = {0}".format(trainRegressionMetrics.meanSquaredError))
print("Training data RMSE = {0}".format(trainRegressionMetrics.rootMeanSquaredError))
print("Training data R-squared = {0}".format(trainRegressionMetrics.r2))
print("Training data MAE = {0}".format(trainRegressionMetrics.meanAbsoluteError))
print("Training data Explained variance = {0}".format(trainRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Validation data MSE = {0}".format(validRegressionMetrics.meanSquaredError))
print("Validation data RMSE = {0}".format(validRegressionMetrics.rootMeanSquaredError))
print("Validation data R-squared = {0}".format(validRegressionMetrics.r2))
print("Validation data MAE = {0}".format(validRegressionMetrics.meanAbsoluteError))
print("Validation data Explained variance = {0}".format(validRegressionMetrics.explainedVariance))
print("=====================================================================\n")
print("Feature importances:\n{0}\n".format("\n".join(map(lambda z: "{0} = {1}".format(str(z[0]),str(z[1])), zip(featureCols, featureImportances)))))
cvModel.transform(testData)\
        .select("id", "prediction")\
        .withColumnRenamed("prediction", "loss")\
        .coalesce(1)\
        .write.format("csv")\
        .option("header", "true")\
        .save("dbfs:/yourpath/rf_sub.csv")

Because of coalesce(1), Spark writes the result as a folder at dbfs:/yourpath/rf_sub.csv containing a single CSV part file. Finally, you can download that file and upload it to Kaggle if you want to submit your results.