Python code can be entered directly in the job configuration editor or you can reference a script that has been uploaded to the blob store. Additional Python libraries or files can be supplied via a Python files configuration.
Example Python script that indexes data from parquet to Solr via a Managed Fusion index pipeline:
Copy
from pyspark.sql import SparkSessionimport sys"""Python script that indexes data from parquet to Managed Fusion via index pipelinezkhost of the cluster is always passed as the first argument"""if __name__ == "__main__": if len(sys.argv) != 5: print("Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config".format(sys.argv), file=sys.stderr) sys.exit(-1) zkhost = sys.argv[1] parquet_file = sys.argv[2] collection = sys.argv[3] index_pipeline = sys.argv[4] sparkSession = SparkSession.builder .appName("load_data_to_index_pipeline") .config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem") .config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS") .getOrCreate() df=sparkSession.read.parquet(parquet_file).limit(1000) df.write.format("lucidworks.fusion.index").option( "zkhost", zkhost).option( "collection", collection).option( "pipeline", index_pipeline).save()
The above script is wrapped in the script variable of the job config and several arguments are passed via the submitArgs configuration key:
Copy
{ "type": "custom_python_job", "id": "test_python_script", "script": "\nfrom pyspark.sql import SparkSession\n\nimport sys\n\n\"\"\"\nPython script that indexes data from parquet to Managed Fusion via index pipeline\n \nzkhost of the cluster is always passed as the first argument\n\"\"\"\nif __name__ == \"__main__\":\n if len(sys.argv) != 5:\n print(\"Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config\".format(sys.argv), file=sys.stderr)\n sys.exit(-1)\n\n zkhost = sys.argv[1]\n parquet_file = sys.argv[2]\n collection = sys.argv[3]\n index_pipeline = sys.argv[4] \n\n sparkSession = SparkSession.builder .appName(\"load_data_to_index_pipeline\") .config(\"fs.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\") .config(\"fs.AbstractFileSystem.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\") .getOrCreate()\n\n df=sparkSession.read.parquet(parquet_file).limit(1000)\n df.write.format(\"lucidworks.fusion.index\").option(\n \"zkhost\", zkhost).option(\n \"collection\", collection).option(\n \"pipeline\", index_pipeline).save()\n", "submitArgs": [ "gs://smartdata-datasets/best_buy_product_catalog.snappy.parquet", "demo", "demo" ], "verboseReporting": true}
Apache Arrow is installed to the image and the two settings below are enabled by default. If you want to disable arrow optimization, set these properties to false in the job config or in job-launcher config map:
If you need to add extra libraries to run your code, you can upload the Python egg files to the blob store and reference their blob IDs in the job configuration.However, machine learning libraries (like tensorflow, keras, and pytorch) are not easy to install with that approach. To install those libraries, follow this approach instead:
Use this example Dockerfile to extend from the base image:
Copy
FROM lucidworks/fusion-spark:5.0.2RUN pip3 install tensorflow keras pytorch
Build the Docker image and publish it to your own Docker registry.
Once the image is built, the custom image can be specified in the Spark settings via spark.kubernetes.driver.container.image and spark.kubernetes.executor.container.image.
ImportantIf you upload .zip files to add libraries, use the Other blob type for binary files instead of the File blob type. If the File blob type is used, the custom Python job fails.