utils.py 4.57 KB
Newer Older
1
# Copyright (C) 2011 Lukas Lalinsky
2
# Distributed under the MIT license, see the LICENSE file for details.
3

4
import re
5
import syslog
Lukáš Lalinský's avatar
Lukáš Lalinský committed
6 7
import hashlib
import time
8 9 10
import datetime
import hmac
import base64
11
import six
Lukáš Lalinský's avatar
Lukáš Lalinský committed
12
from acoustid.const import MAX_FOREIGNID_NAMESPACE_LENGTH, MAX_FOREIGNID_VALUE_LENGTH
13 14
from six.moves.urllib.request import urlopen
from six.moves.urllib.parse import urlencode
15
from logging import Handler
16 17


Lukáš Lalinský's avatar
Lukáš Lalinský committed
18 19 20 21
def generate_api_key(length=10):
    return re.sub('[/+=]', '', hashlib.sha1(str(time.time())).digest().encode('base64').strip())[:length]


22 23 24 25 26 27 28 29 30 31 32 33 34 35 36 37 38 39 40 41
def generate_demo_client_api_key(secret, day=None):
    if day is None:
        day = datetime.date.today()
    key = '{}:demo-client-api-key'.format(secret)
    message = '{:x}'.format(day.toordinal())
    api_key = list(base64.urlsafe_b64encode(hmac.new(key, message).digest()[:8]).rstrip('='))
    api_key[-2] = 'A'
    return ''.join(api_key)


def check_demo_client_api_key(secret, api_key, max_age=7):
    if len(api_key) == 11 and api_key[-2] == 'A':
        day = datetime.date.today()
        for i in range(max_age):
            if api_key == generate_demo_client_api_key(secret, day):
                return True
            day += datetime.timedelta(days=1)
    return False


42 43 44 45 46 47 48
def is_uuid(s):
    """
    Check whether the given string is a valid UUID
    """
    return bool(re.match(r'^[0-9a-fA-F]{8}(-[0-9a-fA-F]{4}){3}-[0-9a-fA-F]{12}$', s))


49 50 51 52 53 54 55 56
def is_int(s):
    try:
        int(s)
    except ValueError:
        return False
    return True


57
def is_foreignid(s):
Lukáš Lalinský's avatar
Lukáš Lalinský committed
58 59 60 61 62 63 64 65 66
    match = re.match(r'^([0-9a-z]+):(.+)$', s)
    if match is None:
        return False
    namespace, value = match.groups()
    if len(namespace) > MAX_FOREIGNID_NAMESPACE_LENGTH:
        return False
    if len(value) > MAX_FOREIGNID_VALUE_LENGTH:
        return False
    return True
67 68


69 70 71 72 73 74 75 76 77 78 79 80 81 82 83
def singular(plural):
    """
    Take a plural English word and turn it into singular

    Obviously, this doesn't work in general. It know just enough words to
    generate XML tag names for list items. For example, if we have an element
    called 'tracks' in the response, it will be serialized as a list without
    named items in JSON, but we need names for items in XML, so those will be
    called 'track'.
    """
    if plural.endswith('ies'):
        return plural[:-3] + 'y'
    if plural.endswith('s'):
        return plural[:-1]
    raise ValueError('unknown plural form %r' % (plural,))
84 85 86


def provider(value):
87 88 89
    """
    Returns a function that returns the given value.
    """
90 91 92 93
    def func():
        return value
    return func

94 95 96 97 98 99

class LocalSysLogHandler(Handler):
    """
    Logging handler that logs to the local syslog using the syslog module
    """

100
    facility_names = {
Lukáš Lalinský's avatar
Lukáš Lalinský committed
101 102 103 104 105 106 107 108 109 110 111 112 113 114 115 116 117 118
        "auth": syslog.LOG_AUTH,
        "cron": syslog.LOG_CRON,
        "daemon": syslog.LOG_DAEMON,
        "kern": syslog.LOG_KERN,
        "lpr": syslog.LOG_LPR,
        "mail": syslog.LOG_MAIL,
        "news": syslog.LOG_NEWS,
        "syslog": syslog.LOG_SYSLOG,
        "user": syslog.LOG_USER,
        "uucp": syslog.LOG_UUCP,
        "local0": syslog.LOG_LOCAL0,
        "local1": syslog.LOG_LOCAL1,
        "local2": syslog.LOG_LOCAL2,
        "local3": syslog.LOG_LOCAL3,
        "local4": syslog.LOG_LOCAL4,
        "local5": syslog.LOG_LOCAL5,
        "local6": syslog.LOG_LOCAL6,
        "local7": syslog.LOG_LOCAL7,
119
    }
120 121 122 123 124 125 126 127 128 129 130

    priority_map = {
        "DEBUG": syslog.LOG_DEBUG,
        "INFO": syslog.LOG_INFO,
        "WARNING": syslog.LOG_WARNING,
        "ERROR": syslog.LOG_ERR,
        "CRITICAL": syslog.LOG_CRIT
    }

    def __init__(self, ident=None, facility=syslog.LOG_USER, log_pid=False):
        Handler.__init__(self)
131
        self.facility = facility
132
        if isinstance(facility, six.string_types):
133
            self.facility = self.facility_names[facility]
134 135 136
        options = 0
        if log_pid:
            options |= syslog.LOG_PID
137
        syslog.openlog(ident, options, self.facility)
138 139 140 141 142 143 144 145 146
        self.formatter = None

    def close(self):
        Handler.close(self)
        syslog.closelog()

    def emit(self, record):
        try:
            msg = self.format(record)
147
            if isinstance(msg, six.text_type):
148 149
                msg = msg.encode('utf-8')
            priority = self.priority_map[record.levelname]
150 151
            for m in msg.splitlines():
                syslog.syslog(self.facility | priority, m)
152 153 154
        except StandardError:
            self.handleError(record)

Lukáš Lalinský's avatar
Lukáš Lalinský committed
155 156 157 158 159

def call_internal_api(config, func, **kwargs):
    url = config.cluster.base_master_url.rstrip('/') + '/v2/internal/' + func
    data = dict(kwargs)
    data['secret'] = config.cluster.secret
160
    urlopen(url, urlencode(data))