Skip to content
Snippets Groups Projects

Compare revisions

Changes are shown as if the source revision was being merged into the target revision. Learn more about comparing revisions.

Source

Select target project
No results found
Select Git revision
  • blinkisync-as-preload
  • button_gestures
  • ch3/api-speed-eval2
  • ch3/dual-core
  • ch3/genapi-refactor
  • ch3/leds-api
  • ch3/splashscreen
  • dualcore
  • ecg-bpm-counter
  • ecg-button-gestures
  • fix-bhi160-axes
  • freertos-btle
  • genofire/ble-follow-py
  • genofire/haule-ble-fs-deactive
  • genofire/leds_rgb_get_state
  • genofire/rockets-state
  • hauke/ble-cleanups
  • ios-workarounds
  • koalo/bhi160-works-but-dirty
  • koalo/factory-reset
  • koalo/wip/i2c-for-python
  • master
  • msgctl/faultscreen
  • msgctl/gfx_rle
  • msgctl/textbuffer_api
  • patch-1
  • plaetzchen/ios-workaround
  • rahix/bhi
  • rahix/bma
  • rahix/simple_menu
  • renze/hatchery_apps
  • renze/safe_mode
  • schleicher-test
  • schneider/ble-buffers
  • schneider/bonding
  • schneider/bootloader-update-9a0d158
  • schneider/bsec
  • schneider/fundamental-test
  • schneider/max30001
  • schneider/max30001-epicaridum
  • schneider/max30001-pycardium
  • schneider/maxim-sdk-update
  • schneider/mp-for-old-bl
  • schneider/schleicher-test
  • schneider/stream-locks
  • bootloader-v1
  • release-1
  • v0.0
  • v1.0
  • v1.1
  • v1.2
  • v1.3
  • v1.4
  • v1.5
  • v1.6
  • v1.7
  • v1.8
57 results

Target

Select target project
  • card10/firmware
  • annejan/firmware
  • astro/firmware
  • fpletz/firmware
  • gerd/firmware
  • fleur/firmware
  • swym/firmware
  • l/firmware
  • uberardy/firmware
  • wink/firmware
  • madonius/firmware
  • mot/firmware
  • filid/firmware
  • q3k/firmware
  • hauke/firmware
  • Woazboat/firmware
  • pink/firmware
  • mossmann/firmware
  • omniskop/firmware
  • zenox/firmware
  • trilader/firmware
  • Danukeru/firmware
  • shoragan/firmware
  • zlatko/firmware
  • sistason/firmware
  • datenwolf/firmware
  • bene/firmware
  • amedee/firmware
  • martinling/firmware
  • griffon/firmware
  • chris007/firmware
  • adisbladis/firmware
  • dbrgn/firmware
  • jelly/firmware
  • rnestler/firmware
  • mh/firmware
  • ln/firmware
  • penguineer/firmware
  • monkeydom/firmware
  • jens/firmware
  • jnaulty/firmware
  • jeffmakes/firmware
  • marekventur/firmware
  • pete/firmware
  • h2obrain/firmware
  • DooMMasteR/firmware
  • jackie/firmware
  • prof_r/firmware
  • Draradech/firmware
  • Kartoffel/firmware
  • hinerk/firmware
  • abbradar/firmware
  • JustTB/firmware
  • LuKaRo/firmware
  • iggy/firmware
  • ente/firmware
  • flgr/firmware
  • Lorphos/firmware
  • matejo/firmware
  • ceddral7/firmware
  • danb/firmware
  • joshi/firmware
  • melle/firmware
  • fitch/firmware
  • deurknop/firmware
  • sargon/firmware
  • markus/firmware
  • kloenk/firmware
  • lucaswerkmeister/firmware
  • derf/firmware
  • meh/firmware
  • dx/card10-firmware
  • torben/firmware
  • yuvadm/firmware
  • AndyBS/firmware
  • klausdieter1/firmware
  • katzenparadoxon/firmware
  • xiretza/firmware
  • ole/firmware
  • techy/firmware
  • thor77/firmware
  • TilCreator/firmware
  • fuchsi/firmware
  • dos/firmware
  • yrlf/firmware
  • PetePriority/firmware
  • SuperVirus/firmware
  • sur5r/firmware
  • tazz/firmware
  • Alienmaster/firmware
  • flo_h/firmware
  • baldo/firmware
  • mmu_man/firmware
  • Foaly/firmware
  • sodoku/firmware
  • Guinness/firmware
  • ssp/firmware
  • led02/firmware
  • Stormwind/firmware
  • arist/firmware
  • coon/firmware
  • mdik/firmware
  • pippin/firmware
  • royrobotiks/firmware
  • zigot83/firmware
  • mo_k/firmware
106 results
Select Git revision
  • add_menu_vibration
  • blinkisync-as-preload
  • ch3/api-speed-eval2
  • ch3/dual-core
  • ch3/genapi-refactor
  • ch3/leds-api
  • ch3/splashscreen
  • dualcore
  • dx/flatten-config-module
  • dx/meh-bdf-to-stm
  • freertos-btle
  • genofire/ble-follow-py
  • koalo/bhi160-works-but-dirty
  • koalo/factory-reset
  • koalo/wip/i2c-for-python
  • master
  • msgctl/faultscreen
  • msgctl/textbuffer_api
  • plaetzchen/ios-workaround
  • rahix/bhi
  • rahix/bluetooth-app-favorite
  • rahix/bma
  • rahix/user-space-ctx
  • renze/hatchery_apps
  • renze/safe_mode
  • schleicher-test
  • schneider/212-reset-hardware-when-entering-repl
  • schneider/ancs
  • schneider/ble-buffers
  • schneider/ble-central
  • schneider/ble-ecg-stream-visu
  • schneider/ble-fixes-2020-3
  • schneider/ble-mini-demo
  • schneider/ble-stability
  • schneider/ble-stability-new-phy
  • schneider/bonding
  • schneider/bonding-fail-if-full
  • schneider/bootloader-update-9a0d158
  • schneider/deepsleep
  • schneider/deepsleep2
  • schneider/deepsleep4
  • schneider/default-main
  • schneider/freertos-list-debug
  • schneider/fundamental-test
  • schneider/iaq-python
  • schneider/ir
  • schneider/max30001
  • schneider/max30001-epicaridum
  • schneider/max30001-pycardium
  • schneider/maxim-sdk-update
  • schneider/mp-exception-print
  • schneider/mp-for-old-bl
  • schneider/png
  • schneider/schleicher-test
  • schneider/sdk-0.2.1-11
  • schneider/sdk-0.2.1-7
  • schneider/sleep-display
  • schneider/spo2-playground
  • schneider/stream-locks
  • schneider/v1.17-changelog
  • bootloader-v1
  • release-1
  • v0.0
  • v1.0
  • v1.1
  • v1.10
  • v1.11
  • v1.12
  • v1.13
  • v1.14
  • v1.15
  • v1.16
  • v1.17
  • v1.18
  • v1.2
  • v1.3
  • v1.4
  • v1.5
  • v1.6
  • v1.7
  • v1.8
  • v1.9
82 results
Show changes
Commits on Source (2)
......@@ -97,6 +97,7 @@ autodoc_mock_imports = [
"ucollections",
"urandom",
"utime",
"vibra",
]
autodoc_member_order = "bysource"
......
......@@ -26,6 +26,7 @@ Last but not least, if you want to start hacking the lower-level firmware, the
pycardium/bme680
pycardium/max30001
pycardium/buttons
pycardium/button_gestures
pycardium/color
pycardium/display
pycardium/gpio
......
``button_gestures`` - Button Gesture Detection
==============================================
.. automodule:: button_gestures
:members:
import button_gestures as bg
import os
import display
import leds
......@@ -26,17 +27,23 @@ COLOR_WRITE_FG = [255, 255, 255]
COLOR_WRITE_BG = [255, 0, 0]
current_mode = MODE_FINGER
# FIFO from callback
new_datasets = []
history = []
filebuffer = bytearray()
write = 0
recording_timestamp = 0
bias = True
update_screen = 0
samples_since_draw = 0
pause_screen = 0
pause_histogram = False
histogram_offset = 0
sensor = 0
disp = display.open()
mapper = bg.ActionMapper()
KEYMAP_PAUSED = {}
KEYMAP_NORMAL = {}
leds.dim_top(1)
COLORS = [((23 + (15 * i)) % 360, 1.0, 1.0) for i in range(11)]
......@@ -47,9 +54,9 @@ alpha = 2
beta = 3
def update_history(datasets):
def update_history(samples):
global history, moving_average, alpha, beta
for val in datasets:
for val in samples:
history.append(val - moving_average)
moving_average = (alpha * moving_average + beta * val) / (alpha + beta)
......@@ -75,8 +82,8 @@ def neighbours(n, lst):
yield lst[i : i + n]
def detect_pulse(num_new_samples):
global history, pulse, samples_since_last_pulse, q_threshold, r_threshold, q_spike
def detect_pulse(all_samples, num_new_samples):
global pulse, samples_since_last_pulse, q_threshold, r_threshold, q_spike
# look at 3 consecutive samples, starting 2 samples before the samples that were just added, e.g.:
# existing samples: "ABCD"
......@@ -84,7 +91,7 @@ def detect_pulse(num_new_samples):
# consider ["CDE", "DEF"]
# new samples: "GHI" => "ABCDEFGHI"
# consider ["EFG", "FGH", "GHI"]
for [prev, cur, next_] in neighbours(3, history[-(num_new_samples + 2) :]):
for [prev, cur, next_] in neighbours(3, all_samples[-(num_new_samples + 2) :]):
samples_since_last_pulse += 1
if prev > cur < next_ and cur < q_threshold:
......@@ -110,32 +117,36 @@ def detect_pulse(num_new_samples):
pulse = -1
def callback_ecg(datasets):
global update_screen, history, filebuffer, write
update_screen += len(datasets)
def callback_ecg(dataset):
new_datasets.append(dataset)
def process_samples(samples):
global samples_since_draw, history, filebuffer, recording_timestamp
samples_since_draw += len(samples)
# update histogram datalist
if not pause_histogram:
update_history(datasets)
detect_pulse(len(datasets))
update_history(samples)
detect_pulse(history, len(samples))
# buffer for writes
if write > 0:
for value in datasets:
if recording_timestamp > 0:
for value in samples:
filebuffer.extend(struct.pack("h", value))
if len(filebuffer) >= FILEBUFFERBLOCK:
write_filebuffer()
# don't update on every callback
if update_screen >= DRAW_AFTER_SAMPLES:
if samples_since_draw >= DRAW_AFTER_SAMPLES:
draw_histogram()
samples_since_draw = 0
def write_filebuffer():
global write, filebuffer
global recording_timestamp, filebuffer
# write to file
chars = ""
lt = utime.localtime(write)
lt = utime.localtime(recording_timestamp)
filename = "/ecg-{:04d}-{:02d}-{:02d}_{:02d}{:02d}{:02d}.log".format(*lt)
# write stuff to disk
......@@ -145,7 +156,7 @@ def write_filebuffer():
f.close()
except OSError as e:
print("Please check the file or filesystem", e)
write = 0
recording_timestamp = 0
pause_screen = -1
disp.clear(COLOR_BACKGROUND)
disp.print("IO Error", posy=0, fg=COLOR_TEXT)
......@@ -156,7 +167,7 @@ def write_filebuffer():
close_sensor()
except:
print("Unexpected error, stop writeing logfile")
write = 0
recording_timestamp = 0
filebuffer = bytearray()
......@@ -172,8 +183,8 @@ def open_sensor():
def close_sensor():
global sensor
sensor.close()
pass
def toggle_mode():
......@@ -191,30 +202,52 @@ def toggle_bias():
def toggle_write():
global write, disp, pause_screen
global recording_timestamp, disp, pause_screen
pause_screen = utime.time_ms() + 1000
disp.clear(COLOR_BACKGROUND)
if write > 0:
if recording_timestamp > 0:
write_filebuffer()
write = 0
recording_timestamp = 0
disp.print("Stop", posx=50, posy=20, fg=COLOR_TEXT)
disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT)
else:
filebuffer = bytearray()
write = utime.time()
recording_timestamp = utime.time()
disp.print("Start", posx=45, posy=20, fg=COLOR_TEXT)
disp.print("logging", posx=30, posy=40, fg=COLOR_TEXT)
disp.update()
def toggle_pause():
def set_paused(paused):
global pause_histogram, histogram_offset, history
if pause_histogram:
pause_histogram = False
history = []
pause_histogram = paused
histogram_offset = 0
if paused:
mapper.set_mapping(KEYMAP_PAUSED)
else:
pause_histogram = True
mapper.set_mapping(KEYMAP_NORMAL)
history = []
def scroll_left():
global histogram_offset
l = len(history)
histogram_offset += ECG_RATE / 2
if l - histogram_offset < WIDTH:
histogram_offset = l - WIDTH
def scroll_right():
global histogram_offset
histogram_offset -= ECG_RATE / 2
histogram_offset -= histogram_offset % (ECG_RATE / 2)
if histogram_offset < 0:
histogram_offset = 0
......@@ -226,7 +259,7 @@ def draw_leds(val):
def draw_histogram():
global disp, history, current_mode, bias, write, pause_screen, update_screen
global disp, history, current_mode, bias, recording_timestamp, pause_screen
# skip rendering due to message beeing shown
if pause_screen == -1:
......@@ -294,17 +327,29 @@ def draw_histogram():
)
# announce writing ecg log
if write > 0:
if recording_timestamp > 0:
t = utime.time()
if write > 0 and t % 2 == 0:
if recording_timestamp > 0 and t % 2 == 0:
disp.print("LOG", posx=0, posy=60, fg=COLOR_WRITE_FG, bg=COLOR_WRITE_BG)
disp.update()
update_screen = 0
def main():
global pause_histogram, histogram_offset
global new_datasets, KEYMAP_NORMAL, KEYMAP_PAUSED
KEYMAP_NORMAL = {
(buttons.BOTTOM_LEFT, bg.SHORT_PRESS): toggle_write,
(buttons.BOTTOM_RIGHT, bg.SHORT_PRESS): toggle_bias,
(buttons.TOP_RIGHT, bg.SHORT_PRESS): toggle_mode,
(buttons.TOP_RIGHT, bg.LONG_PRESS): lambda: set_paused(True),
}
KEYMAP_PAUSED = {
(buttons.BOTTOM_LEFT, bg.SHORT_PRESS): scroll_left,
(buttons.BOTTOM_RIGHT, bg.SHORT_PRESS): scroll_right,
(buttons.TOP_RIGHT, bg.SHORT_PRESS): lambda: set_paused(False),
}
# show button layout
disp.clear(COLOR_BACKGROUND)
......@@ -317,71 +362,17 @@ def main():
# start ecg
open_sensor()
while True:
button_pressed = {"BOTTOM_LEFT": 0, "BOTTOM_RIGHT": 0, "TOP_RIGHT": 0}
while True:
v = buttons.read(
buttons.BOTTOM_LEFT | buttons.BOTTOM_RIGHT | buttons.TOP_RIGHT
)
# BOTTOM LEFT
if button_pressed["BOTTOM_LEFT"] == 0 and v & buttons.BOTTOM_LEFT != 0:
button_pressed["BOTTOM_LEFT"] = utime.time_ms()
if not pause_histogram:
toggle_write()
else:
l = len(history)
histogram_offset += ECG_RATE / 2
if l - histogram_offset < WIDTH:
histogram_offset = l - WIDTH
if button_pressed["BOTTOM_LEFT"] > 0 and v & buttons.BOTTOM_LEFT == 0:
duration = utime.time_ms() - button_pressed["BOTTOM_LEFT"]
button_pressed["BOTTOM_LEFT"] = 0
# BOTTOM RIGHT
if button_pressed["BOTTOM_RIGHT"] == 0 and v & buttons.BOTTOM_RIGHT != 0:
button_pressed["BOTTOM_RIGHT"] = utime.time_ms()
if not pause_histogram:
toggle_bias()
else:
histogram_offset -= ECG_RATE / 2
histogram_offset -= histogram_offset % (ECG_RATE / 2)
if histogram_offset < 0:
histogram_offset = 0
if button_pressed["BOTTOM_RIGHT"] > 0 and v & buttons.BOTTOM_RIGHT == 0:
duration = utime.time_ms() - button_pressed["BOTTOM_RIGHT"]
button_pressed["BOTTOM_RIGHT"] = 0
# TOP RIGHT
# down, and still pressed
if button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT != 0:
duration = utime.time_ms() - button_pressed["TOP_RIGHT"]
if duration > 1000:
button_pressed["TOP_RIGHT"] = -1
toggle_pause()
# register down event
elif button_pressed["TOP_RIGHT"] == 0 and v & buttons.TOP_RIGHT != 0:
button_pressed["TOP_RIGHT"] = utime.time_ms()
# register up event but event already called
if button_pressed["TOP_RIGHT"] == -1 and v & buttons.TOP_RIGHT == 0:
button_pressed["TOP_RIGHT"] = 0
mapper.set_mapping(KEYMAP_NORMAL)
while True:
mapper.update()
# register normal up event
elif button_pressed["TOP_RIGHT"] > 0 and v & buttons.TOP_RIGHT == 0:
duration = utime.time_ms() - button_pressed["TOP_RIGHT"]
button_pressed["TOP_RIGHT"] = 0
if pause_histogram:
toggle_pause()
else:
toggle_mode()
processed_datasets = 0
for dataset in new_datasets:
process_samples(dataset)
processed_datasets += 1
new_datasets = new_datasets[processed_datasets:]
if __name__ == "__main__":
main()
"""
This module provides button gesture detection (short/medium/long press, double
press, ...) using either an event-stream (:py:class:`GestureFeed`) or a
key-map/functional (:py:class:`ActionMapper`) interface.
Recognized events:
* :py:const:`BUTTON_PRESSED`: The button has just been pressed
* :py:const:`BUTTON_RELEASED`: The button has just been released
* :py:const:`SHORT_PRESS`
* :py:const:`MEDIUM_PRESS`
* :py:const:`LONG_PRESS`
* :py:const:`DOUBLE_PRESS`
"""
import buttons
from functools import reduce
import utime
import vibra
import sys
_BUTTONS = [
buttons.TOP_LEFT,
buttons.TOP_RIGHT,
buttons.BOTTOM_LEFT,
buttons.BOTTOM_RIGHT,
]
BUTTON_PRESSED = 1
BUTTON_RELEASED = 2
SHORT_PRESS = 3
MEDIUM_PRESS = 4
LONG_PRESS = 5
DOUBLE_PRESS = 6
# represents a short press for double press detection when short presses are disabled
# only used internally
HALF_DOUBLE_PRESS = 7
# dummy event, emitted once per button for each _raw_events() call
NO_EVENT = 100
def _is_dummy_event(ev):
return ev.event in [NO_EVENT]
class Button:
def __init__(self):
# Button has been pressed since this timestamp
self.last_pressed = 0
# The latest duration timeout that was reached, or None if not pressed
self.current_duration = None
# The events that should be returned for this button
self.enabled_events = set()
# If the current press has been handled already
self.press_handled = True
# Timestamp of the last press if it was short, otherwise None
# (for double press detection)
self.last_double_press = None
class ButtonEvent:
"""
A gesture event.
.. py:attribute:: button
The ID of the button this event was triggered on
(e.g. :py:data:`buttons.BOTTOM_LEFT`)
.. py:attribute:: event
The Event that was triggered (e.g. :py:const:`SHORT_PRESS`)
"""
def __init__(self, button, event, time):
self.button = button
self.event = event
self.time = time
def __str__(self):
return "btn={}, ev={}".format(self.button, self.event)
class GestureFeed:
"""
Recognizes various button gestures, emits events accordingly and provides
tactile feedback for durations.
:param int time_medium: The time in ms a button needs to be pressed to count as a medium press
:param int time_long: The time in ms a button needs to be pressed to count as a long press
:param int double_press_delay: The maximum time in ms between short presses to still count as a double press
:param int feedback_time: The duration in ms the vibration motor should be running for tactile feedback
**Example**:
.. code-block:: python
import buttons
import button_gestures as bg
feed = bg.GestureFeed()
feed.set_enabled_events(buttons.BOTTOM_LEFT, [
bg.SHORT_PRESS, bg.MEDIUM_PRESS, bg.DOUBLE_PRESS
])
for ev in feed.run():
if ev.button == buttons.BOTTOM_LEFT and ev.event == bg.SHORT_PRESS:
print("Normal press!")
elif ev.button == buttons.BOTTOM_LEFT and ev.event == bg.MEDIUM_PRESS:
print("Slightly longer press!")
elif ev.button == buttons.BOTTOM_LEFT and ev.event == bg.DOUBLE_PRESS:
print("Double press!")
"""
def __init__(
self, time_medium=400, time_long=1000, double_press_delay=400, feedback_time=20
):
if time_medium > 10000 or time_long > 10000:
raise ValueError("Sanity check failed: press duration over 10s")
self.time_medium = time_medium
self.time_long = time_long
self.double_press_delay = double_press_delay
self.feedback_time = feedback_time
self._buttons = {b: Button() for b in _BUTTONS}
def _vibra_feedback(self, ev):
if ev.event in [MEDIUM_PRESS, LONG_PRESS]:
vibra.vibrate(self.feedback_time)
def _raw_events(self, btn_values=None, curr_time=None):
"""
This method generates the raw event stream used by all other methods.
Every time it is called, it updates internal state and yields all raw
events that have occured since the last call.
Pressing a button immediately triggers these events:
* :py:data:`BUTTON_PRESSED`
* :py:data:`SHORT_PRESS`
Keeping a button pressed triggers the following events after their
respective timeout:
* :py:data:`MEDIUM_PRESS`
* :py:data:`LONG_PRESS`
When the button is released, a :py:data:`BUTTON_RELEASED` event is triggered.
Additionally, a :py:data:`NO_EVENT` pseudo-event is generated for every
button on every call, which can be used by other event stream functions
(like :py:meth:`_add_double_presses`) to perform actions periodically,
even if no actual button events occured.
:param int btn_values: Mock button values, used for debugging.
:param int curr_time: Mock timestamp, used for debugging.
"""
if btn_values is None:
btn_values = buttons.read(reduce(lambda x, y: x | y, self._buttons.keys()))
if curr_time is None:
curr_time = utime.time_ms()
for index, btn in self._buttons.items():
# whether the button is pressed right now
pressed = (btn_values & index) != 0
def make_event(event_id):
return ButtonEvent(index, event_id, curr_time)
# insert a dummy event for each button, used to refresh double-press logic
# even when no actual change occured
yield make_event(NO_EVENT)
if pressed and btn.current_duration is None:
# just pressed
btn.last_pressed = curr_time
yield make_event(BUTTON_PRESSED)
btn.current_duration = SHORT_PRESS
yield make_event(SHORT_PRESS)
elif pressed:
# still pressed
duration = curr_time - btn.last_pressed
if btn.current_duration == SHORT_PRESS and duration > self.time_medium:
btn.current_duration = MEDIUM_PRESS
yield make_event(MEDIUM_PRESS)
if btn.current_duration == MEDIUM_PRESS and duration > self.time_long:
btn.current_duration = LONG_PRESS
yield make_event(LONG_PRESS)
elif not pressed and btn.current_duration is not None:
# just released
yield make_event(BUTTON_RELEASED)
btn.current_duration = None
def _add_feedback(self, events, callback=None):
"""
Passes through a raw event stream (i.e. from :py:meth:`_raw_events`)
unchanged, but provides tactile feedback for enabled events using the
vibration motor (or calls any arbitrary feedback function).
:param events: The unfiltered event stream
:param function callback: The function to be called when any enabled event is emitted (default: pulse the vibration motor)
:type events: Iterable of ButtonEvents
:return: The unchanged event stream
"""
if callback is None:
callback = self._vibra_feedback
for ev in events:
btn = self._buttons[ev.button]
enabled = btn.enabled_events
if ev.event in enabled:
callback(ev)
yield ev
def _clean_events(self, events):
"""
Filters raw events (i.e. from :py:meth:`_raw_events`) to those enabled
for the respective button, deduplicates duration events and synthesizes
them when a button is released without one having been sent before
(e.g. if the button was held for only a short duration, but a long
duration is also enabled).
Additionally, a virtual :py:data:`HALF_DOUBLE_PRESS` event is inserted
on short presses if double presses are enabled, so they can later be
accumulated by :py:meth:`_add_double_presses`.
:param events: The unfiltered event stream
:type events: Iterable of ButtonEvents
:return: The cleaned event stream
"""
for ev in events:
btn = self._buttons[ev.button]
enabled = btn.enabled_events
# pass through dummy events
if _is_dummy_event(ev):
yield ev
continue
# a fresh press, mark it as unhandled
if ev.event == BUTTON_PRESSED:
btn.press_handled = False
# synthesize duration event when button was released without one having been passed through before
elif ev.event == BUTTON_RELEASED and not btn.press_handled:
btn.press_handled = True
if LONG_PRESS in enabled and btn.current_duration == LONG_PRESS:
yield ButtonEvent(ev.button, LONG_PRESS, ev.time)
elif MEDIUM_PRESS in enabled and btn.current_duration in [
MEDIUM_PRESS,
LONG_PRESS,
]:
yield ButtonEvent(ev.button, MEDIUM_PRESS, ev.time)
elif SHORT_PRESS in enabled:
yield ButtonEvent(ev.button, SHORT_PRESS, ev.time)
elif DOUBLE_PRESS in enabled:
# we need to detect double presses, but can't send short presses -
# use a dummy event instead
yield ButtonEvent(ev.button, HALF_DOUBLE_PRESS, ev.time)
# pass through raw events if enabled
if ev.event == BUTTON_PRESSED and BUTTON_PRESSED in enabled:
yield ev
elif ev.event == BUTTON_RELEASED and BUTTON_RELEASED in enabled:
yield ev
# duration events should only be triggered once
if btn.press_handled:
continue
enabled_masked_short = enabled & {
DOUBLE_PRESS,
SHORT_PRESS,
MEDIUM_PRESS,
LONG_PRESS,
}
# if a duration event comes in and no longer duration event is enabled for this button, send it immediately
if ev.event == SHORT_PRESS and (
enabled_masked_short & {DOUBLE_PRESS, SHORT_PRESS}
== enabled_masked_short
and enabled_masked_short != set()
):
# either short or double press are enabled, and neither medium nor long are
btn.press_handled = True
if SHORT_PRESS in enabled:
yield ev
elif DOUBLE_PRESS in enabled:
# we need to detect double presses, but can't send short presses - use a dummy event instead
yield ButtonEvent(ev.button, HALF_DOUBLE_PRESS, ev.time)
elif ev.event == MEDIUM_PRESS and (
enabled & {MEDIUM_PRESS, LONG_PRESS}
) == {MEDIUM_PRESS}:
# medium press is enabled, and long press isn't
btn.press_handled = True
yield ev
elif ev.event == LONG_PRESS and (enabled & {LONG_PRESS}) == {LONG_PRESS}:
btn.press_handled = True
yield ev
def _add_double_presses(self, events):
"""
Combines :py:data:`SHORT_PRESS`/:py:data:`HALF_DOUBLE_PRESS` events from
a cleaned event stream (i.e. from :py:meth:`_clean_events`) into
:py:data:`DOUBLE_PRESS` events if they are within a timeout, and passes
everything else through as-is.
:param events: The cleaned event stream
:type events: Iterable of ButtonEvents
:return: The same event stream, with :py:data:`DOUBLE_PRESS` events added and :py:data:`SHORT_PRESS` events suppressed
"""
for ev in events:
btn = self._buttons[ev.button]
# check for expired double presses
if btn.last_double_press is not None:
duration = ev.time - btn.last_double_press
if duration >= self.double_press_delay:
if SHORT_PRESS in btn.enabled_events:
# replay the previously swallowed short press
yield ButtonEvent(ev.button, SHORT_PRESS, btn.last_double_press)
# reset marker
btn.last_double_press = None
if (
ev.event in [SHORT_PRESS, HALF_DOUBLE_PRESS]
and DOUBLE_PRESS in btn.enabled_events
):
if btn.last_double_press is None:
# mark double press as armed, swallow short press event
btn.last_double_press = ev.time
else:
# there has been a recent short press, and since any
# expired markers have been discarded already, it has to be
# a complete double press now
yield ButtonEvent(ev.button, DOUBLE_PRESS, btn.last_double_press)
btn.last_double_press = None
else:
if (
ev.event in [MEDIUM_PRESS, LONG_PRESS]
and btn.last_double_press is not None
):
# invalidate any previous short presses when a longer press is detected
if SHORT_PRESS in btn.enabled_events:
yield ButtonEvent(ev.button, SHORT_PRESS, btn.last_double_press)
btn.last_double_press = None
# always pass through normal press events
yield ev
def _remove_dummy_events(self, events):
return (ev for ev in events if not _is_dummy_event(ev))
def new_events(self, btn_values=None, curr_time=None, feedback=None):
"""
The output of all event stream functions chained together. This method
should be called periodically, and returns all events that have occured
since the last time it was called.
:return: All events that have occured since the last call
:rtype: Iterable of :py:class:`ButtonEvent`
"""
# function chaining for dummies
return self._remove_dummy_events(
self._add_double_presses(
self._clean_events(
self._add_feedback(
self._raw_events(btn_values=btn_values, curr_time=curr_time),
callback=feedback,
)
)
)
)
def run(self, sleep_time=100):
"""
A wrapper around :py:meth:`new_events` that yields a continuous feed of
gesture events.
:param sleep_time: The time in ms to sleep between button polling (or None to disable sleeping)
:type sleep_time: int or None
:return: An event feed
:rtype: Iterable of :py:class:`ButtonEvent`
"""
while True:
for ev in self.new_events():
yield ev
if sleep_time is not None:
utime.sleep_ms(sleep_time)
def disable_all_events(self):
"""
Disable all events on all buttons. Useful for cleaning up before
changing to another button layout.
"""
for btn in self._buttons.values():
btn.enabled_events.clear()
def set_enabled_events(self, button, events):
"""
Set the events to watch for on a specific button.
It is important to enable only the exact events required, otherwise
wanted events may be swallowed in favor of unused ones (example: only
short presses are required, but long presses are also enabled - pressing
a button for a longer period of time will not trigger a short press
event anymore)
**Example**:
.. code-block:: python
import button_gestures as bg
feed = bg.GestureFeed()
feed.set_enabled_events(buttons.BOTTOM_LEFT, [bg.SHORT_PRESS, bg.LONG_PRESS])
feed.set_enabled_events(buttons.BOTTOM_RIGHT, [bg.DOUBLE_PRESS])
"""
if button not in self._buttons:
raise ValueError("Bad button index: {}".format(button))
self._buttons[button].enabled_events = set(events)
class ActionMapper:
"""
A convenience class that maps events to actions and manages the underlying
:py:class:`GestureFeed`.
:param feed: An optional feed instance to use internally
:type feed: :py:class:`GestureFeed` or None
**Example**:
.. code-block:: python
import sys
import button_gestures as bg
def hello():
print("Hello world!")
mapper = bg.ActionMapper()
mapping = {
(buttons.BOTTOM_LEFT, bg.SHORT_PRESS): lambda: print("Short!"),
(buttons.BOTTOM_LEFT, bg.MEDIUM_PRESS): lambda: print("Medium!"),
(buttons.BOTTOM_LEFT, bg.LONG_PRESS): lambda: print("Long!"),
(buttons.BOTTOM_LEFT, bg.DOUBLE_PRESS): hello,
(buttons.BOTTOM_RIGHT, bg.DOUBLE_PRESS): sys.exit,
}
mapper.set_mapping(mapping)
mapper.run()
"""
def __init__(self, feed=None):
self.disabled_actions = set()
# mapping from events to actions
self._bindings = {}
if feed is None:
self._feed = GestureFeed()
else:
self._feed = feed
def set_mapping(self, mapping):
"""
Apply a button mapping (dictionary of (button, event) -> action).
:param mapping: The actions to be bound to their respecive events
"""
self._bindings = mapping
self._feed.disable_all_events()
# this would be the place for a defaultdict! Unfortunately that's
# completely broken in micropython-lib#b89114c (no way to iterate
# over it)
required_events = {}
for btn, event in mapping.keys():
if btn in required_events:
required_events[btn].append(event)
else:
required_events[btn] = [event]
for btn, events in required_events.items():
self._feed.set_enabled_events(btn, events)
def on_action(self, event, action):
"""
By default, call the action as a function. If this is not desired, this
method can be overridden to use the event and action arbitrarily.
"""
try:
action()
except Exception as e:
print("Action for event ({}) failed to execute:".format(event))
sys.print_exception(e)
def update(self):
"""
Poll for new events from the feed and call :py:meth:`on_action` for
each. Needs to be run periodically.
"""
for ev in self._feed.new_events():
ident = (ev.button, ev.event)
if ident not in self._bindings:
raise RuntimeError(
"Received event that isn't bound to anything, please report a bug for button_gestures ({})".format(
ev
)
)
else:
action = self._bindings[ident]
if action not in self.disabled_actions:
self.on_action(ev, action)
def run(self, sleep_time=100):
"""
Run the mapper continuously. The buttons will be polled with the
specified interval, and :py:meth:`on_action` will be called any time a
mapped event is recognized.
:param sleep_time: The time in ms to sleep after every button poll (or None to disable waiting)
:type sleep_time: int or None
"""
while True:
self.update()
if sleep_time is not None:
utime.sleep_ms(sleep_time)
......@@ -9,6 +9,7 @@ python_modules = files(
'pride.py',
'ledfx.py',
'simple_menu.py',
'button_gestures.py',
# MicroPython Standard-Library
'col_defaultdict.py',
......