Custom Python Jobs

The Custom Python job lets users run Python code via Fusion. This job supports Python 3.6+ code.


Python code can be entered directly in the job configuration editor or you can reference a script that has been uploaded to the blob store. Additional Python libraries or files can be supplied via a Python files configuration.


Example Python script that indexes data from Parquet to Solr via a Fusion index pipeline:

from pyspark.sql import SparkSession

import sys

"""
Python script that indexes data from parquet to Fusion via index pipeline

zkhost of the cluster is always passed as the first argument
"""
if __name__ == "__main__":
  # argv[0] is the script name and argv[1] is the zkhost, so three user-supplied arguments make five entries
  if len(sys.argv) != 5:
    print("Program requires 3 arguments. Args passed {}. Add <parquet_file> <collection> <index-pipeline> via submit args in the job config".format(sys.argv), file=sys.stderr)
    sys.exit(-1)

  zkhost = sys.argv[1]
  parquet_file = sys.argv[2]
  collection = sys.argv[3]
  index_pipeline = sys.argv[4]

  sparkSession = SparkSession.builder\
    .appName("load_data_to_index_pipeline")\
    .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")\
    .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")\
    .getOrCreate()

  # Read at most 1000 rows from the parquet file and send them through the index pipeline
  df = sparkSession.read.parquet(parquet_file).limit(1000)
  df.write.format("lucidworks.fusion.index").option(
      "zkhost", zkhost).option(
      "collection", collection).option(
      "pipeline", index_pipeline).save()

The above script is wrapped in the script variable of the job config and several arguments are passed via the submitArgs configuration key:

{
  "type": "custom_python_job",
  "id": "test_python_script",
  "script": "\nfrom pyspark.sql import SparkSession\n\nimport sys\n\n\"\"\"\nPython script that indexes data from parquet to Fusion via index pipeline\n \nzkhost of the cluster is always passed as the first argument\n\"\"\"\nif __name__ == \"__main__\":\n  if len(sys.argv) != 5:\n    print(\"Program requires 3 arguments. Args passed {}. Add <parquet_file> <collection> <index-pipeline> via submit args in the job config\".format(sys.argv), file=sys.stderr)\n    sys.exit(-1)\n\n  zkhost = sys.argv[1]\n  parquet_file = sys.argv[2]\n  collection = sys.argv[3]\n  index_pipeline = sys.argv[4]  \n\n  sparkSession = SparkSession.builder    .appName(\"load_data_to_index_pipeline\")    .config(\"fs.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\")    .config(\"fs.AbstractFileSystem.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\")    .getOrCreate()\n\n  df=sparkSession.read.parquet(parquet_file).limit(1000)\n  df.write.format(\"lucidworks.fusion.index\").option(\n      \"zkhost\", zkhost).option(\n      \"collection\", collection).option(\n      \"pipeline\", index_pipeline).save()\n",
  "submitArgs": [
    "<parquet_file>",
    "<collection>",
    "<index-pipeline>"
  ],
  "verboseReporting": true
}
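
Escaping a multi-line script by hand for the script field is error-prone. The following is a minimal sketch, assuming you assemble the job config yourself before submitting it; it lets the standard json module do the escaping. The helper name build_job_config and the short stand-in script are hypothetical, and the submitArgs values are the placeholders from the script's usage message rather than real paths or names.

```python
import json


def build_job_config(job_id, script_text, submit_args):
    """Return a Custom Python job config as a plain dict (hypothetical helper)."""
    return {
        "type": "custom_python_job",
        "id": job_id,
        "script": script_text,
        "submitArgs": list(submit_args),
        "verboseReporting": True,
    }


# Stand-in for the contents of a real script file.
script_text = 'import sys\n\nprint("args: {}".format(sys.argv[1:]))\n'

config = build_job_config(
    "test_python_script",
    script_text,
    # Placeholders only; replace with your own parquet path, collection, and pipeline.
    ["<parquet_file>", "<collection>", "<index-pipeline>"],
)

# json.dumps escapes newlines and double quotes inside the script text.
payload = json.dumps(config, indent=2)
print(payload)
```

Parsing the payload back with json.loads returns the original script text unchanged, which is a quick way to confirm that nothing was lost in escaping.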


Apache Arrow is installed in the image, and the two settings below are enabled by default. To disable Arrow optimization, set these properties to false in the job config or in the job-launcher config map:

spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled true
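
As a small illustration, the sketch below generates the same two properties with the value switched to false, printed in the key/value form used above. The helper name arrow_settings is hypothetical. Where the resulting values are entered (job config or job-launcher config map) depends on your deployment and is not shown here.

```python
# The two Arrow-related properties named above.
ARROW_PROPERTIES = (
    "spark.sql.execution.arrow.enabled",
    "spark.sql.execution.arrow.fallback.enabled",
)


def arrow_settings(enabled):
    """Map each Arrow property to "true" or "false" (hypothetical helper)."""
    value = "true" if enabled else "false"
    return {key: value for key in ARROW_PROPERTIES}


# Print the properties with Arrow optimization disabled.
for key, value in arrow_settings(False).items():
    print(key, value)
```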

Available libraries

These libraries are available in the Fusion Spark image:

  • numpy

  • scipy

  • matplotlib

  • pandas

  • scikit-learn
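
To confirm which of these libraries a given image actually provides, a short check can be run as the job script itself. This is a minimal sketch using only the standard library; the helper name check_libraries is hypothetical. Note that scikit-learn is imported under the module name sklearn.

```python
import importlib.util

# Display name -> importable top-level module name.
LIBRARIES = {
    "numpy": "numpy",
    "scipy": "scipy",
    "matplotlib": "matplotlib",
    "pandas": "pandas",
    "scikit-learn": "sklearn",
}


def check_libraries(libraries):
    """Return {display name: True/False} without importing the modules."""
    return {
        name: importlib.util.find_spec(module) is not None
        for name, module in libraries.items()
    }


for name, found in check_libraries(LIBRARIES).items():
    print("{}: {}".format(name, "available" if found else "missing"))
```

Because find_spec only locates a module and does not import it, the check stays fast even for large libraries.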

Adding libraries

If you need to add extra libraries to run your code, you can upload the Python egg files to the blob store and reference their blob IDs in the job configuration.

However, machine learning libraries (such as TensorFlow, Keras, and PyTorch) are not easy to install with that approach. To install those libraries, build a custom image instead:

  1. Use this example Dockerfile to extend from the base image:

    FROM lucidworks/fusion-spark:5.0.2
    # PyTorch is published on PyPI under the package name "torch"
    RUN pip3 install tensorflow keras torch
  2. Build the Docker image and publish it to your own Docker registry.

  3. Once the image is built, the custom image can be specified in the Spark settings via spark.kubernetes.driver.container.image and spark.kubernetes.executor.container.image.
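
The two image settings from step 3 can be sketched as a pair of Spark properties that point at the same custom image. The helper name image_settings and the image reference registry.example.com/fusion-spark-ml:5.0.2 are hypothetical; substitute the name and tag you published in step 2.

```python
def image_settings(image):
    """Return the driver and executor image properties for one custom image."""
    if not image or any(ch.isspace() for ch in image):
        raise ValueError("image must be a non-empty reference without whitespace")
    return {
        "spark.kubernetes.driver.container.image": image,
        "spark.kubernetes.executor.container.image": image,
    }


# Hypothetical image reference; use your own registry, name, and tag.
custom_image = "registry.example.com/fusion-spark-ml:5.0.2"

for key, value in image_settings(custom_image).items():
    print(key, value)
```

Using one image for both the driver and the executors keeps the installed libraries identical on every node that runs your code.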