"""Pylons environment configuration"""
import os
import logging
import rhodecode
from mako.lookup import TemplateLookup
from pylons.configuration import PylonsConfig
from pylons.error import handle_mako_error
# don't remove this import it does magic for celery
from rhodecode.lib import celerypylons
import rhodecode.lib.app_globals as app_globals
from rhodecode.config.routing import make_map
from rhodecode.lib import helpers
from rhodecode.lib.auth import set_available_permissions
from rhodecode.lib.utils import repo2db_mapper, make_ui, set_rhodecode_config,\
load_rcextensions
load_rcextensions, check_git_version
from rhodecode.lib.utils2 import engine_from_config, str2bool
from rhodecode.model import init_model
from rhodecode.model.scm import ScmModel
log = logging.getLogger(__name__)
def load_environment(global_conf, app_conf, initial=False):
"""
Configure the Pylons environment via the ``pylons.config``
object
config = PylonsConfig()
# Pylons paths
root = os.path.dirname(os.path.dirname(os.path.abspath(__file__)))
paths = dict(
root=root,
controllers=os.path.join(root, 'controllers'),
static_files=os.path.join(root, 'public'),
templates=[os.path.join(root, 'templates')]
)
# Initialize config with the basic options
config.init_app(global_conf, app_conf, package='rhodecode', paths=paths)
# store some globals into rhodecode
rhodecode.CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
rhodecode.CELERY_EAGER = str2bool(config['app_conf'].get('celery.always.eager'))
config['routes.map'] = make_map(config)
config['pylons.app_globals'] = app_globals.Globals(config)
config['pylons.h'] = helpers
rhodecode.CONFIG = config
load_rcextensions(root_path=config['here'])
# Setup cache object as early as possible
import pylons
pylons.cache._push_object(config['pylons.app_globals'].cache)
# Create the Mako TemplateLookup, with the default auto-escaping
config['pylons.app_globals'].mako_lookup = TemplateLookup(
directories=paths['templates'],
error_handler=handle_mako_error,
module_directory=os.path.join(app_conf['cache_dir'], 'templates'),
input_encoding='utf-8', default_filters=['escape'],
imports=['from webhelpers.html import escape'])
# sets the c attribute access when don't existing attribute are accessed
config['pylons.strict_tmpl_context'] = True
test = os.path.split(config['__file__'])[-1] == 'test.ini'
if test:
if os.environ.get('TEST_DB'):
# swap config if we pass enviroment variable
config['sqlalchemy.db1.url'] = os.environ.get('TEST_DB')
from rhodecode.lib.utils import create_test_env, create_test_index
from rhodecode.tests import TESTS_TMP_PATH
# set RC_NO_TMP_PATH=1 to disable re-creating the database and
# test repos
if not int(os.environ.get('RC_NO_TMP_PATH', 0)):
create_test_env(TESTS_TMP_PATH, config)
# set RC_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
if not int(os.environ.get('RC_WHOOSH_TEST_DISABLE', 0)):
create_test_index(TESTS_TMP_PATH, config, True)
#check git version
check_git_version()
# MULTIPLE DB configs
# Setup the SQLAlchemy database engine
sa_engine_db1 = engine_from_config(config, 'sqlalchemy.db1.')
init_model(sa_engine_db1)
repos_path = make_ui('db').configitems('paths')[0][1]
repo2db_mapper(ScmModel().repo_scan(repos_path),
remove_obsolete=False, install_git_hook=False)
set_available_permissions(config)
config['base_path'] = repos_path
set_rhodecode_config(config)
# CONFIGURATION OPTIONS HERE (note: all config options will override
# any Pylons config options)
# store config reference into our module to skip import magic of
# pylons
rhodecode.CONFIG.update(config)
return config
# -*- coding: utf-8 -*-
rhodecode.controllers.admin.settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
settings controller for rhodecode admin
:created_on: Jul 14, 2010
:author: marcink
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import traceback
import formencode
import pkg_resources
import platform
from sqlalchemy import func
from formencode import htmlfill
from pylons import request, session, tmpl_context as c, url, config
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
HasPermissionAnyDecorator, NotAnonymous
from rhodecode.lib.base import BaseController, render
from rhodecode.lib.celerylib import tasks, run_task
from rhodecode.lib.utils import repo2db_mapper, invalidate_cache, \
set_rhodecode_config, repo_name_slug
set_rhodecode_config, repo_name_slug, check_git_version
from rhodecode.model.db import RhodeCodeUi, Repository, RepoGroup, \
RhodeCodeSetting, PullRequest, PullRequestReviewers
from rhodecode.model.forms import UserForm, ApplicationSettingsForm, \
ApplicationUiSettingsForm, ApplicationVisualisationForm
from rhodecode.model.user import UserModel
from rhodecode.model.db import User
from rhodecode.model.notification import EmailNotificationModel
from rhodecode.model.meta import Session
from rhodecode.lib.utils2 import str2bool
class SettingsController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('setting', 'settings', controller='admin/settings',
# path_prefix='/admin', name_prefix='admin_')
@LoginRequired()
def __before__(self):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
c.modules = sorted([(p.project_name, p.version)
for p in pkg_resources.working_set],
for p in pkg_resources.working_set]
+ [('git', check_git_version())],
key=lambda k: k[0].lower())
c.py_version = platform.python_version()
c.platform = platform.platform()
super(SettingsController, self).__before__()
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /admin/settings: All items in the collection"""
# url('admin_settings')
defaults = RhodeCodeSetting.get_app_settings()
defaults.update(self._get_hg_ui_settings())
return htmlfill.render(
render('admin/settings/settings.html'),
defaults=defaults,
encoding="UTF-8",
force_defaults=False
def create(self):
"""POST /admin/settings: Create a new item"""
def new(self, format='html'):
"""GET /admin/settings/new: Form to create a new item"""
# url('admin_new_setting')
def update(self, setting_id):
"""PUT /admin/settings/setting_id: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('admin_setting', setting_id=ID),
# method='put')
# url('admin_setting', setting_id=ID)
if setting_id == 'mapping':
rm_obsolete = request.POST.get('destroy', False)
log.debug('Rescanning directories with destroy=%s' % rm_obsolete)
initial = ScmModel().repo_scan()
log.debug('invalidating all repositories')
for repo_name in initial.keys():
invalidate_cache('get_repo_cached_%s' % repo_name)
added, removed = repo2db_mapper(initial, rm_obsolete)
h.flash(_('Repositories successfully'
' rescanned added: %s,removed: %s') % (added, removed),
category='success')
if setting_id == 'whoosh':
repo_location = self._get_hg_ui_settings()['paths_root_path']
full_index = request.POST.get('full_index', False)
run_task(tasks.whoosh_index, repo_location, full_index)
h.flash(_('Whoosh reindex task scheduled'), category='success')
if setting_id == 'global':
application_form = ApplicationSettingsForm()()
try:
form_result = application_form.to_python(dict(request.POST))
except formencode.Invalid, errors:
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8"
sett1 = RhodeCodeSetting.get_by_name_or_create('title')
sett1.app_settings_value = form_result['rhodecode_title']
Session().add(sett1)
sett2 = RhodeCodeSetting.get_by_name_or_create('realm')
sett2.app_settings_value = form_result['rhodecode_realm']
Session().add(sett2)
sett3 = RhodeCodeSetting.get_by_name_or_create('ga_code')
sett3.app_settings_value = form_result['rhodecode_ga_code']
Session().add(sett3)
Session().commit()
h.flash(_('Updated application settings'), category='success')
except Exception:
log.error(traceback.format_exc())
h.flash(_('error occurred during updating '
'application settings'),
category='error')
if setting_id == 'visual':
application_form = ApplicationVisualisationForm()()
sett1 = RhodeCodeSetting.get_by_name_or_create('show_public_icon')
sett1.app_settings_value = \
form_result['rhodecode_show_public_icon']
sett2 = RhodeCodeSetting.get_by_name_or_create('show_private_icon')
sett2.app_settings_value = \
form_result['rhodecode_show_private_icon']
sett3 = RhodeCodeSetting.get_by_name_or_create('stylify_metatags')
sett3.app_settings_value = \
form_result['rhodecode_stylify_metatags']
h.flash(_('Updated visualisation settings'),
'visualisation settings'),
if setting_id == 'vcs':
application_form = ApplicationUiSettingsForm()()
# fix namespaces for hooks and extensions
_f = lambda s: s.replace('.', '_')
sett = RhodeCodeUi.get_by_key('push_ssl')
sett.ui_value = form_result['web_push_ssl']
Session().add(sett)
sett = RhodeCodeUi.get_by_key('/')
sett.ui_value = form_result['paths_root_path']
#HOOKS
sett = RhodeCodeUi.get_by_key(RhodeCodeUi.HOOK_UPDATE)
sett.ui_active = form_result[_f('hooks_%s' %
RhodeCodeUi.HOOK_UPDATE)]
sett = RhodeCodeUi.get_by_key(RhodeCodeUi.HOOK_REPO_SIZE)
RhodeCodeUi.HOOK_REPO_SIZE)]
sett = RhodeCodeUi.get_by_key(RhodeCodeUi.HOOK_PUSH)
RhodeCodeUi.HOOK_PUSH)]
sett = RhodeCodeUi.get_by_key(RhodeCodeUi.HOOK_PULL)
RhodeCodeUi.HOOK_PULL)]
## EXTENSIONS
sett = RhodeCodeUi.get_by_key('largefiles')
if not sett:
#make one if it's not there !
sett = RhodeCodeUi()
sett.ui_key = 'largefiles'
sett.ui_section = 'extensions'
sett.ui_active = form_result[_f('extensions_largefiles')]
sett = RhodeCodeUi.get_by_key('hgsubversion')
sett.ui_key = 'hgsubversion'
sett.ui_active = form_result[_f('extensions_hgsubversion')]
# sett = RhodeCodeUi.get_by_key('hggit')
# if not sett:
# #make one if it's not there !
# sett = RhodeCodeUi()
# sett.ui_key = 'hggit'
# sett.ui_section = 'extensions'
# sett.ui_active = form_result[_f('extensions_hggit')]
# Session().add(sett)
h.flash(_('Updated VCS settings'), category='success')
'application settings'), category='error')
if setting_id == 'hooks':
ui_key = request.POST.get('new_hook_ui_key')
ui_value = request.POST.get('new_hook_ui_value')
if ui_value and ui_key:
RhodeCodeUi.create_or_update_hook(ui_key, ui_value)
h.flash(_('Added new hook'),
# check for edits
update = False
_d = request.POST.dict_of_lists()
for k, v in zip(_d.get('hook_ui_key', []),
_d.get('hook_ui_value_new', [])):
RhodeCodeUi.create_or_update_hook(k, v)
update = True
if update:
h.flash(_('Updated hooks'), category='success')
h.flash(_('error occurred during hook creation'),
return redirect(url('admin_edit_setting', setting_id='hooks'))
if setting_id == 'email':
test_email = request.POST.get('test_email')
test_email_subj = 'RhodeCode TestEmail'
test_email_body = 'RhodeCode Email test'
test_email_html_body = EmailNotificationModel()\
.get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
body=test_email_body)
recipients = [test_email] if [test_email] else None
run_task(tasks.send_email, recipients, test_email_subj,
test_email_body, test_email_html_body)
h.flash(_('Email task created'), category='success')
return redirect(url('admin_settings'))
def delete(self, setting_id):
"""DELETE /admin/settings/setting_id: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# method='delete')
hook_id = request.POST.get('hook_id')
RhodeCodeUi.delete(hook_id)
def show(self, setting_id, format='html'):
GET /admin/settings/setting_id: Show a specific item"""
def edit(self, setting_id, format='html'):
GET /admin/settings/setting_id/edit: Form to
edit an existing item"""
# url('admin_edit_setting', setting_id=ID)
c.hooks = RhodeCodeUi.get_builtin_hooks()
c.custom_hooks = RhodeCodeUi.get_custom_hooks()
render('admin/settings/hooks.html'),
defaults={},
@NotAnonymous()
def my_account(self):
GET /_admin/my_account Displays info about my account
# url('admin_settings_my_account')
c.user = User.get(self.rhodecode_user.user_id)
all_repos = Session().query(Repository)\
.filter(Repository.user_id == c.user.user_id)\
.order_by(func.lower(Repository.repo_name)).all()
c.user_repos = ScmModel().get_repos(all_repos)
if c.user.username == 'default':
h.flash(_("You can't edit this user since it's"
" crucial for entire application"), category='warning')
return redirect(url('users'))
defaults = c.user.get_dict()
c.form = htmlfill.render(
render('admin/users/user_edit_my_account_form.html'),
return render('admin/users/user_edit_my_account.html')
def my_account_update(self):
"""PUT /_admin/my_account_update: Update an existing item"""
# h.form(url('admin_settings_my_account_update'),
# url('admin_settings_my_account_update', id=ID)
uid = self.rhodecode_user.user_id
email = self.rhodecode_user.email
_form = UserForm(edit=True,
old_data={'user_id': uid, 'email': email})()
form_result = {}
form_result = _form.to_python(dict(request.POST))
UserModel().update_my_account(uid, form_result)
h.flash(_('Your account was updated successfully'),
encoding="UTF-8")
h.flash(_('error occurred during update of user %s') \
% form_result.get('username'), category='error')
return redirect(url('my_account'))
def my_account_my_repos(self):
.filter(Repository.user_id == self.rhodecode_user.user_id)\
.order_by(func.lower(Repository.repo_name))\
.all()
return render('admin/users/user_edit_my_account_repos.html')
def my_account_my_pullrequests(self):
c.my_pull_requests = PullRequest.query()\
@@ -291,384 +291,419 @@ def make_ui(read_from='file', path=None,
baseui = ui.ui()
# clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
if read_from == 'file':
if not os.path.isfile(path):
log.debug('hgrc file is not present at %s skipping...' % path)
return False
log.debug('reading hgrc from %s' % path)
cfg = config.config()
cfg.read(path)
for section in ui_sections:
for k, v in cfg.items(section):
log.debug('settings ui from file[%s]%s:%s' % (section, k, v))
baseui.setconfig(section, k, v)
elif read_from == 'db':
sa = meta.Session()
ret = sa.query(RhodeCodeUi)\
.options(FromCache("sql_cache_short", "get_hg_ui_settings"))\
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
ui_.ui_key, ui_.ui_value)
baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
if ui_.ui_key == 'push_ssl':
# force set push_ssl requirement to False, rhodecode
# handles that
baseui.setconfig(ui_.ui_section, ui_.ui_key, False)
if clear_session:
meta.Session.remove()
return baseui
def set_rhodecode_config(config):
Updates pylons config with new settings from database
:param config:
hgsettings = RhodeCodeSetting.get_app_settings()
for k, v in hgsettings.items():
config[k] = v
def invalidate_cache(cache_key, *args):
Puts cache invalidation task into db for
further global cache invalidation
if cache_key.startswith('get_repo_cached_'):
name = cache_key.split('get_repo_cached_')[-1]
ScmModel().mark_for_invalidation(name)
def map_groups(path):
Given a full path to a repository, create all nested groups that this
repo is inside. This function creates parent-child relationships between
groups and creates default perms for all new groups.
:param paths: full path to repository
groups = path.split(Repository.url_sep())
parent = None
group = None
# last element is repo in nested groups structure
groups = groups[:-1]
rgm = ReposGroupModel(sa)
for lvl, group_name in enumerate(groups):
group_name = '/'.join(groups[:lvl] + [group_name])
group = RepoGroup.get_by_group_name(group_name)
desc = '%s group' % group_name
# skip folders that are now removed repos
if REMOVED_REPO_PAT.match(group_name):
break
if group is None:
log.debug('creating group level: %s group_name: %s' % (lvl,
group_name))
group = RepoGroup(group_name, parent)
group.group_description = desc
sa.add(group)
rgm._create_default_perms(group)
sa.flush()
parent = group
return group
def repo2db_mapper(initial_repo_list, remove_obsolete=False,
install_git_hook=False):
maps all repos given in initial_repo_list, non existing repositories
are created, if remove_obsolete is True it also check for db entries
that are not in initial_repo_list and removes them.
:param initial_repo_list: list of repositories found by scanning methods
:param remove_obsolete: check for obsolete entries in database
:param install_git_hook: if this is True, also check and install githook
for a repo if missing
from rhodecode.model.repo import RepoModel
rm = RepoModel()
user = sa.query(User).filter(User.admin == True).first()
if user is None:
raise Exception('Missing administrative account !')
added = []
# # clear cache keys
# log.debug("Clearing cache keys now...")
# CacheInvalidation.clear_cache()
# sa.commit()
for name, repo in initial_repo_list.items():
group = map_groups(name)
db_repo = rm.get_by_repo_name(name)
# found repo that is on filesystem not in RhodeCode database
if not db_repo:
log.info('repository %s not found creating now' % name)
added.append(name)
desc = (repo.description
if repo.description != 'unknown'
else '%s repository' % name)
new_repo = rm.create_repo(
repo_name=name,
repo_type=repo.alias,
description=desc,
repos_group=getattr(group, 'group_id', None),
owner=user,
just_db=True
# we added that repo just now, and make sure it has githook
# installed
if new_repo.repo_type == 'git':
ScmModel().install_git_hook(new_repo.scm_instance)
elif install_git_hook:
if db_repo.repo_type == 'git':
ScmModel().install_git_hook(db_repo.scm_instance)
# during starting install all cache keys for all repositories in the
# system, this will register all repos and multiple instances
key, _prefix, _org_key = CacheInvalidation._get_key(name)
log.debug("Creating cache key for %s instance_id:`%s`" % (name, _prefix))
CacheInvalidation._get_or_create_key(key, _prefix, _org_key, commit=False)
sa.commit()
removed = []
if remove_obsolete:
# remove from database those repositories that are not in the filesystem
for repo in sa.query(Repository).all():
if repo.repo_name not in initial_repo_list.keys():
log.debug("Removing non existing repository found in db `%s`" %
repo.repo_name)
sa.delete(repo)
removed.append(repo.repo_name)
except:
#don't hold further removals on error
sa.rollback()
return added, removed
# set cache regions for beaker so celery can utilise it
def add_cache(settings):
cache_settings = {'regions': None}
for key in settings.keys():
for prefix in ['beaker.cache.', 'cache.']:
if key.startswith(prefix):
name = key.split(prefix)[1].strip()
cache_settings[name] = settings[key].strip()
if cache_settings['regions']:
for region in cache_settings['regions'].split(','):
region = region.strip()
region_settings = {}
for key, value in cache_settings.items():
if key.startswith(region):
region_settings[key.split('.')[1]] = value
region_settings['expire'] = int(region_settings.get('expire',
60))
region_settings.setdefault('lock_dir',
cache_settings.get('lock_dir'))
region_settings.setdefault('data_dir',
cache_settings.get('data_dir'))
if 'type' not in region_settings:
region_settings['type'] = cache_settings.get('type',
'memory')
beaker.cache.cache_regions[region] = region_settings
def load_rcextensions(root_path):
from rhodecode.config import conf
path = os.path.join(root_path, 'rcextensions', '__init__.py')
if os.path.isfile(path):
rcext = create_module('rc', path)
EXT = rhodecode.EXTENSIONS = rcext
log.debug('Found rcextensions now loading %s...' % rcext)
# Additional mappings that are not present in the pygments lexers
conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(EXT, 'EXTRA_MAPPINGS', {}))
#OVERRIDE OUR EXTENSIONS FROM RC-EXTENSIONS (if present)
if getattr(EXT, 'INDEX_EXTENSIONS', []) != []:
log.debug('settings custom INDEX_EXTENSIONS')
conf.INDEX_EXTENSIONS = getattr(EXT, 'INDEX_EXTENSIONS', [])
#ADDITIONAL MAPPINGS
log.debug('adding extra into INDEX_EXTENSIONS')
conf.INDEX_EXTENSIONS.extend(getattr(EXT, 'EXTRA_INDEX_EXTENSIONS', []))
#==============================================================================
# TEST FUNCTIONS AND CREATORS
def create_test_index(repo_location, config, full_index):
Makes default test index
:param config: test config
:param full_index:
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
from rhodecode.lib.pidlock import DaemonLock, LockHeld
repo_location = repo_location
index_location = os.path.join(config['app_conf']['index_dir'])
if not os.path.exists(index_location):
os.makedirs(index_location)
l = DaemonLock(file_=jn(dn(index_location), 'make_index.lock'))
WhooshIndexingDaemon(index_location=index_location,
repo_location=repo_location)\
.run(full_index=full_index)
l.release()
except LockHeld:
pass
def create_test_env(repos_test_path, config):
Makes a fresh database and
install test repository into tmp dir
from rhodecode.lib.db_manage import DbManage
from rhodecode.tests import HG_REPO, GIT_REPO, TESTS_TMP_PATH
# PART ONE create db
dbconf = config['sqlalchemy.db1.url']
log.debug('making test db %s' % dbconf)
# create test dir if it doesn't exist
if not os.path.isdir(repos_test_path):
log.debug('Creating testdir %s' % repos_test_path)
os.makedirs(repos_test_path)
dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
tests=True)
dbmanage.create_tables(override=True)
dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
dbmanage.create_default_user()
dbmanage.admin_prompt()
dbmanage.create_permissions()
dbmanage.populate_default_permissions()
# PART TWO make test repo
log.debug('making test vcs repositories')
idx_path = config['app_conf']['index_dir']
data_path = config['app_conf']['cache_dir']
#clean index and data
if idx_path and os.path.exists(idx_path):
log.debug('remove %s' % idx_path)
shutil.rmtree(idx_path)
if data_path and os.path.exists(data_path):
log.debug('remove %s' % data_path)
shutil.rmtree(data_path)
#CREATE DEFAULT TEST REPOS
cur_dir = dn(dn(abspath(__file__)))
tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
tar.close()
tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
#LOAD VCS test stuff
from rhodecode.tests.vcs import setup_package
setup_package()
# PASTER COMMANDS
class BasePasterCommand(Command):
Abstract Base Class for paster commands.
The celery commands are somewhat aggressive about loading
celery.conf, and since our module sets the `CELERY_LOADER`
environment variable to our loader, we have to bootstrap a bit and
make sure we've had a chance to load the pylons config off of the
command line, otherwise everything fails.
min_args = 1
min_args_error = "Please provide a paster config file as an argument."
takes_config_file = 1
requires_config_file = True
def notify_msg(self, msg, log=False):
"""Make a notification to user, additionally if logger is passed
it logs this action using given logger
:param msg: message that will be printed to user
:param log: logging instance, to use to additionally log this message
if log and isinstance(log, logging):
log(msg)
def run(self, args):
Overrides Command.run
Checks for a config file argument and loads it.
if len(args) < self.min_args:
raise BadCommand(
self.min_args_error % {'min_args': self.min_args,
'actual_args': len(args)})
# Decrement because we're going to lob off the first argument.
# @@ This is hacky
self.min_args -= 1
self.bootstrap_config(args[0])
self.update_parser()
return super(BasePasterCommand, self).run(args[1:])
def update_parser(self):
Abstract method. Allows for the class's parser to be updated
before the superclass's `run` method is called. Necessary to
allow options/arguments to be passed through to the underlying
celery command.
raise NotImplementedError("Abstract Method.")
def bootstrap_config(self, conf):
Loads the pylons configuration.
from pylons import config as pylonsconfig
self.path_to_ini_file = os.path.realpath(conf)
conf = paste.deploy.appconfig('config:' + self.path_to_ini_file)
pylonsconfig.init_app(conf.global_conf, conf.local_conf)
def check_git_version():
Checks what version of git is installed in system, and issues a warning
if it's to old for RhodeCode to properly work.
import subprocess
from distutils.version import StrictVersion
from rhodecode import BACKENDS
p = subprocess.Popen('git --version', shell=True,
stdout=subprocess.PIPE, stderr=subprocess.PIPE)
stdout, stderr = p.communicate()
ver = (stdout.split(' ')[-1] or '').strip() or '0.0.0'
_ver = StrictVersion(ver)
_ver = StrictVersion('0.0.0')
stderr = traceback.format_exc()
req_ver = '1.7.4'
to_old_git = False
if _ver <= StrictVersion(req_ver):
to_old_git = True
if 'git' in BACKENDS:
log.debug('GIT version detected: %s' % stdout)
if stderr:
log.warning('Unable to detect git version org error was:%r' % stderr)
elif to_old_git:
log.warning('RhodeCode detected git version %s, which is to old '
'for the system to function properly make sure '
'it is at least in version %s' % (ver, req_ver))
return _ver
\ No newline at end of file
Status change: