ROS2-Python之消息的发布与订阅
发布者
# my_publisher_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
import time
class MinimalPublisher(Node):
    """Node that publishes a fixed ``String`` message on a timer."""

    def __init__(self):
        """Register the node, its publisher on 'topic', and a periodic timer."""
        # 'my_publisher_node' is the name this node is registered under.
        super().__init__('my_publisher_node')

        # Publisher for String messages on the topic named 'topic';
        # the third argument (10) is passed as the QoS setting.
        self.publisher_ = self.create_publisher(String, 'topic', 10)

        # Call timer_callback once per period (period given in seconds).
        period_sec = 1
        self.timer = self.create_timer(period_sec, self.timer_callback)

    def timer_callback(self):
        """Build the greeting message, publish it, and log what was sent."""
        outgoing = String()
        outgoing.data = 'Hello, ROS 2!'  # payload sent on every tick
        self.publisher_.publish(outgoing)
        self.get_logger().info('Publishing: "%s"' % outgoing.data)
def main(args=None):
    """Entry point: spin a MinimalPublisher until interrupted.

    Args:
        args: Command-line arguments forwarded to ``rclpy.init``
            (``None`` lets rclpy use its default).
    """
    rclpy.init(args=args)
    my_publisher_node = MinimalPublisher()
    try:
        rclpy.spin(my_publisher_node)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the node; exit quietly.
        pass
    finally:
        my_publisher_node.destroy_node()
        # On recent rclpy releases the SIGINT handler may already have shut
        # the context down; shutting down twice raises, so only shut down
        # when the context is still valid.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == '__main__':
    main()
订阅者
# my_subscriber_node.py
import rclpy
from rclpy.node import Node
from std_msgs.msg import String
class Subscriber(Node):
    """Node that logs every ``String`` message received on 'topic'."""

    def __init__(self):
        """Register the node and subscribe to the topic named 'topic'."""
        super().__init__("my_subscriber_node")
        # Arguments: message type, topic name, callback, and the value 10
        # passed as the QoS setting.
        self.subscriber = self.create_subscription(
            String,
            "topic",
            self.sub_callback,
            10,
        )

    def sub_callback(self, msg):
        """Log the text carried by a received message.

        Args:
            msg: The received message; its ``data`` field is logged.
        """
        self.get_logger().info("I heard: %s" % msg.data)
def main(args=None):
    """Entry point: spin a Subscriber until interrupted, then clean up.

    Args:
        args: Command-line arguments forwarded to ``rclpy.init``
            (``None`` lets rclpy use its default).
    """
    rclpy.init(args=args)
    node = Subscriber()
    try:
        rclpy.spin(node)
    except KeyboardInterrupt:
        # Ctrl-C is the expected way to stop the node; avoid a traceback.
        pass
    finally:
        # Release the node's resources explicitly instead of relying on
        # garbage collection.
        node.destroy_node()
        # On recent rclpy releases the SIGINT handler may already have shut
        # the context down; only shut down when it is still valid.
        if rclpy.ok():
            rclpy.shutdown()


if __name__ == "__main__":
    main()
修改 setup.py,在 entry_points 的 console_scripts 中添加节点的可执行入口
from setuptools import find_packages, setup

package_name = 'robotpy'

# Non-Python files installed alongside the package, as
# (target directory, [source files]) pairs.
data_files = [
    ('share/ament_index/resource_index/packages',
        ['resource/' + package_name]),
    ('share/' + package_name, ['package.xml']),
]

# Each entry has the form "<executable name>=<package>.<module>:<function>":
# the left side is the command name used with `ros2 run`, the right side is
# the Python entry point that gets called.
console_scripts = [
    "my_publisher_node=robotpy.my_publisher_node:main",
    "my_subscriber_node=robotpy.my_subscriber_node:main",
]

setup(
    name=package_name,
    version='0.0.0',
    packages=find_packages(exclude=['test']),
    data_files=data_files,
    install_requires=['setuptools'],
    zip_safe=True,
    maintainer='wheeltec',
    maintainer_email='wheeltec@todo.todo',
    description='TODO: Package description',
    license='TODO: License declaration',
    tests_require=['pytest'],
    entry_points={
        'console_scripts': console_scripts,
    },
)
执行:
# Run the publisher node
ros2 run robotpy my_publisher_node
# Run the subscriber node
ros2 run robotpy my_subscriber_node
效果如下:
