Chapter 12 Streaming

As the velocity of data generation increases, so does the need for real-time analysis. Real-time refers to the ability to continuously analyze data from a constantly updating feed. This is usually called Stream Analytics. In Stream Analytics, data is analyzed as it is generated, not retroactively as happens with “everyday” analysis.

This chapter will cover how to analyze a stream of data using R and Spark. It will also cover the basics of stream analysis, along with how to implement them using sparklyr and other R packages.

12.1 Spark Streaming

Spark Streaming is an extension of the core Spark API for processing live streams of data in a scalable, high-throughput, and fault-tolerant way. It also allows the current data to be joined with historical data.

Spark Streaming works by splitting the live input into small batches. Each batch is processed by Spark individually, and the output is produced in equally small batches. This process is not visible to the user. Spark presents the stream as a DStream, short for “discretized stream”, which represents the small batches as one continuous stream. Internally, a DStream is represented as a sequence of Resilient Distributed Datasets (RDDs).

The best resource for learning how Spark analyzes streams is Apache Spark’s official site (“Spark Streaming Programming Guide” 2018). This chapter covers just enough Spark Streaming concepts to help you understand the mechanics of what the R code is doing. It is recommended to read the official resource, especially if you need to implement solutions based on Spark Streaming.

12.2 Working with Spark Streams

In practice, a Spark Stream update is a three-stage operation. This is the breakdown of the three stages:

  1. Read - The stream is expected to append new files to a specified folder. Those new files contain the most recent information from the stream. Spark monitors the folder and reads the data from the files. The following file formats are supported: CSV, text, JSON, Parquet, and ORC.

  2. Transform - Spark applies the desired operations on top of the data. No special sparklyr functions are needed to transform stream data. You can use the same dplyr verbs, Spark transformers, and even native R code (via spark_apply()).

  3. Write - The results of the transformed input are saved in a different folder. The same file formats are supported: CSV, text, JSON, Parquet, and ORC.

Figure 12.1 provides a visual aid to what each stage does and how the stages connect; a minimal code sketch follows the figure:

FIGURE 12.1: Working with Spark Streams
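
The three stages map directly to sparklyr function calls. Here is a minimal sketch, assuming a local connection and a “source” folder populated by stream_generate_test(), so that the data contains a numeric column named x; the folder names are arbitrary:

library(sparklyr)
library(dplyr)

sc <- spark_connect(master = "local")

# Generate one sample file so the stream has data to infer a schema from
stream_generate_test(iterations = 1, path = "source")

stream_read_csv(sc, "source") %>%      # 1. Read: monitor the "source" folder
  filter(x > 500) %>%                  # 2. Transform: a regular dplyr verb
  stream_write_csv("destination")      # 3. Write: results land in "destination"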

Here is the breakdown of the available sparklyr functions for reading and writing:

Format    Read                   Write
CSV       stream_read_csv        stream_write_csv
JSON      stream_read_json       stream_write_json
Kafka     stream_read_kafka      stream_write_kafka
ORC       stream_read_orc        stream_write_orc
Parquet   stream_read_parquet    stream_write_parquet
Text      stream_read_text       stream_write_text
Memory    —                      stream_write_memory

As with all of the read and write operations in sparklyr for Spark Standalone, or in sparklyr’s local mode, the input and output folders are regular operating system file-system folders. For YARN-managed clusters, these will be folder locations inside the Hadoop Distributed File System (HDFS).
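
For example, on a YARN-managed cluster the paths can point at HDFS locations. The paths below are hypothetical, and an open connection sc is assumed:

# Hypothetical HDFS locations; adjust to your cluster's layout
read_folder  <- stream_read_csv(sc, "hdfs:///user/analyst/source")
write_output <- stream_write_csv(read_folder, "hdfs:///user/analyst/destination")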

Non-file-based sources and sinks, such as Kafka, are also supported. Kafka will be discussed in the last part of the chapter.

12.3 sparklyr extras

The sparklyr package goes beyond providing an easy-to-use interface to Spark Streaming. The R package includes features that provide a more complete integration with R:

  1. An out-of-the-box graph visualization to monitor the stream.

  2. A stream generator for testing and learning purposes.

  3. A Shiny reactive function that allows Shiny apps to read the contents of a stream.

12.3.1 Stream monitor

The stream_view() function will generate a Shiny app which displays the current state, as well as the history, of the stream. An example of how to use it is available in the Intro Example section.

12.3.2 Stream generator

The stream_generate_test() function creates a local test stream. This function works independently from a Spark connection. The following example will create five files in a subfolder called “source”. Each file is created one second after the previous one.

library(sparklyr)

stream_generate_test(iterations = 5, path = "source", interval = 1)

After the function completes, all of the files should show up in the “source” folder. Notice that the file sizes vary; this simulates what a real stream would do.

file.info(file.path("source", list.files("source")))[1] 
##                     size
## source/stream_1.csv   44
## source/stream_2.csv  121
## source/stream_3.csv  540
## source/stream_4.csv 2370
## source/stream_5.csv 7236

By default, stream_generate_test() creates a data frame with a single numeric variable.

readr::read_csv("source/stream_5.csv")
## # A tibble: 1,489 x 1
##        x
##    <dbl>
##  1   630
##  2   631
##  3   632
##  4   633
##  5   634
##  6   635
##  7   636
##  8   637
##  9   638
## 10   639
## # ... with 1,479 more rows

12.3.3 Shiny reactive

Shiny’s reactive framework is well suited to support streaming information. The idea is that your Shiny app can automatically display the latest results as fast as Spark can process them. The reactiveSpark() function provides that integration.

12.4 Intro example

This section will use a very simple example to introduce the mechanics of Spark Streaming and how sparklyr interacts with it. The example only moves the input contents to the output, without applying any transformations.

  1. Open a local Spark session

    sc <- spark_connect(master = "local")
  2. Remove the “source” and “destination” folders. This step ensures a clean slate if you try to run the example again.

    if(file.exists("source")) unlink("source", TRUE)
    if(file.exists("destination")) unlink("destination", TRUE)
  3. Just like read_csv(), stream_read_csv() needs a file specification. To avoid having to provide one explicitly, a single test file is generated first so that Spark can infer the schema from it.

    stream_generate_test(iterations = 1)
  4. stream_read_csv() starts the ingestion part of the job. It corresponds to the 1. Read stage described in the previous section.

    read_folder <- stream_read_csv(sc, "source")
  5. Set the output of the job to write the incoming data. That is done by passing the read_folder variable, set in the previous step, as the first argument. This corresponds to the 3. Write stage described in the previous section.

    write_output <- stream_write_csv(read_folder, "destination")
  6. The future library will allow the test generation to run in an asynchronous fashion. This is needed because the next step, stream_view(), will start a Shiny app which takes over the R session.

    library(future)
    invisible(future(stream_generate_test(interval = 0.3)))
  7. stream_view() is the out-of-the-box graph visualization for monitoring the stream that was mentioned in the sparklyr extras section.

    stream_view(write_output)

The Shiny app shows up in the Viewer pane. The column bars will slowly accumulate in the app’s plot. After the test generator completes, the plot should look like what Figure 12.2 shows.


FIGURE 12.2: stream_view() output

The final step is to clean up the stream and the Spark connection:

stream_stop(write_output)
spark_disconnect(sc)

12.5 Transformations

Streams can be transformed using dplyr, SQL queries, ML Pipelines, or R code. We can use as many transformations as needed, in the same way that Spark data frames can be transformed with sparklyr. The transformation source can be streams or data frames, but the output is always a stream. If needed, one can always take a snapshot of the destination stream and save the output as a data frame, which is what sparklyr will do for you if a destination stream is not specified.
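
As a small sketch of that last point, a transformed stream can be written to an in-memory table and then queried like a regular Spark data frame. This assumes an open connection sc, a populated “source” folder like the ones used below, and an arbitrary table name, “snapshot”:

stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  stream_write_memory(name = "snapshot")

# Query the current contents of the in-memory results table
dplyr::tbl(sc, "snapshot")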

12.5.1 dplyr

The same dplyr verbs can be used on top of a Spark Stream. The following example shows how easy it is to filter rows and add columns to data from an input folder.

sc <- spark_connect(master = "local")
if(file.exists("source")) unlink("source", TRUE)

stream_generate_test(iterations = 5)

stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  mutate(y = round(x / 100))
## # Source: spark<?> [inf x 2]
##        x     y
##    <int> <dbl>
##  1   701     7
##  2   702     7
##  3   703     7
##  4   704     7
##  5   705     7
##  6   706     7
##  7   707     7
##  8   708     7
##  9   709     7
## 10   710     7
## # ... with more rows

It is also possible to perform aggregations over the entire history of the stream, whether filtered or not.

stream_read_csv(sc, "source") %>%
  filter(x > 700) %>%
  mutate(y = round(x / 100)) %>%
  count(y) 
## # Source: spark<?> [inf x 2]
##       y     n
##   <dbl> <dbl>
## 1     8   200
## 2     9   200
## 3    10   102
## 4     7    98

Grouped aggregations of the latest data in the stream require a time stamp. The time stamp records when the reading function, in this case stream_read_csv(), first “saw” that specific record. In Spark Streaming terms, that time stamp is called a “watermark”. The stream_watermark() function is used to add the time stamp. For this exercise, the watermark will be the same for all records, because the five files were read by the stream after they were created. Please note that only Kafka and memory outputs support watermarks.

stream_read_csv(sc, "source") %>%
  stream_watermark()
## # Source: spark<?> [inf x 2]
##        x timestamp          
##    <int> <dttm>             
##  1   630 2019-04-07 15:44:50
##  2   631 2019-04-07 15:44:50
##  3   632 2019-04-07 15:44:50
##  4   633 2019-04-07 15:44:50
##  5   634 2019-04-07 15:44:50
##  6   635 2019-04-07 15:44:50
##  7   636 2019-04-07 15:44:50
##  8   637 2019-04-07 15:44:50
##  9   638 2019-04-07 15:44:50
## 10   639 2019-04-07 15:44:50
## # ... with more rows

After the watermark is created, it can be used in the group_by() verb and then piped into summarise() to get some stats of the stream.

stream_read_csv(sc, "source") %>%
  stream_watermark() %>%
  group_by(timestamp) %>%
  summarise(
    max_x = max(x, na.rm = TRUE),
    min_x = min(x, na.rm = TRUE),
    count = n()
  ) 
## # Source: spark<?> [inf x 4]
##   timestamp           max_x min_x count
##   <dttm>              <int> <int> <dbl>
## 1 2019-04-07 15:45:59  1000     1  2122

12.5.2 Transformer functions

Transformer functions can also be used to modify a stream. They can also be combined with the regular dplyr functions.

stream_read_csv(sc, "source") %>%
  mutate(x = as.numeric(x)) %>%
  ft_bucketizer("x", "buckets", splits = 0:10 * 100) %>%
  count(buckets)  %>%
  arrange(buckets)
## # Source:     spark<?> [inf x 2]
## # Ordered by: buckets
##    buckets     n
##      <dbl> <dbl>
##  1       0   299
##  2       1   220
##  3       2   200
##  4       3   200
##  5       4   200
##  6       5   200
##  7       6   201
##  8       7   200
##  9       8   200
## 10       9   202

12.5.3 R code

Arbitrary R code can also be used to transform a stream with the use of spark_apply(). Following the same principles as executing R code over Spark data frames, for structured streams spark_apply() runs the R code on each executor in the cluster where data is available. This enables processing of high-throughput streams and fulfills low-latency requirements.

stream_read_csv(sc, "source") %>%
  spark_apply(~ nrow(.x), list(n="integer"))
## # Source: spark<?> [inf x 1]
##       n
##   <int>
## 1  1962
## 2   148
## 3    12

12.5.4 ML Pipelines

Spark pipelines can be used for scoring streams, but not to train over streaming data. The former is fully supported while the latter is a feature under active development by the Spark community.

  1. In order to try scoring data in a stream, it is necessary to first create a Pipeline Model. The following code builds, fits, and saves a simple pipeline. It also opens and closes the Spark connection.

    sc <- spark_connect(master = "local")
    cars <- copy_to(sc, mtcars, "mtcars_remote")
    sc %>%
      ml_pipeline() %>%
      ft_binarizer("mpg", "over_30",30) %>%
      ft_r_formula(over_30 ~ wt) %>%
      ml_logistic_regression() %>%
      ml_fit(cars) %>%
      ml_save("cars_model")
    spark_disconnect(sc)
  2. A new Spark connection is opened, and the saved model is loaded into it.

    sc <- spark_connect(master = "local")
    model <- ml_load(sc, "cars_model")
  3. Data to run predictions on is needed. The stream_generate_test() function can be used for this as well. Instead of relying on the default output, the mtcars data set is passed to it.

    if(file.exists("source")) unlink("source", TRUE)
    stream_generate_test(mtcars, iterations = 5)
  4. The ml_transform() function can now be used on top of the stream. Because the function expects the model as its first argument, the piping works a little differently: instead of starting with reading the stream, we start with the model and use the stream input as the argument to ml_transform().

    model %>%
      ml_transform(stream_read_csv(sc, "source"))
    ## # Source: spark<?> [inf x 17]
    ##      mpg   cyl  disp    hp  drat    wt  qsec    vs    am
    ##    <dbl> <int> <dbl> <int> <dbl> <dbl> <dbl> <int> <int>
    ##  1  15.5     8 318     150  2.76  3.52  16.9     0     0
    ##  2  15.2     8 304     150  3.15  3.44  17.3     0     0
    ##  3  13.3     8 350     245  3.73  3.84  15.4     0     0
    ##  4  19.2     8 400     175  3.08  3.84  17.0     0     0
    ##  5  27.3     4  79      66  4.08  1.94  18.9     1     1
    ##  6  26       4 120.     91  4.43  2.14  16.7     0     1
    ##  7  30.4     4  95.1   113  3.77  1.51  16.9     1     1
    ##  8  15.8     8 351     264  4.22  3.17  14.5     0     1
    ##  9  19.7     6 145     175  3.62  2.77  15.5     0     1
    ## 10  15       8 301     335  3.54  3.57  14.6     0     1
    ## # ... with more rows, and 8 more variables: gear <int>,
    ## #   carb <int>, over_30 <dbl>, features <list>,
    ## #   label <dbl>, rawPrediction <list>,
    ## #   probability <list>, prediction <dbl>

12.6 Shiny integration

The reactiveSpark() function provides a mechanism to process the transformations on a stream. It allows you to circumvent the need for writing an output. Also, because it does not depend on the stream-writing functions, it is possible to use watermark groups.

This section’s example will result in a Shiny app that accumulates and displays the current and historical results. The app’s output is shown in Figure 12.3.

  1. Start by opening a Spark connection and begin a test generation.

    sc <- spark_connect(master = "local")
    if(file.exists("source")) unlink("source", TRUE)
    invisible(future(stream_generate_test(interval = 0.2, iterations = 10)))
  2. Load the shiny library and create a simple UI function with one table output.

    library(shiny)
    ui <- function() tableOutput("table")
  3. The server function contains a reactiveSpark() function. This function reads the stream, adds the watermark and then performs the aggregation. The results are then rendered via the table output.

    server <- function(input, output, session){
      ps <- stream_read_csv(sc, "source")  %>%
        stream_watermark() %>%
        group_by(timestamp) %>%
        summarise(
          max_x = max(x, na.rm = TRUE),
          min_x = min(x, na.rm = TRUE),
          count = n()) %>%
        reactiveSpark()  # Spark stream reactive
      output$table <- renderTable(
        ps() %>%
          mutate(timestamp = as.character(timestamp))
      )}
  4. The Shiny app can be activated with runGadget().

    runGadget(ui, server)

FIGURE 12.3: Shiny reactive

12.7 Kafka

Apache Kafka is to streaming what Hadoop is to data storage and retrieval. Hadoop provides a distributed, resilient, and reliable architecture for large-scale data storage. Kafka does the same, but for large-scale streaming applications.

12.7.1 Workflow

The most basic Kafka workflow is illustrated in Figure 12.4. An application that streams data into Kafka is called a Producer. Kafka stores the stream as records; each record has a key, a value, and a timestamp. Kafka can handle multiple streams that contain different information. To properly categorize each stream, Kafka uses a mechanism called a topic, an alphanumeric identifier of the stream. A Consumer is an app, external to Kafka, that reads what is stored in Kafka for a given topic. Because the Consumer app is constantly monitoring the topic, the term used for its integration with Kafka is subscribe.

FIGURE 12.4: Basic workflow

Kafka also allows for an application to read from one topic, process its data, and then write the results to a different topic. That is called a Stream Processor. In Figure 12.5, the Stream Processor reads topic A, and then writes results to topic B. This allows for a given Consumer application to read results instead of “raw” feed data.

FIGURE 12.5: Kafka workflow

12.7.2 Spark integration

Spark Streaming enables the integration of Spark with Kafka. Spark is able to both read from and write to Kafka topics. This means that Spark can act as a Consumer, Stream Processor, or Producer application in a Kafka implementation.

Unless there is a very specific need, using Spark as a Producer does not make much sense, because Spark Streaming reacts to a stream; it does not generate one. Theoretically, there could be cases where Spark reads from one stream (Kafka or not) and streams the results to a Kafka cluster. To the target, Spark would look like a pure Producer, even though that is not really the case.

The more likely use case for this integration is to use Spark to read (Consumer) from one or several topics, and then reactively write (Producer) the results of the analysis to a different topic, all within the same Kafka cluster. This effectively makes Spark a Stream Processor.

There are nuances in how the Spark-to-Kafka write-back modes work, so it is important to offer some clarification. There are three modes available: complete, update, and append. The complete mode provides the totals for every group every time there is a new batch. The update mode provides totals only for the groups that have updates in the latest batch. The append mode adds raw records to the target topic; it is not meant for aggregates, but works well for passing a filtered subset to the target topic.
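
Anticipating the sparklyr functions introduced in the next section, here is a hedged sketch of the append mode; the host and topic names are placeholders, and an open connection sc with the Kafka integration package loaded is assumed:

stream_read_kafka(
    sc,
    options = list(kafka.bootstrap.servers = "localhost:9092",
                   subscribe = "raw-letters")
  ) %>%
  mutate(value = as.character(value)) %>%
  filter(value != "Z") %>%                 # pass along a filtered subset only
  stream_write_kafka(
    mode = "append",
    options = list(kafka.bootstrap.servers = "localhost:9092",
                   topic = "filtered-letters")
  )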

12.7.3 R integration

The R integration is delivered via the sparklyr interface. There are a couple of things to keep in mind:

  • The Kafka integration Spark package is required - The name is org.apache.spark:spark-sql-kafka followed by the Kafka API version, then the Scala version, and lastly the Spark package version. For example, org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0 requests version 2.4.0 of the Spark package built for Scala 2.11 and the Kafka 0.10 API.

    config <- spark_config()
    config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
    sc <- spark_connect(master = "local", config = config)
  • The Kafka reading and writing functions in sparklyr rely on additional parameters - These parameters are passed via the options argument. The contents of the options argument are passed down to Kafka as is, which means that the same Kafka options used in your other applications can be reused here. There are three basic Kafka options to keep in mind: kafka.bootstrap.servers, topic, and subscribe. The first expects a comma-separated list of one or more hosts from the Kafka cluster. The other two set the topic that the function is either reading from or writing to, and one is used to the exclusion of the other: for reading, the subscribe option is used, and for writing, topic is used.

stream_read_kafka(
  sc,
  options = list(
    kafka.bootstrap.servers = "host1:9092,host2:9092",
    subscribe = "topic"
  )
)

12.7.4 Example

For the example, the Producer will stream random single letters of the alphabet into Kafka, under a topic called “letters”. A Stream Processor will be built in Spark by having it read the “letters” topic and produce a count of each unique letter passed through the stream. The counts will be passed back to Kafka in a separate topic called “totals”. To see the results, the same Spark connection will be used to set up a Consumer that reads the “totals” topic. Figure 12.6 is a diagram of how this example will work.

FIGURE 12.6: Kafka example

The example will use the update mode for writing back into Kafka. This means that only the totals of the letters that changed will be sent to the “totals” topic. The change in totals is determined after each batch from the “letters” topic is received. Figure 12.7 offers a deeper look at what the Stream Processor (Spark) is supposed to do.

FIGURE 12.7: Stream Processor - Update mode

The infrastructure used for this example was a local, single-node Kafka cluster. The external Producer uses the Kafka CLI to send the stream of letters. The installation instructions that were used can be found in the Appendix, under the Kafka section.

  1. Load libraries, and open the connection. Remember to load the Kafka integration Spark package.

    library(sparklyr)
    library(dplyr)
    
    config <- spark_config()
    config$sparklyr.shell.packages <- "org.apache.spark:spark-sql-kafka-0-10_2.11:2.4.0"
    sc <- spark_connect(master = "local", config = config)
  2. The local Kafka cluster is served on port 9092 by default. In order to keep the read and write calls a little cleaner, a couple of variables will contain the Kafka options. Notice that the read options include subscribe, while the write options include topic.

    hosts   <- "localhost:9092"
    read_options  <- list(kafka.bootstrap.servers = hosts, subscribe = "letters")
    write_options <- list(kafka.bootstrap.servers = hosts, topic = "totals")
  3. Typically, the following steps would be written together using a pipe; they have been broken into individual step variables so that the explanations can be shared. What is unique about this setup is the use of the read_options variable as the options argument.

    step_1 <- stream_read_kafka(sc, options = read_options)
  4. This step coerces the value field into a character. The resulting content of the field is a single-letter entry from the stream. The letters are grouped and counted. Kafka permits only a single field to be sent back as results, and it expects those results in a field called value, so the new value field is a concatenation of the letter and its count.

    step_2 <- step_1 %>%
      mutate(value = as.character(value)) %>%
      count(value) %>%
      mutate(value = paste0(value, "=", n))
  5. The results are written to the “totals” topic. The mode argument is set to “update”. This sets the count behavior illustrated in Figure 12.7.

    step_3 <- step_2 %>%
      stream_write_kafka(mode = "update", options = write_options)

The last step starts a Spark job. The job will remain active until it is stopped or until Spark disconnects. At this point there is no visible output, even if there is an active Producer sending letters over.

A simple Shiny routine can be used as a Consumer app. It will read the “totals” topic, select the latest count for a given letter, and then display the results on a table.

library(shiny)
ui <- function() tableOutput("table")
server <- function(input, output, session){
  totals_options  <- list(kafka.bootstrap.servers = hosts, subscribe = "totals")
  ps <- stream_read_kafka(sc, options = totals_options) %>%
    mutate(value = as.character(value),
           letter = substr(value, 1,1),
           total = as.numeric(substr(value, 3, 100))
           ) %>%
    group_by(letter) %>%
    summarise(total = max(total, na.rm = TRUE)) %>%
    arrange(letter) %>%
    reactiveSpark()  
  output$table <- renderTable(ps())
}
runGadget(ui, server)

A new terminal session is started. Kafka’s CLI provides a simple Producer program that runs in the console. Using that program, we can manually write a single letter, and then press enter.

user@laptop:~/kafka_2.12-2.2.0$ bin/kafka-console-producer.sh --broker-list localhost:9092 --topic letters
>A
>B
>C
>A
>A
>C
>D

The Shiny reactive function will poll and refresh the results as the letters are being entered. This is shown in Figure 12.8.


FIGURE 12.8: Shiny with Kafka
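
As with the intro example, the stream and the connection should eventually be cleaned up. A short sketch, assuming the write job is still stored in the step_3 variable created above:

stream_stop(step_3)
spark_disconnect(sc)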

References

“Spark Streaming Programming Guide.” 2018. https://spark.apache.org/docs/latest/streaming-programming-guide.html.