# Ultralytics YOLOv5 🚀, AGPL-3.0 license """Common modules.""" import ast import contextlib import json import math import platform import warnings import zipfile from collections import OrderedDict, namedtuple from copy import copy from pathlib import Path from urllib.parse import urlparse import cv2 import numpy as np import pandas as pd import requests import torch import torch.nn as nn from PIL import Image from torch.cuda import amp # Import 'ultralytics' package or install if missing try: import ultralytics assert hasattr(ultralytics, "__version__") # verify package is not directory except (ImportError, AssertionError): import os os.system("pip install -U ultralytics") import ultralytics from ultralytics.utils.plotting import Annotator, colors, save_one_box from utils import TryExcept from utils.dataloaders import exif_transpose, letterbox from utils.general import ( LOGGER, ROOT, Profile, check_requirements, check_suffix, check_version, colorstr, increment_path, is_jupyter, make_divisible, non_max_suppression, scale_boxes, xywh2xyxy, xyxy2xywh, yaml_load, ) from utils.torch_utils import copy_attr, smart_inference_mode def autopad(k, p=None, d=1): """ Pads kernel to 'same' output shape, adjusting for optional dilation; returns padding size. `k`: kernel, `p`: padding, `d`: dilation. """ if d > 1: k = d * (k - 1) + 1 if isinstance(k, int) else [d * (x - 1) + 1 for x in k] # actual kernel-size if p is None: p = k // 2 if isinstance(k, int) else [x // 2 for x in k] # auto-pad return p class Conv(nn.Module): # Standard convolution with args(ch_in, ch_out, kernel, stride, padding, groups, dilation, activation) default_act = nn.SiLU() # default activation def __init__(self, c1, c2, k=1, s=1, p=None, g=1, d=1, act=True): """Initializes a standard convolution layer with optional batch normalization and activation.""" super().__init__() self.conv = nn.Conv2d(c1, c2, k, s, autopad(k, p, d), groups=g, dilation=d, bias=False) self.bn = nn.BatchNorm2d(c2) self.act = self.default_act if act is True else act if isinstance(act, nn.Module) else nn.Identity() def forward(self, x): """Applies a convolution followed by batch normalization and an activation function to the input tensor `x`.""" return self.act(self.bn(self.conv(x))) def forward_fuse(self, x): """Applies a fused convolution and activation function to the input tensor `x`.""" return self.act(self.conv(x)) class DWConv(Conv): # Depth-wise convolution def __init__(self, c1, c2, k=1, s=1, d=1, act=True): """Initializes a depth-wise convolution layer with optional activation; args: input channels (c1), output channels (c2), kernel size (k), stride (s), dilation (d), and activation flag (act). """ super().__init__(c1, c2, k, s, g=math.gcd(c1, c2), d=d, act=act) class DWConvTranspose2d(nn.ConvTranspose2d): # Depth-wise transpose convolution def __init__(self, c1, c2, k=1, s=1, p1=0, p2=0): """Initializes a depth-wise transpose convolutional layer for YOLOv5; args: input channels (c1), output channels (c2), kernel size (k), stride (s), input padding (p1), output padding (p2). """ super().__init__(c1, c2, k, s, p1, p2, groups=math.gcd(c1, c2)) class TransformerLayer(nn.Module): # Transformer layer https://arxiv.org/abs/2010.11929 (LayerNorm layers removed for better performance) def __init__(self, c, num_heads): """ Initializes a transformer layer, sans LayerNorm for performance, with multihead attention and linear layers. See as described in https://arxiv.org/abs/2010.11929. """ super().__init__() self.q = nn.Linear(c, c, bias=False) self.k = nn.Linear(c, c, bias=False) self.v = nn.Linear(c, c, bias=False) self.ma = nn.MultiheadAttention(embed_dim=c, num_heads=num_heads) self.fc1 = nn.Linear(c, c, bias=False) self.fc2 = nn.Linear(c, c, bias=False) def forward(self, x): """Performs forward pass using MultiheadAttention and two linear transformations with residual connections.""" x = self.ma(self.q(x), self.k(x), self.v(x))[0] + x x = self.fc2(self.fc1(x)) + x return x class TransformerBlock(nn.Module): # Vision Transformer https://arxiv.org/abs/2010.11929 def __init__(self, c1, c2, num_heads, num_layers): """Initializes a Transformer block for vision tasks, adapting dimensions if necessary and stacking specified layers. """ super().__init__() self.conv = None if c1 != c2: self.conv = Conv(c1, c2) self.linear = nn.Linear(c2, c2) # learnable position embedding self.tr = nn.Sequential(*(TransformerLayer(c2, num_heads) for _ in range(num_layers))) self.c2 = c2 def forward(self, x): """Processes input through an optional convolution, followed by Transformer layers and position embeddings for object detection. """ if self.conv is not None: x = self.conv(x) b, _, w, h = x.shape p = x.flatten(2).permute(2, 0, 1) return self.tr(p + self.linear(p)).permute(1, 2, 0).reshape(b, self.c2, w, h) class Bottleneck(nn.Module): # Standard bottleneck def __init__(self, c1, c2, shortcut=True, g=1, e=0.5): """Initializes a standard bottleneck layer with optional shortcut and group convolution, supporting channel expansion. """ super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_, c2, 3, 1, g=g) self.add = shortcut and c1 == c2 def forward(self, x): """Processes input through two convolutions, optionally adds shortcut if channel dimensions match; input is a tensor. """ return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class BottleneckCSP(nn.Module): # CSP Bottleneck https://github.com/WongKinYiu/CrossStagePartialNetworks def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): """Initializes CSP bottleneck with optional shortcuts; args: ch_in, ch_out, number of repeats, shortcut bool, groups, expansion. """ super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = nn.Conv2d(c1, c_, 1, 1, bias=False) self.cv3 = nn.Conv2d(c_, c_, 1, 1, bias=False) self.cv4 = Conv(2 * c_, c2, 1, 1) self.bn = nn.BatchNorm2d(2 * c_) # applied to cat(cv2, cv3) self.act = nn.SiLU() self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n))) def forward(self, x): """Performs forward pass by applying layers, activation, and concatenation on input x, returning feature- enhanced output. """ y1 = self.cv3(self.m(self.cv1(x))) y2 = self.cv2(x) return self.cv4(self.act(self.bn(torch.cat((y1, y2), 1)))) class CrossConv(nn.Module): # Cross Convolution Downsample def __init__(self, c1, c2, k=3, s=1, g=1, e=1.0, shortcut=False): """ Initializes CrossConv with downsampling, expanding, and optionally shortcutting; `c1` input, `c2` output channels. Inputs are ch_in, ch_out, kernel, stride, groups, expansion, shortcut. """ super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, (1, k), (1, s)) self.cv2 = Conv(c_, c2, (k, 1), (s, 1), g=g) self.add = shortcut and c1 == c2 def forward(self, x): """Performs feature sampling, expanding, and applies shortcut if channels match; expects `x` input tensor.""" return x + self.cv2(self.cv1(x)) if self.add else self.cv2(self.cv1(x)) class C3(nn.Module): # CSP Bottleneck with 3 convolutions def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): """Initializes C3 module with options for channel count, bottleneck repetition, shortcut usage, group convolutions, and expansion. """ super().__init__() c_ = int(c2 * e) # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c1, c_, 1, 1) self.cv3 = Conv(2 * c_, c2, 1) # optional act=FReLU(c2) self.m = nn.Sequential(*(Bottleneck(c_, c_, shortcut, g, e=1.0) for _ in range(n))) def forward(self, x): """Performs forward propagation using concatenated outputs from two convolutions and a Bottleneck sequence.""" return self.cv3(torch.cat((self.m(self.cv1(x)), self.cv2(x)), 1)) class C3x(C3): # C3 module with cross-convolutions def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): """Initializes C3x module with cross-convolutions, extending C3 with customizable channel dimensions, groups, and expansion. """ super().__init__(c1, c2, n, shortcut, g, e) c_ = int(c2 * e) self.m = nn.Sequential(*(CrossConv(c_, c_, 3, 1, g, 1.0, shortcut) for _ in range(n))) class C3TR(C3): # C3 module with TransformerBlock() def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): """Initializes C3 module with TransformerBlock for enhanced feature extraction, accepts channel sizes, shortcut config, group, and expansion. """ super().__init__(c1, c2, n, shortcut, g, e) c_ = int(c2 * e) self.m = TransformerBlock(c_, c_, 4, n) class C3SPP(C3): # C3 module with SPP() def __init__(self, c1, c2, k=(5, 9, 13), n=1, shortcut=True, g=1, e=0.5): """Initializes a C3 module with SPP layer for advanced spatial feature extraction, given channel sizes, kernel sizes, shortcut, group, and expansion ratio. """ super().__init__(c1, c2, n, shortcut, g, e) c_ = int(c2 * e) self.m = SPP(c_, c_, k) class C3Ghost(C3): # C3 module with GhostBottleneck() def __init__(self, c1, c2, n=1, shortcut=True, g=1, e=0.5): """Initializes YOLOv5's C3 module with Ghost Bottlenecks for efficient feature extraction.""" super().__init__(c1, c2, n, shortcut, g, e) c_ = int(c2 * e) # hidden channels self.m = nn.Sequential(*(GhostBottleneck(c_, c_) for _ in range(n))) class SPP(nn.Module): # Spatial Pyramid Pooling (SPP) layer https://arxiv.org/abs/1406.4729 def __init__(self, c1, c2, k=(5, 9, 13)): """Initializes SPP layer with Spatial Pyramid Pooling, ref: https://arxiv.org/abs/1406.4729, args: c1 (input channels), c2 (output channels), k (kernel sizes).""" super().__init__() c_ = c1 // 2 # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_ * (len(k) + 1), c2, 1, 1) self.m = nn.ModuleList([nn.MaxPool2d(kernel_size=x, stride=1, padding=x // 2) for x in k]) def forward(self, x): """Applies convolution and max pooling layers to the input tensor `x`, concatenates results, and returns output tensor. """ x = self.cv1(x) with warnings.catch_warnings(): warnings.simplefilter("ignore") # suppress torch 1.9.0 max_pool2d() warning return self.cv2(torch.cat([x] + [m(x) for m in self.m], 1)) class SPPF(nn.Module): # Spatial Pyramid Pooling - Fast (SPPF) layer for YOLOv5 by Glenn Jocher def __init__(self, c1, c2, k=5): """ Initializes YOLOv5 SPPF layer with given channels and kernel size for YOLOv5 model, combining convolution and max pooling. Equivalent to SPP(k=(5, 9, 13)). """ super().__init__() c_ = c1 // 2 # hidden channels self.cv1 = Conv(c1, c_, 1, 1) self.cv2 = Conv(c_ * 4, c2, 1, 1) self.m = nn.MaxPool2d(kernel_size=k, stride=1, padding=k // 2) def forward(self, x): """Processes input through a series of convolutions and max pooling operations for feature extraction.""" x = self.cv1(x) with warnings.catch_warnings(): warnings.simplefilter("ignore") # suppress torch 1.9.0 max_pool2d() warning y1 = self.m(x) y2 = self.m(y1) return self.cv2(torch.cat((x, y1, y2, self.m(y2)), 1)) class Focus(nn.Module): # Focus wh information into c-space def __init__(self, c1, c2, k=1, s=1, p=None, g=1, act=True): """Initializes Focus module to concentrate width-height info into channel space with configurable convolution parameters. """ super().__init__() self.conv = Conv(c1 * 4, c2, k, s, p, g, act=act) # self.contract = Contract(gain=2) def forward(self, x): """Processes input through Focus mechanism, reshaping (b,c,w,h) to (b,4c,w/2,h/2) then applies convolution.""" return self.conv(torch.cat((x[..., ::2, ::2], x[..., 1::2, ::2], x[..., ::2, 1::2], x[..., 1::2, 1::2]), 1)) # return self.conv(self.contract(x)) class GhostConv(nn.Module): # Ghost Convolution https://github.com/huawei-noah/ghostnet def __init__(self, c1, c2, k=1, s=1, g=1, act=True): """Initializes GhostConv with in/out channels, kernel size, stride, groups, and activation; halves out channels for efficiency. """ super().__init__() c_ = c2 // 2 # hidden channels self.cv1 = Conv(c1, c_, k, s, None, g, act=act) self.cv2 = Conv(c_, c_, 5, 1, None, c_, act=act) def forward(self, x): """Performs forward pass, concatenating outputs of two convolutions on input `x`: shape (B,C,H,W).""" y = self.cv1(x) return torch.cat((y, self.cv2(y)), 1) class GhostBottleneck(nn.Module): # Ghost Bottleneck https://github.com/huawei-noah/ghostnet def __init__(self, c1, c2, k=3, s=1): """Initializes GhostBottleneck with ch_in `c1`, ch_out `c2`, kernel size `k`, stride `s`; see https://github.com/huawei-noah/ghostnet.""" super().__init__() c_ = c2 // 2 self.conv = nn.Sequential( GhostConv(c1, c_, 1, 1), # pw DWConv(c_, c_, k, s, act=False) if s == 2 else nn.Identity(), # dw GhostConv(c_, c2, 1, 1, act=False), ) # pw-linear self.shortcut = ( nn.Sequential(DWConv(c1, c1, k, s, act=False), Conv(c1, c2, 1, 1, act=False)) if s == 2 else nn.Identity() ) def forward(self, x): """Processes input through conv and shortcut layers, returning their summed output.""" return self.conv(x) + self.shortcut(x) class Contract(nn.Module): # Contract width-height into channels, i.e. x(1,64,80,80) to x(1,256,40,40) def __init__(self, gain=2): """Initializes a layer to contract spatial dimensions (width-height) into channels, e.g., input shape (1,64,80,80) to (1,256,40,40). """ super().__init__() self.gain = gain def forward(self, x): """Processes input tensor to expand channel dimensions by contracting spatial dimensions, yielding output shape `(b, c*s*s, h//s, w//s)`. """ b, c, h, w = x.size() # assert (h / s == 0) and (W / s == 0), 'Indivisible gain' s = self.gain x = x.view(b, c, h // s, s, w // s, s) # x(1,64,40,2,40,2) x = x.permute(0, 3, 5, 1, 2, 4).contiguous() # x(1,2,2,64,40,40) return x.view(b, c * s * s, h // s, w // s) # x(1,256,40,40) class Expand(nn.Module): # Expand channels into width-height, i.e. x(1,64,80,80) to x(1,16,160,160) def __init__(self, gain=2): """ Initializes the Expand module to increase spatial dimensions by redistributing channels, with an optional gain factor. Example: x(1,64,80,80) to x(1,16,160,160). """ super().__init__() self.gain = gain def forward(self, x): """Processes input tensor x to expand spatial dimensions by redistributing channels, requiring C / gain^2 == 0. """ b, c, h, w = x.size() # assert C / s ** 2 == 0, 'Indivisible gain' s = self.gain x = x.view(b, s, s, c // s**2, h, w) # x(1,2,2,16,80,80) x = x.permute(0, 3, 4, 1, 5, 2).contiguous() # x(1,16,80,2,80,2) return x.view(b, c // s**2, h * s, w * s) # x(1,16,160,160) class Concat(nn.Module): # Concatenate a list of tensors along dimension def __init__(self, dimension=1): """Initializes a Concat module to concatenate tensors along a specified dimension.""" super().__init__() self.d = dimension def forward(self, x): """Concatenates a list of tensors along a specified dimension; `x` is a list of tensors, `dimension` is an int. """ return torch.cat(x, self.d) class DetectMultiBackend(nn.Module): # YOLOv5 MultiBackend class for python inference on various backends def __init__(self, weights="yolov5s.pt", device=torch.device("cpu"), dnn=False, data=None, fp16=False, fuse=True): """Initializes DetectMultiBackend with support for various inference backends, including PyTorch and ONNX.""" # PyTorch: weights = *.pt # TorchScript: *.torchscript # ONNX Runtime: *.onnx # ONNX OpenCV DNN: *.onnx --dnn # OpenVINO: *_openvino_model # CoreML: *.mlmodel # TensorRT: *.engine # TensorFlow SavedModel: *_saved_model # TensorFlow GraphDef: *.pb # TensorFlow Lite: *.tflite # TensorFlow Edge TPU: *_edgetpu.tflite # PaddlePaddle: *_paddle_model from models.experimental import attempt_download, attempt_load # scoped to avoid circular import super().__init__() w = str(weights[0] if isinstance(weights, list) else weights) pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle, triton = self._model_type(w) fp16 &= pt or jit or onnx or engine or triton # FP16 nhwc = coreml or saved_model or pb or tflite or edgetpu # BHWC formats (vs torch BCWH) stride = 32 # default stride cuda = torch.cuda.is_available() and device.type != "cpu" # use CUDA if not (pt or triton): w = attempt_download(w) # download if not local if pt: # PyTorch model = attempt_load(weights if isinstance(weights, list) else w, device=device, inplace=True, fuse=fuse) stride = max(int(model.stride.max()), 32) # model stride names = model.module.names if hasattr(model, "module") else model.names # get class names model.half() if fp16 else model.float() self.model = model # explicitly assign for to(), cpu(), cuda(), half() elif jit: # TorchScript LOGGER.info(f"Loading {w} for TorchScript inference...") extra_files = {"config.txt": ""} # model metadata model = torch.jit.load(w, _extra_files=extra_files, map_location=device) model.half() if fp16 else model.float() if extra_files["config.txt"]: # load metadata dict d = json.loads( extra_files["config.txt"], object_hook=lambda d: {int(k) if k.isdigit() else k: v for k, v in d.items()}, ) stride, names = int(d["stride"]), d["names"] elif dnn: # ONNX OpenCV DNN LOGGER.info(f"Loading {w} for ONNX OpenCV DNN inference...") check_requirements("opencv-python>=4.5.4") net = cv2.dnn.readNetFromONNX(w) elif onnx: # ONNX Runtime LOGGER.info(f"Loading {w} for ONNX Runtime inference...") check_requirements(("onnx", "onnxruntime-gpu" if cuda else "onnxruntime")) import onnxruntime providers = ["CUDAExecutionProvider", "CPUExecutionProvider"] if cuda else ["CPUExecutionProvider"] session = onnxruntime.InferenceSession(w, providers=providers) output_names = [x.name for x in session.get_outputs()] meta = session.get_modelmeta().custom_metadata_map # metadata if "stride" in meta: stride, names = int(meta["stride"]), eval(meta["names"]) elif xml: # OpenVINO LOGGER.info(f"Loading {w} for OpenVINO inference...") check_requirements("openvino>=2023.0") # requires openvino-dev: https://pypi.org/project/openvino-dev/ from openvino.runtime import Core, Layout, get_batch core = Core() if not Path(w).is_file(): # if not *.xml w = next(Path(w).glob("*.xml")) # get *.xml file from *_openvino_model dir ov_model = core.read_model(model=w, weights=Path(w).with_suffix(".bin")) if ov_model.get_parameters()[0].get_layout().empty: ov_model.get_parameters()[0].set_layout(Layout("NCHW")) batch_dim = get_batch(ov_model) if batch_dim.is_static: batch_size = batch_dim.get_length() ov_compiled_model = core.compile_model(ov_model, device_name="AUTO") # AUTO selects best available device stride, names = self._load_metadata(Path(w).with_suffix(".yaml")) # load metadata elif engine: # TensorRT LOGGER.info(f"Loading {w} for TensorRT inference...") import tensorrt as trt # https://developer.nvidia.com/nvidia-tensorrt-download check_version(trt.__version__, "7.0.0", hard=True) # require tensorrt>=7.0.0 if device.type == "cpu": device = torch.device("cuda:0") Binding = namedtuple("Binding", ("name", "dtype", "shape", "data", "ptr")) logger = trt.Logger(trt.Logger.INFO) with open(w, "rb") as f, trt.Runtime(logger) as runtime: model = runtime.deserialize_cuda_engine(f.read()) context = model.create_execution_context() bindings = OrderedDict() output_names = [] fp16 = False # default updated below dynamic = False is_trt10 = not hasattr(model, "num_bindings") num = range(model.num_io_tensors) if is_trt10 else range(model.num_bindings) for i in num: if is_trt10: name = model.get_tensor_name(i) dtype = trt.nptype(model.get_tensor_dtype(name)) is_input = model.get_tensor_mode(name) == trt.TensorIOMode.INPUT if is_input: if -1 in tuple(model.get_tensor_shape(name)): # dynamic dynamic = True context.set_input_shape(name, tuple(model.get_profile_shape(name, 0)[2])) if dtype == np.float16: fp16 = True else: # output output_names.append(name) shape = tuple(context.get_tensor_shape(name)) else: name = model.get_binding_name(i) dtype = trt.nptype(model.get_binding_dtype(i)) if model.binding_is_input(i): if -1 in tuple(model.get_binding_shape(i)): # dynamic dynamic = True context.set_binding_shape(i, tuple(model.get_profile_shape(0, i)[2])) if dtype == np.float16: fp16 = True else: # output output_names.append(name) shape = tuple(context.get_binding_shape(i)) im = torch.from_numpy(np.empty(shape, dtype=dtype)).to(device) bindings[name] = Binding(name, dtype, shape, im, int(im.data_ptr())) binding_addrs = OrderedDict((n, d.ptr) for n, d in bindings.items()) batch_size = bindings["images"].shape[0] # if dynamic, this is instead max batch size elif coreml: # CoreML LOGGER.info(f"Loading {w} for CoreML inference...") import coremltools as ct model = ct.models.MLModel(w) elif saved_model: # TF SavedModel LOGGER.info(f"Loading {w} for TensorFlow SavedModel inference...") import tensorflow as tf keras = False # assume TF1 saved_model model = tf.keras.models.load_model(w) if keras else tf.saved_model.load(w) elif pb: # GraphDef https://www.tensorflow.org/guide/migrate#a_graphpb_or_graphpbtxt LOGGER.info(f"Loading {w} for TensorFlow GraphDef inference...") import tensorflow as tf def wrap_frozen_graph(gd, inputs, outputs): """Wraps a TensorFlow GraphDef for inference, returning a pruned function.""" x = tf.compat.v1.wrap_function(lambda: tf.compat.v1.import_graph_def(gd, name=""), []) # wrapped ge = x.graph.as_graph_element return x.prune(tf.nest.map_structure(ge, inputs), tf.nest.map_structure(ge, outputs)) def gd_outputs(gd): """Generates a sorted list of graph outputs excluding NoOp nodes and inputs, formatted as ':0'.""" name_list, input_list = [], [] for node in gd.node: # tensorflow.core.framework.node_def_pb2.NodeDef name_list.append(node.name) input_list.extend(node.input) return sorted(f"{x}:0" for x in list(set(name_list) - set(input_list)) if not x.startswith("NoOp")) gd = tf.Graph().as_graph_def() # TF GraphDef with open(w, "rb") as f: gd.ParseFromString(f.read()) frozen_func = wrap_frozen_graph(gd, inputs="x:0", outputs=gd_outputs(gd)) elif tflite or edgetpu: # https://www.tensorflow.org/lite/guide/python#install_tensorflow_lite_for_python try: # https://coral.ai/docs/edgetpu/tflite-python/#update-existing-tf-lite-code-for-the-edge-tpu from tflite_runtime.interpreter import Interpreter, load_delegate except ImportError: import tensorflow as tf Interpreter, load_delegate = ( tf.lite.Interpreter, tf.lite.experimental.load_delegate, ) if edgetpu: # TF Edge TPU https://coral.ai/software/#edgetpu-runtime LOGGER.info(f"Loading {w} for TensorFlow Lite Edge TPU inference...") delegate = {"Linux": "libedgetpu.so.1", "Darwin": "libedgetpu.1.dylib", "Windows": "edgetpu.dll"}[ platform.system() ] interpreter = Interpreter(model_path=w, experimental_delegates=[load_delegate(delegate)]) else: # TFLite LOGGER.info(f"Loading {w} for TensorFlow Lite inference...") interpreter = Interpreter(model_path=w) # load TFLite model interpreter.allocate_tensors() # allocate input_details = interpreter.get_input_details() # inputs output_details = interpreter.get_output_details() # outputs # load metadata with contextlib.suppress(zipfile.BadZipFile): with zipfile.ZipFile(w, "r") as model: meta_file = model.namelist()[0] meta = ast.literal_eval(model.read(meta_file).decode("utf-8")) stride, names = int(meta["stride"]), meta["names"] elif tfjs: # TF.js raise NotImplementedError("ERROR: YOLOv5 TF.js inference is not supported") elif paddle: # PaddlePaddle LOGGER.info(f"Loading {w} for PaddlePaddle inference...") check_requirements("paddlepaddle-gpu" if cuda else "paddlepaddle") import paddle.inference as pdi if not Path(w).is_file(): # if not *.pdmodel w = next(Path(w).rglob("*.pdmodel")) # get *.pdmodel file from *_paddle_model dir weights = Path(w).with_suffix(".pdiparams") config = pdi.Config(str(w), str(weights)) if cuda: config.enable_use_gpu(memory_pool_init_size_mb=2048, device_id=0) predictor = pdi.create_predictor(config) input_handle = predictor.get_input_handle(predictor.get_input_names()[0]) output_names = predictor.get_output_names() elif triton: # NVIDIA Triton Inference Server LOGGER.info(f"Using {w} as Triton Inference Server...") check_requirements("tritonclient[all]") from utils.triton import TritonRemoteModel model = TritonRemoteModel(url=w) nhwc = model.runtime.startswith("tensorflow") else: raise NotImplementedError(f"ERROR: {w} is not a supported format") # class names if "names" not in locals(): names = yaml_load(data)["names"] if data else {i: f"class{i}" for i in range(999)} if names[0] == "n01440764" and len(names) == 1000: # ImageNet names = yaml_load(ROOT / "data/ImageNet.yaml")["names"] # human-readable names self.__dict__.update(locals()) # assign all variables to self def forward(self, im, augment=False, visualize=False): """Performs YOLOv5 inference on input images with options for augmentation and visualization.""" b, ch, h, w = im.shape # batch, channel, height, width if self.fp16 and im.dtype != torch.float16: im = im.half() # to FP16 if self.nhwc: im = im.permute(0, 2, 3, 1) # torch BCHW to numpy BHWC shape(1,320,192,3) if self.pt: # PyTorch y = self.model(im, augment=augment, visualize=visualize) if augment or visualize else self.model(im) elif self.jit: # TorchScript y = self.model(im) elif self.dnn: # ONNX OpenCV DNN im = im.cpu().numpy() # torch to numpy self.net.setInput(im) y = self.net.forward() elif self.onnx: # ONNX Runtime im = im.cpu().numpy() # torch to numpy y = self.session.run(self.output_names, {self.session.get_inputs()[0].name: im}) elif self.xml: # OpenVINO im = im.cpu().numpy() # FP32 y = list(self.ov_compiled_model(im).values()) elif self.engine: # TensorRT if self.dynamic and im.shape != self.bindings["images"].shape: i = self.model.get_binding_index("images") self.context.set_binding_shape(i, im.shape) # reshape if dynamic self.bindings["images"] = self.bindings["images"]._replace(shape=im.shape) for name in self.output_names: i = self.model.get_binding_index(name) self.bindings[name].data.resize_(tuple(self.context.get_binding_shape(i))) s = self.bindings["images"].shape assert im.shape == s, f"input size {im.shape} {'>' if self.dynamic else 'not equal to'} max model size {s}" self.binding_addrs["images"] = int(im.data_ptr()) self.context.execute_v2(list(self.binding_addrs.values())) y = [self.bindings[x].data for x in sorted(self.output_names)] elif self.coreml: # CoreML im = im.cpu().numpy() im = Image.fromarray((im[0] * 255).astype("uint8")) # im = im.resize((192, 320), Image.BILINEAR) y = self.model.predict({"image": im}) # coordinates are xywh normalized if "confidence" in y: box = xywh2xyxy(y["coordinates"] * [[w, h, w, h]]) # xyxy pixels conf, cls = y["confidence"].max(1), y["confidence"].argmax(1).astype(np.float) y = np.concatenate((box, conf.reshape(-1, 1), cls.reshape(-1, 1)), 1) else: y = list(reversed(y.values())) # reversed for segmentation models (pred, proto) elif self.paddle: # PaddlePaddle im = im.cpu().numpy().astype(np.float32) self.input_handle.copy_from_cpu(im) self.predictor.run() y = [self.predictor.get_output_handle(x).copy_to_cpu() for x in self.output_names] elif self.triton: # NVIDIA Triton Inference Server y = self.model(im) else: # TensorFlow (SavedModel, GraphDef, Lite, Edge TPU) im = im.cpu().numpy() if self.saved_model: # SavedModel y = self.model(im, training=False) if self.keras else self.model(im) elif self.pb: # GraphDef y = self.frozen_func(x=self.tf.constant(im)) else: # Lite or Edge TPU input = self.input_details[0] int8 = input["dtype"] == np.uint8 # is TFLite quantized uint8 model if int8: scale, zero_point = input["quantization"] im = (im / scale + zero_point).astype(np.uint8) # de-scale self.interpreter.set_tensor(input["index"], im) self.interpreter.invoke() y = [] for output in self.output_details: x = self.interpreter.get_tensor(output["index"]) if int8: scale, zero_point = output["quantization"] x = (x.astype(np.float32) - zero_point) * scale # re-scale y.append(x) y = [x if isinstance(x, np.ndarray) else x.numpy() for x in y] y[0][..., :4] *= [w, h, w, h] # xywh normalized to pixels if isinstance(y, (list, tuple)): return self.from_numpy(y[0]) if len(y) == 1 else [self.from_numpy(x) for x in y] else: return self.from_numpy(y) def from_numpy(self, x): """Converts a NumPy array to a torch tensor, maintaining device compatibility.""" return torch.from_numpy(x).to(self.device) if isinstance(x, np.ndarray) else x def warmup(self, imgsz=(1, 3, 640, 640)): """Performs a single inference warmup to initialize model weights, accepting an `imgsz` tuple for image size.""" warmup_types = self.pt, self.jit, self.onnx, self.engine, self.saved_model, self.pb, self.triton if any(warmup_types) and (self.device.type != "cpu" or self.triton): im = torch.empty(*imgsz, dtype=torch.half if self.fp16 else torch.float, device=self.device) # input for _ in range(2 if self.jit else 1): # self.forward(im) # warmup @staticmethod def _model_type(p="path/to/model.pt"): """ Determines model type from file path or URL, supporting various export formats. Example: path='path/to/model.onnx' -> type=onnx """ # types = [pt, jit, onnx, xml, engine, coreml, saved_model, pb, tflite, edgetpu, tfjs, paddle] from export import export_formats from utils.downloads import is_url sf = list(export_formats().Suffix) # export suffixes if not is_url(p, check=False): check_suffix(p, sf) # checks url = urlparse(p) # if url may be Triton inference server types = [s in Path(p).name for s in sf] types[8] &= not types[9] # tflite &= not edgetpu triton = not any(types) and all([any(s in url.scheme for s in ["http", "grpc"]), url.netloc]) return types + [triton] @staticmethod def _load_metadata(f=Path("path/to/meta.yaml")): """Loads metadata from a YAML file, returning strides and names if the file exists, otherwise `None`.""" if f.exists(): d = yaml_load(f) return d["stride"], d["names"] # assign stride, names return None, None class AutoShape(nn.Module): # YOLOv5 input-robust model wrapper for passing cv2/np/PIL/torch inputs. Includes preprocessing, inference and NMS conf = 0.25 # NMS confidence threshold iou = 0.45 # NMS IoU threshold agnostic = False # NMS class-agnostic multi_label = False # NMS multiple labels per box classes = None # (optional list) filter by class, i.e. = [0, 15, 16] for COCO persons, cats and dogs max_det = 1000 # maximum number of detections per image amp = False # Automatic Mixed Precision (AMP) inference def __init__(self, model, verbose=True): """Initializes YOLOv5 model for inference, setting up attributes and preparing model for evaluation.""" super().__init__() if verbose: LOGGER.info("Adding AutoShape... ") copy_attr(self, model, include=("yaml", "nc", "hyp", "names", "stride", "abc"), exclude=()) # copy attributes self.dmb = isinstance(model, DetectMultiBackend) # DetectMultiBackend() instance self.pt = not self.dmb or model.pt # PyTorch model self.model = model.eval() if self.pt: m = self.model.model.model[-1] if self.dmb else self.model.model[-1] # Detect() m.inplace = False # Detect.inplace=False for safe multithread inference m.export = True # do not output loss values def _apply(self, fn): """ Applies to(), cpu(), cuda(), half() etc. to model tensors excluding parameters or registered buffers. """ self = super()._apply(fn) if self.pt: m = self.model.model.model[-1] if self.dmb else self.model.model[-1] # Detect() m.stride = fn(m.stride) m.grid = list(map(fn, m.grid)) if isinstance(m.anchor_grid, list): m.anchor_grid = list(map(fn, m.anchor_grid)) return self @smart_inference_mode() def forward(self, ims, size=640, augment=False, profile=False): """ Performs inference on inputs with optional augment & profiling. Supports various formats including file, URI, OpenCV, PIL, numpy, torch. """ # For size(height=640, width=1280), RGB images example inputs are: # file: ims = 'data/images/zidane.jpg' # str or PosixPath # URI: = 'https://ultralytics.com/images/zidane.jpg' # OpenCV: = cv2.imread('image.jpg')[:,:,::-1] # HWC BGR to RGB x(640,1280,3) # PIL: = Image.open('image.jpg') or ImageGrab.grab() # HWC x(640,1280,3) # numpy: = np.zeros((640,1280,3)) # HWC # torch: = torch.zeros(16,3,320,640) # BCHW (scaled to size=640, 0-1 values) # multiple: = [Image.open('image1.jpg'), Image.open('image2.jpg'), ...] # list of images dt = (Profile(), Profile(), Profile()) with dt[0]: if isinstance(size, int): # expand size = (size, size) p = next(self.model.parameters()) if self.pt else torch.empty(1, device=self.model.device) # param autocast = self.amp and (p.device.type != "cpu") # Automatic Mixed Precision (AMP) inference if isinstance(ims, torch.Tensor): # torch with amp.autocast(autocast): return self.model(ims.to(p.device).type_as(p), augment=augment) # inference # Pre-process n, ims = (len(ims), list(ims)) if isinstance(ims, (list, tuple)) else (1, [ims]) # number, list of images shape0, shape1, files = [], [], [] # image and inference shapes, filenames for i, im in enumerate(ims): f = f"image{i}" # filename if isinstance(im, (str, Path)): # filename or uri im, f = Image.open(requests.get(im, stream=True).raw if str(im).startswith("http") else im), im im = np.asarray(exif_transpose(im)) elif isinstance(im, Image.Image): # PIL Image im, f = np.asarray(exif_transpose(im)), getattr(im, "filename", f) or f files.append(Path(f).with_suffix(".jpg").name) if im.shape[0] < 5: # image in CHW im = im.transpose((1, 2, 0)) # reverse dataloader .transpose(2, 0, 1) im = im[..., :3] if im.ndim == 3 else cv2.cvtColor(im, cv2.COLOR_GRAY2BGR) # enforce 3ch input s = im.shape[:2] # HWC shape0.append(s) # image shape g = max(size) / max(s) # gain shape1.append([int(y * g) for y in s]) ims[i] = im if im.data.contiguous else np.ascontiguousarray(im) # update shape1 = [make_divisible(x, self.stride) for x in np.array(shape1).max(0)] # inf shape x = [letterbox(im, shape1, auto=False)[0] for im in ims] # pad x = np.ascontiguousarray(np.array(x).transpose((0, 3, 1, 2))) # stack and BHWC to BCHW x = torch.from_numpy(x).to(p.device).type_as(p) / 255 # uint8 to fp16/32 with amp.autocast(autocast): # Inference with dt[1]: y = self.model(x, augment=augment) # forward # Post-process with dt[2]: y = non_max_suppression( y if self.dmb else y[0], self.conf, self.iou, self.classes, self.agnostic, self.multi_label, max_det=self.max_det, ) # NMS for i in range(n): scale_boxes(shape1, y[i][:, :4], shape0[i]) return Detections(ims, y, files, dt, self.names, x.shape) class Detections: # YOLOv5 detections class for inference results def __init__(self, ims, pred, files, times=(0, 0, 0), names=None, shape=None): """Initializes the YOLOv5 Detections class with image info, predictions, filenames, timing and normalization.""" super().__init__() d = pred[0].device # device gn = [torch.tensor([*(im.shape[i] for i in [1, 0, 1, 0]), 1, 1], device=d) for im in ims] # normalizations self.ims = ims # list of images as numpy arrays self.pred = pred # list of tensors pred[0] = (xyxy, conf, cls) self.names = names # class names self.files = files # image filenames self.times = times # profiling times self.xyxy = pred # xyxy pixels self.xywh = [xyxy2xywh(x) for x in pred] # xywh pixels self.xyxyn = [x / g for x, g in zip(self.xyxy, gn)] # xyxy normalized self.xywhn = [x / g for x, g in zip(self.xywh, gn)] # xywh normalized self.n = len(self.pred) # number of images (batch size) self.t = tuple(x.t / self.n * 1e3 for x in times) # timestamps (ms) self.s = tuple(shape) # inference BCHW shape def _run(self, pprint=False, show=False, save=False, crop=False, render=False, labels=True, save_dir=Path("")): """Executes model predictions, displaying and/or saving outputs with optional crops and labels.""" s, crops = "", [] for i, (im, pred) in enumerate(zip(self.ims, self.pred)): s += f"\nimage {i + 1}/{len(self.pred)}: {im.shape[0]}x{im.shape[1]} " # string if pred.shape[0]: for c in pred[:, -1].unique(): n = (pred[:, -1] == c).sum() # detections per class s += f"{n} {self.names[int(c)]}{'s' * (n > 1)}, " # add to string s = s.rstrip(", ") if show or save or render or crop: annotator = Annotator(im, example=str(self.names)) for *box, conf, cls in reversed(pred): # xyxy, confidence, class label = f"{self.names[int(cls)]} {conf:.2f}" if crop: file = save_dir / "crops" / self.names[int(cls)] / self.files[i] if save else None crops.append( { "box": box, "conf": conf, "cls": cls, "label": label, "im": save_one_box(box, im, file=file, save=save), } ) else: # all others annotator.box_label(box, label if labels else "", color=colors(cls)) im = annotator.im else: s += "(no detections)" im = Image.fromarray(im.astype(np.uint8)) if isinstance(im, np.ndarray) else im # from np if show: if is_jupyter(): from IPython.display import display display(im) else: im.show(self.files[i]) if save: f = self.files[i] im.save(save_dir / f) # save if i == self.n - 1: LOGGER.info(f"Saved {self.n} image{'s' * (self.n > 1)} to {colorstr('bold', save_dir)}") if render: self.ims[i] = np.asarray(im) if pprint: s = s.lstrip("\n") return f"{s}\nSpeed: %.1fms pre-process, %.1fms inference, %.1fms NMS per image at shape {self.s}" % self.t if crop: if save: LOGGER.info(f"Saved results to {save_dir}\n") return crops @TryExcept("Showing images is not supported in this environment") def show(self, labels=True): """ Displays detection results with optional labels. Usage: show(labels=True) """ self._run(show=True, labels=labels) # show results def save(self, labels=True, save_dir="runs/detect/exp", exist_ok=False): """ Saves detection results with optional labels to a specified directory. Usage: save(labels=True, save_dir='runs/detect/exp', exist_ok=False) """ save_dir = increment_path(save_dir, exist_ok, mkdir=True) # increment save_dir self._run(save=True, labels=labels, save_dir=save_dir) # save results def crop(self, save=True, save_dir="runs/detect/exp", exist_ok=False): """ Crops detection results, optionally saves them to a directory. Args: save (bool), save_dir (str), exist_ok (bool). """ save_dir = increment_path(save_dir, exist_ok, mkdir=True) if save else None return self._run(crop=True, save=save, save_dir=save_dir) # crop results def render(self, labels=True): """Renders detection results with optional labels on images; args: labels (bool) indicating label inclusion.""" self._run(render=True, labels=labels) # render results return self.ims def pandas(self): """ Returns detections as pandas DataFrames for various box formats (xyxy, xyxyn, xywh, xywhn). Example: print(results.pandas().xyxy[0]). """ new = copy(self) # return copy ca = "xmin", "ymin", "xmax", "ymax", "confidence", "class", "name" # xyxy columns cb = "xcenter", "ycenter", "width", "height", "confidence", "class", "name" # xywh columns for k, c in zip(["xyxy", "xyxyn", "xywh", "xywhn"], [ca, ca, cb, cb]): a = [[x[:5] + [int(x[5]), self.names[int(x[5])]] for x in x.tolist()] for x in getattr(self, k)] # update setattr(new, k, [pd.DataFrame(x, columns=c) for x in a]) return new def tolist(self): """ Converts a Detections object into a list of individual detection results for iteration. Example: for result in results.tolist(): """ r = range(self.n) # iterable return [ Detections( [self.ims[i]], [self.pred[i]], [self.files[i]], self.times, self.names, self.s, ) for i in r ] def print(self): """Logs the string representation of the current object's state via the LOGGER.""" LOGGER.info(self.__str__()) def __len__(self): """Returns the number of results stored, overrides the default len(results).""" return self.n def __str__(self): """Returns a string representation of the model's results, suitable for printing, overrides default print(results). """ return self._run(pprint=True) # print results def __repr__(self): """Returns a string representation of the YOLOv5 object, including its class and formatted results.""" return f"YOLOv5 {self.__class__} instance\n" + self.__str__() class Proto(nn.Module): # YOLOv5 mask Proto module for segmentation models def __init__(self, c1, c_=256, c2=32): """Initializes YOLOv5 Proto module for segmentation with input, proto, and mask channels configuration.""" super().__init__() self.cv1 = Conv(c1, c_, k=3) self.upsample = nn.Upsample(scale_factor=2, mode="nearest") self.cv2 = Conv(c_, c_, k=3) self.cv3 = Conv(c_, c2) def forward(self, x): """Performs a forward pass using convolutional layers and upsampling on input tensor `x`.""" return self.cv3(self.cv2(self.upsample(self.cv1(x)))) class Classify(nn.Module): # YOLOv5 classification head, i.e. x(b,c1,20,20) to x(b,c2) def __init__( self, c1, c2, k=1, s=1, p=None, g=1, dropout_p=0.0 ): # ch_in, ch_out, kernel, stride, padding, groups, dropout probability super().__init__() c_ = 1280 # efficientnet_b0 size self.conv = Conv(c1, c_, k, s, autopad(k, p), g) self.pool = nn.AdaptiveAvgPool2d(1) # to x(b,c_,1,1) self.drop = nn.Dropout(p=dropout_p, inplace=True) self.linear = nn.Linear(c_, c2) # to x(b,c2) def forward(self, x): """Processes input through conv, pool, drop, and linear layers; supports list concatenation input.""" if isinstance(x, list): x = torch.cat(x, 1) return self.linear(self.drop(self.pool(self.conv(x)).flatten(1)))