Select Git revision
client.py 16.29 KiB
#!/usr/bin/env python
#-*- coding: utf-8 -*-
import ast
import threading
import queue
import time
import sys
import requests
import uuid
import socket
import re
import board
import neopixel_spi
import pyudev
import os
import ioctl_opt
import fcntl
import ctypes
import struct
import logging
from typing import List
import RPi.GPIO as GPIO
from requests.packages.urllib3.exceptions import InsecureRequestWarning
requests.packages.urllib3.disable_warnings(InsecureRequestWarning)
from config import initialize_config, APPLICATION_NAME, DEFAULT_CONFIG
from keycode import KEYCODE
# Setup LED: NeoPixel strip driven over the board's SPI bus; 10 is passed as
# the second constructor argument (pixel count).
LEDS = neopixel_spi.NeoPixel_SPI(board.SPI(), 10)
# Setup Buzzer: PWM output on pin 18, created with an initial frequency of
# 440 (Hz). The frequency is changed per note in set_buzzer().
# NOTE(review): no GPIO.setmode() call is visible in this file; presumably the
# pin numbering mode is configured elsewhere (e.g. by the board import) - confirm.
buzzer_pin = 18
GPIO.setup(buzzer_pin, GPIO.OUT)
BUZZER = GPIO.PWM(buzzer_pin, 440)
# Setup constants used for binding to the keypad
# from input-event-codes.h
# type can be EV_SYN, EV_KEY or EV_MSC
EV_KEY = 1
# Event value that read_key_input() treats as "key pressed"
KEY_DOWN = 1
# struct layout of one input event as unpacked in read_key_input():
# two longs (seconds, microseconds), two unsigned shorts (type, code) and
# one unsigned int (value)
EVENT_FORMAT = "llHHI"
# Number of bytes to read from the device per event
EVENT_SIZE = struct.calcsize(EVENT_FORMAT)
def string_to_list(s: str, delimiter: str = "\n") -> List[str]:
    """
    Break a string into pieces at every occurrence of the delimiter.

    A string that does not contain the delimiter comes back as a
    one-element list holding the unchanged string.
    """
    # str.split already yields [s] when the delimiter does not occur
    return s.split(delimiter)
def repeat(l, index):
    """
    Pick one element out of a sequence, wrapping around at its end.

    An index beyond the last element continues counting from the first
    element again (modulo the sequence length).
    """
    wrapped_index = index % len(l)
    return l[wrapped_index]
def dispatch_led(ledstate, config):
    """
    Show a LED state without blocking the caller.

    The blink sequence runs in set_led() on its own daemon thread.
    """
    worker = threading.Thread(
        target=set_led,
        args=(ledstate, config),
        daemon=True,
    )
    worker.start()
def set_led(ledstate, config):
    """
    Play the blink sequence belonging to a LED state, then switch back to
    the standby color.

    Recognized states (case-insensitive, surrounding whitespace ignored):
    "success"/"ok"/"200", "failure"/"fail"/"404" and "startup". Any other
    state only sets the standby color.
    """
    # Config keys of one blink pattern: (repetitions, color, on time, off time)
    success_keys = ("success_repetitions", "success_color", "success_on_time", "success_off_time")
    failure_keys = ("failure_repetitions", "failure_color", "failure_on_time", "failure_off_time")
    dark = (0, 0, 0)

    normalized = str(ledstate).strip().lower()

    pattern_keys = None
    if normalized in ("success", "ok", "200"):
        pattern_keys = success_keys
    elif normalized in ("failure", "fail", "404"):
        pattern_keys = failure_keys

    if pattern_keys is not None:
        repetitions_key, color_key, on_key, off_key = pattern_keys
        # One more blink than the configured number of repetitions
        blink_count = config["led"][repetitions_key] + 1
        for step in range(blink_count):
            # Colors and timings are lists that are cycled through per blink
            LEDS.fill(repeat(config["led"][color_key], step))
            time.sleep(repeat(config["led"][on_key], step))
            LEDS.fill(dark)
            time.sleep(repeat(config["led"][off_key], step))
    elif normalized == "startup":
        # Fixed white blink sequence, independent of the config
        for _ in range(4):
            LEDS.fill((255, 255, 255))
            time.sleep(0.2)
            LEDS.fill(dark)
            time.sleep(0.1)

    # Whatever happened above, end on the default standby color
    LEDS.fill(config["led"]["standby_color"][0])
def dispatch_buzzer(state, config):
    """
    Play the melody for a buzzer state without blocking the caller.

    Nothing happens (and no thread is created) when the buzzer is
    disabled in the config.
    """
    if config["buzzer"]["active"]:
        # The melody is played by set_buzzer() on a daemon thread
        worker = threading.Thread(
            target=set_buzzer,
            args=(state, config),
            daemon=True,
        )
        worker.start()
def set_buzzer(state, config):
    """
    Play the melody that is configured for a buzzer state.

    Recognized states (case-insensitive, surrounding whitespace ignored):
    "startup"/"ready", "success" and "failure". Unknown states are
    silently ignored.
    """
    # Config keys of one melody: (notes, note lengths, pause lengths)
    startup_keys = ("startup_notes", "startup_note_lengths", "startup_note_stop_lengths")
    melodies = {
        "startup": startup_keys,
        "ready": startup_keys,
        "success": ("success_notes", "success_note_lengths", "success_note_stop_lengths"),
        "failure": ("failure_notes", "failure_note_lengths", "failure_note_stop_lengths"),
    }

    melody = melodies.get(str(state).strip().lower())
    if melody is None:
        return

    notes_key, length_key, pause_key = melody
    for position, midi_note in enumerate(config["buzzer"][notes_key]):
        BUZZER.ChangeFrequency(midinumber_to_hertz(midi_note))
        BUZZER.start(10)  # Set dutycycle to 10
        # Length and pause lists are cycled if they are shorter than the melody
        time.sleep(repeat(config["buzzer"][length_key], position))
        BUZZER.stop()
        time.sleep(repeat(config["buzzer"][pause_key], position))
def midinumber_to_hertz(note: int) -> float:
    """
    Translate a midi note number into its frequency in Hertz.

    The scale is anchored at midi note 9 (440 / 32 = 13.75 Hz) and doubles
    every 12 notes, which places midi note 69 at 440 Hz.
    """
    anchor_frequency = 440 / 32
    octaves_above_anchor = (note - 9) / 12
    return anchor_frequency * (2 ** octaves_above_anchor)
def build_url(config, endpoint: str = "") -> str:
    """
    Assemble the URL of a server endpoint from the server config.

    Uses https unless config["server"]["https"] is falsy. Trailing slashes
    of the address and leading slashes of the endpoint are dropped so that
    exactly one slash separates port and endpoint.
    """
    server = config["server"]
    scheme = "https" if server["https"] else "http"
    host = server["address"].rstrip("/")
    port = server["port"]
    path = endpoint.lstrip("/")
    return f"{scheme}://{host}:{port}/{path}"
def process_request(output_queue, config=None, logger=None):
    """
    Worker loop that sends every verified ID from the output queue to the
    server. Never returns; meant to run on its own (daemon) thread.

    For each ID taken from the queue:
    1. Send a request via send_request()
    2. On "ok" signal success via LED and buzzer
    3. On "failure" or "error" signal failure via LED and buzzer
    Any other response value is ignored.
    """
    while True:
        # Block until an ID is available instead of polling qsize() with a
        # sleep: no idle wake-ups and no added latency
        verified_id = output_queue.get()
        response = send_request(verified_id, config, logger)
        if response == "ok":
            logger.info("Server Response: Success")
            dispatch_led("success", config)
            dispatch_buzzer("success", config)
        elif response in ("failure", "error"):
            logger.info("Server Response: Failed")
            dispatch_led("failure", config)
            dispatch_buzzer("failure", config)
def send_request(verified_id, config, logger) -> str:
    """
    Post a verified ID to the stechuhr-server.

    Returns "ok" if the server accepted the request (or on a dry run),
    "failure" if the server answered with an error status and "error" if
    the request itself raised an exception.
    """
    server = config["server"]
    url_template = 'https://{}:{}/' if int(server["https"]) else 'http://{}:{}/'
    target_address = url_template.format(server["address"], server["port"])
    logger.info("Posting {} to {}".format(verified_id, target_address))

    client = config["client"]
    payload = {
        "location" : client["location"],
        "entrance" : client["entrance"],
        "direction" : client["direction"],
        "id" : verified_id
    }

    if config["application"]["dryrun"]:
        # A dry run never contacts the server and always counts as success
        logger.info("Dryrun: Would post request to {}".format(target_address))
        logger.info("Dryrun: Payload: {}".format(str(payload)))
        return "ok"

    try:
        r = requests.post(target_address, json=payload, timeout=config["server"]["timeout"], verify=config["server"]["verify_cert"])
    except Exception as e:
        logger.error(e)
        return "error"
    return "ok" if r.ok else "failure"
def id_pattern_check(visitor_id: str, config: dict) -> bool:
    """
    Tell whether the visitor ID matches at least one configured pattern.

    Every pattern in config["client"]["id_patterns"] is tried with
    re.match (i.e. anchored at the start of the ID). Returns False when
    no pattern matches or no patterns are configured.
    """
    matched = False
    # Deliberately no early exit: every pattern is evaluated
    for candidate in config["client"]["id_patterns"]:
        if re.match(candidate, visitor_id) is not None:
            matched = True
    return matched
def open_cardreader(config, logger) -> '_io.BufferedReader':
    """
    Find a USB input device that matches the configured vendor and model IDs
    and return a binary file object with exclusive (grabbed) access to it.

    Reads config["reader"]["vendor_id"] and config["reader"]["model_id"].
    Logs a critical message and raises SystemExit (exit code 0) if no
    matching device exists or the device node cannot be opened. An OSError
    from one of the ioctl calls is propagated after closing the file.
    """
    vendor_id = config["reader"]["vendor_id"]
    model_id = config["reader"]["model_id"]

    # List USB input devices that have a device node and fit both IDs.
    # .get() is used because not every device carries both properties.
    ctx = pyudev.Context()
    devices = [
        d for d in ctx.list_devices(subsystem='input', ID_BUS='usb')
        if d.device_node is not None
        and d.properties.get('ID_VENDOR_ID') == vendor_id
        and d.properties.get('ID_MODEL_ID') == model_id
    ]

    # If no device found, exit
    if not devices:
        logger.critical("Error: No Cardreader connected? Found no device with vendor ID {} and model ID {}. Exiting.".format(vendor_id, model_id))
        sys.exit(0)

    # Select the first device that matches the specs
    device = devices[0]
    logger.info("Using cardreader device: {}".format(device))
    try:
        fd = open(device.device_node, 'rb')
    except FileNotFoundError:
        logger.critical("No such device, is it connected?")
        sys.exit(0)
    except PermissionError:
        logger.critical("Insufficent permission to read, run me as root!")
        sys.exit(0)

    # ioctl request codes, from input.h
    name_buffer_size = 256
    eviocgname = ioctl_opt.IOC(ioctl_opt.IOC_READ, ord('E'), 0x06, name_buffer_size)
    eviocgrab = ioctl_opt.IOW(ord('E'), 0x90, ctypes.c_int)

    try:
        # Get Device Name
        name = ctypes.create_string_buffer(name_buffer_size)
        fcntl.ioctl(fd, eviocgname, name, True)
        # Grab exclusive access for fd
        logger.info("Grabbing device {} for exclusive access".format(name.value.decode('UTF-8')))
        fcntl.ioctl(fd, eviocgrab, True)
    except OSError:
        # Don't leak the open device if it cannot be queried or grabbed
        fd.close()
        raise

    return fd
def read_key_input(input_queue, config, logger):
    """
    Read key events from the card reader and put every completed input
    (terminated by ENTER, keycode 28) onto the input queue.

    An input that equals one already queued within the last
    config["client"]["ignore_duration"] seconds is logged and dropped.
    Never returns normally: on KeyboardInterrupt, SystemExit, OSError or a
    short read (e.g. reader unplugged) the device is closed and SystemExit
    is raised, which ends the calling thread.
    """
    # Get an exclusive file object for the card reader
    cardreader = open_cardreader(config, logger)
    logger.info("Listening for Input...")

    # Buffer to store incoming keys till ENTER (keycode 28) is received
    keybuffer = []
    # Recently queued inputs as {"id": str, "time": float}
    last_seen_cards = []

    while True:
        try:
            raw_event = cardreader.read(EVENT_SIZE)
            if len(raw_event) < EVENT_SIZE:
                # A short or empty read cannot be unpacked; route it through
                # the same shutdown path as other device errors
                raise OSError("short read from card reader ({} of {} bytes)".format(len(raw_event), EVENT_SIZE))

            # Fields: epoch second, epoch microsecond, event type
            # (EV_SYN, EV_KEY, EV_MSC, ...), keycode, value (keydown = 1, keyup = 0)
            _e_sec, _e_usec, e_type, e_code, e_val = struct.unpack(EVENT_FORMAT, raw_event)

            # Only key presses are of interest
            if e_type == EV_KEY and e_val == KEY_DOWN:
                if e_code != 28:
                    # Translate the keycode into its character, if it is known
                    if str(e_code) in KEYCODE:
                        keybuffer.append(KEYCODE[str(e_code)])
                    else:
                        logger.warning("Received invalid Keycode: {}".format(e_code))
                else:
                    # ENTER: the buffer now holds one complete input
                    joined_string = "".join(keybuffer)
                    keybuffer = []

                    now = time.time()
                    ignore_duration = config["client"]["ignore_duration"]
                    # Forget all cards that are past the threshold
                    last_seen_cards = [c for c in last_seen_cards if now - c["time"] < ignore_duration]
                    previous = next((c for c in last_seen_cards if c["id"] == joined_string), None)
                    if previous is None:
                        input_queue.put(joined_string)
                        last_seen_cards.append({"id": joined_string, "time": now})
                    else:
                        logger.info("Ignored duplicated register attempt of card {} because it was last seen {} seconds ago (ignoring all repeated attempts within {} seconds, according to config)".format(previous["id"], now - previous["time"], ignore_duration))
        except (KeyboardInterrupt, SystemExit, OSError) as e:
            cardreader.close()
            logger.critical("Exiting because of {}".format(e))
            sys.exit(0)
        time.sleep(0.01)
def update_id_patterns(config, logger) -> 'Config':
    """
    Update config["client"]["id_patterns"] from the server via http.

    The server is expected to answer with one regular expression per line.
    Blank lines (e.g. caused by a trailing newline) are skipped, because an
    empty pattern would match every ID. The existing patterns are kept if the
    server cannot be reached, responds with an error status, sends no
    patterns at all or sends a pattern that does not compile.

    Returns the (possibly modified) config object that was passed in.
    """
    url = build_url(config, "/config/database/id_patterns")
    logger.debug("Requesting id_pattern update at address: {}".format(url))

    # Send request
    try:
        r = requests.get(url, timeout=config["server"]["timeout"], verify=config["server"]["verify_cert"])
    except requests.packages.urllib3.exceptions.MaxRetryError as e:
        logger.error(f"Couldn't reach server at \"{url}\" (Timeout, using existing patterns instead): {e}")
        return config
    except requests.packages.urllib3.exceptions.NewConnectionError as e:
        logger.error(f"Couldn't reach server at \"{url}\" (NewConnectionError, using existing patterns instead): {e}")
        return config
    except Exception as e:
        # Exception instead of BaseException, so KeyboardInterrupt and
        # SystemExit still terminate the program
        logger.error(f"Couldn't reach server at \"{url}\" (Error): {e}")
        return config

    if not r.ok:
        logger.warning("Server at \"{}\" responded with {}".format(url, r.status_code))
        return config

    # Drop carriage returns of CRLF line endings and skip blank lines
    received = [line.rstrip("\r") for line in string_to_list(r.text)]
    received = [line for line in received if line]
    if not received:
        logger.warning("Server at \"{}\" sent no id_patterns, keeping existing patterns".format(url))
        return config

    # The stored patterns may be plain strings (initial config) or compiled
    # patterns (after an earlier update); compare by their source text
    current = {getattr(p, "pattern", p) for p in config["client"]["id_patterns"]}
    if set(received) == current:
        logger.debug("No change in serverside id_pattern, so didn't update")
        return config

    # Only compile the patterns once we know something changed
    try:
        compiled = [re.compile(p) for p in received]
    except re.error as e:
        logger.error("Server at \"{}\" sent an invalid id_pattern, keeping existing patterns: {}".format(url, e))
        return config

    config["client"]["id_patterns"] = compiled
    logger.info("Received a newly updated patterns list from the server")
    return config
def main():
    """
    Entry point: start the reader and the request thread and route IDs
    between them until the reader thread stops.

    The main thread takes every input from the input queue, checks it
    against the configured ID patterns and either forwards it to the output
    queue or signals a failure. It also refreshes the ID patterns from the
    server every config["server"]["update_frequency"] seconds.
    """
    logger = logging.getLogger(APPLICATION_NAME)
    logger.info('Starting main() in {}'.format(APPLICATION_NAME))

    # Initialize the configuration (create a default one if needed)
    config = initialize_config(logger)

    # Input queue: filled by the card reader thread with every completed input
    input_queue = queue.Queue()
    input_thread = threading.Thread(target=read_key_input, args=(input_queue, config, logger), daemon=True)
    input_thread.start()

    # Output queue: verified IDs, sent to the server by the request thread
    output_queue = queue.Queue()
    output_thread = threading.Thread(target=process_request, args=(output_queue, config, logger), daemon=True)
    output_thread.start()

    # Dispatch a startup sound to signal readiness
    dispatch_buzzer("startup", config)
    # Set the LED to display readiness
    dispatch_led("startup", config)

    last_pattern_update = None

    # The reader thread ends itself when the card reader is missing or fails
    # (exiting inside a thread only stops that thread), so stop the main loop
    # as well instead of idling forever without a reader
    while input_thread.is_alive():
        # Wait briefly for an input instead of polling the queue size
        try:
            potential_id = input_queue.get(timeout=0.01)
        except queue.Empty:
            potential_id = None

        if potential_id is not None:
            if id_pattern_check(potential_id, config):
                output_queue.put(potential_id)
            else:
                logger.warning("Didn't register as a valid ID: {}".format(potential_id[:1024]))
                dispatch_led("failure", config)
                dispatch_buzzer("failure", config)

        # Update the patterns at the pace defined by config["server"]["update_frequency"]
        if last_pattern_update is None or time.time() - last_pattern_update > config["server"]["update_frequency"]:
            last_pattern_update = time.time()
            config = update_id_patterns(config, logger)

    logger.critical("Card reader thread stopped, shutting down")
    logger.info("End.")
# Run the client only when executed as a script, not when imported
if __name__ == "__main__":
    main()