Source code for flatland.core.distance_map_walker

from collections import deque
from typing import List, Generic, TypeVar

from flatland.core.distance_map import AbstractDistanceMap
from flatland.core.transition_map import TransitionMap

UnderlyingDistanceMapType = TypeVar('UnderlyingDistanceMapType', bound=AbstractDistanceMap)
UnderlyingTransitionMapType = TypeVar('UnderlyingTransitionMapType', bound=TransitionMap)
UnderlyingConfigurationType = TypeVar('UnderlyingConfigurationType')


[docs] class DistanceMapWalker(Generic[UnderlyingDistanceMapType, UnderlyingTransitionMapType, UnderlyingConfigurationType]): """ Utility class to compute distance maps from each cell in the rail network (and each possible orientation within it) to each agent's target cell. """ def __init__(self, distance_map: AbstractDistanceMap): self.distance_map = distance_map def _distance_map_walker(self, rail: UnderlyingTransitionMapType, target_nr: int, target_configurations: List[UnderlyingConfigurationType]): """ Utility function to compute distance maps from each cell in the rail network (and each possible orientation within it) to each agent's target cell. Parameters ---------- target_configurations """ # Returns max distance to target, from the farthest away node, while filling in distance_map for target_configuration in target_configurations: self.distance_map._set_distance(target_configuration, target_nr, 0) # Fill in the (up to) 4 neighboring nodes # direction is the direction of movement, meaning that at least one possible orientation of an agent # in cell (row,col) allows a movement in direction `direction' nodes_queue = deque(x for xs in (self._get_and_update_neighbors(rail, tc, target_nr, 0) for tc in target_configurations) for x in xs) # BFS from target `position' to all the reachable nodes in the grid # Stop the search if the target position is re-visited, in any direction visited = set(target_configurations) max_distance = 0 while nodes_queue: configuration, distance = nodes_queue.popleft() if configuration not in visited: visited.add(configuration) # From the list of possible neighbors that have at least a path to the current node, only keep those # whose new orientation in the current cell would allow a transition to the configuration valid_neighbors = self._get_and_update_neighbors(rail, configuration, target_nr, distance) for n in valid_neighbors: nodes_queue.append(n) if len(valid_neighbors) > 0: max_distance = max(max_distance, distance + 1) return max_distance def _get_and_update_neighbors(self, rail: UnderlyingTransitionMapType, configuration: UnderlyingConfigurationType, target_nr: int, current_distance: int): """ Utility function used by _distance_map_walker to perform a BFS walk over the rail, filling in the minimum distances from each target cell. """ neighbors = [] for configuration in rail.get_predecessor_configurations(configuration): new_distance = min(self.distance_map._get_distance(configuration, target_nr), current_distance + 1) neighbors.append((configuration, new_distance)) self.distance_map._set_distance(configuration, target_nr, new_distance) return neighbors