Skip to content
Snippets Groups Projects
Select Git revision
  • 1b46acf2d5c8d188972c200461449bf8db94f30b
  • master default protected
  • config
  • piezo
4 results

client.py

Blame
  • client.py 16.29 KiB
    #!/usr/bin/env python 
    #-*- coding: utf-8 -*-
    
    import ast
    import threading
    import queue
    import time
    import sys
    import requests
    import uuid
    import socket
    import re
    import board
    import neopixel_spi
    import pyudev
    import os
    import ioctl_opt
    import fcntl
    import ctypes
    import struct
    import logging
    from typing import List
    import RPi.GPIO as GPIO
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    from config import initialize_config, APPLICATION_NAME, DEFAULT_CONFIG
    from keycode import KEYCODE
    
    # Setup LED
    LEDS = neopixel_spi.NeoPixel_SPI(board.SPI(), 10)
    
    # Setup Buzzer
    buzzer_pin = 18
    GPIO.setup(buzzer_pin, GPIO.OUT)
    BUZZER = GPIO.PWM(buzzer_pin, 440) 
    
    # Setup Constants used fo binding to keypad
    # from input-event-codes.h
    # type can be EV_SYN, EV_KEY or EV_MSC
    EV_KEY = 1
    KEY_DOWN = 1
    EVENT_FORMAT = "llHHI"
    EVENT_SIZE = struct.calcsize(EVENT_FORMAT)
    
    
    def string_to_list(s:str, delimiter: str="\n") -> List[str]:
        """
        Split a string to a list derived from a delimiter. If the string contains
        no delimiter, still return a list containing the string
        """
        if delimiter in s:
            return s.split(delimiter)
        else:
            return [s]
    
    
    def repeat(l, index):
        """
        Returns a element of a list
        If the index is out of range, start over at the beginning
        """
        return l[index%len(l)]
    
    
    def dispatch_led(ledstate, config):
        """
        Dispatch a LED thread with a given LED state
        """
        led_thread = threading.Thread(target=set_led, args=(ledstate, config, ))
        led_thread.daemon = True
        led_thread.start()
    
    
    def set_led(ledstate, config):
        """
        Toggle between differen LED colors
        """
        ledstate = str(ledstate).strip().lower()
    
        if ledstate in ["success", "ok", "200"]:
            # Display the fitting LED color for the state at the given repetition
            for i in range(config["led"]["success_repetitions"] + 1):
                LEDS.fill(repeat(config["led"]["success_color"], i))
                time.sleep(repeat(config["led"]["success_on_time"], i))
                LEDS.fill((0, 0, 0))
                time.sleep(repeat(config["led"]["success_off_time"], i))
        elif ledstate in ["failure", "fail", "404"]:
            # Display the fitting LED color for the state at the given repetition
            for i in range(config["led"]["failure_repetitions"] + 1):
                LEDS.fill(repeat(config["led"]["failure_color"], i))
                time.sleep(repeat(config["led"]["failure_on_time"], i))
                LEDS.fill((0, 0, 0))
                time.sleep(repeat(config["led"]["failure_off_time"], i))
        elif ledstate in ["startup"]:
            # Display the fitting LED color for the state at the given repetition
            for i in range(4):
                LEDS.fill((255,255,255))
                time.sleep(0.2)
                LEDS.fill((0, 0, 0))
                time.sleep(0.1)
    
        # Return to default standby LED color in the end
        LEDS.fill(config["led"]["standby_color"][0])
    
    
    def dispatch_buzzer(state, config):
        """
        Dispatch a Buzzer thread with a given Buzzer state
        """
        # If the buzzer is not active, return before starting a thread
        if not config["buzzer"]["active"]:
            return
    
        # Start a daemonized buzzer thread
        buzzer_thread = threading.Thread(target=set_buzzer, args=(state, config, ))
        buzzer_thread.daemon = True
        buzzer_thread.start()
    
    
    def set_buzzer(state, config):
        """
        Enable a buzzer and play a certain melody depending on the state
        """
        global BUZZER
    
        state = str(state).strip().lower()
        if state in ["startup", "ready"]:
            for i, note in enumerate(config["buzzer"]["startup_notes"]):
                BUZZER.ChangeFrequency(midinumber_to_hertz(note))
                BUZZER.start(10) # Set dutycycle to 10
                time.sleep(repeat(config["buzzer"]["startup_note_lengths"], i))
                BUZZER.stop()
                time.sleep(repeat(config["buzzer"]["startup_note_stop_lengths"], i))
        elif state in ["success"]:
            for i, note in enumerate(config["buzzer"]["success_notes"]):
                BUZZER.ChangeFrequency(midinumber_to_hertz(note))
                BUZZER.start(10) # Set dutycycle to 10
                time.sleep(repeat(config["buzzer"]["success_note_lengths"], i))
                BUZZER.stop()
                time.sleep(repeat(config["buzzer"]["success_note_stop_lengths"], i))
        elif state in ["failure"]:
            for i, note in enumerate(config["buzzer"]["failure_notes"]):
                BUZZER.ChangeFrequency(midinumber_to_hertz(note))
                BUZZER.start(10) # Set dutycycle to 10
                time.sleep(repeat(config["buzzer"]["failure_note_lengths"], i))
                BUZZER.stop()
                time.sleep(repeat(config["buzzer"]["failure_note_stop_lengths"], i))
    
    
    def midinumber_to_hertz(note: int) -> float:
        """
        Convert a midi number to a given frequency in Hertz
        """
        return (440 / 32) * (2 ** ((note - 9) / 12))
    
    
    def build_url(config, endpoint: str = "") -> str:
        """
        Build a URL for requests
        """
        protocol = "https"
        if not config["server"]["https"]:
            protocol = "http"
    
        url = '{}://{}:{}/{}'.format(protocol, config["server"]["address"].rstrip("/"), config["server"]["port"], endpoint.lstrip("/"))
        return url
    
    
    def process_request(output_queue, config=None, logger=None):
        """
        Process a event from the output queue.
        This:
        1. Sends a requests
        2. Reacts to a given response
        3. Blinks a LED red as long as the server cannot be reached
        """
        while (True):
            if (output_queue.qsize() > 0):
                verified_id = output_queue.get()
                response = send_request(verified_id, config, logger)
                if response == "ok":
                    logger.info("Server Response: Success")
                    dispatch_led("success", config)
                    dispatch_buzzer("success", config)
                elif response in ["failure", "error"]:
                    logger.info("Server Response: Failed")
                    dispatch_led("failure", config)
                    dispatch_buzzer("failure", config)
    
            else:
                time.sleep(0.01)
    
    
    def send_request(verified_id, config, logger) -> str:
        """
        Send post request to stechuhr-server
        Return true if everything was ok, false otherwise
        """
        if int(config["server"]["https"]):
            target_address = 'https://{}:{}/'.format(config["server"]["address"], config["server"]["port"])
        else:
            target_address = 'http://{}:{}/'.format(config["server"]["address"], config["server"]["port"])
    
        logger.info("Posting {} to {}".format(verified_id, target_address))
    
        payload = {
            "location"  : config["client"]["location"],
            "entrance"  : config["client"]["entrance"],
            "direction"  : config["client"]["direction"],
            "id" : verified_id
        }
        if not config["application"]["dryrun"]:
            try:
                r = requests.post(target_address, json=payload, timeout=config["server"]["timeout"], verify=config["server"]["verify_cert"])
            except Exception as e:
                logger.error(e)
                return "error"
            if r.ok:
                return "ok"
            else:
                return "failure"
        else:
            # Always return true on dry run
            logger.info("Dryrun: Would post request to {}".format(target_address))
            logger.info("Dryrun: Payload: {}".format(str(payload)))
            return "ok"
    
    
    def id_pattern_check(visitor_id: str, config: dict) -> bool:
        """
        Returns True if any of the patterns from the config matches.
        Returns False if none of the patterns matches.
        """
        matches = []
        for match, pattern in [(re.match(pattern, visitor_id) is not None, pattern) for pattern in config["client"]["id_patterns"]]:
            # Debug statement: 
            # print('{} + {} = {}'.format(visitor_id, pattern, match))
            matches.append(match)
        return any(matches)
    
    
    def open_cardreader(config, logger) -> '_io.BufferedReader':
        """
        Find a device that matches the Vendor and Model IDs and return an exclusive
        file descriptor to it.
        """
    
        # List fitting input devices
        ctx = pyudev.Context()
        devices = ctx.list_devices(subsystem='input', ID_BUS='usb')
    
        # Filter out the devices that are None or don't fit vendor or model ids
        devices = [d for d in devices if d.device_node is not None]
        devices = [d for d in devices if d.properties['ID_VENDOR_ID'] == config["reader"]["vendor_id"]]
        devices = [d for d in devices if d.properties['ID_MODEL_ID'] == config["reader"]["model_id"]]
    
        # If no device found, exit
        if len(devices) == 0:
            logger.criticial("Error: No Cardreader connected? Found no device with vendor ID {} and model ID {}. Exiting.".format(config["reader"]["vendor_id"], config["reader"]["model_id"]))
            exit(0)
    
        # Select the first device that matches the specs
        device = devices[0]
    
        logger.info("Using cardreader device: {}".format(device))
    
        try:
            fd = open(device.device_node, 'rb')
        except FileNotFoundError:
            logger.criticial("No such device, is it connected?")
            exit(0)
        except PermissionError:
            logger.criticial("Insufficent permission to read, run me as root!")
            exit(0)
    
        # from input.h:
        EVIOCGNAME = lambda len: ioctl_opt.IOC(ioctl_opt.IOC_READ, ord('E'), 0x06, len)
        EVIOCGRAB = lambda len: ioctl_opt.IOW(ord('E'), 0x90, ctypes.c_int)
    
        # Get Device Name
        name = ctypes.create_string_buffer(256)
        fcntl.ioctl(fd, EVIOCGNAME(256), name, True)
    
        # Grab exclusive Access for fd
        logger.info("Grabbing device {} for exclusive access".format(name.value.decode('UTF-8')))
        fcntl.ioctl(fd, EVIOCGRAB(1), True)
    
        # Return file descriptor
        return fd
    
    
    def read_key_input(input_queue, config, logger):
        """
        Read Keyboard Input and put it onto the input queue
        """
    
        # Get a exclusive file descriptor for the card reader
        cardreader = open_cardreader(config, logger)
    
        # Variables for struct unpack
        e_sec = "" # unix epoch second
        e_usec = "" # unix epoch microsecond
        e_type = "" # EV_SYN, EV_KEY, EV_MSC etc
        e_code = "" # keycode
        e_val = "" # keydown = 1, keyup = 0
    
        logger.info("Listening for Input...")
    
        # Buffer to store incoming keys till ENTER (keycode 28) is received
        keybuffer = []
        last_seen_cards = []
    
        # Loop Forever over the Input
        while True:
            try:
                # Read bytes from the card reader
                byte = cardreader.read(EVENT_SIZE)
    
                # Unpack the struct (see above for a description of the elements)
                e_sec, e_usec, e_type, e_code, e_val = struct.unpack(EVENT_FORMAT, byte)
    
                # Filter by event type
                if e_type == EV_KEY and e_val == KEY_DOWN:
                    # Append to the buffer, until we receive an enter key
                    if not e_code == 28:
                        # Check if the e_code is a valid Keycode
                        if str(e_code) in KEYCODE.keys():
                            # Retrieve the actual character from the dict and append to buffer
                            key = KEYCODE[str(e_code)]
                            keybuffer.append(key)
                        else:
                            logger.warning("Received invalid Keycode: {}".format(e_code))
                    else:
                        # When enter is received put the joined string to the imput 
                        # buffer where it will be processed by the requests thread 
                        # and reset the keybuffer
                        joined_string = "".join(keybuffer)
    
    
    
                        if len(last_seen_cards) == 0:
                            input_queue.put(joined_string)
                            last_seen_cards.append({"id": joined_string, "time": time.time()})
                        else:
                            # Remove all cards from the list if they are past the threshold
                            last_seen_cards = [c for c in last_seen_cards if time.time()-c["time"] < config["client"]["ignore_duration"] ]
    
                            if not joined_string in [c["id"] for c in last_seen_cards]:
                                input_queue.put(joined_string)
                                last_seen_cards.append({"id": joined_string, "time": time.time()})
                            else:
                                card =  [c for c in last_seen_cards if joined_string == c["id"]][0]
                                logger.info("Ignored duplicated register attempt of card {} because it was last seen {} seconds ago (ignoring all repeated attempts within {} seconds, according to config)".format(card["id"], time.time()-card["time"], config["client"]["ignore_duration"]))
                        
                        keybuffer = []
            except (KeyboardInterrupt, SystemExit, OSError) as e:
                cardreader.close()
                logger.critical("Exiting because of {}".format(e))
                exit(0)
            time.sleep(0.01)
    
    
    def update_id_patterns(config, logger) -> 'Config':
        """
        Update the id_patterns from the server via http
        """
        url = build_url(config, "/config/database/id_patterns")
        logger.debug("Requesting id_pattern update at address: {}".format(url))
    
        # Send request
        try:
            r = requests.get(url, timeout=config["server"]["timeout"], verify=config["server"]["verify_cert"])
        except requests.packages.urllib3.exceptions.MaxRetryError as e:
            logger.error(f"Couldn't reach server at \"{url}\" (Timeout, using existing patterns instead): {e}")
            return config
        except requests.packages.urllib3.exceptions.NewConnectionError as e:
            logger.error(f"Couldn't reach server at \"{url}\" (NewConnectionError, using existing patterns instead): {e}")
            return config
        except BaseException as e:
            logger.error(f"Couldn't reach server at \"{url}\" (Error): {e}")
            return config
    
        # If the response was ok update the pattern if it changed, else display a warning
        if r.ok:
            patterns = string_to_list(r.text)
            if set(patterns) == set(config["client"]["id_patterns"]):
                logger.debug("No change in serverside id_pattern, so didn't update")
            else:
                # Only compile the patterns once we know something changed
                patterns = [re.compile(p) for p in patterns]
                config["client"]["id_patterns"] = patterns
                logger.info("Received a newly updated patterns list from the server")
        else:
            logger.warn("Server at \"{}\" responded with {}".format(url, r.status_code))
    
        return config
    
    
    def main():
        logger = logging.getLogger(APPLICATION_NAME)
        logger.info('Starting main() in {}'.format(APPLICATION_NAME))
    
        # Initialize the configuration (create a default one if needed)
        config = initialize_config(logger)
    
        # Create an input queue that stores all incoming keyboard events
        # and process them on a seperate thread
        input_queue = queue.Queue()
        inputThread = threading.Thread(target=read_key_input, args=([input_queue, config, logger]), daemon=True)
        inputThread.start()
    
        # Create an output queue that stores all processed input queues (separated by Enter)
        # and process them on a seperate thread
        output_queue = queue.Queue()
        outputThread = threading.Thread(target=process_request, args=(output_queue, config, logger), daemon=True)
        outputThread.start()
    
        # Dispatch a startup sound to signal readyness
        dispatch_buzzer("startup", config)
    
        # Set the LED to display readyness
        dispatch_led("startup", config)
    
        last_pattern_update = None
    
        # On the main thread handle communications between the two other threads. That means:
        # If there is sth. on the input queue, put it onto the output queue if it fits the
        # specified format
        while True:
            if (input_queue.qsize() > 0):
                potential_id = input_queue.get()
                if id_pattern_check(potential_id, config):
                    verified_id = potential_id
                    output_queue.put(verified_id)
                else:
                    logger.warning("Didn't register as a valid ID: {}".format(potential_id[:1024]))
                    dispatch_led("failure", config)
                    dispatch_buzzer("failure", config)
            else:
                time.sleep(0.01)
                
            # Update the patterns at the pace defined by config["server"]["update_frequency"]
            if last_pattern_update is None or time.time() - last_pattern_update > config["server"]["update_frequency"]:
                last_pattern_update = time.time()
                config = update_id_patterns(config, logger)
    
    
            time.sleep(0.01)
     
        logger.info("End.")
    
    
    if (__name__ == '__main__'): 
        main()