Fusion SQL service

Most organizations that deploy Fusion also have SQL-compliant business intelligence (BI) or dashboarding tools to facilitate self-service analytics.

The Fusion SQL service:

  • Lets organizations leverage their investments in BI tools by using JDBC and SQL to analyze data managed by Fusion. For example, Tableau is a popular data analytics tool that connects to Fusion SQL using JDBC to enable self-service analytics.

  • Helps business users access important data sets in Fusion without having to know how to query Solr.

Fusion SQL architecture

The following diagram depicts a common Fusion SQL service deployment scenario using the Kerberos network authentication protocol for single sign-on. Integration with Kerberos is optional. By default, the Fusion SQL service uses Fusion security for authentication and authorization.

Fusion SQL service architecture

The numbered steps in the diagram are:

  1. The JDBC/ODBC client application (for example, TIBCO Spotfire or Tableau) uses Kerberos to authenticate a Fusion data analyst.

  2. After authentication, the JDBC/ODBC client application sends the user’s SQL query to the Fusion SQL Thrift Server over HTTP.

  3. The SQL Thrift Server uses the keytab of the Kerberos service principal to validate the incoming user identity.

    The Fusion SQL Thrift Server is a Spark application with a specific number of CPU cores and memory allocated from the pool of Spark resources. You can scale out the number of Spark worker nodes to increase available memory and CPU resources to the Fusion SQL service.

  4. The Thrift Server sends the query to Spark to be parsed into a logical plan.

  5. During the query planning stage, Spark sends the logical plan to Fusion’s pushdown strategy component.

  6. During pushdown analysis, Fusion calls out to the registered AuthZ FilterProvider implementation to get a filter query to perform row-level filtering for the Kerberos-authenticated user.

    By default, there is no row-level security provider but users can install their own implementation using the Fusion SQL service API.

  7. Spark executes a distributed Solr query to return documents that satisfy the SQL query criteria and row-level security filter. To leverage the distributed nature of Spark and Solr, Fusion SQL sends a query to all replicas for each shard in a Solr collection. Consequently, you can scale out SQL query performance by adding more Spark and/or Solr resources to your cluster.

Fusion pushdown strategy

The pushdown strategy analyzes the query plan to determine if there is an optimal Solr query or streaming expression that can push down aggregations into Solr to improve performance and scalability. For example, the following SQL query can be translated into a Solr facet query by the Fusion pushdown strategy:

select count(1) as the_count, movie_id from ratings group by movie_id

The basic idea behind Fusion’s pushdown strategy is that it is much faster to let Solr facets perform basic aggregations than it is to export raw documents from Solr and have Spark perform the aggregation. If an optimal pushdown query is not possible, Spark pulls the raw documents from Solr and then performs any joins or aggregations in Spark. Put simply, the Fusion SQL service tries to translate SQL queries into optimized Solr queries; failing that, it reads all documents that match the query into Spark and performs the SQL execution logic across the Spark cluster.
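
For illustration, a hypothetical query like the following (the users table and its user_id and occupation fields are assumed here, and neither collection uses the composite-ID routing described later) joins two collections, so it generally cannot be answered by a single facet query; in that case Fusion SQL reads the matching documents into Spark and performs the join and aggregation there:

select u.occupation, count(1) as the_count
from ratings r join users u on r.user_id = u.user_id
group by u.occupation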

Starting, stopping, and status

Starting the Fusion SQL service

The Fusion SQL service is an optional service that must be started manually.

Give these commands from the bin directory below the Fusion home directory, for example, /opt/fusion/4.2.x (on Unix) or C:\lucidworks\fusion\4.2.x (on Windows).

On Unix:

When starting the Fusion SQL service, the best practice is to also start the Spark master and Spark worker services:

./fusion start spark-master spark-worker sql

On Windows:

When starting the Fusion SQL service, the best practice is to also start the Spark master and Spark worker services:

spark-master.cmd start
spark-worker.cmd start
sql.cmd start

Stopping the Fusion SQL service

The Fusion SQL service is an optional service that must be stopped manually.

Give these commands from the bin directory below the Fusion home directory, for example, /opt/fusion/4.2.x (on Unix) or C:\lucidworks\fusion\4.2.x (on Windows).

On Unix:

When stopping the Fusion SQL service, the best practice is to also stop the Spark master and Spark worker services:

./fusion stop sql spark-worker spark-master

On Windows:

When stopping the Fusion SQL service, the best practice is to also stop the Spark master and Spark worker services:

sql.cmd stop
spark-worker.cmd stop
spark-master.cmd stop

Updating the group.default definition

If you plan to run the Fusion SQL service for production, we recommend updating the group.default definition in the file conf/fusion.properties (on Unix) or conf\fusion.properties (on Windows) to include the spark-master, spark-worker, and sql services:

group.default = zookeeper, solr, api, connectors-classic, connectors-rpc, proxy, webapps, admin-ui, log-shipper, spark-master, spark-worker, sql

Verifying that the Fusion SQL service started

Verify the Fusion SQL service application started. Give these commands from the bin directory below the Fusion home directory.

On Unix:

./sql status

On Windows:

sql.cmd status

Alternatively, check the Spark UI, for example:

Check Spark UI

Which collections are registered

By default, all Fusion collections except system collections are registered in the Fusion SQL service so you can query them without any additional setup. However, empty collections cannot be queried, or even described from SQL, so empty collections won’t show up in the Fusion SQL service until they have data. In addition, any fields with a dot in the name are ignored when tables are auto-registered. You can use the Catalog API to alias fields with dots in the names to include these fields.

If you add data to a previously empty collection, then you can execute either of the following SQL commands to ensure that the data gets added as a table:

show tables

show tables in `default`

The Fusion SQL service checks previously empty collections every minute and automatically registers recently populated collections as tables.

You can describe any table using:

describe table-name

See the movielens lab in the Fusion Spark Bootcamp for a complete example of working with the Fusion Catalog API and Fusion SQL service. Also read about the Catalog API.

Troubleshooting SQL queries

If you encounter an issue with a SQL query, the first place to look for more information about the issue is the var/log/sql/sql.log file. If you need more verbose log information, change the level to DEBUG for the following loggers in the file conf/sql-log4j2.xml (on Unix) or conf\sql-log4j2.xml (on Windows):

<logger name="com.lucidworks.spark" level="DEBUG"/>
<logger name="com.lucidworks.spark.sql" level="DEBUG"/>

After making changes, you must restart the Fusion SQL service. Give these commands from the bin directory below the Fusion home directory, for example, /opt/fusion/4.2.x (on Unix) or C:\lucidworks\fusion\4.2.x (on Windows).

On Unix:

./sql restart

On Windows:

sql.cmd restart

Connecting to the Fusion SQL service from JDBC

The default JDBC properties for connecting to the Fusion SQL service are:

Driver:

org.apache.hive.jdbc.HiveDriver

Driver JAR:

Unix:

/opt/fusion/4.2.x/apps/libs/hive-jdbc-shaded-2.1.1.jar

Windows:

C:\lucidworks\fusion\4.2.x\apps\libs\hive-jdbc-shaded-2.1.1.jar

JDBC URL:

jdbc:hive2://localhost:8768/default;transportMode=http;httpPath=fusion

The username and password are the same as the ones you use to authenticate to Fusion.

Increasing resource allocations

To avoid conflicting with the CPU and memory settings used for Fusion driver applications (default and script), the Fusion SQL service uses its own set of configuration properties for allocating CPU and memory for executing SQL queries.

You can use the Configurations API to override the default values shown here.

fusion.sql.cores (default: 1)
Sets the maximum number of cores to use across the entire cluster to execute SQL queries. Give as many as possible while still leaving CPU available for other Fusion jobs.

fusion.sql.executor.cores (default: 1)
Number of cores to use per executor.

fusion.sql.memory (default: 1g)
Memory per executor to use for executing SQL queries.

fusion.sql.default.shuffle.partitions (default: 20)
Default number of partitions when performing a distributed group-by-type operation, such as a JOIN.

fusion.sql.bucket_size_limit.threshold (default: 30,000,000)
Threshold that determines when to use Solr streaming rollup instead of facets when computing aggregations. Rollup can handle high-cardinality dimensions but is much slower than using facets to compute aggregate measures.

fusion.sql.max.no.limit.threshold (default: 10,000)
Sets a limit for SQL queries that select all fields and all rows, that is, select * from table-name.

fusion.sql.max.cache.rows (default: 5,000,000)
Don’t cache tables bigger than this threshold. If a user sends the cache-table command for a large collection with a row count that exceeds this value, the cache operation will fail.

fusion.sql.max_scan_rows (default: 2,000,000)
Safeguard mechanism to prevent queries that request too many rows from large tables. Queries that read more than this many rows from Solr will fail; increase this threshold for larger Solr clusters that can handle streaming more rows concurrently.

Tip
The Fusion SQL service is designed for executing analytics-style queries over large data sets. You need to provide ample CPU and memory so that queries execute efficiently and can leverage Spark’s in-memory caching for joins and aggregations.

Here’s an example of increasing the resources for the Fusion SQL service:

curl -H 'Content-type:application/json' -X PUT -d '8' "http://localhost:8765/api/v1/configurations/fusion.sql.cores"
curl -H 'Content-type:application/json' -X PUT -d '8' "http://localhost:8765/api/v1/configurations/fusion.sql.executor.cores"
curl -H 'Content-type:application/json' -X PUT -d '2g' "http://localhost:8765/api/v1/configurations/fusion.sql.memory"
curl -H 'Content-type:application/json' -X PUT -d '8' "http://localhost:8765/api/v1/configurations/fusion.sql.default.shuffle.partitions"

If you change any of these settings, you must restart the Fusion SQL service with ./sql restart (on Unix) or sql.cmd restart (on Windows).

The Fusion SQL service is a long-running Spark application and, as such, it holds on to the resources (CPU and memory) allocated to it using the aforementioned settings. Consequently, you might need to reconfigure the CPU and memory allocation for other Fusion Spark jobs to account for the resources given to the Fusion SQL service. In other words, any resources you give to the Fusion SQL service are no longer available for running other Fusion Spark jobs. For more information on adjusting the CPU and memory settings for Fusion Spark jobs, see the Spark configuration settings.

Hive configuration

Behind the scenes, the Fusion SQL service is based on Hive. Use the hive-site.xml file in /opt/fusion/4.2.x/conf/ (on Unix) or C:\lucidworks\fusion\4.2.x\conf\ (on Windows) to configure Hive settings.

If you change hive-site.xml, you must restart the Fusion SQL service with ./sql restart (on Unix) or sql.cmd restart (on Windows).

Using virtual tables with a common join key

With Solr, you can index different document types into the same shard using the composite ID router based on a common route key field. For example, a customer 360 application can index different customer-related document types (contacts, apps, support requests, and so forth) into the same collection, each with a common customer_id field. This lets Solr perform optimized joins between the document types using the route key field. This configuration uses Solr’s composite ID routing, which ensures that all documents with the same join key field end up in the same shard. See Document Routing.

Providing a compositeIdSpec for the Fusion collection

Before indexing, you need to provide a compositeIdSpec for the Fusion collection. For example:

curl -u $FUSION_USER:$FUSION_PASS -X POST -H "Content-type:application/json" \
  -d '{"id":"customer","solrParams":{"replicationFactor":1,"numShards":1,"maxShardsPerNode":10},"type":"DATA","compositeIdSpec":{"routeKey1Field":"customer_id_s"}}' \
  "$FUSION_API/apps/$MYAPP/collections?defaultFeatures=false"

In the example request above, we create a collection named customer with the route key field set to customer_id_s. When documents are indexed through Fusion, the Solr Index pipeline stage uses the compositeIdSpec to create a composite document ID, so documents get routed to the correct shard.

Exposing document types as virtual tables

If you configure your Fusion collection to use a route key field to route different document types to the same shard, then the Fusion SQL service can expose each document type as a virtual table and perform optimized joins between these virtual tables using the route key. To create virtual tables, you simply need to use the Fusion Catalog API on the data asset for the main collection to set the name of the field that determines the document type. For example, if you have a collection named customer that contains different document types (contacts, support tickets, sales contracts, and so forth), then you would set up virtual tables using the following Catalog API update request:

curl -XPUT -H "Content-type:application/json" http://localhost:8765/api/v1/catalog/fusion/assets/customer -d '{
  "projectId" : "fusion",
  "name" : "customer",
  "assetType" : "table",
  "description" : "Fusion collection customer",
  "format" : "solr",
  "options" : [ "collection -> customer", "exclude_fields -> _lw_*,*_\\d_coordinate,_raw_content_", "solr.params -> sort=id asc" ],
  "cacheOnLoad" : false,
  "id" : "fusion.customer",
  "additionalSettings": {
    "virtualTableField":"doc_type_s"
  }
}'

In the example above, we set the virtualTableField to doc_type_s. Fusion sends a facet request to the customer collection to get the unique values of the doc_type_s field and creates a data asset for each unique value. Each virtual table is registered in the Fusion SQL service as a table.

Performing optimized joins in SQL

After you have virtual tables configured and documents routed to the same shard using a compositeIdSpec, you can perform optimized joins in SQL that take advantage of Solr’s domain-join facet feature. For example, the following SQL statement results in a JSON facet request to Solr to perform the aggregation:

select count(1) num_support_requests,
       c.industry as industry,
       a.app_id as app_id,
       a.feature_id as feature_id
from customer c
join support s on c.customer_id = s.customer_id
join apps a on s.customer_id = a.customer_id
where c.region='US-East' AND s.support_type='Enhancement' AND a.app_type='Search'
group by industry, app_id, feature_id

In the example above, we compute the number of feature enhancement requests for Search applications from customers in the US-East region by performing a 3-way join between the customer, support, and apps virtual tables using the customer_id join key. Behind the scenes, Fusion SQL performs a JSON facet query that exploits all documents with the same customer_id value being in the same shard. This lets Solr compute the count for the industry, app_id, feature_id group by key more efficiently than is possible using table scans in Spark.

Using Kerberos for JDBC authentication

Use the following steps to configure the Fusion SQL service to use Kerberos for authentication.

  1. Create a service principal and keytab; your Active Directory or Kerberos administrator will know how to do this. At a minimum, enable AES 128-bit encryption. You can use AES 256-bit encryption, but you’ll have to install the JCE extensions.

    This is an example command to create a keytab file for the service account:

    ktpass /out c:\fusion.service.keytab /princ fusion/sawsserver@FUSIONSQL.LOCAL /rndpass /ptype KRB5_NT_PRINCIPAL /mapUser fusion@FUSIONSQL.LOCAL -mapOp set -crypto AES128-SHA1
  2. Copy the keytab file to the Fusion conf directory.

  3. Update the file conf/hive-site.xml (on Unix) or conf\hive-site.xml (on Windows) to use Kerberos authentication and the correct principal and keytab file installed in step 2.

    On Unix:

    <property>
      <name>hive.server2.authentication</name>
      <value>KERBEROS</value>
    </property>
    <property>
      <name>hive.server2.authentication.kerberos.principal</name>
      <value>fusion/sawsserver@FUSIONSQL.LOCAL</value>
    </property>
    <property>
      <name>hive.server2.authentication.kerberos.keytab</name>
      <value>./conf/fusion.service.keytab</value>
    </property>

    On Windows:

    <property>
      <name>hive.server2.authentication</name>
      <value>KERBEROS</value>
    </property>
    <property>
      <name>hive.server2.authentication.kerberos.principal</name>
      <value>fusion/sawsserver@FUSIONSQL.LOCAL</value>
    </property>
    <property>
      <name>hive.server2.authentication.kerberos.keytab</name>
      <value>conf\fusion.service.keytab</value>
    </property>
  4. Install the file that contains information about your Kerberos realm on the Fusion server.

    On Unix:

    Place the file krb5.conf in the /etc directory.

    On Windows:

    Place the file krb5.ini in the C:\Windows directory.

  5. Update the file conf/fusion.properties (on Unix) or conf\fusion.properties (on Windows) to point to the file krb5.conf (on Unix) or krb5.ini (on Windows) installed in step 4.

    On Unix:

    sql.jvmOptions = -Xmx1g -Djava.security.krb5.conf=/etc/krb5.conf

    On Windows:

    sql.jvmOptions = -Xmx1g -Djava.security.krb5.conf=C:\Windows\krb5.ini

Key features

Searching and Sorting

Scoring

The WHERE and ORDER BY clauses can be used to search and sort results using the underlying search engine. The score (lower case) keyword can be used to sort by the relevance score of a full text query.

An example of a query that uses the score keyword is below:

select id, title, score from books where abstract = 'hello world' order by score desc

Searching

Search predicates are specified in the WHERE clause. Search predicates on text fields will perform full text searches. Search predicates on string fields will perform exact matches unless the LIKE expression is used.

By default, all multi-term predicates are sent to the search engine as phrase queries. In the example above, 'hello world' is searched as a phrase query.

To prevent auto-phrasing of multi-term predicates, wrap parentheses around the terms. For example:

select id, title, score from books where abstract = '(hello world)' order by score desc

In the example above, the '(hello world)' search predicate is sent to the search engine without phrasing and performs the query hello OR world.

When parentheses are used, the search expression is sent to Solr unchanged. This allows for richer search predicates, such as proximity searches.

The example below performs a proximity search:

select id, title, score from books where abstract = '("hello world"~4)' order by score desc

Lucene/Solr wildcards can be sent to the search engine directly using this syntax:

select id, title from books where abstract = '(he?lo)'

The LIKE clause can be used to perform wildcard searches with either the Solr wildcard symbol * or the SQL wildcard symbol %.

When using the traditional SQL % wildcard only leading and trailing wildcards are supported. Use Lucene/Solr wildcards as described above for more complex wildcards.

The example below shows a LIKE query with a trailing % wildcard.

select id, title from books where abstract like 'worl%'

The following operators are supported for numeric and datetime predicates: <, >, >=, <=, =, !=.

Both the IN and BETWEEN clauses can be used to specify predicates.
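
For example, the following sketches use BETWEEN and IN predicates against the products table used elsewhere in this section (the prod_state field is hypothetical):

select id, prod_name from products where price between 100 and 200

select id, prod_name from products where prod_state in ('NY', 'NJ', 'CT')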

Boolean predicates can be used and are translated to boolean search queries.

The example below specifies a boolean query:

select id from products where prod_desc = 'bike' and price < 125 order by price asc

Sorting

Numeric, datetime, and string fields can be sorted using the ORDER BY clause. The sort is pushed down to the search engine for optimal performance. Multiple sorts can be specified using standard SQL syntax.

The example below sorts on a numeric field:

select id, prod_name, price_f from products where prod_desc = 'bike' order by price_f desc
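
To specify multiple sorts, list the sort fields in order. For example, this variation of the query above sorts first by price and then by product name:

select id, prod_name, price_f from products where prod_desc = 'bike' order by price_f desc, prod_name asc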

Single and Multi-dimension SQL aggregations

SQL aggregations are translated to Solr facet queries to take advantage of Solr’s distributed aggregation capabilities. This allows for interactive data analysis over large data sets.

Single and multi-dimension aggregation using supported aggregation functions operate over the entire query result and are designed to return accurate results. The supported aggregation functions that are fully pushed down to the search engine are: count(*), count(distinct), sum, avg, min, max.

An example of a SQL aggregation that is translated to a Solr facet query is below:

select company_name, count(*) as cnt from orders group by company_name
order by cnt desc

Having Clause

A HAVING clause can also be applied to single and multi-dimension aggregations.
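
For example, this sketch builds on the orders query above and keeps only companies with more than 100 orders (the threshold is arbitrary):

select company_name, count(*) as cnt from orders group by company_name
having count(*) > 100
order by cnt desc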

Time series aggregations

Fusion SQL provides a powerful and flexible time series aggregation query through the use of the date_format function. Aggregations that group by a date_format are translated to a Solr range facet query. This allows for fast, interactive time series reporting over large data sets.

An example of a time series aggregation is shown below:

select date_format(rec_time, 'yyyy-MM') as month, count(*) as cnt
from logrecords where rec_time > '2000-01-01' and rec_time < '2010-01-01'
group by month

The date_format function is used to specify both the output format and the time interval in one compact pattern as specified by the Java SimpleDateFormat class.

The example above performs a monthly time series aggregation over the rec_time field, which is a datetime field.

To switch to a daily time series aggregation, simply change the date pattern:

select date_format(rec_time, 'yyyy-MM-dd') as day, count(*) as cnt
from logrecords where rec_time > '2000-01-01' and rec_time < '2000-12-31'
group by day

Date math predicates

Fusion SQL also supports date math predicates through the date_add, date_sub, and current_date functions.

Below is an example that uses date math predicates:

select date_format(rec_time, 'yyyy-MM-dd') as day, count(*) as cnt
from logrecords where rec_time > date_sub(current_date(), 30)
group by day

Auto-filling of time intervals

Fusion SQL automatically fills any time interval that does not contain data with zeros. This ensures that the full time range is included in the output, which makes the time series results easy to visualize in charts.

Sort Order

Time series aggregations are sorted by default in time ascending order. The ORDER BY clause can be used to sort time series aggregation results in a different order.
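
For example, the following query returns the monthly aggregation from the earlier example in descending time order:

select date_format(rec_time, 'yyyy-MM') as month, count(*) as cnt
from logrecords where rec_time > '2000-01-01' and rec_time < '2010-01-01'
group by month
order by month desc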

Having Clause

A HAVING clause can also be applied to a time series query to limit the results to rows that meet specific criteria.
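
For example, this sketch keeps only months with more than 1000 records (the threshold is arbitrary):

select date_format(rec_time, 'yyyy-MM') as month, count(*) as cnt
from logrecords where rec_time > '2000-01-01' and rec_time < '2010-01-01'
group by month
having count(*) > 1000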

Sampling and Statistics

Sampling is often used in statistical analysis to gain an understanding of the distribution, shape and dispersion of a variable or the relationship between variables.

Fusion SQL returns a random sample for all basic selects that do not contain an ORDER BY clause. The random sample is designed to return a uniform distribution of samples that match a query. The sample can be used to infer statistical information about the larger result set.

The example below returns a random sample of a single field:

select filesize_d from logs where year_i = 2019

If no limit is specified, the sample size will be 25,000. To increase the sample size, add a limit larger than 25,000:

select filesize_d from logs where year_i = 2019 limit 50000

The ability to subset the data with a query and then sample from that subset is called Stratified Random Sampling. Stratified Random Sampling is an important statistical technique used to better understand sub-populations of a larger data set.

Descriptive Statistics

Sub-queries can be used to return random samples for fast, often sub-second, statistical analysis. For example:

select count(*) as samplesize,
       mean(filesize_d) as mean,
       min(filesize_d) as min,
       max(filesize_d) as max,
       approx_percentile(filesize_d, .50) as median,
       variance(filesize_d) as variance,
       std(filesize_d) as standard_dev,
       skewness(filesize_d) as skewness,
       kurtosis(filesize_d) as kurtosis,
       sum(filesize_d) as sum
           from (select filesize_d from logs where year_i = 2019 limit 50000)

In the example above, the sub-query returns a random sample of 50,000 results, which the main statistical query then operates on. The statistical query returns aggregations that describe the distribution, shape, and dispersion of the sample set.

Correlation and Covariance

Sub-queries can be used to provide random samples for correlation and covariance:

select corr(filesize_d, response_d) as correlation,
       covar_samp(filesize_d, response_d) as covariance
            from (select filesize_d, response_d from logs limit 50000)

In the example above, the random sample returns two fields, which are passed to the corr and covar_samp functions in the main query. Correlation and covariance are used to show the strength of the linear relationship between two variables.

Numeric Histograms

Sub-queries can be used to provide random samples as input for numeric histograms:

select histogram_numeric(filesize_d, 12) as hist
    from (select filesize_d from testapp limit 50000)

In the example above, the random sample is operated on by the histogram_numeric function, which returns a histogram with 12 bins. Histograms are used to visualize the shape of a distribution.

The histogram_numeric function returns an array containing a struct for each bin. For visualization tools to display the histogram, it often needs to be exploded into a result table. The explode function can be combined with the LATERAL VIEW clause to return the histogram as a table.

SELECT CAST(hist.x as double) as bin_center,
       CAST(hist.y as double) as bin_height
FROM (select histogram_numeric(filesize_d, 12) as response_hist from (select filesize_d from testapp limit 50000)) a
LATERAL VIEW explode(response_hist) exploded_table as hist

Pushed Down Statistical Queries

A narrower set of statistical aggregations can be pushed down to the search engine and operate over entire result sets. These functions are: count(*), count(distinct), sum, min, max, avg and approx_percentile.

Below is an example of a fully pushed down statistical query:

select count(*) as cnt, avg(filesize_d) as avg, approx_percentile(filesize_d, .50) as median from logs where year_i = 2018

Statistical queries that contain a mix of the pushdown functions above and non-pushdown functions, such as skewness or kurtosis, will operate over a random sample that matches the query.

Below is an example of a statistical query that operates over a random sample:

select count(*) as cnt,  skewness(filesize_d) as skewness from logs where year_i = 2018