Source code for kitcar_ml.utils.data_generation.data_generation_tool

import argparse
import glob
import math
import os
import random
import threading
from typing import Callable, List

import numpy as np
from more_itertools import chunked

from kitcar_ml.utils.data_generation import object_generation_utils, utils
from kitcar_ml.utils.data_generation.generation_config import GenerationConfiguration
from kitcar_ml.utils.data_generation.sample_generator import SampleGenerator


[docs]def run_tasks(func: Callable[[str], None], input_images: List[str], num_workers: int): task_step_size = math.ceil(len(input_images) / num_workers) threads = [ threading.Thread(target=func, args=(imgs,)) for imgs in chunked(input_images, n=task_step_size) ] for t in threads: t.start() for t in threads: t.join()
[docs]def main( image_dir: str, config_path: str = None, path_dataset_generated: str = ".", augmentation_files: str = "./images_to_be_inserted", seed: int = 1, ts_sizes_path: str = None, camera_specs_path: str = None, oversampling_rate: int = 1, num_workers: int = 4, ): # If there are multiple workers, the execution order is not deterministic! # Setting a seed does not make sense then. if num_workers == 1: random.seed(seed) np.random.seed(seed) config = GenerationConfiguration.from_yaml(config_path) # Load images that are inserted aug_images, name_list = utils.read_augmentation_data(augmentation_files) dataset = utils.create_dataset(path_dataset_generated, name_list) # Create generator of positions of traffic signs sample_generator = SampleGenerator( camera_specs_path, ts_sizes_path, config.min_distance, config.max_distance, ) # Find all input images & use oversampling to reuse each image multiple times img_name_list = oversampling_rate * glob.glob(image_dir + "/*.png") # Create&run tasks: # Distribute creating synthetic images onto multiple threads. def run_task(imgs): object_generation_utils.load_and_create_artificial_images( imgs, dataset, config, name_list, aug_images, sample_generator ) run_tasks(func=run_task, input_images=img_name_list, num_workers=num_workers) dataset.save_as_yaml(os.path.join(path_dataset_generated, "labels.yaml"))
if __name__ == "__main__": parser = argparse.ArgumentParser() parser.add_argument("--image-dir", help="Directory in which the input images are.") parser.add_argument( "--dataset-dir", help="Directory in which the dataset is created.", default="./out/" ) parser.add_argument( "--augmentation-files", help="Directory in which the augmentations are located.", default="./images_to_be_inserted", ) parser.add_argument( "--seed", help="Seed used to achieve reproducible randomness.", default=1, type=int ) parser.add_argument( "--camera", help="Path to camera calibration used on vehicle.", default=os.path.join(os.path.dirname(__file__), "./camera.yaml"), type=str, ) parser.add_argument( "--config", help="Path to general config file.", default=os.path.join(os.path.dirname(__file__), "./config.yaml"), type=str, ) parser.add_argument( "--ts-sizes", help="Path to traffic sign sizes file.", default=os.path.join(os.path.dirname(__file__), "./ts_sizes.yaml"), type=str, ) parser.add_argument( "--dataset-size", help="Total size of dataset.", type=int, ) parser.add_argument( "--oversampling-rate", help="How often is one loaded image used.", type=int, default=1, ) parser.add_argument( "--num-workers", help="How many threads are created.", type=int, default=4 ) args = parser.parse_args() print("write yaml file and Images in Directory " + args.dataset_dir) main( image_dir=args.image_dir, config_path=args.config, path_dataset_generated=args.dataset_dir, augmentation_files=args.augmentation_files, ts_sizes_path=args.ts_sizes, camera_specs_path=args.camera, seed=args.seed, oversampling_rate=args.oversampling_rate, num_workers=args.num_workers, )