@@ -253,24 +253,28 @@ def make_map(config):
#EXTRAS USER ROUTES
m.connect("users_group_perm", "/users_groups_perm/{id}",
action="update_perm", conditions=dict(method=["PUT"]))
#ADMIN GROUP REST ROUTES
rmap.resource('group', 'groups',
controller='admin/groups', path_prefix=ADMIN_PREFIX)
#ADMIN PERMISSIONS REST ROUTES
rmap.resource('permission', 'permissions',
controller='admin/permissions', path_prefix=ADMIN_PREFIX)
#ADMIN DEFAULTS REST ROUTES
rmap.resource('default', 'defaults',
controller='admin/defaults', path_prefix=ADMIN_PREFIX)
##ADMIN LDAP SETTINGS
rmap.connect('ldap_settings', '%s/ldap' % ADMIN_PREFIX,
controller='admin/ldap_settings', action='ldap_settings',
conditions=dict(method=["POST"]))
rmap.connect('ldap_home', '%s/ldap' % ADMIN_PREFIX,
controller='admin/ldap_settings')
#ADMIN SETTINGS REST ROUTES
with rmap.submapper(path_prefix=ADMIN_PREFIX,
controller='admin/settings') as m:
m.connect("admin_settings", "/settings",
new file 100644
# -*- coding: utf-8 -*-
"""
rhodecode.controllers.admin.defaults
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
default settings controller for Rhodecode
:created_on: Apr 27, 2010
:author: marcink
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import traceback
import formencode
from formencode import htmlfill
from pylons import request, session, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator
from rhodecode.lib.base import BaseController, render
from rhodecode.model.forms import DefaultsForm
from rhodecode.model.meta import Session
from rhodecode import BACKENDS
from rhodecode.model.db import RhodeCodeSetting
log = logging.getLogger(__name__)
class DefaultsController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('default', 'defaults')
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
def __before__(self):
super(DefaultsController, self).__before__()
def index(self, format='html'):
"""GET /defaults: All items in the collection"""
# url('defaults')
c.backends = BACKENDS.keys()
defaults = RhodeCodeSetting.get_default_repo_settings()
return htmlfill.render(
render('admin/defaults/defaults.html'),
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
def create(self):
"""POST /defaults: Create a new item"""
def new(self, format='html'):
"""GET /defaults/new: Form to create a new item"""
# url('new_default')
def update(self, id):
"""PUT /defaults/id: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('default', id=ID),
# method='put')
# url('default', id=ID)
_form = DefaultsForm()()
try:
form_result = _form.to_python(dict(request.POST))
for k, v in form_result.iteritems():
setting = RhodeCodeSetting.get_by_name_or_create(k)
setting.app_settings_value = v
Session().add(setting)
Session().commit()
h.flash(_('Default settings updated successfully'),
category='success')
except formencode.Invalid, errors:
defaults = errors.value
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
h.flash(_('error occurred during update of defaults'),
category='error')
return redirect(url('defaults'))
def delete(self, id):
"""DELETE /defaults/id: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# method='delete')
def show(self, id, format='html'):
"""GET /defaults/id: Show a specific item"""
def edit(self, id, format='html'):
"""GET /defaults/id/edit: Form to edit an existing item"""
# url('edit_default', id=ID)
@@ -33,25 +33,26 @@ from pylons import request, session, tmp
from pylons.controllers.util import redirect
from sqlalchemy.exc import IntegrityError
import rhodecode
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
HasPermissionAnyDecorator, HasRepoPermissionAllDecorator
from rhodecode.lib.utils import invalidate_cache, action_logger, repo_name_slug
from rhodecode.lib.helpers import get_token
from rhodecode.model.db import User, Repository, UserFollowing, RepoGroup
from rhodecode.model.db import User, Repository, UserFollowing, RepoGroup,\
RhodeCodeSetting
from rhodecode.model.forms import RepoForm
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
from rhodecode.lib.compat import json
from sqlalchemy.sql.expression import func
class ReposController(BaseController):
REST Controller styled on the Atom Publishing Protocol"""
@@ -472,25 +472,33 @@ class SettingsController(BaseController)
@NotAnonymous()
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repository(self):
"""GET /_admin/create_repository: Form to create a new item"""
c.repo_groups = RepoGroup.groups_choices(check_perms=True)
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
choices, c.landing_revs = ScmModel().get_repo_landing_revs()
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
return render('admin/repos/repo_add_create_repository.html')
## apply the defaults from defaults page
defaults = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
render('admin/repos/repo_add_create_repository.html'),
errors={},
encoding="UTF-8"
def _get_hg_ui_settings(self):
ret = RhodeCodeUi.query().all()
if not ret:
raise Exception('Could not get application ui settings !')
settings = {}
for each in ret:
k = each.ui_key
v = each.ui_value
if k == '/':
k = 'root_path'
@@ -264,24 +264,25 @@ class DbManage(object):
def step_7(self):
perm_fixes = self.klass.reset_permissions(User.DEFAULT_USER)
if perm_fixes:
notify('There was an inconsistent state of permissions '
'detected for default user. Permissions are now '
'reset to the default value for default user. '
'Please validate and check default permissions '
'in admin panel')
def step_8(self):
self.klass.populate_default_permissions()
self.klass.create_default_options(skip_existing=True)
upgrade_steps = [0] + range(curr_version + 1, __dbversion__ + 1)
# CALL THE PROPER ORDER OF STEPS TO PERFORM FULL UPGRADE
_step = None
for step in upgrade_steps:
notify('performing upgrade step %s' % step)
getattr(UpgradeSteps(self), 'step_%s' % step)()
self.sa.commit()
_step = step
@@ -473,24 +474,40 @@ class DbManage(object):
('ldap_tls_reqcert', ''), ('ldap_dn_user', ''),
('ldap_dn_pass', ''), ('ldap_base_dn', ''),
('ldap_filter', ''), ('ldap_search_scope', ''),
('ldap_attr_login', ''), ('ldap_attr_firstname', ''),
('ldap_attr_lastname', ''), ('ldap_attr_email', '')]:
if skip_existing and RhodeCodeSetting.get_by_name(k) != None:
log.debug('Skipping option %s' % k)
continue
setting = RhodeCodeSetting(k, v)
self.sa.add(setting)
def create_default_options(self, skip_existing=False):
"""Creates default settings"""
for k, v in [
('default_repo_enable_locking', False),
('default_repo_enable_downloads', False),
('default_repo_enable_statistics', False),
('default_repo_private', False),
('default_repo_type', 'hg')]:
def fixup_groups(self):
def_usr = User.get_by_username('default')
for g in RepoGroup.query().all():
g.group_name = g.get_new_name(g.name)
self.sa.add(g)
# get default perm
default = UserRepoGroupToPerm.query()\
.filter(UserRepoGroupToPerm.group == g)\
.filter(UserRepoGroupToPerm.user == def_usr)\
.scalar()
if default is None:
@@ -613,24 +630,25 @@ class DbManage(object):
self.sa.add(web2)
self.sa.add(web3)
self.sa.add(web4)
self.sa.add(paths)
self.sa.add(sett1)
self.sa.add(sett2)
self.sa.add(sett3)
self.sa.add(sett4)
self.sa.add(sett5)
self.sa.add(sett6)
self.create_ldap_options()
self.create_default_options()
log.info('created ui config')
def create_user(self, username, password, email='', admin=False):
log.info('creating user %s' % username)
UserModel().create_or_update(username, password, email,
firstname='RhodeCode', lastname='Admin',
active=True, admin=admin)
def create_default_user(self):
log.info('creating default user')
# create default user for handling default permissions.
@@ -14,15 +14,16 @@ from rhodecode.model.meta import Base
from rhodecode.model import meta
def upgrade(migrate_engine):
Upgrade operations go here.
Don't create your own engine; bind migrate_engine to your metadata
pass
def downgrade(migrate_engine):
meta = MetaData()
meta.bind = migrate_engine
@@ -278,25 +278,25 @@ def safe_str(unicode_, to_encoding=None)
return safe_str
def remove_suffix(s, suffix):
if s.endswith(suffix):
s = s[:-1 * len(suffix)]
return s
def remove_prefix(s, prefix):
if s.startswith(prefix):
s = s[:-1 * len(prefix)]
s = s[len(prefix):]
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
Custom engine_from_config functions that makes sure we use NullPool for
file based sqlite databases. This prevents errors on sqlite. This only
applies to sqlalchemy versions < 0.7.0
import sqlalchemy
from sqlalchemy import engine_from_config as efc
@@ -37,25 +37,25 @@ from sqlalchemy.orm import relationship,
from sqlalchemy.exc import DatabaseError
from beaker.cache import cache_region, region_invalidate
from webob.exc import HTTPNotFound
from pylons.i18n.translation import lazy_ugettext as _
from rhodecode.lib.vcs import get_backend
from rhodecode.lib.vcs.utils.helpers import get_scm
from rhodecode.lib.vcs.exceptions import VCSError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
safe_unicode, remove_suffix
safe_unicode, remove_suffix, remove_prefix
from rhodecode.lib.caching_query import FromCache
from rhodecode.model.meta import Base, Session
URL_SEP = '/'
#==============================================================================
# BASE CLASSES
@@ -158,25 +158,29 @@ class RhodeCodeSetting(Base, BaseModel):
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
if self.app_settings_name == 'ldap_active':
if self.app_settings_name in ["ldap_active",
"default_repo_enable_statistics",
"default_repo_enable_locking",
"default_repo_private",
"default_repo_enable_downloads"]:
v = str2bool(v)
return v
@app_settings_value.setter
def app_settings_value(self, val):
Setter that will always make sure we use unicode in app_settings_value
:param val:
self._app_settings_value = safe_unicode(val)
@@ -216,24 +220,37 @@ class RhodeCodeSetting(Base, BaseModel):
return settings
@classmethod
def get_ldap_settings(cls, cache=False):
ret = cls.query()\
.filter(cls.app_settings_name.startswith('ldap_')).all()
fd = {}
for row in ret:
fd.update({row.app_settings_name: row.app_settings_value})
return fd
def get_default_repo_settings(cls, cache=False, strip_prefix=False):
.filter(cls.app_settings_name.startswith('default_')).all()
key = row.app_settings_name
if strip_prefix:
key = remove_prefix(key, prefix='default_')
fd.update({key: row.app_settings_value})
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
__table_args__ = (
UniqueConstraint('ui_key'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'}
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'changegroup.push_logger'
@@ -164,37 +164,37 @@ def RegisterForm(edit=False, old_data={}
def PasswordResetForm():
class _PasswordResetForm(formencode.Schema):
allow_extra_fields = True
filter_extra_fields = True
email = All(v.ValidSystemEmail(), v.Email(not_empty=True))
return _PasswordResetForm
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
repo_groups=[], landing_revs=[]):
class _RepoForm(formencode.Schema):
filter_extra_fields = False
repo_name = All(v.UnicodeString(strip=True, min=1, not_empty=True),
v.SlugifyName())
clone_uri = All(v.UnicodeString(strip=True, min=1, not_empty=False))
repo_group = All(v.CanWriteGroup(),
v.OneOf(repo_groups, hideList=True))
repo_type = v.OneOf(supported_backends)
description = v.UnicodeString(strip=True, min=1, not_empty=False)
private = v.StringBoolean(if_missing=False)
enable_statistics = v.StringBoolean(if_missing=False)
enable_downloads = v.StringBoolean(if_missing=False)
enable_locking = v.StringBoolean(if_missing=False)
landing_rev = v.OneOf(landing_revs, hideList=True)
repo_description = v.UnicodeString(strip=True, min=1, not_empty=False)
repo_private = v.StringBoolean(if_missing=False)
repo_enable_statistics = v.StringBoolean(if_missing=False)
repo_enable_downloads = v.StringBoolean(if_missing=False)
repo_enable_locking = v.StringBoolean(if_missing=False)
repo_landing_rev = v.OneOf(landing_revs, hideList=True)
if edit:
#this is repo owner
user = All(v.UnicodeString(not_empty=True), v.ValidRepoUser())
chained_validators = [v.ValidCloneUri(),
v.ValidRepoName(edit, old_data),
v.ValidPerms()]
return _RepoForm
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
@@ -209,27 +209,26 @@ def RepoForkForm(edit=False, old_data={}
repo_type = All(v.ValidForkType(old_data), v.OneOf(supported_backends))
description = v.UnicodeString(strip=True, min=1, not_empty=True)
copy_permissions = v.StringBoolean(if_missing=False)
update_after_clone = v.StringBoolean(if_missing=False)
fork_parent_id = v.UnicodeString()
chained_validators = [v.ValidForkName(edit, old_data)]
return _RepoForkForm
def RepoSettingsForm(edit=False, old_data={},
supported_backends=BACKENDS.keys(), repo_groups=[],
landing_revs=[]):
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
chained_validators = [v.ValidRepoName(edit, old_data), v.ValidPerms(),
v.ValidSettings()]
@@ -273,41 +272,54 @@ def ApplicationUiSettingsForm():
hooks_changegroup_update = v.StringBoolean(if_missing=False)
hooks_changegroup_repo_size = v.StringBoolean(if_missing=False)
hooks_changegroup_push_logger = v.StringBoolean(if_missing=False)
hooks_outgoing_pull_logger = v.StringBoolean(if_missing=False)
extensions_largefiles = v.StringBoolean(if_missing=False)
extensions_hgsubversion = v.StringBoolean(if_missing=False)
extensions_hggit = v.StringBoolean(if_missing=False)
return _ApplicationUiSettingsForm
def DefaultPermissionsForm(repo_perms_choices, group_perms_choices, register_choices, create_choices,
fork_choices):
def DefaultPermissionsForm(repo_perms_choices, group_perms_choices,
register_choices, create_choices, fork_choices):
class _DefaultPermissionsForm(formencode.Schema):
overwrite_default_repo = v.StringBoolean(if_missing=False)
overwrite_default_group = v.StringBoolean(if_missing=False)
anonymous = v.StringBoolean(if_missing=False)
default_repo_perm = v.OneOf(repo_perms_choices)
default_group_perm = v.OneOf(group_perms_choices)
default_register = v.OneOf(register_choices)
default_create = v.OneOf(create_choices)
default_fork = v.OneOf(fork_choices)
return _DefaultPermissionsForm
def DefaultsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
class _DefaultsForm(formencode.Schema):
default_repo_type = v.OneOf(supported_backends)
default_repo_private = v.StringBoolean(if_missing=False)
default_repo_enable_statistics = v.StringBoolean(if_missing=False)
default_repo_enable_downloads = v.StringBoolean(if_missing=False)
default_repo_enable_locking = v.StringBoolean(if_missing=False)
return _DefaultsForm
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices,
tls_kind_choices):
class _LdapSettingsForm(formencode.Schema):
#pre_validators = [LdapLibValidator]
ldap_active = v.StringBoolean(if_missing=False)
ldap_host = v.UnicodeString(strip=True,)
ldap_port = v.Number(strip=True,)
ldap_tls_kind = v.OneOf(tls_kind_choices)
ldap_tls_reqcert = v.OneOf(tls_reqcert_choices)
ldap_dn_user = v.UnicodeString(strip=True,)
@@ -28,25 +28,26 @@ import shutil
from datetime import datetime
from rhodecode.lib.vcs.backends import get_backend
from rhodecode.lib.utils2 import LazyProperty, safe_str, safe_unicode
from rhodecode.lib.hooks import log_create_repository, log_delete_repository
from rhodecode.model import BaseModel
from rhodecode.model.db import Repository, UserRepoToPerm, User, Permission, \
Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, RepoGroup
Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, RepoGroup,\
class RepoModel(BaseModel):
cls = Repository
URL_SEPARATOR = Repository.url_sep()
def __get_users_group(self, users_group):
@@ -196,25 +197,26 @@ class RepoModel(BaseModel):
if repo_name != new_name:
# rename repository
self.__rename_repo(old=repo_name, new=new_name)
return cur_repo
except:
raise
def create_repo(self, repo_name, repo_type, description, owner,
private=False, clone_uri=None, repos_group=None,
landing_rev='tip', just_db=False, fork_of=None,
copy_fork_permissions=False):
copy_fork_permissions=False, enable_statistics=False,
enable_locking=False, enable_downloads=False):
Create repository
owner = self._get_user(owner)
fork_of = self._get_repo(fork_of)
repos_group = self._get_repos_group(repos_group)
# repo name is just a name of repository
@@ -225,24 +227,28 @@ class RepoModel(BaseModel):
new_repo = Repository()
new_repo.enable_statistics = False
new_repo.repo_name = repo_name_full
new_repo.repo_type = repo_type
new_repo.user = owner
new_repo.group = repos_group
new_repo.description = description or repo_name
new_repo.private = private
new_repo.clone_uri = clone_uri
new_repo.landing_rev = landing_rev
new_repo.enable_statistics = enable_statistics
new_repo.enable_locking = enable_locking
new_repo.enable_downloads = enable_downloads
if repos_group:
new_repo.enable_locking = repos_group.enable_locking
if fork_of:
parent_repo = fork_of
new_repo.fork = parent_repo
self.sa.add(new_repo)
def _create_default_perms():
# create default permission
repo_to_perm = UserRepoToPerm()
@@ -298,38 +304,45 @@ class RepoModel(BaseModel):
def create(self, form_data, cur_user, just_db=False, fork=None):
Backward compatibility function, just a wrapper on top of create_repo
:param form_data:
:param cur_user:
:param just_db:
:param fork:
owner = cur_user
repo_name = form_data['repo_name_full']
repo_type = form_data['repo_type']
description = form_data['description']
private = form_data['private']
description = form_data['repo_description']
private = form_data['repo_private']
clone_uri = form_data.get('clone_uri')
repos_group = form_data['repo_group']
landing_rev = form_data['landing_rev']
landing_rev = form_data['repo_landing_rev']
copy_fork_permissions = form_data.get('copy_permissions')
fork_of = form_data.get('fork_parent_id')
##defaults
defs = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
enable_statistics = defs.get('repo_enable_statistic')
enable_locking = defs.get('repo_enable_locking')
enable_downloads = defs.get('repo_enable_downloads')
return self.create_repo(
repo_name, repo_type, description, owner, private, clone_uri,
repos_group, landing_rev, just_db, fork_of, copy_fork_permissions
repos_group, landing_rev, just_db, fork_of, copy_fork_permissions,
enable_statistics, enable_locking, enable_downloads
def create_fork(self, form_data, cur_user):
Simple wrapper into executing celery task for fork creation
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
@@ -617,24 +617,32 @@ div:hover > a.permalink {
margin: 0;
padding: 12px 9px 7px 24px;
}
#header #header-inner #quick li ul li a.groups,#header #header-inner #quick li ul li a.groups:hover
{
background: #FFF url("../images/icons/group_edit.png") no-repeat 4px 9px;
width: 167px;
#header #header-inner #quick li ul li a.defaults,#header #header-inner #quick li ul li a.defaults:hover
background: #FFF url("../images/icons/wrench.png") no-repeat 4px 9px;
#header #header-inner #quick li ul li a.settings,#header #header-inner #quick li ul li a.settings:hover
background: #FFF url("../images/icons/cog.png") no-repeat 4px 9px;
#header #header-inner #quick li ul li a.permissions,#header #header-inner #quick li ul li a.permissions:hover
background: #FFF url("../images/icons/key.png") no-repeat 4px 9px;
## -*- coding: utf-8 -*-
<%inherit file="/base/base.html"/>
<%def name="title()">
${_('Repositories defaults')} - ${c.rhodecode_name}
</%def>
<%def name="breadcrumbs_links()">
${h.link_to(_('Admin'),h.url('admin_home'))}
»
${_('Defaults')}
<%def name="page_nav()">
${self.menu('admin')}
<%def name="main()">
<div class="box">
<!-- box / title -->
<div class="title">
${self.breadcrumbs()}
</div>
<h3>${_('Repositories defaults')}</h3>
${h.form(url('default', id='defaults'),method='put')}
<div class="form">
<!-- fields -->
<div class="fields">
<div class="field">
<div class="label">
<label for="default_repo_type">${_('Type')}:</label>
<div class="input">
${h.select('default_repo_type','hg',c.backends,class_="medium")}
<div class="label label-checkbox">
<label for="default_repo_private">${_('Private repository')}:</label>
<div class="checkboxes">
${h.checkbox('default_repo_private',value="True")}
<span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
<label for="default_repo_enable_statistics">${_('Enable statistics')}:</label>
${h.checkbox('default_repo_enable_statistics',value="True")}
<span class="help-block">${_('Enable statistics window on summary page.')}</span>
<label for="default_repo_enable_downloads">${_('Enable downloads')}:</label>
${h.checkbox('default_repo_enable_downloads',value="True")}
<span class="help-block">${_('Enable download menu on summary page.')}</span>
<label for="default_repo_enable_locking">${_('Enable locking')}:</label>
${h.checkbox('default_repo_enable_locking',value="True")}
<span class="help-block">${_('Enable lock-by-pulling on repository.')}</span>
<div class="buttons">
${h.submit('save',_('Save'),class_="ui-btn large")}
${h.end_form()}
##<h3>${_('Groups defaults')}</h3>
@@ -3,25 +3,25 @@
${h.form(url('repos'))}
<label for="repo_name">${_('Name')}:</label>
${h.text('repo_name',c.new_repo,class_="small")}
%if not h.HasPermissionAll('hg.admin')('repo create form'):
${h.hidden('user_created',True)}
%endif
<label for="clone_uri">${_('Clone from')}:</label>
${h.text('clone_uri',class_="small")}
<span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
@@ -35,43 +35,43 @@ ${h.form(url('repos'))}
<label for="repo_type">${_('Type')}:</label>
${h.select('repo_type','hg',c.backends,class_="small")}
<span class="help-block">${_('Type of repository to create.')}</span>
<label for="landing_rev">${_('Landing revision')}:</label>
<label for="repo_landing_rev">${_('Landing revision')}:</label>
${h.select('landing_rev','',c.landing_revs,class_="medium")}
${h.select('repo_landing_rev','',c.landing_revs,class_="medium")}
<span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
<div class="label label-textarea">
<label for="description">${_('Description')}:</label>
<label for="repo_description">${_('Description')}:</label>
<div class="textarea text-area editor">
${h.textarea('description')}
${h.textarea('repo_description')}
<span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
<label for="private">${_('Private repository')}:</label>
<label for="repo_private">${_('Private repository')}:</label>
${h.checkbox('private',value="True")}
${h.checkbox('repo_private',value="True")}
${h.submit('add',_('add'),class_="ui-btn large")}
@@ -227,24 +227,25 @@
% if h.HasPermissionAll('hg.admin')('access admin main page'):
<li>
${h.link_to(_('admin'),h.url('admin_home'),class_='admin')}
<%def name="admin_menu()">
<ul>
<li>${h.link_to(_('journal'),h.url('admin_home'),class_='journal')}</li>
<li>${h.link_to(_('repositories'),h.url('repos'),class_='repos')}</li>
<li>${h.link_to(_('repositories groups'),h.url('repos_groups'),class_='repos_groups')}</li>
<li>${h.link_to(_('users'),h.url('users'),class_='users')}</li>
<li>${h.link_to(_('users groups'),h.url('users_groups'),class_='groups')}</li>
<li>${h.link_to(_('permissions'),h.url('edit_permission',id='default'),class_='permissions')}</li>
<li>${h.link_to(_('ldap'),h.url('ldap_home'),class_='ldap')}</li>
<li>${h.link_to(_('defaults'),h.url('defaults'),class_='defaults')}</li>
<li class="last">${h.link_to(_('settings'),h.url('admin_settings'),class_='settings')}</li>
</ul>
## ADMIN MENU
${admin_menu()}
</li>
% endif
<a class="menu_link" title="${_('Followers')}" href="${h.url('repo_followers_home',repo_name=c.repo_name)}">
@@ -38,25 +38,25 @@ if not is_windows:
__all__ = [
'parameterized', 'environ', 'url', 'get_new_dir', 'TestController',
'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO',
'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
'TEST_GIT_REPO_CLONE', 'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO',
'GIT_REMOTE_REPO', 'SCM_TESTS',
'GIT_REMOTE_REPO', 'SCM_TESTS', '_get_repo_create_params'
]
# Invoke websetup with the current config file
# SetupCommand('setup-app').run([config_file])
##RUNNING DESIRED TESTS
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
# nosetests --pdb --pdb-failures
# nosetests --with-coverage --cover-package=rhodecode.model.validators rhodecode.tests.test_validators
environ = {}
#SOME GLOBALS FOR TESTS
@@ -154,12 +154,33 @@ class TestController(TestCase):
return response.session['rhodecode_user']
def _get_logged_user(self):
return User.get_by_username(self._logged_username)
def checkSessionFlash(self, response, msg):
self.assertTrue('flash' in response.session)
if not msg in response.session['flash'][0][1]:
self.fail(
'msg `%s` not found in session flash: got `%s` instead' % (
msg, response.session['flash'])
## HELPERS ##
def _get_repo_create_params(**custom):
defs = {
'repo_name': None,
'repo_type': 'hg',
'clone_uri': '',
'repo_group': '',
'repo_description': 'DESC',
'repo_private': False,
'repo_landing_rev': 'tip'
defs.update(custom)
if 'repo_name_full' not in custom:
defs.update({'repo_name_full': defs['repo_name']})
return defs
@@ -52,33 +52,28 @@ def make_users_group(name=TEST_USERS_GRO
user=TEST_USER_ADMIN_LOGIN)
return gr
def destroy_users_group(name=TEST_USERS_GROUP):
UsersGroupModel().delete(users_group=name, force=True)
def create_repo(repo_name, repo_type):
# create new repo
form_data = dict(repo_name=repo_name,
repo_name_full=repo_name,
fork_name=None,
description='description %s' % repo_name,
repo_group=None,
private=False,
repo_type=repo_type,
clone_uri=None,
landing_rev='tip')
form_data = _get_repo_create_params(
repo_description='description %s' % repo_name,
cur_user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
return r
def create_fork(fork_name, fork_type, fork_of):
fork = RepoModel(Session())._get_repo(fork_of)
r = create_repo(fork_name, fork_type)
r.fork = fork
Session().add(r)
from rhodecode.tests import *
class TestDefaultsController(TestController):
def test_index(self):
self.log_user()
response = self.app.get(url('defaults'))
response.mustcontain('default_repo_private')
response.mustcontain('default_repo_enable_statistics')
response.mustcontain('default_repo_enable_downloads')
response.mustcontain('default_repo_enable_locking')
def test_index_as_xml(self):
response = self.app.get(url('formatted_defaults', format='xml'))
def test_create(self):
response = self.app.post(url('defaults'))
def test_new(self):
response = self.app.get(url('new_default'))
def test_new_as_xml(self):
response = self.app.get(url('formatted_new_default', format='xml'))
def test_update(self):
params = {
'default_repo_enable_locking': True,
'default_repo_enable_downloads': True,
'default_repo_enable_statistics': True,
'default_repo_private': True,
'default_repo_type': 'hg',
response = self.app.put(url('default', id='default'), params=params)
self.checkSessionFlash(response, 'Default settings updated successfully')
defs = RhodeCodeSetting.get_default_repo_settings()
self.assertEqual(params, defs)
'default_repo_enable_locking': False,
'default_repo_enable_downloads': False,
'default_repo_enable_statistics': False,
'default_repo_private': False,
'default_repo_type': 'git',
def test_update_browser_fakeout(self):
response = self.app.post(url('default', id=1), params=dict(_method='put'))
def test_delete(self):
response = self.app.delete(url('default', id=1))
def test_delete_browser_fakeout(self):
response = self.app.post(url('default', id=1), params=dict(_method='delete'))
def test_show(self):
response = self.app.get(url('default', id=1))
def test_show_as_xml(self):
response = self.app.get(url('formatted_default', id=1, format='xml'))
def test_edit(self):
response = self.app.get(url('edit_default', id=1))
def test_edit_as_xml(self):
response = self.app.get(url('formatted_edit_default', id=1, format='xml'))
@@ -17,32 +17,28 @@ class TestAdminReposController(TestContr
response = self.app.get(url('repos'))
# Test response...
response = self.app.get(url('formatted_repos', format='xml'))
def test_create_hg(self):
repo_name = NEW_HG_REPO
description = 'description for newly created repo'
private = False
response = self.app.post(url('repos'), {'repo_name': repo_name,
'description': description,
'private': private,
'landing_rev': 'tip'})
response = self.app.post(url('repos'),
_get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_description=description))
self.checkSessionFlash(response,
'created repository %s' % (repo_name))
#test if the repo was created in the database
new_repo = self.Session().query(Repository)\
.filter(Repository.repo_name == repo_name).one()
self.assertEqual(new_repo.repo_name, repo_name)
self.assertEqual(new_repo.description, description)
#test if repository is visible in the list ?
response = response.follow()
@@ -54,31 +50,28 @@ class TestAdminReposController(TestContr
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
self.fail('no repo %s in filesystem' % repo_name)
def test_create_hg_non_ascii(self):
non_ascii = "ąęł"
repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
repo_name_unicode = repo_name.decode('utf8')
description = 'description for newly created repo' + non_ascii
description_unicode = description.decode('utf8')
'created repository %s' % (repo_name_unicode))
.filter(Repository.repo_name == repo_name_unicode).one()
self.assertEqual(new_repo.repo_name, repo_name_unicode)
self.assertEqual(new_repo.description, description_unicode)
@@ -94,32 +87,30 @@ class TestAdminReposController(TestContr
def test_create_hg_in_group(self):
## create GROUP
group_name = 'sometest'
gr = ReposGroupModel().create(group_name=group_name,
group_description='test',)
self.Session().commit()
repo_name = 'ingroup'
repo_name_full = RepoGroup.url_sep().join([group_name, repo_name])
'repo_group': gr.group_id,
repo_description=description,
repo_group=gr.group_id,))
.filter(Repository.repo_name == repo_name_full).one()
self.assertEqual(new_repo.repo_name, repo_name_full)
@@ -133,32 +124,30 @@ class TestAdminReposController(TestContr
ReposGroupModel().delete(group_name)
RepoModel().delete(repo_name_full)
def test_create_git(self):
repo_name = NEW_GIT_REPO
'repo_type': 'git',
repo_type='git',
@@ -170,31 +159,30 @@ class TestAdminReposController(TestContr
def test_create_git_non_ascii(self):
repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
@@ -217,31 +205,30 @@ class TestAdminReposController(TestContr
response = self.app.put(url('repo', repo_name=HG_REPO))
response = self.app.post(url('repo', repo_name=HG_REPO),
params=dict(_method='put'))
def test_delete_hg(self):
repo_name = 'vcs_test_new_to_delete'
repo_type='hg',
@@ -266,31 +253,30 @@ class TestAdminReposController(TestContr
.filter(Repository.repo_name == repo_name).scalar()
self.assertEqual(deleted_repo, None)
self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
False)
def test_delete_git(self):
from rhodecode.model.db import Repository
ARCHIVE_SPECS = {
'.tar.bz2': ('application/x-bzip2', 'tbz2', ''),
'.tar.gz': ('application/x-gzip', 'tgz', ''),
'.zip': ('application/zip', 'zip', ''),
def _set_downloads(repo_name, set_to):
repo = Repository.get_by_repo_name(repo_name)
repo.enable_downloads = set_to
Session().add(repo)
class TestFilesController(TestController):
response = self.app.get(url(controller='files', action='index',
repo_name=HG_REPO,
revision='tip',
f_path='/'))
response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/docs">docs</a>')
response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/tests">tests</a>')
response.mustcontain('<a class="browser-dir ypjax-link" href="/vcs_test_hg/files/27cd5cce30c96924232dffcd24178a07ffeb5dfc/vcs">vcs</a>')
@@ -207,58 +216,58 @@ removed extra unicode conversion in diff
</optgroup>""")
def test_file_annotation_git(self):
repo_name=GIT_REPO,
revision='master',
f_path='vcs/nodes.py',
annotate=True))
def test_archival(self):
_set_downloads(HG_REPO, set_to=True)
for arch_ext, info in ARCHIVE_SPECS.items():
short = '27cd5cce30c9%s' % arch_ext
fname = '27cd5cce30c96924232dffcd24178a07ffeb5dfc%s' % arch_ext
filename = '%s-%s' % (HG_REPO, short)
response = self.app.get(url(controller='files',
action='archivefile',
fname=fname))
self.assertEqual(response.status, '200 OK')
heads = [
('Pragma', 'no-cache'),
('Cache-Control', 'no-cache'),
('Content-Disposition', 'attachment; filename=%s' % filename),
('Content-Type', '%s; charset=utf-8' % info[0]),
self.assertEqual(response.response._headers.items(), heads)
def test_archival_wrong_ext(self):
for arch_ext in ['tar', 'rar', 'x', '..ax', '.zipz']:
response.mustcontain('Unknown archive type')
def test_archival_wrong_revision(self):
for rev in ['00x000000', 'tar', 'wrong', '@##$@$42413232', '232dffcd']:
fname = '%s.zip' % rev
response.mustcontain('Unknown revision')
#==========================================================================
# RAW FILE
import os
import unittest
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.db import RepoGroup, User, Repository
from rhodecode.model.db import RepoGroup, User
def _make_group(path, desc='desc', parent_id=None,
skip_if_exists=False):
gr = RepoGroup.get_by_group_name(path)
if gr and skip_if_exists:
if isinstance(parent_id, RepoGroup):
parent_id = parent_id.group_id
@@ -116,35 +116,25 @@ class TestReposGroups(unittest.TestCase)
new_sg1 = self.__update_group(sg1.group_id, 'hello')
self.assertTrue(self.__check_path('hello'))
self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
def test_subgrouping_with_repo(self):
g1 = _make_group('g1')
g2 = _make_group('g2')
form_data = dict(repo_name='john',
repo_name_full='john',
description=None,
landing_rev='tip',
enable_locking=False,
recursive=False)
form_data = _get_repo_create_params(repo_name='john')
cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
self.assertEqual(r.repo_name, 'john')
# put repo into group
form_data = form_data
form_data['repo_group'] = g1.group_id
form_data['perms_new'] = []
form_data['perms_updates'] = []
RepoModel().update(r.repo_name, form_data)
self.assertEqual(r.repo_name, 'g1/john')
Status change: