Commit 98c4d7eb authored by Lukáš Lalinský's avatar Lukáš Lalinský

Concept of master/slave DB configs

parent dbeeca4f
...@@ -27,10 +27,10 @@ test: ...@@ -27,10 +27,10 @@ test:
POSTGRES_PASSWORD: acoustid POSTGRES_PASSWORD: acoustid
POSTGRES_DB: acoustid_test POSTGRES_DB: acoustid_test
ACOUSTID_TEST_DB_TWO_PHASE_COMMIT: 0 ACOUSTID_TEST_DB_TWO_PHASE_COMMIT: 0
ACOUSTID_TEST_DB_DEFAULT_HOST: postgres ACOUSTID_TEST_DB_MAIN_HOST: postgres
ACOUSTID_TEST_DB_DEFAULT_NAME: acoustid_test ACOUSTID_TEST_DB_MAIN_NAME: acoustid_test
ACOUSTID_TEST_DB_SLOW_HOST: postgres ACOUSTID_TEST_DB_IMPORT_HOST: postgres
ACOUSTID_TEST_DB_SLOW_NAME: acoustid_test ACOUSTID_TEST_DB_IMPORT_NAME: acoustid_test
ACOUSTID_TEST_REDIS_HOST: redis ACOUSTID_TEST_REDIS_HOST: redis
ACOUSTID_TEST_INDEX_HOST: acoustid-index ACOUSTID_TEST_INDEX_HOST: acoustid-index
cache: cache:
......
[database] [database]
two_phase_commit=yes two_phase_commit=yes
[database:default] [database:main]
host=127.0.0.1 host=127.0.0.1
port=5432 port=5432
user=acoustid user=acoustid
name=acoustid_test name=acoustid_test
password=acoustid password=acoustid
[database:slow] [database:import]
host=127.0.0.1 host=127.0.0.1
port=5432 port=5432
user=acoustid user=acoustid
name=acoustid_slow_test name=acoustid_import_test
password=acoustid password=acoustid
[logging] [logging]
......
[database] [databases]
two_phase_commit=false
[database:main]
user=acoustid
password=acoustid
name=acoustid name=acoustid
host=localhost
port=5432
[database:import]
user=acoustid user=acoustid
password=XXX password=acoustid
name=acoustid_import
host=localhost host=localhost
port=5432 port=5432
[database_slow] [database:main_master]
user=acoustid
password=acoustid
name=acoustid name=acoustid
host=localhost
port=5432
[database:import_master]
user=acoustid user=acoustid
password=XXX password=acoustid
name=acoustid_import
host=localhost host=localhost
port=5432 port=5432
......
...@@ -44,30 +44,36 @@ class BaseConfig(object): ...@@ -44,30 +44,36 @@ class BaseConfig(object):
class DatabasesConfig(BaseConfig): class DatabasesConfig(BaseConfig):
def __init__(self): def __init__(self):
self.databases = { self.modes = {'local', 'replica'}
'default': DatabaseConfig(), self.names = {'main', 'import'}
'slow': DatabaseConfig(), self.databases = {}
} for mode in self.modes:
self.databases[mode] = {}
for name in self.name:
self.databases[mode][name] = DatabaseConfig()
self.use_two_phase_commit = False self.use_two_phase_commit = False
def create_engines(self, **kwargs): def create_engines(self, **kwargs):
mode = kwargs.pop('mode', 'local')
engines = {} engines = {}
for name, db_config in self.databases.items(): for name, db_config in self.databases[mode].items():
engines[name] = db_config.create_engine(**kwargs) engines[name] = db_config.create_engine(**kwargs)
return engines return engines
def read_section(self, parser, section): def read_section(self, parser, section):
if parser.has_option(section, 'two_phase_commit'): if parser.has_option(section, 'two_phase_commit'):
self.use_two_phase_commit = parser.getboolean(section, 'two_phase_commit') self.use_two_phase_commit = parser.getboolean(section, 'two_phase_commit')
for name, sub_config in self.databases.items(): for mode, db_configs in self.databases.items():
sub_section = '{}:{}'.format(section, name) for name, sub_config in db_configs.items():
sub_config.read_section(parser, sub_section) sub_section = '{}:{}{}'.format(section, name, '_' + mode if mode != 'local' else '')
sub_config.read_section(parser, sub_section)
def read_env(self, prefix): def read_env(self, prefix):
read_env_item(self, 'use_two_phase_commit', prefix + 'TWO_PHASE_COMMIT', convert=str_to_bool) read_env_item(self, 'use_two_phase_commit', prefix + 'TWO_PHASE_COMMIT', convert=str_to_bool)
for name, sub_config in self.databases.items(): for mode, db_configs in self.databases.items():
sub_prefix = prefix + name.upper() + '_' for name, sub_config in db_configs.items():
sub_config.read_env(sub_prefix) sub_prefix = prefix + name.upper() + '_' + (mode.upper() + '_' if mode != 'local' else '')
sub_config.read_env(sub_prefix)
class DatabaseConfig(BaseConfig): class DatabaseConfig(BaseConfig):
......
...@@ -8,10 +8,10 @@ Session = sessionmaker() ...@@ -8,10 +8,10 @@ Session = sessionmaker()
def get_bind_args(engines): def get_bind_args(engines):
binds = {} binds = {}
for table in metadata.sorted_tables: for table in metadata.sorted_tables:
bind_key = table.info.get('bind_key', 'default') bind_key = table.info.get('bind_key', 'main')
if bind_key != 'default': if bind_key != 'main':
binds[table] = engines[bind_key] binds[table] = engines[bind_key]
return {'bind': engines['default'], 'binds': binds} return {'bind': engines['main'], 'binds': binds}
def get_session_args(script): def get_session_args(script):
......
...@@ -60,7 +60,7 @@ class Script(object): ...@@ -60,7 +60,7 @@ class Script(object):
@property @property
def engine(self): def engine(self):
return self.db_engines['default'] return self.db_engines['main']
def setup_logging(self): def setup_logging(self):
for logger_name, level in sorted(self.config.logging.levels.items()): for logger_name, level in sorted(self.config.logging.levels.items()):
......
...@@ -148,7 +148,7 @@ submission = Table('submission', metadata, ...@@ -148,7 +148,7 @@ submission = Table('submission', metadata,
Column('disc_no', Integer), Column('disc_no', Integer),
Column('year', Integer), Column('year', Integer),
info={'bind_key': 'slow'}, info={'bind_key': 'import'},
) )
submission_result = Table('submission_result', metadata, submission_result = Table('submission_result', metadata,
...@@ -166,7 +166,7 @@ submission_result = Table('submission_result', metadata, ...@@ -166,7 +166,7 @@ submission_result = Table('submission_result', metadata,
Column('puid', UUID), Column('puid', UUID),
Column('foreignid', String), Column('foreignid', String),
info={'bind_key': 'slow'}, info={'bind_key': 'import'},
) )
stats = Table('stats', metadata, stats = Table('stats', metadata,
......
CREATE DATABASE "acoustid"; CREATE DATABASE "acoustid";
CREATE DATABASE "acoustid_test"; CREATE DATABASE "acoustid_test";
CREATE DATABASE "acoustid_slow"; CREATE DATABASE "acoustid_import";
CREATE DATABASE "acoustid_slow_test"; CREATE DATABASE "acoustid_import_test";
\c acoustid \c acoustid
create extension intarray; create extension intarray;
...@@ -16,13 +16,13 @@ create extension pgcrypto; ...@@ -16,13 +16,13 @@ create extension pgcrypto;
create extension acoustid; create extension acoustid;
create extension cube; create extension cube;
\c acoustid_slow \c acoustid_import
create extension intarray; create extension intarray;
create extension pgcrypto; create extension pgcrypto;
create extension acoustid; create extension acoustid;
create extension cube; create extension cube;
\c acoustid_slow_test \c acoustid_import_test
create extension intarray; create extension intarray;
create extension pgcrypto; create extension pgcrypto;
create extension acoustid; create extension acoustid;
......
...@@ -29,7 +29,7 @@ script_location = alembic ...@@ -29,7 +29,7 @@ script_location = alembic
# are written from script.py.mako # are written from script.py.mako
# output_encoding = utf-8 # output_encoding = utf-8
databases = default, slow databases = main, import
# Logging configuration # Logging configuration
[loggers] [loggers]
......
...@@ -27,13 +27,13 @@ def include_object(db_name): ...@@ -27,13 +27,13 @@ def include_object(db_name):
if obj_type == "table": if obj_type == "table":
if obj.schema == "musicbrainz": if obj.schema == "musicbrainz":
return False return False
bind_key = obj.info.get('bind_key', 'default') bind_key = obj.info.get('bind_key', 'main')
if bind_key != db_name: if bind_key != db_name:
return False return False
if obj_type == "column": if obj_type == "column":
if obj.table.schema == "musicbrainz": if obj.table.schema == "musicbrainz":
return False return False
bind_key = obj.table.info.get('bind_key', 'default') bind_key = obj.table.info.get('bind_key', 'main')
if bind_key != db_name: if bind_key != db_name:
return False return False
return True return True
......
...@@ -24,7 +24,7 @@ def downgrade(engine_name): ...@@ -24,7 +24,7 @@ def downgrade(engine_name):
globals()["downgrade_%s" % engine_name]() globals()["downgrade_%s" % engine_name]()
def upgrade_default(): def upgrade_main():
op.create_foreign_key(op.f('account_google_fk_account_id'), 'account_google', 'account', ['account_id'], ['id']) op.create_foreign_key(op.f('account_google_fk_account_id'), 'account_google', 'account', ['account_id'], ['id'])
op.create_foreign_key(op.f('application_fk_account_id'), 'application', 'account', ['account_id'], ['id']) op.create_foreign_key(op.f('application_fk_account_id'), 'application', 'account', ['account_id'], ['id'])
op.create_foreign_key(op.f('stats_lookups_fk_application_id'), 'stats_lookups', 'application', ['application_id'], ['id']) op.create_foreign_key(op.f('stats_lookups_fk_application_id'), 'stats_lookups', 'application', ['application_id'], ['id'])
...@@ -54,7 +54,7 @@ def upgrade_default(): ...@@ -54,7 +54,7 @@ def upgrade_default():
op.alter_column('recording_acoustid', 'created', nullable=False) op.alter_column('recording_acoustid', 'created', nullable=False)
def downgrade_default(): def downgrade_main():
op.alter_column('recording_acoustid', 'created', nullable=True) op.alter_column('recording_acoustid', 'created', nullable=True)
op.alter_column('track_foreignid_source', 'created', nullable=True) op.alter_column('track_foreignid_source', 'created', nullable=True)
op.alter_column('track_foreignid', 'created', nullable=True) op.alter_column('track_foreignid', 'created', nullable=True)
...@@ -84,9 +84,9 @@ def downgrade_default(): ...@@ -84,9 +84,9 @@ def downgrade_default():
op.drop_constraint(op.f('account_google_fk_account_id'), 'account_google', type_='foreignkey') op.drop_constraint(op.f('account_google_fk_account_id'), 'account_google', type_='foreignkey')
def upgrade_slow(): def upgrade_import():
pass pass
def downgrade_slow(): def downgrade_import():
pass pass
...@@ -24,18 +24,18 @@ def downgrade(engine_name): ...@@ -24,18 +24,18 @@ def downgrade(engine_name):
globals()["downgrade_%s" % engine_name]() globals()["downgrade_%s" % engine_name]()
def upgrade_default(): def upgrade_main():
pass pass
def downgrade_default(): def downgrade_main():
pass pass
def upgrade_slow(): def upgrade_import():
op.create_table('submission_result', op.create_table('submission_result',
sa.Column('submission_id', sa.Integer(), autoincrement=False, nullable=False), sa.Column('submission_id', sa.Integer(), autoincrement=False, nullable=False),
sa.Column('created', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), sa.Column('created', sa.DateTime(timezone=True), server_main=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('account_id', sa.Integer(), nullable=False), sa.Column('account_id', sa.Integer(), nullable=False),
sa.Column('application_id', sa.Integer(), nullable=False), sa.Column('application_id', sa.Integer(), nullable=False),
sa.Column('application_version', sa.String(), nullable=True), sa.Column('application_version', sa.String(), nullable=True),
...@@ -49,5 +49,5 @@ def upgrade_slow(): ...@@ -49,5 +49,5 @@ def upgrade_slow():
) )
def downgrade_slow(): def downgrade_import():
op.drop_table('submission_result') op.drop_table('submission_result')
This diff is collapsed.
...@@ -24,7 +24,7 @@ def downgrade(engine_name): ...@@ -24,7 +24,7 @@ def downgrade(engine_name):
globals()["downgrade_%s" % engine_name]() globals()["downgrade_%s" % engine_name]()
def upgrade_default(): def upgrade_main():
op.drop_constraint(u'fingerprint_source_fk_submission_id', 'fingerprint_source', type_='foreignkey') op.drop_constraint(u'fingerprint_source_fk_submission_id', 'fingerprint_source', type_='foreignkey')
op.drop_constraint(u'track_foreignid_source_fk_submission_id', 'track_foreignid_source', type_='foreignkey') op.drop_constraint(u'track_foreignid_source_fk_submission_id', 'track_foreignid_source', type_='foreignkey')
op.drop_constraint(u'track_mbid_source_fk_submission_id', 'track_mbid_source', type_='foreignkey') op.drop_constraint(u'track_mbid_source_fk_submission_id', 'track_mbid_source', type_='foreignkey')
...@@ -32,7 +32,7 @@ def upgrade_default(): ...@@ -32,7 +32,7 @@ def upgrade_default():
op.drop_constraint(u'track_puid_source_fk_submission_id', 'track_puid_source', type_='foreignkey') op.drop_constraint(u'track_puid_source_fk_submission_id', 'track_puid_source', type_='foreignkey')
def downgrade_default(): def downgrade_main():
op.create_foreign_key(u'track_puid_source_fk_submission_id', 'track_puid_source', 'submission', ['submission_id'], ['id']) op.create_foreign_key(u'track_puid_source_fk_submission_id', 'track_puid_source', 'submission', ['submission_id'], ['id'])
op.create_foreign_key(u'track_meta_source_fk_submission_id', 'track_meta_source', 'submission', ['submission_id'], ['id']) op.create_foreign_key(u'track_meta_source_fk_submission_id', 'track_meta_source', 'submission', ['submission_id'], ['id'])
op.create_foreign_key(u'track_mbid_source_fk_submission_id', 'track_mbid_source', 'submission', ['submission_id'], ['id']) op.create_foreign_key(u'track_mbid_source_fk_submission_id', 'track_mbid_source', 'submission', ['submission_id'], ['id'])
...@@ -40,9 +40,9 @@ def downgrade_default(): ...@@ -40,9 +40,9 @@ def downgrade_default():
op.create_foreign_key(u'fingerprint_source_fk_submission_id', 'fingerprint_source', 'submission', ['submission_id'], ['id']) op.create_foreign_key(u'fingerprint_source_fk_submission_id', 'fingerprint_source', 'submission', ['submission_id'], ['id'])
def upgrade_slow(): def upgrade_import():
pass pass
def downgrade_slow(): def downgrade_import():
pass pass
...@@ -28,20 +28,20 @@ def downgrade(engine_name): ...@@ -28,20 +28,20 @@ def downgrade(engine_name):
globals()["downgrade_%s" % engine_name]() globals()["downgrade_%s" % engine_name]()
def upgrade_default(): def upgrade_main():
op.rename_table('submission', 'submission_old') op.rename_table('submission', 'submission_old')
def downgrade_default(): def downgrade_main():
op.rename_table('submission_old', 'submission') op.rename_table('submission_old', 'submission')
def upgrade_slow(): def upgrade_import():
op.execute(CreateSequence(Sequence('submission_id_seq'))) op.execute(CreateSequence(Sequence('submission_id_seq')))
op.create_table('submission', op.create_table('submission',
sa.Column('id', sa.Integer(), server_default=sa.text("nextval('submission_id_seq')"), nullable=False), sa.Column('id', sa.Integer(), server_main=sa.text("nextval('submission_id_seq')"), nullable=False),
sa.Column('created', sa.DateTime(timezone=True), server_default=sa.text('CURRENT_TIMESTAMP'), nullable=False), sa.Column('created', sa.DateTime(timezone=True), server_main=sa.text('CURRENT_TIMESTAMP'), nullable=False),
sa.Column('handled', sa.Boolean(), server_default=sa.text('false'), nullable=True), sa.Column('handled', sa.Boolean(), server_main=sa.text('false'), nullable=True),
sa.Column('account_id', sa.Integer(), nullable=False), sa.Column('account_id', sa.Integer(), nullable=False),
sa.Column('application_id', sa.Integer(), nullable=False), sa.Column('application_id', sa.Integer(), nullable=False),
sa.Column('application_version', sa.String(), nullable=True), sa.Column('application_version', sa.String(), nullable=True),
...@@ -84,6 +84,6 @@ def upgrade_slow(): ...@@ -84,6 +84,6 @@ def upgrade_slow():
range_from = range_to range_from = range_to
def downgrade_slow(): def downgrade_import():
op.drop_table('submission') op.drop_table('submission')
op.execute(DropSequence(Sequence('submission_id_seq'))) op.execute(DropSequence(Sequence('submission_id_seq')))
...@@ -24,19 +24,19 @@ def downgrade(engine_name): ...@@ -24,19 +24,19 @@ def downgrade(engine_name):
globals()["downgrade_%s" % engine_name]() globals()["downgrade_%s" % engine_name]()
def upgrade_default(): def upgrade_main():
op.add_column('account', op.add_column('account',
sa.Column('is_admin', sa.Boolean(), server_default=sa.text('false'), nullable=False) sa.Column('is_admin', sa.Boolean(), server_main=sa.text('false'), nullable=False)
) )
def downgrade_default(): def downgrade_main():
op.drop_column('account', 'is_admin') op.drop_column('account', 'is_admin')
def upgrade_slow(): def upgrade_import():
pass pass
def downgrade_slow(): def downgrade_import():
pass pass
...@@ -8,7 +8,7 @@ import os ...@@ -8,7 +8,7 @@ import os
def main(script, opts, args): def main(script, opts, args):
os.execlp('psql', 'psql', *(script.config.databases.databases['default'].create_psql_args() + args)) os.execlp('psql', 'psql', *(script.config.databases.databases['main'].create_psql_args() + args))
run_script(main) run_script(main)
...@@ -12,7 +12,7 @@ RECREATE_DB = True ...@@ -12,7 +12,7 @@ RECREATE_DB = True
def get_tables_for_bind(metadata, bind_key): def get_tables_for_bind(metadata, bind_key):
tables = [] tables = []
for table in metadata.sorted_tables: for table in metadata.sorted_tables:
if table.info.get('bind_key', 'default') == bind_key: if table.info.get('bind_key', 'main') == bind_key:
tables.append(table) tables.append(table)
return tables return tables
......
Markdown is supported
0% or
You are about to add 0 people to the discussion. Proceed with caution.
Finish editing this message first!
Please register or to comment