FEATURE: Add GPIO handler for buttons and LED

This commit is contained in:
Dominique Barton 2019-03-24 18:52:30 +01:00
parent ad42be10a0
commit 6d7e7753aa
2 changed files with 177 additions and 91 deletions

View file

@ -5,119 +5,45 @@ Python module for Mopidy Pummeluff frontend.
from __future__ import absolute_import, unicode_literals, print_function
from os import path, system
from threading import Thread, Event
from time import time
__all__ = (
'PummeluffFrontend',
)
from threading import Event
from logging import getLogger
import pykka
from mopidy import core as mopidy_core
from .rfid_reader import RFIDReader, ReadError
from .cards import Card
from .threads import GPIOHandler, CardReader
LOGGER = getLogger(__name__)
class CardReader(Thread):
'''
Thread which reads RFID cards from the RFID reader.
Because the RFID reader algorithm is reacting to an IRQ (interrupt), it is
blocking as long as no card is touched, even when Mopidy is exiting. Thus,
we're running the thread as daemon thread, which means it's exiting at the
same moment as the main thread (aka Mopidy core) is exiting.
'''
daemon = True
latest = None
@staticmethod
def play_sound(sound):
'''
Play sound via aplay.
:param str sound: The name of the sound file
'''
file_path = path.join(path.dirname(__file__), 'sounds', sound)
system('aplay -q {}'.format(file_path))
def __init__(self, core, stop_event):
'''
Class constructor.
:param threading.Event stop_event: The stop event
'''
super(CardReader, self).__init__()
self.core = core
self.stop_event = stop_event
def run(self):
'''
Run RFID reading loop.
'''
reader = RFIDReader()
prev_time = time()
prev_uid = ''
while not self.stop_event.is_set():
reader.wait_for_tag()
try:
now = time()
uid = reader.uid
if now - prev_time > 1 or uid != prev_uid:
LOGGER.info('Card %s read', uid)
self.handle_uid(uid)
prev_time = now
prev_uid = uid
except ReadError:
pass
reader.cleanup()
def handle_uid(self, uid):
'''
Handle the scanned card / retreived UID.
:param str uid: The UID
'''
card = Card(uid)
if card.registered:
LOGGER.info('Triggering action of registered card')
self.play_sound('success.wav')
card(mopidy_core=self.core)
else:
LOGGER.info('Card is not registered, thus doing nothing')
self.play_sound('fail.wav')
card.scanned = time()
CardReader.latest = card
class PummeluffFrontend(pykka.ThreadingActor, mopidy_core.CoreListener):
'''
Pummeluff frontend which basically reads cards from the RFID reader.
Pummeluff frontend which basically reacts to GPIO button pushes and touches
of RFID cards.
'''
def __init__(self, config, core): # pylint: disable=unused-argument
super(PummeluffFrontend, self).__init__()
self.core = core
self.stop_event = Event()
self.card_reader = CardReader(core=core, stop_event=self.stop_event)
self.core = core
self.stop_event = Event()
self.gpio_handler = GPIOHandler(core=core, stop_event=self.stop_event)
self.card_reader = CardReader(core=core, stop_event=self.stop_event)
def on_start(self):
'''
Start card reader thread after the actor is started.
Start GPIO handler & card reader threads.
'''
self.gpio_handler.start()
self.card_reader.start()
def on_stop(self):
'''
Stop card reader thread before the actor stops.
Set threading stop event to tell GPIO handler & card reader threads to
stop their operations.
'''
self.stop_event.set()

160
mopidy_pummeluff/threads.py Normal file
View file

@ -0,0 +1,160 @@
# -*- coding: utf-8 -*-
'''
Python module for the dedicated Mopidy Pummeluff threads.
'''
__all__ = (
'GPIOHandler',
'CardReader',
)
from __future__ import absolute_import, unicode_literals, print_function
from os import path, system
from threading import Thread
from time import time
from logging import getLogger
import RPi.GPIO as GPIO
from .rfid_reader import RFIDReader, ReadError
from .actions import shutdown, play_pause
from .cards import Card
LOGGER = getLogger(__name__)
class GPIOHandler(Thread):
'''
Thread which handles the GPIO ports, which basically means activating the
LED when it's started and then reacting to button presses.
'''
button_pins = {
5: shutdown,
7: play_pause,
}
led_pin = 11
def __init__(self, core, stop_event):
'''
Class constructor.
:param mopidy.core.Core core: The mopidy core instance
:param threading.Event stop_event: The stop event
'''
super(CardReader, self).__init__()
self.core = core
self.stop_event = stop_event
def init_gpio(self):
'''
Initialize the GPIO button pins and LED pin.
'''
GPIO.setmode(GPIO.BOARD)
for pin in self.button_pins.keys():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(self.led_pin, GPIO.OUT)
GPIO.output(self.led_pin, GPIO.HIGH)
def run(self):
'''
Run the thread.
'''
GPIO.setmode(GPIO.BOARD)
def callback(pin):
self.button_pins[pin](self.core)
for pin in self.button_pins.values():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(pin, GPIO.BOTH, callback=callback)
GPIO.setup(self.led_pin, GPIO.OUT)
GPIO.output(self.led_pin, GPIO.HIGH)
self.stop_event.wait()
GPIO.cleanup()
class CardReader(Thread):
'''
Thread which reads RFID cards from the RFID reader.
Because the RFID reader algorithm is reacting to an IRQ (interrupt), it is
blocking as long as no card is touched, even when Mopidy is exiting. Thus,
we're running the thread as daemon thread, which means it's exiting at the
same moment as the main thread (aka Mopidy core) is exiting.
'''
daemon = True
latest = None
@staticmethod
def play_sound(sound):
'''
Play sound via aplay.
:param str sound: The name of the sound file
'''
file_path = path.join(path.dirname(__file__), 'sounds', sound)
system('aplay -q {}'.format(file_path))
def __init__(self, core, stop_event):
'''
Class constructor.
:param mopidy.core.Core core: The mopidy core instance
:param threading.Event stop_event: The stop event
'''
super(CardReader, self).__init__()
self.core = core
self.stop_event = stop_event
def run(self):
'''
Run RFID reading loop.
'''
reader = RFIDReader()
prev_time = time()
prev_uid = ''
while not self.stop_event.is_set():
reader.wait_for_tag()
try:
now = time()
uid = reader.uid
if now - prev_time > 1 or uid != prev_uid:
LOGGER.info('Card %s read', uid)
self.handle_uid(uid)
prev_time = now
prev_uid = uid
except ReadError:
pass
reader.cleanup()
def handle_uid(self, uid):
'''
Handle the scanned card / retreived UID.
:param str uid: The UID
'''
card = Card(uid)
if card.registered:
LOGGER.info('Triggering action of registered card')
self.play_sound('success.wav')
card(mopidy_core=self.core)
else:
LOGGER.info('Card is not registered, thus doing nothing')
self.play_sound('fail.wav')
card.scanned = time()
CardReader.latest = card