# Copyright (c) 2023-24 Adrian Niemann, Dmitry Puzyrev, and others
#
# This file is part of ParticleDetection.
# ParticleDetection is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ParticleDetection is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ParticleDetection. If not, see <http://www.gnu.org/licenses/>.
"""
Collection of function to convert between different file formats used over the
course of the particle detection project, e.g. camera calibrations from MATLAB
to the now used json-format.
**Authors:** Adrian Niemann (adrian.niemann@ovgu.de)\n
**Date:** 02.11.2022
"""
import json
import logging
import os
from pathlib import Path
from typing import Iterable, List
import numpy as np
import pandas as pd
import scipy.io as sio
import ParticleDetection.utils.data_loading as dl
import ParticleDetection.utils.datasets as ds
_logger = logging.getLogger(__name__)
[docs]
def txt2mat(
input_folder: Path,
frames: Iterable[int],
expected_rods: int,
cam1_id: str = "gp1",
cam2_id: str = "gp2",
output_folder: Path = None,
) -> None:
"""Read rod position data in old ``*.txt`` format and save it in ``*.mat``
format.
Converts the rod positions from the ``*.txt`` format to ``*.mat`` format
assuming, that only one color is saved in the given input folder.
The converted files are then saved to two subfolders of the output folder,
named after ``cam1_id`` and ``cam2_id``.
Parameters
----------
input_folder : Path
Folder containing the 3D data in txt-files of format:
``{frame:05d}.txt``
frames : Iterable[int]
Frame numbers found in the input folder and intended to be converted.
expected_rods : int
Number of rods to expect in one frame.
cam1_id : str, optional
First camera's identifier in the given dataset.
By default ``"gp1"``.
cam2_id : str, optional
Second camera's identifier in the given dataset.
By default ``"gp2"``.
output_folder : Path, optional
Parent folder of the two output folders.
By default set to the parent folder of the input folder.
"""
col_names = [
col.format(id1=cam1_id, id2=cam2_id)
for col in ds.DEFAULT_COLUMNS
if "seen" not in col
]
data_format = str(input_folder.resolve()) + "/{:05d}.txt"
if output_folder is None:
output_folder = input_folder.parent
output_format = str(output_folder.resolve()) + "/{cam:s}/{frame:05d}.mat"
dt = np.dtype([("Point1", np.float, (2,)), ("Point2", np.float, (2,))])
data = dl.load_positions_from_txt(data_format, col_names, frames)
rods_cam1 = data[
[f"x1_{cam1_id}", f"y1_{cam1_id}", f"x2_{cam1_id}", f"y2_{cam1_id}"]
].to_numpy()
rods_cam2 = data[
[f"x1_{cam2_id}", f"y1_{cam2_id}", f"x2_{cam2_id}", f"y2_{cam2_id}"]
].to_numpy()
rods_cam1 = rods_cam1.reshape((-1, expected_rods, 4))
rods_cam2 = rods_cam2.reshape((-1, expected_rods, 4))
# Create output directories
test_out = output_format.format(cam=cam1_id, frame=0)
Path(test_out).parent.mkdir(parents=True, exist_ok=True)
test_out = output_format.format(cam=cam2_id, frame=0)
Path(test_out).parent.mkdir(parents=True, exist_ok=True)
for r_c1, r_c2, fr in zip(rods_cam1, rods_cam2, frames):
arr = np.zeros((expected_rods,), dtype=dt)
arr[:]["Point1"] = r_c1[:, 0:2]
arr[:]["Point2"] = r_c1[:, 2:]
out_file1 = output_format.format(cam=cam1_id, frame=fr)
sio.savemat(out_file1, {"rod_data_links": arr})
arr2 = np.zeros((expected_rods,), dtype=dt)
arr2[:]["Point1"] = r_c2[:, 0:2]
arr2[:]["Point2"] = r_c2[:, 2:]
out_file2 = output_format.format(cam=cam2_id, frame=fr)
sio.savemat(out_file2, {"rod_data_links": arr2})
[docs]
def csv_combine(
input_files: List[str], output_file: str = "rods_df.csv"
) -> str:
"""Concatenates multiple ``*.csv`` files to a single one.
The given input files are combined into a single one. The function does not
distinguish what data it is given and might fail, if it is not rod position
data in all given files. The function does NOT check for duplicates.
Parameters
----------
input_files : List[str]
``*.csv`` files that contains rod position data.
output_file : str, optional
Path to the output file. If this is just a file name without a path,
the parent directory of the first input file is taken as the intended
file location.
By default ``"rods_df.csv"``.
Returns
-------
str
Path to the written, combined file. The string is empty, if nothing has
been written.
"""
combined = pd.DataFrame()
written = ""
for file in input_files:
if not os.path.exists(file):
_logger.warning(f"The file {file} does not exist.")
continue
new_data = pd.read_csv(file, sep=",", index_col=0)
combined = pd.concat([combined, new_data])
if len(combined) > 0:
if not os.path.dirname(output_file):
output_file = os.path.join(
os.path.dirname(input_files[0]), output_file
)
combined.reset_index(drop=True, inplace=True)
combined.to_csv(output_file, sep=",")
written = output_file
return written
[docs]
def csv_split_by_frames(input_file: str, cut_frames: List[int]) -> List[str]:
"""Splits the rod data at the given frames.
Splits the given ``*.csv`` file into individual files at the given frame
numbers.
Example:\n
The data has frames from 0 to 33.\n
``cut_frames = [15, 20, 25]``\n
-> ``out_0_14.csv``, ``out_15_19.csv``, ``out_20_24.csv``,
``out_25_33.csv``
Parameters
----------
input_file : str
Path to a ``*.csv file`` containing rod position data.
cut_frames : List[int]
Frames at which to partition the data. All frames in the original data
are perserved.
The lower bound is inclusive, while the upper bound is exclusive.
Returns
-------
List[str]
List of paths to the written files. This list is empty, if no files
were written.
"""
written = []
data_main = pd.read_csv(input_file, sep=",", index_col=0)
base_path = os.path.splitext(input_file)[0]
for i in range(0, len(cut_frames) + 1):
if (i - 1) >= 0:
next_min = cut_frames[i - 1]
else:
next_min = data_main.frame.min()
try:
next_max = cut_frames[i]
except IndexError:
next_max = data_main.frame.max() + 1
next_slice = data_main.loc[
(data_main.frame >= next_min) & (data_main.frame < next_max)
]
if len(next_slice) == 0:
continue
next_slice.reset_index(drop=True, inplace=True)
new_path = base_path + f"_{next_min}_{next_max - 1}.csv"
next_slice.to_csv(new_path, sep=",")
written.append(new_path)
return written
[docs]
def convert_txt_config(folder: Path):
"""Convert camera calibrations from MATLAB's ``*.txt``/``*.mat`` to
``*.json`` format.
This function converts a stereo camera calibration saved by MATLAB to
``*.txt`` and ``*.mat`` files into the ``*.json`` format used by functions
in this package.
The resulting files are saved as ``converted.json`` and
``world_transformations_converted.json``.
Parameters
----------
folder : Path
Folder containing the stereo calibration output, consisting of the
following files: \n
``c.txt``, ``f.txt``, ``c2.txt``, ``f2.txt``, ``kc.txt``, ``kc2.txt``,
``R.txt``, ``transvek.txt``, ``transformations.mat``
"""
cm1 = np.zeros((3, 3))
cm1[[0, 1], [2, 2]] = np.loadtxt(folder / "c.txt")
cm1[[0, 1], [0, 1]] = np.loadtxt(folder / "f.txt")
cm1[2, 2] = 1.0
cm2 = np.zeros((3, 3))
cm2[[0, 1], [2, 2]] = np.loadtxt(folder / "c2.txt")
cm2[[0, 1], [0, 1]] = np.loadtxt(folder / "f2.txt")
cm2[2, 2] = 1.0
dist1 = np.loadtxt(folder / "kc.txt")
dist2 = np.loadtxt(folder / "kc2.txt")
R = np.loadtxt(folder / "R.txt", delimiter=",")
T = np.loadtxt(folder / "transvek.txt")
to_json = {
"CM1": cm1.tolist(),
"dist1": [dist1.tolist()],
"CM2": cm2.tolist(),
"dist2": [dist2.tolist()],
"R": R.tolist(),
"T": [T.tolist()],
}
with open(folder / "converted.json", "w") as f:
json.dump(to_json, f, indent=2)
trafos = sio.loadmat(folder / "transformations.mat")["transformations"][0][
0
]
world_to_json = {
"transformations": {
"M_rotate_x": trafos[0].tolist(),
"M_rotate_y": trafos[1].tolist(),
"M_rotate_z": trafos[2].tolist(),
"M_trans2": trafos[3].tolist(),
"M_trans": trafos[4].tolist(),
}
}
with open(folder / "world_transformations_converted.json", "w") as f:
json.dump(world_to_json, f, indent=2)