
Fusion 5.9

    Spark Jobs API

    The Spark Jobs API is a set of endpoints for configuring and running Spark jobs.

    For more information, view the API specification.
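    As a rough illustration of how a client might drive this API, the Python sketch below builds (without sending) two requests: one that saves a job configuration and one that starts a job. The endpoint paths (/api/spark/configurations/{id} and /api/spark/jobs/{id}), the HTTP methods, the FUSION_BASE host, and the helper names are assumptions made for this example; confirm them against the API specification before use.

```python
import base64
import json
import urllib.request
from urllib.parse import quote

# Hypothetical base URL; replace with your Managed Fusion host.
FUSION_BASE = "https://EXAMPLE_COMPANY.b.lucidworks.cloud/api"


def _auth_header(username: str, password: str) -> str:
    """Build an HTTP Basic Authorization header value."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    return f"Basic {token}"


def build_save_job_request(job: dict, username: str, password: str) -> urllib.request.Request:
    """Save a job configuration. Path and PUT method are assumptions."""
    return urllib.request.Request(
        url=f"{FUSION_BASE}/spark/configurations/{quote(job['id'], safe='')}",
        data=json.dumps(job).encode("utf-8"),
        method="PUT",
        headers={
            "Content-Type": "application/json",
            "Authorization": _auth_header(username, password),
        },
    )


def build_start_job_request(job_id: str, username: str, password: str) -> urllib.request.Request:
    """Start a job run. Path and POST method are assumptions."""
    return urllib.request.Request(
        url=f"{FUSION_BASE}/spark/jobs/{quote(job_id, safe='')}",
        method="POST",
        headers={"Authorization": _auth_header(username, password)},
    )
```

    To send a request, pass it to urllib.request.urlopen(). Building requests separately from sending them keeps the sketch easy to inspect and test.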

    Spark job subtypes

    For the Spark job type, the available subtypes are listed below.

    • SQL Aggregation job

      A Spark SQL aggregation job where user-defined parameters are injected into a built-in SQL template at runtime.

    • Custom Python job

      The Custom Python job lets you run Python code in Fusion. This job supports Python 3.6 and later.

    • Script

      This job lets you run a custom Scala script in Fusion.

    To create a Script job, sign in to Fusion and click Collections > Jobs. Then click Add+ and, in the Custom and Others Jobs section, select Script. You can enter basic and advanced parameters to configure the job. Fields that have a default value are prepopulated when you add the job.

    Basic parameters

    To enter advanced parameters in the UI, click Advanced. Those parameters are described in the Advanced parameters section.

    • Spark job ID. The unique ID that references this job in the API. This is the id field in the configuration file. This field is required.

    • Scala script. The Scala script to be executed in Fusion as a Spark job. This is the script field in the configuration file.

    Advanced parameters

    If you click the Advanced toggle, the following optional fields are displayed in the UI.

    • Spark Settings. Name:value pairs to use as Spark configuration properties for this job. This is the sparkConfig field in the configuration file.

    • Spark shell options. Name:value pairs passed as options to the Spark shell when the job runs. This is the shellOptions field in the configuration file.

    • Interpreter params. Name:value pairs to bind in the Scala interpreter. This is the interpreterParams field in the configuration file.
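    The field mapping above can be summarized in a small Python sketch that assembles a Script job configuration from the UI values. The field names id, script, sparkConfig, shellOptions, and interpreterParams come from the descriptions above; representing shellOptions and interpreterParams as JSON objects (mirroring the sparkConfig example later on this page) is an assumption, and build_script_job is a hypothetical helper.

```python
from typing import Dict, Optional


def build_script_job(
    job_id: str,
    script: str,
    spark_config: Optional[Dict[str, object]] = None,
    shell_options: Optional[Dict[str, str]] = None,
    interpreter_params: Optional[Dict[str, str]] = None,
) -> dict:
    """Assemble a Script job configuration from UI field values (hypothetical helper)."""
    if not job_id:
        # Spark job ID is the only field marked as required above.
        raise ValueError("Spark job ID is required")
    job: dict = {"id": job_id, "script": script}
    # Advanced sections are optional: include only the ones that have entries.
    # The object shape for shellOptions/interpreterParams is an assumption.
    for field, values in (
        ("sparkConfig", spark_config),
        ("shellOptions", shell_options),
        ("interpreterParams", interpreter_params),
    ):
        if values:
            job[field] = dict(values)
    return job
```

    For example, build_script_job("word_count", 'val rdd = sc.textFile("/foo.txt")\nrdd.count\n', spark_config={"spark.executor.memory": "1g"}) returns a dictionary that json.dumps() can serialize into a job configuration body.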

    See Additional Spark jobs for more information.

    Spark Configuration Properties

    Managed Fusion passes all configuration properties with the prefix "spark." to the Spark master, the Spark workers, and each Spark application, for both aggregation jobs and custom-scripted processing.

    These properties are stored in Managed Fusion’s ZooKeeper and can be updated through the Managed Fusion api/configurations endpoint. An update changes the stored value without restarting the service, so existing jobs and SparkContexts are not affected. The api/configurations endpoint also returns all configured properties for the installation, so you can examine the Spark default configuration in a Unix shell with curl and grep. The following example checks a Managed Fusion installation at EXAMPLE_COMPANY.b.lucidworks.cloud:

    curl -u USERNAME:PASSWORD https://EXAMPLE_COMPANY.b.lucidworks.cloud/api/configurations | grep '"spark.'
    
      "spark.executor.memory" : "2g",
      "spark.task.maxFailures" : "10",
      "spark.worker.cleanup.appDataTtl" : "7200",
      "spark.worker.cleanup.enabled" : "true",
      "spark.worker.memory" : "2g",

    The default SparkContext that Managed Fusion uses for aggregation jobs can be assigned a fraction of the cluster resources (executor memory, available CPU cores, or both). This lets other applications, such as scripted jobs or shell sessions, use the remaining cluster resources even while aggregation jobs are running. Managed Fusion also supports dynamic allocation for all applications, and this can be overridden per application. In practice, this means that when a SparkContext is still running after a relatively long idle time (e.g., 10 minutes) and no active jobs are using it, its resources (CPU cores and executor memory) are released for use by other applications.
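    The curl and grep check shown earlier can also be scripted. This Python sketch mirrors that pipeline: it fetches api/configurations with HTTP Basic authentication and keeps only the properties whose names start with "spark.". It assumes the endpoint returns a single JSON object that maps property names to values, as the grep output suggests; the function names are hypothetical.

```python
import base64
import json
import urllib.request


def fetch_configurations(base_url: str, username: str, password: str) -> dict:
    """GET {base_url}/api/configurations and decode the JSON object (assumed shape)."""
    token = base64.b64encode(f"{username}:{password}".encode()).decode()
    req = urllib.request.Request(
        f"{base_url}/api/configurations",
        headers={"Authorization": f"Basic {token}"},
    )
    with urllib.request.urlopen(req) as resp:
        return json.load(resp)


def spark_properties(configurations: dict) -> dict:
    """Keep only properties whose names start with 'spark.', sorted by name."""
    return {
        name: value
        for name, value in sorted(configurations.items())
        if name.startswith("spark.")
    }
```

    For example, spark_properties(fetch_configurations("https://EXAMPLE_COMPANY.b.lucidworks.cloud", "USERNAME", "PASSWORD")) returns the spark.* entries sorted by name. Unlike the grep pattern, which matches text anywhere on a line, the prefix test only looks at property names.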

    For scripted Spark jobs, you can specify per-job configuration overrides as key/value pairs in the "sparkConfig" property of the script job configuration. These overrides take precedence over values stored in ZooKeeper. The following is an example of a scripted job with a "sparkConfig" section:

    {
      "id": "scripted_job_example",
      "script": "val rdd = sc.textFile(\"/foo.txt\")\nrdd.count\n",
      "sparkConfig": {
        "spark.cores.max": 2,
        "spark.executor.memory": "1g"
      }
    }
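    The precedence rule can be modeled as a dictionary merge in which per-job sparkConfig entries replace the stored spark.* values. This Python sketch only illustrates that rule; effective_spark_config is a hypothetical helper, not part of Managed Fusion.

```python
from typing import Dict, Mapping


def effective_spark_config(
    stored: Mapping[str, object], job_overrides: Mapping[str, object]
) -> Dict[str, object]:
    """Model the precedence rule: per-job sparkConfig beats values stored in ZooKeeper."""
    # Only spark.* properties are passed through to Spark applications.
    merged: Dict[str, object] = {
        name: value for name, value in stored.items() if name.startswith("spark.")
    }
    # Later updates win, so job overrides replace stored values with the same name.
    merged.update(job_overrides)
    return merged
```

    With the stored values from the curl output above and the sparkConfig of scripted_job_example, the job runs with spark.executor.memory set to 1g and spark.cores.max set to 2, while spark.task.maxFailures keeps its stored value of 10.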

    The following table lists the Spark configuration properties that Managed Fusion overrides or uses to determine each application's resource allocation.

    Property Description

    spark.master.url

    Unset by default. Specify this property only when using an external Spark cluster; when Managed Fusion uses its own standalone Spark cluster, this property is not set.

    spark.cores.max

    The maximum number of CPU cores across the cluster assigned to the application. Unset by default, which means there is no limit.

    spark.executor.memory

    The amount of memory assigned to each executor of an application. The default is 2g.

    spark.scheduler.mode

    Controls how tasks are assigned to available resources. Valid values are 'FIFO' and 'FAIR'. The default value is 'FAIR'.

    spark.dynamicAllocation.enabled

    Whether to enable dynamic allocation of executors (Boolean). The default value is 'TRUE'.

    spark.shuffle.service.enabled

    Whether to enable the external shuffle service for the standalone Spark cluster (Boolean). The default value is 'TRUE'.

    spark.dynamicAllocation.executorIdleTimeout

    How long an executor can remain idle before it is removed. The default value is '60s'.

    spark.dynamicAllocation.minExecutors

    The number of executors to keep running even when they are idle. The default value is 0.

    spark.eventLog.enabled

    Whether event logging is enabled (Boolean). The event log stores job details, which can be accessed after the application finishes. The default value is 'TRUE'.

    spark.eventLog.dir

    The directory where event logs are stored. The default location is $FUSION_HOME/var/spark-eventlog.

    spark.eventLog.compress

    Whether to compress event log data (Boolean). The default value is 'TRUE'.

    spark.logConf

    Whether to log the effective SparkConf when a new SparkContext is created (Boolean). The default value is 'TRUE'.

    spark.deploy.recoveryMode

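    The recovery mode of the standalone Spark master. The default value is 'ZOOKEEPER'.

    spark.deploy.zookeeper.url

    The ZooKeeper connection string. The default value is $FUSION_ZK.

    spark.deploy.zookeeper.dir

    The ZooKeeper path where recovery state is stored. The default value is /lucid/spark.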

    spark.worker.cleanup.enabled

    Whether to periodically clean up worker data (Boolean). The default value is 'TRUE'.

    spark.worker.cleanup.appDataTtl

    How long, in seconds, to retain application work directories on each worker. The default value is 86400 (24 hours).

    spark.deploy.retainedApplications

    The maximum number of completed applications to show in the UI. The default value is 50.

    spark.deploy.retainedDrivers

    The maximum number of completed drivers to show in the UI. The default value is 50.

    spark.worker.timeout

    The number of seconds without a heartbeat after which a worker is considered lost. The default value is 30.

    spark.worker.memory

    The maximum total heap that can be allocated to all executors running on a worker. Defaults to the executor memory value (spark.executor.memory).
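    Because per-job sparkConfig overrides are plain key/value pairs, a typo such as an invalid scheduler mode is easy to miss. The Python sketch below is a hypothetical pre-flight check built from a few rules in the table above (Boolean properties, the FIFO/FAIR scheduler modes, and a non-negative spark.dynamicAllocation.minExecutors); it is not validation that Managed Fusion itself performs.

```python
from typing import List, Mapping

# Boolean properties taken from the table above.
BOOLEAN_PROPERTIES = {
    "spark.dynamicAllocation.enabled",
    "spark.shuffle.service.enabled",
    "spark.eventLog.enabled",
    "spark.eventLog.compress",
    "spark.logConf",
    "spark.worker.cleanup.enabled",
}
SCHEDULER_MODES = {"FIFO", "FAIR"}


def check_overrides(overrides: Mapping[str, object]) -> List[str]:
    """Return a list of problems found in a per-job sparkConfig map.

    Hypothetical check: only the rules below are applied, and any property
    not covered by them is accepted as-is.
    """
    problems: List[str] = []
    for name, value in overrides.items():
        text = str(value).strip()
        if name in BOOLEAN_PROPERTIES:
            # Accept true/false in any letter case, including Python booleans.
            if text.lower() not in ("true", "false"):
                problems.append(f"{name}: expected true or false, got {value!r}")
        elif name == "spark.scheduler.mode":
            if text.upper() not in SCHEDULER_MODES:
                problems.append(f"{name}: expected FIFO or FAIR, got {value!r}")
        elif name == "spark.dynamicAllocation.minExecutors":
            # isdigit() rejects signs, so negative values are reported.
            if not text.isdigit():
                problems.append(f"{name}: expected a non-negative integer, got {value!r}")
    return problems
```

    check_overrides({"spark.scheduler.mode": "ROUND_ROBIN"}) reports one problem, while the overrides in scripted_job_example pass because none of their properties are covered by these rules.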

    Managed Fusion Configuration Properties

    Property Description

    fusion.spark.master.port

    Spark master job submission port. Default value is 8766.

    fusion.spark.master.ui.port

    Spark master web UI port. Default value is 8767.

    fusion.spark.idleTime

    The maximum idle time, in seconds, after which an application (that is, its SparkContext) is shut down. The default value is 300.

    fusion.spark.executor.memory.min

    The minimum executor memory, in MB. The default value is 450 MB, which is enough for the Managed Fusion components in an application's tasks to initialize.

    fusion.spark.executor.memory.fraction

    A floating-point number in the range (0.0, 1.0] that sets the fraction of spark.executor.memory allocated to the application. The default value is 1.0.

    fusion.spark.cores.fraction

    A floating-point number in the range (0.0, 1.0] that sets the fraction of spark.cores.max allocated to the application. The default value is 1.0.
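    The fraction settings combine with the Spark properties above. The Python sketch below shows the arithmetic under stated assumptions: memory is rounded down to whole MB and raised to fusion.spark.executor.memory.min if the fraction would fall below it, cores are rounded down but never below one, and an unset spark.cores.max means no limit. The rounding and flooring behavior and the helper names are assumptions, not documented Managed Fusion behavior.

```python
import math
import re
from typing import Optional

# Multipliers from Spark-style size suffixes to MB.
MEMORY_UNITS_MB = {"k": 1 / 1024, "m": 1, "g": 1024, "t": 1024 * 1024}


def parse_memory_mb(value: str) -> float:
    """Parse a size such as '2g', '1g', or '450Mb' into MB."""
    match = re.fullmatch(r"\s*(\d+)\s*([kmgt])b?\s*", value.lower())
    if not match:
        raise ValueError(f"unrecognized memory size: {value!r}")
    return int(match.group(1)) * MEMORY_UNITS_MB[match.group(2)]


def _check_fraction(fraction: float) -> None:
    """Enforce the documented (0.0, 1.0] range."""
    if not 0.0 < fraction <= 1.0:
        raise ValueError("fraction must be in the range (0.0, 1.0]")


def app_executor_memory_mb(executor_memory: str, fraction: float = 1.0, min_mb: int = 450) -> int:
    """Executor memory for one application (assumed rounding and minimum floor)."""
    _check_fraction(fraction)
    return max(min_mb, math.floor(parse_memory_mb(executor_memory) * fraction))


def app_cores(cores_max: Optional[int], fraction: float = 1.0) -> Optional[int]:
    """Cores for one application; None means spark.cores.max is unset (no limit)."""
    _check_fraction(fraction)
    if cores_max is None:
        return None
    return max(1, math.floor(cores_max * fraction))
```

    With the default spark.executor.memory of 2g and fusion.spark.executor.memory.fraction set to 0.5, the sketch gives 1024 MB per executor; a fraction of 0.1 would compute 204 MB, which the 450 MB minimum raises to 450 MB.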