SQL Aggregations

Fusion 4.0 introduces SQL aggregation for aggregating signals or other data. SQL is a familiar query language that is well suited to data aggregation.

Note
The aggregation approach available in prior Fusion releases is still available, though it is deprecated and will be removed in a future release. We now refer to the prior aggregation approach as "legacy aggregations."

Advantages of SQL aggregation

These are advantages of SQL aggregation relative to legacy aggregation:

  • It’s SQL! – You can write SQL queries to aggregate your data.

  • Built-in aggregation functions – A SQL query can use any of the functions provided by Spark SQL. Use these functions to perform complex aggregations or to enrich aggregation results.

  • A customizable time-decay function – You can now customize the exponential time-decay function that Fusion uses for aggregations.

    Prior to release 4.0, Fusion used a fixed time-decay function to determine aggregation weights. In Fusion 4.0, the time-decay function is implemented as a UDAF, so you can easily implement your own time-decay function.

  • Aggregate data from many types of data sources – You can use any asset in the Fusion Catalog as a data source. This lets you aggregate data from any data source supported by Spark.

  • Performance – Although performance results can vary, Fusion SQL aggregations are roughly 5 times faster than legacy Fusion aggregations (using the default aggregation as the comparison).

Key features

Rollup SQL

Most aggregation jobs run with the catch-up flag set to true, which means that Fusion only computes aggregations for new signals that have arrived since the last time the job was run, and up to and including ref_time, which is usually the run time of the current job. Fusion must "roll up" the newly aggregated rows into any existing aggregated rows in the _aggr collection.

Fusion generates a basic rollup SQL script automatically, by consulting the schema of the aggregated documents. If your rollup logic is complex, you can provide a custom rollup SQL script.

This is an example of a rollup query:

SELECT query_s, doc_id_s, time_decay(1, timestamp_tdt, "30 days", ref_time, weight_d)
AS weight_d , SUM(aggr_count_i) AS aggr_count_i
FROM `commerce_signals_aggr` GROUP BY query_s, doc_id_s

Time-range filtering

When Fusion rolls up new data into an aggregation, time-range filtering lets you ensure that Fusion doesn’t aggregate the same data over and over again.

Fusion applies a time-range filter when loading rows from Solr, before executing the aggregation SQL statement. In other words, the SQL executes over rows that are already filtered by the appropriate time range for the aggregation job.

Notice that the examples Perform the Default SQL Aggregation and Use Different Weights Based on Signal Types don’t include a time-range filter. Fusion computes the time-range filter automatically as follows:

  • If the catch-up flag is set to true, Fusion uses the last time the job was run and ref_time (which you typically set to the current time). This is equivalent to the WHERE clause WHERE time > last_run_time AND time <= ref_time.

  • If the catch-up flag isn’t set to true, Fusion uses a filter with ref_time (and no start time). This is equivalent to the WHERE clause WHERE time <= ref_time.

The built-in time logic should suffice for most use cases. You can set the time range filter to TO and specify a WHERE clause filter to achieve more complex time based filtering.

SQL functions

A Spark SQL aggregation query can use any of the functions provided by Spark SQL. Use these functions to perform complex aggregations or to enrich aggregation results.

Weight aggregated values using a time-decay function

Fusion automatically uses a default time_decay function to compute and apply appropriate weights to aggregation groups during aggregation. Larger weights are assigned to more recent events. This reduces the impact of less-recent signals. Intuitively, older signals (and the user behavior they represent) should count less than newer signals.

If the default time_decay function doesn’t meet your needs, you can modify it. The time_decay function is implemented as a UserDefinedAggregateFunction (UDAF).

Full function signature

This is the UDAF signature of the default time_decay function:

time_decay(count: Long,
           timestamp: Timestamp,
           halfLife: String (calendar interval),
           ref_time: Timestamp,
           weight_d: Double)

One small difference between the prior and current behavior is worth mentioning in passing:

  • Prior to Release 4.0, the decay_sum aggregator function used the difference between the aggregationTime (the time at which the aggregation job is run) and the event time to calculate exponentially decayed numerical values.

  • In Release 4.0, time_decay is a similar function. In time_decay, ref_time is used instead of aggregationTime. You can set aggregationTime to some other time than the run time of the aggregation job.

In practice, you will probably want to use aggregationTime as ref_time.

Abbreviated function signature and default values

Your function call can also use this abbreviated UDAF signature, that omits halfLife, ref_time, and weight_d:

time_decay(count: Long,
           timestamp: Timestamp)

In this case, Fusion fills in these values for the omitted parameters: halfLife = 30 days, ref_time = NOW, and weight_d = 0.1.

Matching legacy aggregation

To match the results of legacy aggregation, either use the abbreviated function signature or supply these values for the mentioned parameters: halfLife = 30 days, ref_time = NOW, and weight_d = 0.1.

Parameters

Parameters for time_decay are:

Parameter Description

count

Number of occurrences of the event. Typically, the increment is 1, though there is no reason it couldn’t be some other number. In most cases, you simply pass count_i, which is the event count field used by Fusion signals, as shown in the SQL aggregation examples.

timestamp

The date-and-time for the event. This time is the beginning of the interval used to calculate the time-based decay factor.

halfLife

Half life for the exponential decay that Fusion calculates. It is some interval of time, for example, 30 days or 10 minutes. The interval prefix is optional. Fusion treats 30 days as equivalent to interval 30 days.

ref_time

Reference time used to compute the age of an event for the time-decay computation. It is usually the time when the aggregation job runs (NOW). The reference time is not present in the data; Fusion determines the reference time at runtime. Fusion automatically attaches a ref_time column to every row before executing the SQL.

weight_d

Initial weight for an event, prior to the decay calculation. This value is typically not present in the signal data.

You can use SQL to compute weight_d; see Use Different Weights Based on Signal Types for an example.

Sample calculation of the age of a signal

This is an example of how Fusion calculates the age of a signal:

Imagine a SQL aggregation job that runs at Tuesday, July 11, 2017 1:00:00 AM (1499734800). For a signal with the timestamp Tuesday, July 11, 2017 12:00:00 AM (1499731200), the age of the signal in relation to the reference time is 1 hour.