Fusion SQL Overview

Most organizations that deploy Fusion also have SQL-compliant business intelligence (BI) or dashboarding tools to facilitate self-service analytics.

The Fusion SQL service:

  • Lets organizations leverage their investments in BI tools by using JDBC and SQL to analyze data managed by Fusion. For example, Tableau is a popular data analytics tool that connects to Fusion SQL using JDBC to enable self-service analytics.

  • Helps business users access important data sets in Fusion without having to know how to query Solr.

Important
In addition to the specified System Requirements, Fusion on Windows requires Visual C++ Redistributable for Visual Studio 2015 to start the SQL service successfully.

Fusion SQL architecture

The following diagram depicts a common Fusion SQL service deployment scenario using the Kerberos network authentication protocol for single sign-on. Integration with Kerberos is optional. By default, the Fusion SQL service uses Fusion security for authentication and authorization.

Fusion SQL service architecture

The numbered steps in the diagram are:

  1. The JDBC/ODBC client application (for example, TIBCO Spotfire or Tableau) uses Kerberos to authenticate a Fusion data analyst.

  2. After authentication, the JDBC/ODBC client application sends the user’s SQL query to the Fusion SQL Thrift Server over HTTP (see the connection sketch after this list).

  3. The SQL Thrift Server uses the keytab of the Kerberos service principal to validate the incoming user identity.

    The Fusion SQL Thrift Server is a Spark application with a specific number of CPU cores and memory allocated from the pool of Spark resources. You can scale out the number of Spark worker nodes to increase available memory and CPU resources to the Fusion SQL service.

  4. The Thrift Server sends the query to Spark to be parsed into a logical plan.

  5. During the query planning stage, Spark sends the logical plan to Fusion’s pushdown strategy component.

  6. During pushdown analysis, Fusion calls out to the registered AuthZ FilterProvider implementation to get a filter query to perform row-level filtering for the Kerberos-authenticated user.

    By default, there is no row-level security provider, but users can install their own implementation using the Fusion SQL service API.

  7. Spark executes a distributed Solr query to return documents that satisfy the SQL query criteria and row-level security filter. To leverage the distributed nature of Spark and Solr, Fusion SQL sends a query to all replicas for each shard in a Solr collection. Consequently, you can scale out SQL query performance by adding more Spark and/or Solr resources to your cluster.
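
From the client’s perspective, steps 1 and 2 amount to opening a standard Hive JDBC connection to the Fusion SQL Thrift Server. As a minimal sketch, a connection test from the beeline CLI might look like the following; the host, port, user, and any transport or Kerberos options are placeholders that depend on your deployment:

beeline -u "jdbc:hive2://<fusion-host>:<sql-port>/default" -n <fusion-user> -p <password>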

Fusion pushdown strategy

The pushdown strategy analyzes the query plan to determine if there is an optimal Solr query or streaming expression that can push down aggregations into Solr to improve performance and scalability. For example, the following SQL query can be translated into a Solr facet query by the Fusion pushdown strategy:

select count(1) as the_count, movie_id from ratings group by movie_id

The basic idea behind Fusion’s pushdown strategy is that it is much faster to let Solr facets perform basic aggregations than it is to export raw documents from Solr and have Spark perform the aggregation. If an optimal pushdown query is not possible, Spark pulls raw documents from Solr and then performs any joins or aggregations it needs in Spark. Put simply, the Fusion SQL service tries to translate SQL queries into optimized Solr queries; failing that, it reads all matching documents for a query into Spark and then performs the SQL execution logic across the Spark cluster.
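
By contrast, a query like the following hypothetical example joins two collections, so Spark reads the matching rows from Solr and performs the join and aggregation itself:

select m.title, avg(r.rating) as avg_rating from movies m inner join ratings r on m.movie_id = r.movie_id group by m.title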

Which collections are registered

By default, all Fusion collections except system collections are registered in the Fusion SQL service so you can query them without any additional setup. However, empty collections cannot be queried, or even described from SQL, so empty collections won’t show up in the Fusion SQL service until they have data. In addition, any fields with a dot in the name are ignored when tables are auto-registered. You can use the Catalog API to alias fields with dots in the names to include these fields.

If you add data to a previously empty collection, then you can execute either of the following SQL commands to ensure that the data gets added as a table:

show tables

show tables in `default`

The Fusion SQL service checks previously empty collections every minute and automatically registers recently populated collections as tables.

You can describe any table using:

describe table-name
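
For example, if the movies and ratings collections used in the examples below contain data, you can inspect their fields like this:

describe movies

describe ratings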

See the movielens lab in the Fusion Spark Bootcamp for a complete example of working with the Fusion Catalog API and Fusion SQL service. Also read about the Catalog API.

Hive configuration

Behind the scenes, the Fusion SQL service is based on Hive. Use the hive-site.xml file in /opt/fusion/4.1.x/conf/ (on Unix) or C:\lucidworks\fusion\4.1.x\conf\ (on Windows) to configure Hive settings.

If you change hive-site.xml, you must restart the Fusion SQL service with ./sql restart (on Unix) or sql.cmd restart (on Windows).
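
As a minimal sketch, an override in hive-site.xml takes the standard Hadoop/Hive configuration form shown below; the property and value here are illustrative only (hive.server2.thrift.port is a standard HiveServer2 setting), so confirm the settings your deployment actually uses before changing anything:

<configuration>
  <property>
    <!-- Illustrative property and value; confirm against your deployment -->
    <name>hive.server2.thrift.port</name>
    <value>8768</value>
  </property>
</configuration>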

Approaches for optimizing the performance of queries

In general terms, the Fusion SQL service supports three approaches for optimizing the performance of queries:

  • Join or aggregate in Spark – Read a set of raw rows from Solr and join or aggregate the rows in Spark.

    For queries that rely on Spark performing joins and aggregations on raw rows read from Solr, your goal is to minimize the number of rows read from Solr and achieve the best read performance of those rows.

  • Push down into Solr – Push down aggregation queries into Solr, returning a smaller set of aggregated rows to Spark.

  • Use views – Use views that send queries directly to Solr, using options supported by the spark-solr library.

Examples of queries written to optimize performance

Here we provide examples of how to get the best performance out of the Fusion SQL service.

The examples show how to write queries so that Spark doesn’t perform unnecessary computations, reserving Spark resources for the complex operations that actually need them.

Pulling raw rows back from Solr is the rate-limiting step when executing aggregations, so these examples focus on ensuring that the query planner pulls back the smallest possible number of rows.

For general SQL information, see the Spark SQL Language manual.

Example 1: Only request fields that have DocValues enabled

Optimal read performance is achieved by only requesting fields that have DocValues enabled, because these can be pulled through the /export handler.

Request only the fields you need for each query. Spark’s query planner pushes the field list down into the Fusion SQL service, which translates it into the fl parameter sent to Solr.

For example, if you need movie_id and title from the movies table, do this:

select movie_id, title from movies

Don’t do this:

select * from movies

Example 2: Use WHERE clause filtering to reduce the number of rows

Use WHERE clause criteria, including full Solr queries, to do as much filtering in Solr as possible to reduce the number of rows.

Spark’s SQL query planner pushes down simple filter criteria into the Fusion SQL service, which translates SQL filters into Solr filter query (fq) parameters. For example, if you want to query using SQL:

select user_id, movie_id, rating from ratings where rating = 4

Then behind the scenes, the Fusion SQL service transforms this query into the following Solr query:

q=*:*&qt=/export&sort=id+asc&collection=ratings&fl=user_id,movie_id,rating&fq=rating:4

Notice that the WHERE clause was translated into an fq parameter and the specific fields needed for the query are sent along in the fl parameter. Also notice that the Fusion SQL service will use the /export handler if all of the fields requested have DocValues enabled. This makes a big difference in performance.

You can also perform full-text queries using the WHERE clause. For example, the following SQL performs a full-text search for the term "love" in the plot_txt_en field of the movies table.

select movie_id,title from movies where plot_txt_en='love'

This works because the Fusion SQL service uses the Solr schema to determine that the plot_txt_en field is a text field, and so Solr assumes the user wants to perform a full-text query. Keep in mind that this full-text search feature does not work with cached tables, because those are held in Spark memory and WHERE criteria aren’t sent to Solr when tables are cached.

The Fusion SQL service also provides the _query_ user-defined function (UDF) to execute any valid Solr query; for example:

select movie_id,title from movies
where _query_("+plot_txt_en:dogs -plot_txt_en:cats")

You should use the _query_ UDF approach when you need to pass a complex query to Solr.

Here’s another example where we push down a subquery into Solr directly to apply a complex full-text search (in this case, a geofilt geospatial query) and then join the resulting rows with a different table:

SELECT geo.place_name, count(*) as cnt
  FROM users u
  INNER JOIN (select place_name,zip_code from zipcodes where _query_('{!geofilt sfield=geo_location pt=44.9609,-93.2642 d=50}')) as geo
  ON geo.zip_code = u.zip_code WHERE u.gender='F' GROUP BY geo.place_name

Example 3: Apply LIMIT clauses on pushdown queries

Let’s say you have movies and ratings tables and want to join the title with the ratings table to get the top 100 movies with the most ratings, using something like this:

select m.title, count(*) as num_ratings from movies m, ratings r where m.movie_id = r.movie_id group by m.title order by num_ratings desc limit 100

Given that LIMIT clause, you might think this query will be very fast because you’re only asking for 100 rows. However, if the ratings table is big (as is typically the case), then Spark has to read all of the ratings from Solr before joining and aggregating. The better approach is to push the LIMIT down into Solr, and then join from the smaller result set.

select m.title, popular_movies.num_ratings from movies m inner join (select movie_id, count(*) as num_ratings from ratings group by movie_id order by num_ratings desc limit 100) as popular_movies on m.movie_id = popular_movies.movie_id
order by num_ratings desc

Notice that the LIMIT is now on the subquery that gets run directly by Solr, using the Fusion SQL service pushdown. You should use this strategy whether you’re aggregating in Solr or just retrieving raw rows. For example:

SELECT e.id, e.name, signals.* FROM ecommerce e INNER JOIN (select timestamp_tdt, query_s, filters_s, type_s, user_id_s, doc_id_s from ecommerce_signals order by timestamp_tdt desc limit 50000) as signals ON signals.doc_id_s = e.id

The subquery pulls the last 50,000 signals from Solr before joining the signals with the ecommerce table.

Example 4: Tune read options of the underlying data asset in Fusion

When you need to return fields that don’t support DocValues from Solr, consider tuning the read options of the underlying data asset in Fusion.

Behind the scenes, the Fusion SQL service uses parallel queries to each shard and cursorMark to page through all documents in each shard. This approach, while efficient, is not as fast as reading from the /export handler. For example, our ecommerce table contains text fields that can’t be exported using DocValues, so we can tune the read performance using the Catalog API:

curl -X PUT -H "Content-type:application/json" --data-binary '{
  "name": "ecommerce",
  "assetType": "table",
  "projectId": "fusion",
  "description": "ecommerce demo data",
  "tags": ["fusion"],
  "format": "solr",
  "cacheOnLoad": false,
  "options" : [ "collection -> ecommerce", "splits_per_shard -> 4", "solr.params -> sort=id asc", "exclude_fields -> _lw_*,_raw_content_", "rows -> 10000" ]
}' $FUSION_API/catalog/fusion/assets/ecommerce

Notice that, in this case, we’re reading all fields except those matching the patterns in the exclude_fields option. We’ve also increased the number of rows read per paging request to 10000 and we want 4 splits_per_shard, which means that we’ll use 4 tasks per shard to read data across all replicas of that shard.

For more information, read about tuning Spark-Solr read performance.

The key take-away here is that, if your queries need fields that can’t be exported, then you’ll need to do some manual tuning of the options on the data asset to get optimal performance. Here’s another example where we want a geospatial field that doesn’t support DocValues from Solr:

curl -u $FUSION_USER:$FUSION_PASS -XPOST -H "Content-type:application/json" "$FUSION_API/catalog/fusion/assets" --data-binary @<(cat <<EOF
{
  "name": "minn_zipcodes",
  "assetType": "table",
  "projectId": "fusion",
  "format": "solr",
  "description":"zips around minn",
  "options": [
     "collection -> zipcodes",
     "fields -> zip_code,place_name,state,county,geo_point,geo_location,geo_location_rpt",
     "query -> {!geofilt sfield=geo_location pt=44.9609,-93.2642 d=50}",
     "solr.params -> sort=id asc"
  ]
}
EOF
)

In this example, we want to return the geo_location_rpt field, for example to generate a heatmap, so we need to define a custom data asset in the catalog.

Example 5: Aggregate in Solr using the underlying faceting engine

Solr provides a number of basic aggregation capabilities, such as count, min, max, avg, and sum. To reduce the number of raw rows that are returned from Solr to Spark, the Fusion SQL service leverages Solr aggregation when possible. Returning fewer rows from Solr also lets Spark perform better query optimizations, such as broadcasting a small table to all partitions of a large table (a broadcast hash join).

Here is an example where we push a group-count operation (essentially a facet) down into Solr using a subquery:

    SELECT m.title as title, top_rated_movies.aggCount as aggCount
      FROM movies m
INNER JOIN (SELECT movie_id, COUNT(*) as aggCount FROM ratings WHERE rating >= 4 GROUP BY movie_id ORDER BY aggCount desc LIMIT 10) as top_rated_movies
        ON top_rated_movies.movie_id = m.movie_id
  ORDER BY aggCount DESC

Solr returns aggregated rows by movie_id, and then we leverage Spark to perform the join between movies and the aggregated results of the subquery, which it can do quickly using a hash join with a broadcast.

Example 6: Aggregate in Solr using streaming expressions

To determine whether an aggregation can be pushed down into Solr for better performance, the Fusion SQL service analyzes the logical plan in Spark.

Alternatively, you can write a streaming expression and then expose that as a view in the Fusion SQL service. For example, the following streaming expression joins ecommerce products and signals:

select(
  hashJoin(
    search(ecommerce,
           q="*:*",
           fl="id,name",
           sort="id asc",
           qt="/export",
           partitionKeys="id"),
    hashed=facet(ecommerce_signals,
                 q="*:*",
                 buckets="doc_id_s",
                 bucketSizeLimit=10000,
                 bucketSorts="count(*) desc",
                 count(*)),
    on="id=doc_id_s"),
  name as product_name,
  count(*) as click_count,
  id as product_id
)

This streaming expression performs a hash join between the ecommerce table and the results of a facet expression on the signals collection. We also use the SELECT expression decorator to return human-friendly field names.

For more information, read about how to write streaming expressions.

After creating and testing the streaming expression in Solr, you must JSON encode the expression and then create a data asset in the Fusion catalog. For example:

curl -XPOST -H "Content-Type:application/json" --data-binary '{
  "name": "ecomm_popular_docs",
  "assetType": "table",
  "projectId": "fusion",
  "description": "Join product name with facet counts of docs in signals",
  "tags": ["ecommerce"],
  "format": "solr",
  "cacheOnLoad": true,
  "options": ["collection -> ecommerce", "expr -> select(hashJoin(search(ecommerce,q=\"*:*\",fl=\"id,name\",sort=\"id asc\",qt=\"\/export\",partitionKeys=\"id\"),hashed=facet(ecommerce_signals,q=\"*:*\",buckets=\"doc_id_s\",bucketSizeLimit=10000,bucketSorts=\"count(*) desc\",count(*)),on=\"id=doc_id_s\"),name as product_name,count(*) as click_count,id as product_id)"]}' $FUSION_API/catalog/fusion/assets

Here’s another example of a streaming expression that leverages the underlying faceting engine’s support for computing aggregations beyond just a count:

select(
  facet(
    ratings,
    q="*:*",
    buckets="rating",
    bucketSorts="count(*) desc",
    bucketSizeLimit=100,
    count(*),
    sum(rating),
    min(rating),
    max(rating),
    avg(rating)
  ),
  rating,
  count(*) as the_count,
  sum(rating) as the_sum,
  min(rating) as the_min,
  max(rating) as the_max,
  avg(rating) as the_avg
)

Example 7: Use sampling

When doing exploratory analysis on a large table, use the rand function to bring back a small random sample of data from Solr, for example:

select user_id as user, age as _age_ from users where rand() < 0.1 AND gender='F'

The rand function returns a random number between 0 and 1, so in the previous example, we’re requesting a sample that is roughly 10% of the total number of rows. Behind the scenes, Fusion SQL translates the rand UDF into a random streaming expression to draw a sample.
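
For reference, a hand-written Solr random streaming expression looks like the following sketch (note that it takes an absolute rows count rather than a fraction); this is for illustration only and is not necessarily the exact expression Fusion generates:

random(users, q="gender:F", fl="user_id,age", rows="1000")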

Example 8: Cache results in Spark

If you plan to perform additional queries against results, cache results in Spark.

Catalog assets support the cacheOnLoad attribute, which caches the results of the query in Spark (memory with spill to disk). You can also request the results for any query sent to the Catalog API to be cached using the cacheResultsAs parameter:

curl -XPOST -H "Content-Type:application/json" -d '{
  "sql":"SELECT u.user_id as user_id, age, gender, occupation, place_name, county, state, zip_code, geo_location_rpt, title, movie_id, rating, rating_timestamp FROM minn_users u INNER JOIN movie_ratings m ON u.user_id = m.user_id",
  "cacheResultsAs": "ratings_by_minn_users"
}' "$FUSION_API/catalog/fusion/query"

Be careful! If a table is cached, updates to the underlying data source (most likely Solr) are no longer visible. To force Spark to re-compute a cached view from the underlying store, use the following SQL command:

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"refresh table table-name"}' \
"$FUSION_API/catalog/fusion/query"

If a table isn’t cached, you can cache it using:

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"cache table table-name"}' \
"$FUSION_API/catalog/fusion/query"

Or uncache it:

curl -XPOST -H "Content-Type:application/json" -d '{"sql":"uncache table table-name"}' \
"$FUSION_API/catalog/fusion/query"

User defined functions (UDFs)

You can use SQL and Spark User Defined Functions (UDF) to clean and/or transform data from Solr.

For example, Lucidworks Search Hub signals use complex field names generated by Snowplow. The following data asset definition uses SQL to make the data a bit more user friendly as it comes out of Solr:

{
  "name": "shub_signals",
  "assetType": "table",
  "projectId": "fusion",
  "description": "SearchHub signals",
  "tags": ["shub"],
  "format": "solr",
  "cacheOnLoad": false,
  "options": ["collection -> shub_signals", "solr.params -> sort=id asc", "fields -> timestamp_tdt,type_s,params.useragent_family_s,params.useragent_os_family_s,params.tz_s,params.totalResults_s,params.lang_s,params.useragent_type_name_s,params.terms_s,params.query_unique_id,params.useragent_v,params.doc_0,params.doc_1,params.doc_2,params.facet_ranges_publishedOnDate_before_d,params.uid_s,params.refr_s,params.useragent_category_s,params.sid_s,ip_sha_s,params.vid_s,params.page_s,params.fp_s"],
  "sql": "SELECT timestamp_tdt as timestamp, type_s as signal_type, `params.useragent_family_s` as ua_family,`params.useragent_os_family_s` as ua_os,`params.tz_s` as tz,cast(`params.totalResults_s` as int) as num_found, `params.lang_s` as lang, `params.useragent_type_name_s` as ua_type, `params.terms_s` as query_terms, `params.query_unique_id` as query_id, `params.useragent_v` as ua_vers, `params.doc_0` as doc0, `params.doc_1` as doc1, `params.doc_2` as doc2, `params.facet_ranges_publishedOnDate_before_d` as pubdate_range, `params.uid_s` as user_id, `params.refr_s` as referrer, `params.useragent_category_s` as ua_category, `params.sid_s` as session_id, `ip_sha_s` as ip, cast(`params.vid_s` as int) as num_visits, `params.page_s` as page_name, `params.fp_s` as fingerprint FROM shub_signals"
}

Because Spark evaluates DataFrame transformations lazily, executing a SQL statement on top of the raw results adds little overhead beyond the cost of reading them from Solr.

Beyond simple field renaming, you can also leverage hundreds of built-in UDFs to enrich and/or transform fields. See: https://spark.apache.org/docs/1.6.3/api/scala/index.html#org.apache.spark.sql.functions$

In the example above, the cast function casts a string field coming from Solr as an int:

cast(`params.vid_s` as int)
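
Beyond cast, you can combine other built-in functions in the same way. For example, this illustrative query against the movies table uses the standard Spark SQL lower and concat functions (together with cast) to derive new columns:

select movie_id, lower(title) as title_lc, concat(title, ' (', cast(movie_id as string), ')') as labeled_title from movies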

Keep in mind that Solr is a much more flexible data engine than is typically handled by BI visualization tools like Tableau. Consequently, the Fusion Catalog API lets you apply more structure to less structured data in Solr. Here’s an example of using a UDF to aggregate by day (similar to Solr’s round down operator):

select count(*) as num_per_day, date_format(rating_timestamp,"yyyy-MM-dd") as date_fmt from ratings group by date_format(rating_timestamp,"yyyy-MM-dd") order by num_per_day desc LIMIT 10