Import Data with the Parallel Bulk Loader

Fusion Parallel Bulk Loader jobs enable bulk ingestion of structured and semi-structured data from big data systems, NoSQL databases, and common file formats like Parquet and Avro.

The Parallel Bulk Loader leverages the popularity of Spark as a prominent distributed computing platform for big data. A number of companies invest heavily in building production-ready Spark SQL data source implementations for big data and NoSQL systems, much as Lucidworks has done for Solr. The Parallel Bulk Loader uses connectors provided by the experts who develop these complex systems.

Available data sources

The Parallel Bulk Loader can load documents from any system that implements the Data Sources API for Spark SQL 2.2.1 or later.

The Parallel Bulk Loader can use the following data sources. For data sources that use Spark SQL connectors, the connector's provider is indicated.

  • Solr databases

    Connector (Lucidworks): spark-solr

  • Files in these common formats: JSON, CSV, XML, Apache Avro, and Apache Parquet

  • JDBC-compliant databases

  • Apache HBase databases

    Connector (Hortonworks): Apache Spark - Apache HBase Connector

  • Datasets accessible through Apache Hive

  • Apache Cassandra NoSQL databases

    Connector (DataStax): Spark-Cassandra connector

  • Elastic databases

    Use the package: org.elasticsearch:elasticsearch-spark-20_2.11:6.2.2

  • MongoDB databases

  • Riak databases

  • Couchbase NoSQL databases

  • Redis in-memory data structure stores

  • Google data sources, including Google Analytics, Sheets, and BigQuery

  • Microsoft Azure DataLake, Cosmos DB, and SQL Database

Key features

Key features of the Parallel Bulk Loader are:

  • Load distribution – To distribute load and maximize performance, the Parallel Bulk Loader parallelizes operations and distributes them across the Fusion Spark cluster.

  • No parsing – The Spark Data Sources API returns a DataFrame (RDD + schema) in an easy-to-use tabular format, so no parsing is needed.

  • Dynamic resolution of dependencies – There is nothing to download or install. Users just provide the Maven coordinates of dependencies during configuration, and Spark distributes the necessary JAR files to worker nodes.

  • Leverage integration libraries – Similar to JDBC, the Parallel Bulk Loader leverages integration libraries built by the experts behind the underlying systems, for example, Databricks, DataStax, Hortonworks, Elastic, Lucidworks, and Microsoft.

  • Direct write operations – The Parallel Bulk Loader writes directly to Solr (for maximum performance) or to Fusion Server index pipelines (for maximum flexibility).

  • Solr atomic updates – The Parallel Bulk Loader uses atomic updates to update existing documents in Solr.

  • Incremental queries – To obtain the latest changes to data sources, macros built into the Parallel Bulk Loader use timestamps to filter queries.

  • Seamless integration with Spark NLP – Do natural language processing, including part-of-speech tagging, stemming or lemmatization, sentiment analysis, named-entity recognition, and other NLP tasks.

  • SQL joins – Use SQL to join data from multiple Solr collections.

  • Load Fusion ML models – To classify incoming documents, load Fusion Machine Learning models stored in the Fusion blob store.

  • SQL transformations – Leverage the full power of Spark’s Scala DataFrame API and SQL to filter and transform data.

  • UDF and UDAF functions – Select from hundreds of user-defined functions and user-defined aggregate functions.

Differences between the Parallel Bulk Loader and Fusion classic connectors

The primary difference between the Bulk Loader and Fusion classic connectors is that the Bulk Loader uses Spark SQL and Spark/Solr integration to perform distributed reads from data sources.

Here are some examples of how the Parallel Bulk Loader performs distributed reads:

  • HBase table – To support high-volume data ingestion into Solr, the Parallel Bulk Loader can distribute queries sent to HBase tables across multiple region servers.

  • Parquet files – The Parallel Bulk Loader processes a directory of Parquet files in HDFS in parallel, using the built-in support for computing splits in HDFS files.

  • Spark/Solr integration – With Spark/Solr integration, the Parallel Bulk Loader uses a Spark-Solr data source to send queries to all replicas of a collection, so it can read from Solr in parallel across the Spark cluster.

This diagram depicts how the Spark-Solr data source partitions queries across all shards/replicas of a Solr collection to better utilize cluster resources and to improve read performance. Most Spark-SQL data sources do something similar for their respective databases, which is one of the main benefits of using the Parallel Bulk Loader job.

SolrRDD diagram

In contrast, most classic connectors have only a single-fetcher mode. To scale the fetching with classic connectors, you must distribute the connector itself, which differs from relying on the built-in parallelization of Spark.

Also, most classic connectors rely on Fusion parsers and index pipelines to prepare data for indexing, whereas no parsing is needed for the Parallel Bulk Loader, which can achieve maximum indexing performance by writing directly to Solr.

Create and run Parallel Bulk Loader jobs

Use the Jobs manager to create and run Parallel Bulk Loader jobs. You can also use the Scheduler to schedule jobs.

In the procedures, select Parallel Bulk Loader as the job type and configure the job as needed.
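For reference, a minimal Parallel Bulk Loader job definition in JSON form might look like the following sketch; the id, path, and collection name here are hypothetical, and the full set of settings is described below.

```json
{
  "type" : "parallel-bulk-loader",
  "id" : "example-csv-load",
  "format" : "csv",
  "path" : "/tmp/example.csv",
  "readOptions" : [ {
    "key" : "header",
    "value" : "true"
  } ],
  "outputCollection" : "example_collection"
}
```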

Configuration settings for the Parallel Bulk Loader job

This section provides configuration settings for the Parallel Bulk Loader job. Also see configuration properties in the Jobs Configuration Reference.

Read settings

Setting Description

format

Unique identifier of the data source provider. Spark scans the job’s classpath for a class named DefaultSource in the <format> package. For example, for the solr format, Lucidworks provides the solr.DefaultSource class in spark-solr.

path (optional)

Comma-delimited list of paths to load. Some data sources, such as parquet, require a path. Others, such as Solr, don’t. Refer to the documentation for your data source to determine if you need to provide a path.

readOptions

Options passed to the Spark SQL data source to configure the read operation. Options differ for every data source. Refer to the specific data source documentation for more information.

sparkConfig (optional)

List of Spark configuration settings needed to run the Parallel Bulk Loader.

shellOptions

Behind the scenes, the Parallel Bulk Loader job submits a Scala script to the Fusion Spark shell. The shellOptions setting lets you pass any additional options needed by the Spark shell. The two most common options are --packages and --repositories:

--packages

Comma-separated list of Maven coordinates of JAR files to include on the driver and executor classpaths. Spark searches the local Maven repository, then Maven Central, and then any additional remote repositories given in the config. The format for the coordinates is groupId:artifactId:version. The HBase example below demonstrates the use of the packages option for loading the com.hortonworks:shc-core:1.1.1-2.1-s_2.11 package.

Tip
Use the https://spark-packages.org/ site to find interesting packages to add to your Parallel Bulk Loader jobs.

--repositories

Comma-separated list of additional remote Maven repositories to search for the Maven coordinates given in the packages config setting. The Index HBase tables example below demonstrates the use of the repositories option for loading the com.hortonworks:shc-core:1.1.1-2.1-s_2.11 package from the http://repo.hortonworks.com/content/repositories/releases repository.
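In job JSON, these two shell options take the same key/value list form as other settings. A fragment using the Hortonworks package and repository mentioned above might look like:

```json
"shellOptions" : [ {
  "key" : "--packages",
  "value" : "com.hortonworks:shc-core:1.1.1-2.1-s_2.11"
}, {
  "key" : "--repositories",
  "value" : "http://repo.hortonworks.com/content/repositories/releases"
} ]
```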

timestampFieldName

For data sources that support time-based filters, the Parallel Bulk Loader computes the timestamp of the last document written to Solr and the current timestamp of the Parallel Bulk Loader job. For example, the HBase data source lets you filter the read between a MIN_STAMP and MAX_STAMP:

val timeRangeOpts = Map(HBaseRelation.MIN_STAMP -> minStamp.toString, HBaseRelation.MAX_STAMP -> maxStamp.toString)

This lets Parallel Bulk Loader jobs run on schedules and pull only the newest rows from the underlying data sources.

To support timestamp-based filtering, the Parallel Bulk Loader provides two simple macros:

$lastTimestamp(format)

$nowTimestamp(format)

The format argument is optional. If not supplied, then an ISO-8601 date/time string is used. The timestampFieldName setting is used to determine the value of lastTimestamp, using a Top 1 query to Solr to get the max timestamp. You can also pass $lastTimestamp(EPOCH) or $lastTimestamp(EPOCH_MS) to get the timestamp in seconds or milliseconds.

See the Index HBase tables example below for an example of using this configuration property.

Transformation settings

Setting Description

transformScala

Sometimes, you can write a small script to transform input data into the correct form for indexing. But at other times, you might need the full power of the Spark API to transform data into an indexable form.

The transformScala option lets you filter and/or transform the input DataFrame any way you’d like. You can even define UDFs to use during your transformation. For an example of using Scala to transform the input DataFrame before indexing in Solr, see the Read from Parquet example.

Another powerful use of the transformScala option is that you can pull in advanced libraries, such as Spark NLP (from John Snow Labs) to do NLP work on your content before indexing. See the Use NLP during indexing example.

Your Scala script can do other things but, at a minimum, it must define the following function that the Parallel Bulk Loader invokes:

def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  // transform and/or filter the inputDF here, then return the resulting Dataset
  inputDF
}

Your script can rely on the following vals:
spark: SparkSession
sc: SparkContext
fusionZKConn: ZKConnection // needed to access Fusion API
solrCollection: SolrCollection // output collection
jobId: Loader job config ID

Also, the following classes have already been imported:
import org.apache.spark.SparkContext._
import spark.implicits._
import spark.sql
import org.apache.spark.sql.functions._
import com.lucidworks.spark.util.{SparkShellSupport => _lw}
import com.lucidworks.spark.job.sql.SparkSQLLoader
import com.lucidworks.spark.ml.recommenders.SolrCollection
import com.lucidworks.spark.ZKConnection
import org.apache.spark.sql.{Dataset, Row}

transformSql

The transformSql option lets you write a SQL query to transform the input DataFrame. The SQL is executed after the transformScala script (if both are defined). The input DataFrame is exposed to your SQL as the _input view. See the Clean up data with SQL transformations example below for an example of using SQL to transform the input before indexing in Solr. This option also lets you leverage the UDF/UDAF functions provided by Spark SQL.

mlModelId

If you have a Spark ML PipelineModel loaded into the blob store, you can supply the blob ID to the Parallel Bulk Loader and it will:

  1. Load the model from the blob store.

  2. Transform the input DataFrame (after the Scala transform but before the SQL transform).

  3. Add the predicted output field (specified in the model metadata stored in the blob store) to the projected fields list.

PipelineModel ID

This lets you use Spark ML models to make predictions in a more scalable, performant manner than what can be achieved with a Machine Learning index stage.
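For example, assuming a PipelineModel was uploaded to the blob store under the hypothetical ID classify-docs-model, a job could reference it like this (the path and collection name are also illustrative):

```json
{
  "type" : "parallel-bulk-loader",
  "format" : "parquet",
  "path" : "/data/docs.parquet",
  "mlModelId" : "classify-docs-model",
  "outputCollection" : "classified_docs"
}
```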

Output settings

Setting Description

outputCollection

Name of the Fusion collection to write to. The Parallel Bulk Loader uses the Collections API to resolve the underlying Solr collection at runtime.

outputIndexPipeline

Name of a Fusion index pipeline to which to send documents, instead of directly indexing to Solr. This option lets you perform additional ETL (extract, transform, and load) processing on the documents before they are indexed in Solr. If you need to write to time-partitioned indexes, then you must use an index pipeline, because writing directly to Solr is not partition aware.

defineFieldsUsingInputSchema

Flag to indicate if the Parallel Bulk Loader should use the input schema to create fields in Solr, after applying the Scala and/or SQL transformations. If false, then the Parallel Bulk Loader relies on the Fusion index pipeline and/or Solr field guessing to create the fields. If true, only fields that don’t exist already in Solr are created. Consequently, if there is a type mismatch between an existing field in Solr and the input schema, you’ll need to use a transformation to rename the field in the input schema.

clearDatasource

If checked, the Parallel Bulk Loader deletes any existing documents in the output collection that match the query _lw_loader_id_s:<JOB>. To support this, the Parallel Bulk Loader adds two metadata fields to each row: _lw_loader_id_s and _lw_loader_job_id_s.

atomicUpdates

Flag to send documents directly to Solr as atomic updates instead of as new documents. This option is not supported when using an index profile. Also note that the Parallel Bulk Loader tracking fields _lw_loader_id_s and _lw_loader_job_id_s are not sent when using atomic updates, so the clear datasource option doesn’t work with documents created using atomic updates.

outputOptions

Options used when writing directly to Solr. See Spark-Solr: https://github.com/lucidworks/spark-solr#index-parameters

For example, if your docs are relatively small, you might want to increase the batch_size (2000 default) as shown below:

batch_size parameter
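A sketch of that setting in job JSON (the value 5000 is illustrative):

```json
"outputOptions" : [ {
  "key" : "batch_size",
  "value" : "5000"
} ]
```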

outputPartitions

Coalesce the DataFrame into N partitions before writing to Solr. This can help spread the indexing work across the executors available in Spark, or limit the parallelism when writing to Solr.

Tune performance

As the name of the Parallel Bulk Loader job implies, it’s designed to ingest large amounts of data into Fusion by parallelizing the work across your Spark cluster. To achieve scalability, you might need to increase the amount of memory and/or CPU resources allocated to the job.

By default, Fusion’s Spark configuration settings control the resources allocated to Parallel Bulk Loader jobs.

You can pass these properties in the job configuration to override the default Spark shell options:

Parameter Name Description and Default

--driver-cores

Cores for the driver

Default: 1

--driver-memory

Memory for the driver (for example, 1000M or 2G)

Default: 1024M

--executor-cores

Cores per executor

Default: 1 in YARN mode, or all available cores on the worker in standalone mode

--executor-memory

Memory per executor (for example, 1000M or 2G)

Default: 1G

--total-executor-cores

Total cores for all executors

Default: Without setting this parameter, the total cores for all executors is the number of executors in YARN mode, or all available cores on all workers in standalone mode.

Spark shell options
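For example, to give a job more memory and parallelism, you could pass overrides through the job's shellOptions; the values shown here are illustrative:

```json
"shellOptions" : [ {
  "key" : "--executor-memory",
  "value" : "4G"
}, {
  "key" : "--total-executor-cores",
  "value" : "8"
} ]
```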

Examples

Here we provide screenshots and example JSON job definitions to illustrate key points about how to load from different data sources.

Use NLP during indexing

In this example, we leverage the John Snow Labs NLP library during indexing. This is a quick illustration of the concept rather than a production-ready pipeline.

NLP during indexing

Use this transform Scala script:

import com.johnsnowlabs.nlp._
import com.johnsnowlabs.nlp.annotators._
import org.apache.spark.ml.Pipeline
import com.johnsnowlabs.nlp.annotators.sbd.pragmatic.SentenceDetector

def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  val documentAssembler = new DocumentAssembler().setInputCol("plot_txt_en").setOutputCol("document")
  val sentenceDetector = new SentenceDetector().setInputCols(Array("document")).setOutputCol("sentences")

  val finisher = new Finisher()
    .setInputCols("sentences")
    .setOutputCols("sentences_ss")
    .setOutputAsArray(true)
    .setCleanAnnotations(false)

  val pipeline = new Pipeline().setStages(Array(documentAssembler,sentenceDetector,finisher))
  pipeline.fit(inputDF).transform(inputDF).drop("document").drop("sentences")
}

Be sure to add the JohnSnowLabs:spark-nlp:1.4.2 package using Spark Shell Options.
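In job JSON, that package can be added as follows:

```json
"shellOptions" : [ {
  "key" : "--packages",
  "value" : "JohnSnowLabs:spark-nlp:1.4.2"
} ]
```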

Clean up data with SQL transformations

Fusion has a Local Filesystem connector that can handle files such as CSV and JSON files. Using the Parallel Bulk Loader lets you leverage features that are not in the Local Filesystem connector, such as using SQL to clean up the input data.

SQL transformation of CSV data

Use the following SQL to clean up the input data before indexing:

SELECT _c0 as user_id,
       CAST(_c1 as INT) as age,
       _c2 as gender,
       _c3 as occupation,
       _c4 as zip_code
 FROM _input

Job JSON:

{
  "type" : "parallel-bulk-loader",
  "id" : "csv",
  "format" : "csv",
  "path" : "/Users/tjp/dev/lw/projects/fusion-spark-bootcamp/labs/movielens/ml-100k/u.user",
  "readOptions" : [ {
    "key" : "delimiter",
    "value" : "|"
  }, {
    "key" : "header",
    "value" : "false"
  } ],
  "outputCollection" : "users",
  "clearDatasource" : false,
  "defineFieldsUsingInputSchema" : true,
  "atomicUpdates" : false,
  "transformSql" : "SELECT _c0 as user_id, \n       CAST(_c1 as INT) as age, \n       _c2 as gender,\n       _c3 as occupation,\n       _c4 as zip_code \n FROM _input"
}

Read from S3

It’s easy to read from an S3 bucket without pulling data down to your local workstation first. To avoid exposing your AWS credentials, add them to a file named core-site.xml in the apps/spark-dist/conf directory, such as:

<configuration>
  <property>
    <name>fs.s3a.access.key</name>
    <value>???</value>
  </property>
  <property>
    <name>fs.s3a.secret.key</name>
    <value>???</value>
  </property>
</configuration>

Then you can load files using the S3a protocol, such as: s3a://sstk-dev/data/u.user. S3a is the preferred protocol for reading data into Spark because it uses Amazon’s libraries to read from S3 instead of the legacy Hadoop libraries. If you need other S3 protocols (for example, s3 or s3n) you’ll need to add the equivalent properties to core-site.xml.

S3 Read Options

You’ll need to add the org.apache.hadoop:hadoop-aws:2.7.3 package to the job using the --packages Spark option. Also, you’ll need to exclude the com.fasterxml.jackson.core:jackson-core,joda-time:joda-time packages using the --exclude-packages option.

S3 Spark Shell Options
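Putting the pieces together, an S3 read job might look like the following sketch; the bucket path is the one shown above, while the id and collection name are hypothetical:

```json
{
  "type" : "parallel-bulk-loader",
  "id" : "s3-users",
  "format" : "csv",
  "path" : "s3a://sstk-dev/data/u.user",
  "outputCollection" : "users",
  "shellOptions" : [ {
    "key" : "--packages",
    "value" : "org.apache.hadoop:hadoop-aws:2.7.3"
  }, {
    "key" : "--exclude-packages",
    "value" : "com.fasterxml.jackson.core:jackson-core,joda-time:joda-time"
  } ]
}
```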

You can also read from Google Cloud Storage (GCS), but you’ll need a few more properties in your core-site.xml; see Installing the Cloud Storage connector.

Read from Parquet

Reading from parquet files is built into Spark using the "parquet" format. For additional read options, see Configuration of Parquet.

Read signals from parquet file

Job JSON:

{
  "type" : "parallel-bulk-loader",
  "id" : "ecomm demo parquet signals",
  "format" : "parquet",
  "path" : "./part-00000-c1951958-98ae-4f2a-b7b4-2e3a69fcf403-c000.snappy.parquet",
  "outputCollection" : "best-buy_signals",
  "clearDatasource" : false,
  "defineFieldsUsingInputSchema" : true,
  "atomicUpdates" : false
}

This example also uses the transformScala option to filter and transform the input DataFrame into a better form for indexing using the following Scala script:

import java.util.Calendar
import java.util.Locale
import java.util.TimeZone

def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  // do transformations and/or filter the inputDF here
  val signalsDF =
inputDF.filter((unix_timestamp($"timestamp_tdt", "MM/dd/yyyy HH:mm:ss.SSS") < 1325376000))
  val now = System.currentTimeMillis()
  val maxDate = signalsDF.agg(max("timestamp_tdt")).take(1)(0).getAs[java.sql.Timestamp](0).getTime
  val diff = now - maxDate
  val add_time =
udf((t: java.sql.Timestamp, diff : Long) => new java.sql.Timestamp(t.getTime + diff))

  val day_of_week = udf((t: java.sql.Timestamp) => {
    val calendar = Calendar.getInstance(TimeZone.getTimeZone("UTC"))
    calendar.setTimeInMillis(t.getTime)
    calendar.getDisplayName(Calendar.DAY_OF_WEEK, Calendar.LONG, Locale.getDefault)
  })

  //Remap some columns to bring the timestamps current
  signalsDF
      .withColumnRenamed("timestamp_tdt", "orig_timestamp_tdt").withColumn("timestamp_tdt", add_time($"orig_timestamp_tdt", lit(diff)))
      .withColumn("date", $"timestamp_tdt")
      .withColumn("tx_timestamp_txt", date_format($"timestamp_tdt", "E yyyy-MM-d HH:mm:ss.SSS Z"))
      .withColumn("param.query_time_dt", $"timestamp_tdt")
      .withColumn("date_day", date_format(date_sub($"date", 0), "yyyy-MM-d'T'HH:mm:ss.SSS'Z'"))
      .withColumn("date_month", date_format(trunc($"date", "mm"), "yyyy-MM-d'T'HH:mm:ss.SSS'Z'"))
      .withColumn("date_year", date_format(trunc($"date", "yyyy"), "yyyy-MM-d'T'HH:mm:ss.SSS'Z'"))
      .withColumn("day_of_week", day_of_week($"date"))
}

Read from JDBC tables

You can use the Parallel Bulk Loader to parallelize reads from JDBC tables, if the tables have numeric columns that can be partitioned into relatively equal partition sizes. In the example below, we partition the employees table into 4 partitions using the emp_no column (int). Behind the scenes, Spark sends four separate queries to the database and processes the result sets in parallel.

Load the JDBC driver JAR file into the Blob store

Before you ingest from a JDBC data source, you need to use the Fusion Admin UI to upload the JDBC driver JAR file into the blob store.

Alternatively, you can add the JAR file to the Fusion blob store with resourceType=spark:jar; for example:

curl -XPUT -H "Content-type:application/octet-stream" "http://localhost:8765/api/v1/blobs/mysql_jdbc_jar?resourceType=spark:jar" --data-binary @mysql-connector-java-5.1.45-bin.jar

At runtime, Fusion’s Spark job management framework knows how to add any JAR files with resourceType=spark:jar from the blob store to the appropriate classpaths before running a Parallel Bulk Loader job.

Read from a table

Read from JDBC tables

Job JSON:

{
  "type" : "parallel-bulk-loader",
  "id" : "load dbtable",
  "format" : "jdbc",
  "readOptions" : [ {
    "key" : "url",
    "value" : "jdbc:mysql://localhost/employees?user=?&password=?"
  }, {
    "key" : "dbtable",
    "value" : "employees"
  }, {
    "key" : "partitionColumn",
    "value" : "emp_no"
  }, {
    "key" : "numPartitions",
    "value" : "4"
  }, {
    "key" : "driver",
    "value" : "com.mysql.jdbc.Driver"
  }, {
    "key" : "lowerBound",
    "value" : "$MIN(emp_no)"
  }, {
    "key" : "upperBound",
    "value" : "$MAX(emp_no)"
  } ],
  "outputCollection" : "employees",
  "clearDatasource" : false,
  "defineFieldsUsingInputSchema" : true,
  "atomicUpdates" : false
}

Notice the use of the $MIN(emp_no) and $MAX(emp_no) macros in the read options. These are macros offered by the Parallel Bulk Loader to help configure parallel reads of JDBC tables. Behind the scenes, the macros are translated into SQL queries to get the MAX and MIN values of the specified field, which Spark uses to compute splits for partitioned queries. As mentioned above, the field must be numeric and must have a relatively balanced distribution of values between MAX and MIN; otherwise, you’re unlikely to see much performance benefit to partitioning.

Index HBase tables

To index an HBase table, use the Hortonworks connector found at https://github.com/hortonworks-spark/shc.

Note
The Parallel Bulk Loader can be used as a replacement for the HBase Indexer.

You’ll need to add an hbase-site.xml (and possibly core-site.xml) to apps/spark-dist/conf in Fusion, for example:

<configuration>
 <property>
   <name>hbase.defaults.for.version.skip</name>
   <value>true</value>
 </property>
 <property>
   <name>hbase.zookeeper.quorum</name>
   <value>localhost:2181</value>
 </property>
 <property>
    <name>zookeeper.znode.parent</name>
    <value>/hbase</value>
 </property>
</configuration>

For this example, we’ll create a test table in HBase. If you already have a table in HBase, feel free to use that table instead.

  1. Launch the HBase shell and create a table named fusion_nums with a single column family named lw:

    create 'fusion_nums', 'lw'
  2. Do a list command to see your table:

    hbase(main):002:0> list
    TABLE
    fusion_nums
    1 row(s) in 0.0250 seconds
    
    => ["fusion_nums"]
  3. Fill the table with some data:

    for i in '1'..'100' do for j in '1'..'2' do put 'fusion_nums', "row#{i}", "lw:c#{j}", "#{i}#{j}" end end
  4. Scan the fusion_nums table to see your data:

    scan 'fusion_nums'

The HBase connector requires a catalog read option that defines the columns you want to read and how to map them into a Spark DataFrame. For our sample table, the following suffices:

{
   "table":{"namespace":"default", "name":"fusion_nums"},
   "rowkey":"key",
   "columns":{
     "id":{"cf":"rowkey", "col":"key", "type":"string"},
     "lw_c1_s":{"cf":"lw", "col":"c1", "type":"string"},
     "lw_c2_s":{"cf":"lw", "col":"c2", "type":"string"}
   }
 }

Index HBase tables

Notice the use of the $lastTimestamp macro in the read options. This lets us filter rows read from HBase using the timestamp of the last document the Parallel Bulk Loader wrote to Solr, that is, to get the newest updates from HBase only (incremental updates). Most Spark data sources provide a way to filter results based on timestamp.

Job JSON:

{
  "type" : "parallel-bulk-loader",
  "id" : "hbase",
  "format" : "org.apache.spark.sql.execution.datasources.hbase",
  "readOptions" : [ {
    "key" : "catalog",
    "value" : "{    \"table\":{\"namespace\":\"default\", \"name\":\"fusion_nums\"},    \"rowkey\":\"key\",    \"columns\":{      \"id\":{\"cf\":\"rowkey\", \"col\":\"key\", \"type\":\"string\"},      \"lw_c1_s\":{\"cf\":\"lw\", \"col\":\"c1\", \"type\":\"string\"},      \"lw_c2_s\":{\"cf\":\"lw\", \"col\":\"c2\", \"type\":\"string\"}    }  }"
  }, {
    "key" : "minStamp",
    "value" : "$lastTimestamp(EPOCH_MS)"
  } ],
  "outputCollection" : "hbase",
  "timestampFieldName" : "timestamp_tdt",
  "clearDatasource" : false,
  "defineFieldsUsingInputSchema" : true,
  "atomicUpdates" : false,
  "shellOptions" : [ {
    "key" : "--packages",
    "value" : "com.hortonworks:shc-core:1.1.1-2.1-s_2.11"
  }, {
    "key" : "--repositories",
    "value" : "http://repo.hortonworks.com/content/repositories/releases"
  } ]
}

Index Elastic data

With Elasticsearch 6.2.2, using the org.elasticsearch:elasticsearch-spark-20_2.11:6.2.2 package, here’s a Scala script to run in bin/spark-shell to index some test data:

import spark.implicits._

case class SimpsonCharacter(name: String, actor: String, episodeDebut: String)

val simpsonsDF = sc.parallelize(
  SimpsonCharacter("Homer", "Dan Castellaneta", "Good Night") ::
  SimpsonCharacter("Marge", "Julie Kavner", "Good Night") ::
  SimpsonCharacter("Bart", "Nancy Cartwright", "Good Night") ::
  SimpsonCharacter("Lisa", "Yeardley Smith", "Good Night") ::
  SimpsonCharacter("Maggie", "Liz Georges and more", "Good Night") ::
  SimpsonCharacter("Sideshow Bob", "Kelsey Grammer", "The Telltale Head") :: Nil).toDF()

val writeOpts = Map("es.nodes" -> "127.0.0.1",
  "es.port" -> "9200",
  "es.index.auto.create" -> "true",
  "es.resource.auto.create" -> "shows/simpsons")
simpsonsDF.write.format("org.elasticsearch.spark.sql").mode("Overwrite").save("shows/simpsons")

Elastic

Job JSON:

{
  "type" : "parallel-bulk-loader",
  "id" : "elastic",
  "format" : "org.elasticsearch.spark.sql",
  "readOptions" : [ {
    "key" : "es.nodes",
    "value" : "127.0.0.1"
  }, {
    "key" : "es.port",
    "value" : "9200"
  }, {
    "key" : "es.resource",
    "value" : "shows/simpsons"
  } ],
  "outputCollection" : "hbase_signals_aggr",
  "clearDatasource" : false,
  "defineFieldsUsingInputSchema" : true,
  "atomicUpdates" : false,
  "shellOptions" : [ {
    "key" : "--packages",
    "value" : "org.elasticsearch:elasticsearch-spark-20_2.11:6.2.2"
  } ]
}