Commit 1e133b22 authored by Lukáš Lalinský's avatar Lukáš Lalinský

New way of running uWSGI

parent c850d7a1
Pipeline #20393 passed with stages
in 4 minutes and 9 seconds
......@@ -7,4 +7,4 @@ EXPOSE 3031
HEALTHCHECK --start-period=10s \
CMD curl -qf http://localhost:3031/_health_docker || exit 1
CMD ["/opt/acoustid/server/admin/run-api.sh"]
CMD ["/opt/acoustid/server/admin/docker/run-api.sh"]
......@@ -2,4 +2,4 @@ ARG IMAGE=quay.io/acoustid/acoustid-server
ARG VERSION=master
FROM ${IMAGE}:${VERSION}
CMD ["/opt/acoustid/server/admin/run-cron.sh"]
CMD ["/opt/acoustid/server/admin/docker/run-cron.sh"]
......@@ -7,4 +7,4 @@ EXPOSE 3032
HEALTHCHECK --start-period=10s \
CMD curl -qf http://localhost:3032/_health_docker || exit 1
CMD ["/opt/acoustid/server/admin/run-web.sh"]
CMD ["/opt/acoustid/server/admin/docker/run-web.sh"]
......@@ -64,9 +64,10 @@ def serialize_response(data, format, **kwargs):
def get_health_response(script, req, require_master=False):
from acoustid.uwsgi_utils import is_shutting_down
if require_master and script.config.cluster.role != 'master':
return Response('not the master server', content_type='text/plain', status=503)
if script.shutdown:
if is_shutting_down(script.config.website.shutdown_file_path):
return Response('shutdown in process', content_type='text/plain', status=503)
return Response('ok', content_type='text/plain', status=200)
......
import click
from acoustid.script import Script
from acoustid.uwsgi_utils import run_web_app, run_api_app
from acoustid.cron import run_cron
@click.group()
def cli():
pass
@cli.group()
def run():
pass
@run.command('web')
@click.option('-c', '--config', default='acoustid.conf', envvar='ACOUSTID_CONFIG')
@click.option('-w', '--workers', default=1, envvar='ACOUSTID_WEB_WORKERS')
def run_web_cmd(config, workers):
"""Run production uWSGI with the website."""
script = Script(config)
script.setup_console_logging()
script.setup_sentry()
run_web_app(script.config, workers=workers)
@run.command('api')
@click.option('-c', '--config', default='acoustid.conf', envvar='ACOUSTID_CONFIG')
@click.option('-w', '--workers', default=1, envvar='ACOUSTID_API_WORKERS')
def run_api_cmd(config, workers):
"""Run production uWSGI with the API."""
script = Script(config)
script.setup_console_logging()
script.setup_sentry()
run_api_app(script.config, workers=workers)
@run.command('cron')
@click.option('-c', '--config', default='acoustid.conf', envvar='ACOUSTID_CONFIG')
def run_cron_cmd(config):
"""Run cron."""
script = Script(config)
script.setup_console_logging()
script.setup_sentry()
run_cron(script, None, None)
def main():
cli()
......@@ -167,6 +167,7 @@ class WebSiteConfig(object):
self.google_oauth_client_secret = None
self.maintenance = False
self.shutdown_delay = 0
self.shutdown_file_path = '/tmp/acoustid-server-shutdown.txt'
def read(self, parser, section):
if parser.has_option(section, 'debug'):
......
......@@ -30,7 +30,7 @@ def create_schedule(script, opt, args):
return schedule
def main(script, opt, args):
def run_cron(script, opt, args):
schedule = create_schedule(script, opt, args)
while True:
schedule.run_pending()
......
......@@ -2,7 +2,6 @@
# Distributed under the MIT license, see the LICENSE file for details.
import sys
import time
import logging
import sqlalchemy
import sqlalchemy.pool
......@@ -24,7 +23,6 @@ class Script(object):
if config_path:
self.config.read(config_path)
self.config.read_env(tests=tests)
self.shutdown = False
if tests:
self.engine = sqlalchemy.create_engine(self.config.database.create_url(),
poolclass=sqlalchemy.pool.AssertionPool)
......@@ -41,12 +39,9 @@ class Script(object):
else:
self.redis = Redis(host=self.config.redis.host,
port=self.config.redis.port)
self._console_logging_configured = False
self.setup_logging()
def atexit(self):
self.shutdown = True
time.sleep(self.config.website.shutdown_delay)
def setup_logging(self):
for logger_name, level in sorted(self.config.logging.levels.items()):
logging.getLogger(logger_name).setLevel(level)
......@@ -55,13 +50,18 @@ class Script(object):
facility=self.config.logging.syslog_facility, log_pid=True)
handler.setFormatter(logging.Formatter('%(name)s: %(message)s'))
logging.getLogger().addHandler(handler)
else:
self.setup_console_logging()
def setup_console_logging(self, quiet=False):
if self._console_logging_configured:
return
handler = logging.StreamHandler()
handler.setFormatter(logging.Formatter('%(asctime)s [%(levelname)s] %(name)s - %(message)s', '%H:%M:%S'))
if quiet:
handler.setLevel(logging.ERROR)
logging.getLogger().addHandler(handler)
self._console_logging_configured = True
def setup_sentry(self):
sentry_sdk.init(self.config.sentry.script_dsn, release=GIT_RELEASE)
......
import six
import datetime
import logging
import signal
import io
import os
import sys
from typing import List, Callable
from acoustid.config import Config
if six.PY3:
import subprocess
else:
import subprocess32 as subprocess
logger = logging.getLogger(__name__)
def call_setpgrp():
os.setpgrp()
class ProcessWrapper(object):
def __init__(self, args, shutdown_handler=None, shutdown_delay=0.0):
# type: (List[six.text_type], Callable[[], None], float) -> None
self.name = args[0]
self.shutdown = False
self.shutdown_requested_at = None
self.shutdown_handler = shutdown_handler
self.shutdown_handler_called = False
self.shutdown_delay = datetime.timedelta(seconds=shutdown_delay)
self.stop_immediately = False
signal.signal(signal.SIGINT, self._handle_signal)
signal.signal(signal.SIGTERM, self._handle_signal)
logger.info('Starting %s', subprocess.list2cmdline(args))
self.process = subprocess.Popen(args, preexec_fn=call_setpgrp)
def _handle_signal(self, sig, frame):
logger.info('Received signal %s', sig)
if self.shutdown:
if not self.stop_immediately:
logger.info('Will stop uwsgi ASAP')
self.stop_immediately = True
else:
self.shutdown = True
self.shutdown_requested_at = datetime.datetime.now()
def wait(self):
# type: () -> int
while True:
try:
return self.process.wait(timeout=1.0)
except subprocess.TimeoutExpired:
pass
if not self.shutdown:
continue
if not self.shutdown_handler_called:
logger.info('Preparing to shut down, will stop uwsgi in %s seconds', self.shutdown_delay.total_seconds())
if self.shutdown_handler:
self.shutdown_handler()
self.shutdown_handler_called = True
assert self.shutdown_requested_at is not None
if self.stop_immediately or (datetime.datetime.now() > self.shutdown_requested_at + self.shutdown_delay):
logger.info('Stopping %s', self.name)
self.process.terminate()
def is_shutting_down(shutdown_file_path):
# type: (six.text_type) -> bool
return os.path.exists(shutdown_file_path)
def shutdown_handler(shutdown_file_path):
# type: (six.text_type) -> None
if not is_shutting_down(shutdown_file_path):
with io.open(shutdown_file_path, 'wt', encoding='utf8') as fp:
fp.write(u'shutdown')
def cleanup_shutdown_file(shutdown_file_path):
# type: (six.text_type) -> None
try:
os.remove(shutdown_file_path)
except OSError as e:
if e.errno != 2:
raise
def run_uwsgi(config, args):
# type: (Config, List[six.text_type]) -> int
cleanup_shutdown_file(config.website.shutdown_file_path)
try:
wrapper = ProcessWrapper(
args,
shutdown_handler=lambda: shutdown_handler(config.website.shutdown_file_path),
shutdown_delay=config.website.shutdown_delay)
return wrapper.wait()
finally:
cleanup_shutdown_file(config.website.shutdown_file_path)
def common_uwsgi_args(config, workers=1):
# type: (Config, int) -> List[six.text_type]
args = [
"uwsgi",
"--die-on-term",
"--chmod-socket",
"--master",
"--disable-logging",
"--log-date",
"--buffer-size", "10240",
"--workers", six.text_type(workers),
"--offload-threads", "1",
"--harakiri", "60",
"--harakiri-verbose",
"--post-buffering", "1",
"--enable-threads",
"--need-app",
]
if 'PYTHONPATH' in os.environ:
args.extend(["--python-path", os.environ['PYTHONPATH']])
if hasattr(sys, 'real_prefix'):
args.extend(["--virtualenv", sys.prefix])
return args
def run_api_app(config, workers=1):
# type: (Config, int) -> int
args = common_uwsgi_args(config) + [
"--http-socket", "0.0.0.0:3031",
"--module", "acoustid.wsgi",
]
return run_uwsgi(config, args)
def run_web_app(config, workers=1):
# type: (Config, int) -> int
static_dir = os.path.join(os.path.dirname(__file__), 'web', 'static')
args = common_uwsgi_args(config) + [
"--http-socket", "0.0.0.0:3032",
"--module", "acoustid.web.app:make_application()",
"--static-map", "/static={}".format(static_dir),
"--static-map", "/favicon.ico={}".format(os.path.join(static_dir, 'favicon.ico')),
"--static-map", "/robots.txt={}".format(os.path.join(static_dir, 'robots.txt')),
]
return run_uwsgi(config, args)
......@@ -16,10 +16,6 @@ from acoustid.web.views.metadata import metadata_page
from acoustid.web.views.stats import stats_page
from acoustid.web.views.admin import admin_page
from acoustid._release import GIT_RELEASE
try:
import uwsgi
except ImportError:
uwsgi = None
def make_application(config_filename=None, tests=False):
......@@ -29,9 +25,6 @@ def make_application(config_filename=None, tests=False):
script = Script(config_filename, tests=tests)
script.setup_logging()
if uwsgi is not None:
uwsgi.atexit = script.atexit
config = script.config
app = Flask('acoustid.web')
......
......@@ -6,13 +6,7 @@
# uwsgi -w acoustid.wsgi --pythonpath ~/acoustid/ --env ACOUSTID_CONFIG=~/acoustid/acoustid.conf -M -L --socket 127.0.0.1:1717
import os
try:
import uwsgi
except ImportError:
uwsgi = None
from acoustid.server import make_application
from acoustid.server import make_application # noqa: E402
server, application = make_application(os.environ['ACOUSTID_CONFIG'])
if uwsgi is not None:
uwsgi.atexit = server.atexit
server, application = make_application(os.environ['ACOUSTID_CONFIG'])
#!/usr/bin/env bash
cd /opt/acoustid/server
export PYTHONPATH="$PWD"
exec /opt/acoustid/server.venv/bin/python /opt/acoustid/server/manage.py run api
#!/usr/bin/env bash
cd /opt/acoustid/server
export PYTHONPATH="$PWD"
exec /opt/acoustid/server.venv/bin/python /opt/acoustid/server/manage.py run cron
#!/usr/bin/env bash
cd /opt/acoustid/server
export PYTHONPATH="$PWD"
exec /opt/acoustid/server.venv/bin/python /opt/acoustid/server/manage.py run web
#!/usr/bin/env bash
exec /opt/acoustid/server.venv/bin/uwsgi \
--http-socket 0.0.0.0:3031 \
--chmod-socket \
--pidfile /tmp/uwsgi-acoustid-server-api.pid \
--master \
--disable-logging \
--log-date \
--buffer-size 10240 \
--workers 4 \
--harakiri 60 \
--harakiri-verbose \
--post-buffering 1 \
--enable-threads \
--need-app \
--virtualenv /opt/acoustid/server.venv \
--python-path /opt/acoustid/server \
--module acoustid.wsgi
#!/usr/bin/env bash
export PYTHONPATH="/opt/acoustid/server"
exec /opt/acoustid/server.venv/bin/python /opt/acoustid/server/scripts/cron.py -c "$ACOUSTID_CONFIG"
#!/bin/sh
set -e
echo >>/var/log/acoustid/mbslave.log
date >>/var/log/acoustid/mbslave.log
/home/acoustid/mbslave/mbslave-sync.py >>/var/log/acoustid/mbslave.log
#!/usr/bin/env bash
exec /opt/acoustid/server.venv/bin/uwsgi \
--http-socket 0.0.0.0:3032 \
--chmod-socket \
--pidfile /tmp/uwsgi-acoustid-server-web.pid \
--master \
--disable-logging \
--log-date \
--buffer-size 10240 \
--workers 4 \
--offload-threads 1 \
--harakiri 60 \
--harakiri-verbose \
--post-buffering 1 \
--enable-threads \
--need-app \
--static-map /static=/opt/acoustid/server/acoustid/web/static \
--static-map /favicon.ico=/opt/acoustid/server/acoustid/web/static/favicon.ico \
--static-map /robots.txt=/opt/acoustid/server/acoustid/web/static/robots.txt \
--virtualenv /opt/acoustid/server.venv \
--python-path /opt/acoustid/server \
--module 'acoustid.web.app:make_application()'
......@@ -11,7 +11,7 @@ services:
ports:
- "127.0.0.1:5432:5432"
volumes:
- ./ci/create_db.sql:/docker-entrypoint-initdb.d/10_create_db.sql
- ./admin/ci/create_db.sql:/docker-entrypoint-initdb.d/10_create_db.sql
environment:
POSTGRES_USER: acoustid
POSTGRES_PASSWORD: acoustid
......
#!/usr/bin/env python
from acoustid.cli import main
main()
......@@ -19,3 +19,5 @@ schedule
sentry-sdk[flask]
typing
six
subprocess32; python_version < "3.2"
click
......@@ -10,7 +10,7 @@ blinker==1.4
certifi==2019.3.9 # via requests, sentry-sdk
cffi==1.12.3 # via cryptography
chardet==3.0.4 # via requests
click==7.0 # via flask
click==7.0
contextlib2==0.5.5 # via mbdata
cryptography==2.6.1 # via pyopenssl
enum34==1.1.6 # via cryptography
......@@ -38,6 +38,7 @@ schedule==0.6.0
sentry-sdk[flask]==0.7.14
six==1.12.0
sqlalchemy==1.3.3
subprocess32==3.5.3 ; python_version < "3.2"
typing==3.6.6
urllib3==1.24.2 # via requests, sentry-sdk
uwsgi==2.0.18
......
#!/usr/bin/env python
# Copyright (C) 2012 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.scripts.cleanup_perf_stats import main
run_script(main)
#!/usr/bin/env python
# Copyright (C) 2019 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.cron import main
run_script(main)
#!/usr/bin/env python
# Copyright (C) 2012-2013 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
import json
import logging
import time
from contextlib import closing
from acoustid.script import run_script
from acoustid.data.submission import import_queued_submissions
from acoustid.data.fingerprint import update_fingerprint_index
logger = logging.getLogger(__file__)
def do_import(script, index_first=False, only_index=False):
with closing(script.engine.connect()) as db:
if index_first:
update_fingerprint_index(db, script.index)
if not only_index:
while True:
count = import_queued_submissions(db, script.index, limit=10)
if not count:
break
update_fingerprint_index(db, script.index)
def main_master(script, opts, args):
logger.info('Importer running in master mode')
# first make sure the index is in sync with the database and
# import already queued submissions
do_import(script, index_first=True)
# listen for new submissins and import them as they come
channel = script.redis.pubsub()
channel.subscribe('channel.submissions')
for message in channel.listen():
if message['type'] != 'message':
continue
try:
ids = json.loads(message['data'])
except Exception:
logger.exception('Invalid notification message: %r', message)
ids = []
logger.debug('Got notified about %s new submissions', len(ids))
do_import(script)
logger.debug('Waiting for the next event...')
def main_slave(script, opts, args):
logger.info('Importer running in slave mode, only updating the index')
# import new fingerprints to the index every 15 seconds
while True:
started = time.time()
do_import(script, index_first=True, only_index=True)
delay = 15 - time.time() + started
if delay > 0:
logger.debug('Waiting %d seconds...', delay)
time.sleep(delay)
def main(script, opts, args):
if script.config.cluster.role == 'master':
main_master(script, opts, args)
else:
main_slave(script, opts, args)
run_script(main)
#!/usr/bin/env python
# Copyright (C) 2011 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.scripts.merge_missing_mbids import main
run_script(main, master_only=True)
#!/usr/bin/env python
# Copyright (C) 2012 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.scripts.update_lookup_stats import main
run_script(main)
#!/usr/bin/env python
# Copyright (C) 2012 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.scripts.update_stats import main
run_script(main)
#!/usr/bin/env python
# Copyright (C) 2012 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.scripts.update_user_agent_stats import main
run_script(main)
......@@ -30,7 +30,7 @@ deps = flake8
commands = flake8 acoustid/ tests/
[flake8]
ignore = E128
ignore = E128,E121
max-line-length = 160
per-file-ignores =
tests/__init__.py:E501
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment