# Spark Getting Started for Fusion 4.x
By default, the Fusion run script `/opt/fusion/latest.x/bin/fusion` (on Unix) or `C:\lucidworks\fusion\latest.x\bin\fusion.cmd` (on Windows) does not start the `spark-master` and `spark-worker` processes. This reduces the number of Java processes needed to run Fusion and therefore reduces memory and CPU consumption.

Jobs that depend on Spark, for example aggregations, will still execute in what Spark calls local mode. In local mode, Spark executes tasks in-process in the driver application JVM. Local mode is intended for jobs that consume and produce small datasets.

One caveat about local mode is that a persistent Spark UI is not available, but you can access the driver/job application UI at port 4040 while the local SparkContext is running.

To scale Spark in Fusion to support larger datasets and to speed up processing, start the `spark-master` and `spark-worker` services from the `bin` directory below the Fusion home directory, for example `/opt/fusion/latest.x` (on Unix) or `C:\lucidworks\fusion\latest.x` (on Windows), as shown below.
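For example, on Unix, a minimal sketch using the `bin/spark-master` and `bin/spark-worker` management scripts listed in the directory table later in this document:

```bash
cd /opt/fusion/latest.x/bin
./spark-master start
./spark-worker start
```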
To have the `spark-master` and `spark-worker` processes start and stop with `bin/fusion start` and `bin/fusion stop` (on Unix) or `bin\fusion.cmd start` and `bin\fusion.cmd stop` (on Windows), add them to the `group.default` definition in `fusion.cors` (`fusion.properties` in Fusion 4.x). In Fusion 4.1+, for example:
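```properties
# example group definition that includes the Spark services
group.default = zookeeper, solr, api, connectors-classic, connectors-rpc, proxy, webapps, admin-ui, spark-master, spark-worker, log-shipper
```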
Direct your browser to http://localhost:8767 to view the Spark master web UI.

Next, launch the interactive Spark shell from the `bin` directory below the Fusion home directory, for example `/opt/fusion/latest.x` (on Unix) or `C:\lucidworks\fusion\latest.x` (on Windows). The shell can take a few minutes to load the first time, because the script needs to download the shaded Fusion JAR file from the API service. If ports are locked down between Fusion nodes, specify the Spark driver and BlockManager ports when launching the shell, as in the Unix example sketched below.
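A sketch, assuming the `spark-shell` wrapper script passes standard Spark `--conf` flags through; 8772 and 8788 are the default driver and BlockManager ports from the ports table below:

```bash
# pin the driver and BlockManager ports so firewalled nodes can connect
./spark-shell --conf spark.driver.port=8772 --conf spark.blockManager.port=8788
```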
Type `:paste` to activate paste mode in the shell and paste in the Scala code for the example job, sketched below.
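The original listing is not preserved in this extract; a minimal hypothetical stand-in that builds a small DataFrame and runs a query that shuffles:

```scala
import org.apache.spark.sql.functions.col

// hypothetical demo data: ten rows with a single "id" column
val df = spark.range(0, 10).toDF("id")

// a grouped count forces a shuffle, so spark.sql.shuffle.partitions applies
val counts = df.groupBy((col("id") % 2).as("parity")).count()
counts.show()
```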
In the Spark master web UI (http://localhost:8767), there should be a job named "Spark shell" under running applications (the application ID will differ in your installation). Notice that the application was given the shaded `spark-shaded-*.jar` and that it was "Added By User"; also notice that the `bin/spark-shell` script asked for 4 total CPUs for the shell application.

The number of tasks Spark uses when shuffling data for joins or aggregations is controlled by `spark.sql.shuffle.partitions`. Because our data set is so small, let us adjust Spark so that it only uses 4 tasks. In the Spark shell, execute the following Scala and re-run the `show` command:
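A sketch of the likely setting; `counts` refers to the hypothetical DataFrame defined above:

```scala
// reduce shuffle parallelism so this small dataset uses only 4 tasks
spark.conf.set("spark.sql.shuffle.partitions", "4")

// re-run the query; the shuffle stage now runs 4 tasks
counts.show()
```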
# Spark Configuration for Fusion 4.x
To control how many CPU cores Spark workers use, set `spark-worker.envVars` to `SPARK_WORKER_CORES=<number of cores>` in the `fusion.cors` (`fusion.properties` in Fusion 4.x) file on all nodes hosting a `spark-worker`. For example, an r4.2xlarge instance in EC2 has 8 CPU cores, but the configuration sketched below will improve utilization and performance.
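The exact value from the original document is not preserved; a hypothetical sketch that oversubscribes the 8 physical cores:

```properties
# hypothetical: advertise 16 cores to Spark on an 8-core node
spark-worker.envVars=SPARK_WORKER_CORES=16
```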
Also set the `default.address` property in `fusion.cors` (`fusion.properties` in Fusion 4.x) to ensure that all Spark processes have a consistent address to bind to. After changing these settings, restart the Spark services from the `bin` directory below the Fusion home directory, for example `/opt/fusion/latest.x` (on Unix) or `C:\lucidworks\fusion\latest.x` (on Windows).

The total amount of memory available for all executors spun up by a Spark worker process is specified by `fusion.spark.worker.memory`. This is the quantity seen in the memory column for a worker entry in the Workers table.

The JVM memory setting (`-Xmx`) for the `spark-worker` process, configured in the `fusion.cors` (`fusion.properties` in Fusion 4.x) file, controls how much memory the `spark-worker` needs to manage executors, not how much memory should be made available to your jobs. Modify the `-Xmx` value with `curl`, as sketched below.
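A sketch only: the exact command is not preserved in this extract, and the endpoint and property name shown here are assumptions modeled on the configurations API referenced in the troubleshooting section:

```bash
# hypothetical: raise the spark-worker JVM heap via the Fusion configurations API
curl -u admin:password123 -X PUT -H "Content-type: application/json" \
  -d '-Xmx2g' "http://localhost:8764/api/configurations/spark-worker.jvmOptions"
```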
Rather than requiring you to set `spark.executor.memory`, Fusion calculates the amount of memory that can be allocated to the executor. Aggregation Spark jobs always get half of the memory assigned to the workers. This is controlled by the `fusion.spark.executor.memory.fraction` property, which is set to `0.5` by default. For example, Spark workers have 4 GB of memory by default, so the executors for aggregation Spark jobs are assigned 2 GB each.

To let Fusion aggregation jobs use more of the workers' memory, increase the `fusion.spark.executor.memory.fraction` property to `1`. Use this property instead of the Spark executor memory property.

The `fusion.spark.cores.fraction` property
lets you limit the number of cores used by the Fusion driver applications (default and scripted). For example, if the Spark master UI shows 18 total CPUs available, setting the cores fraction to 0.5 limits Fusion driver applications to 9 of them; the command is sketched below.
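A sketch, assuming the same configurations API (hypothetical endpoint and credentials):

```bash
# hypothetical: limit Fusion driver applications to half the available cores
curl -u admin:password123 -X PUT -H "Content-type: application/json" \
  -d '0.5' "http://localhost:8764/api/configurations/fusion.spark.cores.fraction"
```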
Spark processes in Fusion use the following default ports:

| Port number | Process |
|---|---|
| 4040 | SparkContext web UI |
| 7337 | Shuffle port for Apache Spark worker |
| 8767 | Spark master web UI |
| 8770 | Spark worker web UI |
| 8766 | Spark master listening port |
| 8769 | Spark worker listening port |
| 8772 (`spark.driver.port`) | Spark driver listening port |
| 8788 (`spark.blockManager.port`) | Spark BlockManager port |
If a port is not available, Spark adds 1 to the assigned port number and tries again. For example, if 4040 is not available, Spark uses 4041 (if available, or 4042, and so forth).

Ensure that the ports in the above table are accessible, as well as a range of up to 16 subsequent ports. For example, open ports 8772 through 8787 and 8788 through 8804, because a single node can have more than one Spark driver and Spark BlockManager.
| Path (relative to Fusion home) | Notes |
|---|---|
| `bin/spark-master` | Script to manage (`start`, `stop`, `status`, etc.) the Spark master service in Fusion |
| `bin/spark-worker` | Script to manage (`start`, `stop`, `status`, etc.) the Spark worker service in Fusion |
| `bin/sql` | Script to manage (`start`, `stop`, `status`, etc.) the SQL service in Fusion |
| `bin/spark-shell` | Wrapper script to launch the interactive Spark shell with the Spark master URL and shaded JAR |
| `apps/spark-dist` | Apache Spark distribution; contains all JAR files needed to run Spark in Fusion |
| `apps/spark/hadoop` | Hadoop home directory used by Spark jobs running in Fusion |
| `apps/spark/driver/lib` | Add custom JAR files to this directory to include them in all Spark jobs |
| `apps/spark/lib` | JAR files used to construct the classpath for the `spark-worker`, `spark-master`, and `sql` services in Fusion |
| `var/spark-master` | Working directory for the `spark-master` service |
| `var/spark-worker` | Working directory for the `spark-worker` service; keep an eye on disk usage under this directory, as temporary application data for running Spark jobs is saved here |
| `var/spark-workDir-*` | Temporary work directories created while an application is running; they are removed after the driver is shut down or closed |
| `var/sql` | Working directory for the SQL service |
| `var/api/work/spark-shaded-*.jar` | The shaded JAR built by the API service; contains all classes needed to run Fusion Spark jobs. If one of the JARs in the Fusion API has changed, a new shaded JAR is created with an updated name. |
| Path (relative to Fusion home) | Notes |
|---|---|
| **Log configuration** | |
| `conf/spark-master-log4j2.xml` | Log configuration file for the `spark-master` service |
| `conf/spark-worker-log4j2.xml` | Log configuration file for the `spark-worker` service |
| `conf/spark-driver-log4j2.xml` | Log configuration file for the Spark driver application launched by Fusion; this file controls the log settings for most Spark jobs run by Fusion |
| `conf/spark-driver-scripted-log4j.xml` (Fusion 4.1+ only) | Log configuration file for custom script jobs and Parallel Bulk Loader (PBL) based jobs |
| `conf/spark-driver-launcher-log4j2.xml` | Log configuration file for jobs built using the Spark Job Workbench |
| `conf/spark-executor-log4j2.xml` | Log configuration file for Spark executors; log messages are sent to STDOUT and can be viewed from the Spark UI |
| `conf/sql-log4j2.xml` | Log configuration file for the Fusion SQL service |
| **Logs** | |
| `var/log/spark-master/*` | Logs for the `spark-master` service |
| `var/log/spark-worker/*` | Logs for the `spark-worker` service |
| `var/log/sql/*` | Logs for the `sql` service |
| `var/log/api/spark-driver-default.log` | Main log file for built-in Fusion Spark jobs |
| `var/log/api/spark-driver-scripted.log` | Main log file for custom script jobs |
| `var/log/api/spark-driver-launcher.log` | Main log file for custom jobs built using the Spark Job Workbench |
If Fusion is configured to use TLS, the JVM truststore settings must also be passed to the Spark JVMs:

* `javax.net.ssl.trustStore`
* `javax.net.ssl.trustStorePassword`
* `javax.net.ssl.trustStoreType`

Pass them to executors via `spark.executor.extraJavaOptions`, and to the driver via `fusion.spark.driver.jvmArgs` or `spark.driver.extraJavaOptions`, as sketched below.
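A sketch with hypothetical paths and password, showing one way the settings line up in `fusion.properties`-style configuration:

```properties
# hypothetical truststore location and password
spark.executor.extraJavaOptions=-Djavax.net.ssl.trustStore=/opt/fusion/ssl/truststore.jks -Djavax.net.ssl.trustStorePassword=changeit -Djavax.net.ssl.trustStoreType=JKS
spark.driver.extraJavaOptions=-Djavax.net.ssl.trustStore=/opt/fusion/ssl/truststore.jks -Djavax.net.ssl.trustStorePassword=changeit -Djavax.net.ssl.trustStoreType=JKS
```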
# Scale Spark Aggregations for Fusion 4.x
An aggregation job can spend most of its time in the second stage (`foreachPartition`). This is typically due to slowness indexing the aggregated documents back into Solr, or due to JavaScript functions. The solution is to increase the number of partitions of the aggregated RDD (the input to Stage 2). By default, Fusion uses 25 partitions. Here, we increase the number of partitions to 72 by setting these configuration properties (see the sketch after the list):

* `spark.default.parallelism`: Default number of partitions in RDDs returned by transformations like `join`, `reduceByKey`, and `parallelize` when not specified by the user.
* `spark.sql.shuffle.partitions`: Number of partitions to use when shuffling data for joins or aggregations.
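A minimal sketch, assuming the values are applied through the job's Spark configuration properties:

```properties
# raise both partition counts from their defaults to 72
spark.default.parallelism=72
spark.sql.shuffle.partitions=72
```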
With these settings, the `foreachPartition` stage of the job will use 72 partitions.

You can also increase the `splits_per_shard` read option, which defaults to 4. This setting governs how many Spark tasks can read from Solr concurrently. Increasing this value can improve job performance, but it also adds load on Solr.

# Spark Troubleshooting
Spark-related log entries are prefixed with `spark:`. In addition, every Spark job has a job ID attributed as SPARK JOB ID, which you can use to view a comprehensive list of Spark jobs.

If Spark jobs fail to run, work through the following checks:

1. Open `fusion.properties` and look for `group.default`. The line should have `spark-master` and `spark-worker` in the list, for example: `group.default = zookeeper, solr, api, connectors-classic, connectors-rpc, proxy, webapps, admin-ui, spark-master, spark-worker, log-shipper`.
2. Try the Spark client: navigate to the `fusion_home/bin/` directory and start the shell via `./spark-shell`. It should successfully connect, with a message such as "Launching Spark Shell with Fusion Spark Master: local". If spark-shell fails to connect at all, copy the error message and pass it to Lucidworks support.
3. Open the Fusion UI at `host:8764` and check whether scheduled jobs are complete or running.
4. Query `http://localhost:8764/api/spark/info`. The API should return `mode` and `masterUrl`. If both are local, the Spark services either are not explicitly enabled or are in a failure state. If the Spark services are enabled, `mode` is STANDALONE; if `mode` is LOCAL, there is an issue with starting the Spark services.
5. Fetch the driver logs from `http://<FUSION_HOST>/api/spark/log/driver/default?rows=100` or `http://localhost:8764/api/spark/log/driver/default?rows=100`.
6. Alternatively, run `tail -F spark-driver-default.log`, or copy the complete log files under `/fusion_home/var/log/api/` (for example, spark-driver-default.log, spark-driver-scripted.log, api.log, spark-driver-script-stdout.log) and share them with Lucidworks to troubleshoot the actual issue.
Other useful Spark API endpoints:

* http://localhost:8764/api/spark/master/config
* http://localhost:8764/api/spark/worker/config
* http://localhost:8764/api/spark/master/status
* https://host:8764/api/spark/info
* https://host:8764/api/spark/configurations
* http://192.168.29.185:8764/api/apollo/configurations
The log endpoints return content as `text/plain` (which renders nicely in browsers), sorted by timestamp. The REST API Reference documents the log endpoints for Spark jobs; the URIs for these endpoints contain `/api/spark/log`.

The most useful log API endpoint is the `spark/log/job/` endpoint, which goes through all Fusion REST API and Spark logs, filters them by the `jobId` (using MDC, the mapped diagnostic context), and merges the output from the different files. For example, to obtain log content for the job `jobId`:
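A sketch of the request; the credentials are placeholders, and `jobId` stands for a real Spark job ID:

```bash
# hypothetical credentials; endpoint path from the description above
curl -u admin:password123 "http://localhost:8764/api/spark/log/job/jobId"
```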