Azure Databricks is a very cool, easy-to-use platform for both analytics engineers and machine learning developers. I would like to use this post to summarize basic APIs and tricks for feature engineering with Azure Databricks. Previously I used Jupyter Notebook or PyCharm to develop or practice machine learning cases. Azure Databricks provides a notebook similar to Jupyter Notebook, with some additional features. In a future post I will describe how to use Azure Databricks for production.
Feature engineering is the preprocessing phase of machine learning, and it takes a huge effort from developers to get data ready for modeling and training. Here I list some basic feature engineering scenarios with PySpark in Azure Databricks.
If you want to run the code snippets below in a normal Jupyter Notebook, you need to add the Spark initialization code below. (This is not necessary in Azure Databricks, where a Spark session is already available for use.)
# Create (or reuse) the SparkSession that the snippets below refer to as `spark`.
# The import is included so this cell runs on its own in a plain Jupyter Notebook.
from pyspark.sql import SparkSession

spark = (
    SparkSession.builder
    .appName("AppName")
    .getOrCreate()
)
Continuous Data
# Binarizer: turn a continuous column into a two-valued column around a threshold
from __future__ import print_function
from pyspark.sql import SparkSession
from pyspark.ml.feature import Binarizer

continuousDataFrame = spark.createDataFrame(
    data=[(0, 1.1), (1, 8.5), (2, 5.2)],
    schema=["id", "feature"],
)

binarizer = Binarizer(
    inputCol="feature",
    outputCol="binarized_feature",
    threshold=5.1,
)
binarizedDataFrame = binarizer.transform(continuousDataFrame)

# Report the threshold the transformer was configured with, then the result
print("Binarizer output with Threshold = %f" % binarizer.getThreshold())
binarizedDataFrame.show()
# Bucketizer: map a continuous column onto buckets defined by explicit split points
from pyspark.ml.feature import Bucketizer

splits = [-float("inf"), -0.5, 0.0, 0.5, float("inf")]
data = [(-999.9,), (-0.5,), (-0.3,), (0.0,), (0.2,), (999.9,)]
dataFrame = spark.createDataFrame(data, schema=["features"])

bucketizer = Bucketizer(
    inputCol="features",
    outputCol="bucketedFeatures",
    splits=splits,
)

# Each row is assigned a bucket according to the split boundaries
bucketedData = bucketizer.transform(dataFrame)

# The number of buckets is one less than the number of split points
bucketCount = len(bucketizer.getSplits()) - 1
print("Bucketizer output with %d buckets" % bucketCount)
bucketedData.show()
# QuantileDiscretizer: bucket a continuous column by quantiles learned from the data
from pyspark.ml.feature import QuantileDiscretizer

data = [
    (0, 18.0),
    (1, 19.0),
    (2, 8.0),
    (3, 5.0),
    (4, 2.2),
    (5, 9.2),
    (6, 14.4),
]
# Build the DataFrame and collapse it to a single partition
df = spark.createDataFrame(data, schema=["id", "hour"]).repartition(1)

# Split "hour" into 3 buckets following its quantile distribution
discretizer = QuantileDiscretizer(
    inputCol="hour",
    outputCol="result",
    numBuckets=3,
)
discretizerModel = discretizer.fit(df)
result = discretizerModel.transform(df)
result.show()
# MaxAbsScaler: rescale each feature by its maximum absolute value
from pyspark.ml.feature import MaxAbsScaler
from pyspark.ml.linalg import Vectors

dataFrame = spark.createDataFrame(
    data=[
        (0, Vectors.dense([1.0, 0.1, -8.0]),),
        (1, Vectors.dense([2.0, 1.0, -4.0]),),
        (2, Vectors.dense([4.0, 10.0, 8.0]),),
    ],
    schema=["id", "features"],
)

scaler = MaxAbsScaler(outputCol="scaledFeatures", inputCol="features")

# Fitting computes the max value model used for scaling
scalerModel = scaler.fit(dataFrame)

# Applying the fitted model scales the values into [-1.0, 1.0]
scaledData = scalerModel.transform(dataFrame)
scaledData.select("features", "scaledFeatures").show()
# StandardScaler: standardize a feature vector column loaded from a libsvm file
from pyspark.ml.feature import StandardScaler

libsvmReader = spark.read.format("libsvm")
dataFrame = libsvmReader.load("data/mllib/sample_libsvm_data.txt")

# Scale by standard deviation only; mean centering is switched off
scaler = StandardScaler(
    inputCol="features",
    outputCol="scaledFeatures",
    withMean=False,
    withStd=True,
)

# Fit the scaler on the data, then apply it to produce the standardized column
scalerModel = scaler.fit(dataFrame)
scaledData = scalerModel.transform(dataFrame)
scaledData.show()
# Polynomial expansion: expand a feature vector into its degree-3 polynomial terms
from pyspark.ml.feature import PolynomialExpansion
# Imported here so this cell does not depend on an earlier cell having imported Vectors
from pyspark.ml.linalg import Vectors

df = spark.createDataFrame([
    (Vectors.dense([2.0, 1.0]),),
    (Vectors.dense([0.0, 0.0]),),
    (Vectors.dense([3.0, -1.0]),)
], ["features"])

polyExpansion = PolynomialExpansion(degree=3, inputCol="features", outputCol="polyFeatures")
polyDF = polyExpansion.transform(df)

# truncate=False keeps the full expanded vectors visible in the output
polyDF.show(truncate=False)
Discrete Data
# StringIndexer and One-hot encoder: index a string column, then one-hot encode the index
from pyspark.ml.feature import OneHotEncoder, StringIndexer

df = spark.createDataFrame([
    (0, "a"),
    (1, "b"),
    (2, "c"),
    (3, "a"),
    (4, "a"),
    (5, "c")
], ["id", "category"])

# Learn the category -> index mapping and add it as "categoryIndex"
stringIndexer = StringIndexer(inputCol="category", outputCol="categoryIndex")
model = stringIndexer.fit(df)
indexed = model.transform(df)

encoder = OneHotEncoder(inputCol="categoryIndex", outputCol="categoryVec")
# On Spark 3.x OneHotEncoder is an Estimator and must be fitted before it can
# transform; on Spark 2.x it is a plain Transformer without fit(), so it is
# used directly in that case.
encoderModel = encoder.fit(indexed) if hasattr(encoder, "fit") else encoder
encoded = encoderModel.transform(indexed)
encoded.show()
Text
# StopWordsRemover: drop stop words from an already-tokenized column
from pyspark.ml.feature import StopWordsRemover

sentenceData = spark.createDataFrame(
    data=[
        (0, ["I", "saw", "the", "red", "balloon"]),
        (1, ["Mary", "had", "a", "little", "lamb"]),
    ],
    schema=["id", "raw"],
)

remover = StopWordsRemover(outputCol="filtered", inputCol="raw")

# Show the filtered token lists without truncating the columns
filteredData = remover.transform(sentenceData)
filteredData.show(truncate=False)
# Tokenizer vs. RegexTokenizer: split sentences into words and count the tokens
from pyspark.ml.feature import Tokenizer, RegexTokenizer
from pyspark.sql.functions import col, udf
from pyspark.sql.types import IntegerType

sentenceDataFrame = spark.createDataFrame(
    data=[
        (0, "Hi I heard about Spark"),
        (1, "I wish Java could use case classes"),
        (2, "Logistic,regression,models,are,neat"),
    ],
    schema=["id", "sentence"],
)

tokenizer = Tokenizer(inputCol="sentence", outputCol="words")
# The regex variant splits on the given pattern instead
regexTokenizer = RegexTokenizer(
    inputCol="sentence",
    outputCol="words",
    pattern="\\W",
)

# UDF returning the number of tokens in a word list
countTokens = udf(lambda tokens: len(tokens), IntegerType())

# Plain tokenizer output with a token count per sentence
tokenized = tokenizer.transform(sentenceDataFrame)
(
    tokenized
    .select("sentence", "words")
    .withColumn("tokens", countTokens(col("words")))
    .show(truncate=False)
)

# Regex tokenizer output with a token count per sentence
regexTokenized = regexTokenizer.transform(sentenceDataFrame)
(
    regexTokenized
    .select("sentence", "words")
    .withColumn("tokens", countTokens(col("words")))
    .show(truncate=False)
)
# CountVectorizer: learn a vocabulary and turn word lists into count vectors
from pyspark.ml.feature import CountVectorizer

df = spark.createDataFrame(
    data=[
        (0, "a b c".split(" ")),
        (1, "a b b c a".split(" ")),
    ],
    schema=["id", "words"],
)

# Vocabulary limited to 3 terms, with minDF set to 2.0
cv = CountVectorizer(
    inputCol="words",
    outputCol="features",
    minDF=2.0,
    vocabSize=3,
)

model = cv.fit(df)
result = model.transform(df)
result.show(truncate=False)
# TF-IDF: tokenize sentences, hash the words into term frequencies, then rescale with IDF
from pyspark.ml.feature import HashingTF, IDF, Tokenizer

sentenceData = spark.createDataFrame(
    data=[
        (0.0, "Hi I heard about Spark"),
        (0.0, "I wish Java could use case classes"),
        (1.0, "Logistic regression models are neat"),
    ],
    schema=["label", "sentence"],
)

# Step 1: sentence -> words
tokenizer = Tokenizer(outputCol="words", inputCol="sentence")
wordsData = tokenizer.transform(sentenceData)

# Step 2: words -> raw term-frequency vector with 20 features
hashingTF = HashingTF(
    inputCol="words",
    outputCol="rawFeatures",
    numFeatures=20,
)
featurizedData = hashingTF.transform(wordsData)

# Step 3: fit IDF on the raw features and rescale them
idf = IDF(outputCol="features", inputCol="rawFeatures")
idfModel = idf.fit(featurizedData)
rescaledData = idfModel.transform(featurizedData)

rescaledData.select("label", "features").show()
# NGram: turn word lists into lists of consecutive 2-word sequences
from pyspark.ml.feature import NGram

# Example of two sentences with the same words in a different order:
#   Hanmeimei loves LiLei
#   LiLei loves Hanmeimei
wordDataFrame = spark.createDataFrame(
    data=[
        (0, ["Hi", "I", "heard", "about", "Spark"]),
        (1, ["I", "wish", "Java", "could", "use", "case", "classes"]),
        (2, ["Logistic", "regression", "models", "are", "neat"]),
    ],
    schema=["id", "words"],
)

ngram = NGram(inputCol="words", outputCol="ngrams", n=2)
ngramDataFrame = ngram.transform(wordDataFrame)

# Only the generated n-grams are displayed, untruncated
ngramDataFrame.select("ngrams").show(truncate=False)
Others
# SQLTransformer: derive new columns with a SQL statement over the input DataFrame
from pyspark.ml.feature import SQLTransformer

df = spark.createDataFrame(
    data=[(0, 1.0, 3.0), (2, 2.0, 5.0)],
    schema=["id", "v1", "v2"],
)

# __THIS__ is the placeholder for the DataFrame being transformed
sqlTrans = SQLTransformer(
    statement="SELECT *, (v1 + v2) AS v3, (v1 * v2) AS v4 FROM __THIS__")

transformed = sqlTrans.transform(df)
transformed.show()
# RFormula: build the features and label columns from an R-style model formula
from pyspark.ml.feature import RFormula

dataset = spark.createDataFrame(
    data=[
        (7, "US", 18, 1.0),
        (8, "CA", 12, 0.0),
        (9, "NZ", 15, 0.0),
    ],
    schema=["id", "country", "hour", "clicked"],
)

# "clicked" is the response; "country" and "hour" are the terms
formula = RFormula(
    formula="clicked ~ country + hour",
    labelCol="label",
    featuresCol="features",
)

fittedFormula = formula.fit(dataset)
output = fittedFormula.transform(dataset)
output.select("features", "label").show()
Note: many feature engineering APIs are not mentioned here. For more information, see the documentation of the latest pyspark.ml.feature package.
https://spark.apache.org/docs/latest/api/python/pyspark.ml.html#module-pyspark.ml.feature