Commit 3543c4dd authored by Lukáš Lalinský

Re-work fingerprint searching and importing

parent 9ffdf6fa
Pipeline #20590 failed with stages
in 4 minutes and 11 seconds
......@@ -70,55 +70,63 @@ class FingerprintSearcher(object):
return sql.select(columns, f.c.score > self.min_score, src,
order_by=[f.c.score.desc(), f.c.id])
def _search_index(self, fp, length):
# type: (List[int], int) -> List[FingerprintMatch]
def _search_index(self, fp, length, index):
# type: (List[int], int, Index) -> Optional[sql.ClauseElement]
# index search
fp_query = self.db.execute(sql.select([sql.func.acoustid_extract_query(fp)])).scalar()
if not fp_query:
return []
with self.index_pool.connect() as index:
results = index.search(fp_query)
if not results:
return []
min_score = results[0].score * 0.1 # at least 10% of the top score
candidate_ids = [r.id for r in results if r.score > min_score]
if not candidate_ids:
return []
return None
results = index.search(fp_query)
if not results:
return None
min_score = results[0].score * 0.1 # at least 10% of the top score
candidate_ids = [r.id for r in results if r.score > min_score]
if not candidate_ids:
return None
# construct the query
condition = schema.fingerprint.c.id.in_(candidate_ids)
query = self._create_search_query(fp, length, condition)
# database scoring
matches = [FingerprintMatch(*i) for i in self.db.execute(query)]
return matches
return condition
def _search_database(self, fp, length, min_fp_id):
# type: (List[int], int, int) -> List[FingerprintMatch]
def _search_database(self, fp, length, min_fingerprint_id):
# type: (List[int], int, int) -> Optional[sql.ClauseElement]
# construct the query
condition = sql.func.acoustid_extract_query(schema.fingerprint.c.fingerprint).op('&&')(sql.func.acoustid_extract_query(fp))
if min_fp_id:
condition = sql.and_(condition, schema.fingerprint.c.id > min_fp_id)
query = self._create_search_query(fp, length, condition)
# database scoring
matches = [FingerprintMatch(*i) for i in self.db.execute(query)]
return matches
condition = sql.and_(
sql.func.acoustid_extract_query(schema.fingerprint.c.fingerprint).op('&&')(sql.func.acoustid_extract_query(fp)),
schema.fingerprint.c.id > min_fingerprint_id)
return condition
def _get_min_indexed_fp_id(self):
# type: () -> int
with self.index_pool.connect() as index:
return int(index.get_attribute('max_document_id') or '0')
def _get_max_indexed_fingerprint_id(self, index):
# type: (Index) -> int
return int(index.get_attribute('max_document_id') or '0')
def search(self, fp, length):
# type: (List[int], int) -> List[FingerprintMatch]
min_fp_id = 0 if self.fast else self._get_min_indexed_fp_id()
matches = None
try:
matches = self._search_index(fp, length)
except IndexClientError:
logger.exception("Index search error")
matches = None
if not self.fast and not matches:
matches = self._search_database(fp, length, min_fp_id)
return matches or []
conditions = []
with self.index_pool.connect() as index:
if not self.fast:
max_indexed_fingerprint_id = self._get_max_indexed_fingerprint_id(index)
try:
condition = self._search_index(fp, length, index)
if condition is not None:
conditions.append(condition)
except IndexClientError:
if not self.fast:
raise
logger.exception("Index search error")
if not self.fast:
condition = self._search_database(fp, length, max_indexed_fingerprint_id)
if condition is not None:
conditions.append(condition)
if not conditions:
return []
query = self._create_search_query(fp, length, sql.or_(*conditions))
matches = [FingerprintMatch(*i) for i in self.db.execute(query)]
return matches
def insert_fingerprint(fingerprint_db, ingest_db, data, submission_id=None, source_id=None):
......
......@@ -32,6 +32,10 @@ class Index(object):
# type: () -> None
raise NotImplementedError(self.begin)
def search(self, fingerprint):
# type: (List[int]) -> List[Result]
raise NotImplementedError(self.search)
def commit(self):
# type: () -> None
raise NotImplementedError(self.commit)
......
......@@ -8,36 +8,25 @@ import logging
import time
from acoustid.script import Script
from acoustid.data.submission import import_queued_submissions
from acoustid.data.fingerprint import update_fingerprint_index
logger = logging.getLogger(__file__)
def do_import(script, index_first=False, only_index=False):
# type: (Script, bool, bool) -> None
with script.context() as ctx:
fingerprint_db = ctx.db.get_fingerprint_db()
if index_first:
with ctx.index.connect() as index:
update_fingerprint_index(fingerprint_db, index)
if not only_index:
app_db = ctx.db.get_app_db()
def do_import(script):
# type: (Script) -> None
count = 1
while count > 0:
with script.context() as ctx:
ingest_db = ctx.db.get_ingest_db()
while True:
count = import_queued_submissions(ingest_db, app_db, fingerprint_db, ctx.index, limit=10)
if not count:
break
with ctx.index.connect() as index:
update_fingerprint_index(fingerprint_db, index)
ctx.db.session.commit()
app_db = ctx.db.get_app_db()
fingerprint_db = ctx.db.get_fingerprint_db()
count = import_queued_submissions(ingest_db, app_db, fingerprint_db, ctx.index, limit=10)
ctx.db.session.commit()
def run_import_on_master(script):
# type: (Script) -> None
logger.info('Importer running in master mode')
# first make sure the index is in sync with the database and
# import already queued submissions
do_import(script, index_first=True)
# listen for new submissions and import them as they come
channel = script.redis.pubsub()
channel.subscribe('channel.submissions')
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment