The following examples show how to use Spark SQL for SQL aggregations:

Perform the default SQL aggregation

This is the default SQL aggregation of signals for a base collection named products. It produces the same results as legacy aggregation:
SELECT SUM(count_i) AS aggr_count_i,
         query AS query_s,
         doc_id AS doc_id_s,
         time_decay(count_i, date) AS weight_d
    FROM products_signals
GROUP BY query, doc_id
Notice the following about this SQL:
  • SELECT SUM(count_i) AS aggr_count_i. count_i is summed as aggr_count_i.
  • time_decay(count_i, date) AS weight_d. The time_decay function computes the aggregated weight_d field. This function is a Spark UserDefinedAggregateFunction (UDAF) that is built into Fusion. The function computes a weight for each aggregation group, using the count and an exponential decay on the signal timestamp, with a 30-day half life.
  • GROUP BY query, doc_id. The GROUP BY clause defines the fields used to compute aggregate metrics, typically the query, doc_id, and any filters. With SQL, you have more options for computing aggregated metrics without having to write custom JavaScript functions (which legacy aggregations would require as a supplement). You can also use standard WHERE clause semantics, for example WHERE type_s = 'add', to apply fine-grained filters, as in the sketch after this list.
  • The time_decay function uses an abbreviated function signature, time_decay(count_i, timestamp_tdt), instead of the full function signature shown in Use Different Weights Based on Signal Types.
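For example, here is a minimal sketch of the default aggregation restricted to add signals only. The WHERE clause uses the type_s field and 'add' value mentioned in the list above; adjust both to match your own signal schema:
SELECT SUM(count_i) AS aggr_count_i,
         query AS query_s,
         doc_id AS doc_id_s,
         time_decay(count_i, date) AS weight_d
    FROM products_signals
   WHERE type_s = 'add'
GROUP BY query, doc_id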

An example of how SQL aggregation works

Consider the following four input signals for a fictitious query q1 and document 1:
[{
  "type_s":"add",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"},
{
  "type_s":"view",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"},
{
  "type_s":"add",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"},
{
  "type_s":"view",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"}]
Fusion generates the following aggregated document for q1:
{
   "aggr_count_i":4,
   "query_s":"q1",
   "doc_id_s":"1",
   "weight_d":0.36644220285922535,
   "aggr_id_s":"products_sql_agg",
   "aggr_job_id_s":"15d4279d128T755e5137",
   "flag_s":"aggr",
   "query_t":"q1",
   "aggr_type_s":"sql",
   "timestamp_tdt":"2017-07-14T19:01:05.950Z"}

Use different weights based on signal types

This is a slightly more complex example that uses a subquery to compute a custom weight for each signal based on its type (here, add signals get a higher weight than all other types):
SELECT SUM(count_i) AS aggr_count_i,
       query_s,
       doc_id_s,
       time_decay(count_i, timestamp_tdt, "5 days", ref_time, signal_weight) AS weight_d
  FROM (SELECT count_i,
               query_s,
               doc_id_s,
               timestamp_tdt,
               ref_time,
               CASE WHEN type_s='add' THEN 0.25 ELSE 0.1 END AS signal_weight
          FROM products_signals)
GROUP BY query_s, doc_id_s
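Before settling on weights like 0.25 and 0.1, it can help to see which signal types actually occur in your collection and how often. A simple exploratory query such as the following sketch (against the same products_signals collection) shows the distribution of signal types:
SELECT type_s,
       COUNT(*) AS num_signals,
       SUM(count_i) AS total_count_i
  FROM products_signals
GROUP BY type_s
ORDER BY total_count_i DESC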

Compute metrics for sessions

This aggregation query uses a number of Spark SQL functions to compute some metrics for sessions:
SELECT concat_ws('||', clientip, session_id) as id,
       first(clientip) as clientip,
       min(ts) as session_start,
       max(ts) as session_end,
       (unix_timestamp(max(ts)) - unix_timestamp(min(ts))) as session_len_secs_l,
       sum(asInt(bytes)) as total_bytes_l,
       count(*) as total_requests_l
  FROM sessions
GROUP BY clientip, session_id
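You can also treat a query like this as a subquery and roll the per-session metrics up further, for example to per-client averages. The following sketch assumes the same sessions table and field names as above:
SELECT clientip,
       count(*) as num_sessions_l,
       avg(session_len_secs_l) as avg_session_len_secs_d,
       sum(total_bytes_l) as total_bytes_l
  FROM (SELECT concat_ws('||', clientip, session_id) as id,
               first(clientip) as clientip,
               (unix_timestamp(max(ts)) - unix_timestamp(min(ts))) as session_len_secs_l,
               sum(asInt(bytes)) as total_bytes_l
          FROM sessions
      GROUP BY clientip, session_id) sessions_rollup
GROUP BY clientip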

Learn more

To avoid conflicting with the CPU and memory settings used for Fusion driver applications (default and script), the Fusion SQL service uses its own set of configuration properties for allocating CPU and memory to SQL query execution. You can use the Configurations API to override the default values shown here.
Configuration property (default) | Description
fusion.sql.cores (1) | Sets the maximum number of cores to use across the entire cluster to execute SQL queries. Give as many as possible while still leaving CPU available for other Fusion jobs.
fusion.sql.executor.cores (1) | Number of cores to use per executor.
fusion.sql.memory (1g) | Memory per executor to use for executing SQL queries.
fusion.sql.default.shuffle.partitions (20) | Default number of partitions when performing a distributed group-by-type operation, such as a JOIN.
fusion.sql.bucket_size_limit.threshold (30,000,000) | Threshold that determines when to use Solr streaming rollup instead of facets to compute aggregations. Rollup can handle high-cardinality dimensions but is much slower than using facets to compute aggregate measures.
fusion.sql.max.no.limit.threshold (10,000) | Limits SQL queries that select all fields and all rows, that is, select * from table-name.
fusion.sql.max.cache.rows (5,000,000) | Do not cache tables bigger than this threshold. If a user sends the cache-table command for a large collection whose row count exceeds this value, the cache operation fails.
fusion.sql.max_scan_rows (2,000,000) | Safeguard that prevents queries from reading too many rows from large tables. Queries that read more than this many rows from Solr fail; increase this threshold for larger Solr clusters that can handle streaming more rows concurrently.
The Fusion SQL service is designed for executing analytics-style queries over large data sets. You need to provide ample CPU and memory so that queries execute efficiently and can leverage Spark’s in-memory caching for joins and aggregations.
Here is an example of increasing the resources for the Fusion SQL service:
curl -H 'Content-type:application/json' -X PUT -d '8' "http://<FUSION_HOST>/api/configurations/fusion.sql.cores"
curl -H 'Content-type:application/json' -X PUT -d '8' "http://<FUSION_HOST>/api/configurations/fusion.sql.executor.cores"
curl -H 'Content-type:application/json' -X PUT -d '2g' "http://<FUSION_HOST>/api/configurations/fusion.sql.memory"
curl -H 'Content-type:application/json' -X PUT -d '8' "http://<FUSION_HOST>/api/configurations/fusion.sql.default.shuffle.partitions"
If you change any of these settings, you must restart the Fusion SQL service with ./sql restart (on Unix) or sql.cmd restart (on Windows).

The Fusion SQL service is a long-running Spark application and, as such, it holds on to the resources (CPU and memory) allocated to it using the settings above. Consequently, you might need to reconfigure the CPU and memory allocation for other Fusion Spark jobs to account for the resources given to the Fusion SQL service. In other words, any resources you give to the Fusion SQL service are no longer available for running other Fusion Spark jobs. For more information about adjusting the CPU and memory settings for Fusion Spark jobs, see the Spark configuration settings.
With Solr, you can index different document types into the same shard using the composite ID router based on a common route key field. For example, a customer 360 application can index different customer-related document types (contacts, apps, support requests, and so forth) into the same collection, each with a common customer_id field. This lets Solr perform optimized joins between the document types using the route key field. This configuration uses Solr’s composite ID routing, which ensures that all documents with the same join key field end up in the same shard. See Document Routing.

Providing a compositeIdSpec for the Fusion collection

Before indexing, you need to provide a compositeIdSpec for the Fusion collection. For example:
curl -u USERNAME:PASSWORD -X POST -H "Content-type:application/json" \
  -d '{"id":"customer","solrParams":{"replicationFactor":1,"numShards":1,"maxShardsPerNode":10},"type":"DATA","compositeIdSpec":{"routeKey1Field":"customer_id_s"}}' \
  "https://FUSION_HOST:6764/apps/APP_NAME/collections?defaultFeatures=false"
In the example request above, we create a collection named customer with the route key field set to customer_id_s. When documents are indexed through Fusion, the Solr Index pipeline stage uses the compositeIdSpec to create a composite document ID, so documents get routed to the correct shard.

Exposing document types as virtual tables

If you configure your Fusion collection to use a route key field to route different document types to the same shard, then the Fusion SQL service can expose each document type as a virtual table and perform optimized joins between these virtual tables using the route key. To create virtual tables, you simply need to use the Fusion Catalog API on the data asset for the main collection to set the name of the field that determines the document type. For example, if you have a collection named customer that contains different document types (contacts, support tickets, sales contracts, and so forth), then you would set up virtual tables using the following Catalog API update request:
curl -XPUT -H "Content-type:application/json" http://<FUSION_HOST>/api/catalog/fusion/assets/customer -d '{
  "projectId" : "fusion",
  "name" : "customer",
  "assetType" : "table",
  "description" : "Fusion collection customer",
  "format" : "solr",
  "options" : [ "collection -> customer", "exclude_fields -> _lw_*,*_\\d_coordinate,_raw_content_", "solr.params -> sort=id asc" ],
  "cacheOnLoad" : false,
  "id" : "fusion.customer",
  "additionalSettings": {
    "virtualTableField":"doc_type_s"
  }
}'
In the example above, we set the virtualTableField to doc_type_s. Fusion sends a facet request to the customer collection to get the unique values of the doc_type_s field and creates a data asset for each unique value. Each virtual table is registered in the Fusion SQL service as a table.
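Once registered, each virtual table can be queried like any other table. For example, assuming the doc_type_s values include support and that support documents carry a support_type field (the same hypothetical fields used in the join example in the next section), you could summarize support requests by type:
SELECT support_type,
       COUNT(*) AS num_requests
  FROM support
GROUP BY support_type
ORDER BY num_requests DESC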

Performing optimized joins in SQL

After you have virtual tables configured and documents routed to the same shard using a compositeIdSpec, you can perform optimized joins in SQL that take advantage of Solr’s domain-join facet feature. For example, the following SQL statement results in a JSON facet request to Solr to perform the aggregation:
select count(1) num_support_requests,
       c.industry as industry,
       a.app_id as app_id,
       a.feature_id as feature_id
from customer c
join support s on c.customer_id = s.customer_id
join apps a on s.customer_id = a.customer_id
where c.region='US-East' AND s.support_type='Enhancement' AND a.app_type='Search'
group by industry, app_id, feature_id
In the example above, we compute the number of feature enhancement requests for Search applications from customers in the US-East region by performing a 3-way join between the customer, support, and apps virtual tables using the customer_id join key. Behind the scenes, Fusion SQL performs a JSON facet query that exploits all documents with the same customer_id value being in the same shard. This lets Solr compute the count for the industry, app_id, feature_id group by key more efficiently than is possible using table scans in Spark.
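The same approach also applies to simpler joins. For example, the following sketch (again using the hypothetical customer and support tables and fields from the example above) counts support requests per industry with a two-way join on customer_id:
select count(1) num_support_requests,
       c.industry as industry
from customer c
join support s on c.customer_id = s.customer_id
group by industry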
The remaining sections provide guidance to help you make the most of the Fusion SQL aggregation engine.

Project fields into the signals_aggr collection

For legacy reasons, the COLLECTION_NAME_signals_aggr collection relies on dynamic field names, such as doc_id_s and query_s instead of doc_id and query. Consequently, when you project fields to be written to the COLLECTION_NAME_signals_aggr collection, you should use dynamic field suffixes as shown in the SQL snippet below:
SELECT SUM(typed_aggr_count_i) AS aggr_count_i,
        query AS query_s,
        query AS query_t,
        doc_id AS doc_id_s,
        filters AS filters_s,
        SPLIT(filters, ' \\$ ') AS filters_ss,
        weighted_sum(...) AS weight_d
        FROM signal_type_groups
  GROUP BY query, doc_id, filters
You are not required to use this approach, but if you do not use dynamic field suffixes as shown above, you will need to change the boosting stages in Fusion to work with different field names.

Use WITH to organize complex queries

A common pattern in SQL aggregation queries is the use of subqueries to break up the logic into comprehensible units. For more information about the WITH clause, see https://modern-sql.com/feature/with. Let us work through an example to illustrate the key points:
 1:  WITH signal_type_groups AS (
 2:      SELECT SUM(count_i) AS typed_aggr_count_i,
 3:             doc_id,
 4:             user_id,
 5:             type,
 6:             time_decay(count_i, timestamp_tdt) AS typed_weight_d
 7:        FROM product_signals
 8:       WHERE type IN ('click','cart','purchase')
 9:    GROUP BY user_id, doc_id, type
10: ) SELECT SUM(typed_aggr_count_i) AS aggr_count_i,
11:          doc_id AS doc_id_s,
12:          user_id AS user_id_s,
13:      weighted_sum(...) AS weight_d
14:     FROM signal_type_groups
15: GROUP BY doc_id, user_id
  • At line 1, we declare a statement scoped view named signal_type_groups using the WITH keyword.
  • Lines 2-9 define the subquery for the signal_type_groups view.
  • At line 7, we read from the product_signals collection in Fusion.
  • Line 8 filters the input to only include click, cart, and purchase signals. Behind the scenes, Fusion translates this WHERE IN clause to a Solr filter query, e.g. fq=type:(click OR cart OR purchase). The signal_type_groups view produces rows grouped by user_id, doc_id, and type (line 9).
  • Starting at line 10, we define a query that performs a rollup over the rows in the signal_type_groups view by grouping on doc_id and user_id (line 15). Notice how the WITH statement breaks this complex query into two units, each of which is easier to comprehend on its own. You are encouraged to adopt this pattern in your own SQL aggregation queries; a sketch of the same pattern applied to a simpler rollup follows this list.
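For example, here is a sketch of the same WITH pattern applied to a simpler rollup that first restricts signals to a time window in a named view and then aggregates by signal type. The date literal is an assumption for illustration only, and whether a given predicate is pushed down to Solr depends on your Fusion version:
WITH recent_signals AS (
    SELECT count_i,
           doc_id,
           user_id,
           type,
           timestamp_tdt
      FROM product_signals
     WHERE type IN ('click','cart','purchase')
       AND timestamp_tdt >= to_timestamp('2017-07-01')
) SELECT type,
         COUNT(*) AS num_signals,
         SUM(count_i) AS total_count_i
    FROM recent_signals
GROUP BY type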