Importing Data with Pig

You can use Pig to import data into Fusion, using the lucidworks-pig-functions-2.2.6.jar file found in fusion/3.1.x/apps/connectors/resources/lucid.hadoop/jobs.

Available Functions

The Pig functions included in the lucidworks-pig-functions-2.2.6.jar are three UserDefined Functions (UDF) and two Store functions. These functions are:

  • com/lucidworks/hadoop/pig/SolrStoreFunc.class

  • com/lucidworks/hadoop/pig/FusionIndexPipelinesStoreFunc.class

  • com/lucidworks/hadoop/pig/EpochToCalendar.class

  • com/lucidworks/hadoop/pig/Extract.class

  • com/lucidworks/hadoop/pig/Histogram.class

Using The Functions

Register the Functions

There are two approaches to using functions in Pig: REGISTER them in the script, or load them with your Pig command line request.

If using REGISTER, the Pig function jars must be put in HDFS in order to be used by your Pig script. It can be located anywhere in HDFS; you can either supply the path in your script or use a variable and define the variable with -p property definition.

The example below uses the second approach, loading the jars with the -Dpig.additional.jars system property when launching the script. With this approach, the jars can be located anywhere on the machine where the script will be run.

Indexing Data to Fusion

When indexing data to Fusion, there are several parameters to pass with your script in order to output data to Fusion for indexing.

These parameters can be made into variables in the script, with the proper values passed on the command line when the script is initiated. The example script below shows how to do this for Solr. The theory is the same for Fusion, only the parameter names would change as appropriate:

fusion.endpoints

The full URL to the index pipeline in Fusion. The URL should include the pipeline name and the collection data will be indexed to.

fusion.fail.on.error

If true, when an error is encountered, such as if a row could not be parsed, indexing will stop. This is false by default.

fusion.buffer.timeoutms

The amount of time, in milliseconds, to buffer documents before sending them to Fusion. The default is 1000. Documents will be sent to Fusion when either this value or fusion.batchSize is met.

fusion.batchSize

The number of documents to batch before sending the batch to Fusion. The default is 500. Documents will be sent to Fusion when either this value or fusion.buffer.timeoutms is met.

fusion.realm

This is used with fusion.user and fusion.password to authenticate to Fusion for indexing data. Two options are supported, KERBEROS or NATIVE.

Kerberos authentication is supported with the additional definition of a JAAS file. The properties java.security.auth.login.config and fusion.jaas.appname are used to define the location of the JAAS file and the section of the file to use. These are described in more detail below.

Native authentication uses a Fusion-defined username and password. This user must exist in Fusion, and have the proper permissions to index documents.

fusion.user

The Fusion username or Kerberos principal to use for authentication to Fusion.

If a Fusion username is used ('fusion.realm' = 'NATIVE'), the fusion.password must also be supplied.

fusion.pass

This property is not shown in the example above. The password for the fusion.user when the fusion.realm is NATIVE.

Indexing to a Kerberized Fusion Installation

When Fusion is secured with Kerberos, Pig scripts must include the full path to a JAAS file that includes the service principal and the path to a keytab file that will be used to index the output of the script to Fusion.

Additionally, a Kerberos ticket must be obtained on the server for the principal using kinit.

java.security.auth.login.config

This property defines the path to a JAAS file that contains a service principal and keytab location for a user who is authorized to write to Fusion.

The JAAS configuration file must be copied to the same path on every node where a Node Manager is running (i.e., every node where map/reduce tasks are executed). Here is a sample section of a JAAS file:

Client { (1)
  com.sun.security.auth.module.Krb5LoginModule required
  useKeyTab=true
  keyTab="/data/fusion-indexer.keytab" (2)
  storeKey=true
  useTicketCache=true
  debug=true
  principal="fusion-indexer@FUSIONSERVER.COM"; (3)
};
  1. The name of this section of the JAAS file. This name will be used with the fusion.jaas.appname parameter.

  2. The location of the keytab file.

  3. The service principal name. This should be a different principal than the one used for Fusion, but must have access to both Fusion and Pig. This name is used with the fusion.user parameter described above.

fusion.jaas.appname

Used only when indexing to or reading from Fusion when it is secured with Kerberos.

This property provides the name of the section in the JAAS file that includes the correct service principal and keytab path.

Sample CSV Script

The following Pig script will take a simple CSV file and index it to Solr.

set solr.zkhost '$zkHost';
set solr.collection '$collection'; (1)

A = load '$csv' using PigStorage(',') as (id_s:chararray,city_s:chararray,country_s:chararray,code_s:chararray,code2_s:chararray,latitude_s:chararray,longitude_s:chararray,flag_s:chararray); (2)
--dump A;
B = FOREACH A GENERATE $0 as id, 'city_s', $1, 'country_s', $2, 'code_s', $3, 'code2_s', $4, 'latitude_s', $5, 'longitude_s', $6, 'flag_s', $7; (3)

ok = store B into 'SOLR' using com.lucidworks.hadoop.pig.SolrStoreFunc(); (4)

This relatively simple script is doing several things that help to understand how the Solr Pig functions work.

  1. This and the line above define parameters that are needed by SolrStoreFunc to know where Solr is. SolrStoreFunc needs the properties solr.zkhost and solr.collection, and these lines are mapping the zkhost and collection parameters we will pass when invoking Pig to the required properties.

  2. Load the CSV file, the path and name we will pass with the csv parameter. We also define the field names for each column in CSV file, and their types.

  3. For each item in the CSV file, generate a document id from the first field ($0) and then define each field name and value in name, value pairs.

  4. Load the documents into Solr, using the SolrStoreFunc. While we don’t need to define the location of Solr here, the function will use the zkhost and collection properties that we will pass when we invoke our Pig script.

Warning
When using SolrStoreFunc, the document ID must be the first field.

When we want to run this script, we invoke Pig and define several parameters we have referenced in the script with the -p option, such as in this command:

./bin/pig -Dpig.additional.jars=/path/to/lucidworks-pig-functions-2.2.6.jar -p csv=/path/to/my/csv/airports.dat -p zkHost=zknode1:2181,zknode2:2181,zknode3:2181/solr -p collection=myCollection ~/myScripts/index-csv.pig

The parameters to pass are:

csv

The path and name of the CSV file we want to process.

zkhost

The ZooKeeper connection string for a SolrCloud cluster, in the form of zkhost1:port,zkhost2:port,zkhost3:port/chroot. In the script, we mapped this to the solr.zkhost property, which is required by the SolrStoreFunc to know where to send the output documents.

collection

The Solr collection to index into. In the script, we mapped this to the solr.collection property, which is required by the SolrStoreFunc to know the Solr collection the documents should be indexed to.

Tip

The zkhost parameter above is only used if you are indexing to a SolrCloud cluster, which uses ZooKeeper to route indexing and query requests.

If, however, you are not using SolrCloud, you can use the solrUrl parameter, which takes the location of a standalone Solr instance, in the form of http://host:port/solr.

In the script, you would change the line that maps solr.zkhost to the zkhost property to map solr.server.url to the solrUrl property. For example:

`set solr.server.url '$solrUrl';`

How to Contribute

  1. Fork this repo i.e. <username|organization>/hadoop-solr, following the fork a repo tutorial. Then, clone the forked repo on your local machine:

    $ git clone https://github.com/<username|organization>/hadoop-solr.git
  2. Configure remotes with the configuring remotes tutorial.

  3. Create a new branch:

    $ git checkout -b new_branch
    $ git push origin new_branch

    Use the creating branches tutorial to create the branch from GitHub UI if you prefer.

  4. Develop on new_branch branch only, do not merge new_branch to your master. Commit changes to new_branch as often as you like:

    $ git add <filename>
    $ git commit -m 'commit message'
  5. Push your changes to GitHub.

    $ git push origin new_branch
  6. Repeat the commit & push steps until your development is complete.

  7. Before submitting a pull request, fetch upstream changes that were done by other contributors:

    $ git fetch upstream
  8. And update master locally:

    $ git checkout master
    $ git pull upstream master
  9. Merge master branch into new_branch in order to avoid conflicts:

    $ git checkout new_branch
    $ git merge master
  10. If conflicts happen, use the resolving merge conflicts tutorial to fix them:

  11. Push master changes to new_branch branch

    $ git push origin new_branch
  12. Add jUnits, as appropriate, to test your changes.

  13. When all testing is done, use the create a pull request tutorial to submit your change to the repo.

Note

Please be sure that your pull request sends only your changes, and no others. Check it using the command:

git diff new_branch upstream/master