# -*- coding: utf-8 -*-
"""
rhodecode.controllers.admin.repos
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Admin controller for RhodeCode
:created_on: Apr 7, 2010
:author: marcink
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
import traceback
import formencode
from operator import itemgetter
from formencode import htmlfill
from paste.httpexceptions import HTTPInternalServerError
from pylons import request, response, session, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
HasPermissionAnyDecorator
from rhodecode.lib.base import BaseController, render
from rhodecode.lib.utils import invalidate_cache, action_logger, repo_name_slug
from rhodecode.lib.helpers import get_token
from rhodecode.model.db import User, Repository, UserFollowing, Group
from rhodecode.model.forms import RepoForm
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
from sqlalchemy.exc import IntegrityError
log = logging.getLogger(__name__)
class ReposController(BaseController):
REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('repo', 'repos')
@LoginRequired()
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def __before__(self):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
super(ReposController, self).__before__()
def __load_defaults(self):
c.repo_groups = Group.groups_choices()
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
repo_model = RepoModel()
c.users_array = repo_model.get_users_js()
c.users_groups_array = repo_model.get_users_groups_js()
def __load_data(self, repo_name=None):
Load defaults settings for edit, and update
:param repo_name:
self.__load_defaults()
c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
repo = db_repo.scm_instance
if c.repo_info is None:
h.flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name,
category='error')
return redirect(url('repos'))
c.default_user_id = User.get_by_username('default').user_id
c.in_public_journal = self.sa.query(UserFollowing)\
c.in_public_journal = UserFollowing.query()\
.filter(UserFollowing.user_id == c.default_user_id)\
.filter(UserFollowing.follows_repository == c.repo_info).scalar()
if c.repo_info.stats:
last_rev = c.repo_info.stats.stat_on_revision
else:
last_rev = 0
c.stats_revision = last_rev
c.repo_last_rev = repo.count() - 1 if repo.revisions else 0
if last_rev == 0 or c.repo_last_rev == 0:
c.stats_percentage = 0
c.stats_percentage = '%.2f' % ((float((last_rev)) /
c.repo_last_rev) * 100)
defaults = c.repo_info.get_dict()
group, repo_name = c.repo_info.groups_and_repo
defaults['repo_name'] = repo_name
defaults['repo_group'] = getattr(group[-1] if group else None,
'group_id', None)
#fill owner
if c.repo_info.user:
defaults.update({'user': c.repo_info.user.username})
replacement_user = self.sa.query(User)\
.filter(User.admin == True).first().username
defaults.update({'user': replacement_user})
#fill repository users
for p in c.repo_info.repo_to_perm:
defaults.update({'u_perm_%s' % p.user.username:
p.permission.permission_name})
#fill repository groups
for p in c.repo_info.users_group_to_perm:
defaults.update({'g_perm_%s' % p.users_group.users_group_name:
defaults = RepoModel()._get_defaults(repo_name)
return defaults
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
c.repos_list = ScmModel().get_repos(Repository.query()
.order_by(Repository.repo_name)
.all(), sort_key='name_sort')
return render('admin/repos/repos.html')
def create(self):
POST /repos: Create a new item"""
form_result = {}
try:
form_result = RepoForm(repo_groups=c.repo_groups_choices)()\
.to_python(dict(request.POST))
repo_model.create(form_result, self.rhodecode_user)
if form_result['clone_uri']:
h.flash(_('created repository %s from %s') \
% (form_result['repo_name'], form_result['clone_uri']),
category='success')
h.flash(_('created repository %s') % form_result['repo_name'],
if request.POST.get('user_created'):
#created by regular non admin user
action_logger(self.rhodecode_user, 'user_created_repo',
form_result['repo_name_full'], '', self.sa)
action_logger(self.rhodecode_user, 'admin_created_repo',
except formencode.Invalid, errors:
c.new_repo = errors.value['repo_name']
r = render('admin/repos/repo_add_create_repository.html')
r = render('admin/repos/repo_add.html')
return htmlfill.render(
r,
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
msg = _('error occurred during creation of repository %s') \
% form_result.get('repo_name')
h.flash(msg, category='error')
return redirect(url('home'))
def new(self, format='html'):
"""GET /repos/new: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
return render('admin/repos/repo_add.html')
def update(self, repo_name):
PUT /repos/repo_name: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('repo', repo_name=ID),
# method='put')
# url('repo', repo_name=ID)
changed_name = repo_name
_form = RepoForm(edit=True, old_data={'repo_name': repo_name},
repo_groups=c.repo_groups_choices)()
form_result = _form.to_python(dict(request.POST))
repo = repo_model.update(repo_name, form_result)
invalidate_cache('get_repo_cached_%s' % repo_name)
h.flash(_('Repository %s updated successfully' % repo_name),
changed_name = repo.repo_name
action_logger(self.rhodecode_user, 'admin_updated_repo',
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import HasPermissionAllDecorator, HasPermissionAnyDecorator
from rhodecode.lib.auth import HasPermissionAllDecorator, \
from rhodecode.model.db import User, UsersGroup, Group, Repository
from rhodecode.model.user import UserModel
from rhodecode.model.repo_permission import RepositoryPermissionModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model import users_group
from rhodecode.model.repos_group import ReposGroupModel
from sqlalchemy.orm.exc import NoResultFound
class ApiController(JSONRPCController):
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
def pull(self, apiuser, repo):
Dispatch pull action on given repo
:param user:
:param repo:
if Repository.is_valid(repo) is False:
raise JSONRPCError('Unknown repo "%s"' % repo)
ScmModel().pull_changes(repo, self.rhodecode_user.username)
return 'Pulled from %s' % repo
raise JSONRPCError('Unable to pull changes from "%s"' % repo)
def get_user(self, apiuser, username):
""""
Get a user by username
:param apiuser
:param username
user = User.get_by_username(username)
if not user:
return None
return dict(id = user.user_id,
username = user.username,
firstname = user.name,
lastname = user.lastname,
email = user.email,
active = user.active,
admin = user.admin,
ldap = user.ldap_dn)
return dict(id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap=user.ldap_dn)
def get_users(self, apiuser):
Get all users
result = []
for user in User.getAll():
result.append(dict(id = user.user_id,
ldap = user.ldap_dn))
result.append(dict(id=user.user_id,
ldap=user.ldap_dn))
return result
def create_user(self, apiuser, username, password, firstname,
lastname, email, active = True, admin = False, ldap_dn = None):
lastname, email, active=True, admin=False, ldap_dn=None):
Create new user
:param apiuser:
:param username:
:param password:
:param name:
:param lastname:
:param email:
:param active:
:param admin:
:param ldap_dn:
if self.get_user(apiuser, username):
raise JSONRPCError("user %s already exist" % username)
form_data = dict(username = username,
password = password,
active = active,
admin = admin,
name = firstname,
lastname = lastname,
email = email,
ldap_dn = ldap_dn)
form_data = dict(username=username,
password=password,
active=active,
admin=admin,
name=firstname,
lastname=lastname,
email=email,
ldap_dn=ldap_dn)
UserModel().create_ldap(username, password, ldap_dn, form_data)
return dict(msg = 'created new user %s' % username)
return dict(msg='created new user %s' % username)
raise JSONRPCError('failed to create user %s' % username)
def get_users_group(self, apiuser, group_name):
Get users group by name
:param group_name
users_group = UsersGroup.get_by_group_name(group_name)
if not users_group:
members = []
for user in users_group.members:
user = user.user
members.append(dict(id = user.user_id,
members.append(dict(id=user.user_id,
return dict(id = users_group.users_group_id,
name = users_group.users_group_name,
active = users_group.users_group_active,
members = members)
return dict(id=users_group.users_group_id,
name=users_group.users_group_name,
active=users_group.users_group_active,
members=members)
def get_users_groups(self, apiuser):
Get all users groups
for users_group in UsersGroup.getAll():
result.append(dict(id = users_group.users_group_id,
members = members))
result.append(dict(id=users_group.users_group_id,
members=members))
def create_users_group(self, apiuser, name, active = True):
def create_users_group(self, apiuser, name, active=True):
Creates an new usergroup
if self.get_users_group(apiuser, name):
raise JSONRPCError("users group %s already exist" % name)
form_data = dict(users_group_name = name,
users_group_active = active)
form_data = dict(users_group_name=name,
users_group_active=active)
ug = UsersGroup.create(form_data)
return dict(id = ug.users_group_id,
msg = 'created new users group %s' % name)
return dict(id=ug.users_group_id,
msg='created new users group %s' % name)
raise JSONRPCError('failed to create group %s' % name)
def add_user_to_users_group(self, apiuser, group_name, user_name):
Add a user to a group
:param user_name
raise JSONRPCError('unknown users group %s' % group_name)
user = User.get_by_username(user_name)
except NoResultFound:
raise JSONRPCError('unknown user %s' % user_name)
ugm = UsersGroupModel().add_user_to_group(users_group, user)
return dict(id = ugm.users_group_member_id,
msg = 'created new users group member')
return dict(id=ugm.users_group_member_id,
msg='created new users group member')
raise JSONRPCError('failed to create users group member')
@HasPermissionAnyDecorator('hg.admin')
def get_repo(self, apiuser, repo_name):
Get repository by name
:param repo_name
repo = Repository.get_by_repo_name(repo_name)
for user in repo.repo_to_perm:
perm = user.permission.permission_name
members.append(dict(type_ = "user",
id = user.user_id,
ldap = user.ldap_dn,
permission = perm))
members.append(dict(type_="user",
id=user.user_id,
ldap=user.ldap_dn,
permission=perm))
for users_group in repo.users_group_to_perm:
perm = users_group.permission.permission_name
users_group = users_group.users_group
members.append(dict(type_ = "users_group",
id = users_group.users_group_id,
members.append(dict(type_="users_group",
id=users_group.users_group_id,
return dict(id = repo.repo_id,
name = repo.repo_name,
type = repo.repo_type,
description = repo.description,
return dict(id=repo.repo_id,
name=repo.repo_name,
type=repo.repo_type,
description=repo.description,
def get_repos(self, apiuser):
Get all repositories
for repository in Repository.getAll():
result.append(dict(id = repository.repo_id,
name = repository.repo_name,
type = repository.repo_type,
description = repository.description))
result.append(dict(id=repository.repo_id,
name=repository.repo_name,
type=repository.repo_type,
description=repository.description))
def create_repo(self, apiuser, name, owner_name, description = '', repo_type = 'hg', \
private = False):
def create_repo(self, apiuser, name, owner_name, description='',
repo_type='hg', private=False):
Create a repository
:param name
:param description
:param type
:param private
:param owner_name
owner = User.get_by_username(owner_name)
raise JSONRPCError('unknown user %s' % owner)
if self.get_repo(apiuser, name):
raise JSONRPCError("repo %s already exist" % name)
groups = name.split('/')
real_name = groups[-1]
groups = groups[:-1]
parent_id = None
for g in groups:
group = Group.get_by_group_name(g)
if not group:
group = ReposGroupModel().create(dict(group_name = g,
group_description = '',
group_parent_id = parent_id))
group = ReposGroupModel().create(dict(group_name=g,
group_description='',
group_parent_id=parent_id))
parent_id = group.group_id
RepoModel().create(dict(repo_name = real_name,
repo_name_full = name,
description = description,
private = private,
repo_type = repo_type,
repo_group = parent_id,
clone_uri = None), owner)
RepoModel().create(dict(repo_name=real_name,
repo_name_full=name,
description=description,
private=private,
repo_type=repo_type,
repo_group=parent_id,
clone_uri=None), owner)
raise JSONRPCError('failed to create repository %s' % name)
def add_user_to_repo(self, apiuser, repo_name, user_name, perm):
Add permission for a user to a repository
:param perm
raise JSONRPCError('unknown repository %s' % repo)
raise JSONRPCError('unknown user %s' % user)
RepositoryPermissionModel().update_or_delete_user_permission(repo, user, perm)
RepositoryPermissionModel()\
.update_or_delete_user_permission(repo, user, perm)
raise JSONRPCError('failed to edit permission %(repo)s for %(user)s'
% dict(user = user_name, repo = repo_name))
% dict(user=user_name, repo=repo_name))
rhodecode.controllers.settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Settings controller for rhodecode
:created_on: Jun 30, 2010
from pylons import tmpl_context as c, request, url
from pylons.controllers.util import redirect
import rhodecode.lib.helpers as h
from rhodecode.lib.auth import LoginRequired, HasRepoPermissionAllDecorator, \
HasRepoPermissionAnyDecorator, NotAnonymous
from rhodecode.lib.base import BaseRepoController, render
from rhodecode.lib.utils import invalidate_cache, action_logger
from rhodecode.model.forms import RepoSettingsForm, RepoForkForm
from rhodecode.model.db import User
from rhodecode.model.db import Group
class SettingsController(BaseRepoController):
super(SettingsController, self).__before__()
@HasRepoPermissionAllDecorator('repository.admin')
def index(self, repo_name):
c.repo_info = repo = repo_model.get_by_repo_name(repo_name)
if not repo:
' it was created or renamed from the file system'
render('settings/repo_settings.html'),
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
_form = RepoSettingsForm(edit=True,
old_data={'repo_name': repo_name})()
old_data={'repo_name': repo_name},
repo_model.update(repo_name, form_result)
changed_name = form_result['repo_name']
changed_name = form_result['repo_name_full']
action_logger(self.rhodecode_user, 'user_updated_repo',
changed_name, '', self.sa)
c.repo_info = repo_model.get_by_repo_name(repo_name)
errors.value.update({'user': c.repo_info.user.username})
h.flash(_('error occurred during update of repository %s') \
% repo_name, category='error')
return redirect(url('repo_settings_home', repo_name=changed_name))
def delete(self, repo_name):
"""DELETE /repos/repo_name: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# h.form(url('repo_settings_delete', repo_name=ID),
# method='delete')
# url('repo_settings_delete', repo_name=ID)
repo = repo_model.get_by_repo_name(repo_name)
' it was moved or renamed from the filesystem'
action_logger(self.rhodecode_user, 'user_deleted_repo',
repo_name, '', self.sa)
repo_model.delete(repo)
h.flash(_('deleted repository %s') % repo_name, category='success')
h.flash(_('An error occurred during deletion of %s') % repo_name,
@NotAnonymous()
@HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
'repository.admin')
def fork(self, repo_name):
return render('settings/repo_fork.html')
def fork_create(self, repo_name):
_form = RepoForkForm(old_data={'repo_type': c.repo_info.repo_type})()
form_result.update({'repo_name': repo_name})
repo_model.create_fork(form_result, self.rhodecode_user)
h.flash(_('forked %s repository as %s') \
% (repo_name, form_result['fork_name']),
action_logger(self.rhodecode_user,
'user_forked_repo:%s' % form_result['fork_name'],
c.new_repo = errors.value['fork_name']
r = render('settings/repo_fork.html')
rhodecode.model.db
~~~~~~~~~~~~~~~~~~
Database Models for RhodeCode
:created_on: Apr 08, 2010
import os
import datetime
from datetime import date
from sqlalchemy import *
from sqlalchemy.exc import DatabaseError
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, backref, joinedload, class_mapper, \
validates
from sqlalchemy.orm.interfaces import MapperExtension
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
from beaker.cache import cache_region, region_invalidate
from vcs import get_backend
from vcs.utils.helpers import get_scm
from vcs.exceptions import VCSError
from vcs.utils.lazy import LazyProperty
from rhodecode.lib import str2bool, safe_str, get_changeset_safe, \
generate_api_key, safe_unicode
from rhodecode.lib.exceptions import UsersGroupsAssignedException
from rhodecode.lib.compat import json
from rhodecode.model.meta import Base, Session
from rhodecode.model.caching_query import FromCache
#==============================================================================
# BASE CLASSES
class ModelSerializer(json.JSONEncoder):
Simple Serializer for JSON,
usage::
to make object customized for serialization implement a __json__
method that will return a dict for serialization into json
example::
class Task(object):
def __init__(self, name, value):
self.name = name
self.value = value
def __json__(self):
return dict(name=self.name,
value=self.value)
def default(self, obj):
if hasattr(obj, '__json__'):
return obj.__json__()
return json.JSONEncoder.default(self, obj)
class BaseModel(object):
"""Base Model for all classess
@classmethod
def _get_keys(cls):
"""return column names for this model """
return class_mapper(cls).c.keys()
def get_dict(self):
"""return dict with keys and values corresponding
to this model data """
d = {}
for k in self._get_keys():
d[k] = getattr(self, k)
return d
def get_appstruct(self):
"""return list with keys and values tupples corresponding
l = []
l.append((k, getattr(self, k),))
return l
def populate_obj(self, populate_dict):
"""populate model with data from given populate_dict"""
if k in populate_dict:
setattr(self, k, populate_dict[k])
def query(cls):
return Session.query(cls)
def get(cls, id_):
return cls.query().get(id_)
if id_:
def getAll(cls):
return cls.query().all()
def delete(cls, id_):
obj = cls.query().get(id_)
Session.delete(obj)
Session.commit()
class RhodeCodeSettings(Base, BaseModel):
__tablename__ = 'rhodecode_settings'
__table_args__ = (UniqueConstraint('app_settings_name'), {'extend_existing':True})
app_settings_id = Column("app_settings_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
app_settings_name = Column("app_settings_name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
_app_settings_value = Column("app_settings_value", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_app_settings_value = Column("app_settings_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k = '', v = ''):
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
if v == 'ldap_active':
v = str2bool(v)
return v
@app_settings_value.setter
def app_settings_value(self, val):
Setter that will always make sure we use unicode in app_settings_value
:param val:
self._app_settings_value = safe_unicode(val)
def __repr__(self):
return "<%s('%s:%s')>" % (self.__class__.__name__,
self.app_settings_name, self.app_settings_value)
def get_by_name(cls, ldap_key):
return cls.query()\
.filter(cls.app_settings_name == ldap_key).scalar()
def get_app_settings(cls, cache = False):
def get_app_settings(cls, cache=False):
ret = cls.query()
if cache:
ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
if not ret:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings['rhodecode_' + each.app_settings_name] = \
each.app_settings_value
return settings
def get_ldap_settings(cls, cache = False):
def get_ldap_settings(cls, cache=False):
ret = cls.query()\
.filter(cls.app_settings_name.startswith('ldap_')).all()
fd = {}
for row in ret:
fd.update({row.app_settings_name:row.app_settings_value})
return fd
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
__table_args__ = (UniqueConstraint('ui_key'), {'extend_existing':True})
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'pretxnchangegroup.push_logger'
HOOK_PULL = 'preoutgoing.pull_logger'
ui_id = Column("ui_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
ui_section = Column("ui_section", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
ui_key = Column("ui_key", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
ui_value = Column("ui_value", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
ui_active = Column("ui_active", Boolean(), nullable = True, unique = None, default = True)
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
def get_by_key(cls, key):
return cls.query().filter(cls.ui_key == key)
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE,
cls.HOOK_REPO_SIZE,
cls.HOOK_PUSH, cls.HOOK_PULL]))
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE,
q = q.filter(cls.ui_section == 'hooks')
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key).scalar() or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session.add(new_ui)
class User(Base, BaseModel):
__tablename__ = 'users'
__table_args__ = (UniqueConstraint('username'), UniqueConstraint('email'), {'extend_existing':True})
user_id = Column("user_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
username = Column("username", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
password = Column("password", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
active = Column("active", Boolean(), nullable = True, unique = None, default = None)
admin = Column("admin", Boolean(), nullable = True, unique = None, default = False)
name = Column("name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
lastname = Column("lastname", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
email = Column("email", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
last_login = Column("last_login", DateTime(timezone = False), nullable = True, unique = None, default = None)
ldap_dn = Column("ldap_dn", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
api_key = Column("api_key", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_log = relationship('UserLog', cascade = 'all')
user_perms = relationship('UserToPerm', primaryjoin = "User.user_id==UserToPerm.user_id", cascade = 'all')
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin = 'UserFollowing.follows_user_id==User.user_id', cascade = 'all')
repo_to_perm = relationship('RepoToPerm', primaryjoin = 'RepoToPerm.user_id==User.user_id', cascade = 'all')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('RepoToPerm', primaryjoin='RepoToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade = 'all')
group_member = relationship('UsersGroupMember', cascade='all')
@property
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
def short_contact(self):
return '%s %s' % (self.name, self.lastname)
def is_admin(self):
return self.admin
return "<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
except:
return self.__class__.__name__
def get_by_username(cls, username, case_insensitive = False):
def get_by_username(cls, username, case_insensitive=False):
if case_insensitive:
return Session.query(cls).filter(cls.username.ilike(username)).scalar()
return Session.query(cls).filter(cls.username == username).scalar()
def get_by_api_key(cls, api_key):
return cls.query().filter(cls.api_key == api_key).one()
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session.add(self)
log.debug('updated user %s lastlogin', self.username)
def create(cls, form_data):
from rhodecode.lib.auth import get_crypt_password
new_user = cls()
for k, v in form_data.items():
if k == 'password':
v = get_crypt_password(v)
setattr(new_user, k, v)
new_user.api_key = generate_api_key(form_data['username'])
Session.add(new_user)
return new_user
Session.rollback()
raise
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
__table_args__ = {'extend_existing':True}
user_log_id = Column("user_log_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable = False, unique = None, default = None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable = False, unique = None, default = None)
repository_name = Column("repository_name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
user_ip = Column("user_ip", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
action = Column("action", UnicodeText(length = 1200000, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
action_date = Column("action_date", DateTime(timezone = False), nullable = True, unique = None, default = None)
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
repository_name = Column("repository_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", UnicodeText(length=1200000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
users_group_name = Column("users_group_name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = False, unique = True, default = None)
users_group_active = Column("users_group_active", Boolean(), nullable = True, unique = None, default = None)
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
members = relationship('UsersGroupMember', cascade = "all, delete, delete-orphan", lazy = "joined")
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
return '<userGroup(%s)>' % (self.users_group_name)
def get_by_group_name(cls, group_name, cache = False, case_insensitive = False):
def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
gr = cls.query()\
.filter(cls.users_group_name.ilike(group_name))
.filter(cls.users_group_name == group_name)
gr = gr.options(FromCache("sql_cache_short",
"get_user_%s" % group_name))
return gr.scalar()
def get(cls, users_group_id, cache = False):
def get(cls, users_group_id, cache=False):
users_group = cls.query()
users_group = users_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % users_group_id))
return users_group.get(users_group_id)
new_users_group = cls()
setattr(new_users_group, k, v)
Session.add(new_users_group)
return new_users_group
def update(cls, users_group_id, form_data):
users_group = cls.get(users_group_id, cache = False)
users_group = cls.get(users_group_id, cache=False)
if k == 'users_group_members':
users_group.members = []
Session.flush()
members_list = []
if v:
v = [v] if isinstance(v, basestring) else v
for u_id in set(v):
member = UsersGroupMember(users_group_id, u_id)
members_list.append(member)
setattr(users_group, 'members', members_list)
setattr(users_group, k, v)
Session.add(users_group)
def delete(cls, users_group_id):
# check if this group is not assigned to repo
assigned_groups = UsersGroupRepoToPerm.query()\
.filter(UsersGroupRepoToPerm.users_group_id ==
users_group_id).all()
if assigned_groups:
raise UsersGroupsAssignedException('Group assigned to %s' %
assigned_groups)
Session.delete(users_group)
class UsersGroupMember(Base, BaseModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column("users_group_member_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable = False, unique = None, default = None)
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
user = relationship('User', lazy = 'joined')
user = relationship('User', lazy='joined')
users_group = relationship('UsersGroup')
def __init__(self, gr_id = '', u_id = ''):
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
@staticmethod
def add_user_to_group(group, user):
ugm = UsersGroupMember()
ugm.users_group = group
ugm.user = user
Session.add(ugm)
return ugm
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
__table_args__ = (UniqueConstraint('repo_name'), {'extend_existing':True},)
repo_id = Column("repo_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
repo_name = Column("repo_name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = False, unique = True, default = None)
clone_uri = Column("clone_uri", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = False, default = None)
repo_type = Column("repo_type", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = False, unique = False, default = 'hg')
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable = False, unique = False, default = None)
private = Column("private", Boolean(), nullable = True, unique = None, default = None)
enable_statistics = Column("statistics", Boolean(), nullable = True, unique = None, default = True)
enable_downloads = Column("downloads", Boolean(), nullable = True, unique = None, default = True)
description = Column("description", String(length = 10000, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
created_on = Column('created_on', DateTime(timezone = False), nullable = True, unique = None, default = datetime.datetime.now)
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
clone_uri = Column("clone_uri", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable = True, unique = False, default = None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable = True, unique = False, default = None)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
fork = relationship('Repository', remote_side = repo_id)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('Group')
repo_to_perm = relationship('RepoToPerm', cascade = 'all', order_by = 'RepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade = 'all')
stats = relationship('Statistics', cascade = 'all', uselist = False)
repo_to_perm = relationship('RepoToPerm', cascade='all', order_by='RepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing', primaryjoin = 'UserFollowing.follows_repo_id==Repository.repo_id', cascade = 'all')
followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog', cascade = 'all')
logs = relationship('UserLog', cascade='all')
self.repo_id, self.repo_name)
def url_sep(cls):
return '/'
def get_by_repo_name(cls, repo_name):
q = Session.query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))
return q.one()
def get_repo_forks(cls, repo_id):
return cls.query().filter(Repository.fork_id == repo_id)
def base_path(cls):
Returns base path when all repos are stored
:param cls:
q = Session.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key ==
cls.url_sep())
q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def just_name(self):
return self.repo_name.split(Repository.url_sep())[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
Repository.url_sep())
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split(Repository.url_sep())
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return Repository.url_sep().join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from mercurial import ui
from mercurial import config
baseui = ui.ui()
@@ -637,434 +635,435 @@ class Repository(Base, BaseModel):
from rhodecode.lib.utils import is_valid_repo
return is_valid_repo(repo_name, cls.base_path())
#==========================================================================
# SCM PROPERTIES
def get_changeset(self, rev):
return get_changeset_safe(self.scm_instance, rev)
def tip(self):
return self.get_changeset('tip')
def author(self):
return self.tip.author
def last_change(self):
return self.scm_instance.last_change
# SCM CACHE INSTANCE
def invalidate(self):
Returns Invalidation object if this repo should be invalidated
None otherwise. `cache_active = False` means that this cache
state is not valid and needs to be invalidated
return CacheInvalidation.query()\
.filter(CacheInvalidation.cache_key == self.repo_name)\
.filter(CacheInvalidation.cache_active == False)\
.scalar()
def set_invalidate(self):
set a cache for invalidation for this instance
inv = CacheInvalidation.query()\
if inv is None:
inv = CacheInvalidation(self.repo_name)
inv.cache_active = True
Session.add(inv)
def scm_instance(self):
return self.__get_instance()
def scm_instance_cached(self):
@cache_region('long_term')
def _c(repo_name):
# TODO: remove this trick when beaker 1.6 is released
# and have fixed this issue with not supporting unicode keys
rn = safe_str(self.repo_name)
inv = self.invalidate
if inv is not None:
region_invalidate(_c, None, rn)
# update our cache
return _c(rn)
def __get_instance(self):
repo_full_path = self.repo_full_path
alias = get_scm(repo_full_path)[0]
log.debug('Creating instance of %s repository', alias)
backend = get_backend(alias)
except VCSError:
log.error('Perhaps this repository is in db and not in '
'filesystem run rescan repositories with '
'"destroy old data " option from admin panel')
return
if alias == 'hg':
repo = backend(safe_str(repo_full_path), create = False,
baseui = self._ui)
repo = backend(safe_str(repo_full_path), create=False,
baseui=self._ui)
#skip hidden web repository
if repo._get_hidden():
repo = backend(repo_full_path, create = False)
repo = backend(repo_full_path, create=False)
return repo
class Group(Base, BaseModel):
__tablename__ = 'groups'
__table_args__ = (UniqueConstraint('group_name', 'group_parent_id'),
CheckConstraint('group_id != group_parent_id'), {'extend_existing':True},)
__mapper_args__ = {'order_by':'group_name'}
group_id = Column("group_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
group_name = Column("group_name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = False, unique = True, default = None)
group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable = True, unique = None, default = None)
group_description = Column("group_description", String(length = 10000, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_name = Column("group_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
group_description = Column("group_description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
parent_group = relationship('Group', remote_side = group_id)
parent_group = relationship('Group', remote_side=group_id)
def __init__(self, group_name = '', parent_group = None):
def __init__(self, group_name='', parent_group=None):
self.group_name = group_name
self.parent_group = parent_group
return "<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
self.group_name)
def groups_choices(cls):
from webhelpers.html import literal as _literal
repo_groups = [('', '')]
sep = ' » '
_name = lambda k: _literal(sep.join(k))
repo_groups.extend([(x.group_id, _name(x.full_path_splitted))
for x in cls.query().all()])
repo_groups = sorted(repo_groups, key = lambda t: t[1].split(sep)[0])
repo_groups = sorted(repo_groups, key=lambda t: t[1].split(sep)[0])
return repo_groups
.filter(cls.group_name.ilike(group_name))
.filter(cls.group_name == group_name)
"get_group_%s" % group_name))
def parents(self):
parents_recursion_limit = 5
if self.parent_group is None:
cur_gr = self.parent_group
cnt = 0
cnt += 1
if cnt == parents_recursion_limit:
# this will prevent accidental infinit loops
log.error('group nested more than %s' %
parents_recursion_limit)
def children(self):
return Group.query().filter(Group.parent_group == self)
def name(self):
return self.group_name.split(Group.url_sep())[-1]
def full_path(self):
return Group.url_sep().join([g.group_name for g in self.parents] +
[self.group_name])
return self.group_name
def full_path_splitted(self):
return self.group_name.split(Group.url_sep())
def repositories(self):
return Repository.query().filter(Repository.group == self)
def repositories_recursive_count(self):
cnt = self.repositories.count()
def children_count(group):
for child in group.children:
cnt += child.repositories.count()
cnt += children_count(child)
return cnt
return cnt + children_count(self)
def get_new_name(self, group_name):
returns new full group name based on parent and new name
path_prefix = self.parent_group.full_path_splitted if self.parent_group else []
path_prefix = (self.parent_group.full_path_splitted if
self.parent_group else [])
return Group.url_sep().join(path_prefix + [group_name])
class Permission(Base, BaseModel):
__tablename__ = 'permissions'
permission_id = Column("permission_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
permission_name = Column("permission_name", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
permission_longname = Column("permission_longname", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
permission_id = Column("permission_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
permission_name = Column("permission_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
permission_longname = Column("permission_longname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
self.permission_id, self.permission_name)
return cls.query().filter(cls.permission_name == key).scalar()
class RepoToPerm(Base, BaseModel):
__tablename__ = 'repo_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'repository_id'), {'extend_existing':True})
repo_to_perm_id = Column("repo_to_perm_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable = False, unique = None, default = None)
repo_to_perm_id = Column("repo_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
permission_id = Column("permission_id", Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
permission = relationship('Permission')
class UserToPerm(Base, BaseModel):
__tablename__ = 'user_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'permission_id'), {'extend_existing':True})
user_to_perm_id = Column("user_to_perm_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
user_to_perm_id = Column("user_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
def has_perm(cls, user_id, perm):
if not isinstance(perm, Permission):
raise Exception('perm needs to be an instance of Permission class')
return cls.query().filter(cls.user_id == user_id)\
.filter(cls.permission == perm).scalar() is not None
def grant_perm(cls, user_id, perm):
new = cls()
new.user_id = user_id
new.permission = perm
Session.add(new)
def revoke_perm(cls, user_id, perm):
cls.query().filter(cls.user_id == user_id)\
.filter(cls.permission == perm).delete()
class UsersGroupRepoToPerm(Base, BaseModel):
__tablename__ = 'users_group_repo_to_perm'
__table_args__ = (UniqueConstraint('repository_id', 'users_group_id', 'permission_id'), {'extend_existing':True})
users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
users_group_to_perm_id = Column("users_group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
return '<userGroup:%s => %s >' % (self.users_group, self.repository)
class UsersGroupToPerm(Base, BaseModel):
__tablename__ = 'users_group_to_perm'
def has_perm(cls, users_group_id, perm):
return cls.query().filter(cls.users_group_id ==
users_group_id)\
.filter(cls.permission == perm)\
.scalar() is not None
def grant_perm(cls, users_group_id, perm):
new.users_group_id = users_group_id
def revoke_perm(cls, users_group_id, perm):
cls.query().filter(cls.users_group_id == users_group_id)\
class GroupToPerm(Base, BaseModel):
__tablename__ = 'group_to_perm'
__table_args__ = (UniqueConstraint('group_id', 'permission_id'), {'extend_existing':True})
group_to_perm_id = Column("group_to_perm_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable = False, unique = None, default = None)
group_to_perm_id = Column("group_to_perm_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
class Statistics(Base, BaseModel):
__tablename__ = 'statistics'
__table_args__ = (UniqueConstraint('repository_id'), {'extend_existing':True})
stat_id = Column("stat_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable = False, unique = True, default = None)
stat_on_revision = Column("stat_on_revision", Integer(), nullable = False)
commit_activity = Column("commit_activity", LargeBinary(1000000), nullable = False)#JSON data
commit_activity_combined = Column("commit_activity_combined", LargeBinary(), nullable = False)#JSON data
languages = Column("languages", LargeBinary(1000000), nullable = False)#JSON data
stat_id = Column("stat_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True, default=None)
stat_on_revision = Column("stat_on_revision", Integer(), nullable=False)
commit_activity = Column("commit_activity", LargeBinary(1000000), nullable=False)#JSON data
commit_activity_combined = Column("commit_activity_combined", LargeBinary(), nullable=False)#JSON data
languages = Column("languages", LargeBinary(1000000), nullable=False)#JSON data
repository = relationship('Repository', single_parent = True)
repository = relationship('Repository', single_parent=True)
class UserFollowing(Base, BaseModel):
__tablename__ = 'user_followings'
__table_args__ = (UniqueConstraint('user_id', 'follows_repository_id'),
UniqueConstraint('user_id', 'follows_user_id')
, {'extend_existing':True})
user_following_id = Column("user_following_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable = True, unique = None, default = None)
follows_user_id = Column("follows_user_id", Integer(), ForeignKey('users.user_id'), nullable = True, unique = None, default = None)
follows_from = Column('follows_from', DateTime(timezone = False), nullable = True, unique = None, default = datetime.datetime.now)
user_following_id = Column("user_following_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=None, default=None)
follows_user_id = Column("follows_user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
follows_from = Column('follows_from', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
user = relationship('User', primaryjoin = 'User.user_id==UserFollowing.user_id')
user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
follows_user = relationship('User', primaryjoin = 'User.user_id==UserFollowing.follows_user_id')
follows_repository = relationship('Repository', order_by = 'Repository.repo_name')
follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
follows_repository = relationship('Repository', order_by='Repository.repo_name')
def get_repo_followers(cls, repo_id):
return cls.query().filter(cls.follows_repo_id == repo_id)
class CacheInvalidation(Base, BaseModel):
__tablename__ = 'cache_invalidation'
__table_args__ = (UniqueConstraint('cache_key'), {'extend_existing':True})
cache_id = Column("cache_id", Integer(), nullable = False, unique = True, default = None, primary_key = True)
cache_key = Column("cache_key", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
cache_args = Column("cache_args", String(length = 255, convert_unicode = False, assert_unicode = None), nullable = True, unique = None, default = None)
cache_active = Column("cache_active", Boolean(), nullable = True, unique = None, default = False)
cache_id = Column("cache_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
cache_key = Column("cache_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
cache_args = Column("cache_args", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
cache_active = Column("cache_active", Boolean(), nullable=True, unique=None, default=False)
def __init__(self, cache_key, cache_args = ''):
def __init__(self, cache_key, cache_args=''):
self.cache_key = cache_key
self.cache_args = cache_args
self.cache_active = False
self.cache_id, self.cache_key)
class DbMigrateVersion(Base, BaseModel):
__tablename__ = 'db_migrate_version'
repository_id = Column('repository_id', String(250), primary_key = True)
repository_id = Column('repository_id', String(250), primary_key=True)
repository_path = Column('repository_path', Text)
version = Column('version', Integer)
@@ -100,252 +100,253 @@ def ValidUsersGroup(edit, old_data):
if old_ugname != value or not edit:
if UsersGroup.get_by_group_name(value, cache=False,
case_insensitive=True):
raise formencode.Invalid(_('This users group '
'already exists') , value,
state)
if re.match(r'^[a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+$', value) is None:
raise formencode.Invalid(_('Group name may only contain '
'alphanumeric characters '
'underscores, periods or dashes '
'and must begin with alphanumeric '
'character'), value, state)
return _ValidUsersGroup
def ValidReposGroup(edit, old_data):
class _ValidReposGroup(formencode.validators.FancyValidator):
def validate_python(self, value, state):
#TODO WRITE VALIDATIONS
group_name = value.get('group_name')
group_parent_id = int(value.get('group_parent_id') or -1)
# slugify repo group just in case :)
slug = repo_name_slug(group_name)
# check for parent of self
if edit and old_data['group_id'] == group_parent_id:
e_dict = {'group_parent_id':_('Cannot assign this group '
'as parent')}
raise formencode.Invalid('', value, state,
error_dict=e_dict)
old_gname = None
if edit:
old_gname = Group.get(
old_data.get('group_id')).group_name
if old_gname != group_name or not edit:
# check filesystem
gr = Group.query().filter(Group.group_name == slug)\
.filter(Group.group_parent_id == group_parent_id).scalar()
if gr:
e_dict = {'group_name':_('This group already exists')}
return _ValidReposGroup
class ValidPassword(formencode.validators.FancyValidator):
def to_python(self, value, state):
if value:
if value.get('password'):
value['password'] = get_crypt_password(value['password'])
except UnicodeEncodeError:
e_dict = {'password':_('Invalid characters in password')}
raise formencode.Invalid('', value, state, error_dict=e_dict)
if value.get('password_confirmation'):
value['password_confirmation'] = \
get_crypt_password(value['password_confirmation'])
e_dict = {'password_confirmation':_('Invalid characters in password')}
if value.get('new_password'):
value['new_password'] = \
get_crypt_password(value['new_password'])
e_dict = {'new_password':_('Invalid characters in password')}
return value
class ValidPasswordsMatch(formencode.validators.FancyValidator):
if value['password'] != value['password_confirmation']:
e_dict = {'password_confirmation':
_('Passwords do not match')}
class ValidAuth(formencode.validators.FancyValidator):
messages = {
'invalid_password':_('invalid password'),
'invalid_login':_('invalid user name'),
'disabled_account':_('Your account is disabled')
}
#error mapping
# error mapping
e_dict = {'username':messages['invalid_login'],
'password':messages['invalid_password']}
e_dict_disable = {'username':messages['disabled_account']}
password = value['password']
username = value['username']
if authenticate(username, password):
if user and user.active is False:
log.warning('user %s is disabled', username)
raise formencode.Invalid(self.message('disabled_account',
state=State_obj),
value, state,
error_dict=self.e_dict_disable)
log.warning('user %s not authenticated', username)
raise formencode.Invalid(self.message('invalid_password',
state=State_obj), value, state,
error_dict=self.e_dict)
class ValidRepoUser(formencode.validators.FancyValidator):
User.query().filter(User.active == True)\
.filter(User.username == value).one()
raise formencode.Invalid(_('This username is not valid'),
value, state)
def ValidRepoName(edit, old_data):
class _ValidRepoName(formencode.validators.FancyValidator):
repo_name = value.get('repo_name')
slug = repo_name_slug(repo_name)
if slug in [ADMIN_PREFIX, '']:
e_dict = {'repo_name': _('This repository name is disallowed')}
if value.get('repo_group'):
gr = Group.get(value.get('repo_group'))
group_path = gr.full_path
# value needs to be aware of group name in order to check
# db key This is an actual just the name to store in the
# database
repo_name_full = group_path + Group.url_sep() + repo_name
group_path = ''
repo_name_full = repo_name
value['repo_name_full'] = repo_name_full
rename = old_data.get('repo_name') != repo_name_full
create = not edit
if rename or create:
if group_path != '':
if RepoModel().get_by_repo_name(repo_name_full,):
e_dict = {'repo_name':_('This repository already '
'exists in a group "%s"') %
gr.group_name}
elif Group.get_by_group_name(repo_name_full):
e_dict = {'repo_name':_('There is a group with this'
' name already "%s"') %
repo_name_full}
elif RepoModel().get_by_repo_name(repo_name_full):
e_dict = {'repo_name':_('This repository '
'already exists')}
return _ValidRepoName
def ValidForkName():
class _ValidForkName(formencode.validators.FancyValidator):
repo_name = value.get('fork_name')
if RepoModel().get_by_repo_name(repo_name):
e_dict = {'fork_name':_('This repository '
return _ValidForkName
def SlugifyName():
class _SlugifyName(formencode.validators.FancyValidator):
return repo_name_slug(value)
return _SlugifyName
def ValidCloneUri():
from mercurial.httprepo import httprepository, httpsrepository
from rhodecode.lib.utils import make_ui
class _ValidCloneUri(formencode.validators.FancyValidator):
if not value:
pass
elif value.startswith('https'):
httpsrepository(make_ui('db'), value).capabilities
except Exception, e:
raise formencode.Invalid(_('invalid clone url'), value,
elif value.startswith('http'):
httprepository(make_ui('db'), value).capabilities
raise formencode.Invalid(_('Invalid clone url, provide a '
'valid clone http\s url'), value,
return _ValidCloneUri
def ValidForkType(old_data):
class _ValidForkType(formencode.validators.FancyValidator):
@@ -523,166 +524,169 @@ def UserForm(edit=False, old_data={}):
def UsersGroupForm(edit=False, old_data={}, available_members=[]):
class _UsersGroupForm(formencode.Schema):
allow_extra_fields = True
filter_extra_fields = True
users_group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
ValidUsersGroup(edit, old_data))
users_group_active = StringBoolean(if_missing=False)
users_group_members = OneOf(available_members, hideList=False,
testValueList=True,
if_missing=None, not_empty=False)
return _UsersGroupForm
def ReposGroupForm(edit=False, old_data={}, available_groups=[]):
class _ReposGroupForm(formencode.Schema):
group_name = All(UnicodeString(strip=True, min=1, not_empty=True),
SlugifyName())
group_description = UnicodeString(strip=True, min=1,
not_empty=True)
group_parent_id = OneOf(available_groups, hideList=False,
chained_validators = [ValidReposGroup(edit, old_data)]
return _ReposGroupForm
def RegisterForm(edit=False, old_data={}):
class _RegisterForm(formencode.Schema):
username = All(ValidUsername(edit, old_data),
UnicodeString(strip=True, min=1, not_empty=True))
password = All(UnicodeString(strip=True, min=6, not_empty=True))
password_confirmation = All(UnicodeString(strip=True, min=6, not_empty=True))
active = StringBoolean(if_missing=False)
name = UnicodeString(strip=True, min=1, not_empty=True)
lastname = UnicodeString(strip=True, min=1, not_empty=True)
email = All(Email(not_empty=True), UniqSystemEmail(old_data))
chained_validators = [ValidPasswordsMatch, ValidPassword]
return _RegisterForm
def PasswordResetForm():
class _PasswordResetForm(formencode.Schema):
email = All(ValidSystemEmail(), Email(not_empty=True))
return _PasswordResetForm
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
repo_groups=[]):
class _RepoForm(formencode.Schema):
filter_extra_fields = False
repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False),
ValidCloneUri()())
repo_group = OneOf(repo_groups, hideList=True)
repo_type = OneOf(supported_backends)
description = UnicodeString(strip=True, min=1, not_empty=True)
private = StringBoolean(if_missing=False)
enable_statistics = StringBoolean(if_missing=False)
enable_downloads = StringBoolean(if_missing=False)
#this is repo owner
user = All(UnicodeString(not_empty=True), ValidRepoUser)
chained_validators = [ValidRepoName(edit, old_data), ValidPerms]
return _RepoForm
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys()):
class _RepoForkForm(formencode.Schema):
fork_name = All(UnicodeString(strip=True, min=1, not_empty=True),
repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
chained_validators = [ValidForkName()]
return _RepoForkForm
def RepoSettingsForm(edit=False, old_data={}):
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
chained_validators = [ValidRepoName(edit, old_data), ValidPerms, ValidSettings]
chained_validators = [ValidRepoName(edit, old_data), ValidPerms,
ValidSettings]
def ApplicationSettingsForm():
class _ApplicationSettingsForm(formencode.Schema):
rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
return _ApplicationSettingsForm
def ApplicationUiSettingsForm():
class _ApplicationUiSettingsForm(formencode.Schema):
web_push_ssl = OneOf(['true', 'false'], if_missing='false')
paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
hooks_pretxnchangegroup_push_logger = OneOf(['True', 'False'], if_missing=False)
hooks_preoutgoing_pull_logger = OneOf(['True', 'False'], if_missing=False)
return _ApplicationUiSettingsForm
def DefaultPermissionsForm(perms_choices, register_choices, create_choices):
class _DefaultPermissionsForm(formencode.Schema):
overwrite_default = StringBoolean(if_missing=False)
anonymous = OneOf(['True', 'False'], if_missing=False)
default_perm = OneOf(perms_choices)
default_register = OneOf(register_choices)
default_create = OneOf(create_choices)
return _DefaultPermissionsForm
def LdapSettingsForm(tls_reqcert_choices, search_scope_choices, tls_kind_choices):
class _LdapSettingsForm(formencode.Schema):
pre_validators = [LdapLibValidator]
ldap_active = StringBoolean(if_missing=False)
ldap_host = UnicodeString(strip=True,)
ldap_port = Number(strip=True,)
ldap_tls_kind = OneOf(tls_kind_choices)
ldap_tls_reqcert = OneOf(tls_reqcert_choices)
ldap_dn_user = UnicodeString(strip=True,)
ldap_dn_pass = UnicodeString(strip=True,)
ldap_base_dn = UnicodeString(strip=True,)
ldap_filter = UnicodeString(strip=True,)
ldap_search_scope = OneOf(search_scope_choices)
ldap_attr_login = All(AttrLoginValidator, UnicodeString(strip=True,))
ldap_attr_firstname = UnicodeString(strip=True,)
ldap_attr_lastname = UnicodeString(strip=True,)
ldap_attr_email = UnicodeString(strip=True,)
return _LdapSettingsForm
rhodecode.model.repo
~~~~~~~~~~~~~~~~~~~~
Repository model for rhodecode
:created_on: Jun 5, 2010
import shutil
from datetime import datetime
from sqlalchemy.orm import joinedload, make_transient
from vcs.backends import get_backend
from rhodecode.lib import safe_str
from rhodecode.model import BaseModel
from rhodecode.model.db import Repository, RepoToPerm, User, Permission, \
Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, Group
class RepoModel(BaseModel):
def repos_path(self):
"""Get's the repositories root path from database
q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
def get(self, repo_id, cache=False):
repo = self.sa.query(Repository)\
.filter(Repository.repo_id == repo_id)
repo = repo.options(FromCache("sql_cache_short",
"get_repo_%s" % repo_id))
return repo.scalar()
def get_by_repo_name(self, repo_name, cache=False):
.filter(Repository.repo_name == repo_name)
"get_repo_%s" % repo_name))
def get_users_js(self):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s]' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def get_users_groups_js(self):
users_groups = self.sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True).all()
g_tmpl = '''{id:%s, grname:"%s",grmembers:"%s"},'''
users_groups_array = '[%s]' % '\n'.join([g_tmpl % \
(gr.users_group_id, gr.users_group_name,
len(gr.members))
for gr in users_groups])
return users_groups_array
def _get_defaults(self, repo_name):
Get's information about repository, and returns a dict for
usage in forms
repo_info = Repository.get_by_repo_name(repo_name)
if repo_info is None:
defaults = repo_info.get_dict()
group, repo_name = repo_info.groups_and_repo
# fill owner
if repo_info.user:
defaults.update({'user': repo_info.user.username})
replacement_user = User.query().filter(User.admin ==
True).first().username
# fill repository users
for p in repo_info.repo_to_perm:
# fill repository groups
for p in repo_info.users_group_to_perm:
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
# update permissions
for member, perm, member_type in form_data['perms_updates']:
if member_type == 'user':
r2p = self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == User.get_by_username(member))\
.filter(RepoToPerm.repository == cur_repo)\
.one()
r2p.permission = self.sa.query(Permission)\
.filter(Permission.permission_name ==
perm).scalar()
self.sa.add(r2p)
g2p = self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.users_group ==
UsersGroup.get_by_group_name(member))\
.filter(UsersGroupRepoToPerm.repository ==
cur_repo).one()
g2p.permission = self.sa.query(Permission)\
self.sa.add(g2p)
# set new permissions
for member, perm, member_type in form_data['perms_new']:
r2p = RepoToPerm()
r2p.repository = cur_repo
r2p.user = User.get_by_username(member)
.filter(Permission.
permission_name == perm)\
g2p = UsersGroupRepoToPerm()
g2p.repository = cur_repo
g2p.users_group = UsersGroup.get_by_group_name(member)
# update current repo
if k == 'user':
cur_repo.user = User.get_by_username(v)
elif k == 'repo_name':
elif k == 'repo_group':
cur_repo.group_id = v
cur_repo.group = Group.get(v)
setattr(cur_repo, k, v)
new_name = cur_repo.get_new_name(form_data['repo_name'])
cur_repo.repo_name = new_name
self.sa.add(cur_repo)
if repo_name != new_name:
# rename repository
self.__rename_repo(old=repo_name, new=new_name)
self.sa.commit()
return cur_repo
self.sa.rollback()
def create(self, form_data, cur_user, just_db=False, fork=False):
if fork:
repo_name = form_data['fork_name']
org_name = form_data['repo_name']
org_full_name = org_name
org_name = repo_name = form_data['repo_name']
repo_name_full = form_data['repo_name_full']
new_repo = Repository()
new_repo.enable_statistics = False
if k == 'repo_name':
v = repo_name
v = repo_name_full
if k == 'repo_group':
k = 'group_id'
if k == 'description':
v = v or repo_name
setattr(new_repo, k, v)
parent_repo = self.sa.query(Repository)\
.filter(Repository.repo_name == org_full_name).one()
new_repo.fork = parent_repo
new_repo.user_id = cur_user.user_id
self.sa.add(new_repo)
#create default permission
repo_to_perm = RepoToPerm()
default = 'repository.read'
for p in User.get_by_username('default').user_perms:
if p.permission.permission_name.startswith('repository.'):
default = p.permission.permission_name
default_perm = 'repository.none' if form_data['private'] else default
repo_to_perm.permission_id = self.sa.query(Permission)\
.filter(Permission.permission_name == default_perm)\
.one().permission_id
repo_to_perm.repository = new_repo
repo_to_perm.user_id = User.get_by_username('default').user_id
self.sa.add(repo_to_perm)
if not just_db:
self.__create_repo(repo_name, form_data['repo_type'],
form_data['repo_group'],
form_data['clone_uri'])
#now automatically start following this repository as owner
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
cur_user.user_id)
return new_repo
def create_fork(self, form_data, cur_user):
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
def delete(self, repo):
self.sa.delete(repo)
self.__delete_repo(repo)
def delete_perm_user(self, form_data, repo_name):
self.sa.query(RepoToPerm)\
.filter(RepoToPerm.repository \
== self.get_by_repo_name(repo_name))\
.filter(RepoToPerm.user_id == form_data['user_id']).delete()
def delete_perm_users_group(self, form_data, repo_name):
self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.repository \
.filter(UsersGroupRepoToPerm.users_group_id \
== form_data['users_group_id']).delete()
def delete_stats(self, repo_name):
self.sa.query(Statistics)\
.filter(Statistics.repository == \
self.get_by_repo_name(repo_name)).delete()
def __create_repo(self, repo_name, alias, new_parent_id, clone_uri=False):
makes repository on filesystem. It's group aware means it'll create
a repository within a group, and alter the paths accordingly of
group location
:param alias:
:param parent_id:
:param clone_uri:
from rhodecode.lib.utils import is_valid_repo,is_valid_repos_group
from rhodecode.lib.utils import is_valid_repo, is_valid_repos_group
if new_parent_id:
paths = Group.get(new_parent_id).full_path.split(Group.url_sep())
new_parent_path = os.sep.join(paths)
new_parent_path = ''
repo_path = os.path.join(*map(lambda x:safe_str(x),
[self.repos_path, new_parent_path, repo_name]))
# check if this path is not a repository
if is_valid_repo(repo_path, self.repos_path):
raise Exception('This path %s is a valid repository' % repo_path)
# check if this path is a group
if is_valid_repos_group(repo_path, self.repos_path):
raise Exception('This path %s is a valid group' % repo_path)
log.info('creating repo %s in %s @ %s', repo_name, repo_path,
clone_uri)
backend(repo_path, create=True, src_url=clone_uri)
def __rename_repo(self, old, new):
renames repository on filesystem
:param old: old name
:param new: new name
log.info('renaming repo from %s to %s', old, new)
old_path = os.path.join(self.repos_path, old)
new_path = os.path.join(self.repos_path, new)
if os.path.isdir(new_path):
raise Exception('Was trying to rename to already existing dir %s' \
% new_path)
shutil.move(old_path, new_path)
def __delete_repo(self, repo):
removes repo from filesystem, the removal is acctually made by
added rm__ prefix into dir, and rename internat .hg/.git dirs so this
repository is no longer valid for rhodecode, can be undeleted later on
by reverting the renames on this repository
:param repo: repo object
rm_path = os.path.join(self.repos_path, repo.repo_name)
log.info("Removing %s", rm_path)
#disable hg/git
alias = repo.repo_type
shutil.move(os.path.join(rm_path, '.%s' % alias),
os.path.join(rm_path, 'rm__.%s' % alias))
#disable repo
shutil.move(rm_path, os.path.join(self.repos_path, 'rm__%s__%s' \
% (datetime.today()\
.strftime('%Y%m%d_%H%M%S_%f'),
repo.repo_name)))
rhodecode.model.user_group
~~~~~~~~~~~~~~~~~~~~~~~~~~
users groups model for RhodeCode
:created_on: Jan 25, 2011
from rhodecode.model.db import Group, RhodeCodeUi
class ReposGroupModel(BaseModel):
Get's the repositories root path from database
q = RhodeCodeUi.get_by_key('/').one()
def __create_group(self, group_name, parent_id):
def __create_group(self, group_name):
makes repositories group on filesystem
if parent_id:
paths = Group.get(parent_id).full_path.split(Group.url_sep())
parent_path = os.sep.join(paths)
parent_path = ''
create_path = os.path.join(self.repos_path, parent_path, group_name)
create_path = os.path.join(self.repos_path, group_name)
log.debug('creating new group in %s', create_path)
if os.path.isdir(create_path):
raise Exception('That directory already exists !')
os.makedirs(create_path)
def __rename_group(self, old, old_parent_id, new, new_parent_id):
def __rename_group(self, old, new):
Renames a group on filesystem
if old == new:
log.debug('skipping group rename')
log.debug('renaming repos group from %s to %s', old, new)
if old_parent_id:
paths = Group.get(old_parent_id).full_path.split(Group.url_sep())
old_parent_path = os.sep.join(paths)
old_parent_path = ''
old_path = os.path.join(self.repos_path, old_parent_path, old)
new_path = os.path.join(self.repos_path, new_parent_path, new)
log.debug('renaming repos paths from %s to %s', old_path, new_path)
raise Exception('Was trying to rename to already '
'existing dir %s' % new_path)
def __delete_group(self, group):
Deletes a group from a filesystem
:param group: instance of group from database
paths = group.full_path.split(Group.url_sep())
paths = os.sep.join(paths)
rm_path = os.path.join(self.repos_path, paths)
os.rmdir(rm_path)
if os.path.isdir(rm_path):
# delete only if that path really exists
def create(self, form_data):
new_repos_group = Group()
new_repos_group.group_name = form_data['group_name']
new_repos_group.group_description = \
form_data['group_description']
new_repos_group.group_parent_id = form_data['group_parent_id']
new_repos_group.group_description = form_data['group_description']
new_repos_group.parent_group = Group.get(form_data['group_parent_id'])
new_repos_group.group_name = new_repos_group.get_new_name(form_data['group_name'])
self.sa.add(new_repos_group)
self.__create_group(form_data['group_name'],
form_data['group_parent_id'])
self.__create_group(new_repos_group.group_name)
return new_repos_group
def update(self, repos_group_id, form_data):
repos_group = Group.get(repos_group_id)
old_name = repos_group.group_name
old_parent_id = repos_group.group_parent_id
old_path = repos_group.full_path
# change properties
repos_group.group_description = form_data['group_description']
repos_group.parent_group = Group.get(form_data['group_parent_id'])
repos_group.group_name = repos_group.get_new_name(form_data['group_name'])
repos_group.group_name = form_data['group_name']
repos_group.group_description = \
repos_group.group_parent_id = form_data['group_parent_id']
new_path = repos_group.full_path
self.sa.add(repos_group)
if old_name != form_data['group_name'] or (old_parent_id !=
form_data['group_parent_id']):
self.__rename_group(old = old_name, old_parent_id = old_parent_id,
new = form_data['group_name'],
new_parent_id = form_data['group_parent_id'])
self.__rename_group(old_path, new_path)
# we need to get all repositories from this new group and
# rename them accordingly to new group path
for r in repos_group.repositories:
r.repo_name = r.get_new_name(r.just_name)
self.sa.add(r)
return repos_group
def delete(self, users_group_id):
users_group = Group.get(users_group_id)
self.sa.delete(users_group)
self.__delete_group(users_group)
rhodecode.model.user
users model for RhodeCode
:created_on: Apr 9, 2010
from rhodecode.lib import safe_unicode
from rhodecode.model.db import User, RepoToPerm, Repository, Permission, \
UserToPerm, UsersGroupRepoToPerm, UsersGroupToPerm, UsersGroupMember
from rhodecode.lib.exceptions import DefaultUserException, \
UserOwnsReposException
from rhodecode.lib import generate_api_key
from sqlalchemy.orm import joinedload
PERM_WEIGHTS = {'repository.none': 0,
'repository.read': 1,
'repository.write': 3,
'repository.admin': 3}
class UserModel(BaseModel):
def get(self, user_id, cache = False):
def get(self, user_id, cache=False):
user = self.sa.query(User)
user = user.options(FromCache("sql_cache_short",
"get_user_%s" % user_id))
return user.get(user_id)
def get_by_username(self, username, cache = False, case_insensitive = False):
def get_by_username(self, username, cache=False, case_insensitive=False):
user = self.sa.query(User).filter(User.username.ilike(username))
user = self.sa.query(User)\
.filter(User.username == username)
"get_user_%s" % username))
return user.scalar()
def get_by_api_key(self, api_key, cache = False):
def get_by_api_key(self, api_key, cache=False):
.filter(User.api_key == api_key)
"get_user_%s" % api_key))
new_user = User()
self.sa.add(new_user)
def create_ldap(self, username, password, user_dn, attrs):
Checks if user is in database, if not creates this user marked
as ldap user
:param user_dn:
:param attrs:
log.debug('Checking for such ldap account in RhodeCode database')
if self.get_by_username(username, case_insensitive = True) is None:
if self.get_by_username(username, case_insensitive=True) is None:
# add ldap account always lowercase
new_user.username = username.lower()
new_user.password = get_crypt_password(password)
new_user.api_key = generate_api_key(username)
new_user.email = attrs['email']
new_user.active = True
new_user.ldap_dn = safe_unicode(user_dn)
new_user.name = attrs['name']
new_user.lastname = attrs['lastname']
return True
except (DatabaseError,):
log.debug('this %s user exists skipping creation of ldap account',
username)
return False
def create_registration(self, form_data):
if k != 'admin':
body = ('New user registration\n'
'username: %s\n'
'email: %s\n')
body = body % (form_data['username'], form_data['email'])
run_task(tasks.send_email, None,
_('[RhodeCode] New User registration'),
body)
def update(self, user_id, form_data):
user = self.get(user_id, cache = False)
user = self.get(user_id, cache=False)
if user.username == 'default':
raise DefaultUserException(
_("You can't Edit this user since it's"
" crucial for entire application"))
if k == 'new_password' and v != '':
user.password = v
user.api_key = generate_api_key(user.username)
setattr(user, k, v)
self.sa.add(user)
def update_my_account(self, user_id, form_data):
if k not in ['admin', 'active']:
def delete(self, user_id):
_("You can't remove this user since it's"
if user.repositories:
raise UserOwnsReposException(_('This user still owns %s '
'repositories and cannot be '
'removed. Switch owners or '
'remove those repositories') \
% user.repositories)
self.sa.delete(user)
def reset_password_link(self, data):
run_task(tasks.send_password_link, data['email'])
def reset_password(self, data):
run_task(tasks.reset_user_password, data['email'])
def fill_data(self, auth_user, user_id = None, api_key = None):
def fill_data(self, auth_user, user_id=None, api_key=None):
Fetches auth_user by user_id,or api_key if present.
Fills auth_user attributes with those taken from database.
Additionally set's is_authenitated if lookup fails
present in database
:param auth_user: instance of user to set attributes
:param user_id: user id to fetch by
:param api_key: api key to fetch by
if user_id is None and api_key is None:
raise Exception('You need to pass user_id or api_key')
if api_key:
dbuser = self.get_by_api_key(api_key)
dbuser = self.get(user_id)
if dbuser is not None:
log.debug('filling %s data', dbuser)
for k, v in dbuser.get_dict().items():
setattr(auth_user, k, v)
auth_user.is_authenticated = False
return auth_user
def fill_perms(self, user):
Fills user permission attribute with permissions taken from database
works for permissions given for repositories, and for permissions that
are granted to groups
:param user: user instance to fill his perms
user.permissions['repositories'] = {}
user.permissions['global'] = set()
#======================================================================
# fetch default permissions
default_user = self.get_by_username('default', cache = True)
default_user = self.get_by_username('default', cache=True)
default_perms = self.sa.query(RepoToPerm, Repository, Permission)\
.join((Repository, RepoToPerm.repository_id ==
Repository.repo_id))\
.join((Permission, RepoToPerm.permission_id ==
Permission.permission_id))\
.filter(RepoToPerm.user == default_user).all()
if user.is_admin:
#==================================================================
# #admin have all default rights set to admin
user.permissions['global'].add('hg.admin')
for perm in default_perms:
p = 'repository.admin'
user.permissions['repositories'][perm.RepoToPerm.
repository.repo_name] = p
# set default permissions
uid = user.user_id
#default global
default_global_perms = self.sa.query(UserToPerm)\
.filter(UserToPerm.user == default_user)
for perm in default_global_perms:
user.permissions['global'].add(perm.permission.permission_name)
#default for repositories
if perm.Repository.private and not (perm.Repository.user_id ==
uid):
#diself.sable defaults for private repos,
p = 'repository.none'
elif perm.Repository.user_id == uid:
#set admin if owner
p = perm.Permission.permission_name
# overwrite default with user permissions if any
#user global
user_perms = self.sa.query(UserToPerm)\
.options(joinedload(UserToPerm.permission))\
.filter(UserToPerm.user_id == uid).all()
for perm in user_perms:
user.permissions['global'].add(perm.permission.
permission_name)
#user repositories
user_repo_perms = self.sa.query(RepoToPerm, Permission,
Repository)\
.filter(RepoToPerm.user_id == uid).all()
for perm in user_repo_perms:
# set admin if owner
if perm.Repository.user_id == uid:
# check if user is part of groups for this repository and fill in
# (or replace with higher) permissions
#users group global
user_perms_from_users_groups = self.sa.query(UsersGroupToPerm)\
.options(joinedload(UsersGroupToPerm.permission))\
.join((UsersGroupMember, UsersGroupToPerm.users_group_id ==
UsersGroupMember.users_group_id))\
.filter(UsersGroupMember.user_id == uid).all()
for perm in user_perms_from_users_groups:
#users group repositories
user_repo_perms_from_users_groups = self.sa.query(
UsersGroupRepoToPerm,
Permission, Repository,)\
.join((Repository, UsersGroupRepoToPerm.repository_id ==
.join((Permission, UsersGroupRepoToPerm.permission_id ==
.join((UsersGroupMember, UsersGroupRepoToPerm.users_group_id ==
for perm in user_repo_perms_from_users_groups:
cur_perm = user.permissions['repositories'][perm.
UsersGroupRepoToPerm.
repository.repo_name]
#overwrite permission only if it's greater than permission
# given from other sources
if PERM_WEIGHTS[p] > PERM_WEIGHTS[cur_perm]:
user.permissions['repositories'][perm.UsersGroupRepoToPerm.
return user
## -*- coding: utf-8 -*-
<%inherit file="/base/base.html"/>
<%def name="title()">
${c.repo_name} ${_('Settings')} - ${c.rhodecode_name}
</%def>
<%def name="breadcrumbs_links()">
${h.link_to(u'Home',h.url('/'))}
»
${h.link_to(c.repo_info.repo_name,h.url('summary_home',repo_name=c.repo_info.repo_name))}
${_('Settings')}
<%def name="page_nav()">
${self.menu('settings')}
<%def name="main()">
<div class="box">
<!-- box / title -->
<div class="title">
${self.breadcrumbs()}
</div>
${h.form(url('repo_settings_update', repo_name=c.repo_info.repo_name),method='put')}
<div class="form">
<!-- fields -->
<div class="fields">
<div class="field">
<div class="label">
<label for="repo_name">${_('Name')}:</label>
<div class="input input-medium">
${h.text('repo_name',class_="small")}
<label for="repo_group">${_('Repository group')}:</label>
<div class="input">
${h.select('repo_group','',c.repo_groups,class_="medium")}
<div class="label label-textarea">
<label for="description">${_('Description')}:</label>
<div class="textarea text-area editor">
${h.textarea('description',cols=23,rows=5)}
<div class="label label-checkbox">
<label for="private">${_('Private')}:</label>
<div class="checkboxes">
${h.checkbox('private',value="True")}
<label for="">${_('Permissions')}:</label>
<%include file="../admin/repos/repo_edit_perms.html"/>
<div class="buttons">
${h.submit('save','Save',class_="ui-button")}
${h.reset('reset','Reset',class_="ui-button")}
${h.end_form()}
"""Pylons application test package
This package assumes the Pylons environment is already loaded, such as
when this script is imported from the `nosetests --with-pylons=test.ini`
command.
This module initializes the application via ``websetup`` (`paster
setup-app`) and provides the base testing objects.
from os.path import join as jn
from unittest import TestCase
from paste.deploy import loadapp
from paste.script.appinstall import SetupCommand
from pylons import config, url
from routes.util import URLGenerator
from webtest import TestApp
from rhodecode.model import meta
import pylons.test
__all__ = ['environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS' ]
# Invoke websetup with the current config file
#SetupCommand('setup-app').run([config_file])
##RUNNING DESIRED TESTS
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
# nosetests --pdb --pdb-failures
environ = {}
#SOME GLOBALS FOR TESTS
from tempfile import _RandomNameSequence
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
TEST_USER_ADMIN_LOGIN = 'test_admin'
TEST_USER_ADMIN_PASS = 'test12'
HG_REPO = 'vcs_test_hg'
GIT_REPO = 'vcs_test_git'
NEW_HG_REPO = 'vcs_test_hg_new'
NEW_GIT_REPO = 'vcs_test_git_new'
HG_FORK = 'vcs_test_hg_fork'
GIT_FORK = 'vcs_test_git_fork'
class TestController(TestCase):
def __init__(self, *args, **kwargs):
wsgiapp = pylons.test.pylonsapp
config = wsgiapp.config
self.app = TestApp(wsgiapp)
url._push_object(URLGenerator(config['routes.map'], environ))
self.sa = meta.Session
self.index_location = config['app_conf']['index_dir']
TestCase.__init__(self, *args, **kwargs)
def log_user(self, username=TEST_USER_ADMIN_LOGIN,
password=TEST_USER_ADMIN_PASS):
response = self.app.post(url(controller='login', action='index'),
{'username':username,
'password':password})
if 'invalid user name' in response.body:
self.fail('could not login using %s %s' % (username, password))
self.assertEqual(response.status, '302 Found')
self.assertEqual(response.session['rhodecode_user'].username, username)
return response.follow()
def checkSessionFlash(self, response, msg):
self.assertTrue('flash' in response.session)
self.assertTrue(msg in response.session['flash'][0][1])
Status change: