Spark Driver Processes

A Spark "driver" is an application that creates a SparkContext for executing one or more jobs in the Spark cluster. The following diagram depicts the driver’s role in a Spark cluster:

Spark driver

In the diagram above, the spark-master service in Fusion is the Cluster Manager.

If your Spark job performs any collect operations, the result of the collect (or collectAsMap) is sent back to the driver from all the executors. Consequently, if the result of the collect is too big to fit into memory, you will encounter OOM issues (or other memory-related problems) when running your job.
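
As a rough illustration, here is a minimal Scala sketch (the paths and column names are hypothetical) of how a collect can overwhelm the driver, along with two alternatives that keep the bulk of the data on the executors:

import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("collect-example").getOrCreate()

// A hypothetical aggregation over a large dataset
val counts = spark.read.parquet("/path/to/signals").groupBy("doc_id").count()

// collect() ships every row back to the driver; with enough distinct
// doc_id values this is what triggers driver-side OOM errors.
val everything = counts.collect()

// Bounded alternatives:
val sample = counts.take(100)            // pull back only a small sample
counts.write.parquet("/path/to/rollup")  // or write results out instead of collecting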

All Fusion jobs run on Spark using a driver process started by the API service.

Custom jobs

Fusion supports custom Spark jobs written in Scala, Java, or Python and built using the Spark Job Workbench, a toolkit provided by Lucidworks. See the examples in the repository for details.
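
The repository contains complete project examples; as a rough sketch only (generic Spark, not the Workbench API itself), a custom Scala job is typically a standard Spark application with a main method:

import org.apache.spark.sql.SparkSession

object ExampleCustomJob {
  def main(args: Array[String]): Unit = {
    // The master URL and related settings are supplied by the driver process
    // that Fusion starts, so nothing is hard-coded here.
    val spark = SparkSession.builder().appName("example-custom-job").getOrCreate()
    val input = spark.read.json(args(0))  // hypothetical input path passed as a job argument
    println(s"Read ${input.count()} records")
    spark.stop()
  }
}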

To troubleshoot problems with a custom job, start by looking for errors in the driver launcher log, /opt/fusion/4.1.x/var/log/api/spark-driver-launcher.log (on Unix) or C:\lucidworks\var\fusion\4.1.x\var\log\api\spark-driver-launcher.log (on Windows).

Drivers

Fusion has four types of job drivers:

  • Default driver – Executes built-in Fusion jobs, such as a signal aggregation job or a metrics rollup job.

  • Script-job driver – Executes custom script jobs; a separate driver is needed to isolate the classpath for custom Scala scripts.

  • Spark-shell driver – Wrapper script provided with Fusion to launch the Spark Scala REPL shell with the correct master URL (pulled from Fusion’s API) and a shaded Fusion JAR added. Launched using /opt/fusion/4.1.x/bin/spark-shell (on Unix) or C:\lucidworks\var\fusion\4.1.x\bin\spark-shell.cmd (on Windows); see the example after this list.

  • Custom-job driver – Executes custom jobs built using the Spark Job Workbench, a toolkit provided by Lucidworks to help build custom Spark jobs using Scala, Java, or Python.
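
As a quick check (assuming the default Unix install path shown above), launch the spark-shell wrapper and confirm from the REPL that it is connected to the cluster:

/opt/fusion/4.1.x/bin/spark-shell

scala> sc.master    // prints the master URL the shell was launched with
scala> sc.version   // prints the Spark version in use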

Default driver

In the Fusion UI, navigate to the system_metrics collection and select one of the built-in aggregation jobs, such as hourlyMetricsRollup-counters.

You must delete any existing driver applications before launching the job. Even if you haven’t started any jobs by hand, Fusion’s API service might have started one automatically, because Fusion ships with built-in background jobs that perform rollups of metrics in the system_metrics collection. Therefore, before you try to launch a job, run the following command:

curl -XDELETE http://localhost:8764/api/spark/driver

Wait a few seconds and use the Spark UI to verify that no Fusion-Spark application (for example, Fusion-20161005224611) is running.

In one or more terminal windows, set up a tail -f job (on Unix, or the equivalent on Windows) on the api and spark-driver-default logs:

tail -f var/log/api/api.log var/log/api/spark-driver-default.log

Run this command from the Fusion home directory, for example, /opt/fusion/4.1.x (on Unix) or C:\lucidworks\fusion\4.1.x (on Windows).

Now, start any aggregation job from the UI. It doesn’t matter whether or not this job performs any work; the goal of this exercise is to show what happens in Fusion and Spark when you run an aggregation job. You should see activity in both logs related to starting the driver application and running the selected job. The Spark UI will now show a Fusion-Spark app:

Spark default driver

Use the ps command to get more details on this process:

ps waux | grep SparkDriver

The output should show that the Fusion SparkDriver is a JVM process started by the API service; it is not managed by the Fusion agent. Within a few minutes, the Spark UI will update itself:

Spark default driver dynamic allocation

Notice that the application no longer has any cores allocated and that none of the available memory is in use (0.0B Used of 2.0 GB Total). This is because Fusion launches its driver applications with spark.dynamicAllocation.enabled=true. This setting allows the Spark master to reclaim CPU and memory from an application that is not actively using the resources allocated to it.
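
For reference, dynamic allocation is governed by standard Spark properties; the values below are Spark’s own defaults once the feature is enabled, not necessarily what Fusion configures:

# Fusion enables dynamic allocation for its driver applications
spark.dynamicAllocation.enabled=true
# Spark's default idle timeout before an unused executor is released
spark.dynamicAllocation.executorIdleTimeout=60s
# The allocation can shrink all the way down to zero executors (Spark default)
spark.dynamicAllocation.minExecutors=0
# On a standalone cluster, dynamic allocation also requires the external shuffle service
spark.shuffle.service.enabled=true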

Both driver processes (default and scripted) manage a SparkContext. The default driver shuts down its SparkContext after a configurable idle time (fusion.spark.idleTime, default 5 minutes). The scripted driver shuts down the SparkContext after every scripted job runs, to avoid classpath pollution between jobs.

Script-job driver

Fusion supports custom script jobs.

Script jobs require a separate driver to isolate the classpath for custom Scala scripts, and to isolate the classpath between jobs so that classes compiled from one script don’t pollute the classpath for subsequent scripted jobs.

For this reason, the SparkContext that each scripted job uses is immediately shut down after the job is finished and is recreated for new jobs. This adds some startup overhead for scripted jobs.

Refer to the apachelogs lab in the Fusion Spark Bootcamp project for a complete example of a custom script job.

To troubleshoot problems with a script job, start by looking for errors in the script-job driver log spark-driver-scripted.log in /opt/fusion/4.1.x/var/log/api/ (on Unix) or C:\lucidworks\var\fusion\4.1.x\var\log\api\ (on Windows).

Spark drivers in a multinode cluster

In a multi-node Fusion deployment where several nodes run Fusion’s API service, you can find out which node is running the Spark driver by querying the driver status:

curl http://localhost:8764/api/spark/driver/status

This returns a status report:

{
  "/spark-drivers/15797426d56T537184c2" : {
    "id" : "15797426d56T537184c2",
    "hostname" : "192.168.1.9",
    "port" : 8601,
    "scripted" : false
  }
}