Parallel Bulk Loader
Job configuration specifications
The Parallel Bulk Loader (PBL) job enables bulk ingestion of structured and semi-structured data from big data systems, NoSQL databases, and common file formats like Parquet and Avro.
Datasources the PBL can read from include not only common file formats but also Solr databases, JDBC-compliant databases, MongoDB databases, and more.
In addition, the PBL distributes the load across the Managed Fusion Spark cluster to optimize performance. Because no parsing is needed, the job writes directly to Solr, which also maximizes indexing performance.
Usage
Use this job to load data into Managed Fusion from a SparkSQL-compliant datasource, and then send the data to any Spark-supported datasource, such as Solr or an index pipeline.
To create a Parallel Bulk Loader job, sign in to Managed Fusion and click Collections > Jobs. Then click Add+ and in the Custom and Others Jobs section, select Parallel Bulk Loader. You can enter basic and advanced parameters to configure the job. If the field has a default value, it is populated when you click to add the job.
Parallel Bulk Loader can be configured for many different use cases. Two examples are:
- Organizations that need to meet financial/bank-level transactional integrity requirements use SQL databases. Those datasources are structured in relational tables and ensure data integrity even if errors or power failures occur. In addition, these datasources are useful in large-scale operations that employ complex queries for analytics and reporting. For example, if categories and products have various prices, discounts, and offering dates, a SQL datasource is the most efficient option. Lucidworks supports JDBC-compliant SQL databases.
- In contrast, NoSQL databases are based on documents and allow for more flexibility with structured, semi-structured, and unstructured data. For example, your organization might need information about user product reviews or session data. And if your organization needs to process massive amounts of data from multiple systems, NoSQL is an efficient option. Lucidworks supports NoSQL databases such as MongoDB and Apache Cassandra.
Example configuration
The following is an example configuration. The table after the configuration defines the fields.
{
  "id": "store_typeahead_entity_load",
  "format": "solr",
  "path": "https://example.com/products_database/*.*",
  "streaming": [
    {
      "enableStreaming": true,
      "outputMode": "append"
    }
  ],
  "readOptions": [
    {
      "key": "collection",
      "value": "store"
    }
  ],
  "outputCollection": "store_typeahead",
  "outputIndexPipeline": "store_typeahead",
  "outputParser": "store_typeahead",
  "clearDatasource": true,
  "outputPartitions": 5,
  "optimizeOutput": 2,
  "defineFieldsUsingInputSchema": true,
  "atomicUpdates": false,
  "writeOptions": [
    {
      "key": "write_collection",
      "value": "store2"
    }
  ],
  "transformScala": "import script",
  "mlModelId": "llm_model_id",
  "sparkConfig": [
    {
      "key": "spark.sql.caseSensitive",
      "value": "true"
    },
    {
      "key": "spark.typeField_1",
      "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
    }
  ],
  "cacheAfterRead": false,
  "continueAfterFailure": false,
  "type": "parallel-bulk-loader",
  "updates": [
    {
      "userId": "service_account",
      "timestamp": "2024-05-06T09:06:43.739877Z"
    },
    {
      "userId": "power_user",
      "timestamp": "2024-07-30T20:30:31.347930292Z"
    },
    {
      "userId": "power_user",
      "timestamp": "2024-07-30T20:30:31.350243642Z"
    }
  ]
}
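The readOptions, writeOptions, and sparkConfig fields in the example all use the same shape: a list of objects with a key and a value. As a minimal sketch, assuming you assemble the configuration in Python before submitting it, the following builds those lists from ordinary dictionaries. The helper name to_key_value_list is hypothetical and is not part of any Fusion client library.

```python
import json


def to_key_value_list(options):
    """Convert a dict into the [{"key": ..., "value": ...}] list shape."""
    def as_text(value):
        # Booleans are lowercased so True becomes "true", as in the example.
        return str(value).lower() if isinstance(value, bool) else str(value)

    return [{"key": str(k), "value": as_text(v)} for k, v in options.items()]


# A reduced version of the example configuration shown above.
job_config = {
    "id": "store_typeahead_entity_load",
    "type": "parallel-bulk-loader",
    "format": "solr",
    "readOptions": to_key_value_list({"collection": "store"}),
    "sparkConfig": to_key_value_list({"spark.sql.caseSensitive": True}),
    "outputCollection": "store_typeahead",
}

# json.dumps guarantees syntactically valid JSON (no trailing commas).
print(json.dumps(job_config, indent=2))
```

Serializing with a JSON library rather than writing the body by hand avoids syntax slips such as trailing commas, which a JSON parser rejects.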
Additional field information
This section provides more detailed information about some of the configuration fields.
- format. Spark scans the job’s classpath for a class named DefaultSource in the <format> package. For the solr format, the solr.DefaultSource class is defined in the spark-solr repository.
- transformScala. If the Scala script is not sufficient, you might need the full power of the Spark API to transform data into an indexable form.
  - The transformScala option lets you filter and/or transform the input DataFrame any way you would like. You can even define UDFs to use during your transformation. For an example of using Scala to transform the input DataFrame before indexing in Solr, see the Read from Parquet example.
  - Another powerful use of the transformScala option is that you can pull in advanced libraries, such as Spark NLP (from John Snow Labs), to do NLP work on your content before indexing. See the Use NLP during indexing example.
  - Your Scala script can do other things but, at a minimum, it must define the following function that the Parallel Bulk Loader invokes:

    def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
      // do transformations and/or filter the inputDF here
    }

    Your script can rely on the following vals:

    spark: SparkSession
    sc: SparkContext
    fusionZKConn: ZKConnection // needed to access Fusion API
    solrCollection: SolrCollection // output collection
    jobId: Loader job config ID

    Also, the following classes have already been imported:

    import org.apache.spark.SparkContext._
    import spark.implicits._
    import spark.sql
    import org.apache.spark.sql.functions._
    import com.lucidworks.spark.util.{SparkShellSupport => _lw}
    import com.lucidworks.spark.job.sql.SparkSQLLoader
    import com.lucidworks.spark.ml.recommenders.SolrCollection
    import com.lucidworks.spark.ZKConnection
    import org.apache.spark.sql.{Dataset, Row}

- cacheAfterRead. This hidden field specifies whether input data is retained in memory (and on disk as needed) after reading. If set to true, it may improve job stability by reading all data from the input source before transforming or writing to Solr. However, this can make the job run slower because it adds an intermediate write operation. For example, false.
- updates. This field lists the userId accounts that have been updated. The timestamp contains the date and time the account was updated. The value is a UTC timestamp in yyyy-mm-ddThh:mm:ssZ format. For example, the power_user was updated on 2024-07-30T20:30:31.350243642Z.
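The timestamps in the example end in Z, which designates UTC, and some carry nine fractional digits. As a minimal sketch of reading the updates list in Python, the following parses each timestamp and finds the most recent entry. The function name parse_update_timestamp is hypothetical, and because Python's datetime holds microseconds only, the sketch drops any digits beyond six.

```python
from datetime import datetime, timezone


def parse_update_timestamp(value):
    """Parse a UTC timestamp such as 2024-07-30T20:30:31.350243642Z."""
    body = value.rstrip("Z")
    if "." in body:
        seconds, fraction = body.split(".", 1)
        # datetime supports microseconds only, so digits beyond six are dropped.
        fraction = fraction[:6].ljust(6, "0")
    else:
        seconds, fraction = body, "000000"
    parsed = datetime.strptime(f"{seconds}.{fraction}", "%Y-%m-%dT%H:%M:%S.%f")
    return parsed.replace(tzinfo=timezone.utc)


# The updates list from the example configuration.
updates = [
    {"userId": "service_account", "timestamp": "2024-05-06T09:06:43.739877Z"},
    {"userId": "power_user", "timestamp": "2024-07-30T20:30:31.347930292Z"},
    {"userId": "power_user", "timestamp": "2024-07-30T20:30:31.350243642Z"},
]

latest = max(updates, key=lambda entry: parse_update_timestamp(entry["timestamp"]))
print(latest["userId"], latest["timestamp"])
```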
API usage examples
The following examples include requests and responses for some of the API endpoints.
Create a parallel bulk loader job
An example request to create a parallel bulk loader job with the REST API is as follows:
curl --request POST \
  --url https://FUSION_HOST/api/spark/configurations \
  --header "Accept: */*" \
  --header "Authorization: Basic ACCESS_TOKEN" \
  --header "Content-Type: application/json" \
  --data '{
    "id": "store_typeahead_entity_load",
    "format": "solr",
    "sparkConfig": [
      {
        "key": "spark.sql.caseSensitive",
        "value": "true"
      }
    ],
    "readOptions": [
      {
        "key": "collection",
        "value": "store"
      },
      {
        "key": "zkHost",
        "value": "zookeeper-549"
      }
    ],
    "type": "parallel-bulk-loader"
  }'
The response is a message indicating success or failure.
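The same create request can be assembled in Python with only the standard library. The following is a minimal sketch rather than an official client: the function name build_create_job_request is hypothetical, FUSION_HOST and ACCESS_TOKEN are the same placeholders used in the curl example, and the request is only built here, not sent.

```python
import json
import urllib.request

FUSION_HOST = "FUSION_HOST"  # placeholder, as in the curl example


def build_create_job_request(job_config, access_token):
    """Build (but do not send) the POST request that creates a job configuration."""
    return urllib.request.Request(
        url=f"https://{FUSION_HOST}/api/spark/configurations",
        # json.dumps produces valid JSON, so no trailing commas can slip in.
        data=json.dumps(job_config).encode("utf-8"),
        headers={
            "Accept": "*/*",
            "Authorization": f"Basic {access_token}",
            "Content-Type": "application/json",
        },
        method="POST",
    )


job_config = {
    "id": "store_typeahead_entity_load",
    "format": "solr",
    "sparkConfig": [{"key": "spark.sql.caseSensitive", "value": "true"}],
    "readOptions": [
        {"key": "collection", "value": "store"},
        {"key": "zkHost", "value": "zookeeper-549"},
    ],
    "type": "parallel-bulk-loader",
}

request = build_create_job_request(job_config, "ACCESS_TOKEN")
print(request.get_method(), request.full_url)
```

To actually submit the request, pass it to urllib.request.urlopen once the placeholders are replaced with real values.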
Start a parallel bulk loader job
This request starts the store_typeahead_entity_load PBL job.
curl --request POST \
  --url https://FUSION_HOST.com/api/spark/jobs/store_typeahead_entity_load \
  --header "Accept: */*" \
  --header "Authorization: Basic ACCESS_TOKEN" \
  --header "Content-Type: application/json"
The response is:
{
  "state": "starting",
  "jobId": "a2b50rrce7",
  "jobConfig": {
    "type": "parallel-bulk-loader",
    "id": "store_typeahead_entity_load",
    "format": "solr",
    "readOptions": [
      {
        "key": "collection",
        "value": "store"
      },
      {
        "key": "zkHost",
        "value": "zookeeper-549"
      }
    ],
    "outputCollection": "store_typeahead",
    "outputIndexPipeline": "store_typeahead",
    "clearDatasource": true,
    "defineFieldsUsingInputSchema": true,
    "atomicUpdates": false,
    "transformScala": "import script",
    "sparkConfig": [
      {
        "key": "spark.sql.caseSensitive",
        "value": "true"
      },
      {
        "key": "spark.typeField_1",
        "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
      }
    ],
    "cacheAfterRead": false,
    "continueAfterFailure": false,
    "updates": [
      {
        "userId": "service_account",
        "timestamp": "2024-05-06T09:06:43.739877Z"
      },
      {
        "userId": "power_user",
        "timestamp": "2024-07-30T20:30:31.347930292Z"
      },
      {
        "userId": "power_user",
        "timestamp": "2024-07-30T20:30:31.350243642Z"
      }
    ]
  },
  "hostname": "111.111.111.111",
  "result": {
    "jobConfigId": "store_typeahead_entity_load",
    "jobRunId": "b2c50rrde6"
  }
}
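When scripting against this endpoint, the fields most often needed from the response are the state and the identifiers under result. The following is a minimal sketch of extracting them in Python, assuming the response body has the shape shown above. The function name summarize_job_start is hypothetical, and the sample text is a shortened copy of the example response.

```python
import json


def summarize_job_start(response_text):
    """Pull the state and run identifiers out of a job start response."""
    payload = json.loads(response_text)
    # "result" may be absent if the shape differs, so fall back to an empty dict.
    result = payload.get("result", {})
    return {
        "state": payload.get("state"),
        "jobConfigId": result.get("jobConfigId"),
        "jobRunId": result.get("jobRunId"),
    }


sample_response = """
{
  "state": "starting",
  "jobId": "a2b50rrce7",
  "hostname": "111.111.111.111",
  "result": {
    "jobConfigId": "store_typeahead_entity_load",
    "jobRunId": "b2c50rrde6"
  }
}
"""

summary = summarize_job_start(sample_response)
print(summary)
```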
POST spark configuration to a parallel bulk loader job
This request posts Spark configuration information to the store_typeahead_entity_load PBL job.
curl --request POST \
  --url https://FUSION_HOST.com/api/spark/configurations \
  --header "Accept: */*" \
  --header "Authorization: Basic ACCESS_TOKEN" \
  --header "Content-Type: application/json" \
  --data '{
    "id": "store_typeahead_entity_load",
    "sparkConfig": [
      {
        "key": "spark.sql.caseSensitive",
        "value": "true"
      },
      {
        "key": "spark.typeField_1",
        "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
      }
    ],
    "type": "parallel-bulk-loader"
  }'