Fusion SQL service

Most organizations that deploy Fusion also have SQL-compliant business intelligence (BI) or dashboarding tools to facilitate self-service analytics.

The Fusion SQL service:

  • Lets organizations leverage their investments in BI tools by using JDBC and SQL to analyze data managed by Fusion. For example, Tableau is a popular data analytics tool that connects to Fusion SQL using JDBC to enable self-service analytics.

  • Helps business users access important data sets in Fusion without having to know how to query Solr.

Fusion SQL architecture

The following diagram depicts a common Fusion SQL service deployment scenario using the Kerberos network authentication protocol for single sign-on. Integration with Kerberos is optional. By default, the Fusion SQL service uses Fusion security for authentication and authorization.

Fusion SQL service architecture

The numbered steps in the diagram are:

  1. The JDBC/ODBC client application (for example, TIBCO Spotfire or Tableau) uses Kerberos to authenticate a Fusion data analyst.

  2. After authentication, the JDBC/ODBC client application sends the user’s SQL query to the Fusion SQL Thrift Server over HTTP.

  3. The SQL Thrift Server uses the keytab of the Kerberos service principal to validate the incoming user identity.

    The Fusion SQL Thrift Server is a Spark application with a specific number of CPU cores and memory allocated from the pool of Spark resources. You can scale out the number of Spark worker nodes to increase available memory and CPU resources to the Fusion SQL service.

  4. The Thrift Server sends the query to Spark to be parsed into a logical plan.

  5. During the query planning stage, Spark sends the logical plan to Fusion’s pushdown strategy component.

  6. During pushdown analysis, Fusion calls out to the registered AuthZ FilterProvider implementation to get a filter query to perform row-level filtering for the Kerberos-authenticated user.

    By default, there is no row-level security provider, but users can install their own implementation using the Fusion SQL service API.

  7. Spark executes a distributed Solr query to return documents that satisfy the SQL query criteria and row-level security filter. To leverage the distributed nature of Spark and Solr, Fusion SQL sends a query to all replicas for each shard in a Solr collection. Consequently, you can scale out SQL query performance by adding more Spark and/or Solr resources to your cluster.

Fusion pushdown strategy

The pushdown strategy analyzes the query plan to determine if there is an optimal Solr query or streaming expression that can push down aggregations into Solr to improve performance and scalability. For example, the following SQL query can be translated into a Solr facet query by the Fusion pushdown strategy:

select count(1) as the_count, movie_id from ratings group by movie_id

The basic idea behind Fusion’s pushdown strategy is that it is much faster to let Solr facets perform basic aggregations than to export raw documents from Solr and have Spark perform the aggregation. If an optimal pushdown query is not possible, Spark pulls the raw documents from Solr and then performs any joins or aggregations in Spark. Put simply, the Fusion SQL service tries to translate SQL queries into optimized Solr queries; failing that, it reads all documents that match the query into Spark and performs the SQL execution logic across the Spark cluster.
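To make the translation concrete, the SQL query above only needs a count per movie_id, which maps naturally onto a terms facet. The request below is an illustrative sketch of that kind of facet query issued directly against Solr’s JSON Facet API; the Solr port (8983) and the exact request shape are assumptions, not what the pushdown strategy literally emits:

curl -H 'Content-Type: application/json' "http://localhost:8983/solr/ratings/query" -d '{
  "query": "*:*",
  "limit": 0,
  "facet": {
    "movie_id": { "type": "terms", "field": "movie_id", "limit": -1 }
  }
}'

Each facet bucket Solr returns corresponds to one row of the SQL result (a movie_id and its count), so no raw ratings documents ever have to leave Solr.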

Starting, stopping, and status

Starting the Fusion SQL service

The Fusion SQL service is an optional service that must be started manually.

Give these commands from the bin directory below the Fusion home directory, for example, /opt/fusion/4.1.x (on Unix) or C:\lucidworks\fusion\4.1.x (on Windows).

On Unix:

When starting the Fusion SQL service, the best practice is to also start the Spark master and Spark worker services:

./fusion start spark-master spark-worker sql

On Windows:

When starting the Fusion SQL service, the best practice is to also start the Spark master and Spark worker services:

spark-master.cmd start
spark-worker.cmd start
sql.cmd start

Stopping the Fusion SQL service

The Fusion SQL service is an optional service that must be stopped manually.

Give these commands from the bin directory below the Fusion home directory, for example, /opt/fusion/4.1.x (on Unix) or C:\lucidworks\fusion\4.1.x (on Windows).

On Unix:

When stopping the Fusion SQL service, the best practice is to also stop the Spark master and Spark worker services:

./fusion stop sql spark-worker spark-master

On Windows:

When stopping the Fusion SQL service, the best practice is to also stop the Spark master and Spark worker services:

sql.cmd stop
spark-worker.cmd stop
spark-master.cmd stop

Updating the group.default definition

If you plan to run the Fusion SQL service for production, we recommend updating the group.default definition in the file conf/fusion.properties (on Unix) or conf\fusion.properties (on Windows) to include the spark-master, spark-worker, and sql services:

group.default = zookeeper, solr, api, connectors-classic, connectors-rpc, proxy, webapps, admin-ui, log-shipper, spark-master, spark-worker, sql

Verifying that the Fusion SQL service started

Verify that the Fusion SQL service application started. Give these commands from the bin directory below the Fusion home directory.

On Unix:

./sql status

On Windows:

sql.cmd status

Alternatively, check the Spark UI to verify that the Fusion SQL application is running.

Which collections are registered

By default, all Fusion collections except system collections are registered in the Fusion SQL service so you can query them without any additional setup. However, empty collections cannot be queried, or even described from SQL, so empty collections won’t show up in the Fusion SQL service until they have data. In addition, any fields with a dot in the name are ignored when tables are auto-registered. You can use the Catalog API to alias fields with dots in the names to include these fields.
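For example, here is a minimal sketch of that aliasing approach, modeled on the UDF example at the end of this document. The collection name my_signals and its dotted field names are hypothetical placeholders; the sql attribute of the data asset does the renaming:

curl -u $FUSION_USER:$FUSION_PASS -XPOST -H "Content-type:application/json" "$FUSION_API/catalog/fusion/assets" -d '{
  "name": "my_signals",
  "assetType": "table",
  "projectId": "fusion",
  "description": "my_signals with dotted field names aliased",
  "format": "solr",
  "cacheOnLoad": false,
  "options": ["collection -> my_signals", "solr.params -> sort=id asc", "fields -> id,params.uid_s,params.terms_s"],
  "sql": "SELECT id, `params.uid_s` as user_id, `params.terms_s` as query_terms FROM my_signals"
}'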

If you add data to a previously empty collection, then you can execute either of the following SQL commands to ensure that the data gets added as a table:

show tables

show tables in `default`

The Fusion SQL service checks previously empty collections every minute and automatically registers each recently populated collection as a table.

You can describe any table using:

describe table-name
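If you don’t have a JDBC client handy, you can also run these statements through the Catalog API query endpoint that appears in the caching examples later in this document (movies here is just an example table name):

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"show tables"}' \
"$FUSION_API/catalog/fusion/query"

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"describe movies"}' \
"$FUSION_API/catalog/fusion/query"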

See the movielens lab in the Fusion Spark Bootcamp for a complete example of working with the Fusion Catalog API and Fusion SQL service. Also read about the Catalog API.

Troubleshooting SQL queries

If you encounter an issue with a SQL query, the first place to look for more information about the issue is the var/log/sql/sql.log file. If you need more verbose log information, change the level to DEBUG for the following loggers in the file conf/sql-log4j2.xml (on Unix) or conf\sql-log4j2.xml (on Windows):

<logger name="com.lucidworks.spark" level="DEBUG"/>
<logger name="com.lucidworks.spark.sql" level="DEBUG"/>

After making changes, you must restart the Fusion SQL service. Give these commands from the bin directory below the Fusion home directory, for example, /opt/fusion/4.1.x (on Unix) or C:\lucidworks\fusion\4.1.x (on Windows).

On Unix:

./sql restart

On Windows:

sql.cmd restart

Connecting to the Fusion SQL service from JDBC

The default JDBC properties for connecting to the Fusion SQL service are:

Driver:

org.apache.hive.jdbc.HiveDriver

Driver JAR:

Unix:

/opt/fusion/4.1.x/apps/libs/hive-jdbc-shaded-2.1.1.jar

Windows:

C:\lucidworks\fusion\4.1.x\apps\libs\hive-jdbc-shaded-2.1.1.jar

JDBC URL:

jdbc:hive2://localhost:8768/default;transportMode=http;httpPath=fusion

The username and password are the same as the ones you use to authenticate to Fusion.
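As a quick command-line sanity check of these settings, any Hive-compatible JDBC client works. For example, assuming you have Apache Hive’s beeline client installed (it is not bundled with Fusion), something along these lines should connect using the URL and driver above:

beeline -d org.apache.hive.jdbc.HiveDriver \
  -u "jdbc:hive2://localhost:8768/default;transportMode=http;httpPath=fusion" \
  -n "$FUSION_USER" -p "$FUSION_PASS" \
  -e "show tables"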

Increasing resource allocations

To avoid conflicting with the CPU and memory settings used for Fusion driver applications (default and script), the Fusion SQL service uses its own set of configuration properties for allocating CPU and memory to SQL query execution.

You can use the Configurations API to override the default values shown here.

The configuration properties, their defaults, and descriptions are:

fusion.sql.cores (default: 1)

  Sets the maximum number of cores to use across the entire cluster to execute SQL queries. Allocate as many as possible while still leaving CPU available for other Fusion jobs.

fusion.sql.executor.cores (default: 1)

  Number of cores to use per executor.

fusion.sql.memory (default: 1g)

  Memory per executor to use for executing SQL queries.

fusion.sql.default.shuffle.partitions (default: 20)

  Default number of partitions when performing a distributed group-by-type operation, such as a JOIN.

fusion.sql.bucket_size_limit.threshold (default: 30,000,000)

  Threshold that determines when to use Solr streaming rollup instead of facets when computing aggregations. Rollup can handle high-cardinality dimensions but is much slower than using facets to compute aggregate measures.

fusion.sql.max.no.limit.threshold (default: 10,000)

  Sets a limit for SQL queries that select all fields and all rows, that is, select * from table-name.

fusion.sql.max.cache.rows (default: 5,000,000)

  Don’t cache tables bigger than this threshold. If a user sends the cache-table command for a large collection with a row count that exceeds this value, the cache operation fails.

fusion.sql.max_scan_rows (default: 2,000,000)

  Safeguard mechanism to prevent queries that request too many rows from large tables. Queries that read more than this many rows from Solr fail; increase this threshold for larger Solr clusters that can handle streaming more rows concurrently.

Tip
The Fusion SQL service is designed for executing analytics-style queries over large data sets. You need to provide ample CPU and memory so that queries execute efficiently and can leverage Spark’s in-memory caching for joins and aggregations.

Here’s an example of increasing the resources for the Fusion SQL service:

curl -H 'Content-type:application/json' -X PUT -d '8' "http://localhost:8765/api/v1/configurations/fusion.sql.cores"
curl -H 'Content-type:application/json' -X PUT -d '8' "http://localhost:8765/api/v1/configurations/fusion.sql.executor.cores"
curl -H 'Content-type:application/json' -X PUT -d '2g' "http://localhost:8765/api/v1/configurations/fusion.sql.memory"
curl -H 'Content-type:application/json' -X PUT -d '8' "http://localhost:8765/api/v1/configurations/fusion.sql.default.shuffle.partitions"
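To double-check what the service is currently using, you can read a value back from the Configurations API; this assumes the API accepts GET on the same endpoint used for the PUT calls above:

curl "http://localhost:8765/api/v1/configurations/fusion.sql.cores"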

If you change any of these settings, you must restart the Fusion SQL service with ./sql restart (on Unix) or sql.cmd restart (on Windows).

The Fusion SQL service is a long-running Spark application and, as such, it holds on to the resources (CPU and memory) allocated to it using the aforementioned settings. Consequently, you might need to reconfigure the CPU and memory allocation for other Fusion Spark jobs to account for the resources given to the Fusion SQL service. In other words, any resources you give to the Fusion SQL service are no longer available for running other Fusion Spark jobs. For more information on adjusting the CPU and memory settings for Fusion Spark jobs, see the Spark configuration settings.

Hive configuration

Behind the scenes, the Fusion SQL service is based on Hive. Use the hive-site.xml file in /opt/fusion/4.1.x/conf/ (on Unix) or C:\lucidworks\fusion\4.1.x\conf\ (on Windows) to configure Hive settings.

If you change hive-site.xml, you must restart the Fusion SQL service with ./sql restart (on Unix) or sql.cmd restart (on Windows).

Using virtual tables with a common join key

With Solr, you can index different document types into the same shard using the composite ID router based on a common route key field. For example, a customer 360 application can index different customer-related document types (contacts, apps, support requests, and so forth) into the same collection, each with a common customer_id field. This lets Solr perform optimized joins between the document types using the route key field. This configuration uses Solr’s composite ID routing, which ensures that all documents with the same join key field end up in the same shard. See Document Routing.

Providing a compositeIdSpec for the Fusion collection

Before indexing, you need to provide a compositeIdSpec for the Fusion collection. For example:

curl -u $FUSION_USER:$FUSION_PASS -X POST -H "Content-type:application/json" \
  -d '{"id":"customer","solrParams":{"replicationFactor":1,"numShards":1,"maxShardsPerNode":10},"type":"DATA","compositeIdSpec":{"routeKey1Field":"customer_id_s"}}' \
  "$FUSION_API/apps/$MYAPP/collections?defaultFeatures=false"

In the example request above, we create a collection named customer with the route key field set to customer_id_s. When documents are indexed through Fusion, the Solr Index pipeline stage uses the compositeIdSpec to create a composite document ID, so documents get routed to the correct shard.

Exposing document types as virtual tables

If you configure your Fusion collection to use a route key field to route different document types to the same shard, then the Fusion SQL service can expose each document type as a virtual table and perform optimized joins between these virtual tables using the route key. To create virtual tables, you simply need to use the Fusion Catalog API on the data asset for the main collection to set the name of the field that determines the document type. For example, if you have a collection named customer that contains different document types (contacts, support tickets, sales contracts, and so forth), then you would set up virtual tables using the following Catalog API update request:

curl -XPUT -H "Content-type:application/json" http://localhost:8765/api/v1/catalog/fusion/assets/customer -d '{
  "projectId" : "fusion",
  "name" : "customer",
  "assetType" : "table",
  "description" : "Fusion collection customer",
  "format" : "solr",
  "options" : [ "collection -> customer", "exclude_fields -> _lw_*,*_\\d_coordinate,_raw_content_", "solr.params -> sort=id asc" ],
  "cacheOnLoad" : false,
  "id" : "fusion.customer",
  "additionalSettings": {
    "virtualTableField":"doc_type_s"
  }
}'

In the example above, we set the virtualTableField to doc_type_s. Fusion sends a facet request to the customer collection to get the unique values of the doc_type_s field and creates a data asset for each unique value. Each virtual table is registered in the Fusion SQL service as a table.

Performing optimized joins in SQL

After you have virtual tables configured and documents routed to the same shard using a compositeIdSpec, you can perform optimized joins in SQL that take advantage of Solr’s domain-join facet feature. For example, the following SQL statement results in a JSON facet request to Solr to perform the aggregation:

select count(1) num_support_requests,
       c.industry as industry,
       a.app_id as app_id,
       a.feature_id as feature_id
from customer c
join support s on c.customer_id = s.customer_id
join apps a on s.customer_id = a.customer_id
where c.region='US-East' AND s.support_type='Enhancement' AND a.app_type='Search'
group by industry, app_id, feature_id

In the example above, we compute the number of feature enhancement requests for Search applications from customers in the US-East region by performing a 3-way join between the customer, support, and apps virtual tables using the customer_id join key. Behind the scenes, Fusion SQL performs a JSON facet query that exploits the fact that all documents with the same customer_id value are in the same shard. This lets Solr compute counts for the industry, app_id, feature_id group-by key more efficiently than is possible using table scans in Spark.

Using Kerberos for JDBC authentication

Use the following steps to configure the Fusion SQL service to use Kerberos for authentication.

  1. Create a service principal and keytab; your Active Directory or Kerberos administrator will know how to do this. At a minimum, enable AES 128-bit encryption. You can use AES 256-bit encryption, but then you must install the JCE Unlimited Strength policy extensions.

    This is an example command to create a keytab file for the service account:

    ktpass /out c:\fusion.service.keytab /princ fusion/sawsserver@FUSIONSQL.LOCAL /rndpass /ptype KRB5_NT_PRINCIPAL /mapUser fusion@FUSIONSQL.LOCAL -mapOp set -crypto AES128-SHA1
  2. Copy the keytab file to the Fusion conf directory.

  3. Update the file conf/hive-site.xml (on Unix) or conf\hive-site.xml (on Windows) to use Kerberos authentication and the correct principal and keytab file installed in step 2.

    On Unix:

    <property>
      <name>hive.server2.authentication</name>
      <value>Kerberos</value>
    </property>
    <property>
      <name>hive.server2.authentication.Kerberos.principal</name>
      <value>fusion/sawsserver@FUSIONSQL.LOCAL</value>
    </property>
    <property>
      <name>hive.server2.authentication.Kerberos.keytab</name>
      <value>./conf/fusion.service.keytab</value>
    </property>

    On Windows:

    <property>
      <name>hive.server2.authentication</name>
      <value>Kerberos</value>
    </property>
    <property>
      <name>hive.server2.authentication.Kerberos.principal</name>
      <value>fusion/sawsserver@FUSIONSQL.LOCAL</value>
    </property>
    <property>
      <name>hive.server2.authentication.Kerberos.keytab</name>
      <value>conf\fusion.service.keytab</value>
    </property>
  4. Install the file that contains information about your Kerberos realm on the Fusion server.

    On Unix:

    Place the file krb5.conf in the /etc directory.

    On Windows:

    Place the file krb5.ini in the C:\Windows directory.

  5. Update the file conf/fusion.properties (on Unix) or conf\fusion.properties (on Windows) to point to the file krb5.conf (on Unix) or krb5.ini (on Windows) installed in step 4.

    On Unix:

    sql.jvmOptions = -Xmx1g -Djava.security.krb5.conf=/etc/krb5.conf

    On Windows:

    sql.jvmOptions = -Xmx1g -Djava.security.krb5.conf=C:\Windows\krb5.ini
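On the client side, Hive JDBC drivers generally expect the Kerberos service principal to be appended to the JDBC URL via the principal parameter. Using the example principal from step 1, the URL would look roughly like the following; verify the exact form against your JDBC driver’s documentation:

jdbc:hive2://localhost:8768/default;transportMode=http;httpPath=fusion;principal=fusion/sawsserver@FUSIONSQL.LOCAL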

Approaches for optimizing the performance of queries

In general terms, the Fusion SQL service supports three approaches for optimizing the performance of queries:

  • Join or aggregate in Spark – Read a set of raw rows from Solr and join or aggregate the rows in Spark.

    For queries that rely on Spark performing joins and aggregations on raw rows read from Solr, your goal is to minimize the number of rows read from Solr and achieve the best read performance of those rows.

  • Push down into Solr – Push down aggregation queries into Solr, returning a smaller set of aggregated rows to Spark.

  • Use views – Use views that send queries directly to Solr, using options supported by the spark-solr library.

Examples of queries written to optimize performance

Here we provide examples of how to get the best performance out of the Fusion SQL service.

The examples show how to write queries so that Spark doesn’t perform unnecessary computations, reserving Spark’s resources for the complex operations that actually require them.

Pulling back rows from Solr is a rate-limiting step when executing aggregations, so these examples ensure that the query planner pulls back the smallest possible number of rows.

For general SQL information, see the Spark SQL Language manual.

Example 1: Only request fields that have DocValues enabled

Optimal read performance is achieved by only requesting fields that have DocValues enabled, because these can be pulled through the /export handler.

It goes without saying that you should only request the fields you need for each query. Spark’s query planner pushes the field list down into the Fusion SQL service, which translates it into an fl parameter to Solr.

For example, if you need movie_id and title from the movies table, do this:

select movie_id, title from movies

Don’t do this:

select * from movies

Example 2: Use WHERE clause filtering to reduce the number of rows

Use WHERE clause criteria, including full Solr queries, to do as much filtering in Solr as possible to reduce the number of rows.

Spark’s SQL query planner pushes down simple filter criteria into the Fusion SQL service, which translates SQL filters into Solr filter query (fq) parameters. For example, if you want to query using SQL:

select user_id, movie_id, rating from ratings where rating = 4

Then behind the scenes, the Fusion SQL service transforms this query into the following Solr query:

q=*:*&qt=/export&sort=id+asc&collection=ratings&fl=user_id,movie_id,rating&fq=rating:4

Notice that the WHERE clause was translated into an fq parameter and the specific fields needed for the query are sent along in the fl parameter. Also notice that the Fusion SQL service will use the /export handler if all of the fields requested have DocValues enabled. This makes a big difference in performance.

You can also perform full-text queries using the WHERE clause. For example, the following SQL performs a full-text search for the term "love" in the plot_txt_en field of the movies table.

select movie_id,title from movies where plot_txt_en='love'

This works because the Fusion SQL service uses the Solr schema to determine that the plot_txt_en field is a text field, and so it assumes the user wants to perform a full-text query against that field. Keep in mind that this full-text search feature does not work with cached tables, because those are held in Spark memory and WHERE criteria aren’t sent to Solr when tables are cached.

The Fusion SQL service also provides the _query_ user-defined function (UDF) to execute any valid Solr query; for example:

select movie_id,title from movies
where _query_("+plot_txt_en:dogs -plot_txt_en:cats")

You should use the _query_ UDF approach when you need to pass a complex query to Solr.

Here’s another example where we push down a subquery into Solr directly to apply a complex full-text search (in this case, a geofilt geospatial query) and then join the resulting rows with a different table:

SELECT geo.place_name, count(*) as cnt
  FROM users u
  INNER JOIN (select place_name,zip_code from zipcodes where _query_('{!geofilt sfield=geo_location pt=44.9609,-93.2642 d=50}')) as geo
  ON geo.zip_code = u.zip_code WHERE u.gender='F' GROUP BY geo.place_name

Example 3: Apply LIMIT clauses on pushdown queries

Let’s say you have movies and ratings tables and want to join the movie title with the ratings to get the top 100 movies with the most ratings, using something like this:

select m.title, count(*) as num_ratings from movies m, ratings r where m.movie_id = r.movie_id group by m.title order by num_ratings desc limit 100

Given that LIMIT clause, you might think this query will be very fast because you’re only asking for 100 rows. However, if the ratings table is big (as is typically the case), then Spark has to read all of the ratings from Solr before joining and aggregating. The better approach is to push the LIMIT down into Solr, and then join from the smaller result set.

select m.title, popular_movies.num_ratings from movies m inner join (select movie_id, count(*) as num_ratings from ratings group by movie_id order by num_ratings desc limit 100) as popular_movies on m.movie_id = popular_movies.movie_id
order by num_ratings desc

Notice that the LIMIT is now on the subquery that gets run directly by Solr, using the Fusion SQL service pushdown. You should use this strategy whether you’re aggregating in Solr or just retrieving raw rows. For example:

SELECT e.id, e.name, signals.* FROM ecommerce e INNER JOIN (select timestamp_tdt, query_s, filters_s, type_s, user_id_s, doc_id_s from ecommerce_signals order by timestamp_tdt desc limit 50000) as signals ON signals.doc_id_s = e.id

The subquery pulls the last 50,000 signals from Solr before joining the signals with the ecommerce table.

Example 4: Tune read options of the underlying data asset in Fusion

When you need to return fields that don’t support DocValues from Solr, consider tuning the read options of the underlying data asset in Fusion.

Behind the scenes, the Fusion SQL service uses parallel queries to each shard and cursorMark to page through all documents in each shard. This approach, while efficient, is not as fast as reading from the /export handler. For example, our ecommerce table contains text fields that can’t be exported using DocValues, so we can tune the read performance using the Catalog API:

curl -X PUT -H "Content-type:application/json" --data-binary '{
  "name": "ecommerce",
  "assetType": "table",
  "projectId": "fusion",
  "description": "ecommerce demo data",
  "tags": ["fusion"],
  "format": "solr",
  "cacheOnLoad": false,
  "options" : [ "collection -> ecommerce", "splits_per_shard -> 4", "solr.params -> sort=id asc", "exclude_fields -> _lw_*,_raw_content_", "rows -> 10000" ]
}' $FUSION_API/catalog/fusion/assets/ecommerce

Notice that, in this case, we’re reading all fields except those matching the patterns in the exclude_fields option. We’ve also increased the number of rows read per paging request to 10000 and we want 4 splits_per_shard, which means that we’ll use 4 tasks per shard to read data across all replicas of that shard.

For more information, read about tuning Spark-Solr read performance.

The key take-away here is that, if your queries need fields that can’t be exported, then you’ll need to do some manual tuning of the options on the data asset to get optimal performance. Here’s another example, where we want to return a geospatial field from Solr that doesn’t support DocValues:

curl -u $FUSION_USER:$FUSION_PASS -XPOST -H "Content-type:application/json" "$FUSION_API/catalog/fusion/assets" --data-binary @<(cat <<EOF
{
  "name": "minn_zipcodes",
  "assetType": "table",
  "projectId": "fusion",
  "format": "solr",
  "description":"zips around minn",
  "options": [
     "collection -> zipcodes",
     "fields -> zip_code,place_name,state,county,geo_point,geo_location,geo_location_rpt",
     "query -> {!geofilt sfield=geo_location pt=44.9609,-93.2642 d=50}",
     "solr.params -> sort=id asc"
  ]
}
EOF
)

In this example, we want to return the geo_location_rpt field, for example to generate a heatmap, so we need to define a custom data asset in the catalog.

Example 5: Aggregate in Solr using the underlying faceting engine

Solr provides a number of basic aggregation capabilities, such as count, min, max, avg, and sum. To reduce the number of raw rows that are returned from Solr to Spark, the Fusion SQL service leverages Solr aggregation when possible. Returning fewer rows from Solr also lets Spark perform better query optimizations, such as broadcasting a small table across all partitions of a large table (a hash join).

Here is an example where we push a group-count operation (essentially a facet) down into Solr using a subquery.

    SELECT m.title as title, top_rated_movies.aggCount as aggCount
      FROM movies m
INNER JOIN (SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating >= 4 GROUP BY movie_id ORDER BY aggCount desc LIMIT 10) as top_rated_movies
        ON top_rated_movies.movie_id = m.movie_id
  ORDER BY aggCount DESC

Solr returns aggregated rows by movie_id, and then we leverage Spark to perform the join between movies and the aggregated results of the subquery, which it can do quickly using a hash join with a broadcast.

Example 6: Aggregate in Solr using streaming expressions

To determine whether an aggregation can be pushed down into Solr for better performance, the Fusion SQL service analyzes the logical plan in Spark.

Alternatively, you can write a streaming expression and then expose that as a view in the Fusion SQL service. For example, the following streaming expression joins ecommerce products and signals:

select(
  hashJoin(
    search(ecommerce,
           q="*:*",
           fl="id,name",
           sort="id asc",
           qt="/export",
          partitionKeys="id"),
    hashed=facet(ecommerce_signals,
                 q="*:*",
                 buckets="doc_id_s",
                 bucketSizeLimit=10000,
                 bucketSorts="count(*) desc",
                 count(*)),
    on="id=doc_id_s"),
  name as product_name,
  count(*) as click_count,
  id as product_id
)

This streaming expression performs a hash join between the ecommerce table and the results of a facet expression on the signals collection. We also use the SELECT expression decorator to return human-friendly field names.

For more information, read about how to write streaming expressions.

After creating and testing the streaming expression in Solr, you must JSON encode the expression and then create a data asset in the Fusion catalog. For example:

curl -XPOST -H "Content-Type:application/json" --data-binary '{
  "name": "ecomm_popular_docs",
  "assetType": "table",
  "projectId": "fusion",
  "description": "Join product name with facet counts of docs in signals",
  "tags": ["ecommerce"],
  "format": "solr",
  "cacheOnLoad": true,
  "options": ["collection -> ecommerce", "expr -> select(hashJoin(search(ecommerce,q=\"*:*\",fl=\"id,name\",sort=\"id asc\",qt=\"\/export\",partitionKeys=\"id\"),hashed=facet(ecommerce_signals,q=\"*:*\",buckets=\"doc_id_s\",bucketSizeLimit=10000,bucketSorts=\"count(*) desc\",count(*)),on=\"id=doc_id_s\"),name as product_name,count(*) as click_count,id as product_id)"]}' $FUSION_API/catalog/fusion/assets

Here’s another example of a streaming expression that leverages the underlying faceting engine’s support for computing aggregations beyond just a count:

select(
  facet(
    ratings,
    q="*:*",
    buckets="rating",
    bucketSorts="count(*) desc",
    bucketSizeLimit=100,
    count(*),
    sum(rating),
    min(rating),
    max(rating),
    avg(rating)
  ),
  rating,
  count(*) as the_count,
  sum(rating) as the_sum,
  min(rating) as the_min,
  max(rating) as the_max,
  avg(rating) as the_avg
)

Example 7: Use sampling

When doing exploratory analysis on a large table, use the rand function to bring back a small random sample of data from Solr, for example:

select user_id as user, age as _age_ from users where rand() < 0.1 AND gender='F'

The rand function returns a random number between 0 and 1, so in the previous example, we’re requesting a sample that is roughly 10% of the total number of rows. Behind the scenes, Fusion SQL translates the rand UDF into a random streaming expression to draw a sample.
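As a rough illustration of that translation (not the exact expression Fusion generates), the sample above corresponds to a Solr random streaming expression along these lines, where the rows value stands in for the sample size Fusion derives from the 0.1 fraction:

random(users,
       q="gender:F",
       rows="5000",
       fl="user_id,age")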

Example 8: Cache results in Spark

If you plan to perform additional queries against results, cache results in Spark.

Catalog assets support the cacheOnLoad attribute, which caches the results of the query in Spark (memory with spill to disk). You can also request the results for any query sent to the Catalog API to be cached using the cacheResultsAs parameter:

curl -XPOST -H "Content-Type:application/json" -d '{
  "sql":"SELECT u.user_id as user_id, age, gender, occupation, place_name, county, state, zip_code, geo_location_rpt, title, movie_id, rating, rating_timestamp FROM minn_users u INNER JOIN movie_ratings m ON u.user_id = m.user_id",
  "cacheResultsAs": "ratings_by_minn_users"
}' "$FUSION_API/catalog/fusion/query"

Be careful: once a table is cached, updates to the underlying data source (most likely Solr) are no longer visible. To make Spark re-compute a cached view from the underlying store, use the following SQL command:

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"refresh table table-name"}' \
"$FUSION_API/catalog/fusion/query"

If a table isn’t cached, you can cache it using:

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"cache table table-name"}' \
"$FUSION_API/catalog/fusion/query"

Or uncache it:

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"uncache table table-name"}' \
"$FUSION_API/catalog/fusion/query"

User defined functions (UDFs)

You can use SQL and Spark User Defined Functions (UDF) to clean and/or transform data from Solr.

For example, Lucidworks Search Hub signals use complex field names generated by Snowplow. The following data asset definition uses SQL to make the data a bit more user friendly as it comes out of Solr:

{
  "name": "shub_signals",
  "assetType": "table",
  "projectId": "fusion",
  "description": "SearchHub signals",
  "tags": ["shub"],
  "format": "solr",
  "cacheOnLoad": false,
  "options": ["collection -> shub_signals", "solr.params -> sort=id asc", "fields -> timestamp_tdt,type_s,params.useragent_family_s,params.useragent_os_family_s,params.tz_s,params.totalResults_s,params.lang_s,params.useragent_type_name_s,params.terms_s,params.query_unique_id,params.useragent_v,params.doc_0,params.doc_1,params.doc_2,params.facet_ranges_publishedOnDate_before_d,params.uid_s,params.refr_s,params.useragent_category_s,params.sid_s,ip_sha_s,params.vid_s,params.page_s,params.fp_s"],
  "sql": "SELECT timestamp_tdt as timestamp, type_s as signal_type, `params.useragent_family_s` as ua_family,`params.useragent_os_family_s` as ua_os,`params.tz_s` as tz,cast(`params.totalResults_s` as int) as num_found, `params.lang_s` as lang, `params.useragent_type_name_s` as ua_type, `params.terms_s` as query_terms, `params.query_unique_id` as query_id, `params.useragent_v` as ua_vers, `params.doc_0` as doc0, `params.doc_1` as doc1, `params.doc_2` as doc2, `params.facet_ranges_publishedOnDate_before_d` as pubdate_range, `params.uid_s` as user_id, `params.refr_s` as referrer, `params.useragent_category_s` as ua_category, `params.sid_s` as session_id, `ip_sha_s` as ip, cast(`params.vid_s` as int) as num_visits, `params.page_s` as page_name, `params.fp_s` as fingerprint FROM shub_signals"
}

Because Spark lazily evaluates DataFrame transformations, executing a SQL statement on top of the raw results adds little overhead beyond the cost of reading from Solr.

Beyond simple field renaming, you can also leverage hundreds of built-in functions to enrich and/or transform fields. See: https://spark.apache.org/docs/1.6.3/api/scala/index.html#org.apache.spark.sql.functions$

In the example above, the cast function casts a string field coming from Solr as an int:

cast(`params.vid_s` as int)

Keep in mind that Solr is a much more flexible data engine than BI visualization tools like Tableau typically handle. Consequently, the Fusion Catalog API lets you apply more structure to less structured data in Solr. Here’s an example of using a UDF to aggregate by day (similar to Solr’s round-down operator):

select count(*) as num_per_day, date_format(rating_timestamp,"yyyy-MM-dd") as date_fmt from ratings group by date_format(rating_timestamp,"yyyy-MM-dd") order by num_per_day desc LIMIT 10