In the previous article, I showed how to organize a cooperative loop in robotics — that is, how to coordinate sensor–controller work within a single process without excessive CPU load by using generators in Python.
Here, I will try to scale this approach up a little and demonstrate how a larger system might be designed by utilizing both a cooperative loop (for controllers and non-demanding sensors) and a multiprocessing setup, where demanding sensors such as a camera run in a separate process and send data via a shared variable or a shared memory block.
In principle, the camera could also use a multiprocessing queue. However, since it sends a large amount of data every second, the serialization and copying overhead of a queue would be substantial: the demo camera below produces 320×200×3 uint8 frames, i.e. roughly 192 KB that would have to be pickled and pushed through the queue on every emit. Shared memory is therefore a more efficient way to handle such high data throughput between processes.
By the end of this article, you will see an example of how to design and implement a hybrid robotics processing loop that combines cooperative scheduling for lightweight tasks with multiprocessing for high-load sensors, and how to use shared memory to efficiently transfer large data streams between processes.

At a high level, this system creates sensors and controllers, connects them through local or multiprocessing pipes, and executes their generator loops either cooperatively in the main process or in parallel using background processes.
The general way to build such a system is similar to the cooperative scheduling schema described earlier.
- Create a pipe between each sensor and the controller: world.local_pipe() or world.mp_pipe() (a multiprocessing pipe), depending on whether we want the sensor to run in the main process or in a separate one.
- Define the generator functions: sensor_gen_fn() for the emitting sensors and controller_gen_fn() for the receiving controller.
- emitting_loops — contains a (sensor_gen_fn, (emitter, sensor)) tuple per sensor.
- controller_loop — contains (controller_gen_fn, (receivers,)), where all receivers are grouped together because we use only one controller to receive all signals and read them using cooperative scheduling.
- Split the work into cooperative_loops and bg_loops.
- Start everything with world.run_cooperative(cooperative_loops, bg_loops).

In code, the schema looks like this:

clk = Clock() # we might use a custom clock
world = World(clk)
# create pipes between sensor and controller (emitter -> receiver)
pipes: list[Tuple[Sensor, SignalEmitter, SignalReceiver]] = []
for sensor in sensors:
if PARALLELISM_TYPE == ParallelismType.LOCAL:
emitter, receiver = world.local_pipe()
else:
emitter, receiver = world.mp_pipe(sensor.transport)
pipes.append((sensor, emitter, receiver))
# list of necessary emitting loops
emitting_loops = [
(sensor_gen_fn, (emitter, sensor))
for sensor, emitter, _ in pipes
]
# Single controller loop with all receivers in it (because it is always a cooperative loop)
receivers = [(sensor, receiver) for sensor, _, receiver in pipes]
controller_loop = [
(controller_gen_fn, (receivers,))
]
# decide what to run in background or foreground
if PARALLELISM_TYPE == ParallelismType.LOCAL:
cooperative_loops = emitting_loops + controller_loop
bg_loops = [] # No background processes
elif PARALLELISM_TYPE == ParallelismType.MULTIPROCESS:
cooperative_loops = controller_loop
bg_loops = emitting_loops
else:
raise ValueError(" Wrong parallelism type")
# START simulation - run sensor loops in the background and the controller loop cooperatively
world.run_cooperative(cooperative_loops, bg_loops)
Now let’s outline how the system works using this schema.

world.run_cooperative()
Recall that cooperative scheduling is a way to emulate the parallel execution of multiple sensors and controllers within the same process. It is implemented using generators inside each sensor or controller loop, so the loop does not spin continuously. Instead, after one cycle each generator yields control back to a central cooperative scheduler, which plans the execution and sleep times for all generators together.
In some cases, such as with a camera sensor, it makes sense to run the sensor in a separate process. This is usually done either because the amount of data to be transmitted is too large or because the sensor itself consumes too many computational resources. In this scenario, it is not cooperative scheduling anymore, since the process explicitly sleeps when necessary and runs fully independently. It is a blocking sleep now.
Both approaches are combined in the world.run_cooperative() method.
First, the loops designated to run in separate background processes are started:
### Start background interprocess loops
for background_fn, args in bg_loops:
pr = mp.Process(target=_bg_wrapper_loop, args=(background_fn, self._stop_event, *args))
pr.start()
self.background_processes.append(pr)
print(f"[World] Started background process for {args[1].sensor_id}, {args[1].transport.name}")
Then, the foreground sensors and controllers are executed using cooperative scheduling within the main process:
Note that a priority queue implemented with a heap is used to always pop the generator whose scheduled time is closest to the current moment. A monotonically increasing counter is added to break ties: when two generators have the exact same scheduled execution time, heapq would otherwise fall back to comparing the generators themselves, which Python does not support.
Also note that the cooperative scheduler sleeps only when necessary, and all sensors operate without blocking one another.
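To see why the counter matters, here is a minimal sketch of the failure mode (two entries scheduled for the same instant) and the fix:

import heapq
def gen(): yield
a, b = gen(), gen()
try:
    heapq.heapify([(100, a), (100, b)])  # a tie forces heapq to compare the generators
except TypeError as e:
    print(e)  # '<' not supported between instances of 'generator' and 'generator'
heapq.heapify([(100, 0, a), (100, 1, b)])  # the counter decides the tie, no error

With that in mind, here is the cooperative loop itself: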
generators = [
cooperative_fn(self._stop_event, *args)
for cooperative_fn, args in fg_loops
]
priority_queue = []
counter = 0
clock = self.clock
now = clock.now_ns()
# First scheduling: start all generators immediately
for gen in generators:
priority_queue.append((now, counter, gen))
counter += 1
heapq.heapify(priority_queue)
###################### Cooperative Loop
while priority_queue and not self._stop_event.is_set():
# get the closest scheduled gen
next_time, _, gen = heapq.heappop(priority_queue)
# wait until its scheduled execution time
# note: we are not spinning the loop if no readings are due yet - optimal!
current = clock.now_ns()
wait_ns = max(0, next_time - current)
if wait_ns > 0:
time.sleep(wait_ns / 1e9)
command = next(gen) # advance current generator (read sensor)
if not isinstance(command, Sleep):
raise ValueError(f"Unexpected command {command}")
# reschedule this generator based on yielded sleep
run_at = clock.now_ns() + int(command.seconds * 1e9)
heapq.heappush(priority_queue, (run_at, counter, gen))
counter += 1
#######################
When we run a sensor generator (for example, a camera) in a separate process, we can use a shared queue to send its data from emitter to receiver. For interprocess communication, this would typically be a multiprocessing.Queue.
Queues are convenient and safe for passing small messages but introduce significant serialization and copying overhead for large data streams such as camera frames.
A much more efficient approach is to use a dedicated block of memory — shared memory — to exchange data directly between two processes (the sensor-emitter and the controller-receiver).
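As a minimal, self-contained sketch of the underlying primitive (the variable names here are illustrative, not from the codebase below): one process allocates a named block and copies a frame into it without any pickling, and the other process attaches to the block by name and reads it back.

import numpy as np
from multiprocessing import shared_memory

# producer: allocate a named block and copy a frame into it (no pickling)
frame = np.random.randint(0, 256, (320, 200, 3), dtype=np.uint8)
sm = shared_memory.SharedMemory(create=True, size=frame.nbytes)
np.ndarray(frame.shape, frame.dtype, buffer=sm.buf)[:] = frame

# consumer (normally another process): attach by the block's name and copy out
peer = shared_memory.SharedMemory(name=sm.name)
out = np.ndarray(frame.shape, frame.dtype, buffer=peer.buf).copy()

peer.close()
sm.close()
sm.unlink()  # the creating process is responsible for deleting the block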
In this codebase, shared-memory handling functionality is implemented primarily inside:
- mp_pipe(self, transport: TransportMode)
- MultiprocessEmitter / MultiprocessReceiver

Briefly, within mp_pipe, when transport == TransportMode.SHARED_MEMORY, we create several additional shared-memory primitives:

- lock — a lock that guards writes to and reads from the block.
- ts_value — the timestamp of the most recent write.
- up_value — a synchronized flag that indicates when the shared memory block contains a new, fresh value.
- sm_queue — a special dedicated queue used to pass metadata about the shared memory block.

We also create a special kind of multiprocess emitter and receiver:
if transport == TransportMode.SHARED_MEMORY:
# Create SM primitives
lock = self._manager.Lock()
ts_value = self._manager.Value('Q', -1)
up_value = self._manager.Value('b', False) # a flag that SM block has a new FRESH value
sm_queue = self._manager.Queue() # a separate queue to send Shared Memory metadata
emitter = MultiprocessEmitter(
transport, message_queue, self.clock,
lock, ts_value, up_value, sm_queue
)
receiver = MultiprocessReceiver(
transport, message_queue,
lock, ts_value, up_value, sm_queue
)
Then, inside MultiprocessEmitter, again when transport == TransportMode.SHARED_MEMORY, instead of pushing the actual frame data through a queue, we perform two steps:
1. On the first emit, we create the shared memory block and use sm_queue to send information about it to the receiver:
# First time: create buffer and send metadata
if self._sm is None:
self._expected_buf_size = buf_size
self._sm = shared_memory.SharedMemory(create=True, size=buf_size)
sm_metadata = (
self._sm.name,
buf_size,
type(data),
data.instantiation_params())
self.sm_queue.put(sm_metadata) # send SM metadata
2. On every emit, we write the data into the block under the lock and set up_value = True. This signals the receiver that a fresh frame is available and should be read directly from shared memory. (The receiver mirrors this protocol: it lazily attaches to the block using the metadata from sm_queue, then reads under the same lock and clears up_value.) In this way, only small control messages travel through queues, while large data payloads remain in shared memory.
# Write data with lock
with self.lock:
if not isinstance(data, NumpySMAdapter):
raise ValueError(" Trying to send not SMComplient data to shared memory")
data.set_to_buffer(self._sm.buf)
self.ts_value.value = int(ts)
self.up_value.value = True
Below is the complete Python implementation of the robotics loop described in this article. The code demonstrates how cooperative scheduling and multiprocessing can be combined to efficiently manage lightweight sensors and controllers alongside high-demand components such as camera pipelines, using shared memory for low-overhead data exchange.
"""
Further development of cooperative scheduling loop.
Here it has been added:
- running sensors in a separate process
- interprocess shared memory and queue transport
"""
from abc import ABC, abstractmethod
from collections import deque
from dataclasses import dataclass
from enum import IntEnum
import multiprocessing as mp
from multiprocessing import resource_tracker, shared_memory
from queue import Empty, Full
import random
import time
from typing import Any, Generic, List, Tuple, TypeVar
import numpy as np
from shared_memory import SMCompliant, NumpySMAdapter
T = TypeVar("T")
class TransportMode(IntEnum):
QUEUE = 1
SHARED_MEMORY = 2
class ParallelismType(IntEnum):
LOCAL = 0
MULTIPROCESS = 2
@dataclass
class Message(Generic[T]):
data: T
ts: float
updated: bool = True # is data new since the last read? Set by receiver
@dataclass
class Sleep:
seconds: float
class Clock:
def now_ns(self) -> int:
return time.time_ns()
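# Editor's sketch (an assumption, not part of the original design): the Clock
# indirection makes it easy to swap in a custom clock, e.g. an accelerated one
# for faster-than-real-time simulation.
class AcceleratedClock(Clock):
    """Hypothetical clock that runs `factor` times faster than wall time."""
    def __init__(self, factor: float = 10.0):
        self._start = time.time_ns()
        self._factor = factor
    def now_ns(self) -> int:
        # scale the elapsed wall time by `factor`
        return self._start + int((time.time_ns() - self._start) * self._factor)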
### Define sensors
class Sensor(ABC):
def __init__(self, sensor_id: str, transport: TransportMode, interval: float):
self.sensor_id = sensor_id
self.transport = transport
self.interval = interval
@abstractmethod
def read(self) -> Any:
"""Read sensor value."""
pass
class TemperatureSensor(Sensor):
def __init__(self, transport: TransportMode, interval: float):
super().__init__(sensor_id="temp_sensor", transport=transport, interval=interval)
self.unit = "В°C"
def read(self) -> float:
return round(20 + random.uniform(-2, 5), 2)
class CloudinessSensor(Sensor):
def __init__(self, transport, interval: float):
super().__init__(sensor_id="cloudy_sensor", transport=transport, interval=interval)
def read(self) -> str:
return random.choice(["Clear", "Partly Cloudy", "Cloudy", "Rain"])
class CameraSensor(Sensor):
def __init__(self, transport: TransportMode, interval: float):
super().__init__(sensor_id="camera", transport=transport, interval=interval)
self.width = 200
self.height = 320
self.unit = "frame"
def read(self) -> np.ndarray:
frame = np.random.randint(0, 256, (self.height, self.width, 3), dtype=np.uint8)
return frame
######### Emitters / Receivers
class SignalEmitter(Generic[T]):
def emit(self, data: T) -> bool:
"""Add data to a queue as a Message"""
...
class SignalReceiver(Generic[T]):
def read(self) -> Message[T] | None:
"""Returns next message, otherwise last value. None if nothing was read yet."""
...
class LocalQueueEmitter(SignalEmitter[T]):
def __init__(self, queue: deque, clock: Clock):
self.queue = queue
self.clock = clock
def emit(self, data: T):
msg = Message(data=data, ts=self.clock.now_ns()) # TODO: better get it from World as parameter
self.queue.append(msg)
print_friendly_data = msg.data if not isinstance(msg.data, np.ndarray) else msg.data[1][0,0].tolist()
print(f"[Local Emitter] Emitted |{print_friendly_data}| at {msg.ts:.2f}\n")
return True
class LocalQueueReceiver(SignalReceiver[T]):
def __init__(self, queue: deque):
self.queue = queue
self.last_msg = None # if no new data, emit the last message
def read(self) -> Message[T] | None:
if self.queue:
self.last_msg = self.queue.popleft()
return self.last_msg
# Multiprocess emitter/receiver using multiprocessing.Queue
class MultiprocessEmitter(SignalEmitter[T]):
"""Emitter for inter-process communication."""
def __init__(self,
transport: TransportMode,
queue: mp.Queue, # use for QUEUE transport
clock: Clock,
lock: mp.Lock = None,
ts_value: mp.Value = None, # time of emitting fresh value
up_value: mp.Value = None, # for SM: flag of fresh SM block
sm_queue: mp.Queue = None): # for SM: SM metadata queue
self.transport = transport
self.queue = queue
self.clock = clock
# Shared memory primitives and state
self.lock = lock
self.ts_value = ts_value
self.up_value = up_value
self.sm_queue = sm_queue
self._sm: shared_memory.SharedMemory | None = None
self._expected_buf_size: int | None = None
def _emit_queue(self, data: T, ts: float) -> bool:
"""
Send via regular queue.
Not efficient for camera data: every frame must be pickled and copied through the queue
"""
msg = Message(data=data, ts=ts)
try:
self.queue.put_nowait(msg)
print("[Multiprocess Emitter]: Data emitted to QUEUE")
return True
except Full:
try:
self.queue.get_nowait() # drop the oldest if the queue is full
self.queue.put_nowait(msg)
return True
except (Empty, Full):
return False
def _emit_shared_memory(self, data: SMCompliant, ts: float) -> bool:
"""Send via shared memory, marking all receivers as needing update."""
assert isinstance(data, SMCompliant), f"SHARED_MEMORY mode requires SMCompliant data, got {type(data)}"
buf_size = data.buf_size()
# First time: create buffer and send metadata
if self._sm is None:
self._expected_buf_size = buf_size
self._sm = shared_memory.SharedMemory(create=True, size=buf_size)
sm_metadata = (
self._sm.name,
buf_size,
type(data),
data.instantiation_params())
self.sm_queue.put(sm_metadata) # send SM metadata
print(f" [Multiprocess Emitter] Created SM buffer and sent its metadata: {self._sm.name}, size={buf_size}")
assert buf_size == self._expected_buf_size, f"Buffer size changed: {buf_size} != {self._expected_buf_size}"
# Write data with lock
with self.lock:
if not isinstance(data, NumpySMAdapter):
raise ValueError(" Trying to send not SMComplient data to shared memory")
data.set_to_buffer(self._sm.buf)
self.ts_value.value = int(ts)
self.up_value.value = True
print("[Multiprocess Emitter]: Data emitted to Shared Memory")
return True
def emit(self, data: T) -> bool:
ts = self.clock.now_ns()
if self.transport == TransportMode.SHARED_MEMORY:
data_sm = NumpySMAdapter.lazy_init(data)
return self._emit_shared_memory(data_sm, ts)
else:
return self._emit_queue(data, ts)
def close(self) -> None:
"""Clean up shared memory."""
if self._sm is not None:
try:
self._sm.close() # stop using it
self._sm.unlink() # tell OS to delete the block
except Exception: # ignore cleanup errors (the block may already be gone)
pass
class MultiprocessReceiver(SignalReceiver[T]):
def __init__(self,
transport: TransportMode,
queue: mp.Queue,
lock: mp.Lock = None,
ts_value: mp.Value = None,
up_value: mp.Value = None,
sm_queue: mp.Queue = None):
self.transport = transport
self.queue = queue
# Shared memory primitives and state
self.lock = lock
self.ts_value = ts_value
self.up_value = up_value
self.sm_queue = sm_queue
self._sm: shared_memory.SharedMemory | None = None
self._out_value: SMCompliant | None = None # buffer that will hold the latest data read from shared memory
self._readonly_buffer: memoryview | None = None
self.last_msg: Message[T] | None = None
def _ensure_shared_memory_initialized(self) -> bool:
"""
Lazy initialization: attach to the shared memory block created by another process, using the metadata received via sm_queue.
"""
# is SM already attached?
if self._out_value is not None:
return True
# Reads metadata from a queue
try:
# data_type: class that interprets the bytes, NumpySMAdapter
# instantiation_params: size of numpy array
sm_name, buf_size, data_type, instantiation_params = self.sm_queue.get_nowait()
except Empty:
return False
# Attach Receiver to existing shared memory
self._sm = shared_memory.SharedMemory(name=sm_name)
# The creator process is responsible for unlinking it, not the receiver
try:
resource_tracker.unregister(self._sm._name, 'shared_memory')
except Exception:
pass
# in case buffer is slightly larger than requested;
self._readonly_buffer = self._sm.buf.toreadonly()[:buf_size]
self._out_value = data_type(*instantiation_params)
print(f"[Receiver] Attached to SM: {sm_name}")
return True
def _read_queue(self) -> Message[T] | None:
"""Read from regular queue."""
try:
self.last_msg = self.queue.get_nowait()
if self.last_msg:
self.last_msg.updated = True
return self.last_msg
except Empty:
if self.last_msg:
return Message(self.last_msg.data, self.last_msg.ts, False)
return None
def _read_shared_memory(self) -> Message[T] | None:
# Initialize if needed
if not self._ensure_shared_memory_initialized():
return None
# Read with lock
with self.lock:
if self.ts_value.value == -1:
return None
assert self._readonly_buffer is not None
assert self._out_value is not None
# read buffer
self._out_value.read_from_buffer(self._readonly_buffer)
# Determine if data is fresh
updated = self.up_value.value
self.up_value.value = False
return Message(
data=self._out_value,
ts=float(self.ts_value.value),
updated=updated
)
def read(self) -> Message[T] | None:
if self.transport == TransportMode.SHARED_MEMORY:
return self._read_shared_memory()
else:
return self._read_queue()
def close(self) -> None:
if self._readonly_buffer is not None:
self._readonly_buffer.release()
if self._sm is not None:
self._sm.close()
# ---------------------------------------------------------------------
# Control Loops
# ---------------------------------------------------------------------
# helper function to check stop condition
def should_stop(stop_event) -> bool:
if stop_event is None:
return False
elif hasattr(stop_event, 'is_set'):
return stop_event.is_set()
else:
return stop_event
def sensor_gen_fn(stop_event: mp.Event, emitter: SignalEmitter, sensor: Sensor):
"""
Generator (NOTE: it cannot be sent to a separate process because generators are not picklable - it needs a wrapper!)
- permanently read one sensor and emit its reading into queue or shared memory
- return a command to be executed in main cooperative loop (Sleep)
"""
while not should_stop(stop_event):
reading = sensor.read()
emitter.emit(reading)
yield Sleep(sensor.interval) # Give control back to the world
print("[Sensor] loop ended.", flush=True)
def controller_gen_fn(stop_event: mp.Event,
receivers: List[Tuple[Sensor, SignalReceiver]]):
"""Controller loop - reads from all receivers cooperatively, act then wait."""
while not should_stop(stop_event):
# read all sensors once
for sensor, receiver in receivers:
msg = receiver.read()
if msg:
value = msg.data
# Extract from adapter if Shared Memory
if isinstance(value, NumpySMAdapter):
value = value.array
display_value = value if not isinstance(value, np.ndarray) else f"array[0,0]={value[0, 0]}"
status = "FRESH" if msg.updated else "STALE"
print(f"[Controller] {sensor.sensor_id} received: {display_value} [{status}]", end='')
if msg.updated:
print(f" в†’ Action required based on {sensor.sensor_id} reading")
yield Sleep(5) # Give control back to the world - do all readings every 5 sec
print("[Controller] loop ended.")
def _bg_wrapper_loop(sensor_gen_fn, stop_event, *args):
"""
Needed because
- we need to loop the generator inside a separate process
- we cannot pass a generator object to another process (generators are not picklable), so we pass the generator function and its arguments instead
"""
generator_fn = sensor_gen_fn(stop_event, *args)
try:
for command in generator_fn:
if isinstance(command, Sleep):
# NOTE: this is Blocking Sleep - not cooperative scheduling!
time.sleep(command.seconds)
else:
raise ValueError(f'Unknown command: {command}')
except KeyboardInterrupt:
# silently handle in bg process
pass
except Exception as e:
print(f"Error in background process: {e}")
finally:
print(f"[Background] Stopping {sensor_gen_fn.__name__} with {args[1].sensor_id} inside")
stop_event.set()
# shared_memory.py - the module imported above, holding the SM serialization protocol
from abc import ABC, abstractmethod
import numpy as np
### SM: Shared Memory Support
class SMCompliant(ABC):
"""Any data to be sent to shared memory should comply, i.e. be able to serialize"""
@abstractmethod
def buf_size(self) -> int:
pass
@abstractmethod
def set_to_buffer(self, buffer: memoryview | bytearray) -> None:
"""Serialize data to buffer."""
pass
@abstractmethod
def read_from_buffer(self, buffer: memoryview | bytes) -> None:
"""Deserialize data from buffer."""
pass
@abstractmethod
def instantiation_params(self) -> tuple:
pass
class NumpySMAdapter(SMCompliant):
"""
Adapter between a NumPy array and a shared-memory (SM) communication block.
It does not allocate SM, it only knows how to copy to/from a buffer
"""
def __init__(self, shape: tuple[int, ...], dtype: np.dtype):
"""Array to be mapped to SM block"""
self.array = np.empty(shape, dtype=dtype)
@staticmethod
def lazy_init(array: np.ndarray, adapter: 'NumpySMAdapter | None'=None) -> 'NumpySMAdapter':
"""Lazily initialize adapter and copy array data."""
if adapter is None:
adapter = NumpySMAdapter(array.shape, array.dtype)
adapter.array[:] = array # copy new data: equivalent to np.copyto(adapter.array, array)
return adapter
def instantiation_params(self) -> tuple:
""" Receiving class it to reconstruct the same array """
return (self.array.shape, self.array.dtype)
def buf_size(self) -> int:
return self.array.nbytes
def set_to_buffer(self, buffer: memoryview | bytearray) -> None:
# copy raw bytes into the shared memory buffer
buffer[:self.array.nbytes] = self.array.tobytes()
def read_from_buffer(self, buffer: memoryview | bytes) -> None:
# a temporary view pointing to the data from SM
self.array[:] = np.frombuffer(buffer[:self.array.nbytes], dtype=self.array.dtype).reshape(self.array.shape)
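To make the adapter's contract concrete, here is a small round-trip usage sketch (an editorial example; it uses a plain bytearray in place of a real shared-memory buffer):

arr = np.arange(12, dtype=np.uint8).reshape(3, 4)
adapter = NumpySMAdapter.lazy_init(arr)                  # wrap and copy the array
buf = bytearray(adapter.buf_size())                      # stand-in for sm.buf
adapter.set_to_buffer(buf)                               # serialize on the emitter side
clone = NumpySMAdapter(*adapter.instantiation_params())  # rebuild from metadata
clone.read_from_buffer(bytes(buf))                       # deserialize on the receiver side
assert np.array_equal(clone.array, arr)

Finally, the World scheduler itself: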
"""
- Ran every generator every iteration
- Blocked on time.sleep(command.seconds) inside each loop
- Did not coordinate sleep across tasks
- Could not determine the next earliest task to run
"""
from collections import deque
import heapq
import multiprocessing as mp
import time
from typing import Any, Generic, List, Tuple, TypeVar
from interprocess_shared_memory import (
Clock, Sleep, TransportMode, ParallelismType,
Sensor, CloudinessSensor, TemperatureSensor, CameraSensor,
SignalEmitter, SignalReceiver, LocalQueueReceiver, LocalQueueEmitter, MultiprocessEmitter, MultiprocessReceiver,
_bg_wrapper_loop, sensor_gen_fn, controller_gen_fn
)
# ---------------------------------------------------------------------
class World:
""" Scheduler: orchestrating control loops """
def __init__(self, clock):
self._stop_event = mp.Event()
self.background_processes = []
self.clock = clock
self._manager = mp.Manager()
self._cleanup_resources = []
def local_pipe(self):
""" Create data queue and assign it to both emitter and receiver """
q = deque(maxlen=5)
emitter = LocalQueueEmitter(q, self.clock)
receiver = LocalQueueReceiver(q)
return emitter, receiver
# create interprocess pipe
def mp_pipe(self, transport: TransportMode):
message_queue = self._manager.Queue(maxsize=5)
if transport == TransportMode.SHARED_MEMORY:
# Create SM primitives
lock = self._manager.Lock()
ts_value = self._manager.Value('Q', -1)
up_value = self._manager.Value('b', False) # a flag that SM block has a new FRESH value
sm_queue = self._manager.Queue() # a separate queue to send Shared Memory metadata
emitter = MultiprocessEmitter(
transport, message_queue, self.clock,
lock, ts_value, up_value, sm_queue
)
receiver = MultiprocessReceiver(
transport, message_queue,
lock, ts_value, up_value, sm_queue
)
else: # TransportMode.QUEUE
emitter = MultiprocessEmitter(
transport, message_queue, self.clock
)
receiver = MultiprocessReceiver(
transport, message_queue
)
self._cleanup_resources.append((emitter, receiver)) # we need to clean shared memory at the end
return emitter, receiver
@property
def should_stop(self) -> bool:
return self._stop_event.is_set()
def run_blocking_sleep(self, fg_loops, bg_loops):
print(f"[world] is starting")
### Start background interprocess loops
for background_fn, args in bg_loops:
pr = mp.Process(target=_bg_wrapper_loop, args=(background_fn, self._stop_event, *args))
pr.start()
self.background_processes.append(pr)
print(f"[World] Started background process for {args[1].sensor_id}, {args[1].transport.name}")
#### Run main loop (cooperative scheduling - coroutines) inside the main process
try: # if KeyboardInterrupt stop all processes
generators = [
cooperative_fn(self._stop_event, *args)
for cooperative_fn, args in fg_loops
]
# Initialize scheduling of the next reading
all_next_times = {gen: 0.0 for gen in generators}
################
while not self._stop_event.is_set():
now = self.clock.now_ns()
for gen in generators:
if now >= all_next_times[gen]:
command = next(gen)
if isinstance(command, Sleep):
time.sleep(command.seconds)
else:
raise ValueError(f" Unexpected command {command}")
################
except KeyboardInterrupt:
pass
finally:
print("[World] Stopping... sending stop_event")
self._stop_event.set()
# Cleanup
for emitter, receiver in self._cleanup_resources:
emitter.close() # clean up shared memory
receiver.close()
print(" ... Shared memory released by all emitters and receivers")
print(" ... joining each bg process to let stop_event make effect")
for pr in self.background_processes:
pr.join()
def run_cooperative(self, fg_loops, bg_loops):
print(f"[world] is starting")
### Start background interprocess loops
for background_fn, args in bg_loops:
pr = mp.Process(target=_bg_wrapper_loop, args=(background_fn, self._stop_event, *args))
pr.start()
self.background_processes.append(pr)
print(f"[World] Started background process for {args[1].sensor_id}, {args[1].transport.name}")
#### Run foreground cooperative scheduling loop inside the main process
try: # if KeyboardInterrupt -> stop and clean all processes
generators = [
cooperative_fn(self._stop_event, *args)
for cooperative_fn, args in fg_loops
]
priority_queue = []
counter = 0
clock = self.clock
now = clock.now_ns()
# First scheduling: start all generators immediately
for gen in generators:
priority_queue.append((now, counter, gen))
counter += 1
heapq.heapify(priority_queue)
######################
while priority_queue and not self._stop_event.is_set():
# get the closest scheduled gen
next_time, _, gen = heapq.heappop(priority_queue)
# wait until its scheduled execution time
# note: we are not spinning the loop if no readings are due yet - optimal!
current = clock.now_ns()
wait_ns = max(0, next_time - current)
if wait_ns > 0:
time.sleep(wait_ns / 1e9)
command = next(gen) # advance current generator (read sensor)
if not isinstance(command, Sleep):
raise ValueError(f"Unexpected command {command}")
# reschedule this generator based on yielded sleep
run_at = clock.now_ns() + int(command.seconds * 1e9)
heapq.heappush(priority_queue, (run_at, counter, gen))
counter += 1
#######################
except KeyboardInterrupt:
pass
finally:
print("[World] Stopping... sending stop_event")
self._stop_event.set()
# Cleanup
for emitter, receiver in self._cleanup_resources:
emitter.close() # clean up shared memory
receiver.close()
print(" ... Shared memory released by all emitters and receivers")
print(" ... joining each bg process to let stop_event make effect")
for pr in self.background_processes:
pr.join()
# ---------------------------------------------------------------------
# Demo
# ---------------------------------------------------------------------
if __name__ == "__main__":
PARALLELISM_TYPE = ParallelismType.MULTIPROCESS # LOCAL / MULTIPROCESS
sensors: list[Sensor] = [
TemperatureSensor(transport=TransportMode.QUEUE, interval=5.0),
CloudinessSensor(transport=TransportMode.QUEUE, interval=10.0),
#CameraSensor(transport=TransportMode.QUEUE, interval=1.0)
CameraSensor(transport=TransportMode.SHARED_MEMORY, interval=1.0)
]
# Sanity check: Ensure SM only used in MULTIPROCESS mode
if PARALLELISM_TYPE == ParallelismType.LOCAL and (
wrong_s := [s for s in sensors if s.transport==TransportMode.SHARED_MEMORY]):
raise ValueError(f"SHARED_MEMORY transport not allowed in LOCAL mode for {[s.sensor_id for s in wrong_s]}")
#### create World environment
clk = Clock() # we might use a custom clock
world = World(clk)
# create pipes between sensor and controller (emitter -> receiver)
pipes: list[Tuple[Sensor, SignalEmitter, SignalReceiver]] = []
for sensor in sensors:
if PARALLELISM_TYPE == ParallelismType.LOCAL:
emitter, receiver = world.local_pipe()
else:
emitter, receiver = world.mp_pipe(sensor.transport)
pipes.append((sensor, emitter, receiver))
# list of necessary emitting loops
emitting_loops = [
(sensor_gen_fn, (emitter, sensor))
for sensor, emitter, _ in pipes
]
# Single controller loop with all receivers in it (because it is always a cooperative loop)
receivers = [(sensor, receiver) for sensor, _, receiver in pipes]
controller_loop = [
(controller_gen_fn, (receivers,))
]
# decide what to run in background or foreground
if PARALLELISM_TYPE == ParallelismType.LOCAL:
cooperative_loops = emitting_loops + controller_loop
bg_loops = [] # No background processes
elif PARALLELISM_TYPE == ParallelismType.MULTIPROCESS:
cooperative_loops = controller_loop
bg_loops = emitting_loops
else:
raise ValueError(" Wrong parallelism type")
# START simulation - run sensor loops in the background and the controller loop cooperatively
#world.run_blocking_sleep(cooperative_loops, bg_loops)
world.run_cooperative(cooperative_loops, bg_loops)
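If you want to try the fully cooperative single-process variant instead, switch the flag and give the camera the queue transport (the sanity check above rejects shared memory in LOCAL mode):

PARALLELISM_TYPE = ParallelismType.LOCAL
sensors = [
TemperatureSensor(transport=TransportMode.QUEUE, interval=5.0),
CloudinessSensor(transport=TransportMode.QUEUE, interval=10.0),
CameraSensor(transport=TransportMode.QUEUE, interval=1.0),
]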