Chapter 8 Data

The goal of this chapter is to help you learn how to access, read and write data using Spark. It will provide the necessary background to help you work with a variety of data.

This chapter will cover how to access data in different source types and file systems. It will show you the pattern for extending Spark’s capabilities to work with data that is not accessible “out-of-the-box”.

Additionally, this chapter will introduce several recommendations focused on improving performance and efficiency when reading or writing data.

8.1 Source types and file systems

Accessing data for the first time may be challenging, most likely because of problems with a new source type or file system.

“Out-of-the-box”, Spark is able to interact with several source types and file systems. Source types include: Comma separated values (CSV), Apache Parquet, and JDBC. File system protocols include: the local file system (Linux, Windows, Mac), and the Hadoop Distributed File System (HDFS).

There is a way for Spark to interact with other source types and file systems; the next subsection covers how to do that.

8.1.1 Default packages

Spark is a very flexible computing platform. It can add functionality by using extension programs, called packages. Accessing a new source type or file system can be done by using the appropriate package.

Packages need to be loaded into Spark at connection time. To load a package, Spark needs its location, which could be inside the cluster, on a file share, or on the Internet.

In sparklyr, the package location is passed to spark_connect(). All packages should be listed in the sparklyr.defaultPackages entry of the connection configuration. Here is an example that loads the package needed to access Amazon S3 buckets:

conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.7"
sc <- spark_connect(master = "local", config = conf)

8.1.2 Source types

Spark can read and write several source types. In sparklyr, the source types are aligned to R functions:

Format                                        Read                  Write
Comma separated values (CSV)                  spark_read_csv()      spark_write_csv()
JavaScript Object Notation (JSON)             spark_read_json()     spark_write_json()
Library for Support Vector Machines (LIBSVM)  spark_read_libsvm()   spark_write_libsvm()
Java Database Connectivity (JDBC)             spark_read_jdbc()     spark_write_jdbc()
Optimized Row Columnar (ORC)                  spark_read_orc()      spark_write_orc()
Apache Parquet                                spark_read_parquet()  spark_write_parquet()
Text                                          spark_read_text()     spark_write_text()
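
For example, here is a minimal sketch of a round trip between two of these formats, assuming an open connection sc and a hypothetical events.json file; it reads the JSON data and writes the same data back out as Parquet:

# Read a hypothetical JSON file and persist the same data as Parquet
events <- spark_read_json(sc, "events", path = "events.json")
spark_write_parquet(events, path = "events-parquet")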

8.1.2.1 New Source Type

It is possible to access data source types not listed above. Loading the appropriate default package for Spark is the first of two steps. The second step is to actually read or write the data. The spark_read_source() and spark_write_source() functions do that; they are generic functions that can use the libraries imported by a default package.

The following example code shows how to use the datastax:spark-cassandra-connector package to read from Cassandra. The key is to use the org.apache.spark.sql.cassandra library as the source argument. It provides the mapping Spark can use to make sense of the data source.

conf <- spark_config()
conf$sparklyr.defaultPackages <- "datastax:spark-cassandra-connector:2.0.0-RC1-s_2.11"
sc <- spark_connect(master = "local", config = conf)
spark_read_source(
  sc, 
  name = "emp",
  source = "org.apache.spark.sql.cassandra",
  options = list(keyspace = "dev", table = "emp")
  )
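
Writing works the same way through spark_write_source(). Here is a minimal sketch, assuming a hypothetical emp_updates table already exists in the Spark session and the same Cassandra connector package is loaded:

# emp_updates is a hypothetical Spark DataFrame to append to the Cassandra table
spark_write_source(
  emp_updates,
  source = "org.apache.spark.sql.cassandra",
  mode = "append",
  options = list(keyspace = "dev", table = "emp")
  )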

8.1.3 File systems

Spark defaults to the file system it is currently running on. In a YARN managed cluster, the default file system is HDFS; a path such as “/home/user/file.csv” will be read from the cluster’s HDFS folders, not from the Linux folders. For other deployments, such as Standalone and sparklyr’s local mode, the Operating System’s file system is used.

The file system protocol can be changed when reading or writing, via the path argument of the sparklyr function. For example, a full path of “file:///home/user/file.csv” will force the use of the local Operating System’s file system.
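
A minimal sketch of forcing the local file system, assuming that path exists on the machine running the driver:

# The file:// prefix forces the local Operating System's file system
local_file <- spark_read_csv(
  sc,
  "local_file",
  path = "file:///home/user/file.csv"
  )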

There are other file system protocols, such as Amazon’s S3 service. Spark does not know how to read the S3 protocol by default. Accessing the “s3a” protocol involves adding a package to the defaultPackages configuration variable passed at connection time.

conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.7"
sc <- spark_connect(master = "local", config = conf)
my_file <- spark_read_csv(sc, "my-file", path =  "s3a://my-bucket/my-file.csv")

Currently, only the “file://” and “hdfs://” protocols are supported out-of-the-box, each in its respective environment. Accessing a different file protocol requires loading a default package. In some cases, the vendor providing the Spark environment may already load the package for you.

8.2 Reading data

This section introduces several techniques that improve the speed and efficiency of reading data. If you are new to Spark and sparklyr, we highly recommend reviewing this section before starting to work with large data sets.

8.2.1 Folders as a table

Loading multiple files into a single data object is a common scenario. In R, we typically use a loop or functional programming to accomplish this.

lapply(c("data-folder/file1.csv", "data-folder/file2.csv"), read.csv)

In Spark, there is the notion of a folder as a table. Instead of enumerating each file, simply pass the path of the containing folder. Spark assumes that every file in that folder is part of the same table, which implies that the target folder should be used only for data purposes.

spark_read_csv(sc, "my_data", path = "data-folder")

The folder-as-a-table notion is also found in other open source technologies. Under the hood, Hive tables work the same way: when querying a Hive table, the mapping is done over multiple files inside the same folder. The folder’s name usually matches the name of the table visible to the user.

8.2.2 File layout

When reading data, Spark is able to determine the data source’s column names and types, but this comes at a cost: to determine the types, Spark has to do an initial pass over the data and then assign them. For large data, this may add a significant amount of time to the ingestion process, and it can become costly even for medium-sized data loads. For files that are read over and over again, the additional read time accumulates.

Spark allows the user to provide a column layout. If provided, Spark bypasses the step it would otherwise use to determine the file’s layout. In sparklyr, the columns argument is used to take advantage of this functionality. The infer_schema argument also needs to be set to FALSE; it is the switch that indicates whether the columns argument should be used.

For example, suppose a file called test.csv is going to be loaded into Spark. This is its layout:

"x","y"
"a",1
"b",2
"c",3
"d",4
"e",5

The column spec starts as a vector containing the column types; the vector’s values are then named to match the field names.

col_spec_1 <- c("character", "numeric")
names(col_spec_1) <- c("x", "y")
col_spec_1
##           x           y 
## "character"   "numeric" 

The accepted variable types are:

  • integer

  • character

  • logical

  • double

  • numeric

  • factor

  • Date

  • POSIXct

In spark_read_csv(), col_spec_1 is passed to the columns argument, and infer_schema is set to FALSE.

sc <- spark_connect(master = "local")
test_1 <- spark_read_csv(sc, "test1","test.csv", 
                         columns = col_spec_1, 
                         infer_schema = FALSE)
test_1
## # Source: spark<test1> [?? x 2]
##    x         y
##    <chr> <dbl>
##  1 a         1
##  2 b         2
##  3 c         3
##  4 d         4
##  5 e         5

In the example we tried to match the names and types of the original file, but the ability to pass a column spec provides additional flexibility. The following example shows how to set a field to a different type. The new type needs to be compatible with the original one; for example, a character field cannot be set to numeric. The example also changes the names of the fields.

col_spec_2 <- c("character", "character")
names(col_spec_2) <- c("my_letter", "my_number")

test_2 <- spark_read_csv(sc, "test2","test.csv", 
                         columns = col_spec_2, 
                         infer_schema = FALSE)
test_2
# Source: spark<test2> [?? x 2]
   my_letter my_number
   <chr>     <chr>    
 1 a         1        
 2 b         2        
 3 c         3        
 4 d         4        
 5 e         5    

The ability to change the field type can be very useful. Malformed entries can cause errors during reading, which is common in non-character fields. The practical approach is to import the field as character, and then use dplyr to coerce it to the desired type.
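
As a minimal sketch of that approach, the test_2 table from above can be coerced with dplyr; the as.numeric() call is translated into a cast that runs inside Spark:

library(dplyr)

# Coerce the character field to numeric inside Spark
test_2 %>%
  mutate(my_number = as.numeric(my_number))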

8.2.3 Spark memory

Spark copies the data into its distributed memory, which makes analyses and other processes very fast. There are cases where loading all of the data may not be practical, or even necessary. For those cases, Spark can just “map” the files without copying the data into memory.

The mapping creates a sort of “virtual” table in Spark memory. The implication is that when a query runs against that table, Spark has to read the data from the files at that time; any subsequent read will do the same. In effect, Spark becomes a pass-through for the data. The advantage of this method is that there is almost no up-front time cost to “reading” the file, since the mapping process is comparatively fast.

In sparklyr, that is controlled by the memory argument of its read functions. Setting it to FALSE prevents the data copy. It defaults to TRUE.

mapped_test <- spark_read_csv(sc, "test","test.csv", memory = FALSE)

There are good use cases for this method. One of them is when not all columns of a table are needed. For example, take a very large file that contains many columns. This is not the first time we interact with this data, and we know which columns are needed for the analysis. The files can be read using memory = FALSE, and the needed columns selected with dplyr. The resulting dplyr variable can then be cached into memory using the compute() function. This makes Spark query the file(s), pull the selected fields, and copy only that data into memory. The result is an in-memory table that took comparatively less time to ingest.

mapped_test %>%
  select(y) %>%
  compute("test")

8.2.4 Column Names

By default, sparklyr sanitizes column names. It translates characters such as . to _. This was required in Spark 1.6.X. To disable this functionality, you can run the following code:

options(sparklyr.sanitize.column.names = FALSE)
dplyr::copy_to(sc, iris, overwrite = TRUE)
# Source:   table<iris> [?? x 5]
# Database: spark_connection
   Sepal.Length Sepal.Width Petal.Length Petal.Width Species
          <dbl>       <dbl>        <dbl>       <dbl> <chr>  
 1          5.1         3.5          1.4         0.2 setosa 
 2          4.9         3            1.4         0.2 setosa 
 3          4.7         3.2          1.3         0.2 setosa 
 4          4.6         3.1          1.5         0.2 setosa 
 5          5           3.6          1.4         0.2 setosa 
 6          5.4         3.9          1.7         0.4 setosa 
 7          4.6         3.4          1.4         0.3 setosa 
 8          5           3.4          1.5         0.2 setosa 
 9          4.4         2.9          1.4         0.2 setosa 
10          4.9         3.1          1.5         0.1 setosa 
# ... with more rows

8.3 Writing Data

Some projects require that new data generated in Spark be written back to a remote source. For example, the data could be new predicted values returned by a Spark model: the job handles the mass generation of predictions, and the predictions need to be stored. This section covers recommendations for working on such projects.

8.3.1 Spark, not R, as pass-through

Avoid collecting data into R only to then upload it to the target (see Figure 8.1). This seems to be the first approach attempted by new users, and it may look like a faster alternative, but performance-wise it is not. Additionally, this approach does not scale properly: the data will eventually grow to the point where R cannot handle being the middle point.

FIGURE 8.1: Avoid using R as a pass through

All efforts should be made to have Spark connect to the target location. This way, reading, processing, and writing all happen within the same Spark session (see Figure 8.2).

FIGURE 8.2: Spark as a pass through
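
A minimal sketch of this pattern, assuming hypothetical S3 paths, a hypothetical score column, and that the required packages are already loaded; the data never passes through R:

# Read, transform and write entirely within Spark (paths and column are hypothetical)
scored <- spark_read_parquet(
  sc,
  "raw_data",
  path = "s3a://my-bucket/raw/",
  memory = FALSE
  ) %>%
  dplyr::filter(!is.na(score))

spark_write_parquet(scored, path = "s3a://my-bucket/scored/")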

8.3.2 Practical approach

Consider the following usage scenario: a Spark job has just processed predictions for a large data set, and the size of the predictions alone is considerable. Choosing a method to write the results will depend on the infrastructure.

For example, suppose Spark and the target location share the same infrastructure, such as when Spark and the target Hive table are in the same cluster. Copying the results is not a problem, because the data transfer is between the RAM and disk of the same cluster.

In a contrasting example, Spark and the target location are not in the same infrastructure. There are two options, and choosing between them will depend on the size of the data and the network speed:

  • Spark connects to the remote target location and copies the new data. If this is done within the same data center, or cloud provider, the data transfer could be fast enough to have Spark write the data directly.

  • Spark writes the results locally, and a third-party application transfers them. For example, Spark could write the results into CSV files (as sketched below), and then a separate job could copy the files over via FTP; at the destination, another process would load the data into the target location. Spark, R, and any other technology are tools. It is best to recognize that one tool cannot, and should not be expected to, do everything.
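
A minimal sketch of the second option, assuming a hypothetical predictions table already in the Spark session; a separate job would then pick up the resulting files and transfer them:

# Write the results locally as CSV files, to be transferred by a separate process
spark_write_csv(predictions, path = "file:///tmp/predictions-csv")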

8.4 Date & time

Some Spark date/time functions make time zone assumptions. For instance, the following code makes use of to_date(), which assumes that the timestamp is given in the local time zone. This is not to discourage the use of date/time functions; just be aware that time zones need to be handled with care.

sdf_len(sc, 1) %>%
  transmute(
    date = timestamp(1419126103) %>% 
      from_utc_timestamp('UTC') %>% 
      to_date() %>% 
      as.character()
  )
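
One way to handle this with care is to make the time zone explicit. A minimal sketch, using an arbitrary zone purely for illustration:

sdf_len(sc, 1) %>%
  transmute(
    date = timestamp(1419126103) %>%
      from_utc_timestamp('America/New_York') %>%  # convert from UTC to an explicit zone
      to_date() %>%
      as.character()
  )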

8.5 Specific types and protocols

This section will cover techniques to help you interface with some of the most popular data types and protocols.

8.5.1 Amazon S3

Amazon Simple Storage Service, or S3, has become a common location to store files. Spark is able to access S3 directly, and this functionality can be used inside sparklyr. There are three key items to have, or use, when working with data from an S3 bucket:

  • AWS Credentials - They are required by the S3 service, even for publicly accessible buckets.
  • Hadoop-to-AWS package - It is loaded at connection time.
  • A bucket location - The recommended file system to use is “s3a”.

There are multiple ways to set the credentials used to access the bucket. Please refer to the official documentation for more information; it can be found on the Apache Spark official site (“Spark Integration with Cloud Infrastructures” 2019).

The easiest way is to set the credentials using environment variables. Choose a secure way to load the values into variables in R, and then load them into the appropriate environment variable names. In the case shown below, these are AWS_ACCESS_KEY_ID and AWS_SECRET_ACCESS_KEY.

Sys.setenv(AWS_ACCESS_KEY_ID = my_key_id)
Sys.setenv(AWS_SECRET_ACCESS_KEY = my_secret_key)

Spark requires an integration package in order to access Amazon S3 buckets. Interestingly, it is not a Spark package but a Hadoop package, which means that the version you select is a Hadoop version. After some experiments, it seems that with Spark 2.x only Hadoop versions up to 2.7.7 will work; that may change when Spark enters version 3. If you are using a YARN managed cluster, the package may be different, depending on the Hadoop vendor. Packages for Apache-based projects are published to Maven repositories (“Maven Repository: Home Page” 2019); please search that site for alternative package versions if the recommended one does not work. The recommended search term is “hadoop-aws”.

conf <- spark_config()
conf$sparklyr.defaultPackages <- "org.apache.hadoop:hadoop-aws:2.7.7"

sc <- spark_connect(master = "local", config = conf)

For the file system prefix, use “s3a”. There are other options, such as “s3” and “s3n”, but as per the Hadoop documentation, the “s3a” file system should be the default selection.

my_file <- spark_read_csv(sc, "my-file", path =  "s3a://my-bucket/my-file.csv")

8.5.2 SQL

The sparklyr package provides a DBI compliant interface. This means that DBI functions can be used to interact with data via Spark, including non-SQL sources that are accessible via, or cached in, Spark.

library(sparklyr)
library(dplyr)
sc <- spark_connect(master = "local")
cars <- copy_to(sc, mtcars, "remote_mtcars")

DBI::dbGetQuery(sc, "SELECT * FROM remote_mtcars LIMIT 5")
##    mpg cyl disp  hp drat    wt  qsec vs am gear carb
## 1 21.0   6  160 110 3.90 2.620 16.46  0  1    4    4
## 2 21.0   6  160 110 3.90 2.875 17.02  0  1    4    4
## 3 22.8   4  108  93 3.85 2.320 18.61  1  1    4    1
## 4 21.4   6  258 110 3.08 3.215 19.44  1  0    3    1
## 5 18.7   8  360 175 3.15 3.440 17.02  0  0    3    2
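
Other DBI generics work the same way. For example, here is a quick sketch that lists the tables registered in the Spark session; the copy_to() call above should make remote_mtcars appear in the result:

# List the tables currently visible to the Spark session
DBI::dbListTables(sc)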

8.5.3 Hive

In YARN managed clusters, Spark provides a deeper integration with Apache Hive. Hive tables are easily accessible after opening a Spark connection.

sc <- spark_connect(master = "yarn-client", 
                    spark_home = "/usr/lib/spark/",
                    version = "2.1.0",
                    config = conf)

Accessing a Hive table’s data can be done with a simple reference. Using DBI, a table can be referenced within a SQL statement.

DBI::dbSendQuery(sc, "SELECT * FROM table limit 10")

Another way to reference a table is with dplyr. The tbl() function creates a pointer to the table.

library(dplyr)

t <- tbl(sc, "table")

It is important to reiterate that no data is imported into R; the tbl() function only creates a reference. The expectation is that more dplyr verbs will follow the tbl() command.

t %>%
  group_by(field1) %>%
  summarise(totals = sum(field2))

8.5.3.0.1 Database selection

Hive table references assume a default database source. Often, the needed table is in a different database within the Metastore. To access it using SQL, prefix the table with the database name, separating the two with a period.

DBI::dbSendQuery(sc, "SELECT * FROM databasename.table")

In dplyr, the in_schema() function can be used. The function is used inside the tbl() call.

tbl(sc, in_schema("databasename", "table"))

In sparklyr, the tbl_change_db() function sets the current session’s default database. Any subsequent call via DBI or dplyr will use the selected name as the default database.

tbl_change_db(sc, "databasename")

8.5.4 Comma Delimited Values (CSV)

The CSV format may be the most common file type in use today. Spark offers a couple of techniques to help you troubleshoot issues when reading these kinds of files.

Spark offers the following modes for addressing parsing issues:

  • PERMISSIVE: NULLs are inserted for missing tokens.

  • DROPMALFORMED: Drops lines which are malformed.

  • FAILFAST: Aborts if it encounters any malformed line.

These can be used in sparklyr by passing them inside the options argument. The following example creates a file with a broken entry. It then shows how it can be read into Spark.

library(sparklyr)
sc <- spark_connect(master = "local")

## Creates bad test file
writeLines(c("bad", 1, 2, 3, "broken"), "bad.csv")

spark_read_csv(
  sc,
  "bad3",
  "bad.csv",
  columns = list(foo = "integer"),
  infer_schema = FALSE,
  options = list(mode = "DROPMALFORMED"))
## # Source: spark<bad3> [?? x 1]
##     foo
##   <int>
## 1     1
## 2     2
## 3     3

Spark 2 provides an issue tracking column, which is hidden by default. To enable it, add _corrupt_record to the columns list. This can be combined with the PERMISSIVE mode: all rows will be imported, invalid entries will receive an NA, and the issue will be tracked in the _corrupt_record column.

spark_read_csv(
  sc,
  "bad2",
  "bad.csv",
  columns = list(foo = "integer", "_corrupt_record" = "character"),
  infer_schema = FALSE,
  options = list(mode = "PERMISSIVE")
)
## # Source: spark<bad2> [?? x 2]
##     foo `_corrupt_record`
##   <int> <chr>            
## 1     1 NA               
## 2     2 NA               
## 3     3 NA               
## 4    NA broken  

8.6 Recap

In the next chapter, Tuning, you will learn in detail how Spark works, and you will use this knowledge to optimize its resource usage and performance.

References

“Spark Integration with Cloud Infrastructures.” 2019. https://spark.apache.org/docs/latest/cloud-integration.html.

“Maven Repository: Home Page.” 2019. https://mvnrepository.com.