@@ -549,44 +549,66 @@ class ApiController(JSONRPCController):
Session().commit()
return dict(
msg="Created new repository `%s`" % (repo.repo_name),
repo=repo.get_api_data()
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to create repository `%s`' % repo_name)
# @HasPermissionAnyDecorator('hg.admin')
# def fork_repo(self, apiuser, repoid, fork_name):
# repo = get_repo_or_error(repoid)
#
# try:
# form_data = dict(
# )
# RepoModel().create_fork(form_data, cur_user=apiuser)
# return dict(
# msg='Created fork of `%s` as `%s`' % (repo.repo_name,
# fork_name),
# success=True
# except Exception:
# log.error(traceback.format_exc())
# raise JSONRPCError(
# 'failed to fork repository `%s` as `%s`' % (repo.repo_name,
# fork_name)
@HasPermissionAnyDecorator('hg.admin')
def fork_repo(self, apiuser, repoid, fork_name, owner,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('tip')):
repo = get_repo_or_error(repoid)
repo_name = repo.repo_name
owner = get_user_or_error(owner)
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
try:
# create structure of groups and return the last group
group = map_groups(fork_name)
form_data = dict(
repo_name=fork_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
RepoModel().create_fork(form_data, cur_user=owner)
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True # cannot return the repo data here since fork
# cann be done async
raise JSONRPCError(
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
def delete_repo(self, apiuser, repoid):
"""
Deletes a given repository
:param apiuser:
:param repoid:
@@ -59,24 +59,33 @@ def create_repo(repo_name, repo_type):
description='description %s' % repo_name,
repo_group=None,
private=False,
repo_type=repo_type,
clone_uri=None,
landing_rev='tip')
cur_user = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
return r
def create_fork(fork_name, fork_type, fork_of):
fork = RepoModel(Session())._get_repo(fork_of)
r = create_repo(fork_name, fork_type)
r.fork = fork
Session().add(r)
def destroy_repo(repo_name):
RepoModel().delete(repo_name)
class BaseTestApi(object):
REPO = None
REPO_TYPE = None
@classmethod
def setUpClass(self):
self.usr = UserModel().get_by_username(TEST_USER_ADMIN_LOGIN)
@@ -562,36 +571,116 @@ class BaseTestApi(object):
params=params)
ret = {
'msg': 'Deleted repository `%s`' % repo_name,
'success': True
}
expected = ret
self._compare_ok(id_, expected, given=response.body)
def test_api_delete_repo_exception_occurred(self):
repo_name = 'api_delete_me'
create_repo(repo_name, self.REPO_TYPE)
with mock.patch.object(RepoModel, 'delete', crash):
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name,)
response = self.app.post(API_URL, content_type='application/json',
expected = 'failed to delete repository `%s`' % repo_name
self._compare_error(id_, expected, given=response.body)
finally:
destroy_repo(repo_name)
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
repoid=self.REPO,
fork_name=fork_name,
owner=TEST_USER_ADMIN_LOGIN,
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
destroy_repo(fork_name)
def test_api_fork_repo_unknown_owner(self):
owner = 'i-dont-exist'
owner=owner,
expected = 'user `%s` does not exist' % owner
def test_api_fork_repo_fork_exists(self):
create_fork(fork_name, self.REPO_TYPE, self.REPO)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
self.failIf(False, 'TODO:')
expected = "repo `%s` already exist" % fork_name
@mock.patch.object(RepoModel, 'create_fork', crash)
def test_api_fork_repo_exception_occurred(self):
expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
def test_api_get_users_group(self):
id_, params = _build_data(self.apikey, 'get_users_group',
usersgroupid=TEST_USERS_GROUP)
users_group = UsersGroupModel().get_group(TEST_USERS_GROUP)
members = []
for user in users_group.members:
user = user.user
members.append(user.get_api_data())
Status change: