# Chapter 12 Streaming

“Our stories aren’t over yet.”

— Arya Stark

We can look back at the previous eleven chapters and think, what else are we missing? We’ve done everything from analyzing tabular datasets and performing unsupervised learning over raw text, to analyzing graphs and geographic datasets and even transforming data with custom R code! So now what?

We were not explicit about this, but we’ve always assumed your data is static; that is, we assumed it doesn’t change over time. But suppose for a moment your job is to analyze traffic patterns to give recommendations to the department of transportation. A very reasonable approach would be to analyze historical data and then design predictive models that compute forecasts overnight. Overnight? That’s very useful, but traffic patterns change by the hour and even by the minute. You could try to preprocess and predict faster and faster, but eventually this model breaks: you can’t load large-scale datasets, transform them, score them, unload them and repeat this process every second.

Instead, we need to introduce a different kind of dataset, one that is not static but rather dynamic: one that is like a table but is growing constantly. We will refer to such datasets as streams.

## 12.1 Overview

We know how to work with large-scale static datasets, since all the previous chapters dealt with them; but how should we reason about large-scale real-time datasets? We will define streams as datasets with an infinite number of entries.

For instance, if we wanted to score text using a pre-trained topic model, the entries of a static dataset would be a fixed set of lines of text; for a real-time dataset, we would perform the same scoring over an infinite number of lines of text. In practice, you will never process an infinite number of records: you will eventually stop the stream (or this universe might end, whichever comes first). Regardless, thinking of streams as infinite datasets makes it much easier to reason about them.

Streams are most relevant when processing real-time data; for example, when analyzing a Twitter feed or stock prices. Both examples have well-defined columns, like ‘tweet’ or ‘price’, but there are always new rows of data to be analyzed.

Spark Structured Streaming provides scalable and fault-tolerant data processing over streams of data. That means one can use many machines to process multiple streaming sources, perform joins with other streams or static sources, and recover from failures with at-least-once guarantees (each message is certain to be delivered, but may be delivered more than once).
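To make the at-least-once guarantee concrete, here is a small base R sketch, independent of Spark, of a consumer that receives a replayed message and drops the duplicate. The `id` column and the sample messages are hypothetical; they only illustrate why downstream code should tolerate repeated deliveries.

```r
# Hypothetical deliveries: message 2 arrives twice, as can happen when a
# failed batch is replayed under an at-least-once guarantee.
deliveries <- data.frame(
  id    = c(1, 2, 2, 3),
  value = c("a", "b", "b", "c"),
  stringsAsFactors = FALSE
)

# Keep only the first delivery of each message id.
processed <- deliveries[!duplicated(deliveries$id), ]

nrow(deliveries)  # number of deliveries received
nrow(processed)   # number of distinct messages kept
```

Deduplicating on a message identifier is only one possible approach; writing results in an idempotent way achieves a similar effect.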

In Spark, you create streams by defining a source, a transformation and a sink; you can think of these steps as reading, transforming and writing a stream, which Figure ?? describes.

Reading
Streams read data using any of the stream_read_*() functions; the read operation defines the source of the stream. You can define one or multiple sources from which to read.

Transforming
A stream can perform one or multiple transformations using dplyr, SQL, feature transformers, scoring pipelines or distributed R code. Transformations can join against streaming sources or static datasets.

Writing
The write operations are performed with the family of stream_write_*() functions; the write operation defines the sink of the stream. You can specify a single sink to write data to or multiple ones.

You can read and write streams in several different file formats: CSV, JSON, Parquet, ORC and text. You can also read from and write to Kafka, which we will introduce later on.

In addition, stream_write_memory() writes a stream to an in-memory table.

Since the transformation step is optional, the simplest stream we can define is one that continuously copies text files between source and destination.

First, install the future package with install.packages("future") and connect to Spark:
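The connection step could look like the following minimal sketch. It assumes that Spark is installed locally (for example, through spark_install()); the `master = "local"` value is an assumption and should be adjusted for your own cluster.

```r
library(sparklyr)

# Connect to a local Spark instance; master = "local" is an assumption
# that should be replaced with your cluster's master when not running locally.
sc <- spark_connect(master = "local")
```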

Since a stream requires the source to exist, create a source folder:

dir.create("source")

We are now ready to define our first stream!

stream <- stream_read_text(sc, "source/") %>%
  stream_write_text("destination/")

The stream starts running with stream_write_*(); once executed, the stream will monitor the source path and process data into the destination/ path as it arrives.

We can use stream_generate_test() to produce files at a fixed interval, each containing lines of text whose row counts follow a given distribution; you can read more about this in the Appendix. In practice, you would connect to existing sources without having to generate data artificially. We can then use stream_view() to track the rows per second (RPS) being processed in the source and in the destination, along with their latest values over time. The result is shown in Figure ??.

future::future(stream_generate_test(interval = 0.5))
stream_view(stream)

Notice that the rows-per-second in the destination stream are higher than the rows-per-second in the source stream; this is expected and desirable, since Spark measures incoming rates from the source but actual row processing times in the destination stream. For example, if 10 rows are written to the source/ path each second, the incoming rate is 10 RPS. However, if it takes Spark only 0.01 seconds to write all those 10 rows, the output rate is 1,000 RPS.
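The difference between the two rates is simple arithmetic. The following base R sketch uses hypothetical numbers (50 rows arriving each second, written in a quarter of a second) and a hypothetical helper, rows_per_second(), to show why the processing rate can exceed the input rate.

```r
# Hypothetical helper: rate = rows handled / seconds spent handling them.
rows_per_second <- function(rows, seconds) rows / seconds

# 50 rows arrive in the source over one second...
input_rate <- rows_per_second(50, 1)      # 50 rows per second

# ...but writing those 50 rows takes only 0.25 seconds.
output_rate <- rows_per_second(50, 0.25)  # 200 rows per second

output_rate > input_rate                  # TRUE
```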

Use stream_stop() to properly stop processing data from this stream.

stream_stop(stream)

This exercise introduced how we can easily start a Spark stream that reads and writes data based on a simulated stream. Let’s do something more interesting than just copying data by applying proper transformations.

## 12.2 Transformations

In a real-life scenario, the incoming data from a stream would not be written as-is to the output. The Spark streaming job would transform the data and then write the transformed data.

Streams can be transformed using dplyr, SQL queries, ML Pipelines or R code. We can use as many transformations as needed in the same way that Spark data frames can be transformed with sparklyr.

The source of the transformation can be a stream or data frame, but the output is always a stream. If needed, one can always take a snapshot from the destination stream, and then save the output as a data frame. That is what sparklyr will do for you if a destination stream is not specified.

Each sub-section will cover an option provided by sparklyr to perform transformations on a stream.

### 12.2.1 Analysis

You can analyze streams with dplyr verbs and with SQL through DBI. As a quick example, we will filter rows and add columns over a stream. We won’t explicitly call stream_generate_test() here, but you can call it on your own through the later package if you feel the urge to verify that data is being processed continuously.

library(dplyr)

stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  mutate(y = round(x / 100))
# Source: spark<?> [inf x 2]
x     y
<int> <dbl>
1   701     7
2   702     7
3   703     7
4   704     7
5   705     7
6   706     7
7   707     7
8   708     7
9   709     7
10   710     7
# … with more rows

It is also possible to perform aggregations over the entire history of the stream. The history could be filtered or not.

stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  mutate(y = round(x / 100)) %>%
  count(y)
# Source: spark<?> [inf x 2]
y     n
<dbl> <dbl>
1     8 25902
2     9 25902
3    10 13210
4     7 12692

Grouped aggregations of the latest data in the stream require a timestamp. The timestamp records when the reading function, in this case stream_read_csv(), first “saw” that specific record. In Spark streaming terms, that timestamp is called a “watermark”. The stream_watermark() function is used to add the timestamp. For this exercise, the watermark will be the same for all records, since the five files were read by the stream after they were created. Please note that only Kafka and memory outputs support watermarks.

stream_read_csv(sc, "source") %>%
  stream_watermark()
# Source: spark<?> [inf x 2]
x timestamp
<int> <dttm>
1   276 2019-06-30 07:14:21
2   277 2019-06-30 07:14:21
3   278 2019-06-30 07:14:21
4   279 2019-06-30 07:14:21
5   280 2019-06-30 07:14:21
6   281 2019-06-30 07:14:21
7   282 2019-06-30 07:14:21
8   283 2019-06-30 07:14:21
9   284 2019-06-30 07:14:21
10   285 2019-06-30 07:14:21
# … with more rows

After the watermark is created, it can be used in the group_by() verb. It can then be piped into a summarise() function to get some stats of the stream.

stream_read_csv(sc, "source") %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(
    max_x = max(x, na.rm = TRUE),
    min_x = min(x, na.rm = TRUE),
    count = n()
  )
# Source: spark<?> [inf x 4]
timestamp           max_x min_x  count
<dttm>              <int> <int>  <dbl>
1 2019-06-30 07:14:55  1000     1 259332

### 12.2.2 Modeling

Spark streams currently do not support training on real-time datasets. Aside from the technical challenges, even if it were possible, it would be quite challenging to train models, since the model itself would have to adapt over time. This is known as online learning and is perhaps something that Spark will support in the future.

That said, there are other modeling concepts we can use with streams, like feature transformers and scoring. Let’s try out a feature transformer with streams and leave scoring for the next section, since we will need to train a model.

The next example makes use of the ft_bucketizer() feature transformer to modify the stream followed by regular dplyr functions which you can use just as you would with static datasets.

stream_read_csv(sc, "source") %>%
  mutate(x = as.numeric(x)) %>%
  ft_bucketizer("x", "buckets", splits = 0:10 * 100) %>%
  count(buckets) %>%
  arrange(buckets)
# Source:     spark<?> [inf x 2]
# Ordered by: buckets
buckets     n
<dbl> <dbl>
1       0 25747
2       1 26008
3       2 25992
4       3 25908
5       4 25905
6       5 25903
7       6 25904
8       7 25901
9       8 25902
10       9 26162
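To see what the bucketizer is doing, the same split points can be applied to a static vector in base R. This is only a local approximation of ft_bucketizer() built on findInterval(), under the assumption that each bucket includes its lower bound and that the last bucket also includes its upper bound; it is not how Spark implements the transformer.

```r
splits <- 0:10 * 100   # same split points as the stream example
x <- 1:1000            # values 1 to 1000, the range seen in the stream

# findInterval() is 1-based, so subtract 1 to get 0-based bucket labels;
# rightmost.closed = TRUE places x == 1000 in the last bucket.
buckets <- findInterval(x, splits, rightmost.closed = TRUE) - 1

table(buckets)
```

In this local version, bucket 0 holds 99 values (1 to 99), buckets 1 to 8 hold 100 values each, and bucket 9 holds 101 values (900 to 1000), which mirrors the slightly smaller first count and slightly larger last count in the streaming output above.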

### 12.2.3 Pipelines

Spark pipelines can be used for scoring streams, but not to train over streaming data. The former is fully supported while the latter is a feature under active development by the Spark community.

In order to score a stream, it is necessary to first create our model. So let’s build and fit a simple pipeline.

cars <- copy_to(sc, mtcars)

model <- ml_pipeline(sc) %>%
  ft_binarizer("mpg", "over_30", 30) %>%
  ft_r_formula(over_30 ~ wt) %>%
  ml_logistic_regression() %>%
  ml_fit(cars)
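As a quick local check of the feature engineering step, the following base R sketch applies the same threshold to the static mtcars data. It assumes that ft_binarizer() maps values strictly greater than the threshold to 1 and all other values to 0, which is consistent with the scored output shown later in this section.

```r
# Local equivalent of binarizing mpg at a threshold of 30.
over_30 <- as.numeric(mtcars$mpg > 30)

table(over_30)  # 28 cars at or below 30 mpg, 4 cars above
```

With only four positive cases, this model is best seen as a way to demonstrate the mechanics of scoring a stream and not as a useful classifier.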

Tip: If you choose to, you can make use of other concepts presented in the Pipelines chapter, like saving and reloading pipelines through ml_save() and ml_load() before scoring streams.

We can then generate a stream based on mtcars using stream_generate_test() and score the model using ml_transform():

future::future(stream_generate_test(mtcars, "cars-stream", iterations = 5))

ml_transform(model, stream_read_csv(sc, "cars-stream"))
# Source: spark<?> [inf x 17]
mpg   cyl  disp    hp  drat    wt  qsec    vs    am  gear  carb over_30
<dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int> <int> <int>   <dbl>
1  15.5     8 318     150  2.76  3.52  16.9     0     0     3     2       0
2  15.2     8 304     150  3.15  3.44  17.3     0     0     3     2       0
3  13.3     8 350     245  3.73  3.84  15.4     0     0     3     4       0
4  19.2     8 400     175  3.08  3.84  17.0     0     0     3     2       0
5  27.3     4  79      66  4.08  1.94  18.9     1     1     4     1       0
6  26       4 120.     91  4.43  2.14  16.7     0     1     5     2       0
7  30.4     4  95.1   113  3.77  1.51  16.9     1     1     5     2       1
8  15.8     8 351     264  4.22  3.17  14.5     0     1     5     4       0
9  19.7     6 145     175  3.62  2.77  15.5     0     1     5     6       0
10  15       8 301     335  3.54  3.57  14.6     0     1     5     8       0
# … with more rows, and 5 more variables: features <list>, label <dbl>,
#   rawPrediction <list>, probability <list>, prediction <dbl>

Even though this example was put together with a few lines of code, what we just accomplished is actually quite impressive: you copied data into Spark, performed feature engineering, trained a model and scored the model over a real-time dataset, with seven lines of code! Let’s now try to use custom transformations, in real time.

### 12.2.4 Distributed R

Arbitrary R code can also be used to transform a stream with the use of spark_apply(). This approach follows the same principles discussed in the Distributed R chapter: spark_apply() runs R code over each executor in the cluster where data is available, which enables processing high-throughput streams and fulfilling low-latency requirements.

stream_read_csv(sc, "cars-stream") %>%
  select(mpg) %>%
  spark_apply(~ round(.x), mpg = "integer") %>%
  stream_write_csv("cars-round")

As you would expect, this processes data from cars-stream into cars-round by running the round() R function. Let’s peek into the output sink:

spark_read_csv(sc, "cars-round")
# Source: spark<carsround> [?? x 1]
mpg
<dbl>
1    16
2    15
3    13
4    19
5    27
6    26
7    30
8    16
9    20
10    15
# … with more rows

Again, make sure you apply the concepts you already know about spark_apply() when using streams; for instance, you should consider using arrow to significantly improve performance.
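Because spark_apply() hands each partition to your function as a data frame, it can help to try the function on a small local data frame before attaching it to a stream. A minimal base R sketch, using a hypothetical sample of mpg values and a hypothetical helper named round_partition():

```r
# Hypothetical sample resembling one partition of the cars stream.
partition <- data.frame(mpg = c(15.5, 27.3, 30.4))

# The same operation used in spark_apply(~ round(.x)), as a named function.
round_partition <- function(df) round(df)

round_partition(partition)
```

If the function behaves as expected locally, the same body can then be passed to spark_apply().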

This was our last transformation for streams. Next, you will learn how to use Spark streams with Kafka, and then with Shiny, a package that makes it easy to build interactive web applications from R.

spark_disconnect(sc)

## 12.3 Kafka

Apache Kafka is an open-source stream-processing software platform, written in Scala and Java, that was developed by LinkedIn and donated to the Apache Software Foundation. As an analogy, Kafka is to real-time storage what Hadoop is to static storage.

Kafka stores the stream as records, which consist of a key, a value and a timestamp. Kafka can handle multiple streams that contain different information by categorizing them into topics. Kafka is commonly used to connect multiple real-time applications: a producer is an application that streams data into Kafka, while a consumer is one that reads from Kafka; in Kafka terminology, a consumer application subscribes to topics. Therefore, the most basic workflow we can accomplish with Kafka is one with a single producer and a single consumer, illustrated in Figure ??.

If you are new to Kafka, we don’t recommend you try to run the code from this section; but if you are really motivated to follow along, you will first need to install Kafka as explained in the Appendix or deploy Kafka in your cluster.

Using Kafka also requires loading the Kafka package when connecting to Spark; make sure it is specified in your connection config:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local", config = list(
  sparklyr.shell.packages = "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
))

Once connected, it’s straightforward to read data from a stream:

stream_read_kafka(
  sc,
  options = list(
    kafka.bootstrap.servers = "host1:9092, host2:9092",
    subscribe = "<topic-name>"
  )
)

However, notice that you need to properly configure the options list: kafka.bootstrap.servers expects a list of Kafka hosts, while topic and subscribe define which topic should be used when writing to or reading from Kafka, respectively.

While we started by presenting a simple single-producer and single-consumer use case, Kafka also allows much more complex interactions. Next, we will read from one topic, process its data and then write the results to a different topic. Systems that act as both consumers and producers in this way are referred to as stream processors. In Figure ??, the stream processor reads topic A and then writes results to topic B. This allows a given consumer application to read results instead of the “raw” feed data.

There are three modes available when processing Kafka streams in Spark: complete, update and append. The complete mode will provide the totals for every group every time there is a new batch, update provides totals for only the groups that have updates in the latest batch, append adds raw records to the target topic. This append mode is not meant for aggregates, but works well for passing a filtered subset to the target topic.
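The three modes are easier to compare with a toy example. The following base R sketch simulates two hypothetical batches of letters and shows what each mode would emit after the second batch. It only mimics the semantics described above and does not call Spark or Kafka; all variable names are illustrative.

```r
# Two hypothetical micro-batches of letters arriving from a topic.
batch_1 <- c("A", "B", "A")
batch_2 <- c("B", "C")

# Running totals over everything seen so far, as a named vector.
tab <- table(c(batch_1, batch_2))
totals <- setNames(as.vector(tab), names(tab))

# complete: every group's total is emitted after each batch.
complete_out <- totals

# update: only the groups that appear in the latest batch are emitted.
update_out <- totals[names(totals) %in% unique(batch_2)]

# append: the new raw records are emitted, with no aggregation.
append_out <- batch_2

complete_out  # totals for A, B and C
update_out    # totals for B and C only
append_out    # the raw letters from the second batch
```

Here the letter A is left out of the update output because its total did not change in the second batch, which is the saving that update mode offers over complete mode.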

In our next example, the producer will stream random letters into Kafka under a letters topic. Then Spark will act as the stream processor, reading the letters topic and counting each unique letter; the counts are then written back to Kafka under the totals topic. We will use the update mode when writing back into Kafka, so only the totals that changed will be sent to Kafka. This change is determined after each batch from the letters topic.

hosts  <- "localhost:9092"

read_options <- list(kafka.bootstrap.servers = hosts, subscribe = "letters")
write_options <- list(kafka.bootstrap.servers = hosts, topic = "totals")

stream_read_kafka(sc, options = read_options) %>%
  mutate(value = as.character(value)) %>%         # coerce into a character
  count(value) %>%                                # group and count letters
  mutate(value = paste0(value, "=", n)) %>%       # kafka expects a value field
  stream_write_kafka(mode = "update",
                     options = write_options)

You can take a quick look at totals by reading from Kafka:

totals_options <- list(kafka.bootstrap.servers = hosts, subscribe = "totals")
stream_read_kafka(sc, options = totals_options)

Using a new terminal session, use Kafka’s command line tool to manually write single letters into the letters topic.

kafka-console-producer.sh --broker-list localhost:9092 --topic letters
>A
>B
>C

The letters that you input are pushed to Kafka, read by Spark, aggregated within Spark, pushed back into Kafka and then, finally, consumed by Spark again to give you a glimpse into the totals topic. This was quite a setup, but also a realistic configuration that is common in real-time processing projects.

Next, we will use the Shiny framework to visualize streams, in real-time!

## 12.4 Shiny

Shiny’s reactive framework is well suited to support streaming information, which you can use to display real-time data from Spark using reactiveSpark(). There is far more to learn about Shiny than we could possibly present here; however, if you are already familiar with Shiny, this example should be quite easy to understand.

We have a modified version of the k-means Shiny example in which the data, instead of coming from the static iris dataset, is generated with stream_generate_test(), consumed by Spark, retrieved to Shiny through reactiveSpark() and displayed as captured in Figure ??.

To run this example, store the following Shiny app under shiny/shiny-stream.R.

library(sparklyr)
library(shiny)

dir.create("shiny-stream", showWarnings = FALSE)

sc <- spark_connect(
  master = "local", version = "2.3",
  config = list(sparklyr.sanitize.column.names = FALSE))

ui <- pageWithSidebar(
  headerPanel('Iris k-means clustering from Spark stream'),
  sidebarPanel(
    selectInput('xcol', 'X Variable', names(iris)),
    selectInput('ycol', 'Y Variable', names(iris),
                selected = names(iris)[[2]]),
    numericInput('clusters', 'Cluster count', 3,
                 min = 1, max = 9)
  ),
  mainPanel(plotOutput('plot1'))
)

server <- function(input, output, session) {
  # Read the generated CSV stream and expose it to Shiny as a reactive.
  iris <- stream_read_csv(sc, "shiny-stream",
                          columns = sapply(datasets::iris, class)) %>%
    reactiveSpark()

  selectedData <- reactive(iris()[, c(input$xcol, input$ycol)])
  clusters <- reactive(kmeans(selectedData(), input$clusters))

  output$plot1 <- renderPlot({
    par(mar = c(5.1, 4.1, 0, 1))
    plot(selectedData(), col = clusters()$cluster, pch = 20, cex = 3)
    points(clusters()$centers, pch = 4, cex = 4, lwd = 4)
  })
}

shinyApp(ui, server)

This Shiny application can then be launched with runApp():

shiny::runApp("shiny/shiny-stream.R")

While the Shiny app runs, launch a new R session from the same directory and create a test stream with stream_generate_test(); this will generate a stream of continuous data that Spark can process and Shiny can visualize.

sparklyr::stream_generate_test(datasets::iris, "shiny/shiny-stream",
                               rep(5, 10^3))

This section showed how easy it is to create a Shiny app that can be used for several purposes, such as monitoring and dashboarding.

In a more complex implementation, the source would more likely be a Kafka stream, integrated with Spark and sparklyr as described in the previous section.

Before we transition, disconnect from Spark and clear the folders that we used:

spark_disconnect(sc)

unlink(c("source", "destination", "cars-stream",
         "cars-round", "shiny/shiny-stream"), recursive = TRUE)

Our first stream, which simply copied files, was a humble start that became quite useful once we learned about the several transformations you can apply to streams: from data analysis transformations using the dplyr and DBI packages, to feature transformers introduced while modeling, to fully fledged pipelines capable of scoring in real time and, last but not least, transforming datasets with custom R code. That is a lot to digest, for sure.