#!/usr/bin/env python
# -*- coding: utf-8 -*-
"""
Card reader client.

Reads IDs typed by a USB card reader (a keyboard-like input device), checks
them against a list of regular expressions, posts valid IDs to the
stechuhr-server and signals the result via LEDs and a buzzer.
"""

import ast
import ctypes
import fcntl
import logging
import os
import queue
import re
import socket
import struct
import sys
import threading
import time
import uuid
from typing import List

import board
import ioctl_opt
import neopixel_spi
import pyudev
import requests
import RPi.GPIO as GPIO

from config import initialize_config, APPLICATION_NAME, DEFAULT_CONFIG
from keycode import KEYCODE

# Setup LED
LEDS = neopixel_spi.NeoPixel_SPI(board.SPI(), 10)

# Setup Buzzer
buzzer_pin = 18
GPIO.setup(buzzer_pin, GPIO.OUT)
BUZZER = GPIO.PWM(buzzer_pin, 440)

# Setup constants used for binding to the keypad (from input-event-codes.h).
# The event type can be EV_SYN, EV_KEY or EV_MSC.
EV_KEY = 1
KEY_DOWN = 1
KEY_ENTER = 28
EVENT_FORMAT = "llHHI"
EVENT_SIZE = struct.calcsize(EVENT_FORMAT)

# Maps every accepted LED state name to the prefix of its config keys
# ("<prefix>_color", "<prefix>_on_time", ...).
_LED_STATE_PREFIX = {
    "success": "success",
    "ok": "success",
    "200": "success",
    "failure": "failure",
    "error": "failure",
    "fail": "failure",
    "404": "failure",
}

# Maps every accepted buzzer state name to the prefix of its config keys
# ("<prefix>_notes", "<prefix>_note_lengths", ...).
_BUZZER_STATE_PREFIX = {
    "startup": "startup",
    "ready": "startup",
    "success": "success",
    "failure": "failure",
}


def string_to_list(s: str, delimiter: str = "\n") -> List[str]:
    """
    Split a string to a list derived from a delimiter.
    If the string contains no delimiter, still return a list containing
    the string (this is what str.split already does).
    """
    return s.split(delimiter)


def repeat(l, index):
    """
    Returns an element of a list.
    If the index is out of range, start over at the beginning.
    """
    return l[index % len(l)]


def dispatch_led(ledstate, config):
    """
    Dispatch a daemonized LED thread with a given LED state
    """
    led_thread = threading.Thread(target=set_led, args=(ledstate, config))
    led_thread.daemon = True
    led_thread.start()


def _blink(colors, on_times, off_times, repetitions):
    """
    Blink the LEDs `repetitions` times, cycling through the given lists of
    colors, on-times and off-times (seconds).
    """
    for i in range(repetitions):
        LEDS.fill(repeat(colors, i))
        time.sleep(repeat(on_times, i))
        LEDS.fill((0, 0, 0))
        time.sleep(repeat(off_times, i))


def set_led(ledstate, config):
    """
    Blink the LEDs in the pattern belonging to `ledstate` and return to the
    standby color afterwards. Unknown states only set the standby color.
    """
    ledstate = str(ledstate).strip().lower()

    if ledstate == "startup":
        _blink([(255, 255, 255)], [0.2], [0.1], 4)
    elif ledstate in _LED_STATE_PREFIX:
        prefix = _LED_STATE_PREFIX[ledstate]
        led_config = config["led"]
        # The configured repetitions are counted on top of the first blink
        _blink(
            led_config[prefix + "_color"],
            led_config[prefix + "_on_time"],
            led_config[prefix + "_off_time"],
            led_config[prefix + "_repetitions"] + 1,
        )

    # Return to default standby LED color in the end
    LEDS.fill(config["led"]["standby_color"][0])


def dispatch_buzzer(state, config):
    """
    Dispatch a daemonized buzzer thread with a given buzzer state
    """
    # If the buzzer is not active, return before starting a thread
    if not config["buzzer"]["active"]:
        return

    buzzer_thread = threading.Thread(target=set_buzzer, args=(state, config))
    buzzer_thread.daemon = True
    buzzer_thread.start()


def set_buzzer(state, config):
    """
    Play the melody that is configured for the given state on the buzzer.
    Unknown states are ignored.
    """
    state = str(state).strip().lower()
    prefix = _BUZZER_STATE_PREFIX.get(state)
    if prefix is None:
        return

    buzzer_config = config["buzzer"]
    note_lengths = buzzer_config[prefix + "_note_lengths"]
    stop_lengths = buzzer_config[prefix + "_note_stop_lengths"]

    for i, note in enumerate(buzzer_config[prefix + "_notes"]):
        BUZZER.ChangeFrequency(midinumber_to_hertz(note))
        BUZZER.start(10)  # Set dutycycle to 10
        time.sleep(repeat(note_lengths, i))
        BUZZER.stop()
        time.sleep(repeat(stop_lengths, i))


def midinumber_to_hertz(note: int) -> float:
    """
    Convert a midi number to a given frequency in Hertz
    """
    return (440 / 32) * (2 ** ((note - 9) / 12))


def process_request(output_queue, config=None, logger=None):
    """
    Process events from the output queue forever. For every ID this:
    1. Sends a request to the server
    2. Reacts to the response with LED and buzzer feedback
    """
    while True:
        # Block until an ID is available instead of polling the queue size
        verified_id = output_queue.get()

        if send_request(verified_id, config, logger):
            logger.info("Server Response: Success")
            dispatch_led("success", config)
            dispatch_buzzer("success", config)
        else:
            logger.info("Server Response: Failed")
            dispatch_led("failure", config)
            dispatch_buzzer("failure", config)


def send_request(verified_id, config, logger) -> bool:
    """
    Send post request to stechuhr-server
    Return true if everything was ok, false otherwise
    """
    protocol = "https" if int(config["server"]["port"]) == 443 else "http"
    target_address = '{}://{}:{}/'.format(protocol, config["server"]["address"], config["server"]["port"])

    logger.info("Posting {} to {}".format(verified_id, target_address))

    payload = {
        "location": config["client"]["location"],
        "entrance": config["client"]["entrance"],
        "direction": config["client"]["direction"],
        "id": verified_id,
    }

    if config["application"]["dryrun"]:
        # Always return true on dry run
        logger.info("Dryrun: Would post request to {}".format(target_address))
        logger.info("Dryrun: Payload: {}".format(str(payload)))
        return True

    try:
        r = requests.post(
            target_address,
            json=payload,
            timeout=config["server"]["timeout"],
            verify=config["server"]["verify_cert"],
        )
    except Exception as e:
        # Deliberately broad: any failure here must not kill the worker
        # thread, it is reported to the user as a failed registration.
        logger.error(e)
        return False

    return r.ok


def id_pattern_check(visitor_id: str, config: dict) -> bool:
    """
    Returns True if any of the patterns from the config matches.
    Returns False if none of the patterns matches.
    """
    return any(
        re.match(pattern, visitor_id) is not None
        for pattern in config["client"]["id_patterns"]
    )


def open_cardreader(config, logger) -> '_io.BufferedReader':
    """
    Find a device that matches the Vendor and Model IDs and return
    a file object with exclusive access to it.

    Exits (raises SystemExit) if no matching device is found or the
    device cannot be opened.
    """
    vendor_id = config["reader"]["vendor_id"]
    model_id = config["reader"]["model_id"]

    # List fitting input devices and filter out the ones that have no device
    # node or don't fit the vendor or model ids. `.get` is used because not
    # every device necessarily carries both properties.
    ctx = pyudev.Context()
    devices = [
        d for d in ctx.list_devices(subsystem='input', ID_BUS='usb')
        if d.device_node is not None
        and d.properties.get('ID_VENDOR_ID') == vendor_id
        and d.properties.get('ID_MODEL_ID') == model_id
    ]

    # If no device found, exit
    if not devices:
        logger.critical("Error: No Cardreader connected? Found no device with vendor ID {} and model ID {}. Exiting.".format(vendor_id, model_id))
        sys.exit(1)

    # Select the first device that matches the specs
    device = devices[0]
    logger.info("Using cardreader device: {}".format(device))

    try:
        fd = open(device.device_node, 'rb')
    except FileNotFoundError:
        logger.critical("No such device, is it connected?")
        sys.exit(1)
    except PermissionError:
        logger.critical("Insufficent permission to read, run me as root!")
        sys.exit(1)

    # ioctl request numbers, from input.h
    eviocgname = ioctl_opt.IOC(ioctl_opt.IOC_READ, ord('E'), 0x06, 256)
    eviocgrab = ioctl_opt.IOW(ord('E'), 0x90, ctypes.c_int)

    try:
        # Get Device Name
        name = ctypes.create_string_buffer(256)
        fcntl.ioctl(fd, eviocgname, name, True)

        # Grab exclusive Access for fd
        logger.info("Grabbing device {} for exclusive access".format(name.value.decode('UTF-8')))
        fcntl.ioctl(fd, eviocgrab, True)
    except OSError:
        # Don't leak the open device if it can't be set up
        fd.close()
        raise

    return fd


def _register_card(card_id, last_seen_cards, input_queue, config, logger):
    """
    Put `card_id` onto the input queue unless the same card was seen within
    the last config["client"]["ignore_duration"] seconds.

    Returns the updated list of recently seen cards.
    """
    now = time.time()
    ignore_duration = config["client"]["ignore_duration"]

    # Forget all cards that are past the threshold
    recent_cards = [c for c in last_seen_cards if now - c["time"] <= ignore_duration]

    previous = next((c for c in recent_cards if c["id"] == card_id), None)
    if previous is None:
        input_queue.put(card_id)
        recent_cards.append({"id": card_id, "time": now})
    else:
        logger.info("Ignored duplicated register attempt of card {} because it was seen {:.1f} seconds ago".format(previous["id"], now - previous["time"]))

    return recent_cards


def read_key_input(input_queue, config, logger):
    """
    Read keyboard events from the card reader forever and put every ID
    (a sequence of keys terminated by ENTER) onto the input queue.
    """
    # Get an exclusive file object for the card reader
    cardreader = open_cardreader(config, logger)

    logger.info("Listening for Input...")

    # Buffer to store incoming keys till ENTER (keycode 28) is received
    keybuffer = []
    last_seen_cards = []

    while True:
        try:
            # Read one event from the card reader and unpack it:
            # epoch seconds, epoch microseconds, event type, keycode and
            # value (keydown = 1, keyup = 0)
            data = cardreader.read(EVENT_SIZE)
            _sec, _usec, e_type, e_code, e_val = struct.unpack(EVENT_FORMAT, data)

            # Only key presses are of interest
            if e_type == EV_KEY and e_val == KEY_DOWN:
                if e_code != KEY_ENTER:
                    # Retrieve the actual character from the dict and
                    # append it to the buffer, if the keycode is known
                    code = str(e_code)
                    if code in KEYCODE:
                        keybuffer.append(KEYCODE[code])
                    else:
                        logger.warning("Received invalid Keycode: {}".format(e_code))
                else:
                    # When enter is received, hand the joined string over to
                    # the main thread and reset the keybuffer
                    joined_string = "".join(keybuffer)
                    keybuffer = []
                    last_seen_cards = _register_card(joined_string, last_seen_cards, input_queue, config, logger)

        except (KeyboardInterrupt, SystemExit, OSError, struct.error) as e:
            # struct.error is raised on a short read, e.g. when the device
            # vanished; treat it like any other read failure.
            cardreader.close()
            logger.critical("Exiting because of {}".format(e))
            sys.exit(0)

        time.sleep(0.01)


def update_id_patterns(config, logger) -> 'Config':
    """
    Update the id_patterns from the server via http.

    The config is left untouched if the server can't be reached, responds
    with an error, sends no pattern at all or sends an invalid pattern.
    """
    # NOTE(review): send_request() uses https only for port 443 and includes
    # the port in the URL, this function uses http only for port 80 and
    # omits the port. Confirm that this difference is intended.
    if int(config["server"]["port"]) == 80:
        protocol = "http"
    else:
        protocol = "https"

    url = '{}://{}/config/database/id_patterns'.format(protocol, config["server"]["address"].rstrip("/"))
    logger.debug("Requesting id_pattern update at address: {}".format(url))

    # Send request. A server that is unreachable must not crash the client.
    try:
        r = requests.get(
            url,
            timeout=config["server"]["timeout"],
            verify=config["server"]["verify_cert"],
        )
    except requests.exceptions.RequestException as e:
        logger.warning("Could not request id_patterns from \"{}\": {}".format(url, e))
        return config

    if not r.ok:
        logger.warning("Server at \"{}\" responded with {}".format(url, r.status_code))
        return config

    # Drop blank lines (e.g. caused by a trailing newline): an empty pattern
    # would match every ID.
    patterns = [line.rstrip("\r") for line in string_to_list(r.text)]
    patterns = [p for p in patterns if p.strip()]
    if not patterns:
        logger.warning("Server at \"{}\" sent no id_patterns, keeping the current ones".format(url))
        return config

    # The current patterns may be plain strings (initial config) or compiled
    # patterns (after an update), so compare their source strings.
    current = {getattr(p, "pattern", p) for p in config["client"]["id_patterns"]}
    if set(patterns) == current:
        logger.debug("No change in serverside id_pattern, so didn't update")
        return config

    # Only compile the patterns once we know something changed
    try:
        compiled = [re.compile(p) for p in patterns]
    except re.error as e:
        logger.warning("Server at \"{}\" sent an invalid id_pattern: {}".format(url, e))
        return config

    config["client"]["id_patterns"] = compiled
    logger.info("Received a newly updated patterns list from the server")
    return config


def main():
    """
    Start the reader and request threads and shovel valid IDs from the
    input queue to the output queue on the main thread.
    """
    logger = logging.getLogger(APPLICATION_NAME)
    logger.info('Starting main() in {}'.format(APPLICATION_NAME))

    # Initialize the configuration (create a default one if needed)
    config = initialize_config(logger)

    # Create an input queue that stores all incoming keyboard events
    # and fill it on a separate thread
    input_queue = queue.Queue()
    inputThread = threading.Thread(target=read_key_input, args=(input_queue, config, logger), daemon=True)
    inputThread.start()

    # Create an output queue that stores all verified IDs
    # and process them on a separate thread
    output_queue = queue.Queue()
    outputThread = threading.Thread(target=process_request, args=(output_queue, config, logger), daemon=True)
    outputThread.start()

    # Dispatch a startup sound to signal readiness
    dispatch_buzzer("startup", config)

    # Set the LED to display readiness
    dispatch_led("startup", config)

    last_pattern_update = None

    # On the main thread handle communications between the two other threads.
    # That means: If there is sth. on the input queue, put it onto the output
    # queue if it fits the specified format
    try:
        while True:
            # Exiting inside the reader thread only ends that thread, so
            # the main thread has to notice and end the process.
            if not inputThread.is_alive():
                logger.critical("Card reader thread died, exiting.")
                sys.exit(1)

            try:
                potential_id = input_queue.get(timeout=0.01)
            except queue.Empty:
                potential_id = None

            if potential_id is not None:
                if id_pattern_check(potential_id, config):
                    output_queue.put(potential_id)
                else:
                    logger.warning("Didn't register as a valid ID: {}".format(potential_id[:1024]))
                    dispatch_led("failure", config)
                    dispatch_buzzer("failure", config)

            # Update the patterns at the pace defined by config["server"]["update_frequency"]
            if last_pattern_update is None or time.time() - last_pattern_update > config["server"]["update_frequency"]:
                last_pattern_update = time.time()
                config = update_id_patterns(config, logger)

            time.sleep(0.01)
    except KeyboardInterrupt:
        logger.info("End.")


if __name__ == '__main__':
    main()