# Chapter 5 Pipelines

In this chapter, we dive into ML Pipelines, which are the engine that powers the machine learning functionality we saw in the Modeling chapter. When you invoke an ML function via the formula interface, for example ml_logistic_regression(mtcars, am ~ .), sparklyr actually constructs a pipeline for you under the hood. The Pipelines API is a lower level interface that enables advanced workflows, and we will soon show how to utilize it.

## 5.1 Estimators and Transformers

The building blocks of pipelines are objects called transformers and estimators, which are collectively referred to as pipeline stages. A transformer can be used to apply transformations to a data frame and return another data frame; the resulting data frame often comprises the original data frame with new columns appended to it. An estimator, on the other hand, can be used to create a transformer giving some training data. Consider the following example to illustrate this relationship: a “center and scale” estimator can learn the mean and standard deviation of some data and store the statistics in a resulting transformer; this transformer can then be used to normalize the data it was trained on and also any new, yet unseen, data.

Here is an example of how to define an estimator:

scaler <- ft_standard_scaler(sc, input_col = "features", output_col = "features_scaled", with_mean = TRUE)
scaler
## StandardScaler (Estimator)
## <standard_scaler_7f6d46f452a1>
##  (Parameters -- Column Names)
##   input_col: features
##   output_col: features_scaled
##  (Parameters)
##   with_mean: TRUE
##   with_std: TRUE

We can now create some data (for which we know the mean and standard deviation) then fit our scaling model to it using the ml_fit() function.

df <- copy_to(sc, data.frame(value = rnorm(100000))) %>%
ft_vector_assembler(input_cols = "value", output_col = "features")

scaler_model <- scaler %>%
ml_fit(df)
scaler_model
## StandardScalerModel (Transformer)
## <standard_scaler_7f6d46f452a1>
##  (Parameters -- Column Names)
##   input_col: features
##   output_col: features_scaled
##  (Transformer Info)
##   mean:  num 0.00421
##   std:  num 0.999 

Note: In Spark ML, many algorithms and feature transformers require that the input be a vector column. The function ft_vector_assembler() performs this task. The function can also be used to initialize a transformer to be used in a pipeline.

We see that the mean and standard deviation are very close to $$0$$ and $$1$$, respectively, which is what we expect. We can then use the transformer to transform a data frame, using the ml_transform() function:

scaler_model %>%
ml_transform(df) %>%
glimpse()
## Observations: ??
## Variables: 3
## Database: spark_connection
## $value <dbl> 0.75373300, -0.84207731, 0.59365113, -… ##$ features        <list> [0.753733, -0.8420773, 0.5936511, -0.…
## $features_scaled <list> [0.7502211, -0.8470762, 0.58999, -0.4… ## 5.2 Pipelines and Pipeline Models A pipeline is simply a sequence of transformers and estimators, and a pipeline model is a pipeline that has been trained on data so all of its components have been converted to transformers. Note that Spark ML internals dictate that pipelines are always estimators, even if they comprise only of transformers. There are a couple ways to construct a pipeline in sparklyr, both of which uses the ml_pipeline() function. We can initialize an empty pipeline with ml_pipeline(sc) and append stages to it: ml_pipeline(sc) %>% ft_standard_scaler( input_col = "features", output_col = "features_scaled", with_mean = TRUE ) ## Pipeline (Estimator) with 1 stage ## <pipeline_7f6d6a6a38ee> ## Stages ## |--1 StandardScaler (Estimator) ## | <standard_scaler_7f6d63bfc7d6> ## | (Parameters -- Column Names) ## | input_col: features ## | output_col: features_scaled ## | (Parameters) ## | with_mean: TRUE ## | with_std: TRUE Alternatively, we can pass stages directly to ml_pipeline(): pipeline <- ml_pipeline(scaler) We fit a pipeline as we would fit an estimator: pipeline_model <- pipeline %>% ml_fit(df) pipeline_model ## PipelineModel (Transformer) with 1 stage ## <pipeline_7f6d64df6e45> ## Stages ## |--1 StandardScalerModel (Transformer) ## | <standard_scaler_7f6d46f452a1> ## | (Parameters -- Column Names) ## | input_col: features ## | output_col: features_scaled ## | (Transformer Info) ## | mean: num 0.00421 ## | std: num 0.999  pipeline  ## 5.3 Applying Pipelines to OKCupid Data Now that we have an understanding of the rudimentary concepts for ML Pipelines, let us apply them to the predictive modeling problem from the previous chapter, where we are trying to predict whether people are currently employed by looking at their profiles. Our starting point is the okc_train data frame. We first exhibit the pipeline, which includes feature engineering and modeling steps, then walk through it. pipeline <- ml_pipeline(sc) %>% ft_dplyr_transformer( okc_train %>% mutate( essay_length = char_length(paste(!!!syms(paste0("essay", 0:9)))) ) ) %>% ft_string_indexer(input_col = "sex", output_col = "sex_indexed") %>% ft_string_indexer(input_col = "drinks", output_col = "drinks_indexed") %>% ft_string_indexer(input_col = "drugs", output_col = "drugs_indexed") %>% ft_one_hot_encoder_estimator( input_cols = c("sex_indexed", "drinks_indexed", "drugs_indexed"), output_cols = c("sex_encoded", "drinks_encoded", "drugs_encoded") ) %>% ft_vector_assembler( input_cols = c("age", "sex_encoded", "drinks_encoded", "drugs_encoded", "essay_length"), output_col = "features" ) %>% ft_standard_scaler(input_col = "features", output_col = "features_scaled", with_mean = TRUE) %>% ml_logistic_regression(features_col = "features_scaled", label_col = "not_working") The first stage in our pipeline is a SQL transformer defined by a data frame. The ft_dplyr_transformer() function extracts the SQL statement associated with mutate(., essay_length = char_length(paste(!!!syms(paste0("essay", 0:9))))) and constructs a SQL transformer with it. You can also use ft_sql_transformer() directly if you prefer writing SQL; note that only single table verbs are supported either way so joins are not supported. The next three stages index the sex, drinks, and drugs columns, which are character, into numeric indicies via ft_string_indexer(). This is necessary for the ft_one_hot_encoder_estimator() that comes next which requires numeric column inputs. Once all of our predictor variables are of numeric type (recall that age is numeric already), we can create our features vector using ft_vector_assembler() which concatenates all of its inputs together into one column of vectors. We can then use ft_standard_scaler() to normalize all elements of the features column (including the one-hot encoded 0/1 values of the categorical variables), and finally apply a logistic regression via ml_logistic_regression(). During prototyping, you may want to execute these transformations eagerly on a small subset of the data, by passing the data frame to the ft_ and ml_ functions, and inspecting the transformed data frame. For example, you can do the following: okc_train %>% ft_string_indexer("sex", "sex_indexed") %>% select(sex_indexed) ## # Source: spark<?> [?? x 1] ## sex_indexed ## <dbl> ## 1 0 ## 2 0 ## 3 1 ## 4 0 ## 5 1 ## 6 0 ## 7 0 ## 8 1 ## 9 1 ## 10 0 ## # … with more rows Once you have found the right transformations for your dataset, you can then replace the data frame input with ml_pipeline(sc), and the result will be a pipeline that you can apply to any data frame with the appropriate schema. ### 5.3.1 Hyperparameter Tuning Going back to the pipeline we have created above, we can use ml_cross_validator() to perform the cross validation workflow we demonstrated in the previous chapter and easily test different hyperparameter combinations. In this example, we test whether centering the variables improve predictions together with various regularization values for the logistic regression. We define the cross validator as follows: cv <- ml_cross_validator( sc, estimator = pipeline, estimator_param_maps = list( standard_scaler = list(with_mean = c(TRUE, FALSE)), logistic_regression = list( elastic_net_param = c(0.25, 0.75), reg_param = c(1e-2, 1e-3) ) ), evaluator = ml_binary_classification_evaluator(sc, label_col = "not_working"), num_folds = 10 ) The estimator argument is simply the estimator we want to tune, and in this case it is the pipeline that we defined. We provide the hyperparameter values we are interested in via the estimator_param_maps parameter, which takes a nested named list. The names at the first level correspond to UIDs of the stages we want to tune (if a partial UID is provided sparklyr will attempt to match it to a pipeline stage) and the names at the second level correspond to parameters of each stage. In the snippet above, we are specifying that we want to test • Standard scaler: the values TRUE and FALSE for with_mean, which denotes whether predictor values are centered • Logistic regression: The values 0.25 and 0.75 for $$\alpha$$, and the values 1e-2 and 1e-3 for $$\lambda$$ We expect this to give rise to $$2 \times 2 \times 2 = 8$$ hyperparameter combinations, which we can confirm by printing the cv object: cv ## CrossValidator (Estimator) ## <cross_validator_d5676ac6f5> ## (Parameters -- Tuning) ## estimator: Pipeline ## <pipeline_d563b0cba31> ## evaluator: BinaryClassificationEvaluator ## <binary_classification_evaluator_d561d90b53d> ## with metric areaUnderROC ## num_folds: 10 ## [Tuned over 8 hyperparameter sets] As with any other estimator, we can fit the cross validator using ml_fit() cv_model <- cv %>% ml_fit(okc_train) and inspect the results: ml_validation_metrics(cv_model) %>% arrange(-areaUnderROC) ## areaUnderROC elastic_net_param_1 reg_param_1 with_mean_2 ## 1 0.7722700 0.75 0.001 TRUE ## 2 0.7718431 0.75 0.010 FALSE ## 3 0.7718350 0.75 0.010 TRUE ## 4 0.7717677 0.25 0.001 TRUE ## 5 0.7716070 0.25 0.010 TRUE ## 6 0.7715972 0.25 0.010 FALSE ## 7 0.7713816 0.75 0.001 FALSE ## 8 0.7703913 0.25 0.001 FALSE ## 5.4 Operating Modes of Pipelines Functions By now, you have likely noticed that the pipeline stage functions, such as ft_string_indexer() and ml_logistic_regression() behave differently depending on the first argument passed to them3. The full pattern is as follows: First argument Returns Example Spark connection Estimator or transformer object ft_string_indexer(sc) Pipeline Pipeline ml_pipeline(sc) %>% ft_string_indexer() Data frame, without formula Data frame ft_string_indexer(iris, "Species", "indexed") Data frame, with formula sparklyr ML model object ml_logistic_regression(iris, Species ~ .) • If a Spark connection is provided, the function returns a transformer or estimator object, which can be utilized directly using ml_fit() or ml_transform() or be included in a pipeline. • If a pipeline is provided, the function returns a pipeline object with the stage appended to it. • If a data frame is provided to a feature transformer function (those with prefix ft_), or an ML algorithm without also providing a formula, the function instantiates the pipeline stage object, fit it to the data if necessary (if the stage is an estimator), then transforms the data frame returning a data frame. • If a data frame and a formula are provided to an ML algorithm that supports the formula interface, sparklyr builds a pipeline model under the hood and returns an ML model object which contains additional metadata information. The formula interface approach is what we studied in the Modeling section, and is what we recommend new users to Spark start with, since its syntax is similar to existing R modeling packages and abstracts away some Spark ML peculiarities. However, to take advantage of the full power of Spark ML and leverage pipelines for workflow organization and interoperability, it is worthwhile to learn the ML Pipelines API. ## 5.5 Model Persistence and Interoperability One of the most powerful aspects of pipelines is that they can be serialized to disk and are fully interoperable with the other Spark APIs, such as Python and Scala. To save a pipeline model, call ml_save() and provide a path. model_dir <- file.path(tempdir(), "model") ml_save(pipeline_model, model_dir) ## Model successfully saved. Let us take a look at the directory we just wrote to. list.dirs(model_dir,full.names = FALSE) ## [1] "" "metadata" ## [3] "stages" "stages/0_dplyr_transformer_d5640940540" ## [5] "stages/0_dplyr_transformer_d5640940540/metadata" "stages/1_string_indexer_d5635e4cddf" ## [7] "stages/1_string_indexer_d5635e4cddf/data" "stages/1_string_indexer_d5635e4cddf/metadata" ## [9] "stages/2_string_indexer_d564088071d" "stages/2_string_indexer_d564088071d/data" ## [11] "stages/2_string_indexer_d564088071d/metadata" "stages/3_string_indexer_d56228b2204" ## [13] "stages/3_string_indexer_d56228b2204/data" "stages/3_string_indexer_d56228b2204/metadata" ## [15] "stages/4_one_hot_encoder_estimator_d565c664653" "stages/4_one_hot_encoder_estimator_d565c664653/data" ## [17] "stages/4_one_hot_encoder_estimator_d565c664653/metadata" "stages/5_vector_assembler_d563e932289" ## [19] "stages/5_vector_assembler_d563e932289/metadata" "stages/6_standard_scaler_d562dc07077" ## [21] "stages/6_standard_scaler_d562dc07077/data" "stages/6_standard_scaler_d562dc07077/metadata" ## [23] "stages/7_logistic_regression_d563517ac18" "stages/7_logistic_regression_d563517ac18/data" ## [25] "stages/7_logistic_regression_d563517ac18/metadata"  We can dive into a couple of the files to see what type of data was saved. spark_read_json(sc, file.path(model_dir, "stages/0_dplyr_transformer_d5640940540/metadata/")) %>% glimpse() ## Observations: ?? ## Variables: 5 ## Database: spark_connection ##$ class        <chr> "org.apache.spark.ml.feature.SQLTransformer"
## $paramMap <list> [["SELECT age, body_type, diet, drinks, drugs, … ##$ sparkVersion <chr> "2.4.0"
## $timestamp <dbl> 1.557277e+12 ##$ uid          <chr> "dplyr_transformer_d5640940540"
spark_read_parquet(sc, file.path(model_dir, "stages/7_logistic_regression_d563517ac18/data"))
## # Source: spark<data> [?? x 5]
##   numClasses numFeatures interceptVector coefficientMatrix                      isMultinomial
##        <int>       <int> <list>          <list>                                 <lgl>
## 1          2          12 <dbl [1]>       <-1.3263745374391678  -0.101825246818… FALSE  

We see that quite a bit of information has been exported, from the SQL statement in the dplyr transformer to the fitted coefficient estimates of the logistic regression. We can then (in a new Spark session) reconstruct the model by using ml_load():

model_2 <- ml_load(sc, model_dir)

Let us see if we can retrieve the logistic regression stage from this pipeline model:

model_2 %>%
ml_stage("logistic_regression")
## LogisticRegressionModel (Transformer)
## <logistic_regression_d563517ac18>
##  (Parameters -- Column Names)
##   features_col: features_scaled
##   label_col: not_working
##   prediction_col: prediction
##   probability_col: probability
##   raw_prediction_col: rawPrediction
##  (Transformer Info)
##   coefficient_matrix:  num [1, 1:12] -1.3264 -0.1018 0.1143 0.207 0.0247 ...
##   coefficients:  num [1:12] -1.3264 -0.1018 0.1143 0.207 0.0247 ...
##   intercept:  num 1.68
##   intercept_vector:  num 1.68
##   num_classes:  int 2
##   num_features:  int 12
##   threshold:  num 0.5
##   thresholds:  num [1:2] 0.5 0.5 

Note that the exported JSON and parquet files are agnostic of the API that exported them. This means that in a multilingual machine learning engineering team, you can pick up a data preprocessing pipeline from a data engineer working in Python, build a prediction model on top of it, then hand off the final pipeline off to a deployment engineering working in Scala.

### 5.5.1 Sparklyr ML Models

When ml_save() is called for sparklyr ML models (created using the formula interface), the associated pipeline model is saved, but any sparklyr specific metadata, such as index labels, are not. In other words, saving a sparklyr ml_model object then loading it will yield a pipeline model object, as if you created it via the ML Pipelines API. What we gain from this tradeoff of loss information is interoperability with other languages.