Fusion Server

Version 4.1

      Fusion SQL Query and Analytics

      Most organizations that deploy Fusion also have SQL-compliant business intelligence (BI) or dashboarding tools to facilitate self-service analytics.

      The Fusion SQL service:

      • Lets organizations leverage their investments in BI tools by using JDBC and SQL to analyze data managed by Fusion. For example, Tableau is a popular data analytics tool that connects to Fusion SQL using JDBC to enable self-service analytics.

      • Helps business users access important data sets in Fusion without having to know how to query Solr.

      See Fusion SQL Administration to learn about architecture and configuration.

      See the topics below to learn how to query Fusion using the Fusion SQL engine.

      In addition to the specified System Requirements, Fusion on Windows requires Visual C++ Redistributable for Visual Studio 2015 to start the SQL service successfully.

      Approaches for optimizing the performance of queries

      In general terms, the Fusion SQL service supports three approaches for optimizing the performance of queries:

      • Join or aggregate in Spark – Read a set of raw rows from Solr and join or aggregate the rows in Spark.

        For queries that rely on Spark performing joins and aggregations on raw rows read from Solr, your goal is to minimize the number of rows read from Solr and achieve the best read performance of those rows.

      • Push down into Solr – Push down aggregation queries into Solr, returning a smaller set of aggregated rows to Spark.

      • Use views – Use views that send queries directly to Solr, using options supported by the spark-solr library.

      Examples of queries written to optimize performance

      Here we provide examples of how to get the best performance out of the Fusion SQL service.

      The examples show how to write queries that avoid unnecessary computation in Spark, reserving Spark’s capacity for the complex operations it performs well.

      Pulling back rows is a rate-limiting step when executing aggregations. These examples ensure that the Fusion Spark query planner pulls back the smallest number of rows.

      For general SQL information, see the Spark SQL Language manual.

      Example 1: Only request fields that have DocValues enabled

      Optimal read performance is achieved by only requesting fields that have DocValues enabled, because these can be pulled through the /export handler.

      Request only the fields you need for each query. Spark’s query planner pushes the field list down into the Fusion SQL service, which translates it into an fl parameter to Solr.

      For example, if you need movie_id and title from the movies table, do this:

      select movie_id, title from movies

      Do not do this:

      select * from movies
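To check which fields are DocValues-enabled (and so can be served by the /export handler), one option is Solr's Schema API, which reports each field's properties as JSON. Here is an illustrative sketch of filtering such a response; the sample data is made up:

```python
# Illustrative sketch: given the JSON that Solr's Schema API returns
# (GET /solr/<collection>/schema/fields), list the fields that have
# docValues enabled and can therefore be read via /export.

def exportable_fields(schema_fields):
    return [f["name"] for f in schema_fields if f.get("docValues")]

# Sample response fragment (field definitions here are made up):
fields = [
    {"name": "movie_id", "type": "string", "docValues": True},
    {"name": "title", "type": "string", "docValues": True},
    {"name": "plot_txt_en", "type": "text_en"},  # text field: no docValues
]
print(exportable_fields(fields))  # ['movie_id', 'title']
```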

      Example 2: Use WHERE clause filtering to reduce the number of rows

      Use WHERE clause criteria, including full Solr queries, to do as much filtering in Solr as possible to reduce the number of rows.

      Spark’s SQL query planner pushes down simple filter criteria into the Fusion SQL service, which translates SQL filters into Solr filter query (fq) parameters. For example, if you want to query using SQL:

      select user_id, movie_id, rating from ratings where rating = 4

      Then behind the scenes, the Fusion SQL service transforms this query into a Solr query along these lines:

      q=*:*&fq=rating:4&fl=user_id,movie_id,rating&qt=/export

      Notice that the WHERE clause was translated into an fq parameter and the specific fields needed for the query are sent along in the fl parameter. Also notice that the Fusion SQL service will use the /export handler if all of the fields requested have DocValues enabled. This makes a big difference in performance.
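To make the shape of this translation concrete, here is a small illustrative sketch of mapping a projected field list and simple equality filters to Solr parameters; this mimics the behavior described above and is not Fusion's actual implementation:

```python
# Illustrative sketch only: mimics the kind of SQL-to-Solr translation
# described above; this is not Fusion's actual code.

def filters_to_solr_params(field_list, equality_filters):
    """Map a projected field list and simple col = value criteria to
    Solr query parameters: fields become fl, filters become fq."""
    return {
        "q": "*:*",
        "fl": ",".join(field_list),
        "fq": [f"{col}:{val}" for col, val in equality_filters],
    }

params = filters_to_solr_params(["user_id", "movie_id", "rating"],
                                [("rating", 4)])
print(params["fq"])  # ['rating:4']
```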

      You can also perform full-text queries using the WHERE clause. For example, the following SQL performs a full-text search for the term "love" in the plot_txt_en field of the movies table.

      select movie_id,title from movies where plot_txt_en='love'

      This works because the Fusion SQL service uses the Solr schema to determine that the plot_txt_en field is a text field, and so it treats the criterion as a full-text query. Keep in mind that this full-text search feature does not work with cached tables: those are held in Spark memory, and WHERE criteria are not sent to Solr when tables are cached.

      The Fusion SQL service also provides the _query_ user-defined function (UDF) to execute any valid Solr query; for example:

      select movie_id,title from movies
      where _query_("+plot_txt_en:dogs -plot_txt_en:cats")

      You should use the _query_ UDF approach when you need to pass a complex query to Solr.

      Here is another example where we push down a subquery into Solr directly to apply a complex full-text search (in this case, a geofilt geospatial query) and then join the resulting rows with a different table:

      SELECT geo.place_name, count(*) as cnt
        FROM users u
        INNER JOIN (select place_name,zip_code from zipcodes where _query_('{!geofilt sfield=geo_location pt=44.9609,-93.2642 d=50}')) as geo
        ON geo.zip_code = u.zip_code WHERE u.gender='F' GROUP BY geo.place_name

      Example 3: Apply LIMIT clauses on pushdown queries

      Let us say you have a table of movies and ratings and want to join the title with the ratings table to get the top 100 movies with the most ratings, using something like this:

      select m.title, count(*) as num_ratings from movies m, ratings r where m.movie_id = r.movie_id group by m.title order by num_ratings desc limit 100

      Given that LIMIT clause, you might think this query will be very fast because you are only asking for 100 rows. However, if the ratings table is big (as is typically the case), then Spark has to read all of the ratings from Solr before joining and aggregating. The better approach is to push the LIMIT down into Solr, and then join from the smaller result set.

      select m.title, popular_movies.num_ratings from movies m inner join (select movie_id, count(*) as num_ratings from ratings group by movie_id order by num_ratings desc limit 100) as popular_movies on m.movie_id = popular_movies.movie_id
      order by num_ratings desc

      Notice that the LIMIT is now on the subquery that gets run directly by Solr, using the Fusion SQL service pushdown. You should use this strategy whether you are aggregating in Solr or just retrieving raw rows. For example:

      SELECT e.id, e.name, signals.* FROM ecommerce e INNER JOIN (select timestamp_tdt, query_s, filters_s, type_s, user_id_s, doc_id_s from ecommerce_signals order by timestamp_tdt desc limit 50000) as signals ON signals.doc_id_s = e.id

      The subquery pulls the last 50,000 signals from Solr before joining the signals with the ecommerce table.

      Example 4: Tune read options of the underlying data asset in Fusion

      When you need to return fields that do not support DocValues from Solr, consider tuning the read options of the underlying data asset in Fusion.

      Behind the scenes, the Fusion SQL service uses parallel queries to each shard and cursorMark to page through all documents in each shard. This approach, while efficient, is not as fast as reading from the /export handler. For example, our ecommerce table contains text fields that cannot be exported using DocValues, so we can tune the read performance using the Catalog API:

      curl -X PUT -H "Content-type:application/json" --data-binary '{
        "name": "ecommerce",
        "assetType": "table",
        "projectId": "fusion",
        "description": "ecommerce demo data",
        "tags": ["fusion"],
        "format": "solr",
        "cacheOnLoad": false,
        "options" : [ "collection -> ecommerce", "splits_per_shard -> 4", "solr.params -> sort=id asc", "exclude_fields -> _lw_*,_raw_content_", "rows -> 10000" ]
      }' $FUSION_API/catalog/fusion/assets/ecommerce

      Notice that, in this case, we are reading all fields except those matching the patterns in the exclude_fields option. We have also increased the number of rows read per paging request to 10000 and we want 4 splits_per_shard, which means that we will use 4 tasks per shard to read data across all replicas of that shard.

      For more information, read about tuning Spark-Solr read performance.
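As a back-of-the-envelope sketch of what these options imply for read parallelism and paging (assumed arithmetic for illustration, not Fusion internals):

```python
# Back-of-the-envelope sketch (assumed arithmetic, not Fusion internals):
# splits_per_shard sets how many Spark tasks read each shard in parallel,
# and rows sets the page size of each cursorMark paging request.

def estimated_read_tasks(num_shards, splits_per_shard):
    return num_shards * splits_per_shard

def estimated_page_requests(num_docs, rows_per_page):
    # Ceiling division: pages needed to read num_docs documents.
    return -(-num_docs // rows_per_page)

print(estimated_read_tasks(4, 4))                # 16
print(estimated_page_requests(250_000, 10_000))  # 25
```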

      The key take-away here is that, if your queries need fields that cannot be exported, then you will need to do some manual tuning of the options on the data asset to get optimal performance. Here is another example where we want a geospatial field that does not support DocValues from Solr:

      curl -u $FUSION_USER:$FUSION_PASS -XPOST -H "Content-type:application/json" "$FUSION_API/catalog/fusion/assets" --data-binary @<(cat <<EOF
      {
        "name": "minn_zipcodes",
        "assetType": "table",
        "projectId": "fusion",
        "format": "solr",
        "description": "zips around minn",
        "options": [
           "collection -> zipcodes",
           "fields -> zip_code,place_name,state,county,geo_point,geo_location,geo_location_rpt",
           "query -> {!geofilt sfield=geo_location pt=44.9609,-93.2642 d=50}",
           "solr.params -> sort=id asc"
        ]
      }
      EOF
      )

      In this example, we want to return the geo_location_rpt field, for example to generate a heatmap, so we need to define a custom data asset in the catalog.

      Example 5: Aggregate in Solr using the underlying faceting engine

      Solr provides a number of basic aggregation capabilities, such as count, min, max, avg, and sum. To reduce the number of raw rows that are returned from Solr to Spark, the Fusion SQL service leverages Solr aggregation when possible. Returning fewer rows from Solr also lets Spark perform better query optimizations, such as broadcasting a small table across all partitions of a large table (a broadcast hash join).

      Here is an example where we push a group count operation (basically a facet) down into Solr using a subquery:

          SELECT m.title as title, top_rated_movies.aggCount as aggCount
            FROM movies m
      INNER JOIN (SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating >= 4 GROUP BY movie_id ORDER BY aggCount desc LIMIT 10) as top_rated_movies
              ON top_rated_movies.movie_id = m.movie_id
        ORDER BY aggCount DESC

      Solr returns aggregated rows by movie_id, and then we leverage Spark to perform the join between movies and the aggregated results of the subquery, which it can do quickly using a hash join with a broadcast.

      Example 6: Aggregate in Solr using streaming expressions

      To determine whether an aggregation can be pushed down into Solr for better performance, the Fusion SQL service analyzes the logical plan in Spark.

      Alternatively, you can write a streaming expression and then expose that as a view in the Fusion SQL service. For example, the following streaming expression joins ecommerce products and signals:

      select(
        hashJoin(
          search(ecommerce,
                 q="*:*",
                 fl="id,name",
                 sort="id asc",
                 qt="/export",
                 partitionKeys="id"),
          hashed=facet(ecommerce_signals,
                       q="*:*",
                       buckets="doc_id_s",
                       bucketSizeLimit=10000,
                       bucketSorts="count(*) desc",
                       count(*)),
          on="id=doc_id_s"),
        name as product_name,
        count(*) as click_count,
        id as product_id)

      This streaming expression performs a hash join between the ecommerce table and the results of a facet expression on the signals collection. We also use the SELECT expression decorator to return human-friendly field names.

      For more information, read about how to write streaming expressions.

      After creating and testing the streaming expression in Solr, you must JSON encode the expression and then create a data asset in the Fusion catalog. For example:

      curl -XPOST -H "Content-Type:application/json" --data-binary '{
        "name": "ecomm_popular_docs",
        "assetType": "table",
        "projectId": "fusion",
        "description": "Join product name with facet counts of docs in signals",
        "tags": ["ecommerce"],
        "format": "solr",
        "cacheOnLoad": true,
        "options": ["collection -> ecommerce", "expr -> select(hashJoin(search(ecommerce,q=\"*:*\",fl=\"id,name\",sort=\"id asc\",qt=\"\/export\",partitionKeys=\"id\"),hashed=facet(ecommerce_signals,q=\"*:*\",buckets=\"doc_id_s\",bucketSizeLimit=10000,bucketSorts=\"count(*) desc\",count(*)),on=\"id=doc_id_s\"),name as product_name,count(*) as click_count,id as product_id)"]}' $FUSION_API/catalog/fusion/assets

      Here is another example of a streaming expression that leverages the underlying faceting engine’s support for computing aggregations beyond just a count:

      select(
        facet(ratings,
              q="*:*",
              buckets="movie_id",
              bucketSizeLimit=100,
              bucketSorts="count(*) desc",
              count(*),
              sum(rating),
              min(rating),
              max(rating),
              avg(rating)),
        movie_id,
        count(*) as the_count,
        sum(rating) as the_sum,
        min(rating) as the_min,
        max(rating) as the_max,
        avg(rating) as the_avg)

      Example 7: Use sampling

      When doing exploratory analysis on a large table, use the rand function to bring back a small random sample of data from Solr, for example:

      select user_id as user, age as _age_ from users where rand() < 0.1 AND gender='F'

      The rand function returns a random number between 0 and 1, so in the previous example, we are requesting a sample that is roughly 10% of the total number of rows. Behind the scenes, Fusion SQL translates the rand UDF into a random streaming expression to draw a sample.
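Because rand() is uniform on [0, 1), filtering with rand() < 0.1 keeps each row with probability 0.1. A quick illustrative simulation of that sampling rate:

```python
import random

# Illustrative: rand() is uniform on [0, 1), so "where rand() < 0.1"
# keeps each row with probability 0.1, i.e. roughly a 10% sample.
rng = random.Random(42)  # fixed seed for reproducibility
rows = 100_000
kept = sum(1 for _ in range(rows) if rng.random() < 0.1)
print(round(kept / rows, 2))  # close to 0.1
```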

      Example 8: Cache results in Spark

      If you plan to perform additional queries against results, cache results in Spark.

      Catalog assets support the cacheOnLoad attribute, which caches the results of the query in Spark (in memory, with spill to disk). You can also cache the results of any query sent to the Catalog API using the cacheResultsAs parameter:

      curl -XPOST -H "Content-Type:application/json" -d '{
        "sql":"SELECT u.user_id as user_id, age, gender, occupation, place_name, county, state, zip_code, geo_location_rpt, title, movie_id, rating, rating_timestamp FROM minn_users u INNER JOIN movie_ratings m ON u.user_id = m.user_id",
        "cacheResultsAs": "ratings_by_minn_users"
      }' "$FUSION_API/catalog/fusion/query"

      Be careful! If results are cached, updates to the underlying data source (most likely Solr) are no longer visible. To trigger Spark to re-compute a cached view by going back to the underlying store, use the following SQL command:

      curl -XPOST -H "Content-Type:application/json" -d '{"sql":"refresh table table-name"}' \
        "$FUSION_API/catalog/fusion/query"

      If a table is not cached, you can cache it using:

      curl -XPOST -H "Content-Type:application/json" -d '{"sql":"cache table table-name"}' \
        "$FUSION_API/catalog/fusion/query"

      Or uncache it:

      curl -XPOST -H "Content-Type:application/json" -d '{"sql":"uncache table table-name"}' \
        "$FUSION_API/catalog/fusion/query"
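If you drive these commands from a script, the JSON bodies differ only in the SQL string. Here is a hedged sketch of a payload builder for the Catalog API query endpoint shown earlier; the helper name is hypothetical:

```python
import json

# Hypothetical helper: build the JSON body for the Catalog API query
# endpoint (POST $FUSION_API/catalog/fusion/query), with or without
# the cacheResultsAs parameter.
def catalog_sql_payload(sql, cache_results_as=None):
    body = {"sql": sql}
    if cache_results_as is not None:
        body["cacheResultsAs"] = cache_results_as
    return json.dumps(body)

print(catalog_sql_payload("cache table ratings"))
# {"sql": "cache table ratings"}
```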

      User defined functions (UDFs)

      You can use SQL and Spark user-defined functions (UDFs) to clean and transform data from Solr.

      For example, Lucidworks Search Hub signals use complex field names generated by Snowplow. The following data asset definition uses SQL to make the data a bit more user friendly as it comes out of Solr:

      {
        "name": "shub_signals",
        "assetType": "table",
        "projectId": "fusion",
        "description": "SearchHub signals",
        "tags": ["shub"],
        "format": "solr",
        "cacheOnLoad": false,
        "options": ["collection -> shub_signals", "solr.params -> sort=id asc", "fields -> timestamp_tdt,type_s,params.useragent_family_s,params.useragent_os_family_s,params.tz_s,params.totalResults_s,params.lang_s,params.useragent_type_name_s,params.terms_s,params.query_unique_id,params.useragent_v,params.doc_0,params.doc_1,params.doc_2,params.facet_ranges_publishedOnDate_before_d,params.uid_s,params.refr_s,params.useragent_category_s,params.sid_s,ip_sha_s,params.vid_s,params.page_s,params.fp_s"],
        "sql": "SELECT timestamp_tdt as timestamp, type_s as signal_type, `params.useragent_family_s` as ua_family,`params.useragent_os_family_s` as ua_os,`params.tz_s` as tz,cast(`params.totalResults_s` as int) as num_found, `params.lang_s` as lang, `params.useragent_type_name_s` as ua_type, `params.terms_s` as query_terms, `params.query_unique_id` as query_id, `params.useragent_v` as ua_vers, `params.doc_0` as doc0, `params.doc_1` as doc1, `params.doc_2` as doc2, `params.facet_ranges_publishedOnDate_before_d` as pubdate_range, `params.uid_s` as user_id, `params.refr_s` as referrer, `params.useragent_category_s` as ua_category, `params.sid_s` as session_id, `ip_sha_s` as ip, cast(`params.vid_s` as int) as num_visits, `params.page_s` as page_name, `params.fp_s` as fingerprint FROM shub_signals"
      }

      Given Spark’s lazy evaluation of DataFrame transformations, executing a SQL statement on top of the raw results adds little overhead beyond the cost of reading from Solr.

      Beyond simple field renaming, you can also leverage hundreds of built-in UDFs to enrich and transform fields. See: https://spark.apache.org/docs/1.6.3/api/scala/index.html#org.apache.spark.sql.functions$

      In the example above, the cast function casts a string field coming from Solr as an int:

      cast(`params.vid_s` as int)

      Keep in mind that Solr’s data model is more flexible than what BI visualization tools like Tableau typically handle. Consequently, the Fusion Catalog API lets you apply more structure to less-structured data in Solr. Here is an example of using a UDF to aggregate by day (similar to Solr’s round-down date-math operator):

      select count(*) as num_per_day, date_format(rating_timestamp,"yyyy-MM-dd") as date_fmt from ratings group by date_format(rating_timestamp,"yyyy-MM-dd") order by num_per_day desc LIMIT 10
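For intuition, the same round-down-by-day grouping can be sketched outside SQL; here is an illustrative Python equivalent of date_format(rating_timestamp, "yyyy-MM-dd") followed by a group-by count:

```python
from collections import Counter
from datetime import datetime

# Illustrative equivalent of grouping by date_format(ts, "yyyy-MM-dd"):
# truncate each timestamp to its day and count rows per day.
timestamps = [
    datetime(2017, 3, 1, 9, 30),
    datetime(2017, 3, 1, 17, 5),
    datetime(2017, 3, 2, 8, 0),
]
per_day = Counter(ts.strftime("%Y-%m-%d") for ts in timestamps)
print(per_day["2017-03-01"])  # 2
```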