Fusion 5.9

    Parallel Bulk Loader

    Summary

    The Parallel Bulk Loader (PBL) job enables bulk ingestion of structured and semi-structured data from big data systems, NoSQL databases, and common file formats like Parquet and Avro.

    Datasources the PBL supports include not only common file formats but also Solr, JDBC-compliant databases, MongoDB, and more.

    In addition, the PBL distributes the load across the Fusion Spark cluster to optimize performance. Because no parsing is needed, it also maximizes indexing performance by writing directly to Solr.

    For more information about available datasources and key features of the Parallel Bulk Loader, see Parallel Bulk Loader concepts.

    Usage

    Use the Parallel Bulk Loader job to load data into Fusion from a SparkSQL-compliant datasource, and then send the data to any Spark-supported output, such as Solr, an index pipeline, and so on.

    To create a Parallel Bulk Loader job in the Fusion UI, sign in to Fusion and click Collections > Jobs. Then click Add+ and in the Custom and Others Jobs section, select Parallel Bulk Loader. You can enter basic and advanced parameters to configure the job. If the field has a default value, it is populated when you click to add the job.

    Parallel Bulk Loader can be configured for many different use cases. Two examples are:

    • Organizations that need to meet financial/bank-level transactional integrity requirements use SQL databases. Those datasources are structured in relational tables and ensure data integrity even if errors or power failures occur. In addition, these datasources are useful in large-scale operations that employ complex queries for analytics and reporting. For example, if categories and products have various prices, discounts, and offering dates, a SQL datasource is the most efficient option. Lucidworks supports SQL databases such as JDBC-compliant databases; see the configuration sketch after this list.

    • In contrast, NoSQL databases are based on documents and allow for more flexibility with structured, semi-structured, and unstructured data. For example, your organization might need information about user product reviews or session data. And if your organization needs to process massive amounts of data from multiple systems, NoSQL is an efficient option. Lucidworks supports NoSQL databases such as MongoDB and Apache Cassandra.
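
    For example, a minimal Parallel Bulk Loader configuration that reads from a JDBC-compliant database might look like the following sketch. The connection URL, driver, table, and collection names here are hypothetical; the readOptions keys (url, dbtable, driver) are standard Spark JDBC options.

    {
        "id": "orders_jdbc_load",
        "format": "jdbc",
        "readOptions": [
            {
                "key": "url",
                "value": "jdbc:postgresql://db.example.com:5432/sales"
            },
            {
                "key": "dbtable",
                "value": "orders"
            },
            {
                "key": "driver",
                "value": "org.postgresql.Driver"
            }
        ],
        "outputCollection": "orders",
        "type": "parallel-bulk-loader"
    }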

    Example configuration

    The following is an example configuration. The table after the configuration defines the fields.

    {
            "id": "store_typeahead_entity_load",
            "format": "solr",
            "path": "https://example.com/products_database/*.*",
            "streaming": [
                {
                    "enableStreaming": true,
                    "outputMode": "append"
                }
            ],
            "readOptions": [
                {
                    "key": "collection",
                    "value": "store"
                }
            ],
            "outputCollection": "store_typeahead",
            "outputIndexPipeline": "store_typeahead",
            "outputParser": "store_typeahead",
            "clearDatasource": true,
            "outputPartitions": 5,
            "optimizeOutput": 2,
            "defineFieldsUsingInputSchema": true,
            "atomicUpdates": false,
            "writeOptions": [
                {
                    "key": "write_collection",
                    "value": "store2"
                }
            ],
            "transformScala": "import script",
            "mlModelId": "llm_model_id",
            "sparkConfig": [
                {
                    "key": "spark.sql.caseSensitive",
                    "value": "true"
                },
                {
                    "key": "spark.typeField_1",
                    "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
                }
            ],
            "cacheAfterRead": false,
            "continueAfterFailure": false,
            "type": "parallel-bulk-loader",
            "updates": [
                {
                    "userId": "service_account",
                    "timestamp": "2024-05-06T09:06:43.739877Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.347930292Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.350243642Z"
                }
            ]
        }

    Additional field information

    This section provides more detailed information about some of the configuration fields.

    • format. Spark scans the job’s classpath for a class named DefaultSource in the <format> package. For example, for the solr format, the solr.DefaultSource class is defined in the spark-solr repository.

    • transformScala. If a SQL transform is not sufficient, you might need the full power of the Spark API to transform data into an indexable form.

      • The transformScala option lets you filter and/or transform the input DataFrame any way you would like. You can even define UDFs to use during your transformation. For an example of using Scala to transform the input DataFrame before indexing in Solr, see the Read from Parquet example.

      • Another powerful use of the transformScala option is that you can pull in advanced libraries, such as Spark NLP (from John Snow Labs) to do NLP work on your content before indexing. See the Use NLP during indexing example.

      • Your Scala script can do other things but, at a minimum, it must define the following function that the Parallel Bulk Loader invokes:

        def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
          // do transformations and/or filter the inputDF here
        }
        
        Your script can rely on the following vals:
        spark: SparkSession
        sc: SparkContext
        fusionZKConn: ZKConnection // needed to access Fusion API
        solrCollection: SolrCollection // output collection
        jobId: Loader job config ID
        
        Also, the following classes have already been imported:
        import org.apache.spark.SparkContext._
        import spark.implicits._
        import spark.sql
        import org.apache.spark.sql.functions._
        import com.lucidworks.spark.util.{SparkShellSupport => _lw}
        import com.lucidworks.spark.job.sql.SparkSQLLoader
        import com.lucidworks.spark.ml.recommenders.SolrCollection
        import com.lucidworks.spark.ZKConnection
        import org.apache.spark.sql.{Dataset, Row}
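
        As a hedged illustration only (the price and title_s field names below are hypothetical), a transform body that filters rows and adds a derived column could look like this:

        // Illustrative sketch: keep only rows that have a price, then add a
        // lowercased copy of the title using the functions imported above.
        def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
          inputDF
            .filter(col("price").isNotNull)
            .withColumn("title_lc_s", lower(col("title_s")))
        }
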
    • cacheAfterRead. This hidden field specifies whether input data is retained in memory (and on disk as needed) after reading. If set to true, it can improve job stability by reading all data from the input source before transforming or writing to Solr, but it can also make the job run slower because it adds an intermediate write operation. For example, false.

    • updates. This field lists the userId of each account that updated the job, along with a timestamp of when the update occurred. The value is displayed in UTC using the yyyy-MM-ddTHH:mm:ssZ (ISO 8601) format. For example, power_user updated the job on 2024-07-30T20:30:31.350243642Z.

    API usage examples

    The following examples include requests and responses for some of the API endpoints.

    Create a parallel bulk loader job

    An example request to create a parallel bulk loader job with the REST API is as follows:

    curl --request POST \
      --url https://FUSION_HOST/api/spark/configurations \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN" \
      --header "Content-Type: application/json" \
      --data '{
      "id": "store_typeahead_entity_load",
      "format": "solr",
      "sparkConfig": [
        {
          "key": "spark.sql.caseSensitive",
          "value": "true"
        },
      ],
      "readOptions": [
            {
          "key": "collection",
          "value": "store"
        },
        {
          "key": "zkHost",
          "value": "zookeeper-549"
        },
      ],
      "type": "parallel-bulk-loader"
    }

    The response is a message indicating success or failure.

    Start a parallel bulk loader job

    This request starts the store_typeahead_entity_load PBL job.

    curl --request POST \
      --url https://FUSION_HOST/api/spark/jobs/store_typeahead_entity_load \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN" \
      --header "Content-Type: application/json"

    The response is:

    {
        "state": "starting",
        "jobId": "a2b50rrce7",
        "jobConfig": {
            "type": "parallel-bulk-loader",
            "id": "store_typeahead_entity_load",
            "format": "solr",
            "readOptions": [
                {
                    "key": "collection",
                    "value": "store"
                },
                {
                    "key": "zkHost",
                    "value": "zookeeper-549"
                }
            ],
            "outputCollection": "store_typeahead",
            "outputIndexPipeline": "store_typeahead",
            "clearDatasource": true,
            "defineFieldsUsingInputSchema": true,
            "atomicUpdates": false,
            "transformScala": "import script",
            "sparkConfig": [
                {
                    "key": "spark.sql.caseSensitive",
                    "value": "true"
                },
                {
                    "key": "spark.typeField_1",
                    "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
                }
            ],
            "cacheAfterRead": false,
            "continueAfterFailure": false,
            "type": "parallel-bulk-loader",
            "updates": [
                {
                    "userId": "service_account",
                    "timestamp": "2024-05-06T09:06:43.739877Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.347930292Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.350243642Z"
                }
            ]
        },
        "hostname": "111.111.111.111",
        "result": {
            "jobConfigId": "store_typeahead_entity_load",
            "jobRunId": "b2c50rrde6"
        }
    }
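
    To check on the job after it starts, a status request can be issued against the same jobs endpoint. The following is a hedged sketch that assumes the /api/spark/jobs/<id> endpoint also responds to GET with the current job state:

    curl --request GET \
      --url https://FUSION_HOST/api/spark/jobs/store_typeahead_entity_load \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN"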

    POST Spark configuration to a parallel bulk loader job

    This request posts Spark configuration information to the store_typeahead_entity_load PBL job.

    curl --request POST \
      --url https://FUSION_HOST/api/spark/configurations \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN" \
      --header "Content-Type: application/json" \
      --data '{
      "id": "store_typeahead_entity_load",
      "sparkConfig": [
        {
          "key": "spark.sql.caseSensitive",
          "value": "true",
        },
        {
          "key": "spark.typeField_1",
          "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s",
        }
      ],
      "type": "parallel-bulk-loader"
    }'

    Use this job when you want to load data into Fusion from a SparkSQL-compliant datasource and send this data to any Spark-supported datasource (Solr/Index Pipeline/S3/GCS/...).

    id - string, required

    This required field is the unique ID for the Spark job that references this job in the API. Allowed characters: a-z, A-Z, dash (-) and underscore (_). Maximum length: 63 characters. Example: pbl_load_llm_signals.

    <= 63 characters

    Match pattern: [a-zA-Z][_\-a-zA-Z0-9]*[a-zA-Z0-9]?

    sparkConfig - array[object]

    This section lets you enter optional `parameter name:parameter value` options to use for Spark configuration. To access this section in the Fusion UI, click the *Advanced* toggle on the screen. An example key:value pair is key: spark.kubernetes.credentials and value: credentials.json.

    object attributes:
    • key (required) - display name: Parameter Name, type: string
    • value - display name: Parameter Value, type: string

    format - string, required

    This required field is the format of the input datasource. Values include parquet, json, or textinputformat. Example: parquet.

    path - string

    This optional field is the path from which the datasource is loaded. If the datasource has multiple paths, separate the paths with commas. Example: gs://llm_signals_store/company/*.parquet.

    streaming - Streaming

    enableStreaming - boolean

    If this optional checkbox is selected (set to `true`), the job streams the data from the input datasource to an output Solr collection. Example: true.

    outputMode - string

    This optional field specifies how the output is processed in the Solr collection where the documents loaded from the input datasource are stored. The options are append (default), complete, and update. Example: append.

    Default: append

    Allowed values: append, complete, update

    readOptions - array[object]

    This section lets you enter `parameter name:parameter value` options to use when reading input from datasources. Options differ for every datasource, so refer to the documentation for that datasource for more information. Example of key:value pair is key: zkHost and value: zookeeper-549.

    object attributes:
    • key (required) - display name: Parameter Name, type: string
    • value - display name: Parameter Value, type: string

    outputCollection - string

    This optional field is the Solr collection where the documents loaded from the input datasource are stored. Example: llm_signals.

    outputIndexPipeline - string

    This optional field is the index pipeline to which documents loaded from the input datasource are sent, instead of being loaded directly to Solr. Example: signalstore_001.

    outputParser - string

    This advanced, optional field is the parser to which documents are sent when they are routed through an index pipeline. The default value is the index pipeline defined in the `outputIndexPipeline` field. This is the `Send to Parser` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: signal_parser.

    defineFieldsUsingInputSchema - boolean

    If this advanced, optional checkbox is selected (set to `true`), fields are defined in Solr using the input schema. However, if a SQL transform is defined, the fields to define are based on the transformed DataFrame schema instead of the input. This is the `Define fields in Solr?` checkbox in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: true.

    Default: true

    atomicUpdates - boolean

    If this advanced, optional checkbox is selected (set to `true`), the job sends documents to Solr as atomic updates. An atomic update allows changes to one or more fields of a document without having to reindex the whole document. This feature only applies if sending directly to Solr and not to an index pipeline. This is the `Send as Atomic updates?` checkbox in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: false.

    Default: false
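
    For reference, a Solr atomic update document sets only the listed fields rather than replacing the whole document. A minimal sketch, with a hypothetical document ID and field name:

    {
        "id": "doc_123",
        "price_f": { "set": 19.99 }
    }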

    timestampFieldName - string

    This advanced, optional field is the name of the field that contains the timestamp value for each document. This field is only required if timestamps are used to filter new rows from the input source. This is the `Timestamp field name` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: start_date.

    clearDatasource - boolean

    If this advanced, optional checkbox is selected (set to `true`), the job deletes any documents indexed in Solr by previous runs of this job. The default is `false`. This is the `Clear existing documents` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: false.

    Default: false

    outputPartitions - integer

    This advanced, optional field is the number of partitions to create in the input DataFrame where data is stored before it is written to Solr or Fusion. This is the `Output partitions` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: 3.

    optimizeOutput - integer

    This advanced, optional field is the number of segments into which the Solr collection is optimized after data is written to Solr. This is the `Optimize` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: 5.

    writeOptions - array[object]

    This advanced, optional section lets you enter `parameter name:parameter value` options to use when writing output to sources other than Solr or the index pipeline. To access this section in the Fusion UI, click the *Advanced* toggle on the screen.

    object attributes:
    • key (required) - display name: Parameter Name, type: string
    • value - display name: Parameter Value, type: string

    transformScala - string

    This advanced, optional field is the Scala script used to transform the results returned by the datasource before indexing. Define the transform script in a method with the signature `def transform(inputDF: Dataset[Row]) : Dataset[Row]`. This is the `Transform Scala` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen.

    mlModelId - string

    This optional field is the identifier of the Spark machine learning (ML) pipeline model that is stored in the Fusion blob store. Example: paraphrase-multilingual-MiniLM-L12-v2.

    transformSql - string

    This advanced, optional field is the SQL script used to transform the results returned by the datasource before indexing. The input DataFrame returned from the datasource is registered as a temp table named `_input`. The Scala transform is applied before the SQL transform if both are provided, which lets you define custom user-defined functions (UDFs) in the Scala script for use in your transformation SQL. This is the `Transform SQL` field in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen.
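
    For example, a hedged sketch of a SQL transform over the registered `_input` table, expressed as it would appear in the job configuration (the column names are hypothetical):

    "transformSql": "SELECT id, title_s, lower(title_s) AS title_lc_s FROM _input WHERE price_f > 0"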

    shellOptions - array[object]

    This advanced, optional section lets you enter `parameter name:parameter value` options to send to the Spark shell when the job is run. This is the `Spark shell options` section in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example key:value pair is key: --driver-memory and value: 5g.

    object attributes:
    • key (required) - display name: Parameter Name, type: string
    • value - display name: Parameter Value, type: string

    templateParams - array[object]

    This advanced, optional section lets you enter `parameter name:parameter value` options to bind the `key:value` pairs to the script interpreter. This is the `Interpreter params` section in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen.

    object attributes:
    • key (required) - display name: Parameter Name, type: string
    • value - display name: Parameter Value, type: string

    continueAfterFailure - boolean

    If this advanced, optional checkbox is selected (set to `true`), the job skips over a document that fails when it is sent through an index pipeline, and continues to the next document without failing the job. This is the `Continue after index failure` checkbox in the Fusion UI. To access this field in the Fusion UI, click the *Advanced* toggle on the screen. Example: false.

    Default: false

    type - string, required

    Default: parallel-bulk-loader

    Allowed values: parallel-bulk-loader