Source code for flatland.envs.timetable_generators
"""Timetable generators: Railway Undertaking (RU) / Eisenbahnverkehrsunternehmen (EVU)."""
from typing import List
import numpy as np
from numpy.random.mtrand import RandomState
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.distance_map import DistanceMap
from flatland.envs.rail_env_shortest_paths import get_shortest_paths
from flatland.envs.timetable_utils import Timetable
[docs]
def timetable_generator(agents: List[EnvAgent], distance_map: DistanceMap,
agents_hints: dict, np_random: RandomState = None) -> Timetable:
"""
Calculates earliest departure and latest arrival times for the agents
This is the new addition in Flatland 3
Also calculates the max episodes steps based on the density of the timetable
inputs:
agents - List of all the agents rail_env.agents
distance_map - Distance map of positions to targets of each agent in each direction
agent_hints - Uses the number of cities
np_random - RNG state for seeding
returns:
Timetable with the latest_arrivals, earliest_departures and max_episdode_steps
"""
# max_episode_steps calculation
if agents_hints:
city_positions = agents_hints['city_positions']
num_cities = len(city_positions)
else:
num_cities = 2
timedelay_factor = 4
alpha = 2
num_agents = len(agents)
max_episode_steps = int(timedelay_factor * alpha * \
(distance_map.rail.width + distance_map.rail.height + (num_agents / num_cities)))
# Multipliers
old_max_episode_steps_multiplier = 3.0
new_max_episode_steps_multiplier = 1.5
travel_buffer_multiplier = 1.3 # must be strictly lesser than new_max_episode_steps_multiplier
assert new_max_episode_steps_multiplier > travel_buffer_multiplier
end_buffer_multiplier = 0.05
mean_shortest_path_multiplier = 0.2
if len(agents[0].waypoints) > 1:
# distance for intermediates parts and sum up
line_length = len(agents[0].waypoints) - 1
fake_agents = []
for i in range(line_length):
for a in agents:
waypoints = a.waypoints
fake_agents.append(EnvAgent(
handle=i * num_agents + a.handle,
initial_position=waypoints[i].position,
initial_direction=waypoints[i].direction,
position=waypoints[i].position,
direction=waypoints[i].direction,
target=waypoints[i + 1].position,
))
distance_map_with_intermediates = DistanceMap(fake_agents, distance_map.env_height, distance_map.env_width)
distance_map_with_intermediates.reset(fake_agents, distance_map.rail)
shortest_paths = get_shortest_paths(distance_map_with_intermediates)
shortest_path_segment_lengths = [[] for _ in range(num_agents)]
for k, v in shortest_paths.items():
shortest_path_segment_lengths[k % num_agents].append(len_handle_none(v))
shortest_paths_lengths = [sum(l) for l in shortest_path_segment_lengths]
else:
shortest_paths = get_shortest_paths(distance_map)
shortest_paths_lengths = [len_handle_none(v) for k, v in shortest_paths.items()]
shortest_path_segment_lengths = [[l] for l in shortest_paths_lengths]
# Find mean_shortest_path_time
agent_speeds = [agent.speed_counter.speed for agent in agents]
agent_shortest_path_times = np.array(shortest_paths_lengths) / np.array(agent_speeds)
mean_shortest_path_time = np.mean(agent_shortest_path_times)
# Deciding on a suitable max_episode_steps
longest_speed_normalized_time = np.max(agent_shortest_path_times)
mean_path_delay = mean_shortest_path_time * mean_shortest_path_multiplier
max_episode_steps_new = int(np.ceil(longest_speed_normalized_time * new_max_episode_steps_multiplier) + mean_path_delay)
max_episode_steps_old = int(max_episode_steps * old_max_episode_steps_multiplier)
max_episode_steps = min(max_episode_steps_new, max_episode_steps_old)
end_buffer = int(max_episode_steps * end_buffer_multiplier)
latest_arrival_max = max_episode_steps - end_buffer
earliest_departures = []
latest_arrivals = []
for agent in agents:
agent_shortest_path_time = agent_shortest_path_times[agent.handle]
agent_travel_time_max = int(np.ceil((agent_shortest_path_time * travel_buffer_multiplier) + mean_path_delay))
departure_window_max = max(latest_arrival_max - agent_travel_time_max, 1)
earliest_departure = np_random.randint(0, departure_window_max)
latest_arrival = earliest_departure + agent_travel_time_max
agent.earliest_departure = earliest_departure
agent.latest_arrival = latest_arrival
ed = earliest_departure
eds = [earliest_departure]
for l in shortest_path_segment_lengths[agent.handle]:
ed += l
eds.append(ed)
la = latest_arrival
las = [latest_arrival]
for l in reversed(shortest_path_segment_lengths[agent.handle]):
la -= l
las.insert(0, la)
eds[-1] = None
las[0] = None
earliest_departures.append(eds)
latest_arrivals.append(las)
return Timetable(earliest_departures=earliest_departures, latest_arrivals=latest_arrivals,
max_episode_steps=max_episode_steps)
[docs]
def ttgen_flatland2(agents: List[EnvAgent], distance_map: DistanceMap,
agents_hints: dict, np_random: RandomState = None) -> Timetable:
n_max_steps = 1000
return Timetable(
earliest_departures=[[0]] * len(agents),
latest_arrivals=[[n_max_steps]] * len(agents),
max_episode_steps=n_max_steps)