Usage Guide
ZROS mimics the ROS 2 workflow but uses ZeroMQ for communication.
Note
The following examples use zCompressedCVBridge and cv2. Make sure you have installed ZROS with Computer Vision support first: uv pip install zros[cv2]
Create a Publisher (publisher.py)
from zros import zNode, zCompressedCVBridge
import cv2


class CameraPublisher(zNode):
    def __init__(self):
        super().__init__("camera_pub")
        self.pub = self.create_publisher("video_topic")
        self.bridge = zCompressedCVBridge()  # up to 30x more efficient bandwidth usage than zCvBridge
        self.cap = cv2.VideoCapture(0)  # device index 0: the default camera
        self.create_timer(1/60, self.timer_callback)

    def timer_callback(self):
        ret, frame = self.cap.read()
        if ret:  # publish only when a frame was actually captured
            # msg is a dictionary
            msg = {
                "zimgmsg": self.bridge.cv2_to_zimgmsg(frame),
                "info": "My Camera Frame"
            }
            self.pub.publish(msg)


if __name__ == "__main__":
    CameraPublisher().spin()
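The publisher above uses the compressed bridge to save bandwidth. Which codec zCompressedCVBridge applies is not stated in this guide, so the following is only a rough stand-in: it compresses a synthetic frame buffer with zlib from the Python standard library to show the general trade-off (fewer bytes on the wire in exchange for an encode/decode step). The frame dimensions and the gradient content are assumptions chosen for illustration; real camera frames compress far less well with a lossless codec.

```python
import zlib

# Hypothetical 640x480 frame with 3 channels, filled with a horizontal gradient.
WIDTH, HEIGHT, CHANNELS = 640, 480, 3
row = bytes((x * 255 // (WIDTH - 1)) for x in range(WIDTH) for _ in range(CHANNELS))
raw_frame = row * HEIGHT

# Compress before "sending", decompress after "receiving".
# zlib is a stand-in here, not the codec ZROS uses.
compressed = zlib.compress(raw_frame, 6)
restored = zlib.decompress(compressed)

print(f"raw: {len(raw_frame)} bytes")
print(f"compressed: {len(compressed)} bytes")
print(f"round trip ok: {restored == raw_frame}")
```

The ratio printed here says nothing about the "up to 30x" figure quoted for zCompressedCVBridge; it only illustrates why a compressed payload is cheaper to publish.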
Create a Subscriber (subscriber.py)
from zros import zNode, zCompressedCVBridge
import cv2


class VideoSubscriber(zNode):
    def __init__(self):
        super().__init__("video_sub")
        self.bridge = zCompressedCVBridge()
        self.create_subscriber("video_topic", self.callback)

    def callback(self, msg):
        # msg is the dictionary sent by the publisher
        img = self.bridge.zimgmsg_to_cv2(msg["zimgmsg"])
        # info = msg["info"]
        # print(info)
        cv2.imshow("Video", img)
        cv2.waitKey(1)  # lets the OpenCV window process events and redraw


if __name__ == "__main__":
    VideoSubscriber().spin()
Running Examples
You can find more examples in the examples/ directory.
CLI Tools
ZROS provides command-line tools to inspect and interact with the system.
zros_status
Displays the currently active nodes and their topic connections (publishers and subscribers). It scans the network for a second and then prints the results hierarchically.
Output:
zros_pub
Publishes a dictionary payload to a topic from the terminal.
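How zros_pub parses its payload argument is not described in this guide. As a minimal sketch, assuming the argument is read as a Python dictionary literal, the standard-library function ast.literal_eval shows why the quoting of keys and values matters: unquoted names are rejected rather than evaluated. The helper name parse_payload and the sample payload are hypothetical.

```python
import ast


def parse_payload(text):
    """Parse a dictionary literal passed as one command-line argument.

    Hypothetical helper for illustration; not the zros_pub implementation.
    """
    value = ast.literal_eval(text)  # accepts literals only, never runs code
    if not isinstance(value, dict):
        raise ValueError("payload must be a dictionary")
    return value


payload = parse_payload("{'info': 'hello', 'count': 3}")
print(payload["info"], payload["count"])
```

Because the dictionary itself uses single quotes, the whole argument is usually wrapped in double quotes in the shell so that it arrives as a single string.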
Syntax:
Example:
Note: The input keys and values should be enclosed in single quotes.
Output:
zros_echo
Prints messages received on a topic to the console.
Syntax:
Example:
Output: