import ast
import os
import re
import uuid
from pathlib import Path
from typing import Optional, Tuple, Any, Dict

import pandas as pd
from attr import attrs, attrib

from flatland.envs.persistence import RailEnvPersister
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env_action import RailEnvActions
from flatland.envs.step_utils.states import TrainState
# Sub-directory (relative to a trajectory's data_dir) holding the tab-separated event logs.
EVENT_LOGS_SUBDIR = 'event_logs'
DISCRETE_ACTION_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "ActionEvents.discrete_action.tsv")
TRAINS_ARRIVED_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "TrainMovementEvents.trains_arrived.tsv")
TRAINS_POSITIONS_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "TrainMovementEvents.trains_positions.tsv")
TRAINS_REWARDS_DONES_INFOS_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "TrainMovementEvents.trains_rewards_dones_infos.tsv")
# Backward-compatible alias for the original (non UPPER_SNAKE_CASE) name; prefer TRAINS_REWARDS_DONES_INFOS_FNAME.
trains_rewards_dones_infos_FNAME = TRAINS_REWARDS_DONES_INFOS_FNAME
# Sub-directory (relative to data_dir) holding the pickled environment snapshots.
SERIALISED_STATE_SUBDIR = 'serialised_state'
# Sub-directory (relative to data_dir) for derived outputs.
OUTPUTS_SUBDIR = 'outputs'
def _uuid_str():
    """Create a new random (version 4) UUID and return its canonical string form.

    Serves as the default factory for ``Trajectory.ep_id``.
    """
    fresh = uuid.uuid4()
    return str(fresh)
@attrs
class Trajectory:
    """
    Encapsulates episode data (actions, positions etc.) for one or multiple episodes for further analysis/evaluation.

    Aka. Episode
    Aka. Recording

    In contrast to rllib (https://github.com/ray-project/ray/blob/master/rllib/env/multi_agent_episode.py),
    we use a tabular approach (tsv-backed) instead of `dict`s.

    Directory structure:
    - event_logs
        ActionEvents.discrete_action -- holds the actions to be replayed for the related episodes.
        TrainMovementEvents.trains_arrived -- holds the success rate for the related episodes.
        TrainMovementEvents.trains_positions -- holds the positions for the related episodes.
        TrainMovementEvents.trains_rewards_dones_infos -- holds the rewards, dones and infos for the related episodes.
    - serialised_state
        <ep_id>.pkl -- holds the pickled environment version for the episode.
        <ep_id>_step<NNNN>.pkl -- optional snapshot of the environment before step NNNN.

    Indexing:
    - actions for step i are indexed i-1 (i.e. starting at 0)
    - positions before step i are indexed i-1 (i.e. starting at 0)
    - positions after step i are indexed i (i.e. starting at 1)
    """

    # Root directory of the recording; event logs and snapshots live below it.
    data_dir = attrib(type=Path)
    # Episode this instance reads/writes; defaults to a fresh random UUID string.
    ep_id = attrib(type=str, factory=_uuid_str)
    # In-memory tables: None until `load()` assigns them; extended by the `*_collect` methods.
    trains_positions = attrib(type=pd.DataFrame, default=None)
    actions = attrib(type=pd.DataFrame, default=None)
    trains_arrived = attrib(type=pd.DataFrame, default=None)
    trains_rewards_dones_infos = attrib(type=pd.DataFrame, default=None)

    # Matches a TrainState member as rendered inside a stringified info dict, e.g. "<TrainState.MOVING: 3>".
    # Plain class attribute (not an `attrib`), so attrs does not treat it as a field.
    _TRAIN_STATE_REPR = re.compile(r"<TrainState\.\w+: (\d+)>")

    def load(self, episode_only: bool = False):
        """Read all four event logs from ``data_dir`` into the in-memory tables.

        Parameters
        ----------
        episode_only : bool
            Keep only the rows belonging to ``ep_id``.
        """
        self.trains_positions = self._read_trains_positions(episode_only=episode_only)
        self.actions = self._read_actions(episode_only=episode_only)
        self.trains_arrived = self._read_trains_arrived(episode_only=episode_only)
        self.trains_rewards_dones_infos = self._read_trains_rewards_dones_infos(episode_only=episode_only)

    def persist(self):
        """Write the four in-memory tables to their tsv files below ``data_dir``, overwriting existing files.

        NOTE: if the tables were loaded with ``episode_only=True``, the files are overwritten with that
        episode's rows only.
        """
        self._write_actions(self.actions)
        self._write_trains_positions(self.trains_positions)
        self._write_trains_arrived(self.trains_arrived)
        self._write_trains_rewards_dones_infos(self.trains_rewards_dones_infos)

    def _episode_rows(self, df: pd.DataFrame) -> pd.DataFrame:
        """Return an independent copy of the rows of ``df`` that belong to ``ep_id``."""
        # Explicit copy: callers assign columns afterwards, which on a filtered view raises SettingWithCopyWarning.
        return df[df['episode_id'] == self.ep_id].copy()

    def _read_actions(self, episode_only: bool = False) -> pd.DataFrame:
        """Returns pd df with all actions for all episodes (actions decoded to `RailEnvActions`).

        Parameters
        ----------
        episode_only : bool
            Filter df to contain only this episode.
        """
        f = os.path.join(self.data_dir, DISCRETE_ACTION_FNAME)
        if not os.path.exists(f):
            return pd.DataFrame(columns=['episode_id', 'env_time', 'agent_id', 'action'])
        df = pd.read_csv(f, sep='\t')
        if episode_only:
            df = self._episode_rows(df)
        df["action"] = df["action"].map(RailEnvActions.from_value)
        return df

    def _read_trains_arrived(self, episode_only: bool = False) -> pd.DataFrame:
        """Returns pd df with success rate for all episodes.

        Parameters
        ----------
        episode_only : bool
            Filter df to contain only this episode.
        """
        f = os.path.join(self.data_dir, TRAINS_ARRIVED_FNAME)
        if not os.path.exists(f):
            return pd.DataFrame(columns=['episode_id', 'env_time', 'success_rate'])
        df = pd.read_csv(f, sep='\t')
        if episode_only:
            df = self._episode_rows(df)
        return df

    def _read_trains_positions(self, episode_only: bool = False) -> pd.DataFrame:
        """Returns pd df with all trains' positions for all episodes.

        Positions are parsed back from their string form into ``((row, column), direction)`` tuples
        with an ``int`` direction.

        Parameters
        ----------
        episode_only : bool
            Filter df to contain only this episode.
        """
        f = os.path.join(self.data_dir, TRAINS_POSITIONS_FNAME)
        if not os.path.exists(f):
            return pd.DataFrame(columns=['episode_id', 'env_time', 'agent_id', 'position'])
        df = pd.read_csv(f, sep='\t')
        if episode_only:
            # Filter before parsing so only the requested episode's rows are evaluated.
            df = self._episode_rows(df)
        df["position"] = df["position"].map(ast.literal_eval).map(lambda p: (p[0], int(p[1])))
        return df

    def _read_trains_rewards_dones_infos(self, episode_only: bool = False) -> pd.DataFrame:
        """Returns pd df with all trains' rewards, dones, infos for all episodes.

        The ``info`` column is parsed back into dicts; the ``state`` entry becomes a `TrainState`.

        Parameters
        ----------
        episode_only : bool
            Filter df to contain only this episode.
        """
        f = os.path.join(self.data_dir, trains_rewards_dones_infos_FNAME)
        if not os.path.exists(f):
            return pd.DataFrame(columns=['episode_id', 'env_time', 'agent_id', 'reward', 'info', 'done'])
        df = pd.read_csv(f, sep='\t')
        if episode_only:
            df = self._episode_rows(df)
        # The info dicts were written via str(), so TrainState members appear as e.g. "<TrainState.MOVING: 3>",
        # which is not a Python literal. Reduce them to their integer value so ast.literal_eval can parse the dict.
        state_repr = self._TRAIN_STATE_REPR
        df["info"] = df["info"].map(lambda s: state_repr.sub(r"\1", s))
        df["info"] = df["info"].map(ast.literal_eval)
        df["info"] = df["info"].map(lambda d: {k: (v if k != "state" else TrainState(v)) for k, v in d.items()})
        return df

    def _write_trains_positions(self, df: pd.DataFrame):
        """Store pd df with all trains' positions for all episodes.

        The direction part of each position is cast to ``int`` in a copy; ``df`` itself is not modified.
        """
        f = os.path.join(self.data_dir, TRAINS_POSITIONS_FNAME)
        Path(f).parent.mkdir(parents=True, exist_ok=True)
        # assign() returns a new frame, so the caller's table (e.g. self.trains_positions) is left untouched.
        df = df.assign(position=df["position"].map(lambda p: (p[0], int(p[1]))))
        df.to_csv(f, sep='\t', index=False)

    def _write_actions(self, df: pd.DataFrame):
        """Store pd df with all trains' actions for all episodes.

        `RailEnvActions` members are written as their integer value (in a copy; ``df`` is not modified).
        """
        f = os.path.join(self.data_dir, DISCRETE_ACTION_FNAME)
        Path(f).parent.mkdir(parents=True, exist_ok=True)
        # assign() returns a new frame, so the caller's table (e.g. self.actions) keeps its enum values.
        df = df.assign(action=df["action"].map(lambda a: a.value if isinstance(a, RailEnvActions) else a))
        df.to_csv(f, sep='\t', index=False)

    def _write_trains_arrived(self, df: pd.DataFrame):
        """Store pd df with all trains' success rates for all episodes."""
        f = os.path.join(self.data_dir, TRAINS_ARRIVED_FNAME)
        Path(f).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(f, sep='\t', index=False)

    def _write_trains_rewards_dones_infos(self, df: pd.DataFrame):
        """Store pd df with all trains' rewards, dones and infos for all episodes."""
        f = os.path.join(self.data_dir, trains_rewards_dones_infos_FNAME)
        Path(f).parent.mkdir(parents=True, exist_ok=True)
        df.to_csv(f, sep='\t', index=False)

    def restore_episode(self, start_step: int = None) -> Optional[RailEnv]:
        """Restore an episode.

        Parameters
        ----------
        start_step : Optional[int]
            start from snapshot (if it exists); None loads the episode's initial ``<ep_id>.pkl``.

        Returns
        -------
        RailEnv
            the rail env or None if the snapshot at the step does not exist
        """
        if start_step is None:
            f = os.path.join(self.data_dir, SERIALISED_STATE_SUBDIR, f'{self.ep_id}.pkl')
        else:
            f = os.path.join(self.data_dir, SERIALISED_STATE_SUBDIR, f"{self.ep_id}_step{start_step:04d}.pkl")
            if not os.path.isfile(f):
                return None
        # NOTE(review): .pkl snapshots are presumably unpickled by the persister -- only restore trusted data dirs.
        env, _ = RailEnvPersister.load_new(f)
        return env

    @staticmethod
    def _append_row(df: pd.DataFrame, row: Dict[str, Any]):
        """Append ``row`` to ``df`` in place without overwriting an existing row.

        ``df.loc[len(df)] = row`` overwrites whenever ``len(df)`` is already an index label, e.g. after
        filtering a multi-episode log down to one episode (index [3, 4, 5] has length 3), so fall back to a
        fresh label in that case.
        """
        label = len(df)
        if label in df.index:
            label = df.index.max() + 1
        df.loc[label] = row

    def position_collect(self, env_time: int, agent_id: int, position: Tuple[Tuple[int, int], int]):
        """Record ``position`` of ``agent_id`` at ``env_time`` for this episode in ``trains_positions``."""
        self._append_row(self.trains_positions,
                         {'episode_id': self.ep_id, 'env_time': env_time, 'agent_id': agent_id, 'position': position})

    def action_collect(self, env_time: int, agent_id: int, action: RailEnvActions):
        """Record ``action`` of ``agent_id`` at ``env_time`` for this episode in ``actions``."""
        self._append_row(self.actions,
                         {'episode_id': self.ep_id, 'env_time': env_time, 'agent_id': agent_id, 'action': action})

    def arrived_collect(self, env_time: int, success_rate: float):
        """Record the ``success_rate`` at ``env_time`` for this episode in ``trains_arrived``."""
        self._append_row(self.trains_arrived,
                         {'episode_id': self.ep_id, 'env_time': env_time, 'success_rate': success_rate})

    def rewards_dones_infos_collect(self, env_time: int, agent_id: int, reward: float, info: Any, done: bool):
        """Record reward, info and done of ``agent_id`` at ``env_time`` for this episode in ``trains_rewards_dones_infos``."""
        self._append_row(self.trains_rewards_dones_infos,
                         {'episode_id': self.ep_id, 'env_time': env_time, 'agent_id': agent_id, 'reward': reward,
                          'info': info, 'done': done})

    def position_lookup(self, env_time: int, agent_id: int) -> Tuple[Tuple[int, int], int]:
        """Method used to retrieve the stored position (if available).

        Parameters
        ----------
        env_time: int
            position before (!) step env_time
        agent_id: int
            agent ID

        Returns
        -------
        Tuple[Tuple[int, int], int]
            The position in the format ((row, column), direction).

        Raises
        ------
        AssertionError
            if not exactly one position is stored for (ep_id, env_time, agent_id).
        """
        df = self.trains_positions
        pos = df.loc[(df['env_time'] == env_time) & (df['agent_id'] == agent_id) & (df['episode_id'] == self.ep_id)]['position']
        if len(pos) != 1:
            msg = f"Found {len(pos)} positions for {self.ep_id} {env_time} {agent_id}"
            print(msg)
            print(df[(df['agent_id'] == agent_id) & (df['episode_id'] == self.ep_id)]["env_time"])
            # fail fast; raised explicitly (not via `assert`) so the check also holds under `python -O`.
            raise AssertionError(msg)
        p, d = pos.iloc[0]
        return (p, d)

    def action_lookup(self, env_time: int, agent_id: int) -> RailEnvActions:
        """Method used to retrieve the stored action (if available). Defaults to 2 = MOVE_FORWARD.

        Parameters
        ----------
        env_time: int
            action going into step env_time
        agent_id: int
            agent ID

        Returns
        -------
        RailEnvActions
            The action to step the env.
        """
        actions_df = self.actions
        action = actions_df.loc[
            (actions_df['env_time'] == env_time) &
            (actions_df['agent_id'] == agent_id) &
            (actions_df['episode_id'] == self.ep_id)
            ]['action'].to_numpy()
        if len(action) == 0:
            return RailEnvActions.MOVE_FORWARD
        return RailEnvActions.from_value(action[0])

    def trains_arrived_lookup(self) -> pd.Series:
        """Method used to retrieve the trains arrived for the episode.

        Reads the in-memory ``trains_arrived`` table (see event_logs/TrainMovementEvents.trains_arrived.tsv).

        Returns
        -------
        pd.Series
            The trains arrived data.

        Raises
        ------
        Exception
            if the table does not contain exactly one row for ``ep_id``.
        """
        movements_df = self.trains_arrived
        movement = movements_df.loc[movements_df['episode_id'] == self.ep_id]
        if len(movement) == 1:
            return movement.iloc[0]
        if len(movement) == 0:
            raise Exception(f"No entry for {self.ep_id} found in data frame.")
        raise Exception(f"Found {len(movement)} entries for {self.ep_id} in data frame, expected exactly one.")

    def trains_rewards_dones_infos_lookup(self, env_time: int, agent_id: int) -> Tuple[float, bool, Dict]:
        """Method used to retrieve the reward, done flag and info of an agent for a step of the episode.

        Parameters
        ----------
        env_time: int
            step env_time
        agent_id: int
            agent ID

        Returns
        -------
        Tuple[float, bool, Dict]
            The stored (reward, done, info).

        Raises
        ------
        AssertionError
            if not exactly one entry is stored for (ep_id, env_time, agent_id).
        """
        rewards_df = self.trains_rewards_dones_infos
        data = rewards_df.loc[(rewards_df['env_time'] == env_time) & (rewards_df['agent_id'] == agent_id) & (rewards_df['episode_id'] == self.ep_id)]
        if len(data) != 1:
            # Raised explicitly (not via `assert`) so the check also holds under `python -O`.
            raise AssertionError(f"Found {len(data)} rewards/dones/infos entries for {self.ep_id} {env_time} {agent_id}")
        row = data.iloc[0]
        return row["reward"], row["done"], row["info"]

    @property
    def outputs_dir(self) -> Path:
        """Directory for derived outputs below ``data_dir`` (works whether ``data_dir`` is a `Path` or a `str`)."""
        return Path(self.data_dir) / OUTPUTS_SUBDIR

    def compare_actions(self, other: "Trajectory", start_step: int = None, end_step: int = None) -> pd.DataFrame:
        """Diff this episode's actions against ``other``'s for steps in [start_step, end_step), read from disk."""
        df = self._read_actions(episode_only=True)
        other_df = other._read_actions(episode_only=True)
        return self._compare(df, other_df, end_step, start_step)

    def compare_positions(self, other: "Trajectory", start_step: int = None, end_step: int = None) -> pd.DataFrame:
        """Diff this episode's positions against ``other``'s for steps in [start_step, end_step), read from disk."""
        df = self._read_trains_positions(episode_only=True)
        other_df = other._read_trains_positions(episode_only=True)
        return self._compare(df, other_df, end_step, start_step)

    def compare_arrived(self, other: "Trajectory", start_step: int = None, end_step: int = None) -> pd.DataFrame:
        """Diff this episode's success rates against ``other``'s for steps in [start_step, end_step), read from disk."""
        df = self._read_trains_arrived(episode_only=True)
        other_df = other._read_trains_arrived(episode_only=True)
        return self._compare(df, other_df, end_step, start_step)

    def compare_rewards_dones_infos(self, other: "Trajectory", start_step: int = None, end_step: int = None) -> pd.DataFrame:
        """Diff this episode's rewards/dones/infos against ``other``'s for steps in [start_step, end_step), read from disk."""
        df = self._read_trains_rewards_dones_infos(episode_only=True)
        other_df = other._read_trains_rewards_dones_infos(episode_only=True)
        return self._compare(df, other_df, end_step, start_step)

    def _compare(self, df, other_df, end_step, start_step):
        """Restrict both frames to env_time in [start_step, end_step) and return ``DataFrame.compare`` of them.

        The ``episode_id`` column is dropped (it differs by construction) and rows are re-indexed from 0,
        so rows are matched by position. Both frames must end up with the same shape/labels,
        otherwise pandas raises ``ValueError``.
        """

        def _prepare(frame: pd.DataFrame) -> pd.DataFrame:
            # Filtering/reset/drop all return new frames; no in-place mutation of (possibly sliced) inputs.
            if start_step is not None:
                frame = frame[frame["env_time"] >= start_step]
            if end_step is not None:
                frame = frame[frame["env_time"] < end_step]
            return frame.reset_index(drop=True).drop(columns="episode_id")

        return _prepare(df).compare(_prepare(other_df))