Node Selectors
You can control which nodes Spark executors are scheduled on using a Spark configuration property for a job, where LABEL is the label applied to the node and LABEL_VALUE is that label's value. For example, if a node is labeled with `fusion_node_type=spark_only`, schedule Spark executor pods to run on that node using the setting sketched below.
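The exact snippet from the original page is not preserved here; a minimal sketch using Spark on Kubernetes' standard node-selector property, assuming the `fusion_node_type=spark_only` label from the example above:

```properties
# Schedule Spark pods onto nodes labeled fusion_node_type=spark_only.
# spark.kubernetes.node.selector.[labelKey] is a standard Spark-on-Kubernetes property.
spark.kubernetes.node.selector.fusion_node_type=spark_only
```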
In Fusion 5.5, Spark version 2.4.x does not support tolerations for Spark pods. As a result, Spark pods can’t be scheduled on any nodes with taints in Fusion 5.5.
Cluster mode
Fusion 5 ships with Spark and operates in "cluster mode" on top of Kubernetes. In cluster mode, each Spark driver runs in a separate pod, and resources can be managed per job. Each executor also runs in its own pod.
Spark config defaults
The table below shows the default configurations for Spark. These settings are configured in the job-launcher config map, accessible using `kubectl get configmaps <release-name>-job-launcher`. Some of these settings are also configurable via Helm.
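For example, to inspect or change these defaults (the release name is whatever you passed to `helm install`):

```bash
# View the job-launcher config map that holds the Spark defaults.
kubectl get configmaps <release-name>-job-launcher -o yaml

# Edit the config map in place; jobs launched after the job-launcher pod restarts
# pick up the new values.
kubectl edit configmap <release-name>-job-launcher
```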
Spark Resource Configurations
| Spark Configuration | Default value | Helm Variable |
|---|---|---|
| spark.driver.memory | 3g | |
| spark.executor.instances | 2 | executorInstances |
| spark.executor.memory | 3g | |
| spark.executor.cores | 6 | |
| spark.kubernetes.executor.request.cores | 3 | |
| spark.sql.caseSensitive | true | |
| Spark Configuration | Default value | Helm Variable |
|---|---|---|
| spark.kubernetes.container.image.pullPolicy | Always | image.imagePullPolicy |
| spark.kubernetes.container.image.pullSecrets | | image.imagePullSecrets |
| spark.kubernetes.authenticate.driver.serviceAccountName | <name>-job-launcher-spark | |
| spark.kubernetes.driver.container.image | fusion-dev-docker.ci-artifactory.lucidworks.com | image.repository |
| spark.kubernetes.executor.container.image | fusion-dev-docker.ci-artifactory.lucidworks.com | image.repository |
Spark operations how-tos
These topics provide how-tos for Spark operations:
- Configure Spark Job Resource Allocation
- Configure Spark Jobs to Access Cloud Storage
- Get Logs for a Spark Job
- Clean Up Spark Driver Pods
- Install the Spark History Server
- Configure the Spark History Server
- Access the Spark History Server
Configure Spark Job Resource Allocation
Number of instances and cores allocated
To set the number of cores allocated for a job, add the following parameter keys and values in the Spark Settings field. This is done within the "advanced" job properties in the Fusion UI or the `sparkConfig` object, if defining a job via the Fusion API.

| Parameter Key | Example Value |
|---|---|
| spark.executor.instances | 3 |
| spark.kubernetes.executor.request.cores | 3 |
| spark.executor.cores | 6 |
| spark.driver.cores | 1 |
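Entered as key/value pairs in the Spark Settings field, the values above look like this (an illustrative sketch, not output copied from Fusion):

```properties
spark.executor.instances=3
spark.kubernetes.executor.request.cores=3
spark.executor.cores=6
spark.driver.cores=1
```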
When `spark.kubernetes.executor.request.cores` is unset (the default configuration), Spark sets the number of CPUs for the executor pod to the same number as `spark.executor.cores`. For example, if `spark.executor.cores` is 3, Spark allocates 3 CPUs for the executor pod and runs 3 tasks in parallel. To under-allocate the CPU for the executor pod and still run multiple tasks in parallel, set `spark.kubernetes.executor.request.cores` to a lower value than `spark.executor.cores`.

The ratio of `spark.kubernetes.executor.request.cores` to `spark.executor.cores` depends on the type of job: either CPU-bound or I/O-bound. Allocate more memory to the executor if more tasks are running in parallel on a single executor pod.

If these settings are not specified, the job launches with a driver using one core and 3GB of memory, plus two executors, each using one core with 1GB of memory.
Memory allocation
The amount of memory allocated to the driver and executors is controlled on a per-job basis using the `spark.executor.memory` and `spark.driver.memory` parameters in the Spark Settings section of the job definition. This is found in the Fusion UI or within the `sparkConfig` object in the JSON definition of the job.

| Parameter Key | Example Value |
|---|---|
| spark.executor.memory | 6g |
| spark.driver.memory | 2g |
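As a sketch of how these settings sit in a job's JSON definition — assuming the common shape where `sparkConfig` is a list of key/value pairs; verify the exact structure against a job exported from your own Fusion instance (the job id is illustrative):

```json
{
  "id": "example-aggregation-job",
  "sparkConfig": [
    { "key": "spark.executor.memory", "value": "6g" },
    { "key": "spark.driver.memory", "value": "2g" },
    { "key": "spark.executor.instances", "value": "3" }
  ]
}
```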
Configure Spark Jobs to Access Cloud Storage
Supported jobs
This procedure applies to Spark-based jobs:
- ALS Recommender
- Cluster Labeling
- Co-occurrence Similarity
- Collection Analysis
- Create Seldon Core Model Deployment Job
- Delete Seldon Core Model Deployment Job
- Document Clustering
- Ground Truth
- Head/Tail Analysis
- Item Similarity Recommender
- Legacy Item Recommender
- Legacy Item Similarity
- Levenshtein Spell Checking
- Logistic Regression Classifier Training
- Matrix Decomposition-Based Query-Query Similarity
- Outlier Detection
- Parallel Bulk Loader
- Parameterized SQL Aggregation
- Phrase Extraction
- Query-to-Query Session-Based Similarity
- Query-to-Query Similarity
- Random Forest Classifier Training
- Ranking Metrics
- SQL Aggregation
- SQL-Based Experiment Metric (deprecated)
- Statistically Interesting Phrases
- Synonym Detection Jobs
- Synonym and Similar Queries Detection Jobs
- Token and Phrase Spell Correction
- Word2Vec Model Training
Configuring credentials for Spark jobs
GCS
The examples in this subsection use placeholder values. See the table below for descriptions of the placeholders:

| Placeholder | Description |
|---|---|
| <key name> | Name of the Solr GCS service account key. |
| <key file path> | Path to the Solr GCS service account key. |
- Create a secret containing the credentials JSON file (see the sketch after this list).
  For more information, see Creating and managing service account keys. That topic explains how to generate your organization's GOOGLE_APPLICATION_CREDENTIALS, which are needed to create the extra config map.
- Create an extra config map in Kubernetes setting the required properties for GCP:
  - Create a properties file with the GCP properties.
  - Create a config map based on the properties file.
- Add the `gcp-launcher` config map to `values.yaml` under `job-launcher`.
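The original commands and file contents are not preserved in this copy. The following is a minimal sketch under stated assumptions: the mount path `/mnt/gcp-secrets`, the `gcp-launcher` names, and the `extraConfigMaps` key under `job-launcher` are illustrative and should be checked against your Fusion Helm chart; the Spark and GCS-connector property names are standard.

```bash
# 1. Upload the service-account JSON key as a Kubernetes secret.
kubectl create secret generic <key name> --from-file=<key file path>

# 2a. gcp-launcher.properties -- mount the secret into driver/executor pods and
#     point the GCS connector at the key file.
cat > gcp-launcher.properties <<'EOF'
spark.kubernetes.driver.secrets.<key name>=/mnt/gcp-secrets
spark.kubernetes.executor.secrets.<key name>=/mnt/gcp-secrets
spark.hadoop.google.cloud.auth.service.account.enable=true
spark.hadoop.google.cloud.auth.service.account.json.keyfile=/mnt/gcp-secrets/<key name>.json
EOF

# 2b. Create the config map from the properties file.
kubectl create configmap gcp-launcher --from-file=gcp-launcher.properties
```

```yaml
# 3. values.yaml -- make the config map visible to the job-launcher
#    (the exact key name under job-launcher depends on the chart).
job-launcher:
  extraConfigMaps:
    - gcp-launcher
```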
AWS S3
AWS credentials can't be set with a single file. Instead, set two environment variables referring to the key and secret, using the instructions below:
- Create a secret pointing to the credentials (see the sketch after this list).
- Create an extra config map in Kubernetes setting the required properties for AWS:
  - Create a properties file with the AWS properties.
  - Create a config map based on the properties file.
- Add the `aws-launcher` config map to `values.yaml` under `job-launcher`.
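Again, the original snippets are not preserved; a sketch with illustrative names (`aws-secret`, `aws-launcher`), using Spark's standard `secretKeyRef` properties to inject the two environment variables:

```bash
# Store the access key and secret key in a Kubernetes secret.
kubectl create secret generic aws-secret \
  --from-literal=key='<AWS_ACCESS_KEY_ID>' \
  --from-literal=secret='<AWS_SECRET_ACCESS_KEY>'

# aws-launcher.properties -- expose the secret to driver and executor pods
# as AWS_ACCESS_KEY_ID / AWS_SECRET_ACCESS_KEY environment variables.
cat > aws-launcher.properties <<'EOF'
spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
EOF

kubectl create configmap aws-launcher --from-file=aws-launcher.properties
```

```yaml
# values.yaml -- as with GCS, the key under job-launcher is chart-dependent.
job-launcher:
  extraConfigMaps:
    - aws-launcher
```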
Azure Data Lake
Configuring Azure through environment variables or `configMaps` isn't possible yet. Instead, manually upload the `core-site.xml` file into the `job-launcher` pod at `/app/spark-dist/conf`. See below for an example `core-site.xml` file. At this time, only Data Lake Gen 1 is supported.
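The example file itself is not preserved here; a sketch for Data Lake Gen 1 using the standard Hadoop `adl://` client-credential properties (tenant, application, and key values are placeholders):

```xml
<?xml version="1.0"?>
<configuration>
  <!-- Authenticate to Azure Data Lake Gen 1 with an Azure AD application (client credential). -->
  <property>
    <name>fs.adl.oauth2.access.token.provider.type</name>
    <value>ClientCredential</value>
  </property>
  <property>
    <name>fs.adl.oauth2.client.id</name>
    <value>your-application-client-id</value>
  </property>
  <property>
    <name>fs.adl.oauth2.credential</name>
    <value>your-application-client-secret</value>
  </property>
  <property>
    <name>fs.adl.oauth2.refresh.url</name>
    <value>https://login.microsoftonline.com/your-tenant-id/oauth2/token</value>
  </property>
</configuration>
```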
Configuring credentials per job
- Create a Kubernetes secret with the GCP/AWS credentials.
- Add the Spark configuration to configure the secrets for the Spark driver/executor.
GCS
The examples in this subsection use placeholder values. See the table below for descriptions of the placeholders:

| Placeholder | Description |
|---|---|
| <key name> | Name of the Solr GCS service account key. |
| <key file path> | Path to the Solr GCS service account key. |
- Create a secret containing the credentials JSON file.
  See Creating and managing service account keys for more details.
- Toggle the Advanced configuration in the job UI, and add the following to the Spark configuration (see the sketch after this list).
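The per-job keys from the original page are not preserved; a sketch that mirrors the cluster-wide GCS setup above (secret name, mount path, and key-file name are illustrative):

```properties
# Added under the job's Spark configuration (Advanced view) or sparkConfig object.
spark.kubernetes.driver.secrets.<key name>=/mnt/gcp-secrets
spark.kubernetes.executor.secrets.<key name>=/mnt/gcp-secrets
spark.hadoop.google.cloud.auth.service.account.enable=true
spark.hadoop.google.cloud.auth.service.account.json.keyfile=/mnt/gcp-secrets/<key name>.json
```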
AWS S3
AWS credentials can't be set with a single file. Instead, set two environment variables referring to the key and secret, using the instructions below:
- Create a secret pointing to the credentials.
- Toggle the Advanced configuration in the job UI, and add the following to the Spark configuration (see the sketch after this list).
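As above, a sketch with an illustrative secret name:

```properties
# Added under the job's Spark configuration (Advanced view) or sparkConfig object.
spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
```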
Get Logs for a Spark Job
See the table below for useful commands related to Spark jobs:

| Description | Command |
|---|---|
| Retrieve the initial logs that contain information about the pod spin up. | curl -X GET -u USERNAME:PASSWORD http://FUSION_HOST:FUSION_PORT/api/spark/driver/log/JOB_ID |
| Retrieve the pod ID. | kubectl get pods -l spark-role=driver -l jobConfigId=JOB_ID |
| Retrieve logs from failed jobs. | kubectl logs DRIVER_POD_NAME |
| Tail logs from running containers by using the -f parameter. | kubectl logs -f POD_NAME |
Spark deletes failed and successful executor pods. Fusion provides a cleanup Kubernetes cron job that removes successfully completed driver pods every 15 minutes.
Viewing the Spark UI
In the event that you need to monitor or inspect your Spark job executions, you can use port forwarding to access the Spark UI in your web browser. Port forwarding forwards your local port connection to the port of the pod that is running the Spark driver.

To view the Spark UI, find the pod that is running the Spark driver and run the port-forward command sketched below. You can then access the Spark UI on `localhost:4040`.
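The command itself is not preserved in this copy; a sketch using the driver-pod label shown in the table above:

```bash
# Find the driver pod for the job, then forward the Spark UI port (4040) locally.
kubectl get pods -l spark-role=driver -l jobConfigId=JOB_ID
kubectl port-forward DRIVER_POD_NAME 4040:4040
```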
Clean Up Spark Driver Pods
Spark driver pods are cleaned up using a Kubernetes cron job that runs every 15 minutes. This cron job is created automatically when the `job-launcher` microservice is installed in the Fusion cluster.

To clean up pods manually, run the command sketched below.
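The original command is not preserved; one reasonable equivalent is to delete completed driver pods by label and status (verify the label against your cluster before running it):

```bash
# Delete Spark driver pods that have already completed successfully.
kubectl delete pod -l spark-role=driver --field-selector=status.phase=Succeeded
```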
Install the Spark History Server
While logs from the Spark driver and executor pods can be viewed using `kubectl logs [POD_NAME]`, executor pods are deleted at the end of their execution, and driver pods are deleted by Fusion on a regular schedule by the cleanup cron job described above. In order to preserve and view Spark logs, install the Spark History Server into your Kubernetes cluster and configure Spark to write logs in a manner that suits your needs.

The Spark History Server can be installed via its publicly available Helm chart. To do this, create a `values.yaml` file to configure it, as described in the next section.
Configure the Spark History Server
Recommended configuration
For Fusion, configure the Spark History Server to store and read Spark logs in cloud storage. For installations on Google Kubernetes Engine, set these keys in the `values.yaml` file (a sketch follows the list below):
- The `key` and `secret` fields provide the Spark History Server with the details of where it can find an account with access to the Google Cloud Storage bucket given in `logDirectory`. Later examples show how to set up a new service account that's shared between the Spark History Server and the Spark driver/executors for both viewing and writing logs.
- By default, the Spark History Server Helm chart creates an external LoadBalancer, exposing it to outside access. In this example, the `service` key overrides the default: the Spark History Server is set up on an internal IP within your cluster only and is not exposed externally. Later examples show how to access the Spark History Server.
- The `nfs.enableExampleNFS` option turns off the unneeded default NFS server set up by the Spark History Server.
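The `values.yaml` contents from the original page are not preserved; a sketch based on the keys described above (the nesting of `key`, `secret`, and `logDirectory` under a `gcs` block, and the file/secret names, are assumptions to verify against the chart's documented values):

```yaml
gcs:
  enableGCS: true
  secret: history-secrets          # Kubernetes secret created in the steps below
  key: sparkhistory.json           # key file stored inside that secret
  logDirectory: gs://<your-log-bucket>/
service:
  type: ClusterIP                  # keep the history server internal to the cluster
nfs:
  enableExampleNFS: false          # the chart's example NFS server is not needed
```

The steps below set up the service account and secret that this configuration refers to.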
- Use `gcloud` to create a new service account. If you have an existing service account you wish to use instead, you can skip the `create` command, though you will still need to create the JSON key pair and ensure that the existing account can read and write to the log bucket.
- Use `keys create` to create a JSON key pair, and upload it to your cluster as a Kubernetes secret.
- Give the service account the `storage/admin` role, allowing it to perform "create" and "view" operations.
- Run the `gsutil` command to apply the service account to your chosen bucket.
- Upload the JSON key pair into the cluster as a secret.
- Install the chart with `helm install [namespace]-spark-history-server stable/spark-history-server --values values.yaml`.
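The individual commands for these steps are not preserved in this copy; a sketch using standard `gcloud`, `gsutil`, and `kubectl` invocations (account, project, bucket, and secret names are illustrative and match the `values.yaml` sketch above):

```bash
# Create the service account (skip if reusing an existing one).
gcloud iam service-accounts create sparkhistory

# Create a JSON key pair for it.
gcloud iam service-accounts keys create sparkhistory.json \
  --iam-account sparkhistory@<project-id>.iam.gserviceaccount.com

# Grant the account storage admin rights on the log bucket.
gsutil iam ch \
  serviceAccount:sparkhistory@<project-id>.iam.gserviceaccount.com:roles/storage.admin \
  gs://<your-log-bucket>

# Upload the key pair as the secret referenced by values.yaml.
kubectl create secret generic history-secrets --from-file=sparkhistory.json

# Install the chart.
helm install <namespace>-spark-history-server stable/spark-history-server --values values.yaml
```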
Other configurations
Azure
The Azure configuration process is similar Google Kubernetes Engine. However, logs are stored in Azure Blob Storage, and you can use SAS token or key access.- This line is used to authenticate with a SAS token. Replace the line with
echo "your-azure-storage-account-key" >> azure-storage-account-key
to use a storage account key instead.
values.yaml
file resembles the following:values.yaml
file resembles the following:Amazon Web Services
In AWS, you can use IAM roles or an access/secret key pair. The use of AWS IAM roles is preferred over using an access/secret key pair, but both options are described.values.yaml
file resembles the following:The
values.yaml
file uses the Hadoop s3a://
link instead of s3://
.Configuring Spark
After starting the Spark History Server, update the config map for Fusion’s job-launcher service so it can write logs to the same location that Spark History Server is reading from.In this example, Fusion is installed into a namespace calledsparkhistory
.-
Before editing the config map, make a copy of the existing settings in case you need to revert the changes.
-
Edit the config map to write the logs to the same Google Cloud Storage bucket we configured the Spark History Server to read from.
-
Update the
spark
key with the new YAML settings below:The YAML settings inform Spark of the location of the secret and the settings that specify the location of the SparkeventLog
. The settings also inform Spark how to access GCS with thespark.hadoop.fs.AbstractFileSystem.gs.impl
andspark.hadoop.fs.gs.impl
keys. -
- Delete the `job-launcher` pod. The new `job-launcher` pod will apply the new configuration to later jobs.
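The kubectl commands and YAML from these steps are not preserved in this copy; a sketch under the same assumptions as the history-server examples above (bucket, secret, and file names are illustrative, and the exact layout of the config map's `spark` key depends on your Fusion version):

```bash
# Back up the current settings, then open the config map for editing
# (Fusion is installed in the sparkhistory namespace in this example).
kubectl get configmap <release-name>-job-launcher -n sparkhistory -o yaml > job-launcher-cm-backup.yaml
kubectl edit configmap <release-name>-job-launcher -n sparkhistory

# After saving, delete the job-launcher pod; the replacement pod picks up the new settings.
kubectl delete pod -n sparkhistory <job-launcher-pod-name>
```

```properties
# Spark properties to add under the config map's spark key: write event logs to the
# same GCS bucket the history server reads, and mount the shared service-account key.
spark.eventLog.enabled=true
spark.eventLog.dir=gs://<your-log-bucket>/
spark.hadoop.fs.AbstractFileSystem.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
spark.hadoop.fs.gs.impl=com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
spark.hadoop.google.cloud.auth.service.account.enable=true
spark.hadoop.google.cloud.auth.service.account.json.keyfile=/etc/history-secrets/sparkhistory.json
spark.kubernetes.driver.secrets.history-secrets=/etc/history-secrets
spark.kubernetes.executor.secrets.history-secrets=/etc/history-secrets
```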
Access the Spark History Server
Currently, the Spark History Server is only set up with a ClusterIP. To expand access, port forward the server using `kubectl`, as sketched below. You can then access the Spark History Server at `http://localhost:18080`. Run a Spark job and confirm that you can see the logs appear in the UI.
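The port-forward command is not preserved in this copy; a sketch (the service name depends on your Helm release name, so check `kubectl get svc` first):

```bash
# Forward the history server's UI port to localhost:18080.
kubectl port-forward svc/<namespace>-spark-history-server 18080:18080 -n <namespace>
```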