Custom Python Jobs
The Custom Python job provides user the ability to run Python code via Fusion. This job supports Python 3.6+ code.
Usage
Python code can be entered directly in the job configuration editor or you can reference a script that has been uploaded to the blob store. Additional Python libraries or files can be supplied via a Python files configuration.
Examples
Example Python script that indexes data from parquet to Solr via a Fusion index pipeline:
from pyspark.sql import SparkSession
import sys
"""
Python script that indexes data from parquet to Fusion via index pipeline
zkhost of the cluster is always passed as the first argument
"""
if __name__ == "__main__":
if len(sys.argv) != 5:
print("Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config".format(sys.argv), file=sys.stderr)
sys.exit(-1)
zkhost = sys.argv[1]
parquet_file = sys.argv[2]
collection = sys.argv[3]
index_pipeline = sys.argv[4]
sparkSession = SparkSession.builder\
.appName("load_data_to_index_pipeline")\
.config("fs.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem")\
.config("fs.AbstractFileSystem.gs.impl", "com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS")\
.getOrCreate()
df=sparkSession.read.parquet(parquet_file).limit(1000)
df.write.format("lucidworks.fusion.index").option(
"zkhost", zkhost).option(
"collection", collection).option(
"pipeline", index_pipeline).save()
The above script is wrapped in the script
variable of the job config and several arguments are passed via the submitArgs
configuration key:
{
"type": "custom_python_job",
"id": "test_python_script",
"script": "\nfrom pyspark.sql import SparkSession\n\nimport sys\n\n\"\"\"\nPython script that indexes data from parquet to Fusion via index pipeline\n \nzkhost of the cluster is always passed as the first argument\n\"\"\"\nif __name__ == \"__main__\":\n if len(sys.argv) != 5:\n print(\"Program requires 3 arguments. Args passed {}. Add <parquet_file> COLLECTION_NAME <index-pipeline> via submit args in the job config\".format(sys.argv), file=sys.stderr)\n sys.exit(-1)\n\n zkhost = sys.argv[1]\n parquet_file = sys.argv[2]\n collection = sys.argv[3]\n index_pipeline = sys.argv[4] \n\n sparkSession = SparkSession.builder .appName(\"load_data_to_index_pipeline\") .config(\"fs.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFileSystem\") .config(\"fs.AbstractFileSystem.gs.impl\", \"com.google.cloud.hadoop.fs.gcs.GoogleHadoopFS\") .getOrCreate()\n\n df=sparkSession.read.parquet(parquet_file).limit(1000)\n df.write.format(\"lucidworks.fusion.index\").option(\n \"zkhost\", zkhost).option(\n \"collection\", collection).option(\n \"pipeline\", index_pipeline).save()\n",
"submitArgs": [
"gs://smartdata-datasets/best_buy_product_catalog.snappy.parquet",
"demo",
"demo"
],
"verboseReporting": true
}
For more PySpark script examples, see https://github.com/apache/spark/blob/v2.4.4/examples/src/main/python.
Configuration
Apache Arrow is installed to the image and the two settings below are enabled by default. If you want to disable arrow optimization, set these properties to false in the job config or in job-launcher config map:
spark.sql.execution.arrow.enabled true
spark.sql.execution.arrow.fallback.enabled true
Available libraries
These libraries are available in the Fusion Spark image:
-
numpy
-
scipy
-
matplotlib
-
pandas
-
scikit-learn
Adding libraries
If you need to add extra libraries to run your code, you can upload the Python egg files to the blob store and reference their blob IDs in the job configuration.
However, machine learning libraries (like tensorflow
, keras
, and pytorch
) are not easy to install with that approach. To install those libraries, follow this approach instead:
-
Use this example
Dockerfile
to extend from the base image:FROM lucidworks/fusion-spark:5.0.2 RUN pip3 install tensorflow keras pytorch
-
Build the Docker image and publish it to your own Docker registry.
-
Once the image is built, the custom image can be specified in the Spark settings via
spark.kubernetes.driver.container.image
andspark.kubernetes.executor.container.image
.
If you upload .zip files to add libraries, use the Other blob type for binary files instead of the File blob type. If the File blob type is used, the custom Python job fails.
|