.. _api:
===
API
Starting from RhodeCode version 1.2 a simple API was implemented.
There's a single schema for calling all api methods. API is implemented
with JSON protocol both ways. An url to send API request to RhodeCode is
<your_server>/_admin/api
API ACCESS FOR WEB VIEWS
++++++++++++++++++++++++
API access can also be turned on for each web view in RhodeCode that is
decorated with `@LoginRequired` decorator. To enable API access simple change
the standard login decorator to `@LoginRequired(api_access=True)`.
After this change, a rhodecode view can be accessed without login by adding a
GET parameter `?api_key=<api_key>` to url. By default this is only
enabled on RSS/ATOM feed views.
API ACCESS
++++++++++
All clients are required to send JSON-RPC spec JSON data::
{
"id:"<id>",
"api_key":"<api_key>",
"method":"<method_name>",
"args":{"<arg_key>":"<arg_val>"}
}
Example call for autopulling remotes repos using curl::
curl https://server.com/_admin/api -X POST -H 'content-type:text/plain' --data-binary '{"id":1,"api_key":"xe7cdb2v278e4evbdf5vs04v832v0efvcbcve4a3","method":"pull","args":{"repo":"CPython"}}'
Simply provide
- *id* A value of any type, which is used to match the response with the request that it is replying to.
- *api_key* for access and permission validation.
- *method* is name of method to call
- *args* is an key:value list of arguments to pass to method
.. note::
api_key can be found in your user account page
RhodeCode API will return always a JSON-RPC response::
"id":<id>, # matching id sent by request
"result": "<result>"|null, # JSON formatted result, null if any errors
"error": "null"|<error_message> # JSON formatted error (if any)
All responses from API will be `HTTP/1.0 200 OK`, if there's an error while
calling api *error* key from response will contain failure description
and result will be null.
API CLIENT
From version 1.4 RhodeCode adds a script that allows to easily
communicate with API. After installing RhodeCode a `rhodecode-api` script
will be available.
To get started quickly simply run::
rhodecode-api _create_config --apikey=<youapikey> --apihost=<rhodecode host>
This will create a file named .config in the directory you executed it storing
json config file with credentials. You can skip this step and always provide
both of the arguments to be able to communicate with server
after that simply run any api command for example get_repo::
rhodecode-api get_repo
calling {"api_key": "<apikey>", "id": 75, "args": {}, "method": "get_repo"} to http://127.0.0.1:5000
rhodecode said:
{'error': 'Missing non optional `repoid` arg in JSON DATA',
'id': 75,
'result': None}
Ups looks like we forgot to add an argument
Let's try again now giving the repoid as parameters::
rhodecode-api get_repo repoid:rhodecode
calling {"api_key": "<apikey>", "id": 39, "args": {"repoid": "rhodecode"}, "method": "get_repo"} to http://127.0.0.1:5000
{'error': None,
'id': 39,
'result': <json data...>}
API METHODS
+++++++++++
pull
----
Pulls given repo from remote location. Can be used to automatically keep
remote repos up to date. This command can be executed only using api_key
belonging to user with admin rights
INPUT::
id : <id_for_response>
api_key : "<api_key>"
method : "pull"
args : {
"repoid" : "<reponame or repo_id>"
OUTPUT::
id : <id_given_in_input>
result : "Pulled from `<reponame>`"
error : null
rescan_repos
------------
Dispatch rescan repositories action. If remove_obsolete is set
RhodeCode will delete repos that are in database but not in the filesystem.
This command can be executed only using api_key belonging to user with admin
rights.
method : "rescan_repos"
"remove_obsolete" : "<boolean = Optional(False)>"
result : "{'added': [<list of names of added repos>],
'removed': [<list of names of removed repos>]}"
invalidate_cache
----------------
Invalidate cache for repository.
rights or regular user that have write or admin or write access to repository.
method : "invalidate_cache"
result : "Cache for repository `<reponame>` was invalidated: invalidated cache keys: <list_of_cache_keys>"
lock
Set locking state on given repository by given user. If userid param is skipped
, then it is set to id of user whos calling this method.
rights or regular user that have admin or write access to repository.
method : "lock"
"userid" : "<user_id or username = Optional(=apiuser)>",
"locked" : "<bool true|false>"
result : "User `<username>` set lock state for repo `<reponame>` to `true|false`"
show_ip
-------
Shows IP address as seen from RhodeCode server, together with all
defined IP addresses for given user.
method : "show_ip"
"userid" : "<user_id or username>",
result : {
"ip_addr_server": <ip_from_clien>",
"user_ips": [
"ip_addr": "<ip_with_mask>",
"ip_range": ["<start_ip>", "<end_ip>"],
},
...
]
get_user
--------
Get's an user by username or user_id, Returns empty result if user is not found.
If userid param is skipped it is set to id of user who is calling this method.
rights, or regular users that cannot specify different userid than theirs
method : "get_user"
"userid" : "<username or user_id Optional(=apiuser)>"
result: None if user does not exist or
"user_id" : "<user_id>",
"api_key" : "<api_key>",
"username" : "<username>",
"firstname": "<firstname>",
"lastname" : "<lastname>",
"email" : "<email>",
"emails": "<list_of_all_additional_emails>",
"ip_addresses": "<list_of_ip_addresses_for_user>",
"active" : "<bool>",
"admin" :Â "<bool>",
"ldap_dn" : "<ldap_dn>",
"last_login": "<last_login>",
"permissions": {
"global": ["hg.create.repository",
"repository.read",
"hg.register.manual_activate"],
"repositories": {"repo1": "repository.none"},
"repositories_groups": {"Group1": "group.read"}
error: null
get_users
---------
Lists all existing users. This command can be executed only using api_key
belonging to user with admin rights.
method : "get_users"
args : { }
result: [
…
create_user
-----------
Creates new user. This command can
be executed only using api_key belonging to user with admin rights.
method : "create_user"
"email" : "<useremail>",
"password" : "<password>",
"firstname" : "<firstname> = Optional(None)",
"lastname" : "<lastname> = Optional(None)",
"active" : "<bool> = Optional(True)",
"admin" : "<bool> = Optional(False)",
"ldap_dn" : "<ldap_dn> = Optional(None)"
result: {
"msg" : "created new user `<username>`",
"user": {
update_user
updates given user if such user exists. This command can
@@ -13,384 +13,410 @@
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import traceback
import logging
from pylons.controllers.util import abort
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import PasswordGenerator, AuthUser, \
HasPermissionAllDecorator, HasPermissionAnyDecorator, \
HasPermissionAnyApi, HasRepoPermissionAnyApi
from rhodecode.lib.utils import map_groups, repo2db_mapper
from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model.permission import PermissionModel
from rhodecode.model.db import Repository, RhodeCodeSetting, UserIpMap
log = logging.getLogger(__name__)
class OptionalAttr(object):
"""
Special Optional Option that defines other attribute
def __init__(self, attr_name):
self.attr_name = attr_name
def __repr__(self):
return '<OptionalAttr:%s>' % self.attr_name
def __call__(self):
return self
#alias
OAttr = OptionalAttr
class Optional(object):
Defines an optional parameter::
param = param.getval() if isinstance(param, Optional) else param
param = param() if isinstance(param, Optional) else param
is equivalent of::
param = Optional.extract(param)
def __init__(self, type_):
self.type_ = type_
return '<Optional:%s>' % self.type_.__repr__()
return self.getval()
def getval(self):
returns value from this Optional instance
return self.type_
@classmethod
def extract(cls, val):
if isinstance(val, cls):
return val.getval()
return val
def get_user_or_error(userid):
Get user by id or name or return JsonRPCError if not found
:param userid:
user = UserModel().get_user(userid)
if user is None:
raise JSONRPCError("user `%s` does not exist" % userid)
return user
def get_repo_or_error(repoid):
Get repo by id or name or return JsonRPCError if not found
repo = RepoModel().get_repo(repoid)
if repo is None:
raise JSONRPCError('repository `%s` does not exist' % (repoid))
return repo
def get_users_group_or_error(usersgroupid):
Get users group by id or name or return JsonRPCError if not found
users_group = UsersGroupModel().get_group(usersgroupid)
if users_group is None:
raise JSONRPCError('users group `%s` does not exist' % usersgroupid)
return users_group
def get_perm_or_error(permid):
Get permission by id or name or return JsonRPCError if not found
perm = PermissionModel().get_permission_by_name(permid)
if perm is None:
raise JSONRPCError('permission `%s` does not exist' % (permid))
return perm
class ApiController(JSONRPCController):
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
@HasPermissionAllDecorator('hg.admin')
def pull(self, apiuser, repoid):
Dispatch pull action on given repo
:param apiuser:
:param repoid:
repo = get_repo_or_error(repoid)
try:
ScmModel().pull_changes(repo.repo_name,
self.rhodecode_user.username)
return 'Pulled from `%s`' % repo.repo_name
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'Unable to pull changes from `%s`' % repo.repo_name
)
def rescan_repos(self, apiuser, remove_obsolete=Optional(False)):
than also delete repos that are in database but not in the filesystem.
aka "clean zombies"
:param remove_obsolete:
rm_obsolete = Optional.extract(remove_obsolete)
added, removed = repo2db_mapper(ScmModel().repo_scan(),
remove_obsolete=rm_obsolete)
return {'added': added, 'removed': removed}
'Error occurred during rescan repositories action'
def invalidate_cache(self, apiuser, repoid):
Dispatch cache invalidation action on given repo
if HasPermissionAnyApi('hg.admin')(user=apiuser) is False:
# check if we have admin permission for this repo !
if HasRepoPermissionAnyApi('repository.admin',
'repository.write')(user=apiuser,
repo_name=repo.repo_name) is False:
invalidated_keys = ScmModel().mark_for_invalidation(repo.repo_name)
Session().commit()
return ('Cache for repository `%s` was invalidated: '
'invalidated cache keys: %s' % (repoid, invalidated_keys))
'Error occurred during cache invalidation action'
def lock(self, apiuser, repoid, locked, userid=Optional(OAttr('apiuser'))):
Set locking state on particular repository by given user, if
this command is runned by non-admin account userid is set to user
who is calling this method
:param locked:
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
repo_name=repo.repo_name):
#make sure normal user does not pass someone else userid,
#he is not allowed to do that
if not isinstance(userid, Optional) and userid != apiuser.user_id:
'userid is not the same as your user'
else:
if isinstance(userid, Optional):
userid = apiuser.user_id
user = get_user_or_error(userid)
locked = bool(locked)
if locked:
Repository.lock(repo, user.user_id)
Repository.unlock(repo)
return ('User `%s` set lock state for repo `%s` to `%s`'
% (user.username, repo.repo_name, locked))
'Error occurred locking repository `%s`' % repo.repo_name
def show_ip(self, apiuser, userid):
defined IP addresses for given user
ips = UserIpMap.query().filter(UserIpMap.user == user).all()
return dict(
ip_addr_server=self.ip_addr,
user_ips=ips
def get_user(self, apiuser, userid=Optional(OAttr('apiuser'))):
""""
Get a user by username, or userid, if userid is given
data = user.get_api_data()
data['permissions'] = AuthUser(user_id=user.user_id).permissions
return data
def get_users(self, apiuser):
Get all users
result = []
for user in UserModel().get_all():
result.append(user.get_api_data())
return result
def create_user(self, apiuser, username, email, password,
firstname=Optional(None), lastname=Optional(None),
active=Optional(True), admin=Optional(False),
ldap_dn=Optional(None)):
Create new user
:param username:
:param email:
:param password:
:param firstname:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
if UserModel().get_by_username(username):
raise JSONRPCError("user `%s` already exist" % username)
if UserModel().get_by_email(email, case_insensitive=True):
raise JSONRPCError("email `%s` already exist" % email)
if Optional.extract(ldap_dn):
# generate temporary password if ldap_dn
password = PasswordGenerator().gen_password(length=8)
user = UserModel().create_or_update(
username=Optional.extract(username),
password=Optional.extract(password),
email=Optional.extract(email),
firstname=Optional.extract(firstname),
lastname=Optional.extract(lastname),
active=Optional.extract(active),
admin=Optional.extract(admin),
ldap_dn=Optional.extract(ldap_dn)
msg='created new user `%s`' % username,
user=user.get_api_data()
raise JSONRPCError('failed to create user `%s`' % username)
def update_user(self, apiuser, userid, username=Optional(None),
email=Optional(None), firstname=Optional(None),
lastname=Optional(None), active=Optional(None),
admin=Optional(None), ldap_dn=Optional(None),
password=Optional(None)):
Updates given user
# call function and store only updated arguments
updates = {}
def store_update(attr, name):
if not isinstance(attr, Optional):
updates[name] = attr
store_update(username, 'username')
store_update(password, 'password')
store_update(email, 'email')
store_update(firstname, 'name')
store_update(lastname, 'lastname')
store_update(active, 'active')
store_update(admin, 'admin')
store_update(ldap_dn, 'ldap_dn')
user = UserModel().update_user(user, **updates)
msg='updated user ID:%s %s' % (user.user_id, user.username),
@@ -97,384 +97,403 @@ class BaseTestApi(object):
username='test-api',
password='test',
email='test@api.rhodecode.org',
firstname='first',
lastname='last'
self.TEST_USER_LOGIN = self.test_user.username
self.apikey_regular = self.test_user.api_key
def teardownClass(self):
def setUp(self):
self.maxDiff = None
make_users_group()
def tearDown(self):
destroy_users_group()
def _compare_ok(self, id_, expected, given):
expected = jsonify({
'id': id_,
'error': None,
'result': expected
})
given = json.loads(given)
self.assertEqual(expected, given)
def _compare_error(self, id_, expected, given):
'error': expected,
'result': None
# def test_Optional(self):
# from rhodecode.controllers.api.api import Optional
# option1 = Optional(None)
# self.assertEqual('<Optional:%s>' % None, repr(option1))
# self.assertEqual(1, Optional.extract(Optional(1)))
# self.assertEqual('trololo', Optional.extract('trololo'))
def test_api_wrong_key(self):
id_, params = _build_data('trololo', 'get_user')
response = api_call(self, params)
expected = 'Invalid API KEY'
self._compare_error(id_, expected, given=response.body)
def test_api_missing_non_optional_param(self):
id_, params = _build_data(self.apikey, 'get_repo')
expected = 'Missing non optional `repoid` arg in JSON DATA'
def test_api_missing_non_optional_param_args_null(self):
params = params.replace('"args": {}', '"args": null')
def test_api_missing_non_optional_param_args_bad(self):
params = params.replace('"args": {}', '"args": 1')
def test_api_args_is_null(self):
id_, params = _build_data(self.apikey, 'get_users',)
self.assertEqual(response.status, '200 OK')
def test_api_args_is_bad(self):
def test_api_get_users(self):
ret_all = []
for usr in UserModel().get_all():
ret = usr.get_api_data()
ret_all.append(jsonify(ret))
expected = ret_all
self._compare_ok(id_, expected, given=response.body)
def test_api_get_user(self):
id_, params = _build_data(self.apikey, 'get_user',
userid=TEST_USER_ADMIN_LOGIN)
usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
ret['permissions'] = AuthUser(usr.user_id).permissions
expected = ret
def test_api_get_user_that_does_not_exist(self):
userid='trololo')
expected = "user `%s` does not exist" % 'trololo'
def test_api_get_user_without_giving_userid(self):
id_, params = _build_data(self.apikey, 'get_user')
def test_api_get_user_without_giving_userid_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'get_user')
usr = UserModel().get_by_username(self.TEST_USER_LOGIN)
def test_api_get_user_with_giving_userid_non_admin(self):
id_, params = _build_data(self.apikey_regular, 'get_user',
userid=self.TEST_USER_LOGIN)
expected = 'userid is not the same as your user'
def test_api_pull(self):
#TODO: issues with rhodecode_extras here.. not sure why !
# repo_name = 'test_pull'
# r = create_repo(repo_name, self.REPO_TYPE)
# r.clone_uri = TEST_self.REPO
# Session.add(r)
# Session.commit()
# id_, params = _build_data(self.apikey, 'pull',
# repoid=repo_name,)
# response = self.app.post(API_URL, content_type='application/json',
# params=params)
# expected = 'Pulled from `%s`' % repo_name
# self._compare_ok(id_, expected, given=response.body)
# destroy_repo(repo_name)
def test_api_pull_error(self):
id_, params = _build_data(self.apikey, 'pull',
repoid=self.REPO,)
expected = 'Unable to pull changes from `%s`' % self.REPO
def test_api_rescan_repos(self):
id_, params = _build_data(self.apikey, 'rescan_repos')
expected = {'added': [], 'removed': []}
@mock.patch.object(ScmModel, 'repo_scan', crash)
def test_api_rescann_error(self):
id_, params = _build_data(self.apikey, 'rescan_repos',)
expected = 'Error occurred during rescan repositories action'
def test_api_invalidate_cache(self):
id_, params = _build_data(self.apikey, 'invalidate_cache',
repoid=self.REPO)
expected = ("Cache for repository `%s` was invalidated: "
"invalidated cache keys: %s" % (self.REPO,
[unicode(self.REPO)]))
@mock.patch.object(ScmModel, 'mark_for_invalidation', crash)
def test_api_invalidate_cache_error(self):
expected = 'Error occurred during cache invalidation action'
def test_api_lock_repo_lock_aquire(self):
id_, params = _build_data(self.apikey, 'lock',
userid=TEST_USER_ADMIN_LOGIN,
repoid=self.REPO,
locked=True)
expected = ('User `%s` set lock state for repo `%s` to `%s`'
% (TEST_USER_ADMIN_LOGIN, self.REPO, True))
def test_api_lock_repo_lock_aquire_by_non_admin(self):
repo_name = 'api_delete_me'
create_repo(repo_name, self.REPO_TYPE, owner=self.TEST_USER_LOGIN)
id_, params = _build_data(self.apikey_regular, 'lock',
repoid=repo_name,
% (self.TEST_USER_LOGIN, repo_name, True))
finally:
destroy_repo(repo_name)
def test_api_lock_repo_lock_aquire_non_admin_with_userid(self):
def test_api_lock_repo_lock_aquire_non_admin_not_his_repo(self):
expected = 'repository `%s` does not exist' % (self.REPO)
def test_api_lock_repo_lock_release(self):
locked=False)
% (TEST_USER_ADMIN_LOGIN, self.REPO, False))
def test_api_lock_repo_lock_aquire_optional_userid(self):
@mock.patch.object(Repository, 'lock', crash)
def test_api_lock_error(self):
expected = 'Error occurred locking repository `%s`' % self.REPO
def test_api_create_existing_user(self):
id_, params = _build_data(self.apikey, 'create_user',
username=TEST_USER_ADMIN_LOGIN,
email='test@foo.com',
password='trololo')
expected = "user `%s` already exist" % TEST_USER_ADMIN_LOGIN
def test_api_create_user_with_existing_email(self):
username=TEST_USER_ADMIN_LOGIN + 'new',
email=TEST_USER_REGULAR_EMAIL,
expected = "email `%s` already exist" % TEST_USER_REGULAR_EMAIL
def test_api_create_user(self):
username = 'test_new_api_user'
email = username + "@foo.com"
username=username,
email=email,
usr = UserModel().get_by_username(username)
ret = dict(
user=jsonify(usr.get_api_data())
UserModel().delete(usr.user_id)
@mock.patch.object(UserModel, 'create_or_update', crash)
def test_api_create_user_when_exception_happened(self):
expected = 'failed to create user `%s`' % username
def test_api_delete_user(self):
usr = UserModel().create_or_update(username=u'test_user',
password=u'qweqwe',
email=u'u232@rhodecode.org',
firstname=u'u1', lastname=u'u1')
username = usr.username
email = usr.email
usr_id = usr.user_id
## DELETE THIS USER NOW
id_, params = _build_data(self.apikey, 'delete_user',
userid=username,)
ret = {'msg': 'deleted user ID:%s %s' % (usr_id, username),
'user': None}
@mock.patch.object(UserModel, 'delete', crash)
def test_api_delete_user_when_exception_happened(self):
ret = 'failed to delete ID:%s %s' % (usr.user_id,
usr.username)
@parameterized.expand([('firstname', 'new_username'),
('lastname', 'new_username'),
('email', 'new_username'),
('admin', True),
('admin', False),
('ldap_dn', 'test'),
('ldap_dn', None),
('active', False),
('active', True),
('password', 'newpass')
])
def test_api_update_user(self, name, expected):
kw = {name: expected,
'userid': usr.user_id}
id_, params = _build_data(self.apikey, 'update_user', **kw)
ret = {
'msg': 'updated user ID:%s %s' % (usr.user_id, self.TEST_USER_LOGIN),
'user': jsonify(UserModel()\
.get_by_username(self.TEST_USER_LOGIN)\
.get_api_data())
Status change: