Source code for flatland.envs.malfunction_generators

"""Malfunction generators for rail systems"""
from functools import lru_cache
from typing import Callable, NamedTuple, Optional, Tuple

import numpy as np
from numpy.random.mtrand import RandomState

from flatland.envs.agent_utils import EnvAgent
from flatland.envs.step_utils.states import TrainState
from flatland.envs import persistence

# why do we have both MalfunctionParameters and MalfunctionProcessData - they are both the same!
MalfunctionParameters = NamedTuple('MalfunctionParameters',
                                   [('malfunction_rate', float), ('min_duration', int), ('max_duration', int)])
MalfunctionProcessData = NamedTuple('MalfunctionProcessData',
                                    [('malfunction_rate', float), ('min_duration', int), ('max_duration', int)])

Malfunction = NamedTuple('Malfunction', [('num_broken_steps', int)])

# Why is the return value Optional?  We always return a Malfunction.
MalfunctionGenerator = Callable[[RandomState, bool], Malfunction]

NBR_CHACHED_RAND = 1_000_000


@lru_cache()
def _malfunction_prob(rate: float) -> float:
    """
    Probability of a single agent to break. According to Poisson process with given rate
    :param rate:
    :return:
    """
    if rate <= 0:
        return 0.
    else:
        return 1 - np.exp(-rate)


@lru_cache()
def _make_Malfunction_object(nbr) -> Malfunction:
    return Malfunction(nbr)


[docs] class ParamMalfunctionGen(object): """ Preserving old behaviour of using MalfunctionParameters for constructor, but returning MalfunctionProcessData in get_process_data. Data structure and content is the same. """ def __init__(self, parameters: MalfunctionParameters): # self.mean_malfunction_rate = parameters.malfunction_rate # self.min_number_of_steps_broken = parameters.min_duration # self.max_number_of_steps_broken = parameters.max_duration self.MFP = parameters self._cached_rand = None self._rand_idx = 0
[docs] def generate_rand_numbers(self, np_random: RandomState): if self._cached_rand is None: self._cached_rand = np_random.rand(NBR_CHACHED_RAND) self._rand_idx += 1 rnd = self._cached_rand[self._rand_idx % NBR_CHACHED_RAND] return rnd
[docs] def generate(self, np_random: RandomState) -> Malfunction: if self.generate_rand_numbers(np_random) < _malfunction_prob(self.MFP.malfunction_rate): num_broken_steps = np_random.randint(self.MFP.min_duration, self.MFP.max_duration + 1) + 1 else: num_broken_steps = 0 return _make_Malfunction_object(num_broken_steps)
[docs] def get_process_data(self): return MalfunctionProcessData(*self.MFP)
[docs] class NoMalfunctionGen(ParamMalfunctionGen): def __init__(self): super().__init__(MalfunctionParameters(0, 0, 0))
[docs] class FileMalfunctionGen(ParamMalfunctionGen): def __init__(self, env_dict=None, filename=None, load_from_package=None): """ uses env_dict if populated, otherwise tries to load from file / package. """ if env_dict is None: env_dict = persistence.RailEnvPersister.load_env_dict(filename, load_from_package=load_from_package) if env_dict.get('malfunction') is not None: oMFP = MalfunctionParameters(*env_dict["malfunction"]) else: oMFP = MalfunctionParameters(0, 0, 0) # no malfunctions super().__init__(oMFP)
################################################################################################ # OLD / DEPRECATED generator functions below. To be removed.
[docs] def no_malfunction_generator() -> Tuple[MalfunctionGenerator, MalfunctionProcessData]: """ Malfunction generator which generates no malfunctions Parameters ---------- Nothing Returns ------- generator, Tuple[float, int, int] with mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken """ print("DEPRECATED - use NoMalfunctionGen instead of no_malfunction_generator") # Mean malfunction in number of time steps mean_malfunction_rate = 0. # Uniform distribution parameters for malfunction duration min_number_of_steps_broken = 0 max_number_of_steps_broken = 0 def generator(np_random: RandomState = None) -> Malfunction: return _make_Malfunction_object(0) return generator, MalfunctionProcessData(mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken)
[docs] def single_malfunction_generator(earlierst_malfunction: int, malfunction_duration: int) -> Tuple[ MalfunctionGenerator, MalfunctionProcessData]: """ Malfunction generator which guarantees exactly one malfunction during an episode of an ACTIVE agent. Parameters ---------- earlierst_malfunction: Earliest possible malfunction onset malfunction_duration: The duration of the single malfunction Returns ------- generator, Tuple[float, int, int] with mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken """ # Mean malfunction in number of time steps mean_malfunction_rate = 0. # Uniform distribution parameters for malfunction duration min_number_of_steps_broken = 0 max_number_of_steps_broken = 0 # Keep track of the total number of malfunctions in the env global_nr_malfunctions = 0 # Malfunction calls per agent malfunction_calls = dict() def generator(agent: EnvAgent = None, np_random: RandomState = None, reset=False) -> Optional[Malfunction]: # We use the global variable to assure only a single malfunction in the env nonlocal global_nr_malfunctions nonlocal malfunction_calls # Reset malfunciton generator if reset: nonlocal global_nr_malfunctions nonlocal malfunction_calls global_nr_malfunctions = 0 malfunction_calls = dict() return _make_Malfunction_object(0) # No more malfunctions if we already had one, ignore all updates if global_nr_malfunctions > 0: return _make_Malfunction_object(0) # Update number of calls per agent if agent.handle in malfunction_calls: malfunction_calls[agent.handle] += 1 else: malfunction_calls[agent.handle] = 1 # Break an agent that is active at the time of the malfunction if (agent.state == TrainState.MOVING or agent.state == TrainState.STOPPED) \ and malfunction_calls[agent.handle] >= earlierst_malfunction: # TODO : Dipam : Is this needed? global_nr_malfunctions += 1 return _make_Malfunction_object(malfunction_duration) else: return _make_Malfunction_object(0) return generator, MalfunctionProcessData(mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken)
[docs] def malfunction_from_file(filename: str, load_from_package=None) -> Tuple[MalfunctionGenerator, MalfunctionProcessData]: """ Utility to load pickle file Parameters ---------- input_file : Pickle file generated by env.save() or editor Returns ------- generator, Tuple[float, int, int] with mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken """ print("DEPRECATED - use FileMalfunctionGen instead of malfunction_from_file") env_dict = persistence.RailEnvPersister.load_env_dict(filename, load_from_package=load_from_package) # TODO: make this better by using namedtuple in the pickle file. See issue 282 if env_dict.get('malfunction') is not None: env_dict['malfunction'] = oMPD = MalfunctionProcessData._make(env_dict['malfunction']) else: oMPD = None if oMPD is not None: # Mean malfunction in number of time steps mean_malfunction_rate = oMPD.malfunction_rate # Uniform distribution parameters for malfunction duration min_number_of_steps_broken = oMPD.min_duration max_number_of_steps_broken = oMPD.max_duration else: # Mean malfunction in number of time steps mean_malfunction_rate = 0. # Uniform distribution parameters for malfunction duration min_number_of_steps_broken = 0 max_number_of_steps_broken = 0 def generator(agent: EnvAgent = None, np_random: RandomState = None, reset=False) -> Optional[Malfunction]: """ Generate malfunctions for agents Parameters ---------- agent np_random Returns ------- int: Number of time steps an agent is broken """ # Dummy reset function as we don't implement specific seeding here if reset: return _make_Malfunction_object(0) if agent.malfunction_handler.malfunction_down_counter < 1: if np_random.rand() < _malfunction_prob(mean_malfunction_rate): num_broken_steps = np_random.randint(min_number_of_steps_broken, max_number_of_steps_broken + 1) + 1 return _make_Malfunction_object(num_broken_steps) return _make_Malfunction_object(0) return generator, MalfunctionProcessData(mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken)
[docs] def malfunction_from_params(parameters: MalfunctionParameters) -> Tuple[MalfunctionGenerator, MalfunctionProcessData]: """ Utility to load malfunction from parameters Parameters ---------- parameters : contains all the parameters of the malfunction malfunction_rate : float rate per timestep at which each agent malfunctions min_duration : int minimal duration of a failure max_number_of_steps_broken : int maximal duration of a failure Returns ------- generator, Tuple[float, int, int] with mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken """ print("DEPRECATED - use ParamMalfunctionGen instead of malfunction_from_params") mean_malfunction_rate = parameters.malfunction_rate min_number_of_steps_broken = parameters.min_duration max_number_of_steps_broken = parameters.max_duration def generator(np_random: RandomState = None, reset=False) -> Optional[Malfunction]: """ Generate malfunctions for agents Parameters ---------- agent np_random Returns ------- int: Number of time steps an agent is broken """ # Dummy reset function as we don't implement specific seeding here if reset: return _make_Malfunction_object(0) if np_random.rand() < _malfunction_prob(mean_malfunction_rate): num_broken_steps = np_random.randint(min_number_of_steps_broken, max_number_of_steps_broken + 1) return _make_Malfunction_object(num_broken_steps) return _make_Malfunction_object(0) return generator, MalfunctionProcessData(mean_malfunction_rate, min_number_of_steps_broken, max_number_of_steps_broken)