# HG changeset patch # User Mads Kiilerich # Date 2021-01-02 23:41:37 # Node ID 7643d8ecbb205effd547986878b4e40ce4f83329 # Parent 9a28233045b91e5aad7c1102733caf11df7b047e api: fix repo group permission check for repo creation hg.create.repository is only controlling whether all users have write access at top level. For other repo locations, check for write permission to the repo group. Note: This also covers "repo creation" by forking or by moving another repo. diff --git a/kallithea/controllers/api/api.py b/kallithea/controllers/api/api.py --- a/kallithea/controllers/api/api.py +++ b/kallithea/controllers/api/api.py @@ -1167,7 +1167,7 @@ class ApiController(JSONRPCController): 'failed to get repo: `%s` nodes' % repo.repo_name ) - @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository') + # permission check inside def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')), repo_type=Optional('hg'), description=Optional(''), private=Optional(False), clone_uri=Optional(None), @@ -1224,6 +1224,19 @@ class ApiController(JSONRPCController): } """ + group_name = None + repo_name_parts = repo_name.split('/') + if len(repo_name_parts) > 1: + group_name = '/'.join(repo_name_parts[:-1]) + repo_group = RepoGroup.get_by_group_name(group_name) + if repo_group is None: + raise JSONRPCError("repo group `%s` not found" % group_name) + if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(group_name)): + raise JSONRPCError("no permission to create repo in %s" % group_name) + else: + if not HasPermissionAny('hg.admin', 'hg.create.repository')(): + raise JSONRPCError("no permission to create top level repo") + if not HasPermissionAny('hg.admin')(): if not isinstance(owner, Optional): # forbid setting owner for non-admins @@ -1254,13 +1267,6 @@ class ApiController(JSONRPCController): copy_permissions = Optional.extract(copy_permissions) try: - repo_name_parts = repo_name.split('/') - group_name = None - if len(repo_name_parts) > 1: - group_name = '/'.join(repo_name_parts[:-1]) - repo_group = RepoGroup.get_by_group_name(group_name) - if repo_group is None: - raise JSONRPCError("repo group `%s` not found" % group_name) data = dict( repo_name=repo_name_parts[-1], repo_name_full=repo_name, @@ -1334,6 +1340,9 @@ class ApiController(JSONRPCController): repo_group = group if not isinstance(repo_group, Optional): repo_group = get_repo_group_or_error(repo_group) + if repo_group.group_id != repo.group_id: + if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(repo_group.group_name)): + raise JSONRPCError("no permission to create (or move) repo in %s" % repo_group.group_name) repo_group = repo_group.group_id try: store_update(updates, name, 'repo_name') @@ -1356,6 +1365,7 @@ class ApiController(JSONRPCController): log.error(traceback.format_exc()) raise JSONRPCError('failed to update repo `%s`' % repoid) + # permission check inside @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository') def fork_repo(self, repoid, fork_name, owner=Optional(OAttr('apiuser')), @@ -1410,6 +1420,19 @@ class ApiController(JSONRPCController): type_ = 'fork' if _repo.fork else 'repo' raise JSONRPCError("%s `%s` already exist" % (type_, fork_name)) + group_name = None + fork_name_parts = fork_name.split('/') + if len(fork_name_parts) > 1: + group_name = '/'.join(fork_name_parts[:-1]) + repo_group = RepoGroup.get_by_group_name(group_name) + if repo_group is None: + raise JSONRPCError("repo group `%s` not found" % group_name) + if not(HasPermissionAny('hg.admin')() or HasRepoGroupPermissionLevel('write')(group_name)): + raise JSONRPCError("no permission to create repo in %s" % group_name) + else: + if not HasPermissionAny('hg.admin', 'hg.create.repository')(): + raise JSONRPCError("no permission to create top level repo") + if HasPermissionAny('hg.admin')(): pass elif HasRepoPermissionLevel('read')(repo.repo_name): @@ -1418,9 +1441,6 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' ) - - if not HasPermissionAny('hg.create.repository')(): - raise JSONRPCError('no permission to create repositories') else: raise JSONRPCError('repository `%s` does not exist' % (repoid,)) @@ -1430,14 +1450,6 @@ class ApiController(JSONRPCController): owner = get_user_or_error(owner) try: - fork_name_parts = fork_name.split('/') - group_name = None - if len(fork_name_parts) > 1: - group_name = '/'.join(fork_name_parts[:-1]) - repo_group = RepoGroup.get_by_group_name(group_name) - if repo_group is None: - raise JSONRPCError("repo group `%s` not found" % group_name) - form_data = dict( repo_name=fork_name_parts[-1], repo_name_full=fork_name, diff --git a/kallithea/templates/admin/permissions/permissions_globals.html b/kallithea/templates/admin/permissions/permissions_globals.html --- a/kallithea/templates/admin/permissions/permissions_globals.html +++ b/kallithea/templates/admin/permissions/permissions_globals.html @@ -54,7 +54,6 @@ ${h.form(url('admin_permissions'), metho
${h.select('default_repo_create','',c.repo_create_choices,class_='form-control')} ${_('Enable this to allow non-admins to create repositories at the top level.')} - ${_('Note: This will also give all users API access to create repositories everywhere. That might change in future versions.')}
diff --git a/kallithea/tests/api/api_base.py b/kallithea/tests/api/api_base.py --- a/kallithea/tests/api/api_base.py +++ b/kallithea/tests/api/api_base.py @@ -916,20 +916,11 @@ class _BaseTestApi(object): ) response = api_call(self, params) - # Current result when API access control is different from Web: - ret = { - 'msg': 'Created new repository `%s`' % repo_name, - 'success': True, - 'task': None, - } - expected = ret - self._compare_ok(id_, expected, given=response.body) + # API access control match Web access control: + expected = 'no permission to create repo in test_repo_group/api-repo-repo' + self._compare_error(id_, expected, given=response.body) + fixture.destroy_repo(repo_name) - - # Expected and arguably more correct result: - #expected = 'failed to create repository `%s`' % repo_name - #self._compare_error(id_, expected, given=response.body) - fixture.destroy_repo_group(repo_group_name) def test_api_create_repo_unknown_owner(self): @@ -1298,6 +1289,9 @@ class _BaseTestApi(object): '%s/api-repo-fork' % TEST_REPO_GROUP, ]) def test_api_fork_repo_non_admin(self, fork_name): + RepoGroupModel().grant_user_permission(TEST_REPO_GROUP, + self.TEST_USER_LOGIN, + 'group.write') id_, params = _build_data(self.apikey_regular, 'fork_repo', repoid=self.REPO, fork_name=fork_name, @@ -1364,7 +1358,7 @@ class _BaseTestApi(object): fork_name=fork_name, ) response = api_call(self, params) - expected = 'no permission to create repositories' + expected = 'no permission to create top level repo' self._compare_error(id_, expected, given=response.body) fixture.destroy_repo(fork_name)