Problem: In robotics, many things happen simultaneously — sensors gather data, motors respond, and controllers make decisions — and we need to process them appropriately. This is one of the main challenges in robotics.
The naive idea is simple: just read all sensors one at a time in an event loop, as frequently as possible.
That approach works, but it has several drawbacks. For example, each sensor may need to be read at a different frequency. It’s enough to read a temperature sensor once a minute, but a video camera or accelerometer must be read much more often. Reading all sensors at the same frequency causes unnecessary CPU overhead and power consumption — which are critical issues for embedded systems.
For example
while True:
read_temp()
read_lidar()
read_camera()
...
if read_camera() takes 500 ms but read_temp() needs to sample every 50 ms, then temperature updates are always delayed — they have to wait for the camera.
To solve this problem, we can design a cooperative scheduling system — where each component (sensor, controller, actuator) takes turns using the CPU, sharing time voluntarily rather than competing for it.
Cooperative scheduling strategy — a way to organize concurrent tasks so that each sensor loop voluntarily gives up control (cooperates) rather than being forcibly interrupted (preempted).
A cooperative scheduler stands in contrast to Preemptive Scheduling.
In preemptive scheduling, you run each sensor-reading loop in a separate thread or process, and the operating system (OS) scheduler decides when a thread or process has run long enough and should pause to let another run — even if the first thread hasn’t finished its current operation.
Note that in cooperative scheduling we do not have to run each sensor in a separate parallel thread or process — everything happens within a single thread. Python has an excellent mechanism for organizing cooperative scheduling: the generator (coroutines), which can stop at each loop iteration and return control back to the scheduler so it can read other sensors.
Each sensor is read in a loop. After its first reading, it sends (emits) a message to the controller loop (the receiver) through a pipe, which is essentially a queue.
During each iteration, the sensor (emitter) sends not only the reading itself but also a command to the controller — usually a Sleep command, but it could also be something else (e.g., logging or control instructions).
Note that Sleep command is not an actual OS sleep — it’s an instruction interpreted by the controller to decide when to next schedule the sensor.
Overall, in each loop iteration, every sensor sends both:
Sleep(2), meaning “wait 2 seconds before asking me again”.Since the sensor loop is organized as a generator, it stops at the yield Sleep(1) statement, gives control back to the controller loop, and waits until the controller resumes it for the next iteration.
This means the sensor is not running on every tick (loop iteration) — avoiding unnecessary energy and CPU consumption — but only when necessary for that specific sensor.
Below you can see a very simple example of how cooperative scheduling is organized.
Time can be controlled either by the sensor loop or by the World.
Generally, the robotic control loop is organized around the so-called World — a simulation environment (class) that implements basic methods such as:
along with separate sensor_loop and controller_loop generators.
The main idea of cooperative scheduling is that each sensor loop does not keep the CPU busy all the time; it runs only when it’s explicitly scheduled to do so, after a certain period.
So, each sensor sends the amount of time until its next reading as a command like Sleep(1.0):
def sensor_loop(stop_check: Callable[[], bool], emitter: Emitter, name: str) -> Iterator[Sleep]:
while not stop_check():
sensor_reading = random.randint(0, 100)
emitter.emit((name, sensor_reading))
yield Sleep(2.0)
This Sleep command is received by the cooperative scheduler loop, which plans the next reading accordingly:
# Initialize scheduling of the next reading
sensor_next_time = {loop: 0.0 for loop in sensor_loops}
controller_next_time = {loop: 0.0 for loop in controller_loops}
while not self._stop:
now = time.time()
#### Cooperative scheduling for sensors
for s_loop in sensor_loops:
if now >= sensor_next_time[s_loop]: # read only if its time has come
try:
command = next(s_loop)
if isinstance(command, Sleep):
sensor_next_time[s_loop] = now + command.seconds
else:
raise ValueError(f"Unexpected command: {command}")
except StopIteration:
# generator finished (provide no more iterations) - remove it from the list
sensor_next_time.pop(s_loop, None)
sensor_loops.remove(s_loop)
continue
The key line is if now >= sensor_next_time[s_loop]:
This ensures that the next sensor reading isn’t even called until its scheduled time has arrived.
In this setup, the sensor loop tells the scheduler how long to wait. As long as the outer while loop spins fast enough, each sensor will be read exactly when it requests to be — meaning that time is controlled by the sensor itself.
This approach is called a sensor-controlled world.
In contrast, we can let the World advance at a fixed frame rate (FPS), i.e., according to a global clock that sleeps between frames. In that case, even if a sensor wants to be read more frequently, it will only be processed at the next clock tick. This approach is called a global-clock world and it is more common in a real production systems.
Cooperative Scheduling offers a simple yet powerful way to organize concurrent activities in robotics without relying on threads or complex synchronization. It si done by letting each sensor (or control loop) voluntarily yield control back to scheduler — typically through generators or coroutines — we can build systems that are both efficient and predictable.
This toy example demonstrates the underlying principle: even with plain Python generators, we can emulate real-time coordination patterns found in embedded and robotic systems.
"""
The minimum example of cooperative scheduling with sensor-controlled world in robotics.
Note
- all loops are running cooperatively inside a single thread, no separate thread/processes.
- time is controlled by sensors, no global clock.
"""
import time
import random
from collections import deque
from dataclasses import dataclass
from typing import Iterator, Callable
@dataclass
class Message:
"""That what sensors send to controllers"""
data: any
ts: int = -1
@dataclass
class Sleep:
"""A command to send from Sensor to Controller"""
seconds: float
class Emitter:
"""Emitter that can send messages to several receivers (fan-out)."""
def __init__(self):
self._queues: list[deque] = []
def _bind(self, queue: deque):
self._queues.append(queue)
def emit(self, data):
if not self._queues:
raise RuntimeError("Emitter not connected to any receiver")
msg = Message(data)
for q in self._queues:
if len(q) < q.maxlen:
q.append(msg)
print(f" --> Emitted {data}")
class Receiver:
"""Receiver that can only have one source (one emitter)."""
def __init__(self):
self._queue: deque | None = None
def _bind(self, queue: deque):
if self._queue is not None:
raise RuntimeError("Receiver can only be connected to one emitter")
self._queue = queue
def read(self):
if self._queue is None:
raise RuntimeError("Receiver not connected to any emitter")
if self._queue:
return self._queue.popleft()
return None
# --- Control loops ----------------------------------------------------------
def sensor_loop(stop_check: Callable[[], bool], emitter: Emitter, name: str) -> Iterator[Sleep]:
"""Sensor loop with generator, might be run a background process later."""
while not stop_check():
sensor_reading = random.randint(0, 100)
emitter.emit((name, sensor_reading))
yield Sleep(2.0) # stop here and give up control to Controller by sending a Sleep Command
print(f"Sensor [{name}] Finished")
def controller_loop(stop_check: Callable[[], bool], receiver: Receiver, name: str) -> Iterator[Sleep]:
"""Foreground controller consuming messages cooperatively."""
while not stop_check():
msg = receiver.read()
if msg:
sensor_name, value = msg.data
print(f"Receiver [{name}] Got {value} from {sensor_name} (ts={msg.ts})")
# ACTION: take an action based on sensor reading, maybe emit another message
yield Sleep(5.0) # Time is controlled here by individual loop not world!
print(f"Controller [{name}] Finished")
# --- Cooperative world ------------------------------------------------------
class World:
"""Toy cooperative scheduler managing sensors and controllers separately."""
def __init__(self):
self._stop = False
def stop(self):
self._stop = True
print("[World] Stop signal received.")
def is_stopped(self) -> bool: # helper for readability
return self._stop
def connect(self, emitter: Emitter, receiver: Receiver):
queue = deque(maxlen=10)
emitter._bind(queue)
receiver._bind(queue)
print("[World] Connected emitter <-> receiver")
def run(self, *, sensor_loops: list[Iterator[Sleep]], controller_loops: list[Iterator[Sleep]]):
"""Run cooperative scheduling loop for both sensors and controllers."""
print("[World] Starting cooperative world... (Ctrl+C to stop)")
try:
# Initialize scheduling of the next reading
sensor_next_time = {loop: 0.0 for loop in sensor_loops}
controller_next_time = {loop: 0.0 for loop in controller_loops}
while not self._stop:
now = time.time()
# sensors loop might be run in separate thread/process in background, that is we separate them
#### Cooperative scheduling for sensors
for s_loop in sensor_loops:
if now >= sensor_next_time[s_loop]: # read only if its time has come
try:
command = next(s_loop)
if isinstance(command, Sleep):
sensor_next_time[s_loop] = now + command.seconds
else:
raise ValueError(f"Unexpected command: {command}")
except StopIteration:
# generator finished (provide no more iterations) - remove it from the list
sensor_next_time.pop(s_loop, None)
sensor_loops.remove(s_loop)
continue
#### Cooperative scheduling for controllers
for c_loop in controller_loops:
if now >= controller_next_time[c_loop]:
try:
command = next(c_loop)
if isinstance(command, Sleep):
controller_next_time[c_loop] = now + command.seconds
else:
raise ValueError(f"Unexpected command: {command}")
except StopIteration:
# generator finished (provide no more iterations) - remove it from the list
controller_next_time.pop(c_loop, None)
controller_loops.remove(c_loop)
continue
# Prevent tight CPU spin
time.sleep(0.01)
except KeyboardInterrupt:
print("\n[World] User interruption received (Ctrl+C).")
finally:
self._stop = True
print("[World] Cooperative world stopped.")
# --- Main simulation --------------------------------------------------------
if __name__ == "__main__":
world = World()
# Create emitters and receivers
camera = Emitter()
lidar = Emitter()
camera_controller = Receiver()
lidar_controller = Receiver()
# Connect emitters to receivers
world.connect(camera, camera_controller)
world.connect(lidar, lidar_controller)
# Define coroutines
sensor_loops = [
sensor_loop(world.is_stopped, camera, "CameraSensor"),
sensor_loop(world.is_stopped, lidar, "LidarSensor"),
]
controller_loops = [
controller_loop(world.is_stopped, camera_controller, "CameraController"),
controller_loop(world.is_stopped, lidar_controller, "LidarController"),
]
# Run everything cooperatively in one thread
world.run(sensor_loops=sensor_loops, controller_loops=controller_loops)