Chapter 14 Appendix

14.2 Introduction

14.3 Getting Started

14.3.1 Prerequisites

14.3.1.1 Installing R

From r-project.org, download and launch the R installer for your platform, Windows, Macs or Linux available.

The R Project for Statistical Computing

FIGURE 14.1: The R Project for Statistical Computing

14.3.1.2 Installing Java

From java.com/download, download and launch the installer for your platform, Windows, Macs or Linux are also available.

Java Download Page

FIGURE 14.2: Java Download Page

Starting with Spark 2.1, Java 8 is required; however, previous versions of Spark support Java 7. Regardless, we recommend installing Java Runtime Engine 8, or JRE 8 for short.

Note: For advanced readers that are already using the Java Development Kit, JDK for short. Please notice that JDK 9+ is currently unsupported so you will need to downgrade to JDK 8 by uninstalling JDK 9+ or by setting JAVA_HOME appropiately.

14.3.1.3 Installing RStudio

While installing RStudio is not strictly required to work with Spark with R, it will make you much more productive and therefore, I would recommend you take the time to install RStudio from rstudio.com/download, then download and launch the installer for your platform: Windows, Macs or Linux.

RStudio Downloads Page

FIGURE 14.3: RStudio Downloads Page

After launching RStudio, you can use RStudio’s console panel to execute the code provided in this chapter.

14.3.1.4 Using RStudio

If you are not familiar with RStudio, you should make note of the following panes:

  • Console: A standalone R console you can use to execute all the code presented in this book.
  • Packages: This pane allows you to install sparklyr with ease, check its version, navigate to the help contents, etc.
  • Connections: This pane allows you to connecto to Spark, manage your active connection and view the available datasets.
RStudio Overview

FIGURE 14.4: RStudio Overview

14.4 Analysis

14.4.1 Hive Functions

Name Description
size(Map<K.V>) Returns the number of elements in the map type.
size(Array) Returns the number of elements in the array type.
map_keys(Map<K.V>) Returns an unordered array containing the keys of the input map.
map_values(Map<K.V>) Returns an unordered array containing the values of the input map.
array_contains(Array, value) Returns TRUE if the array contains value.
sort_array(Array) Sorts the input array in ascending order according to the natural ordering of the array elements and returns it
binary(string or binary) Casts the parameter into a binary.
cast(expr as ‘type’) Converts the results of the expression expr to the given type.
from_unixtime(bigint unixtime[, string format]) Converts the number of seconds from unix epoch (1970-01-01 00:00:00 UTC) to a string.
unix_timestamp() Gets current Unix timestamp in seconds.
unix_timestamp(string date) Converts time string in format yyyy-MM-dd HH:mm:ss to Unix timestamp (in seconds).
to_date(string timestamp) Returns the date part of a timestamp string.
year(string date) Returns the year part of a date.
quarter(date/timestamp/string) Returns the quarter of the year for a date.
month(string date) Returns the month part of a date or a timestamp string.
day(string date) dayofmonth(date) Returns the day part of a date or a timestamp string.
hour(string date) Returns the hour of the timestamp.
minute(string date) Returns the minute of the timestamp.
second(string date) Returns the second of the timestamp.
weekofyear(string date) Returns the week number of a timestamp string.
extract(field FROM source) Retrieve fields such as days or hours from source. Source must be a date, timestamp, interval or a string that can be converted into either a date or timestamp.
datediff(string enddate, string startdate) Returns the number of days from startdate to enddate.
date_add(date/timestamp/string startdate, tinyint/smallint/int days) Adds a number of days to startdate.
date_sub(date/timestamp/string startdate, tinyint/smallint/int days) Subtracts a number of days to startdate.
from_utc_timestamp({any primitive type} ts, string timezone) Converts a timestamp in UTC to a given timezone.
to_utc_timestamp({any primitive type} ts, string timezone) Converts a timestamp in a given timezone to UTC.
current_date Returns the current date.
current_timestamp Returns the current timestamp.
add_months(string start_date, int num_months, output_date_format) Returns the date that is num_months after start_date.
last_day(string date) Returns the last day of the month which the date belongs to.
next_day(string start_date, string day_of_week) Returns the first date which is later than start_date and named as day_of_week.
trunc(string date, string format) Returns date truncated to the unit specified by the format.
months_between(date1, date2) Returns number of months between dates date1 and date2.
date_format(date/timestamp/string ts, string fmt) Converts a date/timestamp/string to a value of string in the format specified by the date format fmt.
if(boolean testCondition, T valueTrue, T valueFalseOrNull) Returns valueTrue when testCondition is true, returns valueFalseOrNull otherwise.
isnull( a ) Returns true if a is NULL and false otherwise.
isnotnull ( a ) Returns true if a is not NULL and false otherwise.
nvl(T value, T default_value) Returns default value if value is null else returns value.
COALESCE(T v1, T v2, …) Returns the first v that is not NULL, or NULL if all v’s are NULL.
CASE a WHEN b THEN c [WHEN d THEN e]* [ELSE f] END When a = b, returns c; when a = d, returns e; else returns f.
nullif( a, b ) Returns NULL if a=b; otherwise returns a.
assert_true(boolean condition) Throw an exception if ‘condition’ is not true, otherwise return null.
ascii(string str) Returns the numeric value of the first character of str.
base64(binary bin) Converts the argument from binary to a base 64 string.
character_length(string str) Returns the number of UTF-8 characters contained in str.
chr(bigint double A)
concat(string binary A, string
context_ngrams(array<array>, array, int K, int pf) Returns the top-k contextual N-grams from a set of tokenized sentences.
concat_ws(string SEP, string A, string B…) Like concat() above, but with custom separator SEP.
decode(binary bin, string charset) Decodes the first argument into a String using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’). If either argument is null, the result will also be null.
elt(N int,str1 string,str2 string,str3 string,…) Return string at index number, elt(2,‘hello’,‘world’) returns ‘world’.
encode(string src, string charset) Encodes the first argument into a BINARY using the provided character set (one of ‘US-ASCII’, ‘ISO-8859-1’, ‘UTF-8’, ‘UTF-16BE’, ‘UTF-16LE’, ‘UTF-16’).
field(val T,val1 T,val2 T,val3 T,…) Returns the index of val in the val1,val2,val3,… list or 0 if not found.
find_in_set(string str, string strList) Returns the first occurance of str in strList where strList is a comma-delimited string.
format_number(number x, int d) Formats the number X to a format like '#,###,###.##', rounded to D decimal places, and returns the result as a string. If D is 0, the result has no decimal point or fractional part.
get_json_object(string json_string, string path) Extracts json object from a json string based on json path specified, and returns json string of the extracted json object.
in_file(string str, string filename) Returns true if the string str appears as an entire line in filename.
instr(string str, string substr) Returns the position of the first occurrence of substr in str.
length(string A) Returns the length of the string.
locate(string substr, string str[, int pos]) Returns the position of the first occurrence of substr in str after position pos.
lower(string A) lcase(string A) Returns the string resulting from converting all characters of B to lower case.
lpad(string str, int len, string pad) Returns str, left-padded with pad to a length of len. If str is longer than len, the return value is shortened to len characters.
ltrim(string A) Returns the string resulting from trimming spaces from the beginning(left hand side) of A.
ngrams(array<array>, int N, int K, int pf) Returns the top-k N-grams from a set of tokenized sentences, such as those returned by the sentences() UDAF.
octet_length(string str) Returns the number of octets required to hold the string str in UTF-8 encoding.
parse_url(string urlString, string partToExtract [, string keyToExtract]) Returns the specified part from the URL. Valid values for partToExtract include HOST, PATH, QUERY, REF, PROTOCOL, AUTHORITY, FILE, and USERINFO.
printf(String format, Obj… args) Returns the input formatted according do printf-style format strings.
regexp_extract(string subject, string pattern, int index) Returns the string extracted using the pattern.
regexp_replace(string INITIAL_STRING, string PATTERN, string REPLACEMENT) Returns the string resulting from replacing all substrings in INITIAL_STRING that match the java regular expression syntax defined in PATTERN with instances of REPLACEMENT.
repeat(string str, int n) Repeats str n times.
replace(string A, string OLD, string NEW) Returns the string A with all non-overlapping occurrences of OLD replaced with NEW.
reverse(string A) Returns the reversed string.
rpad(string str, int len, string pad) Returns str, right-padded with pad to a length of len.
rtrim(string A) Returns the string resulting from trimming spaces from the end(right hand side) of A.
sentences(string str, string lang, string locale) Tokenizes a string of natural language text into words and sentences, where each sentence is broken at the appropriate sentence boundary and returned as an array of words.
space(int n) Returns a string of n spaces.
split(string str, string pat) Splits str around pat (pat is a regular expression).
str_to_map(text[, delimiter1, delimiter2]) Splits text into key-value pairs using two delimiters. Delimiter1 separates text into K-V pairs, and Delimiter2 splits each K-V pair. Default delimiters are ‘,’ for delimiter1 and ‘:’ for delimiter2.
substr(string binary A, int start)
substring_index(string A, string delim, int count) Returns the substring from string A before count occurrences of the delimiter delim.
translate(string char
trim(string A) Returns the string resulting from trimming spaces from both ends of A.
unbase64(string str) Converts the argument from a base 64 string to BINARY.
upper(string A) ucase(string A) Returns the string resulting from converting all characters of A to upper case. For example, upper(‘fOoBaR’) results in ‘FOOBAR’.
initcap(string A) Returns string, with the first letter of each word in uppercase, all other letters in lowercase. Words are delimited by whitespace.
levenshtein(string A, string B) Returns the Levenshtein distance between two strings.
soundex(string A) Returns soundex code of the string.
mask(string str[, string upper[, string lower[, string number]]]) Returns a masked version of str.
mask_first_n(string str[, int n]) Returns a masked version of str with the first n values masked. mask_first_n(“1234-5678-8765-4321”, 4) results in nnnn-5678-8765-4321.
mask_last_n(string str[, int n]) Returns a masked version of str with the last n values masked.
mask_show_first_n(string str[, int n]) Returns a masked version of str, showing the first n characters unmasked.
mask_show_last_n(string str[, int n]) Returns a masked version of str, showing the last n characters unmasked.
mask_hash(string char
java_method(class, method[, arg1[, arg2..]]) Synonym for reflect.
reflect(class, method[, arg1[, arg2..]]) Calls a Java method by matching the argument signature, using reflection.
hash(a1[, a2…]) Returns a hash value of the arguments.
current_user() Returns current user name from the configured authenticator manager.
logged_in_user() Returns current user name from the session state.
current_database() Returns current database name.
md5(string/binary) Calculates an MD5 128-bit checksum for the string or binary.
sha1(string/binary)sha(string/binary) Calculates the SHA-1 digest for string or binary and returns the value as a hex string.
crc32(string/binary) Computes a cyclic redundancy check value for string or binary argument and returns bigint value.
sha2(string/binary, int) Calculates the SHA-2 family of hash functions (SHA-224, SHA-256, SHA-384, and SHA-512).
aes_encrypt(input string/binary, key string/binary) Encrypt input using AES.
aes_decrypt(input binary, key string/binary) Decrypt input using AES.
version() Returns the Hive version.
count(expr) Returns the total number of retrieved rows.
sum(col), sum(DISTINCT col) Returns the sum of the elements in the group or the sum of the distinct values of the column in the group.
avg(col), avg(DISTINCT col) Returns the average of the elements in the group or the average of the distinct values of the column in the group.
min(col) Returns the minimum of the column in the group.
max(col) Returns the maximum value of the column in the group.
variance(col), var_pop(col) Returns the variance of a numeric column in the group.
var_samp(col) Returns the unbiased sample variance of a numeric column in the group.
stddev_pop(col) Returns the standard deviation of a numeric column in the group.
stddev_samp(col) Returns the unbiased sample standard deviation of a numeric column in the group.
covar_pop(col1, col2) Returns the population covariance of a pair of numeric columns in the group.
covar_samp(col1, col2) Returns the sample covariance of a pair of a numeric columns in the group.
corr(col1, col2) Returns the Pearson coefficient of correlation of a pair of a numeric columns in the group.
percentile(BIGINT col, p) Returns the exact pth percentile of a column in the group (does not work with floating point types). p must be between 0 and 1.
percentile(BIGINT col, array(p1 [, p2]…)) Returns the exact percentiles p1, p2, … of a column in the group. pi must be between 0 and 1.
percentile_approx(DOUBLE col, p [, B]) Returns an approximate pth percentile of a numeric column (including floating point types) in the group. The B parameter controls approximation accuracy at the cost of memory. Higher values yield better approximations, and the default is 10,000. When the number of distinct values in col is smaller than B, this gives an exact percentile value.
percentile_approx(DOUBLE col, array(p1 [, p2]…) [, B]) Same as above, but accepts and returns an array of percentile values instead of a single one.
regr_avgx(independent, dependent) Equivalent to avg(dependent).
regr_avgy(independent, dependent) Equivalent to avg(independent).
regr_count(independent, dependent) Returns the number of non-null pairs used to fit the linear regression line.
regr_intercept(independent, dependent) Returns the y-intercept of the linear regression line, i.e. the value of b in the equation dependent = a * independent + b.
regr_r2(independent, dependent) Returns the coefficient of determination for the regression.
regr_slope(independent, dependent) Returns the slope of the linear regression line, i.e. the value of a in the equation dependent = a * independent + b.
regr_sxx(independent, dependent) Equivalent to regr_count(independent, dependent) * var_pop(dependent).
regr_sxy(independent, dependent) Equivalent to regr_count(independent, dependent) * covar_pop(independent, dependent).
regr_syy(independent, dependent) Equivalent to regr_count(independent, dependent) * var_pop(independent).
histogram_numeric(col, b) Computes a histogram of a numeric column in the group using b non-uniformly spaced bins. The output is an array of size b of double-valued (x,y) coordinates that represent the bin centers and heights
collect_set(col) Returns a set of objects with duplicate elements eliminated.
collect_list(col) Returns a list of objects with duplicates.
ntile(INTEGER x) Divides an ordered partition into x groups called buckets and assigns a bucket number to each row in the partition. This allows easy calculation of tertiles, quartiles, deciles, percentiles and other common summary statistics.
explode(ARRAY a) Explodes an array to multiple rows. Returns a row-set with a single column (col), one row for each element from the array.
explode(MAP<Tkey,Tvalue> m) Explodes a map to multiple rows. Returns a row-set with a two columns (key,value) , one row for each key-value pair from the input map.
posexplode(ARRAY a) Explodes an array to multiple rows with additional positional column of int type (position of items in the original array, starting with 0). Returns a row-set with two columns (pos,val), one row for each element from the array.
inline(ARRAY<STRUCT<f1:T1,…,fn:Tn>> a) Explodes an array of structs to multiple rows. Returns a row-set with N columns (N = number of top level elements in the struct), one row per struct from the array.
stack(int r,T1 V1,…,Tn/r Vn) Breaks up n values V1,…,Vn into r rows. Each row will have n/r columns. r must be constant.
json_tuple(string jsonStr,string k1,…,string kn) Takes JSON string and a set of n keys, and returns a tuple of n values.
parse_url_tuple(string urlStr,string p1,…,string pn) Takes URL string and a set of n URL parts, and returns a tuple of n values.

14.5 Modeling

14.5.1 MLlib Functions

The following table exhibits the ML algorithms supported in sparklyr:

14.5.1.1 Classification

Algorithm Function
Decision Trees ml_decision_tree_classifier()
Gradient-Boosted Trees ml_gbt_classifier()
Linear Support Vector Machines ml_linear_svc()
Logistic Regression ml_logistic_regression()
Multilayer Perceptron ml_multilayer_perceptron_classifier()
Naive-Bayes ml_naive_bayes()
One vs Rest ml_one_vs_rest()
Random Forests ml_random_forest_classifier()

14.5.1.2 Regression

Algorithm Function
Accelerated Failure Time Survival Regression ml_aft_survival_regression()
Decision Trees ml_decision_tree_regressor()
Generalized Linear Regression ml_generalized_linear_regression()
Gradient-Boosted Trees ml_gbt_regressor()
Isotonic Regression ml_isotonic_regression()
Linear Regression ml_linear_regression()

14.5.1.3 Clustering

Algorithm Function
Bisecting K-Means Clustering ml_bisecting_kmeans()
Gaussian Mixture Clustering ml_gaussian_mixture()
K-Means Clustering ml_kmeans()
Latent Dirichlet Allocation ml_lda()

14.5.1.4 Recommendation

Algorithm Function
Alternating Least Squares Factorization ml_als()

14.5.1.5 Frequent Pattern Mining

Algorithm Function
FPGrowth ml_fpgrowth()

14.5.1.6 Feature Transformers

Transformer Function
Binarizer ft_binarizer()
Bucketizer ft_bucketizer()
Chi-Squared Feature Selector ft_chisq_selector()
Vocabulary from Document Collections ft_count_vectorizer()
Discrete Cosine Transform ft_discrete_cosine_transform()
Transformation using dplyr ft_dplyr_transformer()
Hadamard Product ft_elementwise_product()
Feature Hasher ft_feature_hasher()
Term Frequencies using Hashing export(ft_hashing_tf)
Inverse Document Frequency ft_idf()
Imputation for Missing Values export(ft_imputer)
Index to String ft_index_to_string()
Feature Interaction Transform ft_interaction()
Rescale to [-1, 1] Range ft_max_abs_scaler()
Rescale to [min, max] Range ft_min_max_scaler()
Locality Sensitive Hashing ft_minhash_lsh()
Converts to n-grams ft_ngram()
Normalize using the given P-Norm ft_normalizer()
One-Hot Encoding ft_one_hot_encoder()
Feature Expansion in Polynomial Space ft_polynomial_expansion()
Maps to Binned Categorical Features ft_quantile_discretizer()
SQL Transformation ft_sql_transformer()
Standardizes Features using Corrected STD ft_standard_scaler()
Filters out Stop Words ft_stop_words_remover()
Map to Label Indices ft_string_indexer()
Splits by White Spaces ft_tokenizer()
Combine Vectors to Row Vector ft_vector_assembler()
Indexing Categorical Feature ft_vector_indexer()
Subarray of the Original Feature ft_vector_slicer()
Transform Word into Code ft_word2vec()

14.7 Streaming

14.7.1 Stream Generator

The stream_generate_test() function creates a local test stream. This function works independently from a Spark connection. The following example will create five files in sub-folder called “source”. The files will be created one second apart from the previous file’s creation.

After the function completes, all of the files should show up in the “source” folder. Notice that the file size vary. This is so that it simulates what a true stream would do.

##                     size
## source/stream_1.csv   44
## source/stream_2.csv  121
## source/stream_3.csv  540
## source/stream_4.csv 2370
## source/stream_5.csv 7236

The stream_generate_test() by default will create a single numeric variable data frame.

## # A tibble: 1,489 x 1
##        x
##    <dbl>
##  1   630
##  2   631
##  3   632
##  4   633
##  5   634
##  6   635
##  7   636
##  8   637
##  9   638
## 10   639
## # ... with 1,479 more rows

14.7.2 Installing Kafka

This instructions were put together using information from the official Kafka site. Specifically from the Quickstart page, and at the time of writing this book. Newer versions of Kafka undoubtely will be available when reading this book. The idea is to “timestamp” the versions used in the example in the Streaming chapter.

  1. Download Kafka

    wget http://apache.claz.org/kafka/2.2.0/kafka_2.12-2.2.0.tgz
  2. Expand the tar file and enter the new folder

    tar -xzf kafka_2.12-2.2.0.tgz
    cd kafka_2.12-2.2.0
  3. Start the Zookeeper service that comes with Kafka

    bin/zookeeper-server-start.sh config/zookeeper.properties
  4. Start the Kafka service

    bin/kafka-server-start.sh config/server.properties

Make sure to always start Zookeper first, and then Kafka.