"""Read PointCloud2 messages from a rosbag/mcap recording for analysis."""

from pathlib import Path
from sys import exit
from typing import Any

import numpy as np
from configargparse import (
    ArgParser,
    ArgumentDefaultsRawHelpFormatter,
    YAMLConfigFileParser,
)
from numpy.lib import recfunctions as rfn
from rich.progress import Progress
from rosbags.highlevel import AnyReader

from util import existing_path

# Mapping of PointField datatypes to NumPy dtypes
POINTFIELD_DATATYPES = {
    1: np.int8,  # INT8
    2: np.uint8,  # UINT8
    3: np.int16,  # INT16
    4: np.uint16,  # UINT16
    5: np.int32,  # INT32
    6: np.uint32,  # UINT32
    7: np.float32,  # FLOAT32
    8: np.float64,  # FLOAT64
}


def read_pointcloud(msg: Any) -> np.ndarray:
    """Interpret the raw buffer of a PointCloud2-like message as a structured array.

    The structured dtype is derived from ``msg.fields``. Bytes of a point that
    are not covered by any field are exposed as raw ``unused_<offset>`` filler
    fields so that one array element spans exactly ``msg.point_step`` bytes.

    Args:
        msg: Object providing ``fields`` (each with ``name``, ``offset``,
            ``datatype`` and optionally ``count``), ``point_step`` and
            ``data``. ``is_bigendian`` is honoured when present.

    Returns:
        A structured array viewing ``msg.data``, one element per point.

    Raises:
        ValueError: If a field uses a datatype missing from
            ``POINTFIELD_DATATYPES`` or the fields do not fit in
            ``msg.point_step`` bytes.
    """
    # Attributes the original implementation never touched are read with a
    # default so simplified message objects keep working.
    byte_order = ">" if getattr(msg, "is_bigendian", False) else "<"

    names = []
    formats = []
    offsets = []
    cursor = 0  # first byte of the point not yet covered by any field

    # Walk the fields in layout order so gap detection does not depend on the
    # order in which the publisher happened to list them.
    for field in sorted(msg.fields, key=lambda f: f.offset):
        scalar_type = POINTFIELD_DATATYPES.get(field.datatype)
        if scalar_type is None:
            raise ValueError(
                f"Unsupported datatype {field.datatype} for field {field.name}"
            )

        field_dtype = np.dtype(scalar_type).newbyteorder(byte_order)
        # A count of 0 is treated like 1, matching the previous behaviour of
        # always reading a single element.
        count = max(int(getattr(field, "count", 1)), 1)
        if count > 1:
            field_dtype = np.dtype((field_dtype, (count,)))

        field_offset = int(field.offset)
        if field_offset > cursor:
            # Raw bytes as filler
            names.append(f"unused_{cursor}")
            formats.append(np.dtype(f"V{field_offset - cursor}"))
            offsets.append(cursor)

        names.append(field.name)
        formats.append(field_dtype)
        offsets.append(field_offset)
        # max() keeps the cursor from moving backwards on overlapping fields.
        cursor = max(cursor, field_offset + field_dtype.itemsize)

    point_step = int(msg.point_step)
    if cursor > point_step:
        raise ValueError(
            f"Fields occupy {cursor} bytes but point_step is {point_step}"
        )
    if cursor < point_step:
        names.append(f"unused_{cursor}")
        formats.append(np.dtype(f"V{point_step - cursor}"))
        offsets.append(cursor)

    dtype = np.dtype(
        {
            "names": names,
            "formats": formats,
            "offsets": offsets,
            "itemsize": point_step,
        }
    )
    # NOTE(review): this assumes rows carry no padding, i.e.
    # row_step == width * point_step -- confirm for the recorded sensors.
    return np.frombuffer(msg.data, dtype=dtype)


def clean_pointcloud(points: np.ndarray) -> np.ndarray:
    """Return ``points`` without the ``unused_*`` filler fields, tightly packed.

    Args:
        points: Structured array, e.g. as returned by ``read_pointcloud``.

    Returns:
        A structured array holding only the fields whose name does not start
        with ``unused_``, repacked with ``repack_fields``.
    """
    valid_fields = [
        name for name in points.dtype.names if not name.startswith("unused_")
    ]
    return rfn.repack_fields(points[valid_fields])


def main() -> int:
    """Parse the configuration and iterate over all point clouds of the recording.

    Returns:
        ``0`` once every message of the point cloud topic has been processed.
        An unknown topic terminates the program through ``parser.error``.
    """
    parser = ArgParser(
        config_file_parser_class=YAMLConfigFileParser,
        default_config_files=["data_analyze_config.yaml"],
        formatter_class=ArgumentDefaultsRawHelpFormatter,
        description="Analyse data from a rosbag or mcap file and output additional data",
    )
    parser.add_argument(
        "--config-file", is_config_file=True, help="yaml config file path"
    )
    parser.add_argument(
        "--input-experiment-path",
        required=True,
        type=existing_path,
        help="path to experiment. (directly to bag file, to parent folder for mcap)",
    )
    parser.add_argument(
        "--pointcloud-topic",
        default="/ouster/points",
        type=str,
        help="topic in the ros/mcap bag file containing the point cloud data",
    )
    parser.add_argument(
        "--output-path",
        default=Path("./output"),
        type=Path,
        help="path augmented dataset should be written to",
    )
    args = parser.parse_args()

    output_path = args.output_path / args.input_experiment_path.stem
    output_path.mkdir(parents=True, exist_ok=True)
    # Store the effective configuration in the output directory; the already
    # parsed namespace is reused instead of parsing the command line again.
    parser.write_config_file(
        args,
        output_file_paths=[(output_path / "config.yaml").as_posix()],
    )

    with AnyReader([args.input_experiment_path]) as reader:
        # A topic can be served by several connections, so keep all of them
        # rather than collapsing to one connection per topic.
        pointcloud_connections = [
            conn for conn in reader.connections if conn.topic == args.pointcloud_topic
        ]
        if not pointcloud_connections:
            available = ", ".join(sorted({conn.topic for conn in reader.connections}))
            # Explicit error instead of ``assert``, which is stripped under -O.
            parser.error(
                f"Topic {args.pointcloud_topic} not found"
                f" (available topics: {available})"
            )

        total_messages = sum(conn.msgcount for conn in pointcloud_connections)
        with Progress() as progress:
            task = progress.add_task("Analyzing data", total=total_messages)
            for connection, _timestamp, rawdata in reader.messages(
                connections=pointcloud_connections
            ):
                pointcloud_msg = reader.deserialize(rawdata, connection.msgtype)
                original_pointcloud = read_pointcloud(pointcloud_msg)
                cleaned_pointcloud = clean_pointcloud(original_pointcloud)
                # todo add analysis here
                progress.advance(task)
    return 0


if __name__ == "__main__":
    exit(main())