improved projection script + demo angular projection instead of original

id
This commit is contained in:
Jan Kowalczyk
2024-06-28 09:46:38 +02:00
parent de5f74df1f
commit ddcae64b79

View File

@@ -9,6 +9,8 @@ from pointcloudset import Dataset
from rich.progress import track from rich.progress import track
from pandas import DataFrame from pandas import DataFrame
from PIL import Image from PIL import Image
from math import pi
from typing import Optional
import matplotlib import matplotlib
import numpy as np import numpy as np
@@ -27,22 +29,6 @@ from util import (
) )
def fill_sparse_data(data: DataFrame, horizontal_resolution: int) -> DataFrame:
complete_original_ids = DataFrame(
{
"original_id": np.arange(
0,
(data["ring"].max() + 1) * horizontal_resolution,
dtype=np.uint32,
)
}
)
data = complete_original_ids.merge(data, on="original_id", how="left")
data["ring"] = data["original_id"] // horizontal_resolution
data["horizontal_position"] = data["original_id"] % horizontal_resolution
return data
def crop_lidar_data_to_roi( def crop_lidar_data_to_roi(
data: DataFrame, data: DataFrame,
roi_angle_start: float, roi_angle_start: float,
@@ -65,32 +51,6 @@ def crop_lidar_data_to_roi(
return cropped_data, roi_index_width return cropped_data, roi_index_width
def create_projection_data(
dataset: Dataset,
horizontal_resolution: int,
roi_angle_start: float,
roi_angle_width: float,
) -> list[Path]:
converted_lidar_frames = []
for i, pc in track(
enumerate(dataset, 1), description="Rendering images...", total=len(dataset)
):
lidar_data = fill_sparse_data(pc.data, horizontal_resolution)
lidar_data["normalized_range"] = 1 / np.sqrt(
lidar_data["x"] ** 2 + lidar_data["y"] ** 2 + lidar_data["z"] ** 2
)
lidar_data = lidar_data.pivot(
index="ring", columns="horizontal_position", values="normalized_range"
)
lidar_data, _ = crop_lidar_data_to_roi(
lidar_data, roi_angle_start, roi_angle_width, horizontal_resolution
)
converted_lidar_frames.append(lidar_data.to_numpy())
return np.stack(converted_lidar_frames, axis=0)
def create_2d_projection( def create_2d_projection(
df: DataFrame, df: DataFrame,
output_file_path: Path, output_file_path: Path,
@@ -123,7 +83,7 @@ def create_2d_projection(
tmp_file_path.unlink() tmp_file_path.unlink()
def render_2d_images( def create_projection_data(
dataset: Dataset, dataset: Dataset,
output_path: Path, output_path: Path,
colormap_name: str, colormap_name: str,
@@ -134,37 +94,91 @@ def render_2d_images(
horizontal_scale: int, horizontal_scale: int,
roi_angle_start: float, roi_angle_start: float,
roi_angle_width: float, roi_angle_width: float,
) -> list[Path]: render_images: bool,
) -> (np.ndarray, Optional[list[Path]]):
rendered_images = [] rendered_images = []
converted_lidar_frames = []
for i, pc in track( for i, pc in track(
enumerate(dataset, 1), description="Rendering images...", total=len(dataset) enumerate(dataset, 1), description="Creating projections...", total=len(dataset)
): ):
image_data = fill_sparse_data(pc.data, horizontal_resolution).pivot( vertical_resolution = int(pc.data["ring"].max() + 1)
index="ring", columns="horizontal_position", values="range"
# Angle calculation implementation
# projected_data = pc.data.copy()
# projected_data["arctan"] = np.arctan2(projected_data["y"], projected_data["x"])
# projected_data["arctan_normalized"] = 0.5 * (projected_data["arctan"] / pi + 1.0)
# projected_data["arctan_scaled"] = projected_data["arctan_normalized"] * horizontal_resolution
# #projected_data["horizontal_position"] = np.floor(projected_data["arctan_scaled"])
# projected_data["horizontal_position"] = np.round(projected_data["arctan_scaled"])
# projected_data["normalized_range"] = 1 / np.sqrt(
# projected_data["x"] ** 2 + projected_data["y"] ** 2 + projected_data["z"] ** 2
# )
# duplicates = projected_data[projected_data.duplicated(subset=['ring', 'horizontal_position'], keep=False)].sort_values(by=['ring', 'horizontal_position'])
# sorted = projected_data.sort_values(by=['ring', 'horizontal_position'])
# FIXME: following pivot fails due to duplicates in the data, some points (x, y) are mapped to the same pixel in the projection, have to decide how to handles
# these cases
# projected_image_data = projected_data.pivot(
# index="ring", columns="horizontal_position", values="normalized_range"
# )
# projected_image_data = projected_image_data.reindex(columns=range(horizontal_resolution), fill_value=0)
# projected_image_data, output_horizontal_resolution = crop_lidar_data_to_roi(
# projected_image_data, roi_angle_start, roi_angle_width, horizontal_resolution
# )
# create_2d_projection(
# projected_image_data,
# output_path / f"frame_{i:04d}_projection.png",
# output_path / "tmp.png",
# colormap_name,
# missing_data_color,
# reverse_colormap,
# horizontal_resolution=output_horizontal_resolution * horizontal_scale,
# vertical_resolution=vertical_resolution * vertical_scale,
# )
lidar_data = pc.data.copy()
lidar_data["horizontal_position"] = (
lidar_data["original_id"] % horizontal_resolution
)
lidar_data["normalized_range"] = 1 / np.sqrt(
lidar_data["x"] ** 2 + lidar_data["y"] ** 2 + lidar_data["z"] ** 2
)
lidar_data = lidar_data.pivot(
index="ring", columns="horizontal_position", values="normalized_range"
)
lidar_data = lidar_data.reindex(
columns=range(horizontal_resolution), fill_value=0
)
lidar_data, output_horizontal_resolution = crop_lidar_data_to_roi(
lidar_data, roi_angle_start, roi_angle_width, horizontal_resolution
) )
image_data, output_horizontal_resolution = crop_lidar_data_to_roi( converted_lidar_frames.append(lidar_data.to_numpy())
image_data, roi_angle_start, roi_angle_width, horizontal_resolution if render_images:
)
normalized_data = (image_data - image_data.min().min()) / (
image_data.max().max() - image_data.min().min()
)
image_path = create_2d_projection( image_path = create_2d_projection(
normalized_data, lidar_data,
output_path / f"frame_{i:04d}.png", output_path / f"frame_{i:04d}.png",
output_path / "tmp.png", output_path / "tmp.png",
colormap_name, colormap_name,
missing_data_color, missing_data_color,
reverse_colormap, reverse_colormap,
horizontal_resolution=output_horizontal_resolution * horizontal_scale, horizontal_resolution=output_horizontal_resolution * horizontal_scale,
vertical_resolution=(pc.data["ring"].max() + 1) * vertical_scale, vertical_resolution=vertical_resolution * vertical_scale,
) )
rendered_images.append(image_path) rendered_images.append(image_path)
return rendered_images projection_data = np.stack(converted_lidar_frames, axis=0)
if render_images:
return rendered_images, projection_data
else:
return projection_data
def main() -> int: def main() -> int:
@@ -196,28 +210,24 @@ def main() -> int:
help="path rendered frames should be written to", help="path rendered frames should be written to",
) )
parser.add_argument( parser.add_argument(
"--output-images", "--output-no-images",
type=bool, action="store_true",
default=True, help="do not create individual image files for the projection frames",
help="if rendered frames should be outputted as images",
) )
parser.add_argument( parser.add_argument(
"--output-video", "--output-no-video",
type=bool, action="store_true",
default=True, help="do not create a video file from the projection frames",
help="if rendered frames should be outputted as a video",
) )
parser.add_argument( parser.add_argument(
"--output-pickle", "--output-no-numpy",
default=True, action="store_true",
type=bool, help="do not create a numpy file with the projection data",
help="if the processed data should be saved as a pickle file",
) )
parser.add_argument( parser.add_argument(
"--skip-existing", "--force-generation",
default=True, action="store_true",
type=bool, help="if used will force the generation even if output already exists",
help="if true will skip rendering existing files",
) )
parser.add_argument( parser.add_argument(
"--colormap-name", "--colormap-name",
@@ -273,13 +283,29 @@ def main() -> int:
output_path = args.output_path / args.input_experiment_path.stem output_path = args.output_path / args.input_experiment_path.stem
output_path.mkdir(parents=True, exist_ok=True) output_path.mkdir(parents=True, exist_ok=True)
parser.write_config_file(
parser.parse_known_args()[0],
output_file_paths=[(output_path / "config.yaml").as_posix()],
)
# Create temporary folder for images, if outputting images we use the output folder itself as temp folder # Create temporary folder for images, if outputting images we use the output folder itself as temp folder
tmp_path = output_path / "frames" if args.output_images else output_path / "tmp" tmp_path = output_path / "tmp" if args.output_no_images else output_path / "frames"
tmp_path.mkdir(parents=True, exist_ok=True) tmp_path.mkdir(parents=True, exist_ok=True)
dataset = load_dataset(args.input_experiment_path, args.pointcloud_topic) dataset = load_dataset(args.input_experiment_path, args.pointcloud_topic)
images = render_2d_images( images = []
if not args.output_no_images or not args.output_no_video:
if not args.force_generation and all(
(tmp_path / f"frame_{i:04d}.png").exists()
for i in range(1, len(dataset) + 1)
):
print(
f"Skipping image generation for {args.input_experiment_path} as all frames already exist"
)
else:
projection_data, images = create_projection_data(
dataset, dataset,
tmp_path, tmp_path,
args.colormap_name, args.colormap_name,
@@ -290,21 +316,47 @@ def main() -> int:
args.horizontal_scale, args.horizontal_scale,
args.roi_angle_start, args.roi_angle_start,
args.roi_angle_width, args.roi_angle_width,
render_images=True,
) )
if args.output_pickle: output_numpy_path = (output_path / args.input_experiment_path.stem).with_suffix(
output_pickle_path = ( ".npy"
output_path / args.input_experiment_path.stem )
).with_suffix(".pkl") if not args.output_no_numpy:
processed_range_data = create_projection_data( if not args.force_generation and output_numpy_path.exists():
print(
f"Skipping numpy file generation for {args.input_experiment_path} as {output_numpy_path} already exists"
)
else:
if args.output_no_images:
projection_data, _ = create_projection_data(
dataset, dataset,
tmp_path,
args.colormap_name,
args.missing_data_color,
args.reverse_colormap,
args.horizontal_resolution, args.horizontal_resolution,
args.vertical_scale,
args.horizontal_scale,
args.roi_angle_start, args.roi_angle_start,
args.roi_angle_width, args.roi_angle_width,
render_images=False,
) )
processed_range_data.dump(output_pickle_path)
if args.output_video: # processed_range_data.dump(output_numpy_path)
np.save(output_numpy_path, projection_data, fix_imports=False)
if not args.output_no_video:
if (
not args.force_generation
and (output_path / args.input_experiment_path.stem)
.with_suffix(".mp4")
.exists()
):
print(
f"Skipping video generation for {args.input_experiment_path} as {output_path / args.input_experiment_path.stem}.mp4 already exists"
)
else:
input_images_pattern = f"{tmp_path}/frame_%04d.png" input_images_pattern = f"{tmp_path}/frame_%04d.png"
create_video_from_images( create_video_from_images(
input_images_pattern, input_images_pattern,
@@ -312,7 +364,7 @@ def main() -> int:
calculate_average_frame_rate(dataset), calculate_average_frame_rate(dataset),
) )
if not args.output_images: if args.output_no_images:
for image in images: for image in images:
image.unlink() image.unlink()
tmp_path.rmdir() tmp_path.rmdir()