Kerberos Support

Each of the connectors supports indexing in a cluster running Kerberos security, and Solr can authenticate with a Kerberos-secured cluster.

Kerberos support is available in several different ways:

  • Index content from Hadoop components such as the filesystem, Hive, or Pig secured with Kerberos.

  • Store Solr indexes in a Hadoop filesystem secured with Kerberos.

  • Secure the Solr cluster with Kerberos for all internode communication and access to the Admin UI.

Each approach requires configuration, outlined in the sections below.

Index Content from a Kerberos-Secured Cluster

If Solr is not secured with Kerberos for internode communication, indexing content from a Kerberos-secured cluster is as simple as using kinit to get a ticket for the service principal (user) who will be running the connector.

For example, if you run a Pig script as the pig user while indexing, you should get a Kerberos ticket for the pig user so it can authenticate.

To get the ticket, simply issue this command:
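
kinit user@HADOOP-VM1

The example principal matches the sample klist output below; substitute the principal (and realm) for the user who will be running the connector.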

To verify that you have a ticket for the user, you can use the klist command. Here is a sample output from that command:

Ticket cache: FILE:lucid.cache
Default principal: user@HADOOP-VM1

Valid starting     Expires            Service principal
04/08/15 13:49:06  04/09/15 13:49:06  krbtgt/HADOOP-VM1@HADOOP-VM1
      renew until 04/08/15 13:49:06

Once you have a ticket, you can use the connector as normal. No additional authentication or parameters are required.

Store Solr Indexes in Kerberos-Secured HDFS

When you store your Solr indexes in HDFS and the Hadoop cluster is secured with Kerberos, you will need to configure Solr with the keytab location and service principal to use for communication with the Hadoop cluster.

See the section Kerberos Support in the HDP Search Installation Guide for more information.
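
As a rough sketch only (the property names follow Solr's HdfsDirectoryFactory; the HDFS path, keytab location, and principal below are placeholders for your environment), the relevant settings in solrconfig.xml look like this:

<directoryFactory name="DirectoryFactory" class="solr.HdfsDirectoryFactory">
  <str name="solr.hdfs.home">hdfs://namenode:8020/user/solr</str>
  <bool name="solr.hdfs.security.kerberos.enabled">true</bool>
  <str name="solr.hdfs.security.kerberos.keytabfile">/etc/security/keytabs/solr.keytab</str>
  <str name="solr.hdfs.security.kerberos.principal">solr/host@EXAMPLE.COM</str>
</directoryFactory>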

Secure Solr with Kerberos

The Apache Solr Reference Guide includes a section on how to use Kerberos authentication with Solr: Kerberos Authentication Plugin.

When Solr is secured with Kerberos for internode communication and access to the Admin UI, you will need to create a JAAS file and service principal to allow the Hadoop components to write to the secured Solr indexes. How to reference the JAAS file is covered in the documentation for each connector. This file must be copied to each NodeManager node in the cluster (i.e., every node where map/reduce tasks are executed).

Hadoop Job Jar

The Lucidworks Hadoop Job Jar allows indexing content from HDFS to Solr. This section describes the Job Jar and how to use it.

Hadoop Job Jar Overview

The Job Jar is a connector of sorts, allowing you to index many different types of content stored in HDFS. It uses MapReduce to leverage the scaling qualities of Apache Hadoop while indexing content to Solr. It can be run from any location.

The Job Jar makes use of ingest mappers to parse documents and prepare them to be added to a Solr index. The selection of the ingest mapper is one of the arguments provided to the job when it’s started. Details of the available ingest mappers are included below.

How it Works

The job jar runs a series of MapReduce-enabled jobs to convert raw content into documents for indexing to Solr.

You must use the job jar with a user that has permissions to write to the hadoop.tmp.dir. The /tmp directory in HDFS must also be writable.
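
For example, to check (and, if necessary, open up) the permissions on /tmp in HDFS, a user with HDFS superuser rights could run:

hdfs dfs -ls -d /tmp
hdfs dfs -chmod 1777 /tmp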

The Hadoop job jar works in three stages designed to take in raw content and output results to Solr. These stages are:

  1. Create one or more SequenceFiles from the raw content. This is done in one of two ways:

    1. If the source files are available in a shared Hadoop filesystem, prepare a list of source files and their locations as a SequenceFile. The raw contents of each file are not processed until step 2.

    2. If the source files are not available, prepare a list of source files and the raw content. This process is done sequentially and can take a significant amount of time if there are a large number of documents and/or if they are very large.

  2. Run a MapReduce job to extract text and metadata from the raw content.

    • This process uses ingest mappers to parse documents and prepare them to be added to a Solr index. The section Ingest Mappers below provides a list of available mappers.

    • Apache Tika can also be used for additional document parsing and metadata extraction.

  3. Run a MapReduce job to send the extracted content from HDFS to Solr using the SolrJ client. This implementation uses SolrJ’s CloudSolrClient, which discovers where Solr is running via ZooKeeper.

Incremental indexing, where only changes to a document or directory are processed on successive job jar runs, is not supported. All three steps will be completed each time the job jar is run, regardless of whether the original content has changed.

The first step of the process converts the input content into a SequenceFile. In order to do this, the entire contents of that file must be read into memory so that it can be written out as a LWDocument in the SequenceFile. Thus, you should be careful to ensure that the system does not load into memory a file that is larger than the Java heap size of the process.

Ingest Mappers

Ingest mappers in the job jar parse documents and prepare them for indexing to Solr.

There are several available ingest mappers:

  • CSVIngestMapper

  • DirectoryIngestMapper

  • GrokIngestMapper

  • RegexIngestMapper

  • SequenceFileIngestMapper

  • SolrXMLIngestMapper

  • XMLIngestMapper

  • WarcIngestMapper

  • ZipIngestMapper

The ingest mapper is added to the job arguments with the -cls parameter. However, many mappers require additional arguments. Please refer to the wiki page Ingest Mapper Arguments for the required and optional arguments for each mapper.

Example Arguments

These are two simple examples to demonstrate how to construct the job jar arguments for some content scenarios.

Index CSV files

To index CSV files, you could use the following arguments:

bin/hadoop jar /opt/lucidworks-hdpsearch/job/solr-hadoop-job-3.0.0.jar \ (1)
com.lucidworks.hadoop.ingest.IngestJob \ (2)
-Dlww.commit.on.close=true \ (3)
-DcsvDelimiter=| \ (4)
-cls com.lucidworks.hadoop.ingest.CSVIngestMapper \ (5)
-c gettingstarted \ (6)
-i /data/CSV \ (7)
-of com.lucidworks.hadoop.io.LWMapRedOutputFormat \ (8)
-s http://localhost:8888/solr (9)

To explain in more detail, here is a breakdown of each parameter:

1 Use the hadoop binary with the jar command and supply the path to the solr-hadoop-job-3.0.0.jar.
2 Define the main class which is always com.lucidworks.hadoop.ingest.IngestJob.
3 Commit the documents when finished.
4 Define the delimiter character as a pipe (|).
5 Define the mapper class, in this case -cls com.lucidworks.hadoop.ingest.CSVIngestMapper.
6 Name the collection in Solr to index the documents to.
7 Define the name and path of the file(s) to be indexed. Again, this path should be to the location of the file(s) in HDFS.
8 Define the output format, which is always com.lucidworks.hadoop.io.LWMapRedOutputFormat.
9 Provide the location of Solr. We’re not using SolrCloud, so Solr is found at: -s http://localhost:8888/solr

Index a Directory of Files with SolrCloud

bin/hadoop jar /opt/lucidworks-hdpsearch/job/solr-hadoop-job-3.0.0.jar \ (1)
com.lucidworks.hadoop.ingest.IngestJob \ (2)
-Dlww.commit.on.close=true \ (3)
-cls com.lucidworks.hadoop.ingest.DirectoryIngestMapper \ (4)
-c myCollection \ (5)
-i /data/files \ (6)
-of com.lucidworks.hadoop.io.LWMapRedOutputFormat \ (7)
-zk 10.0.1.7:2181,10.0.1.8:2181,10.0.1.9:2181/solr (8)

In this example, we have defined the job very similarly to the previous example. To step through it line-by-line:

1 Use the hadoop binary with the jar command and supply the path to the solr-hadoop-job-3.0.0.jar.
2 Define the main class which is always com.lucidworks.hadoop.ingest.IngestJob.
3 Commit the documents when finished.
4 Use the Directory Ingest mapper class, which allows the job jar to traverse a directory of files for indexing.
5 Name the myCollection as the collection in Solr where the documents will be indexed.
6 Point the job jar to the input directory (/data/files).
7 Define the output format, which is always com.lucidworks.hadoop.io.LWMapRedOutputFormat.
8 Provide the location of ZooKeeper, because in this case we are using SolrCloud. This uses the -zk parameter to define the ZooKeeper connection string. The host:port locations should be separated by commas, followed by the root directory. In this case, the ZooKeeper root is /solr, but another root directory may have been defined during initial startup of Solr.

Hive SerDe

The Lucidworks Hive SerDe allows reading and writing data to and from Solr using Apache Hive. Data from Solr can be presented as a Hive table and joined with other Hive tables and data. Additionally, data from Hive can be inserted into Solr with an INSERT statement. This section describes the Hive SerDe and how to use it.

Hive SerDe Jars

Due to backward compatibility issues between releases of Hive, the Hive SerDe jar can only be used with Hive 0.12, 0.14, and 0.15; do not use it with Hive 0.13.

The jar can be found at /opt/lucidworks-hdpsearch/hive/solr-hive-serde-3.0.0.jar.

Features

  • Index Hive table data to Solr.

  • Read Solr index data to a Hive table.

  • Kerberos support for securing communication between Hive and Solr.

  • As of v2.2.4 of the SerDe, integration with Lucidworks Fusion is supported.

    • Fusion’s index pipelines can be used to index data to Fusion.

    • Fusion’s query pipelines can be used to query Fusion’s Solr instance for data to insert into a Hive table.

Install the SerDe Jar to Hive

In order for Hive to work with Solr, the Hive SerDe jar must be added as a plugin to Hive.

From a Hive prompt, use the ADD JAR command and reference the path and filename of the SerDe jar for your Hive version.

   hive> ADD JAR solr-hive-serde-3.0.0.jar;

This can also be done in your Hive command to create the table, as in the example below.

Indexing Data with a Hive External Table

Indexing data to Solr or Fusion requires creating a Hive external table. An external table allows the data in the table to be used (read or write) by another system or application outside of Hive.

Indexing Data to Solr

For integration with Solr, the external table allows Hive to read data from and write data to Solr.

To create an external table for Solr, you can use a command similar to below. The properties available are described after the example.

hive> CREATE EXTERNAL TABLE solr (id string, field1 string, field2 int)
      STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler'
      LOCATION '/tmp/solr'
      TBLPROPERTIES('solr.server.url' = 'http://localhost:8888/solr',
                    'solr.collection' = 'collection1',
                    'solr.query' = '*:*');

In this example, we have created an external table named "solr", and defined a custom storage handler (STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler').

The LOCATION indicates the location in HDFS where the table data will be stored. In this example, we have chosen to use /tmp/solr.

In the section TBLPROPERTIES, we define several properties for Solr so the data can be indexed to the right Solr installation and collection:

solr.zkhost

The location of the ZooKeeper quorum if Solr is running in SolrCloud mode. If this property is set along with the solr.server.url property, the solr.server.url property will take precedence.

solr.server.url

The location of the Solr instance if Solr is not running in SolrCloud mode. If this property is set along with the solr.zkhost property, this property will take precedence.

solr.collection

The Solr collection for this table. If not defined, an exception will be thrown.

solr.query

The specific Solr query to execute to read this table. If not defined, a default of *:* will be used. This property is not needed when loading data to a table, but is needed when defining the table so Hive can later read the table.

lww.commit.on.close

If true, inserts will be automatically committed when the connection is closed. True is the default.

lww.jaas.file

Used only when indexing to or reading from a Solr cluster secured with Kerberos.

This property defines the path to a JAAS file that contains a service principal and keytab location for a user who is authorized to read from and write to Solr and Hive.

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed). Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Hive.
lww.jaas.appname

Used only when indexing to or reading from a Solr cluster secured with Kerberos.

This property provides the name of the section in the JAAS file that includes the correct service principal and keytab path.

If the table needs to be dropped at a later time, you can use the DROP TABLE command in Hive. This will remove the metadata stored in the table in Hive, but will not modify the underlying data (in this case, the Solr index).
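
For example, to drop the external table created above:

hive> DROP TABLE solr;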

Query and Insert Data to Hive

Once the table is configured, any syntactically correct Hive query will be able to query the index.

For example, to select three fields named "id", "field1", and "field2" from the "solr" table, you would use a query such as:

hive> SELECT id, field1, field2 FROM solr;

Replace the table name as appropriate to use this example with your data.

To join data from tables, you can make a request such as:

hive> SELECT id, field1, field2 FROM solr left
      JOIN sometable right
      WHERE left.id = right.id;

And finally, to insert data to a table, simply use the Solr table as the target for the Hive INSERT statement, such as:

hive> INSERT INTO solr
      SELECT id, field1, field2 FROM sometable;

Example Indexing Hive to Solr

Solr includes a small number of sample documents for use when getting started. One of these is a CSV file containing book metadata. This file is found in your Solr installation, at $SOLR_HOME/example/exampledocs/books.csv.

Using the sample books.csv file, we can see a detailed example of creating a table, loading data to it, and indexing that data to Solr.

CREATE TABLE books (id STRING, cat STRING, title STRING, price FLOAT, in_stock BOOLEAN, author STRING, series STRING, seq INT, genre STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; (1)

LOAD DATA LOCAL INPATH '/solr/example/exampledocs/books.csv' OVERWRITE INTO TABLE books; (2)

ADD JAR solr-hive-serde-3.0.0.jar; (3)

CREATE EXTERNAL TABLE solr (id STRING, cat_s STRING, title_s STRING, price_f FLOAT, in_stock_b BOOLEAN, author_s STRING, series_s STRING, seq_i INT, genre_s STRING) (4)
     STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler' (5)
     LOCATION '/tmp/solr' (6)
     TBLPROPERTIES('solr.zkhost' = 'zknode1:2181,zknode2:2181,zknode3:2181/solr',
                   'solr.collection' = 'gettingstarted',
                   'solr.query' = '*:*', (7)
                   'lww.jaas.file' = '/data/jaas-client.conf'); (8)


INSERT OVERWRITE TABLE solr SELECT b.* FROM books b;
1 Define the table books, and provide the field names and field types that will make up the table.
2 Load the data from the books.csv file.
3 Add the solr-hive-serde-3.0.0.jar file to Hive. Adjust the jar name if your distribution includes a different version. If you are using Hive 0.13, you must use a jar specifically built for 0.13.
4 Create an external table named solr, and provide the field names and field types that will make up the table. These will be the same field names as in your local Hive table, so we can index all of the same data to Solr.
5 Define the custom storage handler provided by the solr-hive-serde-3.0.0.jar.
6 Define storage location in HDFS.
7 The query to run in Solr to read records from Solr for use in Hive.
8 Define the JAAS configuration file that will be used to authenticate to the Kerberized Solr cluster. The TBLPROPERTIES above also define the location of ZooKeeper (or Solr, if not using SolrCloud), the collection in Solr to index the data to, and the query to use when reading the table.

HBase Indexer

The HBase Indexer provides the ability to stream events from HBase to Solr for near real-time searching.

HBase Indexer Features

The HBase Indexer uses HBase replication to copy rows from HBase to the Solr index in near real-time.

This distribution is based on the original hbase-indexer project from NGDATA, with these additional features:

  • Support for HBase 0.94, 0.96, 0.98, 1.1.0 and 1.1.2

  • Support for Solr 5.x

  • Kerberos authentication

  • Output to Lucidworks Fusion pipelines (optional)

Due to the changes required for Solr 5.x support, builds from this fork of the main project will not work with Solr 4.x.

How the HBase Indexer Works

The HBase Indexer works as an HBase replication sink and runs as a daemon. When updates are written to HBase region servers, they are replicated to the HBase Indexer processes. These processes then pass the updates to Solr for indexing.

To use the HBase Indexer, there are several configuration files to prepare. Then a specific configuration for your HBase table must be supplied to map HBase columns and rows to Solr fields and documents.

If you are already using HBase replication, the HBase Indexer process should not interfere.

HBase Indexer Daemon Setup

Each of the following configuration changes need to be made to use the HBase Indexer.

These instructions assume Solr is running and a collection already exists for the data to be indexed. Solr does not need to be restarted to start using the HBase Indexer.

Configure hbase-indexer-site.xml

The first step to configuring the HBase Indexer is to define the ZooKeeper connect string and quorum location in hbase-indexer-site.xml, found in /opt/lucidworks-hdpsearch/hbase-indexer/conf.

The properties to define are hbaseindexer.zookeeper.connectstring and hbase.zookeeper.quorum, as in this example:

<configuration>
   <property>
      <name>hbaseindexer.zookeeper.connectstring</name>
      <value>server1:2181,server2:2181,server3:2181</value>
   </property>
   <property>
      <name>hbase.zookeeper.quorum</name>
      <value>server1,server2,server3</value>
   </property>
</configuration>

Customize the server names and ports as appropriate for your environment.

Configure hbase-site.xml

The hbase-site.xml file on each HBase node needs to be modified to include several new properties, as shown below:

<configuration>
   <!-- SEP is basically replication, so enable it -->
   <property>
      <name>hbase.replication</name>
      <value>true</value>
   </property>
   <property>
      <name>replication.source.ratio</name>
      <value>1.0</value>
   </property>
   <property>
      <name>replication.source.nb.capacity</name>
      <value>1000</value>
   </property>
   <property>
      <name>replication.replicationsource.implementation</name>
      <value>com.ngdata.sep.impl.SepReplicationSource</value>
   </property>
</configuration>

In more detail, these properties are:

  • hbase.replication: True or false; enables replication.

  • replication.source.ratio: The ratio of slave cluster regionservers each master cluster regionserver will connect to for updates. Setting this to 1.0 (100%) ensures that each SEP consumer is actually used; otherwise, some can sit idle, especially with small clusters.

  • replication.source.nb.capacity: The maximum number of hlog entries to replicate in one go. If this is large, and a consumer takes a while to process the events, the HBase rpc call will time out.

  • replication.replicationsource.implementation: A custom replication source for indexing content to Solr.

Once these values have been inserted into hbase-site.xml, HBase will need to be restarted. That is an upcoming step in the configuration.

If you use Ambari to manage your cluster, you can set these properties by going to the HBase Configs screen at Ambari → Configs → Advanced tab → Custom hbase-site.

Copy hbase-site.xml to HBase Indexer

Once you have added the new properties to hbase-site.xml, copy it to the hbase-indexer conf directory. In many cases, this is from /etc/hbase/conf to hbase-indexer/conf.

Copying this file ensures all of the parameters configured for HBase (such as settings for Kerberos and ZooKeeper) are available to the HBase Indexer.
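
For example, assuming the default paths used in this guide:

cp /etc/hbase/conf/hbase-site.xml /opt/lucidworks-hdpsearch/hbase-indexer/conf/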

Copy JAR Files

The SEP replication being used by HBase Indexer requires 4 .jar files to be copied from the HBase Indexer distribution to each HBase node.

These .jar files can be found in the hbase-indexer/lib directory.

They need to be copied to the $HBASE_HOME/lib directory on each node running HBase. These files are:

  • hbase-sep-api-3.0.0.jar

  • hbase-sep-impl-3.0.0.jar

  • hbase-sep-impl-common-3.0.0.jar

  • hbase-sep-tools-3.0.0.jar
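
For example, the copy could be scripted as follows on each HBase node (adjust $HBASE_HOME and the HBase Indexer installation path for your environment):

for jar in hbase-sep-api-3.0.0.jar hbase-sep-impl-3.0.0.jar \
           hbase-sep-impl-common-3.0.0.jar hbase-sep-tools-3.0.0.jar; do
  cp /opt/lucidworks-hdpsearch/hbase-indexer/lib/$jar "$HBASE_HOME/lib/"
done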

Enable Kerberos Support

If you want to index content to a Solr cluster that has been secured with Kerberos for internode communication, you will need to apply additional configuration.

A JAAS file configures the authentication properties, and will include a section for a service principal and keytab file for a user who has access to both HBase and Solr. This user should be a different user than the service principal that Solr is using for internode communication.

Kerberos Parameters

To configure the HBase Indexer to be able to write to Kerberized Solr, you will modify the hbase-indexer script found in /opt/lucidworks-hdpsearch/hbase-indexer/bin.

Find the section where two of the properties are commented out by default:

#HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file="
#HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname="

Uncomment those properties to supply the correct values (explained below), and then add another property:

HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Djava.security.auth.login.config="

The three together should look similar to this:

HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file="
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Djava.security.auth.login.config="
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname="

Supply a value for each property as follows:

-Dlww.jaas.file

The full path to a JAAS configuration file that includes a section defining the keytab location and service principal that will be used to run the HBase Indexer. This principal must have access to both HBase and Solr, but should be different from the principal Solr uses for internode communication.

-Djava.security.auth.login.config

The path to the JAAS file that includes a section for the HBase user. This can be the same file that contains the definition for the HBase Indexer user, but the HBase user must have its own section.

-Dlww.jaas.appname

The name of the section in the JAAS file that includes the service principal and keytab location for the user who will run the HBase Indexer, as shown below. If this is not defined, a default of "Client" will be used.
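
Putting the three properties together, and assuming the sample JAAS file shown in the next section is saved at the hypothetical path /data/jaas-client.conf, the uncommented lines might look like this:

HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file=/data/jaas-client.conf"
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Djava.security.auth.login.config=/data/jaas-client.conf"
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname=SolrClient"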

Sample JAAS File

Here is a sample JAAS file, and the areas that must be changed for your environment:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/hbase.keytab" (2)
  storeKey=true
  useTicketCache=false
  debug=true
  principal="hbase@SOLRSERVER.COM"; (3)
};
SolrClient { (4)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (5)
  storeKey=true
  useTicketCache=false
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (6)
};
1 The name of the section of the JAAS file for the HBase user. This first section named "Client" should contain the proper credentials for HBase and ZooKeeper.
2 The path to the keyTab file for the HBase user. The user running the HBase Indexer must have access to this file.
3 The service principal name for the HBase user.
4 The name of the section for the user who will run the HBase Indexer. This second section is named "SolrClient" and should contain the proper credentials for the user who will run the HBase Indexer. This section name will be used with the -Dlww.jaas.appname parameter as described earlier.
5 The path to the keyTab file for the Solr user. The user running the HBase Indexer must have access to this file.
6 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and HBase.

Restart HBase

Once each of the above changes has been made, restart HBase on all nodes.

Start the HBase Indexer Daemon

When configuration is complete and HBase has been restarted, you can start the HBase Indexer daemon.

From hbase-indexer/bin, run:

hbase-indexer server &

The & portion of this command will run the server in the background. If you would like to run it in the foreground, omit the & part of the above example.

At this point the HBase Indexer daemon is running, and you are ready to configure an indexer to start indexing content.

Stream Data from HBase Indexer to Solr

Once the HBase configuration files have been updated, HBase has been restarted on all nodes, and the daemon started, the next step is to create an indexer to stream data from a specific HBase table to Solr.

The HBase table that will be indexed must have the REPLICATION_SCOPE set to "1".

If you are not familiar with HBase, the HBase Indexer tutorial provides a good introduction to creating a simple table and indexing it to Solr.
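
For example, a table matching the indexer configuration shown below could be created from the HBase shell with replication enabled on its column family (the table and column family names are illustrative):

hbase> create 'indexdemo-user', { NAME => 'info', REPLICATION_SCOPE => '1' }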

Add an Indexer

In order to process HBase events, an indexer must be created.

First you need to create an indexer configuration file, which is a simple XML file that tells the HBase Indexer how to map HBase columns to Solr fields. For example:

<?xml version="1.0"?>
<indexer table="indexdemo-user">
   <field name="firstname_s" value="info:firstname"/>
   <field name="lastname_s" value="info:lastname"/>
   <field name="age_i" value="info:age" type="int"/>
</indexer>

Note that each field element defines the Solr field name, then the HBase column, and optionally the field type. The table attribute of the indexer element must also match the name of the HBase table you intend to index.

More details on the indexer configuration options are available from https://github.com/NGDATA/hbase-indexer/wiki/Indexer-configuration.

Start an Indexer Process

Once we have an indexer configuration file, we can then start the indexer itself with the add-indexer command.

This command takes several properties, as in this example:

./hbase-indexer add-indexer \ (1)
  -n myindexer \ (2)
  -c indexer-conf.xml \ (3)
  -cp solr.zk=server1:3181,server2:3181,server3:3181/solr \ (4)
  -cp solr.collection=myCollection \ (5)
  -z server1:3181,server2:3181,server3:3181 (6)
1 The hbase-indexer script is found in the /opt/lucidworks-hdpsearch/hbase-indexer/bin directory. The add-indexer command adds the indexer to the running hbase-indexer daemon.
2 The -n property provides a name for the indexer.
3 The -c property defines the location of the indexer configuration file. Provide the path as well as the filename if the file is not in the directory from which you are launching the command.
4 The -cp property allows you to provide a key-value pair. In this case, we use the solr.zk property to define the location of the ZooKeeper ensemble used with Solr. The ZooKeeper connect string should include /solr as the znode path.
5 Another -cp key-value pair, which defines the solr.collection property with a value of a collection name in Solr that the documents should be indexed to. This collection must exist prior to running the indexer.
6 The -z property defines the location of the ZooKeeper ensemble used with HBase. This may be the same as was defined in item (4) above, but needs to be additionally defined.

More details on the options for add-indexer are available from https://github.com/NGDATA/hbase-indexer/wiki/CLI-tools.

Pig Store/Load Functions

If you use Apache Pig to do large-scale data processing, you can use Pig scripts during indexing of content to Solr. For instance, you can use Pig to do large-scale preprocessing and joining of data and then output the resulting datasets to Solr so that you can more effectively query that data. This section describes the Pig Functions and how to use them.

Pig Functions

A single .jar provides functions for processing content before it is indexed to Solr. This jar is found at:

/opt/lucidworks-hdpsearch/pig/solr-pig-functions-3.0.0.jar

The appropriate Pig function jar must be stored in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you will supply the path to the proper jar when invoking your script.
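
For example, to copy the jar into HDFS (the target directory here is only an illustration):

hdfs dfs -put /opt/lucidworks-hdpsearch/pig/solr-pig-functions-3.0.0.jar /user/pig/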

Available Functions

The Pig functions included in the solr-pig-functions-3.0.0.jar are three user-defined functions (UDFs) and two store functions. These functions are:

  • com/lucidworks/hadoop/pig/SolrStoreFunc.class

  • com/lucidworks/hadoop/pig/FusionIndexPipelinesStoreFunc.class

  • com/lucidworks/hadoop/pig/EpochToCalendar.class

  • com/lucidworks/hadoop/pig/Extract.class

  • com/lucidworks/hadoop/pig/Histogram.class

Register the Functions

There are two approaches to using functions in Pig: REGISTER them in the script, or load them with your Pig command line request.

If using REGISTER, the Pig function jar must be put in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you can either supply the path in your script or use a variable and define the variable with a -p property definition.
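
A minimal sketch of the REGISTER approach, assuming the jar was copied to the illustrative HDFS path used earlier:

REGISTER 'hdfs:///user/pig/solr-pig-functions-3.0.0.jar';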

The example below uses the second approach, loading the jars with the -Dpig.additional.jars system property when launching the script. With this approach, the jars can be located anywhere on the machine where the script will be run.

Indexing Data to Solr

There are a few required parameters for your script to output data to Solr for indexing.

These parameters can be defined in the script itself, or turned into variables that are defined each time the script runs. The example Pig script below shows an example of using these parameters with variables.

solr.zkhost

The ZooKeeper connection string if using Solr in SolrCloud mode. This should be in the form of server:port,server:port,server:port/chroot.

If you are not using SolrCloud, use the solr.server.url parameter instead.

solr.server.url

The location of the Solr instance when Solr is running in standalone mode. This should be in the form of http://server:port/solr.

solr.collection

The name of the Solr collection where documents will be indexed.

Indexing to a Kerberos-Secured Solr Cluster

When a Solr cluster is secured with Kerberos for internode communication, Pig scripts must include the full path to a JAAS file that includes the service principal and the path to a keytab file that will be used to index the output of the script to Solr.

Two parameters provide the information the script needs to access the JAAS file:

lww.jaas.file

The path to the JAAS file that includes a section for the service principal who will write to the Solr indexes. For example, to use this property in a Pig script:

set lww.jaas.file '/path/to/login.conf';

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed).

lww.jaas.appname

The name of the section in the JAAS file that includes the correct service principal and keytab path. For example, to use this property in a Pig script:

set lww.jaas.appname 'Client';

Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Pig.

Indexing to a SSL-Enabled Solr Cluster

When SSL is enabled in a Solr cluster, Pig scripts must include the full paths to the keystore and truststore with their respective passwords.

set lww.keystore '/path/to/solr-ssl.keystore.jks';
set lww.keystore.password 'secret';
set lww.truststore '/path/to/solr-ssl.truststore.jks';
set lww.truststore.password 'secret';

The paths (and passwords) must be the same on all YARN/MapReduce hosts.

Sample CSV Script

The following Pig script will take a simple CSV file and index it to Solr.

set solr.zkhost '$zkHost';
set solr.collection '$collection'; (1)

A = load '$csv' using PigStorage(',') as (id_s:chararray,city_s:chararray,country_s:chararray,code_s:chararray,code2_s:chararray,latitude_s:chararray,longitude_s:chararray,flag_s:chararray); (2)
--dump A;
B = FOREACH A GENERATE $0 as id, 'city_s', $1, 'country_s', $2, 'code_s', $3, 'code2_s', $4, 'latitude_s', $5, 'longitude_s', $6, 'flag_s', $7; (3)

ok = store B into 'SOLR' using com.lucidworks.hadoop.pig.SolrStoreFunc(); (4)

This relatively simple script is doing several things that help to understand how the Solr Pig functions work.

1 This and the line above define parameters that are needed by SolrStoreFunc to know where Solr is. SolrStoreFunc needs the properties solr.zkhost and solr.collection, and these lines are mapping the zkhost and collection parameters we will pass when invoking Pig to the required properties.
2 Load the CSV file, the path and name we will pass with the csv parameter. We also define the field names for each column in CSV file, and their types.
3 For each item in the CSV file, generate a document id from the first field ($0) and then define each field name and value in name, value pairs.
4 Load the documents into Solr, using the SolrStoreFunc. While we don’t need to define the location of Solr here, the function will use the zkhost and collection properties that we will pass when we invoke our Pig script.
When using SolrStoreFunc, the document ID must be the first field.

When we want to run this script, we invoke Pig and define several parameters we have referenced in the script with the -p option, such as in this command:

./bin/pig -Dpig.additional.jars=/path/to/solr-pig-functions-3.0.0.jar -p csv=/path/to/my/csv/airports.dat -p zkHost=zknode1:2181,zknode2:2181,zknode3:2181/solr -p collection=myCollection ~/myScripts/index-csv.pig

The parameters to pass are:

csv

The path and name of the CSV file we want to process.

zkHost

The ZooKeeper connection string for a SolrCloud cluster, in the form of zkhost1:port,zkhost2:port,zkhost3:port/chroot. In the script, we mapped this to the solr.zkhost property, which is required by the SolrStoreFunc to know where to send the output documents.

collection

The Solr collection to index into. In the script, we mapped this to the solr.collection property, which is required by the SolrStoreFunc to know the Solr collection the documents should be indexed to.

The zkHost parameter above is only used if you are indexing to a SolrCloud cluster, which uses ZooKeeper to route indexing and query requests.

If, however, you are not using SolrCloud, you can use the solrUrl parameter, which takes the location of a standalone Solr instance, in the form of http://host:port/solr.

In the script, you would change the line that maps solr.zkhost to the zkhost property to map solr.server.url to the solrUrl property. For example:

set solr.server.url '$solrUrl';

How to Contribute

  1. Fork this repo i.e. <username|organization>/hadoop-solr, following the fork a repo tutorial. Then, clone the forked repo on your local machine:

    $ git clone https://github.com/<username|organization>/hadoop-solr.git
  2. Configure remotes with the configuring remotes tutorial.

  3. Create a new branch:

    $ git checkout -b new_branch
    $ git push origin new_branch

    Use the creating branches tutorial to create the branch from GitHub UI if you prefer.

  4. Develop on the new_branch branch only; do not merge new_branch into your master. Commit changes to new_branch as often as you like:

    $ git add <filename>
    $ git commit -m 'commit message'
  5. Push your changes to GitHub.

    $ git push origin new_branch
  6. Repeat the commit & push steps until your development is complete.

  7. Before submitting a pull request, fetch upstream changes that were done by other contributors:

    $ git fetch upstream
  8. And update master locally:

    $ git checkout master
    $ git pull upstream master
  9. Merge master branch into new_branch in order to avoid conflicts:

    $ git checkout new_branch
    $ git merge master
  10. If conflicts happen, use the resolving merge conflicts tutorial to fix them.

  11. Push the master changes to the new_branch branch:

    $ git push origin new_branch
  12. Add JUnit tests, as appropriate, to test your changes.

  13. When all testing is done, use the create a pull request tutorial to submit your change to the repo.

Please be sure that your pull request sends only your changes, and no others. Check it using the command:

git diff new_branch upstream/master

Spark RDD

The Spark RDD provides a set of tools for reading data from Solr and indexing objects from Spark into Solr using SolrJ (a Java client for Solr).

The spark-solr Repository

Lucidworks has provided an SDK for creating a custom Apache Spark application to simplify integrating Spark and Solr.

We have provided a clone of the spark-solr GitHub repository found at https://github.com/Lucidworks/spark-solr. You can find this clone in:

/opt/lucidworks-hdpsearch/spark-solr/

Before using this repository, you should update it on your local machine with a simple git pull.

Build from Source

mvn clean package -DskipTests

This will build 2 jars in the target directory:

  • spark-solr-${VERSION}.jar

  • spark-solr-${VERSION}-shaded.jar

${VERSION} will be something like 2.1.0-SNAPSHOT, for development builds.

The first .jar is what you’d want to use if you were using spark-solr in your own project. The second is what you’d use to submit one of the included example apps to Spark.

Getting started

Import jar File via spark-shell

cd $SPARK_HOME
./bin/spark-shell --jars spark-solr-3.0.1-shaded.jar

The shaded jar can be downloaded from Maven Central or built from the respective branch.

Connect to your SolrCloud Instance

via DataFrame

val options = Map(
  "collection" -> "{solr_collection_name}",
  "zkhost" -> "{zk_connect_string}"
)
val df = spark.read.format("solr")
  .options(options)
  .load

via RDD

import com.lucidworks.spark.rdd.SelectSolrRDD
val solrRDD = new SelectSolrRDD(zkHost, collectionName, sc)

SelectSolrRDD is an RDD of SolrDocument objects.

via RDD (Java)

import com.lucidworks.spark.rdd.SolrJavaRDD;
import org.apache.spark.api.java.JavaRDD;

SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards(solrQuery);

Features

  • Send objects from Spark (Streaming or DataFrames) into Solr.

  • Read the results from a Solr query as a Spark RDD or DataFrame.

  • Stream documents from Solr using /export handler (only works for exporting fields that have docValues enabled).

  • Read large result sets from Solr using cursors or with /export handler.

  • Data locality. If Spark workers and Solr processes are co-located on the same nodes, the partitions are placed on the nodes where the replicas are located.

Querying

Cursors

Cursors are used by default to pull documents out of Solr. By default, the number of tasks allocated will be the number of shards available for the collection.

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. A good candidate for the split field is the _version_ field that is attached to every document by the shard leader during indexing. See the splits section below to enable and configure intra-shard splitting.

Cursors won’t work if the index changes during query time. Constrain your query to a static index by passing additional Solr parameters via solr.params.

Streaming API (/export)

If the fields that are being queried have docValues enabled, then the Streaming API can be used to pull documents from Solr in a true streaming fashion. This method is 8-10x faster than cursors. The request_handler option allows you to enable the Streaming API via the DataFrame API.
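
A hedged sketch of enabling the /export handler through the DataFrame options (the collection, ZooKeeper string, and field names are illustrative, and the listed fields must have docValues enabled):

val options = Map(
  "collection" -> "myCollection",
  "zkhost" -> "zknode1:2181,zknode2:2181,zknode3:2181/solr",
  "request_handler" -> "/export",
  "fields" -> "id,author_s"
)
val df = spark.read.format("solr")
  .options(options)
  .load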

Indexing

Objects can be sent to Solr via Spark Streaming or DataFrames. The schema is inferred from the DataFrame, and any fields that do not exist in the Solr schema will be added via the Schema API. See ManagedIndexSchemaFactory.

See Index parameters for configuration and tuning.
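
As a minimal sketch (the ZooKeeper string and collection name are illustrative), a DataFrame can be written back to Solr through the same "solr" data source:

df.write.format("solr")
  .option("zkhost", "zknode1:2181,zknode2:2181,zknode3:2181/solr")
  .option("collection", "myCollection")
  .option("commit_within", "5000")
  .mode("overwrite")
  .save()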

Configuration and Tuning

The Solr DataSource supports a number of optional parameters that allow you to optimize performance when reading data from Solr. The only required parameters for the DataSource are zkhost and collection.

Query Parameters

query

Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:

Usage: option("query","body_t:solr")

Default: *:*

If you don’t specify the "query" option, then all rows are read using the "match all documents" query (*:*).

fields

You can use the fields option to specify a subset of fields to retrieve for each document in your results:

Usage: option("fields","id,author_s,favorited_b,…​")

By default, all stored fields for each document are pulled back from Solr.

You can also specify an alias for a field using Solr’s field alias syntax, e.g. author:author_s. If you want to invoke a function query, such as rord(), then you’ll need to provide an alias, e.g. ord_user:ord(user_id). If the return type of the function query is something other than int or long, then you’ll need to specify the return type after the function query, such as: foo:div(sum(x,100),max(y,1)):double

If you request Solr function queries, then the library must use the /select handler to make the request as exporting function queries through /export is not supported by Solr.

filters

You can use the filters option to set filter queries on Solr query:

Usage: option("filters","firstName:Sam,lastName:Powell")

rows

You can use the rows option to specify the number of rows to retrieve from Solr per request; do not confuse this with max_rows (see below). Behind the scenes, the implementation uses either deep paging cursors or Streaming API and response streaming, so it is usually safe to specify a large number of rows.

To be clear, this is not the maximum number of rows to read from Solr. All matching rows on the backend are read. The rows parameter is the page size.

By default, the implementation uses 1000 rows but if your documents are smaller, you can increase this to 10000. Using too large a value can put pressure on the Solr JVM’s garbage collector.

Usage: option("rows","10000") Default: 1000

max_rows

Limits the result set to a maximum number of rows; only applies when using the /select handler. The library will issue the query from a single task and let Solr do the distributed query processing. In addition, no paging is performed, i.e. the rows param is set to max_rows when querying. Consequently, this option should not be used for large max_rows values, rather you should just retrieve all rows using multiple Spark tasks and then re-sort with Spark if needed.

Usage: option("max_rows", "100") Default: None

request_handler

Set the Solr request handler for queries. This option can be used to export results from Solr via /export handler which streams data out of Solr. See Exporting Result Sets for more information.

The /export handler needs fields to be explicitly specified. Please use the fields option or specify the fields in the query.

Usage: option("request_handler", "/export") Default: /select

Increase Read Parallelism using Intra-Shard Splits

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub ranges using a split field. The sub range splitting enables faster fetching from Solr by increasing the number of tasks in Solr. This should only be used if there are enough computing resources in the Spark cluster.

Shard splitting is disabled by default.

splits

Enable shard splitting on default field _version_.

Usage: option("splits", "true")

Default: false

The above option is equivalent to option("split_field", "_version_")

split_field

The field to split on can be changed using the split_field option.

Usage: option("split_field", "id") Default: _version_

splits_per_shard

Behind the scenes, the DataSource implementation tries to split the shard into evenly sized splits using filter queries. You can also split on a string-based keyword field but it should have sufficient variance in the values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.

Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.

Usage: option("splits_per_shard", "30")

Default: 20

flatten_multivalued

This option is enabled by default and flattens multi-valued fields from Solr.

Usage: option("flatten_multivalued", "false")

Default: true

dv

The dv option fetches docValues fields that are indexed but not stored by using function queries. It should be used for Solr versions lower than 5.5.0.

Usage: option("dv", "true")

Default: false

skip_non_dv

The skip_non_dv option instructs the Solr DataSource to skip all fields that do not have docValues.

Usage: option("skip_non_dv", "true")

Default: false

Index parameters

soft_commit_secs

If specified, the soft_commit_secs value will be set via the Solr Config API during indexing.

Usage: option("soft_commit_secs", "10")

Default: None

commit_within

The commit_within param sets commitWithin on the indexing requests processed by SolrClient. This value should be in milliseconds. See commitWithin.

Usage: option("commit_within", "5000")

Default: None

batch_size

The batch_size option determines the number of documents that are sent to Solr via an HTTP call during indexing. Set this option higher if the docs are small and memory is available.

Usage: option("batch_size", "10000")

Default: 500

gen_uniq_key

If documents are missing the unique key (derived from the Solr schema), the gen_uniq_key option will generate a unique value for each document before indexing to Solr. Alternatively, the UUIDUpdateProcessorFactory can be used to generate UUID values for documents that are missing the unique key field.

Usage: option("gen_uniq_key", "true")

Default: false

sample_seed

The sample_seed option allows you to read a random sample of documents from Solr using the specified seed. This option can be useful if you just need to explore the data before performing operations on the full result set. By default, if this option is provided, a 10% sample size is read from Solr, but you can use the sample_pct option to control the sample size.

Usage: option("sample_seed", "5150")

Default: None

sample_pct

The sample_pct option allows you to set the size of a random sample of documents from Solr; use a value between 0 and 1.

Usage: option("sample_pct", "0.05")

Default: 0.1

solr.params

The solr.params option can be used to specify any arbitrary Solr parameters in the form of a Solr query.

Don’t use this to pass parameters that are covered by other options, such as fl (use the fields option) or sort. This option is strictly intended for parameters that are NOT covered by other options.

Usage: option("solr.params", "fq=userId:[10 TO 1000]")

Querying Time Series Data

partition_by

Set this option to time in order to query multiple time series collections partitioned according to some time period.

Usage: option("partition_by", "time")

Default: None

time_period

This is of the form X DAYS/HOURS/MINUTES (for example, 1DAYS). It should match the time period with which the partitions are created.

Usage: option("time_period", "1MINUTES")

Default: 1DAYS

datetime_pattern

This pattern is normally inferred from time_period, but this option can be used to specify it explicitly.

Usage: option("datetime_pattern", "yyyy_MM_dd_HH_mm")

Default: yyyy_MM_dd

timestamp_field_name

This option is used to specify the field name in the indexed documents where the timestamp is found.

Usage: option("timestamp_field_name", "ts")

Default: timestamp_tdt

timezone_id

Used to specify the timezone.

Usage: option("timezone_id", "IST")

Default: UTC

max_active_partitions

This option is used to specify the maximum number of partitions that are allowed at a time.

Usage: option("max_active_partitions", "100")

Default: null

Troubleshooting Tips

Why is dataFrame.count so slow?

Solr can provide the number of matching documents nearly instantly, so why is calling count on a DataFrame backed by a Solr query so slow? The reason is that Spark likes to read all rows before performing any operations on a DataFrame. So when you ask SparkSQL to count the rows in a DataFrame, spark-solr has to read all matching documents from Solr and then count the rows in the RDD.

If you’re just exploring a Solr collection from Spark and need to know the number of matching rows for a query, you can use the SolrQuerySupport.getNumDocsFromSolr utility function.

I set rows to 10 and now my job takes forever to read 10 rows from Solr!

The rows option sets the page size, but all matching rows are read from Solr for every query. So if your query matches many documents in Solr, Spark reads them all, 10 docs per request.

Use the sample_seed option to limit the size of the results returned from Solr.

Developing a Spark Application

The com.lucidworks.spark.SparkApp provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.

To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you’re developing.

RDDProcessor

Implement the com.lucidworks.spark.SparkApp$RDDProcessor interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).

StreamProcessor

Extend the com.lucidworks.spark.SparkApp$StreamProcessor abstract class to build a Spark streaming application.

See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor for examples of how to write a StreamProcessor.

Authenticating with Solr

For background on Solr security, see: Securing Solr.

Kerberos

The Kerberos configuration should be set via the system property java.security.auth.login.config in extraJavaOptions for both the executor and the driver.
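
For example, a hedged spark-shell invocation (the JAAS file path is a placeholder for your environment):

./bin/spark-shell --master yarn \
  --jars spark-solr-3.0.1-shaded.jar \
  --conf 'spark.driver.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas-client.conf' \
  --conf 'spark.executor.extraJavaOptions=-Djava.security.auth.login.config=/path/to/jaas-client.conf'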

SparkApp

The SparkApp framework (in spark-solr) allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option.

For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS configuration file named jaas-client.conf that sets the location of your Kerberos keytab file, such as:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/keytabs/solr.keytab"
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr";
};

To use this configuration to authenticate to Solr, you simply need to pass the path to jaas-client.conf created above using the -solrJaasAuthConfig option, such as:

spark-submit --master yarn-server \
  --class com.lucidworks.spark.SparkApp \
  $SPARK_SOLR_PROJECT/target/spark-solr-${VERSION}-shaded.jar \
  hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
  -hdfsPath /user/spark/testdata/syn_sample_50k \
  -solrJaasAuthConfig=/path/to/jaas-client.conf

Basic Auth

Basic auth can be configured via the system properties basicauth or solr.httpclient.config. These system properties have to be set on both the driver and executor JVMs.

Examples:

Using basicauth

 ./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar  --conf 'spark.driver.extraJavaOptions=-Dbasicauth=solr:SolrRocks'

Using solr.httpclient.config

 ./bin/spark-shell --master local[*] --jars ~/Git/spark-solr/target/spark-solr-3.0.1-SNAPSHOT-shaded.jar  --conf 'spark.driver.extraJavaOptions=-Dsolr.httpclient.config=/Users/kiran/spark/spark-2.1.0-bin-hadoop2.7/auth.txt'

Contents of config file

httpBasicAuthUser=solr
httpBasicAuthPassword=SolrRocks