Files
mt/tools/render2d.py
2025-03-14 18:02:23 +01:00

530 lines
16 KiB
Python

from math import pi
from multiprocessing import Pool
from pathlib import Path
from sys import exit
from typing import Optional, Tuple
import matplotlib
import numpy as np
from configargparse import (
ArgParser,
ArgumentDefaultsRawHelpFormatter,
YAMLConfigFileParser,
)
from pandas import DataFrame
from pathvalidate import sanitize_filename
from PIL import Image
from pointcloudset import Dataset
from rich.progress import track
from rosbags.highlevel import AnyReader
matplotlib.use("Agg")
import matplotlib.pyplot as plt
from util import (
angle,
angle_width,
calculate_average_frame_rate,
create_video_from_images,
existing_path,
get_colormap_with_special_missing_color,
load_dataset,
positive_int,
)
def save_image_topics(
bag_file_path: Path, output_folder_path: Path, topic_names: list[str]
) -> list[Path]:
with AnyReader([bag_file_path]) as reader:
topic_paths = []
for topic_name in topic_names:
connections = [x for x in reader.connections if x.topic == topic_name]
frame_count = 0
topic_output_folder_path = output_folder_path / sanitize_filename(
topic_name
)
topic_output_folder_path.mkdir(exist_ok=True, parents=True)
topic_paths.append(topic_output_folder_path)
for connection, timestamp, rawdata in reader.messages(
connections=connections
):
img_msg = reader.deserialize(rawdata, connection.msgtype)
img_data = np.frombuffer(
img_msg.data,
dtype=np.uint8 if "8" in img_msg.encoding else np.uint16,
).reshape((img_msg.height, img_msg.width))
img = Image.fromarray(img_data, mode="L")
frame_count += 1
img.save(topic_output_folder_path / f"frame_{frame_count:04d}.png")
return topic_paths
def crop_projection_data_to_roi(
data: DataFrame,
roi_angle_start: float,
roi_angle_width: float,
horizontal_resolution: int,
) -> tuple[DataFrame, int]:
if roi_angle_width == 360:
return data, horizontal_resolution
roi_index_start = int(horizontal_resolution / 360 * roi_angle_start)
roi_index_width = int(horizontal_resolution / 360 * roi_angle_width)
roi_index_end = roi_index_start + roi_index_width
if roi_index_end < horizontal_resolution:
cropped_data = data.iloc[:, roi_index_start:roi_index_end]
else:
roi_index_end = roi_index_end - horizontal_resolution
cropped_data = data.iloc[:, roi_index_end:roi_index_start]
return cropped_data, roi_index_width
def create_2d_projection(
df: DataFrame,
output_file_path: Path,
tmp_file_path: Path,
colormap_name: str,
missing_data_color: str,
reverse_colormap: bool,
horizontal_resolution: int,
vertical_resolution: int,
):
fig, ax = plt.subplots(
figsize=(float(horizontal_resolution) / 100, float(vertical_resolution) / 100)
)
ax.imshow(
df,
cmap=get_colormap_with_special_missing_color(
colormap_name, missing_data_color, reverse_colormap
),
aspect="auto",
)
ax.axis("off")
fig.subplots_adjust(left=0, right=1, top=1, bottom=0)
plt.savefig(tmp_file_path, dpi=100, bbox_inches="tight", pad_inches=0)
plt.close()
img = Image.open(tmp_file_path)
img_resized = img.resize(
(horizontal_resolution, vertical_resolution), Image.LANCZOS
)
img_resized.save(output_file_path)
tmp_file_path.unlink()
def process_frame(args) -> Tuple[int, np.ndarray, Optional[Path]]:
(
i,
pc,
output_path,
colormap_name,
missing_data_color,
reverse_colormap,
vertical_resolution,
horizontal_resolution,
vertical_scale,
horizontal_scale,
roi_angle_start,
roi_angle_width,
render_images,
) = args
lidar_data = pc.data.copy()
lidar_data["horizontal_position"] = (
lidar_data["original_id"] % horizontal_resolution
)
# lidar_data["horizontal_position_yaw_f"] = (
# 0.5
# * horizontal_resolution
# * (np.arctan2(lidar_data["y"], lidar_data["x"]) / pi + 1.0)
# )
# lidar_data["horizontal_position_yaw"] = np.floor(
# lidar_data["horizontal_position_yaw_f"]
# )
lidar_data["vertical_position"] = np.floor(
lidar_data["original_id"] / horizontal_resolution
)
# fov = 32 * pi / 180
# fov_down = 17 * pi / 180
# fov = 31.76 * pi / 180
# fov_down = 17.3 * pi / 180
# lidar_data["vertical_angle"] = np.arcsin(
# lidar_data["z"]
# / np.sqrt(lidar_data["x"] ** 2 + lidar_data["y"] ** 2 + lidar_data["z"] ** 2)
# )
# lidar_data["vertical_angle_degree"] = lidar_data["vertical_angle"] * 180 / pi
# lidar_data["vertical_position_pitch_f"] = vertical_resolution * (
# 1 - ((lidar_data["vertical_angle"] + fov_down) / fov)
# )
# lidar_data["vertical_position_pitch"] = np.floor(
# lidar_data["vertical_position_pitch_f"]
# )
# duplicates = lidar_data[
# lidar_data.duplicated(
# subset=["vertical_position_pitch", "horizontal_position_yaw"],
# keep=False,
# )
# ].sort_values(by=["vertical_position_pitch", "horizontal_position_yaw"])
lidar_data["normalized_range"] = 1 / np.sqrt(
lidar_data["x"] ** 2 + lidar_data["y"] ** 2 + lidar_data["z"] ** 2
)
projection_data = lidar_data.pivot(
index="vertical_position",
columns="horizontal_position",
values="normalized_range",
)
projection_data = projection_data.reindex(
columns=range(horizontal_resolution), fill_value=0
)
projection_data = projection_data.reindex(
index=range(vertical_resolution), fill_value=0
)
projection_data, output_horizontal_resolution = crop_projection_data_to_roi(
projection_data, roi_angle_start, roi_angle_width, horizontal_resolution
)
if render_images:
image_path = create_2d_projection(
projection_data,
output_path / f"frame_{i:04d}.png",
output_path / f"tmp_{i:04d}.png",
colormap_name,
missing_data_color,
reverse_colormap,
horizontal_resolution=output_horizontal_resolution * horizontal_scale,
vertical_resolution=vertical_resolution * vertical_scale,
)
else:
image_path = None
return (
i,
projection_data.to_numpy(),
image_path,
# lidar_data["vertical_position_pitch_f"].min(),
# lidar_data["vertical_position_pitch_f"].max(),
)
# Adjusted to use a generator for args_list
def create_projection_data(
dataset: Dataset,
output_path: Path,
colormap_name: str,
missing_data_color: str,
reverse_colormap: bool,
vertical_resolution: int,
horizontal_resolution: int,
vertical_scale: int,
horizontal_scale: int,
roi_angle_start: float,
roi_angle_width: float,
render_images: bool,
) -> Tuple[np.ndarray, Optional[list[Path]]]:
rendered_images = []
converted_lidar_frames = []
# Generator for args_list
# def args_generator():
# for i, pc in enumerate(dataset, 1):
# yield (
# i,
# pc,
# output_path,
# colormap_name,
# missing_data_color,
# reverse_colormap,
# vertical_resolution,
# horizontal_resolution,
# vertical_scale,
# horizontal_scale,
# roi_angle_start,
# roi_angle_width,
# render_images,
# )
# results = []
# with Pool() as pool:
# results_gen = pool.imap(process_frame, args_generator())
# for result in track(
# results_gen, description="Processing...", total=len(dataset)
# ):
# results.append(result)
# results.sort(key=lambda x: x[0])
for i, pc in track(
enumerate(dataset, 1), description="Processing...", total=len(dataset)
):
results = process_frame(
(
i,
pc,
output_path,
colormap_name,
missing_data_color,
reverse_colormap,
vertical_resolution,
horizontal_resolution,
vertical_scale,
horizontal_scale,
roi_angle_start,
roi_angle_width,
render_images,
)
)
converted_lidar_frames.append(results[1])
if render_images:
rendered_images.append(results[2])
# min_all = 100
# max_all = 0
# for _, data, img_path, min_frame, max_frame in results:
# converted_lidar_frames.append(data)
# if img_path:
# rendered_images.append(img_path)
# if min_frame < min_all:
# min_all = min_frame
# if max_frame > max_all:
# max_all = max_frame
# print(f"{min_all=}, {max_all=}")
projection_data = np.stack(converted_lidar_frames, axis=0)
if render_images:
return projection_data, rendered_images
else:
return projection_data, None
def main() -> int:
parser = ArgParser(
config_file_parser_class=YAMLConfigFileParser,
default_config_files=["render2d_config.yaml"],
formatter_class=ArgumentDefaultsRawHelpFormatter,
description="Render a 2d projection of a point cloud",
)
parser.add_argument(
"--render-config-file", is_config_file=True, help="yaml config file path"
)
parser.add_argument(
"--input-experiment-path",
required=True,
type=existing_path,
help="path to experiment. (directly to bag file, to parent folder for mcap)",
)
parser.add_argument(
"--pointcloud-topic",
default="/ouster/points",
type=str,
help="topic in the ros/mcap bag file containing the point cloud data",
)
parser.add_argument(
"--image-topics",
default=[],
nargs="+",
help="topics in the ros/mcap bag file containing the image data",
)
parser.add_argument(
"--output-path",
default=Path("./output"),
type=Path,
help="path rendered frames should be written to",
)
parser.add_argument(
"--output-no-images",
action="store_true",
help="do not create individual image files for the projection frames",
)
parser.add_argument(
"--output-no-video",
action="store_true",
help="do not create a video file from the projection frames",
)
parser.add_argument(
"--output-no-numpy",
action="store_true",
help="do not create a numpy file with the projection data",
)
parser.add_argument(
"--force-generation",
action="store_true",
help="if used will force the generation even if output already exists",
)
parser.add_argument(
"--colormap-name",
default="viridis",
type=str,
help="name of matplotlib colormap to be used",
)
parser.add_argument(
"--missing-data-color",
default="black",
type=str,
help="name of color to be used for missing data",
)
parser.add_argument(
"--reverse-colormap",
default=True,
type=bool,
help="if colormap should be reversed",
)
parser.add_argument(
"--vertical-resolution",
default=32,
type=positive_int,
help="number of vertical lidar data point rows",
)
parser.add_argument(
"--horizontal-resolution",
default=2048,
type=positive_int,
help="number of horizontal lidar data point columns",
)
parser.add_argument(
"--vertical-scale",
default=1,
type=positive_int,
help="multiplier for vertical scale, for better visualization",
)
parser.add_argument(
"--horizontal-scale",
default=1,
type=positive_int,
help="multiplier for horizontal scale, for better visualization",
)
parser.add_argument(
"--roi-angle-start",
default=0,
type=angle,
help="angle where roi starts",
)
parser.add_argument(
"--roi-angle-width",
default=360,
type=angle_width,
help="width of roi in degrees",
)
args = parser.parse_args()
output_path = args.output_path / args.input_experiment_path.stem
output_path.mkdir(parents=True, exist_ok=True)
parser.write_config_file(
parser.parse_known_args()[0],
output_file_paths=[(output_path / "config.yaml").as_posix()],
)
# Create temporary folder for images, if outputting images we use the output folder itself as temp folder
tmp_path = output_path / "tmp" if args.output_no_images else output_path / "frames"
tmp_path.mkdir(parents=True, exist_ok=True)
dataset = load_dataset(args.input_experiment_path, args.pointcloud_topic)
images = []
topic_paths = []
if not args.output_no_images or not args.output_no_video:
if not args.force_generation and all(
(tmp_path / f"frame_{i:04d}.png").exists()
for i in range(1, len(dataset) + 1)
):
print(
f"Skipping image generation for {args.input_experiment_path} as all frames already exist"
)
else:
projection_data, images = create_projection_data(
dataset,
tmp_path,
args.colormap_name,
args.missing_data_color,
args.reverse_colormap,
args.vertical_resolution,
args.horizontal_resolution,
args.vertical_scale,
args.horizontal_scale,
args.roi_angle_start,
args.roi_angle_width,
render_images=True,
)
if args.image_topics:
topic_paths = save_image_topics(
args.input_experiment_path, output_path, args.image_topics
)
output_numpy_path = (output_path / args.input_experiment_path.stem).with_suffix(
".npy"
)
if not args.output_no_numpy:
if not args.force_generation and output_numpy_path.exists():
print(
f"Skipping numpy file generation for {args.input_experiment_path} as {output_numpy_path} already exists"
)
else:
if args.output_no_images:
projection_data, _ = create_projection_data(
dataset,
tmp_path,
args.colormap_name,
args.missing_data_color,
args.reverse_colormap,
args.vertical_resolution,
args.horizontal_resolution,
args.vertical_scale,
args.horizontal_scale,
args.roi_angle_start,
args.roi_angle_width,
render_images=False,
)
# processed_range_data.dump(output_numpy_path)
np.save(output_numpy_path, projection_data, fix_imports=False)
if not args.output_no_video:
if (
not args.force_generation
and (output_path / args.input_experiment_path.stem)
.with_suffix(".mp4")
.exists()
):
print(
f"Skipping video generation for {args.input_experiment_path} as {output_path / args.input_experiment_path.stem}.mp4 already exists"
)
else:
frame_rate = calculate_average_frame_rate(dataset)
input_images_pattern = f"{tmp_path}/frame_%04d.png"
create_video_from_images(
input_images_pattern,
(output_path / args.input_experiment_path.stem).with_suffix(".mp4"),
frame_rate,
)
for topic_path in topic_paths:
input_images_pattern = f"{topic_path}/frame_%04d.png"
create_video_from_images(
input_images_pattern,
(output_path / topic_path.stem).with_suffix(".mp4"),
frame_rate,
)
if args.output_no_images:
for image in images:
image.unlink()
tmp_path.rmdir()
return 0
if __name__ == "__main__":
exit(main())