@@ -513,116 +513,138 @@ class ApiController(JSONRPCController):
if repo_name is withina group name the groups will be created
automatically if they aren't present
:param apiuser:
:param repo_name:
:param onwer:
:param repo_type:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
"""
owner = get_user_or_error(owner)
if RepoModel().get_by_repo_name(repo_name):
raise JSONRPCError("repo `%s` already exist" % repo_name)
private = Optional.extract(private)
clone_uri = Optional.extract(clone_uri)
description = Optional.extract(description)
landing_rev = Optional.extract(landing_rev)
try:
# create structure of groups and return the last group
group = map_groups(repo_name)
repo = RepoModel().create_repo(
repo_name=repo_name,
repo_type=repo_type,
description=description,
owner=owner,
private=private,
clone_uri=clone_uri,
repos_group=group,
landing_rev=landing_rev,
)
Session().commit()
return dict(
msg="Created new repository `%s`" % (repo.repo_name),
repo=repo.get_api_data()
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create repository `%s`' % repo_name)
# @HasPermissionAnyDecorator('hg.admin')
# def fork_repo(self, apiuser, repoid, fork_name):
# repo = get_repo_or_error(repoid)
#
# try:
# form_data = dict(
# )
# RepoModel().create_fork(form_data, cur_user=apiuser)
# return dict(
# msg='Created fork of `%s` as `%s`' % (repo.repo_name,
# fork_name),
# success=True
# except Exception:
# log.error(traceback.format_exc())
# raise JSONRPCError(
# 'failed to fork repository `%s` as `%s`' % (repo.repo_name,
# fork_name)
@HasPermissionAnyDecorator('hg.admin')
def fork_repo(self, apiuser, repoid, fork_name, owner,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('tip')):
repo = get_repo_or_error(repoid)
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
group = map_groups(fork_name)
form_data = dict(
repo_name=fork_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
RepoModel().create_fork(form_data, cur_user=owner)
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True # cannot return the repo data here since fork
# cann be done async
raise JSONRPCError(
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
def delete_repo(self, apiuser, repoid):
Deletes a given repository
:param repoid:
RepoModel().delete(repo)
msg='Deleted repository `%s`' % repo.repo_name,
success=True
'failed to delete repository `%s`' % repo.repo_name
def grant_user_permission(self, apiuser, repoid, userid, perm):
Grant permission for user on given repository, or update existing one
if found
:param userid:
:param perm:
user = get_user_or_error(userid)
perm = get_perm_or_error(perm)
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
msg='Granted perm: `%s` for user: `%s` in repo: `%s`' % (
perm.permission_name, user.username, repo.repo_name
),
@@ -23,96 +23,105 @@ def _build_data(apikey, method, **kw):
random_id = random.randrange(1, 9999)
return random_id, json.dumps({
"id": random_id,
"api_key": apikey,
"method": method,
"args": kw
})
jsonify = lambda obj: json.loads(json.dumps(obj))
def crash(*args, **kwargs):
raise Exception('Total Crash !')
TEST_USERS_GROUP = 'test_users_group'
def make_users_group(name=TEST_USERS_GROUP):
gr = UsersGroupModel().create(name=name)
UsersGroupModel().add_user_to_group(users_group=gr,
user=TEST_USER_ADMIN_LOGIN)
return gr
def destroy_users_group(name=TEST_USERS_GROUP):
UsersGroupModel().delete(users_group=name, force=True)
def create_repo(repo_name, repo_type):
# create new repo
form_data = dict(repo_name=repo_name,
repo_name_full=repo_name,
fork_name=None,
description='description %s' % repo_name,
repo_group=None,
private=False,
clone_uri=None,
landing_rev='tip')
cur_user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
return r
def create_fork(fork_name, fork_type, fork_of):
fork = RepoModel(Session())._get_repo(fork_of)
r = create_repo(fork_name, fork_type)
r.fork = fork
Session().add(r)
def destroy_repo(repo_name):
RepoModel().delete(repo_name)
class BaseTestApi(object):
REPO = None
REPO_TYPE = None
@classmethod
def setUpClass(self):
self.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
self.apikey = self.usr.api_key
self.TEST_USER = UserModel().create_or_update(
username='test-api',
password='test',
email='test@api.rhodecode.org',
firstname='first',
lastname='last'
self.TEST_USER_LOGIN = self.TEST_USER.username
def teardownClass(self):
pass
def setUp(self):
self.maxDiff = None
make_users_group()
def tearDown(self):
destroy_users_group()
def _compare_ok(self, id_, expected, given):
expected = jsonify({
'id': id_,
'error': None,
'result': expected
given = json.loads(given)
self.assertEqual(expected, given)
def _compare_error(self, id_, expected, given):
'error': expected,
'result': None
@@ -526,108 +535,188 @@ class BaseTestApi(object):
params=params)
expected = 'user `%s` does not exist' % owner
self._compare_error(id_, expected, given=response.body)
def test_api_create_repo_exists(self):
repo_name = self.REPO
id_, params = _build_data(self.apikey, 'create_repo',
owner=TEST_USER_ADMIN_LOGIN,
repo_type='hg',
response = self.app.post(API_URL, content_type='application/json',
expected = "repo `%s` already exist" % repo_name
@mock.patch.object(RepoModel, 'create_repo', crash)
def test_api_create_repo_exception_occurred(self):
repo_name = 'api-repo'
expected = 'failed to create repository `%s`' % repo_name
def test_api_delete_repo(self):
repo_name = 'api_delete_me'
create_repo(repo_name, self.REPO_TYPE)
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name,)
ret = {
'msg': 'Deleted repository `%s`' % repo_name,
'success': True
}
expected = ret
self._compare_ok(id_, expected, given=response.body)
def test_api_delete_repo_exception_occurred(self):
with mock.patch.object(RepoModel, 'delete', crash):
expected = 'failed to delete repository `%s`' % repo_name
finally:
destroy_repo(repo_name)
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
repoid=self.REPO,
fork_name=fork_name,
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
destroy_repo(fork_name)
def test_api_fork_repo_unknown_owner(self):
owner = 'i-dont-exist'
def test_api_fork_repo_fork_exists(self):
create_fork(fork_name, self.REPO_TYPE, self.REPO)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
self.failIf(False, 'TODO:')
expected = "repo `%s` already exist" % fork_name
@mock.patch.object(RepoModel, 'create_fork', crash)
def test_api_fork_repo_exception_occurred(self):
expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
def test_api_get_users_group(self):
id_, params = _build_data(self.apikey, 'get_users_group',
usersgroupid=TEST_USERS_GROUP)
users_group = UsersGroupModel().get_group(TEST_USERS_GROUP)
members = []
for user in users_group.members:
user = user.user
members.append(user.get_api_data())
ret = users_group.get_api_data()
ret['members'] = members
def test_api_get_users_groups(self):
make_users_group('test_users_group2')
id_, params = _build_data(self.apikey, 'get_users_groups',)
expected = []
for gr_name in [TEST_USERS_GROUP, 'test_users_group2']:
users_group = UsersGroupModel().get_group(gr_name)
expected.append(ret)
UsersGroupModel().delete(users_group='test_users_group2')
self.Session().commit()
def test_api_create_users_group(self):
group_name = 'some_new_group'
id_, params = _build_data(self.apikey, 'create_users_group',
group_name=group_name)
'msg': 'created new users group `%s`' % group_name,
'users_group': jsonify(UsersGroupModel()\
.get_by_name(group_name)\
.get_api_data())
Status change: