This tutorial walks you through setting up a basic publisher-subscriber system using Ark.
You’ll learn how to write a node that publishes messages and a node that subscribes to them, all within a lightweight client-server architecture powered by Ark.
In Ark, a node is any script connected to the Ark network. Nodes can publish messages, subscribe to channels, or provide/request services.
In this section, we’ll create a "talker" node that publishes "Hello World" messages at 10 Hz.
from ark.client.comm_infrastructure.base_node import BaseNode, main
from arktypes import string_t
from ark.tools.log import log
from typing import Dict, Any, Optional
import time

class TalkerNode(BaseNode):
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__("Talker")
        self.pub = self.create_publisher("chatter", string_t)
        self.create_stepper(10, self.step)  # 10 Hz

    def step(self):
        msg = string_t()
        msg.data = f"Hello World {time.time()}"
        self.pub.publish(msg)
        log.info(f"Published message data: {msg.data}")

if __name__ == "__main__":
    main(TalkerNode)
from ark.client.comm_infrastructure.base_node import BaseNode, main
from arktypes import string_t
from ark.tools.log import log
from typing import Dict, Any, Optional
import time
You need to import BaseNode and main to create a node. The string_t import from arktypes provides the message type used for publishing.
super().__init__("Talker")
self.pub = self.create_publisher("chatter", string_t)
The line super().__init__("Talker") tells Ark the name of the node. It must be called first so that the node is set up for communication. In this case the node is named “Talker”.
The line self.pub = self.create_publisher("chatter", string_t) defines the talker's interface with Ark: the node will publish to the “chatter” channel using the message type string_t.
self.create_stepper(10, self.step) # 10 Hz
This line creates a stepper that runs the self.step callback 10 times per second, as long as the processing time of each call does not exceed 1/10th of a second.
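To see why the 10 Hz rate only holds while each step finishes within its period, here is a minimal sketch of a fixed-rate loop in plain Python. It is not Ark's stepper implementation; run_stepper and its clock and sleep parameters are hypothetical names chosen for illustration.

```python
import time
from typing import Callable, List


def run_stepper(hz: float, callback: Callable[[], None], steps: int,
                clock: Callable[[], float] = time.monotonic,
                sleep: Callable[[float], None] = time.sleep) -> List[float]:
    """Call `callback` `steps` times, aiming for `hz` calls per second.

    Returns the clock reading taken at the start of each call.
    Hypothetical helper for illustration only, not part of Ark.
    """
    period = 1.0 / hz
    starts: List[float] = []
    next_tick = clock()
    for _ in range(steps):
        starts.append(clock())
        callback()
        next_tick += period
        remaining = next_tick - clock()
        if remaining > 0:
            sleep(remaining)     # callback finished early: wait out the period
        else:
            next_tick = clock()  # callback overran: start the next step right away
    return starts
```

With a callback that takes 20 ms, the calls start roughly 100 ms apart. With one that takes 250 ms, the calls start roughly 250 ms apart, so the effective rate drops below the requested 10 Hz.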
def step(self):
    msg = string_t()
    msg.data = f"Hello World {time.time()}"
    self.pub.publish(msg)
    log.info(f"Published message data: {msg.data}")
The string_t message is a simple message type. You can see how it is packed in the string_t definition in default_structs.lcm. Here we just need to set the data field to the string we want to send.
We can then publish the message using the publisher we defined earlier: self.pub.publish(msg).
This callback also calls log.info(str), which prints the published message to the screen.
Now let’s create a "listener" node that listens for messages published on "chatter".
from ark.client.comm_infrastructure.base_node import BaseNode, main
from ark.tools.log import log
from arktypes import string_t
import time
from typing import Dict, Any, Optional

class ListenerNode(BaseNode):
    def __init__(self, config: Optional[Dict[str, Any]] = None):
        super().__init__("Listener")
        self.create_subscriber("chatter", string_t, self.callback)

    def callback(self, t, channel_name, msg):
        log.info(f"at {t} I heard {msg.data=} on channel '{channel_name}'")

if __name__ == "__main__":
    main(ListenerNode)
The code for listener.py is similar to talker.py, except that it introduces a callback mechanism for subscribing to messages.
super().__init__("Listener")
self.create_subscriber("chatter", string_t, self.callback)
This declares that the node subscribes to the chatter channel, which carries messages of type string_t. When a new message arrives on chatter, the callback function is triggered.
The callback is called with the parameters t, channel_name, msg: t is the time the message was received, channel_name is the channel the message was heard on, and msg is the message itself.
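The callback mechanism can be illustrated without Ark by a small in-process sketch. MiniBus below is a hypothetical stand-in, not Ark's transport or API; it only mirrors the (t, channel_name, msg) callback signature described above, and delivers messages synchronously inside a single process.

```python
import time
from collections import defaultdict
from typing import Any, Callable, DefaultDict, List

# Same parameter order as the listener callback: (t, channel_name, msg)
Callback = Callable[[float, str, Any], None]


class MiniBus:
    """Hypothetical in-process stand-in for a publish-subscribe transport."""

    def __init__(self) -> None:
        self._subscribers: DefaultDict[str, List[Callback]] = defaultdict(list)

    def subscribe(self, channel_name: str, callback: Callback) -> None:
        """Register a callback to run for every message on the channel."""
        self._subscribers[channel_name].append(callback)

    def publish(self, channel_name: str, msg: Any) -> int:
        """Deliver msg to every callback on the channel; return how many ran."""
        t = time.time()  # timestamp handed to each callback
        callbacks = self._subscribers.get(channel_name, [])
        for callback in callbacks:
            callback(t, channel_name, msg)
        return len(callbacks)
```

Subscribing a function to "chatter" and then publishing on "chatter" runs that function once per message, while publishing on a channel with no subscribers runs nothing. A real transport would also handle serialization and delivery between processes, which this sketch leaves out.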
✅ You’ve now created a basic real-time communication system using Ark’s publish-subscribe model. You can build on this to transmit sensor data, robot commands, status updates, and more.
Try launching both scripts, or visualize them with ark graph!
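One possible way to start both scripts from a single terminal is a small launcher built on Python's standard subprocess module. This is only a sketch: it assumes the two files are saved as talker.py and listener.py in the current directory and that anything the nodes need from the Ark network is already running. build_commands and launch_all are hypothetical helper names, not part of Ark.

```python
import subprocess
import sys
from typing import List, Sequence


def build_commands(scripts: Sequence[str]) -> List[List[str]]:
    """Return one command per script, using the current Python interpreter."""
    return [[sys.executable, script] for script in scripts]


def launch_all(scripts: Sequence[str]) -> None:
    """Start every script as a child process and stop them all on Ctrl+C."""
    processes = [subprocess.Popen(cmd) for cmd in build_commands(scripts)]
    try:
        for process in processes:
            process.wait()
    except KeyboardInterrupt:
        pass  # fall through to cleanup
    finally:
        for process in processes:
            if process.poll() is None:  # still running
                process.terminate()
        for process in processes:
            process.wait()
```

Calling launch_all(["talker.py", "listener.py"]) starts both nodes and blocks until they exit or you press Ctrl+C. Running each script in its own terminal works just as well and keeps their log output separate.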