@@ -1582,195 +1582,196 @@ class ApiController(JSONRPCController):
store_update(updates, landing_rev, 'repo_landing_rev')
store_update(updates, enable_statistics, 'repo_enable_statistics')
store_update(updates, enable_locking, 'repo_enable_locking')
store_update(updates, enable_downloads, 'repo_enable_downloads')
RepoModel().update(repo, **updates)
Session().commit()
return dict(
msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
repository=repo.get_api_data()
)
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to update repo `%s`' % repoid)
@HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
def fork_repo(self, apiuser, repoid, fork_name,
owner=Optional(OAttr('apiuser')),
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('rev:tip')):
"""
Creates a fork of given repo. In case of using celery this will
immediately return success message, while fork is going to be created
asynchronous. This command can be executed only using api_key belonging to
user with admin rights or regular user that have fork permission, and at least
read access to forking repository. Regular users cannot specify owner parameter.
:param apiuser: filled automatically from apikey
:type apiuser: AuthUser
:param repoid: repository name or repository id
:type repoid: str or int
:param fork_name:
:param owner:
:param description:
:param copy_permissions:
:param private:
:param landing_rev:
INPUT::
id : <id_for_response>
api_key : "<api_key>"
args: {
"repoid" : "<reponame or repo_id>",
"fork_name": "<forkname>",
"owner": "<username or user_id = Optional(=apiuser)>",
"description": "<description>",
"copy_permissions": "<bool>",
"private": "<bool>",
"landing_rev": "<landing_rev>"
}
OUTPUT::
id : <id_given_in_input>
result: {
"msg": "Created fork of `<reponame>` as `<forkname>`",
"success": true,
"task": "<celery task id or None if done sync>"
error: null
repo = get_repo_or_error(repoid)
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
'repository.write',
'repository.read')(user=apiuser,
repo_name=repo.repo_name):
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
raise JSONRPCError(
'Only Kallithea admin can specify `owner` param'
if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
raise JSONRPCError('no permission to create repositories')
else:
raise JSONRPCError('repository `%s` does not exist' % (repoid,))
if isinstance(owner, Optional):
owner = apiuser.user_id
owner = get_user_or_error(owner)
try:
# create structure of groups and return the last group
group = map_groups(fork_name)
fork_base_name = fork_name.rsplit('/', 1)[-1]
form_data = dict(
repo_name=fork_name,
repo_name=fork_base_name,
repo_name_full=fork_name,
repo_group=group,
repo_type=repo.repo_type,
description=Optional.extract(description),
private=Optional.extract(private),
copy_permissions=Optional.extract(copy_permissions),
landing_rev=Optional.extract(landing_rev),
update_after_clone=False,
fork_parent_id=repo.repo_id,
task = RepoModel().create_fork(form_data, cur_user=owner)
# no commit, it's done in RepoModel, or async via celery
from celery.result import BaseAsyncResult
task_id = None
if isinstance(task, BaseAsyncResult):
task_id = task.task_id
msg='Created fork of `%s` as `%s`' % (repo.repo_name,
fork_name),
success=True, # cannot return the repo data here since fork
# can be done async
task=task_id
'failed to fork repository `%s` as `%s`' % (repo_name,
fork_name)
# permission check inside
def delete_repo(self, apiuser, repoid, forks=Optional('')):
Deletes a repository. This command can be executed only using api_key belonging
to user with admin rights or regular user that have admin access to repository.
When `forks` param is set it's possible to detach or delete forks of deleting
repository
:param forks: `detach` or `delete`, what do do with attached forks for repo
:type forks: Optional(str)
"msg": "Deleted repository `<reponame>`",
"success": true
if not HasPermissionAnyApi('hg.admin')(user=apiuser):
# check if we have admin permission for this repo !
if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
handle_forks = Optional.extract(forks)
_forks_msg = ''
_forks = [f for f in repo.forks]
if handle_forks == 'detach':
_forks_msg = ' ' + 'Detached %s forks' % len(_forks)
elif handle_forks == 'delete':
_forks_msg = ' ' + 'Deleted %s forks' % len(_forks)
elif _forks:
'Cannot delete `%s` it still contains attached forks' %
(repo.repo_name,)
RepoModel().delete(repo, forks=forks)
msg='Deleted repository `%s`%s' % (repo.repo_name, _forks_msg),
success=True
'failed to delete repository `%s`' % (repo.repo_name,)
@HasPermissionAllDecorator('hg.admin')
def grant_user_permission(self, apiuser, repoid, userid, perm):
Grant permission for user on given repository, or update existing one
if found. This command can be executed only using api_key belonging to user
with admin rights.
@@ -1298,194 +1298,197 @@ class _BaseTestApi(object):
repo_name = 'admin_owned'
fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
RepoModel().grant_user_permission(repo=repo_name,
user=self.TEST_USER_LOGIN,
perm='repository.admin')
updates = {'owner': TEST_USER_ADMIN_LOGIN}
id_, params = _build_data(self.apikey_regular, 'update_repo',
repoid=repo_name, **updates)
response = api_call(self, params)
expected = 'Only Kallithea admin can specify `owner` param'
self._compare_error(id_, expected, given=response.body)
finally:
fixture.destroy_repo(repo_name)
def test_api_delete_repo(self):
repo_name = 'api_delete_me'
id_, params = _build_data(self.apikey, 'delete_repo',
repoid=repo_name, )
ret = {
'msg': 'Deleted repository `%s`' % repo_name,
'success': True
expected = ret
self._compare_ok(id_, expected, given=response.body)
def test_api_delete_repo_by_non_admin(self):
fixture.create_repo(repo_name, repo_type=self.REPO_TYPE,
cur_user=self.TEST_USER_LOGIN)
id_, params = _build_data(self.apikey_regular, 'delete_repo',
def test_api_delete_repo_by_non_admin_no_permission(self):
expected = 'repository `%s` does not exist' % (repo_name)
def test_api_delete_repo_exception_occurred(self):
with mock.patch.object(RepoModel, 'delete', crash):
expected = 'failed to delete repository `%s`' % repo_name
def test_api_fork_repo(self):
fork_name = 'api-repo-fork'
id_, params = _build_data(self.apikey, 'fork_repo',
repoid=self.REPO,
fork_name=fork_name,
owner=TEST_USER_ADMIN_LOGIN,
'msg': 'Created fork of `%s` as `%s`' % (self.REPO,
'success': True,
'task': None,
fixture.destroy_repo(fork_name)
def test_api_fork_repo_non_admin(self):
@parameterized.expand([
(u'api-repo-fork',),
(u'%s/api-repo-fork' % TEST_REPO_GROUP,),
])
def test_api_fork_repo_non_admin(self, fork_name):
id_, params = _build_data(self.apikey_regular, 'fork_repo',
def test_api_fork_repo_non_admin_specify_owner(self):
def test_api_fork_repo_non_admin_no_permission_to_fork(self):
RepoModel().grant_user_permission(repo=self.REPO,
perm='repository.none')
expected = 'repository `%s` does not exist' % (self.REPO)
@parameterized.expand([('read', 'repository.read'),
('write', 'repository.write'),
('admin', 'repository.admin')])
def test_api_fork_repo_non_admin_no_create_repo_permission(self, name, perm):
# regardless of base repository permission, forking is disallowed
# when repository creation is disabled
perm=perm)
UserModel().revoke_perm('default', 'hg.create.repository')
UserModel().grant_perm('default', 'hg.create.none')
expected = 'no permission to create repositories'
def test_api_fork_repo_unknown_owner(self):
owner = 'i-dont-exist'
owner=owner,
expected = 'user `%s` does not exist' % owner
def test_api_fork_repo_fork_exists(self):
fixture.create_fork(self.REPO, fork_name)
expected = "fork `%s` already exist" % fork_name
def test_api_fork_repo_repo_exists(self):
fork_name = self.REPO
Status change: