Skip to content
Snippets Groups Projects
client.py 16.3 KiB
Newer Older
  • Learn to ignore specific revisions
  • David Huss's avatar
    David Huss committed
    #!/usr/bin/env python 
    #-*- coding: utf-8 -*-
    
    
    David Huss's avatar
    David Huss committed
    import ast
    
    David Huss's avatar
    David Huss committed
    import threading
    import queue
    import time
    import sys
    import requests
    import uuid
    import socket
    
    David Huss's avatar
    David Huss committed
    import re
    
    import board
    
    David Huss's avatar
    David Huss committed
    import neopixel_spi
    
    David Huss's avatar
    David Huss committed
    import pyudev
    
    David Huss's avatar
    David Huss committed
    import os
    import ioctl_opt
    import fcntl
    import ctypes
    import struct
    
    import logging
    
    David Huss's avatar
    David Huss committed
    from typing import List
    
    David Huss's avatar
    David Huss committed
    import RPi.GPIO as GPIO
    
    from requests.packages.urllib3.exceptions import InsecureRequestWarning
    requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
    
    David Huss's avatar
    David Huss committed
    
    
    from config import initialize_config, APPLICATION_NAME, DEFAULT_CONFIG
    
    David Huss's avatar
    David Huss committed
    from keycode import KEYCODE
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    # Setup LED: NeoPixel strip on the board's SPI bus
    # (second argument is the pixel count according to the NeoPixel_SPI API)
    spi_bus = board.SPI()
    LEDS = neopixel_spi.NeoPixel_SPI(spi_bus, 10)

    # Setup Buzzer: PWM output on pin 18, created with a frequency of 440 Hz
    # NOTE(review): GPIO.setmode() is never called in this file -- presumably
    # the pin numbering mode is set as a side effect of another import; confirm
    buzzer_pin = 18
    GPIO.setup(buzzer_pin, GPIO.OUT)
    BUZZER = GPIO.PWM(buzzer_pin, 440)
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    # Setup constants used for binding to the keypad
    
    David Huss's avatar
    David Huss committed
    # from input-event-codes.h
    # type can be EV_SYN, EV_KEY or EV_MSC
    EV_KEY = 1
    # Event value that marks a key press (a key release has the value 0)
    KEY_DOWN = 1
    # Layout of `struct input_event` (linux/input.h): two longs for the
    # timestamp (seconds, microseconds), unsigned 16 bit type and code and a
    # *signed* 32 bit value -- hence "i" and not "I" for the last field
    EVENT_FORMAT = "llHHi"
    EVENT_SIZE = struct.calcsize(EVENT_FORMAT)
    
    
    def string_to_list(s:str, delimiter: str="\n") -> List[str]:
        """
        Break ``s`` apart at every occurrence of ``delimiter``.

        A string that does not contain the delimiter at all is returned as the
        single element of a one-item list.
        """
        return s.split(delimiter) if delimiter in s else [s]
    
    
    def repeat(l, index):
        """
        Look up ``l[index]`` cyclically: an index past the end of the list
        wraps around and continues from the beginning.
        """
        wrapped_index = index % len(l)
        return l[wrapped_index]
    
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    def dispatch_led(ledstate, config):
        """
        Run set_led() for the given LED state on its own daemon thread, so the
        blink sequence does not hold up the caller.
        """
        worker = threading.Thread(target=set_led, args=(ledstate, config), daemon=True)
        worker.start()
    
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    def set_led(ledstate, config):
        """
        Blink the LEDs in the pattern that belongs to ``ledstate`` and switch
        back to the standby color afterwards.

        The state is compared case-insensitively with surrounding whitespace
        removed. Known states are "success"/"ok"/"200", "failure"/"fail"/"404"
        and "startup"; any other state only restores the standby color.

        The success and failure patterns are read from config["led"]. Their
        color and timing lists are cycled through, and the loop runs one more
        time than the configured number of repetitions.
        """
        state = str(ledstate).strip().lower()
        dark = (0, 0, 0)

        # Config keys (repetitions, color, on time, off time) of the two
        # configurable blink patterns
        if state in ("success", "ok", "200"):
            pattern_keys = ("success_repetitions", "success_color",
                            "success_on_time", "success_off_time")
        elif state in ("failure", "fail", "404"):
            pattern_keys = ("failure_repetitions", "failure_color",
                            "failure_on_time", "failure_off_time")
        else:
            pattern_keys = None

        if pattern_keys is not None:
            repetitions_key, color_key, on_key, off_key = pattern_keys
            for step in range(config["led"][repetitions_key] + 1):
                LEDS.fill(repeat(config["led"][color_key], step))
                time.sleep(repeat(config["led"][on_key], step))
                LEDS.fill(dark)
                time.sleep(repeat(config["led"][off_key], step))
        elif state == "startup":
            # Fixed pattern: four white flashes
            for _ in range(4):
                LEDS.fill((255, 255, 255))
                time.sleep(0.2)
                LEDS.fill(dark)
                time.sleep(0.1)

        # Whatever happened above, end on the first standby color
        LEDS.fill(config["led"]["standby_color"][0])
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    def dispatch_buzzer(state, config):
        """
        Play the melody for ``state`` via set_buzzer() on its own daemon thread.

        Does nothing at all when config["buzzer"]["active"] is falsy.
        """
        buzzer_enabled = config["buzzer"]["active"]
        if buzzer_enabled:
            worker = threading.Thread(target=set_buzzer, args=(state, config), daemon=True)
            worker.start()
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    def set_buzzer(state, config):
        """
        Play the melody configured for ``state`` on the buzzer.

        The state is compared case-insensitively with surrounding whitespace
        removed. "startup"/"ready", "success" and "failure" each select a list
        of MIDI notes from config["buzzer"]; any other state is ignored. The
        note lengths and the pauses after each note come from the matching
        config lists, which are cycled through if they are shorter than the
        list of notes.
        """
        state = str(state).strip().lower()

        # Config keys (notes, note lengths, pause lengths) of the melody
        if state in ("startup", "ready"):
            notes_key = "startup_notes"
            length_key = "startup_note_lengths"
            pause_key = "startup_note_stop_lengths"
        elif state == "success":
            notes_key = "success_notes"
            length_key = "success_note_lengths"
            pause_key = "success_note_stop_lengths"
        elif state == "failure":
            notes_key = "failure_notes"
            length_key = "failure_note_lengths"
            pause_key = "failure_note_stop_lengths"
        else:
            return

        for position, note in enumerate(config["buzzer"][notes_key]):
            BUZZER.ChangeFrequency(midinumber_to_hertz(note))
            BUZZER.start(10)  # Set dutycycle to 10
            time.sleep(repeat(config["buzzer"][length_key], position))
            BUZZER.stop()
            time.sleep(repeat(config["buzzer"][pause_key], position))
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    
    
    David Huss's avatar
    David Huss committed
    def midinumber_to_hertz(note: int) -> float:
        """
        Translate a MIDI note number into its frequency in Hertz
        (note 69 yields 440 Hz, every 12 notes double the frequency).
        """
        base_frequency = 440 / 32
        octaves = (note - 9) / 12
        return base_frequency * (2 ** octaves)
    
    David Huss's avatar
    David Huss committed
    def build_url(config, endpoint: str = "") -> str:
        """
        Assemble the URL of ``endpoint`` on the server described by
        config["server"] (keys "https", "address" and "port").

        Trailing slashes of the address and leading slashes of the endpoint
        are removed, so exactly one slash separates port and endpoint.
        """
        scheme = "http" if not config["server"]["https"] else "https"
        host = config["server"]["address"].rstrip("/")
        port = config["server"]["port"]
        path = endpoint.lstrip("/")
        return '{}://{}:{}/{}'.format(scheme, host, port, path)
    
    
    
    def process_request(output_queue, config=None, logger=None):
        """
        Worker loop for the output queue. Never returns.

        Every verified ID taken from ``output_queue`` is passed to
        send_request(). Depending on the result the outcome is logged and
        signalled via LED and buzzer:
        - "ok"                 -> success signal
        - "failure" or "error" -> failure signal
        While the queue is empty the loop polls it every 10 ms.
        """
        while True:
            if output_queue.qsize() <= 0:
                # Nothing queued, look again in a moment
                time.sleep(0.01)
                continue

            verified_id = output_queue.get()
            outcome = send_request(verified_id, config, logger)

            if outcome == "ok":
                logger.info("Server Response: Success")
                dispatch_led("success", config)
                dispatch_buzzer("success", config)
            elif outcome in ("failure", "error"):
                logger.info("Server Response: Failed")
                dispatch_led("failure", config)
                dispatch_buzzer("failure", config)
    
    David Huss's avatar
    David Huss committed
    
    
    
    def send_request(verified_id, config, logger) -> str:
        """
        Post a verified ID to the stechuhr-server.

        The JSON payload consists of the client's location, entrance and
        direction (from config["client"]) and the ID. When
        config["application"]["dryrun"] is set, nothing is sent; the request
        that would have been made is only logged.

        Returns one of three strings:
        - "ok":      the response's ``ok`` flag was set, or this was a dry run
        - "failure": the server answered, but the ``ok`` flag was not set
        - "error":   the request itself raised an exception
        """
        scheme = "https" if int(config["server"]["https"]) else "http"
        target_address = '{}://{}:{}/'.format(scheme, config["server"]["address"], config["server"]["port"])

        logger.info("Posting {} to {}".format(verified_id, target_address))

        payload = {
            "location"  : config["client"]["location"],
            "entrance"  : config["client"]["entrance"],
            "direction"  : config["client"]["direction"],
            "id" : verified_id
        }

        if config["application"]["dryrun"]:
            # A dry run always counts as a success, so the caller still gives
            # its usual feedback
            logger.info("Dryrun: Would post request to {}".format(target_address))
            logger.info("Dryrun: Payload: {}".format(str(payload)))
            return "ok"

        try:
            r = requests.post(target_address, json=payload, timeout=config["server"]["timeout"], verify=config["server"]["verify_cert"])
        except Exception as e:
            # Deliberately broad: whatever goes wrong while sending must not
            # kill the worker thread. The cause is logged, not dropped.
            logger.error("Request to {} failed: {}".format(target_address, e))
            return "error"

        return "ok" if r.ok else "failure"
    
    David Huss's avatar
    David Huss committed
    
    
    
    David Huss's avatar
    David Huss committed
    def id_pattern_check(visitor_id: str, config: dict) -> bool:
        """
        Test ``visitor_id`` against every pattern in
        config["client"]["id_patterns"] using re.match (i.e. each pattern has
        to match from the start of the ID).

        Returns True as soon as at least one pattern matches, False if none
        does (also when there are no patterns at all).
        """
        outcomes = [
            re.match(candidate, visitor_id) is not None
            for candidate in config["client"]["id_patterns"]
        ]
        return any(outcomes)
    
    David Huss's avatar
    David Huss committed
    
    
    
    def open_cardreader(config, logger) -> '_io.BufferedReader':
        """
        Find the USB input device whose vendor and model IDs match
        config["reader"]["vendor_id"] / config["reader"]["model_id"], open it
        for binary reading and grab it for exclusive access.

        Returns the open file object of the first matching device.

        If no matching device is found, or the device node cannot be opened
        (missing or insufficient permissions), a critical message is logged
        and SystemExit is raised. An OSError from one of the ioctl calls is
        passed on to the caller after the device has been closed again.
        """
        vendor_id = config["reader"]["vendor_id"]
        model_id = config["reader"]["model_id"]

        # List USB input devices and keep those that have a device node and
        # the wanted IDs. .get() is used so that a device lacking one of the
        # ID properties is skipped instead of raising a KeyError.
        ctx = pyudev.Context()
        devices = [
            d for d in ctx.list_devices(subsystem='input', ID_BUS='usb')
            if d.device_node is not None
            and d.properties.get('ID_VENDOR_ID') == vendor_id
            and d.properties.get('ID_MODEL_ID') == model_id
        ]

        # If no device found, exit
        if not devices:
            logger.critical("Error: No Cardreader connected? Found no device with vendor ID {} and model ID {}. Exiting.".format(vendor_id, model_id))
            sys.exit(0)

        # Select the first device that matches the specs
        device = devices[0]
        logger.info("Using cardreader device: {}".format(device))

        try:
            fd = open(device.device_node, 'rb')
        except FileNotFoundError:
            logger.critical("No such device, is it connected?")
            sys.exit(0)
        except PermissionError:
            logger.critical("Insufficent permission to read, run me as root!")
            sys.exit(0)

        # ioctl request codes, from input.h
        name_buffer_size = 256
        eviocgname = ioctl_opt.IOC(ioctl_opt.IOC_READ, ord('E'), 0x06, name_buffer_size)
        eviocgrab = ioctl_opt.IOW(ord('E'), 0x90, ctypes.c_int)

        try:
            # Get Device Name
            name = ctypes.create_string_buffer(name_buffer_size)
            fcntl.ioctl(fd, eviocgname, name, True)

            # Grab exclusive Access for fd
            logger.info("Grabbing device {} for exclusive access".format(name.value.decode('UTF-8')))
            fcntl.ioctl(fd, eviocgrab, True)
        except OSError:
            # Don't leave the device open if it could not be set up
            fd.close()
            raise

        return fd
    
    
    
    def read_key_input(input_queue, config, logger):
        """
        Read key events from the card reader and put every completed card ID
        onto ``input_queue``.

        The characters of all key presses are collected until ENTER
        (keycode 28) arrives; joined together they form one ID. An ID that has
        already been queued less than config["client"]["ignore_duration"]
        seconds ago is not queued again, only logged.

        Runs until reading fails (OSError, including a short read), a
        KeyboardInterrupt or a SystemExit occurs. The device is then closed,
        the reason is logged and SystemExit is raised.
        """
        # Get a exclusive file object for the card reader
        cardreader = open_cardreader(config, logger)

        logger.info("Listening for Input...")

        # Characters received since the last ENTER
        keybuffer = []
        # Recently queued cards as {"id": ..., "time": ...}
        last_seen_cards = []

        while True:
            try:
                # Blocks until the reader delivers the next event, so no extra
                # sleep is needed between reads
                raw_event = cardreader.read(EVENT_SIZE)
                if len(raw_event) < EVENT_SIZE:
                    # Incomplete event: handled like any other read error below
                    raise OSError("Short read from card reader, got {} of {} bytes".format(len(raw_event), EVENT_SIZE))

                # Fields: seconds, microseconds, event type, keycode, value
                _sec, _usec, e_type, e_code, e_val = struct.unpack(EVENT_FORMAT, raw_event)

                # Only key presses are of interest
                if e_type != EV_KEY or e_val != KEY_DOWN:
                    continue

                if e_code != 28:
                    # Any key but ENTER: translate it and append it to the buffer
                    if str(e_code) in KEYCODE:
                        keybuffer.append(KEYCODE[str(e_code)])
                    else:
                        logger.warning("Received invalid Keycode: {}".format(e_code))
                    continue

                # ENTER: the buffer holds a complete ID, start a new one
                joined_string = "".join(keybuffer)
                keybuffer = []

                # Forget all cards that are past the threshold
                now = time.time()
                ignore_duration = config["client"]["ignore_duration"]
                last_seen_cards = [c for c in last_seen_cards if now - c["time"] < ignore_duration]

                previous = next((c for c in last_seen_cards if c["id"] == joined_string), None)
                if previous is None:
                    input_queue.put(joined_string)
                    last_seen_cards.append({"id": joined_string, "time": now})
                else:
                    logger.info("Ignored duplicated register attempt of card {} because it was last seen {} seconds ago (ignoring all repeated attempts within {} seconds, according to config)".format(previous["id"], now - previous["time"], ignore_duration))

            except (KeyboardInterrupt, SystemExit, OSError) as e:
                cardreader.close()
                logger.critical("Exiting because of {}".format(e))
                sys.exit(0)
    
    David Huss's avatar
    David Huss committed
    
    
    
    def update_id_patterns(config, logger) -> 'Config':
        """
        Fetch the ID patterns from the server via http and store them,
        compiled, in config["client"]["id_patterns"].

        The response body is expected to hold one pattern per line. Blank
        lines are dropped, because an empty pattern would match every ID.

        The existing patterns are kept (and the problem is logged) when
        - the server cannot be reached or the request fails,
        - the server answers with an error status,
        - the response contains no pattern at all,
        - one of the received patterns is not a valid regular expression.

        Returns the (possibly updated) config object that was passed in.
        """
        url = build_url(config, "/config/database/id_patterns")

        logger.debug("Requesting id_pattern update at address: {}".format(url))

        # Send request
        try:
            r = requests.get(url, timeout=config["server"]["timeout"], verify=config["server"]["verify_cert"])
        except requests.exceptions.Timeout as e:
            logger.error(f"Couldn't reach server at \"{url}\" (Timeout, using existing patterns instead): {e}")
            return config
        except requests.exceptions.ConnectionError as e:
            logger.error(f"Couldn't reach server at \"{url}\" (ConnectionError, using existing patterns instead): {e}")
            return config
        except Exception as e:
            # Best effort: no failure of the update may stop the caller.
            # KeyboardInterrupt and SystemExit are not caught here.
            logger.error(f"Couldn't reach server at \"{url}\" (Error): {e}")
            return config

        if not r.ok:
            logger.warning("Server at \"{}\" responded with {}".format(url, r.status_code))
            return config

        # One pattern per line, without blank lines and surrounding whitespace
        patterns = [line.strip() for line in string_to_list(r.text)]
        patterns = [p for p in patterns if p]
        if not patterns:
            logger.warning("Server at \"{}\" sent no id_patterns, keeping the existing ones".format(url))
            return config

        # The stored patterns are compiled after the first update, so compare
        # by their source text
        current = {getattr(p, "pattern", p) for p in config["client"]["id_patterns"]}
        if set(patterns) == current:
            logger.debug("No change in serverside id_pattern, so didn't update")
            return config

        # Only compile the patterns once we know something changed
        try:
            compiled = [re.compile(p) for p in patterns]
        except re.error as e:
            logger.error("Received an invalid id_pattern from \"{}\", keeping the existing ones: {}".format(url, e))
            return config

        config["client"]["id_patterns"] = compiled
        logger.info("Received a newly updated patterns list from the server")
        return config
    
    
    David Huss's avatar
    David Huss committed
    def main():
        """
        Start the client and run its main loop.

        Sets up logger and config, starts one daemon thread that reads the
        card reader (read_key_input) and one that talks to the server
        (process_request), and signals readiness via buzzer and LED. The main
        thread then moves every ID that passes id_pattern_check() from the
        input queue to the output queue, signals a failure for every ID that
        does not, and refreshes the ID patterns from the server whenever more
        than config["server"]["update_frequency"] seconds have passed since
        the last refresh.
        """
        logger = logging.getLogger(APPLICATION_NAME)
        logger.info('Starting main() in {}'.format(APPLICATION_NAME))

        # Initialize the configuration (create a default one if needed)
        config = initialize_config(logger)

        # Queue of raw IDs, filled by the card reader thread
        input_queue = queue.Queue()
        reader_thread = threading.Thread(target=read_key_input, args=(input_queue, config, logger), daemon=True)
        reader_thread.start()

        # Queue of verified IDs, emptied by the request thread
        output_queue = queue.Queue()
        sender_thread = threading.Thread(target=process_request, args=(output_queue, config, logger), daemon=True)
        sender_thread.start()

        # Signal readiness with sound and light
        dispatch_buzzer("startup", config)
        dispatch_led("startup", config)

        last_pattern_update = None

        while True:
            if input_queue.qsize() <= 0:
                time.sleep(0.01)
            else:
                potential_id = input_queue.get()
                if id_pattern_check(potential_id, config):
                    output_queue.put(potential_id)
                else:
                    logger.warning("Didn't register as a valid ID: {}".format(potential_id[:1024]))
                    dispatch_led("failure", config)
                    dispatch_buzzer("failure", config)

            # Update the patterns at the pace defined by config["server"]["update_frequency"]
            update_due = (
                last_pattern_update is None
                or time.time() - last_pattern_update > config["server"]["update_frequency"]
            )
            if update_due:
                last_pattern_update = time.time()
                config = update_id_patterns(config, logger)

            time.sleep(0.01)

        # Not reached, the loop above has no exit
        logger.info("End.")
    
    David Huss's avatar
    David Huss committed
    
    
    # Only start the client when this file is executed as a script
    if __name__ == '__main__':
        main()