Catalog API

The Fusion Catalog is a collection of one or more analytics projects, and each project is a collection of data assets, such as tables or relations. Fusion comes with a built-in project called "fusion".

The Fusion Catalog API gives data analysis applications that run SQL or Solr queries access to these assets. It includes endpoints for finding, retrieving, and manipulating projects and assets using basic keyword and metadata-driven search.

By default, non-admin Fusion users do not have access to Catalog objects. However, the Catalog API itself does not enforce any permissions, so a user who bypasses the auth proxy has full access to all projects and assets. An admin can grant permissions to Catalog endpoints for users; see Access Control.

Intra-shard splits

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub-ranges using a split field. Sub-range splitting enables faster fetching from Solr by increasing the number of tasks in Spark. Use it only if the Spark cluster has enough computing resources.

Shard splitting is disabled by default. See Configuration options below for shard splitting parameters.
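As a rough sizing sketch, the number of Spark read tasks can be estimated as the number of shards multiplied by the sub-ranges per shard. This assumes that each sub-range becomes one read task, which the text above implies but does not state. The function names and the example numbers are illustrative and are not part of Fusion.

```shell
#!/bin/sh
# Sketch only: assumes each sub-range of a shard becomes one Spark read task.

# estimate_tasks SHARDS SPLITS_PER_SHARD -> prints the estimated task count
estimate_tasks() {
  echo $(( $1 * $2 ))
}

# suggest_splits EXECUTOR_SLOTS SHARDS -> prints a splits-per-shard value
# that roughly fills the available slots (never less than 1).
suggest_splits() {
  splits=$(( $1 / $2 ))
  if [ "$splits" -lt 1 ]; then
    splits=1
  fi
  echo "$splits"
}

estimate_tasks 4 5    # prints 20
suggest_splits 24 4   # prints 6
```

With 24 executor slots and 4 shards, about 6 sub-ranges per shard would keep every slot busy; asking for more mostly adds scheduling overhead.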

Body attributes

For PUT and POST requests, these are valid JSON body attributes:

| Name | Type | Description |
| --- | --- | --- |
| projectId | String | The project name |
| name | String | The asset name |
| assetType | DataAssetType | One of: project, table, relation, field, udf, metric |
| description | String | A string describing this asset |
| sourceUri | String | A URI to the data source |
| owner | String | The user that owns the asset |
| ownerEmail | String | The owner’s email address |
| tags | Set<String> | A set of arbitrary category strings |
| format | String | The format of the underlying data source |
| options | List<String> | A list of options for the underlying data source. See Configuration options below for valid options. |
| filters | List<String> | A list of Solr query parameters to filter the request |
| sql | String | A SQL statement to execute |
| cacheOnLoad | boolean | true to cache the dataset in Spark on catalog project initialization |
| dependsOn | List<String> | A list of other assets to load before initializing this data asset |
| createdOn | Date | The asset’s creation date, in ISO-8601 format; if omitted, the current timestamp is used |
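Since assetType accepts only a fixed set of values, a client script may want to check the value before building a request body. The following is a minimal sketch of such a check; the function name is hypothetical and the check is purely client-side, not something the Catalog API provides.

```shell
#!/bin/sh
# Hypothetical client-side helper, not part of the Catalog API.
# Succeeds only for the assetType values listed in the table above.
is_valid_asset_type() {
  case "$1" in
    project|table|relation|field|udf|metric) return 0 ;;
    *) return 1 ;;
  esac
}

for t in table view; do
  if is_valid_asset_type "$t"; then
    echo "$t: ok"
  else
    echo "$t: not a valid assetType"
  fi
done
```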

Configuration options

Each option is listed with its name, description, and default value.

collection

The Solr collection name.

Default: None

zkhost

The ZooKeeper connect string, which lists all servers and ports for the current ZooKeeper cluster. For example, if you are running a single-node Fusion developer deployment with embedded ZooKeeper, the connect string is localhost:9983/lwfusion/3.1.0/solr. If you have an external 3-node ZooKeeper cluster running on servers "zk1.acme.com", "zk2.acme.com", and "zk3.acme.com", all listening on port 2181, then the connect string is zk1.acme.com:2181,zk2.acme.com:2181,zk3.acme.com:2181

Default: the connectString of the default search cluster

query

A Solr query that limits the rows to load into Spark. For example, to only load documents that mention "solr":

options("query","body_t:solr")

Default: *:*

fields

A subset of fields to retrieve for each document in the results, such as:

options("fields","id,author_s,favorited_b,…​")

You can also specify an alias for a field using Solr’s field alias syntax, such as author:author_s. If you want to invoke a function query, such as rord(), then you’ll need to provide an alias, such as ord_user:ord(user_id). If the return type of the function query is something other than int or long, then you’ll need to specify the return type after the function query, such as:

foo:div(sum(x,100),max(y,1)):double

Tip: If you request Solr function queries, the library must use the /select Solr handler to make the request, because Solr does not support exporting function queries through /export.

Default: all stored fields for each document are pulled back from Solr.
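The alias and return-type syntax described above can be assembled mechanically. The sketch below builds a fields value from plain field names, an aliased field, and a typed function query; the helper name field_spec is hypothetical, and the field names are the ones used in the examples above.

```shell
#!/bin/sh
# field_spec ALIAS EXPRESSION [RETURN_TYPE]
# Prints alias:expression, with :type appended when a return type is given.
field_spec() {
  if [ -n "${3:-}" ]; then
    printf '%s:%s:%s\n' "$1" "$2" "$3"
  else
    printf '%s:%s\n' "$1" "$2"
  fi
}

# Join several entries into one comma-separated "fields" value.
fields="id,$(field_spec author author_s),$(field_spec foo 'div(sum(x,100),max(y,1))' double)"
echo "$fields"
# prints: id,author:author_s,foo:div(sum(x,100),max(y,1)):double
```

Because this value contains a function query, the request would have to go through the /select handler, as the tip above notes.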

rows

The number of rows to retrieve from Solr per request, that is, the page size. Do not confuse this with max_rows (see below): rows does not limit the total number of rows read from Solr, and all matching rows on the backend are still read.

Behind the scenes, the implementation uses either deep paging cursors or Streaming API and response streaming, so it is usually safe to specify a large number of rows. By default, the implementation uses 1000 rows but if your documents are smaller, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM’s garbage collector.

Example: options("rows","10000")

Default: 1000
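Because rows is a page size rather than a limit, a larger value mainly reduces the number of round trips needed to read the same result set. The arithmetic below is an approximation that ignores any extra request the paging mechanism may make to detect the end of the results; the function name and the document counts are made up for illustration.

```shell
#!/bin/sh
# pages_needed TOTAL_DOCS ROWS -> prints ceil(TOTAL_DOCS / ROWS)
pages_needed() {
  echo $(( ($1 + $2 - 1) / $2 ))
}

pages_needed 25000 1000    # prints 25
pages_needed 25000 10000   # prints 3
```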

max_rows

The maximum number of rows; only applies when using the /select handler. The library will issue the query from a single task and let Solr do the distributed query processing.

No paging is performed; that is, the rows param is set to max_rows when querying. Consequently, do not use this option for large max_rows values. Instead, retrieve all rows using multiple Spark tasks and then re-sort with Spark if needed.

Example: options("max_rows", "100")

Default: None

request_handler

Set the Solr request handler for queries. Use this option to export results from Solr via the /export handler, which streams data out of Solr. See Exporting Result Sets for more information.

The /export handler needs fields to be explicitly specified. Please use the fields option or specify the fields in the query.

Example: options("request_handler", "/export")

Default: /select
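The requirement that /export needs explicit fields can be checked before a request is built. This is a minimal client-side sketch with a hypothetical function name; it only looks at the fields option and assumes the fields are not specified in the query itself.

```shell
#!/bin/sh
# check_handler HANDLER FIELDS
# Fails when /export is requested without an explicit fields value.
check_handler() {
  if [ "$1" = "/export" ] && [ -z "$2" ]; then
    echo "request_handler /export requires the fields option" >&2
    return 1
  fi
  return 0
}

check_handler "/export" "movie_id,rating" && echo "ok"
check_handler "/export" "" || echo "rejected"
```

The first call prints ok; the second writes the error message to standard error and then prints rejected.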

splits

Enable shard splitting on the default field, _version_.

Example: options("splits", "true")

The above option is equivalent to options("split_field", "_version_")

Default: false

split_field

The field to split on.

Example: options("split_field", "id")

Default: _version_

splits_per_shard

Split the shard into evenly-sized splits using filter queries. You can also split on a string-based keyword field but it should have sufficient variance in the values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.

Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.

Example: options("splits_per_shard", "30")

Default: 20
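The keyword-field caveat above amounts to taking the smaller of the requested split count and the number of distinct values in the split field. The sketch below shows that upper bound only; the function name is hypothetical, and the real split calculator treats the request as a hint, so actual counts can differ.

```shell
#!/bin/sh
# effective_splits REQUESTED DISTINCT_VALUES -> prints the smaller of the two
effective_splits() {
  if [ "$1" -lt "$2" ]; then
    echo "$1"
  else
    echo "$2"
  fi
}

effective_splits 10 3     # prints 3
effective_splits 10 500   # prints 10
```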

flatten_multivalued

Flatten multi-valued fields from Solr.

Example: options("flatten_multivalued", "false")

Default: true

dv

Fetch the docValues that are indexed but not stored by using function queries. Should be used for Solr versions lower than 5.5.0.

Example: options("dv", "true")

Default: false

sample_seed

Read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct option to control the sample size.

Example: options("sample_seed", "5150")

Default: None

sample_pct

The fraction of documents to read from Solr as a random sample; use a value between 0 and 1.

Example: options("sample_pct", "0.05")

Default: 0.1
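To get a feel for what a given sample_pct means, the expected sample size is simply the number of matching documents multiplied by the fraction. The sketch below uses awk for the floating-point arithmetic; the function name and the document count are illustrative, and because sampling is random the actual number of documents returned is likely to differ somewhat.

```shell
#!/bin/sh
# expected_sample_size TOTAL_DOCS SAMPLE_PCT -> prints TOTAL_DOCS * SAMPLE_PCT, rounded
expected_sample_size() {
  awk -v n="$1" -v p="$2" 'BEGIN { printf "%.0f\n", n * p }'
}

expected_sample_size 200000 0.05   # prints 10000
expected_sample_size 200000 0.1    # prints 20000
```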

skip_non_dv

Skip all fields that are not docValues.

Example: options("skip_non_dv", "true")

Default: false

Examples

Define a "movielens" project:
FUSION=localhost:8764
curl -u user:pass -X POST -H "Content-type:application/json" \
 -d '{
  "name": "movielens",
  "assetType": "project",
  "description": "tables and views for the movielens project",
  "tags": ["movies","users"],
  "cacheOnLoad": false
}' "http://{fusion_path}/api/apollo/catalog"
Add a "ratings" table to the "movielens" project:
curl -u user:pass -X POST -H "Content-type:application/json" -d '{
  "name": "ratings",
  "assetType": "table",
  "projectId": "movielens",
  "description": "movie ratings data",
  "tags": ["movies"],
  "format": "solr",
  "cacheOnLoad": true,
  "options": ["collection -> movielens_ratings", "fields -> user_id,movie_id,rating,rating_timestamp"]
}' "http://{fusion_path}/api/apollo/catalog/movielens/assets"
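The options attribute in the request above is a JSON array of "key -> value" strings. When the option values come from shell variables, a small helper can assemble that array. This is a sketch with a hypothetical function name; it assumes that keys and values contain no double quotes or backslashes, since it does no JSON escaping.

```shell
#!/bin/sh
# catalog_options KEY=VALUE...
# Prints a JSON array of "key -> value" strings, one per argument.
# No JSON escaping is done: values must not contain double quotes or backslashes.
catalog_options() {
  out=""
  for pair in "$@"; do
    key=${pair%%=*}      # text before the first "="
    value=${pair#*=}     # text after the first "="
    out="${out:+$out, }\"$key -> $value\""
  done
  printf '[%s]\n' "$out"
}

catalog_options collection=movielens_ratings rows=10000
# prints: ["collection -> movielens_ratings", "rows -> 10000"]
```

The printed array can be pasted into the "options" attribute of a request body such as the one above.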
Issue a SQL statement against the "ratings" table:
curl -u user:pass -X POST -H "Content-type:application/json" -d '{
  "sql": "SELECT user_id, movie_id, rating FROM ratings WHERE rating >= 4 LIMIT 10"
}' "http://{fusion_path}/api/apollo/catalog/movielens/query"
Issue a SQL query against the "movielens" project:
curl -u user:pass -X POST -H "Content-Type:application/json" -d '{
"sql":"SELECT m.title as title, solr.aggCount as aggCount FROM movies m INNER JOIN (SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating >= 4 GROUP BY movie_id ORDER BY aggCount desc LIMIT 10) as solr ON solr.movie_id = m.movie_id ORDER BY aggCount DESC"
}' http://localhost:8764/api/apollo/catalog/movielens/query
Load a catalog table from a Postgres database:
curl -u user:pass -X POST -H "Content-type:application/json" -d '{
 "projectId": "nyc_taxi",
 "assetType": "table",
 "name": "trips",
 "sourceUri": "http://www.nyc.gov/html/tlc/html/about/trip_record_data.shtml",
 "owner": "Joe Example",
 "ownerEmail": "examplejoe@gmail.com",
 "description": "The NYC taxi trip data stored in Postgres using tools provided by https://github.com/toddwschneider/nyc-taxi-data",
 "tags": ["nyc", "taxi", "postgres", "trips"],
 "format": "jdbc",
 "cacheOnLoad": true,
 "options": ["url -> ${nyc_taxi_jdbc_url}","dbtable -> trips","partitionColumn -> id","numPartitions -> 4","lowerBound -> 0", "upperBound -> $MAX(id)", "fetchSize -> 1000"],
 "filters": ["pickup_latitude >= -90 AND pickup_latitude <= 90 AND pickup_longitude >= -180 AND pickup_longitude <= 180", "dropoff_latitude >= -90 AND dropoff_latitude <= 90 AND dropoff_longitude >= -180 AND dropoff_longitude <= 180"],
 "sql": "SELECT id,cab_type_id,vendor_id,pickup_datetime,dropoff_datetime,store_and_fwd_flag,rate_code_id,passenger_count,trip_distance,fare_amount,extra,mta_tax,tip_amount,tolls_amount,ehail_fee,improvement_surcharge,total_amount,payment_type,trip_type, concat_ws(',',pickup_latitude,pickup_longitude) as pickup, concat_ws(',',dropoff_latitude,dropoff_longitude) as dropoff FROM trips"
}' "http://{fusion_path}/api/apollo/catalog/nyc_taxi/assets"
Create a data asset using a streaming expression:
curl -u user:pass -X POST -H "Content-type:application/json" -d '{
  "name": "movie_ratings",
  "assetType": "table",
  "projectId": "movielens",
  "description": "movie ratings data",
  "tags": ["movies"],
  "format": "solr",
  "cacheOnLoad": true,
  "options": ["collection -> movielens_ratings", "expr -> hashJoin(search(movielens_ratings,q=\"*:*\",fl=\"movie_id,user_id,rating\",sort=\"movie_id asc\",qt=\"\/export\",partitionKeys=\"movie_id\"),hashed=search(movielens_movies,q=\"*:*\",fl=\"movie_id,title\",sort=\"movie_id asc\",qt=\"\/export\",partitionKeys=\"movie_id\"),on=\"movie_id\")"]
}' "http://{fusion_path}/api/apollo/catalog/movielens/assets"
Send a Solr query:
curl -u user:pass -X POST -H "Content-Type:application/json" -d '{
  "solr":"*:*",
  "requestHandler":"/select",
  "collection":"movielens_movies",
  "params":{
    "facet":"on",
    "facet.field":"genre",
    "rows":0
  }
}' http://localhost:8764/api/apollo/catalog/movielens/query
Send a Solr query using a streaming expression:
curl -u user:pass -X POST -H "Content-Type:application/json" --data-binary @streaming_join.json http://localhost:8764/api/apollo/catalog/movielens/query

The streaming_join.json file contains:
{
  "solr":"hashJoin(search(movielens_ratings, q=*:*, qt=\"/export\", fl=\"user_id,movie_id,rating\", sort=\"movie_id asc\", partitionKeys=\"movie_id\"), hashed=search(movielens_movies, q=*:*, fl=\"movie_id,title\", qt=\"/export\", sort=\"movie_id asc\",partitionKeys=\"movie_id\"),on=\"movie_id\")",
  "collection":"movielens_ratings",
  "requestHandler":"/stream"
}