#!/usr/bin/env python
# encoding: utf-8
# users controller for pylons
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""
Created on April 4, 2010
users controller for pylons
@author: marcink
from formencode import htmlfill
from pylons import request, session, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.lib.exceptions import *
from rhodecode.lib import helpers as h
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator
from rhodecode.lib.base import BaseController, render
from rhodecode.model.db import User, UserLog
from rhodecode.model.db import User
from rhodecode.model.forms import UserForm
from rhodecode.model.user import UserModel, DefaultUserException
from rhodecode.model.user import UserModel
import formencode
import logging
import traceback
log = logging.getLogger(__name__)
class UsersController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('user', 'users')
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
def __before__(self):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
super(UsersController, self).__before__()
def index(self, format='html'):
"""GET /users: All items in the collection"""
# url('users')
c.users_list = self.sa.query(User).all()
return render('admin/users/users.html')
def create(self):
"""POST /users: Create a new item"""
user_model = UserModel()
login_form = UserForm()()
try:
form_result = login_form.to_python(dict(request.POST))
user_model.create(form_result)
h.flash(_('created user %s') % form_result['username'],
category='success')
#action_logger(self.rhodecode_user, 'new_user', '', '', self.sa)
except formencode.Invalid, errors:
return htmlfill.render(
render('admin/users/user_add.html'),
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
h.flash(_('error occured during creation of user %s') \
% request.POST.get('username'), category='error')
return redirect(url('users'))
def new(self, format='html'):
"""GET /users/new: Form to create a new item"""
# url('new_user')
return render('admin/users/user_add.html')
def update(self, id):
"""PUT /users/id: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('user', id=ID),
# method='put')
# url('user', id=ID)
c.user = user_model.get(id)
_form = UserForm(edit=True, old_data={'user_id':id,
'email':c.user.email})()
form_result = {}
form_result = _form.to_python(dict(request.POST))
user_model.update(id, form_result)
h.flash(_('User updated succesfully'), category='success')
render('admin/users/user_edit.html'),
h.flash(_('error occured during update of user %s') \
% form_result.get('username'), category='error')
def delete(self, id):
"""DELETE /users/id: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# method='delete')
user_model.delete(id)
h.flash(_('sucessfully deleted user'), category='success')
except DefaultUserException, e:
except (UserOwnsReposException, DefaultUserException), e:
h.flash(str(e), category='warning')
h.flash(_('An error occured during deletion of user'),
category='error')
def show(self, id, format='html'):
"""GET /users/id: Show a specific item"""
def edit(self, id, format='html'):
"""GET /users/id/edit: Form to edit an existing item"""
# url('edit_user', id=ID)
c.user = self.sa.query(User).get(id)
if not c.user:
if c.user.username == 'default':
h.flash(_("You can't edit this user"), category='warning')
defaults = c.user.__dict__
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
# authentication and permission libraries
from pylons import config, session, url, request
from rhodecode.lib.utils import get_repo_slug
from rhodecode.lib.auth_ldap import AuthLdap, UsernameError, PasswordError
from rhodecode.lib.auth_ldap import AuthLdap
from rhodecode.model import meta
from rhodecode.model.caching_query import FromCache
from rhodecode.model.db import User, RepoToPerm, Repository, Permission, \
UserToPerm
import bcrypt
from decorator import decorator
import random
class PasswordGenerator(object):
"""This is a simple class for generating password from
different sets of characters
usage:
passwd_gen = PasswordGenerator()
#print 8-letter password containing only big and small letters of alphabet
print passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
ALPHABETS_NUM = r'''1234567890'''#[0]
ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''#[1]
ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''#[2]
ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?''' #[3]
ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM + ALPHABETS_SPECIAL#[4]
ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM#[5]
ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM#[6]
ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM#[7]
def __init__(self, passwd=''):
self.passwd = passwd
def gen_password(self, len, type):
self.passwd = ''.join([random.choice(type) for _ in xrange(len)])
return self.passwd
def get_crypt_password(password):
"""Cryptographic function used for password hashing based on sha1
:param password: password to hash
return bcrypt.hashpw(password, bcrypt.gensalt(10))
def check_password(password, hashed):
return bcrypt.hashpw(password, hashed) == hashed
def authfunc(environ, username, password):
Authentication function used in Mercurial/Git/ and access control,
firstly checks for db authentication then if ldap is enabled for ldap
authentication, also creates ldap user if not in database
:param environ: needed only for using in Basic auth, can be None
:param username: username
:param password: password
user = user_model.get_by_username(username, cache=False)
if user is not None and user.is_ldap is False:
if user.active:
if user.username == 'default' and user.active:
log.info('user %s authenticated correctly', username)
return True
elif user.username == username and check_password(password, user.password):
else:
log.error('user %s is disabled', username)
from rhodecode.model.settings import SettingsModel
ldap_settings = SettingsModel().get_ldap_settings()
#======================================================================
# FALLBACK TO LDAP AUTH IN ENABLE
if ldap_settings.get('ldap_active', False):
kwargs = {
'server':ldap_settings.get('ldap_host', ''),
'base_dn':ldap_settings.get('ldap_base_dn', ''),
'port':ldap_settings.get('ldap_port'),
'bind_dn':ldap_settings.get('ldap_dn_user'),
'bind_pass':ldap_settings.get('ldap_dn_pass'),
'use_ldaps':ldap_settings.get('ldap_ldaps'),
'ldap_version':3,
}
log.debug('Checking for ldap authentication')
aldap = AuthLdap(**kwargs)
res = aldap.authenticate_ldap(username, password)
authenticated = res[1]['uid'][0] == username
if authenticated and user_model.create_ldap(username, password):
log.info('created new ldap user')
return authenticated
except (UsernameError, PasswordError):
except (LdapUsernameError, LdapPasswordError):
return False
except:
class AuthUser(object):
A simple object that handles a mercurial username for authentication
def __init__(self):
self.username = 'None'
self.name = ''
self.lastname = ''
self.email = ''
self.user_id = None
self.is_authenticated = False
self.is_admin = False
self.permissions = {}
def __repr__(self):
return "<AuthUser('id:%s:%s')>" % (self.user_id, self.username)
def set_available_permissions(config):
This function will propagate pylons globals with all available defined
permission given in db. We don't wannt to check each time from db for new
permissions since adding a new permission also requires application restart
ie. to decorate new views with the newly created permission
:param config:
log.info('getting information about all available permissions')
sa = meta.Session()
all_perms = sa.query(Permission).all()
pass
finally:
meta.Session.remove()
config['available_permissions'] = [x.permission_name for x in all_perms]
def set_base_path(config):
config['base_path'] = config['pylons.app_globals'].base_path
def fill_perms(user):
Fills user permission attribute with permissions taken from database
:param user:
user.permissions['repositories'] = {}
user.permissions['global'] = set()
#===========================================================================
# fetch default permissions
default_user = UserModel().get_by_username('default', cache=True)
default_perms = sa.query(RepoToPerm, Repository, Permission)\
.join((Repository, RepoToPerm.repository_id == Repository.repo_id))\
.join((Permission, RepoToPerm.permission_id == Permission.permission_id))\
.filter(RepoToPerm.user == default_user).all()
if user.is_admin:
#=======================================================================
# #admin have all default rights set to admin
user.permissions['global'].add('hg.admin')
for perm in default_perms:
p = 'repository.admin'
user.permissions['repositories'][perm.RepoToPerm.repository.repo_name] = p
# set default permissions
#default global
default_global_perms = sa.query(UserToPerm)\
.filter(UserToPerm.user == sa.query(User).filter(User.username ==
'default').one())
for perm in default_global_perms:
user.permissions['global'].add(perm.permission.permission_name)
#default repositories
if perm.Repository.private and not perm.Repository.user_id == user.user_id:
#disable defaults for private repos,
p = 'repository.none'
elif perm.Repository.user_id == user.user_id:
#set admin if owner
#==============================================================================
# LDAP
#Name = Just a description for the auth modes page
#Host = DepartmentName.OrganizationName.local/ IP
#Port = 389 default for ldap
#LDAPS = no set True if You need to use ldaps
#Account = DepartmentName\UserName (or UserName@MyDomain depending on AD server)
#Password = <password>
#Base DN = DC=DepartmentName,DC=OrganizationName,DC=local
# ldap authentication lib
Created on Nov 17, 2010
from rhodecode.lib.exceptions import LdapImportError, UsernameError, \
PasswordError, ConnectionError
import ldap
except ImportError:
class AuthLdap(object):
def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='',
use_ldaps=False, ldap_version=3):
self.ldap_version = ldap_version
if use_ldaps:
port = port or 689
self.LDAP_USE_LDAPS = use_ldaps
self.LDAP_SERVER_ADDRESS = server
self.LDAP_SERVER_PORT = port
#USE FOR READ ONLY BIND TO LDAP SERVER
self.LDAP_BIND_DN = bind_dn
self.LDAP_BIND_PASS = bind_pass
ldap_server_type = 'ldap'
if self.LDAP_USE_LDAPS:ldap_server_type = ldap_server_type + 's'
self.LDAP_SERVER = "%s://%s:%s" % (ldap_server_type,
self.LDAP_SERVER_ADDRESS,
self.LDAP_SERVER_PORT)
self.BASE_DN = base_dn
self.AUTH_DN = "uid=%s,%s"
def authenticate_ldap(self, username, password):
"""Authenticate a user via LDAP and return his/her LDAP properties.
Raises AuthenticationError if the credentials are rejected, or
EnvironmentError if the LDAP server can't be reached.
from rhodecode.lib.helpers import chop_at
uid = chop_at(username, "@%s" % self.LDAP_SERVER_ADDRESS)
dn = self.AUTH_DN % (uid, self.BASE_DN)
log.debug("Authenticating %r at %s", dn, self.LDAP_SERVER)
if "," in username:
raise UsernameError("invalid character in username: ,")
raise LdapUsernameError("invalid character in username: ,")
ldap.set_option(ldap.OPT_X_TLS_CACERTFILE, '/etc/openldap/cacerts')
ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
server = ldap.initialize(self.LDAP_SERVER)
if self.ldap_version == 2:
server.protocol = ldap.VERSION2
server.protocol = ldap.VERSION3
if self.LDAP_BIND_DN and self.LDAP_BIND_PASS:
server.simple_bind_s(self.AUTH_DN % (self.LDAP_BIND_DN,
self.BASE_DN),
self.LDAP_BIND_PASS)
server.simple_bind_s(dn, password)
properties = server.search_s(dn, ldap.SCOPE_SUBTREE)
if not properties:
raise ldap.NO_SUCH_OBJECT()
except ldap.NO_SUCH_OBJECT, e:
log.debug("LDAP says no such user '%s' (%s)", uid, username)
raise UsernameError()
raise LdapUsernameError()
except ldap.INVALID_CREDENTIALS, e:
log.debug("LDAP rejected password for user '%s' (%s)", uid, username)
raise PasswordError()
raise LdapPasswordError()
except ldap.SERVER_DOWN, e:
raise ConnectionError("LDAP can't access authentication server")
raise LdapConnectionError("LDAP can't access authentication server")
return properties[0]
# Custom Exceptions modules
Custom Exceptions modules
class UsernameError(Exception):pass
class PasswordError(Exception):pass
class ConnectionError(Exception):pass
class LdapUsernameError(Exception):pass
class LdapPasswordError(Exception):pass
class LdapConnectionError(Exception):pass
class LdapImportError(Exception):pass
class DefaultUserException(Exception):pass
class UserOwnsReposException(Exception):pass
from rhodecode.model.meta import Base
from sqlalchemy import *
from sqlalchemy.orm import relation, backref
from sqlalchemy.orm.session import Session
from vcs.utils.lazy import LazyProperty
class RhodeCodeSettings(Base):
__tablename__ = 'rhodecode_settings'
__table_args__ = (UniqueConstraint('app_settings_name'), {'useexisting':True})
app_settings_id = Column("app_settings_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
app_settings_value = Column("app_settings_value", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k, v):
self.app_settings_name = k
self.app_settings_value = v
return "<RhodeCodeSetting('%s:%s')>" % (self.app_settings_name,
self.app_settings_value)
class RhodeCodeUi(Base):
__tablename__ = 'rhodecode_ui'
__table_args__ = {'useexisting':True}
ui_id = Column("ui_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", BOOLEAN(), nullable=True, unique=None, default=True)
class User(Base):
__tablename__ = 'users'
__table_args__ = (UniqueConstraint('username'), UniqueConstraint('email'), {'useexisting':True})
user_id = Column("user_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", BOOLEAN(), nullable=True, unique=None, default=None)
admin = Column("admin", BOOLEAN(), nullable=True, unique=None, default=False)
name = Column("name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
email = Column("email", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DATETIME(timezone=False), nullable=True, unique=None, default=None)
is_ldap = Column("is_ldap", BOOLEAN(), nullable=False, unique=None, default=False)
user_log = relation('UserLog', cascade='all')
user_perms = relation('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relation('Repository')
@LazyProperty
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
return "<User('id:%s:%s')>" % (self.user_id, self.username)
def update_lastlogin(self):
"""Update user lastlogin"""
import datetime
session = Session.object_session(self)
self.last_login = datetime.datetime.now()
session.add(self)
session.commit()
log.debug('updated user %s lastlogin', self.username)
session.rollback()
class UserLog(Base):
__tablename__ = 'user_logs'
user_log_id = Column("user_log_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", INTEGER(), ForeignKey(u'users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", INTEGER(length=None, convert_unicode=False, assert_unicode=None), ForeignKey(u'repositories.repo_id'), nullable=False, unique=None, default=None)
repository_name = Column("repository_name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DATETIME(timezone=False), nullable=True, unique=None, default=None)
user = relation('User')
repository = relation('Repository')
class Repository(Base):
__tablename__ = 'repositories'
__table_args__ = (UniqueConstraint('repo_name'), {'useexisting':True},)
repo_id = Column("repo_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
repo_type = Column("repo_type", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
user_id = Column("user_id", INTEGER(), ForeignKey(u'users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", BOOLEAN(), nullable=True, unique=None, default=None)
description = Column("description", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
fork_id = Column("fork_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=True, unique=False, default=None)
fork = relation('Repository', remote_side=repo_id)
repo_to_perm = relation('RepoToPerm', cascade='all')
stats = relation('Statistics', cascade='all', uselist=False)
return "<Repository('%s:%s')>" % (self.repo_id, self.repo_name)
class Permission(Base):
__tablename__ = 'permissions'
permission_id = Column("permission_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
permission_name = Column("permission_name", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
permission_longname = Column("permission_longname", TEXT(length=None, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
return "<Permission('%s:%s')>" % (self.permission_id, self.permission_name)
class RepoToPerm(Base):
__tablename__ = 'repo_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'repository_id'), {'useexisting':True})
repo_to_perm_id = Column("repo_to_perm_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
permission_id = Column("permission_id", INTEGER(), ForeignKey(u'permissions.permission_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=None, default=None)
permission = relation('Permission')
class UserToPerm(Base):
__tablename__ = 'user_to_perm'
__table_args__ = (UniqueConstraint('user_id', 'permission_id'), {'useexisting':True})
user_to_perm_id = Column("user_to_perm_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
class Statistics(Base):
__tablename__ = 'statistics'
__table_args__ = (UniqueConstraint('repository_id'), {'useexisting':True})
stat_id = Column("stat_id", INTEGER(), nullable=False, unique=True, default=None, primary_key=True)
repository_id = Column("repository_id", INTEGER(), ForeignKey(u'repositories.repo_id'), nullable=False, unique=True, default=None)
stat_on_revision = Column("stat_on_revision", INTEGER(), nullable=False)
commit_activity = Column("commit_activity", BLOB(), nullable=False)#JSON data
commit_activity_combined = Column("commit_activity_combined", BLOB(), nullable=False)#JSON data
languages = Column("languages", BLOB(), nullable=False)#JSON data
# Model for users
Created on April 9, 2010
Model for users
:author: marcink
from rhodecode.model.meta import Session
class UserModel(object):
self.sa = Session()
def get(self, user_id, cache=False):
user = self.sa.query(User)
if cache:
user = user.options(FromCache("sql_cache_short",
"get_user_%s" % user_id))
return user.get(user_id)
def get_by_username(self, username, cache=False):
user = self.sa.query(User)\
.filter(User.username == username)
"get_user_%s" % username))
return user.scalar()
def create(self, form_data):
new_user = User()
for k, v in form_data.items():
setattr(new_user, k, v)
self.sa.add(new_user)
self.sa.commit()
self.sa.rollback()
raise
def create_ldap(self, username, password):
Checks if user is in database, if not creates this user marked
as ldap user
:param username:
:param password:
if self.get_by_username(username) is None:
new_user.username = username
new_user.password = password
new_user.email = '%s@ldap.server' % username
new_user.active = True
new_user.is_ldap = True
new_user.name = '%s@ldap' % username
new_user.lastname = ''
def create_registration(self, form_data):
from rhodecode.lib.celerylib import tasks, run_task
if k != 'admin':
body = ('New user registration\n'
'username: %s\n'
'email: %s\n')
body = body % (form_data['username'], form_data['email'])
run_task(tasks.send_email, None,
_('[RhodeCode] New User registration'),
body)
def update(self, user_id, form_data):
new_user = self.get(user_id, cache=False)
if new_user.username == 'default':
raise DefaultUserException(
_("You can't Edit this user since it's"
" crucial for entire application"))
if k == 'new_password' and v != '':
new_user.password = v
def update_my_account(self, user_id, form_data):
if k not in ['admin', 'active']:
def delete(self, user_id):
user = self.get(user_id, cache=False)
if user.username == 'default':
_("You can't remove this user since it's"
if user.repositories:
raise UserOwnsReposException(_('This user still owns %s '
'repositories and cannot be '
'removed. Switch owners or '
'remove those repositories') \
% user.repositories)
self.sa.delete(user)
def reset_password(self, data):
run_task(tasks.reset_user_password, data['email'])
def fill_data(self, user):
Fills user data with those from database and log out user if not
present in database
if not hasattr(user, 'user_id') or user.user_id is None:
raise Exception('passed in user has to have the user_id attribute')
log.debug('filling auth user data')
dbuser = self.get(user.user_id)
user.username = dbuser.username
user.is_admin = dbuser.admin
user.name = dbuser.name
user.lastname = dbuser.lastname
user.email = dbuser.email
user.is_authenticated = False
return user
Status change: