SQL Aggregations

Fusion 4.0 introduces SQL aggregation for aggregating signals or other data. SQL is a familiar query language that is well suited to data aggregation. Fusion’s new SQL Aggregation Engine has more power and flexibility than Fusion’s legacy aggregation engine.

Note
The aggregation approach available in prior Fusion releases is still available, though it is deprecated and will be removed in a future release. We now refer to the prior aggregation approach as "legacy aggregations."

Advantages of SQL aggregation

These are advantages of SQL aggregation relative to legacy aggregation:

  • It’s SQL! – You can write SQL queries to aggregate your data.

  • Built-in aggregation functions – A SQL query can use any of the functions provided by Spark SQL. Use these functions to perform complex aggregations or to enrich aggregation results.

  • A customizable time-decay function – You can now customize the exponential time-decay function that Fusion uses for aggregations. Prior to release 4.0, Fusion used a fixed time-decay function to determine aggregation weights. In Fusion 4.0, the time-decay function is implemented as a UDAF, so you can easily implement your own time-decay function.

  • Aggregate data from many types of data sources – You can use any asset in the Fusion Catalog as a data source. This lets you aggregate data from any data source supported by Spark.

  • Performance – Although performance results can vary, Fusion SQL aggregations are roughly 5 times faster than legacy Fusion aggregations (using the default aggregation as the comparison).

Key features

Here, we focus on several key features of SQL aggregations.

Rollup SQL

Most aggregation jobs run with the catch-up flag set to true, which means that Fusion only computes aggregations for new signals that have arrived since the last time the job was run, and up to and including ref_time, which is usually the run time of the current job. Fusion must "roll up" the newly aggregated rows into any existing aggregated rows in the _aggr collection.

Fusion generates a basic rollup SQL script automatically by consulting the schema of the aggregated documents. If your rollup logic is complex, you can provide a custom rollup SQL script.

This is an example of a rollup query:

SELECT query_s, doc_id_s,
       time_decay(1, timestamp_tdt, "30 days", ref_time, weight_d) AS weight_d,
       SUM(aggr_count_i) AS aggr_count_i
  FROM `commerce_signals_aggr`
GROUP BY query_s, doc_id_s

Time-range filtering

When Fusion rolls up new data into an aggregation, time-range filtering lets you ensure that Fusion doesn’t aggregate the same data over and over again.

Fusion applies a time-range filter when loading rows from Solr, before executing the aggregation SQL statement. In other words, the SQL executes over rows that are already filtered by the appropriate time range for the aggregation job.

Notice that the examples Perform the Default SQL Aggregation and Use Different Weights Based on Signal Types don’t include a time-range filter. Fusion computes the time-range filter automatically as follows:

  • If the catch-up flag is set to true, Fusion uses the last time the job was run and ref_time (which you typically set to the current time). This is equivalent to the WHERE clause WHERE time > last_run_time AND time <= ref_time.

  • If the catch-up flag isn’t set to true, Fusion uses a filter with ref_time (and no start time). This is equivalent to the WHERE clause WHERE time <= ref_time.

The built-in time logic should suffice for most use cases. For more complex time-based filtering, you can set the time-range filter to [* TO *] and then specify your own WHERE clause filter.
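
For example, the following sketch assumes the time-range filter is set to [* TO *] in the job configuration and restricts the aggregation in the SQL itself. The collection and field names (products_signals, count_i, timestamp_tdt) follow the signal schema used elsewhere in this topic, and the 30-day cutoff is illustrative:

SELECT SUM(count_i) AS aggr_count_i,
       query AS query_s,
       doc_id AS doc_id_s,
       time_decay(count_i, timestamp_tdt) AS weight_d
  FROM products_signals
 WHERE timestamp_tdt >= date_sub(current_date(), 30)
GROUP BY query, doc_id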

SQL functions

A Spark SQL aggregation query can use any of the functions provided by Spark SQL. Use these functions to perform complex aggregations or to enrich aggregation results.

Weight aggregated values using a time-decay function

Fusion automatically uses a default time_decay function to compute and apply appropriate weights to aggregation groups during aggregation (larger weights for more recent events and smaller weights for less recent events). This reduces the impact of less-recent signals. Intuitively, older signals (and the user behavior they represent) should count less than newer signals.

If the default time_decay function doesn’t meet your needs, you can modify it. The time_decay function is implemented as a UserDefinedAggregateFunction (UDAF).

Full function signature

This is the UDAF signature of the default time_decay function:

time_decay(count: Long,
           timestamp: Timestamp,
           halfLife: String (calendar interval),
           ref_time: Timestamp,
           weight_d: Double)

One small difference between the prior and current behavior is worth mentioning in passing:

  • Prior to Release 4.0, the decay_sum aggregator function used the difference between the aggregationTime (the time at which the aggregation job is run) and the event time to calculate exponentially decayed numerical values.

  • In Release 4.0, time_decay is a similar function. In time_decay, ref_time is used instead of aggregationTime. You can set ref_time to a time other than the run time of the aggregation job.

In practice, you will probably want to use aggregationTime as ref_time.

Abbreviated function signature and default values

Your function call can also use this abbreviated UDAF signature, which omits halfLife, ref_time, and weight_d:

time_decay(count: Long,
           timestamp: Timestamp)

In this case, Fusion fills in these values for the omitted parameters: halfLife = 30 days, ref_time = NOW, and weight_d = 0.1.

Matching legacy aggregation

To match the results of legacy aggregation, either use the abbreviated function signature or supply these values explicitly: halfLife = 30 days, ref_time = NOW, and weight_d = 0.1.
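
For example, given the signal fields used in the other examples in this topic, these two calls should be equivalent (a sketch; ref_time is the column that Fusion attaches automatically at runtime):

-- abbreviated form, using the default halfLife, ref_time, and weight
time_decay(count_i, timestamp_tdt) AS weight_d

-- equivalent full form, spelling out the defaults
time_decay(count_i, timestamp_tdt, "30 days", ref_time, 0.1) AS weight_d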

Parameters

Parameters for time_decay are:

count

Number of occurrences of the event. Typically, the increment is 1, though there is no reason it couldn’t be some other number. In most cases, you simply pass count_i, which is the event count field used by Fusion signals, as shown in the SQL aggregation examples.

timestamp

The date-and-time for the event. This time is the beginning of the interval used to calculate the time-based decay factor.

halfLife

Half life for the exponential decay that Fusion calculates. It is some interval of time, for example, 30 days or 10 minutes. The interval prefix is optional. Fusion treats 30 days as equivalent to interval 30 days.

ref_time

Reference time used to compute the age of an event for the time-decay computation. It is usually the time when the aggregation job runs (NOW). The reference time is not present in the data; Fusion determines the reference time at runtime. Fusion automatically attaches a ref_time column to every row before executing the SQL.

weight_d

Initial weight for an event, prior to the decay calculation. This value is typically not present in the signal data.

You can use SQL to compute weight_d; see Use Different Weights Based on Signal Types for an example.


Sample calculation of the age of a signal

This is an example of how Fusion calculates the age of a signal:

Imagine a SQL aggregation job that runs at Tuesday, July 11, 2017 1:00:00 AM (1499734800). For a signal with the timestamp Tuesday, July 11, 2017 12:00:00 AM (1499731200), the age of the signal in relation to the reference time is 1 hour.
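
As a rough sketch of how that age feeds into the weight, assume the standard half-life form of exponential decay (a decay factor of 2^(-age / halfLife); the actual implementation may differ in details) and the default weight_d of 0.1:

age      = ref_time - timestamp = 1 hour
halfLife = 30 days              = 720 hours
decay    = 2^(-1 / 720)         ≈ 0.999
weight   ≈ weight_d * decay     = 0.1 * 0.999 ≈ 0.0999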

Built-in SQL aggregation jobs

When signals are enabled for a collection, Fusion creates the following SQL aggregation jobs.

<collection>_click_signals_aggregation

The <collection>_click_signals_aggregation job computes a time-decayed weight for each document, query, and filters group in the signals collection. Fusion computes the weight for each group using an exponential time-decay on signal count (30 day half-life) and a weighted sum based on the signal type. This approach gives more weight to a signal that represents a user purchasing an item than to a user just clicking on an item.

You can customize the signal types and weights for this job by changing the signalTypeWeights SQL parameter in the Fusion Admin UI.

signalTypeWeights

When the SQL aggregation job runs, Fusion translates the signalTypeWeights parameter into a WHERE IN clause that filters signals by the specified types (click, cart, purchase), and also passes the parameter into the weighted_sum SQL function. Notice that Fusion displays only the SQL parameters, not the actual SQL, for this job. This simplifies configuration because, in most cases, you only need to change the parameters rather than the SQL itself. However, if you do need to change the SQL for this job, you can edit it under the Advanced toggle on the form.
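
As an illustration, a signalTypeWeights value of click:1.0,cart:10.0,purchase:25.0 (hypothetical weights; check your job configuration for the actual defaults) would surface in the generated SQL roughly as these two fragments:

-- filter derived from the signal types in signalTypeWeights
WHERE type IN ('click', 'cart', 'purchase')

-- the same parameter value passed to the weighted_sum UDAF
weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0,purchase:25.0') AS weight_d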

<collection>_session_rollup

The <collection>_session_rollup job aggregates related user activity into a session signal that contains activity count, duration, and keywords (based on user search terms). The Fusion App Insights application uses this job to show reports about user sessions. Use the elapsedSecsSinceLastActivity and elapsedSecsSinceSessionStart parameters to determine when a user session is considered to be complete. You can edit the SQL using the Advanced toggle.

The <collection>_session_rollup job uses signals as the input collection and output collection. Unlike other aggregation jobs that write aggregated documents to the <collection>_signals_aggr collection, the <collection>_session_rollup job creates session signals and saves them to the <collection>_signals collection.

<collection>_user_item_preferences_aggregation

The <collection>_user_item_preferences_aggregation job computes an aggregated weight for each user/item combination found in the signals collection. The weight for each group is computed using an exponential time-decay on signal count (30 day half-life) and a weighted sum based on the signal type. Use the signalTypeWeights parameter to set the correct signal types and weights for your dataset. You can use the results of this job as input to the ALS recommendation job.

If recommendations are enabled for your collection, then the ALS recommender job is scheduled to run after this job completes. Consequently, you should only run this aggregation once or twice a day, because training a recommender model is a complex, long-running job that requires significant resources from your Fusion cluster.

<collection>_user_query_history_aggregation

The <collection>_user_query_history_aggregation job computes an aggregated weight for each user/query combination found in the signals collection. The weight for each group is computed using an exponential time-decay on signal count (30 day half-life) and a weighted sum based on the signal type. Use the signalTypeWeights parameter to set the correct signal types and weights for your dataset. You can use the results of this job to boost queries for a user based on their past query activity.

Join signals with item metadata

Fusion’s basic aggregation jobs aggregate using the document ID. You can also aggregate at a more coarse-grained level using other fields available for documents (item metadata), such as manufacturer or brand for products. Aggregating with item metadata is useful for building personalization boosts into your search application.

The following PUT request creates additional aggregation jobs that join signals with the primary products collection to compute an aggregated weight for a manufacturer field:

curl -X PUT -H "Content-type:application/json" -d '{"enabled":true, "metadata_column":"manufacturer"}' "http://localhost:8764/api/collections/products/features/signals"

After performing the PUT request shown above, you will have two additional aggregation jobs in Fusion.

<collection>_user_<metadata>_preferences_aggregation

This job computes an aggregated weight for each user/item metadata combination, e.g. user/manufacturer, found in the signals collection. Fusion computes the weight for each group using an exponential time-decay on signal count (30 day half-life) and a weighted sum based on the signal type. Use the signalTypeWeights parameter to set the correct signal types and weights for your dataset. Use the primaryCollectionMetadataField parameter to set the name of a field from the primary collection to join into the results, e.g. manufacturer. You can use the results of this job to boost queries based on user preferences regarding item-specific metadata such as manufacturer (e.g. Ford vs. BMW) or brand (e.g. Ralph Lauren vs. Abercrombie & Fitch).

<collection>_query_<metadata>_preferences_aggregation

This job computes an aggregated weight for each query/item metadata combination, e.g. query/manufacturer, found in the signals collection. Fusion computes the weight for each group using an exponential time-decay on signal count (30 day half-life) and a weighted sum based on the signal type. Use the signalTypeWeights parameter to set the correct signal types and weights for your dataset. Use the primaryCollectionMetadataField parameter to set the name of a field from the primary collection to join into the results.

Tip
These additional item-metadata aggregation jobs also serve as examples of how to join the signals and primary collections to perform aggregations on fields other than the document ID (see the sketch below). You can re-execute the same PUT request shown above using a different metadata field name in the metadata_column parameter.
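
The following sketch shows the general shape of such a join, using hypothetical collection and field names (products as the primary collection with an id key and a manufacturer field); the generated jobs contain the authoritative SQL:

SELECT s.user_id AS user_id_s,
       p.manufacturer AS manufacturer_s,
       SUM(s.count_i) AS aggr_count_i,
       time_decay(s.count_i, s.timestamp_tdt) AS weight_d
  FROM products_signals s
  JOIN products p ON s.doc_id = p.id
 WHERE s.type IN ('click', 'cart', 'purchase')
GROUP BY s.user_id, p.manufacturer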

Write SQL aggregations

In this section, we provide guidance to help you make the most of the Fusion SQL aggregation engine.

Project fields into the signals_aggr collection

For legacy reasons, the <collection>_signals_aggr collection relies on dynamic field names, such as doc_id_s and query_s instead of doc_id and query. Consequently, when you project fields to be written to the <collection>_signals_aggr collection, you should use dynamic field suffixes as shown in the SQL snippet below:

SELECT SUM(typed_aggr_count_i) AS aggr_count_i,
       query AS query_s,
       query AS query_t,
       doc_id AS doc_id_s,
       filters AS filters_s,
       SPLIT(filters, ' \\$ ') AS filters_ss,
       weighted_sum(...) AS weight_d
     FROM signal_type_groups
 GROUP BY query, doc_id, filters

You’re not required to use this approach, but if you don’t use dynamic field suffixes as shown above, you’ll need to change the boosting stages in Fusion to work with different field names.

Use WITH to organize complex queries

A common pattern in SQL aggregation queries is the use of subqueries to break up the logic into comprehensible units. For more information about the WITH clause, see https://modern-sql.com/feature/with. Let’s work through an example to illustrate the key points:

 1:  WITH signal_type_groups AS (
 2:      SELECT SUM(count_i) AS typed_aggr_count_i,
 3:             doc_id,
 4:             user_id,
 5:             type,
 6:             time_decay(count_i, timestamp_tdt) AS typed_weight_d
 7:        FROM product_signals
 8:       WHERE type IN ('click','cart','purchase')
 9:    GROUP BY user_id, doc_id, type
10: ) SELECT SUM(typed_aggr_count_i) AS aggr_count_i,
11:          doc_id AS doc_id_s,
12:          user_id AS user_id_s,
13:      weighted_sum(...) AS weight_d
14:     FROM signal_type_groups
15: GROUP BY doc_id, user_id

At line 1, we declare a statement scoped view named signal_type_groups using the WITH keyword.

Lines 2-9 define the subquery for the signal_type_groups view.

At line 7, we read from the product_signals collection in Fusion.

Line 8 filters the input to only include click, cart, and purchase signals. Behind the scenes, Fusion translates this WHERE IN clause to a Solr filter query, e.g. fq=type:(click OR cart OR purchase). The signal_type_groups view produces rows grouped by user_id, doc_id, and type (line 9).

Starting at line 10, we define a subquery that performs a rollup over the rows in the signal_type_groups view by grouping on doc_id and user_id (line 15). Notice how the WITH statement breaks this complex query into two units, which makes the aggregation logic easier to comprehend. You are encouraged to adopt this pattern in your own SQL aggregation queries.

Built-in SQL Functions

In addition to the SQL functions provided by Spark, Fusion provides several functions of its own to simplify common aggregation tasks. To recap, a UDAF aggregates multiple rows for the same group-by key, and a UDF performs some operation on a single row.

weighted_sum

The weighted_sum UDAF takes a weight, type, and type-weight mapping to produce an aggregated weight. For example, consider the following SQL snippet:

  SELECT query,
         doc_id,
         filters,
         weighted_sum(typed_weight_d, type, 'click:1.0,cart:10.0') AS weight_d
    FROM signal_type_groups
GROUP BY query, doc_id, filters

When applied to the rows in the table below, the weighted_sum function produces a final weight_d of 12.0 (2*1.0 + 1*10.0). The UDAF is passed rows grouped by query, doc_id, and filters.

query  type   doc_id  filters  typed_weight_d
iPad   click  1       gear     2
iPad   cart   1       gear     1

timediff

The timediff UDF computes the difference, in milliseconds, between two timestamps in the same row. In the session_rollup job, the timediff function computes the difference between the current time and the time of the last activity in a session.
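
A minimal, hypothetical usage sketch (the column names and argument order here are assumptions, not the actual session_rollup SQL):

SELECT timediff(first_activity_ts, last_activity_ts) AS session_len_ms
  FROM sessions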

click_pos

The click_pos UDF computes either a reciprocal rank or a raw click position (using a 0-based index) of a document in a page of results. This UDF is used to compute the mean reciprocal rank (MRR) for experiments. For example, given the following list of documents and a doc ID, the click_pos UDF will return 2:

docs:   a, b, c, d
doc ID: c

concat_text

The concat_text UDF combines multivalued text fields coming from Solr into a field with a single value delimited by spaces. This UDF is useful when a field returned from Solr uses the _txt suffix, which indicates a multivalued text field.
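
For example, a hypothetical projection that flattens a multivalued description_txt field into a single space-delimited string (the field names are illustrative, and the single-argument call is an assumption):

SELECT concat_text(description_txt) AS description_s
  FROM products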

Create and run a SQL aggregation job

You can perform a SQL aggregation on a signals collection for a datasource (or on some other collection), through the Fusion UI or using the Fusion API.

Preliminaries

Before you can create and run a SQL aggregation job, you must create an app, create a collection (or use the default collection for the app), and add a datasource. Before you run a SQL aggregation job, you need signal data. Otherwise, there is nothing to aggregate.

Use the Fusion UI

You can use the Fusion UI to perform a SQL aggregation.

Set up a SQL aggregation job
  1. With the app open, navigate to Collections > Jobs.

  2. Click Add and select Aggregation from the dropdown list.

  3. Specify an arbitrary Spark Job ID.

  4. For the Source Collection, select the collection that contains the data to aggregate.

    Tip
    This is not the base collection. For example, to aggregate the signals in experiment_signals, you would select experiment_signals, not experiment.
  5. Under Aggregation Settings, expand SQL Aggregation.

  6. Enter or paste SQL in the SQL text box. Optionally, click Open Editor to open a dialog box with a larger editor. Click Close to close the dialog box.

  7. Click Save to save the job.

Run a SQL aggregation job
  1. With the app open, navigate to Collections > Jobs.

  2. In the list of jobs, click the job you want to run, and then click Run.

  3. Click Start.

  4. Click Close to close the job management part of the Fusion UI.

Use the Fusion API

You can use the Fusion API to perform a SQL aggregation. This is an example.

Configure a SQL aggregation job:
curl ":8764/api/spark/configurations/experiment_signals_aggregation"
{
  "type" : "spark",
  "id" : "experiment_click_signals_aggregation",
  "definition" : {
    "id" : "experiment_click_signals_aggregation",
    "sql" : "SELECT SUM(count_i) AS aggr_count_i, query AS query_s, doc_id AS doc_id_s, time_decay(count_i, timestamp_tdt) AS weight_d FROM default_signals WHERE type_s='click' GROUP BY query, doc_id",
    "selectQuery" : "*:*",
    "outputPipeline" : "_system",
    "rollupAggregator" : "SQL",
    "sourceRemove" : false,
    "sourceCatchup" : true,
    "outputRollup" : true,
    "parameters" : [ {
      "key" : "optimizeOutput",
      "value" : "4"
    } ]
  },
  "inputCollection" : "experiment_click_signals",
  "rows" : 10000
}
Run a SQL aggregation job:
curl -u user:pass -X POST -H "Content-Type: application/json" http://localhost:8764/api/jobs/spark:experiment_click_signals_aggregation/actions -d '{"action": "start"}'

SQL aggregation examples

To acquaint you with how to use Spark SQL in SQL aggregations, here are several examples.

Perform the default SQL aggregation

This is the default SQL aggregation of signals for a base collection named products. It produces the same results as legacy aggregation.

SELECT SUM(count_i) AS aggr_count_i,
         query AS query_s,
         doc_id AS doc_id_s,
         time_decay(count_i, date) AS weight_d
    FROM products_signals
GROUP BY query, doc_id

Notice the following about this SQL:

  • SELECT SUM(count_i) AS aggr_count_i – count_i is summed as aggr_count_i.

  • time_decay(count_i, date) AS weight_d – The time_decay function computes the aggregated weight_d field. This function is a Spark UserDefinedAggregateFunction (UDAF) that is built into Fusion. The function computes a weight for each aggregation group, using the count and an exponential decay on the signal timestamp, with a 30-day half life.

  • GROUP BY query, doc_id – The GROUP BY clause defines the fields used to compute aggregate metrics, which are typically the query, doc_id, and any filters. With SQL, you have more options for computing aggregated metrics without having to write custom JavaScript functions (which legacy aggregations would require). You can also use standard WHERE clause semantics, for example, WHERE type_s = 'add', to provide fine-grained filters (see the sketch after this list).

  • The time_decay function is called with the abbreviated two-argument signature rather than the full function signature shown in Use Different Weights Based on Signal Types.
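
For example, a minimal variation of the default aggregation that counts only add signals simply adds a WHERE clause to the SQL shown above (a sketch; the field names match the default aggregation):

SELECT SUM(count_i) AS aggr_count_i,
       query AS query_s,
       doc_id AS doc_id_s,
       time_decay(count_i, date) AS weight_d
  FROM products_signals
 WHERE type_s = 'add'
GROUP BY query, doc_id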

An example of how SQL aggregation works

Consider the following four input signals for a fictitious query q1 and document 1:

[{
  "type_s":"add",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"},
{
  "type_s":"view",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"},
{
  "type_s":"add",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"},
{
  "type_s":"view",
  "doc_id_s":"1",
  "query_s":"q1",
  "count_i":1,
  "timestamp_tdt":"2017-07-11T00:00:00Z"}]

Fusion generates the following aggregated document for q1:

{
   "aggr_count_i":4,
   "query_s":"q1",
   "doc_id_s":"1",
   "weight_d":0.36644220285922535,
   "aggr_id_s":"products_sql_agg",
   "aggr_job_id_s":"15d4279d128T755e5137",
   "flag_s":"aggr",
   "query_t":"q1",
   "aggr_type_s":"sql",
   "timestamp_tdt":"2017-07-14T19:01:05.950Z"}

Use different weights based on signal types

This is a slightly more complex example that uses a subquery to compute a custom weight for each signal based on the signal type (add vs. click):

SELECT SUM(count_i) AS aggr_count_i,
       query_s,
       doc_id_s,
       time_decay(count_i, timestamp_tdt, "5 days", ref_time, signal_weight) AS weight_d
  FROM (SELECT count_i,
               query_s,
               doc_id_s,
               timestamp_tdt,
               ref_time,
               CASE WHEN type_s='add' THEN 0.25 ELSE 0.1 END AS signal_weight
          FROM products_signals)
GROUP BY query_s, doc_id_s

Compute metrics for sessions

This aggregation query uses a number of Spark SQL functions to compute some metrics for sessions:

SELECT concat_ws('||', clientip, session_id) as id,
       first(clientip) as clientip,
       min(ts) as session_start,
       max(ts) as session_end,
       (unix_timestamp(max(ts)) - unix_timestamp(min(ts))) as session_len_secs_l,
       sum(asInt(bytes)) as total_bytes_l,
       count(*) as total_requests_l
  FROM sessions
GROUP BY clientip, session_id