Commit a5a269ad authored by Lukáš Lalinský

Import submissions immediately after they are added

parent 8ea7a7cf
......@@ -166,3 +166,20 @@ def inc_fingerprint_submission_count(conn, id, submission_id=None, source_id=Non
conn.execute(insert_stmt)
return True
def update_fingerprint_index(db, index, limit=1000):
    """
    Add fingerprints that are not yet present in the search index.

    Reads the ``max_document_id`` attribute from the index (treated as 0
    when it is unset) and inserts up to ``limit`` fingerprints with a larger
    ID, in ascending ID order, inside a single index transaction.

    :param db: database connection used to execute the fingerprint query
    :param index: object whose ``connect()`` returns an index connection;
        that connection is closed before this function returns
    :param limit: maximum number of fingerprints to index in one call
    """
    with closing(index.connect()) as index_conn:
        max_id = int(index_conn.get_attribute('max_document_id') or '0')
        query = sql.select([
            schema.fingerprint.c.id,
            sql.func.acoustid_extract_query(schema.fingerprint.c.fingerprint),
        ]).where(schema.fingerprint.c.id > max_id).\
            order_by(schema.fingerprint.c.id).limit(limit)
        for fingerprint_id, fingerprint in db.execute(query):
            # Start the index transaction lazily so that nothing is opened
            # when there are no new fingerprints to add.
            if not index_conn.in_transaction:
                index_conn.begin()
            logger.debug("Adding fingerprint %s to index %s",
                         fingerprint_id, index_conn)
            index_conn.insert(fingerprint_id, fingerprint)
        if index_conn.in_transaction:
            index_conn.commit()
......@@ -107,11 +107,12 @@ def import_submission(conn, submission, index=None):
return fingerprint
def import_queued_submissions(conn, limit=50, index=None, ids=None):
def import_queued_submissions(conn, index=None, limit=100, ids=None):
"""
Import queued (not yet handled) submissions into the main fingerprint database
"""
query = schema.submission.select(schema.submission.c.handled == False).order_by(schema.submission.c.id.desc())
query = schema.submission.select(schema.submission.c.handled == False).\
order_by(schema.submission.c.id.desc())
if ids is not None:
    # Restrict the import to the explicitly requested submission IDs.
    query = query.where(schema.submission.c.id.in_(ids))
if limit is not None:
......
......@@ -38,6 +38,10 @@ class IndexClient(object):
self._buffer = ''
self._connect()
def __repr__(self):
return '<%s(%s, %s) instance at %s>' % (self.__class__.__name__,
self.host, self.port, hex(id(self)))
def __del__(self):
if self.sock is not None:
logger.warn('Deleted without being explicitly closed')
......
#!/usr/bin/env python
# Copyright (C) 2011 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
from acoustid.script import run_script
from acoustid.data.submission import import_queued_submissions
def main(script, opts, args):
    """
    Import up to 300 queued submissions, then close the database connection.

    :param script: script context providing ``engine`` and ``index``
    :param opts: unused
    :param args: unused
    """
    conn = script.engine.connect()
    try:
        import_queued_submissions(conn, limit=300, index=script.index)
    finally:
        # Release the connection even if the import fails.
        conn.close()
run_script(main)
......@@ -7,18 +7,33 @@ import json
import logging
from acoustid.script import run_script
from acoustid.data.submission import import_queued_submissions
from acoustid.data.fingerprint import update_fingerprint_index
logger = logging.getLogger(__file__)
def do_import(script, index_first=False):
    """
    Import queued submissions in batches, updating the index after each batch.

    Keeps calling ``import_queued_submissions`` until it returns a falsy
    value, and calls ``update_fingerprint_index`` after every batch that
    returned a truthy value. With ``index_first`` set, the index is also
    updated once before the first batch.

    :param script: script context providing ``engine`` and ``index``
    :param index_first: update the fingerprint index before importing
    """
    with closing(script.engine.connect()) as connection:
        if index_first:
            update_fingerprint_index(connection, script.index)
        while import_queued_submissions(connection, script.index):
            update_fingerprint_index(connection, script.index)
def main(script, opts, args):
    """
    Import pending submissions, then import again whenever new ones are announced.

    After the initial import, subscribes to the ``channel.submissions``
    pub/sub channel and runs ``do_import`` for every published message.
    The loop runs for as long as ``channel.listen()`` keeps yielding.

    :param script: script context providing ``redis`` plus whatever
        ``do_import`` needs
    :param opts: unused
    :param args: unused
    """
    # First make sure the index is in sync with the database and
    # import already queued submissions.
    do_import(script, index_first=True)
    # Listen for new submissions and import them as they come.
    channel = script.redis.pubsub()
    channel.subscribe('channel.submissions')
    for message in channel.listen():
        # listen() also yields subscription confirmations, whose payload is
        # not a JSON document; only published messages carry submission IDs.
        if message['type'] != 'message':
            continue
        ids = json.loads(message['data'])
        logger.debug('Got notified about %s new submissions', len(ids))
        do_import(script)
run_script(main)
......
......@@ -3,29 +3,13 @@
# Copyright (C) 2011-2012 Lukas Lalinsky
# Distributed under the MIT license, see the LICENSE file for details.
import re
from acoustid.indexclient import IndexClient
from acoustid.script import run_script
from acoustid.data.fingerprint import update_fingerprint_index
def main(script, opts, args):
    """
    Bring the fingerprint index up to date with the database.

    Delegates to the shared ``update_fingerprint_index`` helper instead of
    repeating the select/insert loop inline, so one batch of the helper's
    default size is indexed per run. The database connection is closed when
    the block exits, including on error.

    :param script: script context providing ``engine`` and ``index``
    :param opts: unused
    :param args: unused
    """
    with closing(script.engine.connect()) as db:
        update_fingerprint_index(db, script.index)
run_script(main)
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment