Source code for flatland.envs.line_generators

"""Line generators: Railway Undertaking (RU) / Eisenbahnverkehrsunternehmen (EVU)."""
from typing import Tuple, List, Callable, Mapping, Optional, Any

from numpy.random.mtrand import RandomState

from flatland.core.grid.grid4 import Grid4TransitionsEnum
from flatland.core.grid.grid_utils import IntVector2DArray
from flatland.core.transition_map import GridTransitionMap
from flatland.envs import persistence
from flatland.envs.timetable_utils import Line

# A pair of ints naming an agent position
# (presumably a (row, column) grid cell; confirm against the grid utilities).
AgentPosition = Tuple[int, int]
# Call signature of a line generator. The argument order matches
# BaseLineGen.generate: (rail, num_agents, hints, num_resets, np_random) -> Line.
LineGenerator = Callable[[GridTransitionMap, int, Optional[Any], Optional[int], Optional[RandomState]], Line]


def speed_initialization_helper(nb_agents: int, speed_ratio_map: Mapping[float, float] = None,
                                np_random: RandomState = None) -> List[float]:
    """
    Draw one speed per agent according to a speed distribution.

    Parameters
    ----------
    nb_agents : int
        The number of agents to generate a speed for
    speed_ratio_map : Mapping[float,float]
        A map of speeds mapping to their ratio of appearance. The ratios must sum up to 1.
        When it is None or empty, every agent gets speed 1.0.
    np_random : RandomState
        Random number generator used for the draw. When omitted, a freshly created
        RandomState is used, so the result is not reproducible; pass a seeded
        generator for deterministic results.

    Returns
    -------
    List[float]
        A list of size nb_agents of speeds with the corresponding probabilistic ratios.
    """
    # An empty map carries no distribution to sample from; treat it like "no map".
    if not speed_ratio_map:
        return [1.0] * nb_agents

    # The signature advertises np_random as optional, so make it actually optional.
    if np_random is None:
        np_random = RandomState()

    # keys() and values() iterate in the same order, so speeds[i] pairs with speed_ratios[i].
    speeds = list(speed_ratio_map.keys())
    speed_ratios = list(speed_ratio_map.values())

    chosen_classes = np_random.choice(len(speeds), nb_agents, p=speed_ratios)
    return [speeds[index] for index in chosen_classes]
class BaseLineGen(object):
    """
    Common base of the line generators.

    Holds the configuration shared by the generators and makes instances callable:
    calling an instance forwards all arguments to ``generate``.
    """

    def __init__(self, speed_ratio_map: Mapping[float, float] = None, seed: int = 1, line_length: int = 2):
        """
        Parameters
        ----------
        speed_ratio_map : Mapping[float, float]
            Map of speeds to their ratio of appearance.
        seed : int
            Seed stored on the generator.
        line_length : int
            The length of the lines.
        """
        self.line_length = line_length
        self.seed = seed
        self.speed_ratio_map = speed_ratio_map

    def generate(self, rail: GridTransitionMap, num_agents: int, hints: dict = None, num_resets: int = 0,
                 np_random: RandomState = None) -> Line:
        """
        Hook to be overridden by subclasses; this base implementation does nothing
        and returns None.
        """
        return None

    def __call__(self, *args, **kwargs):
        """Forward the call to ``generate`` so that an instance can be used like a function."""
        line = self.generate(*args, **kwargs)
        return line
def sparse_line_generator(speed_ratio_map: Mapping[float, float] = None, seed: int = 1,
                          line_length: int = 2) -> LineGenerator:
    """
    Create a SparseLineGen with the given configuration.

    Parameters
    ----------
    speed_ratio_map : Mapping[float, float]
        Map of speeds to their ratio of appearance.
    seed : int
        Seed handed to the generator.
    line_length : int
        The length of the lines.

    Returns
    -------
    LineGenerator
        The configured SparseLineGen instance (instances are callable).
    """
    generator = SparseLineGen(speed_ratio_map=speed_ratio_map, seed=seed, line_length=line_length)
    return generator
class SparseLineGen(BaseLineGen):
    """
    Line generator used for Round 2 of the Flatland challenge.

    It produces lines to railway networks provided by sparse_rail_generator.
    """

    def __init__(self, speed_ratio_map: Mapping[float, float] = None, seed: int = 1, line_length: int = 2):
        """
        Parameters
        ----------
        speed_ratio_map : Mapping[float, float]
            Speed ratios of all agents. They are probabilities of all different speeds and have to add up to 1.
        seed : int
            Initiate random seed generator
        line_length : int
            The length of the lines, i.e. the number of cities on a line. ``generate`` needs at least 2.
        """
        super().__init__(speed_ratio_map, seed, line_length)

    @staticmethod
    def decide_orientation(rail, start, target, possible_orientations, np_random: RandomState) -> int:
        """
        Choose a start orientation from which the target can be reached.

        Parameters
        ----------
        rail
            Rail object; only its ``check_path_exists`` method is used.
        start, target
            Station entries; their first element is handed to ``check_path_exists``.
        possible_orientations
            Candidate orientations to test.
        np_random : RandomState
            Random number generator used to pick among the feasible orientations.

        Returns
        -------
        int
            A randomly chosen feasible orientation, or 0 when none of the candidates is feasible.
        """
        feasible_orientations = [
            orientation
            for orientation in possible_orientations
            if rail.check_path_exists(start[0], orientation, target[0])
        ]
        if feasible_orientations:
            return np_random.choice(feasible_orientations)
        # No candidate orientation reaches the target: fall back to orientation 0.
        return 0

    def _assign_station_in_start_and_target_city(self, hints: dict, rail: GridTransitionMap, city_start: int,
                                                 city_target: int, np_random: RandomState):
        """
        Pick a start station, a start orientation and a target station for one leg.

        Reads ``hints['train_stations']`` and ``hints['city_orientations']``, both indexed by city.

        Returns
        -------
        tuple
            (start station, start orientation, target station)
        """
        train_stations = hints['train_stations']
        city_orientation = hints['city_orientations']

        start_stations = train_stations[city_start]
        target_stations = train_stations[city_target]

        # Candidates are the orientation of the start city and its opposite.
        start_orientation = city_orientation[city_start]
        city_start_possible_orientations = [start_orientation, (start_orientation + 2) % 4]

        # Start uses an even random number, target an odd one, each wrapped to the
        # number of stations of its city. The start draw must come first to keep
        # the random sequence unchanged.
        agent_start_idx = (2 * np_random.randint(0, 10)) % len(start_stations)
        agent_target_idx = ((2 * np_random.randint(0, 10)) + 1) % len(target_stations)

        agent_start = start_stations[agent_start_idx]
        agent_target = target_stations[agent_target_idx]

        agent_orientation = self.decide_orientation(
            rail, agent_start, agent_target, city_start_possible_orientations, np_random)
        return agent_start, agent_orientation, agent_target

    def generate(self, rail: GridTransitionMap, num_agents: int, hints: dict = None, num_resets: int = 0,
                 np_random: RandomState = None) -> Line:
        """
        Assigns tasks to all the agents.

        Agents with an even index draw a fresh sequence of ``line_length`` distinct cities;
        the following odd-indexed agent runs the same sequence in the opposite direction.
        For every pair of consecutive cities a start station and orientation are chosen;
        the agent's target is the target station of its last leg.

        Parameters
        ----------
        rail : GridTransitionMap
            Rail infrastructure given by the rail_generator
        num_agents : int
            Number of agents to include in the line
        hints : dict
            Hints provided by the rail_generator. Required; the keys 'city_positions',
            'train_stations' and 'city_orientations' are read.
        num_resets: int
            How often the generator has been reset. Not used by this generator.
        np_random : RandomState
            Random number generator used for all draws.

        Returns
        -------
        Line:
            the line

        Raises
        ------
        ValueError
            If ``hints`` is None, or if an agent has to be placed while ``line_length``
            is smaller than 2 (a line needs a start and a target city).
        """
        if hints is None:
            raise ValueError("SparseLineGen.generate requires the hints provided by the rail generator")

        city_positions: IntVector2DArray = hints['city_positions']

        # Place agents and targets within available train stations
        agent_positions = []
        agent_targets = []
        agents_directions = []

        for agent_idx in range(num_agents):
            if agent_idx % 2 == 0:
                # Select line_length cities; run a train from city 0 to city -1
                city_idx: List[int] = np_random.choice(len(city_positions), self.line_length, replace=False)
            else:
                # Run a train in the opposite direction city -1 ... city 0
                city_idx = list(reversed(city_idx))

            cur_agent_orientations = []
            cur_agent_positions = []
            cur_agent_target = None
            for city1, city2 in zip(city_idx, city_idx[1:]):
                cur_agent_start, cur_agent_orientation, cur_agent_target = \
                    self._assign_station_in_start_and_target_city(hints, rail, city1, city2, np_random)
                cur_agent_positions.append((cur_agent_start[0][0], cur_agent_start[0][1]))
                cur_agent_orientations.append(Grid4TransitionsEnum(cur_agent_orientation))

            if cur_agent_target is None:
                # Fewer than two cities means there is no leg and therefore no target.
                raise ValueError(
                    "line_length must be at least 2 to generate a line, got {}".format(self.line_length))

            agent_positions.append(cur_agent_positions)
            agent_targets.append((cur_agent_target[0][0], cur_agent_target[0][1]))
            agents_directions.append(cur_agent_orientations)

        if self.speed_ratio_map:
            agent_speeds = speed_initialization_helper(num_agents, self.speed_ratio_map, np_random=np_random)
        else:
            agent_speeds = [1.0] * len(agent_positions)

        return Line(agent_positions=agent_positions, agent_directions=agents_directions,
                    agent_targets=agent_targets, agent_speeds=agent_speeds)
def line_from_file(filename, load_from_package: str = None) -> LineGenerator:
    """
    Build a line generator that reads the agents from a saved environment file.

    Parameters
    ----------
    filename : Pickle file generated by env.save() or editor
    load_from_package : str
        package, forwarded to the loader

    Returns
    -------
    LineGenerator
        A generator that loads the file on every call and returns a Line with the
        initial position, initial direction, target and speed of each stored agent.
        Its own arguments (rail, num_agents, hints, num_resets, np_random) are ignored.
    """

    def generator(rail: GridTransitionMap, num_agents: int, hints: Any = None, num_resets: int = 0,
                  np_random: RandomState = None) -> Line:
        loaded = persistence.RailEnvPersister.load_env_dict(filename, load_from_package=load_from_package)
        stored_agents = loaded["agents"]

        # Positions and directions are wrapped in one-element lists, one list per agent.
        return Line(
            agent_positions=[[agent.initial_position] for agent in stored_agents],
            agent_directions=[[agent.initial_direction] for agent in stored_agents],
            agent_targets=[agent.target for agent in stored_agents],
            agent_speeds=[agent.speed_counter.speed for agent in stored_agents],
        )

    return generator