import argparse
import glob
import math
import os
import random
import threading
from typing import Callable, List
import numpy as np
from more_itertools import chunked
from kitcar_ml.utils.data_generation import object_generation_utils, utils
from kitcar_ml.utils.data_generation.generation_config import GenerationConfiguration
from kitcar_ml.utils.data_generation.sample_generator import SampleGenerator
def run_tasks(
    func: Callable[[List[str]], None], input_images: List[str], num_workers: int
):
    """Split ``input_images`` into chunks and process every chunk in its own thread.

    The images are divided into at most ``num_workers`` consecutive chunks of
    ``ceil(len(input_images) / num_workers)`` elements (the last chunk may be
    shorter). One thread is started per chunk and ``func`` is called once in
    that thread with the chunk. The call blocks until all threads have finished.

    Args:
        func: Callable that is invoked once per chunk with a list of image paths.
        input_images: All image paths that should be distributed.
        num_workers: Upper bound for the number of threads; must be at least 1.

    Raises:
        ValueError: If ``num_workers`` is smaller than 1.

    Note:
        Exceptions raised by ``func`` inside a worker thread are not re-raised
        here; Python reports them through ``threading.excepthook``.
    """
    if num_workers < 1:
        # Previously 0 caused a ZeroDivisionError and negative values silently
        # resulted in no work being done at all.
        raise ValueError("num_workers must be at least 1, got {}".format(num_workers))

    num_images = len(input_images)
    if num_images == 0:
        # Nothing to distribute; avoids requesting chunks of size zero.
        return

    task_step_size = math.ceil(num_images / num_workers)
    threads = [
        threading.Thread(target=func, args=(imgs,))
        for imgs in chunked(input_images, n=task_step_size)
    ]

    for t in threads:
        t.start()
    # Wait for every worker so callers can rely on all chunks being processed.
    for t in threads:
        t.join()
def main(
    image_dir: str,
    config_path: str = None,
    path_dataset_generated: str = ".",
    augmentation_files: str = "./images_to_be_inserted",
    seed: int = 1,
    ts_sizes_path: str = None,
    camera_specs_path: str = None,
    oversampling_rate: int = 1,
    num_workers: int = 4,
):
    """Create a synthetic dataset from the ``*.png`` images found in ``image_dir``.

    Loads the generation configuration and the augmentation images, creates the
    dataset and a :class:`SampleGenerator`, distributes the input images onto
    worker threads via :func:`run_tasks` and finally writes ``labels.yaml`` into
    ``path_dataset_generated``.

    Args:
        image_dir: Directory that is searched (non-recursively) for ``*.png`` files.
        config_path: Path passed to ``GenerationConfiguration.from_yaml``.
        path_dataset_generated: Directory the dataset and ``labels.yaml`` are
            written to.
        augmentation_files: Location passed to ``utils.read_augmentation_data``.
        seed: Seed for ``random`` and ``numpy.random``; only applied when
            ``num_workers`` is 1.
        ts_sizes_path: Path passed to the ``SampleGenerator``.
        camera_specs_path: Path passed to the ``SampleGenerator``.
        oversampling_rate: How many times the list of input images is repeated.
        num_workers: Number of worker threads handed to :func:`run_tasks`.
    """
    # If there are multiple workers, the execution order is not deterministic!
    # Setting a seed does not make sense then.
    if num_workers == 1:
        random.seed(seed)
        np.random.seed(seed)

    config = GenerationConfiguration.from_yaml(config_path)

    # Load images that are inserted
    aug_images, name_list = utils.read_augmentation_data(augmentation_files)
    dataset = utils.create_dataset(path_dataset_generated, name_list)

    # Create generator of positions of traffic signs
    sample_generator = SampleGenerator(
        camera_specs_path,
        ts_sizes_path,
        config.min_distance,
        config.max_distance,
    )

    # Find all input images. glob returns matches in arbitrary (file system
    # dependent) order, so sort them; otherwise a seeded single-worker run
    # would still not be reproducible across machines.
    found_images = sorted(glob.glob(os.path.join(image_dir, "*.png")))
    # Use oversampling to reuse each image multiple times
    img_name_list = oversampling_rate * found_images

    # Create&run tasks:
    # Distribute creating synthetic images onto multiple threads.
    def run_task(imgs):
        object_generation_utils.load_and_create_artificial_images(
            imgs, dataset, config, name_list, aug_images, sample_generator
        )

    run_tasks(func=run_task, input_images=img_name_list, num_workers=num_workers)

    dataset.save_as_yaml(os.path.join(path_dataset_generated, "labels.yaml"))
if __name__ == "__main__":
    # Command line entry point: parse the arguments and forward them to main().
    parser = argparse.ArgumentParser()
    # Required: main() cannot build its glob pattern without an image directory.
    parser.add_argument(
        "--image-dir",
        help="Directory in which the input images are.",
        required=True,
    )
    parser.add_argument(
        "--dataset-dir", help="Directory in which the dataset is created.", default="./out/"
    )
    parser.add_argument(
        "--augmentation-files",
        help="Directory in which the augmentations are located.",
        default="./images_to_be_inserted",
    )
    parser.add_argument(
        "--seed", help="Seed used to achieve reproducible randomness.", default=1, type=int
    )
    # The following defaults point to files located next to this script.
    parser.add_argument(
        "--camera",
        help="Path to camera calibration used on vehicle.",
        default=os.path.join(os.path.dirname(__file__), "./camera.yaml"),
        type=str,
    )
    parser.add_argument(
        "--config",
        help="Path to general config file.",
        default=os.path.join(os.path.dirname(__file__), "./config.yaml"),
        type=str,
    )
    parser.add_argument(
        "--ts-sizes",
        help="Path to traffic sign sizes file.",
        default=os.path.join(os.path.dirname(__file__), "./ts_sizes.yaml"),
        type=str,
    )
    # NOTE(review): --dataset-size is accepted for backward compatibility but is
    # not forwarded to main() below, so it currently has no effect.
    parser.add_argument(
        "--dataset-size",
        help="Total size of dataset.",
        type=int,
    )
    parser.add_argument(
        "--oversampling-rate",
        help="How often is one loaded image used.",
        type=int,
        default=1,
    )
    parser.add_argument(
        "--num-workers", help="How many threads are created.", type=int, default=4
    )
    args = parser.parse_args()

    print("write yaml file and Images in Directory " + args.dataset_dir)

    main(
        image_dir=args.image_dir,
        config_path=args.config,
        path_dataset_generated=args.dataset_dir,
        augmentation_files=args.augmentation_files,
        ts_sizes_path=args.ts_sizes,
        camera_specs_path=args.camera,
        seed=args.seed,
        oversampling_rate=args.oversampling_rate,
        num_workers=args.num_workers,
    )