"""Analyze point-cloud data from a rosbag or mcap recording.

Iterates every message on a configurable PointCloud2 topic, decodes the raw
byte buffer into a structured NumPy array, strips padding/filler fields, and
leaves a hook where per-cloud analysis can be added.
"""

from pathlib import Path
from sys import exit

import numpy as np
from configargparse import (
    ArgParser,
    ArgumentDefaultsRawHelpFormatter,
    YAMLConfigFileParser,
)
from numpy.lib import recfunctions as rfn
from rich.progress import Progress
from rosbags.highlevel import AnyReader

from util import existing_path

# Mapping of sensor_msgs/PointField datatype codes to NumPy dtypes.
POINTFIELD_DATATYPES = {
    1: np.int8,  # INT8
    2: np.uint8,  # UINT8
    3: np.int16,  # INT16
    4: np.uint16,  # UINT16
    5: np.int32,  # INT32
    6: np.uint32,  # UINT32
    7: np.float32,  # FLOAT32
    8: np.float64,  # FLOAT64
}


def read_pointcloud(msg):
    """Decode a PointCloud2 message into a structured NumPy array.

    Builds the dtype from ``msg.fields`` using explicit per-field offsets and
    ``itemsize=msg.point_step`` so that any padding bytes between fields or
    after the last field are skipped implicitly — no manual "gap filler"
    fields are required.

    Args:
        msg: a deserialized PointCloud2-like message with ``fields``,
            ``point_step``, ``data`` and (optionally) ``is_bigendian``.

    Returns:
        A read-only structured ``np.ndarray`` view over ``msg.data`` with one
        entry per point and one (sub)field per PointField.

    Raises:
        ValueError: if a field uses a datatype code not in
            ``POINTFIELD_DATATYPES``.
    """
    names = []
    formats = []
    offsets = []

    for field in msg.fields:
        np_dtype = POINTFIELD_DATATYPES.get(field.datatype)
        if np_dtype is None:
            raise ValueError(
                f"Unsupported datatype {field.datatype} for field {field.name}"
            )

        # PointField.count > 1 denotes a fixed-length vector field; map it to
        # a NumPy subarray so element size and subsequent offsets stay correct.
        count = getattr(field, "count", 1) or 1
        names.append(field.name)
        formats.append(np_dtype if count == 1 else (np_dtype, (count,)))
        offsets.append(field.offset)

    dtype = np.dtype(
        {
            "names": names,
            "formats": formats,
            "offsets": offsets,
            # point_step is the full per-point stride, padding included.
            "itemsize": msg.point_step,
        }
    )

    # Honor the wire byte order; ROS buffers are little-endian unless flagged.
    if getattr(msg, "is_bigendian", False):
        dtype = dtype.newbyteorder(">")

    return np.frombuffer(msg.data, dtype=dtype)


def clean_pointcloud(points):
    """Return a compact copy of *points* without filler fields or padding.

    Drops any explicit ``unused_*`` filler fields (kept for compatibility with
    dtypes built elsewhere) and repacks the remaining fields contiguously,
    removing inter-field padding bytes.
    """
    valid_fields = [
        name for name in points.dtype.names if not name.startswith("unused_")
    ]
    return rfn.repack_fields(points[valid_fields])


def main() -> int:
    """Parse arguments, read the configured topic, and run the analysis loop.

    Returns:
        Process exit code (0 on success); exits via SystemExit when the
        requested topic is absent from the bag.
    """
    parser = ArgParser(
        config_file_parser_class=YAMLConfigFileParser,
        default_config_files=["data_analyze_config.yaml"],
        formatter_class=ArgumentDefaultsRawHelpFormatter,
        description="Analyse data from a rosbag or mcap file and output additional data",
    )
    parser.add_argument(
        "--config-file", is_config_file=True, help="yaml config file path"
    )
    parser.add_argument(
        "--input-experiment-path",
        required=True,
        type=existing_path,
        help="path to experiment. (directly to bag file, to parent folder for mcap)",
    )
    parser.add_argument(
        "--pointcloud-topic",
        default="/ouster/points",
        type=str,
        help="topic in the ros/mcap bag file containing the point cloud data",
    )
    parser.add_argument(
        "--output-path",
        default=Path("./output"),
        type=Path,
        help="path augmented dataset should be written to",
    )
    args = parser.parse_args()

    # One output directory per experiment, named after the input file/folder.
    output_path = args.output_path / args.input_experiment_path.stem
    output_path.mkdir(parents=True, exist_ok=True)

    # Snapshot the effective configuration next to the results for provenance.
    parser.write_config_file(
        parser.parse_known_args()[0],
        output_file_paths=[(output_path / "config.yaml").as_posix()],
    )

    with AnyReader([args.input_experiment_path]) as reader:
        connections = reader.connections
        # Last connection wins if a topic has several; sorted for stable order.
        topics = dict(sorted({conn.topic: conn for conn in connections}.items()))
        # Explicit check instead of `assert`: asserts vanish under `python -O`.
        if args.pointcloud_topic not in topics:
            raise SystemExit(f"Topic {args.pointcloud_topic} not found")
        topic = topics[args.pointcloud_topic]

        with Progress() as progress:
            task = progress.add_task("Analyzing data", total=topic.msgcount)
            for connection, timestamp, rawdata in reader.messages(
                connections=[topic]
            ):
                pointcloud_msg = reader.deserialize(rawdata, connection.msgtype)
                original_pointcloud = read_pointcloud(pointcloud_msg)
                cleaned_pointcloud = clean_pointcloud(original_pointcloud)

                # todo add analysis here

                progress.advance(task)

    return 0


if __name__ == "__main__":
    exit(main())