Chapter 8 Data

“Has it occurred to you that she might not have been a reliable source of information?”

— Jon Snow

The previous chapter, Connections, presented how to connect to Spark clusters, either on-premise or in the cloud; with the knowledge acquired in previous chapters, you are now equipped to start doing analysis and modeling at scale! However, so far we haven't really explained much about how to read data into Spark: we've used copy_to() to upload small datasets and functions like spark_read_csv() or spark_write_csv() without explaining in detail how they work and why you would choose one over the other.

So, we are about to learn how to read and write data using Spark; while this is important on its own, this chapter will also introduce you to data lakes: repositories of data stored in their natural or raw format, which provide various benefits over existing storage architectures. For instance, you can easily integrate data from external systems without transforming it into a common format and without assuming those sources are as reliable as your internal data sources.

In addition, we will present how to extend Spark's capabilities to work with data that is not accessible out of the box, along with several recommendations focused on improving performance when reading and writing data. Reading large datasets often requires you to fine-tune your Spark cluster configuration, but that will be the topic of the next chapter, Tuning.

8.1 Overview

In the Introduction chapter, you learned that beyond big data and big compute, you can also use Spark to improve velocity, variety and veracity in data tasks. While you can apply the lessons of this chapter to any task that requires loading and storing data, it is particularly useful to present them in the context of dealing with a variety of data sources. To understand why, we should first take a quick detour to understand how data is currently processed in many organizations.

For several years, it has been common practice to store large datasets in a relational database, a system first proposed in 1970 by Edgar F. Codd (Codd 1970). You can think of a database as a collection of tables that are related to one another, where each table is carefully designed to hold specific data types and relationships to other tables. Most relational database systems use SQL (Structured Query Language) for querying and maintaining the database. Databases are still widely used today, with good reason: they store data reliably and consistently. Your bank probably stores account balances in a database, and that's a good practice!

However, databases have also been used to store information from other applications and systems. For instance, your bank may also store data produced by other banks, say incoming checks. To accomplish this, the external data needs to be extracted from the external system, transformed into something that fits the current database, and finally loaded into it. This is known as ETL (extract, transform, load), a general procedure for copying data from one or more sources into a destination system that represents the data differently from the source; the process became popular in the 1970s.

Aside from databases, data is often also loaded into a data warehouse, a system used for reporting and data analysis; data there is usually stored and indexed in a format that speeds up analysis, but which is often not suitable for modeling or running custom distributed code. The challenge is that changing databases and data warehouses is usually a long and delicate process, since data needs to be re-indexed and data from multiple sources needs to be carefully transformed into single tables that can be shared across data sources.

Instead of trying to transform all data sources into a common format, you can embrace this variety of data sources in a data lake, a system or repository of data stored in its natural format; see Figure ??. Since data lakes make data available in its original format, there is no need to carefully transform it in advance; anyone can use it for analysis, which adds significant flexibility over ETL. Spark can then be used to unify data processing from data lakes, databases and data warehouses through a single interface that scales across all of them. Some organizations also use Spark to replace their existing ETL process; however, this falls into the realm of data engineering, which is well beyond the scope of this book, and we illustrate it with dotted lines in Figure ??.

Spark processing raw data from data lakes, databases and data warehouses||data-data-lake

In order to support a broad variety of data sources, Spark needs to be able to read and write data in several different file formats (CSV, JSON, Parquet, etc.), access them in several file systems (HDFS, S3, DBFS, etc.) and, potentially, interoperate with other storage systems (databases, data warehouses, etc.). We will get to all of that; but first, we start by presenting how to read, write and copy data using Spark.

8.2 Read

If you are new to Spark, it is highly recommended that you review this section before you start working with large datasets. We will introduce several techniques that improve the speed and efficiency of reading data. Each subsection presents specific ways to take advantage of how Spark reads files, such as treating entire folders as datasets and describing a dataset's structure in advance so that Spark can read it faster.

8.2.1 Paths

When analyzing data, loading multiple files into a single data object is a common scenario. In R, we typically use a loop or a functional programming directive to accomplish this. That is because R has to load each file individually into your R session. Let’s create a few CSV files inside a folder and read them with R first,

letters <- data.frame(x = letters, y = 1:length(letters))

dir.create("data-csv")
write.csv(letters[1:3, ], "data-csv/letters1.csv", row.names = FALSE)
write.csv(letters[1:3, ], "data-csv/letters2.csv", row.names = FALSE)

do.call("rbind", lapply(dir("data-csv", full.names = TRUE), read.csv))
  x y
1 a 1
2 b 2
3 c 3
4 a 1
5 b 2
6 c 3

In Spark, there is the notion of a folder as a dataset. Instead of enumerating each file, simply pass the path containing all the files; Spark assumes that every file in that folder is part of the same dataset. This implies that the target folder should be used only for data purposes. This is especially important since storage systems like HDFS store files across multiple machines even though, conceptually, they live in the same folder; when Spark reads the files from this folder, it is actually executing distributed code to read each file on each machine, so no data is transferred between machines when distributed files are read!

library(sparklyr)
sc <- spark_connect(master = "local", version = "2.3")

spark_read_csv(sc, "data-csv/")
# Source: spark<datacsv> [?? x 2]
  x         y
  <chr> <int>
1 a         1
2 b         2
3 c         3
4 a         1
5 b         2
6 c         3

The “folder as a table” idea is found in other open source technologies as well. Under the hood, Hive tables work the same way. When querying a Hive table, the mapping is done over multiple files inside the same folder. The folder's name usually matches the name of the table visible to the user.

Next, we will present a technique that allows Spark to read files faster, as well as reduce read failures, by describing the structure of a dataset in advance.

8.2.2 Schema

When reading data, Spark is able to determine the data source's column names and column types, also known as the schema. However, guessing the schema comes at a cost: Spark has to do an initial pass over the data to infer it. For large datasets, this can add a significant amount of time to the ingestion process, which can become costly even for medium-sized datasets. For files that are read over and over again, the additional read time accumulates.

To avoid this, Spark allows you to provide a column definition by passing a columns argument that describes your dataset. You can create this schema by sampling a small portion of the original file yourself,

spec_with_r <- sapply(read.csv("data-csv/letters1.csv", nrows = 10), class)
spec_with_r
        x         y 
 "factor" "integer" 

Or you can set the column specification to a vector containing the column types explicitly. The vector’s values are named to match the field names.

spec_explicit <- c(x = "character", y = "numeric")
spec_explicit
          x           y 
"character"   "numeric" 

The accepted variable types are: integer, character, logical, double, numeric, factor, Date and POSIXct.
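For instance, a hypothetical column specification mixing several of these types could look as follows; the column names are made up for illustration and do not refer to our example files:

spec_typed <- c(id = "integer", amount = "double",
                day = "Date", created = "POSIXct")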

Then, when reading with spark_read_csv(), you can pass spec_with_r to the columns argument to match the names and types of the original file. This helps improve performance since Spark will not have to infer the column types.

spark_read_csv(sc, "data-csv/", columns = spec_with_r)
# Source: spark<datacsv> [?? x 2]
  x         y
  <chr> <int>
1 a         1
2 b         2
3 c         3
4 a         1
5 b         2
6 c         3

The following example shows how to set the field type to something different. However, the new field type needs to be compatible with the type in the original dataset; for example, a character field cannot be set to numeric. If an incompatible type is used, the file read will fail with an error. Additionally, the example also changes the names of the original fields.

spec_compatible <- c(my_letter = "character", my_number = "character")

spark_read_csv(sc, "data-csv/", columns = spec_compatible)
# Source: spark<datacsv> [?? x 2]
  my_letter my_number
  <chr>     <chr>    
1 a         1        
2 b         2        
3 c         3        
4 a         1        
5 b         2        
6 c         3    

In Spark, malformed entries can cause errors during reading, especially for non-character fields. To prevent such errors, we can use a file spec that imports them as character and then use dplyr to coerce the field into the desired type.
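For instance, a minimal sketch of this pattern over the data-csv folder created earlier; the coercion happens inside Spark through dplyr:

spec_as_character <- c(x = "character", y = "character")

spark_read_csv(sc, "data-csv/", columns = spec_as_character) %>%
  dplyr::mutate(y = as.integer(y))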

This subsection reviewed how we can read files faster and with fewer failures, which lets us start our analysis sooner. Another way to accelerate our analysis is to load less data into Spark's memory; the next subsection covers how to do this.

8.2.3 Memory

By default, when using Spark with R, the data you read is copied into Spark's distributed memory, making data analysis and other operations very fast. There are cases, such as when the data is too big, where loading all of it may not be practical, or even necessary. For those cases, Spark can just “map” the files without copying the data into memory.

The mapping creates a sort of “virtual” table in Spark. The implication is that when a query runs against that table, Spark has to read the data from the files at that time; any subsequent reads will do the same. In effect, Spark becomes a pass-through for the data. The advantage of this method is that there is almost no up-front time cost to “reading” the file, since the mapping is very fast. The downside is that queries that actually extract data will take longer to run.

This is controlled by the memory argument of the read functions. Setting it to FALSE prevents the data copy. It defaults to TRUE.

mapped_csv <- spark_read_csv(sc, "data-csv/", memory = FALSE)

There are good use cases for this method; one of them is when not all columns of a table are needed. For example, take a very large file that contains many columns: assuming this is not the first time you interact with this data, you already know which columns are needed for the analysis. In that case, the files can be read using memory = FALSE, and the needed columns selected with dplyr. The resulting dplyr reference can then be cached into memory using the compute() function. This makes Spark query the file(s), pull the selected fields, and copy only that data into memory. The result is an in-memory table that took comparatively less time to ingest.

mapped_csv %>%
  dplyr::select(y) %>%
  dplyr::compute("test")

The next subsection covers a short technique that makes it easier to keep the original field names of imported data.

8.2.4 Columns

Spark 1.6 required that column names be sanitized, so sparklyr does that by default. There may be cases when you would like to keep the original names intact and are working with Spark version 2.0 or above. To do that, set the sparklyr.sanitize.column.names option to FALSE.

options(sparklyr.sanitize.column.names = FALSE)
copy_to(sc, iris, overwrite = TRUE)
# Source:   table<iris> [?? x 5]
# Database: spark_connection
   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
          <dbl>       <dbl>        <dbl>       <dbl> <chr>  
 1          5.1         3.5          1.4         0.2 setosa 
 2          4.9         3            1.4         0.2 setosa 
 3          4.7         3.2          1.3         0.2 setosa 
 4          4.6         3.1          1.5         0.2 setosa 
 5          5           3.6          1.4         0.2 setosa 
 6          5.4         3.9          1.7         0.4 setosa 
 7          4.6         3.4          1.4         0.3 setosa 
 8          5           3.4          1.5         0.2 setosa 
 9          4.4         2.9          1.4         0.2 setosa 
10          4.9         3.1          1.5         0.1 setosa 
# ... with more rows

Having reviewed how to read data into Spark, the next section covers how we can write data out of our Spark session.

8.3 Write

Some projects require that new data generated in Spark be written back to a remote source. For example, the data could be new predicted values returned by a Spark model: the job handles the mass generation of predictions, but then those predictions need to be stored somewhere. This section focuses on how we should use Spark to move data from Spark into an external destination.

Many new users start by downloading Spark data into R and then uploading it to a target; Figure ?? illustrates this approach. It works for smaller datasets, but it becomes inefficient for larger ones, since the data typically grows to the point where it is no longer feasible for R to be the middle point.

Incorrect use of Spark when writing large datasets||data-avoid-approach

All efforts should be made to have Spark connect to the target location. This way, reading, processing and writing happen within the same Spark session.

As Figure ?? shows, a better approach is to use Spark to read, process, and write to the target. This approach scales as far as the Spark cluster allows and avoids using R as a choke point.

Correct use of Spark when writing large datasets||data-recommended-approach

Consider the following scenario: a Spark job just processed predictions for a large dataset, resulting in a considerable amount of predictions. Choosing a method to write the results will depend on the technology infrastructure you are working with; more specifically, it will depend on whether Spark and the target are running in the same cluster.

Back to our scenario: we have a large dataset in Spark that needs to be saved. When Spark and the target are in the same cluster, copying the results is not a problem; the data transfer happens between the RAM and disk of the same cluster, or is efficiently shuffled through a high-bandwidth connection.

But what if the target is not within the Spark cluster? There are two options, and choosing one will depend on the size of the data and the network speed:

Spark Transfer
In this case, Spark connects to the remote target location and copies the new data. If this is done within the same data center or cloud provider, the data transfer could be fast enough to have Spark write the data directly.
External Transfer
Otherwise, Spark can write the results to disk and transfer them via a third-party application: Spark writes the results as files, a separate job then copies the files over, and a separate process in the target location loads the data into the target system. Both options are sketched below.
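As a hedged sketch of both options, assume predictions is a Spark table holding our results; the JDBC URL and paths below are hypothetical placeholders:

# Spark transfer: write directly to the remote target over JDBC
# (requires the appropriate driver; see the JDBC section later on)
spark_write_jdbc(predictions, name = "predictions", options = list(
  url = "jdbc:mysql://remote-host:3306/warehouse",
  user = "user", password = "<password>"))

# External transfer: write the results as files on the cluster's storage,
# then let a separate job copy them over to the target location
spark_write_parquet(predictions, "hdfs://cluster/tmp/predictions",
                    mode = "overwrite")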

It is best to recognize that Spark, R, and any other technology are tools; no tool can do everything, nor should it be expected to. Next, we describe how to copy data into Spark and how to collect large datasets that don't fit in memory; these techniques can be used to transfer data across clusters or to help initialize your distributed datasets.

8.4 Copy

Previous chapters used copy_to() as a handy helper to copy data into Spark; however, copy_to() can only transfer datasets that are already loaded into R's memory, and these tend to be much smaller than the kind of datasets you would want to copy into Spark.

For instance, suppose that we have a 3GB dataset generated as follows,

dir.create("largefile.txt")
write.table(matrix(rnorm(10 * 10^6), ncol = 10), "largefile.txt/1",
            append = T, col.names = F, row.names = F)
for (i in 2:30)
  file.copy("largefile.txt/1", paste0("largefile.txt/", i))

If we only had 2GB of memory in the driver node, we would not be able to load this 3GB file into memory using copy_to(). Instead, when the Hadoop file system is used as storage in your cluster, you can use the hadoop command-line tool to copy files from local disk into HDFS from the terminal, as follows. Note that the following command will only work in clusters using HDFS, not in local environments.

hadoop fs -copyFromLocal largefile.txt largefile.txt

You can then read the uploaded file as described in the file formats section; for text files, you would run:

spark_read_text(sc, "largefile.txt", memory = FALSE)
# Source: spark<largefile> [?? x 1]
   line                                                                   
   <chr>                                                                  
 1 0.0982531064914565 -0.577567317599452 -1.66433938237253 -0.20095089489…
 2 -1.08322304504007 1.05962389624635 1.1852771207729 -0.230934710049462 …
 3 -0.398079835552421 0.293643382374479 0.727994248743204 -1.571547990532…
 4 0.418899768227183 0.534037617828835 0.921680317620166 -1.6623094393911…
 5 -0.204409401553028 -0.0376212693728992 -1.13012269711811 0.56149527218…
 6 1.41192628218417 -0.580413572014808 0.727722566256326 0.5746066486689 …
 7 -0.313975036262443 -0.0166426329807508 -0.188906975208319 -0.986203251…
 8 -0.571574679637623 0.513472254005066 0.139050812059352 -0.822738334753…
 9 1.39983023148955 -1.08723592838627 1.02517804413913 -0.412680186313667…
10 0.6318328148434 -1.08741784644221 -0.550575696474202 0.971967251067794…
# … with more rows

collect() has a similar limitation: it can only collect datasets that fit in your driver's memory. However, if you needed to extract a large dataset from Spark, rather than collecting it through the driver node, you could use the specialized tools provided by the distributed storage; for HDFS you would run,

hadoop fs -copyToLocal largefile.txt largefile.txt

Alternatively, you can collect datasets that don't fit in memory by providing a callback to collect(). A callback is just an R function that will be called over each Spark partition; you can then write each partition to disk or push it to other clusters over the network.

The following code could be used to collect the 3GB dataset even if the driver node had less than 3GB of memory. That said, as explained in the Analysis chapter, you should avoid collecting large datasets into a single machine since this creates a significant performance bottleneck. For conciseness, we will collect only the first million rows; feel free to remove head(10^6) if you have a few minutes to spare.

dir.create("large")
spark_read_text(sc, "largefile.txt", memory = FALSE) %>%
  head(10^6) %>%
  collect(callback = function(df, idx) {
    writeLines(df$line, paste0("large/large-", idx, ".txt"))
  })

Make sure you clean up these large files and empty your recycle bin as well,

unlink("largefile.txt", recursive = TRUE)
unlink("large", recursive = TRUE)

In most cases, data will already be stored in the cluster, so you should not have to worry about copying large datasets; instead, you can usually focus on reading and writing different file formats, which we describe next.

8.5 File Formats

Out of the box, Spark is able to interact with several file formats, like CSV, JSON, LIBSVM, ORC, Parquet and plain text. The following table maps each file format to the functions you should use to read and write data in Spark:

Format Read Write
Comma separated values (CSV) spark_read_csv() spark_write_csv()
JavaScript Object Notation (JSON) spark_read_json() spark_write_json()
Library for Support Vector Machines (LIBSVM) spark_read_libsvm() spark_write_libsvm()
Optimized Row Columnar (ORC) spark_read_orc() spark_write_orc()
Apache Parquet spark_read_parquet() spark_write_parquet()
Text spark_read_text() spark_write_text()

The following sections describe special considerations particular to each file format and present some of the strengths and weaknesses of the most popular ones, starting with the well-known CSV file format.

8.5.1 CSV

The CSV format may be the most common file type in use today: a text file in which values are separated by a given character, usually a comma. Reading CSV files should be pretty straightforward; however, it's worth mentioning a couple of techniques that can help you process CSVs that are not fully well-formed. Spark offers the following modes for addressing parsing issues:

  • Permissive: NULL values are inserted for missing tokens.

  • Drop Malformed: Drops lines which are malformed.

  • Fail Fast: Aborts if it encounters any malformed line.

These can be used in sparklyr by passing them inside the options argument. The following example creates a file with a broken entry. It then shows how it can be read into Spark.

## Creates bad test file
writeLines(c("bad", 1, 2, 3, "broken"), "bad.csv")

spark_read_csv(
  sc,
  "bad3",
  "bad.csv",
  columns = list(foo = "integer"),
  options = list(mode = "DROPMALFORMED"))
# Source: spark<bad3> [?? x 1]
    foo
  <int>
1     1
2     2
3     3

Spark also provides an issue-tracking column, which is hidden by default. To enable it, add _corrupt_record to the columns list. This can be combined with the PERMISSIVE mode: all rows will be imported, invalid entries will receive an NA, and the issue will be tracked in the _corrupt_record column.

spark_read_csv(
  sc,
  "bad2",
  "bad.csv",
  columns = list(foo = "integer", "_corrupt_record" = "character"),
  options = list(mode = "PERMISSIVE")
)
# Source: spark<bad2> [?? x 2]
    foo `_corrupt_record`
  <int> <chr>            
1     1 NA               
2     2 NA               
3     3 NA               
4    NA broken  

Reading and storing data as CSVs is quite common and supported across most systems, so it is still a popular option for tabular datasets; for datasets containing nested structures and non-tabular data, JSON is usually preferred.

8.5.2 JSON

JSON is a file format originally derived from JavaScript which has grown to be language-independent and very popular due to its flexibility and ubiquitous support. Reading and writing JSON files is quite straightforward,

writeLines("{'a':1, 'b': {'f1': 2, 'f3': 3}}", "data.json")
simple_json <- spark_read_json(sc, "data.json")
simple_json
# Source: spark<data> [?? x 2]
      a b         
  <dbl> <list>    
1     1 <list [2]>

However, when you deal with datasets containing nested fields like the one in this example, it is worth pointing out how to extract those nested fields. One approach is to use a JSON path, a domain-specific syntax commonly used to extract and query JSON documents. You can use a combination of get_json_object() and to_json() to specify the JSON path you are interested in; to extract f1 you would run the following transformation:

simple_json %>% dplyr::mutate(z = get_json_object(to_json(b), '$.f1'))
# Source: spark<?> [?? x 3]
      a b          z    
  <dbl> <list>     <chr>
1     1 <list [2]> 2 

Another approach is to install the sparklyr.nested package from CRAN with install.packages("sparklyr.nested") and then unnest nested data with sdf_unnest():

sparklyr.nested::sdf_unnest(simple_json, "b")
# Source: spark<?> [?? x 3]
      a    f1    f3
  <dbl> <dbl> <dbl>
1     1     2     3

While JSON and CSV are quite simple to use and versatile, they are not optimized for performance; other formats, like ORC, AVRO and Parquet, are.

8.5.3 Parquet

Apache Parquet, Apache ORC and Apache AVRO are all file formats designed with performance in mind. Parquet and ORC store data in columnar format, while AVRO is row-based; all of them are binary file formats, which reduces storage space and improves performance. This comes at the cost of making them a bit harder to read by external systems and libraries; however, this is usually not an issue when they are used as intermediate data storage within Spark.

To illustrate this, Figure ?? plots the result of running a one-million-row write-speed benchmark using the bench package; please run your own benchmarks over meaningful datasets when deciding which format best fits your needs.

numeric <- copy_to(sc, data.frame(nums = runif(10^6)))
bench::mark(
  CSV = spark_write_csv(numeric, "data.csv", mode = "overwrite"),
  JSON = spark_write_json(numeric, "data.json", mode = "overwrite"),
  Parquet = spark_write_parquet(numeric, "data.parquet", mode = "overwrite"),
  ORC = spark_write_orc(numeric, "data.orc", mode = "overwrite"),
  iterations = 20
) %>% ggplot2::autoplot()

One million rows write benchmark between CSV, JSON, Parquet and ORC||data-file-format-benchmark

From now on, please disconnect from Spark whenever we present a new spark_connect() command.

spark_disconnect(sc)

This concludes the introduction to some of the file formats supported out of the box; next, we present how to deal with formats that require external packages and customization.

8.5.4 Others

Spark is a very flexible computing platform: it can add functionality through extension programs, called packages. Accessing a new source type or file system can be done by using the appropriate package.

Packages need to be loaded into Spark at connection time. To load a package, Spark needs its location, which could be inside the cluster, in a file share, or on the internet.

In sparklyr, the package location is passed to spark_connect(). All packages should be listed in the sparklyr.connect.packages entry of the connection configuration.

It is possible to access data source types not listed above. Loading the appropriate package into Spark is the first of two steps; the second step is to actually read or write the data. The spark_read_source() and spark_write_source() functions do that: they are generic functions that can use the libraries imported by a given package.

For instance, we can read XML files as follows:

sc <- spark_connect(master = "local", version = "2.3", config = list(
  sparklyr.connect.packages = "com.databricks:spark-xml_2.11:0.5.0"))

writeLines("<ROWS><ROW><text>Hello World</text></ROW>", "simple.xml")
spark_read_source(sc, "simple_xml", "simple.xml", "xml")
# Source: spark<data> [?? x 1]
  text       
  <chr>      
1 Hello World

You can also write it back to XML with ease, as follows:

tbl(sc, "simple_xml") %>%
  spark_write_source("xml", options = list(path = "data.xml"))

In addition, there are a few extensions developed by the R community to load additional file formats; to mention a few: sparklyr.nested to assist with nested data, spark.sas7bdat to read data from SAS, sparkavro to read data in AVRO format, and sparkwarc to read WARC files. These extensions use the extensibility mechanisms introduced in the Extensions chapter. The Distributed R chapter will present techniques for using R packages to load additional file formats, and the Contributing chapter presents techniques for using Java libraries to complement this further. But first, let's explore how to retrieve and store files from several different file systems.

8.6 File Systems

Spark defaults to the file system it is currently running on. In a YARN-managed cluster, the default file system is HDFS, so an example path of “/home/user/file.csv” will be read from the cluster's HDFS folders, not the local Linux folders. The operating system's file system is accessed for other deployments, such as standalone mode and sparklyr's local mode.

The file system protocol can be changed when reading or writing; this is done via the path argument of the sparklyr function. For example, a full path of “file:///home/user/file.csv” forces the use of the local operating system's file system.
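For instance, a hedged sketch of reading the hypothetical CSV file above through the local file system rather than the cluster's default one:

local_file <- spark_read_csv(sc, "local_file",
                             path = "file:///home/user/file.csv")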

There are many other file system protocols, like dbfs:// for Databricks' file system, s3a:// for Amazon's S3 service, wasb:// for Microsoft Azure storage, and gs:// for Google Cloud Storage.

Spark does not provide support for all of them directly; instead, they are configured as needed. For instance, accessing the “s3a” protocol requires adding a package to the sparklyr.connect.packages configuration setting while connecting, and appropriate credentials might need to be specified through the AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY environment variables.

Sys.setenv(AWS_ACCESS_KEY_ID = my_key_id)
Sys.setenv(AWS_SECRET_ACCESS_KEY = my_secret_key)

sc <- spark_connect(master = "local", version = "2.3", config = list(
  sparklyr.connect.packages = "org.apache.hadoop:hadoop-aws:2.7.7"))

my_file <- spark_read_csv(sc, "my-file", path =  "s3a://my-bucket/my-file.csv")

Accessing other file protocols requires loading different packages, although in some cases the vendor providing the Spark environment might load the package for you. Please refer to your vendor's documentation to find out whether that is the case.

The File Systems section in the Appendix contains tips on how to read and write data from specific file systems.

8.7 Storage Systems

A data lake and Spark usually go hand in hand, with optional access to storage systems like databases and data warehouses. Presenting all the different storage systems with appropriate examples would be quite time-consuming, so instead we will present some of the most commonly used ones.

To start, Apache Hive is data warehouse software that facilitates reading, writing, and managing large datasets residing in distributed storage using SQL. In fact, Spark has components from Hive built directly into its sources, and it is very common to have installations of Spark and Hive side by side; so we will start by presenting Hive, followed by Cassandra, and close with JDBC connections.

8.7.1 Hive

In YARN-managed clusters, Spark provides deeper integration with Apache Hive. Hive tables are easily accessible after opening a Spark connection.

Accessing a Hive table’s data can be done using DBI, by referencing a table in a SQL statement.

sc <- spark_connect(master = "local", version = "2.3")
spark_read_csv(sc, "test", "data-csv/", memory = FALSE)

DBI::dbGetQuery(sc, "SELECT * FROM test limit 10")

Another way to reference a table is with dplyr using the tbl() function, which retrieves a reference to the table.

dplyr::tbl(sc, "test")

It is important to reiterate that no data is imported into R; the tbl() function only creates a reference. You can then pipe more dplyr verbs after the tbl() command.

dplyr::tbl(sc, "test") %>%
  dplyr::group_by(y) %>%
  dplyr::summarise(totals = sum(y))

Hive table references assume the default database. Often, the needed table is in a different database within the metastore. To access it using SQL, prefix the table with the database name, separated by a period.

DBI::dbSendQuery(sc, "SELECT * FROM databasename.table")

In dplyr, the in_schema() function can be used inside the tbl() call instead.

tbl(sc, dbplyr::in_schema("databasename", "table"))

You can also use the tbl_change_db() function to set the current session’s default database. Any subsequent call via DBI or dplyr will use the selected name as the default database.

tbl_change_db(sc, "databasename")

The following examples require additional Spark packages and databases, and they might be hard to follow unless you happen to have a JDBC driver or a Cassandra database accessible to you; therefore, let's disconnect from Spark:

spark_disconnect(sc)

Next, we will explore less structured storage systems, often referred to as NoSQL databases.

8.7.2 Cassandra

Apache Cassandra is a free and open source, distributed, wide-column store, NoSQL database management system designed to handle large amounts of data across many commodity servers. While there are many other database systems beyond Cassandra, taking a quick look at how Cassandra can be used from Spark will also inform how to make use of other databases and storage systems like Solr, Redshift, Delta Lake, and so on.

The following example code shows how to use the datastax:spark-cassandra-connector package to read from Cassandra. The key is to use the org.apache.spark.sql.cassandra library as the source argument. It provides the mapping Spark can use to make sense of the data source. Unless you have a Cassandra database, skip executing the following statement.

sc <- spark_connect(master = "local", version = "2.3", config = list(
  sparklyr.connect.packages = "datastax:spark-cassandra-connector:2.3.1-s_2.11"))

spark_read_source(
  sc, 
  name = "emp",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "dev", table = "emp"),
  memory = FALSE)

One of the most useful features of Spark when dealing with external databases and data warehouses is that Spark can push computation down to the database; this is known as predicate pushdown and, in a nutshell, it improves performance by asking remote databases smart questions. When you execute a query that contains a filter(age > 20) expression against a remote table referenced through spark_read_source() and not loaded into memory, the filter gets passed to the remote database and only a subset of the remote table is retrieved, rather than bringing the entire table into Spark.
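For example, a hedged sketch against the emp table mapped above, assuming it contains an age column (that column is hypothetical, not part of the earlier example):

dplyr::tbl(sc, "emp") %>%
  dplyr::filter(age > 20) %>%   # the filter is pushed down to Cassandra
  dplyr::collect()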

While it is ideal to find Spark packages that support the remote storage system, there will be times when a package is not available and vendor JDBC drivers need to be considered.

8.7.3 JDBC

When a Spark package is not available to provide connectivity, a JDBC connection can be considered. JDBC stands for Java Database Connectivity; it is an interface for the Java programming language that defines how a client may access a database.

It is quite easy to connect to a remote database with spark_read_jdbc() and spark_write_jdbc(), as long as you have access to the appropriate JDBC driver, which at times is trivial and at other times quite an adventure. To keep this simple, we can briefly consider how a connection to a remote MySQL database could be accomplished.

First, you would need to download the appropriate JDBC driver from MySQL's developer portal and specify it through the sparklyr.shell.driver-class-path connection option. Since JDBC drivers are Java-based, the code is contained inside a JAR (Java ARchive) file. Once connected to Spark with the appropriate driver, you can use the jdbc:// protocol to access particular drivers and databases. Unless you are willing to download and configure MySQL on your own, skip executing the following statement.

sc <- spark_connect(master = "local", version = "2.3", config = list(
  "sparklyr.shell.driver-class-path" =
    "~/Downloads/mysql-connector-java-5.1.41/mysql-connector-java-5.1.41-bin.jar"
))

spark_read_jdbc(sc, "person_jdbc",  options = list(
  url = "jdbc:mysql://localhost:3306/sparklyr",
  user = "root", password = "<password>",
  dbtable = "person"))

If you are a customer of particular database vendors, making use of the vendor-provided resources is usually the best place to start looking for appropriate drivers.

8.8 Recap

This chapter expanded on how and why you should use Spark to connect to and process a variety of data sources through a data storage model known as a data lake: a storage pattern that provides more flexibility than standard ETL processes by enabling you to use raw datasets with, potentially, more information to enrich data analysis and modeling.

We then presented best practices for reading, writing and copying data into and from Spark, and came back to exploring the components of a data lake: file formats and file systems, the former representing how data is stored and the latter where it is stored. You then learned how to tackle file formats and storage systems that require additional Spark packages, reviewed some of the performance tradeoffs across file formats, and learned the concepts required to make use of storage systems (databases and data warehouses) in Spark.

While reading and writing datasets should now come naturally to you, you might still hit resource restrictions when working with large datasets. Therefore, in the next chapter you will learn how Spark manages tasks and data across multiple machines, which will in turn allow you to further improve the performance of your analysis and modeling tasks.

References

Codd, Edgar F. 1970. “A Relational Model of Data for Large Shared Data Banks.” Communications of the ACM 13 (6): 377–87.