In Fusion 5.x, Spark operates in native Kubernetes mode rather than in standalone mode. This topic describes Spark operations in Fusion 5.x.

Node Selectors

You can control which nodes Spark executors are scheduled on using a Spark configuration property for a job:
spark.kubernetes.node.selector.<LABEL>=<LABEL_VALUE>
Use the label key assigned to the node as LABEL and that label's value as LABEL_VALUE. For example, if a node is labeled with fusion_node_type=spark_only, schedule Spark executor pods to run on that node using:
spark.kubernetes.node.selector.fusion_node_type=spark_only
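If the target node is not yet labeled, a label can be applied with kubectl; the node name below is a placeholder:
kubectl label nodes <node-name> fusion_node_type=spark_only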
In Fusion 5.5, Spark version 2.4.x does not support tolerations for Spark pods. As a result, Spark pods can’t be scheduled on any nodes with taints in Fusion 5.5.

Cluster mode

Fusion 5 ships with Spark and operates in “cluster mode” on top of Kubernetes. In cluster mode, each Spark driver runs in a separate pod, and resources can be managed per job. Each executor also runs in its own pod.

Spark config defaults

The table below shows the default configurations for Spark. These settings are configured in the job-launcher config map, accessible using kubectl get configmaps <release-name>-job-launcher. Some of these settings are also configurable via Helm.
Spark Resource Configurations
Spark Configuration | Default value | Helm Variable
spark.driver.memory | 3g |
spark.executor.instances | 2 | executorInstances
spark.executor.memory | 3g |
spark.executor.cores | 6 |
spark.kubernetes.executor.request.cores | 3 |
spark.sql.caseSensitive | true |
Spark Kubernetes Configurations
Spark Configuration | Default value | Helm Variable
spark.kubernetes.container.image.pullPolicy | Always | image.imagePullPolicy
spark.kubernetes.container.image.pullSecrets | | image.imagePullSecrets
spark.kubernetes.authenticate.driver.serviceAccountName | <name>-job-launcher-spark |
spark.kubernetes.driver.container.image | fusion-dev-docker.ci-artifactory.lucidworks.com | image.repository
spark.kubernetes.executor.container.image | fusion-dev-docker.ci-artifactory.lucidworks.com | image.repository
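As a hedged illustration, the Helm variables above can be set in a values.yaml override; nesting them under the job-launcher section is an assumption based on how other job-launcher settings are configured later in this topic:
job-launcher:
  executorInstances: 4              # overrides spark.executor.instances (default 2)
  image:
    imagePullPolicy: IfNotPresent   # overrides spark.kubernetes.container.image.pullPolicy (default Always)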

Spark operations how-tos

The following sections provide how-tos for common Spark operations.

Number of instances and cores allocated

To set the number of executor instances and cores allocated for a job, add the following parameter keys and values in the Spark Settings field. This field is in the “advanced” job properties in the Fusion UI, or in the sparkConfig object if you define the job via the Fusion API.
Parameter Key | Example Value
spark.executor.instances | 3
spark.kubernetes.executor.request.cores | 3
spark.executor.cores | 6
spark.driver.cores | 1
If spark.kubernetes.executor.request.cores is unset (the default configuration), Spark sets the number of CPUs requested for the executor pod to the value of spark.executor.cores. For example, if spark.executor.cores is 3, Spark allocates 3 CPUs for the executor pod and runs 3 tasks in parallel. To under-allocate CPU for the executor pod while still running multiple tasks in parallel, set spark.kubernetes.executor.request.cores to a lower value than spark.executor.cores.
The appropriate ratio of spark.kubernetes.executor.request.cores to spark.executor.cores depends on whether the job is CPU-bound or I/O-bound. Allocate more memory to the executor when more tasks run in parallel on a single executor pod.
If these settings are not specified, the job launches with a driver using one core and 3GB of memory, plus two executors, each using one core with 1GB of memory.
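With the example values above, the resource footprint works out as follows (a hedged illustration in the Spark Settings key/value format):
spark.executor.instances=3                  # three executor pods
spark.kubernetes.executor.request.cores=3   # each executor pod requests 3 CPUs from Kubernetes
spark.executor.cores=6                      # each executor runs up to 6 tasks in parallel (2 tasks per requested CPU)
spark.driver.cores=1                        # one core for the driver pod
# Total executor CPU requested from Kubernetes: 3 instances x 3 CPUs = 9, plus the driver.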

Memory allocation

The amount of memory allocated to the driver and executors is controlled on a per-job basis using the spark.executor.memory and spark.driver.memory parameters in the Spark Settings section of the job definition. This is found in the Fusion UI or within the sparkConfig object in the JSON definition of the job.
Parameter Key | Example Value
spark.executor.memory | 6g
spark.driver.memory | 2g
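For a job defined via the Fusion API, these parameters go in the job's sparkConfig object; this excerpt is a hedged sketch of the key/value form, with the rest of the job definition omitted:
{
  "sparkConfig": [
    { "key": "spark.executor.memory", "value": "6g" },
    { "key": "spark.driver.memory", "value": "2g" }
  ]
}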

Supported jobs

This procedure applies to the following Spark-based jobs:
  • ALS Recommender
  • Cluster Labeling
  • Co-occurrence Similarity
  • Collection Analysis
  • Create Seldon Core Model Deployment Job
  • Delete Seldon Core Model Deployment Job
  • Document Clustering
  • Ground Truth
  • Head/Tail Analysis
  • Item Similarity Recommender
  • Legacy Item Recommender
  • Legacy Item Similarity
  • Levenshtein Spell Checking
  • Logistic Regression Classifier Training
  • Matrix Decomposition-Based Query-Query Similarity
  • Outlier Detection
  • Parallel Bulk Loader
  • Parameterized SQL Aggregation
  • Phrase Extraction
  • Query-to-Query Session-Based Similarity
  • Query-to-Query Similarity
  • Random Forest Classifier Training
  • Ranking Metrics
  • SQL Aggregation
  • SQL-Based Experiment Metric (deprecated)
  • Statistically Interesting Phrases
  • Synonym Detection Jobs
  • Synonym and Similar Queries Detection Jobs
  • Token and Phrase Spell Correction
  • Word2Vec Model Training
Amazon Web Services (AWS) and Google Cloud Storage (GCS) credentials can be configured per job or per cluster.

Configuring credentials for Spark jobs

GCS
The examples in this subsection use placeholder values. See the table below for descriptions of the placeholders:
Placeholder | Description
<key name> | Name of the Solr GCS service account key.
<key file path> | Path to the Solr GCS service account key.
  1. Create a secret containing the credentials JSON file:
    kubectl create secret generic <key name> --from-file=/<key file path>/<key name>.json
    
    For more information, see Creating and managing service account keys. That topic explains how to generate your organization’s GOOGLE_APPLICATION_CREDENTIALS, which are needed to create the extra config map.
  2. Create an extra config map in Kubernetes setting the required properties for GCP.
    1. Create a properties file with GCP properties:
      cat gcp-launcher.properties
      spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS = /mnt/gcp-secrets/<key name>.json
      spark.kubernetes.driver.secrets.<key name> = /mnt/gcp-secrets
      spark.kubernetes.executor.secrets.<key name> = /mnt/gcp-secrets
      spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS = /mnt/gcp-secrets/<key name>.json
      spark.hadoop.google.cloud.auth.service.account.json.keyfile = /mnt/gcp-secrets/<key name>.json
      
    2. Create a config map based on the properties file:
      kubectl create configmap gcp-launcher --from-file=gcp-launcher.properties
      
  3. Add the gcp-launcher config map to values.yaml under job-launcher:
    configSources:
     - gcp-launcher
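    Then apply the updated values.yaml to the Fusion release so the job-launcher picks up the new config source. A hedged example; the lucidworks/fusion chart reference and the release/namespace placeholders are assumptions:
    helm upgrade <release-name> lucidworks/fusion --namespace <namespace> --values values.yaml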
    
AWS S3
AWS credentials can’t be set with a single file. Instead, set two environment variables referring to the key and secret using the instructions below:
  1. Create a secret pointing to the credentials:
    kubectl create secret generic aws-secret --from-literal=key='<access key>' --from-literal=secret='<secret key>'
    
  2. Create an extra config map in Kubernetes setting the required properties for AWS:
    1. Create a properties file with AWS properties:
      cat aws-launcher.properties
      spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
      spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
      spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
      spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
      
    2. Create a config map based on the properties file:
      kubectl create configmap aws-launcher --from-file=aws-launcher.properties
      
  3. Add the aws-launcher config map to values.yaml under job-launcher:
    configSources:
     - aws-launcher
    
Azure Data Lake
Configuring Azure through environment variables or configMaps isn’t possible yet. Instead, manually upload the core-site.xml file into the job-launcher pod at /app/spark-dist/conf. See below for an example core-site.xml file; an example command for copying the file into the pod follows it.
<configuration>
  <property>
    <name>dfs.adls.oauth2.access.token.provider.type</name>
    <value>ClientCredential</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.refresh.url</name>
    <value>Insert Your OAuth 2.0 Endpoint URL Value Here</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.client.id</name>
    <value>Insert Your Application ID Here</value>
  </property>
  <property>
    <name>dfs.adls.oauth2.credential</name>
    <value>Insert the Secret Key Value Here</value>
  </property>
  <property>
    <name>fs.adl.impl</name>
    <value>org.apache.hadoop.fs.adl.AdlFileSystem</value>
  </property>
  <property>
    <name>fs.AbstractFileSystem.adl.impl</name>
    <value>org.apache.hadoop.fs.adl.Adl</value>
  </property>
</configuration>
At this time, only Data Lake Gen 1 is supported.
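A hedged example of copying the file into the running job-launcher pod with kubectl cp; the namespace and pod name are placeholders:
kubectl cp core-site.xml <namespace>/<job-launcher-pod-name>:/app/spark-dist/conf/core-site.xml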

Configuring credentials per job

  1. Create a Kubernetes secret with the GCP/AWS credentials.
  2. Add the Spark configuration to configure the secrets for the Spark driver/executor.
GCS
The examples in this subsection use placeholder values. See the table below for descriptions of the placeholders:
Placeholder | Description
<key name> | Name of the Solr GCS service account key.
<key file path> | Path to the Solr GCS service account key.
  1. Create a secret containing the credentials JSON file.
    kubectl create secret generic <key name> --from-file=/<key file path>/<key name>.json
    
    See Creating and managing service account keys for more details.
  2. Toggle the Advanced configuration in the job UI, and add the following to the Spark configuration:
    spark.kubernetes.driver.secrets.<key name> = /mnt/gcp-secrets
    spark.kubernetes.executor.secrets.<key name> = /mnt/gcp-secrets
    spark.kubernetes.driverEnv.GOOGLE_APPLICATION_CREDENTIALS = /mnt/gcp-secrets/<key name>.json
    spark.executorEnv.GOOGLE_APPLICATION_CREDENTIALS = /mnt/gcp-secrets/<key name>.json
    spark.hadoop.google.cloud.auth.service.account.json.keyfile = /mnt/gcp-secrets/<key name>.json
    
AWS S3
AWS credentials can’t be set with a single file. Instead, set two environment variables referring to the key and secret using the instructions below:
  1. Create a secret pointing to the credentials:
    kubectl create secret generic aws-secret --from-literal=key='<access key>' --from-literal=secret='<secret key>'
    
  2. Toggle the Advanced configuration in the job UI, and add the following to the Spark configuration:
    spark.kubernetes.driver.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
    spark.kubernetes.driver.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
    spark.kubernetes.executor.secretKeyRef.AWS_ACCESS_KEY_ID=aws-secret:key
    spark.kubernetes.executor.secretKeyRef.AWS_SECRET_ACCESS_KEY=aws-secret:secret
    
See the table below for useful commands related to Spark jobs:
Description | Command
Retrieve the initial logs that contain information about the pod spin-up. | curl -X GET -u USERNAME:PASSWORD http://FUSION_HOST:FUSION_PORT/api/spark/driver/log/JOB_ID
Retrieve the pod ID. | kubectl get pods -l spark-role=driver -l jobConfigId=JOB_ID
Retrieve logs from failed jobs. | kubectl logs DRIVER_POD_NAME
Tail logs from running containers by using the -f parameter. | kubectl logs -f POD_NAME
Spark deletes failed and successful executor pods. Fusion provides a cleanup Kubernetes cron job that removes successfully completed driver pods every 15 minutes.
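To confirm that the cleanup cron job exists, list the cron jobs in the Fusion namespace; the cron job's exact name varies by release, so this is a hedged check:
kubectl get cronjobs -n <namespace>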

Viewing the Spark UI

If you need to monitor or inspect your Spark job executions, you can use port forwarding to access the Spark UI in your web browser. Port forwarding forwards a local port to the port on the pod that is running the Spark driver.
To view the Spark UI, find the pod that is running the Spark driver and run the following command:
kubectl -n [NAMESPACE] port-forward [DRIVER_POD_NAME] 4040:4040
You can then access the Spark UI at http://localhost:4040.
Spark driver pods are cleaned up using a Kubernetes cron job that runs every 15 minutes. This cron job is created automatically when the job-launcher microservice is installed in the Fusion cluster. To clean up pods manually, run this command:
kubectl delete pods --namespace default --field-selector=status.phase=Succeeded -l spark-role=driver
While logs from the Spark driver and executor pods can be viewed using kubectl logs [POD_NAME], executor pods are deleted at the end of their execution, and driver pods are deleted by Fusion on a default schedule of every hour. To preserve and view Spark logs, install the Spark History Server into your Kubernetes cluster and configure Spark to write logs in a manner that suits your needs.
The Spark History Server can be installed via its publicly available Helm chart. To do this, create a values.yaml file to configure it, then install the chart:
helm install [release-name] [chart] --namespace [fusion-namespace] --values [spark-history-values-yaml]
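A more concrete, hedged example of that install, assuming the chart comes from the legacy Helm stable repository (the release name, namespace, and values file name below are placeholders):
helm repo add stable https://charts.helm.sh/stable
helm install fusion-spark-history-server stable/spark-history-server --namespace fusion --values spark-history-values.yaml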
For Fusion, configure the Spark History Server to store and read Spark logs in cloud storage. For installations on Google Kubernetes Engine, set these keys in the values.yaml file:
gcs:
  enableGCS: true
  secret: history-secrets ①
  key: sparkhistory.json ①
  logDirectory: gs://[BUCKET_NAME]
service: ②
  type: ClusterIP
  port:
    number: 18080
pvc:
  enablePVC: false
nfs:
  enableExampleNFS: false ③
  1. The key and secret fields provide the Spark History Server with the details of where it can find an account with access to the Google Cloud Storage bucket given in logDirectory. Later examples show how to set up a new service account that’s shared between the Spark History Server and the Spark driver/executors for both viewing and writing logs.
  2. By default, the Spark History Server Helm chart creates an external LoadBalancer, exposing it to outside access. In this example, the service key overrides the default. The Spark History Server is set up on an internal IP within your cluster only and is not exposed externally. Later examples show how to access the Spark History Server.
  3. The nfs.enableExampleNFS option turns off the unneeded default NFS server set up by the Spark History Server.
To give the Spark History Server access to the Google Cloud Storage bucket where the logs are kept:
  1. Use gcloud to create a new service account:
    export ACCOUNT_NAME=sparkhistory
    export GCP_PROJECT_ID=[PROJECT_ID]
    gcloud iam service-accounts create ${ACCOUNT_NAME} --display-name "${ACCOUNT_NAME}"
    
    If you have an existing service account you wish to use instead, you can skip the create command, though you will still need to create the JSON key pair and ensure that the existing account can read and write to the log bucket.
  2. Use keys create to create a JSON key pair (you will upload it to your cluster as a Kubernetes secret in step 5):
    gcloud iam service-accounts keys create "${ACCOUNT_NAME}.json" --iam-account "${ACCOUNT_NAME}@${GCP_PROJECT_ID}.iam.gserviceaccount.com"
    
  3. Give the service account the storage.admin role, allowing it to perform “create” and “view” operations:
    gcloud projects add-iam-policy-binding ${GCP_PROJECT_ID} --member "serviceAccount:${ACCOUNT_NAME}@${GCP_PROJECT_ID}.iam.gserviceaccount.com" --role roles/storage.admin
    
  4. Run the gsutil command to grant the service account access to your chosen bucket:
    gsutil iam ch serviceAccount:${ACCOUNT_NAME}@${GCP_PROJECT_ID}.iam.gserviceaccount.com:objectAdmin gs://[BUCKET_NAME]
    
  5. Upload the JSON key pair into the cluster as a secret:
    kubectl -n [NAMESPACE] create secret generic history-secrets --from-file=sparkhistory.json
    
The Spark History Server can now be installed with helm install [namespace]-spark-history-server stable/spark-history-server --values values.yaml.

Other configurations

Azure
The Azure configuration process is similar to the one for Google Kubernetes Engine. However, logs are stored in Azure Blob Storage, and you can use SAS token or storage account key access.
echo "your-storage-account-name" >> azure-storage-account-name
echo "your-container-name" >> azure-blob-container-name
echo "your-azure-blob-sas-key" >> azure-blob-sas-key
kubectl create secret generic azure-secrets --from-file=azure-storage-account-name --from-file=azure-blob-container-name [--from-file=azure-blob-sas-key | --from-file=azure-storage-account-key]
  1. This line is used to authenticate with a SAS token. Replace the line with echo "your-azure-storage-account-key" >> azure-storage-account-key to use a storage account key instead.
To use SAS token access, the values.yaml file resembles the following:
wasbs:
  enableWASBS: true
  secret: azure-secrets
  sasKeyName: azure-blob-sas-key
  storageAccountNameKeyName: azure-storage-account-name
  containerKeyName: azure-blob-container-name
  logDirectory: [BUCKET_NAME]
For non-SAS access, the values.yaml file resembles the following:
wasbs:
  enableWASBS: true
  secret: azure-secrets
  sasKeyMode: false
  storageAccountKeyName: azure-storage-account-key
  storageAccountNameKeyName: azure-storage-account-name
  containerKeyName: azure-blob-container-name
  logDirectory: [BUCKET_NAME]
Amazon Web Services
In AWS, you can use IAM roles or an access/secret key pair. IAM roles are preferred, but both options are described below.
To use an access/secret key pair, first store the credentials in a secret:
aws iam list-access-keys --user-name your-user-name --output text | awk '{print $2}' >> aws-access-key
echo "your-aws-secret-key" >> aws-secret-key
kubectl create secret generic aws-secrets --from-file=aws-access-key --from-file=aws-secret-key
For IAM, the values.yaml file resembles the following:
s3:
  enableS3: true
  logDirectory: s3a://[BUCKET_NAME]
Note that the values.yaml file uses the Hadoop s3a:// scheme instead of s3://.
For an access/secret pair, add the secret:
s3:
  enableS3: true
  enableIAM: false
  accessKeyName: aws-access-key
  secretKeyName: aws-secret-key
  logDirectory: s3a://[BUCKET_NAME]

Configuring Spark

After starting the Spark History Server, update the config map for Fusion’s job-launcher service so it can write logs to the same location that the Spark History Server reads from. In this example, Fusion is installed into a namespace called sparkhistory.
  1. Before editing the config map, make a copy of the existing settings in case you need to revert the changes.
    kubectl get cm -n [NAMESPACE] sparkhistory-job-launcher -o yaml > sparkhistory-job-launcher.yaml
    
  2. Edit the config map to write the logs to the same Google Cloud Storage bucket we configured the Spark History Server to read from.
    kubectl edit cm -n [NAMESPACE] sparkhistory-job-launcher
    
  3. Update the spark key with the new YAML settings below:
    spark:
      hadoop:
        fs:
          AbstractFileSystem:
            gs:
              impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS
          gs:
            impl: com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem
        google:
          cloud:
            auth:
              service:
                account:
                  json:
                    keyfile: /etc/history-secrets/[ACCOUNT_NAME].json
      eventLog:
        enabled: true
        compress: true
        dir: gs://[BUCKET_NAME]
    
      kubernetes:
        driver:
          secrets:
            history-secrets: /etc/history-secrets
        executor:
          secrets:
            history-secrets: /etc/history-secrets

    The YAML settings give Spark the location of the secret and of the Spark eventLog directory. They also tell Spark how to access GCS via the spark.hadoop.fs.AbstractFileSystem.gs.impl and spark.hadoop.fs.gs.impl keys.
  4. Delete the job-launcher pod. The new job-launcher pod will apply the new configuration to later jobs.
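    A hedged example of cycling the pod (find the current job-launcher pod name, then delete it):
    kubectl get pods -n [NAMESPACE]
    kubectl delete pod -n [NAMESPACE] [JOB_LAUNCHER_POD_NAME]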
Currently, the Spark History Server is only exposed through a ClusterIP service. To access it, port-forward to the server using kubectl:
kubectl get pods -n [NAMESPACE] # to find the Spark History Server pod
kubectl port-forward [POD_NAME] -n [NAMESPACE] 18080:18080
You can now access the Spark History Server at http://localhost:18080. Run a Spark job and confirm that you can see the logs appear in the UI.