Example Aggregation with Sample Data

Located in the $FUSION/examples/signals directory of your installation is a set of files that show how signals aggregation works. There are three files that work together:

  • signals.sh - this is a script that will load signals and run an aggregation job from start to finish.

  • signals.json - a sample data set of 20,000 signal events. These are 'click' events.

  • aggregation_definition.json - a sample aggregation job definition that will demonstrate some of the capabilities of the aggregator.

You can simply start the signals.sh script and sit back and wait for it to finish. Below we’ll step through what the script is doing.

Creating a Collection and Loading Signals

The script will create a collection in your system named 'lucidworks102'. This collection will have signals enabled, which will create two additional system collections, 'lucidworks102_signals' and 'lucidworks102_signals_aggr'.

The script will then load the sample signal events contained in the signals.json file. These are about 20,000 click events reflecting the queries users performed and the document IDs they clicked on. The signal events will be loaded into the 'lucidworks102_signals' collection.
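If you want to load these events yourself instead of relying on the script, the Signals API accepts the JSON file directly. The call below is only a sketch of that approach; the exact endpoint path and the commit parameter are assumptions based on the usual Signals API conventions, so check the Signals API documentation for your release:

curl -u user:pass -X POST -H 'Content-Type: application/json' 'http://localhost:8764/api/apollo/signals/lucidworks102?commit=true' -d @signals.json

Note that signals are posted to the endpoint for the primary collection ('lucidworks102'); the events are stored in the associated 'lucidworks102_signals' collection.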

Defining the Aggregation Job

Once the signal events have been loaded, the script will create an aggregation job and begin processing the events. The aggregated events will be stored in the 'lucidworks102_signals_aggr' collection. Below we’ll step through this sample job in detail and note some interesting configurations; the snippets correspond to sections of aggregation_definition.json. Note that many of the functions referred to here are described in full in the section Aggregator Functions.

First, we assign the ID of the aggregation job and define the signalTypes and the aggregator we will use.

{
  "id": "1",
  "signalTypes": [
    "click"
  ],

In this case, we are using the 'click' signal type, but we are not limited to that type. We could instead define any string, as long as it exists in the type field of the events stored in the signals collection, and we could mix several types in one job if needed.

Note that we have not defined the fields that will be used to create tuples for aggregation, so the defaults will be used: 'doc_id_s', 'query_s', and 'filters_s'. We have also not defined an 'aggregator' property, so by default 'click' aggregation will occur and 'weight_d' fields will be calculated and added to the aggregated records.
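If you wanted to make these defaults explicit, or aggregate over different dimensions, the job definition could spell them out. The snippet below is only a sketch: the 'aggregator' property is the one mentioned above, but the name of the tuple-field property ('fields') is an assumption, so confirm the exact property names against the Signals Aggregator API reference before using it. The rest of the job definition would follow as before.

{
  "id": "1",
  "signalTypes": [
    "click"
  ],
  "fields": [
    "doc_id_s",
    "query_s",
    "filters_s"
  ],
  "aggregator": "click"
}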

In the next step, we define an 'aggregates' property, which will contain all of the functions we’d like to perform with the events data. The functions will be performed in the order they are defined here.

  "aggregates": [
    {
      "type": "count",
      "sourceFields": [
        "id"
      ],
      "targetField": "count_d"
    },

This first step counts the number of 'id' fields in the events data, and puts that number in a field named 'count_d'.
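In the sample aggregated event shown at the end of this section, this produces "count_d": 5, because five click events share the same query, document ID, and filters tuple.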

Next we define a script function.

    {
      "type": "script",
      "params": {
        "aggregateScript": "event.addField('script_d',2.0);result.setField('script_d',2.0);"
      }
    },

The aggregateScript is called for every event record in the source collection of signals. This script adds a field named 'script_d' to each source record and also sets the same field on the result record (i.e., the record after aggregation).

Next we define a sumOfLogs function.

    {
      "type": "sumOfLogs",
      "sourceFields": [
        "script_d"
      ],
      "targetField": "script_sum_logs_d"
    },

This definition uses the 'script_d' field that the previous step just created in each record.
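Since the previous step set script_d to 2.0 on every event, the result for a tuple made up of five source events is 5 × ln(2.0) ≈ 3.4657, which matches the script_sum_logs_d value in the sample aggregated event shown at the end of this section.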

Next we define a collect function.

    {
      "type": "collect",
      "sourceFields": [
        "id"
      ],
      "targetField": "ids_ss"
    },

This function will collect all 'id' fields from the source events and put the ids in a field named 'ids_ss'.

Finally, we define an expression function.

    {
      "type": "expr",
      "sourceFields": [
        "query_s",
        "filters_s"
      ],
      "targetField": "expr_t",
      "params": {
        "expr": "v = ''; if (value != null) v = value + ' | '; v + query_s + ' & ' + filters_s"
      }
    }
  ],

This expression concatenates values from the 'query_s' and 'filters_s' fields and appends them to the previous value of the target field (in this case, 'expr_t'). The conditional at the beginning of the expression is needed because, when the expression is first invoked, the running result ("value") may be null, and JavaScript would otherwise convert it to a literal "null" string.
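In the sample data, the five source events for a given tuple all share the same query and filters (that is what makes them the same tuple), so each pass through the expression appends the same 'query & filters' fragment, separated by ' | '. That is how the repeated value in the expr_t field of the sample aggregated event at the end of this section is built up.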

This is the end of the aggregates section of the configuration, so we’ve closed the JSON array.

The last property for this job is to set the timeRange.

  "timeRange": "[* TO NOW]"
}

This simple range will get all events for all dates and times.
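The value is written in Solr range syntax, so narrower windows are also possible. Assuming the property accepts standard Solr date math (which the '[* TO NOW]' default suggests), a job that aggregates only the last week of events might use:

  "timeRange": "[NOW-7DAYS TO NOW]"

Keep in mind that a narrow window only makes sense if your events carry timestamps that fall inside it.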

Loading the Configuration and Starting the Job

If you are using the signals.sh script, you don’t need to load the configuration or start the aggregator job. However, we’ve included the API calls below for your reference.

It’s possible to load the configuration as a file via the Signals Aggregator API, with a call such as:

curl -u user:pass -X POST -H 'Content-Type: application/json' http://localhost:8764/api/apollo/aggregator/aggregations -d @aggregation_definition.json

If the call is successful, the response will be empty. While the file name is aggregation_definition.json, the ID of the job as defined in the file is '1', so the job will be known by that ID.
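Although a successful response is empty, you can usually confirm that the definition was stored with a GET request. The path below assumes the endpoint follows the usual REST pattern of appending the job ID to the same aggregations URL; if it differs in your release, consult the Signals Aggregator API reference:

curl -u user:pass http://localhost:8764/api/apollo/aggregator/aggregations/1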

Once the job definition has been loaded, you can start the job when ready:

curl -u user:pass -X POST http://localhost:8764/api/apollo/aggregator/jobs/lucidworks102_signals/1

The job will likely run for a while, so you can check it periodically by doing a GET request to the same endpoint:

curl -u user:pass http://localhost:8764/api/apollo/aggregator/jobs/lucidworks102_signals/1

Once the job is finished, you are ready to work with your aggregated signals.

Sample Aggregated Event

Once the aggregation job has completed, we can take a look at a specific aggregated event.
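One way to look at the results is to query the 'lucidworks102_signals_aggr' collection directly. For example, if your installation exposes its Solr instance on the default port 8983 (an assumption; adjust the host, port, and any authentication for your environment):

curl 'http://localhost:8983/solr/lucidworks102_signals_aggr/select?q=query_s:shure&wt=json'

One of the documents returned looks like the following: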

{
    "id": "0886f6d2e02c47d5b11fc809dd8508bc-358",
    "attr_params.filterQueries_": [
        "cat00000",
        "abcat0200000",
        "abcat0204000",
        "pcmcat144700050004"
    ],
    "filters_s": "abcat0200000 $ abcat0204000 $ cat00000 $ pcmcat144700050004",
    "count_d": 5,
    "doc_id_s": "1118988",
    "script_d": 4,
    "query_t": "shure",
    "aggr_type_s": "click@doc_id_s-query_s-filters_s",
    "params.position_s": "0",
    "query_s": "shure",
    "filters_orig_ss": [
        "abcat0204000",
        "pcmcat144700050004",
        "abcat0200000",
        "cat00000"
    ],
    "weight_d": 1.0778248953069447e-11,
    "flag_s": "aggr",
    "expr_t": "shure & abcat0200000 $ abcat0204000 $ cat00000 $ pcmcat144700050004 | shure & abcat0200000 $ abcat0204000 $ cat00000 $ pcmcat144700050004 | shure & abcat0200000 $ abcat0204000 $ cat00000 $ pcmcat144700050004 | shure & abcat0200000 $ abcat0204000 $ cat00000 $ pcmcat144700050004 | shure & abcat0200000 $ abcat0204000 $ cat00000 $ pcmcat144700050004",
    "ids_ss": [
        "12a3a546-95b2-4a86-8d36-88aebc89eb5f",
        "4f074782-74b5-4a05-9bdf-1f447a60735d",
        "6006eb92-6bcb-417e-828a-e0408590c76e",
        "be02f44d-4731-44f3-94ac-faede6e7d4e3",
        "f80f886c-3825-4d90-81ce-89f53a987d92"
    ],
    "script_sum_logs_d": 3.4657359027997265,
    "aggr_id_s": "0886f6d2e02c47d5b11fc809dd8508bc",
    "type_s": "click",
    "query_orig_s": "shure",
    "attr_query_orig_s_": [
        "shure",
        "Shure"
    ],
    "count_i": 5,
    "timestamp_dt": "2014-09-12T22:05:22.818Z",
    "_version_": 1479078865938677800
}

Let’s walk through the fields of this aggregated document:

  • First, the id consists of the aggregation ID (the same value shown in the aggr_id_s field), a dash, and a sequential number for the aggregated document.

  • The fields that made up our tuple (in this case doc_id_s, query_s, and filters_s, since we used the defaults) are shown verbatim from the input signals.

  • The aggr_type_s field consists of the aggregator used ('click') and the tuple dimensions ('doc_id_s-query_s-filters_s').

  • The weight_d field has been added to each aggregated record because we performed a 'click' aggregation. If we had chosen a 'simple' aggregation instead, there would be no 'weight_d' field, but in other respects the aggregated record would be very similar.

  • The count_d is 5, which shows how many source events comprised this aggregated record.

  • The flag_s field is set to 'aggr' to indicate that this is an aggregated event.

  • The script_sum_logs_d is the calculated sumOfLogs result, which used the values from the 'script_d' field in the original records added by the 'script' function.

  • The ids_ss field is the result of the 'collect' function, in which we asked for all the event ids.

  • The expr_t field is the output of the 'expr' function: the concatenated values from the 'query_s' and 'filters_s' fields.

  • The aggr_id_s field is the unique ID of this aggregation. Note that this ID is one component of the aggregated event’s id.

  • The type_s is the type of the source event. If more than one type was chosen for the aggregation job, each type would appear separated by a pipe ('|') delimiter (such as, 'view|click|buy').

  • The count_i field is the count of source events for this aggregated result.

  • The timestamp_dt field is a reference time for the aggregation. This time is either when the aggregation was executed, or was supplied as part of the aggregation definition with the 'time' property.

Once the records have been aggregated, they can be used for Recommendations, or for other uses within your organization.