Skip to content
Snippets Groups Projects
Commit 0a2f0151 authored by David Huss's avatar David Huss :speech_balloon:
Browse files

Refactor, comments, pattern check

parent 716996da
No related branches found
No related tags found
No related merge requests found
#!/usr/bin/env python
#-*- coding: utf-8 -*-
import re
import datetime as dt
import sqlite3
from flask import Flask, request
from logging.config import dictConfig
from .config import initialize_config
......@@ -21,9 +21,13 @@ ignore_get_requests = false
# sqlite db path, use ":memory:" for RAM db
path = "visitors.db"
# minimum and maximum lengths
min_id_length = 6
max_id_length = 24
# A list of possible regex patterns for the id (logical OR!)
id_patterns = [
"[A-z0-9]{24}",
"[A-Z0-9]{6,8}",
]
# minimum and maximum lengths for the received strings
min_entrance_length = 1
max_entrance_length = 128
min_place_length = 1
......@@ -38,7 +42,11 @@ format = "[%(asctime)s] %(levelname)s in %(module)s: %(message)s"
# Initialize the configuration (create a default one if needed)
config = initialize_config(APPLICATION_NAME, DEFAULT_CONFIG)
# Compile the patterns for the ids once at startup
config["database"]["id_patterns"] = [re.compile(p) for p in config["database"]["id_patterns"]]
# Config for the logger, there should be no need to make
# manual changes here
dictConfig({
'version': 1,
'formatters': {'default': {
......@@ -56,7 +64,11 @@ dictConfig({
})
# Initialization
app = Flask(APPLICATION_NAME)
# Create an initial connection and create the needed tables if they
# don't exist yet.
conn = sqlite3.connect(config["database"]["path"])
cursor = conn.cursor()
sqlite_create_table_query = '''CREATE TABLE visitors (
......@@ -71,7 +83,6 @@ sqlite_create_table_query = '''CREATE TABLE visitors (
try:
app.logger.info('Constructing table for visitors')
cursor.execute(sqlite_create_table_query)
app.logger.info('Constructed table for visitors')
except sqlite3.OperationalError as e:
app.logger.warning(e)
pass
......@@ -82,12 +93,21 @@ def register_movement(data: dict, conn, cursor):
"""
Construct and store a new movment in the database
"""
# Create a timestamp
data["time"] = dt.datetime.now()
# Construct an ID for the current movement
data = construct_movement_id(data)
# 1. Draft SQL-Statement
sqlite_insert_with_param = """INSERT INTO 'visitors'
('movement_id', 'id', 'place', 'entrance', 'direction', 'time')
VALUES (?, ?, ?, ?, ?, ?);"""
# 2. Draft parameters
data_tuple = (data["movement_id"], data["id"], data["place"], data["entrance"], data["direction"], data["time"])
# Execute both and commit to db
cursor.execute(sqlite_insert_with_param, data_tuple)
conn.commit()
......@@ -95,11 +115,19 @@ def register_movement(data: dict, conn, cursor):
def construct_movement_id(data: dict) -> dict:
"""
Generate and set the "movement_id" for a given data dict
Needs to be collision proof for the db.
Looks like: 2020-11-12T07:10:49.81--543GFDHGfddf455-lerchenfeld_Haupteingang/in
"""
# If data["time"] is not set, raise Error
if not "time" in data.keys():
raise ValueError("No \"time\" key set yet. run construct_movement_id only after setting time")
# Use isoformat time like 2020-11-12T07:06:08.595770 ...
time = data["time"].isoformat()
movement_id = "{}--{}-{}/{}/{}".format(time, data["id"], data["place"], data["entrance"], data["direction"])
# ... but limit the precision of sub seconds to two places
time = "{}.{}".format(time.rsplit(".")[0], time.rsplit(".")[1][:2])
# Construct final movement ID.
movement_id = "{}--{}-{}_{}/{}".format(time, data["id"], data["place"], data["entrance"], data["direction"])
data["movement_id"] = movement_id
return data
......@@ -108,9 +136,11 @@ def get_records(conn, cursor):
"""
Get all existing movement records
"""
sqlite_select_query = """SELECT place, entrance, direction, time, id from visitors where movement_id = ?"""
# Draft select query
sqlite_select_query = """SELECT place, entrance, direction, time, id from visitors"""
try:
cursor.execute(sqlite_select_query, (1,))
# Execute query and fill visitor with results
cursor.execute(sqlite_select_query)
records = cursor.fetchall()
visitors = [{"place":r[0], "entrance":r[1], "direction":r[2], "time":r[3], "id":r[4]} for r in records]
except sqlite3.OperationalError as e:
......@@ -120,8 +150,6 @@ def get_records(conn, cursor):
return visitors
def is_valid_data(data: dict) -> bool:
"""
Check if the body data of the request is valid
......@@ -144,21 +172,41 @@ def is_valid_data(data: dict) -> bool:
return False
# Basic length check for place
if len(data["place"]) >= max_place_length:
app.logger.info('400, JSON "place"-key was too long: was {} should be <= {}'.format(len(data["place"]), max_entrance_length))
if not length_check(data, "place", config["database"]["min_place_length"], config["database"]["max_place_length"]):
return False
# Basic length check for entrance
if len(data["entrance"]) >= max_entrance_length:
app.logger.info('400, JSON "entrance"-key was too long: was {} should be <= {}'.format(len(data["entrance"]), max_entrance_length))
if not length_check(data, "entrance", config["database"]["min_entrance_length"], config["database"]["max_entrance_length"]):
return False
# Basic length check for ID
if len(data["id"]) >= max_id_length:
app.logger.info('400, JSON "id"-key was too long: was {} should be <= {}'.format(len(data["id"]), max_id_length))
# Match regex patterns with visitor ID
if not id_pattern_check(data["id"]):
app.logger.info('JSON "id"-value didn\'t match any pattern: {}'.format(data["id"]))
return False
# todo: Add more concise checks for the ID
return True
def id_pattern_check(visitor_id: str) -> bool:
"""
Returns True if any of the patterns from the config matches.
Returns False if none of the patterns matches.
"""
return any([re.match(p, visitor_id) for p in config["database"]["id_patterns"]])
def length_check(data: dict, key: str, minimum: int, maximum: int) -> bool:
"""
Returns True if the given data[key] value length is within
minimum and maximum. False and a log message otherwise.
"""
# Basic length check for ID (max)
if len(data[key]) >= maximum:
app.logger.info('JSON "{}"-value was too long: was {} should be <= {}'.format(key, len(data[key]), maximum))
return False
if len(data[key]) <= minimum:
app.logger.info('JSON "{}"-value was too short: was {} should be >= {}'.format(key, len(data[key]), minimum))
return False
return True
......
#! /bin/bash
curl --header "Content-Type: application/json" --request POST --data '{"place":"lerchenfeld", "entrance":"Haupteingang", "direction":"in", "id":"543GFDHGfddf455"}' http://localhost:5000/
curl --header "Content-Type: application/json" --request POST --data '{"place":"lerchenfeld", "entrance":"haupteingang", "direction":"in", "id":"543GFDHGfddf455"}' http://localhost:5000/
0% Loading or .
You are about to add 0 people to the discussion. Proceed with caution.
Please register or to comment