
Python Examples

We provide two GitHub repositories that show how to interact with MCAP files and ROS2 topics over Zenoh using Python. The first, the MCAP-py repository, contains a viewer.py script showing how to use Python to interact with MCAP files generated by the Raivin. The second, the topicsub-py repository, contains a number of scripts demonstrating how to connect to and report on the various topics generated by a Raivin.

MCAP

The MCAP-py repository has a single script, viewer.py, that demonstrates the following functionality:

* loads an MCAP file
* parses the MCAP file
* searches the MCAP file for the necessary topics:
    * /camera/info, used to get the height and width of the images
    * /camera/h264, used to get the images to display
    * /model/boxes2d, used to get the object detection bounding boxes
* decodes the bounding boxes from the /model/boxes2d topic and draws them on the image from the /camera/h264 topic

It uses the MCAP Python Library and our EdgeFirst Schemas Library.

Environments

This has been tested with Python version 3.10.12 on Windows 10 and on Ubuntu 22.04 under Windows Subsystem for Linux (WSL).

Setup

Please follow the README.md file for specific installation instructions.

After downloading the repository, it is recommended you create a Python virtual environment in the directory and then install the requirements.

$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install --upgrade -r requirements.txt

Usage

The usage of the viewer.py script is as follows:

(venv) kris@AUZONEDEV02:~/mcap-py$ python viewer.py -h
usage: viewer.py [-h] [-m [MODEL]] [-s SCALE] [-t THICKNESS] [-b] mcap_file

Process MCAP to view images with bounding boxes.

positional arguments:
  mcap_file             MCAP that needs to be parsed

options:
  -h, --help            show this help message and exit
  -m [MODEL], --model [MODEL]
                        Run the MCAP frames through a custom ONNX model to display bounding box.
  -s SCALE, --scale SCALE
                        Resizing factor to view the final image 0.1-1.0. Default: 1.0
  -t THICKNESS, --thickness THICKNESS
                        Choose the thickness of the bounding box. Default: 2
  -b, --display_bbox    Choose to view the bounding box. Default: True

Codeblocks of Note

To read an MCAP file, all you need to do is open the MCAP file and pass it as an argument to mcap.reader.make_reader().

with open(mcap_file, "rb") as f:  # Open the MCAP file for reading
    logger.info(f"Opening {mcap_file}")
    reader = make_reader(f)  # Create a reader object for reading messages

You can then use the MCAP reader object in your code, for example, to read its summary:

summary = reader.get_summary()  # Read the file summary (topics, channels, statistics)

or to iterate through its messages:

for schema, channel, message in reader.iter_messages():  # Iterate over messages in the file
    if channel.topic == "/camera/info" and scale_not_set: 

The summary contains a list of topics that can be checked:

def is_topic_present(summary: Summary, topic: str) -> bool:
    for channel in summary.channels.values():  # Check each channel's topic name
        if channel.topic == topic:
            logger.info(f"Found topic {topic}")
            return True
    logger.error(f"Did not find topic {topic}")
    return False   

The message data from reader.iter_messages() can be converted into Python objects using our EdgeFirst Schemas. Import them as:

from edgefirst.schemas.sensor_msgs import CameraInfo as Info  # Custom message module for camera information  
from edgefirst.schemas.foxglove_msgs import CompressedVideo  # Custom message module for H264 decoding 
from edgefirst.schemas.edgefirst_msgs import Detect # Custom message module for detection

We can then use:

image_data =  CompressedVideo.deserialize(message.data) # Deserialize the message data to get H264 frames
mcap_image = get_image(bytes(image_data.data), frame_position)  # Get the image frame from the message

The byte string in image_data.data can then be passed to the PyAV library to generate images and to Matplotlib to display them.
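That decode step can be sketched with PyAV; the function name and buffering below are illustrative assumptions, not the viewer's exact code:

```python
def decode_h264_frames(h264_bytes):
    """Decode a raw H264 byte stream into RGB numpy arrays (illustrative sketch)."""
    import av  # Imported lazily; requires the PyAV package

    codec = av.CodecContext.create("h264", "r")  # Stand-alone H264 decoder
    frames = []
    for packet in codec.parse(h264_bytes):  # Split the byte stream into packets
        for frame in codec.decode(packet):  # Decode each packet into video frames
            frames.append(frame.to_ndarray(format="rgb24"))  # Convert to an RGB array
    return frames
```

Each returned array can then be displayed with matplotlib.pyplot.imshow().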

We can get the image size (height and width) in a similar way:

def set_image_size(message, scale):
    global frame_height, frame_width  # Update the global frame dimensions
    info = Info.deserialize(message.data)  # Deserialize the camera info once
    frame_height = int(info.height * scale)  # Update the frame height
    frame_width = int(info.width * scale)  # Update the frame width
    return False  # Return False to indicate the scale no longer needs to be set

The detection boxes can be decoded as well:

boxes = Detect.deserialize(message.data)
box_time = boxes.header.stamp.sec + (boxes.header.stamp.nanosec / 1e9) # Get the box time
boxes_map[box_time] = boxes.boxes
closest_time = get_closest_time(boxes_map, frame_time)
if mcap_image is not None:
    logger.info(f"Found {len(boxes_map[closest_time])} boxes at time {closest_time}")
for points in boxes_map[closest_time]:  # Iterate over annotation points
    if points and mcap_image is not None:  # Check if points and image are available
        x = int((points.center_x - points.width / 2) * frame_width/scale)
        y = int((points.center_y - points.height / 2) * frame_height/scale)
        w = int(points.width * frame_width/scale)
        h = int(points.height * frame_height/scale)
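
The get_closest_time() helper used above is not shown in this excerpt; a minimal version, assuming it matches a frame timestamp to the nearest box timestamp, could be:

```python
def get_closest_time(boxes_map, frame_time):
    """Return the timestamp key in boxes_map nearest to frame_time."""
    return min(boxes_map, key=lambda box_time: abs(box_time - frame_time))
```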

Topicsub-py

This repository contains a number of Python scripts that read the network topics published by ROS2 messaging over Zenoh and demonstrate the following functionality:

* lists the topics a Raivin is publishing data on
* lists the schemas the topics use to encode data
* outputs from the Raivin:
    * the PointCloud information from the radar targets topic rt/radar/targets
    * GPS information from rt/gps
    * IMU information from rt/imu
    * processed radar data, as both a summary and raw output, from rt/fusion/targets
    * camera information from rt/camera/info
    * detection boxes from rt/model/boxes2d

It uses the Zenoh Python Library and our EdgeFirst Schemas Library.

Environments

This has been tested on the Raivin device itself (release 2025-Q1, Python version 3.10.15), with Python version 3.10.12 on Windows 10, and on Ubuntu 22.04 under Windows Subsystem for Linux (WSL).

Setup

Please follow the README.md file for specific installation instructions. It is recommended you create a Python virtual environment to run the scripts in.

Libraries not present on Raivin

If the Raivin does not have the Python libraries installed and does not have external Internet access, the packages can be downloaded on an Internet-connected PC with the following command:

pip download --no-cache-dir -r requirements.txt --platform=manylinux_2_28_aarch64 --platform=manylinux_2_17_aarch64 --only-binary=:all:

This should output four wheel files:

edgefirst_schemas-1.2.4-py3-none-any.whl
eclipse_zenoh-1.2.1-cp38-abi3-manylinux_2_28_aarch64.whl
numpy-2.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl
pycdr2-1.0.0-py3-none-any.whl

These wheel files (as well as the Python topic scripts) can be copied onto the device with scp. Once there, they can be installed in a virtual environment:

$ python -m venv venv
$ source venv/bin/activate
(venv) $ pip install pycdr2-1.0.0-py3-none-any.whl
(venv) $ pip install edgefirst_schemas-1.2.4-py3-none-any.whl
(venv) $ pip install eclipse_zenoh-1.2.1-cp38-abi3-manylinux_2_28_aarch64.whl
(venv) $ pip install numpy-2.2.3-cp310-cp310-manylinux_2_17_aarch64.manylinux2014_aarch64.whl

Start the Zenohd Router

The Zenohd Router will need to be started to send messages off-device. On the Raivin, run the following command:

sudo systemctl start zenohd
Or start the service from the Raivin's WebUI:

1. Click the Settings "gear" icon in the top right of the Raivin UI.
2. Click the "Service Status" card on the Settings page.
3. Toggle the "Start/Stop" switch on the "zenohd" card to "Start".

You do not need to enable the Zenohd Router service unless you want it to start on boot.  If you do, either run the command `sudo systemctl enable zenohd` or, on step 3 above, toggle the "Enable/Disable" switch on the "zenohd" card to "Enable".

Confirm the router is running with the command:

torizon@verdin-imx8mp-07129940:~$ systemctl status zenohd
● zenohd.service - Zenoh Message Router
     Loaded: loaded (/usr/lib/systemd/system/zenohd.service; disabled; vendor preset: disabled)
     Active: active (running) since Mon 2025-03-17 21:49:18 UTC; 1 day 21h ago
   Main PID: 3480 (zenohd)
      Tasks: 11 (limit: 3772)
     Memory: 4.6M
     CGroup: /system.slice/zenohd.service
             └─ 3480 /usr/bin/zenohd --config /etc/zenohd.yaml

Alternatively, confirm on the "Service Status" page that the status on the 'zenohd' card is "RUNNING".

Usage

The usage of most of the scripts is as follows:

python .\fusionraw.py -h
usage: fusionraw.py [-h] [-c CONNECT]

Imu Example

options:
  -h, --help            show this help message and exit
  -c CONNECT, --connect CONNECT
                        Connection point for the zenoh session, default='tcp/127.0.0.1:7447'

Running the scripts on a PC that connects to a Raivin with the Zenoh router running uses the following command:

python .\fusionraw.py -c tcp/<IP address or hostname>:7447

The connection point can be:

* an IPv4 address (e.g. `python .\topics.py -c tcp/10.10.41.195:7447`)
* an IPv6 address (e.g. `python .\topics.py -c tcp/[fe80::c91e:a40:1c9c:5a7c]:7447`)
* the hostname (e.g. `python .\topics.py -c tcp/verdin-imx8mp-07130049.local:7447`)
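The -c/--connect option can be reproduced with a standard argparse setup; the sketch below is inferred from the help output, not copied from the scripts:

```python
import argparse

parser = argparse.ArgumentParser(description="Imu Example")
parser.add_argument("-c", "--connect", default="tcp/127.0.0.1:7447",
                    help="Connection point for the zenoh session, default='tcp/127.0.0.1:7447'")

# Parse a sample command line pointing at a Raivin on the local network
args = parser.parse_args(["-c", "tcp/10.10.41.195:7447"])
print(args.connect)  # tcp/10.10.41.195:7447
```

Omitting the option falls back to the local default, which is why the scripts also run unchanged on the Raivin itself.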

Most scripts need the user to hit CTRL-C to stop.

Codeblocks of Note

The scripts start by trying to connect to the Raivin:

try:
    cfg = zenoh.Config()
    cfg.insert_json5("mode", "'client'")
    cfg.insert_json5("connect", '{ "endpoints": ["%s"] }' % args.connect)
    session = zenoh.open(cfg)
except zenoh.ZError as e:
    print(f"Failed to open Zenoh session: {e}")
    sys.exit(1)

and close the session on exit:

    try:
        while True:
            time.sleep(0.1)
    except KeyboardInterrupt:
        print("\nExiting...")
        session.close()
        sys.exit(0)

Most scripts subscribe to a single topic:

sub = session.declare_subscriber('rt/fusion/model_output', fusion_listener)

And use the edgefirst.schemas package to decode the messages into Python objects:

from edgefirst.schemas.edgefirst_msgs import Mask

def fusion_listener(msg):
    fusion = Mask.deserialize(bytes(msg.payload))
    print(f"\nMask Data:")
    print(f"Dimensions: {fusion.width} x {fusion.height}")
    print(f"Encoding: {fusion.encoding}")
    print(f"Length: {fusion.length}")