@@ -558,26 +558,48 @@ class ApiController(JSONRPCController):
log.error(traceback.format_exc())
raise JSONRPCError('failed to create repository `%s`' % repo_name)
# @HasPermissionAnyDecorator('hg.admin')
# def fork_repo(self, apiuser, repoid, fork_name):
# repo = get_repo_or_error(repoid)
#
# try:
# form_data = dict(
# )
# RepoModel().create_fork(form_data, cur_user=apiuser)
# return dict(
# msg='Created fork of `%s` as `%s`' % (repo.repo_name,
# fork_name),
# success=True
# except Exception:
# log.error(traceback.format_exc())
# raise JSONRPCError(
# 'failed to fork repository `%s` as `%s`' % (repo.repo_name,
# fork_name)
@HasPermissionAnyDecorator('hg.admin')
def fork_repo(self, apiuser, repoid, fork_name, owner,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('tip')):
repo = get_repo_or_error(repoid)
repo_name = repo.repo_name
owner = get_user_or_error(owner)
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
try:
# create structure of groups and return the last group
group = map_groups(fork_name)
form_data = dict(
repo_name=fork_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
)
RepoModel().create_fork(form_data, cur_user=owner)
return dict(
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True # cannot return the repo data here since fork
# cann be done async
except Exception:
raise JSONRPCError(
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
def delete_repo(self, apiuser, repoid):
@@ -68,6 +68,15 @@ def create_repo(repo_name, repo_type):
return r
def create_fork(fork_name, fork_type, fork_of):
fork = RepoModel(Session())._get_repo(fork_of)
r = create_repo(fork_name, fork_type)
r.fork = fork
Session().add(r)
Session().commit()
def destroy_repo(repo_name):
RepoModel().delete(repo_name)
@@ -571,18 +580,98 @@ class BaseTestApi(object):
def test_api_delete_repo_exception_occurred(self):
repo_name = 'api_delete_me'
create_repo(repo_name, self.REPO_TYPE)
with mock.patch.object(RepoModel, 'delete', crash):
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name,)
response = self.app.post(API_URL, content_type='application/json',
params=params)
expected = 'failed to delete repository `%s`' % repo_name
self._compare_error(id_, expected, given=response.body)
finally:
destroy_repo(repo_name)
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
repoid=self.REPO,
fork_name=fork_name,
owner=TEST_USER_ADMIN_LOGIN,
ret = {
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
'success': True
}
expected = ret
self._compare_ok(id_, expected, given=response.body)
destroy_repo(fork_name)
def test_api_fork_repo_unknown_owner(self):
owner = 'i-dont-exist'
owner=owner,
expected = 'user `%s` does not exist' % owner
def test_api_fork_repo_fork_exists(self):
create_fork(fork_name, self.REPO_TYPE, self.REPO)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
self.failIf(False, 'TODO:')
expected = "repo `%s` already exist" % fork_name
@mock.patch.object(RepoModel, 'create_fork', crash)
def test_api_fork_repo_exception_occurred(self):
expected = 'failed to fork repository `%s` as `%s`' % (self.REPO,
def test_api_get_users_group(self):
id_, params = _build_data(self.apikey, 'get_users_group',
Status change: