December 22, 2025

Robotics Loop with Multiprocessing and Shared Memory

Intro

In the previous article, I highlighted how it is possible to organize a cooperative loop in robotics — that is, how to coordinate sensor–controller work within a single process without excessive CPU overruns by using generators in Python.

Here, I will try to scale this approach up a little and demonstrate how a larger system might be designed by utilizing both a cooperative loop (for controllers and non-demanding sensors) and a multiprocessing setup, where demanding sensors such as a camera run in a separate process and send data via a shared variable or a shared memory block.

In principle, the camera could also use a multiprocessing queue. However, a camera pushes a lot of data every second: for example, a single 640x480 RGB frame is roughly 0.9 MB, so even a modest 30 FPS stream means on the order of 27 MB of frame data per second being pickled and copied through the queue. With that kind of throughput the queue overhead becomes significant, and shared memory is a far more efficient way to move the data between processes.

By the end of this article, you will see an example of how to design and implement a hybrid robotics processing loop that combines cooperative scheduling for lightweight tasks with multiprocessing for high-load sensors, and how to use shared memory to efficiently transfer large data streams between processes.

Robotics Loop

How it works

At a high level, this system creates sensors and controllers, connects them through local or multiprocessing pipes, and executes their generator loops either cooperatively in the main process or in parallel using background processes.

The general way to build such a system is similar to the cooperative scheduling schema described earlier.

  • First, we create sensor objects.
  • Then we create a so-called world, a simulation environment for our robotic system.
  • For each sensor, we create either a world.local_pipe or a world.mp_pipe (multiprocessing pipe), depending on whether we want the sensor to run in the main thread or in a separate process.
  • For each pipe, we create two generators: one that emits sensor data and one that receives it (the controller). Each generator may run either in a separate process, working completely in parallel, or in the main thread, cooperatively scheduled together with the other local generators. The generators are implemented as two functions (a minimal sketch of both is shown after the setup code below):
    • sensor_gen_fn() - the sensor side, which reads a sensor and emits its data
    • controller_gen_fn() - the controller side, which receives that data
  • All generators are placed into two lists:
    • emitting_loops — contains (sensor_gen_fn, (emitter, sensor)).
    • controller_loop — contains (controller_gen_fn, (receivers,)); all receivers are grouped together because a single controller receives every signal, and its loop is always scheduled cooperatively.
  • Next, we decide which generators should run in separate processes and which should run cooperatively within the main process. We do this by sorting them into two groups: cooperative_loops and bg_loops.
  • Finally, we run our world using these two lists:
    • world.run(cooperative_loops, bg_loops)
clk = Clock()       # we might use a custom clock
world = World(clk)

# create pipes between sensor and controller (emitter -> receiver)
pipes: list[tuple[Sensor, SignalEmitter, SignalReceiver]] = []
for sensor in sensors:
    if PARALLELISM_TYPE == ParallelismType.LOCAL:
        emitter, receiver = world.local_pipe()
    else:
        emitter, receiver = world.mp_pipe(sensor.transport)
    pipes.append((sensor, emitter, receiver))

# list of necessary emitting loops
emitting_loops = [
    (sensor_gen_fn, (emitter, sensor))
    for sensor, emitter, _ in pipes
]

# Single controller loop with all receivers in it (because it is always a cooperative loop)
receivers = [(sensor, receiver) for sensor, _, receiver in pipes]
controller_loop = [
    (controller_gen_fn, (receivers,))
]

# decide what to run in background or foreground
if PARALLELISM_TYPE == ParallelismType.LOCAL:
    cooperative_loops = emitting_loops + controller_loop
    bg_loops = []   # No background processes
elif PARALLELISM_TYPE == ParallelismType.MULTIPROCESS:
    cooperative_loops = controller_loop
    bg_loops = emitting_loops
else:
    raise ValueError("Wrong parallelism type")

# START simulation: run the sensor loops in the background and the controller loop cooperatively
world.run(cooperative_loops, bg_loops)
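
The setup code above refers to sensor_gen_fn and controller_gen_fn but does not show them. Below is a minimal sketch of what the two generator loops might look like; the Sleep command is written here as a simple dataclass, and sensor.read(), sensor.poll_hz, emitter.emit() and receiver.read() are assumed interfaces of the surrounding codebase, not its confirmed API.

from dataclasses import dataclass

@dataclass
class Sleep:
    """Command yielded by a loop: how long it wants to pause, in seconds (sketch)."""
    seconds: float

def sensor_gen_fn(stop_event, emitter, sensor):
    """Read one sensor and push its measurements into the pipe (sketch)."""
    period = 1.0 / sensor.poll_hz                 # assumed sensor attribute
    while not stop_event.is_set():
        data, ts = sensor.read()                  # acquire one measurement (assumed API)
        emitter.emit(data, ts)                    # hand it to the controller side (assumed API)
        yield Sleep(period)                       # give control back to the scheduler

def controller_gen_fn(stop_event, receivers):
    """Poll every receiver in turn and react to fresh data (sketch)."""
    while not stop_event.is_set():
        for sensor, receiver in receivers:
            msg = receiver.read()                 # non-blocking read of the latest value (assumed API)
            if msg is not None:
                # ... application-specific control logic goes here ...
                pass
        yield Sleep(0.01)                         # run the controller at roughly 100 Hz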

Flow diagram

Now let’s outline how the system works using this schema.

Cooperative Loop

Combining Cooperative Scheduling with Multiprocessing in world.run()

Recall that cooperative scheduling is a way to emulate the parallel execution of multiple sensors and controllers within the same process. It is implemented using generators inside each sensor or controller loop, so the loop does not spin continuously. Instead, after one cycle each generator yields control back to a central cooperative scheduler, which plans the execution and sleep times for all generators together.

In some cases, such as with a camera sensor, it makes sense to run the sensor in a separate process. This is usually done either because the amount of data to be transmitted is too large or because the sensor itself consumes too many computational resources. In this scenario, it is no longer cooperative scheduling, since the process runs fully independently and explicitly sleeps when necessary; the sleep is now a blocking one.

Both approaches are combined in the world.run() method.

First, the processes designated to run in separate background processes are started:

### Start background interprocess loops
for background_fn, args in bg_loops:
    pr = mp.Process(target=_bg_wrapper_loop, args=(background_fn, self._stop_event, *args))
    pr.start()
    self.background_processes.append(pr)
    print(f"[World] Started background process for {args[1].sensor_id}, {args[1].transport.name}")

Then, the foreground sensors and controllers are executed using cooperative scheduling within the main process:

Note that a priority queue implemented with a heap is used to always pop the generator whose scheduled time is closest to the current moment. A monotonically increasing counter is added as a tie-breaker, so that when two generators end up with the exact same scheduled execution time the heap never has to compare the generator objects themselves.

Also note that the cooperative scheduler sleeps only when necessary, and all sensors operate without blocking one another.

generators = [
    cooperative_fn(self._stop_event, *args)
    for cooperative_fn, args in fg_loops
]

priority_queue = []
counter = 0
clock = self.clock
now = clock.now_ns()

# First scheduling: start all generators immediately
for gen in generators:
    priority_queue.append((now, counter, gen))
    counter += 1
heapq.heapify(priority_queue)

###################### Cooperative Loop
while priority_queue and not self._stop_event.is_set():
    # get the closest scheduled gen
    next_time, _, gen = heapq.heappop(priority_queue)

    # wait until its scheduled execution time
    # note: we do not spin the loop while no reading is scheduled - optimal!
    current = clock.now_ns()
    wait_ns = max(0, next_time - current)
    if wait_ns > 0:
        time.sleep(wait_ns / 1e9)

    command = next(gen)     # advance current generator (read sensor)
    if not isinstance(command, Sleep):
        raise ValueError(f"Unexpected command {command}")

    # reschedule this generator based on yielded sleep
    run_at = clock.now_ns() + int(command.seconds * 1e9)
    heapq.heappush(priority_queue, (run_at, counter, gen))
    counter += 1
#######################
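
The scheduler above relies on the clock only for nanosecond timestamps. As a point of reference, here is a minimal sketch of a Clock exposing the single method the loop calls, now_ns(); the actual Clock in the codebase (or a simulated one) may of course offer more.

import time

class Clock:
    """Minimal clock sketch: the cooperative loop only needs now_ns()."""

    def now_ns(self) -> int:
        # A monotonic clock is preferable here, since the scheduler only
        # compares durations and should not be affected by wall-clock jumps.
        return time.monotonic_ns()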

How Data Exchange via Shared Memory Is Implemented

When we run a sensor generator (for example, a camera) in a separate process, we can use a shared queue to send its data from emitter to receiver. For interprocess communication, this would typically be a multiprocessing.Queue.

Queues are convenient and safe for passing small messages but introduce significant serialization and copying overhead for large data streams such as camera frames.

A much more efficient approach is to use a dedicated block of memory — shared memory — to exchange data directly between two processes (the sensor-emitter and the controller-receiver).

In this codebase, shared-memory handling functionality is implemented primarily inside:

  • mp_pipe(self, transport: TransportMode)
  • MultiprocessEmitter / MultiprocessReceiver

Briefly, within mp_pipe, when transport == TransportMode.SHARED_MEMORY, we create several additional shared memory primitives, such as:

  • sm_queue — a special dedicated queue used to pass metadata about the shared memory block.
  • up_value — a synchronized flag that indicates when the shared memory block contains a new, fresh value.

We also create a special kind of multiprocess emitter and receiver:

if transport == TransportMode.SHARED_MEMORY:
    # Create SM primitives
    lock = self._manager.Lock()
    ts_value = self._manager.Value('Q', -1)
    up_value = self._manager.Value('b', False)  # a flag that the SM block holds a new FRESH value
    sm_queue = self._manager.Queue()            # a separate queue to send Shared Memory metadata

    emitter = MultiprocessEmitter(
        transport, message_queue, self.clock,
        lock, ts_value, up_value, sm_queue
    )
    receiver = MultiprocessReceiver(
        transport, message_queue,
        lock, ts_value, up_value, sm_queue
    )

Then, inside MultiprocessEmitter, again when transport == TransportMode.SHARED_MEMORY, instead of pushing the actual frame data through a queue, we perform two steps:

  1. First, we use the sm_queue to send information about the shared memory block to the receiver:
    # First time: create buffer and send metadata
    if self._sm is None:
        self._expected_buf_size = buf_size
        self._sm = shared_memory.SharedMemory(create=True, size=buf_size)
        sm_metadata = (
            self._sm.name,
            buf_size,
            type(data),
            data.instantiation_params())
        self.sm_queue.put(sm_metadata)      # send SM metadata
    
  2. After that, each time a new frame is written into the shared memory block, we update the shared flag up_value=True. This signals the receiver that a fresh frame is available and should be read directly from shared memory.

In this way, only small control messages travel through queues, while large data payloads remain in shared memory.

# Write data with lock
with self.lock:
    if not isinstance(data, NumpySMAdapter):
        raise ValueError("Trying to send non-SM-compliant data to shared memory")
    data.set_to_buffer(self._sm.buf)
    self.ts_value.value = int(ts)
    self.up_value.value = True
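
The receiver side of this protocol is not shown in the snippets above. The sketch below illustrates the counterpart logic under the same assumptions: on the first read it takes the metadata from sm_queue and attaches to the shared memory block by name; afterwards it copies a frame out only when up_value signals a fresh value. The method name read() and the adapter method read_from_buffer() are illustrative, not the codebase's actual API.

from multiprocessing import shared_memory

class MultiprocessReceiver:
    # ... constructor stores transport, message_queue, lock, ts_value, up_value, sm_queue ...

    def read(self):
        # Attach to the shared memory block the first time metadata arrives
        if self._sm is None:
            if self.sm_queue.empty():
                return None                         # emitter has not created the block yet
            name, buf_size, data_cls, params = self.sm_queue.get()
            self._sm = shared_memory.SharedMemory(name=name)
            self._adapter = data_cls(*params)       # rebuild the adapter that decodes the buffer

        # Copy the frame out only when the emitter has flagged a fresh value
        with self.lock:
            if not self.up_value.value:
                return None                         # nothing new since the last read
            data = self._adapter.read_from_buffer(self._sm.buf)  # assumed counterpart of set_to_buffer()
            ts = self.ts_value.value
            self.up_value.value = False             # mark the frame as consumed
        return data, ts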

Conclusion

Below is the complete Python implementation of the robotics loop described in this article. The code demonstrates how cooperative scheduling and multiprocessing can be combined to efficiently manage lightweight sensors and controllers alongside high-demand components such as camera pipelines, using shared memory for low-overhead data exchange.

Full Python Implementation
