Summary

The Parallel Bulk Loader (PBL) job enables bulk ingestion of structured and semi-structured data from big data systems, NoSQL databases, and common file formats such as Parquet and Avro. Supported datasources include not only common file formats but also Solr, JDBC-compliant databases, MongoDB, and more. The PBL distributes the load across the Fusion Spark cluster to optimize performance, and because no parsing is needed, it also maximizes indexing performance by writing directly to Solr. For more information about available datasources and key features of the Parallel Bulk Loader, see Parallel Bulk Loader concepts.

Usage

Use the Parallel Bulk Loader job to load data into Fusion from a Spark SQL-compliant datasource, and then send the data to any Spark-supported output such as Solr, an index pipeline, and so on. To create a Parallel Bulk Loader job in the Fusion UI, sign in to Fusion and click Collections > Jobs. Then click Add+ and, in the Custom and Others Jobs section, select Parallel Bulk Loader. You can enter basic and advanced parameters to configure the job. If a field has a default value, it is populated when you add the job. The Parallel Bulk Loader can be configured for many different use cases; a conceptual sketch of what a job run does in Spark terms follows the list. Two examples are:
  • Organizations that need to meet financial or bank-level transactional integrity requirements use SQL databases. These datasources are structured in relational tables and preserve data integrity even if errors or power failures occur. They are also useful in large-scale operations that employ complex queries for analytics and reporting. For example, if categories and products have various prices, discounts, and offering dates, a SQL datasource is the most efficient option. Lucidworks supports JDBC-compliant SQL databases.
  • In contrast, NoSQL databases are document-based and allow for more flexibility with structured, semi-structured, and unstructured data. For example, your organization might need information about user product reviews or session data. And if your organization needs to process massive amounts of data from multiple systems, NoSQL is an efficient option. Lucidworks supports NoSQL databases such as MongoDB and Apache Cassandra.
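The following is an illustrative sketch, not the PBL implementation itself, of what a Parallel Bulk Loader run amounts to in Spark terms: read from a Spark datasource (here, a hypothetical JDBC database) and write to Solr through the spark-solr library. The connection URL, table name, ZooKeeper string, and collection name are placeholders.
import org.apache.spark.sql.SparkSession

val spark = SparkSession.builder().appName("pbl-sketch").getOrCreate()

// Read side: a JDBC-compliant SQL database (the first use case above).
val productsDF = spark.read
  .format("jdbc")
  .option("url", "jdbc:postgresql://db.example.com:5432/products_database") // placeholder
  .option("dbtable", "products") // placeholder
  .load()

// Write side: spark-solr's solr.DefaultSource indexes the rows directly into Solr.
productsDF.write
  .format("solr")
  .option("zkhost", "zookeeper-549:2181") // placeholder ZooKeeper connect string
  .option("collection", "store")
  .mode("overwrite")
  .save()

The PBL job assembles read and write sides like these from its configuration fields rather than from hand-written code.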

Example configuration

The following is an example configuration. The table after the configuration defines the fields.
{
        "id": "store_typeahead_entity_load",
        "format": "solr",
        "path": "https://example.com/products_database/*.*",
        "streaming": [
            {
                "enableStreaming": true,
                "outputMode": "append"
            }
        ],
        "readOptions": [
            {
                "key": "collection",
                "value": "store"
            }
        ],
        "outputCollection": "store_typeahead",
        "outputIndexPipeline": "store_typeahead",
        "outputParser": "store_typeahead",
        "clearDatasource": true,
        "outputPartitions": 5,
        "optimizeOutput": 2,
        "defineFieldsUsingInputSchema": true,
        "atomicUpdates": false,
        "writeOptions": [
            {
                "key": "write_collection",
                "value": "store2"
            }
        ],
        "transformScala": "import script",
        "mlModelId": "llm_model_id",
        "sparkConfig": [
            {
                "key": "spark.sql.caseSensitive",
                "value": "true"
            },
            {
                "key": "spark.typeField_1",
                "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
            }
        ],
        "cacheAfterRead": false,
        "continueAfterFailure": false,
        "type": "parallel-bulk-loader",
        "updates": [
            {
                "userId": "service_account",
                "timestamp": "2024-05-06T09:06:43.739877Z"
            },
            {
                "userId": "power_user",
                "timestamp": "2024-07-30T20:30:31.347930292Z"
            },
            {
                "userId": "power_user",
                "timestamp": "2024-07-30T20:30:31.350243642Z"
            }
        ]
}

Additional field information

This section provides more detailed information about some of the configuration fields.
  • format. Spark scans the job’s classpath for a class named DefaultSource in the <format> package. For the solr format, the solr.DefaultSource class is defined in the spark-solr repository, as in the sketch below.
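For illustration, a minimal read-side sketch that triggers this lookup; the zkhost and collection values are placeholders:
// spark.read.format("solr") makes Spark look for solr.DefaultSource on the
// classpath (provided by the spark-solr library) and use it as the datasource.
val storeDF = spark.read
  .format("solr")
  .option("zkhost", "zookeeper-549:2181") // placeholder ZooKeeper connect string
  .option("collection", "store")
  .load()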
  • transformScala. If SQL transformations are not sufficient, you might need the full power of the Spark API to transform data into an indexable form.
    • The transformScala option lets you filter and/or transform the input DataFrame any way you would like. You can even define UDFs to use during your transformation. For an example of using Scala to transform the input DataFrame before indexing in Solr, see the Import with the Bulk Loader example.
    • Another powerful use of the transformScala option is that you can pull in advanced libraries, such as Spark NLP (from John Snow Labs) to do NLP work on your content before indexing. See the Import with the Bulk Loader example.
    • Your Scala script can do other things but, at a minimum, it must define the following function that the Parallel Bulk Loader invokes:
def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  // do transformations and/or filter the inputDF here, then return the result
  inputDF
}
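For illustration, a hypothetical transform that filters rows and derives a field. The in_stock_b and title_s column names are invented for this sketch, and the imports are listed for completeness even though the PBL environment already provides them (see below):
import org.apache.spark.sql.{Dataset, Row}
import org.apache.spark.sql.functions._

def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
  // Keep only in-stock products and add a lowercase copy of the title.
  // in_stock_b and title_s are hypothetical field names.
  inputDF
    .filter(col("in_stock_b") === true)
    .withColumn("title_lower_s", lower(col("title_s")))
}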

Your script can rely on the following vals:
spark: SparkSession
sc: SparkContext
fusionZKConn: ZKConnection // needed to access Fusion API
solrCollection: SolrCollection // output collection
jobId: String // Loader job config ID

Also, the following classes have already been imported:
import org.apache.spark.SparkContext._
import spark.implicits._
import spark.sql
import org.apache.spark.sql.functions._
import com.lucidworks.spark.util.{SparkShellSupport => _lw}
import com.lucidworks.spark.job.sql.SparkSQLLoader
import com.lucidworks.spark.ml.recommenders.SolrCollection
import com.lucidworks.spark.ZKConnection
import org.apache.spark.sql.{Dataset, Row}
  • cacheAfterRead. This hidden field specifies whether input data is retained in memory (and on disk as needed) after it is read. Setting it to true can improve job stability by reading all data from the input source before transforming it or writing to Solr, at the cost of a slower run because it adds an intermediate write operation. Example value: false. The sketch below shows the rough Spark equivalent.
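A rough Spark equivalent of cacheAfterRead=true, assuming inputDF is the DataFrame read from the input source:
import org.apache.spark.storage.StorageLevel

// Retain the input in memory, spilling to disk as needed, and force the
// read to complete before any transformation or write to Solr happens.
val cachedDF = inputDF.persist(StorageLevel.MEMORY_AND_DISK)
cachedDF.count() // materializes the cache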
  • updates. This field lists the userId accounts that updated the job, along with a timestamp recording when each update occurred. The value is displayed in UTC using the ISO 8601 yyyy-MM-ddTHH:mm:ssZ format. For example, power_user last updated the job at 2024-07-30T20:30:31.350243642Z. A small parsing sketch follows.
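These timestamps are plain ISO 8601 instants, so they parse with standard tooling:
import java.time.Instant

val updated = Instant.parse("2024-07-30T20:30:31.350243642Z")
println(updated.getEpochSecond) // seconds since the Unix epoch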

API usage examples

The following examples include requests and responses for some of the API endpoints.

Create a parallel bulk loader job

An example request to create a parallel bulk loader job with the REST API is as follows:
curl --request POST \
  --url https://FUSION_HOST/api/spark/configurations \
  --header "Accept: */*" \
  --header "Authorization: Basic ACCESS_TOKEN" \
  --header "Content-Type: application/json" \
  --data '{
  "id": "store_typeahead_entity_load",
  "format": "solr",
  "sparkConfig": [
    {
      "key": "spark.sql.caseSensitive",
      "value": "true"
    }
  ],
  "readOptions": [
    {
      "key": "collection",
      "value": "store"
    },
    {
      "key": "zkHost",
      "value": "zookeeper-549"
    }
  ],
  "type": "parallel-bulk-loader"
}'
The response is a message indicating success or failure.

Start a parallel bulk loader job

This request starts the store_typeahead_entity_load PBL job.
curl --request POST \
  --url https://FUSION_HOST/api/spark/jobs/store_typeahead_entity_load \
  --header "Accept: */*" \
  --header "Authorization: Basic ACCESS_TOKEN" \
  --header "Content-Type: application/json"
The response is:
{
    "state": "starting",
    "jobId": "a2b50rrce7",
    "jobConfig": {
        "type": "parallel-bulk-loader",
        "id": "store_typeahead_entity_load",
        "format": "solr",
        "readOptions": [
            {
                "key": "collection",
                "value": "store"
            },
            {
                "key": "zkHost",
                "value": "zookeeper-549"
            }
        ],
        "outputCollection": "store_typeahead",
        "outputIndexPipeline": "store_typeahead",
        "clearDatasource": true,
        "defineFieldsUsingInputSchema": true,
        "atomicUpdates": false,
        "transformScala": "import script",
        "sparkConfig": [
            {
                "key": "spark.sql.caseSensitive",
                "value": "true"
            },
            {
                "key": "spark.typeField_1",
                "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
            }
        ],
        "cacheAfterRead": false,
        "continueAfterFailure": false,
        "type": "parallel-bulk-loader",
        "updates": [
            {
                "userId": "service_account",
                "timestamp": "2024-05-06T09:06:43.739877Z"
            },
            {
                "userId": "power_user",
                "timestamp": "2024-07-30T20:30:31.347930292Z"
            },
            {
                "userId": "power_user",
                "timestamp": "2024-07-30T20:30:31.350243642Z"
            }
        ]
    },
    "hostname": "111.111.111.111",
    "result": {
        "jobConfigId": "store_typeahead_entity_load",
        "jobRunId": "b2c50rrde6"
    }
}
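To track the run afterward, the job can be polled for status. This Scala sketch assumes that a GET on the same /api/spark/jobs/{id} endpoint returns the current state; FUSION_HOST, USERNAME, and PASSWORD are placeholders:
import java.net.URI
import java.net.http.{HttpClient, HttpRequest, HttpResponse}
import java.util.Base64

val client = HttpClient.newHttpClient()
val auth = Base64.getEncoder.encodeToString("USERNAME:PASSWORD".getBytes("UTF-8"))

// Same endpoint as the start request above, issued as a GET.
val request = HttpRequest.newBuilder()
  .uri(URI.create("https://FUSION_HOST/api/spark/jobs/store_typeahead_entity_load"))
  .header("Accept", "*/*")
  .header("Authorization", s"Basic $auth")
  .GET()
  .build()

val response = client.send(request, HttpResponse.BodyHandlers.ofString())
// The JSON body mirrors the response above; "state" moves from "starting"
// through "running" to a terminal value such as "finished" or "failed".
println(response.body())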

POST Spark configuration to a parallel bulk loader job

This request posts Spark configuration information to the store_typeahead_entity_load PBL job.
curl --request POST \
  --url https://FUSION_HOST/api/spark/configurations \
  --header "Accept: */*" \
  --header "Authorization: Basic ACCESS_TOKEN" \
  --header "Content-Type: application/json" \
  --data '{
  "id": "store_typeahead_entity_load",
  "sparkConfig": [
    {
      "key": "spark.sql.caseSensitive",
      "value": "true"
    },
    {
      "key": "spark.typeField_1",
      "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
    }
  ],
  "type": "parallel-bulk-loader"
}'

Configuration properties