Source code for flatland.envs.rail_env_shortest_paths

import math
from typing import Dict, List, Optional, Tuple, Set

import matplotlib.pyplot as plt
import numpy as np

from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.core.grid.grid4_utils import get_new_position
from flatland.core.transition_map import GridTransitionMap
from flatland.envs.step_utils.states import TrainState
from flatland.envs.distance_map import DistanceMap
from flatland.envs.fast_methods import fast_count_nonzero
from flatland.envs.rail_env_action import RailEnvActions, RailEnvNextAction
from flatland.envs.rail_trainrun_data_structures import Waypoint
from flatland.utils.decorators import enable_infrastructure_lru_cache
from flatland.utils.ordered_set import OrderedSet


[docs] @enable_infrastructure_lru_cache() def get_valid_move_actions_(agent_direction: Grid4TransitionsEnum, agent_position: Tuple[int, int], rail: GridTransitionMap) -> Set[RailEnvNextAction]: """ Get the valid move actions (forward, left, right) for an agent. TODO The implementation could probably be more efficient and more elegant, but given the few calls this has no priority now. Parameters ---------- agent_direction : Grid4TransitionsEnum agent_position: Tuple[int,int] rail : GridTransitionMap Returns ------- Set of `RailEnvNextAction` (tuples of (action,position,direction)) Possible move actions (forward,left,right) and the next position/direction they lead to. It is not checked that the next cell is free. """ valid_actions: Set[RailEnvNextAction] = [] possible_transitions = rail.get_transitions(*agent_position, agent_direction) num_transitions = fast_count_nonzero(possible_transitions) # Start from the current orientation, and see which transitions are available; # organize them as [left, forward, right], relative to the current orientation # If only one transition is possible, the forward branch is aligned with it. if rail.is_dead_end(agent_position): action = RailEnvActions.MOVE_FORWARD exit_direction = (agent_direction + 2) % 4 if possible_transitions[exit_direction]: new_position = get_new_position(agent_position, exit_direction) valid_actions = [(RailEnvNextAction(action, new_position, exit_direction))] elif num_transitions == 1: action = RailEnvActions.MOVE_FORWARD for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) valid_actions = [(RailEnvNextAction(action, new_position, new_direction))] else: for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: if new_direction == agent_direction: action = RailEnvActions.MOVE_FORWARD elif new_direction == (agent_direction + 1) % 4: action = RailEnvActions.MOVE_RIGHT elif new_direction == (agent_direction - 1) % 4: action = RailEnvActions.MOVE_LEFT else: raise Exception("Illegal state") new_position = get_new_position(agent_position, new_direction) valid_actions.append(RailEnvNextAction(action, new_position, new_direction)) return valid_actions
[docs] @enable_infrastructure_lru_cache() def get_new_position_for_action( agent_position: Tuple[int, int], agent_direction: Grid4TransitionsEnum, action: RailEnvActions, rail: GridTransitionMap) -> Tuple[int, int, int]: """ Get the next position for this action. TODO The implementation could probably be more efficient and more elegant, but given the few calls this has no priority now. Parameters ---------- agent_position agent_direction action rail Returns ------- Tuple[int,int,int] row, column, direction """ possible_transitions = rail.get_transitions(*agent_position, agent_direction) num_transitions = fast_count_nonzero(possible_transitions) # Start from the current orientation, and see which transitions are available; # organize them as [left, forward, right], relative to the current orientation # If only one transition is possible, the forward branch is aligned with it. if rail.is_dead_end(agent_position): valid_action = RailEnvActions.MOVE_FORWARD exit_direction = (agent_direction + 2) % 4 if possible_transitions[exit_direction]: new_position = get_new_position(agent_position, exit_direction) if valid_action == action: return new_position, exit_direction elif num_transitions == 1: valid_action = RailEnvActions.MOVE_FORWARD for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) if valid_action == action: return new_position, new_direction else: for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: if new_direction == agent_direction: valid_action = RailEnvActions.MOVE_FORWARD if valid_action == action: new_position = get_new_position(agent_position, new_direction) return new_position, new_direction elif new_direction == (agent_direction + 1) % 4: valid_action = RailEnvActions.MOVE_RIGHT if valid_action == action: new_position = get_new_position(agent_position, new_direction) return new_position, new_direction elif new_direction == (agent_direction - 1) % 4: valid_action = RailEnvActions.MOVE_LEFT if valid_action == action: new_position = get_new_position(agent_position, new_direction) return new_position, new_direction
[docs] @enable_infrastructure_lru_cache(maxsize=4_000_000) def get_action_for_move( agent_position: Tuple[int, int], agent_direction: Grid4TransitionsEnum, next_agent_position: Tuple[int, int], next_agent_direction: int, rail: GridTransitionMap) -> Optional[RailEnvActions]: """ Get the action (if any) to move from a position and direction to another. TODO The implementation could probably be more efficient and more elegant, but given the few calls this has no priority now. Parameters ---------- agent_position agent_direction next_agent_position next_agent_direction rail Returns ------- Optional[RailEnvActions] the action (if direct transition possible) or None. """ possible_transitions = rail.get_transitions(*agent_position, agent_direction) num_transitions = fast_count_nonzero(possible_transitions) # Start from the current orientation, and see which transitions are available; # organize them as [left, forward, right], relative to the current orientation # If only one transition is possible, the forward branch is aligned with it. if rail.is_dead_end(agent_position): valid_action = RailEnvActions.MOVE_FORWARD new_direction = (agent_direction + 2) % 4 if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action elif num_transitions == 1: valid_action = RailEnvActions.MOVE_FORWARD for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action else: for new_direction in [(agent_direction + i) % 4 for i in range(-1, 2)]: if possible_transitions[new_direction]: if new_direction == agent_direction: valid_action = RailEnvActions.MOVE_FORWARD new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action elif new_direction == (agent_direction + 1) % 4: valid_action = RailEnvActions.MOVE_RIGHT new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action elif new_direction == (agent_direction - 1) % 4: valid_action = RailEnvActions.MOVE_LEFT new_position = get_new_position(agent_position, new_direction) if new_position == next_agent_position and new_direction == next_agent_direction: return valid_action
# N.B. get_shortest_paths is not part of distance_map since it refers to RailEnvActions (would lead to circularity!)
[docs] def get_shortest_paths(distance_map: DistanceMap, max_depth: Optional[int] = None, agent_handle: Optional[int] = None) \ -> Dict[int, Optional[List[Waypoint]]]: """ Computes the shortest path for each agent to its target and the action to be taken to do so. The paths are derived from a `DistanceMap`. If there is no path (rail disconnected), the path is given as None. The agent state (moving or not) and its speed are not taken into account example: agent_fixed_travel_paths = get_shortest_paths(env.distance_map, None, agent.handle) path = agent_fixed_travel_paths[agent.handle] Parameters ---------- distance_map : reference to the distance_map max_depth : max path length, if the shortest path is longer, it will be cutted agent_handle : if set, the shortest for agent.handle will be returned , otherwise for all agents Returns ------- Dict[int, Optional[List[WalkingElement]]] """ shortest_paths = dict() def _shortest_path_for_agent(agent): if agent.state.is_off_map_state(): position = agent.initial_position elif agent.state.is_on_map_state(): position = agent.position elif agent.state == TrainState.DONE: position = agent.target else: shortest_paths[agent.handle] = None return direction = agent.direction shortest_paths[agent.handle] = [] distance = math.inf depth = 0 while (position != agent.target and (max_depth is None or depth < max_depth)): next_actions = get_valid_move_actions_(direction, position, distance_map.rail) best_next_action = None for next_action in next_actions: next_action_distance = distance_map.get()[ agent.handle, next_action.next_position[0], next_action.next_position[ 1], next_action.next_direction] if next_action_distance < distance: best_next_action = next_action distance = next_action_distance shortest_paths[agent.handle].append(Waypoint(position, direction)) depth += 1 # if there is no way to continue, the rail must be disconnected! # (or distance map is incorrect) if best_next_action is None: shortest_paths[agent.handle] = None return position = best_next_action.next_position direction = best_next_action.next_direction if max_depth is None or depth < max_depth: shortest_paths[agent.handle].append(Waypoint(position, direction)) if agent_handle is not None: _shortest_path_for_agent(distance_map.agents[agent_handle]) else: for agent in distance_map.agents: _shortest_path_for_agent(agent) return shortest_paths
[docs] def get_k_shortest_paths(env, source_position: Tuple[int, int], source_direction: int, target_position=Tuple[int, int], k: int = 1, debug=False) -> List[Tuple[Waypoint]]: """ Computes the k shortest paths using modified Dijkstra following pseudo-code https://en.wikipedia.org/wiki/K_shortest_path_routing In contrast to the pseudo-code in wikipedia, we do not a allow for loopy paths. Parameters ---------- env : RailEnv source_position: Tuple[int,int] source_direction: int target_position: Tuple[int,int] k : int max number of shortest paths debug: bool print debug statements Returns ------- List[Tuple[WalkingElement]] We use tuples since we need the path elements to be hashable. We use a list of paths in order to keep the order of length. """ # P: set of shortest paths from s to t # P =empty, shortest_paths: List[Tuple[Waypoint]] = [] # countu: number of shortest paths found to node u # countu = 0, for all u in V count = {(r, c, d): 0 for r in range(env.height) for c in range(env.width) for d in range(4)} # B is a heap data structure containing paths # N.B. use OrderedSet to make result deterministic! heap: OrderedSet[Tuple[Waypoint]] = OrderedSet() # insert path Ps = {s} into B with cost 0 heap.add((Waypoint(source_position, source_direction),)) # while B is not empty and countt < K: while len(heap) > 0 and len(shortest_paths) < k: if debug: print("iteration heap={}, shortest_paths={}".format(heap, shortest_paths)) # – let Pu be the shortest cost path in B with cost C cost = np.inf pu = None for path in heap: if len(path) < cost: pu = path cost = len(path) u: Waypoint = pu[-1] if debug: print(" looking at pu={}".format(pu)) # – B = B − {Pu } heap.remove(pu) # – countu = countu + 1 urcd = (*u.position, u.direction) count[urcd] += 1 # – if u = t then P = P U {Pu} if u.position == target_position: if debug: print(" found of length {} {}".format(len(pu), pu)) shortest_paths.append(pu) # – if countu ≤ K then # CAVEAT: do not allow for loopy paths elif count[urcd] <= k: possible_transitions = env.rail.get_transitions(*urcd) if debug: print(" looking at neighbors of u={}, transitions are {}".format(u, possible_transitions)) # for each vertex v adjacent to u: for new_direction in range(4): if debug: print(" looking at new_direction={}".format(new_direction)) if possible_transitions[new_direction]: new_position = get_new_position(u.position, new_direction) if debug: print(" looking at neighbor v={}".format((*new_position, new_direction))) v = Waypoint(position=new_position, direction=new_direction) # CAVEAT: do not allow for loopy paths if v in pu: continue # – let Pv be a new path with cost C + w(u, v) formed by concatenating edge (u, v) to path Pu pv = pu + (v,) # – insert Pv into B heap.add(pv) # return P return shortest_paths
[docs] def visualize_distance_map(distance_map: DistanceMap, agent_handle: int = 0): if agent_handle >= distance_map.get().shape[0]: print("Error: agent_handle cannot be larger than actual number of agents") return # take min value of all 4 directions min_distance_map = np.min(distance_map.get(), axis=3) plt.imshow(min_distance_map[agent_handle][:][:]) plt.show()