Source code for flatland.envs.step_utils.speed_counter

SEGMENT_LENGTH = 1.0


[docs] class SpeedCounter: def __init__(self, speed: float, max_speed: float = None): self._speed = 1 / (round(1 / speed)) self._distance = 0.0 self._is_cell_entry = True if max_speed is not None: self._max_speed = max_speed else: # old constant speed behaviour self._max_speed = self._speed assert self._max_speed <= 1.0 assert self._speed <= self._max_speed assert self._speed >= 0.0 self.reset()
[docs] def step(self, speed: float = None): """ Step the speed counter. Parameters ---------- speed : float Set new speed effective immediately. """ if speed is not None: self._speed = max(min(speed, self._max_speed), 0.0) assert self._speed >= 0.0 assert self.speed <= 1.0 self._distance += self._speed # If trains cannot move to the next cell, they are in state stopped, so it's safe to apply modulo to reflect the distance travelled in the new cell! self._distance = self._distance % SEGMENT_LENGTH if self._distance < self._speed: self._is_cell_entry = True else: self._is_cell_entry = False
def __repr__(self): return f"speed: {self.speed} \ max_speed: {self.max_speed} \ distance: {self.distance} \ is_cell_entry: {self.is_cell_entry}"
[docs] def reset(self): self._distance = 0 self._is_cell_entry = True
@property def is_cell_entry(self): """ Have just entered the cell in the previous step? """ return self._is_cell_entry
[docs] def is_cell_exit(self, speed: float): """ With the given speed, do we exit cell at next time step? """ speed = max(min(speed, self._max_speed), 0.0) return self._distance + speed >= 1.0
@property def speed(self): return self._speed @property def max_speed(self): return self._max_speed @property def distance(self): """ Distance travelled in current cell. """ return self._distance def __getstate__(self): return { "speed": self._speed, "max_speed": self._max_speed, "distance": self._distance, "is_cell_entry": self._is_cell_entry, } def __setstate__(self, load_dict): if "_speed" in load_dict: # backwards compatibility self._speed = 1 / (round(1 / load_dict['_speed'])) else: self._speed = load_dict["speed"] if "counter" in load_dict: # old pickles have constant speed self._distance = load_dict['counter'] * self._speed self._is_cell_entry = load_dict['counter'] == 0 else: self._distance = load_dict['distance'] if "is_cell_entry" in load_dict: self._is_cell_entry = load_dict['distance'] if "max_speed" in load_dict: self._max_speed = load_dict["max_speed"] else: # old pickles have constant speed self._max_speed = self._speed def __eq__(self, other): if not isinstance(other, SpeedCounter): return False return self._speed == other._speed and self._distance == other._distance and self._max_speed == other._max_speed