
    Custom Python Jobs

    The Custom Python job provides the ability to run Python code via Fusion. This job supports Python 3.6+ code.

    Usage

    Python code can be entered directly in the job configuration editor, or you can reference a script that has been uploaded to the blob store. Additional Python libraries or files can be supplied via the Python files (pythonFiles) configuration.
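
    For instance, a minimal job configuration that runs a script previously uploaded to the blob store might look like the following sketch. The blob name and submit arguments are hypothetical placeholders; see the configuration reference below for the resourceName and submitArgs keys:

    {
      "type": "custom_python_job",
      "id": "my_blob_script_job",
      "resourceName": "my_script.py",
      "submitArgs": [
        "arg1",
        "arg2"
      ]
    }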

    Examples

    Example Python script that indexes data from a Parquet file to Solr via a Fusion index pipeline:

    from pyspark.sql import SparkSession
    
    import sys
    
    """
    Python script that indexes data from parquet to Fusion via index pipeline
    
    zkhost of the cluster is always passed as the first argument
    """
    if __name__ == "__main__":
      if len(sys.argv) != 5:
        print("Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config".format(sys.argv), file=sys.stderr)
        sys.exit(-1)
    
      zkhost = sys.argv[1]
      parquet_file = sys.argv[2]
      collection = sys.argv[3]
      index_pipeline = sys.argv[4]
    
      sparkSession = SparkSession.builder\
        .appName("load_data_to_index_pipeline")\
        .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")\
        .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")\
        .getOrCreate()
    
      df=sparkSession.read.parquet(parquet_file).limit(1000)
      df.write.format("lucidworks.fusion.index").option(
          "zkhost", zkhost).option(
          "collection", collection).option(
          "pipeline", index_pipeline).save()

    The above script is wrapped in the script field of the job configuration, and its arguments are passed via the submitArgs configuration key:

    {
      "type": "custom_python_job",
      "id": "test_python_script",
      "script": "\nfrom pyspark.sql import SparkSession\n\nimport sys\n\n\"\"\"\nPython script that indexes data from parquet to Fusion via index pipeline\n \nzkhost of the cluster is always passed as the first argument\n\"\"\"\nif __name__ == \"__main__\":\n  if len(sys.argv) != 5:\n    print(\"Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config\".format(sys.argv), file=sys.stderr)\n    sys.exit(-1)\n\n  zkhost = sys.argv[1]\n  parquet_file = sys.argv[2]\n  collection = sys.argv[3]\n  index_pipeline = sys.argv[4]  \n\n  sparkSession = SparkSession.builder    .appName(\"load_data_to_index_pipeline\")    .config(\"fs.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\")    .config(\"fs.AbstractFileSystem.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\")    .getOrCreate()\n\n  df=sparkSession.read.parquet(parquet_file).limit(1000)\n  df.write.format(\"lucidworks.fusion.index\").option(\n      \"zkhost\", zkhost).option(\n      \"collection\", collection).option(\n      \"pipeline\", index_pipeline).save()\n",
      "submitArgs": [
        "gs://smartdata-datasets/best_buy_product_catalog.snappy.parquet",
        "demo",
        "demo"
      ],
      "verboseReporting": true
    }

    Configuration

    Apache Arrow is installed in the Fusion Spark image, and the two settings below are enabled by default. To disable Arrow optimization, set these properties to false in the job configuration or in the job-launcher config map:

    spark.sql.execution.arrow.enabled true
    spark.sql.execution.arrow.fallback.enabled true
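
    For example, to disable Arrow for a single job, both properties can be set to false through the sparkConfig key of the job configuration. This is a minimal sketch; the job id is a placeholder:

    {
      "type": "custom_python_job",
      "id": "my_python_job",
      "sparkConfig": [
        { "key": "spark.sql.execution.arrow.enabled", "value": "false" },
        { "key": "spark.sql.execution.arrow.fallback.enabled", "value": "false" }
      ]
    }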

    Available libraries

    These libraries are available in the Fusion Spark image:

    • numpy

    • scipy

    • matplotlib

    • pandas

    • scikit-learn

    Adding libraries

    If you need to add extra libraries to run your code, you can upload the Python egg files to the blob store and reference their blob IDs in the job configuration.
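
    A sketch of such a configuration, assuming an egg has been uploaded under the hypothetical blob ID my-app/custom-lib-1.0-py3.6.egg and the main script under my_script.py:

    {
      "type": "custom_python_job",
      "id": "job_with_extra_libs",
      "resourceName": "my_script.py",
      "pythonFiles": [
        "my-app/custom-lib-1.0-py3.6.egg"
      ]
    }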

    However, machine learning libraries (such as TensorFlow, Keras, and PyTorch) are not easy to install with that approach. To install those libraries, follow these steps instead:

    1. Use this example Dockerfile to extend from the base image:

      FROM lucidworks/fusion-spark:5.0.2
      # Note: the PyPI package name for PyTorch is "torch"
      RUN pip3 install tensorflow keras torch
    2. Build the Docker image and publish it to your own Docker registry.

    3. Once the image is built, specify the custom image in the Spark settings via spark.kubernetes.driver.container.image and spark.kubernetes.executor.container.image, as in the sketch below.
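
    A minimal sketch of the corresponding sparkConfig entries, assuming the image was published as <your-registry>/<custom-image>:<tag>:

    "sparkConfig": [
      { "key": "spark.kubernetes.driver.container.image", "value": "<your-registry>/<custom-image>:<tag>" },
      { "key": "spark.kubernetes.executor.container.image", "value": "<your-registry>/<custom-image>:<tag>" }
    ]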

    Use this job when you want to run a Python/PySpark job.

    id - string (required)

    The ID for this Spark job. Used in the API to reference this job. Allowed characters: a-z, A-Z, dash (-) and underscore (_). Maximum length: 63 characters.

    <= 63 characters

    Match pattern: [a-zA-Z][_\-a-zA-Z0-9]*[a-zA-Z0-9]?

    sparkConfig - array[object]

    Spark configuration settings.

    object attributes:
      key (required): string (display name: Parameter Name)
      value: string (display name: Parameter Value)

    script - string

    Custom python/pyspark script to be submitted as a Fusion job

    >= 1 characters

    resourceName - string

    Name of the resource uploaded to the blob store. This should match the blob name.

    >= 1 characters

    pythonFiles - array[string]

    Blob resource (.zip, .egg, .py files) to place on the PYTHONPATH for Python apps

    submitArgs - array[string]

    Additional options to pass to the Spark Submit when running this job.

    javaOptions - array[object]

    Java options to pass to Spark driver/executor

    object attributes:
      key (required): string (display name: Parameter Name)
      value: string (display name: Parameter Value)

    verboseReporting - boolean

    Enables verbose reporting for SparkSubmit

    Default: true

    envOptions - array[object]

    Set environment variables for driver

    object attributes:
      key (required): string (display name: Parameter Name)
      value: string (display name: Parameter Value)
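
    As a sketch, an environment variable for the driver would be supplied in the same key/value form used by sparkConfig; the variable below is purely illustrative:

    "envOptions": [
      { "key": "PYTHONUNBUFFERED", "value": "1" }
    ]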

    type - string (required)

    Default: custom_python_job

    Allowed values: custom_python_job