Creating custom msg and srv. Python
In previous sections, predefined messages and service types were used. Recall the String message type in the publisher and subscriber example or the AddTwoInts service in the service and client examples. These types of interfaces already existed and were ready to be used. In this section, custom messages and services types will be created and applied into program examples under the python programming language.
Setup for working with custom msg and srv
Make sure to be in a brand new terminal window and no ROS command is currently running.
Create a new package. This package should be contained in the ros2_ws workspace, within its /src folder. The name provided to this new package will be tutorial_interfaces.
For more reference on package creation consult the package creation section.
ros2 pkg create --build-type ament_cmake --license Apache-2.0 tutorial_interfaces
Notice that the package created, is a CMake package. This is where the custom messages and services will be stored, but these can be used in any kind of packages, python or C++ packages.
Next, create the folder: msg and srv inside ros2_ws/src/tutorial_interfaces. This is where messages and services types will be stored respectively.
Message definition
Inside tutorial_interfaces/msg create a new file named Sphere.msg. Edit the content of Sphere.msg to include:
geometry_msgs/Point center
float64 radius
This custom message uses a message from another message package (geometry_msgs/Point in this case).
Service definition
Inside tutorial_interfaces/srv create a new file named AddThreeInts.srv. Edit the content of AddThreeInts.srv to include:
int64 a
int64 b
int64 c
---
int64 sum
This custom service requests three integers named a, b, and c, and responds with an integer called sum.
Edditing the CMakeLists.txt
To convert the defined interfaces into language-specific code (like C++ and Python) so that they can be used in those languages, add the following lines to CMakeLists.txt:
find_package(geometry_msgs REQUIRED)
find_package(rosidl_default_generators REQUIRED)
rosidl_generate_interfaces(${PROJECT_NAME}
"msg/Sphere.msg"
"srv/AddThreeInts.srv"
DEPENDENCIES geometry_msgs # Add packages that above messages depend on, in this case geometry_msgs for Sphere.msg
)
The find_package() commands make the compiler look for the required packages. In this case, geometry_msgs and rosidl_default_generators are the required packages.
The rosidl_generate_interfaces() command line, actually generates the code for the custom message and service interfaces. It takes as arguments: The name of the project, the path to the custom message and service files and necessary package dependencies.
The CMakeLists.txt file should look similar to:
Editing the package.xml file
The following should be added to tutorial_interfaces/package.xml:
<depend>geometry_msgs</depend>
<buildtool_depend>rosidl_default_generators</buildtool_depend>
<exec_depend>rosidl_default_runtime</exec_depend>
<member_of_group>rosidl_interface_packages</member_of_group>
Because the interfaces rely on
rosidl_default_generatorsfor generating language-specific code, you need to declare a build tool dependency on it.rosidl_default_runtimeis a runtime or execution-stage dependency, needed to be able to use the interfaces later.The
rosidl_interface_packagesis the name of the dependency group that thetutorial_interfaces package, should be associated with, declared using the<member_of_group>tag.
The pacakge.xml file should look similar to:
Build and test
Open a brand new terminal , make sure that no other ROS 2 command is currently running, navigate to the workspace directory and execute:
colcon build --packages-select tutorial_interfaces
Now, source the setup file:
source install/setup.bash
For more reference on sourcing the setup file, see sourcing the setup file.
Next, to check that the custom message is correctly created, run:
ros2 interface show tutorial_interfaces/msg/Sphere
The otuput should be:
geometry_msgs/Point center
float64 x
float64 y
float64 z
float64 radius
And to test the service, run:
ros2 interface show tutorial_interfaces/srv/AddThreeInts
Should output the following:
int64 a
int64 b
int64 c
---
int64 sum
Testing the Sphere custom msg in a python package
Make sure to be in a brand new terminal window and no ROS commands are currently running.
Create a new python package, this package should be contained in the ros2_ws workspace, within its /src folder. The name provided to this new package will be testing_interfaces_python. For more reference on package creation consult the package creation section.
ros2 pkg create --build-type ament_python --license Apache-2.0 testing_interfaces_python --dependencies rclpy tutorial_interfaces
The --dependencies argument will automatically add the necessary dependency lines to package.xml. In this case, tutorial_interfaces is the package that includes the Sphere.msg file that is needed for this test.
The code
Next, inside testing_interfaces_python/testing_interfaces_python create a python script, name it sphere_publisher.py.
Copy this content into the new python script.
import rclpy
from rclpy.node import Node
from tutorial_interfaces.msg import Sphere # Change
class SpherePublisher(Node): # Change
def __init__(self):
super().__init__('sphere_publisher') # Change
self.publisher_ = self.create_publisher(Sphere, 'sphere_topic', 10) # Change
timer_period = 0.5 # seconds
self.timer_ = self.create_timer(timer_period, self.timer_callback)
self.count_ = 0.0
def timer_callback(self):
msg = Sphere() # Change
msg.center.x = self.count_ # Change
msg.center.y = 1.0 # Change
msg.center.z = 2.0 # Change
msg.radius = 10.0 # Change
self.publisher_.publish(msg) # Change
self.get_logger().info('Publishing sphere params (x, y, z, radius):' + # Change
'x=%s, y=%s, z=%s, radius=%s' %
(msg.center.x, msg.center.y, msg.center.z, msg.radius))
self.count_ += 1.0
def main(args=None):
rclpy.init(args=args)
sphere_publisher = SpherePublisher()
rclpy.spin(sphere_publisher)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
sphere_publisher.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
Notice that this code is very similar to the publisher script that was studied previously.
Check the important changes in this script.
from tutorial_interfaces.msg import Sphere # Change
...
self.publisher_ = self.create_publisher(Sphere, 'sphere_topic', 10) # Change
...
def timer_callback(self):
msg = Sphere() # Change
msg.center.x = self.count_ # Change
msg.center.y = 1.0 # Change
msg.center.z = 2.0 # Change
msg.radius = 10.0 # Change
self.publisher_.publish(msg)
It is important to correctly import the required libraries.
The publisher node will now publish different type of messages and will also publish to a different topic. The topic name could have stayed the same, but it is better to name the topics accordingly.
Finally, the callback function, instead of directly publishing a string message, it is necessary to fill every parameter that is needed for the new message type.
Next, create another node a listener node for this publisher. Inside testing_interfaces_python/testing_interfaces_python create a python script, name it sphere_listener.py.
Copy this content into the new python script.
import rclpy
from rclpy.node import Node
from tutorial_interfaces.msg import Sphere # Change
class SphereListener(Node):
def __init__(self):
super().__init__('sphere_listener') # Change
self.subscription_ = self.create_subscription( # Change
Sphere,
'sphere_topic',
self.listener_callback,
10)
self.subscription_ # prevent unused variable warning
def listener_callback(self, msg):
self.get_logger().info('I heard (x, y, z, radius):'+ # Change
'x=%s, y=%s, z=%s, radius=%s' %
(msg.center.x, msg.center.y, msg.center.z, msg.radius))
def main(args=None):
rclpy.init(args=args)
sphere_listener = SphereListener()
rclpy.spin(sphere_listener)
# Destroy the node explicitly
# (optional - otherwise it will be done automatically
# when the garbage collector destroys the node object)
sphere_listener.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
The code is very similar to the listener script that was studied previously.
Again, the relevant changes here, have to do with dealing with the appropriate topic name and message type.
Dependencies and entry points
Once, these two python scripts are ready, it is necessary to add the required dependencies in the package.xml file, which was already added when creating this package. See that in the package.xml file it is present the tag package.xml: <depend>tutorial_interfaces</depend>.
Next, add the entry points in the setup.py file:
entry_points={
'console_scripts': [
'sphere_publisher = testing_interfaces_python.sphere_publisher:main',
'sphere_listener = testing_interfaces_python.sphere_listener:main'
],
}
Build and run the custom msg
Build the package with either of these commands:
colcon build --symlink-install
colcon build --packages-select testing_interfaces_python
Source the setup file:
source install/setup.bash
And run the sphere_publisher node that was recently created.
ros2 run testing_interfaces_python sphere_publisher
The result should be like the following:
[INFO] [1712658428.246483307] [sphere_publisher]: Publishing sphere params (x, y, z, radius):x=0.0, y=1.0, z=2.0, radius=10.0
[INFO] [1712658428.603038612] [sphere_publisher]: Publishing sphere params (x, y, z, radius):x=1.0, y=1.0, z=2.0, radius=10.0
[INFO] [1712658429.101586253] [sphere_publisher]: Publishing sphere params (x, y, z, radius):x=2.0, y=1.0, z=2.0, radius=10.0
...
Open a new terminal and execute the sphere_listener node:
ros2 run testing_interfaces_python sphere_listener
The expected result is:
[INFO] [1712658569.240308588] [sphere_listener]: I heard (x, y, z, radius):x=282.0, y=1.0, z=2.0, radius=10.0
[INFO] [1712658569.597305674] [sphere_listener]: I heard (x, y, z, radius):x=283.0, y=1.0, z=2.0, radius=10.0
[INFO] [1712658570.098490216] [sphere_listener]: I heard (x, y, z, radius):x=284.0, y=1.0, z=2.0, radius=10.0
...
Finally, it can also be checked the echo of the messages arriving to the desired topic. Open a new terminal and execute:
ros2 topic echo /sphere_topic
The expected result is:
x: 484.0
y: 1.0
z: 2.0
radius: 10.0
---
center:
x: 485.0
y: 1.0
z: 2.0
radius: 10.0
---
...
At this point, it can be seen that the custom message Sphere.msg that was created is being used successfully.
Testing the AddThreeInts custom srv in a python package
This example will be worked in the testing_interfaces_python package.
Make sure to be in a brand new terminal window and no ROS commands are currently running.
The code
Inside testing_interfaces_python/testing_interfaces_python create a python script, name it add_service_node.py.
Copy this content into the new python script.
from tutorial_interfaces.srv import AddThreeInts
import rclpy
from rclpy.node import Node
class AdditionService(Node):
def __init__(self):
super().__init__('add_service_node')
self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback)
def add_three_ints_callback(self, request, response):
response.sum = request.a + request.b + request.c
self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c))
return response
def main():
rclpy.init()
addition_service = AdditionService()
rclpy.spin(addition_service)
rclpy.shutdown()
if __name__ == '__main__':
main()
Notice that this code is very similar to the service script that was studied previously.
Check the important changes in this script.
from tutorial_interfaces.srv import AddThreeInts
...
self.srv = self.create_service(AddThreeInts, 'add_three_ints', self.add_three_ints_callback)
...
def add_three_ints_callback(self, request, response):
response.sum = request.a + request.b + request.c
self.get_logger().info('Incoming request\na: %d b: %d c: %d' % (request.a, request.b, request.c))
return response
It is important to correctly import the required service.
The service node will now be of type
AddThreeInts, and the a service name of:add_three_ints.Finally, the callback function, instead of adding two values it will summ the three parameters in the request section of the service.
Next, create a client node for this service. Inside testing_interfaces_python/testing_interfaces_python create a python script, name it add_client_node.py.
Copy this content into the new python script.
import sys
import rclpy
from rclpy.node import Node
from tutorial_interfaces.srv import AddThreeInts
class AdditionClientAsync(Node):
def __init__(self):
super().__init__('add_client_node')
self.cli = self.create_client(AddThreeInts, 'add_three_ints')
while not self.cli.wait_for_service(timeout_sec=1.0):
self.get_logger().info('service not available, waiting again...')
self.req = AddThreeInts.Request()
def send_request(self, a, b, c):
self.req.a = a
self.req.b = b
self.req.c = c
self.future = self.cli.call_async(self.req)
rclpy.spin_until_future_complete(self, self.future)
return self.future.result()
def main():
rclpy.init()
add_client = AdditionClientAsync()
response = add_client.send_request(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]))
add_client.get_logger().info(
'Result of add_three_ints: for %d + %d + %d = %d' %
(int(sys.argv[1]), int(sys.argv[2]), int(sys.argv[3]), response.sum))
add_client.destroy_node()
rclpy.shutdown()
if __name__ == '__main__':
main()
The code is very similar to the service client script that was studied previously.
Again, the relevant changes here, have to do with dealing with the appropriate service name and service type.
Dependencies and entry points
Once, these two python scripts are ready, it is necessary to add the required dependencies in the package.xml file, which was already added when creating this package. See that in the package.xml file it is present the tag package.xml: <depend>tutorial_interfaces</depend>.
Next, add the entry points in the setup.py file:
entry_points={
'console_scripts': [
'sphere_publisher = testing_interfaces_python.sphere_publisher:main',
'sphere_listener = testing_interfaces_python.sphere_listener:main',
'add_service_node = testing_interfaces_python.add_service_node:main',
'add_client_node = testing_interfaces_python.add_client_node:main',
],
},
Build and run the custom srv
Build the package with either of these commands:
colcon build --symlink-install
colcon build --packages-select testing_interfaces_python
Source the setup file:
source install/setup.bash
And run the add_service_node node that was recently created.
ros2 run testing_interfaces_python add_service_node
As a result, nothing will be printed in the terminal. The service is ready to be consumed.
Open a new terminal and execute the add_client_node node:
ros2 run testing_interfaces_python add_client_node 4 5 8
The expected result is:
[INFO] [1712660818.668964970] [add_client_node]: Result of add_three_ints: for 4 + 5 + 8 = 17
Finally, the add_three_ints service can also be called from the terminal directly, without the necessity of coding a client node. Open a new terminal and execute:
ros2 service call /add_three_ints tutorial_interfaces/srv/AddThreeInts "{a: 2, b: 3, c: 5}"
The expected result is:
requester: making request: tutorial_interfaces.srv.AddThreeInts_Request(a=2, b=3, c=5)
response:
tutorial_interfaces.srv.AddThreeInts_Response(sum=10)
At this point, it can be seen that the custom service AddThreeInts.srv that was created is being used successfully.