# -*- coding: utf-8 -*-
"""
rhodecode.controllers.api
~~~~~~~~~~~~~~~~~~~~~~~~~
API controller for RhodeCode
:created_on: Aug 20, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import traceback
import logging
from sqlalchemy.orm.exc import NoResultFound
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import HasPermissionAllDecorator, \
HasPermissionAnyDecorator
HasPermissionAnyDecorator, PasswordGenerator
from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.db import User, UsersGroup, RepoGroup, Repository
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.repo_permission import RepositoryPermissionModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model.repos_group import ReposGroupModel
log = logging.getLogger(__name__)
class ApiController(JSONRPCController):
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
@HasPermissionAllDecorator('hg.admin')
def pull(self, apiuser, repo_name):
Dispatch pull action on given repo
:param user:
:param repo_name:
if Repository.is_valid(repo_name) is False:
raise JSONRPCError('Unknown repo "%s"' % repo_name)
try:
ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
return 'Pulled from %s' % repo_name
except Exception:
raise JSONRPCError('Unable to pull changes from "%s"' % repo_name)
def get_user(self, apiuser, username):
""""
Get a user by username
:param apiuser:
:param username:
user = User.get_by_username(username)
if user is None:
return user
return dict(
id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap=user.ldap_dn
)
def get_users(self, apiuser):
Get all users
result = []
for user in User.getAll():
result.append(
dict(
return result
def create_user(self, apiuser, username, password, email, firstname=None,
def create_user(self, apiuser, username, email, password, firstname=None,
lastname=None, active=True, admin=False, ldap_dn=None):
Create new user
:param password:
:param email:
:param name:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
if User.get_by_username(username):
raise JSONRPCError("user %s already exist" % username)
if User.get_by_email(email, case_insensitive=True):
raise JSONRPCError("email %s already exist" % email)
if ldap_dn:
# generate temporary password if ldap_dn
password = PasswordGenerator().gen_password(length=8)
usr = UserModel().create_or_update(
username, password, email, firstname,
lastname, active, admin, ldap_dn
Session.commit()
id=usr.user_id,
msg='created new user %s' % username
log.error(traceback.format_exc())
raise JSONRPCError('failed to create user %s' % username)
def update_user(self, apiuser, username, password, email, firstname=None,
Updates given user
if not User.get_by_username(username):
raise JSONRPCError("user %s does not exist" % username)
msg='updated user %s' % username
raise JSONRPCError('failed to update user %s' % username)
def get_users_group(self, apiuser, group_name):
Get users group by name
:param group_name:
users_group = UsersGroup.get_by_group_name(group_name)
if not users_group:
return None
members = []
for user in users_group.members:
user = user.user
members.append(dict(id=user.user_id,
ldap=user.ldap_dn))
return dict(id=users_group.users_group_id,
group_name=users_group.users_group_name,
active=users_group.users_group_active,
members=members)
def get_users_groups(self, apiuser):
Get all users groups
for users_group in UsersGroup.getAll():
Status change: