.. _api:
===
API
Starting from RhodeCode version 1.2 a simple API was implemented.
There's a single schema for calling all api methods. API is implemented
with JSON protocol both ways. An url to send API request to RhodeCode is
<your_server>/_admin/api
API ACCESS FOR WEB VIEWS
++++++++++++++++++++++++
API access can also be turned on for each web view in RhodeCode that is
decorated with `@LoginRequired` decorator. To enable API access simple change
the standard login decorator to `@LoginRequired(api_access=True)`.
After this change, a rhodecode view can be accessed without login by adding a
GET parameter `?api_key=<api_key>` to url. By default this is only
enabled on RSS/ATOM feed views.
API ACCESS
++++++++++
All clients are required to send JSON-RPC spec JSON data::
{
"id:"<id>",
"api_key":"<api_key>",
"method":"<method_name>",
"args":{"<arg_key>":"<arg_val>"}
}
Example call for autopulling remotes repos using curl::
curl https://server.com/_admin/api -X POST -H 'content-type:text/plain' --data-binary '{"id":1,"api_key":"xe7cdb2v278e4evbdf5vs04v832v0efvcbcve4a3","method":"pull","args":{"repo":"CPython"}}'
Simply provide
- *id* A value of any type, which is used to match the response with the request that it is replying to.
- *api_key* for access and permission validation.
- *method* is name of method to call
- *args* is an key:value list of arguments to pass to method
.. note::
api_key can be found in your user account page
RhodeCode API will return always a JSON-RPC response::
"id":<id>, # matching id sent by request
"result": "<result>"|null, # JSON formatted result, null if any errors
"error": "null"|<error_message> # JSON formatted error (if any)
All responses from API will be `HTTP/1.0 200 OK`, if there's an error while
calling api *error* key from response will contain failure description
and result will be null.
API CLIENT
From version 1.4 RhodeCode adds a script that allows to easily
communicate with API. After installing RhodeCode a `rhodecode-api` script
will be available.
To get started quickly simply run::
rhodecode-api _create_config --apikey=<youapikey> --apihost=<rhodecode host>
This will create a file named .config in the directory you executed it storing
json config file with credentials. You can skip this step and always provide
both of the arguments to be able to communicate with server
after that simply run any api command for example get_repo::
rhodecode-api get_repo
calling {"api_key": "<apikey>", "id": 75, "args": {}, "method": "get_repo"} to http://127.0.0.1:5000
rhodecode said:
{'error': 'Missing non optional `repoid` arg in JSON DATA',
'id': 75,
'result': None}
Ups looks like we forgot to add an argument
Let's try again now giving the repoid as parameters::
rhodecode-api get_repo repoid:rhodecode
calling {"api_key": "<apikey>", "id": 39, "args": {"repoid": "rhodecode"}, "method": "get_repo"} to http://127.0.0.1:5000
{'error': None,
'id': 39,
'result': <json data...>}
API METHODS
+++++++++++
pull
----
Pulls given repo from remote location. Can be used to automatically keep
remote repos up to date. This command can be executed only using api_key
belonging to user with admin rights
INPUT::
id : <id_for_response>
api_key : "<api_key>"
method : "pull"
args : {
"repoid" : "<reponame or repo_id>"
OUTPUT::
id : <id_given_in_input>
result : "Pulled from `<reponame>`"
error : null
rescan_repos
------------
Dispatch rescan repositories action. If remove_obsolete is set
RhodeCode will delete repos that are in database but not in the filesystem.
This command can be executed only using api_key belonging to user with admin
rights.
method : "rescan_repos"
"remove_obsolete" : "<boolean = Optional(False)>"
result : "{'added': [<list of names of added repos>],
'removed': [<list of names of removed repos>]}"
invalidate_cache
----------------
Invalidate cache for repository.
rights or regular user that have write or admin or write access to repository.
method : "invalidate_cache"
result : "Cache for repository `<reponame>` was invalidated: invalidated cache keys: <list_of_cache_keys>"
lock
Set locking state on given repository by given user. If userid param is skipped
, then it is set to id of user whos calling this method.
rights or regular user that have admin or write access to repository.
method : "lock"
"userid" : "<user_id or username = Optional(=apiuser)>",
"locked" : "<bool true|false>"
result : "User `<username>` set lock state for repo `<reponame>` to `true|false`"
show_ip
-------
Shows IP address as seen from RhodeCode server, together with all
defined IP addresses for given user.
method : "show_ip"
"userid" : "<user_id or username>",
result : {
"ip_addr_server": <ip_from_clien>",
"user_ips": [
"ip_addr": "<ip_with_mask>",
"ip_range": ["<start_ip>", "<end_ip>"],
},
...
]
get_user
--------
Get's an user by username or user_id, Returns empty result if user is not found.
If userid param is skipped it is set to id of user who is calling this method.
rights, or regular users that cannot specify different userid than theirs
method : "get_user"
"userid" : "<username or user_id Optional(=apiuser)>"
result: None if user does not exist or
"user_id" : "<user_id>",
"api_key" : "<api_key>",
"username" : "<username>",
"firstname": "<firstname>",
"lastname" : "<lastname>",
"email" : "<email>",
"emails": "<list_of_all_additional_emails>",
"ip_addresses": "<list_of_ip_addresses_for_user>",
"active" : "<bool>",
"admin" :Â "<bool>",
"ldap_dn" : "<ldap_dn>",
"last_login": "<last_login>",
"permissions": {
"global": ["hg.create.repository",
"repository.read",
"hg.register.manual_activate"],
"repositories": {"repo1": "repository.none"},
"repositories_groups": {"Group1": "group.read"}
error: null
get_users
---------
Lists all existing users. This command can be executed only using api_key
belonging to user with admin rights.
method : "get_users"
args : { }
result: [
…
create_user
-----------
Creates new user. This command can
be executed only using api_key belonging to user with admin rights.
method : "create_user"
"email" : "<useremail>",
"password" : "<password>",
"firstname" : "<firstname> = Optional(None)",
"lastname" : "<lastname> = Optional(None)",
"active" : "<bool> = Optional(True)",
"admin" : "<bool> = Optional(False)",
"ldap_dn" : "<ldap_dn> = Optional(None)"
result: {
"msg" : "created new user `<username>`",
"user": {
update_user
updates given user if such user exists. This command can
method : "update_user"
"username" : "<username> = Optional(None)",
"email" : "<useremail> = Optional(None)",
"password" : "<password> = Optional(None)",
"active" : "<bool> = Optional(None)",
"admin" : "<bool> = Optional(None)",
"msg" : "updated user ID:<userid> <username>",
delete_user
deletes givenuser if such user exists. This command can
method : "delete_user"
"msg" : "deleted user ID:<userid> <username>",
"user": null
get_users_group
---------------
Gets an existing users group. This command can be executed only using api_key
method : "get_users_group"
"usersgroupid" : "<users group id or name>"
result : None if group not exist
"users_group_id" : "<id>",
"group_name" : "<groupname>",
"active": "<bool>",
"members" : [
get_users_groups
Lists all existing users groups. This command can be executed only using
api_key belonging to user with admin rights.
method : "get_users_groups"
result : [
create_users_group
------------------
Creates new users group. This command can be executed only using api_key
method : "create_users_group"
args: {
"group_name": "<groupname>",
"active":"<bool> = Optional(True)"
"msg": "created new users group `<groupname>`",
"users_group": {
add_user_to_users_group
-----------------------
Adds a user to a users group. If user exists in that group success will be
`false`. This command can be executed only using api_key
method : "add_user_users_group"
"usersgroupid" : "<users group id or name>",
"success": True|False # depends on if member is in group
"msg": "added member `<username>` to users group `<groupname>` |
User is already in that group"
remove_user_from_users_group
----------------------------
Removes a user from a users group. If user is not in given group success will
be `false`. This command can be executed only
using api_key belonging to user with admin rights
method : "remove_user_from_users_group"
"success": True|False, # depends on if member is in group
"msg": "removed member <username> from users group <groupname> |
User wasn't in group"
get_repo
Gets an existing repository by it's name or repository_id. Members will return
either users_group or user associated to that repository. This command can be
executed only using api_key belonging to user with admin
rights or regular user that have at least read access to repository.
method : "get_repo"
result: None if repository does not exist or
"repo_id" : "<repo_id>",
"repo_name" : "<reponame>"
"repo_type" : "<repo_type>",
"clone_uri" : "<clone_uri>",
"enable_downloads": "<bool>",
"enable_locking": "<bool>",
"enable_statistics": "<bool>",
"private": "<bool>",
"created_on" : "<date_time_created>",
"description" : "<description>",
"landing_rev": "<landing_rev>",
"last_changeset": {
"author": "<full_author>",
"date": "<date_time_of_commit>",
"message": "<commit_message>",
"raw_id": "<raw_id>",
"revision": "<numeric_revision>",
"short_id": "<short_id>"
"owner": "<repo_owner>",
"fork_of": "<name_of_fork_parent>",
"type": "user",
"permission" : "repository.(read|write|admin)"
"type": "users_group",
"id" : "<usersgroupid>",
"name" : "<usersgroupname>",
"followers": [
get_repos
Lists all existing repositories. This command can be executed only using
api_key belonging to user with admin rights or regular user that have
admin, write or read access to repository.
method : "get_repos"
args: { }
"private": : "<bool>",
"created_on" : "<datetimecreated>",
get_repo_nodes
--------------
returns a list of nodes and it's children in a flat list for a given path
at given revision. It's possible to specify ret_type to show only `files` or
`dirs`. This command can be executed only using api_key belonging to user
with admin rights
method : "get_repo_nodes"
"revision" : "<revision>",
"root_path" : "<root_path>",
"ret_type" : "<ret_type> = Optional('all')"
"name" : "<name>"
"type" : "<type>",
create_repo
Creates a repository. If repository name contains "/", all needed repository
groups will be created. For example "foo/bar/baz" will create groups
"foo", "bar" (with "foo" as parent), and create "baz" repository with
"bar" as group. This command can be executed only using api_key belonging to user with admin
rights or regular user that have create repository permission. Regular users
cannot specify owner parameter
method : "create_repo"
"repo_name" : "<reponame>",
"owner" : "<onwer_name_or_id = Optional(=apiuser)>",
"repo_type" : "<repo_type> = Optional('hg')",
"description" : "<description> = Optional('')",
"private" : "<bool> = Optional(False)",
"clone_uri" : "<clone_uri> = Optional(None)",
"landing_rev" : "<landing_rev> = Optional('tip')",
"enable_downloads": "<bool> = Optional(False)",
"enable_locking": "<bool> = Optional(False)",
"enable_statistics": "<bool> = Optional(False)",
"msg": "Created new repository `<reponame>`",
"repo": {
"owner": "<username or user_id>",
fork_repo
Creates a fork of given repo. In case of using celery this will
immidiatelly return success message, while fork is going to be created
asynchronous. This command can be executed only using api_key belonging to
user with admin rights or regular user that have fork permission, and at least
read access to forking repository. Regular users cannot specify owner parameter.
method : "fork_repo"
"repoid" : "<reponame or repo_id>",
"fork_name": "<forkname>",
"owner": "<username or user_id = Optional(=apiuser)>",
"description": "<description>",
"copy_permissions": "<bool>",
"landing_rev": "<landing_rev>"
"msg": "Created fork of `<reponame>` as `<forkname>`",
"success": true
delete_repo
Deletes a repository. This command can be executed only using api_key belonging to user with admin
rights or regular user that have admin access to repository.
method : "delete_repo"
"msg": "Deleted repository `<reponame>`",
grant_user_permission
---------------------
Grant permission for user on given repository, or update existing one
if found. This command can be executed only using api_key belonging to user
with admin rights.
method : "grant_user_permission"
"userid" : "<username or user_id>"
"perm" : "(repository.(none|read|write|admin))",
"msg" : "Granted perm: `<perm>` for user: `<username>` in repo: `<reponame>`",
revoke_user_permission
----------------------
Revoke permission for user on given repository. This command can be executed
only using api_key belonging to user with admin rights.
method : "revoke_user_permission"
"msg" : "Revoked perm for user: `<username>` in repo: `<reponame>`",
grant_users_group_permission
Grant permission for users group on given repository, or update
existing one if found. This command can be executed only using
method : "grant_users_group_permission"
# -*- coding: utf-8 -*-
"""
rhodecode.controllers.api
~~~~~~~~~~~~~~~~~~~~~~~~~
API controller for RhodeCode
:created_on: Aug 20, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import traceback
import logging
from pylons.controllers.util import abort
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import PasswordGenerator, AuthUser, \
HasPermissionAllDecorator, HasPermissionAnyDecorator, \
HasPermissionAnyApi, HasRepoPermissionAnyApi
from rhodecode.lib.utils import map_groups, repo2db_mapper
from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model.permission import PermissionModel
from rhodecode.model.db import Repository, RhodeCodeSetting, UserIpMap
log = logging.getLogger(__name__)
class OptionalAttr(object):
Special Optional Option that defines other attribute
def __init__(self, attr_name):
self.attr_name = attr_name
def __repr__(self):
return '<OptionalAttr:%s>' % self.attr_name
def __call__(self):
return self
#alias
OAttr = OptionalAttr
class Optional(object):
Defines an optional parameter::
param = param.getval() if isinstance(param, Optional) else param
param = param() if isinstance(param, Optional) else param
is equivalent of::
param = Optional.extract(param)
def __init__(self, type_):
self.type_ = type_
return '<Optional:%s>' % self.type_.__repr__()
return self.getval()
def getval(self):
returns value from this Optional instance
return self.type_
@classmethod
def extract(cls, val):
if isinstance(val, cls):
return val.getval()
return val
def get_user_or_error(userid):
Get user by id or name or return JsonRPCError if not found
:param userid:
user = UserModel().get_user(userid)
if user is None:
raise JSONRPCError("user `%s` does not exist" % userid)
return user
def get_repo_or_error(repoid):
Get repo by id or name or return JsonRPCError if not found
repo = RepoModel().get_repo(repoid)
if repo is None:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
return repo
def get_users_group_or_error(usersgroupid):
Get users group by id or name or return JsonRPCError if not found
users_group = UsersGroupModel().get_group(usersgroupid)
if users_group is None:
raise JSONRPCError('users group `%s` does not exist' % usersgroupid)
return users_group
def get_perm_or_error(permid):
Get permission by id or name or return JsonRPCError if not found
perm = PermissionModel().get_permission_by_name(permid)
if perm is None:
raise JSONRPCError('permission `%s` does not exist' % (permid))
return perm
class ApiController(JSONRPCController):
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
@HasPermissionAllDecorator('hg.admin')
def pull(self, apiuser, repoid):
Dispatch pull action on given repo
:param apiuser:
:param repoid:
repo = get_repo_or_error(repoid)
try:
ScmModel().pull_changes(repo.repo_name,
self.rhodecode_user.username)
return 'Pulled from `%s`' % repo.repo_name
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'Unable to pull changes from `%s`' % repo.repo_name
)
def rescan_repos(self, apiuser, remove_obsolete=Optional(False)):
than also delete repos that are in database but not in the filesystem.
aka "clean zombies"
:param remove_obsolete:
rm_obsolete = Optional.extract(remove_obsolete)
added, removed = repo2db_mapper(ScmModel().repo_scan(),
remove_obsolete=rm_obsolete)
return {'added': added, 'removed': removed}
'Error occurred during rescan repositories action'
def invalidate_cache(self, apiuser, repoid):
Dispatch cache invalidation action on given repo
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
# check if we have admin permission for this repo !
if HasRepoPermissionAnyApi('repository.admin',
'repository.write')(user=apiuser,
repo_name=repo.repo_name) is False:
invalidated_keys = ScmModel().mark_for_invalidation(repo.repo_name)
Session().commit()
return ('Cache for repository `%s` was invalidated: '
'invalidated cache keys: %s' % (repoid, invalidated_keys))
'Error occurred during cache invalidation action'
def lock(self, apiuser, repoid, locked, userid=Optional(OAttr('apiuser'))):
Set locking state on particular repository by given user, if
this command is runned by non-admin account userid is set to user
who is calling this method
:param locked:
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
repo_name=repo.repo_name):
#make sure normal user does not pass someone else userid,
#he is not allowed to do that
if not isinstance(userid, Optional) and userid != apiuser.user_id:
'userid is not the same as your user'
else:
if isinstance(userid, Optional):
userid = apiuser.user_id
user = get_user_or_error(userid)
locked = bool(locked)
if locked:
Repository.lock(repo, user.user_id)
Repository.unlock(repo)
return ('User `%s` set lock state for repo `%s` to `%s`'
% (user.username, repo.repo_name, locked))
'Error occurred locking repository `%s`' % repo.repo_name
def show_ip(self, apiuser, userid):
defined IP addresses for given user
ips = UserIpMap.query().filter(UserIpMap.user == user).all()
return dict(
ip_addr_server=self.ip_addr,
user_ips=ips
def get_user(self, apiuser, userid=Optional(OAttr('apiuser'))):
""""
Get a user by username, or userid, if userid is given
data = user.get_api_data()
data['permissions'] = AuthUser(user_id=user.user_id).permissions
return data
def get_users(self, apiuser):
Get all users
result = []
for user in UserModel().get_all():
result.append(user.get_api_data())
return result
def create_user(self, apiuser, username, email, password,
firstname=Optional(None), lastname=Optional(None),
active=Optional(True), admin=Optional(False),
ldap_dn=Optional(None)):
Create new user
:param username:
:param email:
:param password:
:param firstname:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
if UserModel().get_by_username(username):
raise JSONRPCError("user `%s` already exist" % username)
if UserModel().get_by_email(email, case_insensitive=True):
raise JSONRPCError("email `%s` already exist" % email)
if Optional.extract(ldap_dn):
# generate temporary password if ldap_dn
password = PasswordGenerator().gen_password(length=8)
user = UserModel().create_or_update(
username=Optional.extract(username),
password=Optional.extract(password),
email=Optional.extract(email),
firstname=Optional.extract(firstname),
lastname=Optional.extract(lastname),
active=Optional.extract(active),
admin=Optional.extract(admin),
ldap_dn=Optional.extract(ldap_dn)
msg='created new user `%s`' % username,
user=user.get_api_data()
raise JSONRPCError('failed to create user `%s`' % username)
def update_user(self, apiuser, userid, username=Optional(None),
email=Optional(None), firstname=Optional(None),
lastname=Optional(None), active=Optional(None),
admin=Optional(None), ldap_dn=Optional(None),
password=Optional(None)):
Updates given user
# call function and store only updated arguments
updates = {}
def store_update(attr, name):
if not isinstance(attr, Optional):
updates[name] = attr
store_update(username, 'username')
store_update(password, 'password')
store_update(email, 'email')
store_update(firstname, 'name')
store_update(lastname, 'lastname')
store_update(active, 'active')
store_update(admin, 'admin')
store_update(ldap_dn, 'ldap_dn')
user = UserModel().update_user(user, **updates)
msg='updated user ID:%s %s' % (user.user_id, user.username),
raise JSONRPCError('failed to update user `%s`' % userid)
def delete_user(self, apiuser, userid):
Deletes an user
UserModel().delete(userid)
msg='deleted user ID:%s %s' % (user.user_id, user.username),
user=None
raise JSONRPCError('failed to delete ID:%s %s' % (user.user_id,
user.username))
def get_users_group(self, apiuser, usersgroupid):
Get users group by name or id
:param usersgroupid:
users_group = get_users_group_or_error(usersgroupid)
data = users_group.get_api_data()
members = []
for user in users_group.members:
user = user.user
members.append(user.get_api_data())
data['members'] = members
def get_users_groups(self, apiuser):
Get all users groups
for users_group in UsersGroupModel().get_all():
result.append(users_group.get_api_data())
def create_users_group(self, apiuser, group_name, active=Optional(True)):
Creates an new usergroup
:param group_name:
if UsersGroupModel().get_by_name(group_name):
raise JSONRPCError("users group `%s` already exist" % group_name)
active = Optional.extract(active)
ug = UsersGroupModel().create(name=group_name, active=active)
msg='created new users group `%s`' % group_name,
users_group=ug.get_api_data()
raise JSONRPCError('failed to create group `%s`' % group_name)
def add_user_to_users_group(self, apiuser, usersgroupid, userid):
Add a user to a users group
ugm = UsersGroupModel().add_user_to_group(users_group, user)
success = True if ugm != True else False
msg = 'added member `%s` to users group `%s`' % (
user.username, users_group.users_group_name
msg = msg if success else 'User is already in that group'
success=success,
msg=msg
'failed to add member to users group `%s`' % (
users_group.users_group_name
def remove_user_from_users_group(self, apiuser, usersgroupid, userid):
Remove user from a group
success = UsersGroupModel().remove_user_from_group(users_group,
user)
msg = 'removed member `%s` from users group `%s`' % (
msg = msg if success else "User wasn't in group"
return dict(success=success, msg=msg)
'failed to remove member from users group `%s`' % (
def get_repo(self, apiuser, repoid):
Get repository by name
if HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
followers = []
for user in repo.repo_to_perm:
perm = user.permission.permission_name
user_data = user.get_api_data()
user_data['type'] = "user"
user_data['permission'] = perm
members.append(user_data)
for users_group in repo.users_group_to_perm:
perm = users_group.permission.permission_name
users_group = users_group.users_group
users_group_data = users_group.get_api_data()
users_group_data['type'] = "users_group"
users_group_data['permission'] = perm
members.append(users_group_data)
for user in repo.followers:
followers.append(user.user.get_api_data())
data = repo.get_api_data()
data['followers'] = followers
def get_repos(self, apiuser):
Get all repositories
repos = RepoModel().get_all_user_repos(user=apiuser)
repos = RepoModel().get_all()
for repo in repos:
result.append(repo.get_api_data())
def get_repo_nodes(self, apiuser, repoid, revision, root_path,
ret_type='all'):
returns a list of nodes and it's children
for a given path at given revision. It's possible to specify ret_type
to show only files or dirs
:param repoid: name or id of repository
:param revision: revision for which listing should be done
:param root_path: path from which start displaying
:param ret_type: return type 'all|files|dirs' nodes
_d, _f = ScmModel().get_nodes(repo, revision, root_path,
flat=False)
_map = {
'all': _d + _f,
'files': _f,
'dirs': _d,
return _map[ret_type]
except KeyError:
raise JSONRPCError('ret_type must be one of %s' % _map.keys())
'failed to get repo: `%s` nodes' % repo.repo_name
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repo(self, apiuser, repo_name, owner=Optional(OAttr('apiuser')),
repo_type=Optional('hg'),
description=Optional(''), private=Optional(False),
clone_uri=Optional(None), landing_rev=Optional('tip'),
enable_statistics=Optional(False),
enable_locking=Optional(False),
enable_downloads=Optional(False)):
Create repository, if clone_url is given it makes a remote clone
if repo_name is within a group name the groups will be created
automatically if they aren't present
:param repo_name:
:param onwer:
:param repo_type:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
'Only RhodeCode admin can specify `owner` param'
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
if RepoModel().get_by_repo_name(repo_name):
raise JSONRPCError("repo `%s` already exist" % repo_name)
defs = RhodeCodeSetting.get_default_repo_settings(strip_prefix=True)
if isinstance(private, Optional):
private = defs.get('repo_private') or Optional.extract(private)
if isinstance(repo_type, Optional):
repo_type = defs.get('repo_type')
if isinstance(enable_statistics, Optional):
enable_statistics = defs.get('repo_enable_statistics')
if isinstance(enable_locking, Optional):
enable_locking = defs.get('repo_enable_locking')
if isinstance(enable_downloads, Optional):
enable_downloads = defs.get('repo_enable_downloads')
clone_uri = Optional.extract(clone_uri)
description = Optional.extract(description)
landing_rev = Optional.extract(landing_rev)
# create structure of groups and return the last group
group = map_groups(repo_name)
repo = RepoModel().create_repo(
repo_name=repo_name,
repo_type=repo_type,
description=description,
owner=owner,
private=private,
clone_uri=clone_uri,
repos_group=group,
landing_rev=landing_rev,
enable_statistics=enable_statistics,
enable_downloads=enable_downloads,
enable_locking=enable_locking
msg="Created new repository `%s`" % (repo.repo_name),
repo=repo.get_api_data()
raise JSONRPCError('failed to create repository `%s`' % repo_name)
@HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
def fork_repo(self, apiuser, repoid, fork_name, owner=Optional(OAttr('apiuser')),
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('tip')):
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
'repository.write',
'repository.read')(user=apiuser,
group = map_groups(fork_name)
form_data = dict(
repo_name=fork_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
RepoModel().create_fork(form_data, cur_user=owner)
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True # cannot return the repo data here since fork
# cann be done async
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
def delete_repo(self, apiuser, repoid):
Deletes a given repository
RepoModel().delete(repo)
msg='Deleted repository `%s`' % repo.repo_name,
success=True
'failed to delete repository `%s`' % repo.repo_name
def grant_user_permission(self, apiuser, repoid, userid, perm):
if found
:param perm:
perm = get_perm_or_error(perm)
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
perm.permission_name, user.username, repo.repo_name
),
'failed to edit permission for user: `%s` in repo: `%s`' % (
userid, repoid
def revoke_user_permission(self, apiuser, repoid, userid):
Revoke permission for user on given repository
RepoModel().revoke_user_permission(repo=repo, user=user)
msg='Revoked perm for user: `%s` in repo: `%s`' % (
user.username, repo.repo_name
def grant_users_group_permission(self, apiuser, repoid, usersgroupid,
perm):
existing one if found
RepoModel().grant_users_group_permission(repo=repo,
group_name=users_group,
perm=perm)
msg='Granted perm: `%s` for users group: `%s` in '
'repo: `%s`' % (
perm.permission_name, users_group.users_group_name,
repo.repo_name
'failed to edit permission for users group: `%s` in '
usersgroupid, repo.repo_name
def revoke_users_group_permission(self, apiuser, repoid, usersgroupid):
Revoke permission for users group on given repository
RepoModel().revoke_users_group_permission(repo=repo,
group_name=users_group)
msg='Revoked perm for users group: `%s` in repo: `%s`' % (
users_group.users_group_name, repo.repo_name
from __future__ import with_statement
import random
import mock
from rhodecode.tests import *
from rhodecode.lib.compat import json
from rhodecode.lib.auth import AuthUser
from rhodecode.model.db import Repository
API_URL = '/_admin/api'
def _build_data(apikey, method, **kw):
Builds API data with given random ID
:param random_id:
:type random_id:
random_id = random.randrange(1, 9999)
return random_id, json.dumps({
"id": random_id,
"api_key": apikey,
"method": method,
"args": kw
})
jsonify = lambda obj: json.loads(json.dumps(obj))
def crash(*args, **kwargs):
raise Exception('Total Crash !')
def api_call(test_obj, params):
response = test_obj.app.post(API_URL, content_type='application/json',
params=params)
return response
TEST_USERS_GROUP = 'test_users_group'
def make_users_group(name=TEST_USERS_GROUP):
gr = UsersGroupModel().create(name=name)
UsersGroupModel().add_user_to_group(users_group=gr,
user=TEST_USER_ADMIN_LOGIN)
return gr
def destroy_users_group(name=TEST_USERS_GROUP):
UsersGroupModel().delete(users_group=name, force=True)
def create_repo(repo_name, repo_type, owner=None):
# create new repo
form_data = _get_repo_create_params(
repo_name_full=repo_name,
repo_description='description %s' % repo_name,
cur_user = UserModel().get_by_username(owner or TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
return r
def create_fork(fork_name, fork_type, fork_of):
fork = RepoModel(Session())._get_repo(fork_of)
r = create_repo(fork_name, fork_type)
r.fork = fork
Session().add(r)
def destroy_repo(repo_name):
RepoModel().delete(repo_name)
class BaseTestApi(object):
REPO = None
REPO_TYPE = None
def setUpClass(self):
self.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
self.apikey = self.usr.api_key
self.test_user = UserModel().create_or_update(
username='test-api',
password='test',
email='test@api.rhodecode.org',
firstname='first',
lastname='last'
self.TEST_USER_LOGIN = self.test_user.username
self.apikey_regular = self.test_user.api_key
def teardownClass(self):
def setUp(self):
self.maxDiff = None
make_users_group()
def tearDown(self):
destroy_users_group()
def _compare_ok(self, id_, expected, given):
expected = jsonify({
'id': id_,
'error': None,
'result': expected
given = json.loads(given)
self.assertEqual(expected, given)
def _compare_error(self, id_, expected, given):
'error': expected,
'result': None
# def test_Optional(self):
# from rhodecode.controllers.api.api import Optional
# option1 = Optional(None)
# self.assertEqual('<Optional:%s>' % None, repr(option1))
# self.assertEqual(1, Optional.extract(Optional(1)))
# self.assertEqual('trololo', Optional.extract('trololo'))
def test_api_wrong_key(self):
id_, params = _build_data('trololo', 'get_user')
response = api_call(self, params)
expected = 'Invalid API KEY'
self._compare_error(id_, expected, given=response.body)
def test_api_missing_non_optional_param(self):
id_, params = _build_data(self.apikey, 'get_repo')
expected = 'Missing non optional `repoid` arg in JSON DATA'
def test_api_missing_non_optional_param_args_null(self):
params = params.replace('"args": {}', '"args": null')
def test_api_missing_non_optional_param_args_bad(self):
params = params.replace('"args": {}', '"args": 1')
def test_api_args_is_null(self):
id_, params = _build_data(self.apikey, 'get_users',)
self.assertEqual(response.status, '200 OK')
def test_api_args_is_bad(self):
def test_api_get_users(self):
ret_all = []
for usr in UserModel().get_all():
ret = usr.get_api_data()
ret_all.append(jsonify(ret))
expected = ret_all
self._compare_ok(id_, expected, given=response.body)
def test_api_get_user(self):
id_, params = _build_data(self.apikey, 'get_user',
userid=TEST_USER_ADMIN_LOGIN)
usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
ret['permissions'] = AuthUser(usr.user_id).permissions
expected = ret
def test_api_get_user_that_does_not_exist(self):
userid='trololo')
expected = "user `%s` does not exist" % 'trololo'
def test_api_get_user_without_giving_userid(self):
id_, params = _build_data(self.apikey, 'get_user')
def test_api_get_user_without_giving_userid_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'get_user')
usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
def test_api_get_user_with_giving_userid_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'get_user',
userid=self.TEST_USER_LOGIN)
expected = 'userid is not the same as your user'
def test_api_pull(self):
#TODO: issues with rhodecode_extras here.. not sure why !
# repo_name = 'test_pull'
# r = create_repo(repo_name, self.REPO_TYPE)
# r.clone_uri = TEST_self.REPO
# Session.add(r)
# Session.commit()
# id_, params = _build_data(self.apikey, 'pull',
# repoid=repo_name,)
# response = self.app.post(API_URL, content_type='application/json',
# params=params)
# expected = 'Pulled from `%s`' % repo_name
# self._compare_ok(id_, expected, given=response.body)
# destroy_repo(repo_name)
def test_api_pull_error(self):
id_, params = _build_data(self.apikey, 'pull',
repoid=self.REPO,)
expected = 'Unable to pull changes from `%s`' % self.REPO
def test_api_rescan_repos(self):
id_, params = _build_data(self.apikey, 'rescan_repos')
expected = {'added': [], 'removed': []}
@mock.patch.object(ScmModel, 'repo_scan', crash)
def test_api_rescann_error(self):
id_, params = _build_data(self.apikey, 'rescan_repos',)
expected = 'Error occurred during rescan repositories action'
def test_api_invalidate_cache(self):
id_, params = _build_data(self.apikey, 'invalidate_cache',
repoid=self.REPO)
expected = ("Cache for repository `%s` was invalidated: "
"invalidated cache keys: %s" % (self.REPO,
[unicode(self.REPO)]))
@mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
def test_api_invalidate_cache_error(self):
expected = 'Error occurred during cache invalidation action'
def test_api_lock_repo_lock_aquire(self):
id_, params = _build_data(self.apikey, 'lock',
userid=TEST_USER_ADMIN_LOGIN,
repoid=self.REPO,
locked=True)
expected = ('User `%s` set lock state for repo `%s` to `%s`'
% (TEST_USER_ADMIN_LOGIN, self.REPO, True))
def test_api_lock_repo_lock_aquire_by_non_admin(self):
repo_name = 'api_delete_me'
create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
id_, params = _build_data(self.apikey_regular, 'lock',
repoid=repo_name,
% (self.TEST_USER_LOGIN, repo_name, True))
finally:
destroy_repo(repo_name)
def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
expected = 'repository `%s` does not exist' % (self.REPO)
def test_api_lock_repo_lock_release(self):
locked=False)
% (TEST_USER_ADMIN_LOGIN, self.REPO, False))
def test_api_lock_repo_lock_aquire_optional_userid(self):
@mock.patch.object(Repository, 'lock', crash)
def test_api_lock_error(self):
expected = 'Error occurred locking repository `%s`' % self.REPO
def test_api_create_existing_user(self):
id_, params = _build_data(self.apikey, 'create_user',
username=TEST_USER_ADMIN_LOGIN,
email='test@foo.com',
password='trololo')
expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
def test_api_create_user_with_existing_email(self):
username=TEST_USER_ADMIN_LOGIN + 'new',
email=TEST_USER_REGULAR_EMAIL,
expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
def test_api_create_user(self):
username = 'test_new_api_user'
email = username + "@foo.com"
username=username,
email=email,
usr = UserModel().get_by_username(username)
ret = dict(
user=jsonify(usr.get_api_data())
UserModel().delete(usr.user_id)
@mock.patch.object(UserModel, 'create_or_update', crash)
def test_api_create_user_when_exception_happened(self):
expected = 'failed to create user `%s`' % username
def test_api_delete_user(self):
usr = UserModel().create_or_update(username=u'test_user',
password=u'qweqwe',
email=u'u232@rhodecode.org',
firstname=u'u1', lastname=u'u1')
username = usr.username
email = usr.email
usr_id = usr.user_id
## DELETE THIS USER NOW
id_, params = _build_data(self.apikey, 'delete_user',
userid=username,)
ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
'user': None}
@mock.patch.object(UserModel, 'delete', crash)
def test_api_delete_user_when_exception_happened(self):
ret = 'failed to delete ID:%s %s' % (usr.user_id,
usr.username)
@parameterized.expand([('firstname', 'new_username'),
('lastname', 'new_username'),
('email', 'new_username'),
('admin', True),
('admin', False),
('ldap_dn', 'test'),
('ldap_dn', None),
('active', False),
('active', True),
('password', 'newpass')
])
def test_api_update_user(self, name, expected):
kw = {name: expected,
'userid': usr.user_id}
id_, params = _build_data(self.apikey, 'update_user', **kw)
ret = {
'msg': 'updated user ID:%s %s' % (usr.user_id, self.TEST_USER_LOGIN),
'user': jsonify(UserModel()\
.get_by_username(self.TEST_USER_LOGIN)\
.get_api_data())
def test_api_update_user_no_changed_params(self):
ret = jsonify(usr.get_api_data())
id_, params = _build_data(self.apikey, 'update_user',
'msg': 'updated user ID:%s %s' % (usr.user_id, TEST_USER_ADMIN_LOGIN),
'user': ret
def test_api_update_user_by_user_id(self):
userid=usr.user_id)
@mock.patch.object(UserModel, 'update_user', crash)
def test_api_update_user_when_exception_happens(self):
ret = 'failed to update user `%s`' % usr.user_id
def test_api_get_repo(self):
new_group = 'some_new_group'
make_users_group(new_group)
RepoModel().grant_users_group_permission(repo=self.REPO,
group_name=new_group,
perm='repository.read')
id_, params = _build_data(self.apikey, 'get_repo',
repo = RepoModel().get_by_repo_name(self.REPO)
ret = repo.get_api_data()
ret['members'] = members
ret['followers'] = followers
destroy_users_group(new_group)
def test_api_get_repo_by_non_admin(self):
def test_api_get_repo_by_non_admin_no_permission_to_repo(self):
RepoModel().grant_user_permission(repo=self.REPO,
user=self.TEST_USER_LOGIN,
perm='repository.none')
id_, params = _build_data(self.apikey_regular, 'get_repo',
def test_api_get_repo_that_doesn_not_exist(self):
repoid='no-such-repo')
ret = 'repository `%s` does not exist' % 'no-such-repo'
def test_api_get_repos(self):
id_, params = _build_data(self.apikey, 'get_repos')
for repo in RepoModel().get_all():
ret = jsonify(result)
def test_api_get_repos_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'get_repos')
for repo in RepoModel().get_all_user_repos(self.TEST_USER_LOGIN):
@parameterized.expand([('all', 'all'),
('dirs', 'dirs'),
('files', 'files'), ])
def test_api_get_repo_nodes(self, name, ret_type):
rev = 'tip'
path = '/'
id_, params = _build_data(self.apikey, 'get_repo_nodes',
repoid=self.REPO, revision=rev,
root_path=path,
ret_type=ret_type)
# we don't the actual return types here since it's tested somewhere
# else
expected = json.loads(response.body)['result']
def test_api_get_repo_nodes_bad_revisions(self):
rev = 'i-dont-exist'
root_path=path,)
expected = 'failed to get repo: `%s` nodes' % self.REPO
def test_api_get_repo_nodes_bad_path(self):
path = '/idontexits'
def test_api_get_repo_nodes_bad_ret_type(self):
ret_type = 'error'
expected = 'ret_type must be one of %s' % (['files', 'dirs', 'all'])
def test_api_create_repo(self):
repo_name = 'api-repo'
id_, params = _build_data(self.apikey, 'create_repo',
owner=TEST_USER_ADMIN_LOGIN,
repo_type='hg',
repo = RepoModel().get_by_repo_name(repo_name)
'msg': 'Created new repository `%s`' % repo_name,
'repo': jsonify(repo.get_api_data())
def test_api_create_repo_unknown_owner(self):
owner = 'i-dont-exist'
expected = 'user `%s` does not exist' % owner
def test_api_create_repo_dont_specify_owner(self):
def test_api_create_repo_by_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'create_repo',
def test_api_create_repo_by_non_admin_specify_owner(self):
owner=owner
expected = 'Only RhodeCode admin can specify `owner` param'
def test_api_create_repo_exists(self):
repo_name = self.REPO
expected = "repo `%s` already exist" % repo_name
@mock.patch.object(RepoModel, 'create_repo', crash)
def test_api_create_repo_exception_occurred(self):
expected = 'failed to create repository `%s`' % repo_name
def test_api_delete_repo(self):
create_repo(repo_name, self.REPO_TYPE)
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name,)
'msg': 'Deleted repository `%s`' % repo_name,
'success': True
def test_api_delete_repo_by_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'delete_repo',
def test_api_delete_repo_by_non_admin_no_permission(self):
expected = 'repository `%s` does not exist' % (repo_name)
def test_api_delete_repo_exception_occurred(self):
with mock.patch.object(RepoModel, 'delete', crash):
expected = 'failed to delete repository `%s`' % repo_name
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
fork_name=fork_name,
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
destroy_repo(fork_name)
def test_api_fork_repo_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'fork_repo',
def test_api_fork_repo_non_admin_specify_owner(self):
def test_api_fork_repo_non_admin_no_permission_to_fork(self):
def test_api_fork_repo_unknown_owner(self):
def test_api_fork_repo_fork_exists(self):
create_fork(fork_name, self.REPO_TYPE, self.REPO)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
expected = "repo `%s` already exist" % fork_name
@mock.patch.object(RepoModel, 'create_fork', crash)
def test_api_fork_repo_exception_occurred(self):
expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
def test_api_get_users_group(self):
id_, params = _build_data(self.apikey, 'get_users_group',
usersgroupid=TEST_USERS_GROUP)
users_group = UsersGroupModel().get_group(TEST_USERS_GROUP)
ret = users_group.get_api_data()
def test_api_get_users_groups(self):
make_users_group('test_users_group2')
id_, params = _build_data(self.apikey, 'get_users_groups',)
expected = []
for gr_name in [TEST_USERS_GROUP, 'test_users_group2']:
users_group = UsersGroupModel().get_group(gr_name)
expected.append(ret)
UsersGroupModel().delete(users_group='test_users_group2')
def test_api_create_users_group(self):
group_name = 'some_new_group'
id_, params = _build_data(self.apikey, 'create_users_group',
group_name=group_name)
'msg': 'created new users group `%s`' % group_name,
'users_group': jsonify(UsersGroupModel()\
.get_by_name(group_name)\
destroy_users_group(group_name)
def test_api_get_users_group_that_exist(self):
group_name=TEST_USERS_GROUP)
expected = "users group `%s` already exist" % TEST_USERS_GROUP
@mock.patch.object(UsersGroupModel, 'create', crash)
def test_api_get_users_group_exception_occurred(self):
group_name = 'exception_happens'
expected = 'failed to create group `%s`' % group_name
def test_api_add_user_to_users_group(self):
gr_name = 'test_group'
UsersGroupModel().create(gr_name)
id_, params = _build_data(self.apikey, 'add_user_to_users_group',
usersgroupid=gr_name,
Status change: