from pathlib import Path import numpy as np from PIL import Image from rosbags.highlevel import AnyReader def list_topics(bag_file): frame_count = 0 output_folder = Path("./test") / bag_file.stem output_folder.mkdir(exist_ok=True, parents=True) with AnyReader([bag_file]) as reader: connections = reader.connections topics = dict(sorted({conn.topic: conn for conn in connections}.items())) topic = topics["/stereo_inertial_publisher/left/image_rect"] for connection, timestamp, rawdata in reader.messages(connections=[topic]): img_msg = reader.deserialize(rawdata, connection.msgtype) img_data = np.frombuffer(img_msg.data, dtype=np.uint8).reshape( (img_msg.height, img_msg.width) ) img = Image.fromarray(img_data, mode="L") frame_count += 1 img.save(output_folder / f"frame_{frame_count:04d}.png") print(f"Saved {frame_count} frames to {output_folder}") if __name__ == "__main__": bag_path = Path( "/home/fedex/mt/data/subter/3_smoke_robot_stationary_static_excess_smoke_2023-01-23.bag" ) print(f"{bag_path.exists()=}") list_topics(bag_path)