The Ray/Seldon Vectorize Field stage invokes a machine learning model to encode a string field to a vector representation. In Managed Fusion 5.9.11 and earlier, this stage is called Seldon Vectorize Field.
This feature is only available in Managed Fusion 5.9.x for versions 5.9.6+.
For more information on setting up vector search with Ray or Seldon, see Configure Ray/Seldon vector search.
This tutorial walks you through deploying your own model to Fusion with Ray.
This feature is only available in Fusion 5.9.x for versions 5.9.12 and later.

Prerequisites

  • A Fusion instance with an app and indexed data.
  • An understanding of Python and the ability to write Python code.
  • Docker installed locally, plus a private or public Docker repository.
  • Ray installed locally: pip install ray[serve] using the version of ray[serve] found in the release notes for your version of Managed Fusion.
  • Code editor; you can use any editor, but Visual Studio Code is used in this example.
  • Model: intfloat/e5-small-v2
  • Docker image: e5-small-v2-ray

Tips

  • Always test your Python code locally before uploading to Docker and then Fusion. This simplifies troubleshooting significantly.
  • Once you’ve created your Docker image, you can also test it locally by running docker run with a published port, such as 8000, and then sending a curl request to that port to confirm that the model works before you deploy it to Fusion. See the testing example below.
  • If you previously deployed a model with Seldon, you can deploy the same model with Ray after making a few changes to your Docker image as explained in this topic. To avoid conflicts, deploy the model with a different name. When you have verified that the Ray model is working after deployment with Ray, you can delete the Seldon model using the Delete Seldon Core Model Deployment job.
  • If the model does not deploy and you’re using the full e5-small-v2 example, there is a very good chance you haven’t allocated enough memory or CPU in your job spec or in the Ray-Argo config. It’s easy to increase the resources. To edit the ConfigMap, run kubectl edit configmap argo-deploy-ray-model-workflow -n <namespace>, find the ray-head container in the escaped YAML, and change the memory limit. Exercise caution when editing, because a formatting mistake can break the YAML. Delete and replace a single character at a time without changing any formatting.

Local testing example

  1. Docker command:
    docker run -p 127.0.0.1:8000:8000 DOCKER_IMAGE
    
  2. Curl to hit Docker:
    curl -i -X POST http://127.0.0.1:8000 -H 'Content-Type: application/json' -d '{"text": "The quick brown fox jumps over the lazy dog."}'
    
  3. Curl model in Fusion:
    curl -u $FUSION_USER:$FUSION_PASSWORD -X POST -H 'Content-Type: application/json' -d '{"text": "i love fusion"}' https://FUSION_HOST.com:6764/api/ai/ml-models/MODEL_NAME/prediction
    
  4. See all your deployed models:
    curl -u USERNAME:PASSWORD http://FUSION_HOST:FUSION_PORT/api/ai/ml-models
    
  5. Check the Ray UI to see Replica State, Resources, and Logs.
    If you are getting an internal model error, the best way to see what is going on is to port-forward to the model and query it directly.
    You can find MODEL_DEPLOYMENT for the command below by running kubectl get svc -n NAMESPACE. It has the same name as the model name set in the Create Ray Model Deployment job.
    kubectl -n NAMESPACE port-forward svc/MODEL_DEPLOYMENT-head-svc 8000:8000
    
    Once port-forwarding is successful, you can use the following cURL command to reproduce the issue. At that point your worker logs should show helpful error messages.
    curl --location 'http://127.0.0.1:8000/' \
    --header 'charset: utf-8' \
    --header 'Content-Type: application/json' \
    --data '{"text": "i love fusion"}'
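The same requests can also be sent from Python, which is convenient when you want to check the response in a script. The following is a minimal sketch that uses only the standard library. The helper names build_prediction_request and extract_vector are hypothetical, and the sketch assumes the response body is a JSON object with a vector key, as returned by the example deployment class in this tutorial.

```python
import base64
import json
import urllib.request
from typing import List, Optional


def build_prediction_request(
    url: str,
    text: str,
    user: Optional[str] = None,
    password: Optional[str] = None,
) -> urllib.request.Request:
    """Build a POST request equivalent to the curl commands above."""
    body = json.dumps({"text": text}).encode("utf-8")
    request = urllib.request.Request(url, data=body, method="POST")
    request.add_header("Content-Type", "application/json")
    if user is not None and password is not None:
        # Same effect as curl -u USER:PASSWORD (HTTP basic authentication).
        token = base64.b64encode(f"{user}:{password}".encode("utf-8")).decode("ascii")
        request.add_header("Authorization", f"Basic {token}")
    return request


def extract_vector(response_body: bytes) -> List[float]:
    """Parse a response body and return the list stored under 'vector'."""
    payload = json.loads(response_body)
    vector = payload["vector"]
    if not isinstance(vector, list) or not vector:
        raise ValueError("expected a non-empty list under the 'vector' key")
    if not all(isinstance(value, (int, float)) for value in vector):
        raise ValueError("expected only numbers in the vector")
    return vector


# Usage, assuming the container from step 1 is listening on 127.0.0.1:8000:
#   request = build_prediction_request("http://127.0.0.1:8000", "i love fusion")
#   with urllib.request.urlopen(request) as response:
#       print(len(extract_vector(response.read())))
```

Building the request separately from sending it keeps the payload and headers easy to inspect when a call fails.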
    

Download the model

This tutorial uses the e5-small-v2 model from Hugging Face, but any pre-trained model from https://huggingface.co will work with this tutorial. If you want to use your own model instead, you can do so, but your model must have been trained and then saved through a function similar to PyTorch’s torch.save(model, PATH) function. See Saving and Loading Models in the PyTorch documentation.

Format a Python class

The next step is to format a Python class which will be invoked by Fusion to get the results from your model. The skeleton below represents the format that you should follow. See also Getting Started in the Ray Serve documentation.
from typing import Any, Dict

from ray import serve
from starlette.requests import Request

# These defaults are for the Ray Serve deployment
# when running simply from Docker. The 'Create Ray Model Deployment'
# job can override these replicas and resources if needed.
@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 1})
class Deployment(object):
    def __init__(self):
        """
        Add any initialization parameters. Generally this is where you would load
        your model. This method will be called once when the deployment is created.
        """
        print("Initializing")
        self.model = load_model()  # faux code: replace with your own model-loading call

    # This method can have any name, as long as it takes a dictionary as input and
    # returns a dictionary as output. In this example, we are using the encode
    # method to encode the input text into a vector.
    def encode(self, input_dict: Dict[str, Any]) -> Dict[str, Any]:
        """
        This method will be called when the deployment is queried. It will receive
        the input data and should return the output data.
        """
        text = input_dict["text"]
        embeddings = self.model.encode(text)  # faux code: replace with your model's inference call
        # To use the 'Ray / Seldon Vectorize Field' stage, the output key must be `vector`.
        # If you use the 'Machine Learning' stage, the output key must match the
        # output key configured in that stage.
        return {"vector": embeddings}

    async def __call__(self, http_request: Request) -> Dict[str, Any]:
        input_dict: Dict[str, Any] = await http_request.json()
        # Call the method you defined above, in this case encode.
        return self.encode(input_dict=input_dict)


app = Deployment.bind()
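Because encode only maps a dictionary to a dictionary, its logic can be exercised locally without starting Ray, in line with the tip about testing your Python code before building the image. The sketch below is an illustration only: FakeModel and EncoderLogic are hypothetical names, and the toy vector stands in for real embeddings.

```python
from typing import Any, Dict, List


class FakeModel:
    """Hypothetical stand-in for a real embedding model."""

    def encode(self, text: str) -> List[float]:
        # Deterministic toy features: character count, vowel count, word count.
        vowels = sum(1 for char in text.lower() if char in "aeiou")
        return [float(len(text)), float(vowels), float(len(text.split()))]


class EncoderLogic:
    """Holds the dictionary-in, dictionary-out logic, with no Ray dependency."""

    def __init__(self, model: Any) -> None:
        self.model = model

    def encode(self, input_dict: Dict[str, Any]) -> Dict[str, Any]:
        text = input_dict["text"]
        # The output key is `vector`, as in the skeleton above.
        return {"vector": self.model.encode(text)}


result = EncoderLogic(FakeModel()).encode({"text": "i love fusion"})
print(result)
```

Keeping the request handling in __call__ and the model logic in a plain method makes this kind of check possible before any Docker or Fusion step.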

A real instance of this class with the e5-small-v2 model is as follows:
This code pulls the model from Hugging Face. To have the model load from the image without pulling from Hugging Face or other external sources, download the model weights into a folder and change the model name to the folder name preceded by ./.
import json
import sys
from time import time
from typing import Any, Dict

import torch
import torch.nn.functional as F
from ray import serve
from starlette.requests import Request
from starlette.responses import JSONResponse
from torch import Tensor
from transformers import AutoModel, AutoTokenizer

HUB_MODEL_NAME = "intfloat/e5-small-v2"


@serve.deployment(num_replicas=1, ray_actor_options={"num_cpus": 1})
class Deployment(object):
    def __init__(self):
        from loguru import logger

        self.logger = logger
        # Initializing logger
        self.logger.remove()
        self.logger.add(sys.stdout, level="INFO", serialize=False, colorize=True)

        # Initializing model
        self.logger.info("Loading model...")
        self.tokenizer = AutoTokenizer.from_pretrained(HUB_MODEL_NAME)
        self.model = AutoModel.from_pretrained(HUB_MODEL_NAME)
        self.model.eval()
        self.logger.info("Model initialization finished!")

    def encode(self, input_dict: Dict[str, Any]) -> Dict[str, Any]:
        _start_time = time()

        # Extracting text from input
        text = input_dict["text"]

        # Tokenization
        tokenized_texts = self.tokenizer(
            text,
            max_length=512,
            padding=True,
            truncation=True,
            return_tensors="pt",
        )

        # Encoding
        with torch.inference_mode():
            # Forward pass of the model
            outputs = self.model(**tokenized_texts)

            # Average pooling the last hidden states
            embeddings = self.average_pool(
                outputs.last_hidden_state, tokenized_texts["attention_mask"]
            )

            # Normalizing embeddings
            embeddings = F.normalize(embeddings, p=2, dim=1)

            # Converting into output format
            output_dict = {"vector": embeddings.squeeze().tolist()}

        prediction_time = (time() - _start_time) * 1000
        self.logger.info(f"Time taken to make a prediction: {prediction_time:.0f}ms")
        return output_dict

    async def __call__(self, http_request: Request) -> Dict[str, Any]:
        try:
            input_dict: Dict[str, Any] = await http_request.json()
        except UnicodeDecodeError:
            body_bytes = await http_request.body()
            try:
                decoded = body_bytes.decode("utf-8", errors="replace")
                input_dict = json.loads(decoded)
            except json.JSONDecodeError:
                return JSONResponse({"error": "Invalid JSON"}, status_code=400)
        return self.encode(input_dict=input_dict)

    @staticmethod
    def average_pool(last_hidden_states: Tensor, attention_mask: Tensor) -> Tensor:
        last_hidden = last_hidden_states.masked_fill(
            ~attention_mask[..., None].bool(), 0.0
        )
        return last_hidden.sum(dim=1) / attention_mask.sum(dim=1)[..., None]


app = Deployment.bind()
In the preceding code, logging has been added for debugging purposes. The preceding code example contains the following functions:
  • __call__: This function is required. It receives each incoming HTTP request, parses the JSON body, and passes the result to encode.
  • __init__: The __init__ function is where models, tokenizers, vectorizers, and the like should be set to self for invoking. It is recommended that you include your model’s trained parameters directly in the Docker container rather than reaching out to external storage inside __init__.
  • encode: The encode function is where the field or query that is passed to the model from Fusion is processed. Alternatively, you can process it all in the __call__ function, but it is cleaner not to. The encode function can handle any text processing the model needs before its input is passed to model.predict() or an equivalent function that returns the model result.
If the output needs additional manipulation, that should be done before the result is returned. For embedding models, the return value must have the shape of (1, DIM), where DIM (vector dimension) is a consistent integer, to enable Fusion to handle the vector encoding into Ray.
Use the exact name of the class when naming this file.
In the preceding example, the Python file is named deployment.py and the class name is Deployment().
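To make the pooling and normalization steps in encode concrete, here is a small pure-Python sketch of the same arithmetic for a single sequence. It is not the tutorial's implementation, which operates on PyTorch tensors; the list-based signatures are assumptions made for illustration.

```python
import math
from typing import List


def average_pool(hidden_states: List[List[float]], attention_mask: List[int]) -> List[float]:
    """Average the token vectors whose mask value is 1, ignoring padding tokens."""
    dim = len(hidden_states[0])
    totals = [0.0] * dim
    for token_vector, keep in zip(hidden_states, attention_mask):
        if keep:
            for index, value in enumerate(token_vector):
                totals[index] += value
    token_count = sum(attention_mask)
    return [total / token_count for total in totals]


def l2_normalize(vector: List[float]) -> List[float]:
    """Scale a vector to unit length (L2 norm of 1)."""
    norm = math.sqrt(sum(value * value for value in vector))
    return [value / norm for value in vector]


# Two real tokens followed by one padding token (mask value 0).
hidden = [[3.0, 0.0], [0.0, 4.0], [100.0, 100.0]]
mask = [1, 1, 0]
pooled = average_pool(hidden, mask)
embedding = l2_normalize(pooled)
print(pooled, embedding)
```

The padding token contributes nothing to the average, which is the purpose of the masked_fill call in the example class.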

Create a Dockerfile

The next step is to create a Dockerfile. The Dockerfile should follow this general outline; read the comments for additional details:
#It is important that python version is 3.x-slim
FROM python:3.10-slim

# Install dependencies
RUN apt-get update && apt-get install -y wget

# Create working app directory
RUN mkdir -p /app
WORKDIR /app

# Copy the requirements file and install the dependencies
COPY requirements.txt /app
RUN pip install -r requirements.txt --no-cache-dir

# Copy source code
COPY deployment.py /app

# Expose serving port for HTTP communication with Fusion
EXPOSE 8000

# The end of the command follows module:application and the below value should be set in the RAY DEPLOYMENT IMPORT PATH field in 'Create Ray Model Deployment' job
CMD exec serve run deployment:app

Create a requirements file

The requirements.txt file is a list of installs for the Dockerfile to run to ensure the Docker container has the right resources to run the model. For the e5-small-v2 model, the requirements are as follows:
torch -f https://download.pytorch.org/whl/torch_stable.html # Make sure that we download CPU version of PyTorch
transformers
loguru
ray[serve]==2.42.1
Any recent ray[serve] version should work, but the tested and known supported version is 2.42.1. In general, if a package is used in an import statement in your Python file, it should be included in the requirements file. To populate the requirements, use the following command in the terminal, inside the directory that contains your code:
pip freeze > requirements.txt
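As a cross-check on the rule that imported packages belong in the requirements file, the following sketch lists the top-level modules a source file imports and filters out the standard library. The function name third_party_imports is hypothetical, and the filtering relies on sys.stdlib_module_names, which exists only in Python 3.10 and later. Note that an import name does not always match the package name to install (for example, the ray import corresponds to ray[serve] here).

```python
import ast
import sys
from typing import Set


def third_party_imports(source: str) -> Set[str]:
    """Return top-level module names imported by the source, minus the stdlib."""
    tree = ast.parse(source)
    found: Set[str] = set()
    for node in ast.walk(tree):
        if isinstance(node, ast.Import):
            for alias in node.names:
                found.add(alias.name.split(".")[0])
        elif isinstance(node, ast.ImportFrom):
            # Skip relative imports, which refer to local modules.
            if node.module and node.level == 0:
                found.add(node.module.split(".")[0])
    # On Python versions before 3.10 this attribute is missing and nothing is filtered.
    stdlib = getattr(sys, "stdlib_module_names", frozenset())
    return {name for name in found if name not in stdlib}


example_source = (
    "import json\n"
    "import sys\n"
    "from time import time\n"
    "import torch\n"
    "import torch.nn.functional as F\n"
    "from ray import serve\n"
    "from starlette.requests import Request\n"
    "from transformers import AutoModel, AutoTokenizer\n"
)
print(sorted(third_party_imports(example_source)))
```

Imports placed inside functions, such as loguru in the example class, are also found because the sketch walks the whole syntax tree.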

Build and push the Docker image

After creating the Python file (deployment.py in this example), the Dockerfile, and the requirements.txt file, you need to run a few Docker commands. Run the following commands in order:
DOCKER_DEFAULT_PLATFORM=linux/amd64 docker build . -t [DOCKERHUB-USERNAME]/[REPOSITORY]:[VERSION-TAG]
docker push [DOCKERHUB-USERNAME]/[REPOSITORY]:[VERSION-TAG]
Using the example model, the terminal commands would be as follows:
DOCKER_DEFAULT_PLATFORM=linux/amd64 docker build . -t jstrmec/e5-small-v2-ray:0.1
docker push jstrmec/e5-small-v2-ray:0.1
This repository is public and you can visit it here: e5-small-v2-ray

Deploy the model in Fusion

Now you can go to Fusion to deploy your model.
  1. In Fusion, navigate to Collections > Jobs.
  2. Add a job by clicking the Add+ Button and selecting Create Ray Model Deployment.
  3. Fill in each of the text fields:
    • Job ID: A string used by the Fusion API to reference the job after its creation.
    • Model name: A name for the deployed model. This is used to generate the deployment name in Ray. It is also the name that you reference as a model-id when making predictions with the ML Service.
    • Model min replicas: The minimum number of load-balanced replicas of the model to deploy.
    • Model max replicas: The maximum number of load-balanced replicas of the model to deploy. Specify multiple replicas for a higher-volume intake.
    • Model CPU limit: The number of CPUs to allocate to a single model replica.
    • Model memory limit: The maximum amount of memory to allocate to a single model replica.
    • Ray Deployment Import Path: The path to your top-level Ray Serve deployment (or the same path passed to serve run). For example, deployment:app.
    • Docker Repository: The public or private repository where the Docker image is located. If you’re using Docker Hub, fill in the Docker Hub username here.
    • Image name: The name of the image. For example, e5-small-v2-ray:0.1.
    • Kubernetes secret: If you’re using a private repository, supply the name of the Kubernetes secret used for access.
  4. Click Advanced to view and configure advanced details:
    • Additional parameters: This section lets you enter parameter name:parameter value options to be injected into the training JSON map at runtime. The values are inserted as they are entered, so you must surround string values with ". This is the sparkConfig field in the configuration file.
    • Write Options: This section lets you enter parameter name:parameter value options to use when writing output to Solr or other sources. This is the writeOptions field in the configuration file.
    • Read Options: This section lets you enter parameter name:parameter value options to use when reading input from Solr or other sources. This is the readOptions field in the configuration file.
  5. Click Save, then Run and Start. When the job finishes successfully, you can proceed to the next section.
Now that the model is in Fusion, you can use it in the Machine Learning or Ray / Seldon Vectorize index and query stages.
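The Docker Repository and Image name fields of the job correspond to the two halves of the image reference you pushed earlier. As a small illustration, the sketch below splits a reference at its last slash. The function name split_image_reference is hypothetical, and treating everything before the last slash as the repository is an assumption that matches the Docker Hub example in this tutorial; private registries may need a different split.

```python
from typing import Tuple


def split_image_reference(reference: str) -> Tuple[str, str]:
    """Split 'repository/image:tag' into (repository, 'image:tag')."""
    repository, separator, image = reference.rpartition("/")
    if not separator or not repository or not image:
        raise ValueError(f"expected 'repository/image[:tag]', got {reference!r}")
    return repository, image


docker_repository, image_name = split_image_reference("jstrmec/e5-small-v2-ray:0.1")
print(docker_repository)  # value for the Docker Repository field
print(image_name)         # value for the Image name field
```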

Configure the Fusion pipelines

Your real-world pipeline configuration depends on your use case and model, but for our example we will configure the index pipeline and then the query pipeline.
Configure the index pipeline
  1. Create a new index pipeline or load an existing one for editing.
  2. Click Add a Stage and then Machine Learning.
  3. In the new stage, fill in these fields:
    • The model ID
    • The model input
    • The model output
  4. Save the stage in the pipeline and index your data with it.
Configure the query pipeline
  1. Create a new query pipeline or load an existing one for editing.
  2. Click Add a Stage and then Machine Learning.
  3. In the new stage, fill in these fields:
    • The model ID
    • The model input
    • The model output
  4. Save the stage and then run a query by typing a search term.
  5. To verify the Ray results are correct, use the Compare+ button to see another pipeline without the model implementation and compare the number of results.
You have now successfully uploaded a Ray model to Fusion and deployed it.
Important: This index stage must be placed before the Solr Indexer stage.

Configuration

When entering configuration values in the UI, use unescaped characters, such as \t for the tab character. When entering configuration values in the API, use escaped characters, such as \\t for the tab character.