bbbmon.py 10.8 KB
Newer Older
David Huss's avatar
David Huss committed
1
2
3
4
5
#!/usr/bin/env python
# -*- coding: utf-8 -*-

import os
import hashlib
6
from datetime import datetime, timedelta
David Huss's avatar
David Huss committed
7
8
9
10
11
12
13
import requests
import appdirs
from xml.etree import cElementTree as ElementTree
from typing import NewType, Optional, Tuple, Iterable



14
# Default path
David Huss's avatar
David Huss committed
15
16
# Properties file checked first by init_variables(); if it does not exist the
# per-user config file is tried instead
SERVER_PROPERTIES_FILE = "/usr/share/bbb-web/WEB-INF/classes/bigbluebutton.properties"

17
# Type definitions
David Huss's avatar
David Huss committed
18
19
20
# Distinct names for the type checker only: NewType values are plain str
# objects at runtime
Secret = NewType('Secret', str)
Url    = NewType('Url', str)

21
22
23
24
25
26
27
28
# Human readable labels for the per-meeting counters; print_leaderboard() uses
# them for its heading
FRIENDLY_KEYNAMES = {
    "participantCount": "Participants",
    "listenerCount": "only listening",
    "voiceParticipantCount": "Mics on",
    "videoCount": "Webcams on",
    "moderatorCount": "Number of Moderators"
}

David Huss's avatar
David Huss committed
29
30
31
32
33
34
35
36
37
38
39
40
41
42
43
44
45
46
47
48
49
50
51
52
53
54
55
56
57
58
59
60
61
62
63
64
65
66
67
68
69
70
71
72
73
74
75
76
77
78
79
80
81
82
83
84
85
86
87
88
89
90
91
92
93
94
95
96
97
98
99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127


class XmlListConfig(list):
    """
    Helper class to convert a sequence of XML elements to a python list.

    Each element of ``aList`` contributes at most one entry:

    * an element with children becomes an ``XmlDictConfig`` (one child, or
      the first two children have different tags) or a nested
      ``XmlListConfig`` (the first two children share a tag),
    * an element without children contributes its stripped text, if that
      text is not empty,
    * everything else is skipped.
    """
    def __init__(self, aList):
        super().__init__()
        for element in aList:
            # len() counts the child elements. Testing the element itself for
            # truth is deprecated in ElementTree, so ask for the length.
            if len(element) > 0:
                # treat like dict
                if len(element) == 1 or element[0].tag != element[1].tag:
                    self.append(XmlDictConfig(element))
                # treat like list (the first two child tags are equal)
                else:
                    self.append(XmlListConfig(element))
            elif element.text:
                text = element.text.strip()
                if text:
                    self.append(text)


class XmlDictConfig(dict):
    '''
    Helper class to convert an XML element to a python dict.

    Example usage:

    >>> tree = ElementTree.parse('your_file.xml')
    >>> root = tree.getroot()
    >>> xmldict = XmlDictConfig(root)

    Or, if you want to use an XML string:

    >>> root = ElementTree.XML(xml_string)
    >>> xmldict = XmlDictConfig(root)

    And then use xmldict for what it is... a dict.

    The attributes of the parent element become entries of the dict. Every
    child element is stored under its tag name, so a later child replaces an
    earlier one with the same tag.
    '''
    def __init__(self, parent_element):
        super().__init__()
        # update() with an empty item list is a no-op, no guard needed
        self.update(parent_element.items())
        for element in parent_element:
            # len() counts the child elements. Testing the element itself for
            # truth is deprecated in ElementTree, so ask for the length.
            if len(element) > 0:
                # treat like dict - we assume that if the first two tags
                # in a series are different, then they are all different.
                if len(element) == 1 or element[0].tag != element[1].tag:
                    aDict = XmlDictConfig(element)
                # treat like list - we assume that if the first two tags
                # in a series are the same, then the rest are the same.
                else:
                    # here, we put the list in dictionary; the key is the
                    # tag name the list elements all share in common, and
                    # the value is the list itself
                    aDict = {element[0].tag: XmlListConfig(element)}
                # if the tag has attributes, add those to the dict
                aDict.update(element.items())
                self[element.tag] = aDict
            # this assumes that if you've got an attribute in a tag,
            # you won't be having any text: the text of such a tag is dropped
            elif element.items():
                self[element.tag] = dict(element.items())
            # finally, if there are no child tags and no attributes, extract
            # the text (None for an empty tag)
            else:
                self[element.tag] = element.text


def generate_checksum(call_name: str, query_string: str, secret: Secret) -> str:
    """
    Build the value passed as `?checksum=`: the SHA-1 hex digest of call name,
    query string and secret, joined in that order and encoded as UTF-8
    """
    payload = "".join((call_name, query_string, secret))
    return hashlib.sha1(payload.encode('utf-8')).hexdigest()


def request_meetings(secret: Secret, bbb_url: Url) -> XmlDictConfig:
    """
    Make a getMeetings-API Call to the bbb instance and return a XmlDictConfig
    with the servers response.

    Prints the response and exits the program if the response carries no
    `returncode` or if the `returncode` is "FAILED".
    """
    call_name = "getMeetings"
    checksum = generate_checksum(call_name, "", secret)
    url = "{}/api/{}?checksum={}".format(bbb_url, call_name, checksum)
    # requests waits indefinitely by default; give up on a server that does
    # not answer instead of hanging forever
    r = requests.get(url, timeout=30)
    xmldict = XmlDictConfig(ElementTree.XML(r.text))

    # Not a regular API response: show the raw text for diagnosis
    if "returncode" not in xmldict:
        print(r.text)
        exit()
    if xmldict['returncode'] == "FAILED":
        print(xmldict)
        exit()
    return xmldict

128

David Huss's avatar
David Huss committed
129
def get_meetings(secret: Secret, bbb_url: Url) -> Iterable[XmlDictConfig]:
    """
    Request meetings and return a list of the running ones. Sorted by biggest
    first (highest participantCount).

    Prints a message and exits the program if the server reports no meetings.
    """
    d = request_meetings(secret, bbb_url)

    # An empty <meetings/> tag is stored as None
    if d["meetings"] is None:
        print("There are no active meetings currently.")
        exit()

    found = d["meetings"]["meeting"]
    if isinstance(found, XmlDictConfig):
        # A single meeting is stored as the meeting dict itself rather than
        # as a list; wrap it so both cases are handled alike
        found = [found]
    elif not isinstance(found, XmlListConfig):
        return []

    running = [m for m in found if m["running"] == "true"]
    return sorted(running, key=lambda m: int(m['participantCount']), reverse=True)


def get_presenter(meeting: XmlDictConfig) -> Optional[XmlDictConfig]:
    """
    Get the presenter of a meeting (return None if there is none).

    The presenter is the first attendee whose `isPresenter` is "true".
    """
    attendees = meeting["attendees"]
    # An empty <attendees/> tag is stored as None (or as its text), not as a
    # dict: nobody is in the meeting, so nobody presents
    if not isinstance(attendees, dict):
        return None

    entries = attendees["attendee"]
    if isinstance(entries, dict):
        # A single attendee is stored as the dict itself rather than a list
        entries = [entries]
    elif not isinstance(entries, list):
        return None

    for attendee in entries:
        if attendee["isPresenter"] == "true":
            return attendee
    return None


163
164
165
166
167
168
169
170
171
172
def get_duration(meeting: XmlDictConfig) -> timedelta:
    """
    Return how long ago the meeting was started, measured against the current
    local time
    """
    # startTime is a string; cutting its last three characters turns the
    # millisecond value into whole seconds
    started_at = datetime.fromtimestamp(int(meeting["startTime"][:-3]))
    return datetime.now() - started_at


173
def strfdelta(duration: timedelta) -> str:
    """
    Helper function for datetime.timedelta formatting. Returns the duration as
    zero padded HH:MM:SS, e.g. strfdelta(timedelta(hours=1, seconds=5)) gives
    '01:00:05'.

    Fractions of a second are dropped. Hours are not wrapped into days (a day
    and five seconds is '24:00:05'). Negative durations get a leading '-'.
    """
    total = int(duration.total_seconds())
    # Split the absolute value: floor division of a negative number would
    # otherwise yield output like '-1:59:55' for minus five seconds
    sign = "-" if total < 0 else ""
    hours, rest = divmod(abs(total), 3600)
    minutes, seconds = divmod(rest, 60)

    return '{}{:02}:{:02}:{:02}'.format(sign, hours, minutes, seconds)
181
182
183
184


def format_duration(meeting: XmlDictConfig) -> str:
    """
    Return the duration of a meeting (see get_duration) formatted by strfdelta
    """
    return strfdelta(get_duration(meeting))
186
187


David Huss's avatar
David Huss committed
188
189
190
191
192
193
194

def get_formated_presenter_name(meeting: XmlDictConfig) -> str:
    """
    Get the formated name of the presenter for a given meeting: the full name
    padded to 30 characters, followed by the user id in parentheses. Returns
    "no Presenter" if the meeting has none.
    """
    presenter = get_presenter(meeting)
    if presenter is None:
        return "no Presenter"

    # An empty <fullName/> tag is stored as None, and None cannot be formatted
    # with a width specification
    full_name = presenter["fullName"] or ""
    return "{:<30} ({})".format(full_name, presenter["userID"])


200
201
202
203
204
205
def print_leaderboard(meetings: Iterable[XmlDictConfig], key: str):
    """
    Print a leaderboard of all meetings sorted by a given key (e.g.
    participantCount), highest value first. The value stored under the key
    has to be convertible to int.
    """
    # Fall back to the raw key for counters without a friendly name
    print("LEADERBOARD ({})".format(FRIENDLY_KEYNAMES.get(key, key)))
    ranked = sorted(meetings, key=lambda m: int(m[key]), reverse=True)
    for m in ranked:
        print("{:>5} {:<45} {}".format(m[key], m["meetingName"], get_formated_presenter_name(m)))


def print_duration_leaderboard(meetings: Iterable[XmlDictConfig]):
    """
    Print a leaderboard of all meetings sorted by their duration, longest
    running meeting first
    """
    print("LEADERBOARD (Duration)")
    by_duration = sorted(meetings, key=lambda m: int(get_duration(m).total_seconds()), reverse=True)

    for m in by_duration:
        print("{:>12} {:<45} {}".format(format_duration(m), m["meetingName"], get_formated_presenter_name(m)))
221
222


David Huss's avatar
David Huss committed
223
def print_overview(secret: Secret, bbb_url: Url):
    """
    Get the meetings and print out an overview of the current bbb-usage:
    totals across all meetings followed by leaderboards for participants,
    webcams, microphones and duration.

    Prints a message and exits the program if no meeting is running.
    """
    meetings = get_meetings(secret, bbb_url)

    if not meetings:
        print("There are no meetings running now.")
        exit()

    def total(counter: str) -> int:
        # Sum one of the numeric per-meeting counters over all meetings
        return sum(int(m[counter]) for m in meetings)

    n_running = len(meetings)
    n_recording = sum(1 for m in meetings if m["recording"] == "true")
    n_participants = total("participantCount")
    n_listeners = total("listenerCount")
    n_voice = total("voiceParticipantCount")
    n_video = total("videoCount")
    n_moderator = total("moderatorCount")

    print("MEETINGS on {}:".format(bbb_url))
    print("   ├─── {:>4} running".format(n_running))
    print("   └─── {:>4} recording".format(n_recording))
    print()
    print("PARTICIPANTS across all {} rooms".format(n_running))
    print("   └─┬─ {:>4} total".format(n_participants))
    print("     ├─ {:>4} listening only".format(n_listeners))
    print("     ├─ {:>4} mic on".format(n_voice))
    print("     ├─ {:>4} video on".format(n_video))
    print("     └─ {:>4} moderators".format(n_moderator))

    print()
    print_leaderboard(meetings, "participantCount")
    print()
    print_leaderboard(meetings, "videoCount")
    print()
    print_leaderboard(meetings, "voiceParticipantCount")
    print()
    print_duration_leaderboard(meetings)
David Huss's avatar
David Huss committed
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305




def _read_properties(path: str) -> Tuple[Secret, Url]:
    """
    Read the secret and the server url from the properties file at the given
    path. Prints an error and exits the program if one of the two is missing.
    """
    wanted = ("securitySalt", "bigbluebutton.web.serverURL")
    found = {}
    with open(path, "r") as f:
        for line in f:
            # Only the first "=" separates key and value, the value itself
            # may contain further "=" characters
            name, separator, value = line.partition("=")
            # The first occurrence of a key wins
            if separator and name in wanted and name not in found:
                found[name] = value.strip()

    missing = [name for name in wanted if name not in found]
    if missing:
        print("ERROR: The config file {} lacks the following entries: {}".format(path, ", ".join(missing)))
        exit()

    secret = Secret(found["securitySalt"])
    bbb_url = Url("{}/bigbluebutton".format(found["bigbluebutton.web.serverURL"].rstrip('/')))
    return (secret, bbb_url)


def init_variables() -> Optional[Tuple[Secret, Url]]:
    """
    Read the config either from the servers bigbluebutton.properties-file or from
    the user config path. Display a message if neither of these files exist.

    Returns a tuple of the secret and the url with "/bigbluebutton" appended.
    Exits the program if no config file was found.
    """
    # Get OS dependend properties file
    user_config_path = "{}.properties".format(appdirs.user_config_dir("bbbmon"))

    # Check if we are on the server and try to read that properties file first
    for path in (SERVER_PROPERTIES_FILE, user_config_path):
        if os.path.isfile(path):
            return _read_properties(path)

    print("ERROR: There was no config file found. Make sure it exists and is readable:")
    print("[0] {}".format(SERVER_PROPERTIES_FILE))
    print("[1] {}".format(user_config_path))
    print()
    print("For now the file just needs to contain two lines:")
    print("securitySalt=YOURSUPERSECRETSECRET")
    print("bigbluebutton.web.serverURL=https://bbb.example.com/")
    exit()



def main():
    """
    Entry point: load secret and url from the config and print the overview
    """
    credentials = init_variables()
    print_overview(*credentials)


if __name__ == "__main__":
    main()