Python Examples
We provide two GitHub repositoiries to show how to interact with MCAP files and ROS2 topics over Zenoh using Python. The first repository, the MCAP-py
repository, has a viewer.py
script to show how to use Python to interact with MCAP files generated by the Raivin. The second repository, the topicsub-py
repository, has a number of scripts demonstrating how to connect and report on the various topics generated by a Raivin.
MCAP
The MCAP-py
repository has a single script, viewer.py
that demonstrates the following functionality:
* loads MCAP file
* parses the MCAP file
* searches the MCAP file for necessary topics
* /camera/info
which is used to get the length and width of the images,
* /camera/h264
which is used to get the images to display
* /model/boxes2d
which is used to get the object detection bounding boxes
* decodes the bounding boxes from the /model/boxes2d
topic and draws them on the image found from the /camera/h264
topic
It uses the MCAP Python Library and our EdgeFirst Schemas Library.
Environments
This has been tested on Python version 3.10.12 on Windows 10 and Ubuntu 22.04 on Windows Subsystem for Linux (WSL).
Setup
Please follow the README.md file for specific installation instructions.
After you download the repository, it is recommened you create a Python virtual environment in the directory, then install the requirements.
$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install --upgrade -r requirements.txt
Usage
The usage of the viewer.py
script is as follows:
(venv) kris@AUZONEDEV02:~/mcap-py$ python viewer.py -h
usage: viewer.py [-h] [-m [MODEL]] [-s SCALE] [-t THICKNESS] [-b] mcap_file
Process MCAP to view images with bounding boxes.
positional arguments:
mcap_file MCAP that needs to be parsed
options:
-h, --help show this help message and exit
-m [MODEL], --model [MODEL]
Run the MCAP frames through a custom ONNX model to display bounding box.
-s SCALE, --scale SCALE
Resizing factor to view the final image 0.1-1.0. Default: 1.0
-t THICKNESS, --thickness THICKNESS
Choose the thickness of the bounding box. Default: 2
-b, --display_bbox Choose to view the bounding box. Default: True
Codeblocks of Note
To read an MCAP file, all you need to do is open the MCAP file and pass it as an argument to mcap.reader.make_reader()
.
with open(mcap_file, "rb") as f: # Open the MCAP file for reading
logger.info(f"Opening {mcap_file}")
reader = make_reader(f) # Create a reader object for reading messages
sum = reader.get_summary()
for schema, channel, message in reader.iter_messages(): # Iterate over messages in the file
if channel.topic == "/camera/info" and scale_not_set:
def is_topic_present(summary:Summary, topic:str) -> bool:
for id, channel in summary.channels.items():
if channel.topic == topic:
logger.info(f"Found topic {topic}")
return True
logger.error(f"Did not find topic {topic}")
return False
reader.iter_messages()
can be converted into scriptable outputs using our Edgefirst Schemas. Importing them as:
from edgefirst.schemas.sensor_msgs import CameraInfo as Info # Custom message module for camera information
from edgefirst.schemas.foxglove_msgs import CompressedVideo # Custom message module for H264 decoding
from edgefirst.schemas.edgefirst_msgs import Detect # Custom message module for detection
image_data = CompressedVideo.deserialize(message.data) # Deserialize the message data to get H264 frames
mcap_image = get_image(bytes(image_data.data), frame_position) # Get the image frame from the message
image_data.data
can then be used by the PyAV library to generate images and Matplotlib to display them.
We can get image size (height and width) a similar way:
def set_image_size(message, scale):
global frame_height # Access the global variable
frame_height = int(Info.deserialize(message.data).height*scale) # Update the frame height
global frame_width # Access the global variable
frame_width = int(Info.deserialize(message.data).width*scale) # Update the frame width
return False # Return False to indicate that scale is set
boxes = Detect.deserialize(message.data)
box_time = boxes.header.stamp.sec + (boxes.header.stamp.nanosec / 1e9) # Get the box time
boxes_map[box_time] = boxes.boxes
closest_time = get_closest_time(boxes_map, frame_time)
if mcap_image is not None:
logger.info(f"Found {len(boxes_map[closest_time])} boxes at time {closest_time}")
for points in boxes_map[closest_time]: # Iterate over annotation points
if points and mcap_image is not None: # Check if points and image are available
x = int((points.center_x - points.width / 2) * frame_width/scale)
y = int((points.center_y - points.height / 2) * frame_height/scale)
w = int(points.width * frame_width/scale)
h = int(points.height * frame_height/scale)
Topicsub-py
This repository contains a number of Python scripts that read the network topics generated by the ROS2 messaging over Zenoh DDS and demonstrates the following functionality:
* lists the topics a Raivin is sending data out on
* lists the schemas the topics use to encode data on
* outputs from the Raivin:
* the PointCloud information from the Radars Target topic rt/radar/targets
* GPS information from rt/gps
* IMU information from rt/imu
* processed radar data in summary and raw output from rt/fusion/targets
* camera information from rt/camera/info
* detect boxes from rt/model/boxes2d
It uses the Zenoh Python Library and our EdgeFirst Schemas Library.
Environments
This has been tested on the Raivin device itself (release 2025-Q1, Python version 3.10.15), Python version 3.10.12 on Windows 10, and Ubuntu 22.04 on Windows Subsystem for Linux (WSL).
Setup
Please follow the README.md file for specific installation instructions. It is recommended you create a Python virtual environment to run the scripts in.
Libraries not present on Raivin
If the Raivin does not have the Python libraries installed and does not have external Internet access, the packages can be downloaded on a Internet-connected PC with the following command:
pip download --no-cache-dir -r requirements.txt --platform=manylinux_2_28_aarch64 --platform=manylinux_2_17_aarch64 --only-binary=:all:
edgefirst_schemas-1.2.4-py3-none-any.whl
eclipse_zenoh-1.2.1-cp38-abi3-manylinux_2_28_aarch64.whl
numpy-2.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
pycdr2-1.0.0-py3-none-any.whl
$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install pycdr2-1.0.0-py3-none-any.whl
(venv) $ pip install edgefirst_schemas-1.2.4-py3-none-any.whl
(venv) $ pip install eclipse_zenoh-1.2.1-cp38-abi3-manylinux_2_28_aarch64.whl
(venv) $ pip install numpy-2.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
Start the Zenohd Router
The Zenohd Router will need to be started to send messages off-device. On the Raivin, run the following command:
sudo systemctl start zenohd
You do not need to enable the Zenohd Router service unless you want it to start on boot. If you do, either run the command `sudo systemctl enable zenohd` or, on step 3 above, toggle the "Enable/Disable" switch on the "zenohd" card to "Enable".
Confirm the router is running with the command:
torizon@verdin-imx8mp-07129940:~$ systemctl status zenohd
● zenohd.service - Zenoh Message Router
Loaded: loaded (/usr/lib/systemd/system/zenohd.service; disabled; vendor preset: disabled)
Active: active (running) since Mon 2025-03-17 21:49:18 UTC; 1 day 21h ago
Main PID: 3480 (zenohd)
Tasks: 11 (limit: 3772)
Memory: 4.6M
CGroup: /system.slice/zenohd.service
└─ 3480 /usr/bin/zenohd --config /etc/zenohd.yaml
Usage
The usage of most of the scripts is as follows:
python .\fusionraw.py -h
usage: fusionraw.py [-h] [-c CONNECT]
Imu Example
options:
-h, --help show this help message and exit
-c CONNECT, --connect CONNECT
Connection point for the zenoh session, default='tcp/127.0.0.1:7447'
python .\fusionraw.py -c tcp/<IP address or hostname>:7447
The IP address can be:
* IPv4 address (e.g. `python .\topics.py tcp/10.10.41.195:7447` )
* IPv6 address (e.g. `python .\topics.py tcp/[fe80::c91e:a40:1c9c:5a7c]:7447` )
* the hostname (e.g. `python .\topics.py -c tcp/verdin-imx8mp-07130049.local:7447`)
Most scripts need the user to hit CTRL-C to stop.
Codeblocks of Note
The scripts start trying to connect to the Raivin:
try:
cfg = zenoh.Config()
cfg.insert_json5("mode", "'client'")
cfg.insert_json5("connect", '{ "endpoints": ["%s"] }' % args.connect)
session = zenoh.open(cfg)
except zenoh.ZError as e:
print(f"Failed to open Zenoh session: {e}")
sys.exit(1)
try:
while True:
time.sleep(0.1)
except KeyboardInterrupt:
print("\nExiting...")
session.close()
sys.exit(0)
sub = session.declare_subscriber('rt/fusion/model_output', fusion_listener)
edgefirst.schemas
to decode the output messages into manipulatible objects:
from edgefirst.schemas.edgefirst_msgs import Mask
def fusion_listener(msg):
fusion = Mask.deserialize(bytes(msg.payload))
print(f"\nMask Data:")
print(f"Dimensions: {fusion.width} x {fusion.height}")
print(f"Encoding: {fusion.encoding}")
print(f"Length: {fusion.length}")