REFACTOR: Split threads into separate Python modules

This commit is contained in:
Dominique Barton 2019-03-24 19:01:18 +01:00
parent 6d7e7753aa
commit 6b0bc3908d
4 changed files with 115 additions and 109 deletions

View file

@ -1,48 +0,0 @@
# -*- coding: utf8 -*-
'''
Python card reader module.
'''
from __future__ import absolute_import, unicode_literals, print_function
from RPi.GPIO import cleanup # pylint: disable=no-name-in-module
from pirc522 import RFID
class ReadError(Exception):
'''
Exception which is thrown when the UID could not be read from the card.
'''
class RFIDReader(RFID):
'''
Card reader for the RC522 RFID card reader board, based on the excellent
:py:class:`pirc522.RFID` class.
'''
@staticmethod
def cleanup():
'''
Cleanup GPIO ports.
'''
cleanup()
@property
def uid(self):
'''
Return the UID from the card.
:return: The hex UID
:rtype: string
'''
error, data = self.request() # pylint: disable=unused-variable
if error:
raise ReadError('Could not read tag')
error, uid_chunks = self.anticoll()
if error:
raise ReadError('Could not read UID')
uid = '{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}'.format(uid_chunks) # pylint: disable=invalid-format-index
return uid

View file

@ -0,0 +1,9 @@
# -*- coding: utf-8 -*-
'''
Threads of Mopidy Pummeluff.
'''
from __future__ import absolute_import, unicode_literals, print_function
from .gpio_handler import *
from .card_reader import *

View file

@ -3,80 +3,29 @@
Python module for the dedicated Mopidy Pummeluff threads.
'''
from __future__ import absolute_import, unicode_literals, print_function
__all__ = (
'GPIOHandler',
'CardReader',
)
from __future__ import absolute_import, unicode_literals, print_function
from os import path, system
from threading import Thread
from time import time
from logging import getLogger
import RPi.GPIO as GPIO
from pirc522 import RFID
from .rfid_reader import RFIDReader, ReadError
from .actions import shutdown, play_pause
from .cards import Card
from mopidy_pummeluff.cards import Card
LOGGER = getLogger(__name__)
class GPIOHandler(Thread):
class ReadError(Exception):
'''
Thread which handles the GPIO ports, which basically means activating the
LED when it's started and then reacting to button presses.
Exception which is thrown when an RFID read error occurs.
'''
button_pins = {
5: shutdown,
7: play_pause,
}
led_pin = 11
def __init__(self, core, stop_event):
'''
Class constructor.
:param mopidy.core.Core core: The mopidy core instance
:param threading.Event stop_event: The stop event
'''
super(CardReader, self).__init__()
self.core = core
self.stop_event = stop_event
def init_gpio(self):
'''
Initialize the GPIO button pins and LED pin.
'''
GPIO.setmode(GPIO.BOARD)
for pin in self.button_pins.keys():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(self.led_pin, GPIO.OUT)
GPIO.output(self.led_pin, GPIO.HIGH)
def run(self):
'''
Run the thread.
'''
GPIO.setmode(GPIO.BOARD)
def callback(pin):
self.button_pins[pin](self.core)
for pin in self.button_pins.values():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(pin, GPIO.BOTH, callback=callback)
GPIO.setup(self.led_pin, GPIO.OUT)
GPIO.output(self.led_pin, GPIO.HIGH)
self.stop_event.wait()
GPIO.cleanup()
class CardReader(Thread):
@ -111,21 +60,22 @@ class CardReader(Thread):
super(CardReader, self).__init__()
self.core = core
self.stop_event = stop_event
self.rfid = RFID()
def run(self):
'''
Run RFID reading loop.
'''
reader = RFIDReader()
rfid = self.rfid
prev_time = time()
prev_uid = ''
while not self.stop_event.is_set():
reader.wait_for_tag()
rfid.wait_for_tag()
try:
now = time()
uid = reader.uid
uid = self.read_uid()
if now - prev_time > 1 or uid != prev_uid:
LOGGER.info('Card %s read', uid)
@ -137,7 +87,27 @@ class CardReader(Thread):
except ReadError:
pass
reader.cleanup()
GPIO.cleanup() # pylint: disable=no-member
def read_uid(self):
'''
Return the UID from the card.
:return: The hex UID
:rtype: string
'''
rfid = self.rfid
error, data = rfid.request() # pylint: disable=unused-variable
if error:
raise ReadError('Could not read tag')
error, uid_chunks = rfid.anticoll()
if error:
raise ReadError('Could not read UID')
uid = '{0[0]:02X}{0[1]:02X}{0[2]:02X}{0[3]:02X}'.format(uid_chunks) # pylint: disable=invalid-format-index
return uid
def handle_uid(self, uid):
'''

View file

@ -0,0 +1,75 @@
# -*- coding: utf-8 -*-
'''
Python module for the dedicated Mopidy Pummeluff threads.
'''
from __future__ import absolute_import, unicode_literals, print_function
__all__ = (
'GPIOHandler',
)
from threading import Thread
from logging import getLogger
import RPi.GPIO as GPIO
from mopidy_pummeluff.actions import shutdown, play_pause
LOGGER = getLogger(__name__)
class GPIOHandler(Thread):
'''
Thread which handles the GPIO ports, which basically means activating the
LED when it's started and then reacting to button presses.
'''
button_pins = {
11: shutdown,
12: play_pause,
}
led_pin = 7
def __init__(self, core, stop_event):
'''
Class constructor.
:param mopidy.core.Core core: The mopidy core instance
:param threading.Event stop_event: The stop event
'''
super(GPIOHandler, self).__init__()
self.core = core
self.stop_event = stop_event
# pylint: disable=no-member
def init_gpio(self):
'''
Initialize the GPIO button pins and LED pin.
'''
GPIO.setmode(GPIO.BOARD)
for pin in self.button_pins:
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.setup(self.led_pin, GPIO.OUT)
GPIO.output(self.led_pin, GPIO.HIGH)
def run(self):
'''
Run the thread.
'''
GPIO.setmode(GPIO.BOARD)
def callback(pin): # pylint: disable=missing-docstring
self.button_pins[pin](self.core)
for pin in self.button_pins.values():
GPIO.setup(pin, GPIO.IN, pull_up_down=GPIO.PUD_UP)
GPIO.add_event_detect(pin, GPIO.BOTH, callback=callback)
GPIO.setup(self.led_pin, GPIO.OUT)
GPIO.output(self.led_pin, GPIO.HIGH)
self.stop_event.wait()
GPIO.cleanup() # pylint: disable=no-member