Fusion 5.9

    Parallel Bulk Loader job configuration specifications

    The Parallel Bulk Loader (PBL) job enables bulk ingestion of structured and semi-structured data from big data systems, NoSQL databases, and common file formats like Parquet and Avro.

    Supported datasources include not only common file formats, but also Solr, JDBC-compliant databases, MongoDB, and more.

    In addition, the PBL distributes the load across the Managed Fusion Spark cluster to optimize performance. Because no parsing is needed and data is written directly to Solr, indexing performance is also maximized.

    Usage

    Use this job to load data into Managed Fusion from a SparkSQL-compliant datasource, and then send the data to any Spark-supported datasource such as Solr, an index pipeline, etc.

    To create a Parallel Bulk Loader job, sign in to Managed Fusion and click Collections > Jobs. Then click Add+ and, in the Custom and Others Jobs section, select Parallel Bulk Loader. You can enter basic and advanced parameters to configure the job. If a field has a default value, it is populated automatically when you add the job.

    Parallel Bulk Loader can be configured for many different use cases. Two examples are:

    • Organizations that need to meet financial or bank-level transactional integrity requirements use SQL databases. These datasources are structured in relational tables and ensure data integrity even if errors or power failures occur. They are also useful in large-scale operations that employ complex queries for analytics and reporting. For example, if categories and products have various prices, discounts, and offering dates, a SQL datasource is the most efficient option. Lucidworks supports SQL datasources such as JDBC-compliant databases.

    • In contrast, NoSQL databases are document-based and allow more flexibility with structured, semi-structured, and unstructured data. For example, your organization might need information about user product reviews or session data. And if your organization needs to process massive amounts of data from multiple systems, NoSQL is an efficient option. Lucidworks supports NoSQL databases such as MongoDB and Apache Cassandra.

    Example configuration

    The following is an example configuration. The fields are described in the sections that follow.

    {
            "id": "store_typeahead_entity_load",
            "format": "solr",
            "path": "https://example.com/products_database/*.*",
            "streaming": [
                {
                    "enableStreaming": true,
                    "outputMode": "append"
                }
            ],
            "readOptions": [
                {
                    "key": "collection",
                    "value": "store"
                }
            ],
            "outputCollection": "store_typeahead",
            "outputIndexPipeline": "store_typeahead",
            "outputParser": "store_typeahead",
            "clearDatasource": true,
            "outputPartitions": 5,
            "optimizeOutput": 2,
            "defineFieldsUsingInputSchema": true,
            "atomicUpdates": false,
            "writeOptions": [
                {
                    "key": "write_collection",
                    "value": "store2"
                }
            ],
            "transformScala": "import script",
            "mlModelId": "llm_model_id",
            "sparkConfig": [
                {
                    "key": "spark.sql.caseSensitive",
                    "value": "true"
                },
                {
                    "key": "spark.typeField_1",
                    "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
                }
            ],
            "cacheAfterRead": false,
            "continueAfterFailure": false,
            "type": "parallel-bulk-loader",
            "updates": [
                {
                    "userId": "service_account",
                    "timestamp": "2024-05-06T09:06:43.739877Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.347930292Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.350243642Z"
                }
            ]
        }

    Additional field information

    This section provides more detailed information about some of the configuration fields.

    • format. Spark scans the job’s classpath for a class named DefaultSource in the <format> package. For the solr format, the solr.DefaultSource class is defined in the spark-solr repository.

    • transformScala. If a SQL transform (transformSql) is not sufficient, you might need the full power of the Spark API to transform data into an indexable form.

      • The transformScala option lets you filter and/or transform the input DataFrame any way you would like. You can even define UDFs to use during your transformation. For an example of using Scala to transform the input DataFrame before indexing in Solr, see the Read from Parquet example.

      • Another powerful use of the transformScala option is that you can pull in advanced libraries, such as Spark NLP (from John Snow Labs) to do NLP work on your content before indexing. See the Use NLP during indexing example.

      • Your Scala script can do other things, but at a minimum it must define the following function, which the Parallel Bulk Loader invokes (a short example appears at the end of this list):

        def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
          // do transformations and/or filter the inputDF here
        }
        
        Your script can rely on the following vals:
        spark: SparkSession
        sc: SparkContext
        fusionZKConn: ZKConnection // needed to access Fusion API
        solrCollection: SolrCollection // output collection
        jobId: Loader job config ID
        
        Also, the following classes have already been imported:
        import org.apache.spark.SparkContext._
        import spark.implicits._
        import spark.sql
        import org.apache.spark.sql.functions._
        import com.lucidworks.spark.util.{SparkShellSupport => _lw}
        import com.lucidworks.spark.job.sql.SparkSQLLoader
        import com.lucidworks.spark.ml.recommenders.SolrCollection
        import com.lucidworks.spark.ZKConnection
        import org.apache.spark.sql.{Dataset, Row}
    • cacheAfterRead. This hidden field specifies whether input data is retained in memory (and on disk as needed) after reading. If set to true, it can improve job stability by reading all data from the input source before transforming or writing to Solr, but the job may run slower because this adds an intermediate write operation. In the example configuration above, it is set to false.

    • updates. This field lists the userId of each account that updated the job, along with a timestamp recording the date and time of each update. Timestamps are UTC values in ISO 8601 format (yyyy-MM-ddTHH:mm:ssZ). For example, power_user last updated the job on 2024-07-30T20:30:31.350243642Z.
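
    As a minimal illustration of a transformScala script, the sketch below filters out rows that lack a title and derives one extra field before indexing. The column names title_s and title_lower_s are hypothetical; substitute fields from your own input schema. The script relies only on the function signature, vals, and imports listed above.

        def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
          // Drop rows with no title; such rows are assumed unusable for this example.
          val withTitles = inputDF.filter(col("title_s").isNotNull)

          // Add a lowercased copy of the title for case-insensitive matching downstream.
          // col() and lower() come from org.apache.spark.sql.functions._, already imported.
          withTitles.withColumn("title_lower_s", lower(col("title_s")))
        }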

    API usage examples

    The following examples include requests and responses for some of the API endpoints.

    Create a parallel bulk loader job

    An example request to create a parallel bulk loader job with the REST API is as follows:

    curl --request POST \
      --url https://FUSION_HOST/api/spark/configurations \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN" \
      --header "Content-Type: application/json" \
      --data '{
      "id": "store_typeahead_entity_load",
      "format": "solr",
      "sparkConfig": [
        {
          "key": "spark.sql.caseSensitive",
          "value": "true"
        }
      ],
      "readOptions": [
        {
          "key": "collection",
          "value": "store"
        },
        {
          "key": "zkHost",
          "value": "zookeeper-549"
        }
      ],
      "type": "parallel-bulk-loader"
    }'

    The response is a message indicating success or failure.

    Start a parallel bulk loader job

    This request starts the store_typeahead_entity_load PBL job.

    curl --request POST \
      --url https://FUSION_HOST/api/spark/jobs/store_typeahead_entity_load \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN" \
      --header "Content-Type: application/json"

    The response is:

    {
        "state": "starting",
        "jobId": "a2b50rrce7",
        "jobConfig": {
            "type": "parallel-bulk-loader",
            "id": "store_typeahead_entity_load",
            "format": "solr",
            "readOptions": [
                {
                    "key": "collection",
                    "value": "store"
                },
                {
                    "key": "zkHost",
                    "value": "zookeeper-549"
                }
            ],
            "outputCollection": "store_typeahead",
            "outputIndexPipeline": "store_typeahead",
            "clearDatasource": true,
            "defineFieldsUsingInputSchema": true,
            "atomicUpdates": false,
            "transformScala": "import script",
            "sparkConfig": [
                {
                    "key": "spark.sql.caseSensitive",
                    "value": "true"
                },
                {
                    "key": "spark.typeField_1",
                    "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s"
                }
            ],
            "cacheAfterRead": false,
            "continueAfterFailure": false,
            "type": "parallel-bulk-loader",
            "updates": [
                {
                    "userId": "service_account",
                    "timestamp": "2024-05-06T09:06:43.739877Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.347930292Z"
                },
                {
                    "userId": "power_user",
                    "timestamp": "2024-07-30T20:30:31.350243642Z"
                }
            ]
        },
        "hostname": "111.111.111.111",
        "result": {
            "jobConfigId": "store_typeahead_entity_load",
            "jobRunId": "b2c50rrde6"
        }
    }

    POST Spark configuration to a parallel bulk loader job

    This request posts Spark configuration information to the store_typeahead_entity_load PBL job.

    curl --request POST \
      --url https://FUSION_HOST/api/spark/configurations \
      --header "Accept: */*" \
      --header "Authorization: Basic ACCESS_TOKEN" \
      --header "Content-Type: application/json" \
      --data '{
      "id": "store_typeahead_entity_load",
      "sparkConfig": [
        {
          "key": "spark.sql.caseSensitive",
          "value": "true",
        },
        {
          "key": "spark.typeField_1",
          "value": "Suggested_Doc, doc_title, url, doc_locale, doc_sector_ss, doc_economic_buyer_ss, doc_topic_ss, title_s",
        }
      ],
      "type": "parallel-bulk-loader"
    }'

    Configuration properties

    Use this job when you want to load data into Fusion from a SparkSQL-compliant datasource and send the data to any Spark-supported datasource (Solr, an index pipeline, S3, GCS, and so on).

    id - string (required)

    The ID for this Spark job. Used in the API to reference this job. Allowed characters: a-z, A-Z, dash (-) and underscore (_). Maximum length: 63 characters.

    <= 63 characters

    Match pattern: [a-zA-Z][_\-a-zA-Z0-9]*[a-zA-Z0-9]?

    sparkConfig - array[object]

    Spark configuration settings.

    object attributes:
        key (required) - string. Display name: Parameter Name
        value - string. Display name: Parameter Value

    format - string (required)

    Specifies the input data source format; common examples include: parquet, json, textinputformat
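
    To see how format and readOptions map onto Spark's own API, the sketch below shows roughly equivalent standalone Spark code for the solr format. It is an illustration under the assumption that the spark-solr library (which provides solr.DefaultSource) is on the classpath, not the loader's actual implementation; the collection and zkHost values are taken from the examples above.

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("pbl-read-sketch").getOrCreate()

        // "solr" resolves to the solr.DefaultSource class provided by spark-solr;
        // each readOptions key/value pair becomes an option on the DataFrameReader.
        val inputDF = spark.read
          .format("solr")
          .option("collection", "store")
          .option("zkHost", "zookeeper-549")
          .load()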

    path - string

    Path to load; for data sources that support multiple paths, separate by commas

    streaming - Streaming

    enableStreaming - boolean

    Stream data from input source to output Solr collection

    outputMode - string

    Specifies the output mode for streaming. E.g., append (default), complete, update

    Default: append

    Allowed values: append, complete, update
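
    For reference, the sketch below shows what these modes mean in plain Spark Structured Streaming terms. The rate source and console sink are used only to keep the example self-contained; they are not part of the Parallel Bulk Loader configuration.

        import org.apache.spark.sql.SparkSession

        val spark = SparkSession.builder().appName("output-mode-sketch").getOrCreate()

        // Any streaming DataFrame behaves the same way; "rate" just generates test rows.
        val streamingDF = spark.readStream.format("rate").load()

        // outputMode controls what is emitted to the sink on each trigger:
        //   append   - only rows added since the last trigger (the default)
        //   complete - the entire result table on every trigger (aggregations only)
        //   update   - only rows that changed since the last trigger
        val query = streamingDF.writeStream
          .format("console")
          .outputMode("append")
          .start()

        query.awaitTermination()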

    readOptions - array[object]

    Options passed to the data source to configure the read operation; options differ for every data source so refer to the documentation for more information.

    object attributes:
        key (required) - string. Display name: Parameter Name
        value - string. Display name: Parameter Value

    outputCollection - string

    Solr Collection to send the documents loaded from the input data source.

    outputIndexPipeline - string

    Send the documents loaded from the input data source to an index pipeline instead of going directly to Solr.

    outputParser - string

    Parser to use when sending documents to an index pipeline. Defaults to the same name as the index pipeline.

    defineFieldsUsingInputSchema - boolean

    If true, define fields in Solr using the input schema; if a SQL transform is defined, the fields to define are based on the transformed DataFrame schema instead of the input.

    Default: true

    atomicUpdates - boolean

    Send documents to Solr as atomic updates; only applies if sending directly to Solr and not an index pipeline.

    Default: false

    timestampFieldName - string

    Name of the field that holds a timestamp for each document; only required if using timestamps to filter new rows from the input source.

    clearDatasource - boolean

    If true, delete any documents indexed in Solr by previous runs of this job.

    Default: false

    outputPartitions - integer

    Partition the input DataFrame into the specified number of partitions before writing out to Solr or Fusion.

    optimizeOutput - integer

    Optimize the Solr collection down to the specified number of segments after writing to Solr.

    writeOptions - array[object]

    Options used when writing output. For output formats other than solr or index-pipeline, format and path options can be specified here

    object attributes:
        key (required) - string. Display name: Parameter Name
        value - string. Display name: Parameter Value

    transformScala - string

    Optional Scala script used to transform the results returned by the data source before indexing. You must define your transform script in a method with signature: def transform(inputDF: Dataset[Row]) : Dataset[Row]

    mlModelId - string

    The ID of the Spark ML PipelineModel stored in the Fusion blob store.

    transformSql - string

    Optional SQL used to transform the results returned by the data source before indexing. The input DataFrame returned from the data source will be registered as a temp table named '_input'. The Scala transform is applied before the SQL transform if both are provided, which allows you to define custom UDFs in the Scala script for use in your transformation SQL.
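
    As a sketch of how the two transforms can work together, the Scala transform below registers a UDF that a transformSql statement could then call against the _input temp table. The UDF name, column names, and SQL are hypothetical.

        def transform(inputDF: Dataset[Row]) : Dataset[Row] = {
          // Register a UDF so it is visible to the SQL transform that runs afterwards.
          spark.udf.register("normalizeTitle",
            (s: String) => if (s == null) null else s.trim.toLowerCase)

          // Return the DataFrame unchanged; the SQL transform does the reshaping.
          inputDF
        }

        // A transformSql value that could then use the UDF:
        //   SELECT id, normalizeTitle(title_s) AS title_lower_s FROM _input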

    shellOptions - array[object]

    Additional options to pass to the Spark shell when running this job.

    object attributes:
        key (required) - string. Display name: Parameter Name
        value - string. Display name: Parameter Value

    templateParams - array[object]

    Bind the key/values to the script interpreter

    object attributes:
        key (required) - string. Display name: Parameter Name
        value - string. Display name: Parameter Value

    continueAfterFailure - boolean

    If set to true, the job continues to the next document when a failure occurs while sending a document through an index pipeline, instead of failing the job.

    Default: false

    type - string (required)

    Default: parallel-bulk-loader

    Allowed values: parallel-bulk-loader