"""Source code for kitcar_ml.traffic_sign_detection.fasterrcnn.model."""

import os
import subprocess
from typing import Dict, List, Tuple

import numpy as np
import sklearn.preprocessing
import torch
import torchvision
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import transforms
from tqdm import tqdm

from kitcar_ml.traffic_sign_detection.detection_model import DetectionModel


class Model(DetectionModel):
    """Faster R-CNN (ResNet-50 FPN) object-detection model with a custom class head."""

    def __init__(self, class_names: List[str], pretrained: bool = True):
        """Initialize a machine learning model for object detection.

        Models are built on top of PyTorch's `pre-trained models
        <https://pytorch.org/docs/stable/torchvision/models.html>`_, specifically
        the Faster R-CNN ResNet-50 FPN, but allow for fine-tuning to predict on
        custom classes/labels.

        Args:
            class_names: A list of classes/labels for the model to predict.
            pretrained: Whether to load pretrained weights or not.
        """
        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Work on a copy so the caller's list is not mutated by the append below
        # (the original modified the argument in place).
        class_names = list(class_names)

        # Background starts with an underscore, because LabelEncoder encodes
        # alphabetically and FasterRCNN needs the background label to get id 0.
        background_label = "_background"
        if background_label not in class_names:
            class_names.append(background_label)

        # Load a model pre-trained on COCO.
        fasterrcnn = torchvision.models.detection.fasterrcnn_resnet50_fpn(
            pretrained=pretrained
        )
        # Get the number of input features for the classifier and replace the
        # pre-trained head with one sized for our classes.
        in_features = fasterrcnn.roi_heads.box_predictor.cls_score.in_features
        fasterrcnn.roi_heads.box_predictor = FastRCNNPredictor(
            in_features, len(class_names)
        )
        fasterrcnn.to(self._device)

        self.class_name_encoder = sklearn.preprocessing.LabelEncoder()
        self.class_name_encoder.fit(class_names)
        # FasterRCNN needs the background to be the label with id 0.
        assert self.class_name_encoder.transform([background_label])[0] == 0

        self._model = fasterrcnn

    @staticmethod
    def __get_prediction_indices(
        boxes: List[np.ndarray], scores: List[float], min_score=0, max_iou=0.2
    ):
        """Apply non maximum suppression and a minimum-score filter.

        Returns:
            A boolean mask over the predictions, True where a prediction
            survives both NMS and the score threshold.
        """
        # Build an explicit boolean mask; the original used a float tensor and
        # relied on nonzero values being truthy inside logical_and.
        keep = torch.zeros(len(boxes), dtype=torch.bool)
        keep[torchvision.ops.nms(boxes, scores, iou_threshold=max_iou)] = True
        return torch.logical_and(scores >= min_score, keep)
[docs] @torch.no_grad() def predict( self, images: List[np.ndarray], **kwargs ) -> List[Tuple[List[np.ndarray], List[str], List[float]]]: """Take in a list of images and predict the bounding boxes. Returns: A list of bounding boxes with labels and scores. """ self._model.eval() # Convert to tensor if not isinstance(images[0], torch.Tensor): images = [transforms.ToTensor()(img) for img in images] # Send images to the specified device predictions = self._model([img.to(self._device) for img in images]) boxes_list, labels_list, scores_list = ( [prediction[key].to(torch.device("cpu")) for prediction in predictions] for key in ("boxes", "labels", "scores") ) indices = [ self.__get_prediction_indices(*per_image, **kwargs) for per_image in zip(boxes_list, scores_list) ] boxes_list = [boxes[index] for boxes, index in zip(boxes_list, indices)] labels_list = [ self.class_name_encoder.inverse_transform(labels[index]) for labels, index in zip(labels_list, indices) ] scores_list = [scores[index] for scores, index in zip(scores_list, indices)] return [r for r in zip(boxes_list, labels_list, scores_list)]
DEFAULT_OPTIMIZER_KWARGS = {"lr": 0.005, "momentum": 0.9, "weight_decay": 0.0005}
[docs] def fit( self, data_loader: DataLoader, val_data_loader: DataLoader, epochs: int = 10, optimizer_name: str = "SGD", optimizer_args: Dict[str, float] = DEFAULT_OPTIMIZER_KWARGS, visualize: bool = False, tensorboard_path: str = "runs", ) -> List[float]: """Train the model on the given data_loader. If given a validation data_loader, returns a list of loss scores at each epoch. """ torch.backends.cudnn.benchmark = True if visualize: os.makedirs(tensorboard_path, exist_ok=True) subprocess.Popen( f"tensorboard --logdir {tensorboard_path}", shell=True, stdout=subprocess.PIPE, stderr=subprocess.PIPE, ) print("Launched Tensorboard: http://localhost:6006") writer = SummaryWriter(tensorboard_path) # Get parameters that have grad turned on (i.e. parameters that should be trained) parameters = [ parameter for parameter in self._model.parameters() if parameter.requires_grad ] print( f"Model has {sum(param.numel() for param in parameters)} trainable parameters." ) dataset_len = len(data_loader) print(f"Train on Dataset with length: {dataset_len}") # Select optimizer optim_func = getattr(torch.optim, optimizer_name) # Create optimizer optimizer = optim_func(parameters, **optimizer_args) # Create scheduler scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau( optimizer, patience=5, factor=0.1, min_lr=1e-5 ) # Track loss values while training total_losses = [] # Train on the entire dataset for the specified number of times (epochs) for epoch in tqdm(range(epochs), desc="Epoch"): tqdm.write(f"Epoch #{epoch+1}: (lr={optimizer.param_groups[0]['lr']})") # Training step self._model.train() epoch_losses = [] for iteration, (images, boxes, labels) in tqdm( enumerate(data_loader), desc="Iteration", leave=False, total=dataset_len ): global_step = epoch * dataset_len + iteration if visualize: images_labeled = [] for viz_image, viz_boxes, viz_labels in zip(images, boxes, labels): viz_image = (255 * viz_image).type(torch.uint8) image_labeled = torchvision.utils.draw_bounding_boxes( 
viz_image, viz_boxes, viz_labels ) images_labeled.append(image_labeled) grid = torchvision.utils.make_grid(images_labeled, nrow=2) writer.add_image( "Training Images", grid, global_step=global_step, ) # Move Tensors to the same device as model images = [image.to(self._device) for image in images] boxes = [box.to(self._device) for box in boxes] # Convert class names to internal ids labels = [ torch.as_tensor( self.class_name_encoder.transform(label), dtype=torch.int64, device=self._device, ) for label in labels ] targets = [ {"boxes": box, "labels": label} for box, label in zip(boxes, labels) ] # Calculate the model's loss (i.e. how well it does on the current # image and target, with a lower loss being better) loss_dict = self._model(images, targets) iteration_loss = sum(loss for loss in loss_dict.values()) epoch_losses.append(iteration_loss.item()) if visualize: writer.add_scalar( "Training Loss", iteration_loss.item(), global_step=global_step, ) # Zero any old/existing gradients on the model's parameters optimizer.zero_grad() # Compute gradients for each parameter based on the current loss calculation iteration_loss.backward() # Update model parameters from gradients: # param -= learning_rate * param.grad optimizer.step() avg_loss = np.mean(np.array(epoch_losses)) stddev_loss = np.std(np.array(epoch_losses)) scheduler.step(avg_loss) tqdm.write(f" l: {round(avg_loss,4)} +- {round(stddev_loss,4)}") if val_data_loader: tqdm.write(str(self.evaluate(val_data_loader))) total_losses.extend(epoch_losses) if visualize: writer.close() return total_losses
[docs] def save(self, file: str): """Save the internal model weights to a file. Args: file: The name of the file. Should have a .pth file extension. """ save_dict = { "state_dict": self._model.state_dict(), "class_names": self.class_name_encoder.classes_, } torch.save(save_dict, file)
[docs] @classmethod def load(cls, file: str) -> "Model": """Load a model from a .pth file containing the model weights. Args: file: The path to the .pth file containing the saved model. Returns: The model loaded from the file. """ # Load the checkpoint load_dict = torch.load(file, map_location=torch.device("cpu")) # Create new Model model = cls(list(load_dict["class_names"]), pretrained=False) # Use parameters from saved state dict model._model.load_state_dict(load_dict["state_dict"]) # Move internal model to correct device model._model.to(model._device) return model
[docs] def export_to_onnx(self, output_file: str): """Export this model into a onnx format. Args: output_file: Path to the output file """ dummy_input = torch.randn(1, 1, 300, 1280, requires_grad=True) torch.onnx.export( self._model.to(torch.device("cpu")), dummy_input.cpu(), output_file, export_params=True, opset_version=12, do_constant_folding=True, enable_onnx_checker=True, )
[docs] def half(self): """Switch to FP16 values instead of FP32 to speed up inference. This does only work on CUDA. """ self._model.half()