Spark Jobs API
- Spark Cluster Information: /spark/master
- Configure Spark Jobs: /spark/configurations
- Manage Spark Jobs: /spark/jobs
- Job Types and Configurations
- Spark job subtypes
- Spark Configuration Properties
- Fusion Configuration Properties
This API has a set of endpoints for configuring and running Spark jobs.
/spark/master
Spark Cluster Information:
Path | Method | Description |
---|---|---|
/spark/master | GET | Returns a string containing the Spark master URL. |
/spark/master/status | GET | Returns the detailed status of the Spark cluster started by Fusion. |
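For example, these endpoints can be queried with curl. This is a minimal sketch, assuming a local Fusion installation on port 8764 and the same api/apollo prefix used in the configuration example later on this page; adjust the host, port, and credentials for your installation.
# Return the Spark master URL
curl -u username:password http://localhost:8764/api/apollo/spark/master
# Return the detailed status of the Spark cluster started by Fusion
curl -u username:password http://localhost:8764/api/apollo/spark/master/status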
/spark/configurations
Configure Spark Jobs: This endpoint is used to view or manage Spark job configuration objects. A configuration always consists of the following properties:
- id - a unique identifier for this object, used by Fusion to manage the object. If this property is not specified at object creation time, Fusion supplies one.
- type - one of a set of defined job types: {aggregation, script, wordcount, item-similarity, quality}. Depending on the job type, further configuration properties are required; these are described in the section Job Types and Configurations below.
Configurations are defined and stored using JSON. Here is an example of a configuration for a Spark job of type "wordcount":
{ "id": "wordcount-1", "type": "wordcount", "file": "hdfs://localhost:9000/data/aol.json.gz" }
This endpoint takes an optional "id" parameter, used to restrict an operation to a specific configuration object. The body of a POST or PUT request is the JSON for the Spark job configuration. An example request follows the table below.
Path | Method | Args | Description |
---|---|---|---|
/spark/configurations | GET |  | Returns a JSON list of all defined Spark job configurations. An optional request parameter can be used to restrict the listing. |
/spark/configurations | POST | job config JSON | Creates and returns a new Spark job configuration in JSON. If the new job configuration does not include a unique job id, Fusion creates one and returns it to the caller. |
/spark/configurations/<id> | GET |  | Returns the specified Spark job configuration in JSON. |
/spark/configurations/<id> | PUT | job config JSON | Updates the specified Spark job configuration. |
/spark/configurations/<id> | DELETE |  | Deletes the specified Spark job configuration. |
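For example, the wordcount configuration above could be created and then retrieved with curl. This is a minimal sketch, assuming a local Fusion installation on port 8764 and the api/apollo prefix used in the configuration example later on this page.
# Create the "wordcount-1" configuration
curl -u username:password -X POST -H 'Content-type: application/json' \
  -d '{"id": "wordcount-1", "type": "wordcount", "file": "hdfs://localhost:9000/data/aol.json.gz"}' \
  http://localhost:8764/api/apollo/spark/configurations
# Retrieve the configuration by id
curl -u username:password http://localhost:8764/api/apollo/spark/configurations/wordcount-1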
/spark/jobs
Manage Spark Jobs: The /spark/jobs endpoint manages all Spark jobs, including aggregation jobs over signals data.
Note: The /aggregator/jobs API was deprecated in release 3.1 and was removed in release 4.0.
Path | Method | Args | Description |
---|---|---|---|
/spark/jobs | GET |  | Returns a listing of all active and recently finished jobs. The result is a JSON map from jobs to objects that list the job type and execution information. An optional request parameter can be used to restrict the listing. |
/spark/jobs | POST | job JSON | Starts an arbitrary job using the provided configuration. A unique id is automatically assigned and returned to the caller. Returns the current status of the job. |
/spark/jobs/<id> | GET |  | Returns the detailed job status for a running or recently completed job. |
/spark/jobs/<id> | POST |  | Starts a job using an existing job configuration. Optionally executes the job synchronously (blocking until the job is complete). Returns the current status of the job. |
/spark/jobs/<id> | DELETE |  | Stops a running job. |
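For example, the job defined by the "wordcount-1" configuration could be started, polled, and stopped with curl. This is a minimal sketch, assuming a local Fusion installation on port 8764 and the api/apollo prefix used in the configuration example later on this page.
# Start the job defined by an existing configuration
curl -u username:password -X POST http://localhost:8764/api/apollo/spark/jobs/wordcount-1
# Check the status of the running or recently completed job
curl -u username:password http://localhost:8764/api/apollo/spark/jobs/wordcount-1
# Stop the job if it is still running
curl -u username:password -X DELETE http://localhost:8764/api/apollo/spark/jobs/wordcount-1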
Job Types and Configurations
Fusion stores job configurations using JSON. You can use the /spark/configurations API to define and manage job configurations. The required configuration information depends on the job type.
The /spark/jobs API supports the following job types.
aggregation
Jobs of this type aggregate data, for example, click data.
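As an illustration, a previously defined aggregation configuration is started through the jobs API like any other job type. This is a minimal sketch; the configuration id agg-click-signals is hypothetical, and it assumes a local Fusion installation on port 8764 with the api/apollo prefix used later on this page.
# Start an existing aggregation job configuration (hypothetical id)
curl -u username:password -X POST http://localhost:8764/api/apollo/spark/jobs/agg-click-signals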
script
Jobs of this type can run arbitrary Scala scripts. Jobs can return the results of computation to the caller as a part of the job status, by assigning the result in the script to a variable named result.
Example configuration:
{ "id": "scripted-1", "type": "script", "maxRows": 100, "script": "var rdd = sc.textFile(\"hdfs://localhost:9000/data/aol.json.gz\")\nrdd.count\nval result = rdd\n" }
Example output:
{ "state": "finished", "jobId": "d42260a6T153ebaed126", "jobConfig": { "id": "scripted-1", "script": "var rdd = sc.textFile(\"hdfs://localhost:9000/data/aol.json.gz\")\nrdd.count\nval result = rdd\n", "maxRows": 100, "type": "script", "sync": "true", "submitted": 1459948015910 }, "result": { "result": [ { "_": "{\"user_id_s\":\"142\",\"query_s\":\"westchester.gov\",\"timestamp_dt\":\"2006-03-20T03:55:57.000Z\",\"params.position_s\":\"1\",\"doc_id_s\":\"http://www.westchestergov.com\"}" }, { "_": "{\"user_id_s\":\"142\",\"query_s\":\"207 ad2d 530\",\"timestamp_dt\":\"2006-04-08T01:31:14.000Z\",\"params.position_s\":\"1\",\"doc_id_s\":\"http://www.courts.state.ny.us\"}" }, ... ], "state": "finished", "resultCode": "SUCCESS", "output": "rdd: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/data/aol.json.gz MapPartitionsRDD[1] at textFile at <console>:17\nres0: Long = 99999\nresult: org.apache.spark.rdd.RDD[String] = hdfs://localhost:9000/data/aol.json.gz MapPartitionsRDD[1] at textFile at <console>:17\n" }, ... }
Note: The entries in the "result" part of the job output are simply the contents of an RDD[String], where each line is a flat string rather than real JSON. You can use sqlContext.read.json(…) to read this file as a DataFrame.
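As an illustrative sketch (not part of the original example), a script job could read the same file as a DataFrame and return its row count as the result. This assumes a local Fusion installation on port 8764 with the api/apollo prefix used later on this page; the job id scripted-df-1 is hypothetical.
# Submit an arbitrary script job that reads the file as a DataFrame (sketch)
curl -u username:password -X POST -H 'Content-type: application/json' \
  -d '{"id": "scripted-df-1", "type": "script", "script": "val df = sqlContext.read.json(\"hdfs://localhost:9000/data/aol.json.gz\")\nval result = df.count\n"}' \
  http://localhost:8764/api/apollo/spark/jobs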
wordcount
Jobs of this type run a simple version of the Unix wc utility, implemented using Spark. The job can open a text file from any data source supported by the Spark DataFrameReader, and returns three numbers: line count, word count, and character count. Words are delimited by whitespace, and the character count does not include end-of-line characters.
Note that this implementation uses Spark accumulators. When accumulators are used in transformations, as opposed to actions, they can count items multiple times if some tasks fail. As a consequence, the result is not as reliable as one computed using actions, but it is very inexpensive to calculate.
Example configuration:
{ "id": "wordcount-1", "type": "wordcount", "file": "hdfs://localhost:9000/data/aol.json.gz" }
Example output:
{ "state": "finished", "jobId": "aadddc12T153e6b822d8", "jobConfig": { "file": "hdfs://localhost:9000/data/aol.json.gz", "id": "wordcount-1", "type": "wordcount", "sync": "true", "submitted": 1459864740568 }, "result": { "state": "finished", "data": [ 99999, 282865, 15888690 ] }, ... }
item-similarity
Jobs of this type calculate the similarity between items.
Example configuration:
{ "id": "item-similarity-1", "inputProperties": [ { "rowFieldName": "user_id_s", "columnFieldName": "item_id_s", "collection": "test", "query": "type_s:purchase", "queryParams": [], "idField": "id" }, { "rowFieldName": "user_id_s", "columnFieldName": "item_id_s", "collection": "test", "query": "type_s:view", "queryParams": [], "idField": "id" } ], "maxPrefs": 100, "maxSimilaritiesPerItem": 100, "outputProperties": { "rowFieldName": "item_id_s", "columnFieldName": "item_id_ss", "columnStrengthFieldName": "weight_ds", "collection": "test_signals_aggr", "pipelineName": "test_signals_aggr-default" } }
Example output (trimmed):
{ "state": "finished", "jobId": "8a4c0000T153eb71cba0", "jobConfig": { "inputProperties": [ { "rowFieldName": "user_id_s", "columnFieldName": "item_id_s", "collection": "test", "query": "type_s:purchase", "queryParams": [], "idField": "id" }, { "rowFieldName": "user_id_s", "columnFieldName": "item_id_s", "collection": "test", "query": "type_s:view", "queryParams": [], "idField": "id" } ], "maxPrefs": 100, "maxSimilaritiesPerItem": 100, "outputProperties": { "rowFieldName": "item_id_s", "columnFieldName": "item_id_ss", "columnStrengthFieldName": "weight_ds", "collection": "test_signals_aggr", "pipelineName": "test_signals_aggr-default" }, "id": "item-similarity-1", "type": "item-similarity", "sync": "true", "submitted": 1459944016778 }, "result": { "state": "finished", "data": 101023 }, ... }
The returned data is the number of item-item co-occurrences found. Fusion sends the results of this job to the output collection.
quality
Jobs of this type calculate common search-quality metrics, such as precision, recall, MRR, MAP, and NDCG, for a collection of rankings. Ranking data can be represented as individual tuples of (query, doc_id, position), which naturally fits the format of data received as click-through events.
Example configuration:
{ "id": "quality-1", "type": "quality", "inputA": { "format": "solr", "options": { "collection": "test" } }, "nameField": "query_s", "itemField": "doc_id_s", "rankField": "position_i", "tagA": "id", "n": 10 }
Example output (trimmed):
{ "state": "finished", "jobId": "be00034dT153eb7ee74c", "jobConfig": { "inputA": { "format": "solr", "options": { "collection": "test" } }, "nameField": "query_s", "itemField": "doc_id_s", "rankField": "position_i", "tagA": "id", "n": 10, "id": "quality-1", "type": "quality", "sync": "true", "useCursorMark": true, "referenceTime": 1459944875828, "submitted": 1459944875846, "aggregationTime": 1459944875828, "inputCollection": "test", "rows": 10000 }, "result": { "summaryStats": { "map": { "count": 4943, "mean": 0.14667206150111256, "stdev": 0.1054289576185325, "variance": 0.011115265104530325, "max": 1, "min": 0.1, "sum": 725.0000000000091, "sumOfLogs": -10157.487088839782, "sumOfSquares": 161.28000000000247, "geoMean": 0.1281026291366323, "skewness": 3.7730477042414576, "kurtosis": 19.623389512667725 }, "mrr": { ... }, "ndcg": { ... }, "precision": { ... }, "recall": { ... } }, "state": "finished", "n": 10 }, ... }
Spark job subtypes
For the Spark job type, the available subtypes are listed below.
Job subtype | Description |
---|---|
Aggregation | Define an aggregation job to be executed by Fusion Spark. |
ALS Recommender | Train a collaborative filtering matrix decomposition recommender using SparkML’s Alternating Least Squares (ALS) to batch-compute user recommendations and item similarities. |
Bisecting KMeans Clustering Job | Train a bisecting KMeans clustering model. |
Cluster Labeling | Attach keyword labels to documents that have already been assigned to groups. See Doc Clustering below. |
Collection Analysis | Produce statistics about the types of documents in a collection and their lengths. |
Co-occurrence Similarity | Compute a mutual-information item similarity model. |
Doc Clustering | Preprocess documents, separate out extreme-length documents and other outliers, automatically select the number of clusters, and extract keyword labels for clusters. You can choose between Bisecting KMeans and KMeans clustering methods, and between TFIDF and word2vec vectorization methods. |
Item Similarity Recommender | Compute user recommendations based on a pre-computed item similarity model. |
Levenshtein | Compare the items in a collection and produce possible spelling mistakes based on the Levenshtein edit distance. |
Logistic Regression Classifier Training Job | Train a regularized logistic regression model for text classification. |
Matrix Decomposition-Based Query-Query Similarity Job | Train a collaborative filtering matrix decomposition recommender using SparkML’s Alternating Least Squares (ALS) to batch-compute query-query similarities. |
Outlier Detection | Find groups of outliers for the entire set of documents in the collection. |
Random Forest Classifier Training | Train a random forest classifier for text classification. |
Script | Run a custom Scala script as a Fusion job. |
Statistically Interesting Phrases (SIP) | Output statistically interesting phrases in a collection, that is, phrases that occur more frequently or less frequently than expected. |
Spark Configuration Properties
Fusion passes all configuration properties whose names start with spark. to the Spark master, the Spark worker, and each Spark application, both for aggregation jobs and for custom-scripted processing.
These properties are stored in Fusion’s ZooKeeper instance. You can update properties through requests to the Fusion endpoint api/apollo/configurations. Requests update the stored value without restarting the service; therefore, existing jobs and SparkContexts are not affected.
The Fusion endpoint api/apollo/configurations returns all configured properties for that installation.
You can examine the Spark default configurations in a Unix shell using the curl and grep utilities. Here is an example that checks a local Fusion installation running on port 8764:
curl -u username:password http://localhost:8764/api/apollo/configurations | grep '"spark.'
  "spark.executor.memory" : "2g",
  "spark.task.maxFailures" : "10",
  "spark.worker.cleanup.appDataTtl" : "7200",
  "spark.worker.cleanup.enabled" : "true",
  "spark.worker.memory" : "2g",
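To change one of these values, you update it through the same Configurations API. The following is only a hedged sketch, assuming the endpoint accepts a PUT to api/apollo/configurations/<propertyName> with the new value in the request body; the exact request shape may differ by Fusion version.
# Sketch only: set spark.executor.memory to 4g (assumed PUT form of the Configurations API)
curl -u username:password -X PUT -H 'Content-type: application/json' \
  -d '"4g"' http://localhost:8764/api/apollo/configurations/spark.executor.memory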
The default SparkContext that Fusion uses for aggregation jobs can be assigned a fraction of cluster resources (executor memory and/or available CPU cores). This allows other applications, such as scripted jobs or shell sessions, to use the remaining cluster resources even while aggregation jobs are running. Fusion 2.3 also permits dynamic allocation of executors for all applications, which can be overridden per application. In practice, this means that even when a SparkContext is still running, if it has been idle for a relatively long time (for example, 10 minutes) and no active jobs are using it, its resources (CPU cores and executor memory) are released for use by other applications.
For scripted Spark jobs, you can specify per-job configuration overrides as a set of key/value pairs in a "sparkConfig" property of the script job configuration; these values take precedence over the values stored in ZooKeeper. The following is an example of a scripted job with a "sparkConfig" section:
{ "id": "scripted_job_example", "script": "val rdd = sc.textFile(\"/foo.txt\")\nrdd.count\n", "sparkConfig": { "spark.cores.max": 2, "spark.executor.memory": "1g" } }
The following table lists those Spark configuration properties that Fusion overrides or uses in order to determine applications' resource allocations.
Property | Description |
---|---|
spark.master | By default, left unset. This property is specified only when using an external Spark cluster; when Fusion is using its own standalone Spark cluster, this property isn’t set. |
spark.cores.max | The maximum number of cores across the cluster assigned to the application. The default is unset, i.e., an unlimited number of cores. |
spark.executor.memory | Amount of memory assigned to each application’s executor. The default is 2G. |
spark.scheduler.mode | Controls how tasks are assigned to available resources. Can be either 'FIFO' or 'FAIR'. Default value is 'FAIR'. |
spark.dynamicAllocation.enabled | Boolean - whether or not to enable dynamic allocation of executors. Default value is 'TRUE'. |
spark.shuffle.service.enabled | Boolean - whether or not to enable the internal shuffle service for the standalone Spark cluster. Default value is 'TRUE'. |
spark.dynamicAllocation.executorIdleTimeout | Number of seconds after which idle executors are removed. Default value is '60s'. |
spark.dynamicAllocation.minExecutors | Number of executors to leave running even when idle. Default value is 0. |
spark.eventLog.enabled | Boolean - whether or not the event log is enabled. The event log stores job details and can be accessed after the application finishes. Default value is 'TRUE'. |
spark.eventLog.dir | Directory that stores event logs. |
spark.eventLog.compress | Boolean - whether or not to compress event log data. Default value is 'TRUE'. |
spark.logConf | Boolean - whether or not to log the effective SparkConf of new SparkContexts. Default value is 'TRUE'. |
spark.deploy.recoveryMode | Default value is 'ZOOKEEPER'. |
spark.deploy.zookeeper.url | ZooKeeper connect string. |
spark.deploy.zookeeper.dir | ZooKeeper path. |
spark.worker.cleanup.enabled | Boolean - whether or not to periodically clean up worker data. Default value is 'TRUE'. |
spark.worker.cleanup.appDataTtl | Time-to-live in seconds. Default value is 86400 (24h). |
spark.deploy.retainedApplications | The maximum number of applications to show in the UI. Default value is 50. |
spark.deploy.retainedDrivers | The maximum number of drivers. Default value is 50. |
spark.worker.timeout | The maximum timeout in seconds allowed before a worker is considered lost. The default value is 30. |
spark.worker.memory | The maximum total heap allocated to all executors running on this worker. Defaults to the value of the executor memory heap. |
Fusion Configuration Properties
Property | Description |
---|---|
 | Spark master job submission port. Default value is 8766. |
 | Spark master UI port. Default value is 8767. |
 | Maximum idle time in seconds, after which the application (i.e., its SparkContext) is shut down. Default value is 300. |
 | Minimum executor memory in MB. Default value is 450 MB, which is sufficient for the Fusion components in application tasks to initialize themselves. |
 | A float number in range |
 | A float number in range |