Forked from
flow3r / flow3r firmware
1416 commits behind the upstream repository.
event.py 7.86 KiB
from st3m import logging
log = logging.Log(__name__,level=logging.INFO)
log.info("import")
from st3m.system import hardware
import time
import math
import random
# Event type identifiers.
EVENTTYPE_TIMED = 1  # stored by EventTimed in its `type` attribute
EVENTTYPE_INPUT = 2  # not referenced anywhere in the visible part of this file
# Disabled, finer-grained input type ids:
#EVENTTYPE_BUTTON = 2
#EVENTTYPE_CAPTOUCH = 3
#EVENTTYPE_CAPCROSS = 4
class Engine():
    """Cooperative event loop dispatching timed and input events.

    Timed events wait in ``events_timed`` ordered by their ``deadline``
    attribute. Input events wait in ``events_input`` and are checked against
    every button/captouch reading on each loop iteration.
    """

    def __init__(self):
        self.events_timed = []  # queued timed events, sorted by deadline
        self.events_input = []  # registered input events
        self.next_timed = None  # timed event taken off the queue, awaiting its deadline
        self.last_input_state = None  # readings of the previous iteration
        self.userloop = None  # not used by the visible code
        self.is_running = False
        self.foreground_app = None  # object providing tick() and draw()
        self.active_menu = None  # object providing draw()

    def add(self, event):
        """Register *event* in the queue matching its class.

        Objects that are neither ``EventTimed`` nor ``Event`` are ignored.
        """
        if isinstance(event, EventTimed):
            self.add_timed(event)
        elif isinstance(event, Event):
            self.add_input(event)

    def add_timed(self, event):
        """Queue a timed event, keeping the queue ordered by deadline."""
        self.events_timed.append(event)
        if self.next_timed is not None:
            # Hand the pre-fetched event back so that a new event with an
            # earlier deadline is not stuck behind it.
            self.events_timed.append(self.next_timed)
            self.next_timed = None
        self._sort_timed()

    def add_input(self, event):
        """Register an input event."""
        self.events_input.append(event)

    def remove(self, group_id):
        """Drop every input and timed event whose ``group_id`` matches."""
        self.remove_input(group_id)
        self.remove_timed(group_id)

    def remove_timed(self, group_id):
        """Drop all timed events of *group_id*, including the pre-fetched one."""
        self.events_timed = [
            event for event in self.events_timed if event.group_id != group_id
        ]
        # The event already taken off the queue must be cancelled as well,
        # otherwise it would still fire after its group was removed.
        if self.next_timed is not None and self.next_timed.group_id == group_id:
            self.next_timed = None
        self._sort_timed()

    def remove_input(self, group_id):
        """Drop all input events of *group_id*."""
        self.events_input = [
            event for event in self.events_input if event.group_id != group_id
        ]

    def _sort_timed(self):
        """Order the timed queue by ascending deadline.

        NOTE(review): deadlines are compared as raw tick values, so ordering
        is presumably wrong across a tick counter wraparound - confirm.
        """
        self.events_timed.sort(key=lambda event: event.deadline)

    def _handle_timed(self):
        """Fire the earliest timed event once its deadline has passed.

        At most one timed event is triggered per call. The action receives
        the current tick count (``ticks_ms``) and by how many milliseconds
        the deadline was missed (``ticks_delay``).
        """
        if self.next_timed is None and self.events_timed:
            self.next_timed = self.events_timed.pop(0)

        now = time.ticks_ms()
        if self.next_timed is None:
            return

        diff = time.ticks_diff(self.next_timed.deadline, now)
        if diff > 0:
            return

        # Clear the slot before triggering: the action may add or remove
        # timed events, and the slot must not be left set if it raises.
        event = self.next_timed
        self.next_timed = None
        event.trigger({"ticks_ms": now, "ticks_delay": -diff})

    def _handle_input(self):
        """Sample all inputs and trigger the input events that match.

        Each reading is a dict with ``type``, ``index`` and ``value``
        (captouch readings also carry ``radius`` and ``angle``). From the
        second call on, every reading is annotated with ``ticks_ms`` and
        ``change``, plus ``from`` (the previous value) when the value
        changed, and then passed to every enabled event whose condition
        accepts it.
        """
        # buttons
        input_state = [
            {
                "type": "button",
                "index": 0,
                "value": hardware.menu_button_get(),
            },
            {
                "type": "button",
                "index": 1,
                "value": hardware.application_button_get(),
            },
        ]

        # captouch
        for i in range(10):
            input_state.append({
                "type": "captouch",
                "index": i,
                "value": hardware.get_captouch(i),
                "radius": hardware.captouch_get_petal_rad(i),
                "angle": hardware.captouch_get_petal_phi(i) / 10000,
            })

        if not self.last_input_state:
            # First sample: there is nothing to compare against yet.
            self.last_input_state = input_state
            return

        for entry, last_entry in zip(input_state, self.last_input_state):
            entry["ticks_ms"] = time.ticks_ms()

            changed = entry["value"] != last_entry["value"]
            entry["change"] = changed
            if changed:
                entry["from"] = last_entry["value"]

            # Evaluate all conditions first, then trigger, so that actions
            # which add or remove events do not disturb the selection.
            triggered_events = [
                e for e in self.events_input if e.enabled and e.condition(entry)
            ]
            for e in triggered_events:
                e.trigger(entry)

        self.last_input_state = input_state

    def _handle_userloop(self):
        """Give the foreground app, if any, its tick."""
        if self.foreground_app:
            self.foreground_app.tick()

    def _handle_draw(self):
        """Draw the foreground app and the active menu, then update the display."""
        if self.foreground_app:
            self.foreground_app.draw()
        if self.active_menu:
            self.active_menu.draw()
        hardware.display_update()

    def _eventloop_single(self):
        """Run one iteration: timed events, input events, app tick."""
        self._handle_timed()
        self._handle_input()
        self._handle_userloop()

    def eventloop(self):
        """Run the loop until ``is_running`` is cleared.

        Does nothing except log a warning when the loop is already running.
        Drawing happens only when more than 10 ms have passed since the
        previous draw.
        """
        log.info("starting eventloop")
        if self.is_running:
            log.warning("eventloop already running, doing nothing")
            return

        self.is_running = True
        last_draw = 0
        try:
            while self.is_running:
                self._eventloop_single()
                now = time.ticks_ms()
                diff = time.ticks_diff(now, last_draw)
                if diff > 10:
                    self._handle_draw()
                    last_draw = time.ticks_ms()

                time.sleep_ms(1)
        finally:
            # An exception escaping an action must not leave the engine
            # marked as running, or the loop could never be started again.
            self.is_running = False
class Event():
    """An action bound to a condition, registered with ``the_engine``.

    Parameters:
        name: label used in log output.
        data: dict merged into the trigger data handed to *action*; a new
            empty dict is created when omitted.
        action: callable receiving the merged trigger data, or None.
        condition: callable deciding whether trigger data applies; when
            omitted every input matches.
        group_id: identifier used by the engine for bulk removal.
        enabled: when true the event registers itself on construction.
    """

    def __init__(self, name="unknown", data=None, action=None, condition=None, group_id=None, enabled=False):
        self.name = name
        self.eventtype = None
        # One dict per instance: a mutable default argument would be shared
        # between all events created without explicit data.
        self.data = {} if data is None else data
        self.action = action
        self.condition = condition
        self.enabled = enabled
        if not condition:
            self.condition = lambda x: True
        self.group_id = group_id

        if enabled:
            self.set_enabled()

    def trigger(self, triggerdata=None):
        """Call the action with *triggerdata* extended by this event's data.

        The caller's dict is left untouched when the event carries data of
        its own, so one event's data does not leak into the next event
        triggered with the same dict.
        """
        if triggerdata is None:
            triggerdata = {}
        log.debug("triggered {} (with {})".format(self.name, triggerdata))
        if self.action is None:
            return

        if self.data:
            payload = dict(triggerdata)
            payload.update(self.data)
        else:
            # Nothing to merge, so no copy is needed.
            payload = triggerdata
        self.action(payload)

    def set_enabled(self, enabled=True):
        """Register the event with the engine, or remove it when disabling.

        Enabling an event that is already registered does not add it again.
        """
        self.enabled = enabled
        if not enabled:
            self.remove()
            return

        registered = (
            self in the_engine.events_input
            or self in the_engine.events_timed
            or the_engine.next_timed is self
        )
        if not registered:
            the_engine.add(self)

    def remove(self):
        """Remove every registration of this event from the engine."""
        log.info(f"remove {self}")
        while self in the_engine.events_input:
            the_engine.events_input.remove(self)
        # Removing entries keeps the timed queue in order, so no re-sort.
        while self in the_engine.events_timed:
            the_engine.events_timed.remove(self)
        # The engine may already have taken this event off its queue.
        if the_engine.next_timed is self:
            the_engine.next_timed = None
class EventTimed(Event):
    """Event that becomes due *ms* milliseconds after its creation.

    Extra positional and keyword arguments are forwarded to
    ``Event.__init__``; the name set there is then replaced by *name*.
    """

    def __init__(self, ms, name="timer", *args, **kwargs):
        # The deadline has to exist before the base initialiser runs: with
        # enabled=True it registers the event with the engine, which sorts
        # timed events by their deadline.
        self.deadline = time.ticks_add(time.ticks_ms(), ms)
        super().__init__(*args, **kwargs)
        self.name = name
        # The base class declares `eventtype`; `type` is kept as an alias
        # for code that reads the attribute under its former name.
        self.eventtype = EVENTTYPE_TIMED
        self.type = EVENTTYPE_TIMED

    def __repr__(self):
        return "event on tick {} ({})".format(self.deadline, self.name)
#hack, make this oo
def on_restart(data):
    """Start the object stored under ``data["object"]`` again if it is running."""
    print("loop sequence")
    target = data["object"]
    if not target.is_running:
        return
    target.start()
class Sequence():
    """Schedules one timed event per step, optionally repeating.

    Parameters:
        bpm: steps per minute; the spacing between steps is 60000 / bpm ms.
        loop: when true, a further timed event restarts the sequence after
            the last step.
        steps: number of steps scheduled per run.
        action: callable invoked for each step with trigger data containing
            the key ``step``; by default the step number is logged.
    """

    def __init__(self, bpm=60, loop=True, steps=16, action=None):
        # Random id tagging this sequence's events for bulk removal.
        self.group_id = random.randint(0, 100000000)
        self.bpm = bpm
        self.steps = steps
        self.repeat_event = None
        self.loop = loop
        self.events = []
        self.is_running = False

        if not action:
            self.action = lambda data: log.info("step {}".format(data.get("step")))
        else:
            self.action = action

    def start(self):
        """Schedule all steps, stopping a run that is already in progress."""
        if self.is_running:
            self.stop()

        stepsize_ms = int(60 * 1000 / self.bpm)
        for i in range(self.steps):
            # Format the index into the message, as every other log call in
            # this file does, instead of passing it as an extra argument.
            log.debug("adding sequence event {}".format(i))
            self.events.append(
                EventTimed(
                    stepsize_ms * i,
                    name="seq{}".format(i),
                    action=self.action,
                    data={'step': i},
                    group_id=self.group_id,
                    enabled=True,
                )
            )

        if self.loop:
            # Due one step after the last one; its action starts the
            # sequence again while it is still marked as running.
            self.repeat_event = EventTimed(
                stepsize_ms * self.steps,
                name="loop",
                group_id=self.group_id,
                enabled=True,
                action=on_restart,
                data={"object": self},
            )
        self.is_running = True

    def stop(self):
        """Remove this sequence's timed events from the engine."""
        log.info("sequence stop")
        the_engine.remove_timed(group_id=self.group_id)
        self.events = []
        if self.repeat_event:
            self.repeat_event.remove()
        self.is_running = False
# Module-wide engine instance that Event and Sequence register with.
# (A `global` statement has no effect at module level, so none is needed.)
the_engine = Engine()