Kerberos Support

Each of the connectors supports indexing in a cluster running Kerberos security. Generally, this is as simple as running kinit as the user who will execute the indexing task.

Kerberos support is available in several different ways:

  • Index content from Hadoop components such as the filesystem, Hive, or Pig secured with Kerberos.

  • Store Solr indexes in a Hadoop filesystem secured with Kerberos.

  • Secure the Solr cluster with Kerberos for all internode communication and access to the Admin UI.

Each approach requires configuration, outlined in the sections below.

Index Content from a Kerberos-Secured Cluster

If Solr is not secured with Kerberos for internode communication, indexing content from a Kerberos-secured cluster is as simple as using kinit to get a ticket for the service principal (user) who will be running the connector.

For example, if using the pig user to run a Pig script while indexing, you should get a Kerberos ticket for the pig user so it can authenticate.

To get the ticket, issue a kinit command for the appropriate principal.
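
For example, assuming the user@HADOOP-VM1 principal shown in the sample klist output below:

kinit user@HADOOP-VM1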

To verify that you have a ticket for the user, you can use the klist command. Here is a sample output from that command:

Ticket cache: FILE:lucid.cache
Default principal: user@HADOOP-VM1

Valid starting     Expires            Service principal
04/08/15 13:49:06  04/09/15 13:49:06  krbtgt/HADOOP-VM1@HADOOP-VM1
        renew until 04/08/15 13:49:06

Once you have a ticket, you can use the connector as normal. No additional authentication or parameters are required.

Store Solr Indexes in HDFS

When storing your Solr indexes in HDFS and the Hadoop cluster is secured with Kerberos, you will need to configure Solr with the keytab location and service principal to use for communication with the Hadoop cluster.

See the section Kerberos Support in the HDP Search Installation Guide for more information.

Secure Solr with Kerberos

The Apache Solr Reference Guide includes a section on how to use Kerberos authentication with Solr: Kerberos Authentication.

When Solr is secured with Kerberos for internode communication and access to the Admin UI, you will need to create a JAAS file and service principal to allow the Hadoop components to write to the secured Solr indexes. How to reference the JAAS file is covered in the documentation for each connector. This file must be copied to each NodeManager node in the cluster (i.e., every node where map/reduce tasks are executed).

Hadoop Job Jar

The Lucidworks Hadoop Job Jar allows indexing content from HDFS to Solr. This section describes the Job Jar and how to use it.

Hadoop Job Jar Overview

The Job Jar is a connector of sorts, allowing you to index many different types of content stored in HDFS. It uses MapReduce to leverage the scaling qualities of Apache Hadoop while indexing content to Solr. It can be run from any location.

The Job Jar makes use of ingest mappers to parse documents and prepare them to be added to a Solr index. The selection of the ingest mapper is one of the arguments provided to the job when it’s started. Details of the available ingest mappers are included below.

How to Use

The job jar allows you to index many different types of content stored in HDFS to Solr. It uses MapReduce to leverage the scaling qualities of Apache Hadoop while indexing content to Solr.

The job jar can be run from any location in your Hadoop cluster, but requires a Hadoop client if used on a server where Hadoop (bin/hadoop) is not installed. The client allows the job jar to submit the job to Hadoop. The name and availability of a client will vary depending on the Hadoop distribution vendor. See your vendor for more information about how to download and configure a client if you need one.

The job jar uses ingest mappers to parse documents and prepare them to be added to a Solr index. The selection of the ingest mapper is one of the arguments provided to the job when it is started. Details of the available ingest mappers are included in the section Ingest Mappers.

How it Works

The job jar consists of a series of MapReduce-enabled jobs to convert raw content into documents for indexing to Solr.

You must use the job jar with a user that has permissions to write to the hadoop.tmp.dir. The /tmp directory in HDFS must also be writable.
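
For example, one way to check and, if necessary, open up /tmp in HDFS (run as the HDFS superuser; the exact permissions policy is up to your administrator):

hdfs dfs -ls -d /tmp
hdfs dfs -chmod 1777 /tmp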

The Hadoop job jar works in three stages designed to take in raw content and output results to Solr. These stages are:

  1. Create one or more SequenceFiles from the raw content. This is done in one of two ways:

    1. If the source files are available in a shared Hadoop filesystem, prepare a list of source files and their locations as a SequenceFile. The raw contents of each file are not processed until step 2.

    2. If the source files are not available, prepare a list of source files and the raw content. This process is done sequentially and can take a significant amount of time if there are a large number of documents and/or if they are very large.

  2. Run a MapReduce job to extract text and metadata from the raw content.

  3. Run a MapReduce job to send the extracted content from HDFS to Solr using the SolrJ client. This implementation works with SolrJ’s CloudSolrServer Java client, which is aware of where Solr is running via ZooKeeper.

Incremental indexing, where only changes to a document or directory are processed on successive job jar runs, is not supported. All three steps will be completed each time the job jar is run, regardless of whether the original content has changed.

The first step of the process converts the input content into a SequenceFile. In order to do this, the entire contents of that file must be read into memory so that it can be written out as a LWDocument in the SequenceFile. Thus, you should be careful to ensure that the system does not load into memory a file that is larger than the Java heap size of the process.
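
Since other Hadoop-specific -Dargument=value pairs are passed through to the Hadoop configuration (see Job Jar Arguments below), one way to give the tasks more headroom for large files is to raise the task heap with a standard Hadoop property, for example:

-Dmapreduce.map.java.opts=-Xmx2g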

Ingest Mappers

Ingest mappers in the job jar parse documents and prepare them for indexing to Solr.

When configuring the arguments for the job, selection of the correct ingest mapper is key to successful indexing of documents. In some cases, the choice of ingest mapper will determine the arguments you need to provide to the job jar.

The ingest mapper choice is passed to the job jar with the -cls argument. Available job jar arguments are described in detail in the section Job Jar Arguments below.

There are several supported ingest mappers, described in detail below.

CSV Ingest Mapper

This ingest mapper allows you to index files in CSV format. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.CSVIngestMapper.

There are several additional arguments that can be supplied when using this ingest mapper. These are described in detail in the section CSV Ingest Mapper Arguments. For reference, these are the additional arguments:

  • csvDelimiter: the character that is used to separate values for different fields.

  • csvFieldMapping: define default field mapping from column names to Solr fields.

  • csvFirstLineComment: declare that the first line of the document is a comment.

  • idField: the column to be used as the document ID.

  • csvStrategy: the format of the CSV file.

Supports: TextInputFormat documents.

Directory Ingest Mapper

This ingest mapper allows you to index documents found in a defined directory. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.DirectoryIngestMapper.

There are no special arguments for this ingest mapper.

When using this ingest mapper, you may want to also use Apache Tika to parse the files. See the -Dlw.tika.process parameter below for details on how to flag the job to use Apache Tika and add the required .jar.
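
For example, a hypothetical invocation (all paths and the Solr URL are placeholders) that enables Tika parsing for a directory of files might look like:

bin/hadoop jar /path/to/solr-hadoop-job-2.2.8.jar \
   com.lucidworks.hadoop.ingest.IngestJob \
   -Dlww.commit.on.close=true -Dlw.tika.process=true \
   -libjars /path/to/solr-hadoop-tika-2.2.8.jar \
   -cls com.lucidworks.hadoop.ingest.DirectoryIngestMapper \
   -c gettingstarted -i /data/files/* \
   -of com.lucidworks.hadoop.io.LWMapRedOutputFormat \
   -s http://localhost:8983/solr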

Grok Ingest Mapper

This ingest mapper allows you to index log files based on a Logstash configuration file. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.GrokIngestMapper.

Logstash filters such as grok, kv, date, etc., and grok patterns such as ID and WORD are supported. More information about grok is available at http://logstash.net/docs/1.4.0/filters/grok.

During processing, any input and output statements in the configuration file will be ignored. The input will always be HDFS and the output will always be Solr.

There is one additional argument for this ingest mapper, grok.uri, which defines the location of the Logstash configuration file, in either the local filesystem or HDFS. More details are in the section Grok Ingest Mapper Arguments.

Supports: TextInputFormat documents.

RegEx Ingest Mapper

This ingest mapper allows definition of a regular expression that is used on the incoming file to extract content. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.RegexIngestMapper.

The ingest mapper expects that the key and value produced by the InputFormat are both Writable. The regular expression is only applied to the value.

There are three additional arguments that can be supplied with this ingest mapper, described in detail in the section Regular Expression Ingest Mapper Arguments. For reference, these additional properties are:

  • com.lucidworks.hadoop.ingest.RegexIngestMapper.regex: define a regular expression.

  • com.lucidworks.hadoop.ingest.RegexIngestMapper.groups_to_fields: map fields between regex capture groups and field names.

  • com.lucidworks.hadoop.ingest.RegexIngestMapper.match: use Java’s match method instead of find.

SequenceFile Ingest Mapper

This ingest mapper allows you to index a SequenceFile. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.SequenceFileIngestMapper.

If the value of the key/value pair is of type Text, the string will be used; otherwise, the raw bytes will be written.

There are no special arguments for this ingest mapper.

Supports: SequenceFileInputFormat documents.

SolrXML Ingest Mapper

This ingest mapper allows you to index a file in SolrXML format. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.SolrXMLIngestMapper.

The file should be in a SequenceFileInputFormat, where the key is any Writable and the value is text in SolrXML format. The default inputFormat of SequenceFileInputFormat can be overridden if required.

This mapper requires that the idField parameter be set when creating the workflow job. For more details, see the section SolrXML Ingest Mapper Arguments.

Only "add" commands in the SolrXML will be processed. All other commands will be ignored.

Supports: SequenceFileInputFormat documents.

XML Ingest Mapper

This ingest mapper allows you to index a file in XML format. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.XMLIngestMapper.

This mapper requires that the docXPathExpr parameter be set when creating the workflow job. For more details, see the section XML Ingest Mapper Arguments.

Supports: XMLInputFormat documents.

WARC Ingest Mapper

This ingest mapper allows you to index web archive (.warc) files in WarcFileInputFormat. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.WarcIngestMapper.

There are no special arguments for this ingest mapper.

Supports: WarcFileInputFormat documents.

Zip Ingest Mapper

This ingest mapper allows you to index documents contained in .zip files. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.ZipIngestMapper.

There are no special arguments for this ingest mapper. However, when using this ingest mapper, you may want to also use Apache Tika to parse the files. See the -Dlw.tika.process parameter below for details on how to flag the job to use Apache Tika and add the required .jar.

Supports: ZipFileInputFormat documents.

Job Jar Arguments

The job jar arguments allow you to define the type of content in your Hadoop filesystem, choose the ingest mappers appropriate for that content, and set other job parameters as needed.

There are three main sections to the job jar arguments:

  • the main class

  • system and mapper-specific arguments

  • key-value pair arguments

The arguments must be supplied in the above order.

The available arguments and parameters are described in the following sections.

Main Class

The main class must be specified. For all of the mappers available, it is always defined as com.lucidworks.hadoop.ingest.IngestJob.

System and Mapper-specific Arguments

System or Mapper-specific arguments, defined with a pattern of -Dargument=value, are supplied after the class name. In many cases, the arguments chosen depend on the ingest mapper chosen. The ingest mapper will be defined later in the argument string.

There are several possible arguments:

Ingest Behavior Arguments
-Dlww.commit.on.close

Defines if a commit should be done when the connection to Solr is complete. Commits in Solr flush documents to disk instead of holding them in memory. A commit is required for the documents to be searchable. There are settings in Solr to perform automatic commits when the queue grows to a certain size (see UpdateHandlers in SolrConfig in the Apache Solr Reference Guide for more on commits).

Default: false. Required: No.

-Dadd.subdirectories

If true, the exploration of a folder will be recursive, meaning it will look for subdirectories to traverse for documents.

Default: false. Required: No.

-Dlw.tika.process

If true, Apache Tika will be used to parse files. This is most commonly needed when using the DirectoryIngestMapper or the ZipIngestMapper.

Default: false. Required: No.

If -Dlw.tika.process is set to true, the solr-hadoop-tika-2.2.8.jar (including the path, if necessary) should be added to the job arguments with the -libjars argument.

CSV Ingest Mapper Arguments

These arguments are used only when the CSVIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-DcsvDelimiter

This is the file delimiter for CSV content.

Default: , (comma). Required: No.

-DcsvFieldMapping

This defines how to map columns in a CSV file to fields in Solr, in the format of 0=id. The key is a zero-based column number (the first column is always "0", the second column is "1", etc.), and the value is the name of the field to use to store the value in Solr. If this is not set, column 0 is used as the id, unless there is a column named 'id'. See the -DidField argument below for more on the ID field rules.

Default: none. Required: No.
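
For example, a hypothetical mapping of the first three columns to the Solr fields id, city_s, and country_s would be passed as:

-DcsvFieldMapping=0=id,1=city_s,2=country_s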

-DcsvFirstLineComment

If true, the first line of a CSV file will be treated as a comment and will not be indexed.

Default: false. Required: No.

-DcsvStrategy

Defines the format of a CSV file. Three formats are supported:

  • default: a CSV file that adheres to the RFC-4180 standard.

  • excel: a CSV file exported from Microsoft Excel. This commonly uses a comma (,) as the field delimiter.

  • tdf: a tab-delimited CSV file. If you use the tdf strategy, you do not need to override the delimiter with the -DcsvDelimiter argument.

Default: default. Required: No.

-DidField

The column to be used as an ID. The field name used is the name after any mapping that occurs as a result of the -DcsvFieldMapping argument. If there is a column named 'id' and it is different from the field named with this property, you will get an error because you have defined two IDs and IDs must be unique.

This argument is not required when using the CSV Ingest Mapper, but is required when using the SolrXML Ingest Mapper.

Default: none. Required: No.

Grok Ingest Mapper Arguments

These arguments are used only when the GrokIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-Dgrok.uri

The path to a Logstash configuration file, which can be in the local filesystem (file:///path/logStash.conf) or in HDFS (hdfs://path/logStash.conf).

Default: none. Required: No.

Regular Expression Ingest Mapper Arguments

These arguments are used only when the RegexIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-Dcom.lucidworks.hadoop.ingest.RegexIngestMapper.regex

A Java Pattern-compliant regular expression. See the Pattern Javadocs for more details. This property cannot be null or empty.

Default: none. Required: No.

-Dcom.lucidworks.hadoop.ingest.RegexIngestMapper.groups_to_fields

A comma-separated mapping (such as key=value,key=value,…​) between regular expression capturing groups and field names. The key must be an integer and the value must be a String. For instance, 0=body,1=text. Any capturing group not represented in the map will not be added to the document.

Default: none. Required: No.

-Dcom.lucidworks.hadoop.ingest.RegexIngestMapper.match

If true, the mapper will use Java’s Matcher class matches method instead of the find method. This will require the regex to match the entire string instead of part of the string.

Default: none. Required: No.

SolrXML Ingest Mapper Arguments

These arguments are used only when the SolrXMLIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-DidField

The field in the XML document to be used as a unique document ID in the index.

This argument is required when using the SolrXML Ingest Mapper, but not required when using the CSV Ingest Mapper.

Default: none. Required: Yes.

XML Ingest Mapper Arguments

These arguments are used only when the XMLIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-Dlww.xslt

The path in HDFS to an XSLT configuration file.

Default: none. Required: No.

-Dlww.xml.docXPathExpr

The XPath expression for the document element.

Default: \. Required: Yes.

-Dlww.xml.idXPathExpr

The XPath expression for the document id.

Default: none. Required: No.

-Dlww.xml.includeParentAttrsPrefix

Pull parent node attributes by adding a prefix, if desired.

Default: none. Required: No.

-Dlww.xml.start

The XML tag that marks the start of each document record.

Default: none. Required: Yes.

-Dlww.xml.end

The XML tag that marks the end of each document record.

Default: none. Required: Yes.

Other arguments not described here (such as Hadoop-specific system arguments) can be supplied as needed and they will be added to the Hadoop configuration. These arguments should be defined with the -Dargument=value syntax.

Key-Value Pair Arguments

Key-value pair arguments apply to the ingest job generally. These arguments are expressed as -argument value. They are the last arguments supplied before the jar name is defined.

There are several possible arguments:

-cls

Required.

The ingest mapper class. This class must correspond to the content being indexed to ensure proper parsing of documents. See the section Ingest Mappers for a detailed explanation of each available ingest mapper.

  • com.lucidworks.hadoop.ingest.GrokIngestMapper

  • com.lucidworks.hadoop.ingest.CSVIngestMapper

  • com.lucidworks.hadoop.ingest.DirectoryIngestMapper

  • com.lucidworks.hadoop.ingest.RegexIngestMapper

  • com.lucidworks.hadoop.ingest.SequenceFileIngestMapper

  • com.lucidworks.hadoop.ingest.SolrXMLIngestMapper

  • com.lucidworks.hadoop.ingest.XMLIngestMapper

  • com.lucidworks.hadoop.ingest.WarcIngestMapper

  • com.lucidworks.hadoop.ingest.ZipIngestMapper

-c

Required.

The collection name where documents should be indexed. This collection must exist prior to running the Hadoop job jar.
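
If the collection does not already exist, one way to create it in a default Solr 5.x installation (the collection name gettingstarted is just an example) is:

bin/solr create -c gettingstarted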

-of

Required.

The output format. For all cases, you can use the default com.lucidworks.hadoop.io.LWMapRedOutputFormat.

-i

Required.

The path to the Hadoop input data. This path should point to the HDFS directory. If the defined location is not a specific filename, the syntax must include a wildcard expression to find documents, such as /data/*.

-s

The Solr URL when running in standalone mode. In a default installation, this would be http://localhost:8983/solr. Use this parameter when you are not running in SolrCloud mode. If you are running Solr in SolrCloud mode, you should use -zk instead.

-zk

A list of ZooKeeper hosts, followed by the ZooKeeper root directory. For example, 10.0.1.1:2181,10.0.1.2:2181,10.0.1.3:2181/solr would be a valid value.

This parameter is used when running in SolrCloud mode, and allows the output of the ingest process to be routed via ZooKeeper to any available node. If you are not running in SolrCloud mode, use the -s argument instead.

-redcls

The class name of a custom IngestReducer, if any. In order for this to be invoked, you must also set -ur to a value higher than 0. If no value is specified, then the default reducer is used, which is com.lucidworks.hadoop.ingest.IngestReducer.

-ur

The number of reducers to use when outputting to the OutputFormat. Depending on the output format and your system resources, you may wish to have Hadoop do a reduce step so the output resource is not overwhelmed. The default is 0, which is to not use any reducers.

Summary of Argument Order

Using this example job jar argument:

bin/hadoop jar /path/to/solr-hadoop-job-2.2.8.jar (1)

   com.lucidworks.hadoop.ingest.IngestJob (2)

   -Dlww.commit.on.close=true -DcsvDelimiter='|' (3)

   -cls com.lucidworks.hadoop.ingest.CSVIngestMapper -c gettingstarted -i /data/CSV -of com.lucidworks.hadoop.io.LWMapRedOutputFormat -s http://localhost:8888/solr (4)

We can summarize the proper order as follows:

1 The Hadoop command to run a job. This includes the path to the job jar (as necessary).
2 The main ingest class.
3 Mapper arguments, which vary depending on the Mapper class chosen, in the format of -Dargument=value.
4 Key-value arguments, which include the ingest mapper, Solr collection name, and other parameters, in the format of -argument value.
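
The example above indexes to a standalone Solr instance with -s. If Solr is running in SolrCloud mode, the same job would use -zk instead; a hypothetical invocation (the ZooKeeper hosts are placeholders) might look like:

bin/hadoop jar /path/to/solr-hadoop-job-2.2.8.jar \
   com.lucidworks.hadoop.ingest.IngestJob \
   -Dlww.commit.on.close=true -DcsvDelimiter='|' \
   -cls com.lucidworks.hadoop.ingest.CSVIngestMapper \
   -c gettingstarted -i /data/CSV \
   -of com.lucidworks.hadoop.io.LWMapRedOutputFormat \
   -zk zknode1:2181,zknode2:2181,zknode3:2181/solr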


Pig Store/Load Functions

If you use Apache Pig to do large-scale data processing, you can use Pig scripts during indexing of content to Solr. For instance, you can use Pig to do large-scale preprocessing and joining of data and then output the resulting datasets to Solr so that you can more effectively query that data. This section describes the Pig Functions and how to use them.

Pig Functions

A single .jar provides functions for processing content before it is indexed to Solr. This jar is found at:

/opt/lucidworks-hdpsearch/pig/solr-pig-functions-2.2.8.jar

The appropriate Pig function jar must be put in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you will supply the path to the proper jar when invoking your script.
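
For example, a hypothetical copy of the jar into HDFS (the destination directory is arbitrary):

hdfs dfs -mkdir -p /user/pig
hdfs dfs -put /opt/lucidworks-hdpsearch/pig/solr-pig-functions-2.2.8.jar /user/pig/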

Available Functions

The Pig functions included in the solr-pig-functions-2.2.8.jar are three user-defined functions (UDFs) and two store functions. These functions are:

  • com/lucidworks/hadoop/pig/SolrStoreFunc.class

  • com/lucidworks/hadoop/pig/FusionIndexPipelinesStoreFunc.class

  • com/lucidworks/hadoop/pig/EpochToCalendar.class

  • com/lucidworks/hadoop/pig/Extract.class

  • com/lucidworks/hadoop/pig/Histogram.class

Register the Functions

There are two approaches to using functions in Pig: REGISTER them in the script, or load them with your Pig command line request.

If using REGISTER, the Pig function jar must be put in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you can either supply the path in your script or use a variable, defining the variable with a -p property definition.

The example below uses the second approach, loading the jars with the -Dpig.additional.jars system property when launching the script. With this approach, the jars can be located anywhere on the machine where the script will be run.

Indexing Data to Solr

There are a few required parameters for your script to output data to Solr for indexing.

These parameters can be defined in the script itself, or turned into variables that are defined each time the script runs. The example Pig script below shows an example of using these parameters with variables.

solr.zkhost

The ZooKeeper connection string if using Solr in SolrCloud mode. This should be in the form of server:port,server:port,server:port/chroot.

If you are not using SolrCloud, use the solr.server.url parameter instead.

solr.server.url

The location of the Solr instance when Solr is running in standalone mode. This should be in the form of http://server:port/solr.

solr.collection

The name of the Solr collection where documents will be indexed.

Indexing to a Kerberos-Secured Solr Cluster

When a Solr cluster is secured with Kerberos for internode communication, Pig scripts must include the full path to a JAAS file that includes the service principal and the path to a keytab file that will be used to index the output of the script to Solr.

Two parameters provide the information the script needs to access the JAAS file:

lww.jaas.file

The path to the JAAS file that includes a section for the service principal who will write to the Solr indexes. For example, to use this property in a Pig script:

set lww.jaas.file '/path/to/login.conf';

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed).

lww.jaas.appname

The name of the section in the JAAS file that includes the correct service principal and keytab path. For example, to use this property in a Pig script:

set lww.jaas.appname 'Client';

Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Pig.

Indexing to a SSL-Enabled Solr Cluster

When SSL is enabled in a Solr cluster, Pig scripts must include the full paths to the keystore and truststore with their respective passwords.

set lww.keystore '/path/to/solr-ssl.keystore.jks'
set lww.keystore.password 'secret'
set lww.truststore '/path/to/solr-ssl.truststore.jks'
set lww.truststore.password 'secret'

The paths (and passwords) should be the same on all YARN/MapReduce hosts.

Sample CSV Script

The following Pig script will take a simple CSV file and index it to Solr.

set solr.zkhost '$zkHost';
set solr.collection '$collection'; (1)

A = load '$csv' using PigStorage(',') as (id_s:chararray,city_s:chararray,country_s:chararray,code_s:chararray,code2_s:chararray,latitude_s:chararray,longitude_s:chararray,flag_s:chararray); (2)
--dump A;
B = FOREACH A GENERATE $0 as id, 'city_s', $1, 'country_s', $2, 'code_s', $3, 'code2_s', $4, 'latitude_s', $5, 'longitude_s', $6, 'flag_s', $7; (3)

ok = store B into 'SOLR' using com.lucidworks.hadoop.pig.SolrStoreFunc(); (4)

This relatively simple script does several things that help illustrate how the Solr Pig functions work.

1 This and the line above define parameters that are needed by SolrStoreFunc to know where Solr is. SolrStoreFunc needs the properties solr.zkhost and solr.collection, and these lines are mapping the zkhost and collection parameters we will pass when invoking Pig to the required properties.
2 Load the CSV file, using the path and name we will pass with the csv parameter. We also define the field names for each column in the CSV file, and their types.
3 For each item in the CSV file, generate a document id from the first field ($0) and then define each field name and value in name, value pairs.
4 Load the documents into Solr, using the SolrStoreFunc. While we don’t need to define the location of Solr here, the function will use the zkhost and collection properties that we will pass when we invoke our Pig script.
When using SolrStoreFunc, the document ID must be the first field.

When we want to run this script, we invoke Pig and define several parameters we have referenced in the script with the -p option, such as in this command:

./bin/pig -Dpig.additional.jars=/path/to/solr-pig-functions-2.2.8.jar -p csv=/path/to/my/csv/airports.dat -p zkHost=zknode1:2181,zknode2:2181,zknode3:2181/solr -p collection=myCollection ~/myScripts/index-csv.pig

The parameters to pass are:

csv

The path and name of the CSV file we want to process.

zkHost

The ZooKeeper connection string for a SolrCloud cluster, in the form of zkhost1:port,zkhost2:port,zkhost3:port/chroot. In the script, we mapped this to the solr.zkhost property, which is required by the SolrStoreFunc to know where to send the output documents.

collection

The Solr collection to index into. In the script, we mapped this to the solr.collection property, which is required by the SolrStoreFunc to know the Solr collection the documents should be indexed to.

The zkHost parameter above is only used if you are indexing to a SolrCloud cluster, which uses ZooKeeper to route indexing and query requests.

If, however, you are not using SolrCloud, you can use the solrUrl parameter, which takes the location of a standalone Solr instance, in the form of http://host:port/solr.

In the script, you would change the line that maps solr.zkhost to the zkhost property to map solr.server.url to the solrUrl property. For example:

`set solr.server.url '$solrUrl';`

Hive SerDe

The Lucidworks Hive SerDe allows reading and writing data to and from Solr using Apache Hive. Data from Solr can be presented as a Hive table to be joined with other Hive tables. Additionally, data from Hive can be inserted into Solr with an INSERT statement. This section describes the Hive SerDe and how to use it.

Hive SerDe Jars

Due to backward compatibility issues between releases of Hive, the Hive SerDe jar can only be used with Hive 0.12, 0.14, and 0.15.

The jar can be found at /opt/lucidworks-hdpsearch/hive/solr-hive-serde-2.2.8.jar. Do not use this jar if you are using Hive 0.13.

Features

  • Index Hive table data to Solr.

  • Read Solr index data to a Hive table.

  • Kerberos support for securing communication between Hive and Solr.

  • As of v2.2.4 of the SerDe, integration with Lucidworks Fusion is supported.

    • Fusion’s index pipelines can be used to index data to Fusion.

    • Fusion’s query pipelines can be used to query Fusion’s Solr instance for data to insert into a Hive table.

Install the SerDe Jar to Hive

In order for Hive to work with Solr, the Hive SerDe jar must be added as a plugin to Hive.

From a Hive prompt, use the ADD JAR command and reference the path and filename of the SerDe jar for your Hive version.

   hive> ADD JAR solr-hive-serde-2.2.8.jar;

This can also be done in your Hive command to create the table, as in the example below.

Indexing Data with a Hive External Table

Indexing data to Solr or Fusion requires creating a Hive external table. An external table allows the data in the table to be read or written by another system or application outside of Hive.

Indexing Data to Solr

For integration with Solr, the external table allows Hive to read data from and write data to Solr.

To create an external table for Solr, you can use a command similar to below. The properties available are described after the example.

hive> CREATE EXTERNAL TABLE solr (id string, field1 string, field2 int)
      STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler'
      LOCATION '/tmp/solr'
      TBLPROPERTIES('solr.server.url' = 'http://localhost:8888/solr',
                    'solr.collection' = 'collection1',
                    'solr.query' = '*:*');

In this example, we have created an external table named "solr", and defined a custom storage handler (STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler').

The LOCATION indicates the location in HDFS where the table data will be stored. In this example, we have chosen to use /tmp/solr.

In the section TBLPROPERTIES, we define several properties for Solr so the data can be indexed to the right Solr installation and collection:

solr.zkhost

The location of the ZooKeeper quorum if Solr is running in SolrCloud mode. If this property is set along with the solr.server.url property, the solr.server.url property will take precedence.

solr.server.url

The location of the Solr instance if Solr is running in standalone mode. If this property is set along with the solr.zkhost property, this property will take precedence.

solr.collection

The Solr collection for this table. If not defined, an exception will be thrown.

solr.query

The specific Solr query to execute to read this table. If not defined, a default of *:* will be used. This property is not needed when loading data to a table, but is needed when defining the table so Hive can later read the table.

lww.commit.on.close

If true, inserts will be automatically committed when the connection is closed. True is the default.

lww.jaas.file

Used only when indexing to or reading from a Solr cluster secured with Kerberos.

This property defines the path to a JAAS file that contains a service principal and keytab location for a user who is authorized to read from and write to Solr and Hive.

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed). Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Hive.

lww.jaas.appname

Used only when indexing to or reading from a Solr cluster secured with Kerberos.

This property provides the name of the section in the JAAS file that includes the correct service principal and keytab path.

If the table needs to be dropped at a later time, you can use the DROP TABLE command in Hive. This will remove the metadata stored in the table in Hive, but will not modify the underlying data (in this case, the Solr index).
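
For example, to drop the external table created above:

hive> DROP TABLE solr;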

Query and Insert Data to Hive

Once the table is configured, any syntactically correct Hive query will be able to query the index.

For example, to select three fields named "id", "field1", and "field2" from the "solr" table, you would use a query such as:

hive> SELECT id, field1, field2 FROM solr;

Replace the table name as appropriate to use this example with your data.

To join data from tables, you can make a request such as:

hive> SELECT s.id, s.field1, s.field2
      FROM solr s JOIN sometable t
      ON s.id = t.id;

And finally, to insert data to a table, simply use the Solr table as the target for the Hive INSERT statement, such as:

hive> INSERT INTO solr
      SELECT id, field1, field2 FROM sometable;

Example Indexing Hive to Solr

Solr includes a small number of sample documents for use when getting started. One of these is a CSV file containing book metadata. This file is found in your Solr installation, at $SOLR_HOME/example/exampledocs/books.csv.

Using the sample books.csv file, we can see a detailed example of creating a table, loading data to it, and indexing that data to Solr.

CREATE TABLE books (id STRING, cat STRING, title STRING, price FLOAT, in_stock BOOLEAN, author STRING, series STRING, seq INT, genre STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; (1)

LOAD DATA LOCAL INPATH '/solr/example/exampledocs/books.csv' OVERWRITE INTO TABLE books; (2)

ADD JAR solr-hive-serde-2.2.8.jar; (3)

CREATE EXTERNAL TABLE solr (id STRING, cat_s STRING, title_s STRING, price_f FLOAT, in_stock_b BOOLEAN, author_s STRING, series_s STRING, seq_i INT, genre_s STRING) (4)
     STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler' (5)
     LOCATION '/tmp/solr' (6)
     TBLPROPERTIES('solr.zkhost' = 'zknode1:2181,zknode2:2181,zknode3:2181/solr',
                   'solr.collection' = 'gettingstarted',
                   'solr.query' = '*:*', (7)
                   'lww.jaas.file' = '/data/jaas-client.conf'); (8)


INSERT OVERWRITE TABLE solr SELECT b.* FROM books b;
1 Define the table books, and provide the field names and field types that will make up the table.
2 Load the data from the books.csv file.
3 Add the solr-hive-serde-2.2.8.jar file to Hive. Note that the version in the jar file name may differ from the one shown here. If you are using Hive 0.13, you must use a jar specifically built for 0.13.
4 Create an external table named solr, and provide the field names and field types that will make up the table. These will be the same field names as in your local Hive table, so we can index all of the same data to Solr.
5 Define the custom storage handler provided by the solr-hive-serde-2.2.8.jar.
6 Define storage location in HDFS.
7 The query to run in Solr to read records from Solr for use in Hive.
8 Define the location of Solr (or ZooKeeper if using SolrCloud), the collection in Solr to index the data to, and the query to use when reading the table. This example also refers to a JAAS configuration file that will be used to authenticate to the Kerberized Solr cluster.

HBase Indexer

The HBase Indexer provides the ability to stream events from HBase to Solr for near real-time searching.

HBase Indexer Features

The HBase Indexer uses HBase replication to copy rows from HBase to the Solr index in near real-time.

This distribution is based on the original hbase-indexer project from NGDATA, with these additional features:

  • Support for HBase 0.94, 0.96, 0.98, 1.1.0 and 1.1.2

  • Support for Solr 5.x

  • Kerberos authentication

  • Output to Lucidworks Fusion pipelines (optional)

Due to the changes required for Solr 5.x support, builds from this fork of the main project will not work with Solr 4.x.

How the HBase Indexer Works

The HBase Indexer works as an HBase replication sink and runs as a daemon. When updates are written to HBase region servers, they are replicated to the HBase Indexer processes. These processes then pass the updates to Solr for indexing.

Before using the HBase Indexer, there are several configuration files to prepare. Then a specific configuration for your HBase table must be supplied to map HBase columns and rows to Solr fields and documents.

If you are already using HBase replication, the HBase Indexer process should not interfere.

HBase Indexer Daemon Setup

Each of the following configuration changes need to be made to use the HBase Indexer.

These instructions assume Solr is running and a collection already exists for the data to be indexed. Solr does not need to be restarted to start using the HBase Indexer.

Configure hbase-indexer-site.xml

The first step to configuring the HBase Indexer is to define the ZooKeeper connect string and quorum location in hbase-indexer-site.xml, found in /opt/lucidworks-hdpsearch/hbase-indexer/conf.

The properties to define are hbaseindexer.zookeeper.connectstring and hbase.zookeeper.quorum, as in this example:

<configuration>
   <property>
      <name>hbaseindexer.zookeeper.connectstring</name>
      <value>server1:2181,server2:2181,server3:2181</value>
   </property>
   <property>
      <name>hbase.zookeeper.quorum</name>
      <value>server1,server2,server3</value>
   </property>
</configuration>

Customize the server names and ports as appropriate for your environment.

Configure hbase-site.xml

The hbase-site.xml file on each HBase node needs to be modified to include several new properties, as shown below:

<configuration>
   <!-- SEP is basically replication, so enable it -->
   <property>
      <name>hbase.replication</name>
      <value>true</value>
   </property>
   <property>
      <name>replication.source.ratio</name>
      <value>1.0</value>
   </property>
   <property>
      <name>replication.source.nb.capacity</name>
      <value>1000</value>
   </property>
   <property>
      <name>replication.replicationsource.implementation</name>
      <value>com.ngdata.sep.impl.SepReplicationSource</value>
   </property>
</configuration>

In more detail, these properties are:

  • hbase.replication: True or false; enables replication.

  • replication.source.ratio: The ratio of slave cluster regionservers the master cluster regionserver will connect to for updates. Setting this to 1.0 ensures that each SEP consumer is actually used (otherwise, some can sit idle, especially with small clusters).

  • replication.source.nb.capacity: The maximum number of hlog entries to replicate in one go. If this is large, and a consumer takes a while to process the events, the HBase rpc call will time out.

  • replication.replicationsource.implementation: A custom replication source for indexing content to Solr.

Once these values have been inserted to hbase-site.xml, HBase will need to be restarted. That is an upcoming step in the configuration.

If you use Ambari to manage your cluster, you can set these properties by going to the HBase Configs screen at Ambari → Configs → Advanced tab → Custom hbase-site.

Copy hbase-site.xml to HBase Indexer

Once you have added the new properties to hbase-site.xml, copy it to the hbase-indexer conf directory. In many cases, this is from /etc/hbase/conf to hbase-indexer/conf.

Copying this file ensures all of the parameters configured for HBase (such as settings for Kerberos and ZooKeeper) are available to the HBase Indexer.
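
For example (paths may differ in your environment):

cp /etc/hbase/conf/hbase-site.xml /opt/lucidworks-hdpsearch/hbase-indexer/conf/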

Copy JAR Files

The SEP replication being used by HBase Indexer requires 4 .jar files to be copied from the HBase Indexer distribution to each HBase node.

These .jar files can be found in the hbase-indexer/lib directory.

They need to be copied to the $HBASE_HOME/lib directory on each node running HBase. These files are:

  • hbase-sep-api-2.2.8.jar

  • hbase-sep-impl-2.2.8.jar

  • hbase-sep-impl-common-2.2.8.jar

  • hbase-sep-tools-2.2.8.jar
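
For example, a hypothetical copy of the four files to a single HBase node (adjust the host name and the HBase lib path for your installation, and repeat for each node running HBase):

scp /opt/lucidworks-hdpsearch/hbase-indexer/lib/hbase-sep-*-2.2.8.jar hbase-node1:/path/to/hbase/lib/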

Enable Kerberos Support

If you want to index content to a Solr cluster that has been secured with Kerberos for internode communication, you will need to apply additional configuration.

A JAAS file configures the authentication properties, and will include a section for a service principal and keytab file for a user who has access to both HBase and Solr. This user should be a different user than the service principal that Solr is using for internode communication.

Kerberos Parameters

To configure HBase Indexer to be able to write to Kerberized Solr, you will modify the hbase-indexer script found in /opt/lucidworks-hdpsearch/hbase-indexer/bin.

Find the section where two of the properties are commented out by default:

#HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file="
#HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname="

Uncomment those properties to supply the correct values (explained below), and then add another property:

HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Djava.security.auth.login.config="

The three together should look similar to this:

HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file="
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Djava.security.auth.login.config="
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname="

After uncommenting the existing lines, supply values for each property:

-Dlww.jaas.file

The full path to a JAAS configuration file that includes a section defining the keytab location and service principal that will be used to run the HBase Indexer. This principal must have access to both HBase and Solr, but should be different from the principal Solr itself uses for internode communication.

-Djava.security.auth.login.config

The path to the JAAS file which includes a section for the HBase user. This can be the same file that contains the definitions for the HBase Indexer user, but must be defined separately.

-Dlww.jaas.appname

The name of the section in the JAAS file that includes the service principal and keytab location for the user who will run the HBase Indexer, as shown below. If this is not defined, a default of "Client" will be used.

Sample JAAS File

Here is a sample JAAS file, and the areas that must be changed for your environment:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/hbase.keytab" (2)
  storeKey=true
  useTicketCache=false
  debug=true
  principal="hbase@SOLRSERVER.COM"; (3)
};
SolrClient { (4)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (5)
  storeKey=true
  useTicketCache=false
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (6)
};
1 The name of the section of the JAAS file for the HBase user. This first section named "Client" should contain the proper credentials for HBase and ZooKeeper.
2 The path to the keyTab file for the HBase user. The user running the HBase Indexer must have access to this file.
3 The service principal name for the HBase user.
4 The name of the section for the user who will run the HBase Indexer. This second section is named "SolrClient" and should contain the proper credentials for the user who will run the HBase Indexer. This section name will be used with the -Dlww.jaas.appname parameter as described earlier.
5 The path to the keyTab file for the Solr user. The user running the HBase Indexer must have access to this file.
6 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and HBase.

Restart HBase

Once each of the above changes have been made, restart HBase on all nodes.

Start the HBase Indexer Daemon

When configuration is complete and HBase has been restarted, you can start the HBase Indexer daemon.

From hbase-indexer/bin, run:

hbase-indexer server &

The & portion of this command will run the server in the background. If you would like to run it in the foreground, omit the & part of the above example.

At this point the HBase Indexer daemon is running and you are ready to configure an indexer to start indexing content.

Stream Data from HBase Indexer to Solr

Once the HBase configuration files have been updated, HBase has been restarted on all nodes, and the daemon started, the next step is to create an indexer to stream data from a specific HBase table to Solr.

The HBase table that will be indexed must have the REPLICATION_SCOPE set to "1".
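
For example, using the HBase shell and the indexdemo-user table and info column family from the indexer configuration shown below:

hbase> disable 'indexdemo-user'
hbase> alter 'indexdemo-user', {NAME => 'info', REPLICATION_SCOPE => 1}
hbase> enable 'indexdemo-user'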

If you are not familiar with HBase, the HBase Indexer tutorial provides a good introduction to creating a simple table and indexing it to Solr.

Add an Indexer

In order to process HBase events, an indexer must be created.

First you need to create an indexer configuration file, which is a simple XML file that tells the HBase Indexer how to map HBase columns to Solr fields. For example:

<?xml version="1.0"?>
<indexer table="indexdemo-user">
   <field name="firstname_s" value="info:firstname"/>
   <field name="lastname_s" value="info:lastname"/>
   <field name="age_i" value="info:age" type="int"/>
</indexer>

Note that this defines the Solr field name, then the HBase column, and optionally, the field type. The table attribute of the indexer element must also match the name of the table you intend to index.

More details on the indexer configuration options are available from https://github.com/NGDATA/hbase-indexer/wiki/Indexer-configuration.

Start an Indexer Process

Once we have an indexer configuration file, we can then start the indexer itself with the add-indexer command.

This command takes several properties, as in this example:

./hbase-indexer add-indexer \ (1)
  -n myindexer \ (2)
  -c indexer-conf.xml \ (3)
  -cp solr.zk=server1:3181,server2:3181,server3:3181/solr \ (4)
  -cp solr.collection=myCollection \ (5)
  -z server1:3181,server2:3181,server3:3181 (6)
1 The hbase-indexer script is found in the /opt/lucidworks-hdpsearch/hbase-indexer/bin directory. The add-indexer command adds the indexer to the running hbase-indexer daemon.
2 The -n property provides a name for the indexer.
3 The -c property defines the location of the indexer configuration file. Provide the path as well as the filename if the file is not in the directory from which you run the command.
4 The -cp property allows you to provide a key-value pair. In this case, we use the solr.zk property to define the location of the ZooKeeper ensemble used with Solr. The ZooKeeper connect string should include /solr as the znode path.
5 Another -cp key-value pair, which defines the solr.collection property with a value of a collection name in Solr that the documents should be indexed to. This collection must exist prior to running the indexer.
6 The -z property defines the location of the ZooKeeper ensemble used with HBase. This may be the same as was defined in item (4) above, but needs to be additionally defined.

More details on the options for add-indexer are available from https://github.com/NGDATA/hbase-indexer/wiki/CLI-tools.

Storm Bolt

When you want to integrate Solr with Storm, Lucidworks provides more than a bolt: it provides a framework for building a custom topology.

The storm-solr Repository

Lucidworks has provided a toolset for creating a custom Apache Storm topology to simplify integrating Storm and Solr. This toolset helps resolve several questions that come up during development, such as how to package, configure, and run a topology in different environments.

Lucidworks provides a clone of the storm-solr GitHub repository, found at https://github.com/Lucidworks/storm-solr.

You can update the repository on your local machine with a simple git pull.

Build the Jar

First, build the Jar for this project:

mvn clean package

This will generate the target/{jarfilename} that contains the Storm JAR needed to deploy a topology.

Packaging and Running a Storm Topology

To begin, let’s understand how to run a topology in Storm. Effectively, there are two basic modes of running a Storm topology: local and cluster mode. Local mode is great for testing your topology locally before pushing it out to a remote Storm cluster, such as staging or production. For starters, you need to compile and package your code and all of its dependencies into a single JAR with a main class that runs your topology.

For this project, the Maven Shade plugin is used to create a unified JAR with dependencies. The benefit of the Shade plugin is that it can relocate classes into different packages at the byte-code level to avoid dependency conflicts. This comes in quite handy if your application depends on third-party libraries that conflict with classes on the Storm classpath. You can look at the project pom.xml file for specific details about how the Shade plugin is used. For now, let it suffice to say that the project makes it very easy to build a Storm JAR for your application. Once you have a unified JAR ({jarfilename}), you’re ready to run your topology in Storm.

To run locally, you need to have some code that starts up a local cluster in the JVM, submits your topology, and then waits for some configurable amount of time before shutting it all back down, as shown in the following code:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, stormConf, topologyFactory.build(this));
try {
   Thread.sleep(localRunSecs * 1000);
} catch (InterruptedException ie) { ... }
cluster.killTopology(topologyName);
cluster.shutdown();

However, if you want to run your topology in a remote cluster, such as production, you need to do something like:

StormSubmitter.submitTopology(topologyName, stormConf, topologyFactory.build(this));

Consequently, the first thing you’ll need is some way to decide whether to run locally or submit to a remote cluster using some command-line switch when invoking your application. You’ll also need some way to pass along environment specific configuration options. For instance, the Solr you want to index into will be different between local, staging, and production. Let it suffice to say that launching and configuring a Storm topology ends up requiring a fair amount of common boilerplate code. To address this need, our project provides a com.lucidworks.storm.StreamingApp class that:

  • Separates the process of defining a Storm topology from the process of running a Storm topology in different environments. This lets you focus on defining a topology for your specific requirements.

  • Provides a clean mechanism for separating environment-specific configuration settings.

  • Minimizes duplicated boilerplate code when developing multiple topologies and gives you a common place to insert reusable logic needed for all of your topologies.

To use StreamingApp, you simply need to implement the StormTopologyFactory interface, which defines the spouts and bolts in your topology:

public interface StormTopologyFactory {
   String getName();

   StormTopology build(StreamingApp app) throws Exception;
}

Developing a Storm Topology

Let’s look at an example that indexes tweets from Twitter into SolrCloud.

TwitterToSolrTopology

class TwitterToSolrTopology implements StormTopologyFactory {
   static final Fields spoutFields = new Fields("id", "tweet")

   String getName() { return "twitter-to-solr" }

   StormTopology build(StreamingApp app) throws Exception {
      // setup spout and bolts for accessing Spring-managed POJOs at runtime
      SpringSpout twitterSpout = new SpringSpout("twitterDataProvider", spoutFields);
      SpringBolt solrBolt = new SpringBolt("solrBoltAction", app.tickRate("solrBolt"));

      // wire up the topology to read tweets and send to Solr
      TopologyBuilder builder = new TopologyBuilder()
      builder.setSpout("twitterSpout", twitterSpout, app.parallelism("twitterSpout"))
      builder.setBolt("solrBolt", solrBolt, numShards).customGrouping("twitterSpout", shardGrouping)

   return builder.createTopology()
  }
}

A couple of things should stand out to you in this listing. First, there’s no command-line parsing, environment-specific configuration handling, or any code related to running this topology. All that is here is code defining a StormTopology.

Second, the code is quite easy to understand because it only does one thing.

Lastly, this class is written in Groovy instead of Java, which helps keep things nice and tidy. Of course if you don’t want to use Groovy, you can use Java, as the framework supports both seamlessly.

We’ll get into the specific details of the implementation shortly, but first, let’s see how to run the TwitterToSolrTopology using the StreamingApp framework. For local mode, you would do:

java -classpath $STORM_HOME/lib/*:target/{jarfilename} com.lucidworks.storm.StreamingApp \
   example.twitter.TwitterToSolrTopology -localRunSecs 90

The command above will run the TwitterToSolrTopology for 90 seconds on your local workstation and then shutdown. All the setup work is provided by the StreamingApp class.

To submit to a remote cluster, you would do:

$STORM_HOME/bin/storm jar target/{jarfilename} com.lucidworks.storm.StreamingApp \
   example.twitter.TwitterToSolrTopology -env staging

Notice that we use the -env flag to indicate running in a staging environment. It’s common to need to run a Storm topology in different environments, such as test, staging, and production, so it’s built into the StreamingApp framework.

SpringBolt

The com.lucidworks.storm.spring.SpringBolt class allows you to implement your bolt logic as a simple Spring-managed POJO. In the example above, the SpringBolt class delegates message processing logic to a Spring-managed bean with id solrBoltAction. The solrBoltAction bean is defined in the Spring container configuration file resources/spring.xml as:

  <bean id="solrBoltAction" class="com.lucidworks.storm.solr.SolrBoltAction">
    <property name="batchSize" value="100"/>
    <property name="bufferTimeoutMs" value="1000"/>
  </bean>

The SpringBolt framework provides clean separation of concerns and allows you to leverage the full power of the Spring framework for developing your Storm topology. Moreover, this approach makes it easier to test your bolt action logic in JUnit outside of the Storm framework.

The SolrBoltAction bean also depends on an instance of the CloudSolrClient class from SolrJ to be auto-wired by the Spring framework:

 <bean id="cloudSolrClient" class="shaded.apache.solr.client.solrj.impl.CloudSolrClient">
   <constructor-arg index="0" value="${zkHost}"/>
   <property name="defaultCollection" value="${defaultCollection}"/>
</bean>

The zkHost and defaultCollection properties are defined in resources/Config.groovy.

SpringSpout

In Storm, a spout produces a stream of tuples. The TwitterToSolrTopology example uses an instance of SpringSpout and a Twitter data provider to stream tweets into the topology:

SpringSpout twitterSpout = new SpringSpout("twitterDataProvider", spoutFields);

SpringSpout allows you to focus on the application-specific logic needed to generate data without having to worry about Storm-specific implementation details. As you might have guessed, the data provider is a Spring-managed POJO that implements the StreamingDataProvider interface:

public interface StreamingDataProvider {
  void open(Map stormConf);

  boolean next(NamedValues record) throws Exception;
}

Take a look at the TwitterDataProvider implementation provided in the project as a starting point for implementing a Spring-managed bean for your topology.
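
A minimal sketch of a custom provider is shown below. It assumes the storm-solr interfaces live in the com.lucidworks.storm.spring package (as SpringBolt does) and that NamedValues exposes a set(name, value) style method for populating the outgoing tuple; check the TwitterDataProvider source for the exact API.

import java.util.Map;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;

import com.lucidworks.storm.spring.NamedValues;
import com.lucidworks.storm.spring.StreamingDataProvider;

// Hypothetical provider that emits messages from an in-memory queue;
// a real implementation would pull from an external system such as a message bus.
public class QueueDataProvider implements StreamingDataProvider {

  private final BlockingQueue<String> queue = new LinkedBlockingQueue<String>();

  public void open(Map stormConf) {
    // open connections to the external system here
  }

  public boolean next(NamedValues record) throws Exception {
    String msg = queue.poll();
    if (msg == null) {
      return false; // nothing to emit right now
    }
    // assumption: NamedValues.set(fieldName, value) populates the outgoing tuple fields
    record.set("id", String.valueOf(msg.hashCode()));
    record.set("message", msg);
    return true;
  }
}

Presumably, the field names set on the record need to line up with the spoutFields passed to the SpringSpout constructor.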

Unit Testing

When writing a unit test, you don’t want to have to spin up a Storm cluster to test application-specific logic that doesn’t depend on Storm. Recall that one of the benefits of using this framework is that it separates business logic from Storm boilerplate code.

Let’s look at some code from the unit test for our SolrBoltAction implementation.

@Test
public void testBoltAction() throws Exception {
  // Spring @Autowired property at runtime
  SolrBoltAction sba = new SolrBoltAction(cloudSolrServer);
  sba.setMaxBufferSize(1); // to avoid buffering docs

  // Mock the Storm tuple
  String docId = "1";
  TestDoc testDoc = new TestDoc(docId, "foo", 10);
  Tuple mockTuple = mock(Tuple.class);
  when(mockTuple.size()).thenReturn(2);
  when(mockTuple.getString(0)).thenReturn(docId);
  when(mockTuple.getValue(1)).thenReturn(testDoc);
  SpringBolt.ExecuteResult result = sba.execute(mockTuple, null);
  assertTrue(result == SpringBolt.ExecuteResult.ACK);
  cloudSolrServer.commit();
  ...
}

The first thing to notice is the unit test doesn’t need a Storm cluster to run. This makes tests run quickly and helps isolate bugs since there are fewer runtime dependencies in this test.

It’s also important to notice that the SolrBoltAction implementation is not running in a Spring-managed container in this unit test; we’re just creating the instance directly using the new operator. This is good test design as well, since you don’t want to create a Spring container for every unit test, and testing the Spring configuration is not the responsibility of this particular unit test.

The unit test is also using Mockito to mock the Storm Tuple object that is passed into SolrBoltAction. Mockito makes it easy to mock complex objects in a unit test.

The key take-away here is that the unit test focuses on verifying the SolrBoltAction implementation without having to worry about Storm or Spring.

Environment-specific Configuration

Commonly, you will need to manage configuration settings for different environments. For instance, we’ll need to index into a different SolrCloud cluster for staging and production. To address this need, the Spring-driven framework allows you to keep all environment-specific configuration properties in the same configuration file: Config.groovy.

Don’t worry if you don’t know Groovy; the syntax of the Config.groovy file is easy to understand and allows you to cleanly separate properties for the following environments: test, dev, staging, and production. This approach allows you to run the topology in multiple environments using a simple command-line switch, -env, to specify the environment settings that should be applied.

Here’s an example of Config.groovy that shows how to organize properties for the test, development, staging, and production environments:

environments {

 twitterSpout.parallelism = 1
 csvParserBolt.parallelism = 2
 solrBolt.tickRate = 5

 maxPendingMessages = -1

 test {
   env.name = "test"
 }

 development {
   env.name = "development"

   spring.zkHost = "localhost:9983"
   spring.defaultCollection = "gettingstarted"
   spring.fieldGuessingEnabled = true

   spring.fs.defaultFS = "hdfs://localhost:9000"
   spring.hdfsDirPath = "/user/timpotter/csv_files"
   spring.hdfsGlobFilter = "*.csv"
 }

 staging {
   env.name = "staging"

   spring.zkHost = "zkhost:2181"
   spring.defaultCollection = "staging_collection"
   spring.fieldGuessingEnabled = false
 }

 production {
   env.name = "production"

   spring.zkHost = "zkhost1:2181,zkhost2:2181,zkhost3:2181"
   spring.defaultCollection = "prod_collection"
   spring.fieldGuessingEnabled = false
 }
}

Notice that all dynamic variables used in resources/storm-solr-spring.xml must be prefixed with "spring." in Config.groovy. For instance, the ${zkHost} setting in storm-solr-spring.xml resolves to the spring.zkHost property in Config.groovy.

You can also configure all Storm-topology related properties in the Config.groovy file. For instance, if you need to change the topology.max.task.parallelism property for your topology, you can set that in Config.groovy.

When adapting the project to your own requirements, the easiest approach is to update resources/Config.groovy with the configuration settings for each of your environments and then rebuild the Job JAR.

However, you can also specify a different Config.groovy file by using the -config command-line option when deploying the topology, such as:

$STORM_HOME/bin/storm jar target/${jarfilename} com.lucidworks.storm.StreamingApp \
   example.twitter.TwitterToSolrTopology -env staging -config MyConfig.groovy

Metrics

Storm provides high-level metrics for bolts and spouts, but if you need more visibility into the inner workings of your application-specific logic, it’s common to use a Java metrics library such as Dropwizard Metrics (https://dropwizard.github.io/metrics/3.1.0/). Fortunately, there are open source options for integrating metrics with Spring; see https://github.com/ryantenney/metrics-spring.

The Spring context configuration file resources/storm-solr-spring.xml comes pre-configured with all the infrastructure needed to inject metrics into your bean implementations:

<metrics:metric-registry id="metrics"/>
<metrics:annotation-driven metric-registry="metrics"/>
<metrics:reporter type="slf4j" metric-registry="metrics" period="1m"/>

By default, the project is configured to log metrics once a minute to the Storm log using the slf4j reporter.

When implementing your StreamingDataAction (bolt) or StreamingDataProvider (spout), you can have Spring auto-wire metrics objects using the @Metric annotation when declaring metrics-related member variables. For instance, the SolrBoltAction class uses a Timer to track how long it takes to send batches to Solr:

@Metric
public Timer sendBatchToSolr;

The SolrBoltAction class provides several examples of how to use metrics in your bean implementations.
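
As a rough illustration of the pattern, the sketch below times a unit of work with an injected Timer using the standard Dropwizard Metrics API; the TimedSolrSender class, its sendBatch method, and the field name are placeholders, not part of the project.

import java.util.List;

import com.codahale.metrics.Timer;
import com.ryantenney.metrics.annotation.Metric;
import org.apache.solr.common.SolrInputDocument;

public class TimedSolrSender {

  // injected by metrics-spring at runtime via the @Metric annotation
  @Metric
  public Timer sendBatchToSolr;

  // hypothetical helper that records how long each batch takes to send
  public void sendBatch(List<SolrInputDocument> batch) throws Exception {
    Timer.Context timerCtx = sendBatchToSolr.time();
    try {
      // ... send the batch to Solr here ...
    } finally {
      timerCtx.stop(); // records the elapsed time in the metric registry
    }
  }
}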

Before moving on to some Solr-specific features in the framework, it’s important to remember one more point. The example Twitter topology we’ve been working with in this section is quite trivial. In practice, most topologies are more complex and have many spouts and bolts, typically written by multiple developers. Moreover, topologies tend to evolve over time to incorporate data from new systems and requirements. Using this framework will help you craft complex topologies in a simple, maintainable fashion.

Field Mapping

The SolrBoltAction bean takes care of sending documents to SolrCloud in an efficient manner, but it only works with SolrInputDocument objects from SolrJ. It’s unlikely that your Storm topology will be working with SolrInputDocument objects natively, so the SolrBoltAction bean delegates mapping of input Tuples to SolrInputDocument objects to a Spring-managed bean that implements the com.lucidworks.storm.solr.SolrInputDocumentMapper interface. This fits nicely with our design approach of separating concerns in our topology.

The default implementation provided in the project (DefaultSolrInputDocumentMapper) uses Java reflection to read data from a Java object to populate the fields of the SolrInputDocument. In the Twitter example, the default implementation uses Java reflection to read data from a Twitter4J Status object to populate dynamic fields on a SolrInputDocument instance.

When using the default mapper, you must have dynamic fields enabled in your Solr schema.xml or have Solr’s field guessing feature enabled for your collection, which is enabled by default for the data_driven_schema_configs configset. The default mapper bean is defined in the resources/storm-solr-spring.xml file as:

  <bean id="solrInputDocumentMapper"
        class="com.lucidworks.storm.solr.DefaultSolrInputDocumentMapper">
    <property name="fieldGuessingEnabled" value="${fieldGuessingEnabled}"/>
  </bean>

As discussed above, the ${fieldGuessingEnabled} variable will be resolved from the Config.groovy configuration file at runtime.

It should be clear, however, that you can inject your own SolrInputDocumentMapper implementation into the bolt bean using Spring if the default implementation does not meet your needs.
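
As a rough sketch only: the mapping method shown below (toDocument) is a placeholder, so consult the SolrInputDocumentMapper interface in the storm-solr source for the actual method name and signature before writing your own implementation.

import org.apache.solr.common.SolrInputDocument;

import com.lucidworks.storm.solr.SolrInputDocumentMapper;

// Hypothetical custom mapper; the method name and signature are placeholders.
public class TextOnlyDocumentMapper implements SolrInputDocumentMapper {

  public SolrInputDocument toDocument(String docId, Object obj) {
    SolrInputDocument doc = new SolrInputDocument();
    doc.setField("id", docId);
    doc.setField("text_t", String.valueOf(obj));
    return doc;
  }
}

You would then point the solrInputDocumentMapper bean definition in resources/storm-solr-spring.xml at your class.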

Working on a Kerberized Environment

The HdfsFileSystemProvider bean needs the Kerberos credentials (keytab and principal). By default, authentication is set to SIMPLE.

  <bean id="hdfsFileSystemProvider" class="com.lucidworks.storm.example.hdfs.HdfsFileSystemProvider">
    <property name="hdfsConfig">
      <map>
        <entry key="fs.defaultFS" value="${fs.defaultFS:}"/>
        <entry key="hdfs.keytab.file" value="${hdfs.keytab.file:}"/>
        <entry key="hdfs.kerberos.principal" value="${hdfs.kerberos.principal:}"/>
        <entry key="hadoop.security.authentication" value="${hadoop.security.authentication:SIMPLE}"/>
      </map>
    </property>
  </bean>

The SolrSecurity bean needs the full path to jaas-client.conf (see https://cwiki.apache.org/confluence/display/solr/Security). By default, no file is set and no authentication will be performed.

  <bean id="solrSecurity" class="com.lucidworks.storm.utils.SolrSecurity" init-method="setConfigigurer">
     <property name="solrJaasFile" value="${solrJaasFile:}"/>
     <property name="solrJaasAppName" value="${solrJaasAppName:}"/>
  </bean>

Environment-Specific Configuration Example

All of the properties for a Kerberized environment are optional.

  production {
    env.name = "production"

    spring.zkHost = "host1:2181,host2:2181,host3:2181/solr"
    spring.defaultCollection = "storm-collection"
    spring.fieldGuessingEnabled = false

    spring.maxBufferSize = 100
    spring.bufferTimeoutMs = 500

    spring.fs.defaultFS = "hdfs://namenode:port"
    spring.hdfsDirPath = "/path/to/dataset"
    spring.hdfsGlobFilter = "*.csv"

    spring.hdfs.keytab.file = "hdfs.keytab"
    spring.hdfs.kerberos.principal = "storm"
    spring.hadoop.security.authentication = "KERBEROS"

    spring.solrJaasFile = "/path/to/jaas-client.conf"
    spring.solrJaasAppName = "Client"
  }

Spark RDD

The spark-solr library provides a set of tools for reading data from Solr into Spark and for indexing objects from Spark into Solr using SolrJ (the Java client for Solr).

The spark-solr Repository

Lucidworks provides an SDK for creating a custom Apache Spark application to simplify integrating Spark and Solr.

The spark-solr GitHub repository is found at https://github.com/Lucidworks/spark-solr.

You can keep your local clone of this repository up to date with a simple git pull.

Build from Source

mvn clean package -DskipTests

This will build 2 jars in the target directory:

  • spark-solr-${VERSION}.jar

  • spark-solr-${VERSION}-shaded.jar

${VERSION} will be something like 2.1.0-SNAPSHOT for development builds.

The first .jar is what you’d want to use if you were using spark-solr in your own project. The second is what you’d use to submit one of the included example apps to Spark.

Getting started

Import jar File via spark-shell

cd $SPARK_HOME
./bin/spark-shell --jars spark-solr-3.0.0-alpha.2-shaded.jar

The shaded jar can be downloaded from Maven Central or built from the corresponding branch.

Connect to your SolrCloud Instance

via DataFrame

val options = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
  .options(options)
  .load

via RDD

import com.lucidworks.spark.rdd.SolrRDD
val solrRDD = new SolrRDD(zkHost, collectionName, sc)

SolrRDD is an RDD of SolrDocument objects.

via RDD (Java)

import com.lucidworks.spark.rdd.SolrJavaRDD;
import org.apache.spark.api.java.JavaRDD;

SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards(solrQuery);

Features

  • Send objects from a Spark (Streaming or DataFrames) into Solr.

  • Read the results from a Solr query as a Spark RDD or DataFrame.

  • Stream documents from Solr using /export handler (only works for exporting fields that have docValues enabled).

  • Read large result sets from Solr using cursors or with /export handler.

  • Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.

Querying

Cursors

Cursors are used by default to pull documents out of Solr. By default, the number of tasks allocated equals the number of shards available for the collection.

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub-ranges using a split field. A good candidate for the split field is the _version_ field that is attached to every document by the shard leader during indexing. See the [splits] section to enable and configure intra-shard splitting.

Cursors won’t work if the index changes during query time. Constrain your query to a static view of the index by passing additional Solr parameters via [solr.params].
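
For example, the following sketch (Java, assuming an existing SparkSession named spark; the connection details and timestamp field are placeholders) pins a cursor-based read to a static view of the index:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// filter to documents indexed before the current hour so the result set
// does not change while the cursor is paging through it
Dataset<Row> df = spark.read().format("solr")
    .option("zkhost", "zkhost1:2181")
    .option("collection", "mycollection")
    .option("query", "body_t:solr")
    .option("solr.params", "fq=timestamp_tdt:[* TO NOW/HOUR]")
    .load();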

Streaming API (/export)

If the fields being queried have docValues enabled, then the Streaming API can be used to pull documents from Solr in a true streaming fashion. This method is 8-10x faster than cursors. The [request_handler] option allows you to enable the Streaming API for a DataFrame.
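
As a sketch (Java, assuming an existing SparkSession named spark and docValues enabled on the listed fields; connection details are placeholders), enabling the /export handler looks like this:

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// /export requires an explicit field list, and every listed field must have docValues enabled
Dataset<Row> exported = spark.read().format("solr")
    .option("zkhost", "zkhost1:2181")
    .option("collection", "mycollection")
    .option("request_handler", "/export")
    .option("fields", "id,author_s,favorited_b")
    .load();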

Indexing

Objects can be sent to Solr via Spark Streaming or DataFrames. The schema is inferred from the DataFrame, and any fields that do not exist in the Solr schema will be added via the Schema API. See ManagedIndexSchemaFactory.
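
A minimal indexing sketch (Java, assuming an existing DataFrame named df; connection details are placeholders) looks like this:

import org.apache.spark.sql.SaveMode;

// write the rows of an existing DataFrame into a Solr collection
df.write().format("solr")
    .option("zkhost", "zkhost1:2181")
    .option("collection", "mycollection")
    .option("batch_size", "1000")
    .mode(SaveMode.Overwrite)
    .save();

Depending on your commit settings, newly indexed documents may not be visible for searching until a commit occurs (see the soft_commit_secs option below).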

See Index parameters for configuration and tuning.

Configuration and Tuning

The Solr DataSource supports a number of optional parameters that allow you to optimize performance when reading data from Solr. The only required parameters for the DataSource are zkhost and collection.

Query Parameters

query

Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:

Usage: option("query","body_t:solr")

Default: *:*

If you don’t specify the "query" option, then all rows are read using the "match all documents" query (*:*).

fields

You can use the fields option to specify a subset of fields to retrieve for each document in your results:

Usage: option("fields","id,author_s,favorited_b,…​")

By default, all fields for each document are pulled back from Solr.

rows

You can use the rows option to specify the number of rows to retrieve from Solr per request. Behind the scenes, the implementation uses either deep-paging cursors or the Streaming API with response streaming, so it is usually safe to specify a large number of rows.

To be clear, this is not the maximum number of rows to read from Solr. All matching rows on the backend are read. The rows parameter is the page size.

By default, the implementation uses 1000 rows, but if your documents are small, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM’s garbage collector.

Usage: option("rows","10000") Default: 1000

request_handler

Set the Solr request handler for queries. This option can be used to export results from Solr via the /export handler, which streams data out of Solr. See Exporting Result Sets for more information.

The /export handler needs fields to be explicitly specified. Please use the fields option or specify the fields in the query.

Usage: option("request_handler", "/export") Default: /select

Increase Read Parallelism using Intra-Shard Splits

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub-ranges using a split field. Sub-range splitting enables faster fetching from Solr by increasing the number of read tasks. This should only be used if there are enough computing resources in the Spark cluster.

Shard splitting is disabled by default.

splits

Enables shard splitting on the default field _version_.

Usage: option("splits", "true")

Default: false

The above option is equivalent to option("split_field", "_version_")

split_field

The field to split on can be changed using split_field option.

Usage: option("split_field", "id") Default: _version_

splits_per_shard

The splits_per_shard option sets the number of sub-range splits to create for each shard. Behind the scenes, the DataSource implementation tries to split the shard into evenly sized splits using filter queries. You can also split on a string-based keyword field, but it should have sufficient variance in its values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.

Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.

Usage: option("splits_per_shard", "30")

Default: 20
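
Putting the split options together, an intra-shard splitting read might look like the following sketch (Java, assuming an existing SparkSession named spark; connection details are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// split each shard into roughly 8 sub-ranges on the default _version_ field
Dataset<Row> df = spark.read().format("solr")
    .option("zkhost", "zkhost1:2181")
    .option("collection", "mycollection")
    .option("splits", "true")
    .option("splits_per_shard", "8")
    .load();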

flatten_multivalued

This option is enabled by default and flattens multi-valued fields returned from Solr.

Usage: option("flatten_multivalued", "false")

Default: true

dv

The dv option fetches docValues fields that are indexed but not stored by using function queries. Use this option for Solr versions lower than 5.5.0.

Usage: option("dv", "true")

Default: false

Index parameters

soft_commit_secs

If specified, the soft_commit_secs value will be set via the Solr Config API during indexing.

Usage: option("soft_commit_secs", "10")

Default: None

batch_size

The batch_size option determines the number of documents that are sent to Solr per HTTP request during indexing. Set this option higher if the documents are small and memory is available.

Usage: option("batch_size", "10000")

Default: 500

gen_uniq_key

If the documents are missing the unique key (as defined in the Solr schema), then the gen_uniq_key option will generate a unique value for each document before indexing to Solr. Instead of this option, the UUIDUpdateProcessorFactory can be used to generate UUID values for documents that are missing the unique key field.

Usage: option("gen_uniq_key", "true")

Default: false

sample_seed

The sample_seed option allows you to read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct option to control the sample size.

Usage: option("sample_seed", "5150")

Default: None

sample_pct

The sample_pct option allows you to set the size of a random sample of documents from Solr; use a value between 0 and 1.

Usage: option("sample_pct", "0.05")

Default: 0.1

solr.params

The solr.params option can be used to specify any arbitrary Solr parameters in the form of a Solr query.

Usage: option("solr.params", "fq=userId:[10 TO 1000]&sort=userId desc")

Querying Time Series Data

partition_by

Set this option to time in order to query multiple time series collections that are partitioned according to some time period.

Usage: option("partition_by", "time")

Default: none

time_period

This is of the form X DAYS/HOURS/MINUTES. It should match the time period with which the partitions are created.

Usage: option("time_period", "1MINUTES")

Default: 1DAYS

datetime_pattern

This pattern can normally be inferred from time_period, but this option can be used to specify it explicitly.

Usage: option("datetime_pattern", "yyyy_MM_dd_HH_mm")

Default: yyyy_MM_dd

time_stamp_field_name

This option specifies the field name in the indexed documents where the timestamp is found.

Usage: option("time_stamp_field_name", "ts")

Default: timestamp_tdt

timezone_id

Used to specify the timezone.

Usage: option("timezone_id", "IST")

Default: UTC

max_active_partitions

This option specifies the maximum number of partitions that are allowed at a time.

Usage: option("max_active_partitions", "100")

Default: null
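
Combining the options above, a time-partitioned read might look like the following sketch (Java, assuming an existing SparkSession named spark; the base collection name and other values are placeholders):

import org.apache.spark.sql.Dataset;
import org.apache.spark.sql.Row;

// query daily partitions of a time series collection; partition names are
// derived from the base collection name and the datetime_pattern
Dataset<Row> events = spark.read().format("solr")
    .option("zkhost", "zkhost1:2181")
    .option("collection", "logs")
    .option("partition_by", "time")
    .option("time_period", "1DAYS")
    .option("time_stamp_field_name", "timestamp_tdt")
    .load();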

Troubleshooting Tips

Why is dataFrame.count so slow?

Solr can provide the number of matching documents nearly instantly, so why is calling count on a DataFrame backed by a Solr query so slow? The reason is that Spark likes to read all rows before performing any operations on a DataFrame. So when you ask SparkSQL to count the rows in a DataFrame, spark-solr has to read all matching documents from Solr and then count the rows in the RDD.

If you’re just exploring a Solr collection from Spark and need to know the number of matching rows for a query, you can use the SolrQuerySupport.getNumDocsFromSolr utility function.

I set rows to 10 and now my job takes forever to read 10 rows from Solr!

The rows option sets the page size, but all matching rows are read from Solr for every query. So if your query matches many documents in Solr, Spark reads them all, 10 documents per request.

Use the sample_seed option to limit the size of the results returned from Solr.

Developing a Spark Application

The com.lucidworks.spark.SparkApp class provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.

To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you’re developing.

RDDProcessor

Implement the com.lucidworks.spark.SparkApp$RDDProcessor interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).

StreamProcessor

Extend the com.lucidworks.spark.SparkApp$StreamProcessor abstract class to build a Spark streaming application.

See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor for examples of how to write a StreamProcessor.

Authenticating with Kerberized Solr

For background on Solr security, see: https://cwiki.apache.org/confluence/display/solr/Security.

The SparkApp framework allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option.

For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS configuration file named jaas-client.conf that sets the location of your Kerberos keytab file, such as:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/keytabs/solr.keytab"
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr";
};

To use this configuration to authenticate to Solr, you simply need to pass the path to the jaas-client.conf file created above using the -solrJaasAuthConfig option, such as:

spark-submit --master yarn-server \
  --class com.lucidworks.spark.SparkApp \
  $SPARK_SOLR_PROJECT/target/spark-solr-${VERSION}-shaded.jar \
  hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
  -hdfsPath /user/spark/testdata/syn_sample_50k \
  -solrJaasAuthConfig=/path/to/jaas-client.conf