Kerberos Support

Each of the connectors supports indexing content in a cluster running Kerberos security. Generally, this is as simple as running kinit as the user who will execute the indexing task.

Kerberos support is available in several different ways:

  • Index content from Hadoop components such as the filesystem, Hive, or Pig secured with Kerberos.

  • Store Solr indexes in a Hadoop filesystem secured with Kerberos.

  • Secure the Solr cluster with Kerberos for all internode communication and access to the Admin UI.

Each approach requires configuration, outlined in the sections below.

Index Content from a Kerberos-Secured Cluster

If Solr is not secured with Kerberos for internode communication, indexing content from a Kerberos-secured cluster is as simple as using kinit to get a ticket for the service principal (user) who will be running the connector.

For example, if the pig user will run a Pig script while indexing, you should get a Kerberos ticket for the pig user.

To get the ticket, issue the kinit command for that principal; for example, using the sample principal shown in the klist output below:
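
kinit user@HADOOP-VM1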

To verify that you have a ticket for the user, you can use the klist command. Here is a sample output from that command:

Ticket cache: FILE:lucid.cache
Default principal: user@HADOOP-VM1

Valid starting     Expires            Service principal
04/08/15 13:49:06  04/09/15 13:49:06  krbtgt/HADOOP-VM1@HADOOP-VM1
      renew until 04/08/15 13:49:06

Once you have a ticket, you can use the connector as normal. No additional authentication or parameters are required.

Store Solr Indexes in HDFS

When storing your Solr indexes in HDFS and the Hadoop cluster is secured with Kerberos, you will need to configure Solr with the keytab location and service principal to use for communicating with the Hadoop cluster.

See the section Kerberos Support in the HDPSearch Installation Guide for more information.
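
As a hedged sketch only (confirm the exact property names and paths against the Installation Guide), starting Solr with its index directory in a Kerberized HDFS involves system properties along these lines:

bin/solr start -c -Dsolr.directoryFactory=HdfsDirectoryFactory \
   -Dsolr.hdfs.home=hdfs://namenode:8020/solr \
   -Dsolr.hdfs.security.kerberos.enabled=true \
   -Dsolr.hdfs.security.kerberos.keytabfile=/etc/security/keytabs/solr.service.keytab \
   -Dsolr.hdfs.security.kerberos.principal=solr/hostname@SOLRSERVER.COM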

Secure Solr with Kerberos

The Apache Solr Reference Guide includes a section on how to use Kerberos authentication with Solr: Kerberos Authentication.

When Solr is secured with Kerberos for internode communication and access to the Admin UI, you will need to create a JAAS file and service principal to allow the Hadoop components to write to the secured Solr indexes. How to reference the JAAS file is covered in the documentation for each connector. This file must be copied to each NodeManager node in the cluster (i.e., every node where map/reduce tasks are executed).

Hadoop Job Jar

The Lucidworks Hadoop Job Jar allows indexing content from HDFS to Solr. This section describes the Job Jar and how to use it.

Lucidworks Job Jar Overview

The Job Jar is a connector of sorts, allowing you to index many different types of content stored in HDFS. It uses MapReduce to leverage the scaling qualities of Apache Hadoop while indexing content to Solr. It can be run from any location.

The Job Jar makes use of ingest mappers to parse documents and prepare them to be added to a Solr index. The selection of the ingest mapper is one of the arguments provided to the job when it’s started. Details of the available ingest mappers are included below.

How it Works

The Job Jar consists of a series of MapReduce-enabled jobs that convert raw content into documents, extract text and metadata (via Apache Tika, when so configured), and write the resulting documents to Solr for indexing.

You must run the Job Jar as a user with permission to write to hadoop.tmp.dir. The /tmp directory in HDFS must also be writable.

The Hadoop job jar works in three stages designed to take in raw content and output results to Solr. These stages are:

  1. Create one or more SequenceFiles from the raw content. This can be done in one of two ways:

    1. If the source files are available in a shared Hadoop filesystem, prepare a list of source files and their locations as a SequenceFile. The raw contents of each file are not processed until step 2.

    2. If the source files are not available, prepare a list of source files and the raw content. This process is currently done sequentially and can take a significant amount of time if there is a large number of documents and/or if they are very large.

  2. Run a MapReduce job to extract text and metadata from the raw content. This process uses Apache Tika if a pipeline has been configured to do so.

  3. Run a MapReduce job to send the extracted content from HDFS to Solr using the SolrJ client. This implementation works with SolrJ’s CloudServer Java client which is aware of where Solr is running via Zookeeper.

Incremental indexing, where only changes to a document or directory are processed on successive job jar runs, is not supported at this time. All three steps are completed each time the Job Jar is run, even if the raw content has not changed.

The first step of the crawl process converts the input content into a SequenceFile. In order to do this, the entire contents of that file must be read into memory so that it can be written out as a PipelineDocument in the SequenceFile. Thus, you should be careful to ensure that the system does not load into memory a file that is larger than the Java heap size of the process.

Ingest Mappers

Ingest mappers in the Hadoop job jar parse documents and prepare them for indexing to Solr.

When configuring the arguments for your job, selection of the correct ingest mapper is key to successful indexing of documents. In some cases, the choice of ingest mapper will determine the arguments you need to provide to the job jar.

The ingest mapper choice is passed to the job jar with the -cls argument. Available job jar arguments are described in detail in the section Job Jar Arguments below.

There are several supported ingest mappers, described in detail below.

CSV Ingest Mapper

This ingest mapper allows you to index files in CSV format. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.CSVIngestMapper.

There are several additional arguments that can be supplied when using this ingest mapper. These are described in detail in the section CSV Ingest Mapper Arguments. For reference, these are the additional arguments:

  • csvDelimiter: the character that is used to separate values for different fields.

  • csvFieldMapping: define default field mapping from column names to Solr fields.

  • csvFirstLineComment: declare that the first line of the document is a comment.

  • idField: the column to be used as the document ID.

  • csvStrategy: the format of the CSV file.

Supports: TextInputFormat documents.

Directory Ingest Mapper

This ingest mapper allows you to index documents found in a defined directory. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.DirectoryIngestMapper.

There are no special arguments for this ingest mapper.

Apache Tika will be used to extract content from these files, so file types supported by Tika will be parsed.

Grok Ingest Mapper

This ingest mapper allows you to index log files based on a Logstash configuration file. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.GrokIngestMapper.

Logstash filters such as grok, kv, and date, and grok patterns such as ID and WORD, are supported. More information about Grok is available at http://logstash.net/docs/1.4.0/filters/grok.

During processing, any input and output statements in the configuration file will be ignored. The input will always be HDFS and the output will always be Solr.

There is one additional argument for this ingest mapper, grok.uri, which defines the location of the Logstash configuration file, in either the local filesystem or HDFS. More details are in the section Grok Ingest Mapper Arguments.

Supports: TextInputFormat documents.

RegEx Ingest Mapper

This ingest mapper allows definition of a regular expression that is used on the incoming file to extract content. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.RegexIngestMapper.

The ingest mapper expects that the key and value produced by the InputFormat are both Writable. The regular expression is only applied to the value.

There are three additional arguments that can be supplied with this ingest mapper, described in detail in the section Regular Expression Ingest Mapper Arguments. For reference, these additional properties are:

  • com.lucidworks.hadoop.ingest.RegexIngestMapper.regex: define a regular expression.

  • com.lucidworks.hadoop.ingest.RegexIngestMapper.groups_to_fields: map fields between regex capture groups and field names.

  • com.lucidworks.hadoop.ingest.RegexIngestMapper.match: use Java’s Matcher.matches method instead of find.

SequenceFile Ingest Mapper

This ingest mapper allows you to index a SequenceFile. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.SequenceFileIngestMapper.

If the type for the value of the key/value pair is "text", the string will be used, otherwise the raw bytes will be written.

There are no special arguments for this ingest mapper.

Supports: SequenceFileInputFormat documents.

SolrXML Ingest Mapper

This ingest mapper allows you to index a file in SolrXML format. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.SolrXMLIngestMapper.

The file should be in a SequenceFileInputFormat, where the key is any Writable and the value is text in SolrXML format. The default inputFormat of SequenceFileInputFormat can be overridden if required.

This mapper requires that the idField parameter be set when creating the workflow job. For more details, see the section SolrXML Ingest Mapper Arguments.

Only "add" commands in the SolrXML will be processed. All other commands will be ignored.

Supports: SequenceFileInputFormat documents.
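
For reference, the SolrXML stored as each SequenceFile value uses standard Solr add syntax; a minimal, purely illustrative example:

<add>
  <doc>
    <field name="id">doc1</field>
    <field name="title_s">Example document</field>
  </doc>
</add>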

WARC Ingest Mapper

This ingest mapper allows you to index web archive (.warc) files in WarcFileInputFormat. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.WarcIngestMapper.

There are no special arguments for this ingest mapper.

Supports: WarcFileInputFormat documents.

Zip Ingest Mapper

This ingest mapper allows you to index documents contained in .zip files. The class to use with the -cls argument is com.lucidworks.hadoop.ingest.ZipIngestMapper.

Tika will be used to extract content from these files, so file types supported by Tika will be parsed.

There are no special arguments for this ingest mapper.

Supports: ZipFileInputFormat documents.

Job Jar Arguments

The job jar arguments allow you to define the type of content in your Hadoop filesystem, choose the ingest mappers appropriate for that content, and set other job parameters as needed.

There are three main sections to the Job Jar arguments:

  • the main class

  • system and mapper-specific arguments

  • key-value pair arguments

The arguments must be supplied in the above order.

The available arguments and parameters are described in the following sections.

Main Class

The main class must be specified. For all of the mappers available, it is always defined as com.lucidworks.hadoop.ingest.IngestJob.

System and Mapper-specific Arguments

System or Mapper-specific arguments, defined with a pattern of -Dargument=value, are supplied after the class name. In many cases, the arguments chosen depend on the ingest mapper chosen. The ingest mapper will be defined later in the argument string.

There are several possible arguments:

Ingest Behavior Arguments
-Dlww.commit.on.close

Defines whether a commit should be done when the connection to Solr is closed. Commits in Solr flush documents to disk instead of holding them in memory. A commit is required for the documents to be searchable. There are settings in Solr to perform automatic commits when the queue grows to a certain size (see UpdateHandlers in SolrConfig in the Apache Solr Reference Guide for more on commits).

Default: false. Required: No.

-Dadd.subdirectories

If true, the exploration of a folder will be recursive, meaning it will look for subdirectories to traverse for documents.

Default: false. Required: No.

-Dpipeline.uri

The path to a pipeline configuration file, which can be in the local filesystem (file:///path/pipeline.conf) or in HDFS (hdfs://path/pipeline.conf). See the section Using an Index Pipeline for more information about pipelines.

Default: empty. Required: No.

CSV Ingest Mapper Arguments

These arguments are used only when the CSVIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-DcsvDelimiter

This is the file delimiter for CSV content.

Default: , (comma). Required: No.

-DcsvFieldMapping

This defines how to map columns in a CSV file to fields in Solr, in the format of 0=id. The key is a zero-based column number (the first column is always "0", the second column is "1", etc.), and the value is the name of the field to use to store the value in Solr. If this is not set, column 0 is used as the id, unless there is a column named 'id'. See the -DidField argument below for more on the ID field rules.

Default: none. Required: No.
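
For illustration, a hypothetical mapping for a four-column file (the field names here are examples only) might look like:

-DcsvFieldMapping=0=id,1=city_s,2=country_s,3=code_s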

-DcsvFirstLineComment

If true, the first line in a CSV file will be interpreted as a comment and will not be indexed.

Default: true. Required: No.

-DcsvStrategy

Defines the format of a CSV file. Three formats are supported:

  • default: a CSV file that adheres to the RFC-4180 standard.

  • excel: a CSV file exported from Microsoft Excel. This commonly uses a comma (,) as the field delimiter.

  • tdf: a tab-delimited CSV file. If you use the tdf strategy, you do not need to override the delimiter with the -DcsvDelimiter argument.

Default: default. Required: No.

-DidField

The column to be used as an ID. The field name used is the name after any mapping that occurs as a result of the -DcsvFieldMapping argument. If there is a column named 'id' and it is different from the field named with this property, you will get an error because you have defined two IDs and IDs must be unique.

This argument is not required when using the CSV Ingest Mapper, but is required when using the SolrXML Ingest Mapper.

Default: none. Required: No.

Grok Ingest Mapper Arguments

These arguments are used only when the GrokIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-Dgrok.uri

The path to a Logstash configuration file, which can be in the local filesystem (file:///path/logStash.conf) or in HDFS (hdfs://path/logStash.conf).

Default: none. Required: No.
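
As a sketch, a minimal Logstash configuration for this mapper might contain only a grok filter (the pattern shown is illustrative); any input and output sections would be ignored, as noted above:

filter {
  grok {
    match => [ "message", "%{COMBINEDAPACHELOG}" ]
  }
}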

Regular Expression Ingest Mapper Arguments

These arguments are used only when the RegexIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-Dcom.lucidworks.hadoop.ingest.RegexIngestMapper.regex

A Java Pattern-compliant regular expression. See the Pattern Javadocs for more details. This property cannot be null or empty.

Default: none. Required: Yes, when using this ingest mapper.

-Dcom.lucidworks.hadoop.ingest.RegexIngestMapper.groups_to_fields

A comma-separated mapping (such as key=value,key=value,…​) between regular expression capturing groups and field names. The key must be an integer and the value must be a String. For instance, 0=body,1=text. Any capturing group not represented in the map will not be added to the document.

Default: none. Required: No.

-Dcom.lucidworks.hadoop.ingest.RegexIngestMapper.match

If true, the mapper will use Java’s Matcher class matches method instead of the find method. This will require the regex to match the entire string instead of part of the string.

Default: none. Required: No.

SolrXML Ingest Mapper Arguments

These arguments are used only when the SolrXMLIngestMapper is chosen with the -cls property described in the section, Key-Value Pair Arguments, below.

-DidField

The field in the XML document to be used as a unique document ID in the index.

This argument is required when using the SolrXML Ingest Mapper, but not required when using the CSV Ingest Mapper.

Default: none. Required: Yes.

Kerberos Arguments

If Solr has been secured with Kerberos, additional arguments are required to ensure authentication can be achieved when the job jar attempts to write the documents to Solr.

There are two limitations to using the job jar with a Kerberized Solr cluster:

  • Hadoop does not check if Solr is available before starting the MapReduce process. This may cause a long delay after starting the process before an error appears if there are problems communicating with Solr.

  • The job jar will not issue a command to Solr to do a final commit, which saves documents to the index. To be sure all documents are committed to the index, you can issue a "hard commit", or enable auto-commits in Solr. See the Apache Solr Reference Guide section UpdateHandlers in SolrConfig for more information.

-Dlww.jaas.file

The full path to a JAAS configuration file that includes a section to define the keytab location and service principal that will be used to write index updates to Solr.

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed). This is an example of a JAAS section that can be used:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname argument (described below).
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Hadoop.

Default: None. Required: Yes, if Solr is secured with Kerberos.

-Dlww.jaas.appname

The name of the section in the JAAS file that includes the service principal and keytab location, as described above. If this is not defined, a default of "Client" will be used.

Default: Client. Required: No.

-Dlww.jaas-debug.all

When true, debug-level logging will be enabled in the YARN container for use in troubleshooting problems. This argument should not be used in production.

Default: false. Required: No.
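
For illustration, a job invocation against a Kerberized Solr cluster might add these arguments to an otherwise ordinary command (the paths and names here are examples only; the full argument order is summarized later in this section):

bin/hadoop jar /opt/lucidworks-hdpsearch/job/lucidworks-hadoop-job.jar \
   com.lucidworks.hadoop.ingest.IngestJob \
   -Dlww.jaas.file=/data/jaas-client.conf \
   -Dlww.jaas.appname=Client \
   -cls com.lucidworks.hadoop.ingest.DirectoryIngestMapper \
   -c myCollection \
   -i /data/files \
   -of com.lucidworks.hadoop.io.LWMapRedOutputFormat \
   -zk zknode1:2181,zknode2:2181,zknode3:2181/solr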

Other arguments not described here (such as Hadoop-specific system arguments) can be supplied as needed and they will be added to the Hadoop configuration. These arguments should be defined with the -Dargument=value syntax.

Key-Value Pair Arguments

Key-value pair arguments apply to the ingest job generally. These arguments are expressed as -argument value. They are the last arguments supplied before the jar name is defined.

There are several possible arguments:

-cls

Required.

The ingest mapper class. This class must correspond to the content being indexed to ensure proper parsing of documents. See the section Ingest Mappers for a detailed explanation of each available ingest mapper.

  • com.lucidworks.hadoop.ingest.GrokIngestMapper

  • com.lucidworks.hadoop.ingest.CSVIngestMapper

  • com.lucidworks.hadoop.ingest.DirectoryIngestMapper

  • com.lucidworks.hadoop.ingest.RegexIngestMapper

  • com.lucidworks.hadoop.ingest.SequenceFileIngestMapper

  • com.lucidworks.hadoop.ingest.SolrXMLIngestMapper

  • com.lucidworks.hadoop.ingest.WarcIngestMapper

  • com.lucidworks.hadoop.ingest.ZipIngestMapper

-c

Required.

The collection name where documents should be indexed. This collection must exist prior to running the Hadoop job jar.

-of

Required.

The output format. For all cases, you can use the default com.lucidworks.hadoop.io.LWMapRedOutputFormat.

-i

Required.

The path to the Hadoop input data. This path should point to the HDFS directory. If the defined location is not a specific filename, the syntax must include a wildcard expression to find documents, such as /data/*.

-s

The Solr URL when running in standalone mode. In a default installation, this would be http://localhost:8983/solr. Use this parameter when you are not running in SolrCloud mode. If you are running Solr in SolrCloud mode, you should use -zk instead.

-zk

A list of ZooKeeper hosts, followed by the ZooKeeper root directory. For example, 10.0.1.1:2181,10.0.1.2:2181,10.0.1.3:2181/solr would be a valid value.

This parameter is used when running in SolrCloud mode, and allows the output of the ingest process to be routed via ZooKeeper to any available node. If you are not running in SolrCloud mode, use the -s argument instead.

-redcls

The class name of a custom IngestReducer, if any. In order for this to be invoked, you must also set -ur to a value higher than 0. If no value is specified, then the default reducer is used, which is com.lucidworks.hadoop.ingest.IngestReducer.

-ur

The number of reducers to use when outputting to the OutputFormat. Depending on the output format and your system resources, you may wish to have Hadoop do a reduce step so the output resource is not overwhelmed. The default is 0, which means no reducers are used.
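
For example, to route output through a hypothetical custom reducer with two reduce tasks, you might append:

-redcls com.example.MyIngestReducer -ur 2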

Summary of Argument Order

Using this example job jar command:

bin/hadoop jar /opt/lucidworks-hdpsearch/job/lucidworks-hadoop-job.jar (1)

   com.lucidworks.hadoop.ingest.IngestJob (2)

   -Dlww.commit.on.close=true -DcsvDelimiter=| (3)

   -cls com.lucidworks.hadoop.ingest.CSVIngestMapper -c gettingstarted -i /data/CSV -of com.lucidworks.hadoop.io.LWMapRedOutputFormat -s http://localhost:8888/solr (4)

We can summarize the proper order as follows:

1 The Hadoop command to run a job.
2 The main ingest class.
3 Mapper arguments, which vary depending on the Mapper class chosen, in the format of -Dargument=value.
4 Key-value arguments, which include the ingest mapper, Solr collection name, and other parameters, in the format of -argument value.

Index Pipelines

Using an Index Pipeline

One of the arguments that can be provided to the Hadoop job jar is the path to an index pipeline configuration file.

Index pipelines take content and transform it into a document suitable for indexing to Solr via a series of operations called index stages. The stages are configured with stage-specific properties, which define how to perform the operations the stage supports.

The Hadoop job jar supports two types of index stages: an Apache Tika Parser stage and a Field Mapping stage.

Creating a Pipeline Configuration File

To use an index pipeline, you must create a pipeline configuration file with the properties for each stage. This file should be in JSON format, and include the following properties.

stages

The stages property should include an array with the definition of each stage, in the order they will be run. The valid properties for each stage depend on the type chosen.

pipeline

The pipeline property defines the ID of the pipeline, in a sub-property named id. The pipeline can have any ID you choose. At this time, it’s not used by the job jar.

Here is a sample pipeline configuration file showing two stage definitions:

{
"stages": [
   {
   "id": "tika1",
   "type": "tika-parser",
   "includeImages": true,
   "flattenCompound": false,
   "addFailedDocs": true,
   "addOriginalContent": true,
   "contentField": "_raw_content_"
   },
   {
   "id": "st2",
   "type": "field-mapping",
   "mappings": [
      {
      "source": "Content-Encoding",
      "target": "characterSet_s",
      "operation": "MOVE"
      }
   ],
   "unmapped": {
      "source": "/(.*)/",
      "target": "attr_$1_",
      "operation": "MOVE"
      }
   }
   ],
   "pipeline": {
      "id": "pipe1"
   }
}

Calling the Pipeline Configuration in a Job

When defining the job, the path to the pipeline definition file is passed with the -Dpipeline.uri property. For example, to define a job that uses a tika-pipeline.conf file to configure a pipeline, it would be referenced in the job as so:

bin/hadoop jar /opt/lucidworks-hdpsearch/job/lucidworks-hadoop-job.jar com.lucidworks.hadoop.ingest.IngestJob -Dlww.commit.on.close=true -DcsvDelimiter=| -Dpipeline.uri=/opt/lucidworks-hdpsearch/job/pipeline-example/tika-pipeline.conf -cls com.lucidworks.hadoop.ingest.CSVIngestMapper -c collection1 -i /data/CSV -of com.lucidworks.hadoop.io.LWMapRedOutputFormat -s http://localhost:8888/solr

There’s a lot going on there, described in detail in the section Job Jar Arguments, but you can see that one of the properties is -Dpipeline.uri=/opt/lucidworks-hdpsearch/job/pipeline-example/tika-pipeline.conf, which defines the location of the configuration file.

You can store a pipeline configuration in HDFS and refer to it with an hdfs:// path instead of a local filesystem path.

Sample Pipeline Configuration Files

The HDPSearch package includes two sample pipeline configuration files in the /opt/lucidworks-hdpsearch/job/pipeline-example directory. The examples are intended as starting points for customization.

  • tika-pipeline.conf: includes a sample Apache Tika Parser stage and a Field Mapper stage.

  • fieldmapping-pipeline.conf: includes a sample Field Mapper stage.

The two types of stages available are described in more detail below.

Apache Tika Parser Stage

The Apache Tika Parser index stage allows using Apache Tika to parse documents. The job jar uses Apache Tika v1.4.

Tika Parser Properties

The Apache Tika Parser takes the following properties.

id

A unique identifier for this stage.

type

The stage type, which should be tika-parser.

includeImages

If images are found in documents, this property defines whether they should be included with the content or omitted. The default is false, which omits images during parsing.

flattenCompound

This property defines how to handle compound files, such as .zip files or other types of archives, including .tar and .tgz files. When true, all embedded content will be processed together with the main content, and presented as a single document.

When false, the parser will traverse the archive and compound files (e.g. office files with embedded parts, emails with attachments, etc) and process each embedded resource as a separate document. The default is false.

addFailedDocs

If documents fail parsing for any reason (perhaps the document is corrupt, or not a supported format), they can be added to the index anyway (or, more accurately, passed to the next stage of the pipeline) by changing this property to true. The default is false.

addOriginalContent

If true, the default, then the raw content of each document will be included as a field with the name defined in the contentField property in the parsed document. If you do not want to store the raw content of each document, then you could change this to false.

contentField

If the raw content of each document will be stored, this property defines the name of the field to use to store the data.

Sample Tika Parser Stage

This sample Tika Parser stage is included in the example pipeline tika-pipeline.conf in the pipeline-example directory.

{
   "id": "tika1",
   "type": "tika-parser",
   "includeImages": true,  (1)
   "flattenCompound": false, (2)
   "addFailedDocs": true,   (3)
   "addOriginalContent": true, (4)
   "contentField": "_raw_content_" (5)
}
1 Images found in documents will be included.
2 Compound files will be expanded, and files found in archives will be indexed as individual files.
3 Documents that fail parsing will be passed to the next stage of the pipeline.
4 The raw bytes of the original file will be retained as a field of the parsed document.
5 The raw bytes will be stored in a field named _raw_content_.

Field Mapping Stage

The Field Mapping index stage type allows mapping fields found in documents to other fields. You may want to do this so your documents conform to the schema you have already defined in Solr. This ensures consistency in the Solr index and makes forming queries more predictable.

You can also use field mapping with dynamic fields (such as if you are running in Schemaless Mode), and this would allow you to enforce field types on your data. For example, if you have a document that includes the field lastModifiedDate, you could map this to a dynamic field rule *_dt. Any documents from other repositories that include dates in different field names could be similarly mapped to the same dynamic field rule. This ensures that date fields, no matter how they appear in your content, behave with the same data type rules while indexing and for queries.
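
For instance, a hypothetical mapping entry (the field names are illustrative) that moves lastModifiedDate into a field matching the *_dt dynamic field rule might look like this:

{
"source": "lastModifiedDate",
"target": "lastModifiedDate_dt",
"operation": "MOVE"
}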

This stage should nearly always be placed after the Apache Tika Parser stage in a pipeline. The Apache Tika Parser stage does do some field manipulation during parsing, so performing field mapping after parsing ensures a higher level of control and consistency in your documents.

Field Mapping Behavior

When using the field mapping stage, you should consider the fact that the field mapping rules are applied in a specific order.

  1. The rules are traversed only once, in the order of their declaration in the mappings property. This means it is possible to do multiple operations on a field. However, note that if fields are moved, further operations should reference the new field name.

  2. Before each rule is evaluated, the current list of field names is prepared and sorted in alphabetic ascending order.

  3. The current rule is applied to field values for each matching name from the list of names prepared in step 2. New field names resulting from the current rule do not affect that snapshot list of field names; a new field name only becomes visible to rules evaluated later in the cycle.

  4. The process is repeated for each rule, and a list of matching source fields is noted.

  5. If the document contains any fields that were not affected by any mapping rule, the rules defined in the unmapped property are applied.

  6. Finally, the resulting transformed document is returned to the next stage of the indexing pipeline.

Properties of a Field Mapping Stage

The Field Mapping stage provides a great deal of flexibility to indexing operations by allowing field transformation to occur during indexing. Fields can be moved, copied, deleted, added, or transformed with regular expressions.

The properties of the field mapping stage are as follows.

id

A unique identifier for this stage.

type

The stage type, which should be field-mapping.

mappings

The mappings section defines how to modify incoming fields. These are defined as triples of {source,target,operation}, which define the source field name, the target field name, and the operation to perform.

source

The name of the source field in the incoming document that should be mapped to another field.

Java regular expressions can be used in the source field by surrounding the regular expression with forward slashes (/). For example, /(.*)text(.*)/ is a valid expression to find field names in the incoming document that contain the string text between any number of preceding or succeeding characters.

If a regular expression is not used, the value supplied for the source will be treated as a literal field name and will be matched ignoring the case (e.g., 'text' will match 'tExt' or 'Text', etc.).

target

The name of the target field.

If the value for the source was a regular expression, then this can also be a regular expression. It can also contain substitutions using references to capture groups (using Java’s Matcher.replaceAll).

Otherwise, the source field name will be simply substituted by the value of target, according to the operation rules described below.

operation

The operation defines what you want to do with the field. Several operations are supported.

copy

This operation copies the content in the source field into the target field. Content in the source field is retained.

move

This operation moves content in the source field to the target field. Content in the source field is discarded. This has the same effect as renaming the field.

delete

This operation removes content from the source field and discards it. A target property does not need to be defined when using this operation.

add

This operation has two behaviors depending on if regular expressions are used or not:

  • If the source includes a regular expression, the literal value of the target will be added to the source.

  • If source is not a regular expression, target will be added to the document as a new field.

set

This operation has two behaviors depending on if regular expressions are used or not:

  • If the source includes a regular expression, the literal value of the target will be set as the new value of source, replacing the original content.

  • If source is not a regular expression, target will be set as a new field.

keep

This operation will retain the content in the source unchanged. The field will be added to a list of known fields which are unaffected by any rules defined in the unmapped properties section.

unmapped

The unmapped section defines how to deal with fields that do not already exist in the schema and are also not covered by the rules in the mappings section.

The unmapped section works the same as the mappings section, meaning that the mappings are defined as triples ({source,target,operation}) and the behavior of the source, target, and operation properties is identical.

Example Arguments

These are basic examples to demonstrate how to construct the job jar arguments for some content scenarios.

Index CSV files

To index CSV files, you could use the following arguments:

bin/hadoop jar /opt/lucidworks-hdpsearch/job/lucidworks-hadoop-job.jar \ (1)
com.lucidworks.hadoop.ingest.IngestJob \ (2)
-Dlww.commit.on.close=true \ (3)
-DcsvDelimiter=| \ (4)
-cls com.lucidworks.hadoop.ingest.CSVIngestMapper \ (5)
-c gettingstarted \ (6)
-i /data/CSV \ (7)
-of com.lucidworks.hadoop.io.LWMapRedOutputFormat \ (8)
-s http://localhost:8888/solr (9)

To explain in more detail, here is a breakdown of each parameter:

1 Use the hadoop binary with the jar command and supply the path to the lucidworks-hadoop-job.jar.
2 Define the main class which is always com.lucidworks.hadoop.ingest.IngestJob.
3 Commit the documents when finished.
4 Define the delimiter character as a pipe (|).
5 Define the mapper class, in this case -cls com.lucidworks.hadoop.ingest.CSVIngestMapper.
6 Name the collection in Solr to index the documents to.
7 Define the name and path of the file(s) to be indexed. Again, this path should be to the location of the file(s) in HDFS.
8 Define the output format, which is always com.lucidworks.hadoop.io.LWMapRedOutputFormat.
9 Provide the location of Solr. This example does not use SolrCloud, so the standalone Solr instance is found at: -s http://localhost:8888/solr

Index a Directory of Files with SolrCloud

bin/hadoop jar /opt/lucidworks-hdpsearch/job/lucidworks-hadoop-job.jar \ (1)
com.lucidworks.hadoop.ingest.IngestJob \ (2)
-Dlww.commit.on.close=true \ (3)
-cls com.lucidworks.hadoop.ingest.DirectoryIngestMapper \ (4)
-c myCollection \ (5)
-i /data/files \ (6)
-of com.lucidworks.hadoop.io.LWMapRedOutputFormat \ (7)
-zk 10.0.1.7:2181,10.0.1.8:2181,10.0.1.9:2181/solr (8)

In this example, we have defined the job very similarly to the previous example. To step through it line-by-line:

1 Use the hadoop binary with the jar command and supply the path to the lucidworks-hadoop-job.jar.
2 Define the main class which is always com.lucidworks.hadoop.ingest.IngestJob.
3 Commit the documents when finished.
4 Use the Directory Ingest mapper class, which allows the job jar to traverse a directory of files for indexing.
5 Name the myCollection as the collection in Solr where the documents will be indexed.
6 Point the job jar to the input directory (/data/files).
7 Define the output format, which is always com.lucidworks.hadoop.io.LWMapRedOutputFormat.
8 Provide the location of ZooKeeper, because in this case we are using SolrCloud. This uses the -zk parameter to define the ZooKeeper connection string. The host:port locations should be separated by commas, followed by the root directory. In this case, the ZooKeeper root is /solr, but another root directory may have been defined during initial startup of Solr.

Pig Store/Load Functions

If you use Apache Pig to do large-scale data processing, you can use Pig scripts during indexing of content to Solr. For instance, you can use Pig to do large-scale preprocessing and joining of data and then output the resulting datasets to Solr so that you can more effectively query that data. This section describes the Pig Functions and how to use them.

Pig Function Jars

Due to backward compatibility issues between how Pig works with different versions of Hadoop, there are two available Pig function jars. The jar chosen must be compatible with the version of Hadoop you are using.

  • lucidworks-pig-functions-{version}-hd1.jar: Use this when working with Hadoop 1.x versions.

  • lucidworks-pig-functions-{version}-hd2.jar: Use this when working with Hadoop 2.x versions.

The appropriate Pig function jar must be put in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you will supply the path to the proper jar when invoking your script.

Available Functions

The Pig functions included in the Pig function jars are three user-defined functions (UDFs) and one Store function. These functions are:

  • com/lucidworks/hadoop/pig/SolrStoreFunc.class

  • com/lucidworks/hadoop/pig/EpochToCalendar.class

  • com/lucidworks/hadoop/pig/Extract.class

  • com/lucidworks/hadoop/pig/Histogram.class

Using the Functions

There are two approaches to using functions in Pig: to REGISTER them in the script, or to load them with your Pig command line request.

If using REGISTER, the Pig function jar must be put in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you can either supply the path in your script or use a variable and define that variable with a -p property definition.
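
As a sketch of the REGISTER approach (the HDFS path, NameNode address, and jar version shown are placeholders), you would first put the jar in HDFS:

hadoop fs -put lucidworks-pig-functions-{version}-hd2.jar /user/pig/

and then register it at the top of the Pig script:

REGISTER hdfs://namenode:8020/user/pig/lucidworks-pig-functions-{version}-hd2.jar;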

The example below uses the second approach, loading the jar with the -Dpig.additional.jars system property when launching the script. With this approach, the jar can be located anywhere on the machine where the script will be run.

When using the Pig functions, you can pass parameters for your script on the command line. The properties you will need to pass are the location of Solr and the collection to use; these are shown in detail in the example below.

Indexing to a Kerberos-Secured Solr Cluster

When a Solr cluster is secured with Kerberos for internode communication, Pig scripts must include the full path to a JAAS file that includes the service principal and the path to a keytab file that will be used to index the output of the script to Solr.

Two parameters provide the information the script needs to access the JAAS file:

lww.jaas.file

The path to the JAAS file that includes a section for the service principal who will write to the Solr indexes. For example, to use this property in a Pig script:

set lww.jaas.file '/opt/${namePackage}/conf/login.conf';

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed).

lww.jaas.appname

The name of the section in the JAAS file that includes the correct service principal and keytab path. For example, to use this property in a Pig script:

set lww.jaas.appname 'Client';

Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Pig.

Sample CSV Script

The following Pig script will take a simple CSV file and index it to Solr.

set solr.zkhost '$zkHost';
set solr.collection '$collection'; (1)

A = load '$csv' using PigStorage(',') as (id_s:chararray,city_s:chararray,country_s:chararray,code_s:chararray,code2_s:chararray,latitude_s:chararray,longitude_s:chararray,flag_s:chararray); (2)
--dump A;
B = FOREACH A GENERATE $0 as id, 'city_s', $1, 'country_s', $2, 'code_s', $3, 'code2_s', $4, 'latitude_s', $5, 'longitude_s', $6, 'flag_s', $7; (3)

ok = store B into 'SOLR' using com.lucidworks.hadoop.pig.SolrStoreFunc(); (4)

This relatively simple script is doing several things that help to understand how the Solr Pig functions work.

1 This and the line above define parameters that are needed by SolrStoreFunc to know where Solr is. SolrStoreFunc needs the properties solr.zkhost and solr.collection, and these lines are mapping the zkhost and collection parameters we will pass when invoking Pig to the required properties.
2 Load the CSV file, the path and name we will pass with the csv parameter. We also define the field names for each column in CSV file, and their types.
3 For each item in the CSV file, generate a document id from the first field ($0) and then define each field name and value in name, value pairs.
4 Load the documents into Solr, using the SolrStoreFunc. While we don’t need to define the location of Solr here, the function will use the zkhost and collection properties that we will pass when we invoke our Pig script.
When using SolrStoreFunc, the document ID must be the first field.

When we want to run this script, we invoke Pig and define several parameters we have referenced in the script with the -p option, such as in this command:

./bin/pig -Dpig.additional.jars=/path/to/solr-pig-functions.jar -p csv=/path/to/my/csv/airports.dat -p zkHost=zknode1:2181,zknode2:2181,zknode3:2181/solr -p collection=myCollection ~/myScripts/index-csv.pig

The parameters to pass are:

csv

The path and name of the CSV file we want to process.

zkHost

The ZooKeeper connection string for a SolrCloud cluster, in the form of zkhost1:port,zkhost2:port,zkhost3:port/chroot. In the script, we mapped this to the solr.zkhost property, which is required by the SolrStoreFunc to know where to send the output documents.

collection

The Solr collection to index into. In the script, we mapped this to the solr.collection property, which is required by the SolrStoreFunc to know the Solr collection the documents should be indexed to.

The zkHost parameter above is only used if you are indexing to a SolrCloud cluster, which uses ZooKeeper to route indexing and query requests.

If, however, you are not using SolrCloud, you can use the solrUrl parameter, which takes the location of a standalone Solr instance, in the form of http://host:port/solr.

In the script, you would change the line that maps solr.zkhost to the zkHost parameter so that it instead maps solr.server.url to the solrUrl parameter. For example:

set solr.server.url '$solrUrl';
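
The invocation then passes solrUrl instead of zkHost; for example (the paths are illustrative):

./bin/pig -Dpig.additional.jars=/path/to/solr-pig-functions.jar -p csv=/path/to/my/csv/airports.dat -p solrUrl=http://localhost:8983/solr -p collection=myCollection ~/myScripts/index-csv.pig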

Hive SerDe

The Lucidworks Hive SerDe allows reading and writing data to and from Solr using Apache Hive. Data from Solr can be presented as a Hive table to be joined with other Hive tables and data. Additionally, data from Hive can be inserted into Solr with an INSERT statement. This section describes the Hive SerDe and how to use it.

Hive SerDe Jars

Due to backward compatibility issues between releases of Hive, there are two available Hive SerDe jars. The jar chosen must be compatible with the version of Hive you are using.

  • lucidworks-hive-serde-{version}.jar: This jar has been tested for use with Hive 0.12, 0.14, and 0.15. Do not use this jar if you are using Hive 0.13.

  • lucidworks-hive-serde-0.13-{version}.jar: This jar is for use with Hive 0.13.0 only.

Install the SerDe Jar to Hive

In order for Hive to work with Solr, the Hive SerDe jar must be added as a plugin to Hive.

From a Hive prompt, use the ADD JAR command and reference the path and filename of the SerDe jar for your Hive version.

hive> ADD JAR solr-hive-serde.jar;

This can also be done in your Hive command to create the table, as in the example below.

Create External Tables

An external table in Hive allows the data in the table to be used (read or write) by another system or application outside of Hive. For integration with Solr, the external table allows Hive to read data from and write data to Solr.

To create an external table for Solr, you can use a command similar to below. The properties available are described after the example.

hive> CREATE EXTERNAL TABLE solr (id string, field1 string, field2 int)
      STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler'
      LOCATION '/tmp/solr'
      TBLPROPERTIES('solr.server.url' = 'http://localhost:8888/solr',
                    'solr.collection' = 'collection1',
                    'solr.query' = '*:*');

In this example, we have created an external table named "solr", and defined a custom storage handler (STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler').

The LOCATION indicates the location in HDFS where the table data will be stored. In this example, we have chosen to use /tmp/solr.

In the section TBLPROPERTIES, we define several properties for Solr so the data can be indexed to the right Solr installation and collection:

solr.zkhost

The location of the ZooKeeper quorum if using LucidWorks in SolrCloud mode. If this property is set along with the solr.server.url property, the solr.server.url property will take precedence.

solr.server.url

The location of the Solr instance if not using LucidWorks in SolrCloud mode. If this property is set along with the solr.zkhost property, this property will take precedence.

solr.collection

The Solr collection for this table. If not defined, an exception will be thrown.

solr.query

The specific Solr query to execute to read this table. If not defined, a default of *:* will be used. This property is not needed when loading data to a table, but is needed when defining the table so Hive can later read the table.

lww.commit.on.close

If true, inserts will be automatically committed when the connection is closed. True is the default.

lww.jaas.file

Used only when indexing to or reading from a Solr cluster secured with Kerberos.

This property defines the path to a JAAS file that contains a service principal and keytab location for a user who is authorized to read from and write to Solr and Hive.

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed). Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and Hive.

lww.jaas.appname

Used only when indexing to or reading from a Solr cluster secured with Kerberos.

This property provides the name of the section in the JAAS file that includes the correct service principal and keytab path.

If the table needs to be dropped at a later time, you can use the DROP TABLE command in Hive. This will remove the metadata stored in the table in Hive, but will not modify the underlying data (in this case, the Solr index).

Query and Insert Content

Once the table is configured, any syntactically correct Hive query will be able to query the Solr index.

For example, to select three fields named "id", "field1", and "field2" from the "solr" table, you would use a query such as:

hive> SELECT id, field1, field2 FROM solr;

To join data from tables, you can make a request such as:

hive> SELECT s.id, s.field1, s.field2 FROM solr s
      JOIN sometable t ON (s.id = t.id);

And finally, to insert data to a table, simply use the Solr table as the target for the Hive INSERT statement, such as:

hive> INSERT INTO solr
      SELECT id, field1, field2 FROM sometable;

Example Hive Table

Solr includes a small number of sample documents for use when getting started. One of these is a CSV file containing book metadata. This file is found in your Solr installation, at $SOLR_HOME/example/exampledocs/books.csv.

Using the sample books.csv file, we can see a detailed example of creating a table, loading data to it, and indexing that data to Solr.

CREATE TABLE books (id STRING, cat STRING, title STRING, price FLOAT, in_stock BOOLEAN, author STRING, series STRING, seq INT, genre STRING) ROW FORMAT DELIMITED FIELDS TERMINATED BY ','; (1)

LOAD DATA LOCAL INPATH '/solr/example/exampledocs/books.csv' OVERWRITE INTO TABLE books; (2)

ADD JAR solr-hive-serde.jar; (3)

CREATE EXTERNAL TABLE solr (id STRING, cat_s STRING, title_s STRING, price_f FLOAT, in_stock_b BOOLEAN, author_s STRING, series_s STRING, seq_i INT, genre_s STRING) (4)
     STORED BY 'com.lucidworks.hadoop.hive.LWStorageHandler' (5)
     LOCATION '/tmp/solr' (6)
     TBLPROPERTIES('solr.zkhost' = 'zknode1:2181,zknode2:2181,zknode3:2181/solr',
                   'solr.collection' = 'gettingstarted',
                   'solr.query' = '*:*',
                   'lww.jaas.file' = '/data/jaas-client.conf'); (7)


INSERT OVERWRITE TABLE solr SELECT b.* FROM books b;
1 Define the table books, and provide the field names and field types that will make up the table.
2 Load the data from the books.csv file.
3 Add the lucidworks-hive-serde.jar file to Hive. Note the jar name shown here omits the version information which will be included in the jar file you have. If you are using Hive 0.13, you must also use a jar specifically built for 0.13.
4 Create an external table named solr, and provide the field names and field types that will make up the table. These will be the same field names as in your local Hive table, so we can index all of the same data to Solr.
5 Define the custom storage handler provided by the lucidworks-hive-serde.jar.
6 Define storage location in HDFS.
7 Define the location of Solr (or ZooKeeper if using SolrCloud), the collection in Solr to index the data to, and the query to use when reading the table. This example also refers to a JAAS configuration file that will be used to authenticate to the Kerberized Solr cluster.

HBase Indexer

The HBase Indexer provides the ability to stream events from HBase to Solr for near real time searching.

How the HBase Indexer Works

The HBase indexer works as an HBase replication sink and runs as a daemon. When updates are written to HBase region servers, they are replicated to the HBase indexer processes. These processes then pass the updates to Solr for indexing.

There are several configuration files to prepare before using the HBase indexer. Then a specific configuration for your HBase table must be supplied to map HBase rows to Solr documents.

If you are already using HBase replication, the HBase Indexer process should not interfere.

HBase Indexer Configuration

Each of the following configuration changes need to be made to use the HBase indexer.

These instructions assume Solr is running and a collection already exists for the data to be indexed. Solr does not need to be restarted to start using the HBase indexer.

Configure hbase-indexer-site.xml

The first step to configuring the HBase indexer is to define the ZooKeeper connectstring and quorum location in hbase-indexer-site.xml, found in /opt/lucidworks-hdpsearch/hbase-indexer/conf.

The property to define is the hbaseindexer.zookeeper.connectstring, as in this example:

<configuration>
   <property>
      <name>hbaseindexer.zookeeper.connectstring</name>
      <value>server1:2181,server2:2181,server3:2181</value>
   </property>
   <property>
      <name>hbase.zookeeper.quorum</name>
      <value>server1,server2,server3</value>
   </property>
</configuration>

Configure hbase-site.xml

The hbase-site.xml file on each HBase node needs to be modified to include several new properties, as shown below:

<configuration>
   <!-- SEP is basically replication, so enable it -->
   <property>
      <name>hbase.replication</name>
      <value>true</value>
   </property>
   <property>
      <name>replication.source.ratio</name>
      <value>1.0</value>
   </property>
   <property>
      <name>replication.source.nb.capacity</name>
      <value>1000</value>
   </property>
   <property>
      <name>replication.replicationsource.implementation</name>
      <value>com.ngdata.sep.impl.SepReplicationSource</value>
   </property>
</configuration>

In more detail, these properties are:

  • hbase.replication: True or false; enables replication.

  • replication.source.ratio: Source ratio of 100% makes sure that each SEP consumer is actually used (otherwise, some can sit idle, especially with small clusters).

  • replication.source.nb.capacity: The maximum number of hlog entries to replicate in one go. If this is large, and a consumer takes a while to process the events, the HBase RPC call will time out.

  • replication.replicationsource.implementation: A custom replication source for indexing content to Solr.

Once these values have been changed, HBase will need to be restarted. However, the next step requires copying some JAR files, so wait until all steps are done before restarting.

If you use Ambari to manage your cluster, you can set the properties by going to the HBase Configs screen at Ambari → Configs → Advanced tab → Custom hbase-site.

Copy JAR Files

The SEP replication used by the HBase indexer requires four JAR files to be copied to each HBase node.

These jar files can be found in the /opt/lucidworks-hdpsearch/hbase-indexer/lib directory.

They need to be copied to the $HBASE_HOME/lib directory on each node running HBase. These files are:

  • hbase-sep-api-{version}.jar

  • hbase-sep-impl-{version}.jar

  • hbase-sep-impl-common-{version}.jar

  • hbase-sep-tools-{version}.jar
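
For example, a sketch of copying them from the default HDPSearch location (adjust the paths for your installation):

cp /opt/lucidworks-hdpsearch/hbase-indexer/lib/hbase-sep-*.jar $HBASE_HOME/lib/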

Enable Kerberos Support

If you want to index content to a Solr cluster that has been secured with Kerberos for internode communication, you will need to supply the path to a JAAS file that includes a section for a service principal and keytab file for a user who has access to both HBase and Solr. This user should be a different user than the service principal that Solr is using for internode communication.

To configure HBase Indexer to be able to write to the Kerberized Solr, you will modify the hbase-indexer script found in /opt/lucidworks-hdpsearch/hbase-indexer/bin. The properties are commented out by default:

#HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file="
#HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname="

Remove the # to uncomment these lines, and supply values for each property:

-Dlww.jaas.file

The full path to a JAAS configuration file that includes a section to define the keytab location and service principal that will be used to write index updates to Solr.

Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/solr-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr-indexer@SOLRSERVER.COM"; (3)
};
1 The name of this section of the JAAS file. This name will be used with the lww.jaas.appname parameter as described below.
2 The location of the keytab file.
3 The service principal name. This should be a different principal than the one used for Solr, but must have access to both Solr and HBase.

-Dlww.jaas.appname

The name of the section in the JAAS file that includes the service principal and keytab location. If this is not defined, a default of "Client" will be used.
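
With both values supplied, the uncommented lines in the hbase-indexer script would look something like this (the JAAS file path is an example):

HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.file=/data/jaas-client.conf"
HBASE_INDEXER_OPTS="$HBASE_INDEXER_OPTS -Dlww.jaas.appname=Client"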

Restart HBase

Once each of the above changes have been made, restart HBase on all nodes.

Copy hbase-site.xml to HBase Indexer

Once you have added the new properties to hbase-site.xml, copy the file from /etc/hbase/conf to /opt/lucidworks-hdpsearch/hbase-indexer/conf.
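
For example:

cp /etc/hbase/conf/hbase-site.xml /opt/lucidworks-hdpsearch/hbase-indexer/conf/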

This step isn’t always required, but if you have problems with table data being indexed to Solr, it’s likely the HBase indexer is having problems connecting to ZooKeeper. Copying this file will ensure all of the ZooKeeper parameters are available to the HBase indexer.

Start HBase Indexer Daemon

When configuration is complete and HBase has been restarted, you can start the HBase indexer.

From /opt/lucidworks-hdpsearch/hbase-indexer/bin, run:

hbase-indexer server &

The & portion of this command will run the server in the background. If you would like to run it in the foreground, omit this part of the above example.

At this point you are ready to start indexing content.

Stream Data from HBase Indexer to Solr

The HBase indexer replicates content being written to HBase and streams it to Solr for indexing.

The HBase table that will be indexed must have the REPLICATION_SCOPE set to "1".
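
For example, from the HBase shell, a table like the one used in the indexer configuration below could be created with replication enabled on its column family (the table and column family names are illustrative):

create 'indexdemo-user', { NAME => 'info', REPLICATION_SCOPE => 1 }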

If you are not familiar with HBase, the HBase Indexer tutorial provides a good starting point for creating a simple table and indexing it to Solr at https://github.com/NGDATA/hbase-indexer/wiki/Tutorial.

Add an Indexer

In order to process HBase events, an indexer must be created.

First you need to create an indexer configuration file, which is a simple XML file that tells the HBase Indexer how to map HBase columns to Solr fields. For example:

<?xml version="1.0"?>
<indexer table="indexdemo-user">
   <field name="firstname_s" value="info:firstname"/>
   <field name="lastname_s" value="info:lastname"/>
   <field name="age_i" value="info:age" type="int"/>
</indexer>

Note that each field element defines the Solr field name, then the HBase column, and, optionally, the field type. The table attribute of the indexer element must also match the name of the HBase table you intend to index.

More details on the indexer configuration options are available from https://github.com/NGDATA/hbase-indexer/wiki/Indexer-configuration.

Start an Indexer Process

Once we have an indexer configuration file, we can then start the indexer itself with the add-indexer command.

This command takes several properties, as in this example:

./hbase-indexer add-indexer \ (1)
  -n myindexer \ (2)
  -c indexer-conf.xml \ (3)
  -cp solr.zk=server1:3181,server2:3181,server3:3181 \ (4)
  -cp solr.collection=myCollection (5)
1 The hbase-indexer script is found in /opt/lucidworks-hdpsearch/hbase-indexer/bin directory. The add-indexer command adds the indexer to the running hbase-indexer daemon.
2 The -n property provides a name for the indexer.
3 The -c property defines the location of the indexer configuration file. Provide the path as well as the filename if you are launching the command from a directory other than the one that contains the file.
4 The -cp property is a key-value pair. In this case, it defines the location of ZooKeeper with the solr.zk property.
5 Another -cp key-value pair, which defines the solr.collection property with the name of the Solr collection that the documents should be indexed to. This collection must exist prior to running the indexer (see the example below).
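
If the target collection does not exist yet, one way to create it is with Solr's bin/solr script, run from the Solr installation directory; the collection name and shard count here are placeholders:

bin/solr create -c myCollection -shards 2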

More details on the options for add-indexer are available from https://github.com/NGDATA/hbase-indexer/wiki/CLI-tools.

Storm Bolt

When you want to integrate Solr with Storm, Lucidworks provides more than just a bolt: it provides a framework for building a custom topology.

The storm-solr Repository

Lucidworks has provided a toolset for creating a custom Apache Storm topology to simplify integrating Storm and Solr. This toolset takes care of several concerns you would otherwise have to address during development, such as packaging the topology and running it in different environments.

We have provided a clone of the storm-solr GitHub repository, found at https://github.com/LucidWorks/storm-solr.

You should update this repository on your local machine with a simple git pull.
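
For example, to clone the repository and pull in upstream changes later:

git clone https://github.com/LucidWorks/storm-solr.git
cd storm-solr
# later, to pick up upstream changes:
git pull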

Build the Jar

First, build the Jar for this project:

mvn clean package

This will generate target/storm-solr-1.0.jar, the unified Storm JAR needed to deploy a topology.

Packaging and Running a Storm Topology

To begin, let’s understand how to run a topology in Storm. Effectively, there are two basic modes of running a Storm topology: local mode and cluster mode. Local mode is great for testing your topology locally before pushing it out to a remote Storm cluster, such as staging or production. For starters, you need to compile and package your code and all of its dependencies into a single JAR with a main class that runs your topology.

For this project, the Maven Shade plugin is used to create a unified JAR with dependencies. The benefit of the Shade plugin is that it can relocate classes into different packages at the byte-code level to avoid dependency conflicts. This comes in quite handy if your application depends on third-party libraries that conflict with classes on the Storm classpath. You can look at the project pom.xml file for specific details about how the Shade plugin is used. For now, suffice it to say that the project makes it very easy to build a Storm JAR for your application. Once you have a unified JAR (storm-solr-1.0.jar), you’re ready to run your topology in Storm.

To run locally, you need to have some code that starts up a local cluster in the JVM, submits your topology, and then waits for some configurable amount of time before shutting it all back down, as shown in the following code:

LocalCluster cluster = new LocalCluster();
cluster.submitTopology(topologyName, stormConf, topologyFactory.build(this));
try {
   Thread.sleep(localRunSecs * 1000);
} catch (InterruptedException ie) { ... }
cluster.killTopology(topologyName);
cluster.shutdown();

However, if you want to run your topology in a remote cluster, such as production, you need to do something like:

StormSubmitter.submitTopology(topologyName, stormConf, topologyFactory.build(this));

Consequently, the first thing you’ll need is some way to decide whether to run locally or submit to a remote cluster using a command-line switch when invoking your application. You’ll also need some way to pass along environment-specific configuration options. For instance, the Solr cluster you want to index into will be different between local, staging, and production environments. Suffice it to say that launching and configuring a Storm topology ends up requiring a fair amount of common boilerplate code. To address this need, our project provides a com.lucidworks.storm.StreamingApp class that:

  • Separates the process of defining a Storm topology from the process of running a Storm topology in different environments. This lets you focus on defining a topology for your specific requirements.

  • Provides a clean mechanism for separating environment-specific configuration settings.

  • Minimizes duplicated boilerplate code when developing multiple topologies and gives you a common place to insert reusable logic needed for all of your topologies.

To use StreamingApp, you simply need to implement the StormTopologyFactory interface, which defines the spouts and bolts in your topology:

public interface StormTopologyFactory {
   String getName();

   StormTopology build(StreamingApp app) throws Exception;
}

Developing a Storm Topology

Let’s look at an example that indexes tweets from Twitter into SolrCloud.

TwitterToSolrTopology

class TwitterToSolrTopology implements StormTopologyFactory {
   static final Fields spoutFields = new Fields("id", "tweet")

   String getName() { return "twitter-to-solr" }

   StormTopology build(StreamingApp app) throws Exception {
      // setup spout and bolts for accessing Spring-managed POJOs at runtime
      SpringSpout twitterSpout = new SpringSpout("twitterDataProvider", spoutFields);
      SpringBolt solrBolt = new SpringBolt("solrBoltAction", app.tickRate("solrBolt"));

      // wire up the topology to read tweets and send to Solr
      TopologyBuilder builder = new TopologyBuilder()
      builder.setSpout("twitterSpout", twitterSpout, app.parallelism("twitterSpout"))
      builder.setBolt("solrBolt", solrBolt, numShards).customGrouping("twitterSpout", shardGrouping)

      return builder.createTopology()
   }
}

A couple of things should stand out to you in this listing. First, there’s no command-line parsing, environment-specific configuration handling, or any code related to running this topology. All that is here is code defining a StormTopology.

Second, the code is quite easy to understand because it only does one thing.

Lastly, this class is written in Groovy instead of Java, which helps keep things nice and tidy. Of course if you don’t want to use Groovy, you can use Java, as the framework supports both seamlessly.

We’ll get into the specific details of the implementation shortly, but first, let’s see how to run the TwitterToSolrTopology using the StreamingApp framework. For local mode, you would do:

java -classpath $STORM_HOME/lib/*:target/storm-solr-1.0.jar com.lucidworks.storm.StreamingApp \
   example.twitter.TwitterToSolrTopology -localRunSecs 90

The command above will run the TwitterToSolrTopology for 90 seconds on your local workstation and then shut down. All of the setup work is provided by the StreamingApp class.

To submit to a remote cluster, you would do:

$STORM_HOME/bin/storm jar target/storm-solr-1.0.jar com.lucidworks.storm.StreamingApp \
   example.twitter.TwitterToSolrTopology -env staging

Notice that we use the -env flag to indicate running in a staging environment. It’s common to need to run a Storm topology in different environments, such as test, staging, and production, so it’s built into the StreamingApp framework.

SpringBolt

The com.lucidworks.storm.spring.SpringBolt class allows you to implement your bolt logic as a simple Spring-managed POJO. In the example above, the SpringBolt class delegates message processing logic to a Spring-managed bean with id solrBoltAction. The solrBoltAction bean is defined in the Spring container configuration file resources/spring.xml as:

  <bean id="solrBoltAction" class="com.lucidworks.storm.solr.SolrBoltAction">
    <property name="batchSize" value="100"/>
    <property name="bufferTimeoutMs" value="1000"/>
  </bean>

The SpringBolt framework provides clean separation of concerns and allows you to leverage the full power of the Spring framework for developing your Storm topology. Moreover, this approach makes it easier to test your bolt action logic in JUnit outside of the Storm framework.

The SolrBoltAction bean also depends on an instance of the CloudSolrClient class from SolrJ to be auto-wired by the Spring framework:

 <bean id="cloudSolrClient" class="shaded.apache.solr.client.solrj.impl.CloudSolrClient">
   <constructor-arg index="0" value="${zkHost}"/>
   <property name="defaultCollection" value="${defaultCollection}"/>
 </bean>

The zkHost and defaultCollection properties are defined in resources/Config.groovy.

SpringSpout

In Storm, a spout produces a stream of tuples. The TwitterToSolrTopology example uses an instance of SpringSpout and a Twitter data provider to stream tweets into the topology:

SpringSpout twitterSpout = new SpringSpout("twitterDataProvider", spoutFields);

SpringSpout allows you to focus on the application-specific logic needed to generate data without having to worry about Storm specific implementation details. As you might have guessed, the data provider is a Spring-managed POJO that implements the StreamingDataProvider interface:

public interface StreamingDataProvider {
   void open(Map stormConf);

   boolean next(NamedValues record) throws Exception;
}

Take a look at the TwitterDataProvider implementation provided in the project as a starting point for implementing a Spring-managed bean for your topology.

Unit Testing

When writing a unit test, you don’t want to have to spin up a Storm cluster to test application-specific logic that doesn’t depend on Storm. Recall that one of the benefits of using this framework is that it separates business logic from Storm boilerplate code.

Let’s look at some code from the unit test for our SolrBoltAction implementation.

@Test
public void testBoltAction() throws Exception {
  // Spring @Autowired property at runtime
  SolrBoltAction sba = new SolrBoltAction(cloudSolrServer);
  sba.setMaxBufferSize(1); // to avoid buffering docs

  // Mock the Storm tuple
  String docId = "1";
  TestDoc testDoc = new TestDoc(docId, "foo", 10);
  Tuple mockTuple = mock(Tuple.class);
  when(mockTuple.size()).thenReturn(2);
  when(mockTuple.getString(0)).thenReturn(docId);
  when(mockTuple.getValue(1)).thenReturn(testDoc);
  SpringBolt.ExecuteResult result = sba.execute(mockTuple, null);
  assertTrue(result == SpringBolt.ExecuteResult.ACK);
  cloudSolrServer.commit();
  ...
}

The first thing to notice is the unit test doesn’t need a Storm cluster to run. This makes tests run quickly and helps isolate bugs since there are fewer runtime dependencies in this test.

It’s also important to notice that the SolrBoltAction implementation is not running in a Spring-managed container in this unit test. We’re just creating the instance directly using the new operator. This is good test design as well since you don’t want to create a Spring container for every unit test and testing the Spring configuration is not the responsibility of this particular unit test.

The unit test is also using Mockito to mock the Storm Tuple object that is passed into SolrBoltAction. Mockito makes it easy to mock complex objects in a unit test.

The key take-away here is that the unit test focuses on verifying the SolrBoltAction implementation without having to worry about Storm or Spring.

Environment-specific Configuration

Commonly, you will need to manage configuration settings for different environments. For instance, we’ll need to index into a different SolrCloud cluster for staging and production. To address this need, the Spring-driven framework allows you to keep all environment-specific configuration properties in the same configuration file: Config.groovy.

Don’t worry if you don’t know Groovy; the syntax of the Config.groovy file is easy to understand and allows you to cleanly separate properties for the following environments: test, dev, staging, and production. This approach allows you to run the topology in multiple environments using a simple command-line switch, -env, to specify the environment settings that should be applied.

Here’s an example of Config.groovy that shows how to organize properties for the test, development, staging, and production environments:

environments {

 twitterSpout.parallelism = 1
 csvParserBolt.parallelism = 2
 solrBolt.tickRate = 5

 maxPendingMessages = -1

 test {
   env.name = "test"
 }

 development {
   env.name = "development"

   spring.zkHost = "localhost:9983"
   spring.defaultCollection = "gettingstarted"
   spring.fieldGuessingEnabled = true

   spring.fs.defaultFS = "hdfs://localhost:9000"
   spring.hdfsDirPath = "/user/timpotter/csv_files"
   spring.hdfsGlobFilter = "*.csv"
 }

 staging {
   env.name = "staging"

   spring.zkHost = "zkhost:2181"
   spring.defaultCollection = "staging_collection"
   spring.fieldGuessingEnabled = false
 }

 production {
   env.name = "production"

   spring.zkHost = "zkhost1:2181,zkhost2:2181,zkhost3:2181"
   spring.defaultCollection = "prod_collection"
   spring.fieldGuessingEnabled = false
 }
}

Notice that all dynamic variables used in resources/storm-solr-spring.xml must be prefixed with "spring." in Config.groovy. For instance, the ${zkHost} setting in storm-solr-spring.xml resolves to the spring.zkHost property in Config.groovy.

You can also configure all Storm-topology related properties in the Config.groovy file. For instance, if you need to change the topology.max.task.parallelism property for your topology, you can set that in Config.groovy.

When adapting the project to your own requirements, the easiest approach is to update resources/Config.groovy with the configuration settings for each of your environments and then rebuild the Job JAR.

However, you can also specify a different Config.groovy file by using the -config command-line option when deploying the topology, such as:

$STORM_HOME/bin/storm jar target/storm-solr-1.0.jar com.lucidworks.storm.StreamingApp \
   example.twitter.TwitterToSolrTopology -env staging -config MyConfig.groovy

Metrics

Storm provides high-level metrics for bolts and spouts, but if you need more visibility into the inner workings of your application-specific logic, it’s common to use a Java metrics library such as Dropwizard Metrics (https://dropwizard.github.io/metrics/3.1.0/). Fortunately, there are open-source options for integrating metrics with Spring; see https://github.com/ryantenney/metrics-spring.

The Spring context configuration file resources/storm-solr-spring.xml comes pre-configured with all the infrastructure needed to inject metrics into your bean implementations:

<metrics:metric-registry id="metrics"/>
<metrics:annotation-driven metric-registry="metrics"/>
<metrics:reporter type="slf4j" metric-registry="metrics" period="1m"/>

By default, the project is configured to log metrics once a minute to the Storm log using the slf4j reporter.

When implementing your StreamingDataAction (bolt) or StreamingDataProvider (spout), you can have Spring auto-wire metrics objects using the @Metric annotation when declaring metrics-related member variables. For instance, the SolrBoltAction class uses a Timer to track how long it takes to send batches to Solr:

@Metric
public Timer sendBatchToSolr;

The SolrBoltAction class provides several examples of how to use metrics in your bean implementations.

Before moving on to some Solr-specific features in the framework, it’s important to remember one more point. The example Twitter topology we’ve been working with in this guide is quite trivial. In practice, most topologies are more complex and have many spouts and bolts, typically written by multiple developers. Moreover, topologies tend to evolve over time to incorporate data from new systems and requirements. Using this framework will help you craft complex topologies in a simple, maintainable fashion.

Field Mapping

The SolrBoltAction bean takes care of sending documents to SolrCloud in an efficient manner, but it only works with SolrInputDocument objects from SolrJ. It’s unlikely that your Storm topology will be working with SolrInputDocument objects natively, so the SolrBoltAction bean delegates mapping of input Tuples to SolrInputDocument objects to a Spring-managed bean that implements the com.lucidworks.storm.solr.SolrInputDocumentMapper interface. This fits nicely with our design approach of separating concerns in our topology.

The default implementation provided in the project (DefaultSolrInputDocumentMapper) uses Java reflection to read data from a Java object and populate the fields of a SolrInputDocument. In the Twitter example, it reads data from a Twitter4J Status object to populate dynamic fields on a SolrInputDocument instance.

When using the default mapper, you must have dynamic fields enabled in your Solr schema.xml or have Solr’s field guessing feature enabled for your collection, which is enabled by default for the data_driven_schema_configs configset. The default mapper bean is defined in the resources/storm-solr-spring.xml file as:

  <bean id="solrInputDocumentMapper"
        class="com.lucidworks.storm.solr.DefaultSolrInputDocumentMapper">
    <property name="fieldGuessingEnabled" value="${fieldGuessingEnabled}"/>
  </bean>

As discussed above, the ${fieldGuessingEnabled} variable will be resolved from the Config.groovy configuration file at runtime.

It should be clear, however, that you can inject your own SolrInputDocumentMapper implementation into the bolt bean using Spring if the default implementation does not meet your needs.

Working on a Kerberized Environment

The HdfsFileSystemProvider bean needs the Kerberos credentials (keytab file and principal). By default, authentication is set to SIMPLE.

  <bean id="hdfsFileSystemProvider" class="com.lucidworks.storm.example.hdfs.HdfsFileSystemProvider">
    <property name="hdfsConfig">
      <map>
        <entry key="fs.defaultFS" value="${fs.defaultFS:}"/>
        <entry key="hdfs.keytab.file" value="${hdfs.keytab.file:}"/>
        <entry key="hdfs.kerberos.principal" value="${hdfs.kerberos.principal:}"/>
        <entry key="hadoop.security.authentication" value="${hadoop.security.authentication:SIMPLE}"/>
      </map>
    </property>
  </bean>

The SolrSecurity bean needs the full path of jaas-client.conf (see https://cwiki.apache.org/confluence/display/solr/Security). By default, the file is not set and no authentication will be performed.

  <bean id="solrSecurity" class="com.lucidworks.storm.utils.SolrSecurity" init-method="setConfigigurer">
     <property name="solrJaasFile" value="${solrJaasFile:}"/>
     <property name="solrJaasAppName" value="${solrJaasAppName:}"/>
  </bean>

Environment-Specific Configuration Example

All of the properties for the Kerberized environment are optional.

  production {
    env.name = "production"

    spring.zkHost = "host1:2181,host2:2181,host3:2181/solr"
    spring.defaultCollection = "storm-collection"
    spring.fieldGuessingEnabled = false

    spring.maxBufferSize = 100
    spring.bufferTimeoutMs = 500

    spring.fs.defaultFS = "hdfs://namenode:port"
    spring.hdfsDirPath = "/path/to/dataset"
    spring.hdfsGlobFilter = "*.csv"

    spring.hdfs.keytab.file = "hdfs.keytab"
    spring.hdfs.kerberos.principal = "storm"
    spring.hadoop.security.authentication = "KERBEROS"

    spring.solrJaasFile = "/path/to/jaas-client.conf"
    spring.solrJaasAppName = "Client"
  }

Spark RDD

The Spark RDD integration provides a set of tools for reading data from Solr into Spark and for indexing objects from Spark into Solr using SolrJ (a Java client for Solr).

The spark-solr Repository

Lucidworks has provided an SDK for creating a custom Apache Spark application to simplify integrating Spark and Solr.

We have provided a clone of the spark-solr GitHub repository, found at https://github.com/LucidWorks/spark-solr.

You should update this repository on your local machine with a simple git pull.
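
For example, to clone the repository and pull in upstream changes later:

git clone https://github.com/LucidWorks/spark-solr.git
cd spark-solr
# later, to pick up upstream changes:
git pull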

Availability on Maven Central

The spark-solr artifacts are also published to Maven Central under the com.lucidworks.spark group ID, so you can declare the library as a dependency in your own build instead of building it locally.

Build the Jars

First, build the Jar for this project:

mvn clean package -DskipTests

This will build two JARs in the target directory:

  • spark-solr-${VERSION}.jar

  • spark-solr-${VERSION}-shaded.jar

Here, ${VERSION} will be something like 1.2.0-SNAPSHOT for development builds. The first JAR is what you’d want to use if you were using spark-solr in your own project. The second (shaded) JAR is what you’d use to submit one of the included example apps to Spark.

Example Applications

After the jars are built, let’s populate a SolrCloud index with tweets (be sure to update the command shown below with your Twitter API credentials):

Start Solr running in Cloud mode and create a collection named “socialdata” partitioned into two shards:

bin/solr -c && bin/solr create -c socialdata -shards 2

The remaining sections in this document assume Solr is running in cloud mode on port 8983 with embedded ZooKeeper listening on localhost:9983.

Also, to ensure you can see tweets as they are indexed in near real-time, you should enable auto soft-commits using Solr’s Config API (tested with Solr 5.4.0; trunk at the time of writing is 6.0.0, which uses a newer set of Config API endpoints, so this command may need to be modified). Specifically, for this exercise, we’ll soft-commit tweets every 2 seconds.

curl -X POST http://localhost:8983/solr/socialdata/config \
 -d '{"set-property":{"updateHandler.autoSoftCommit.maxTime":"2000"}}'

Now, let’s populate Solr with tweets using Spark streaming:

$SPARK_HOME/bin/spark-submit --master $SPARK_MASTER \
 --conf "spark.executor.extraJavaOptions=-Dtwitter4j.oauth.consumerKey=? -Dtwitter4j.oauth.consumerSecret=? -Dtwitter4j.oauth.accessToken=? -Dtwitter4j.oauth.accessTokenSecret=?" \
 --class com.lucidworks.spark.SparkApp \
 ./target/spark-solr-1.0-SNAPSHOT-shaded.jar \
 twitter-to-solr -zkHost localhost:9983 -collection socialdata

Replace $SPARK_MASTER with the URL of your Spark master server. If you don’t have access to a Spark cluster, you can run the Spark job in local mode by passing:

--master local[2]

However, when running in local mode, there is no executor, so you’ll need to pass the Twitter credentials in the spark.driver.extraJavaOptions parameter instead of spark.executor.extraJavaOptions.
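
For example, a local-mode version of the command above might look like the following; the ? placeholders for the Twitter credentials still need to be filled in:

$SPARK_HOME/bin/spark-submit --master local[2] \
 --conf "spark.driver.extraJavaOptions=-Dtwitter4j.oauth.consumerKey=? -Dtwitter4j.oauth.consumerSecret=? -Dtwitter4j.oauth.accessToken=? -Dtwitter4j.oauth.accessTokenSecret=?" \
 --class com.lucidworks.spark.SparkApp \
 ./target/spark-solr-1.0-SNAPSHOT-shaded.jar \
 twitter-to-solr -zkHost localhost:9983 -collection socialdata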

Tweets will start flowing into Solr; be sure to let the streaming job run for a few minutes to build up a few thousand tweets in your socialdata collection. You can kill the job using ctrl-C.

The sample command above includes placeholder properties (?) for the Twitter API credentials, which you need to provide.

If you don’t already have your Twitter API credentials, you will need to set up a Twitter app (https://apps.twitter.com) and get your user and access tokens.

Working at the Spark Shell

Let’s start up the Spark Scala REPL shell to do some interactive data exploration with our indexed tweets:

cd $SPARK_HOME
bin/spark-shell --jars $PROJECT_HOME/target/spark-solr-${VERSION}-shaded.jar

$PROJECT_HOME is the location where you cloned the spark-solr project. You should (but might not, depending on logging config) see a message like this from Spark during shell initialization:

15/05/27 10:07:53 INFO SparkContext: Added JAR file:/spark-solr/target/spark-solr-1.0-SNAPSHOT-shaded.jar at http://192.168.1.3:57936/jars/spark-solr-1.0-SNAPSHOT-shaded.jar with timestamp 1432742873044

Let’s load the socialdata collection into Spark by executing the following Scala code in the shell:

val tweets = sqlContext.read.format("solr").options(
 Map("zkHost" -> "localhost:9983", "collection" -> "socialdata")
 ).load
 .filter("provider_s='twitter'")

On line 1, we use the sqlContext object loaded into the shell automatically by Spark to load a DataSource named “solr”. Behind the scenes, Spark locates the solr.DefaultSource class in the project JAR file we added to the shell using the --jars parameter.

On line 2, we pass configuration parameters needed by the Solr DataSource to connect to Solr using a Scala Map. At a minimum, we need to pass the ZooKeeper connection string (zkHost) and collection name (collection). By default, the DataSource matches all documents in the collection, but you can pass a Solr query to the DataSource using an optional query parameter. This allows you to restrict the documents seen by the DataSource using a Solr query.

On line 3, although it may appear that we are loading the data into some materialized set of objects, the operation is still lazy at this point: we simply get back a DataFrame (more on this below).

On line 4, we use a filter to only select documents that come from Twitter (provider_s='twitter').

At this point, we have a Spark SQL DataFrame object that can read tweets from Solr. In Spark, a DataFrame is a distributed collection of data organized into named columns (see: https://databricks.com/blog/2015/02/17/introducing-dataframes-in-spark-for-large-scale-data-science.html). Conceptually, DataFrames are similar to tables in a relational database except they are partitioned across multiple nodes in a Spark cluster.

It’s important to understand that Spark does not actually load the socialdata collection into memory at this point. We’re only setting up to perform some analysis on that data; the actual data isn’t loaded into Spark until it is needed to perform some calculation later in the job. This allows Spark to perform the necessary column and partition pruning operations to optimize data access into Solr.

Every DataFrame has a schema. You can use the printSchema() function to get information about the fields available for the tweets DataFrame:

tweets.printSchema()

Behind the scenes, our DataSource implementation uses Solr’s Schema API to determine the fields and field types for the collection automatically.

scala> tweets.printSchema()
 root
 |-- _indexed_at_tdt: timestamp (nullable = true)
 |-- _version_: long (nullable = true)
 |-- accessLevel_i: integer (nullable = true)
 |-- author_s: string (nullable = true)
 |-- createdAt_tdt: timestamp (nullable = true)
 |-- currentUserRetweetId_l: long (nullable = true)
 |-- favorited_b: boolean (nullable = true)
 |-- id: string (nullable = false)
 |-- id_l: long (nullable = true)
 ...

Next, let’s register the tweets DataFrame as a temp table so that we can use it in SQL queries:

tweets.registerTempTable("tweets")

For example, we can count the number of retweets by doing:

sqlContext.sql("SELECT COUNT(type_s) FROM tweets WHERE type_s='echo'").show()

If you check your Solr log, you’ll see that the following query was generated by the Solr DataSource to process the SQL statement (newlines have been added between parameters to make the query easier to read):

 q=*:*&
 fq=provider_s:twitter&
 fq=type_s:echo&
 distrib=false&
 fl=type_s,provider_s&
 cursorMark=*&
 start=0&
 sort=id+asc&
 collection=socialdata&
 rows=1000

There are a couple of interesting aspects of this query.

First, notice that the provider_s field filter we used when we declared the DataFrame translated into a Solr filter query parameter (fq=provider_s:twitter). Solr will cache an efficient data structure for this filter that can be reused across queries, which improves performance when reading data from Solr to Spark.

In addition, the SQL statement included a WHERE clause that also translated into an additional filter query (fq=type_s:echo). Our DataSource implementation handles the translation of SQL clauses to Solr specific query constructs. On the backend, Spark handles the distribution and optimization of the logical plan to execute a job that accesses data sources.

Even though there are many fields available for each tweet in our collection, Spark ensures that only the fields needed to satisfy the query are retrieved from the data source, which in this case is only type_s and provider_s. In general, it’s a good idea to only request the specific fields you need access to when reading data in Spark.

The query also uses deep-paging cursors to efficiently read documents deep into the result set. If you’re curious how deep paging cursors work in Solr, please read: https://lucidworks.com/blog/coming-soon-to-solr-efficient-cursor-based-iteration-of-large-result-sets/. Also, matching documents are streamed back from Solr, which improves performance because the client side (Spark task) does not have to wait for a full page of documents (1000) to be constructed on the Solr side before receiving data. In other words, documents are streamed back from Solr as soon as the first hit is identified.

The last interesting aspect of this query is the distrib=false parameter. Behind the scenes, the Solr DataSource will read data from all shards in a collection in parallel from different Spark tasks. In other words, if you have a collection with ten shards, then the Solr DataSource implementation will use 10 Spark tasks to read from each shard in parallel. The distrib=false parameter ensures that each shard will only execute the query locally instead of distributing it to other shards.

However, reading from all shards in parallel does not work for Top N type use cases where you need to read documents from Solr in ranked order across all shards. You can disable the parallelization feature by setting the parallel_shards parameter to false. When set to false, the Solr DataSource will execute a standard distributed query. Consequently, you should use caution when disabling this feature, especially when reading very large result sets from Solr.

Beyond SQL, the Spark API exposes a number of functional operations you can perform on a DataFrame. For example, to determine the top authors based on the number of posts, you could use the following SQL query, or the equivalent DataFrame operations shown on the second line:

sqlContext.sql("select author_s, COUNT(author_s) num_posts from tweets where type_s='post' group by author_s order by num_posts desc limit 10").show()
tweets.filter("type_s='post'").groupBy("author_s").count().orderBy(desc("count")).limit(10).show()

Another subtle aspect of working with DataFrames is that you, as a developer, need to decide when to cache a DataFrame based on how expensive it was to create. For instance, if you load tens of millions of rows from Solr and then perform a costly transformation that trims the DataFrame down to 10,000 rows, it is wise to cache the smaller DataFrame so that you won’t have to re-read millions of rows from Solr again. On the other hand, caching the original millions of rows pulled from Solr is probably not very useful, as that would consume too much memory. As a general rule, cache a DataFrame when you need to reuse it for additional computation and it was expensive to generate.

Tuning the Solr SparkSQL DataSource

The Solr DataSource supports a number of optional parameters to allow you to optimize performance when reading data from Solr. Let’s start with the most basic definition of the Solr DataSource and build up the options as we progress through this section:

var solr = sqlContext.read.format("solr").option("zkhost", "localhost:9983").option("collection","socialdata").load()

query

Probably the most obvious option is to specify a Solr query that limits the rows you want to load into Spark. For instance, if we only wanted to load documents that mention "solr", we would do:

option("query","body_t:solr")

If you don’t specify the "query" option, then all rows are read using the match-all-documents query (*:*).

fields

You can use the "query.fields" option to specify a subset of fields to retrieve for each document in your results:

option("query.fields","id,author_s,favorited_b,...")

By default, all fields for each document are pulled back from Solr.

query.rows

You can use the "query.rows" option to specify the number of rows to retrieve from Solr per request. Behind the scenes, the implementation uses deep-paging cursors and response streaming, so it is usually safe to specify a large number of rows. By default, the implementation requests 1000 rows per page; if your documents are small, you can increase this to 5000. Using too large a value can put pressure on the Solr JVM’s garbage collector.

option("query.rows","5000")

query.split.field

If your Spark cluster has more available executor slots than the number of shards, then you can increase parallelism when reading from Solr by splitting each shard into sub-ranges using a split field. A good candidate for the split field is the _version_ field that is attached to every document by the shard leader during indexing.

option("query.split.field","_version_")

Behind the scenes, the DataSource implementation tries to split the shard into evenly sized splits using filter queries. You can also split on a string-based keyword field but it should have sufficient variance in the values to allow for creating enough splits to be useful. In other words, if your Spark cluster can handle 10 splits per shard, but there are only 3 unique values in a keyword field, then you will only get 3 splits.

query.splits.per.shard

The "query.splits.per.shard" option provides a hint to the shard split planner on how many splits to create per shard. This should be based on the number of available executor slots in your Spark cluster divided by the number of shards in the collection you’re querying. For instance, if you’re querying into a 5 shard collection and your Spark cluster has 20 available executor slots to run the job, then you’ll want to use:

option("query.splits.per.shard","4")

Keep in mind that this is only a hint to the split calculator and you may end up with a slightly different number of splits than what was requested.

escape_fieldnames

The "escape_fieldnames" option will transform field names in Solr. Any dots ('.') inside the field name are replaced with '_'

option("escape_fieldnames", "true")

splits

The "splits" option will enable the parallelism by splitting on default field 'version'.

option("splits", "true")

The above option is equivalent to

option("split_field", "_version_")

dv

The "dv" option will fetch the doc values that are not stored.by using function queries.

option("dv", "true")

Since Solr 5.5.0, non-stored docValues are returned by default, so this option is only needed for versions older than 5.5.

solr.params

The "solr.params" option is used for providing arbitrary Solr query parameters (just like a normal Solr query)

option("solr.params", "defType=edismax&timeAllowed=0")

The parameters provided here override the common parameters 'fields', 'query', and 'rows'.

Reading data from Solr as a Spark RDD

The com.lucidworks.spark.rdd.SolrRDD class transforms the results of a Solr query into a Spark RDD.

Scala:

import com.lucidworks.spark.rdd.SolrRDD
val solrRDD = new SolrRDD(zkHost, collection, sc)
val words = solrRDD.flatMap(doc => {
  val tweet = doc.get("text_t")
  var tweetStr = if (tweet != null) tweet.toString() else ""
  tweetStr = tweetStr.toLowerCase().replaceAll("[.,!?\n]", " ")
  tweetStr.split(" ")
  })

See the com.lucidworks.spark.example.query.WordCount example for the full code.

Java:

SolrJavaRDD solrRDD = SolrJavaRDD.get(zkHost, collection, jsc.sc());
JavaRDD<SolrDocument> resultsRDD = solrRDD.queryShards(solrQuery);

Once you’ve converted the results in an RDD, you can use the Spark API to perform analytics against the data from Solr. For instance, the following code extracts terms from the tweet_s field of each document in the results:

JavaRDD<String> words = resultsRDD.flatMap(new FlatMapFunction<SolrDocument, String>() {
  public Iterable<String> call(SolrDocument doc) {
    Object tweet_s = doc.get("tweet_s");
    String str = tweet_s != null ? tweet_s.toString() : "";
    str = str.toLowerCase().replaceAll("[.,!?\n]", " ");
    return Arrays.asList(str.split(" "));
  }
});

Writing data to Solr from Spark Streaming

The com.lucidworks.spark.SolrSupport class provides static helper functions for sending data to Solr from a Spark streaming application. The TwitterToSolrStreamProcessor class provides a good example of how to use the SolrSupport API. For sending documents directly to Solr, you need to build up SolrInputDocument objects in your Spark streaming application code.

    String zkHost = cli.getOptionValue("zkHost", "localhost:9983");
    String collection = cli.getOptionValue("collection", "collection1");
    int batchSize = Integer.parseInt(cli.getOptionValue("batchSize", "10"));
    SolrSupport.indexDStreamOfDocs(zkHost, collection, batchSize, docs);

Developing a Spark Application

The com.lucidworks.spark.SparkApp class provides a simple framework for implementing Spark applications in Java. The class saves you from having to duplicate the boilerplate code needed to run a Spark application, giving you more time to focus on the business logic of your application.

To leverage this framework, you need to develop a concrete class that either implements RDDProcessor or extends StreamProcessor depending on the type of application you’re developing.

RDDProcessor

Implement the com.lucidworks.spark.SparkApp$RDDProcessor interface for building a Spark application that operates on a JavaRDD, such as one pulled from a Solr query (see SolrQueryProcessor as an example).

StreamProcessor

Extend the com.lucidworks.spark.SparkApp$StreamProcessor abstract class to build a Spark streaming application.

See com.lucidworks.spark.example.streaming.oneusagov.OneUsaGovStreamProcessor or com.lucidworks.spark.example.streaming.TwitterToSolrStreamProcessor for examples of how to write a StreamProcessor.

Authenticating with Kerberized Solr

For background on Solr security, see: https://cwiki.apache.org/confluence/display/solr/Security.

The SparkApp framework allows you to pass the path to a JAAS authentication configuration file using the -solrJaasAuthConfig option.

For example, if you need to authenticate using the "solr" Kerberos principal, you need to create a JAAS config file named jaas-client.conf that sets the location of your Kerberos keytab file, such as:

Client {
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/keytabs/solr.keytab"
  storeKey=true
  useTicketCache=true
  debug=true
  principal="solr";
};

To use this configuration to authenticate to Solr, you simply need to pass the path to jaas-client.conf created above using the -solrJaasAuthConfig option, such as:

spark-submit --master yarn-server \
  --class com.lucidworks.spark.SparkApp \
  $SPARK_SOLR_PROJECT/target/lucidworks-spark-rdd-2.0.3.jar \
  hdfs-to-solr -zkHost $ZK -collection spark-hdfs \
  -hdfsPath /user/spark/testdata/syn_sample_50k \
  -solrJaasAuthConfig=/path/to/jaas-client.conf