import os
import subprocess
from typing import Dict, List, Optional, Tuple

import numpy as np
import sklearn.preprocessing
import torch
import torchvision
from torch.utils.data import DataLoader
from torch.utils.tensorboard import SummaryWriter
from torchvision.models.detection.faster_rcnn import FastRCNNPredictor
from torchvision.transforms import transforms
from tqdm import tqdm

from kitcar_ml.traffic_sign_detection.detection_model import DetectionModel
class Model(DetectionModel):
    """Faster R-CNN based object-detection model with a custom label set.

    Built on top of PyTorch's `pre-trained models
    <https://pytorch.org/docs/stable/torchvision/models.html>`_,
    specifically the Faster R-CNN ResNet-50 FPN, but allows
    fine-tuning to predict on custom classes/labels.
    """

    # Default hyper-parameters handed to the optimizer in :meth:`fit`.
    DEFAULT_OPTIMIZER_KWARGS = {"lr": 0.005, "momentum": 0.9, "weight_decay": 0.0005}

    def __init__(self, class_names: List[str], pretrained: bool = True):
        """Initialize a machine learning model for object detection.

        Args:
            class_names: A list of classes/labels for the model to predict.
                If none given, uses the default classes specified
                `here <https://pytorch.org/docs/stable/torchvision/models.html>`_.
            pretrained: Whether to load pretrained weights or not.
        """
        self._device = torch.device("cuda" if torch.cuda.is_available() else "cpu")

        # Work on a copy so the caller's list is not mutated as a side effect.
        class_names = list(class_names)
        # The background label starts with an underscore because LabelEncoder
        # encodes alphabetically, which sorts it first.
        background_label = "_background"
        if background_label not in class_names:
            class_names.append(background_label)

        # Load a model pre-trained on COCO.
        fasterrcnn = torchvision.models.detection.fasterrcnn_resnet50_fpn(
            pretrained=pretrained
        )
        # Get the number of input features for the classifier.
        in_features = fasterrcnn.roi_heads.box_predictor.cls_score.in_features
        # Replace the pre-trained head with one sized for our label set.
        fasterrcnn.roi_heads.box_predictor = FastRCNNPredictor(
            in_features, len(class_names)
        )
        fasterrcnn.to(self._device)

        self.class_name_encoder = sklearn.preprocessing.LabelEncoder()
        self.class_name_encoder.fit(class_names)
        # FasterRCNN needs the background to be the label with id 0.
        assert self.class_name_encoder.transform([background_label])[0] == 0

        self._model = fasterrcnn

    @staticmethod
    def __get_prediction_indices(
        boxes: List[np.ndarray], scores: List[float], min_score=0, max_iou=0.2
    ):
        """Apply non-maximum suppression and a minimum-score filter.

        Args:
            boxes: Predicted boxes as an ``(N, 4)`` tensor.
            scores: Per-box confidence scores as an ``(N,)`` tensor.
            min_score: Boxes scoring below this are dropped.
            max_iou: IoU threshold handed to NMS.

        Returns:
            A boolean mask over the ``N`` predictions.
        """
        # Bug fix: build the keep-mask as a bool tensor; the original assigned
        # ``True`` into a float zeros tensor.
        nms_kept = torch.zeros(len(boxes), dtype=torch.bool)
        nms_kept[torchvision.ops.nms(boxes, scores, iou_threshold=max_iou)] = True
        return torch.logical_and(scores >= min_score, nms_kept)

    @torch.no_grad()
    def predict(
        self, images: List[np.ndarray], **kwargs
    ) -> List[Tuple[List[np.ndarray], List[str], List[float]]]:
        """Take in a list of images and predict the bounding boxes.

        Args:
            images: Images as numpy arrays or tensors.
            **kwargs: Forwarded to the NMS/score filter
                (``min_score``, ``max_iou``).

        Returns:
            A list (one entry per image) of ``(boxes, labels, scores)``.
        """
        # Guard: the original raised IndexError on an empty input list.
        if not images:
            return []
        self._model.eval()
        # Convert to tensors if plain arrays were passed in.
        if not isinstance(images[0], torch.Tensor):
            images = [transforms.ToTensor()(img) for img in images]
        # Send images to the model's device and run inference.
        predictions = self._model([img.to(self._device) for img in images])
        boxes_list, labels_list, scores_list = (
            [prediction[key].to(torch.device("cpu")) for prediction in predictions]
            for key in ("boxes", "labels", "scores")
        )
        # Per-image boolean masks from NMS + score threshold.
        indices = [
            self.__get_prediction_indices(*per_image, **kwargs)
            for per_image in zip(boxes_list, scores_list)
        ]
        boxes_list = [boxes[index] for boxes, index in zip(boxes_list, indices)]
        # Map internal label ids back to class-name strings.
        labels_list = [
            self.class_name_encoder.inverse_transform(labels[index])
            for labels, index in zip(labels_list, indices)
        ]
        scores_list = [scores[index] for scores, index in zip(scores_list, indices)]
        return list(zip(boxes_list, labels_list, scores_list))

    def fit(
        self,
        data_loader: DataLoader,
        val_data_loader: DataLoader,
        epochs: int = 10,
        optimizer_name: str = "SGD",
        optimizer_args: Optional[Dict[str, float]] = None,
        visualize: bool = False,
        tensorboard_path: str = "runs",
    ) -> List[float]:
        """Train the model on the given data_loader.

        Args:
            data_loader: Yields ``(images, boxes, labels)`` batches.
            val_data_loader: If truthy, evaluated and printed after each epoch.
            epochs: Number of passes over the dataset.
            optimizer_name: Name of a ``torch.optim`` optimizer class.
            optimizer_args: Keyword arguments for the optimizer; defaults to
                :attr:`DEFAULT_OPTIMIZER_KWARGS`.
            visualize: Launch TensorBoard and log images/losses.
            tensorboard_path: Directory for TensorBoard event files.

        Returns:
            A list of per-iteration loss scores over all epochs.
        """
        # Avoid the mutable-default-argument pitfall: resolve the default here
        # and copy it so callers' dicts are never shared or mutated.
        if optimizer_args is None:
            optimizer_args = dict(self.DEFAULT_OPTIMIZER_KWARGS)

        torch.backends.cudnn.benchmark = True
        if visualize:
            os.makedirs(tensorboard_path, exist_ok=True)
            # Use an argument list instead of shell=True with an interpolated
            # string: safer and avoids quoting issues in the path.
            subprocess.Popen(
                ["tensorboard", "--logdir", tensorboard_path],
                stdout=subprocess.PIPE,
                stderr=subprocess.PIPE,
            )
            print("Launched Tensorboard: http://localhost:6006")
            writer = SummaryWriter(tensorboard_path)

        # Get parameters that have grad turned on (i.e. parameters that should be trained).
        parameters = [
            parameter for parameter in self._model.parameters() if parameter.requires_grad
        ]
        print(
            f"Model has {sum(param.numel() for param in parameters)} trainable parameters."
        )
        dataset_len = len(data_loader)
        print(f"Train on Dataset with length: {dataset_len}")

        # Select and create optimizer by name from torch.optim.
        optim_func = getattr(torch.optim, optimizer_name)
        optimizer = optim_func(parameters, **optimizer_args)
        # Reduce the learning rate when the epoch loss plateaus.
        scheduler = torch.optim.lr_scheduler.ReduceLROnPlateau(
            optimizer, patience=5, factor=0.1, min_lr=1e-5
        )

        # Track loss values while training.
        total_losses = []
        # Train on the entire dataset for the specified number of times (epochs).
        for epoch in tqdm(range(epochs), desc="Epoch"):
            tqdm.write(f"Epoch #{epoch+1}: (lr={optimizer.param_groups[0]['lr']})")
            # Training step
            self._model.train()
            epoch_losses = []
            for iteration, (images, boxes, labels) in tqdm(
                enumerate(data_loader), desc="Iteration", leave=False, total=dataset_len
            ):
                global_step = epoch * dataset_len + iteration
                if visualize:
                    images_labeled = []
                    for viz_image, viz_boxes, viz_labels in zip(images, boxes, labels):
                        # draw_bounding_boxes expects uint8 images.
                        viz_image = (255 * viz_image).type(torch.uint8)
                        image_labeled = torchvision.utils.draw_bounding_boxes(
                            viz_image, viz_boxes, viz_labels
                        )
                        images_labeled.append(image_labeled)
                    grid = torchvision.utils.make_grid(images_labeled, nrow=2)
                    writer.add_image(
                        "Training Images",
                        grid,
                        global_step=global_step,
                    )

                # Move tensors to the same device as the model.
                images = [image.to(self._device) for image in images]
                boxes = [box.to(self._device) for box in boxes]
                # Convert class names to internal ids.
                labels = [
                    torch.as_tensor(
                        self.class_name_encoder.transform(label),
                        dtype=torch.int64,
                        device=self._device,
                    )
                    for label in labels
                ]
                targets = [
                    {"boxes": box, "labels": label} for box, label in zip(boxes, labels)
                ]

                # Calculate the model's loss (i.e. how well it does on the current
                # image and target, with a lower loss being better).
                loss_dict = self._model(images, targets)
                iteration_loss = sum(loss for loss in loss_dict.values())
                epoch_losses.append(iteration_loss.item())
                if visualize:
                    writer.add_scalar(
                        "Training Loss",
                        iteration_loss.item(),
                        global_step=global_step,
                    )

                # Zero any old/existing gradients on the model's parameters.
                optimizer.zero_grad()
                # Compute gradients for each parameter based on the current loss.
                iteration_loss.backward()
                # Update model parameters from gradients:
                # param -= learning_rate * param.grad
                optimizer.step()

            avg_loss = np.mean(np.array(epoch_losses))
            stddev_loss = np.std(np.array(epoch_losses))
            scheduler.step(avg_loss)
            tqdm.write(f" l: {round(avg_loss,4)} +- {round(stddev_loss,4)}")
            if val_data_loader:
                tqdm.write(str(self.evaluate(val_data_loader)))
            total_losses.extend(epoch_losses)

        if visualize:
            writer.close()
        return total_losses

    def save(self, file: str):
        """Save the internal model weights to a file.

        Args:
            file: The name of the file. Should have a .pth file extension.
        """
        save_dict = {
            "state_dict": self._model.state_dict(),
            "class_names": self.class_name_encoder.classes_,
        }
        torch.save(save_dict, file)

    @classmethod
    def load(cls, file: str) -> "Model":
        """Load a model from a .pth file containing the model weights.

        Args:
            file: The path to the .pth file containing the saved model.

        Returns:
            The model loaded from the file.
        """
        # Load the checkpoint on CPU so loading works without a GPU.
        load_dict = torch.load(file, map_location=torch.device("cpu"))
        # Create a new model; skip downloading COCO weights since we
        # immediately overwrite them with the saved state dict.
        model = cls(list(load_dict["class_names"]), pretrained=False)
        model._model.load_state_dict(load_dict["state_dict"])
        # Move the internal model to the correct device.
        model._model.to(model._device)
        return model

    def export_to_onnx(self, output_file: str):
        """Export this model into ONNX format.

        Note: moves the internal model to CPU as a side effect.

        Args:
            output_file: Path to the output file.
        """
        # Dummy input fixes the traced input size (1x1x300x1280 here).
        dummy_input = torch.randn(1, 1, 300, 1280, requires_grad=True)
        torch.onnx.export(
            self._model.to(torch.device("cpu")),
            dummy_input.cpu(),
            output_file,
            export_params=True,
            opset_version=12,
            do_constant_folding=True,
            enable_onnx_checker=True,
        )

    def half(self):
        """Switch to FP16 values instead of FP32 to speed up inference.

        This only works on CUDA.
        """
        self._model.half()