Ranjith11903952
ConnectionResetError (104, 'Connection reset by peer') from KServe?

I have a Hugging Face model (https://huggingface.co/TahaDouaji/detr-doc-table-detection) served with TorchServe, and it works fine when I deploy it locally. I then tried to deploy the model on KServe as a Docker image. The pods run without any errors, but when I send a curl request to the model from inside the model pod, I get "ConnectionResetError (104, 'Connection reset by peer')".
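The curl request looks roughly like this (a sketch: the path assumes the KServe v1 REST protocol, and table_det is the MODEL_NAME set in the InferenceService below):

curl -v http://localhost:8085/v1/models/table_det:predict \
  -H "Content-Type: application/json" \
  -d '{"instances": [{"data": "<base64-encoded image>"}]}'

Here is my Transformerhandler.py file: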

import os
import json
import logging
from PIL import Image
import io
import torch
from transformers import DetrForObjectDetection
from ts.torch_handler.base_handler import BaseHandler
from detectron2.structures import Boxes, ImageList, Instances, BitMasks, PolygonMasks
from detectron2.modeling import META_ARCH_REGISTRY, build_backbone, detector_postprocess
from detectron2.utils.visualizer import Visualizer, VisImage
from detectron2.layers import batched_nms
import torchvision.transforms as T
import torch.nn.functional
import numpy as np
import base64

def box_cxcywh_to_xyxy(x):
    # Convert DETR's (center_x, center_y, width, height) boxes to (x1, y1, x2, y2)
    x_c, y_c, w, h = x.unbind(-1)
    b = [(x_c - 0.5 * w), (y_c - 0.5 * h),
         (x_c + 0.5 * w), (y_c + 0.5 * h)]
    return torch.stack(b, dim=-1)

logger = logging.getLogger(__name__)

ipex_enabled = False
if os.environ.get("TS_IPEX_ENABLE", "false") == "true":
    try:
        import intel_extension_for_pytorch as ipex

        ipex_enabled = True
    except ImportError as error:
        logger.warning(
            "IPEX is enabled but intel-extension-for-pytorch is not installed. Proceeding without IPEX."
        )

class TransformersDetrHandler(BaseHandler):
    """
    The handler takes an input string and returns the classification text
    based on the serialized transformers checkpoint.
    """
    def __init__(self):
        super(TransformersDetrHandler, self).__init__()
        self.initialized = False
        self.pixel_mean = np.asarray([0.485, 0.456, 0.406])
        self.pixel_std = np.asarray([0.229, 0.224, 0.225])
        self.normalizer = lambda x: (x - self.pixel_mean) / self.pixel_std
        self.test_nms_thresh = 0.6
        self.score_thresh = 0.7
        self.mask_on = False

        self.transform = T.Compose([
            T.Resize(800),
            T.ToTensor(),
            T.Normalize(self.pixel_mean, self.pixel_std)
        ])

    @torch.no_grad()
    def initialize(self, ctx):
        """ Loads the model.pt file and initializes the model object.
        Instantiates Tokenizer for preprocessor to use
        Loads labels to name mapping file for post-processing inference response
        """
        self.manifest = ctx.manifest
        logger.debug(f"manifest => {self.manifest} =====================")

        properties = ctx.system_properties
        logger.debug(f"properties => {properties} =====================")

        model_dir = properties.get("model_dir")
        self.device = torch.device("cuda:" + str(properties.get("gpu_id")) if torch.cuda.is_available() else "cpu")

        # Read model serialize/pt file
        serialized_file = self.manifest["model"]["serializedFile"]
        model_pt_path = os.path.join(model_dir, serialized_file)
        if not os.path.isfile(model_pt_path):
            raise RuntimeError("Missing the model.pt or pytorch_model.bin file")

        # Load model
        self.model = DetrForObjectDetection.from_pretrained(model_dir)

        if ipex_enabled:
            self.model = self.model.to(memory_format=torch.channels_last)
            self.model = ipex.optimize(self.model)
        else:
            self.model.to(self.device)

        self.model.eval()
        logger.debug('Transformer model from path {0} loaded successfully'.format(model_dir))

        # Read the mapping file, index to object name
        mapping_file_path = os.path.join(model_dir, "index_to_name.json")

        if os.path.isfile(mapping_file_path):
            with open(mapping_file_path) as f:
                self.mapping = json.load(f)
                logger.debug(f'Label => {self.mapping}')
        else:
            self.mapping = {}
            logger.warning('Missing the index_to_name.json file. Inference output will use class indices instead of names.')

        self.initialized = True

    def preprocess(self, data):
        images = []
        self.image_sizes = []
        for row in data:
            # Read the image from either the "data" or "body" field of the request
            image = row.get("data") or row.get("body")

            # Decode a base64 data-URI string (e.g. "data:image/png;base64,...") to bytes
            if isinstance(image, str):
                image = base64.urlsafe_b64decode(image.split(',')[1])

            if isinstance(image, (bytearray, bytes)):
                image = Image.open(io.BytesIO(image))

            self.image_sizes.append(image.size)

            # Convert RGBA (e.g. PNG) images to RGB before the tensor transform
            if image.mode != "RGB":
                image = image.convert("RGB")

            # Resize, convert to tensor, and normalize; yields a (C, H, W) tensor
            image = self.transform(image)

            images.append(image.unsqueeze(0))

        return images

    def convert_to_result_dict(self, box_cls_lst, box_pred_lst, mask_pred_lst, image_sizes):
        """
        Arguments:
            box_cls_lst List(Tensor): tensor of shape (batch_size, num_queries, K).
                The tensor predicts the classification probability for each query.
            box_pred_lst List(Tensor): tensors of shape (batch_size, num_queries, 4).
                The tensor predicts 4-vector (x,y,w,h) box
                regression values for every queryx
            image_sizes (List[List]): the input image sizes
        Returns:
            results (List[Instances]): a list of #images elements.
        """
        # assert len(box_cls_lst) == len(image_sizes)

        # Apply NMS and softmax
        labels = []
        scores = []
        new_box_cls_lst = []
        new_box_pred_lst = []
        for box_cls, box_pred in zip(box_cls_lst, box_pred_lst):
            # For each box we assign the best class, or the second best if the best one is `no_object`.
            score, label = box_cls.softmax(-1)[:, :-1].max(-1)

            # Select bounding box above score_thresh
            keep = score.cpu().detach().numpy() >= self.score_thresh
            box_cls = box_cls[keep]
            box_pred = box_pred[keep]
            label = label[keep]
            score = score[keep]

            # batched_nms expects (x1, y1, x2, y2) boxes, so convert from (cx, cy, w, h) first
            keep = batched_nms(box_cxcywh_to_xyxy(box_pred), score, label, self.test_nms_thresh)
            box_cls = box_cls[keep]
            box_pred = box_pred[keep]
            label = label[keep]
            score = score[keep]

            labels.append(label)
            scores.append(score)
            new_box_cls_lst.append(box_cls)
            new_box_pred_lst.append(box_pred)

        box_cls_lst = new_box_cls_lst
        box_pred_lst = new_box_pred_lst

        # Create detectron2 instance of object detection
        results = []
        for i, (scores_per_image, labels_per_image, box_pred_per_image, image_size) in enumerate(zip(
            scores, labels, box_pred_lst, self.image_sizes
        )):
            width, height = image_size
            result = Instances([height, width])

            result.pred_boxes = Boxes(box_cxcywh_to_xyxy(box_pred_per_image))
            result.pred_boxes.scale(scale_x=width, scale_y=height)
            result.scores = scores_per_image
            result.pred_classes = labels_per_image

            # if self.mask_on:
            #     mask = torch.nn.functional.interpolate(mask_pred[i].unsqueeze(0), size=image_size, mode='bilinear', align_corners=False)
            #     mask = mask[0].sigmoid() > 0.5
            #     B, N, H, W = mask_pred.shape
            #     mask = BitMasks(mask.cpu()).crop_and_resize(result.pred_boxes.tensor.cpu(), 32)
            #     result.pred_masks = mask.unsqueeze(1).to(mask_pred[0].device)

            results.append(result)
        return results

    def inference(self, images):
        """ Run DETR object detection on each preprocessed image tensor.
        """

        box_cls_lst = []
        box_pred_lst = []
        mask_pred_lst = []
        for image in images:
            # Move the input to the same device as the model before the forward pass
            output = dict(self.model(image.to(self.device)))

            box_cls = output['logits'][0]
            box_pred = output['pred_boxes'][0]
            mask_pred = output.get("pred_masks", None) if self.mask_on else None
            if mask_pred is not None:
                mask_pred = mask_pred[0]

            box_cls_lst.append(box_cls)
            box_pred_lst.append(box_pred)
            mask_pred_lst.append(mask_pred)

        results = self.convert_to_result_dict(box_cls_lst, box_pred_lst, mask_pred_lst, self.image_sizes)

        return results

    def postprocess(self, results):
        processed_results = []
        for results_per_image, image_size in zip(results, self.image_sizes):
            width, height = image_size

            r = detector_postprocess(results_per_image, height, width)

            for box, label, prob in zip(
                [[float(d) for d in list(bbox)] for bbox in r.pred_boxes.tensor.cpu().detach().numpy()],
                [self.mapping.get(str(cs), str(cs)) for cs in list(r.pred_classes.cpu().detach().numpy())],
                [float(d) for d in list(r.scores.cpu().detach().numpy())]
            ):
                processed_results.append({
                    'score': prob,
                    'label': label,
                    'box': {
                        'xmin': int(box[0]),
                        'ymin': int(box[1]),
                        'xmax': int(box[2]),
                        'ymax': int(box[3])
                    }
                })

        return processed_results
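For completeness, the handler above is packaged for TorchServe with torch-model-archiver, roughly like this (a sketch; the file names are assumptions based on the serializedFile and index_to_name.json lookups in initialize):

torch-model-archiver --model-name table_det \
  --version 1.0 \
  --serialized-file pytorch_model.bin \
  --handler Transformerhandler.py \
  --extra-files "config.json,index_to_name.json"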

Here is my InferenceService:

apiVersion: serving.kserve.io/v1beta1
kind: InferenceService
metadata:
  name: custom-model
  namespace: user1
spec:
  predictor:
    containers:
      - name: kserve-container
        image: <Dockerhub>/<Docker image>
        env:
          - name: "MODEL_NAME"
            value: "table_det"
          - name: "PROTOCOL"
            value: "v1"
        ports:
          - containerPort: 8085
        readinessProbe:
          tcpSocket:
            port: 8085
          initialDelaySeconds: 120
          periodSeconds: 30
          timeoutSeconds: 10
          successThreshold: 1
          failureThreshold: 3
        resources:
          limits:
            cpu: "1"
            memory: 2Gi
          requests:
            cpu: "1"
            memory: 2Gi

I have deployed the InferenceService with the Docker image, and all of the generated resources report ready:

InferenceService   true
ksvc               true
Revision           true
Route              true
VirtualService     true

All the resources created by KServe, Knative Serving, and Istio were created successfully without any errors, but I'm not able to establish a connection between the Istio ingress and the model.
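From what I've read, TorchServe binds its inference address to 127.0.0.1 by default, which would reset connections coming from outside the container. A config.properties that binds all interfaces instead looks roughly like this (a minimal sketch, assuming the same port 8085 as the containerPort above):

inference_address=http://0.0.0.0:8085
management_address=http://0.0.0.0:8081
metrics_address=http://0.0.0.0:8082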

What is the best way to fix this? I also want to understand how the connection between KServe and the deployed model is established, so that I can configure it correctly; my current understanding is sketched below. Thank you!
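As I understand it, the external request path is: Istio ingress gateway -> Knative route -> predictor pod -> TorchServe. This is how I expect to reach the model from outside the cluster (a sketch following the standard KServe docs; the resource names match the InferenceService above):

INGRESS_HOST=$(kubectl -n istio-system get service istio-ingressgateway \
  -o jsonpath='{.status.loadBalancer.ingress[0].ip}')
INGRESS_PORT=$(kubectl -n istio-system get service istio-ingressgateway \
  -o jsonpath='{.spec.ports[?(@.name=="http2")].port}')
SERVICE_HOSTNAME=$(kubectl get inferenceservice custom-model -n user1 \
  -o jsonpath='{.status.url}' | cut -d/ -f3)

curl -v -H "Host: ${SERVICE_HOSTNAME}" \
  "http://${INGRESS_HOST}:${INGRESS_PORT}/v1/models/table_det:predict" \
  -d '{"instances": [{"data": "<base64-encoded image>"}]}'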
