Chapter 7 Connections
The previous chapter, Clusters, presented the major cluster computing trends, cluster managers, distributions and cloud service providers to help you choose the Spark cluster that best suits your needs. In contrast, this chapter presents the internal components of a Spark cluster and how to connect to Spark clusters running on any cluster manager, distribution or services presented in the previous chapter.
In addition, this chapter provides various troubleshooting connection techniques. While we hope you won’t need to use these, this chapter will prepare you to use them as effective techniques to resolve connectivity issues.
While this chapter might feel a bit dry – connecting and troubleshooting connections is definetely not the most exciting part of large-scale data analysis – it will introduce the components of a Spark cluster and how they interact, often known as the architecture of Apache Spark. This chapter, in addition to the Data and Tunning chapters, will provide a detailed view of how Spark works, which will help you move towards becoming an intermediate Spark user that can go beyond analysis into the exciting world of distributed computing, using Apache Spark.
The overall connection architecture for a Spark cluster is composed of three type of compute instances: the driver node, the worker nodes and the cluster manager. A cluster manager is a service that allows Spark to be executed in the cluster, this was detailed in the previous chapter under the cluster managers section. The worker nodes (also referred to as executors) execute compute tasks over partitioned data and communicate intermediate results to other workers or back to the driver node. The driver node is tasked with delegating work to the worker nodes, but also for aggregating their results and controlling computation flow. For the most part, aggregation happens in the worker nodes; however, even after the nodes aggregate data, it is often the case that the driver node would have to collect the worker’s results. Therefore, the driver node usually has at least, but often much more, compute resources (memory, CPUs, local storage, etc.) than the worker node.
Strictly speaking, the driver node and worker nodes are just names assigned to machines with particular roles, while the actual computation in the driver node is performed by the spark context. The Spark context is the main entry point for Spark functionality (“Azure Wikipedia” 2019) since it’s tasked with scheduling tasks, managing storage, tracking execution status, access configuration settings, canceling jobs and so on. In the worker nodes, the actual computation is performed under a spark executor, which is a Spark component tasked with executing subtasks against specific data partition.
We can illustrate this concepts in Figure 7.1, where the driver node orchestrates worker’s work through the cluster manager.
If you already have a Spark cluster in your organization, you should request the connection information to this cluster from your cluster administrator, read their usage policies carefully and follow their advice. Since a cluster may be shared among many users, you want to make sure you only request the compute resources you need – we will cover how to request resources in the Tunning chapter. Your system administrator will describe if it’s an on-premise vs cloud cluster, the cluster manager being used, supported connections and supported tools. You can use this information to jump directly to Local, Standalone, YARN, Mesos, Livy or Kubernetes based on the information provided to you.
Note: Once connected is performed with
spark_connect(), you can use all techniques described in previous chapters using the
sc connection; for instance, you can do data analysis or modeling with the same code previous chapters presented.
7.1.1 Edge Nodes
Computing clusters are configured to enable high-bandwidth and fast network connectivity between nodes. To optimize network connectivity, the nodes in the cluster are configured to trust each other and to disable security features. This improves performance but requires the cluster to be secured by closing all external network communication, making the entire cluster secure as a whole. Except for a few cluster machines that are carefully configured to accept connections from outside the cluster; conceptually, these machines are located in the “edge” of the cluster and are known as edge nodes.
Therefore, before connecting to Apache Spark, it is likely you will first have to connect to an edge node in your cluster. There are two methods to connect:
- Using a computer terminal application, one can use a secure shell to establish a remote connection into the cluster, once you connect into the cluster, you can launch R and then use
sparklyr. However, a terminal can be cumbersome for some tasks, like exploratory data analysis, so it’s often only used while configuring the cluster or troubleshooting issues.
- Web Browser
- While using
sparklyrfrom a terminal is possible, it is usually more productive to install a web server in an edge node that provides access to run R with
sparklyrfrom a web browser. Most likely, you will want to consider using RStudio or Jupyter rather than connecting from the terminal.
Figure 7.2 explains these concepts visually. The left block is usually your web browser, the right block is the edge node, client and edge node communicate over HTTP when using a web browser or SSH when using the terminal.
7.1.2 Spark Home
After you connect to an edge node, the next step is to find out where Spark is installed, this location is known as the
SPARK_HOME. In most cases, your cluster administrator will have already set the
SPARK_HOME environment variable to the correct installation path. If not, you will need to find out the correct
SPARK_HOME path. The
SPARK_HOME path must be specified as an environment variable or explicitly when running
spark_connect() using the
If your cluster provider or cluster administrator already provided
SPARK_HOME for you, the following code should return a path instead of an empty string.
If the code above returns an empty string, this would mean the
SPARK_HOME environment vatiable is not set in your cluster, so you will have to specify
SPARK_HOME while using
spark_connect() as follows:
sc <- spark_connect(master = "<cluster-master>", spark_home = "local/path/to/spark")
When connecting to Spark in local mode, Spark starts a single process which runs most of the cluster components like the Spark context and a single executor. This is ideal to learn Spark, work offline, troubleshoot issues or to test code before you run it over a large compute cluster. A local connection to Spark is represented in Figure 7.3.
Notice that in the local connections diagram, there is no cluster manager nor worker process since, in local mode, everything runs inside the driver application. It’s also worth noting that
sparklyr starts the Spark Context through
spark-submit, a script available in every Spark installation to enable users to submit custom application to Spark which,
sparklyr makes use of to submit itself to Spark. For the curious reader, the Contributing chapter explains the internal processes that takes place in
sparklyr to submit this application and connect properly from R.
To perform this local connection, we can connect with the following familiar code used in previous chapters:
# Connect to local Spark instance sc <- spark_connect(master = "local")
Connecting to a Spark Standalone cluster requires the location of the cluster manager’s master instance, which can be found in the cluster manager web interface as described in the Standalone Clusters section. You can find this location by looking for a URL starting with
A connection in standalone mode starts from
sparklyr, which launches
spark-submit, which then submits the
sparklyr application, and creates the Spark Context, which requests executors from the Spark Standalone instance running under the given
Visually, this is described in Figure 7.4, which is quite similar to the overall connection architecture from Figure 7.1 but, with additional details that are particular to standalone clusters and
In order to connect, use
master = "spark://hostname:port" in
spark_connect() as follows:
sc <- spark_connect(master = "spark://hostname:port")
Hadoop YARN is the cluster manager from the Hadoop project, it’s the most common cluster manager which you are likely to find in clusters that started out as Hadoop clusters; with Cloudera, Hortonworks and MapR distributions as when using Amazon EMR. YARN supports two connection modes: YARN Client and YARN Cluster. However, YARN Client mode is much more common that YARN Cluster since it’s more efficient and easier to set up.
7.4.1 Yarn Client
When connecting in YARN Client mode, the driver instance runs R, sparklyr and the Spark Context which requests worker nodes from YARN to run Spark executors as shown in Figure 7.5.
To connect, one can simply run with
master = "yarn" as follows:
sc <- spark_connect(master = "yarn")
Behind the scenes, when running YARN in client mode, the cluster manager will do what you would expect a cluster manager would do; it will allocate resources from the cluster and assign them to your Spark application, which the Spark Context will manage for you. The important piece to notice in Figure ?? is that, the Spark Context resides in the same machine where you run R code, this is different when running YARN in cluster mode.
7.4.2 Yarn Cluster
The main difference between YARN in cluster mode and running YARN in client mode is that, in cluster mode, the driver node is not required to be the node where R and sparklyr were launched; instead, the driver node remains the designated driver node which is usually a different node than the edge node where R is running. It can be helpful to consider using cluster mode when the edge node has too many concurrent users, when is lacking computing resources, or where tools (like RStudio or Jupyter) need to be managed independently of other cluster resources.
Figure 7.6 shows how the different components become decoupled when running in cluster mode. Notice there is still a line connecting the client with the cluster manager since, first of all, resources still need to be allocated from the cluster manager; however, once allocated, the client communicates directly with the driver node which will then communicate with the worker nodes. From this diagram, you might think that cluster mode looks much more complicated than the client mode diagram – this would be a correct assesment; therefore, it’s best to avoid cluster mode when possible due to additional configuration overhead that is best to avoid, if possible.
To connect in YARN Cluster mode, we can simple run:
sc <- spark_connect(master = "yarn-cluster")
Cluster mode assumes that the node running
spark_connect() is properly configured, meaning that,
yarn-site.xml exists and the
YARN_CONF_DIR environment variable is properly set. When using Hadoop as a file system, you will also need the
HADOOP_CONF_DIR environment variable properly configured. In addition, you would need to have proper network connectivity between the client and the driver node, not just with sufficient bandwidth but also making sure both machines are reachable and no intermediate. This configuration is usually provided by your system administrator and is not something that you would have to manually configure.
As opposed to other connection methods which require using an edge node in the cluster, Livy provides a Web API that makes the Spark cluster accessible from outside the cluster and does not require a Spark installation in the client. Once connected through the Web API, the Livy Service starts the Spark context by requesting resources from the cluster manager and distributing work as usual. Figure 7.7 illustrates a Livy connection, notice that the client connects remotely to the driver through a web API.
Connecting through Livy requires the URL to the Livy service which should be similar to
https://hostname:port/livy. Since remote connections are allowed, connections usually requires, at the very least, basic authentication:
sc <- spark_connect( master = "https://hostname:port/livy", method = "livy", config = livy_config( spark_version = "2.4.0", username = "<username>", password = "<password>" ))
To try out Livy in your local machine, you can install and run a Livy service as described under the Livy Clusters section and then, connect as follows:
sc <- spark_connect( master = "http://localhost:8998", method = "livy", version = "2.4.0")
Once connected through Livy, you can make use of any
sparklyr feature; however, Livy is not suitable for exploratory data analysis, since executing commands has a significant performance cost. That said, while running long running computations, this overhead could be considered irrelevant. In general, it is preferred to avoid using Livy and work directly within an edge node in the cluster; when this is not feasible, using Livy could be a reasonable approach.
Note: Specifying the Spark version through the
spark_version parameter is optional; however, when the version is specified, performance is significantly improved by deploying precompiled Java binaries compatible with the given version. Therefore, it is a best practice to specify the Spark version when connecting to Spark using Livy.
Similar to YARN, Mesos supports client mode and a cluster mode; however –
sparklyr currently only supports client mode under Mesos. Therefore, the diagram from Figure 7.8, is equivalent to YARN Client’s diagram with only the cluster manager changed from YARN to Mesos.
Connecting requires the address to the Mesos master node, usually in the form of
mesos://zk://host1:2181,host2:2181,host3:2181/mesos for Mesos using ZooKeeper.
sc <- spark_connect(master = "mesos://host:port")
MESOS_NATIVE_JAVA_LIBRARY environment variable needs to be set by your system administrator, or manually set when running mesos on your local machine. For instance, in OS X, you can install and initialize Mesos from a terminal, followed by manually setting the mesos library and connecting with
brew install mesos /usr/local/Cellar/mesos/1.6.1/sbin/mesos-master --registry=in_memory --ip=127.0.0.1 MESOS_WORK_DIR=. /usr/local/Cellar/mesos/1.6.1/sbin/mesos-slave --master=127.0.0.1:5050
Sys.setenv(MESOS_NATIVE_JAVA_LIBRARY = "/usr/local/Cellar/mesos/1.6.1/lib/libmesos.dylib") sc <- spark_connect(master = "mesos://localhost:5050", spark_home = spark_home_dir())
Kubernetes cluster do not support client modes like Mesos or YARN; instead, the connection model is similar to YARN Cluster, where the driver node is assigned by Kubernetes. This is illustrated in Figure 7.9.
Kubernetes support is scheduled to be added to
sparklyr with sparklyr/issues/1525, please follow progress for this feature directly in github. Once Kubernetes becomes supported in
sparklyr, connecting to Kubernetes will work as follows:
sc <- spark_connect( master = "k8s://https://<apiserver-host>:<apiserver-port>", config = list( spark.executor.instances = 2, spark.kubernetes.container.image = "spark-image" ) )
If your computer is already configured to use a Kubernetes cluster, you can use the following command to find the
When working with cloud providers, there are a few connection differences. For instance, connecting from Databricks requires the following connection method:
sc <- spark_connect(method = "databricks")
Since Amazon EMR makes use of YARN, you can connect using
master = "yarn":
sc <- spark_connect(master = "yarn")
Connections to Spark when using IBM’s Watson Studio requires you to retrieve a configuration object through a
load_spark_kernels() function IBM provides:
kernels <- load_spark_kernels() sc <- spark_connect(config = kernels)
Under Microsoft Azure HDInsights and when using ML Services (R Server), creating an
sparklyr connection gets initialized through:
library(RevoScaleR) cc <- rxSparkConnect(reset = TRUE, interop = "sparklyr") sc <- rxGetSparklyrConnection(cc)
Please reference your cloud provider documentation and their support channels if assistance is needed.
Most of the time,
sparklyr used interactively; as in, you explicitly connect with
spark_connect() and then execute commands to analyze and model large-scale data. However, you can also automate processes by scheduling Spark jobs that use
sparklyr. Spark does not provide tools to schedule data processing tasks; so instead, you would use other workflow management tools. This can be useful useful to transform data, prepare a model and score data overnight or to make use of Spark by other systems.
As an example, you can create a file named
batch.R with contents:
library(sparklyr) sc <- spark_connect(master = "local") sdf_len(sc, 10) %>% spark_write_csv("batch.csv") spark_disconnect(sc)
You can then submit this application to Spark in batch mode using
master parameter should be set to the appropriately.
spark_submit(master = "local", "batch.R")
You can also invoke
spark-submit from the shell directly through:
/spark-home-path/spark-submit --files batch.R --class sparklyr.Shell '/spark-jars-path/sparklyr-2.3-2.11.jar' 8880 12345 --batch batch.R
The last parameters represent the port number
8880 and the session number,
12345, which can be set to any unique numeric identifier. You can use the following R code to get the correct paths:
# Retrieve spark-home-path spark_home_dir() # Retrive spark-jars-path system.file("java", package = "sparklyr")
You can customize your script by passing additional command line arguments to
spark-submit and then reading them back in R using
When connecting to a Spark Cluster using tools like Jupyter and RStudio, you can run the same connection parameters presented in this chapater. However, since many cloud providers make use of a web proxy to secure Spark’s web interface, in order to use
spark_web() or the RStudio connections pane extension, you will need to properly configure the
sparklyr.web.spark setting which you would then pass to
spark_config() through the
For instance, when using Amazon EMR, you can configure
sparklyr.web.yarn by dinamically retrieving the YARN application and building the EMR proxy URL:
domain <- "http://ec2-12-345-678-9.us-west-2.compute.amazonaws.com" config <- spark_config() config$sparklyr.web.spark <- ~paste0( domain, ":20888/proxy/", invoke(spark_context(sc), "applicationId")) config$sparklyr.web.yarn <- paste0(domain, ":8088") sc <- spark_connect(master = "yarn", config = config)
It is common to connect once, and only once, to Spark. However, you can also open multiple connections to Spark by connecting to different clusters or by specifying the
app_name parameter. This can be helpful to compare Spark versions or validate your analysis before submitting to the cluster. The following example opens connections to Spark 1.6.3, 2.3.0 and Spark Standalone:
# Connect to local Spark 1.6.3 sc_1_6_3 <- spark_connect(master = "local", version = "1.6.3") # Connect to local Spark 2.3.0 sc_2_3_0 <- spark_connect(master = "local", version = "2.3.0", appName = "Spark23") # Connect to local Spark Standalone sc_standalone <- spark_connect(master = "spark://host:port")
Finally, we can disconnect from each connection:
spark_disconnect(sc_1_6_3) spark_disconnect(sc_2_3_0) spark_disconnect(sc_standalone)
Alternatively, you can disconnect from all connections at once:
Last but not least, we will introduce the following troubleshooting techniques: Logging, Spark Submit and Windows. When in doubt of where to start, start with the Windows section when using Windows systems, followed by Logging and closing with Spark Submit. This techniques are useful when running
spark_connect() fails with an error message.
The first technique to troubleshoot connections is to print Spark logs directly to the console to help you spot additional error messages:
sc <- spark_connect(master = "local", log = "console")
In addition, you can enable verbose logging by setting the
sparklyr.verbose option when connecting:
sc <- spark_connect(master = "local", log = "console", config = list(sparklyr.verbose = TRUE))
7.12.2 Spark Submit
You can diagnose if a connection issue is specific to R or Spark in general. This can be accomplished by running an example job through
spark-submit and validating that no errors are thrown:
# Find the spark directory using an environment variable spark_home <- Sys.getenv("SPARK_HOME") # Or by getting the local spark installation spark_home <- sparklyr::spark_home_dir()
Then execute the sample compute Pi example by replacing
"local" with the correct master parameter you are troubleshooting:
# Launching a sample application to compute Pi system2( file.path(spark_home, "bin", "spark-submit"), c( "--master", "local", "--class", "org.apache.spark.examples.SparkPi", file.path(spark_home, "examples", "jars", "spark-examples_2.11-2.4.0.jar"), 100) )
... Pi is roughly 3.1415503141550314 ...
If the above message is not displayed, you will have to investigate why your Spark cluster is not properly configured, which is beyond the scope of this book. When using a cloud provider or a Spark distribution, you can contact their support team to help you troubleshoot this further; otherwise, StackOverflow is a good place to start.
Otherwise, if you do see the message above, this means your Spark cluster is properly configured but somehow, R is not being able to use Spark, so you will have to troubleshoot in-detail as we will explain next.
To troubleshoot the connection process in detail, you can manually replicate the two-step connection process, which is often very helpful to diagnose connection issues. Connecting to Spark is performed in two steps; first,
spark-submit is triggered from R which submits the application to Spark, second, R connects to the running Spark application.
First, identify the Spark installation directory and the path to the correct
sparklyr*.jar by running:
dir(system.file("java", package = "sparklyr"), pattern = "sparklyr", full.names = T)
Make sure you identify the correct version that matches your Spark cluster, for instance
sparklyr-2.1-2.11.jar for Spark 2.1.
Then, from the terminal, run:
$SPARK_HOME/bin/spark-submit --class sparklyr.Shell $PATH_TO_SPARKLYR_JAR 8880 42
18/06/11 12:13:53 INFO sparklyr: Session (42) found port 8880 is available 18/06/11 12:13:53 INFO sparklyr: Gateway (42) is waiting for sparklyr client to connect to port 8880
8880 represents the default port to use in
42 the session number, this is a cryptographically secure number generated bysparklyr
, but for troubleshooting purposes can be as simple as42`.
If this first connection step fails, it means that the cluster can’t accept the application. This usually means that there are not enough resources, there are permission restrictions, etc.
The second step is to connect from R as follows – notice that there is a 60 seconds timeout, so you’ll have to run the R command after running the terminal command. If needed, this timeout can be configured as described in the Tunning chapter.
library(sparklyr) sc <- spark_connect(master = "sparklyr://localhost:8880/42", version = "2.3")
If this second connection step fails, it usually means that there is a connectivity problem between R and the driver node. You can try using a different connection port, for instance.
Connecting from Windows is, in most cases, as straightforward as connecting from Linux and OS X. However, there are a few common connection issues you should be aware of:
- Firewalls and antivirus software might block ports for your connection. The default port used by
8880; double check this port is not being blocked.
- Long path names can cause issues, specially in older Windows systems like Windows 7. When using these systems, try connecting with Spark installed with all folders using at most eight characters and no spaces in their names.
This chapter presented an overview of Spark’s architecture, connection concepts and examples to connect in local mode, standalone, YARN, Mesos, Kubernetes and Livy. It also presented edge nodes and their role while connecting to Spark clusters. This should have provided you with enough information to successfully connect to any Apache Spark cluster.
To troubleshoot connection problems beyond the techniques described in this chpater, it is recommended to search for the connection problem in StackOverflow, the sparklyr GitHub issues and, if needed, open a new GitHub issue in sparklyr to assist further.
In the next chapter, Data, we will cover how to read and write over multiple data sources, help you understand how Spark makes use of Spark DataFrames, and describe how to import and export data from your Spark clusters.
“Azure Wikipedia.” 2019. https://spark.apache.org/docs/2.1.0/api/scala/index.html.