# -*- coding: utf-8 -*-
"""
rhodecode.controllers.admin.repos
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Admin controller for RhodeCode
:created_on: Apr 7, 2010
:author: marcink
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import traceback
import formencode
from operator import itemgetter
from formencode import htmlfill
from paste.httpexceptions import HTTPInternalServerError
from pylons import request, response, session, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
HasPermissionAnyDecorator
from rhodecode.lib.base import BaseController, render
from rhodecode.lib.utils import invalidate_cache, action_logger, repo_name_slug
from rhodecode.lib.helpers import get_token
from rhodecode.model.db import User, Repository, UserFollowing, Group
from rhodecode.model.forms import RepoForm
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
from sqlalchemy.exc import IntegrityError
log = logging.getLogger(__name__)
class ReposController(BaseController):
REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('repo', 'repos')
@LoginRequired()
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def __before__(self):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
super(ReposController, self).__before__()
def __load_defaults(self):
repo_model = RepoModel()
c.repo_groups = [('', '')]
parents_link = lambda k: h.literal('»'.join(
map(lambda k: k.group_name,
k.parents + [k])
)
c.repo_groups.extend([(x.group_id, parents_link(x)) for \
x in self.sa.query(Group).all()])
c.repo_groups = sorted(c.repo_groups,
key=lambda t: t[1].split('»')[0])
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
c.users_array = repo_model.get_users_js()
c.users_groups_array = repo_model.get_users_groups_js()
def __load_data(self, repo_name=None):
Load defaults settings for edit, and update
:param repo_name:
self.__load_defaults()
c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
repo = scm_repo = db_repo.scm_instance
if c.repo_info is None:
h.flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name,
category='error')
return redirect(url('repos'))
c.default_user_id = User.get_by_username('default').user_id
c.in_public_journal = self.sa.query(UserFollowing)\
.filter(UserFollowing.user_id == c.default_user_id)\
.filter(UserFollowing.follows_repository == c.repo_info).scalar()
if c.repo_info.stats:
last_rev = c.repo_info.stats.stat_on_revision
else:
last_rev = 0
c.stats_revision = last_rev
c.repo_last_rev = repo.count() - 1 if repo.revisions else 0
if last_rev == 0 or c.repo_last_rev == 0:
c.stats_percentage = 0
c.stats_percentage = '%.2f' % ((float((last_rev)) /
c.repo_last_rev) * 100)
defaults = c.repo_info.get_dict()
group, repo_name = c.repo_info.groups_and_repo
defaults['repo_name'] = repo_name
defaults['repo_group'] = getattr(group[-1] if group else None,
'group_id', None)
#fill owner
if c.repo_info.user:
defaults.update({'user': c.repo_info.user.username})
replacement_user = self.sa.query(User)\
.filter(User.admin == True).first().username
defaults.update({'user': replacement_user})
#fill repository users
for p in c.repo_info.repo_to_perm:
defaults.update({'u_perm_%s' % p.user.username:
p.permission.permission_name})
#fill repository groups
for p in c.repo_info.users_group_to_perm:
defaults.update({'g_perm_%s' % p.users_group.users_group_name:
return defaults
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
c.repos_list = ScmModel().get_repos(Repository.query()
.order_by(Repository.repo_name)
.all(), sort_key='name_sort')
return render('admin/repos/repos.html')
def create(self):
POST /repos: Create a new item"""
form_result = {}
try:
form_result = RepoForm(repo_groups=c.repo_groups_choices)()\
.to_python(dict(request.POST))
repo_model.create(form_result, self.rhodecode_user)
if form_result['clone_uri']:
h.flash(_('created repository %s from %s') \
% (form_result['repo_name'], form_result['clone_uri']),
category='success')
h.flash(_('created repository %s') % form_result['repo_name'],
if request.POST.get('user_created'):
#created by regular non admin user
action_logger(self.rhodecode_user, 'user_created_repo',
form_result['repo_name_full'], '', self.sa)
action_logger(self.rhodecode_user, 'admin_created_repo',
except formencode.Invalid, errors:
c.new_repo = errors.value['repo_name']
r = render('admin/repos/repo_add_create_repository.html')
r = render('admin/repos/repo_add.html')
return htmlfill.render(
r,
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
msg = _('error occurred during creation of repository %s') \
% form_result.get('repo_name')
h.flash(msg, category='error')
return redirect(url('home'))
def new(self, format='html'):
"""GET /repos/new: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
return render('admin/repos/repo_add.html')
def update(self, repo_name):
PUT /repos/repo_name: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('repo', repo_name=ID),
# method='put')
# url('repo', repo_name=ID)
changed_name = repo_name
_form = RepoForm(edit=True, old_data={'repo_name': repo_name},
repo_groups=c.repo_groups_choices)()
form_result = _form.to_python(dict(request.POST))
repo_model.update(repo_name, form_result)
repo = repo_model.update(repo_name, form_result)
invalidate_cache('get_repo_cached_%s' % repo_name)
h.flash(_('Repository %s updated successfully' % repo_name),
changed_name = form_result['repo_name_full']
changed_name = repo.repo_name
action_logger(self.rhodecode_user, 'admin_updated_repo',
changed_name, '', self.sa)
defaults = self.__load_data(repo_name)
defaults.update(errors.value)
render('admin/repos/repo_edit.html'),
defaults=defaults,
h.flash(_('error occurred during update of repository %s') \
% repo_name, category='error')
return redirect(url('edit_repo', repo_name=changed_name))
def delete(self, repo_name):
DELETE /repos/repo_name: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# method='delete')
repo = repo_model.get_by_repo_name(repo_name)
if not repo:
' it was moved or renamed from the filesystem'
action_logger(self.rhodecode_user, 'admin_deleted_repo',
repo_name, '', self.sa)
repo_model.delete(repo)
h.flash(_('deleted repository %s') % repo_name, category='success')
except IntegrityError, e:
if e.message.find('repositories_fork_id_fkey'):
h.flash(_('Cannot delete %s it still contains attached '
'forks') % repo_name,
category='warning')
h.flash(_('An error occurred during '
'deletion of %s') % repo_name,
except Exception, e:
h.flash(_('An error occurred during deletion of %s') % repo_name,
def delete_perm_user(self, repo_name):
DELETE an existing repository permission user
repo_model.delete_perm_user(request.POST, repo_name)
h.flash(_('An error occurred during deletion of repository user'),
raise HTTPInternalServerError()
def delete_perm_users_group(self, repo_name):
DELETE an existing repository permission users group
repo_model.delete_perm_users_group(request.POST, repo_name)
h.flash(_('An error occurred during deletion of repository'
' users groups'),
def repo_stats(self, repo_name):
DELETE an existing repository statistics
repo_model.delete_stats(repo_name)
h.flash(_('An error occurred during deletion of repository stats'),
return redirect(url('edit_repo', repo_name=repo_name))
def repo_cache(self, repo_name):
INVALIDATE existing repository cache
ScmModel().mark_for_invalidation(repo_name)
h.flash(_('An error occurred during cache invalidation'),
def repo_public_journal(self, repo_name):
Set's this repository to be visible in public journal,
in other words assing default user to follow this repo
cur_token = request.POST.get('auth_token')
token = get_token()
if cur_token == token:
repo_id = Repository.get_by_repo_name(repo_name).repo_id
user_id = User.get_by_username('default').user_id
self.scm_model.toggle_following_repo(repo_id, user_id)
h.flash(_('Updated repository visibility in public journal'),
except:
h.flash(_('An error occurred during setting this'
' repository in public journal'),
h.flash(_('Token mismatch'), category='error')
def repo_pull(self, repo_name):
Runs task to update given repository with remote changes,
ie. make pull on remote location
ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
h.flash(_('Pulled from remote location'), category='success')
h.flash(_('An error occurred during pull from remote location'),
def show(self, repo_name, format='html'):
"""GET /repos/repo_name: Show a specific item"""
def edit(self, repo_name, format='html'):
"""GET /repos/repo_name/edit: Form to edit an existing item"""
# url('edit_repo', repo_name=ID)
encoding="UTF-8",
force_defaults=False
rhodecode.model.db
~~~~~~~~~~~~~~~~~~
Database Models for RhodeCode
:created_on: Apr 08, 2010
import os
import datetime
from datetime import date
from sqlalchemy import *
from sqlalchemy.exc import DatabaseError
from sqlalchemy.orm import relationship, backref, joinedload, class_mapper
from sqlalchemy.orm.interfaces import MapperExtension
from beaker.cache import cache_region, region_invalidate
from vcs import get_backend
from vcs.utils.helpers import get_scm
from vcs.exceptions import VCSError
from vcs.utils.lazy import LazyProperty
from rhodecode.lib import str2bool, safe_str, get_changeset_safe, \
generate_api_key
from rhodecode.lib.exceptions import UsersGroupsAssignedException
from rhodecode.lib.compat import json
from rhodecode.model.meta import Base, Session
from rhodecode.model.caching_query import FromCache
#==============================================================================
# BASE CLASSES
class ModelSerializer(json.JSONEncoder):
Simple Serializer for JSON,
usage::
to make object customized for serialization implement a __json__
method that will return a dict for serialization into json
example::
class Task(object):
def __init__(self, name, value):
self.name = name
self.value = value
def __json__(self):
return dict(name=self.name,
value=self.value)
def default(self, obj):
if hasattr(obj, '__json__'):
return obj.__json__()
return json.JSONEncoder.default(self, obj)
class BaseModel(object):
"""Base Model for all classess
@classmethod
def _get_keys(cls):
"""return column names for this model """
return class_mapper(cls).c.keys()
def get_dict(self):
"""return dict with keys and values corresponding
to this model data """
d = {}
for k in self._get_keys():
d[k] = getattr(self, k)
return d
def get_appstruct(self):
"""return list with keys and values tupples corresponding
l = []
l.append((k, getattr(self, k),))
return l
def populate_obj(self, populate_dict):
"""populate model with data from given populate_dict"""
if k in populate_dict:
setattr(self, k, populate_dict[k])
def query(cls):
return Session.query(cls)
def get(cls, id_):
if id_:
return Session.query(cls).get(id_)
def delete(cls, id_):
obj = Session.query(cls).get(id_)
Session.delete(obj)
Session.commit()
class RhodeCodeSettings(Base, BaseModel):
__tablename__ = 'rhodecode_settings'
__table_args__ = (UniqueConstraint('app_settings_name'), {'extend_existing':True})
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
app_settings_value = Column("app_settings_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
def __repr__(self):
return "<%s('%s:%s')>" % (self.__class__.__name__,
self.app_settings_name, self.app_settings_value)
def get_by_name(cls, ldap_key):
return Session.query(cls)\
.filter(cls.app_settings_name == ldap_key).scalar()
def get_app_settings(cls, cache=False):
ret = Session.query(cls)
if cache:
ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
if not ret:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings['rhodecode_' + each.app_settings_name] = \
each.app_settings_value
return settings
def get_ldap_settings(cls, cache=False):
ret = Session.query(cls)\
.filter(cls.app_settings_name.startswith('ldap_'))\
.all()
fd = {}
for row in ret:
fd.update({row.app_settings_name:row.app_settings_value})
fd.update({'ldap_active':str2bool(fd.get('ldap_active'))})
return fd
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
__table_args__ = (UniqueConstraint('ui_key'), {'extend_existing':True})
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'pretxnchangegroup.push_logger'
HOOK_PULL = 'preoutgoing.pull_logger'
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
def get_by_key(cls, key):
return Session.query(cls).filter(cls.ui_key == key)
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE,
cls.HOOK_REPO_SIZE,
cls.HOOK_PUSH, cls.HOOK_PULL]))
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE,
q = q.filter(cls.ui_section == 'hooks')
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key).scalar() or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session.add(new_ui)
class User(Base, BaseModel):
__tablename__ = 'users'
__table_args__ = (UniqueConstraint('username'), UniqueConstraint('email'), {'extend_existing':True})
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('RepoToPerm', primaryjoin='RepoToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade='all')
@property
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
def short_contact(self):
return '%s %s' % (self.name, self.lastname)
def is_admin(self):
return self.admin
return "<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
return self.__class__.__name__
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
return Session.query(cls).filter(cls.username.like(username)).scalar()
return Session.query(cls).filter(cls.username.ilike(username)).scalar()
return Session.query(cls).filter(cls.username == username).scalar()
def get_by_api_key(cls, api_key):
return Session.query(cls).filter(cls.api_key == api_key).one()
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session.add(self)
log.debug('updated user %s lastlogin', self.username)
def create(cls, form_data):
from rhodecode.lib.auth import get_crypt_password
new_user = cls()
for k, v in form_data.items():
if k == 'password':
v = get_crypt_password(v)
setattr(new_user, k, v)
new_user.api_key = generate_api_key(form_data['username'])
Session.add(new_user)
return new_user
Session.rollback()
raise
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
__table_args__ = {'extend_existing':True}
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
repository_name = Column("repository_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", UnicodeText(length=1200000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
return '<userGroup(%s)>' % (self.users_group_name)
def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
gr = Session.query(cls)\
.filter(cls.users_group_name.ilike(group_name))
gr = Session.query(UsersGroup)\
.filter(UsersGroup.users_group_name == group_name)
gr = gr.options(FromCache("sql_cache_short",
"get_user_%s" % group_name))
return gr.scalar()
def get(cls, users_group_id, cache=False):
users_group = Session.query(cls)
users_group = users_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % users_group_id))
return users_group.get(users_group_id)
new_users_group = cls()
setattr(new_users_group, k, v)
Session.add(new_users_group)
return new_users_group
def update(cls, users_group_id, form_data):
users_group = cls.get(users_group_id, cache=False)
if k == 'users_group_members':
users_group.members = []
Session.flush()
members_list = []
if v:
for u_id in set(v):
members_list.append(UsersGroupMember(
users_group_id,
u_id))
setattr(users_group, 'members', members_list)
setattr(users_group, k, v)
Session.add(users_group)
def delete(cls, users_group_id):
# check if this group is not assigned to repo
assigned_groups = UsersGroupRepoToPerm.query()\
.filter(UsersGroupRepoToPerm.users_group_id ==
users_group_id).all()
if assigned_groups:
raise UsersGroupsAssignedException('Group assigned to %s' %
assigned_groups)
Session.delete(users_group)
class UsersGroupMember(Base, BaseModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
user = relationship('User', lazy='joined')
users_group = relationship('UsersGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
__table_args__ = (UniqueConstraint('repo_name'), {'extend_existing':True},)
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
clone_uri = Column("clone_uri", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('Group')
repo_to_perm = relationship('RepoToPerm', cascade='all', order_by='RepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog', cascade='all')
self.repo_id, self.repo_name)
def get_by_repo_name(cls, repo_name):
q = Session.query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))\
return q.one()
def get_repo_forks(cls, repo_id):
return Session.query(cls).filter(Repository.fork_id == repo_id)
def base_path(cls):
Returns base path when all repos are stored
:param cls:
q = Session.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/')
q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def just_name(self):
return self.repo_name.split(os.sep)[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split('/')
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return '/'.join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from mercurial import ui
from mercurial import config
baseui = ui.ui()
#clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
ret = Session.query(RhodeCodeUi)\
.options(FromCache("sql_cache_short", "repository_repo_ui")).all()
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
ui_.ui_key, ui_.ui_value)
baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
return baseui
def is_valid(cls, repo_name):
returns True if given repo name is a valid filesystem repository
@param cls:
@param repo_name:
from rhodecode.lib.utils import is_valid_repo
return is_valid_repo(repo_name, cls.base_path())
#==========================================================================
# SCM PROPERTIES
def get_changeset(self, rev):
return get_changeset_safe(self.scm_instance, rev)
def tip(self):
return self.get_changeset('tip')
def author(self):
return self.tip.author
def last_change(self):
return self.scm_instance.last_change
# SCM CACHE INSTANCE
def invalidate(self):
Returns Invalidation object if this repo should be invalidated
None otherwise. `cache_active = False` means that this cache
state is not valid and needs to be invalidated
return Session.query(CacheInvalidation)\
.filter(CacheInvalidation.cache_key == self.repo_name)\
.filter(CacheInvalidation.cache_active == False)\
.scalar()
def set_invalidate(self):
set a cache for invalidation for this instance
inv = Session.query(CacheInvalidation)\
if inv is None:
inv = CacheInvalidation(self.repo_name)
inv.cache_active = True
Session.add(inv)
def scm_instance(self):
return self.__get_instance()
def scm_instance_cached(self):
@cache_region('long_term')
def _c(repo_name):
# TODO: remove this trick when beaker 1.6 is released
# and have fixed this issue with not supporting unicode keys
rn = safe_str(self.repo_name)
inv = self.invalidate
if inv is not None:
region_invalidate(_c, None, rn)
# update our cache
return _c(rn)
def __get_instance(self):
repo_full_path = self.repo_full_path
alias = get_scm(repo_full_path)[0]
log.debug('Creating instance of %s repository', alias)
backend = get_backend(alias)
except VCSError:
log.error('Perhaps this repository is in db and not in '
'filesystem run rescan repositories with '
'"destroy old data " option from admin panel')
return
if alias == 'hg':
repo = backend(safe_str(repo_full_path), create=False,
baseui=self._ui)
#skip hidden web repository
if repo._get_hidden():
repo = backend(repo_full_path, create=False)
return repo
class Group(Base, BaseModel):
__tablename__ = 'groups'
__table_args__ = (UniqueConstraint('group_name', 'group_parent_id'),
CheckConstraint('group_id != group_parent_id'), {'extend_existing':True},)
__mapper_args__ = {'order_by':'group_name'}
group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_name = Column("group_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
group_description = Column("group_description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
parent_group = relationship('Group', remote_side=group_id)
def __init__(self, group_name='', parent_group=None):
self.group_name = group_name
self.parent_group = parent_group
return "<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
self.group_name)
def url_sep(cls):
return '/'
def get_by_group_name(cls, group_name):
return cls.query().filter(cls.group_name == group_name).scalar()
def parents(self):
parents_recursion_limit = 5
if self.parent_group is None:
cur_gr = self.parent_group
cnt = 0
cnt += 1
if cnt == parents_recursion_limit:
# this will prevent accidental infinit loops
log.error('group nested more than %s' %
parents_recursion_limit)
def children(self):
return Session.query(Group).filter(Group.parent_group == self)
def name(self):
return self.group_name.split(Group.url_sep())[-1]
def full_path(self):
return self.group_name
def full_path_splitted(self):
return self.group_name.split(Group.url_sep())
def repositories(self):
return Session.query(Repository).filter(Repository.group == self)
def repositories_recursive_count(self):
cnt = self.repositories.count()
def children_count(group):
for child in group.children:
cnt += child.repositories.count()
cnt += children_count(child)
return cnt
return cnt + children_count(self)
def get_new_name(self, group_name):
returns new full group name based on parent and new name
path_prefix = self.parent_group.full_path_splitted if self.parent_group else []
return Group.url_sep().join(path_prefix + [group_name])
class Permission(Base, BaseModel):
__tablename__ = 'permissions'
permission_id = Column("permission_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
permission_name = Column("permission_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
permission_longname = Column("permission_longname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
self.permission_id, self.permission_name)
return Session.query(cls).filter(cls.permission_name == key).scalar()
class RepoToPerm(Base, BaseModel):
__tablename__ = 'repo_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'repository_id'), {'extend_existing':True})
repo_to_perm_id = Column("repo_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
permission = relationship('Permission')
class UserToPerm(Base, BaseModel):
__tablename__ = 'user_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'permission_id'), {'extend_existing':True})
user_to_perm_id = Column("user_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
def has_perm(cls, user_id, perm):
if not isinstance(perm, Permission):
raise Exception('perm needs to be an instance of Permission class')
return Session.query(cls).filter(cls.user_id == user_id)\
.filter(cls.permission == perm).scalar() is not None
def grant_perm(cls, user_id, perm):
new = cls()
new.user_id = user_id
new.permission = perm
Session.add(new)
def revoke_perm(cls, user_id, perm):
Session.query(cls).filter(cls.user_id == user_id)\
.filter(cls.permission == perm).delete()
class UsersGroupRepoToPerm(Base, BaseModel):
__tablename__ = 'users_group_repo_to_perm'
__table_args__ = (UniqueConstraint('repository_id', 'users_group_id', 'permission_id'), {'extend_existing':True})
users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
return '<userGroup:%s => %s >' % (self.users_group, self.repository)
class UsersGroupToPerm(Base, BaseModel):
__tablename__ = 'users_group_to_perm'
def has_perm(cls, users_group_id, perm):
return Session.query(cls).filter(cls.users_group_id ==
users_group_id)\
.filter(cls.permission == perm)\
.scalar() is not None
def grant_perm(cls, users_group_id, perm):
new.users_group_id = users_group_id
def revoke_perm(cls, users_group_id, perm):
Session.query(cls).filter(cls.users_group_id == users_group_id)\
class GroupToPerm(Base, BaseModel):
__tablename__ = 'group_to_perm'
__table_args__ = (UniqueConstraint('group_id', 'permission_id'), {'extend_existing':True})
group_to_perm_id = Column("group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
class Statistics(Base, BaseModel):
__tablename__ = 'statistics'
__table_args__ = (UniqueConstraint('repository_id'), {'extend_existing':True})
stat_id = Column("stat_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True, default=None)
stat_on_revision = Column("stat_on_revision", Integer(), nullable=False)
commit_activity = Column("commit_activity", LargeBinary(1000000), nullable=False)#JSON data
commit_activity_combined = Column("commit_activity_combined", LargeBinary(), nullable=False)#JSON data
languages = Column("languages", LargeBinary(1000000), nullable=False)#JSON data
repository = relationship('Repository', single_parent=True)
class UserFollowing(Base, BaseModel):
__tablename__ = 'user_followings'
__table_args__ = (UniqueConstraint('user_id', 'follows_repository_id'),
UniqueConstraint('user_id', 'follows_user_id')
, {'extend_existing':True})
user_following_id = Column("user_following_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=None, default=None)
follows_user_id = Column("follows_user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
follows_from = Column('follows_from', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
follows_repository = relationship('Repository', order_by='Repository.repo_name')
def get_repo_followers(cls, repo_id):
return Session.query(cls).filter(cls.follows_repo_id == repo_id)
class CacheInvalidation(Base, BaseModel):
__tablename__ = 'cache_invalidation'
__table_args__ = (UniqueConstraint('cache_key'), {'extend_existing':True})
cache_id = Column("cache_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
cache_key = Column("cache_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
cache_args = Column("cache_args", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
def __init__(self, cache_key, cache_args=''):
self.cache_key = cache_key
self.cache_args = cache_args
self.cache_active = False
self.cache_id, self.cache_key)
class DbMigrateVersion(Base, BaseModel):
__tablename__ = 'db_migrate_version'
repository_id = Column('repository_id', String(250), primary_key=True)
repository_path = Column('repository_path', Text)
version = Column('version', Integer)
rhodecode.model.repo
~~~~~~~~~~~~~~~~~~~~
Repository model for rhodecode
:created_on: Jun 5, 2010
import shutil
from datetime import datetime
from sqlalchemy.orm import joinedload, make_transient
from vcs.backends import get_backend
from rhodecode.lib import safe_str
from rhodecode.model import BaseModel
from rhodecode.model.db import Repository, RepoToPerm, User, Permission, \
Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, Group
from rhodecode.model.user import UserModel
class RepoModel(BaseModel):
def repos_path(self):
"""Get's the repositories root path from database
q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
def get(self, repo_id, cache=False):
repo = self.sa.query(Repository)\
.filter(Repository.repo_id == repo_id)
repo = repo.options(FromCache("sql_cache_short",
"get_repo_%s" % repo_id))
return repo.scalar()
def get_by_repo_name(self, repo_name, cache=False):
.filter(Repository.repo_name == repo_name)
"get_repo_%s" % repo_name))
def get_users_js(self):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s]' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def get_users_groups_js(self):
users_groups = self.sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True).all()
g_tmpl = '''{id:%s, grname:"%s",grmembers:"%s"},'''
users_groups_array = '[%s]' % '\n'.join([g_tmpl % \
(gr.users_group_id, gr.users_group_name,
len(gr.members))
for gr in users_groups])
return users_groups_array
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
#update permissions
for member, perm, member_type in form_data['perms_updates']:
if member_type == 'user':
r2p = self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == User.get_by_username(member))\
.filter(RepoToPerm.repository == cur_repo)\
.one()
r2p.permission = self.sa.query(Permission)\
.filter(Permission.permission_name ==
perm).scalar()
self.sa.add(r2p)
g2p = self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.users_group ==
UsersGroup.get_by_group_name(member))\
.filter(UsersGroupRepoToPerm.repository ==
cur_repo).one()
g2p.permission = self.sa.query(Permission)\
self.sa.add(g2p)
#set new permissions
for member, perm, member_type in form_data['perms_new']:
r2p = RepoToPerm()
r2p.repository = cur_repo
r2p.user = User.get_by_username(member)
.filter(Permission.
permission_name == perm)\
g2p = UsersGroupRepoToPerm()
g2p.repository = cur_repo
g2p.users_group = UsersGroup.get_by_group_name(member)
#update current repo
if k == 'user':
cur_repo.user = User.get_by_username(v)
elif k == 'repo_name':
cur_repo.repo_name = form_data['repo_name_full']
pass
elif k == 'repo_group':
cur_repo.group_id = v
setattr(cur_repo, k, v)
new_name = cur_repo.get_new_name(form_data['repo_name'])
cur_repo.repo_name = new_name
self.sa.add(cur_repo)
if repo_name != form_data['repo_name_full']:
if repo_name != new_name:
# rename repository
self.__rename_repo(old=repo_name,
new=form_data['repo_name_full'])
self.__rename_repo(old=repo_name, new=new_name)
self.sa.commit()
return cur_repo
self.sa.rollback()
def create(self, form_data, cur_user, just_db=False, fork=False):
if fork:
repo_name = form_data['fork_name']
org_name = form_data['repo_name']
org_full_name = org_name
org_name = repo_name = form_data['repo_name']
repo_name_full = form_data['repo_name_full']
new_repo = Repository()
new_repo.enable_statistics = False
if k == 'repo_name':
v = repo_name
v = repo_name_full
if k == 'repo_group':
k = 'group_id'
if k == 'description':
v = v or repo_name
setattr(new_repo, k, v)
parent_repo = self.sa.query(Repository)\
.filter(Repository.repo_name == org_full_name).one()
new_repo.fork = parent_repo
new_repo.user_id = cur_user.user_id
self.sa.add(new_repo)
#create default permission
repo_to_perm = RepoToPerm()
default = 'repository.read'
for p in User.get_by_username('default').user_perms:
if p.permission.permission_name.startswith('repository.'):
default = p.permission.permission_name
default_perm = 'repository.none' if form_data['private'] else default
repo_to_perm.permission_id = self.sa.query(Permission)\
.filter(Permission.permission_name == default_perm)\
.one().permission_id
repo_to_perm.repository = new_repo
repo_to_perm.user_id = User.get_by_username('default').user_id
self.sa.add(repo_to_perm)
if not just_db:
self.__create_repo(repo_name, form_data['repo_type'],
form_data['repo_group'],
form_data['clone_uri'])
#now automatically start following this repository as owner
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
cur_user.user_id)
return new_repo
def create_fork(self, form_data, cur_user):
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
def delete(self, repo):
self.sa.delete(repo)
self.__delete_repo(repo)
def delete_perm_user(self, form_data, repo_name):
self.sa.query(RepoToPerm)\
.filter(RepoToPerm.repository \
== self.get_by_repo_name(repo_name))\
.filter(RepoToPerm.user_id == form_data['user_id']).delete()
def delete_perm_users_group(self, form_data, repo_name):
self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.repository \
.filter(UsersGroupRepoToPerm.users_group_id \
== form_data['users_group_id']).delete()
def delete_stats(self, repo_name):
self.sa.query(Statistics)\
.filter(Statistics.repository == \
self.get_by_repo_name(repo_name)).delete()
def __create_repo(self, repo_name, alias, new_parent_id, clone_uri=False):
makes repository on filesystem. It's group aware means it'll create
a repository within a group, and alter the paths accordingly of
group location
:param alias:
:param parent_id:
:param clone_uri:
if new_parent_id:
paths = Group.get(new_parent_id).full_path.split(Group.url_sep())
new_parent_path = os.sep.join(paths)
new_parent_path = ''
repo_path = os.path.join(*map(lambda x:safe_str(x),
[self.repos_path, new_parent_path, repo_name]))
if is_valid_repo(repo_path, self.repos_path) is False:
log.info('creating repo %s in %s @ %s', repo_name, repo_path,
clone_uri)
backend(repo_path, create=True, src_url=clone_uri)
def __rename_repo(self, old, new):
renames repository on filesystem
:param old: old name
:param new: new name
log.info('renaming repo from %s to %s', old, new)
old_path = os.path.join(self.repos_path, old)
new_path = os.path.join(self.repos_path, new)
if os.path.isdir(new_path):
raise Exception('Was trying to rename to already existing dir %s' \
% new_path)
shutil.move(old_path, new_path)
def __delete_repo(self, repo):
removes repo from filesystem, the removal is acctually made by
added rm__ prefix into dir, and rename internat .hg/.git dirs so this
repository is no longer valid for rhodecode, can be undeleted later on
by reverting the renames on this repository
:param repo: repo object
rm_path = os.path.join(self.repos_path, repo.repo_name)
log.info("Removing %s", rm_path)
#disable hg/git
alias = repo.repo_type
shutil.move(os.path.join(rm_path, '.%s' % alias),
os.path.join(rm_path, 'rm__.%s' % alias))
#disable repo
shutil.move(rm_path, os.path.join(self.repos_path, 'rm__%s__%s' \
% (datetime.today()\
.strftime('%Y%m%d_%H%M%S_%f'),
repo.repo_name)))
rhodecode.model.user_group
~~~~~~~~~~~~~~~~~~~~~~~~~~
users groups model for RhodeCode
:created_on: Jan 25, 2011
from rhodecode.model.db import Group, RhodeCodeUi
class ReposGroupModel(BaseModel):
Get's the repositories root path from database
q = RhodeCodeUi.get_by_key('/').one()
def __create_group(self, group_name):
makes repositories group on filesystem
create_path = os.path.join(self.repos_path, group_name)
log.debug('creating new group in %s', create_path)
if os.path.isdir(create_path):
raise Exception('That directory already exists !')
os.makedirs(create_path)
def __rename_group(self, old, new):
Renames a group on filesystem
if old == new:
log.debug('skipping group rename')
log.debug('renaming repos group from %s to %s', old, new)
log.debug('renaming repos paths from %s to %s', old_path, new_path)
raise Exception('Was trying to rename to already '
'existing dir %s' % new_path)
def __delete_group(self, group):
Deletes a group from a filesystem
:param group: instance of group from database
paths = group.full_path.split(Group.url_sep())
paths = os.sep.join(paths)
rm_path = os.path.join(self.repos_path, paths)
os.rmdir(rm_path)
def create(self, form_data):
new_repos_group = Group()
new_repos_group.group_description = form_data['group_description']
new_repos_group.parent_group = Group.get(form_data['group_parent_id'])
new_repos_group.group_name = new_repos_group.get_new_name(form_data['group_name'])
self.sa.add(new_repos_group)
self.__create_group(new_repos_group.group_name)
return new_repos_group
def update(self, repos_group_id, form_data):
repos_group = Group.get(repos_group_id)
old_path = repos_group.full_path
#change properties
repos_group.group_description = form_data['group_description']
repos_group.parent_group = Group.get(form_data['group_parent_id'])
repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
new_path = repos_group.full_path
self.sa.add(repos_group)
self.__rename_group(old_path, new_path)
# we need to get all repositories from this new group and
# rename them accordingly to new group path
for r in repos_group.repositories:
r.repo_name = r.get_new_name(r.just_name)
self.sa.add(r)
return repos_group
def delete(self, users_group_id):
users_group = Group.get(users_group_id)
self.sa.delete(users_group)
self.__delete_group(users_group)
from rhodecode.tests import *
from rhodecode.model.db import User
from rhodecode.lib import generate_api_key
from rhodecode.lib.auth import check_password
class TestLoginController(TestController):
def test_index(self):
response = self.app.get(url(controller='login', action='index'))
self.assertEqual(response.status, '200 OK')
# Test response...
def test_login_admin_ok(self):
response = self.app.post(url(controller='login', action='index'),
{'username':'test_admin',
'password':'test12'})
self.assertEqual(response.status, '302 Found')
self.assertEqual(response.session['rhodecode_user'].username ,
'test_admin')
response = response.follow()
self.assertTrue('%s repository' % HG_REPO in response.body)
def test_login_regular_ok(self):
{'username':'test_regular',
'test_regular')
self.assertTrue('<a title="Admin" href="/_admin">' not in response.body)
def test_login_ok_came_from(self):
test_came_from = '/_admin/users'
response = self.app.post(url(controller='login', action='index',
came_from=test_came_from),
self.assertTrue('Users administration' in response.body)
def test_login_short_password(self):
'password':'as'})
self.assertTrue('Enter 3 characters or more' in response.body)
def test_login_wrong_username_password(self):
{'username':'error',
self.assertEqual(response.status , '200 OK')
self.assertTrue('invalid user name' in response.body)
self.assertTrue('invalid password' in response.body)
# REGISTRATIONS
def test_register(self):
response = self.app.get(url(controller='login', action='register'))
self.assertTrue('Sign Up to RhodeCode' in response.body)
def test_register_err_same_username(self):
response = self.app.post(url(controller='login', action='register'),
'password':'test12',
'password_confirmation':'test12',
'email':'goodmail@domain.com',
'name':'test',
'lastname':'test'})
self.assertTrue('This username already exists' in response.body)
def test_register_err_same_email(self):
{'username':'test_admin_0',
'email':'test_admin@mail.com',
assert 'This e-mail address is already taken' in response.body
def test_register_err_same_email_case_sensitive(self):
{'username':'test_admin_1',
'email':'TesT_Admin@mail.COM',
def test_register_err_wrong_data(self):
{'username':'xs',
'password':'test',
'password_confirmation':'test',
'email':'goodmailm',
assert 'An email address must contain a single @' in response.body
assert 'Enter a value 6 characters long or more' in response.body
def test_register_err_username(self):
{'username':'error user',
assert ('Username may only contain '
'alphanumeric characters underscores, '
'periods or dashes and must begin with '
'alphanumeric character') in response.body
def test_register_err_case_sensitive(self):
{'username':'Test_Admin',
assert 'This username already exists' in response.body
self.assertTrue('An email address must contain a single @' in response.body)
def test_register_special_chars(self):
{'username':'xxxaxn',
'password':'ąćźżąśśśś',
'password_confirmation':'ąćźżąśśśś',
'email':'goodmailm@test.plx',
assert 'Invalid characters in password' in response.body
self.assertTrue('Invalid characters in password' in response.body)
def test_register_password_mismatch(self):
'password':'123qwe',
'password_confirmation':'qwe123',
'email':'goodmailm@test.plxa',
assert 'Passwords do not match' in response.body
def test_register_ok(self):
username = 'test_regular4'
password = 'qweqwe'
email = 'marcin@test.com'
name = 'testname'
lastname = 'testlastname'
{'username':username,
'password':password,
'password_confirmation':password,
'email':email,
'name':name,
'lastname':lastname})
self.assertEqual(response.status , '302 Found')
assert 'You have successfully registered into rhodecode' in response.session['flash'][0], 'No flash message about user registration'
ret = self.sa.query(User).filter(User.username == 'test_regular4').one()
assert ret.username == username , 'field mismatch %s %s' % (ret.username, username)
assert check_password(password, ret.password) == True , 'password mismatch'
assert ret.email == email , 'field mismatch %s %s' % (ret.email, email)
assert ret.name == name , 'field mismatch %s %s' % (ret.name, name)
assert ret.lastname == lastname , 'field mismatch %s %s' % (ret.lastname, lastname)
def test_forgot_password_wrong_mail(self):
response = self.app.post(url(controller='login', action='password_reset'),
{'email':'marcin@wrongmail.org', })
assert "This e-mail address doesn't exist" in response.body, 'Missing error message about wrong email'
def test_forgot_password(self):
response = self.app.get(url(controller='login',
action='password_reset'))
username = 'test_password_reset_1'
email = 'marcin@python-works.com'
name = 'passwd'
lastname = 'reset'
new = User()
new.username = username
new.password = password
new.email = email
new.name = name
new.lastname = lastname
new.api_key = generate_api_key(username)
self.sa.add(new)
response = self.app.post(url(controller='login',
action='password_reset'),
{'email':email, })
self.checkSessionFlash(response, 'Your password reset link was sent')
# BAD KEY
key = "bad"
action='password_reset_confirmation',
key=key))
self.assertTrue(response.location.endswith(url('reset_password')))
# GOOD KEY
key = User.get_by_username(username).api_key
self.assertTrue(response.location.endswith(url('login_home')))
self.checkSessionFlash(response,
('Your password reset was successful, '
'new password has been sent to your email'))
import unittest
from rhodecode.model.repos_group import ReposGroupModel
from rhodecode.model.db import Group
from rhodecode.model.db import Group, User
class TestReposGroups(unittest.TestCase):
def setUp(self):
self.g1 = self.__make_group('test1', skip_if_exists=True)
self.g2 = self.__make_group('test2', skip_if_exists=True)
self.g3 = self.__make_group('test3', skip_if_exists=True)
def tearDown(self):
print 'out'
def __check_path(self, *path):
path = [TESTS_TMP_PATH] + list(path)
path = os.path.join(*path)
return os.path.isdir(path)
def _check_folders(self):
print os.listdir(TESTS_TMP_PATH)
def __make_group(self, path, desc='desc', parent_id=None,
skip_if_exists=False):
gr = Group.get_by_group_name(path)
if gr and skip_if_exists:
return gr
form_data = dict(group_name=path,
group_description=desc,
group_parent_id=parent_id)
gr = ReposGroupModel().create(form_data)
def __delete_group(self, id_):
ReposGroupModel().delete(id_)
def __update_group(self, id_, path, desc='desc', parent_id=None):
gr = ReposGroupModel().update(id_, form_data)
def test_create_group(self):
g = self.__make_group('newGroup')
self.assertEqual(g.full_path, 'newGroup')
self.assertTrue(self.__check_path('newGroup'))
def test_create_same_name_group(self):
self.assertRaises(IntegrityError, lambda:self.__make_group('newGroup'))
def test_same_subgroup(self):
sg1 = self.__make_group('sub1', parent_id=self.g1.group_id)
self.assertEqual(sg1.parent_group, self.g1)
self.assertEqual(sg1.full_path, 'test1/sub1')
self.assertTrue(self.__check_path('test1', 'sub1'))
ssg1 = self.__make_group('subsub1', parent_id=sg1.group_id)
self.assertEqual(ssg1.parent_group, sg1)
self.assertEqual(ssg1.full_path, 'test1/sub1/subsub1')
self.assertTrue(self.__check_path('test1', 'sub1', 'subsub1'))
def test_remove_group(self):
sg1 = self.__make_group('deleteme')
self.__delete_group(sg1.group_id)
self.assertEqual(Group.get(sg1.group_id), None)
self.assertFalse(self.__check_path('deteteme'))
sg1 = self.__make_group('deleteme', parent_id=self.g1.group_id)
self.assertFalse(self.__check_path('test1', 'deteteme'))
def test_rename_single_group(self):
sg1 = self.__make_group('initial')
new_sg1 = self.__update_group(sg1.group_id, 'after')
self.assertTrue(self.__check_path('after'))
self.assertEqual(Group.get_by_group_name('initial'), None)
def test_update_group_parent(self):
sg1 = self.__make_group('initial', parent_id=self.g1.group_id)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g1.group_id)
self.assertTrue(self.__check_path('test1', 'after'))
self.assertEqual(Group.get_by_group_name('test1/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
self.assertTrue(self.__check_path('test3', 'after'))
self.assertEqual(Group.get_by_group_name('test3/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'hello')
self.assertTrue(self.__check_path('hello'))
self.assertEqual(Group.get_by_group_name('hello'), new_sg1)
def test_subgrouping_with_repo(self):
g1 = self.__make_group('g1')
g2 = self.__make_group('g2')
# create new repo
form_data = dict(repo_name='john',
repo_name_full='john',
fork_name=None,
description=None,
repo_group=None,
private=False,
repo_type='hg',
clone_uri=None)
cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
self.assertEqual(r.repo_name, 'john')
# put repo into group
form_data = form_data
form_data['repo_group'] = g1.group_id
form_data['perms_new'] = []
form_data['perms_updates'] = []
RepoModel().update(r.repo_name, form_data)
self.assertEqual(r.repo_name, 'g1/john')
self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
self.assertTrue(self.__check_path('g2', 'g1'))
# test repo
self.assertEqual(r.repo_name, os.path.join('g2', 'g1', r.just_name))
Status change: