# Chapter 11 Distributed R

“Not like this. Not like this. Not like this.”

— Cersei Lannister

In previous chapters you learned how to perform data analysis and modeling in local Spark instances and proper Spark clusters, and the previous Extensions chapter described how to make use of additional functionality provided by the Spark and R communities at large. In most cases, the combination of Spark functionality and extensions is more than enough to perform almost any computation. However, for those cases where functionality is lacking in Spark and its extensions, you can consider distributing R computations to worker nodes yourself.

You can run arbitrary R code in each worker node to run any computation: you can run simulations, crawl content from the web, transform data and so on. In addition, you can make use of any package available in CRAN and private packages available in your organization, which reduces the amount of code you need to write and helps you stay productive.

If you are already familiar with R, you might be tempted to use this approach for all Spark operations; however, this is not the recommended use of spark_apply(). Previous chapters provided more efficient techniques and tools to solve well known problems – in contrast, spark_apply() introduces additional cognitive overhead, additional troubleshooting steps, performance trade-offs and, in general, additional complexity that should be avoided. This is not to say that spark_apply() should never be used; rather, spark_apply() is reserved for use cases where previous tools and techniques fall short.

## 11.1 Overview

The Introduction chapter introduced MapReduce as a technique capable of processing large-scale datasets; it also described how Apache Spark provided a superset of operations to perform MapReduce computations with ease and more efficiently. The Tuning chapter presented insights into how Spark works by applying custom transformations over each partition of the distributed datasets. For instance, if we were to multiply by ten each element of a distributed numeric dataset, Spark would apply a mapping operation over each partition through multiple workers; conceptually, this is illustrated in Figure ??.

This chapter presents how to define a custom f(x) mapping operation using spark_apply(); for the previous example, spark_apply() provides support to define 10 * x as follows:

sdf_len(sc, 3) %>% spark_apply(~ 10 * .x)
# Source: spark<?> [?? x 1]
id
* <dbl>
1    10
2    20
3    30

Notice that ~ 10 * .x is plain R code executed across all worker nodes. The ~ operator creates a formula, which is interpreted using the conventions of the rlang package as a compact definition of a function equivalent to function(.x) 10 * .x; this compact form is also known as an anonymous function or lambda expression.

The f(x) function must take an R data frame as input and must also produce an R data frame as output, conceptually illustrated in Figure ??.
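Because the function only ever sees an R data frame, it can be tried locally before it is sent to Spark. The following minimal sketch uses only base R; the data frame `partition` is a hand-made stand-in for one partition and the name `times_ten` is hypothetical. It checks the data-frame-in, data-frame-out contract:

```r
# A stand-in for one partition of sdf_len(sc, 3): a plain R data frame.
partition <- data.frame(id = c(1, 2, 3))

# The long form of the compact expression ~ 10 * .x
times_ten <- function(.x) 10 * .x

result <- times_ten(partition)

# Arithmetic on a data frame returns a data frame with the same columns.
print(is.data.frame(result))
print(result$id)
```

A function that passes this kind of local check is not guaranteed to behave identically on a cluster, but it rules out the most common mistake of returning something other than a data frame.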

We can refer back to the original MapReduce example from the Introduction chapter, where the map operation was defined to split sentences into words and then, the total unique words were counted as the reduce operation.

In R, we could make use of the unnest_tokens() function from the tidytext R package, which you would need to install from CRAN before connecting to Spark. You can then use tidytext with spark_apply() to tokenize those sentences into a table of words:

sentences <- copy_to(sc, tibble(text = c("I like apples", "I like bananas")))

sentences %>%
spark_apply(~tidytext::unnest_tokens(.x, word, text))
# Source: spark<?> [?? x 1]
word
* <chr>
1 i
2 like
3 apples
4 i
5 like
6 bananas

Finally, we can reduce this dataset using dplyr to complete the original MapReduce word-count example as follows:

sentences %>%
spark_apply(~tidytext::unnest_tokens(., word, text)) %>%
group_by(word) %>%
summarise(count = count())
# Source: spark<?> [?? x 2]
word    count
* <chr>   <dbl>
1 i           2
2 apples      1
3 like        2
4 bananas     1
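To make the two phases explicit, the same word count can be sketched locally in base R, without Spark or tidytext. This is only an illustration of the map and reduce steps on an in-memory vector, not how Spark executes them; the variable names are hypothetical:

```r
sentences <- c("I like apples", "I like bananas")

# Map: split each sentence into lower-case words.
words <- unlist(lapply(
  sentences,
  function(s) strsplit(tolower(s), " ", fixed = TRUE)[[1]]))

# Reduce: count how many times each word occurs.
counts <- table(words)
word_count <- data.frame(
  word = names(counts),
  count = as.integer(counts),
  stringsAsFactors = FALSE)

print(word_count)
```

In the distributed version, the map step runs inside spark_apply() on each partition, while the reduce step is delegated to Spark through dplyr.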

The rest of this chapter will explain in detail the use cases, features, caveats, considerations and troubleshooting techniques required when defining custom mappings through spark_apply().

Note: The previous sentence tokenizer example can be more efficiently implemented using concepts from previous chapters, specifically through sentences %>% ft_tokenizer("text", "words") %>% transmute(word = explode(words)).

## 11.2 Use Cases

In the overview section we presented an example to help you understand how spark_apply() works; this section will cover a few practical use cases for spark_apply():

Import
You can consider using R to import data from external data sources and formats. For example, when a file format is not natively supported in Spark or its extensions, you can consider using R code to implement a distributed custom parser using R packages.
Model
It is natural to use the rich modeling capabilities already available in R with Spark. In most cases, R models can’t be used across large data; however, we will present two particular use cases where R models can be useful at scale. For instance, when data fits into a single machine, you can make use of grid search to optimize model parameters in parallel. In cases where the data can be partitioned to create several models over subsets of the data, you can use partitioned modeling in R to compute models across partitions.
Transform
You can make use of R’s rich data transformation capabilities to complement Spark. We will present a use case of evaluating data by external systems and use R to interoperate with them by calling them through a web API.
Compute
When you need to perform large-scale computation in R, or big-compute as described in the Introduction chapter, Spark is ideal to distribute this computation. We will present simulations as a particular use case for large-scale computing in R.

We will now explore each use case in detail and provide a working example to help you understand how to use spark_apply() effectively.

### 11.2.1 Custom Parsers

While Spark and its various extensions provide support for many file formats (CSVs, JSON, Parquet, AVRO, etc) there are many more file formats that you might need to use at scale. You can parse these additional formats using spark_apply() and many of the existing R packages. In this section we will understand how to parse log files, but similar approaches can be followed to parse other file formats.

It is common to use Spark to analyze log files; for instance, logs that track download data from Amazon S3. To parse logs, the webreadr package can simplify this process by providing support to load logs stored as: Amazon S3, Squid and the Common log format. You should install webreadr from CRAN before connecting to Spark.

For example, an Amazon S3 log looks as follows.

#Version: 1.0
#Fields: date time x-edge-location sc-bytes c-ip cs-method cs(Host) cs-uri-stem
sc-status cs(Referer) cs(User-Agent) cs-uri-query cs(Cookie) x-edge-result-type

2014-05-23  01:13:11    FRA2    182 192.0.2.10  GET d111111abcdef8.cloudfront.net
/view/my/file.html    200 www.displaymyfiles.com  Mozilla/4.0%20
(compatible;%20MSIE%205.0b1;%20Mac_PowerPC)   -   zip=98101   RefreshHit
MRVMF7KydIvxMWfJIglgwHQwZsbG2IhRJ07sn9AkKUFSHS9EXAMPLE==
d111111abcdef8.cloudfront.net http    -   0.001

This log can be parsed easily with read_aws() as follows:

aws_log <- system.file("extdata/log.aws", package = "webreadr")
webreadr::read_aws(aws_log)
# A tibble: 2 x 18
date                edge_location bytes_sent ip_address http_method host  path
<dttm>              <chr>              <int> <chr>      <chr>       <chr> <chr>
1 2014-05-23 01:13:11 FRA2                 182 192.0.2.10 GET         d111… /vie…
2 2014-05-23 01:13:12 LAX1             2390282 192.0.2.2… GET         d111… /sou…
# ... with 11 more variables: status_code <int>, referer <chr>, user_agent <chr>,
#   query <chr>, cookie <chr>, result_type <chr>, request_id <chr>,
#   host_header <chr>, protocol <chr>, bytes_received <chr>, time_elapsed <dbl>
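When no package exists for a format, the parser can be any function that turns raw text into a data frame. As a rough sketch, the following base R function parses lines in the Common Log Format; the function name `parse_clf`, the column names and the sample lines are all hypothetical and chosen only for illustration:

```r
# Parse Common Log Format lines into a data frame, dropping malformed lines.
parse_clf <- function(lines) {
  pattern <- '^(\\S+) \\S+ \\S+ \\[([^]]+)\\] "(\\S+) (\\S+) [^"]*" (\\d{3}) (\\S+)$'
  fields <- regmatches(lines, regexec(pattern, lines, perl = TRUE))

  # A successful match has 7 entries: the full match plus 6 captured groups.
  fields <- fields[lengths(fields) == 7]
  field <- function(i) vapply(fields, `[`, character(1), i)

  data.frame(
    ip_address  = field(2),
    timestamp   = field(3),
    http_method = field(4),
    path        = field(5),
    status_code = as.integer(field(6)),
    # The size is "-" when unknown, which becomes NA.
    bytes_sent  = suppressWarnings(as.integer(field(7))),
    stringsAsFactors = FALSE)
}

lines <- c(
  '192.0.2.10 - - [23/May/2014:01:13:11 +0000] "GET /view/my/file.html HTTP/1.1" 200 182',
  'this line is not a log entry')

parsed <- parse_clf(lines)
print(parsed)
```

A function of this shape, text in and data frame out, could then take the place of a packaged reader inside spark_apply().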

To scale this operation, we can make use of read_aws() using spark_apply():

spark_read_text(sc, "logs", aws_log, overwrite = TRUE, whole = TRUE) %>%
spark_apply(~webreadr::read_aws(.x$contents))
# Source: spark<?> [?? x 18]
date                edge_location bytes_sent ip_address http_method host  path
* <dttm>              <chr>              <int> <chr>      <chr>       <chr> <chr>
1 2014-05-23 01:13:11 FRA2                 182 192.0.2.10 GET         d111… /vie…
2 2014-05-23 01:13:12 LAX1             2390282 192.0.2.2… GET         d111… /sou…
# ... with 11 more variables: status_code <int>, referer <chr>, user_agent <chr>,
#   query <chr>, cookie <chr>, result_type <chr>, request_id <chr>,
#   host_header <chr>, protocol <chr>, bytes_received <chr>, time_elapsed <dbl>

The code between plain R and spark_apply() is similar; however, when using spark_apply() logs are parsed in parallel across all the worker nodes available in your cluster.

This concludes the custom parsers section; there are many other file formats you can parse at scale from R following a similar approach. We will now present partitioned modeling as another use case focused on modeling across several datasets in parallel.

### 11.2.2 Partitioned Modeling

There are many modeling packages available in R that can also be run at scale by partitioning the data into manageable groups that do fit in the resources of a single machine.

For instance, suppose that you have a 1TB dataset for sales data across multiple cities and you are tasked with creating sales predictions over each city. For this case, you can consider partitioning the original dataset per city, say into 10GB of data per city, which could be managed by a single compute instance.

For this kind of partitionable dataset, you can also consider using spark_apply() by training each model over each city. As a simple example of partitioned modeling, we can run a linear regression using the iris dataset partitioned by Species:

iris <- copy_to(sc, datasets::iris)

iris %>%
spark_apply(nrow, group_by = "Species")
# Source: spark<?> [?? x 2]
Species    result
<chr>       <int>
1 versicolor     50
2 virginica      50
3 setosa         50

Then you can run a linear regression over each species using spark_apply():

iris %>%
spark_apply(
function(e) summary(lm(Petal_Length ~ Petal_Width, e))$r.squared,
names = "r.squared",
group_by = "Species")
# Source: spark<?> [?? x 2]
Species    r.squared
<chr>          <dbl>
1 versicolor     0.619
2 virginica      0.104
3 setosa         0.110

As you can see from the r.squared results and intuitively in Figure ??, the linear model fits versicolor better than the other two species.

purrr::map(c("versicolor", "virginica", "setosa"),
~dplyr::filter(datasets::iris, Species == !!.x) %>%
ggplot2::ggplot(ggplot2::aes(x = Petal.Length, y = Petal.Width)) +
ggplot2::geom_point())
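The per-group computation can also be reproduced locally, which is a convenient way to check the modeling function before distributing it. The sketch below uses base R's split() in place of group_by; note that the local iris data frame names its columns Petal.Length and Petal.Width, whereas the copy in Spark uses underscores:

```r
# Split the local iris data frame into one data frame per species.
by_species <- split(datasets::iris, datasets::iris$Species)

# Fit one linear model per group and keep only the R-squared value.
r_squared <- vapply(
  by_species,
  function(e) summary(lm(Petal.Length ~ Petal.Width, data = e))$r.squared,
  numeric(1))

print(round(r_squared, 3))
```

Each element of `by_species` plays the role of the data frame that spark_apply() hands to the function for one group.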

This concludes our brief overview on how to perform modeling over several different, partitionable, datasets. A similar technique can be applied to perform modeling over the same dataset using different modeling parameters which we will cover next.

### 11.2.4 Web APIs

A Web API is a program that can do something useful through a web interface that other programs can reuse. For instance, services like Twitter provide Web APIs that allow you to automate reading tweets from a program written in R and other programming languages. You can make use of Web APIs using spark_apply() by sending programmatic requests to external services using R code.

For example, Google provides a Web API to label images using deep learning techniques; you can make use of this API from R, but for larger datasets, you would need to access their APIs from Spark. You can use Spark to prepare data to be consumed by a Web API, then use spark_apply() to perform this call and process all the incoming results back in Spark.

The following example makes use of the googleAuthR package to authenticate to Google Cloud, the RoogleVision package to perform labeling over the Google Vision API, and spark_apply() to interoperate between Spark and Google’s deep learning service. If you want to run the following example you will need to disconnect first from Spark and download your cloudml.json from the Google developer portal.

sc <- spark_connect(
master = "local",
config = list(sparklyr.shell.files = "cloudml.json"))

images <- copy_to(sc, data.frame(
image = "http://pbs.twimg.com/media/DwzcM88XgAINkg-.jpg"
))

spark_apply(images, function(df) {
  # Authenticate against Google Cloud using the file shipped to each worker
  googleAuthR::gar_auth_service(
    scope = "https://www.googleapis.com/auth/cloud-platform",
    json_file = "cloudml.json")

  # Label each image through the Google Vision API
  RoogleVision::getGoogleVisionResponse(
    df$image,
    download = FALSE)
})
# Source: spark<?> [?? x 4]
mid       description score topicality
<chr>     <chr>       <dbl>      <dbl>
1 /m/04rky  Mammal      0.973      0.973
2 /m/0bt9lr Dog         0.958      0.958
3 /m/01z5f  Canidae     0.956      0.956
4 /m/0kpmf  Dog breed   0.909      0.909
5 /m/05mqq3 Snout       0.891      0.891

In order to successfully run a large distributed computation over a Web API, the Web API would have to be able to scale to support the load from all the Spark executors. One can trust that major service providers are likely to support all the requests incoming from your cluster. However, when calling internal Web APIs, make sure the API can handle the load. Also, when using third-party services, consider the cost of calling their API across all the executors in your cluster to avoid potentially expensive and unexpected charges.

Next you will learn a use case for large-compute where R is used to perform distributed rendering.

### 11.2.5 Simulations

R can be used in combination with Spark to perform large-scale computing. The use case we will explore in this section is rendering computationally-expensive images using the rayrender package, which uses ray tracing, a photorealistic technique commonly used in movie production.

Let’s use this package to render a simple scene that includes a few spheres using a lambertian material, a diffusely reflecting or “matte” material. But first, install rayrender using remotes::install_github("tylermorganwall/rayrender"), and make sure to also disconnect and reconnect from Spark.
library(rayrender)

scene <- generate_ground(material = lambertian()) %>%
  add_object(sphere(material = metal(color="orange"), z = -2)) %>%
  add_object(sphere(material = metal(color="orange"), z = +2)) %>%
  add_object(sphere(material = metal(color="orange"), x = -2))

render_scene(scene, lookfrom = c(10, 5, 0), parallel = TRUE)

In higher definitions, say 1920x1080, the previous example takes several minutes to render the single frame from Figure ??; rendering a few seconds at 30 frames per second would take several hours on a single machine. However, we can reduce this time using multiple machines by parallelizing computation across them. For instance, using ten machines with the same number of CPUs would cut rendering time tenfold:

system2("hadoop", args = c("fs", "-mkdir", "-p", "/user/hadoop/rendering"))

sdf_len(sc, 628, repartition = 628) %>%
  spark_apply(function(idx, scene) {
    # Render one frame per row, named after the frame number
    render <- sprintf("%04d.png", idx$id)
    rayrender::render_scene(scene, width = 1920, height = 1080,
                            lookfrom = c(12 * sin(idx$id/100), 5, 12 * cos(idx$id/100)),
                            filename = render)

    # Copy the rendered frame from the worker into HDFS
    system2("hadoop", args = c("fs", "-put", render, "/user/hadoop/rendering/"))
  }, context = scene, columns = list()) %>% collect()
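The camera position in the lookfrom argument can be examined locally. The sketch below recomputes it for every frame in base R; it suggests that the 628 frames, advancing 1/100 of a radian each, trace approximately one full orbit of radius 12 around the scene, although that reading of the number 628 is an inference rather than something stated above:

```r
ids <- 1:628

# Angle in radians and camera coordinates for each frame, as in lookfrom.
angle <- ids / 100
x <- 12 * sin(angle)
z <- 12 * cos(angle)

# Distance from the vertical axis stays constant, so the camera moves on a circle.
radius <- sqrt(x^2 + z^2)
print(range(radius))

# Fraction of a full turn covered by the last frame.
print(max(angle) / (2 * pi))

# File names produced for the first frames.
file_names <- sprintf("%04d.png", ids)
print(head(file_names, 3))
```

Since each frame depends only on its own id, frames can be rendered in any order and on any worker, which is what makes this workload easy to distribute.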

After all the images are rendered, the last step is to collect them from HDFS and use tools like ffmpeg to convert individual images into an animation:

hadoop fs -get rendering/
ffmpeg -s 1920x1080 -i rendering/%04d.png -vcodec libx264 -crf 25 \
-pix_fmt yuv420p rendering.mp4

Note: This example assumes HDFS is used as the storage technology for Spark and that the code runs under a hadoop user; you will need to adjust this for your particular storage or user.

We’ve covered some of the common use cases for spark_apply(), but you are certainly welcome to find other use cases for your particular needs. The following sections will present technical concepts you will need to understand to create additional use cases and to use spark_apply() effectively.

## 11.3 Partitions

Most Spark operations, such as analyzing data with dplyr or modeling with MLlib, do not require understanding how Spark partitions data; it simply works automatically. This is not the case for distributed R computations: you will have to learn and understand how exactly Spark is partitioning your data and provide transformations that are compatible with those partitions. This is required since spark_apply() receives each partition, not the entire dataset, and allows you to perform any transformation over it. You can refresh concepts like partitioning and transformations using the diagrams and examples from the Tuning chapter.

To help us understand how partitions are represented in spark_apply(), consider the following code. Should we expect the output to be the total number of rows?

sdf_len(sc, 10) %>%
spark_apply(~nrow(.x))
# Source: spark<?> [?? x 1]
result
*  <int>
1      5
2      5

As you can see from the results, in general the answer is no; Spark assumes data will be distributed across multiple machines, so you will often find it already partitioned, even for small datasets. Since we should not expect spark_apply() to operate over a single partition, let’s find out how many partitions sdf_len(sc, 10) contains:

sdf_len(sc, 10) %>% sdf_num_partitions()
[1] 2

This explains why counting rows through nrow() under spark_apply() returns two rows: there are two partitions, not one. spark_apply() retrieves the count of rows over each partition, and each partition contains five rows, not the ten rows in total you might have expected.

For this particular example, we could further aggregate these partitions by repartitioning and then adding up – this would resemble a simple MapReduce operation using spark_apply():

sdf_len(sc, 10) %>%
spark_apply(~nrow(.x)) %>%
sdf_repartition(1) %>%
spark_apply(~sum(.x))
# Source: spark<?> [?? x 1]
result
*  <int>
1     10
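The same two-step computation can be mimicked locally to see why the intermediate result has one row per partition. In this sketch the split into two partitions of five rows is imposed by hand to match the output above; how Spark actually assigns rows to partitions is not modeled:

```r
data <- data.frame(id = 1:10)

# Assume the first five rows sit in one partition and the rest in another.
partitions <- split(data, rep(1:2, each = 5))

# Map: what ~nrow(.x) computes, one value per partition.
per_partition <- vapply(partitions, nrow, integer(1))

# Reduce: bring the partial results together and add them up.
total <- sum(per_partition)

print(unname(per_partition))
print(total)
```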

The intent of this section is to make you aware of partitions while using spark_apply(); the next section presents group_by as a way to control partitions.

## 11.4 Grouping

When using spark_apply(), we can request explicit partitions from Spark. For instance, if we had to process numbers less than four in one partition and the remaining ones in a second partition, we could create these groups explicitly and then request spark_apply() to use them:

sdf_len(sc, 10) %>%
transmute(groups = id < 4) %>%
spark_apply(~nrow(.x), group_by = "groups")
# Source: spark<?> [?? x 2]
groups result
* <lgl>   <int>
1 TRUE        3
2 FALSE       7

Notice that spark_apply() is still processing two partitions, but in this case, we expect these partitions since we explicitly requested them in spark_apply(); therefore, you can safely interpret the results as “there are three integers less than four”.
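A local analogue of grouping is base R's split() over the grouping column, followed by applying the function to each piece. The sketch below is only a mental model of what group_by requests and does not use Spark:

```r
data <- data.frame(id = 1:10)
data$groups <- data$id < 4

# One data frame per distinct value of the grouping column.
grouped <- split(data, data$groups)

# Apply nrow() to each group and attach the group value to the result.
result <- data.frame(
  groups = as.logical(names(grouped)),
  result = vapply(grouped, nrow, integer(1)),
  row.names = NULL)

print(result)
```

Unlike the default partitions, the contents of each piece here are fully determined by the data, which is why the per-group results can be interpreted directly.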

Note: You can only group data by partitions that fit in a single machine; if one of the groups is too large, an exception will be thrown. To perform operations over groups that exceed the resources of a single node, you can consider partitioning into smaller units or using dplyr::do, which is currently optimized for large partitions.

The takeaway from this section is to always consider partitions when dealing with spark_apply(). Next, we will zoom inside spark_apply() to understand how columns are interpreted.

## 11.5 Columns

By default, spark_apply() will inspect the data frame being produced to find out column names and types automatically, for example:

sdf_len(sc, 1) %>%
spark_apply(~ data.frame(numbers = 1, names = "abc"))
# Source: spark<?> [?? x 2]
numbers names
*   <dbl> <chr>
1       1 abc 

However, this is inefficient since spark_apply() needs to run twice: first to find columns by computing spark_apply() against a subset of all the data, and then to compute the actual desired values.

To improve performance, the columns can be specified explicitly through the columns parameter. This parameter expects a named list of the types expected in the resulting data frame.

We can then rewrite the previous example to run only once by specifying the correct type for each column:

sdf_len(sc, 1) %>%
spark_apply(
~ data.frame(numbers = 1, names = "abc"),
columns = list(numbers = "double", names = "character"))
# Source: spark<?> [?? x 2]
numbers names
*   <dbl> <chr>
1       1 abc 
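One way to avoid typing the list by hand is to run the function once on a small local sample and read the types off the result. The sketch below does this with typeof() in base R; the helper name `make_row` is hypothetical, and while "double" and "character" match the values used above, whether every typeof() value is accepted by the columns parameter is an assumption that should be checked for other types:

```r
# The transformation under test, written as a regular function.
make_row <- function(df) {
  data.frame(numbers = 1, names = "abc", stringsAsFactors = FALSE)
}

# Run it once locally on a one-row stand-in for a partition.
sample_result <- make_row(data.frame(id = 1L))

# A named list with one type name per output column.
columns <- lapply(sample_result, typeof)
str(columns)
```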

In this section and the previous one, we presented how rows and columns interact with spark_apply(). The following section will allow us to make use of contextual information that is sometimes required when processing distributed datasets.

## 11.6 Context

In order to process partitions using spark_apply(), you might need to include auxiliary data that is small enough to fit in each node. This was the case in the Grid Search use case, where the dataset was passed to all partitions and remained unpartitioned itself.

We can modify the initial f(x) = 10 * x example in this chapter to allow us to customize the multiplier – it was originally set to 10 but we can make it configurable by specifying it as the context parameter.

sdf_len(sc, 4) %>%
spark_apply(
function(data, context) context * data,
context = 100
)
# Source: spark<?> [?? x 1]
id
<dbl>
1   100
2   200
3   300
4   400

Figure ?? illustrates this example conceptually. Notice that the data partitions are still variable; however, the contextual parameter is distributed to all the nodes.

The grid search example used this parameter to pass a data frame to each worker node; however, since the context parameter is serialized as an R object, it can contain anything. For instance, if you need to pass multiple values – or even multiple datasets – you can pass a list with values.

The following example defines an f(x) = m * x + b function and runs it with m = 10 and b = 2:

sdf_len(sc, 4) %>%
spark_apply(
~.y$m * .x + .y$b,
context = list(b = 2, m = 10)
)
# Source: spark<?> [?? x 1]
id
<dbl>
1    12
2    22
3    32
4    42
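Written as a regular two-argument function, the same transformation can be called locally with a hand-made partition and context. This sketch uses only base R, and the name `linear` is hypothetical:

```r
# Long form of ~.y$m * .x + .y$b, with explicit argument names.
linear <- function(data, context) context$m * data + context$b

# A stand-in for one partition of sdf_len(sc, 4).
partition <- data.frame(id = c(1, 2, 3, 4))

result <- linear(partition, context = list(b = 2, m = 10))
print(result$id)
```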

Notice that we’ve renamed context to .y to shorten the variable name; this works since spark_apply() assumes context is the second parameter in functions and expressions.

The context parameter will prove to be extremely useful; for instance, the next section will present how to properly construct functions, and context will be used in advanced use cases to construct functions dependent on other functions.

## 11.7 Functions

Previous sections presented spark_apply() as an operation to perform custom transformations using a function or expression. In programming literature, a function together with its context is also referred to as a closure.

Expressions are useful to define short transformations, like ~ 10 * .x. For an expression, .x contains a partition and .y the context, when available. However, it can be hard to define an expression for complex code that spans multiple lines, for those cases, functions are more appropriate.

Functions enable complex and multi-line transformations; they are defined as function(data, context) {} and allow you to provide arbitrary code within {}. We’ve used them in previous sections when using Google Cloud to transform images into image captions.

The function passed to spark_apply() is serialized using serialize(), which is described as “a simple low-level interface for serializing to connections”. One of the current limitations of serialize() is that it won’t serialize objects being referenced outside of its environment. For instance, the following function will error out since the closure references external_value:

external_value <- 1
spark_apply(iris, function(e) e + external_value)

As a workaround to this limitation, you can add the functions your closure needs into the context and then assign them into the global environment:

func_a <- function() 40
func_b <- function() func_a() + 1
func_c <- function() func_b() + 1

sdf_len(sc, 1) %>% spark_apply(function(df, context) {
for (name in names(context)) assign(name, context[[name]], envir = .GlobalEnv)
func_c()
}, context = list(
func_a = func_a,
func_b = func_b,
func_c = func_c
))
# Source: spark<?> [?? x 1]
result
<dbl>
1     42
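For plain values rather than functions, it is often simpler to pass the value through context and read it from there. Before doing so, it can help to list the names a function uses that are not among its own arguments. The helper below, `free_variables`, is a hypothetical base R heuristic: it also reports variables assigned inside the body and names that follow `$`, so its output is a hint rather than a guarantee:

```r
external_value <- 1
f <- function(e) e + external_value

# Names used in the body that are not arguments of the function.
free_variables <- function(fn) {
  setdiff(all.vars(body(fn)), names(formals(fn)))
}
print(free_variables(f))

# Rewritten to receive the value through the context argument instead.
g <- function(e, context) e + context[["external_value"]]
print(free_variables(g))

# The rewritten function no longer depends on the calling environment.
print(g(data.frame(x = 1:3), list(external_value = 1))$x)
```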

When this is not feasible, you can also create your own R package with the functionality you need and then use your package in spark_apply().

Up to this point, you’ve learned all the functionality available in spark_apply() using plain R code; however, we have not presented how to use packages when distributing computations – R packages are essential when creating useful transformations.

## 11.8 Packages

With spark_apply() you can use any R package inside Spark. For instance, you can use the broom package to create a tidy data frame from linear regression output.

spark_apply(
iris,
function(e) broom::tidy(lm(Petal_Length ~ Petal_Width, e)),
names = c("term", "estimate", "std.error", "statistic", "p.value"),
group_by = "Species")
# Source: spark<?> [?? x 6]
Species    term        estimate std.error statistic  p.value
<chr>      <chr>          <dbl>     <dbl>     <dbl>    <dbl>
1 versicolor (Intercept)    1.78     0.284       6.28 9.48e- 8
2 versicolor Petal_Width    1.87     0.212       8.83 1.27e-11
3 virginica  (Intercept)    4.24     0.561       7.56 1.04e- 9
4 virginica  Petal_Width    0.647    0.275       2.36 2.25e- 2
5 setosa     (Intercept)    1.33     0.0600     22.1  7.68e-27
6 setosa     Petal_Width    0.546    0.224       2.44 1.86e- 2

The first time you call spark_apply() all of the contents in your local .libPaths(), which contains all R packages, will be copied into each Spark worker node. Packages will only be copied once and will persist as long as the connection remains open. It’s not uncommon for R libraries to be several gigabytes in size, so be prepared for a one-time tax while the R packages are copied over to your Spark cluster. You can disable package distribution by setting packages = FALSE.

Note: Since packages are copied only once for the duration of the spark_connect() connection, installing additional packages is not supported while the connection is active. Therefore, if a new package needs to be installed, spark_disconnect() the connection, modify packages and reconnect. In addition, R packages are not copied in local mode because the packages already exist on the local system.
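Since packages cannot be added while a connection is open, it may be worth checking that everything is installed before calling spark_connect(). The helper below is a hypothetical base R sketch; it relies on requireNamespace(), which loads a package's namespace as a side effect when the package is present:

```r
# Return the subset of pkgs that cannot be loaded on this machine.
missing_packages <- function(pkgs) {
  installed <- vapply(pkgs, requireNamespace, logical(1), quietly = TRUE)
  pkgs[!installed]
}

# Packages that ship with R are always available, so nothing is reported.
print(missing_packages(c("stats", "utils")))
```

For this chapter, one would pass the packages used inside spark_apply(), for example c("broom", "tidytext"), and install whatever is reported before connecting.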

While this was a short section, using packages with distributed R code opens up an entirely new universe of interesting use cases. Some were covered in the use cases section, but you can think of many more by looking at the rich ecosystem of R packages available today.

This section completes all the functionality you need to distribute R code with R packages; next we will cover some of the requirements your cluster needs to make use of spark_apply().

## 11.9 Cluster Requirements

All the functionality presented in previous chapters does not require any special configuration of your Spark cluster: as long as you have a properly configured Spark cluster, you can use R with it. This is not the case for the functionality presented in this chapter; your cluster administrator, your cloud provider or you yourself will have to configure your cluster by:

• Installing R in every node, to execute R code across your cluster.
• Optionally, installing Apache Arrow in every node when using Spark 2.3 or later, Arrow provides performance improvements that bring distributed R code closer to native Scala code.

Let’s take a look at each requirement to make sure you properly consider the tradeoffs or benefits that they provide.

### 11.9.1 Installing R

Starting with the first requirement, the R runtime is expected to be pre-installed in every node in the cluster; this is a requirement specific to spark_apply().

Failure to install R in every node will trigger a Cannot run program, no such file or directory error while attempting to use spark_apply().

Contact your cluster administrator to consider making the R runtime available throughout the entire cluster. If R is already installed, you can specify the installation path to use through the spark.r.command configuration setting, as in:

config <- spark_config()
config["spark.r.command"] <- "<path-to-r-version>"

sc <- spark_connect(master = "local", config = config)
sdf_len(sc, 10) %>% spark_apply(function(e) e)
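A quick way to see whether an R runtime is reachable on a given machine is to look for Rscript on the PATH. The base R sketch below checks only the machine it runs on, so it would have to be executed on each node; whether a worker resolves R through the PATH or through spark.r.command depends on your configuration:

```r
# Sys.which() returns an empty string when the executable is not found.
rscript <- Sys.which("Rscript")

if (nzchar(rscript)) {
  message("Rscript found at: ", rscript)
} else {
  message("Rscript is not on the PATH of this machine")
}
```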

A Homogeneous Cluster is required since the driver node distributes, and potentially compiles, packages to the workers. For instance, the driver and workers must have the same processor architecture, system libraries, etc. This is usually the case for most clusters, but might not be the case for yours.

Different cluster managers, Spark distributions and cloud providers support different solutions to install additional software, like R, across every node in the cluster; those instructions should be followed when installing R over each worker node. To mention a few:

Spark Standalone
Requires connecting to each machine and installing R; there are tools like pssh that allow you to run a single installation command against multiple machines.
Cloudera
Provides an R parcel, see “How to Distribute your R code with sparklyr and Cloudera Data Science Workbench” (???), which enables R over each worker node.
Amazon EMR
R is pre-installed when starting an EMR cluster as mentioned in the Amazon EMR section.
Microsoft HDInsight
R is pre-installed and no additional steps are needed.
Livy
Livy connections do not support distributing packages since the client machine where the libraries are precompiled might not have the same processor architecture or operating system as the cluster machines.

Strictly speaking, this completes the last requirement for your cluster. However, we strongly recommend you use Apache Arrow with spark_apply() to support large-scale computation with minimal overhead.

### 11.9.2 Apache Arrow

Before we introduce Apache Arrow, we need to present how data is stored and transferred between Spark and R. R was designed from its inception to perform fast numeric computations; to accomplish this, figuring out the best way to store data is very important.

Some computing systems store data internally by row; however, most interesting numerical operations usually require processing data by column. For example, calculating the mean of a column requires processing each column on its own, not the entire row. Spark stores data by default by row, since it’s easier to partition; in contrast, R stores data by column. Therefore, something needs to transform both representations when data is transferred between Spark and R, see Figure ??.

This transformation from rows to columns needs to happen for each partition. In addition, data also needs to be transformed from Scala’s internal representation to R’s internal representation. These transformations waste a lot of CPU cycles; Apache Arrow reduces them.
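The difference between the two layouts can be illustrated in base R with a handful of records. In this sketch, a list of records stands in for row-oriented storage and a list of vectors for column-oriented storage; it says nothing about how Spark or R lay out memory internally:

```r
# Row-oriented: one record per element, each holding every field.
rows <- list(
  list(id = 1, name = "a"),
  list(id = 2, name = "b"),
  list(id = 3, name = "c"))

# Column-oriented: one vector per field, which is what a data frame holds.
columns <- list(
  id   = vapply(rows, function(r) r$id, numeric(1)),
  name = vapply(rows, function(r) r$name, character(1)))

df <- as.data.frame(columns, stringsAsFactors = FALSE)

# A column statistic now reads a single contiguous vector.
print(mean(df$id))
```

The conversion step in the middle touches every field of every record, which is the kind of work that has to be repeated for each partition.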

Apache Arrow is a cross-language development platform for in-memory data. In Spark, it speeds up transferring data between Scala and R by defining a common data format compatible with many programming languages – instead of having to transform between Scala’s internal representation and R’s, the same structure can be used for both languages. In addition, transforming data from row-based storage to columnar storage is performed in parallel in Spark, which can be further optimized by using the columnar storage formats presented in the Data chapter. The improved transformations are shown in Figure ??.

Arrow is not required, but it is strongly advised when working with spark_apply(). It has been available since Spark 2.3.0; however, it requires system administrators to install the Apache Arrow runtime in every node, see arrow.apache.org/install.

In addition, to use Apache Arrow with sparklyr you also need to install the arrow package:

devtools::install_github("apache/arrow", subdir = "r", ref = "apache-arrow-0.12.0")

Before we use arrow, we will take a baseline measurement to validate the improvement:

system.time(
sdf_len(sc, 10^4) %>% spark_apply(nrow) %>% collect()
)
   user  system elapsed
0.240   0.020   7.957

In our particular system, processing 10K rows takes about 8 seconds. To enable Arrow, simply include the library and use spark_apply() as usual. Let’s measure how long it takes spark_apply() to process 1M rows:

library(arrow)
system.time(
sdf_len(sc, 10^6) %>% spark_apply(nrow) %>% collect()
)
   user  system elapsed
0.317   0.021   3.922

In our system, Arrow can process 100X more data in half the time, just 4 seconds.
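As a rough check of that comparison, the two measurements can be turned into rows per second. This is only arithmetic on the elapsed times reported above for this one system; the variable names are illustrative.

```r
# Elapsed times copied from the two system.time() measurements above.
baseline_rows <- 10^4
baseline_secs <- 7.957
arrow_rows    <- 10^6
arrow_secs    <- 3.922

# Throughput in rows per second for each run.
baseline_throughput <- baseline_rows / baseline_secs
arrow_throughput    <- arrow_rows / arrow_secs

# Ratio of the two throughputs.
speedup <- arrow_throughput / baseline_throughput
```

On these numbers the ratio comes out at roughly 200, although a single timing on one machine is only indicative.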

Most functionality in arrow simply works in the background, improving performance and data serialization; however, there is one setting you should be aware of. The spark.sql.execution.arrow.maxRecordsPerBatch configuration setting specifies the default size of each arrow data transfer. It’s shared with other Spark components and defaults to 10,000 rows.

library(arrow)
sdf_len(sc, 2 * 10^4) %>% spark_apply(nrow)
# Source: spark<?> [?? x 1]
result
<int>
1  10000
2  10000
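The two results of 10,000 rows are what you would expect if the rows are cut into consecutive batches of at most spark.sql.execution.arrow.maxRecordsPerBatch records. The sketch below models that splitting in base R. `batch_sizes()` is a hypothetical helper, not a sparklyr or Spark function, and it assumes all rows sit in a single partition; with several partitions, each one would be batched separately.

```r
# Hypothetical helper: sizes of the batches produced when n_rows records
# are split into consecutive batches of at most max_records_per_batch.
# Assumes a single partition (simplification for illustration).
batch_sizes <- function(n_rows, max_records_per_batch) {
  full_batches <- n_rows %/% max_records_per_batch
  remainder    <- n_rows %% max_records_per_batch
  c(
    rep(max_records_per_batch, full_batches),
    if (remainder > 0) remainder  # trailing partial batch, if any
  )
}

default_batches <- batch_sizes(2 * 10^4, 10^4)
smaller_batches <- batch_sizes(2 * 10^4, 5 * 10^3)
```

Under this model, `default_batches` holds two batches of 10,000 rows, while `smaller_batches` holds four batches of 5,000 rows.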

You might need to adjust this number based on how much data your system can handle, making it smaller for large datasets or larger for operations that require records to be processed together. We can change this setting to 5K rows and verify the partitions change appropriately:

config <- spark_config()
config$spark.sql.execution.arrow.maxRecordsPerBatch <- 5 * 10^3

sc <- spark_connect(master = "local", config = config)
sdf_len(sc, 2 * 10^4) %>% spark_apply(nrow)
# Source: spark<?> [?? x 1]
result
<int>
1   5000
2   5000
3   5000
4   5000

So far we’ve presented use cases, main operations and cluster requirements. The next and last section will teach you troubleshooting techniques useful when distributing R code.

## 11.10 Troubleshooting

A custom transformation can fail for many reasons. To learn how to troubleshoot errors, let’s simulate a problem by triggering an error ourselves:

sdf_len(sc, 1) %>% spark_apply(~stop("force an error"))
Error in force(code) :
sparklyr worker rscript failure, check worker logs for details
Log: wm_bx4cn70s6h0r5vgsldm0000gn/T/Rtmpob83LD/file2aac1a6188_spark.log

---- Output Log ----
19/03/11 14:12:24 INFO sparklyr: Worker (1) completed wait using lock for RScript

Notice that the error message tells you to inspect the logs. When running in local mode, you can simply run:

spark_log(sc, filter = "terminated unexpectedly")
19/03/11 14:12:24 ERROR sparklyr: RScript (1) terminated unexpectedly: force an error

This points to the artificial stop("force an error") error we introduced ourselves. However, if you are not working in local mode, you will have to retrieve the worker logs from your cluster manager. Since this can be cumbersome, one alternative is to rerun spark_apply() but return the error message yourself:

sdf_len(sc, 1) %>% spark_apply(~tryCatch(
  stop("force an error"),
  error = function(e) e$message
))
# Source: spark<?> [?? x 1]
result
<chr>
1 force an error

There are a few other, more advanced troubleshooting techniques applicable to spark_apply(). The following sections present them in order: you should try to troubleshoot using worker logs first, followed by identifying partitioning errors and, finally, attempting to debug a worker node.

### 11.10.1 Worker Logs

Whenever spark_apply() is executed, information regarding execution is written to a log on each worker node. You can use this log to write custom messages to help you diagnose and fine-tune your code.

For instance, suppose that you don’t know the name of the first column of df. You can write a custom log message from the worker nodes using worker_log() as follows:

sdf_len(sc, 1) %>% spark_apply(function(df) {
worker_log("the first column in the data frame is named ", names(df)[[1]])
df
})
# Source: spark<?> [?? x 1]
id
* <int>
1     1

When running locally, we can filter the log entries for the worker as follows:

spark_log(sc, filter = "sparklyr: RScript")
18/12/18 11:33:47 INFO sparklyr: RScript (3513) the first column in the dataframe
is named id
18/12/18 11:33:47 INFO sparklyr: RScript (3513) computed closure
18/12/18 11:33:47 INFO sparklyr: RScript (3513) updating 1 rows
18/12/18 11:33:47 INFO sparklyr: RScript (3513) updated 1 rows
18/12/18 11:33:47 INFO sparklyr: RScript (3513) finished apply
18/12/18 11:33:47 INFO sparklyr: RScript (3513) finished 

Notice that the logs print our custom log entry showing that id is the name of the first column in the given data frame.

This functionality is useful when troubleshooting errors; for instance, if we force an error using the stop() function:

sdf_len(sc, 1) %>% spark_apply(function(df) {
stop("force an error")
})

You will get an error similar to:

 Error in force(code) :
sparklyr worker rscript failure, check worker logs for details

As suggested in the error, we can look in the worker logs for the specific errors as follows:

spark_log(sc)

This will show an entry containing the error and the call stack:

18/12/18 11:26:47 INFO sparklyr: RScript (1860) computing closure
18/12/18 11:26:47 ERROR sparklyr: RScript (1860) terminated unexpectedly:
force an error
18/12/18 11:26:47 ERROR sparklyr: RScript (1860) collected callstack:
11: stop("force an error")
10: (function (df)
{
stop("force an error")
})(structure(list(id = 1L), class = "data.frame", row.names = c(NA,
-1L)))

Notice that spark_log(sc) only retrieves the worker logs when using local clusters. When running in proper clusters with multiple machines, you will have to use the tools and user interface provided by the cluster manager to find these log entries.

### 11.10.2 Resolving Timeouts

When running with several hundred executors, it becomes more likely that some tasks will hang indefinitely. You might be in this situation if most of the tasks in your job complete successfully, but a handful of them are still running and do not fail or succeed.

Suppose that we need to calculate the size of many web pages. We could use spark_apply() with something similar to:

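sdf_len(sc, 3, repartition = 3) %>%
  # placeholder URL; substitute the pages you need to measure
  spark_apply(~ download.file("https://example.com", "index.html") +
                file.size("index.html"))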

Some web pages might not exist or might take too long to download, in which case most tasks will succeed but a few will hang. To prevent a few tasks from blocking all computations, you can use the spark.speculation Spark setting. When this setting is enabled, once 75% of all tasks succeed, Spark looks for tasks that are taking longer than a multiple of the median task execution time and retries them. You can use the spark.speculation.multiplier setting to configure the time multiplier used to consider a task slow.

Therefore, for the previous example, you can consider configuring Spark to retry tasks that take four times longer than the median as follows:

config <- spark_config()
config["spark.speculation"] <- TRUE
config["spark.speculation.multiplier"] <- 4
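To see what a multiplier of 4 means in practice, the following base R sketch applies the rule to a set of made-up task durations. It is a simplified model of the speculation rule described above, not Spark’s scheduler code; `task_seconds` is hypothetical data.

```r
# Hypothetical durations, in seconds, of the tasks in a job.
task_seconds <- c(2, 3, 3, 4, 120)

# Same value as spark.speculation.multiplier in the configuration above.
multiplier <- 4

# A task is considered slow once it runs longer than multiplier * median.
threshold <- multiplier * median(task_seconds)
is_slow   <- task_seconds > threshold

# Indexes of the tasks that would be candidates for a retry.
slow_tasks <- which(is_slow)
```

With these durations the median is 3 seconds, so the threshold is 12 seconds and only the 120-second task would be retried.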

### 11.10.3 Inspecting Partition

If a particular partition fails, you can detect the broken partition by computing a digest, and then retrieving that particular partition. As usual, install digest from CRAN before connecting to Spark.

sdf_len(sc, 3) %>% spark_apply(function(x) {
worker_log("processing ", digest::digest(x), " partition")
x
})

This will add an entry similar to:

18/11/03 14:48:32 INFO sparklyr: RScript (2566)
processing f35b1c321df0162e3f914adfb70b5416 partition 

When executing this in your cluster, you will have to look in the logs for the task that is not finishing. Once you have that digest, you can cancel the job.

Then you can use that digest to retrieve that specific data frame to R with something like:

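sdf_len(sc, 3) %>% spark_apply(function(x) {
  # keep only the partition whose digest matches the one found in the logs
  if (identical(digest::digest(x), "f35b1c321df0162e3f914adfb70b5416")) x
  else x[0, , drop = FALSE]
}) %>% collect()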
# A tibble: 1 x 1
result
<int>
1      1

Which you can then run in R to troubleshoot further.
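Once the partition is available as a local data frame, one way to troubleshoot is to call your transformation on it directly, outside Spark. The sketch below assumes a hypothetical `transformation()` function and a stand-in `partition` data frame; in practice you would use the function passed to spark_apply() and the data frame retrieved with collect().

```r
# Stand-in for the partition retrieved with collect() (hypothetical data).
partition <- data.frame(id = 1L)

# Hypothetical transformation that fails on this particular partition.
transformation <- function(df) {
  if (any(df$id == 1L)) stop("force an error")
  df
}

# Run the transformation locally and capture the error message, if any.
outcome <- tryCatch(
  transformation(partition),
  error = function(e) paste("failed:", conditionMessage(e))
)
```

Running locally also lets you use R’s regular tools, such as debugonce(transformation), without involving any worker node.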

### 11.10.4 Debugging Workers

You can use a debugger, which is a tool that lets you execute your code line by line, to troubleshoot spark_apply() for local connections. You can start spark_apply() in debug mode using the debug parameter and then follow the instructions:

sdf_len(sc, 1) %>% spark_apply(function() {
stop("Error!")
}, debug = TRUE)
Debugging spark_apply(), connect to worker debugging session as follows:
1. Find the workers <sessionid> and <port> in the worker logs, from RStudio
click 'Log' under the connection, look for the last entry with contents:
'Session (<sessionid>) is waiting for sparklyr client to connect to
port <port>'
2. From a new R session run:
debugonce(sparklyr:::spark_worker_main)
sparklyr:::spark_worker_main(<sessionid>, <port>)

As the instructions indicate, you will need to connect “as the worker node” from a different R session and then step through the code. This method is not as straightforward as previous ones, since you will also need to step through some lines of sparklyr code; so this is something we only recommend as a last resort.

You can also use the online resources that were described in the Getting Started chapter. Let’s now wrap up this chapter with a brief recap of all the functionality that was presented.

## 11.11 Recap

This chapter presented spark_apply() as an advanced technique that you can use to fill gaps in functionality in Spark or its many extensions. We presented sensible use cases for spark_apply(): parsing data, modeling many small datasets in parallel, performing grid search and calling web APIs. You learned how partitions relate to spark_apply(), how to create custom groups, how to distribute contextual information across all nodes and how to troubleshoot problems. We also presented limitations and cluster configuration caveats.

We also introduced Apache Arrow as a library we strongly recommend when using Spark with R and presented installation, use cases and considerations you should be aware of.

Up to this chapter, we’ve only worked with large datasets of static data. That is, we’ve assumed our datasets do not change over time and remain invariant while analysing, modeling and visualizing them. In the next chapter, Streaming, we will introduce techniques to process datasets which, in addition to being large, are also growing and resemble a stream of information.