By Michael Heilman, Civis Analytics
scikit-learn is a wonderful tool for machine learning in Python, with great flexibility for implementing pipelines and running experiments (see, e.g., this Civis blog post series), but it’s not really designed for distributed computing on “big data” (e.g., hundreds of millions of records or more).
A common predictive modeling scenario, at least at Civis, is having a small or medium amount of labeled data to estimate a model from (e.g., 10,000 records), but a much larger unlabeled dataset to make predictions about. In this scenario, one might want to train a model on a laptop or single server with scikit-learn for ease of use and flexibility, but then apply that model to the large unlabeled dataset more quickly by distributing the computation with PySpark. Using PySpark for distributed prediction might also make sense if your ETL task is already implemented with (or would benefit from being implemented with) PySpark, which is wonderful for data transformations and ETL.
PySpark has functionality to pickle Python objects, including functions, and have them applied to data that is distributed across processes, machines, etc. It also has a pandas-like syntax but separates the definition of a computation from its execution, similar to TensorFlow.
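To make that concrete, here’s a minimal sketch (my own toy example, not from the original notebook) of that pandas-like, deferred execution model running locally; the app name, master setting, and column names are all arbitrary:

import pandas as pd
import pyspark
import pyspark.sql.functions as F

sc = pyspark.SparkContext(appName="lazy_example", master="local[2]")
sqlContext = pyspark.SQLContext(sc)

# Build a small Spark DataFrame from a pandas DataFrame.
pdf = pd.DataFrame({'x': [1.0, 2.0, 3.0], 'y': [0.5, 1.5, 2.5]})
df = sqlContext.createDataFrame(pdf)

# This only defines the computation; nothing runs yet.
df_transformed = df.withColumn('x_plus_y', F.col('x') + F.col('y'))

# Execution happens when an action (show, count, collect, ...) is called.
df_transformed.show()

sc.stop()

Nothing is computed when df_transformed is defined; Spark builds a plan and executes it only when show() (or another action) is called.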
One issue is that passing data between a) the Java-based Spark execution processes, which send data between machines and can perform transformations super-efficiently, and b) a Python process (e.g., for predicting with scikit-learn) incurs overhead from serialization and inter-process communication. One solution is to use User-Defined Functions (UDFs) in PySpark’s DataFrame API: you can perform most operations efficiently in Java (without having to write Java or Scala!) and call out to Python UDFs, which incur the Java-Python communication overhead, only when necessary.
Normal PySpark UDFs operate one-value-at-a-time, which incurs a large amount of Java-Python communication overhead. Recently, PySpark added Pandas UDFs, which efficiently convert chunks of DataFrame columns to Pandas Series objects via Apache Arrow to avoid much of the overhead of regular UDFs. Having UDFs expect Pandas Series also saves converting between Python and NumPy floating point representations for scikit-learn, as one would have to do for a regular UDF.
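To illustrate the difference in what the two kinds of UDF receive, here’s a self-contained toy sketch (again my own, not from the original post); the column and function names are made up:

import pandas as pd
import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType

sc = pyspark.SparkContext(appName="udf_comparison", master="local[2]")
sqlContext = pyspark.SQLContext(sc)
df = sqlContext.createDataFrame(pd.DataFrame({'x': [1.0, 2.0, 3.0]}))

@F.udf(returnType=DoubleType())
def square_udf(x):
    # Regular UDF: called once per row; x is a single Python float.
    return float(x) ** 2

@F.pandas_udf(returnType=DoubleType())
def square_pandas_udf(x):
    # Pandas UDF: called once per Arrow batch; x is a pandas.Series.
    return x ** 2

# The filter runs in the JVM; only the UDF calls cross the Java-Python boundary.
df.filter(F.col('x') > 0).select(
    square_udf('x').alias('x_sq_row_at_a_time'),
    square_pandas_udf('x').alias('x_sq_vectorized'),
).show()

sc.stop()

The regular UDF crosses the Java-Python boundary once per value, while the Pandas UDF receives a whole batch of values as a pandas.Series; the rest of the query stays in the JVM.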
The PySpark documentation is generally good and there are some posts about Pandas UDFs (1, 2, 3), but maybe the example code below will help some folks who have the specific use case of deploying a scikit-learn model for prediction in PySpark. We’ve found this sort of workflow very promising for multiple use cases recently at Civis (see this SciPy talk), and hopefully you will, too.
As a final note, I’ll mention that it’s worth considering PySpark as an alternative to Pandas for a dataframe implementation and/or Python’s concurrent.futures for parallelization. PySpark works on small datasets with only a little extra effort but can much more easily be scaled up if needed.
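For example, replacing a concurrent.futures-style parallel map might look roughly like this (a sketch, assuming the function is pickleable; the names and sizes are arbitrary):

import pyspark

def expensive_python_function(x):
    # Stand-in for any slow, pickleable Python function.
    return x ** 2

sc = pyspark.SparkContext(appName="parallel_map", master="local[4]")
results = sc.parallelize(range(100)).map(expensive_python_function).collect()
sc.stop()

Scaling this up later mostly means pointing master at a cluster instead of local[4]. With that, here’s the full example: deploying a scikit-learn model for prediction in PySpark.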
import numpy as np
from sklearn.datasets import make_classification
from sklearn.model_selection import train_test_split
from sklearn.ensemble import RandomForestClassifier
from sklearn.model_selection import GridSearchCV

import pandas as pd

import pyspark
import pyspark.sql.functions as F
from pyspark.sql.types import DoubleType, StringType, ArrayType
# Make some fake data and train a model.
n_samples_test = 100000
n_samples_train = 1000
n_samples_all = n_samples_train + n_samples_test
n_features = 50

X, y = make_classification(n_samples=n_samples_all, n_features=n_features,
                           random_state=123)
X_train, X_test, y_train, y_test = train_test_split(X, y, test_size=n_samples_test,
                                                    random_state=45)

# Use pandas to put the test data in parquet format to illustrate how to load it up later.
# In real usage, the data might be on S3, Azure Blob Storage, HDFS, etc.
column_names = [f'feature{i}' for i in range(n_features)]
(
    pd.DataFrame(X_test, columns=column_names)
    .reset_index()
    .rename(columns={'index': 'id'})
    .to_parquet('unlabeled_data')
)
param_grid = {'n_estimators': [100], 'max_depth': [2, 4, None]}
gs_rf = GridSearchCV(
    RandomForestClassifier(random_state=42),
    param_grid=param_grid,
    scoring='roc_auc'
).fit(X_train, y_train)
print('ROC AUC: %.3f' % gs_rf.best_score_)
sc = pyspark.SparkContext(appName="foo")
sqlContext = pyspark.SQLContext(sc)
df_unlabeled = sqlContext.read.parquet('unlabeled_data')
df_unlabeled
Make predictions with a regular UDF

First, for comparison, here’s a regular (one-row-at-a-time) UDF:

@F.udf(returnType=DoubleType())
def predict_udf(*cols):
    # cols will be a tuple of floats here.
    return float(gs_rf.predict_proba((cols,))[0, 1])

df_pred_a = df_unlabeled.select(
    F.col('id'),
    predict_udf(*column_names).alias('prediction')
)
df_pred_a.take(5)
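One detail worth knowing: because gs_rf is referenced inside the UDF, PySpark pickles it as part of the function’s closure and ships it to the workers with each task. For a large model, one option (my own variant, not in the original notebook) is to broadcast the fitted model explicitly so each executor receives and caches it once:

# Optional variant: broadcast the fitted model instead of relying on closure pickling.
broadcast_model = sc.broadcast(gs_rf)

@F.udf(returnType=DoubleType())
def predict_broadcast_udf(*cols):
    # Look up the cached model on the worker via the broadcast variable.
    return float(broadcast_model.value.predict_proba((cols,))[0, 1])

df_pred_bc = df_unlabeled.select(
    F.col('id'),
    predict_broadcast_udf(*column_names).alias('prediction')
)
df_pred_bc.take(5)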
Make predictions with a Pandas UDF
Now we’ll use a Pandas UDF (i.e., a vectorized UDF). In this case, Spark will send the UDF a tuple of pandas Series objects containing multiple rows at a time. The tuple will have one Series per column/feature, in the order they are passed to the UDF. Note that each Series will contain only a chunk of the rows rather than the whole dataset at once, because Spark partitions datasets across workers. The partition and batch sizes can be tuned, but we’ll just use defaults here.
@F.pandas_udf(returnType=DoubleType())
def predict_pandas_udf(*cols):
    # cols will be a tuple of pandas.Series here.
    X = pd.concat(cols, axis=1)
    return pd.Series(gs_rf.predict_proba(X)[:, 1])

df_pred_b = df_unlabeled.select(
    F.col('id'),
    predict_pandas_udf(*column_names).alias('prediction')
)
df_pred_b.take(5)
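As mentioned above, the batch sizes can be tuned. For Pandas UDFs specifically, the number of rows per Arrow batch, and hence per call to the UDF, can be capped via a Spark SQL setting; the value below is just an arbitrary example, not something from the original notebook:

# Optional tuning: limit the number of rows per Arrow batch (i.e., per UDF call).
sqlContext.setConf('spark.sql.execution.arrow.maxRecordsPerBatch', '5000')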
Making multiclass predictions
Above, we’re just returning a single Series of predictions for the positive class, which works for binary classification with a single dependent variable. One can also put multiclass or multilabel models in Pandas UDFs: just return a Series of lists of numbers instead of a Series of numbers.
@F.pandas_udf(returnType=ArrayType(DoubleType()))
def predict_pandas_udf(*cols):
    X = pd.concat(cols, axis=1)
    return pd.Series(row.tolist() for row in gs_rf.predict_proba(X))

df_pred_multi = (
    df_unlabeled.select(
        F.col('id'),
        predict_pandas_udf(*column_names).alias('predictions')
    )
    # Select each item of the prediction array into its own column.
    .select(
        F.col('id'),
        *[F.col('predictions')[i].alias(f'prediction_{c}')
          for i, c in enumerate(gs_rf.classes_)]
    )
)
df_pred_multi.take(5)
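In a real pipeline, you’d typically persist the scored records rather than just inspecting them with take(); for example (the output path and write mode here are arbitrary choices):

# Write the scored records back out as parquet.
df_pred_multi.write.parquet('predictions', mode='overwrite')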