Offsiteteam
Robotics
October 21, 2025

Foundations of Robotics: Cooperative Scheduling in Python (with a Toy Example)

General Idea

Problem: In robotics, many things happen simultaneously — sensors gather data, motors respond, and controllers make decisions — and we need to process them appropriately. This is one of the main challenges in robotics.

The naive idea is simple: just read all sensors one at a time in an event loop, as frequently as possible.

That approach works, but it has several drawbacks. For example, each sensor may need to be read at a different frequency. It’s enough to read a temperature sensor once a minute, but a video camera or accelerometer must be read much more often. Reading all sensors at the same frequency causes unnecessary CPU overhead and power consumption — which are critical issues for embedded systems.

For example

while True:
    read_temp()
    read_lidar()
    read_camera()
    ...

if read_camera() takes 500 ms but read_temp() needs to sample every 50 ms, then temperature updates are always delayed — they have to wait for the camera.

To solve this problem, we can design a cooperative scheduling system — where each component (sensor, controller, actuator) takes turns using the CPU, sharing time voluntarily rather than competing for it.

Cooperative scheduling strategy — a way to organize concurrent tasks so that each sensor loop voluntarily gives up control (cooperates) rather than being forcibly interrupted (preempted).

A cooperative scheduler stands in contrast to Preemptive Scheduling.

In preemptive scheduling, you run each sensor-reading loop in a separate thread or process, and the operating system (OS) scheduler decides when a thread or process has run long enough and should pause to let another run — even if the first thread hasn’t finished its current operation.

Note that in cooperative scheduling we do not have to run each sensor in a separate parallel thread or process — everything happens within a single thread. Python has an excellent mechanism for organizing cooperative scheduling: the generator (coroutines), which can stop at each loop iteration and return control back to the scheduler so it can read other sensors.

How Cooperative Scheduling Is Organized Programmatically

Each sensor is read in a loop. After its first reading, it sends (emits) a message to the controller loop (the receiver) through a pipe, which is essentially a queue.

During each iteration, the sensor (emitter) sends not only the reading itself but also a command to the controller — usually a Sleep command, but it could also be something else (e.g., logging or control instructions).

Note that Sleep command is not an actual OS sleep — it’s an instruction interpreted by the controller to decide when to next schedule the sensor.

Overall, in each loop iteration, every sensor sends both:

  • Its reading (e.g., temperature = 22.4°C), and
  • A command, like Sleep(2), meaning “wait 2 seconds before asking me again”.

Since the sensor loop is organized as a generator, it stops at the yield Sleep(1) statement, gives control back to the controller loop, and waits until the controller resumes it for the next iteration.

This means the sensor is not running on every tick (loop iteration) — avoiding unnecessary energy and CPU consumption — but only when necessary for that specific sensor.

Below you can see a very simple example of how cooperative scheduling is organized.

Controlling Time: Sensor-Controlled vs Global-Clock Worlds

Time can be controlled either by the sensor loop or by the World.

Generally, the robotic control loop is organized around the so-called World — a simulation environment (class) that implements basic methods such as:

  • connect(emitter, receiver)
  • run(sensor_loops, controller_loops)

along with separate sensor_loop and controller_loop generators.

The main idea of cooperative scheduling is that each sensor loop does not keep the CPU busy all the time; it runs only when it’s explicitly scheduled to do so, after a certain period.

So, each sensor sends the amount of time until its next reading as a command like Sleep(1.0):

def sensor_loop(stop_check: Callable[[], bool], emitter: Emitter, name: str) -> Iterator[Sleep]:
    while not stop_check():
        sensor_reading = random.randint(0, 100)
        emitter.emit((name, sensor_reading))
        yield Sleep(2.0)

This Sleep command is received by the cooperative scheduler loop, which plans the next reading accordingly:

# Initialize scheduling of the next reading
sensor_next_time = {loop: 0.0 for loop in sensor_loops}
controller_next_time = {loop: 0.0 for loop in controller_loops}

while not self._stop:
    now = time.time()

    #### Cooperative scheduling for sensors
    for s_loop in sensor_loops:
        if now >= sensor_next_time[s_loop]:     # read only if its time has come
        try:
            command = next(s_loop)
            if isinstance(command, Sleep):
                sensor_next_time[s_loop] = now + command.seconds
            else:
                raise ValueError(f"Unexpected command: {command}")
        except StopIteration:
            # generator finished (provide no more iterations) - remove it from the list
            sensor_next_time.pop(s_loop, None)
            sensor_loops.remove(s_loop)
            continue

The key line is if now >= sensor_next_time[s_loop]:

This ensures that the next sensor reading isn’t even called until its scheduled time has arrived.

In this setup, the sensor loop tells the scheduler how long to wait. As long as the outer while loop spins fast enough, each sensor will be read exactly when it requests to be — meaning that time is controlled by the sensor itself.

This approach is called a sensor-controlled world.

In contrast, we can let the World advance at a fixed frame rate (FPS), i.e., according to a global clock that sleeps between frames. In that case, even if a sensor wants to be read more frequently, it will only be processed at the next clock tick. This approach is called a global-clock world and it is more common in a real production systems.

Conclusion

Cooperative Scheduling offers a simple yet powerful way to organize concurrent activities in robotics without relying on threads or complex synchronization. It si done by letting each sensor (or control loop) voluntarily yield control back to scheduler — typically through generators or coroutines — we can build systems that are both efficient and predictable.

This toy example demonstrates the underlying principle: even with plain Python generators, we can emulate real-time coordination patterns found in embedded and robotic systems.

"""
The minimum example of cooperative scheduling with sensor-controlled world in robotics.

Note
 - all loops are running cooperatively inside a single thread, no separate thread/processes.
 - time is controlled by sensors, no global clock.
"""
import time
import random
from collections import deque
from dataclasses import dataclass
from typing import Iterator, Callable


@dataclass
class Message:
    """That what sensors send to controllers"""
    data: any
    ts: int = -1

@dataclass
class Sleep:
    """A command to send from Sensor to Controller"""
    seconds: float


class Emitter:
    """Emitter that can send messages to several receivers (fan-out)."""
    def __init__(self):
        self._queues: list[deque] = []

    def _bind(self, queue: deque):
        self._queues.append(queue)

    def emit(self, data):
        if not self._queues:
            raise RuntimeError("Emitter not connected to any receiver")
        msg = Message(data)
        for q in self._queues:
            if len(q) < q.maxlen:
                q.append(msg)
            print(f"   --> Emitted {data}")


class Receiver:
    """Receiver that can only have one source (one emitter)."""
    def __init__(self):
        self._queue: deque | None = None

    def _bind(self, queue: deque):
        if self._queue is not None:
            raise RuntimeError("Receiver can only be connected to one emitter")
        self._queue = queue

    def read(self):
        if self._queue is None:
            raise RuntimeError("Receiver not connected to any emitter")
        if self._queue:
            return self._queue.popleft()
        return None


# --- Control loops ----------------------------------------------------------
def sensor_loop(stop_check: Callable[[], bool], emitter: Emitter, name: str) -> Iterator[Sleep]:
    """Sensor loop with generator, might be run a background process later."""
    while not stop_check():
        sensor_reading = random.randint(0, 100)
        emitter.emit((name, sensor_reading))
        yield Sleep(2.0)   # stop here and give up control to Controller by sending a Sleep Command
    print(f"Sensor [{name}] Finished")


def controller_loop(stop_check: Callable[[], bool], receiver: Receiver, name: str) -> Iterator[Sleep]:
    """Foreground controller consuming messages cooperatively."""
    while not stop_check():
        msg = receiver.read()
        if msg:
            sensor_name, value = msg.data
            print(f"Receiver [{name}] Got {value} from {sensor_name} (ts={msg.ts})")
            # ACTION: take an action based on sensor reading, maybe emit another message
        yield Sleep(5.0)    # Time is controlled here by individual loop not world!
    print(f"Controller [{name}] Finished")



# --- Cooperative world ------------------------------------------------------
class World:
    """Toy cooperative scheduler managing sensors and controllers separately."""
    def __init__(self):
        self._stop = False

    def stop(self):
        self._stop = True
        print("[World] Stop signal received.")

    def is_stopped(self) -> bool:   # helper for readability
        return self._stop

    def connect(self, emitter: Emitter, receiver: Receiver):
        queue = deque(maxlen=10)
        emitter._bind(queue)
        receiver._bind(queue)
        print("[World] Connected emitter <-> receiver")

    def run(self, *, sensor_loops: list[Iterator[Sleep]], controller_loops: list[Iterator[Sleep]]):
        """Run cooperative scheduling loop for both sensors and controllers."""
        print("[World] Starting cooperative world... (Ctrl+C to stop)")
        try:
            # Initialize scheduling of the next reading
            sensor_next_time = {loop: 0.0 for loop in sensor_loops}
            controller_next_time = {loop: 0.0 for loop in controller_loops}

            while not self._stop:
                now = time.time()

                # sensors loop might be run in separate thread/process in background, that is we separate them
                #### Cooperative scheduling for sensors
                for s_loop in sensor_loops:
                    if now >= sensor_next_time[s_loop]:     # read only if its time has come
                        try:
                            command = next(s_loop)
                            if isinstance(command, Sleep):
                                sensor_next_time[s_loop] = now + command.seconds
                            else:
                                raise ValueError(f"Unexpected command: {command}")
                        except StopIteration:
                            # generator finished (provide no more iterations) - remove it from the list
                            sensor_next_time.pop(s_loop, None)
                            sensor_loops.remove(s_loop)
                            continue

                #### Cooperative scheduling for controllers
                for c_loop in controller_loops:
                    if now >= controller_next_time[c_loop]:
                        try:
                            command = next(c_loop)
                            if isinstance(command, Sleep):
                                controller_next_time[c_loop] = now + command.seconds
                            else:
                                raise ValueError(f"Unexpected command: {command}")
                        except StopIteration:
                            # generator finished (provide no more iterations) - remove it from the list
                            controller_next_time.pop(c_loop, None)
                            controller_loops.remove(c_loop)
                            continue

                # Prevent tight CPU spin
                time.sleep(0.01)

        except KeyboardInterrupt:
            print("\n[World] User interruption received (Ctrl+C).")
        finally:
            self._stop = True
            print("[World] Cooperative world stopped.")


# --- Main simulation --------------------------------------------------------

if __name__ == "__main__":
    world = World()

    # Create emitters and receivers
    camera = Emitter()
    lidar = Emitter()

    camera_controller = Receiver()
    lidar_controller = Receiver()

    # Connect emitters to receivers
    world.connect(camera, camera_controller)
    world.connect(lidar, lidar_controller)

    # Define coroutines
    sensor_loops = [
        sensor_loop(world.is_stopped, camera, "CameraSensor"),
        sensor_loop(world.is_stopped, lidar, "LidarSensor"),
    ]

    controller_loops = [
        controller_loop(world.is_stopped, camera_controller, "CameraController"),
        controller_loop(world.is_stopped, lidar_controller, "LidarController"),
    ]

    # Run everything cooperatively in one thread
    world.run(sensor_loops=sensor_loops, controller_loops=controller_loops)
Ready to use Robotics
to improve your business?
Fill out the form below to tell us about your project.
We'll contact you promptly to discuss your needs.
We received your message!
Thank you!