org.elasticsearch:elasticsearch-spark-20_2.11:6.2.2
com.couchbase.client:spark-connector_2.11:2.2.0
Import with the Bulk Loader
Setting | Description |
format | Unique identifier of the data source provider. Spark scans the job’s classpath for a class named DefaultSource in the <format> package. For example, for the solr format, we provide the solr.DefaultSource class in our spark-solr repository. |
path (optional) | Comma-delimited list of paths to load. Some data sources, such as parquet, require a path. Others, such as Solr, do not. Refer to the documentation for your data source to determine if you need to provide a path. |
readOptions | Options passed to the Spark SQL data source to configure the read operation. Options differ for every data source. Refer to the specific data source documentation for more information. |
sparkConfig (optional) | List of Spark configuration settings needed to run the Parallel Bulk Loader. |
shellOptions | Behind the scenes, the Parallel Bulk Loader job submits a Scala script to the Fusion Spark shell. The shellOptions setting lets you pass any additional options needed by the Spark shell. The two most common options are --packages and --repositories. --packages: Comma-separated list of Maven coordinates of JAR files to include on the driver and executor classpaths. Spark searches the local Maven repository, then Maven Central and any additional remote repositories given in the config. The format for the coordinates is groupId:artifactId:version. The HBase example below demonstrates the use of the packages option for loading the com.hortonworks:shc-core:1.1.1-2.1-s_2.11 package. TIP: Use the https://spark-packages.org/ site to find interesting packages to add to your Parallel Bulk Loader jobs. --repositories: Comma-separated list of additional remote Maven repositories to search for the Maven coordinates given in the packages config setting. The Index HBase tables example below demonstrates the use of the repositories option for loading the com.hortonworks:shc-core:1.1.1-2.1-s_2.11 package from the Hortonworks repository. |
timestampFieldName | For data sources that support time-based filters, the Parallel Bulk Loader computes the timestamp of the last document written to Solr and the current timestamp of the Parallel Bulk Loader job. For example, the HBase data source lets you filter the read between a MIN_STAMP and a MAX_STAMP: val timeRangeOpts = Map(HBaseRelation.MIN_STAMP -> minStamp.toString, HBaseRelation.MAX_STAMP -> maxStamp.toString). This lets Parallel Bulk Loader jobs run on schedules and pull only the newest rows from the underlying data sources. To support timestamp-based filtering, the Parallel Bulk Loader provides two simple macros: $lastTimestamp(format) and $nowTimestamp(format). The format argument is optional; if it is not supplied, an ISO-8601 date/time string is used. The timestampFieldName setting is used to determine the value of lastTimestamp, using a Top 1 query to Solr to get the max timestamp. You can also pass $lastTimestamp(EPOCH) or $lastTimestamp(EPOCH_MS) to get the timestamp in seconds or milliseconds. See the Index HBase tables example below for an example of using this configuration property. |
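As a rough illustration of how the two timestamp macros could be expanded inside a read-option value, the following Scala sketch substitutes $lastTimestamp and $nowTimestamp with formatted timestamps. It is not the Parallel Bulk Loader's own implementation: the TimestampMacros object, its render and expand functions, and the handling of any other format argument as a java.time pattern in UTC are assumptions made for this example.

```scala
import java.time.{Instant, ZoneOffset}
import java.time.format.DateTimeFormatter
import scala.util.matching.Regex

// Hypothetical helper, for illustration only.
object TimestampMacros {
  // Matches $lastTimestamp, $lastTimestamp(FMT), $nowTimestamp and $nowTimestamp(FMT).
  private val Macro: Regex = """\$(lastTimestamp|nowTimestamp)(?:\(([^)]*)\))?""".r

  // Renders an instant according to the optional format argument.
  def render(ts: Instant, format: Option[String]): String = format match {
    case None | Some("") => DateTimeFormatter.ISO_INSTANT.format(ts) // ISO-8601 default
    case Some("EPOCH")    => ts.getEpochSecond.toString              // seconds
    case Some("EPOCH_MS") => ts.toEpochMilli.toString                // milliseconds
    case Some(pattern)    =>
      // Assumption: any other argument is a java.time pattern evaluated in UTC.
      DateTimeFormatter.ofPattern(pattern).withZone(ZoneOffset.UTC).format(ts)
  }

  // Replaces every macro occurrence in a read-option value.
  // `last` stands in for the max timestamp found in Solr, `now` for the job start time.
  def expand(value: String, last: Instant, now: Instant): String =
    Macro.replaceAllIn(value, m => {
      val ts = if (m.group(1) == "lastTimestamp") last else now
      Regex.quoteReplacement(render(ts, Option(m.group(2))))
    })
}
```

With last set to the instant 2017-07-14T02:40:00Z, expanding the value $lastTimestamp(EPOCH) yields 1500000000, and expanding $lastTimestamp with no argument yields the ISO-8601 string itself.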
Setting | Description |
transformScala | Sometimes, you can write a small script to transform input data into the correct form for indexing. But at other times, you might need the full power of the Spark API to transform data into an indexable form. The transformScala option lets you filter and/or transform the input DataFrame any way you would like. You can even define UDFs to use during your transformation. For an example of using Scala to transform the input DataFrame before indexing in Solr, see the Read from Parquet example. Another powerful use of the transformScala option is that you can pull in advanced libraries, such as Spark NLP (from John Snow Labs) to do NLP work on your content before indexing. See the Use NLP during indexing example. Your Scala script can do other things but, at a minimum, it must define the function that the Parallel Bulk Loader invokes (see below this table). |
transformSql | The transformSql option lets you write a SQL query to transform the input DataFrame. The SQL is executed after the transformScala script (if both are defined). The input DataFrame is exposed to your SQL as the _input view. See the Clean up data with SQL transformations example below for an example of using SQL to transform the input before indexing in Solr. This option also lets you leverage the UDF/UDAF functions provided by Spark SQL. |
mlModelId | If you have a Spark ML PipelineModel loaded into the blob store, you can supply the blob ID to the Parallel Bulk Loader and it will: (1) load the model from the blob store; (2) transform the input DataFrame (after the Scala transform but before the SQL transform); (3) add the predicted output field (specified in the model metadata stored in the blob store) to the projected fields list. |
transformScala
Setting | Description |
outputCollection | Name of the Fusion collection to write to. The Parallel Bulk Loader uses the Collections API to resolve the underlying Solr collection at runtime. |
outputIndexPipeline | Name of a Fusion index pipeline to which to send documents, instead of directly indexing to Solr. This option lets you perform additional ETL (extract, transform, and load) processing on the documents before they are indexed in Solr. If you need to write to time-partitioned indexes, then you must use an index pipeline, because writing directly to Solr is not partition aware. |
defineFieldsUsingInputSchema | Flag to indicate if the Parallel Bulk Loader should use the input schema to create fields in Solr, after applying the Scala and/or SQL transformations. If false , then the Parallel Bulk Loader relies on the Fusion index pipeline and/or Solr field guessing to create the fields. If true , only fields that do not exist already in Solr are created. Consequently, if there is a type mismatch between an existing field in Solr and the input schema, you will need to use a transformation to rename the field in the input schema. |
clearDatasource | If checked, the Parallel Bulk Loader deletes any existing documents in the output collection that match the query _lw_loader_id_s:<JOB>. To make this possible, the Parallel Bulk Loader adds two metadata fields to each row: _lw_loader_id_s and _lw_loader_job_id_s. |
atomicUpdates | Flag to send documents directly to Solr as atomic updates instead of as new documents. This option is not supported when using an index profile. Also note that the Parallel Bulk Loader tracking fields _lw_loader_id_s and _lw_loader_job_id_s are not sent when using atomic updates, so the clear datasource option does not work with documents created using atomic updates. |
outputOptions | Options used when writing directly to Solr; see the Spark-Solr index parameters (https://github.com/lucidworks/spark-solr#index-parameters). For example, if your docs are relatively small, you might want to increase the batch_size (default: 2000). |
outputPartitions | Coalesce the DataFrame into N partitions before writing to Solr. This can help spread the indexing work out across more executors that are available in Spark, or limit the parallelism when writing to Solr. |
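The behavior described for defineFieldsUsingInputSchema (create only the fields that are missing, and leave type conflicts for you to resolve with a rename) can be sketched in plain Scala. This is an illustration under simplifying assumptions, not the loader's code: FieldPlanner, Field, and Plan are hypothetical names, and field types are modeled as plain strings.

```scala
// Hypothetical model, for illustration only.
object FieldPlanner {
  final case class Field(name: String, dataType: String)

  // toCreate: input fields that do not exist in Solr yet.
  // mismatched: (input field, existing field) pairs whose types differ.
  final case class Plan(toCreate: Seq[Field], mismatched: Seq[(Field, Field)])

  def plan(input: Seq[Field], existing: Seq[Field]): Plan = {
    val byName = existing.map(f => f.name -> f).toMap

    // Only fields that are absent from Solr are created.
    val toCreate = input.filterNot(f => byName.contains(f.name))

    // Existing fields are never altered, so a type conflict has to be
    // resolved by renaming the field in a transformation.
    val mismatched = input.flatMap { in =>
      byName.get(in.name).filter(_.dataType != in.dataType).map(ex => (in, ex)).toList
    }

    Plan(toCreate, mismatched)
  }
}
```

Any field reported in mismatched is a candidate for renaming in transformScala or transformSql before the job writes to Solr.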
Parameter Name | Description and Default |
--driver-cores | Cores for the driver. Default: 1 |
--driver-memory | Memory for the driver (for example, 1000M or 2G). Default: 1024M |
--executor-cores | Cores per executor. Default: 1 in YARN mode, or all available cores on the worker in standalone mode |
--executor-memory | Memory per executor (for example, 1000M or 2G). Default: 1G |
--total-executor-cores | Total cores for all executors. Default: if this parameter is not set, the total is the number of executors in YARN mode, or all available cores on all workers in standalone mode. |
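The following Scala sketch shows one way to assemble a shell options string from Maven coordinates, extra repositories, and the resource parameters in the table above, while checking that each coordinate has the groupId:artifactId:version form. The ShellOptions object and its functions are hypothetical helpers written for this example; only the option names themselves come from the tables above.

```scala
// Hypothetical helper, for illustration only.
object ShellOptions {
  // A Maven coordinate in groupId:artifactId:version form.
  final case class Coordinate(groupId: String, artifactId: String, version: String) {
    override def toString: String = s"$groupId:$artifactId:$version"
  }

  // Returns Left with a message when the coordinate is malformed.
  def parseCoordinate(s: String): Either[String, Coordinate] =
    s.trim.split(":") match {
      case Array(g, a, v) if g.nonEmpty && a.nonEmpty && v.nonEmpty =>
        Right(Coordinate(g, a, v))
      case _ =>
        Left(s"Expected groupId:artifactId:version but got '$s'")
    }

  // Builds the option string; `resources` holds (flag, value) pairs
  // such as ("--driver-memory", "2G").
  def build(
      packages: Seq[String],
      repositories: Seq[String],
      resources: Seq[(String, String)]
  ): Either[String, String] = {
    val parsed = packages.map(parseCoordinate)
    parsed.collectFirst { case Left(err) => err } match {
      case Some(err) => Left(err)
      case None =>
        val coords = parsed.collect { case Right(c) => c.toString }
        val parts =
          (if (coords.nonEmpty) Seq("--packages", coords.mkString(",")) else Nil) ++
            (if (repositories.nonEmpty) Seq("--repositories", repositories.mkString(",")) else Nil) ++
            resources.flatMap { case (flag, value) => Seq(flag, value) }
        Right(parts.mkString(" "))
    }
  }
}
```

Calling build with the single package com.hortonworks:shc-core:1.1.1-2.1-s_2.11, no repositories, and the pair ("--driver-memory", "2G") returns Right("--packages com.hortonworks:shc-core:1.1.1-2.1-s_2.11 --driver-memory 2G"). A malformed coordinate produces a Left instead of a string that would fail later in the Spark shell.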
JohnSnowLabs:spark-nlp:1.4.2 package using Spark Shell Options.
core-site.xml in the apps/spark-dist/conf directory, such as:
s3a://sstk-dev/data/u.user. If you are running a Fusion cluster, then each instance of Fusion will need a core-site.xml file. S3a is the preferred protocol for reading data into Spark because it uses Amazon’s libraries to read from S3 instead of the legacy Hadoop libraries. If you need other S3 protocols (for example, s3 or s3n), you will need to add the equivalent properties to core-site.xml.
org.apache.hadoop:hadoop-aws:2.7.3 package to the job using the --packages Spark option. Also, you will need to exclude the com.fasterxml.jackson.core:jackson-core,joda-time:joda-time packages using the --exclude-packages option.
core-site.xml; see Installing the Cloud Storage connector.
transformScala option to filter and transform the input DataFrame into a better form for indexing using the following Scala script:
emp_no column (int). Behind the scenes, Spark sends four separate queries to the database and processes the result sets in parallel.
resourceType=spark:jar; for example:
resourceType=spark:jar from the blob store to the appropriate classpaths before running a Parallel Bulk Loader job.
$MIN(emp_no) and $MAX(emp_no) macros in the read options. These are macros offered by the Parallel Bulk Loader to help configure parallel reads of JDBC tables. Behind the scenes, the macros are translated into SQL queries to get the MAX and MIN values of the specified field, which Spark uses to compute splits for partitioned queries. As mentioned above, the field must be numeric and must have a relatively balanced distribution of values between MAX and MIN; otherwise, you are unlikely to see much performance benefit to partitioning.
hbase-site.xml (and possibly core-site.xml) to apps/spark-dist/conf in Fusion, for example:
fusion_nums with a single column family named lw:
$lastTimestamp macro in the read options. This lets us filter rows read from HBase using the timestamp of the last document the Parallel Bulk Loader wrote to Solr, that is, to get only the newest updates from HBase (incremental updates). Most Spark data sources provide a way to filter results based on timestamp.
Job JSON:
org.elasticsearch:elasticsearch-spark-20_2.11:6.2.1 package, here is a Scala script to run in bin/spark-shell to index some test data:
cbq -e=http://<host>:8091 -u <user> -p <password>. Ensure the provided user is an authorized user of the test bucket.
CREATE PRIMARY INDEX `test-primary-index` ON `test` USING GSI;
INSERT INTO `test` ( KEY, VALUE ) VALUES ( "1", { "id": "01", "field1": "a value", "field2": "another value"} ) RETURNING META().id as docid, *;
select * from `test`;
com.couchbase.spark.sql.DefaultSource. Then specify the com.couchbase.client:spark-connector_2.11:2.2.0 package as the Spark shell --packages option, as well as a few Spark settings that direct the connector to a particular Couchbase server and bucket to connect to using the provided credentials. See the Couchbase Spark connector documentation for all of the available Spark configuration settings.
Putting it all together:
format and --packages. In addition, you must specify the file path in the readOptions section. For example: