Source code for flatland.envs.graph.rail_graph_transition_map

"""
A Flatland (microscopic) topology can be represented by different kinds of graphs.
The topology must reflect the possible paths through the rail network - it must not be possible to traverse a switch in the acute angle.
With the graph, the shortest connection from node A to node B can be computed with standard graph algorithms. The API makes it possible to solve such tasks efficiently. Moreover, the graph can be simplified so that only decision-relevant nodes remain and all other nodes are merged. A decision node is a node, i.e. a Flatland cell (track), at which an agent can reasonably stop, go, or branch off. On straight track edges within a route, waiting rarely makes sense because the agent would block resources unnecessarily: if an agent stops short of the decision point (the cell before a crossing), it blocks the whole section in between. This makes little sense from an optimization point of view.
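
The simplification described above can be sketched as follows. This is only an illustration and not part of the Flatland API: the function name `simplify`, the `length` edge attribute, and the rule "a node with exactly one predecessor and one successor is not a decision node" are assumptions made for this example.

```python
import networkx as nx


def simplify(g: nx.DiGraph) -> nx.DiGraph:
    # Hypothetical helper: contract every node that has exactly one
    # predecessor and one successor, summing the (assumed) edge lengths.
    h = nx.DiGraph()
    for u, v in g.edges:
        h.add_edge(u, v, length=g.edges[u, v].get("length", 1))
    for n in list(h.nodes):
        preds = list(h.predecessors(n))
        succs = list(h.successors(n))
        if len(preds) != 1 or len(succs) != 1:
            continue  # start, end, or branching node: keep it
        p, s = preds[0], succs[0]
        if n in (p, s) or p == s or h.has_edge(p, s):
            continue  # avoid self-loops and keep parallel routes distinct
        length = h.edges[p, n]["length"] + h.edges[n, s]["length"]
        h.remove_node(n)
        h.add_edge(p, s, length=length)
    return h


# Straight track heading east (d=1), followed by a switch at (0, 3, 1).
g = nx.DiGraph()
nx.add_path(g, [(0, 0, 1), (0, 1, 1), (0, 2, 1), (0, 3, 1), (0, 4, 1)])
g.add_edge((0, 3, 1), (1, 3, 2))
simplified = simplify(g)
```

In this toy graph the two intermediate nodes of the straight section are merged into a single edge of length 3, while the switch node and the end nodes remain.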

Two (dual, equivalent) approaches are possible:
- agents are positioned on the nodes
- agents are positioned on the edges.
The second approach makes it easier to visualize agents moving forward on edges. Hence, we choose the second approach.

Our directed graph consists of nodes and edges:
* A node in the graph is defined by position and direction. The position corresponds to the position of the underlying cell in the original Flatland topology, and the direction corresponds to the direction in which an agent reaches the cell. Thus, the node is defined by (r, c, d), where r (row) is the index of the vertical cell grid position, c (column) is the index of the horizontal cell grid position, and d (direction) is the direction of cell entry. In the Flatland 2D grid, not every one of the eight neighbor cells can be reached from every direction. Therefore, the entry direction information is key.
* An edge e = (u, v) is defined by its "from-node" u and its "to-node" v. Edges reflect feasible transitions from node u to node v. We can think of the suggestive notation $[u,v)$ in terms of resource occupation of the underlying cell: the "from-node" refers to the underlying cell entered, and the "to-node" refers to the neighbor cell entered when the edge is left.
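
As a minimal sketch of this encoding, the edges of a single horizontal straight cell can be built by hand. The heading encoding (0=N, 1=E, 2=S, 3=W), the `STEP` table, and the helper `successor` are assumptions introduced for this illustration only.

```python
import networkx as nx

# Assumed heading encoding: 0=N, 1=E, 2=S, 3=W, with the (row, column)
# offset of the neighbor cell reached when leaving with that heading.
STEP = {0: (-1, 0), 1: (0, 1), 2: (1, 0), 3: (0, -1)}


def successor(node, exit_heading):
    # Hypothetical helper: the "to-node" is the neighbor cell, entered
    # with the heading the agent had when it left the current cell.
    r, c, _ = node
    dr, dc = STEP[exit_heading]
    return (r + dr, c + dc, exit_heading)


g = nx.DiGraph()
# Horizontal straight at cell (0, 1): transitions {(E, E), (W, W)}.
for heading in (1, 3):
    u = (0, 1, heading)
    g.add_edge(u, successor(u, heading))

edges = sorted(g.edges)
```

The same cell (0, 1) appears as two distinct nodes, one per entry heading, and each has its own outgoing edge.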

The implementation uses NetworkX, so its many graph functions are available as well.
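
For instance, a shortest route to a target cell can be found with plain NetworkX calls. The sketch below is an illustration under assumptions: the helper `shortest_path_to_cell` and the toy graph are hypothetical, and every edge counts as one step.

```python
import networkx as nx


def shortest_path_to_cell(g, source, cell):
    # Hypothetical helper: a cell can be entered with several headings,
    # so take the shortest path over all nodes (r, c, d) of that cell.
    paths = nx.single_source_shortest_path(g, source)
    candidates = [p for n, p in paths.items() if n[:2] == cell]
    return min(candidates, key=len, default=None)


g = nx.DiGraph()
# Direct route heading east along row 0.
nx.add_path(g, [(0, 0, 1), (0, 1, 1), (0, 2, 1), (0, 3, 1)])
# Detour via row 1 that enters cell (0, 3) heading north.
nx.add_path(g, [(0, 0, 1), (1, 0, 2), (1, 1, 1), (1, 2, 1), (1, 3, 1), (0, 3, 0)])

route = shortest_path_to_cell(g, (0, 0, 1), (0, 3))
```

Because the entry heading is part of the node, the search has to consider every node of the target cell rather than a single target node.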

References:
- Egli, Adrian. FlatlandGraphBuilder. https://github.com/aiAdrian/flatland_railway_extension/blob/e2b15bdd851ad32fb26c1a53f04621a3ca38fc00/flatland_railway_extension/FlatlandGraphBuilder.py
- Nygren, E., Eichenberger, Ch., Frejinger, E. Scope Restriction for Scalable Real-Time Railway Rescheduling: An Exploratory Study. https://arxiv.org/abs/2305.03574
"""
from collections import defaultdict
from typing import Tuple

import networkx as nx

from flatland.core.grid.grid4_utils import get_new_position
from flatland.core.transition_map import GridTransitionMap, TransitionMap
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env_action import RailEnvActions

GridNode = Tuple[Tuple[int, int], int]
GridEdge = Tuple[GridNode, GridNode]

class GraphTransitionMap(TransitionMap[GridNode, GridEdge, bool, RailEnvActions]):
    """
    Flatland 3 transition map represented by a directed graph.

    The grid transition map contains for all cells a set of pairs (heading at cell entry, heading at cell exit).
    E.g. horizontal straight is {(E,E), (W,W)}.
    The directed graph's nodes are entry pins (cell plus heading at entry).
    Edges always go from the entry pin at one cell to the entry pin of a neighboring cell.
    The outgoing heading for the grid transition map is the incoming heading at a neighboring cell.

    Incoming heading:

              S
              |
    E >---+---< W
              |
              ^
              N

    Outgoing heading (=incoming at neighbor cell):

                                N (of cell-to-the-north)
                                ^
                                |
    W (of cell-to-the-west) <---+---> E (of cell-to-the-east)
                                |
                                ⌄
                                S (of cell-to-the-south)

    Attributes
    ----------
    g: nx.DiGraph
    """

    def __init__(self, g: nx.DiGraph):
        self.g = g
        self.cell_in_pins = defaultdict(set)

    @staticmethod
    def grid_to_digraph(transition_map: GridTransitionMap) -> nx.DiGraph:
        """
        The graph representation of a grid transition map.

        Parameters
        ----------
        transition_map: GridTransitionMap
            The grid transition map to convert.

        Returns
        -------
        nx.DiGraph
            Directed graph with nodes (r, c, d) and one edge per feasible transition.
        """
        g = nx.DiGraph()
        for r in range(transition_map.height):
            for c in range(transition_map.width):
                for d in range(4):
                    # One flag per exit heading for entry pin (r, c, d).
                    possible_transitions = transition_map.get_transitions(((r, c), d))
                    for new_direction in range(4):
                        if possible_transitions[new_direction]:
                            new_position = get_new_position((r, c), new_direction)
                            g.add_edge((r, c, d), (*new_position, new_direction))
        return g

    @staticmethod
    def from_rail_env(env: RailEnv) -> "GraphTransitionMap":
        """
        Factory method to create a graph transition map from a rail env.

        Parameters
        ----------
        env: RailEnv

        Returns
        -------
        GraphTransitionMap
            The graph transition map.
        """
        return GraphTransitionMap(GraphTransitionMap.grid_to_digraph(env.rail))

    def check_action_on_agent(self, action: RailEnvActions, configuration: GridNode) -> Tuple[bool, GridNode, bool, RailEnvActions]:
        position, direction = configuration
        new_position = None
        new_direction, transition_valid, preprocessed_action = direction, True, action
        n = (*position, direction)
        succs = list(self.g.successors(n))
        assert 1 <= len(succs) <= 2
        if len(succs) == 1:
            # Only one way out: take it regardless of the requested action.
            r, c, d = succs[0]
            new_position = r, c
            new_direction = d
        else:
            if action == RailEnvActions.MOVE_LEFT:
                # Find the successor reached by turning left.
                new_direction = (direction - 1) % 4
                for r, c, d in succs:
                    if d == new_direction:
                        new_position = r, c
                        break
            elif action == RailEnvActions.MOVE_RIGHT:
                # Find the successor reached by turning right.
                new_direction = (direction + 1) % 4
                for r, c, d in succs:
                    if d == new_direction:
                        new_position = r, c
                        break
            if new_position is None:
                # No matching turn (or no turn requested): continue straight ahead.
                new_direction = direction
                for r, c, d in succs:
                    if d == new_direction:
                        new_position = r, c
                        break
        assert new_position is not None

        # A requested turn that could not be taken is reported as MOVE_FORWARD.
        transition_valid = True
        if action == RailEnvActions.MOVE_LEFT and new_direction != ((direction - 1) % 4):
            transition_valid = False
            preprocessed_action = RailEnvActions.MOVE_FORWARD
        elif action == RailEnvActions.MOVE_RIGHT and new_direction != ((direction + 1) % 4):
            transition_valid = False
            preprocessed_action = RailEnvActions.MOVE_FORWARD

        new_cell_valid = (*new_position, new_direction) in self.g.nodes
        return new_cell_valid, (new_position, new_direction), transition_valid, preprocessed_action

    def get_transitions(self, configuration: GridNode) -> Tuple[bool]:
        return (True,)