Product Selector

Fusion 5.12
    Fusion 5.12

    SQL Aggregation Examples

    To acquaint you with how to use Spark SQL in SQL aggregations, here are several examples:

    Perform the default SQL aggregation

    This is the default SQL aggregation of signals for a base collection named products. It produces the same results as legacy aggregation:

    SELECT SUM(count_i) AS aggr_count_i,
             query AS query_s,
             doc_id AS doc_id_s,
             time_decay(count_i, date) AS weight_d
        FROM products_signals
    GROUP BY query, doc_id

    Notice the following about this SQL:

    • SELECT SUM(count_i) AS aggr_count_i. count_i is summed as aggr_count_i.

    • time_decay(count_i, date) AS weight_d. The time_decay function computes the aggregated weight_d field. This function is a Spark UserDefinedAggregateFunction (UDAF) that is built into Fusion. The function computes a weight for each aggregation group, using the count and an exponential decay on the signal timestamp, with a 30-day half life.

    • GROUP BY query, doc_id. The GROUP BY clause defines the fields used to compute aggregate metrics, which are typically the query, doc_id, and any filters. With SQL, you have more options to compute aggregated metrics without having to write custom JavaScript functions (which would be needed to supplement legacy aggregations). You can also use standard WHERE clause semantics, for example, WHERE type_s = 'add', to provide fine-grained filters.

    • The time_decay function uses an abbreviated function signature, time_decay(count_i, timestamp_tdt), instead of the full function signature shown in Use Different Weights Based on Signal Types.

    An example of how SQL aggregation works

    This is an example of how this aggregation works. Consider the following four input signals for a fictitious query q1 and document 1:

    [{
      "type_s":"add",
      "doc_id_s":"1",
      "query_s":"q1",
      "count_i":1,
      "timestamp_tdt":"2017-07-11T00:00:00Z"},
    {
      "type_s":"view",
      "doc_id_s":"1",
      "query_s":"q1",
      "count_i":1,
      "timestamp_tdt":"2017-07-11T00:00:00Z"},
    {
      "type_s":"add",
      "doc_id_s":"1",
      "query_s":"q1",
      "count_i":1,
      "timestamp_tdt":"2017-07-11T00:00:00Z"},
    {
      "type_s":"view",
      "doc_id_s":"1",
      "query_s":"q1",
      "count_i":1,
      "timestamp_tdt":"2017-07-11T00:00:00Z"}]

    Fusion generates the following aggregated document for q1:

    {
       "aggr_count_i":4,
       "query_s":"q1",
       "doc_id_s":"1",
       "weight_d":0.36644220285922535,
       "aggr_id_s":"products_sql_agg",
       "aggr_job_id_s":"15d4279d128T755e5137",
       "flag_s":"aggr",
       "query_t":"q1",
       "aggr_type_s":"sql",
       "timestamp_tdt":"2017-07-14T19:01:05.950Z"}

    Use different weights based on signal types

    This is a slightly more complex example that uses a subquery to compute a custom weight for each signal based on the signal type (add vs. click):

    SELECT SUM(count_i) AS aggr_count_i,
           query_s,
           doc_id_s,
           time_decay(count_i, timestamp_tdt, "5 days", ref_time, signal_weight) AS weight_d
      FROM (SELECT count_i,
                   query_s,
                   doc_id_s,
                   timestamp_tdt,
                   ref_time,
                   CASE WHEN type_s='add' THEN 0.25 ELSE 0.1 END AS signal_weight
              FROM products_signals)
    GROUP BY query_s, doc_id_s

    Compute metrics for sessions

    This aggregation query uses a number of Spark SQL functions to compute some metrics for sessions:

    SELECT concat_ws('||', clientip, session_id) as id,
           first(clientip) as clientip,
           min(ts) as session_start,
           max(ts) as session_end,
           (unix_timestamp(max(ts)) - unix_timestamp(min(ts))) as session_len_secs_l,
           sum(asInt(bytes)) as total_bytes_l,
           count(*) as total_requests_l
      FROM sessions
    GROUP BY clientip, session_id