Source code for flatland.utils.editor_controller



import os
import time
from collections import deque

from typing import Optional

import ipywidgets
#import jpy_canvas
from ipycanvas import Canvas
import ipyevents as ipe

import numpy as np
from ipywidgets import IntSlider, VBox, HBox, Checkbox, Output, Text, RadioButtons, Tab
from numpy import array

import flatland.utils.rendertools as rt
from flatland.core.grid.grid4_utils import mirror
from flatland.envs.agent_utils import EnvAgent
from flatland.envs.line_generators import sparse_line_generator
from flatland.envs.observations import TreeObsForRailEnv
from flatland.envs.predictions import ShortestPathPredictorForRailEnv
from flatland.envs.rail_env import RailEnv
from flatland.envs.rail_generators import sparse_rail_generator, empty_rail_generator

from flatland.utils.editor_interfaces import AbstractController, AbstractModel, AbstractView


[docs] class Controller(AbstractController): """ Controller to handle incoming events from the ipywidgets Updates the editor/model. Calls the View directly for things which do not directly effect the model (this means the mouse drag path before it is interpreted as transitions) """ def __init__(self, model:"AbstractModel", view:AbstractView): self.editor = self.model = model self.view = view self.q_events = deque() self.drawMode = "Draw" self.bMouseDown = False self.boundingRectWidth = None self.boundingRectHeight = None
[docs] def set_model(self, model): self.model = model
def _getCoords(self, event:dict): #x = event['canvasX'] #y = event['canvasY'] x = event['relativeX'] y = event['relativeY'] #self.debug("debug:", x, y) return x, y
[docs] def getModKeys(self, event:dict): bShift = event["shiftKey"] bCtrl = event["ctrlKey"] bAlt = event["altKey"] return bShift, bCtrl, bAlt
#def handle_event(self, event:dict):
[docs] def handle_event(self, event: ipe.Event): if "boundingRectWidth" in event: self.boundingRectWidth = event["boundingRectWidth"] if "boundingRectHeight" in event: self.boundingRectHeight = event["boundingRectHeight"] if event['type'] == 'mousemove': nButtons = int(event["buttons"]) if nButtons > 0: self.bMouseDown = True self.on_mouse_move(event) else: # nButtons == 0, ie mouse is now up if self.bMouseDown: # mouse was down, now up self.bMouseDown = False self.log("testing 123") self.log("mouse up event: ", event) self.log("lrcStroke: ", self.model.lrcStroke) self.log("testing 456") self.on_mouse_move(event) # process a move event with no buttons pressed elif event['type'] == 'click': nStroke = self.model.get_len_stroke() if nStroke == 0: self.on_click(event) else: self.log("ignoring click, stroke len:", nStroke) self.debug(event)
#self.on_click(event)
[docs] def on_click(self, event): self.debug("on_click: ", event) x, y = self._getCoords(event) rc_cell = self.view.xy_to_rc(x, y) bShift, bCtrl, bAlt = self.getModKeys(event) if bCtrl and not bShift and not bAlt: # only ctrl -> click agent self.model.click_agent(rc_cell) self.model.clear_stroke() elif bShift and bCtrl: # ctrl+shift -> add_target self.model.add_target(rc_cell) self.model.clear_stroke() elif bAlt and not bShift and not bCtrl: # only alt -> clear cell self.model.clear_cell(rc_cell) self.model.clear_stroke() self.debug("click in cell", rc_cell) self.model.debug_cell(rc_cell) if self.model.selected_agent is not None: self.model.clear_stroke()
[docs] def set_debug(self, event): self.model.set_debug(event["new"])
[docs] def set_debug_move(self, event): self.model.set_debug_move(event["new"])
[docs] def set_draw_mode(self, event): self.set_draw_mode = event["new"]
[docs] def set_filename(self, event): self.model.set_filename(event["new"])
[docs] def on_mouse_move(self, event: dict): """ Mouse motion event handler for drawing. """ x, y = self._getCoords(event) q_events = self.q_events # only log mousedown drag events, unless debug_move is set nButtons = int(event["buttons"]) #if self.model.debug_bool and (nButtons > 0 or self.model.debug_move_bool): # in fact we already filter so we only get drags and mouse up. self.debug_event(event) # If the mouse is held down, enqueue an event in our own queue # The intention was to avoid too many redraws. if nButtons > 0: q_events.append((time.time(), x, y)) bShift, bCtrl, bAlt = self.getModKeys(event) # Reset the stroke, if ALT, CTRL or SHIFT pressed if bShift or bCtrl or bAlt: self.model.clear_stroke() while len(q_events) > 0: t, x, y = q_events.popleft() return # NCW: this can't be right. If the mouse is not held down, treat it as a mouseup, and draw the stroke. #else: # self.model.clear_stroke() # JW: I think this clause causes all editing to fail once an agent is selected. # I also can't see why it's necessary. So I've if-falsed it out. if False: if self.model.selected_agent is not None: self.model.clear_stroke() while len(q_events) > 0: t, x, y = q_events.popleft() return # Process the low-level events in our queue: # Draw a black square to indicate a trail # Convert the xy position to a cell rc # Enqueue transitions across cells in another queue if len(q_events) > 0: t_now = time.time() if t_now - q_events[0][0] > 0.1: # wait before trying to draw while len(q_events) > 0: t, x, y = q_events.popleft() # get events from our queue self.view.drag_path_element(x, y) # Translate and scale from x,y to integer row,col (note order change) rc_cell = self.view.xy_to_rc(x, y) self.editor.drag_path_element(rc_cell) self.view.redisplay_image() #self.view.redraw() # if mouse up, process the stroke if nButtons == 0: self.model.mod_path(not event["shiftKey"])
[docs] def refresh(self, event): self.debug("refresh") self.view.redraw()
[docs] def clear(self, event): self.model.clear()
[docs] def clear_stroke(self, msg:str=""): self.debug("controller clear_stroke: ", msg) self.model.clear_stroke()
[docs] def reset(self, event): self.log("Reset - nAgents:", self.view.regen_n_agents.value) self.log("Reset - size:", self.model.regen_size_width) self.log("Reset - size:", self.model.regen_size_height) self.model.reset(regenerate_schedule=self.view.replace_agents.value, nAgents=self.view.regen_n_agents.value)
[docs] def rotate_agent(self, event): self.log("Rotate Agent:", self.model.selected_agent) if self.model.selected_agent is not None: for agent_idx, agent in enumerate(self.model.env.agents): if agent is None: continue if agent_idx == self.model.selected_agent: agent.initial_direction = (agent.initial_direction + 1) % 4 agent.direction = agent.initial_direction agent.old_direction = agent.direction self.model.redraw()
[docs] def reset_agents(self, event): self.log("Restart Agents - nAgents:", self.view.regen_n_agents.value) self.model.env.reset(False, False) self.refresh(event)
[docs] def regenerate(self, event): method = self.view.regen_method.value n_agents = self.view.regen_n_agents.value self.model.regenerate(method, n_agents)
[docs] def set_regen_width(self, event): self.model.set_regen_width(event["new"])
[docs] def set_regen_height(self, event): self.model.set_regen_height(event["new"])
[docs] def load(self, event): self.model.load()
[docs] def save(self, event): self.model.save()
[docs] def save_image(self, event): self.model.save_image()
[docs] def step(self, event): self.model.step()
[docs] def log(self, *args, **kwargs): if self.view is None: print(*args, **kwargs) else: self.view.log(*args, **kwargs)
[docs] def debug(self, *args, **kwargs): self.model.debug(*args, **kwargs)
[docs] def debug_event(self, event): self.model.debug_event(event)
[docs] def getBoundingRectYX(self): return self.boundingRectHeight, self.boundingRectWidth