Source code for ParticleDetection.reconstruct_3D.matchND

# Copyright (c) 2023-24 Adrian Niemann, Dmitry Puzyrev
#
# This file is part of ParticleDetection.
# ParticleDetection is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# ParticleDetection is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with ParticleDetection. If not, see <http://www.gnu.org/licenses/>.

"""
Functions to reconstruct 3D rod endpoints from images of a stereo camera
system by solving an n-partite matching problem between the rods within one
frame and the respective previous frame .

**Authors:**    Adrian Niemann (adrian.niemann@ovgu.de)\n
**Date:**       31.10.2022

"""
import copy
import itertools
import os
import pathlib
import warnings
from typing import Iterable, Tuple

import cv2
import numpy as np
import pandas as pd
import pulp
from scipy.spatial.transform import Rotation as R
from tqdm import tqdm

import ParticleDetection.utils.data_loading as dl
from ParticleDetection.reconstruct_3D import match2D


# BUG: the minimization does not work yet
[docs] def npartite_matching( weights: np.ndarray, maximize: bool = True, solver: pulp.LpSolver = pulp.PULP_CBC_CMD(msg=False), ) -> Tuple[np.ndarray]: """Solve an n-partite matching problem. The weights given represent the cost of assigning the entities associated with the respective indices together. Parameters ---------- weights : ndarray A n-dimensional matrix where each entry is the cost associated with choosing the combination of indices. The number of dimensions represent the number of groups and the size of each dimension the number of members in this group. maximize : bool, optional By default ``True``. solver : LpSolver, optional A solver for the matching problem.\n By default ``PULP_CBC_CMD(msg=False)``. Returns ------- Tuple[ndarray] The length of the tuple is equal to the number of dimensions of ``weights``. Each element of the tuple has a maximum size of the smallest dimension of ``weights``. The elements of the arrays represent the index of the groups paired member. Examples -------- The following weights are defined: ``weights.shape`` -> ``[12, 12, 4]`` This means three groups must be associated to each other, with four paths, because the smallest dimension has size ``4``. ``weights[0, 5, 2]`` then represents the cost of pairing element ``0`` of group ``0`` with element ``5`` of group ``1`` and element ``2`` of group ``2``. >>> npartite_matching(np.random((12, 12, 4))) (array([2, 9, 4, 11]), array([6, 9, 1, 0]), array([2, 1, 0, 3])) Note ---- Adapted from: https://stackoverflow.com/questions/60940781/solving-the-assignment-problem-for-3-groups-instead-of-2 """ if not maximize: warnings.warn( "Minimization is currently not fully supported." "Results might be incorrect." ) # get dimensions from weights array dims = weights.shape # prepare auxiliary variables grid = [range(dim) for dim in dims] varx = itertools.product(*grid) # initialize variables xxx = pulp.LpVariable.dicts("xxx", varx, cat=pulp.LpBinary) # initialize optimization problem if maximize: problem = pulp.LpProblem("RodMatching_max", pulp.LpMaximize) else: problem = pulp.LpProblem("RodMatching_min", pulp.LpMinimize) # set objective # sum_ijk... c_ijk... x_ijk... problem += pulp.lpSum([weights[iii] * xxx[iii] for iii in xxx]) # set constraints # sum_i x_ijk... <= 1 # sum_j x_ijk... <= 1 # sum... for idi, dim in enumerate(dims): for idv in range(dim): gric = copy.deepcopy(grid) gric[idi] = [idv] vary = itertools.product(*gric) problem += pulp.lpSum(xxx[iii] for iii in vary) <= 1 problem.solve(solver) # write binary variables to array rex = weights.copy() * 0 for iii in xxx: rex[iii] = xxx[iii].value() # find optimal matching = path and path weights whr = np.where(rex) return whr
[docs] def create_weights( p_3D: np.ndarray, p_3D_prev: np.ndarray, repr_errs: np.ndarray ) -> np.ndarray: """Generate weights with 3D-displacement*reprojection-error. Creates a weight matrix for matching rods between frames, using the rod endpoint displacement between frames times the reprojection-error of the rod endpoints as the cost of an assignment. Parameters ---------- p_3D : np.ndarray Shape must be in (rod_ids(cam1), rod_ids(cam2), 4, 3) Dimension explanations: (rod_id(cam1), rod_id(cam2), endpoint-combination, 3D-coordinates) p_3D_prev : np.ndarray Shape must be in (rod_ids, 2, 3). Dimension explanations: (rod_id, end-point, 3D-coordinates) repr_errs : np.ndarray Shape must be in (rod_ids(cam1), rod_ids(cam2), 4, 2) Dimension explanations: (rod_id(cam1), rod_id(cam2), end-combo, err{cam1, cam2}) Returns ------- weights: np.ndarray Weights in the shape of (rod_id, rod_ids(cam1), rod_ids(cam2)) point_choices: np.ndarray Choices of minimizing endpoint combination (rod_id, rod_ids(cam1), rod_ids(cam2)) """ rods1, rods2 = p_3D.shape[0:2] rods_prev = p_3D_prev.shape[0] weights = np.zeros((rods_prev, rods1, rods2)) point_choices = np.zeros((rods_prev, rods1, rods2)) for i in range(rods_prev): prev_p1 = p_3D_prev[i, 0, :] prev_p2 = p_3D_prev[i, 1, :] for j in range(rods1): for k in range(rods2): c1_p1 = p_3D[j, k, 0, :] c1_p2 = p_3D[j, k, 3, :] c2_p1 = p_3D[j, k, 1, :] c2_p2 = p_3D[j, k, 2, :] c1_re = np.sum(repr_errs[j, k, 0::3, :]) c2_re = np.sum(repr_errs[j, k, 1:2, :]) # 0 -> 0,3 s1 = ( np.linalg.norm(c1_p1 - prev_p1, axis=-1) + np.linalg.norm(c1_p2 - prev_p2, axis=-1) ) * c1_re # 1 -> 1,2 s2 = ( np.linalg.norm(c2_p1 - prev_p1, axis=-1) + np.linalg.norm(c2_p2 - prev_p2, axis=-1) ) * c2_re # 2 -> 2,1 s3 = ( np.linalg.norm(c2_p1 - prev_p2, axis=-1) + np.linalg.norm(c2_p2 - prev_p1, axis=-1) ) * c2_re # 3 -> 3,0 s4 = ( np.linalg.norm(c1_p1 - prev_p2, axis=-1) + np.linalg.norm(c1_p2 - prev_p1, axis=-1) ) * c1_re weights[i, j, k] = np.min([s1, s2, s3, s4]) point_choices[i, j, k] = np.argmin([s1, s2, s3, s4]) weights = np.max(weights) - weights return weights, point_choices
[docs] def assign( input_folder: str, output_folder: str, colors: Iterable[str], cam1_name: str = "gp1", cam2_name: str = "gp2", frame_numbers: Iterable[int] = None, calibration_file: str = None, transformation_file: str = None, solver: pulp.LpSolver = pulp.PULP_CBC_CMD(msg=False), ) -> Tuple[np.ndarray]: """Matches, triangulates and tracks rods over frames from ``*.csv data`` files. The function matches rods over multiple frames using npartite matching and a combination of 3D displacement and 2D reprojection error as the weights. This results in 3D reconstructed rods, that are tracked over the course of the given frames. It takes ``*.csv files`` with the columns from :const:`~ParticleDetection.utils.datasets.DEFAULT_COLUMNS` as input and also outputs the results in this format. The resulting dataset is saved in the given output folder. Parameters ---------- input_folder : str Folder containing the ``*.csv`` files for all colors given in ``colors``. output_folder : str Folder to write the output to. The parent folder of this must exist already. colors : Iterable[str] Names of the colors present in the dataset. See :const:`~ParticleDetection.utils.datasets.DEFAULT_COLUMNS`. cam1_name : str, optional First camera's identifier in the given dataset.\n By default ``"gp1"``. cam2_name : str, optional Second camera's identifier in the given dataset.\n By default ``"gp2"``. frame_numbers : Iterable[int], optional An iterable of frame numbers present in the data.\n By default ``None``. calibration_file : str, optional Path to a ``*.json`` file with stereocalibration data for the cameras which produced the images for the rod position data.\n By default the calibration constructed with Matlab for GP1 and GP2 is used. transformation_file : str, optional Path to a ``*.json`` file with transformation matrices expressing the transformation from the first camera's coordinate system to the world/box coordinate system.\n By default the transformation constructed with Matlab is used. solver : LpSolver, optional Solver that is used for the n-partite matching problem.\n Default is ``PULP_CBC_CMD(msg=False)``. Returns ------- Tuple[ndarray, ndarray] [0]: reprojection errors\n [1]: rod lengths """ if calibration_file is None: this_dir = pathlib.Path(__file__).parent.resolve() calibration_file = this_dir.joinpath( "example_calibration/Matlab/gp12.json" ) if transformation_file is None: this_dir = pathlib.Path(__file__).parent.resolve() transformation_file = this_dir.joinpath( "example_calibration/Matlab/world_transformation.json" ) if not os.path.exists(output_folder): os.mkdir(output_folder) calibration = dl.load_camera_calibration(str(calibration_file)) transforms = dl.load_world_transformation(str(transformation_file)) # Derive projection matrices from the calibration r1 = np.eye(3) t1 = np.expand_dims(np.array([0.0, 0.0, 0.0]), 1) P1 = np.vstack((r1.T, t1.T)) @ calibration["CM1"].T P1 = P1.T r2 = calibration["R"] t2 = calibration["T"] P2 = np.vstack((r2.T, t2.T)) @ calibration["CM2"].T P2 = P2.T # Preparation of world transformations rot = R.from_matrix(transforms["rotation"]) trans = transforms["translation"] all_repr_errs = [] all_rod_lengths = [] for color in colors: f_in = input_folder + f"/rods_df_{color}.csv" data = pd.read_csv(f_in, sep=",", index_col=0) df_out = pd.DataFrame() for fn in tqdm(range(len(frame_numbers)), colour="green"): frame = frame_numbers[fn] if fn == 0: # Matching without a previous frame available tmp_df, tmp_costs, tmp_lengths = match2D.match_frame( data, cam1_name, cam2_name, frame, color, calibration, P1, P2, rot, trans, r1, r2, t1, t2, renumber=True, ) else: # Matching with a previous frame (+data) available tmp_df, tmp_costs, tmp_lengths = match_frame( data, cam1_name, cam2_name, frame, color, calibration, P1, P2, rot, trans, r1, r2, t1, t2, ) df_out = pd.concat([df_out, tmp_df]) all_repr_errs.append(tmp_costs) all_rod_lengths.append(tmp_lengths) # Save results to disk df_out.reset_index(drop=True, inplace=True) df_out.to_csv( os.path.join(output_folder, f"rods_df_{color}.csv"), sep="," ) return np.asarray(all_repr_errs), np.asarray(all_rod_lengths)
[docs] def match_frame( data: pd.DataFrame, cam1_name: str, cam2_name: str, frame: int, color: str, calibration: dict, P1: np.ndarray, P2: np.ndarray, rot: R, trans: np.ndarray, r1: np.ndarray, r2: np.ndarray, t1: np.ndarray, t2: np.ndarray, ): """Matches, triangulates and tracks rods for one frame from a ``DataFrame``. Parameters ---------- data : DataFrame Dataset of rod positions. cam1_name : str First camera's identifier in the given dataset, e.g. ``"gp1"``. cam2_name : str Second camera's identifier in the given dataset, e.g. ``"gp2"``. frame : int Frame in ``data`` who's endpoints shall be (re-)evaluated. color : str Color of the rods in ``data`` to match. calibration : dict Stereocamera calibration parameters with the required fields:\n ``"CM1"``: camera matrix of cam1\n ``"R"``: rotation matrix between cam1 & cam2\n ``"T"``: translation vector between cam1 & cam2\n ``"CM2"``: camera matrix of cam2 P1 : ndarray Projection matrix for camera 1. P2 : ndarray Projection matrix for camera 2. rot : Rotation Rotation from camera 1 coordinates to *world*/*experiment* coordinates. trans : ndarray Translation vector as part of the transformation to *world*/*experiment* coordinates. r1 : ndarray Rotation matrix of camera 1. r2 : ndarray Rotation matrix of camera 2. t1 : ndarray Translation vector of camera 1. t2 : ndarray Translation vector of camera 2. Returns ------- Tuple[DataFrame, ndarray, ndarray] Returns the ``DataFrame`` with tracked rods for ``frame`` of ``color``. Additionally, returns the assignment costs, i.e. the sum of end point reprojection errors per rod. Lastly, returns the lengths of the reconstructed rods. Raises ------ ValueError Is raised when an *unbalanced* dataset is encountered, i.e. on consecutive frames or camera angles an unequal number of rods is present. """ # Load data cols_cam1 = [ f"x1_{cam1_name}", f"y1_{cam1_name}", f"x2_{cam1_name}", f"y2_{cam1_name}", ] cols_cam2 = [ f"x1_{cam2_name}", f"y1_{cam2_name}", f"x2_{cam2_name}", f"y2_{cam2_name}", ] _data_cam1 = data.loc[data.frame == frame, cols_cam1] _data_cam2 = data.loc[data.frame == frame, cols_cam2] # remove rows with NaNs or only 0s _data_cam1.dropna(how="all", inplace=True) _data_cam2.dropna(how="all", inplace=True) _data_cam1 = _data_cam1.loc[(_data_cam1 != 0).any(axis=1)] _data_cam2 = _data_cam2.loc[(_data_cam2 != 0).any(axis=1)] if len(_data_cam1.index) == 0 or len(_data_cam2.index) == 0: # no rod data available for matching return # format of rods_camX: [rod, point, coordinate(x/y)] rods_cam1 = _data_cam1.to_numpy().reshape(-1, 2, 2) rods_cam2 = _data_cam2.to_numpy().reshape(-1, 2, 2) # Undistort points using the camera calibration tmp_points = cv2.undistortImagePoints( rods_cam1.reshape(2, -1), calibration["CM1"], calibration["dist1"] ).squeeze() undist_cam1 = np.zeros(tmp_points.shape) tmp_points = np.concatenate([tmp_points[:, 0], tmp_points[:, 1]]) for i in range(len(tmp_points)): undist_cam1[i // 2][i % 2] = tmp_points[i] undist_cam1 = undist_cam1.reshape((-1, 2, 2)) tmp_points = cv2.undistortImagePoints( rods_cam2.reshape(2, -1), calibration["CM2"], calibration["dist2"] ).squeeze() undist_cam2 = np.zeros(tmp_points.shape) tmp_points = np.concatenate([tmp_points[:, 0], tmp_points[:, 1]]) for i in range(len(tmp_points)): undist_cam2[i // 2][i % 2] = tmp_points[i] undist_cam2 = undist_cam2.reshape((-1, 2, 2)) # Triangulation of all possible point-pairs to 3D pairs_all = [ list(itertools.product(p[0], p[1])) for p in itertools.product(undist_cam1, undist_cam2) ] pairs_original = [ list(itertools.product(p[0], p[1])) for p in itertools.product(rods_cam1, rods_cam2) ] pairs_all = np.reshape(pairs_all, (-1, 2, 2)) pairs_original = np.reshape(pairs_original, (-1, 2, 2)) p_triang = cv2.triangulatePoints( P1, P2, pairs_all[:, 0, :].squeeze().transpose(), pairs_all[:, 1, :].squeeze().transpose(), ) p_triang = np.asarray([p[0:3] / p[3] for p in p_triang.transpose()]) # Reprojection to the image plane for point matching repr_cam1 = cv2.projectPoints( p_triang, r1, t1, calibration["CM1"], calibration["dist1"] )[0].squeeze() repr_cam2 = cv2.projectPoints( p_triang, r2, t2, calibration["CM2"], calibration["dist2"] )[0].squeeze() repr_cam1 = pairs_original[:, 0, :] - repr_cam1 repr_cam2 = pairs_original[:, 1, :] - repr_cam2 # p_repr: [combo, err_point, cam] p_repr = np.stack([repr_cam1, repr_cam2], axis=2) p_repr = np.swapaxes(p_repr, 1, 2) # [combo, cam, err_point] repr_errs = np.sum(np.linalg.norm(p_repr, axis=2), axis=1) # Transformation to world coordinates p_triang = rot.apply(p_triang) + trans # Consolidate data # Caution: the data order is different form the MATLAB script # ---> Matlab: (p11, p21), (p12, p21), (p11, p22), (p12, p22) # ---> Python: (p11, p21), (p11, p22), (p12, p21), (p12, p22) # repr_errs desired shape:[block x err(p)] with # block: re11, re12, re21, re22 repr_errs = np.reshape(repr_errs, (-1, 4)) costs = np.reshape( np.min( [ np.sum(repr_errs[:, 0::3], axis=1), np.sum(repr_errs[:, 1:3], axis=1), ], axis=0, ), (len(undist_cam1), len(undist_cam2)), ) # 3-assignment matching p_triang = p_triang.reshape((len(rods_cam1), len(rods_cam2), 4, 3)) rep_errs = np.linalg.norm(p_repr, axis=2) rep_errs = np.reshape(rep_errs, (len(rods_cam1), len(rods_cam2), 4, 2)) # TODO: check for NaN, so that an error is thrown, if NaNs encountered in # 3D coordinates last_points = data.loc[ data.frame == frame - 1, ["x1", "y1", "z1", "x2", "y2", "z2"] ] last_points = last_points.to_numpy() last_points = last_points.reshape((-1, 2, 3)) # rep_errs: (rod_id(cam1), rod_id(cam2), end-combo, err{cam1, cam2}) # p_triang: (rod_id(cam1), rod_id(cam2), end-combo, 3D-coordinates) # last_points: (rod_id, end-point, 3D-coordinates) weights, point_choices = create_weights(p_triang, last_points, rep_errs) rod, cam1_ind, cam2_ind = npartite_matching(weights, maximize=True) rod_nums = list(range(weights.shape[0])) idx_out = np.empty((3, weights.shape[0])) idx_out.fill(np.nan) idx_out[:, rod] = np.stack([rod, cam1_ind, cam2_ind], axis=0) if np.isnan(idx_out).any(): idx_out[0, np.isnan(idx_out[0, :])] = [ r for r in rod_nums if r not in rod ] idx_out[1, np.isnan(idx_out[1, :])] = [ r for r in rod_nums if r not in cam1_ind ] idx_out[2, np.isnan(idx_out[2, :])] = [ r for r in rod_nums if r not in cam2_ind ] idx_out = idx_out.astype(int) point_choices = point_choices.astype(int) out = np.zeros((idx_out.shape[1], 2 * 3 + 3 + 1 + 4 + 4)) for rod_id in range(idx_out.shape[1]): idx_r = idx_out[0, rod_id] i1 = idx_out[1, rod_id] i2 = idx_out[2, rod_id] try: i3 = point_choices[idx_r, i1, i2] except IndexError: raise ValueError( "Unbalanced dataset given. Please use dataset " "with the same number of rods on every frame." ) out[idx_r, 0:6] = np.concatenate( (p_triang[i1, i2, i3], p_triang[i1, i2, 3 - i3]) ) out[idx_r, 6:9] = ( p_triang[i1, i2, i3] + p_triang[i1, i2, 3 - i3] ) / 2.0 out[idx_r, 9] = np.linalg.norm( p_triang[i1, i2, i3] - p_triang[i1, i2, 3 - i3], axis=0 ) out[idx_r, 10:14] = rods_cam1[i1, :].flatten() out[idx_r, 14:] = rods_cam2[i2, :].flatten() out_columns = [ "x1", "y1", "z1", "x2", "y2", "z2", "x", "y", "z", "l", *cols_cam1, *cols_cam2, ] tmp_df = pd.DataFrame(out, columns=out_columns) tmp_df["frame"] = frame tmp_df["color"] = color tmp_df["particle"] = rod_nums seen_cols = [col for col in data.columns if "seen" in col] tmp_df[seen_cols] = 1 return tmp_df, costs[idx_out[1, :], idx_out[2, :]], out[:, 9]