Source code for flatland.envs.rewards
from collections import defaultdict
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.distance_map import DistanceMap
from flatland.envs.step_utils.states import TrainState
class Rewards:
    """
    Reward Function:

    It costs each agent a step_penalty for every time-step taken in the environment, independent of
    whether the agent moves. Currently, all other penalties, such as those for stopping, starting and
    invalid actions, are set to 0.

    alpha = 0
    beta = 0

    Reward function parameters:

    - invalid_action_penalty = 0
    - step_penalty = -alpha
    - global_reward = beta
    - epsilon = avoid rounding errors
    - stop_penalty = 0  # penalty for stopping a moving agent
    - start_penalty = 0  # penalty for starting a stopped agent
    - intermediate_not_served_penalty = -1
    - intermediate_late_arrival_penalty_factor = 0.2
    - intermediate_early_departure_penalty_factor = 0.5
    """
    # Epsilon to avoid rounding errors
    epsilon = 0.01
    # NEW : REW: Sparse Reward
    alpha = 0
    beta = 0
    step_penalty = -1 * alpha
    global_reward = 1 * beta
    invalid_action_penalty = 0  # previously -2; GIACOMO: we decided that invalid actions will carry no penalty
    stop_penalty = 0  # penalty for stopping a moving agent
    start_penalty = 0  # penalty for starting a stopped agent
    cancellation_factor = 1
    cancellation_time_buffer = 0
    intermediate_not_served_penalty = -1
    intermediate_late_arrival_penalty_factor = 0.2
    intermediate_early_departure_penalty_factor = 0.5
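
    # Worked example (illustrative, using the default values above): a cancelled agent that
    # never departs and whose shortest path would take 20 time-steps receives
    # -1 * cancellation_factor * (20 + cancellation_time_buffer) = -20 at the end of the
    # episode, while an agent that reaches its target 3 steps after latest_arrival receives
    # min(latest_arrival - arrival_time, 0) = -3.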
    def __init__(self):
        # https://stackoverflow.com/questions/16439301/cant-pickle-defaultdict
        # Per-agent bookkeeping: handle -> {position -> time-step at which the agent arrived at / departed from it}
        self.arrivals = defaultdict(defaultdict)
        self.departures = defaultdict(defaultdict)
    def step_reward(self, agent: EnvAgent, distance_map: DistanceMap, elapsed_steps: int):
        """
        Handles the end-of-step reward for a particular agent.

        Parameters
        ----------
        agent: EnvAgent
        distance_map: DistanceMap
        elapsed_steps: int
        """
        # Record the first arrival at the current position and the departure from the previous one;
        # these timestamps feed the intermediate waypoint penalties in end_of_episode_reward.
        if agent.position not in self.arrivals[agent.handle]:
            self.arrivals[agent.handle][agent.position] = elapsed_steps
            self.departures[agent.handle][agent.old_position] = elapsed_steps
        return 0
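
    # Bookkeeping example (illustrative): if agent 0 first enters cell (3, 4) at
    # elapsed_steps == 17, then arrivals[0][(3, 4)] == 17 and departures[0][p] == 17,
    # where p is the cell the agent just left (p is None on the step the agent is first
    # placed on the map).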
    def end_of_episode_reward(self, agent: EnvAgent, distance_map: DistanceMap, elapsed_steps: int) -> int:
        """
        Handles the end-of-episode reward for a particular agent.

        Parameters
        ----------
        agent: EnvAgent
        distance_map: DistanceMap
        elapsed_steps: int
        """
        reward = None
        # Agent done? (arrival_time is not None)
        if agent.state == TrainState.DONE:
            # If the agent arrived early or on time, the reward is 0;
            # if it arrived late, a negative reward based on how late.
            reward = min(agent.latest_arrival - agent.arrival_time, 0)
        # Agents not done (arrival_time is None)
        else:
            # CANCELLED check (never departed)
            if agent.state.is_off_map_state():
                reward = -1 * self.cancellation_factor * \
                         (agent.get_travel_time_on_shortest_path(distance_map) + self.cancellation_time_buffer)
            # Departed but never reached the target
            if agent.state.is_on_map_state():
                reward = agent.get_current_delay(elapsed_steps, distance_map)
        # Intermediate waypoint penalties (the first and last waypoints are excluded by the [1:-1] slices).
        for et, la, ed in zip(agent.waypoints[1:-1], agent.waypoints_latest_arrival[1:-1],
                              agent.waypoints_earliest_departure[1:-1]):
            if et not in self.arrivals[agent.handle]:
                reward += self.intermediate_not_served_penalty
            else:
                reward += self.intermediate_late_arrival_penalty_factor * min(la - self.arrivals[agent.handle][et], 0)
                # An arrival without a departure is handled above by the "departed but never reached" case.
                if et in self.departures[agent.handle]:
                    reward += self.intermediate_early_departure_penalty_factor * min(self.departures[agent.handle][et] - ed, 0)
        return reward
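

# Minimal usage sketch (illustrative, not part of the flatland source): exercising the
# late-arrival branch of end_of_episode_reward with a stand-in agent object. The
# SimpleNamespace below stands in for an EnvAgent; inside RailEnv the real EnvAgent and
# DistanceMap are supplied by the environment.
from types import SimpleNamespace

rewards = Rewards()
# A train that was due at step 40 but only arrived at step 45, with no intermediate
# waypoints (waypoints hold only the start and the target cell).
late_agent = SimpleNamespace(
    handle=0,
    state=TrainState.DONE,
    latest_arrival=40,
    arrival_time=45,
    waypoints=[(0, 0), (5, 5)],
    waypoints_latest_arrival=[0, 40],
    waypoints_earliest_departure=[0, None],
)
# distance_map is not consulted on the DONE code path, so None is passed in this sketch.
print(rewards.end_of_episode_reward(late_agent, distance_map=None, elapsed_steps=50))  # -5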