@@ -465,212 +465,234 @@ class ApiController(JSONRPCController):
Get all repositories
:param apiuser:
"""
result = []
for repo in RepoModel().get_all():
result.append(repo.get_api_data())
return result
@HasPermissionAnyDecorator('hg.admin')
def get_repo_nodes(self, apiuser, repoid, revision, root_path,
ret_type='all'):
returns a list of nodes and it's children
for a given path at given revision. It's possible to specify ret_type
to show only files or dirs
:param repoid: name or id of repository
:param revision: revision for which listing should be done
:param root_path: path from which start displaying
:param ret_type: return type 'all|files|dirs' nodes
repo = get_repo_or_error(repoid)
try:
_d, _f = ScmModel().get_nodes(repo, revision, root_path,
flat=False)
_map = {
'all': _d + _f,
'files': _f,
'dirs': _d,
}
return _map[ret_type]
except KeyError:
raise JSONRPCError('ret_type must be one of %s' % _map.keys())
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to get repo: `%s` nodes' % repo.repo_name
)
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repo(self, apiuser, repo_name, owner, repo_type,
description=Optional(''), private=Optional(False),
clone_uri=Optional(None), landing_rev=Optional('tip')):
Create repository, if clone_url is given it makes a remote clone
if repo_name is withina group name the groups will be created
automatically if they aren't present
:param repo_name:
:param onwer:
:param repo_type:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
owner = get_user_or_error(owner)
if RepoModel().get_by_repo_name(repo_name):
raise JSONRPCError("repo `%s` already exist" % repo_name)
private = Optional.extract(private)
clone_uri = Optional.extract(clone_uri)
description = Optional.extract(description)
landing_rev = Optional.extract(landing_rev)
# create structure of groups and return the last group
group = map_groups(repo_name)
repo = RepoModel().create_repo(
repo_name=repo_name,
repo_type=repo_type,
description=description,
owner=owner,
private=private,
clone_uri=clone_uri,
repos_group=group,
landing_rev=landing_rev,
Session().commit()
return dict(
msg="Created new repository `%s`" % (repo.repo_name),
repo=repo.get_api_data()
raise JSONRPCError('failed to create repository `%s`' % repo_name)
# @HasPermissionAnyDecorator('hg.admin')
# def fork_repo(self, apiuser, repoid, fork_name):
# repo = get_repo_or_error(repoid)
#
# try:
# form_data = dict(
# )
# RepoModel().create_fork(form_data, cur_user=apiuser)
# return dict(
# msg='Created fork of `%s` as `%s`' % (repo.repo_name,
# fork_name),
# success=True
# except Exception:
# log.error(traceback.format_exc())
# raise JSONRPCError(
# 'failed to fork repository `%s` as `%s`' % (repo.repo_name,
# fork_name)
def fork_repo(self, apiuser, repoid, fork_name, owner,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('tip')):
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
group = map_groups(fork_name)
form_data = dict(
repo_name=fork_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
RepoModel().create_fork(form_data, cur_user=owner)
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True # cannot return the repo data here since fork
# cann be done async
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
def delete_repo(self, apiuser, repoid):
Deletes a given repository
:param repoid:
RepoModel().delete(repo)
msg='Deleted repository `%s`' % repo.repo_name,
success=True
'failed to delete repository `%s`' % repo.repo_name
def grant_user_permission(self, apiuser, repoid, userid, perm):
Grant permission for user on given repository, or update existing one
if found
:param userid:
:param perm:
user = get_user_or_error(userid)
perm = get_perm_or_error(perm)
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
perm.permission_name, user.username, repo.repo_name
),
'failed to edit permission for user: `%s` in repo: `%s`' % (
userid, repoid
def revoke_user_permission(self, apiuser, repoid, userid):
Revoke permission for user on given repository
RepoModel().revoke_user_permission(repo=repo, user=user)
msg='Revoked perm for user: `%s` in repo: `%s`' % (
user.username, repo.repo_name
def grant_users_group_permission(self, apiuser, repoid, usersgroupid,
perm):
Grant permission for users group on given repository, or update
existing one if found
from __future__ import with_statement
import random
import mock
from rhodecode.tests import *
from rhodecode.lib.compat import json
from rhodecode.lib.auth import AuthUser
from rhodecode.model.user import UserModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model.repo import RepoModel
from rhodecode.model.meta import Session
API_URL = '/_admin/api'
def _build_data(apikey, method, **kw):
Builds API data with given random ID
:param random_id:
:type random_id:
random_id = random.randrange(1, 9999)
return random_id, json.dumps({
"id": random_id,
"api_key": apikey,
"method": method,
"args": kw
})
jsonify = lambda obj: json.loads(json.dumps(obj))
def crash(*args, **kwargs):
raise Exception('Total Crash !')
TEST_USERS_GROUP = 'test_users_group'
def make_users_group(name=TEST_USERS_GROUP):
gr = UsersGroupModel().create(name=name)
UsersGroupModel().add_user_to_group(users_group=gr,
user=TEST_USER_ADMIN_LOGIN)
return gr
def destroy_users_group(name=TEST_USERS_GROUP):
UsersGroupModel().delete(users_group=name, force=True)
def create_repo(repo_name, repo_type):
# create new repo
form_data = dict(repo_name=repo_name,
repo_name_full=repo_name,
fork_name=None,
description='description %s' % repo_name,
repo_group=None,
private=False,
clone_uri=None,
landing_rev='tip')
cur_user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
return r
def create_fork(fork_name, fork_type, fork_of):
fork = RepoModel(Session())._get_repo(fork_of)
r = create_repo(fork_name, fork_type)
r.fork = fork
Session().add(r)
def destroy_repo(repo_name):
RepoModel().delete(repo_name)
class BaseTestApi(object):
REPO = None
REPO_TYPE = None
@classmethod
def setUpClass(self):
self.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
self.apikey = self.usr.api_key
self.TEST_USER = UserModel().create_or_update(
username='test-api',
password='test',
email='test@api.rhodecode.org',
firstname='first',
lastname='last'
self.TEST_USER_LOGIN = self.TEST_USER.username
def teardownClass(self):
pass
def setUp(self):
self.maxDiff = None
make_users_group()
def tearDown(self):
destroy_users_group()
def _compare_ok(self, id_, expected, given):
expected = jsonify({
'id': id_,
'error': None,
'result': expected
given = json.loads(given)
self.assertEqual(expected, given)
def _compare_error(self, id_, expected, given):
'error': expected,
'result': None
# def test_Optional(self):
# from rhodecode.controllers.api.api import Optional
# option1 = Optional(None)
# self.assertEqual('<Optional:%s>' % None, repr(option1))
# self.assertEqual(1, Optional.extract(Optional(1)))
# self.assertEqual('trololo', Optional.extract('trololo'))
def test_api_wrong_key(self):
id_, params = _build_data('trololo', 'get_user')
response = self.app.post(API_URL, content_type='application/json',
params=params)
expected = 'Invalid API KEY'
self._compare_error(id_, expected, given=response.body)
def test_api_missing_non_optional_param(self):
id_, params = _build_data(self.apikey, 'get_user')
expected = 'Missing non optional `userid` arg in JSON DATA'
def test_api_get_users(self):
id_, params = _build_data(self.apikey, 'get_users',)
ret_all = []
for usr in UserModel().get_all():
ret = usr.get_api_data()
ret_all.append(jsonify(ret))
expected = ret_all
self._compare_ok(id_, expected, given=response.body)
def test_api_get_user(self):
id_, params = _build_data(self.apikey, 'get_user',
userid=TEST_USER_ADMIN_LOGIN)
usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
ret['permissions'] = AuthUser(usr.user_id).permissions
@@ -478,204 +487,284 @@ class BaseTestApi(object):
expected = 'failed to get repo: `%s` nodes' % self.REPO
def test_api_get_repo_nodes_bad_ret_type(self):
rev = 'tip'
path = '/'
ret_type = 'error'
id_, params = _build_data(self.apikey, 'get_repo_nodes',
repoid=self.REPO, revision=rev,
root_path=path,
ret_type=ret_type)
expected = 'ret_type must be one of %s' % (['files', 'dirs', 'all'])
def test_api_create_repo(self):
repo_name = 'api-repo'
id_, params = _build_data(self.apikey, 'create_repo',
owner=TEST_USER_ADMIN_LOGIN,
repo_type='hg',
repo = RepoModel().get_by_repo_name(repo_name)
ret = {
'msg': 'Created new repository `%s`' % repo_name,
'repo': jsonify(repo.get_api_data())
expected = ret
destroy_repo(repo_name)
def test_api_create_repo_unknown_owner(self):
owner = 'i-dont-exist'
expected = 'user `%s` does not exist' % owner
def test_api_create_repo_exists(self):
repo_name = self.REPO
expected = "repo `%s` already exist" % repo_name
@mock.patch.object(RepoModel, 'create_repo', crash)
def test_api_create_repo_exception_occurred(self):
expected = 'failed to create repository `%s`' % repo_name
def test_api_delete_repo(self):
repo_name = 'api_delete_me'
create_repo(repo_name, self.REPO_TYPE)
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name,)
'msg': 'Deleted repository `%s`' % repo_name,
'success': True
def test_api_delete_repo_exception_occurred(self):
with mock.patch.object(RepoModel, 'delete', crash):
expected = 'failed to delete repository `%s`' % repo_name
finally:
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
repoid=self.REPO,
fork_name=fork_name,
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
destroy_repo(fork_name)
def test_api_fork_repo_unknown_owner(self):
def test_api_fork_repo_fork_exists(self):
create_fork(fork_name, self.REPO_TYPE, self.REPO)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
self.failIf(False, 'TODO:')
expected = "repo `%s` already exist" % fork_name
@mock.patch.object(RepoModel, 'create_fork', crash)
def test_api_fork_repo_exception_occurred(self):
expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
def test_api_get_users_group(self):
id_, params = _build_data(self.apikey, 'get_users_group',
usersgroupid=TEST_USERS_GROUP)
users_group = UsersGroupModel().get_group(TEST_USERS_GROUP)
members = []
for user in users_group.members:
user = user.user
members.append(user.get_api_data())
ret = users_group.get_api_data()
ret['members'] = members
def test_api_get_users_groups(self):
make_users_group('test_users_group2')
id_, params = _build_data(self.apikey, 'get_users_groups',)
expected = []
for gr_name in [TEST_USERS_GROUP, 'test_users_group2']:
users_group = UsersGroupModel().get_group(gr_name)
expected.append(ret)
UsersGroupModel().delete(users_group='test_users_group2')
self.Session().commit()
def test_api_create_users_group(self):
group_name = 'some_new_group'
id_, params = _build_data(self.apikey, 'create_users_group',
group_name=group_name)
'msg': 'created new users group `%s`' % group_name,
'users_group': jsonify(UsersGroupModel()\
.get_by_name(group_name)\
.get_api_data())
destroy_users_group(group_name)
def test_api_get_users_group_that_exist(self):
group_name=TEST_USERS_GROUP)
expected = "users group `%s` already exist" % TEST_USERS_GROUP
@mock.patch.object(UsersGroupModel, 'create', crash)
def test_api_get_users_group_exception_occurred(self):
group_name = 'exception_happens'
expected = 'failed to create group `%s`' % group_name
def test_api_add_user_to_users_group(self):
gr_name = 'test_group'
UsersGroupModel().create(gr_name)
id_, params = _build_data(self.apikey, 'add_user_to_users_group',
usersgroupid=gr_name,
expected = {
'msg': 'added member `%s` to users group `%s`' % (
TEST_USER_ADMIN_LOGIN, gr_name
'success': True}
UsersGroupModel().delete(users_group=gr_name)
def test_api_add_user_to_users_group_that_doesnt_exist(self):
Status change: