from math import pi from multiprocessing import Pool from pathlib import Path from sys import exit from typing import Optional, Tuple import matplotlib import numpy as np from configargparse import ( ArgParser, ArgumentDefaultsRawHelpFormatter, YAMLConfigFileParser, ) from pandas import DataFrame from pathvalidate import sanitize_filename from PIL import Image from pointcloudset import Dataset from rich.progress import track from rosbags.highlevel import AnyReader matplotlib.use("Agg") import matplotlib.pyplot as plt from util import ( angle, angle_width, calculate_average_frame_rate, create_video_from_images, existing_path, get_colormap_with_special_missing_color, load_dataset, positive_int, ) def save_image_topics( bag_file_path: Path, output_folder_path: Path, topic_names: list[str] ) -> list[Path]: with AnyReader([bag_file_path]) as reader: topic_paths = [] for topic_name in topic_names: connections = [x for x in reader.connections if x.topic == topic_name] frame_count = 0 topic_output_folder_path = output_folder_path / sanitize_filename( topic_name ) topic_output_folder_path.mkdir(exist_ok=True, parents=True) topic_paths.append(topic_output_folder_path) for connection, timestamp, rawdata in reader.messages( connections=connections ): img_msg = reader.deserialize(rawdata, connection.msgtype) img_data = np.frombuffer( img_msg.data, dtype=np.uint8 if "8" in img_msg.encoding else np.uint16, ).reshape((img_msg.height, img_msg.width)) img = Image.fromarray(img_data, mode="L") frame_count += 1 img.save(topic_output_folder_path / f"frame_{frame_count:04d}.png") return topic_paths def crop_projection_data_to_roi( data: DataFrame, roi_angle_start: float, roi_angle_width: float, horizontal_resolution: int, ) -> tuple[DataFrame, int]: if roi_angle_width == 360: return data, horizontal_resolution roi_index_start = int(horizontal_resolution / 360 * roi_angle_start) roi_index_width = int(horizontal_resolution / 360 * roi_angle_width) roi_index_end = roi_index_start + roi_index_width if roi_index_end < horizontal_resolution: cropped_data = data.iloc[:, roi_index_start:roi_index_end] else: roi_index_end = roi_index_end - horizontal_resolution cropped_data = data.iloc[:, roi_index_end:roi_index_start] return cropped_data, roi_index_width def create_2d_projection( df: DataFrame, output_file_path: Path, tmp_file_path: Path, colormap_name: str, missing_data_color: str, reverse_colormap: bool, horizontal_resolution: int, vertical_resolution: int, ): fig, ax = plt.subplots( figsize=(float(horizontal_resolution) / 100, float(vertical_resolution) / 100) ) ax.imshow( df, cmap=get_colormap_with_special_missing_color( colormap_name, missing_data_color, reverse_colormap ), aspect="auto", ) ax.axis("off") fig.subplots_adjust(left=0, right=1, top=1, bottom=0) plt.savefig(tmp_file_path, dpi=100, bbox_inches="tight", pad_inches=0) plt.close() img = Image.open(tmp_file_path) img_resized = img.resize( (horizontal_resolution, vertical_resolution), Image.LANCZOS ) img_resized.save(output_file_path) tmp_file_path.unlink() def process_frame(args) -> Tuple[int, np.ndarray, Optional[Path]]: ( i, pc, output_path, colormap_name, missing_data_color, reverse_colormap, vertical_resolution, horizontal_resolution, vertical_scale, horizontal_scale, roi_angle_start, roi_angle_width, render_images, ) = args lidar_data = pc.data.copy() lidar_data["horizontal_position"] = ( lidar_data["original_id"] % horizontal_resolution ) # lidar_data["horizontal_position_yaw_f"] = ( # 0.5 # * horizontal_resolution # * (np.arctan2(lidar_data["y"], lidar_data["x"]) / pi + 1.0) # ) # lidar_data["horizontal_position_yaw"] = np.floor( # lidar_data["horizontal_position_yaw_f"] # ) lidar_data["vertical_position"] = np.floor( lidar_data["original_id"] / horizontal_resolution ) # fov = 32 * pi / 180 # fov_down = 17 * pi / 180 # fov = 31.76 * pi / 180 # fov_down = 17.3 * pi / 180 # lidar_data["vertical_angle"] = np.arcsin( # lidar_data["z"] # / np.sqrt(lidar_data["x"] ** 2 + lidar_data["y"] ** 2 + lidar_data["z"] ** 2) # ) # lidar_data["vertical_angle_degree"] = lidar_data["vertical_angle"] * 180 / pi # lidar_data["vertical_position_pitch_f"] = vertical_resolution * ( # 1 - ((lidar_data["vertical_angle"] + fov_down) / fov) # ) # lidar_data["vertical_position_pitch"] = np.floor( # lidar_data["vertical_position_pitch_f"] # ) # duplicates = lidar_data[ # lidar_data.duplicated( # subset=["vertical_position_pitch", "horizontal_position_yaw"], # keep=False, # ) # ].sort_values(by=["vertical_position_pitch", "horizontal_position_yaw"]) lidar_data["normalized_range"] = 1 / np.sqrt( lidar_data["x"] ** 2 + lidar_data["y"] ** 2 + lidar_data["z"] ** 2 ) # check for zero values in normalized_range and output the x y and z values for them zero_range_indices = lidar_data[lidar_data["normalized_range"] == 0].index if not zero_range_indices.empty: print( f"Warning: Found {len(zero_range_indices)} points with zero normalized range at indices: {zero_range_indices.tolist()}" ) print( "X, Y, Z values for these points:", lidar_data.loc[zero_range_indices, ["x", "y", "z"]].to_numpy(), ) # Set zero values to NaN to avoid issues with training lidar_data.loc[zero_range_indices, "normalized_range"] = np.nan projection_data = lidar_data.pivot( index="vertical_position", columns="horizontal_position", values="normalized_range", ) projection_data = projection_data.reindex( columns=range(horizontal_resolution), fill_value=np.nan ) projection_data = projection_data.reindex( index=range(vertical_resolution), fill_value=np.nan ) projection_data, output_horizontal_resolution = crop_projection_data_to_roi( projection_data, roi_angle_start, roi_angle_width, horizontal_resolution ) if render_images: image_path = create_2d_projection( projection_data, output_path / f"frame_{i:04d}.png", output_path / f"tmp_{i:04d}.png", colormap_name, missing_data_color, reverse_colormap, horizontal_resolution=output_horizontal_resolution * horizontal_scale, vertical_resolution=vertical_resolution * vertical_scale, ) else: image_path = None return ( i, projection_data.to_numpy(), image_path, # lidar_data["vertical_position_pitch_f"].min(), # lidar_data["vertical_position_pitch_f"].max(), ) # Adjusted to use a generator for args_list def create_projection_data( dataset: Dataset, output_path: Path, colormap_name: str, missing_data_color: str, reverse_colormap: bool, vertical_resolution: int, horizontal_resolution: int, vertical_scale: int, horizontal_scale: int, roi_angle_start: float, roi_angle_width: float, render_images: bool, ) -> Tuple[np.ndarray, Optional[list[Path]]]: rendered_images = [] converted_lidar_frames = [] # Generator for args_list # def args_generator(): # for i, pc in enumerate(dataset, 1): # yield ( # i, # pc, # output_path, # colormap_name, # missing_data_color, # reverse_colormap, # vertical_resolution, # horizontal_resolution, # vertical_scale, # horizontal_scale, # roi_angle_start, # roi_angle_width, # render_images, # ) # results = [] # with Pool() as pool: # results_gen = pool.imap(process_frame, args_generator()) # for result in track( # results_gen, description="Processing...", total=len(dataset) # ): # results.append(result) # results.sort(key=lambda x: x[0]) for i, pc in track( enumerate(dataset, 1), description="Processing...", total=len(dataset) ): results = process_frame( ( i, pc, output_path, colormap_name, missing_data_color, reverse_colormap, vertical_resolution, horizontal_resolution, vertical_scale, horizontal_scale, roi_angle_start, roi_angle_width, render_images, ) ) converted_lidar_frames.append(results[1]) if render_images: rendered_images.append(results[2]) # min_all = 100 # max_all = 0 # for _, data, img_path, min_frame, max_frame in results: # converted_lidar_frames.append(data) # if img_path: # rendered_images.append(img_path) # if min_frame < min_all: # min_all = min_frame # if max_frame > max_all: # max_all = max_frame # print(f"{min_all=}, {max_all=}") projection_data = np.stack(converted_lidar_frames, axis=0) if render_images: return projection_data, rendered_images else: return projection_data, None def main() -> int: parser = ArgParser( config_file_parser_class=YAMLConfigFileParser, default_config_files=["render2d_config.yaml"], formatter_class=ArgumentDefaultsRawHelpFormatter, description="Render a 2d projection of a point cloud", ) parser.add_argument( "--render-config-file", is_config_file=True, help="yaml config file path" ) parser.add_argument( "--input-experiment-path", required=True, type=existing_path, help="path to experiment. (directly to bag file, to parent folder for mcap)", ) parser.add_argument( "--pointcloud-topic", default="/ouster/points", type=str, help="topic in the ros/mcap bag file containing the point cloud data", ) parser.add_argument( "--image-topics", default=[], nargs="+", help="topics in the ros/mcap bag file containing the image data", ) parser.add_argument( "--output-path", default=Path("./output"), type=Path, help="path rendered frames should be written to", ) parser.add_argument( "--output-no-images", action="store_true", help="do not create individual image files for the projection frames", ) parser.add_argument( "--output-no-video", action="store_true", help="do not create a video file from the projection frames", ) parser.add_argument( "--output-no-numpy", action="store_true", help="do not create a numpy file with the projection data", ) parser.add_argument( "--force-generation", action="store_true", help="if used will force the generation even if output already exists", ) parser.add_argument( "--colormap-name", default="viridis", type=str, help="name of matplotlib colormap to be used", ) parser.add_argument( "--missing-data-color", default="black", type=str, help="name of color to be used for missing data", ) parser.add_argument( "--reverse-colormap", default=True, type=bool, help="if colormap should be reversed", ) parser.add_argument( "--vertical-resolution", default=32, type=positive_int, help="number of vertical lidar data point rows", ) parser.add_argument( "--horizontal-resolution", default=2048, type=positive_int, help="number of horizontal lidar data point columns", ) parser.add_argument( "--vertical-scale", default=1, type=positive_int, help="multiplier for vertical scale, for better visualization", ) parser.add_argument( "--horizontal-scale", default=1, type=positive_int, help="multiplier for horizontal scale, for better visualization", ) parser.add_argument( "--roi-angle-start", default=0, type=angle, help="angle where roi starts", ) parser.add_argument( "--roi-angle-width", default=360, type=angle_width, help="width of roi in degrees", ) args = parser.parse_args() output_path = args.output_path / args.input_experiment_path.stem output_path.mkdir(parents=True, exist_ok=True) parser.write_config_file( parser.parse_known_args()[0], output_file_paths=[(output_path / "config.yaml").as_posix()], ) # Create temporary folder for images, if outputting images we use the output folder itself as temp folder tmp_path = output_path / "tmp" if args.output_no_images else output_path / "frames" tmp_path.mkdir(parents=True, exist_ok=True) dataset = load_dataset(args.input_experiment_path, args.pointcloud_topic) images = [] topic_paths = [] if not args.output_no_images or not args.output_no_video: if not args.force_generation and all( (tmp_path / f"frame_{i:04d}.png").exists() for i in range(1, len(dataset) + 1) ): print( f"Skipping image generation for {args.input_experiment_path} as all frames already exist" ) else: projection_data, images = create_projection_data( dataset, tmp_path, args.colormap_name, args.missing_data_color, args.reverse_colormap, args.vertical_resolution, args.horizontal_resolution, args.vertical_scale, args.horizontal_scale, args.roi_angle_start, args.roi_angle_width, render_images=True, ) if args.image_topics: topic_paths = save_image_topics( args.input_experiment_path, output_path, args.image_topics ) output_numpy_path = (output_path / args.input_experiment_path.stem).with_suffix( ".npy" ) if not args.output_no_numpy: if not args.force_generation and output_numpy_path.exists(): print( f"Skipping numpy file generation for {args.input_experiment_path} as {output_numpy_path} already exists" ) else: if args.output_no_images: projection_data, _ = create_projection_data( dataset, tmp_path, args.colormap_name, args.missing_data_color, args.reverse_colormap, args.vertical_resolution, args.horizontal_resolution, args.vertical_scale, args.horizontal_scale, args.roi_angle_start, args.roi_angle_width, render_images=False, ) # processed_range_data.dump(output_numpy_path) np.save(output_numpy_path, projection_data, fix_imports=False) if not args.output_no_video: if ( not args.force_generation and (output_path / args.input_experiment_path.stem) .with_suffix(".mp4") .exists() ): print( f"Skipping video generation for {args.input_experiment_path} as {output_path / args.input_experiment_path.stem}.mp4 already exists" ) else: frame_rate = calculate_average_frame_rate(dataset) input_images_pattern = f"{tmp_path}/frame_%04d.png" create_video_from_images( input_images_pattern, (output_path / args.input_experiment_path.stem).with_suffix(".mp4"), frame_rate, ) for topic_path in topic_paths: input_images_pattern = f"{topic_path}/frame_%04d.png" create_video_from_images( input_images_pattern, (output_path / topic_path.stem).with_suffix(".mp4"), frame_rate, ) if args.output_no_images: for image in images: image.unlink() tmp_path.rmdir() return 0 if __name__ == "__main__": exit(main())