Commit 92b319a6 authored by Lukáš Lalinský's avatar Lukáš Lalinský

Re-add import_submissions

parent f7f5b815
Pipeline #20395 passed with stages
in 2 minutes and 36 seconds
......@@ -2,4 +2,4 @@ ARG IMAGE=quay.io/acoustid/acoustid-server
ARG VERSION=master
FROM ${IMAGE}:${VERSION}
CMD ["/opt/acoustid/server/admin/run-import.sh"]
CMD ["/opt/acoustid/server/admin/docker/run-import.sh"]
......@@ -2,6 +2,7 @@ import click
from acoustid.script import Script
from acoustid.uwsgi_utils import run_web_app, run_api_app
from acoustid.cron import run_cron
from acoustid.scripts.import_submissions import run_import
@click.group()
......@@ -43,7 +44,17 @@ def run_cron_cmd(config):
script = Script(config)
script.setup_console_logging()
script.setup_sentry()
run_cron(script, None, None)
run_cron(script)
@run.command('import')
@click.option('-c', '--config', default='acoustid.conf', envvar='ACOUSTID_CONFIG')
def run_import_cmd(config):
"""Run import."""
script = Script(config)
script.setup_console_logging()
script.setup_sentry()
run_import(script)
def main():
......
......@@ -13,11 +13,11 @@ from acoustid.scripts.merge_missing_mbids import main as merge_missing_mbids_mai
logger = logging.getLogger(__name__)
def create_schedule(script, opt, args):
def create_schedule(script):
def wrap_job(func):
logger.info('Running %s', func.__name__)
func(script, opt, args)
func(script, None, None)
schedule = Scheduler()
# hourly jobs
......@@ -30,8 +30,8 @@ def create_schedule(script, opt, args):
return schedule
def run_cron(script, opt, args):
schedule = create_schedule(script, opt, args)
def run_cron(script):
schedule = create_schedule(script)
while True:
schedule.run_pending()
time.sleep(schedule.idle_seconds)
#!/usr/bin/env python
# Copyright (C) 2012-2013 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
import json
import logging
import time
from contextlib import closing
from acoustid.data.submission import import_queued_submissions
from acoustid.data.fingerprint import update_fingerprint_index
logger = logging.getLogger(__file__)
def do_import(script, index_first=False, only_index=False):
with closing(script.engine.connect()) as db:
if index_first:
update_fingerprint_index(db, script.index)
if not only_index:
while True:
count = import_queued_submissions(db, script.index, limit=10)
if not count:
break
update_fingerprint_index(db, script.index)
def run_import_on_master(script):
logger.info('Importer running in master mode')
# first make sure the index is in sync with the database and
# import already queued submissions
do_import(script, index_first=True)
# listen for new submissins and import them as they come
channel = script.redis.pubsub()
channel.subscribe('channel.submissions')
for message in channel.listen():
if message['type'] != 'message':
continue
try:
ids = json.loads(message['data'])
except Exception:
logger.exception('Invalid notification message: %r', message)
ids = []
logger.debug('Got notified about %s new submissions', len(ids))
do_import(script)
logger.debug('Waiting for the next event...')
def run_import_on_slave(script):
logger.info('Importer running in slave mode, only updating the index')
# import new fingerprints to the index every 15 seconds
while True:
started = time.time()
do_import(script, index_first=True, only_index=True)
delay = 15 - time.time() + started
if delay > 0:
logger.debug('Waiting %d seconds...', delay)
time.sleep(delay)
def run_import(script):
if script.config.cluster.role == 'master':
run_import_on_master(script)
else:
run_import_on_slave(script)
......@@ -129,7 +129,7 @@ def common_uwsgi_args(config, workers=1):
def run_api_app(config, workers=1):
# type: (Config, int) -> int
args = common_uwsgi_args(config) + [
args = common_uwsgi_args(config, workers=workers) + [
"--http-socket", "0.0.0.0:3031",
"--module", "acoustid.wsgi",
]
......@@ -139,7 +139,7 @@ def run_api_app(config, workers=1):
def run_web_app(config, workers=1):
# type: (Config, int) -> int
static_dir = os.path.join(os.path.dirname(__file__), 'web', 'static')
args = common_uwsgi_args(config) + [
args = common_uwsgi_args(config, workers=workers) + [
"--http-socket", "0.0.0.0:3032",
"--module", "acoustid.web.app:make_application()",
"--static-map", "/static={}".format(static_dir),
......
#!/usr/bin/env bash
cd /opt/acoustid/server
export PYTHONPATH="$PWD"
exec /opt/acoustid/server.venv/bin/python /opt/acoustid/server/manage.py run import
#!/usr/bin/env bash
export PYTHONPATH="/opt/acoustid/server"
exec /opt/acoustid/server.venv/bin/python /opt/acoustid/server/scripts/import_submissions.py -c "$ACOUSTID_CONFIG"
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment