Spark Jobs API

This is a set of endpoints for configuring and running Spark jobs.

Job Types and Configurations

Job configurations are defined and stored using JSON. The required configuration information depends on the job type.

aggregation

This job type is supported by the /spark/jobs API and can be defined and managed using the /spark/configurations API. In most cases, however, the dedicated /aggregator/jobs and /aggregator/aggregations APIs should be used instead, because they provide additional support for managing jobs per collection, passing job-specific parameters from requests, stricter parameter validation, and so on.

script

This job type supports the execution of arbitrary Scala scripts. Computation results can be returned to the caller as part of the job status by assigning them, inside the script, to a variable named result.

Example configuration:

{
  "id": "scripted-1",
  "type": "script",
  "maxRows": 100,
  "script": "var rdd = sc.textFile(\"hdfs://localhost:9000/data/aol.json.gz\")\nrdd.count\nval result = rdd\n"
}

Example output:

{
  "state": "finished",
  "jobId": "d42260a6T153ebaed126",
  "jobConfig": {
    "id": "scripted-1",
    "script": "var rdd = sc.textFile(\"hdfs://localhost:9000/data/aol.json.gz\")\nrdd.count\nval result = rdd\n",
    "maxRows": 100,
    "type": "script",
    "sync": "true",
    "submitted": 1459948015910
  },
  "result": {
    "result": [
      {
        "_": "{\"user_id_s\":\"142\",\"query_s\":\"westchester.gov\",\"timestamp_dt\":\"2006-03-20T03:55:57.000Z\",\"params.position_s\":\"1\",\"doc_id_s\":\"http://www.westchestergov.com\"}"
      },
      {
        "_": "{\"user_id_s\":\"142\",\"query_s\":\"207 ad2d 530\",\"timestamp_dt\":\"2006-04-08T01:31:14.000Z\",\"params.position_s\":\"1\",\"doc_id_s\":\"http://www.courts.state.ny.us\"}"
      },
        ...
    ],
    "state": "finished",
    "resultCode": "SUCCESS",
    "output": "rdd: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/data/aol.json.gz MapPartitionsRDD[1] at textFile at <console>:17\nres0: Long = 99999\nresult: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/data/aol.json.gz MapPartitionsRDD[1] at textFile at <console>:17\n"
  },
    ...
}

(Note: the entries in the "result" part of the job output are simply the contents of the RDD[String], where each line is a flat String rather than parsed JSON. Users can use sqlContext.read.json(…) to read this file as a DataFrame.)
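
For example, a script along the following lines (a minimal sketch, not taken from the product documentation; it assumes the same sample file and that sqlContext is available to script jobs alongside sc) parses the file into a DataFrame and assigns its rows, re-serialized as JSON strings, to result:

// Sketch only: load the sample file as JSON into a DataFrame (Spark 1.x API).
val df = sqlContext.read.json("hdfs://localhost:9000/data/aol.json.gz")
// Print the schema inferred from the JSON lines.
df.printSchema()
// Assign an RDD of re-serialized JSON strings to `result` so it is returned
// as part of the job status.
val result = df.toJSON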

wordcount

This is a simple version of the Unix wc utility implemented using Spark. It can open a text file from any data source supported by the Spark DataFrameReader, and it returns three numbers: line count, word count, and character count. Words are delimited by whitespace, and the character count does not include end-of-line characters.

Please note that this implementation uses Spark accumulators. When accumulators are used in transformations, as opposed to actions, they may count values more than once if tasks fail and are retried. As a consequence the result is not entirely reliable, but it is very inexpensive to calculate.
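
The pattern is roughly the following (an illustrative sketch only, not Fusion's actual implementation; the file path is reused from the script example above):

// Sketch: counting lines, words and characters with accumulators updated
// inside a transformation. If a task fails and is retried, its partial
// counts are added again, which is why the totals are only approximate.
val lineCount = sc.accumulator(0L, "lines")
val wordCount = sc.accumulator(0L, "words")
val charCount = sc.accumulator(0L, "chars")

val lines = sc.textFile("hdfs://localhost:9000/data/aol.json.gz")
val passedThrough = lines.map { line =>
  lineCount += 1L
  wordCount += line.split("\\s+").count(_.nonEmpty).toLong
  charCount += line.length.toLong   // end-of-line characters are not counted
  line
}
passedThrough.count()   // an action is still needed to run the transformation

val result = Seq(lineCount.value, wordCount.value, charCount.value)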

Example configuration:

{
  "id": "wordcount-1",
  "type": "wordcount",
  "file": "hdfs://localhost:9000/data/aol.json.gz"
}

Example output:

{
  "state": "finished",
  "jobId": "aadddc12T153e6b822d8",
  "jobConfig": {
    "file": "hdfs://localhost:9000/data/aol.json.gz",
    "id": "wordcount-1",
    "type": "wordcount",
    "sync": "true",
    "submitted": 1459864740568
  },
  "result": {
  "state": "finished",
  "data": [
    99999,
    282865,
    15888690
  ]
  },
    ...
}

item-similarity

This job type computes item-to-item similarities (co-occurrences) from one or more collections of signal data and writes the results to an output collection.

Example configuration:

{
  "id": "item-similarity-1",
  "inputProperties": [
    {
      "rowFieldName": "user_id_s",
      "columnFieldName": "item_id_s",
      "collection": "test",
      "query": "type_s:purchase",
      "queryParams": [],
      "idField": "id"
    },
    {
      "rowFieldName": "user_id_s",
      "columnFieldName": "item_id_s",
      "collection": "test",
      "query": "type_s:view",
      "queryParams": [],
      "idField": "id"
    }
  ],
  "maxPrefs": 100,
  "maxSimilaritiesPerItem": 100,
  "outputProperties": {
    "rowFieldName": "item_id_s",
    "columnFieldName": "item_id_ss",
    "columnStrengthFieldName": "weight_ds",
    "collection": "test_signals_aggr",
    "pipelineName": "test_signals_aggr-default"
  }
}

Example output (trimmed):

{
  "state": "finished",
  "jobId": "8a4c0000T153eb71cba0",
  "jobConfig": {
  "inputProperties": [
    {
      "rowFieldName": "user_id_s",
      "columnFieldName": "item_id_s",
      "collection": "test",
      "query": "type_s:purchase",
      "queryParams": [],
      "idField": "id"
    },
    {
      "rowFieldName": "user_id_s",
      "columnFieldName": "item_id_s",
      "collection": "test",
      "query": "type_s:view",
      "queryParams": [],
      "idField": "id"
    }
  ],
  "maxPrefs": 100,
  "maxSimilaritiesPerItem": 100,
  "outputProperties": {
    "rowFieldName": "item_id_s",
    "columnFieldName": "item_id_ss",
    "columnStrengthFieldName": "weight_ds",
    "collection": "test_signals_aggr",
    "pipelineName": "test_signals_aggr-default"
  },
  "id": "item-similarity-1",
  "type": "item-similarity",
  "sync": "true",
  "submitted": 1459944016778
  },
  "result": {
  "state": "finished",
  "data": 101023
  },
...
}

Results of this job are written to the output collection, and the returned data value is the number of co-occurrences found.

quality

This job type calculates common search quality metrics, such as precision, recall, MRR, MAP, and NDCG, for a collection of rankings. Ranking data may be represented as individual (query, doc_id, position) tuples, which naturally fits the format of data received as click-through events.

Example configuration:

{
  "id": "quality-1",
  "type": "quality",
  "inputA": {
    "format": "solr",
    "options": {
      "collection": "test"
    }
  },
  "nameField": "query_s",
  "itemField": "doc_id_s",
  "rankField": "position_i",
  "tagA": "id",
  "n": 10
}

Example output (trimmed):

{
  "state": "finished",
  "jobId": "be00034dT153eb7ee74c",
  "jobConfig": {
  "inputA": {
    "format": "solr",
    "options": {
      "collection": "test"
    }
  },
  "nameField": "query_s",
  "itemField": "doc_id_s",
  "rankField": "position_i",
  "tagA": "id",
  "n": 10,
  "id": "quality-1",
  "type": "quality",
  "sync": "true",
  "useCursorMark": true,
  "referenceTime": 1459944875828,
  "submitted": 1459944875846,
  "aggregationTime": 1459944875828,
  "inputCollection": "test",
  "rows": 10000
  },
  "result": {
  "summaryStats": {
    "map": {
      "count": 4943,
      "mean": 0.14667206150111256,
      "stdev": 0.1054289576185325,
      "variance": 0.011115265104530325,
      "max": 1,
      "min": 0.1,
      "sum": 725.0000000000091,
      "sumOfLogs": -10157.487088839782,
      "sumOfSquares": 161.28000000000247,
      "geoMean": 0.1281026291366323,
      "skewness": 3.7730477042414576,
      "kurtosis": 19.623389512667725
    },
    "mrr": { ... },
    "ndcg": { ... },
    "precision": { ... },
    "recall": { ... }
  },
  "state": "finished",
  "n": 10
  },
...
}

Spark Configuration Properties

Fusion passes all configuration properties with the prefix "spark." to the Spark master, the Spark workers, and each Spark application, for both aggregation jobs and custom scripted processing.

These properties are stored in Fusion’s ZooKeeper and can be updated via requests to the Fusion endpoint api/apollo/configurations. Updates change the stored value without restarting the service, so existing jobs and SparkContexts are not affected. A request to api/apollo/configurations returns all configured properties for that installation. You can examine the Spark default configuration in a Unix shell using the curl and grep utilities. Here is an example which checks a local Fusion installation running on port 8764:

curl -u username:password http://localhost:8764/api/apollo/configurations | grep '"spark.'

  "spark.executor.memory" : "2g",
  "spark.task.maxFailures" : "10",
  "spark.worker.cleanup.appDataTtl" : "7200",
  "spark.worker.cleanup.enabled" : "true",
  "spark.worker.memory" : "2g",

The default SparkContext that Fusion uses for aggregation jobs can be assigned a fraction of cluster resources (executor memory and/or available CPU cores). This allows other applications, such as scripted jobs or shell sessions, to use the remaining cluster resources even while aggregation jobs are running. Fusion 2.3 also enables dynamic allocation for all applications; this can be overridden per application. In practice, this means that when a running SparkContext has been idle for a relatively long time (e.g. 10 minutes) and no active jobs are using it, its resources (CPU cores and executor memory) are released for use by other applications.

For scripted Spark jobs, users can specify per-job configuration overrides as a set of key/value pairs in a "sparkConfig" element of the job configuration; these values take precedence over the values stored in ZooKeeper. The following is an example of a scripted job with a "sparkConfig" section:

{
  "id": "scripted_job_example",
  "script": "val rdd = sc.textFile(\"/foo.txt\")\nrdd.count\n",
  "sparkConfig": {
    "spark.cores.max": 2,
    "spark.executor.memory": "1g"
  }
}

The following table lists those Spark configuration properties that Fusion overrides or uses in order to determine applications' resource allocations.

Property Description

spark.master.url

This property is only specified when using an external Spark cluster; when Fusion uses its own standalone Spark cluster, it is left unset (the default).

spark.cores.max

The maximum number of cores across the cluster assigned to the application. The default is unset, i.e., an unlimited number of cores.

spark.executor.memory

Amount of memory assigned to each application’s executor. The default is 2G.

spark.scheduler.mode

Controls how tasks are assigned to available resources. Can be either 'FIFO' or 'FAIR'. Default value is 'FAIR'.

spark.dynamicAllocation.enabled

Boolean - whether or not to enable dynamic allocation of executors. Default value is 'TRUE'.

spark.shuffle.service.enabled

Boolean - whether or not to enable internal shuffle service for standalone Spark cluster. Default value is 'TRUE'.

spark.dynamicAllocation.executorIdleTimeout

Number of seconds after which idle executors are removed. Default value is '60s'.

spark.dynamicAllocation.minExecutors

Number of executors to leave running even when idle. Default value is 0.

spark.eventLog.enabled

Boolean - whether or not the event log is enabled. The event log stores job details and can be accessed after the application finishes. Default value is 'TRUE'.

spark.eventLog.dir

Directory which stores event logs. Default location is $FUSION_HOME/var/spark-eventlog.

spark.eventLog.compress

Boolean - whether or not to compress event log data. Default value is 'TRUE'.

spark.logConf

Boolean - whether or not to log the effective SparkConf of each new SparkContext. Default value is 'TRUE'.

spark.deploy.recoveryMode

The recovery mode of the standalone Spark master. Default value is 'ZOOKEEPER'.

spark.deploy.zookeeper.url

ZooKeeper connect string. Default value is $FUSION_ZK.

spark.deploy.zookeeper.dir

ZooKeeper path. Default value is /lucid/spark.

spark.worker.cleanup.enabled

Boolean - whether or not to periodically clean up worker data. Default value is 'TRUE'.

spark.worker.cleanup.appDataTtl

Time-to-live of application data on workers, in seconds. Default value is 86400 (24h).

spark.deploy.retainedApplications

The maximum number of applications to show in the UI. Default value is 50.

spark.deploy.retainedDrivers

The maximum number of drivers. Default value is 50.

spark.worker.timeout

The number of seconds after which a worker is considered lost if the master receives no heartbeats from it. The default value is 30.

spark.worker.memory

The maximum total amount of memory available to all executors running on a worker. Defaults to the value of spark.executor.memory.

Fusion Configuration Properties

Property Description

fusion.spark.master.port

Spark master job submission port. Default value is 8766.

fusion.spark.master.ui.port

Spark master UI port. Default value is 8767.

fusion.spark.idleTime

Maximum idle time in seconds, after which the application (i.e. its SparkContext) is shut down. Default value is 300.

fusion.spark.executor.memory.min

Minimum executor memory, in megabytes. The default value is 450 MB, which is sufficient for the Fusion components in application tasks to initialize themselves.

fusion.spark.executor.memory.fraction

A float in the range (0.0, 1.0] indicating the portion of spark.executor.memory to allocate to this application. Default value is 1.0.

fusion.spark.cores.fraction

A float in the range (0.0, 1.0] indicating the portion of spark.cores.max to allocate to this application. Default value is 1.0.
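
As a worked example (hypothetical values): with spark.cores.max=8 and spark.executor.memory=2g, setting fusion.spark.cores.fraction=0.5 and fusion.spark.executor.memory.fraction=0.5 for an application limits it to 8 × 0.5 = 4 cores and 2g × 0.5 = 1g of executor memory, which is still above the 450 MB fusion.spark.executor.memory.min floor; the remaining cores and memory stay available to other applications.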