import ast
import os
import uuid
from collections import defaultdict
from pathlib import Path
from typing import Optional, Tuple, Any, Dict
import pandas as pd
from attr import attrs, attrib
from flatland.envs.persistence import RailEnvPersister
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_env_action import RailEnvActions
from flatland.envs.rewards import Rewards
from flatland.envs.step_utils.states import TrainState
EVENT_LOGS_SUBDIR = 'event_logs'
DISCRETE_ACTION_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "ActionEvents.discrete_action.tsv")
TRAINS_ARRIVED_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "TrainMovementEvents.trains_arrived.tsv")
TRAINS_POSITIONS_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "TrainMovementEvents.trains_positions.tsv")
TRAINS_REWARDS_DONES_INFOS_FNAME = os.path.join(EVENT_LOGS_SUBDIR, "TrainMovementEvents.trains_rewards_dones_infos.tsv")
SERIALISED_STATE_SUBDIR = 'serialised_state'
OUTPUTS_SUBDIR = 'outputs'
def _uuid_str():
return str(uuid.uuid4())
@attrs
class Trajectory:
"""
Encapsulates episode data (actions, positions etc.) for one or multiple episodes for further analysis/evaluation.
Also known as an episode or a recording.
In contrast to rllib (https://github.com/ray-project/ray/blob/master/rllib/env/multi_agent_episode.py), we use a tabular (tsv-backed) approach instead of `dict`s.
Directory structure:
- event_logs
ActionEvents.discrete_action -- holds the set of actions to be replayed for the related episodes.
TrainMovementEvents.trains_arrived -- holds the success rate for the related episodes.
TrainMovementEvents.trains_positions -- holds the positions for the related episodes.
TrainMovementEvents.trains_rewards_dones_infos -- holds the rewards, dones and infos for the related episodes.
- serialised_state
<ep_id>.pkl -- holds the pickled environment for the episode.
Indexing:
- actions for step i are indexed i-1 (i.e. starting at 0)
- positions before step i are indexed i-1 (i.e. starting at 0)
- positions after step i are indexed i (i.e. starting at 1)
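Examples
--------
A minimal recording/replay sketch (illustrative only; the directory path is a placeholder)::

    from pathlib import Path

    trajectory = Trajectory.create_empty(data_dir=Path("/tmp/my_trajectory"))
    trajectory.action_collect(env_time=0, agent_id=0, action=RailEnvActions.MOVE_FORWARD)
    trajectory.position_collect(env_time=1, agent_id=0, position=((0, 0), 1))
    trajectory.rewards_dones_infos_collect(env_time=1, agent_id=0, reward=0.0, info={}, done=False)
    trajectory.persist()

    reloaded = Trajectory.load_existing(data_dir=Path("/tmp/my_trajectory"), ep_id=trajectory.ep_id)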
"""
data_dir = attrib(type=Path)
ep_id = attrib(type=str, factory=_uuid_str)
trains_positions = attrib(type=pd.DataFrame, default=None)
actions = attrib(type=pd.DataFrame, default=None)
trains_arrived = attrib(type=pd.DataFrame, default=None)
trains_rewards_dones_infos = attrib(type=pd.DataFrame, default=None)
_trains_positions_collect = None
_actions_collect = None
_trains_arrived_collect = None
_trains_rewards_dones_infos_collect = None
def _load(self, episode_only: bool = False):
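"""Read the four event_logs tsv files into data frames (optionally filtered to this episode), reset the collection buffers and create the outputs directory."""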
self.trains_positions = self._read_trains_positions(episode_only=episode_only)
self.actions = self._read_actions(episode_only=episode_only)
self.trains_arrived = self._read_trains_arrived(episode_only=episode_only)
self.trains_rewards_dones_infos = self._read_trains_rewards_dones_infos(episode_only=episode_only)
self._trains_positions_collect = []
self._actions_collect = []
self._trains_arrived_collect = []
self._trains_rewards_dones_infos_collect = []
self.outputs_dir.mkdir(exist_ok=True, parents=True)
def persist(self):
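"""Append the collected (buffered) events to the in-memory data frames and write all four tsv files under event_logs."""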
self.actions = pd.concat([self.actions, self._collected_actions_to_df()])
self.trains_positions = pd.concat([self.trains_positions, self._collected_trains_positions_to_df()])
self.trains_arrived = pd.concat([self.trains_arrived, self._collected_trains_arrived_to_df()])
self.trains_rewards_dones_infos = pd.concat([self.trains_rewards_dones_infos, self._collected_trains_rewards_dones_infos_to_df()])
self._write_actions(self.actions)
self._write_trains_positions(self.trains_positions)
self._write_trains_arrived(self.trains_arrived)
self._write_trains_rewards_dones_infos(self.trains_rewards_dones_infos)
def _collected_trains_rewards_dones_infos_to_df(self) -> pd.DataFrame:
return pd.DataFrame.from_records(self._trains_rewards_dones_infos_collect)
def _collected_trains_arrived_to_df(self) -> pd.DataFrame:
return pd.DataFrame.from_records(self._trains_arrived_collect)
def _collected_trains_positions_to_df(self) -> pd.DataFrame:
return pd.DataFrame.from_records(self._trains_positions_collect)
def _collected_actions_to_df(self) -> pd.DataFrame:
return pd.DataFrame.from_records(self._actions_collect)
def _read_actions(self, episode_only: bool = False) -> pd.DataFrame:
"""Returns pd df with all actions for all episodes.
Parameters
----------
episode_only : bool
Filter df to contain only this episode.
"""
f = os.path.join(self.data_dir, DISCRETE_ACTION_FNAME)
if not os.path.exists(f):
return pd.DataFrame(columns=['episode_id', 'env_time', 'agent_id', 'action'])
df = pd.read_csv(f, sep='\t')
if episode_only:
df = df[df['episode_id'] == self.ep_id]
df["action"] = df["action"].map(RailEnvActions.from_value)
return df
def _read_trains_arrived(self, episode_only: bool = False) -> pd.DataFrame:
"""Returns pd df with success rate for all episodes.
Parameters
----------
episode_only : bool
Filter df to contain only this episode.
"""
f = os.path.join(self.data_dir, TRAINS_ARRIVED_FNAME)
if not os.path.exists(f):
return pd.DataFrame(columns=['episode_id', 'env_time', 'success_rate', 'normalized_reward'])
df = pd.read_csv(f, sep='\t')
if episode_only:
return df[df['episode_id'] == self.ep_id]
return df
def _read_trains_positions(self, episode_only: bool = False) -> pd.DataFrame:
"""Returns pd df with all trains' positions for all episodes.
Parameters
----------
episode_only : bool
Filter df to contain only this episode.
"""
f = os.path.join(self.data_dir, TRAINS_POSITIONS_FNAME)
if not os.path.exists(f):
return pd.DataFrame(columns=['episode_id', 'env_time', 'agent_id', 'position'])
df = pd.read_csv(f, sep='\t')
df["position"] = df["position"].map(ast.literal_eval).map(lambda p: (p[0], int(p[1])))
if episode_only:
return df[df['episode_id'] == self.ep_id]
return df
def _read_trains_rewards_dones_infos(self, episode_only: bool = False) -> pd.DataFrame:
"""Returns pd df with all trains' rewards, dones, infos for all episodes.
Parameters
----------
episode_only : bool
Filter df to contain only this episode.
"""
f = os.path.join(self.data_dir, TRAINS_REWARDS_DONES_INFOS_FNAME)
if not os.path.exists(f):
return pd.DataFrame(columns=['episode_id', 'env_time', 'agent_id', 'reward', 'info', 'done'])
df = pd.read_csv(f, sep='\t')
if episode_only:
df = df[df['episode_id'] == self.ep_id]
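# the info column holds a repr of the info dict in which "state" values are TrainState reprs
# (e.g. "<TrainState.MOVING: 3>"); replace them with their numeric values so the string can be
# parsed by ast.literal_eval, then re-wrap the parsed values as TrainState below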
df["info"] = df["info"].map(lambda s: s.replace("<TrainState.WAITING: 0>", "0").replace("<TrainState.READY_TO_DEPART: 1>", "1").replace(
"<TrainState.MALFUNCTION_OFF_MAP: 2>", "2").replace("<TrainState.MOVING: 3>", "3").replace("<TrainState.STOPPED: 4>", "4").replace(
"<TrainState.MALFUNCTION: 5>", "5").replace("<TrainState.DONE: 6>", "6"))
df["info"] = df["info"].map(ast.literal_eval)
df["info"] = df["info"].map(lambda d: {k: (v if k != "state" else TrainState(v)) for k, v in d.items()})
if df.dtypes["reward"] == object:
df["reward"] = df["reward"].map(ast.literal_eval)
return df
def _write_trains_positions(self, df: pd.DataFrame):
"""Store pd df with all trains' positions for all episodes."""
f = os.path.join(self.data_dir, TRAINS_POSITIONS_FNAME)
Path(f).parent.mkdir(parents=True, exist_ok=True)
df["position"] = df["position"].map(lambda p: (p[0], int(p[1])))
df.to_csv(f, sep='\t', index=False)
def _write_actions(self, df: pd.DataFrame):
"""Store pd df with all trains' actions for all episodes."""
f = os.path.join(self.data_dir, DISCRETE_ACTION_FNAME)
Path(f).parent.mkdir(parents=True, exist_ok=True)
df["action"] = df["action"].map(lambda a: a.value if isinstance(a, RailEnvActions) else a)
df.to_csv(f, sep='\t', index=False)
def _write_trains_arrived(self, df: pd.DataFrame):
"""Store pd df with all trains' success rates for all episodes."""
f = os.path.join(self.data_dir, TRAINS_ARRIVED_FNAME)
Path(f).parent.mkdir(parents=True, exist_ok=True)
df.to_csv(f, sep='\t', index=False)
def _write_trains_rewards_dones_infos(self, df: pd.DataFrame):
"""Store pd df with all trains' rewards for all episodes."""
f = os.path.join(self.data_dir, TRAINS_REWARDS_DONES_INFOS_FNAME)
Path(f).parent.mkdir(parents=True, exist_ok=True)
df.to_csv(f, sep='\t', index=False)
def _find_closest_snapshot(self, start_step):
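"""Return the highest snapshotted step <= start_step for this episode, or None if no such snapshot exists."""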
closest = None
for p in (Path(self.data_dir) / SERIALISED_STATE_SUBDIR).iterdir():
if not (p.name.startswith(f"{self.ep_id}_step") and p.name.endswith(".pkl")):
continue
step = int(p.name.replace(f"{self.ep_id}_step", "").replace(".pkl", ""))
if step <= start_step and (closest is None or (step > closest)):
closest = step
return closest
def position_collect(self, env_time: int, agent_id: int, position: Tuple[Tuple[int, int], int]):
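"""Buffer a position event (``((row, column), direction)``) for this episode; written to tsv by `persist()`."""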
self._trains_positions_collect.append({'episode_id': self.ep_id, 'env_time': env_time, 'agent_id': agent_id, 'position': position})
def action_collect(self, env_time: int, agent_id: int, action: RailEnvActions):
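"""Buffer an action event for this episode; written to tsv by `persist()`."""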
self._actions_collect.append({'episode_id': self.ep_id, 'env_time': env_time, 'agent_id': agent_id, 'action': action})
def arrived_collect(self, env_time: int, success_rate: float, normalized_reward: float):
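"""Buffer the episode's success rate and normalized reward; written to tsv by `persist()`."""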
self._trains_arrived_collect.append(
{'episode_id': self.ep_id, 'env_time': env_time, 'success_rate': success_rate, 'normalized_reward': normalized_reward})
def rewards_dones_infos_collect(self, env_time: int, agent_id: int, reward: float, info: Any, done: bool):
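"""Buffer an agent's reward, done flag and info dict for this step; written to tsv by `persist()`."""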
self._trains_rewards_dones_infos_collect.append({
'episode_id': self.ep_id, 'env_time': env_time, 'agent_id': agent_id, 'reward': reward, 'info': info, 'done': done
})
def build_cache(self) -> Tuple[dict, dict, dict]:
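"""Build per-step lookup caches for this episode from the loaded data frames.
Returns a tuple (action_cache, position_cache, rewards_dones_infos_cache), each a nested dict keyed by env_time and then agent_id."""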
action_cache = defaultdict(lambda: defaultdict(dict))
for item in self.actions[self.actions["episode_id"] == self.ep_id].to_records():
action_cache[item["env_time"]][item["agent_id"]] = RailEnvActions.from_value(item["action"])
position_cache = defaultdict(lambda: defaultdict(dict))
for item in self.trains_positions[self.trains_positions["episode_id"] == self.ep_id].to_records():
p, d = item['position']
position_cache[item["env_time"]][item["agent_id"]] = (p, d)
trains_rewards_dones_infos_cache = defaultdict(lambda: defaultdict(dict))
for data in self.trains_rewards_dones_infos[self.trains_rewards_dones_infos["episode_id"] == self.ep_id].to_records():
trains_rewards_dones_infos_cache[data["env_time"]][data["agent_id"]] = (data["reward"], data["done"], data["info"])
return action_cache, position_cache, trains_rewards_dones_infos_cache
def position_lookup(self, env_time: int, agent_id: int) -> Tuple[Tuple[int, int], int]:
"""Method used to retrieve the stored position (if available).
Parameters
----------
env_time: int
position before (!) step env_time
agent_id: int
agent ID
Returns
-------
Tuple[Tuple[int, int], int]
The position in the format ((row, column), direction).
"""
df = self.trains_positions
pos = df.loc[(df['env_time'] == env_time) & (df['agent_id'] == agent_id) & (df['episode_id'] == self.ep_id)]['position']
if len(pos) != 1:
print(f"Found {len(pos)} positions for {self.ep_id} {env_time} {agent_id}")
print(df[(df['agent_id'] == agent_id) & (df['episode_id'] == self.ep_id)]["env_time"])
# fail fast
assert len(pos) == 1, f"Found {len(pos)} positions for {self.ep_id} {env_time} {agent_id}"
p, d = pos.iloc[0]
return p, d
def action_lookup(self, env_time: int, agent_id: int) -> RailEnvActions:
"""Method used to retrieve the stored action (if available). Defaults to 2 = MOVE_FORWARD.
Parameters
----------
env_time: int
action going into step env_time
agent_id: int
agent ID
Returns
-------
RailEnvActions
The action to step the env.
"""
actions_df = self.actions
action = actions_df.loc[
(actions_df['env_time'] == env_time) &
(actions_df['agent_id'] == agent_id) &
(actions_df['episode_id'] == self.ep_id)
]['action'].to_numpy()
if len(action) == 0:
return RailEnvActions.MOVE_FORWARD
return RailEnvActions.from_value(action[0])
def trains_arrived_lookup(self) -> pd.Series:
"""Method used to retrieve the trains arrived for the episode.
Returns
-------
pd.Series
The trains arrived data.
"""
movements_df = self.trains_arrived
movement = movements_df.loc[(movements_df['episode_id'] == self.ep_id)]
if len(movement) == 1:
return movement.iloc[0]
raise Exception(f"No entry for {self.ep_id} found in data frame.")
def trains_rewards_dones_infos_lookup(self, env_time: int, agent_id: int) -> Tuple[float, bool, Dict]:
"""Method used to retrieve the rewards for the episode.
Parameters
----------
env_time: int
action going into step env_time
agent_id: int
agent ID
Returns
-------
pd.DataFrame
The trains arrived data.
"""
rewards_df = self.trains_rewards_dones_infos
data = rewards_df.loc[(rewards_df['env_time'] == env_time) & (rewards_df['agent_id'] == agent_id) & (rewards_df['episode_id'] == self.ep_id)]
assert len(data) == 1, (env_time, agent_id, self.ep_id, data)
data = data.iloc[0]
return data["reward"], data["done"], data["info"]
@property
def outputs_dir(self) -> Path:
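"""Directory for outputs (``<data_dir>/outputs``)."""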
return self.data_dir / OUTPUTS_SUBDIR
def compare_actions(self, other: "Trajectory", start_step: int = None, end_step: int = None, ignoring_waiting=False) -> pd.DataFrame:
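"""Compare this trajectory's actions with another trajectory's within ``[start_step, end_step)`` (either bound may be None for no bound).
With ``ignoring_waiting``, actions recorded while a train was still in state WAITING (based on the previous step's recorded state) are dropped from both trajectories before comparing.
Returns the diff produced by pd.DataFrame.compare (empty if the actions match)."""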
df = self._read_actions(episode_only=True)
other_df = other._read_actions(episode_only=True)
num_agents = df["agent_id"].max() + 1
if ignoring_waiting:
df["state"] = self._read_trains_rewards_dones_infos(episode_only=True)["info"].map(lambda d: d["state"])
other_df["state"] = other._read_trains_rewards_dones_infos(episode_only=True)["info"].map(lambda d: d["state"])
# compare against prev_state: the state recorded for env_time already reflects the state update at the end of the step function
df["prev_state"] = df["state"].shift(num_agents)
other_df["prev_state"] = other_df["state"].shift(num_agents)
df = df[df["prev_state"] != TrainState.WAITING]
other_df = other_df[other_df["prev_state"] != TrainState.WAITING]
return self._compare(df, other_df, ['env_time', 'agent_id', 'action'], end_step, start_step)
def compare_positions(self, other: "Trajectory", start_step: int = None, end_step: int = None) -> pd.DataFrame:
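"""Compare this trajectory's recorded positions with another trajectory's within ``[start_step, end_step)``; returns the pd.DataFrame.compare diff (empty if they match)."""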
df = self._read_trains_positions(episode_only=True)
other_df = other._read_trains_positions(episode_only=True)
return self._compare(df, other_df, ['env_time', 'agent_id', 'position'], end_step, start_step)
def compare_arrived(self, other: "Trajectory", start_step: int = None, end_step: int = None, skip_normalized_reward: bool = True) -> pd.DataFrame:
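"""Compare this trajectory's arrival data (success rate and, unless skipped, normalized reward) with another trajectory's within ``[start_step, end_step)``."""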
df = self._read_trains_arrived(episode_only=True)
other_df = other._read_trains_arrived(episode_only=True)
columns = ['env_time', 'success_rate']
# TODO re-generate regression trajectories.
if not skip_normalized_reward:
columns.append('normalized_reward')
return self._compare(df, other_df, columns, end_step, start_step)
def compare_rewards_dones_infos(self, other: "Trajectory", start_step: int = None, end_step: int = None, ignoring_rewards: bool = False) -> pd.DataFrame:
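"""Compare rewards, dones and infos per agent and step with another trajectory within ``[start_step, end_step)``; with ``ignoring_rewards``, only infos and dones are compared."""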
df = self._read_trains_rewards_dones_infos(episode_only=True)
other_df = other._read_trains_rewards_dones_infos(episode_only=True)
columns = ['env_time', 'agent_id', 'reward', 'info', 'done']
if ignoring_rewards:
columns = ['env_time', 'agent_id', 'info', 'done']
return self._compare(df, other_df, columns, end_step, start_step)
@staticmethod
def _compare(df, other_df, columns, end_step, start_step, return_frames=False):
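"""Restrict both data frames to env_time in ``[start_step, end_step)``, drop the episode_id column and return the diff of the given columns via pd.DataFrame.compare (optionally together with the filtered frames)."""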
if start_step is not None:
df = df[df["env_time"] >= start_step]
other_df = other_df[other_df["env_time"] >= start_step]
if end_step is not None:
df = df[df["env_time"] < end_step]
other_df = other_df[other_df["env_time"] < end_step]
df.reset_index(drop=True, inplace=True)
other_df.reset_index(drop=True, inplace=True)
df.drop(columns="episode_id", inplace=True)
other_df.drop(columns="episode_id", inplace=True)
diff = df[columns].compare(other_df[columns])
if return_frames:
return diff, df, other_df
return diff
def load_env(self, start_step: int = None, inexact: bool = False, rewards: Rewards = None) -> Optional[RailEnv]:
"""
Restore an episode's env.
Parameters
----------
start_step : Optional[int]
start from snapshot (if it exists)
inexact : bool
allows returning the last snapshot before start_step
rewards : Rewards
rewards for the loaded env. If not provided, defaults to the loaded env's rewards.
Returns
-------
Optional[RailEnv]
the rail env, or None if the snapshot at the step does not exist
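Examples
--------
Illustrative only (``data_dir`` is a placeholder)::

    trajectory = Trajectory.load_existing(data_dir=data_dir, ep_id="some-episode-id")
    # latest snapshot at or before step 10, falling back to the initial env if none exists
    env = trajectory.load_env(start_step=10, inexact=True)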
"""
self.outputs_dir.mkdir(exist_ok=True)
if start_step is None:
f = os.path.join(self.data_dir, SERIALISED_STATE_SUBDIR, f'{self.ep_id}.pkl')
env, _ = RailEnvPersister.load_new(f, rewards=rewards)
return env
else:
closest = start_step
if inexact:
closest = self._find_closest_snapshot(start_step)
if closest is None:
f = os.path.join(self.data_dir, SERIALISED_STATE_SUBDIR, f'{self.ep_id}.pkl')
env, _ = RailEnvPersister.load_new(f, rewards=rewards)
return env
f = os.path.join(self.data_dir, SERIALISED_STATE_SUBDIR, f"{self.ep_id}_step{closest:04d}.pkl")
env, _ = RailEnvPersister.load_new(f, rewards=rewards)
return env
@staticmethod
def load_existing(data_dir: Path, ep_id: str) -> "Trajectory":
"""
Load existing trajectory from disk.
Parameters
----------
data_dir : Path
the data dir backing the trajectory.
ep_id : str
the episode ID; the data dir may contain multiple trajectories in the same data frames.
Returns
-------
Trajectory
"""
t = Trajectory(data_dir=data_dir, ep_id=ep_id)
t._load()
return t
def fork(self, data_dir: Path, start_step: int, ep_id: Optional[str] = None) -> "Trajectory":
"""
Fork a trajectory to a new location and a new episode ID.
Parameters
----------
data_dir : Path
the data dir backing the forked trajectory.
start_step : int
step at which to start the fork.
ep_id : Optional[str]
the new episode ID for the fork. If not provided, a new UUID is generated.
Returns
-------
Trajectory
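Examples
--------
Illustrative only (``base_dir`` and ``fork_dir`` are placeholder paths)::

    original = Trajectory.load_existing(data_dir=base_dir, ep_id="some-episode-id")
    forked = original.fork(data_dir=fork_dir, start_step=50)
    env = forked.load_env(start_step=50, inexact=True)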
"""
trajectory = Trajectory.create_empty(data_dir=data_dir, ep_id=ep_id)
env = self.load_env(start_step=start_step, inexact=True)
self._load(episode_only=True)
# will run action start_step into step start_step+1
trajectory.actions = self.actions[self.actions["env_time"] < start_step]
trajectory.trains_positions = self.trains_positions[self.trains_positions["env_time"] <= start_step]
trajectory.trains_arrived = self.trains_arrived[self.trains_arrived["env_time"] <= start_step]
trajectory.trains_rewards_dones_infos = self.trains_rewards_dones_infos[
self.trains_rewards_dones_infos["env_time"] <= start_step]
trajectory.actions["episode_id"] = trajectory.ep_id
trajectory.trains_positions["episode_id"] = trajectory.ep_id
trajectory.trains_arrived["episode_id"] = trajectory.ep_id
trajectory.trains_rewards_dones_infos["episode_id"] = trajectory.ep_id
trajectory.persist()
if env is None or env._elapsed_steps < start_step:
from flatland.evaluators.trajectory_evaluator import TrajectoryEvaluator
(trajectory.data_dir / SERIALISED_STATE_SUBDIR).mkdir(parents=True)
if env is None:
# copy the source trajectory's initial env into the fork under the new episode ID
env = self.load_env()
RailEnvPersister.save(env, trajectory.data_dir / SERIALISED_STATE_SUBDIR / f"{trajectory.ep_id}.pkl")
# replay the trajectory to the start_step from the latest snapshot
env = TrajectoryEvaluator(trajectory=trajectory).evaluate(end_step=start_step)
RailEnvPersister.save(env, trajectory.data_dir / SERIALISED_STATE_SUBDIR / f"{trajectory.ep_id}_step{env._elapsed_steps:04d}.pkl")
else:
# copy latest snapshot
RailEnvPersister.save(env, trajectory.data_dir / SERIALISED_STATE_SUBDIR / f"{trajectory.ep_id}_step{env._elapsed_steps:04d}.pkl")
# replay the trajectory to the start_step from the latest snapshot
env = TrajectoryEvaluator(trajectory=trajectory).evaluate(start_step=env._elapsed_steps, end_step=start_step)
RailEnvPersister.save(env, trajectory.data_dir / SERIALISED_STATE_SUBDIR / f"{trajectory.ep_id}_step{env._elapsed_steps:04d}.pkl")
trajectory._load()
return trajectory
@staticmethod
def create_empty(data_dir: Path, ep_id: Optional[str] = None) -> "Trajectory":
"""
Create a new empty trajectory.
Parameters
----------
data_dir : Path
the data dir backing the trajectory. Must be empty.
ep_id : Optional[str]
the episode ID for the new trajectory. If not provided, a new UUID is generated.
Returns
-------
Trajectory
"""
data_dir.mkdir(parents=True, exist_ok=True)
if ep_id is not None:
trajectory = Trajectory.load_existing(data_dir=data_dir, ep_id=ep_id)
else:
trajectory = Trajectory.load_existing(data_dir=data_dir, ep_id=_uuid_str())
# ensure to start with new empty df to avoid inconsistencies:
assert len(trajectory.trains_positions) == 0
assert len(trajectory.actions) == 0
assert len(trajectory.trains_arrived) == 0
assert len(trajectory.trains_rewards_dones_infos) == 0
return trajectory