Spark Getting Started

The public GitHub repo https://github.com/lucidworks/fusion-spark-bootcamp contains examples and labs for learning how to use Fusion’s Spark features.

In this section, you’ll walk through some basic concepts of using Spark in Fusion. For further exposure, you should work through the labs in the Fusion Spark Bootcamp.

Starting Spark Master and Worker Processes

In Fusion 3.0, the Fusion run script ($FUSION_HOME/bin/fusion start) does not start the spark-master and spark-worker processes. This reduces the number of Java processes needed to run Fusion, and therefore reduces memory and CPU consumption.

Jobs that depend on Spark, such as aggregations, will still execute in what Spark calls "local" mode. In local mode, Spark executes tasks in-process in the driver application JVM. Local mode is intended for jobs that consume or produce small data sets. One caveat of using local mode is that a persistent Spark UI is not available, but you can access the driver/job application UI on port 4040 while the local SparkContext is running.
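
If it helps to picture what local mode means, here is a minimal, self-contained Scala sketch (not Fusion-specific, and not meant to be pasted into the Fusion shell): a SparkSession whose master URL is local[*] runs every task inside the driver JVM and exposes the driver UI on port 4040.

import org.apache.spark.sql.SparkSession

// Minimal sketch of "local" mode: the master URL is local[*], so all tasks run
// in this JVM rather than on separate spark-worker processes.
val spark = SparkSession.builder()
  .appName("local-mode-sketch")
  .master("local[*]")
  .getOrCreate()

println(spark.sparkContext.master)            // prints local[*]
spark.sparkContext.uiWebUrl.foreach(println)  // driver UI, typically http://<host>:4040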

Tip

To allow the spark-master and spark-worker processes to start and stop via bin/fusion start and bin/fusion stop, we recommend adding them to the group.default definition in conf/fusion.properties:

group.default = zookeeper, solr, api, connectors, ui, spark-master, spark-worker

To scale-out Spark in Fusion to support larger data sets and to speed up processing, you should start the spark-master and spark-worker processes:

> cd $FUSION_HOME
> bin/spark-master start
> bin/spark-worker start

After starting the master and worker services, direct your browser to http://localhost:8767 to view the Spark UI. The display should be similar to:

Spark started via UI

If you do not see the master UI, or at least one worker in the ALIVE state, check the following logs:

$FUSION_HOME/var/log/spark-master/spark-master.log
$FUSION_HOME/var/log/spark-worker/spark-worker.log

Use the Fusion API to get the status of the Spark master via the following request:

curl http://localhost:8765/api/v1/spark/master/status

This should return a response of the form:

[ {
  "url" : "spark://192.168.1.9:8766",
  "status" : "ALIVE",
  "workers" : [ {
    "id" : "worker-20161005175058-192.168.1.9-8769",
    "host" : "192.168.1.9",
    "port" : 8769,
    "webuiaddress" : "http://192.168.1.9:8082",
    "cores" : 8,
    "coresused" : 0,
    "coresfree" : 8,
    "memoryused" : 0,
    "memoryfree" : 2048,
    "state" : "ALIVE",
    "lastheartbeat" : 1475711489460
  } ], ...

If you have multiple Spark masters running in a Fusion cluster, each will be shown in the status but only one will be ALIVE; the other masters will be in STANDBY mode.
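
If you want to poll that status programmatically, a simple sketch (adjust the host and port for your deployment, and add credentials if your API service requires authentication) is to fetch the same endpoint used by the curl command above and look for an ALIVE master:

import scala.io.Source

// Fetch the master status from the Fusion API and check that at least one
// master reports itself as ALIVE.
val statusJson = Source.fromURL("http://localhost:8765/api/v1/spark/master/status").mkString
if (statusJson.contains("\"ALIVE\"")) println("At least one Spark master is ALIVE")
else println("No ALIVE Spark master reported:\n" + statusJson)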

Tip
If you are operating a multi-node Spark cluster, we recommend running multiple Spark master processes to achieve high availability: if the active master fails, a standby takes over.

Running a Job in the Spark Shell

Once you have started the Spark master and worker, launch the Fusion Spark shell wrapper:

> bin/spark-shell

It may take a few minutes to load the first time, as the script needs to download the shaded Fusion JAR file from the API service. When the shell is initialized, you’ll see the prompt:

scala>
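
As a quick sanity check that the shell attached to the standalone master rather than falling back to local mode, you can print the master URL (the host and port will differ in your environment):

// Prints something like spark://192.168.1.9:8766 when connected to the standalone master,
// or local[*] if the shell fell back to local mode.
println(sc.master)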

Type :paste to activate paste mode in the shell and paste in the following Scala code:

// Read from the Fusion "logs" collection in Solr using the "solr" data source
val readFromSolrOpts = Map(
  "collection" -> "logs",
  "fields" -> "host_s,port_s,level_s,message_t,thread_s,timestamp_tdt"
)
val logsDF = spark.read.format("solr").options(readFromSolrOpts).load
// Register the DataFrame as the temp table "fusion_logs" so it can be queried with Spark SQL
logsDF.registerTempTable("fusion_logs")
// Count log entries per log level
var sqlDF = spark.sql("""
|   SELECT COUNT(*) as num_values, level_s as level
|     FROM fusion_logs
| GROUP BY level_s
| ORDER BY num_values desc
|    LIMIT 10""".stripMargin)
sqlDF.show(10,false)
Note
In Fusion 2.4.x, you’ll need to specify the Solr zkhost in the readFromSolrOpts Map, but in 3.x it is set automatically using the Fusion configuration API.

*Make sure the zkhost value is correct for your environment!*
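
For reference, on Fusion 2.4.x the options map would look something like the following; the ZooKeeper connect string shown here is only a placeholder, so substitute the zkhost of your own cluster:

// Fusion 2.4.x variant: the Solr ZooKeeper connect string must be passed explicitly.
// "localhost:9983" is a placeholder; use your own cluster’s zkhost (and chroot, if any).
val readFromSolrOpts = Map(
  "collection" -> "logs",
  "zkhost"     -> "localhost:9983",
  "fields"     -> "host_s,port_s,level_s,message_t,thread_s,timestamp_tdt"
)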

Press Ctrl-D to exit paste mode and execute the script. Your results should look similar to the following:

Spark shell result

Congratulations, you just ran your first Fusion Spark job that reads data from Solr and performs a simple aggregation!

The Spark Shell UI

The Spark Master UI allows us to dig into the details of the Spark job. This handy guide helps you understand the Spark UI: https://jaceklaskowski.gitbooks.io/mastering-apache-spark/content/spark-webui.html.

In your browser (http://localhost:8767), there should be a job named "Spark shell" under running applications (the application ID will be different than the following screenshot):

Spark UI result

Click on the application ID and then on the Application Detail UI link:

Spark UI detail

Notice the tabs at the top that allow you to dig into details about the running application. Take a moment to explore the UI. Can you answer the following questions about your application?

  • How many tasks were needed to execute this job?

  • Which JARs were added to the classpath for this job? Hint: look under the Environment tab.

  • How many executor processes were used to run this job? Why? Hint: take a look at all the Spark configuration properties under the Environment tab.

  • How many rows were read from Solr for this job? Hint: look under the SQL tab.

For the above run, the answers are:

  • 202 tasks were needed to execute this job.

  • The Environment tab shows that one of the JAR files is named "fusion.jar" and was "Added By User". The fusion.jar file is a copy of the shaded JAR created by the API service:

Spark jars

  • It took 2 executor processes to run this job. Each executor has 2 CPUs allocated to it and the bin/spark-shell script asked for 4 total CPUs for the shell application.

  • This particular job read about 21K rows from Solr, but this number will differ based on how long Fusion has been running.

The key take-away is that you can see how Spark interacts with Solr using the UI.
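
If you want to cross-check a couple of these answers from the shell itself, the sketch below prints the CPU-related settings of the shell application and counts the rows read from Solr. It assumes logsDF is still defined from the earlier paste, and the exact values will differ on your system.

// Standard Spark properties that control how many cores the shell application requests;
// the printed values will reflect your own configuration.
println(sc.getConf.getOption("spark.cores.max"))
println(sc.getConf.getOption("spark.executor.cores"))

// Counting the DataFrame triggers another read from Solr, so the result should be in the
// same ballpark as the row count on the SQL tab (it grows as Fusion keeps logging).
println(logsDF.count())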

Spark Job Tuning

Returning to the first question: why were 202 tasks needed to execute this job?

SparkSQL query

The reason is that Spark SQL defaults to 200 partitions when performing distributed group by operations; see the spark.sql.shuffle.partitions property. Let’s adjust that so Spark uses only 4 tasks, since our data set is so small. In the Spark shell, execute the following Scala:

sqlContext.setConf("spark.sql.shuffle.partitions", "4")
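
If you prefer to go through the SparkSession, which the earlier snippets bind to the name spark, the equivalent call in Spark 2.x is:

// Same setting via the SparkSession configuration API (Spark 2.x).
spark.conf.set("spark.sql.shuffle.partitions", "4")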

You just need to re-execute the final query and show commands (logsDF and the fusion_logs temp table are still registered from the earlier paste):

var sqlDF = spark.sql("""
|   SELECT COUNT(*) as num_values, level_s as level
|     FROM fusion_logs
| GROUP BY level_s
| ORDER BY num_values desc
|    LIMIT 10""".stripMargin)
sqlDF.show(10,false)

Now if you look at the Jobs tab in the Spark UI, you’ll see a new job that executed with only 6 tasks! You’ve just had your first experience with tuning Spark jobs.