Changeset - e1f233b069b0
[Not reviewed]
default
0 3 0
Mads Kiilerich (mads) - 6 years ago 2020-06-19 17:24:36
mads@kiilerich.com
db: refactor to clarify that we always invoke SA create_all with checkfirst=False

create_all(checkfirst=True) would skip creating tables that already exist,
potentially leaving them with wrong schema.

We are strict and want a successful create_all to leave all the tables in a
fully known state. We thus want it to fail if any tables already are present.
Existing tables should not silently be accepted.
3 files changed with 4 insertions and 5 deletions:
0 comments (0 inline, 0 general)
kallithea/bin/kallithea_cli_db.py
Show inline comments
 
@@ -36,45 +36,45 @@ def db_create(user, password, email, rep
 
    values you provide.
 

	
 
    You can pass the answers to all questions as options to this command.
 
    """
 
    dbconf = kallithea.CONFIG['sqlalchemy.url']
 

	
 
    # force_ask should be True (yes), False (no), or None (ask)
 
    if force_yes:
 
        force_ask = True
 
    elif force_no:
 
        force_ask = False
 
    else:
 
        force_ask = None
 

	
 
    cli_args = dict(
 
            username=user,
 
            password=password,
 
            email=email,
 
            repos_location=repos,
 
            force_ask=force_ask,
 
            public_access=public_access,
 
    )
 
    dbmanage = DbManage(dbconf=dbconf, root=kallithea.CONFIG['here'],
 
                        tests=False, cli_args=cli_args)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.create_tables()
 
    repo_root_path = dbmanage.prompt_repo_root_path(None)
 
    dbmanage.create_settings(repo_root_path)
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 

	
 
    # initial repository scan
 
    kallithea.config.application.make_app(
 
            kallithea.CONFIG.global_conf, **kallithea.CONFIG.local_conf)
 
    added, _ = kallithea.lib.utils.repo2db_mapper(kallithea.model.scm.ScmModel().repo_scan())
 
    if added:
 
        click.echo('Initial repository scan: added following repositories:')
 
        click.echo('\t%s' % '\n\t'.join(added))
 
    else:
 
        click.echo('Initial repository scan: no repositories found.')
 

	
 
    click.echo('Database set up successfully.')
 
    click.echo("Don't forget to build the front-end using 'kallithea-cli front-end-build'.")
kallithea/lib/db_manage.py
Show inline comments
 
@@ -51,88 +51,87 @@ class DbManage(object):
 
        self.dbname = dbconf.split('/')[-1]
 
        self.tests = tests
 
        self.root = root
 
        self.dburi = dbconf
 
        self.cli_args = cli_args or {}
 
        self.init_db(SESSION=SESSION)
 

	
 
    def _ask_ok(self, msg):
 
        """Invoke ask_ok unless the force_ask option provides the answer"""
 
        force_ask = self.cli_args.get('force_ask')
 
        if force_ask is not None:
 
            return force_ask
 
        from kallithea.lib.utils2 import ask_ok
 
        return ask_ok(msg)
 

	
 
    def init_db(self, SESSION=None):
 
        if SESSION:
 
            self.sa = SESSION
 
        else:
 
            # init new sessions
 
            engine = create_engine(self.dburi)
 
            init_model(engine)
 
            self.sa = Session()
 

	
 
    def create_tables(self, override=False):
 
    def create_tables(self):
 
        """
 
        Create a auth database
 
        """
 

	
 
        log.info("Any existing database is going to be destroyed")
 
        if self.tests:
 
            destroy = True
 
        else:
 
            destroy = self._ask_ok('Are you sure to destroy old database ? [y/n]')
 
        if not destroy:
 
            print('Nothing done.')
 
            sys.exit(0)
 
        if destroy:
 
            # drop and re-create old schemas
 

	
 
            url = sqlalchemy.engine.url.make_url(self.dburi)
 
            database = url.database
 

	
 
            # Some databases enforce foreign key constraints and Base.metadata.drop_all() doesn't work
 
            if url.drivername == 'mysql':
 
                url.database = None  # don't connect to the database (it might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.execute('DROP DATABASE IF EXISTS `%s`' % database)
 
                    conn.execute('CREATE DATABASE `%s` CHARACTER SET utf8mb4 COLLATE utf8mb4_unicode_ci' % database)
 
            elif url.drivername == 'postgresql':
 
                from psycopg2.extensions import ISOLATION_LEVEL_AUTOCOMMIT
 
                url.database = 'postgres'  # connect to the system database (as the real one might not exist)
 
                engine = sqlalchemy.create_engine(url)
 
                with engine.connect() as conn:
 
                    conn.connection.set_isolation_level(ISOLATION_LEVEL_AUTOCOMMIT)
 
                    conn.execute('DROP DATABASE IF EXISTS "%s"' % database)
 
                    conn.execute('CREATE DATABASE "%s"' % database)
 
            else:
 
                # known to work on SQLite - possibly not on other databases with strong referential integrity
 
                Base.metadata.drop_all()
 

	
 
        checkfirst = not override
 
        Base.metadata.create_all(checkfirst=checkfirst)
 
        Base.metadata.create_all(checkfirst=False)
 

	
 
        # Create an Alembic configuration and generate the version table,
 
        # "stamping" it with the most recent Alembic migration revision, to
 
        # tell Alembic that all the schema upgrades are already in effect.
 
        alembic_cfg = alembic.config.Config()
 
        alembic_cfg.set_main_option('script_location', 'kallithea:alembic')
 
        alembic_cfg.set_main_option('sqlalchemy.url', self.dburi)
 
        # This command will give an error in an Alembic multi-head scenario,
 
        # but in practice, such a scenario should not come up during database
 
        # creation, even during development.
 
        alembic.command.stamp(alembic_cfg, 'head')
 

	
 
        log.info('Created tables for %s', self.dbname)
 

	
 
    def admin_prompt(self, second=False):
 
        if not self.tests:
 
            import getpass
 

	
 
            username = self.cli_args.get('username')
 
            password = self.cli_args.get('password')
 
            email = self.cli_args.get('email')
 

	
 
            def get_password():
 
                password = getpass.getpass('Specify admin password '
kallithea/tests/fixture.py
Show inline comments
 
@@ -345,49 +345,49 @@ class Fixture(object):
 
        return pull_request.pull_request_id
 

	
 

	
 
#==============================================================================
 
# Global test environment setup
 
#==============================================================================
 

	
 
def create_test_env(repos_test_path, config):
 
    """
 
    Makes a fresh database and
 
    install test repository into tmp dir
 
    """
 

	
 
    # PART ONE create db
 
    dbconf = config['sqlalchemy.url']
 
    log.debug('making test db %s', dbconf)
 

	
 
    # create test dir if it doesn't exist
 
    if not os.path.isdir(repos_test_path):
 
        log.debug('Creating testdir %s', repos_test_path)
 
        os.makedirs(repos_test_path)
 

	
 
    dbmanage = DbManage(dbconf=dbconf, root=config['here'],
 
                        tests=True)
 
    dbmanage.create_tables(override=True)
 
    dbmanage.create_tables()
 
    # for tests dynamically set new root paths based on generated content
 
    dbmanage.create_settings(dbmanage.prompt_repo_root_path(repos_test_path))
 
    dbmanage.create_default_user()
 
    dbmanage.admin_prompt()
 
    dbmanage.create_permissions()
 
    dbmanage.populate_default_permissions()
 
    Session().commit()
 
    # PART TWO make test repo
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['index_dir']
 
    data_path = config['cache_dir']
 

	
 
    # clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    # CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
0 comments (0 inline, 0 general)