From 225da05558456564dc5f736f2ac78421daadb87e Mon Sep 17 00:00:00 2001 From: atoav <dh@atoav.com> Date: Wed, 29 Apr 2020 13:05:25 +0200 Subject: [PATCH] Refactor printing and meeting code into files --- bbbmon/bbbmon.py | 259 ++------------------------------------------- bbbmon/meetings.py | 171 ++++++++++++++++++++++++++++++ bbbmon/printing.py | 103 ++++++++++++++++++ 3 files changed, 285 insertions(+), 248 deletions(-) create mode 100644 bbbmon/meetings.py create mode 100644 bbbmon/printing.py diff --git a/bbbmon/bbbmon.py b/bbbmon/bbbmon.py index 02b94ac..42b2578 100755 --- a/bbbmon/bbbmon.py +++ b/bbbmon/bbbmon.py @@ -2,27 +2,17 @@ # -*- coding: utf-8 -*- import os import time -import hashlib -from datetime import datetime, timedelta -import requests import click -from xml.etree import cElementTree as ElementTree -from typing import NewType, Optional, Tuple, Iterable, List + # Local module imports from bbbmon.xmldict import XmlListConfig, XmlDictConfig from bbbmon.configuration import Config, Endpoint, SERVER_PROPERTIES_FILE, Url, Secret, get_user_config_path, init_config, new_config +from bbbmon.meetings import * +from bbbmon.printing import * -FRIENDLY_KEYNAMES = { - "participantCount" : "Participants", - "listenerCount" : "only listening", - "voiceParticipantCount" : "Mics on", - "videoCount" : "Webcams on", - "moderatorCount" : "Number of Moderators" -} - # Allow -h as help option as well CONTEXT_SETTINGS = dict(help_option_names=['-h', '--help']) @@ -46,238 +36,6 @@ class AliasedGroup(click.Group): ctx.fail('Too many matches: %s' % ', '.join(sorted(matches))) -def generate_checksum(call_name: str, query_string: str, secret: Secret) -> str: - """ - Generate Checksum for the request header (passed as value for `?checksum=`) - """ - m = hashlib.sha1() - m.update(call_name.encode('utf-8')) - m.update(query_string.encode('utf-8')) - m.update(secret.encode('utf-8')) - return m.hexdigest() - - -def request_meetings(secret: Secret, bbb_url: Url, user_config_path: str) -> XmlDictConfig: - """ - Make a getMeetings-API Call to the bbb instance and return a XmlDictConfig - with the servers response - """ - call_name = "getMeetings" - checksum = generate_checksum(call_name, "", secret) - url = "{}/api/{}?checksum={}".format(bbb_url, call_name, checksum) - - try: - r = requests.get(url) - except: - click.echo("{} The URL \"{}\" is unreachable.\n Check your network connection, and the URL and Secret of the endpoint.".format(click.style('Error:', fg='red', bold=True), url)) - print() - time.sleep(1) - if click.confirm(click.style('Do you want to open the config file at {} with your default editor?'.format(user_config_path), fg="yellow"), abort=True): - click.edit(filename=user_config_path) - exit() - - root = ElementTree.XML(r.text) - xmldict = XmlDictConfig(root) - if "returncode" in xmldict.keys(): - if xmldict['returncode'] == "FAILED": - print(xmldict) - exit() - else: - print(r.text) - exit() - return xmldict - - -def get_meetings(secret: Secret, bbb_url: Url, user_config_path: str) -> Iterable[XmlDictConfig]: - """ - Request meetings and return a list of them. Sorted by biggest first - """ - meetings = [] - d = request_meetings(secret, bbb_url, user_config_path) - - if d["meetings"] is None: - return [] - - if type(d["meetings"]["meeting"]) is XmlListConfig: - meetings = sorted([m for m in d["meetings"]["meeting"] if m["running"] == "true"], key=lambda x:int(x['participantCount']), reverse=True) - elif type(d["meetings"]["meeting"]) is XmlDictConfig: - meetings = [d["meetings"]["meeting"]] - return meetings - - -def get_presenter(meeting: XmlDictConfig) -> Optional[XmlDictConfig]: - """ - Get the presenter of a meeting (return None if there is none) - """ - presenters = [] - if type(meeting["attendees"]["attendee"]) is XmlListConfig: - presenters = [a for a in meeting["attendees"]["attendee"] if a["isPresenter"] == "true"] - elif type(meeting["attendees"]["attendee"]) is XmlDictConfig: - presenters = [meeting["attendees"]["attendee"]] - - if len(presenters) > 0: - return presenters[0] - else: - return None - - -def get_duration(meeting: XmlDictConfig) -> timedelta: - """ - Return the duration of a meeting - """ - timestamp = int(meeting["startTime"][:-3]) - start_time = datetime.fromtimestamp(timestamp) - duration = datetime.now() - start_time - return duration - - -def strfdelta(duration: timedelta) -> str: - """ - Helper function for datetime.timedelta formatting, use like this: - strfdelta(delta_obj, "{days} days {hours}:{minutes}:{seconds}") - """ - s = int(duration.total_seconds()) - - return '{:02}:{:02}:{:02}'.format(s // 3600, s % 3600 // 60, s % 60) - - -def format_duration(meeting: XmlDictConfig) -> str: - """ - Helper functions for duration - """ - duration = get_duration(meeting) - return strfdelta(duration) - - - -def get_formated_presenter_name_id(meeting: XmlDictConfig) -> str: - """ - Get the formated name of the presenter for a given meeting (with id) - """ - presenter = get_presenter(meeting) - if presenter is not None: - return "{:<30} ({})".format(presenter["fullName"], presenter["userID"]) - else: - return "no Presenter" - -def get_formated_presenter_name(meeting: XmlDictConfig) -> str: - """ - Get the formated name of the presenter for a given meeting - """ - presenter = get_presenter(meeting) - if presenter is not None: - return "{:<30}".format(presenter["fullName"]) - else: - return "no Presenter" - - -def print_leaderboard(meetings: Iterable[XmlDictConfig], key: str, endpoint_name: str, presenter: bool, presenter_id: bool, fancy: bool): - """ - Print a leaderboard of all meetings sorted by a given key (e.g. - participantCount) - """ - print_header(endpoint_name, "LEADERBOARD ({})".format(FRIENDLY_KEYNAMES[key]), fancy) - sorted_by = sorted([m for m in meetings], key=lambda x:int(x[key]), reverse=True) - for m in sorted_by: - if presenter: - if presenter_id: - print("{:>5} {:<45} {}".format(m[key], m["meetingName"], get_formated_presenter_name_id(m))) - else: - print("{:>5} {:<45} {}".format(m[key], m["meetingName"], get_formated_presenter_name(m))) - else: - print("{:>5} {}".format(m[key], m["meetingName"])) - - -def print_duration_leaderboard(meetings: Iterable[XmlDictConfig], endpoint_name: str, presenter: bool, presenter_id: bool, fancy: bool): - """ - Print a leaderboard of all meetings sorted by a given key (e.g. - participantCount) - """ - print_header(endpoint_name, "LEADERBOARD (Duration)", fancy) - by_duration = sorted([m for m in meetings], key=lambda x:int(get_duration(x).total_seconds()), reverse=True) - - for m in by_duration: - if presenter: - if presenter_id: - print("{:>12} {:<38} {}".format(format_duration(m), m["meetingName"], get_formated_presenter_name_id(m))) - else: - print("{:>12} {:<38} {}".format(format_duration(m), m["meetingName"], get_formated_presenter_name(m))) - else: - print("{:>12} {}".format(format_duration(m), m["meetingName"])) - - -def print_header(endpoint_name: str, text: str, fancy=True): - if fancy: - click.echo(click.style(" [{}] {} ".format(endpoint_name, text), fg='black', bg='white', bold=True)) - else: - print("[{}] {}".format(endpoint_name, text)) - - -def print_overview(config: Config, leaderboards: bool, participants: bool, presenter: bool, presenter_id: bool, show_meetings: bool, watch: int, fancy: bool): - """ - For each endpoint in the configuration get the active meetings and print - out an overview of the current bbb-usage - """ - - # Request Meetings from API - meetings = [get_meetings(e.secret, e.url, config.path) for e in config.endpoints] - - # Clear screen after request is done, and before printing new data to keep - # blinking to a minimum - if watch is not None: - click.clear() - - - for i, endpoint in enumerate(config.endpoints): - meeting = meetings[i] - - # Print divider if there is more than one endpoint - if i > 0: - print() - print("="*click.get_terminal_size()[0]) - print() - - # If there are no meetings, skip to next endpoint - if len(meeting) == 0: - if show_meetings: - print_header(endpoint.name, "MEETINGS", fancy) - print(" └─── Currently no active meetings.") - continue - - n_running = len(meeting) - n_recording = len([m for m in meeting if m["recording"] == "true"]) - n_participants = sum([int(m["participantCount"]) for m in meeting]) - n_listeners = sum([int(m["listenerCount"]) for m in meeting]) - n_voice = sum([int(m["voiceParticipantCount"]) for m in meeting]) - n_video = sum([int(m["videoCount"]) for m in meeting]) - n_moderator = sum([int(m["moderatorCount"]) for m in meeting]) - - if show_meetings: - print_header(endpoint.name, "MEETINGS", fancy) - print(" ├─── {:>4} running".format(n_running)) - print(" └─── {:>4} recording".format(n_recording)) - print() - - if participants: - print_header(endpoint.name, "PARTICIPANTS across all {} rooms".format(n_running), fancy) - print(" └─┬─ {:>4} total".format(n_participants)) - print(" ├─ {:>4} listening only".format(n_listeners)) - print(" ├─ {:>4} mic on".format(n_voice)) - print(" ├─ {:>4} video on".format(n_video)) - print(" └─ {:>4} moderators".format(n_moderator)) - - if leaderboards: - print() - print_leaderboard(meeting, "participantCount", endpoint.name, presenter, presenter_id, fancy) - print() - print_leaderboard(meeting, "videoCount", endpoint.name, presenter, presenter_id, fancy) - print() - print_leaderboard(meeting, "voiceParticipantCount", endpoint.name, presenter, presenter_id, fancy) - print() - print_duration_leaderboard(meeting, endpoint.name, presenter, presenter_id, fancy) - - - @click.group(context_settings=CONTEXT_SETTINGS, cls=AliasedGroup) def main(): """BBBMON is a small CLI utility to monitor bbb usage @@ -290,6 +48,8 @@ def main(): """ pass + + @main.command(context_settings=CONTEXT_SETTINGS) @click.pass_context @click.option('--endpoint', '-e', multiple=True, help="Filter by one or more endpoints as named in the user configuration (e.g. [servername]). Order is respected.") @@ -317,10 +77,10 @@ def meetings(ctx, short, all_, leaderboards, participants, presenter, watch, pre config.filter_endpoints(endpoint) if watch is not None: while watch is not None: - print_overview(config, leaderboards, participants, presenter, presenter_id, meetings, watch, fancy) + list_meetings(config, leaderboards, participants, presenter, presenter_id, meetings, watch, fancy) time.sleep(watch) else: - print_overview(config, leaderboards, participants, presenter, presenter_id, meetings, watch, fancy) + list_meetings(config, leaderboards, participants, presenter, presenter_id, meetings, watch, fancy) @@ -330,7 +90,7 @@ def meetings(ctx, short, all_, leaderboards, participants, presenter, watch, pre @click.option('--edit', is_flag=True, help="Open the config in the default editor") @click.option('--print', 'print_', is_flag=True, help="Print the config to stdout") @click.option('--path', is_flag=True, help="Print the path to the config") -def config(ctx, new, short, edit, path, print_): +def config(ctx, new, edit, path, print_): """Print, show or edit the config""" user_config_path = get_user_config_path() @@ -360,5 +120,8 @@ def config(ctx, new, short, edit, path, print_): + + + if __name__ == "__main__": main() \ No newline at end of file diff --git a/bbbmon/meetings.py b/bbbmon/meetings.py new file mode 100644 index 0000000..59720b4 --- /dev/null +++ b/bbbmon/meetings.py @@ -0,0 +1,171 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import time +import hashlib +from datetime import datetime, timedelta +import requests +from xml.etree import cElementTree as ElementTree +from typing import NewType, Optional, Tuple, Iterable, List +import click + +from bbbmon.xmldict import XmlListConfig, XmlDictConfig +from bbbmon.configuration import Config, Endpoint, SERVER_PROPERTIES_FILE, Url, Secret, get_user_config_path, init_config, new_config +import bbbmon.printing as printing + + + + + + +def generate_checksum(call_name: str, query_string: str, secret: Secret) -> str: + """ + Generate Checksum for the request header (passed as value for `?checksum=`) + """ + m = hashlib.sha1() + m.update(call_name.encode('utf-8')) + m.update(query_string.encode('utf-8')) + m.update(secret.encode('utf-8')) + return m.hexdigest() + + +def request_meetings(secret: Secret, bbb_url: Url, user_config_path: str) -> XmlDictConfig: + """ + Make a getMeetings-API Call to the bbb instance and return a XmlDictConfig + with the servers response + """ + call_name = "getMeetings" + checksum = generate_checksum(call_name, "", secret) + url = "{}/api/{}?checksum={}".format(bbb_url, call_name, checksum) + + try: + r = requests.get(url) + except: + click.echo("{} The URL \"{}\" is unreachable.\n Check your network connection, and the URL and Secret of the endpoint.".format(click.style('Error:', fg='red', bold=True), url)) + print() + time.sleep(1) + if click.confirm(click.style('Do you want to open the config file at {} with your default editor?'.format(user_config_path), fg="yellow"), abort=True): + click.edit(filename=user_config_path) + exit() + + root = ElementTree.XML(r.text) + xmldict = XmlDictConfig(root) + if "returncode" in xmldict.keys(): + if xmldict['returncode'] == "FAILED": + print(xmldict) + exit() + else: + print(r.text) + exit() + return xmldict + + +def get_meetings(secret: Secret, bbb_url: Url, user_config_path: str) -> Iterable[XmlDictConfig]: + """ + Request meetings and return a list of them. Sorted by biggest first + """ + meetings = [] + d = request_meetings(secret, bbb_url, user_config_path) + + if d["meetings"] is None: + return [] + + if type(d["meetings"]["meeting"]) is XmlListConfig: + meetings = sorted([m for m in d["meetings"]["meeting"] if m["running"] == "true"], key=lambda x:int(x['participantCount']), reverse=True) + elif type(d["meetings"]["meeting"]) is XmlDictConfig: + meetings = [d["meetings"]["meeting"]] + return meetings + + +def get_presenter(meeting: XmlDictConfig) -> Optional[XmlDictConfig]: + """ + Get the presenter of a meeting (return None if there is none) + """ + presenters = [] + if type(meeting["attendees"]["attendee"]) is XmlListConfig: + presenters = [a for a in meeting["attendees"]["attendee"] if a["isPresenter"] == "true"] + elif type(meeting["attendees"]["attendee"]) is XmlDictConfig: + presenters = [meeting["attendees"]["attendee"]] + + if len(presenters) > 0: + return presenters[0] + else: + return None + + +def get_duration(meeting: XmlDictConfig) -> timedelta: + """ + Return the duration of a meeting + """ + timestamp = int(meeting["startTime"][:-3]) + start_time = datetime.fromtimestamp(timestamp) + duration = datetime.now() - start_time + return duration + + +def list_meetings(config: Config, leaderboards: bool, participants: bool, presenter: bool, presenter_id: bool, show_meetings: bool, watch: int, fancy: bool): + """ + For each endpoint in the configuration get the active meetings and print + out an overview of the current bbb-usage + """ + + # Request Meetings from API + meetings = [get_meetings(e.secret, e.url, config.path) for e in config.endpoints] + + # Clear screen after request is done, and before printing new data to keep + # blinking to a minimum + if watch is not None: + click.clear() + + + for i, endpoint in enumerate(config.endpoints): + meeting = meetings[i] + + # Print divider if there is more than one endpoint + if i > 0: + print() + print("="*click.get_terminal_size()[0]) + print() + + # If there are no meetings, skip to next endpoint + if len(meeting) == 0: + if show_meetings: + printing.print_header(endpoint.name, "MEETINGS", fancy) + print(" └─── Currently no active meetings.") + continue + + n_running = len(meeting) + n_recording = len([m for m in meeting if m["recording"] == "true"]) + n_participants = sum([int(m["participantCount"]) for m in meeting]) + n_listeners = sum([int(m["listenerCount"]) for m in meeting]) + n_voice = sum([int(m["voiceParticipantCount"]) for m in meeting]) + n_video = sum([int(m["videoCount"]) for m in meeting]) + n_moderator = sum([int(m["moderatorCount"]) for m in meeting]) + + if show_meetings: + printing.print_header(endpoint.name, "MEETINGS", fancy) + print(" ├─── {:>4} running".format(n_running)) + print(" └─── {:>4} recording".format(n_recording)) + print() + + if participants: + printing.print_header(endpoint.name, "PARTICIPANTS across all {} rooms".format(n_running), fancy) + print(" └─┬─ {:>4} total".format(n_participants)) + print(" ├─ {:>4} listening only".format(n_listeners)) + print(" ├─ {:>4} mic on".format(n_voice)) + print(" ├─ {:>4} video on".format(n_video)) + print(" └─ {:>4} moderators".format(n_moderator)) + + if leaderboards: + print() + printing.print_leaderboard(meeting, "participantCount", endpoint.name, presenter, presenter_id, fancy) + print() + printing.print_leaderboard(meeting, "videoCount", endpoint.name, presenter, presenter_id, fancy) + print() + printing.print_leaderboard(meeting, "voiceParticipantCount", endpoint.name, presenter, presenter_id, fancy) + print() + printing.print_duration_leaderboard(meeting, endpoint.name, presenter, presenter_id, fancy) + + + + diff --git a/bbbmon/printing.py b/bbbmon/printing.py new file mode 100644 index 0000000..208fc11 --- /dev/null +++ b/bbbmon/printing.py @@ -0,0 +1,103 @@ +#!/usr/bin/env python3 +# -*- coding: utf-8 -*- + +import click +from datetime import datetime, timedelta +from typing import NewType, Optional, Tuple, Iterable, List + +from bbbmon.xmldict import XmlListConfig, XmlDictConfig +import bbbmon.meetings + + + + +FRIENDLY_KEYNAMES = { + "participantCount" : "Participants", + "listenerCount" : "only listening", + "voiceParticipantCount" : "Mics on", + "videoCount" : "Webcams on", + "moderatorCount" : "Number of Moderators" +} + + + +def strfdelta(duration: timedelta) -> str: + """ + Helper function for datetime.timedelta formatting, use like this: + strfdelta(delta_obj, "{days} days {hours}:{minutes}:{seconds}") + """ + s = int(duration.total_seconds()) + + return '{:02}:{:02}:{:02}'.format(s // 3600, s % 3600 // 60, s % 60) + + +def format_duration(meeting: XmlDictConfig) -> str: + """ + Helper functions for duration + """ + duration = bbbmon.meetings.get_duration(meeting) + return strfdelta(duration) + + + +def get_formated_presenter_name_id(meeting: XmlDictConfig) -> str: + """ + Get the formated name of the presenter for a given meeting (with id) + """ + presenter = bbbmon.meetings.get_presenter(meeting) + if presenter is not None: + return "{:<30} ({})".format(presenter["fullName"], presenter["userID"]) + else: + return "no Presenter" + +def get_formated_presenter_name(meeting: XmlDictConfig) -> str: + """ + Get the formated name of the presenter for a given meeting + """ + presenter = bbbmon.meetings.get_presenter(meeting) + if presenter is not None: + return "{:<30}".format(presenter["fullName"]) + else: + return "no Presenter" + + +def print_leaderboard(meetings: Iterable[XmlDictConfig], key: str, endpoint_name: str, presenter: bool, presenter_id: bool, fancy: bool): + """ + Print a leaderboard of all meetings sorted by a given key (e.g. + participantCount) + """ + print_header(endpoint_name, "LEADERBOARD ({})".format(FRIENDLY_KEYNAMES[key]), fancy) + sorted_by = sorted([m for m in meetings], key=lambda x:int(x[key]), reverse=True) + for m in sorted_by: + if presenter: + if presenter_id: + print("{:>5} {:<45} {}".format(m[key], m["meetingName"], get_formated_presenter_name_id(m))) + else: + print("{:>5} {:<45} {}".format(m[key], m["meetingName"], get_formated_presenter_name(m))) + else: + print("{:>5} {}".format(m[key], m["meetingName"])) + + +def print_duration_leaderboard(meetings: Iterable[XmlDictConfig], endpoint_name: str, presenter: bool, presenter_id: bool, fancy: bool): + """ + Print a leaderboard of all meetings sorted by a given key (e.g. + participantCount) + """ + print_header(endpoint_name, "LEADERBOARD (Duration)", fancy) + by_duration = sorted([m for m in meetings], key=lambda x:int(bbbmon.meetings.get_duration(x).total_seconds()), reverse=True) + + for m in by_duration: + if presenter: + if presenter_id: + print("{:>12} {:<38} {}".format(format_duration(m), m["meetingName"], get_formated_presenter_name_id(m))) + else: + print("{:>12} {:<38} {}".format(format_duration(m), m["meetingName"], get_formated_presenter_name(m))) + else: + print("{:>12} {}".format(format_duration(m), m["meetingName"])) + + +def print_header(endpoint_name: str, text: str, fancy=True): + if fancy: + click.echo(click.style(" [{}] {} ".format(endpoint_name, text), fg='black', bg='white', bold=True)) + else: + print("[{}] {}".format(endpoint_name, text)) \ No newline at end of file -- GitLab