# Copyright (c) 2023 Adrian Niemann Dmitry Puzyrev
#
# This file is part of ParticleDetection.
# ParticleDetection is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ParticleDetection is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ParticleDetection. If not, see <http://www.gnu.org/licenses/>.
"""
Functions to export a Detectron2 model as a pure pytorch model and functions to
export its output to other formats.
**Author:** Adrian Niemann (adrian.niemann@ovgu.de)\n
**Date:** 31.10.2022
"""
import json
import logging
from pathlib import Path
from typing import Literal, Union
import cv2
import numpy as np
import pandas as pd
import scipy.io as sio
from skimage.measure import approximate_polygon
import torch
from detectron2.config import CfgNode
from detectron2.engine import DefaultPredictor
# Don't remove, registers model parts in detectron2
from detectron2.projects import point_rend # noqa: F401
from detectron2.export import TracingAdapter
from detectron2.data.detection_utils import read_image
import ParticleDetection.utils.data_conversions as d_conv
import ParticleDetection.utils.datasets as ds
import ParticleDetection.utils.helper_funcs as hf
_logger = logging.getLogger(__name__)
EXPORT_OPTIONS = Literal["cpu", "cuda"]
[docs]def get_sample_img(sample: Path) -> torch.Tensor:
"""Loads an image into the format necessary for inference by a Detectron2
model.
"""
img = read_image(sample, format="BGR")
img = torch.from_numpy(np.ascontiguousarray(img.transpose(2, 0, 1)))
return img
[docs]def export_model(config_path: Path, weights_path: Path, sample_img: Path,
option: EXPORT_OPTIONS = "cuda") -> None:
"""Exports a Detectron2 model to be usable with just pytorch.
Parameters
----------
config_path : Path
File that holds the model's configuration in yaml format.
weights_path : Path
File that holds the trained model's weights.
sample_img : Path
Image to be used to trace the model.
option : EXPORT_OPTIONS, optional
Option whether to restrict the exported model to be used on the CPU or
to also allow the use of a GPU.\n
By default ``"cuda"``.
Note
----
The GPU version then requires the pytorch GPU version to be installed.
The CPU version can be run with both, pytorch's CPU and GPU version.
"""
def inference_func(model, image):
inputs = [{"image": image}]
return model.inference(inputs, do_postprocess=False)[0]
cfg = CfgNode(CfgNode.load_yaml_with_base(str(config_path.resolve())))
cfg.MODEL.WEIGHTS = str(weights_path.resolve())
cfg.MODEL.DEVICE = option
image = get_sample_img(sample_img)
inputs = tuple(image.clone() for _ in range(1))
model = DefaultPredictor(cfg).model
wrapper = TracingAdapter(model, inputs, inference_func)
wrapper.eval()
with torch.no_grad():
traced_model = torch.jit.trace(wrapper, inputs)
# Save to disk
save_path = Path(f"./model_{cfg.MODEL.DEVICE}.pt").resolve()
torch.jit.save(traced_model, str(save_path))
_logger.info(f"Exported model to '{str(save_path)}'")
[docs]def annotation_to_json(prediction: ds.DetectionResult,
image: Union[Path, str],
classes: dict = None,
output_dir: Union[Path, str] = Path(),
*,
filename: str = "extracted_meta_data.json",
**_):
"""Saves detected object masks in the metadata format used for model
training.
.. hint::
This function is intended to be used as a saving function with
:func:`~ParticleDetection.modelling.runners.detection.detect`.
Parameters
----------
prediction : :class:`~ParticleDetection.utils.datasets.DetectionResult`
Prediction output of a Detectron2 network. It can also be given as
``prediction["instances"]`` as ``detectron2.structures.Instances`` or
``dict``, as long as the resulting ``dict`` contains at least the same
keys as :class:`~ParticleDetection.utils.datasets.DetectionResult`.
image : Union[Path, str]
Path to image that `prediction` was created from.
classes: dict, optional
Dictionary of classes detectable by the model with\n
``{key}`` -> Index of class in the model\n
``{value}`` -> Name of the class\n
By default ``None``.
output_dir : Path | str, optional
Path to a folder the output file will be written to.\n
By default ``Path()``.
filename : str, optional
Name of the ``*.json`` file the annotation data should be saved in.
Already existing data for an image in this file will be overwritten.\n
By default ``"extracted_meta_data.json"``.
"""
meta_data = {}
if isinstance(output_dir, str):
output_dir = Path(output_dir)
output = (output_dir / filename).resolve()
if output.exists():
try:
with open(output, "r") as f:
meta_data = json.load(f)
except json.JSONDecodeError:
# overwrite the file
_logger.warning(f"Metadata file is not readable and will "
f"be overwritten: {output}")
if isinstance(image, str):
image = Path(image)
image = image.resolve()
image_size = image.stat().st_size
image_id = image.name + str(image_size)
meta_data[image_id] = {
"filename": image.name,
"size": image_size,
"regions": [],
"file_attributes": {},
}
if "instances" in prediction.keys():
prediction = prediction["instances"].get_fields()
for k, v in prediction.items():
prediction[k] = v.to("cpu")
if classes is None:
classes = {cl: "not_defined" for cl in set(
prediction["pred_classes"].tolist())}
for i in range(len(prediction["pred_masks"])):
predicted_class = int(prediction["pred_classes"][i])
region = {
"shape_attributes": {
"name": "polygon",
"all_points_x": [],
"all_points_y": []
},
"region_attributes": {
"name": classes[predicted_class],
"type": str(predicted_class),
}
}
idxs = np.nonzero(prediction["pred_masks"][i])
# [outer_points, 2]
points = np.asarray((idxs[:, 1], idxs[:, 0])).swapaxes(0, 1)
hull = cv2.convexHull(points).squeeze()
if len(hull) > 20:
hull_prev = len(hull)
hull = approximate_polygon(hull, 1)
if hull_prev == len(hull):
_logger.warning(
f"No simplification could be performed for a segmentation "
f"polygon with {hull_prev} nodes.")
region["shape_attributes"]["all_points_x"] = hull[:, 0].tolist()
region["shape_attributes"]["all_points_y"] = hull[:, 1].tolist()
meta_data[image_id]["regions"].append(region)
with open(output, "w") as f:
_logger.info(f"Saving metadata to {output}")
json.dump(meta_data, f, indent=2)
[docs]def rods_to_mat(prediction: ds.DetectionResult,
image: Union[Path, str],
classes: dict = None,
output_dir: Union[Path, str] = Path(),
*_, **kwargs) -> None:
"""Extract rod enpoint positions from detected object masks and save them
to ``*.mat`` files.
The generated ``*.mat`` contain one variable `rod_data_links` with each
rod being represented by a `Point1` and `Point2`.
.. hint::
This function is intended to be used as a saving function with
:func:`~ParticleDetection.modelling.runners.detection.detect`.
Parameters
----------
prediction : :class:`~ParticleDetection.utils.datasets.DetectionResult`
Prediction output of a Detectron2 network. It can also be given as
``prediction["instances"]`` as ``detectron2.structures.Instances`` or
``dict``, as long as the resulting ``dict`` contains at least the same
keys as :class:`~ParticleDetection.utils.datasets.DetectionResult`.
image : Union[Path, str]
Path to image that `prediction` was created from.
classes : dict, optional
Dictionary of classes detectable by the model with\n
``{key}`` -> Index of class in the model\n
``{value}`` -> Name of the class\n
By default ``None``.
output_dir : Union[Path, str]
Path to a folder the output file will be written to.\n
By default ``Path()``.
**kwargs : dict, optional
Keywords, that are propagated to
:func:`~ParticleDetection.utils.helper_funcs.rod_endpoints`:\n
`method`: Literal["simple", "advanced"]\n
`expected_particles` : Union[int, Dict[int, int], None]
See also
--------
:func:`~ParticleDetection.utils.helper_funcs.rod_endpoints`
"""
if "instances" in prediction.keys():
prediction = prediction["instances"].get_fields()
if "pred_masks" not in prediction.keys():
return
if isinstance(image, str):
image = Path(image)
if isinstance(output_dir, str):
output_dir = Path(output_dir)
file_name_tmp = str(output_dir / image.stem) + "_{}.mat"
method = kwargs.pop("method", "simple")
expected_particles = kwargs.pop("expected_particles", None)
points = hf.rod_endpoints(prediction, classes, method, expected_particles)
for idx, vals in points.items():
if not vals.size:
# skip classes without saved points
continue
dt = np.dtype(
[('Point1', float, (2,)), ('Point2', float, (2,))])
arr = np.zeros((vals.shape[0],), dtype=dt)
arr[:]['Point1'] = vals[:, 0, :]
arr[:]['Point2'] = vals[:, 1, :]
sio.savemat(file_name_tmp.format(idx), {'rod_data_links': arr})
[docs]def rods_to_csv(prediction: ds.DetectionResult,
image: Union[Path, str, np.ndarray],
classes: dict = None,
output_dir: Union[Path, str] = Path(),
*,
filename: str = "extracted_rods.csv",
**kwargs) -> None:
"""Extract rod enpoint positions from detected object masks and save them
to ``*.csv`` files.
For each detected object, two enpoints are determined from the detected
segmentation mask. These endpoints are then saved in ``*.csv`` format
described by :data:`~ParticleDetection.utils.datasets.DEFAULT_COLUMNS`.
The data is either saved into a new file, given by ``filename`` and
``output_dir`` or integrated into this file, if it already exists.
.. hint::
This function is intended to be used as a saving function with
:func:`~ParticleDetection.modelling.runners.detection.detect`.
Parameters
----------
prediction : :class:`~ParticleDetection.utils.datasets.DetectionResult`
Prediction output of a Detectron2 network. It can also be given as
``prediction["instances"]`` as ``detectron2.structures.Instances`` or
``dict``, as long as the resulting ``dict`` contains at least the same
keys as :class:`~ParticleDetection.utils.datasets.DetectionResult`.
image : Union[Path, str, np.ndarray]
(Path to ) the image that `prediction` was created from.
classes : dict, optional
_description_\n
By default ``None``.
output_dir : Union[Path, str], optional
Path to a folder the output file will be written to.\n
By default ``Path()``.
filename : str, optional
Name of the ``*.csv`` file the rod position data should be saved in.
Already existing data for an image in this file might get
overwritten.\n
By default ``"extracted_rods.csv"``.
**kwargs : dict, optional
The following keywords are used to determine the used
frame-camera combination of the image used to create the
``prediction``. This allows the proper saving when a dataset format is
given to the
:func:`~ParticleDetection.modelling.runners.detection.detect` function
instead of a list of files.\n
`cam1_name` : str\n
`cam2_name` : str\n
`frames` : Iterable[int]\n
`dataset_format` : str\n
The following keyword arguments are passed to
:func:`~ParticleDetection.utils.helper_funcs.rod_endpoints`:\n
`method`: Literal["simple", "advanced"]\n
`expected_particles` : Union[int, Dict[int, int], None]
See also
--------
:func:`~ParticleDetection.utils.datasets.replace_missing_rods`
:func:`~ParticleDetection.utils.datasets.add_points`
:func:`~ParticleDetection.utils.helper_funcs.rod_endpoints`
:func:`~ParticleDetection.utils.data_conversions.csv_extract_colors`
"""
if "instances" in prediction.keys():
prediction = prediction["instances"].get_fields()
if "pred_masks" not in prediction.keys():
return
cam_1 = kwargs.get("cam1_name", None)
cam_2 = kwargs.get("cam2_name", None)
frames = kwargs.get("frames", None)
dataset = kwargs.get("dataset_format", "")
method = kwargs.pop("method", "simple")
expected_particles = kwargs.pop("expected_particles", None)
output_file = Path(output_dir) / filename
if not output_file.exists():
cols = [col.format(id1=cam_1, id2=cam_2) for col in ds.DEFAULT_COLUMNS]
data = pd.DataFrame(columns=cols)
else:
data = pd.read_csv(output_file, sep=",", index_col=0)
this_frame = -1
this_cam = ""
if frames is not None:
for frame in frames:
if cam_1 is not None and (
dataset.format(cam_id=cam_1, frame=frame) == image):
this_frame = frame
this_cam = cam_1
break
if cam_2 is not None and (
dataset.format(cam_id=cam_2, frame=frame) == image):
this_frame = frame
this_cam = cam_2
break
else:
if cam_1 is not None and (
dataset.format(cam_id=cam_1, frame=frame) == image):
this_cam = cam_1
if cam_2 is not None and (
dataset.format(cam_id=cam_2, frame=frame) == image):
this_cam = cam_2
points = hf.rod_endpoints(prediction, classes, method, expected_particles)
data = ds.add_points(points, data, this_cam, this_frame)
data.reset_index(drop=True, inplace=True)
data = ds.replace_missing_rods(data, cam_1, cam_2)
data.to_csv(output_file, ",")
d_conv.csv_extract_colors(str(output_file))