Commit 30537256 authored by Lukáš Lalinský

Fix track merging

parent 72f59958
Pipeline #20602 passed with stages
in 3 minutes and 33 seconds
......@@ -124,7 +124,7 @@ def import_submission(ingest_db, app_db, fingerprint_db, index_pool, submission)
if fingerprint['track_id'] in group and len(group) > 1:
fingerprint['track_id'] = min(group)
group.remove(fingerprint['track_id'])
merge_tracks(fingerprint_db, fingerprint['track_id'], list(group))
merge_tracks(fingerprint_db, ingest_db, fingerprint['track_id'], list(group))
break
if not fingerprint['track_id']:
......
......@@ -134,7 +134,7 @@ def merge_missing_mbids(fingerprint_db, ingest_db):
merge_mbids(fingerprint_db, ingest_db, new_mbid, old_mbids)
def _merge_tracks_gids(conn, name_with_id, target_id, source_ids):
def _merge_tracks_gids(fingerprint_db, ingest_db, name_with_id, target_id, source_ids):
name = name_with_id.replace('_id', '')
tab = schema.metadata.tables['track_%s' % name]
col = tab.columns[name_with_id]
......@@ -151,7 +151,7 @@ def _merge_tracks_gids(conn, name_with_id, target_id, source_ids):
if name == 'mbid':
columns.append(sql.func.every(schema.track_mbid.c.disabled).label('all_disabled'))
query = sql.select(columns, tab.c.track_id.in_(source_ids + [target_id]), group_by=col)
rows = conn.execute(query).fetchall()
rows = fingerprint_db.execute(query).fetchall()
to_delete = set()
to_update = []
for row in rows:
......@@ -161,38 +161,39 @@ def _merge_tracks_gids(conn, name_with_id, target_id, source_ids):
to_update.append((old_ids, row))
if old_ids:
update_stmt = tab_src.update().where(col_src.in_(old_ids))
conn.execute(update_stmt.values({col_src: row['id']}))
ingest_db.execute(update_stmt.values({col_src: row['id']}))
if name == 'mbid':
update_stmt = tab_chg.update().where(col_chg.in_(old_ids))
conn.execute(update_stmt.values({col_chg: row['id']}))
ingest_db.execute(update_stmt.values({col_chg: row['id']}))
if to_delete:
delete_stmt = tab.delete().where(tab.c.id.in_(to_delete))
conn.execute(delete_stmt)
fingerprint_db.execute(delete_stmt)
for old_ids, row in to_update:
update_stmt = tab.update().where(tab.c.id == row['id'])
if name == 'mbid':
conn.execute(update_stmt.values(submission_count=row['count'], track_id=target_id, disabled=row['all_disabled']))
fingerprint_db.execute(update_stmt.values(submission_count=row['count'], track_id=target_id, disabled=row['all_disabled']))
else:
conn.execute(update_stmt.values(submission_count=row['count'], track_id=target_id))
fingerprint_db.execute(update_stmt.values(submission_count=row['count'], track_id=target_id))
def merge_tracks(conn, target_id, source_ids):
def merge_tracks(fingerprint_db, ingest_db, target_id, source_ids):
# type: (FingerprintDB, IngestDB, int, List[int]) -> None
"""
Merge the specified tracks.
"""
logger.info("Merging tracks %s into %s", ', '.join(map(str, source_ids)), target_id)
_merge_tracks_gids(conn, 'mbid', target_id, source_ids)
_merge_tracks_gids(conn, 'puid', target_id, source_ids)
_merge_tracks_gids(conn, 'meta_id', target_id, source_ids)
_merge_tracks_gids(conn, 'foreignid_id', target_id, source_ids)
_merge_tracks_gids(fingerprint_db, ingest_db, 'mbid', target_id, source_ids)
_merge_tracks_gids(fingerprint_db, ingest_db, 'puid', target_id, source_ids)
_merge_tracks_gids(fingerprint_db, ingest_db, 'meta_id', target_id, source_ids)
_merge_tracks_gids(fingerprint_db, ingest_db, 'foreignid_id', target_id, source_ids)
# XXX don't move duplicate fingerprints
update_stmt = schema.fingerprint.update().where(
schema.fingerprint.c.track_id.in_(source_ids))
conn.execute(update_stmt.values(track_id=target_id))
fingerprint_db.execute(update_stmt.values(track_id=target_id))
update_stmt = schema.track.update().where(
sql.or_(schema.track.c.id.in_(source_ids),
schema.track.c.new_id.in_(source_ids)))
conn.execute(update_stmt.values(new_id=target_id))
fingerprint_db.execute(update_stmt.values(new_id=target_id))
def insert_track(conn):
......
......@@ -18,7 +18,7 @@ QUERIES = [
('fingerprint', 'track_mbid.all', 'SELECT count(*) FROM track_mbid'),
('fingerprint', 'mbid.all', 'SELECT count(DISTINCT mbid) FROM track_mbid'),
('fingerprint', 'track.all', 'SELECT count(*) FROM track'),
('ingest', 'submission.all', 'SELECT sum(submission_count) FROM account'),
('app', 'submission.all', 'SELECT sum(submission_count) FROM account'),
('ingest', 'submission.unhandled', 'SELECT count(*) FROM submission WHERE not handled'),
]
......
......@@ -179,7 +179,7 @@ INSERT INTO track_mbid_change (track_mbid_id, account_id, disabled) VALUES (5, 1
""", dict(fp1=TEST_1A_FP_RAW, len1=TEST_1A_LENGTH,
fp2=TEST_1B_FP_RAW, len2=TEST_1B_LENGTH))
merge_tracks(ctx.db.get_fingerprint_db(), 3, [1, 2, 4])
merge_tracks(ctx.db.get_fingerprint_db(), ctx.db.get_ingest_db(), 3, [1, 2, 4])
rows = ctx.db.get_fingerprint_db().execute("SELECT id, track_id FROM fingerprint ORDER BY id").fetchall()
assert_equals([(1, 3), (2, 3)], rows)
......@@ -216,7 +216,7 @@ TRUNCATE track_mbid CASCADE;
INSERT INTO track_mbid (track_id, mbid, submission_count, disabled) VALUES (1, '97edb73c-4dac-11e0-9096-0025225356f3', 9, true);
INSERT INTO track_mbid (track_id, mbid, submission_count) VALUES (2, '97edb73c-4dac-11e0-9096-0025225356f3', 11);
""")
merge_tracks(ctx.db.get_fingerprint_db(), 1, [2])
merge_tracks(ctx.db.get_fingerprint_db(), ctx.db.get_ingest_db(), 1, [2])
rows = ctx.db.get_fingerprint_db().execute("SELECT track_id, mbid, submission_count, disabled FROM track_mbid ORDER BY track_id, mbid").fetchall()
expected_rows = [
(1, UUID('97edb73c-4dac-11e0-9096-0025225356f3'), 20, False),
......@@ -232,7 +232,7 @@ TRUNCATE track_mbid CASCADE;
INSERT INTO track_mbid (track_id, mbid, submission_count) VALUES (1, '97edb73c-4dac-11e0-9096-0025225356f3', 9);
INSERT INTO track_mbid (track_id, mbid, submission_count, disabled) VALUES (2, '97edb73c-4dac-11e0-9096-0025225356f3', 11, true);
""")
merge_tracks(ctx.db.get_fingerprint_db(), 1, [2])
merge_tracks(ctx.db.get_fingerprint_db(), ctx.db.get_ingest_db(), 1, [2])
rows = ctx.db.get_fingerprint_db().execute("SELECT track_id, mbid, submission_count, disabled FROM track_mbid ORDER BY track_id, mbid").fetchall()
expected_rows = [
(1, UUID('97edb73c-4dac-11e0-9096-0025225356f3'), 20, False),
......@@ -248,7 +248,7 @@ TRUNCATE track_mbid CASCADE;
INSERT INTO track_mbid (track_id, mbid, submission_count, disabled) VALUES (1, '97edb73c-4dac-11e0-9096-0025225356f3', 9, true);
INSERT INTO track_mbid (track_id, mbid, submission_count, disabled) VALUES (2, '97edb73c-4dac-11e0-9096-0025225356f3', 11, true);
""")
merge_tracks(ctx.db.get_fingerprint_db(), 1, [2])
merge_tracks(ctx.db.get_fingerprint_db(), ctx.db.get_ingest_db(), 1, [2])
rows = ctx.db.get_fingerprint_db().execute("SELECT track_id, mbid, submission_count, disabled FROM track_mbid ORDER BY track_id, mbid").fetchall()
expected_rows = [
(1, UUID('97edb73c-4dac-11e0-9096-0025225356f3'), 20, True),
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or sign in to comment