@@ -1522,100 +1522,97 @@ class ApiController(JSONRPCController):
'failed to create repository `%s`' % (repo_name,))
# permission check inside
def update_repo(self, apiuser, repoid, name=Optional(None),
owner=Optional(OAttr('apiuser')),
group=Optional(None),
description=Optional(''), private=Optional(False),
clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
enable_statistics=Optional(False),
enable_locking=Optional(False),
enable_downloads=Optional(False)):
"""
Updates repo
:param apiuser: filled automatically from apikey
:type apiuser: AuthUser
:param repoid: repository name or repository id
:type repoid: str or int
:param name:
:param owner:
:param group:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
:param enable_statistics:
:param enable_locking:
:param enable_downloads:
repo = get_repo_or_error(repoid)
if not HasPermissionAnyApi('hg.admin')(user=apiuser):
# check if we have admin permission for this repo !
if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
repo_name=repo.repo_name):
raise JSONRPCError('repository `%s` does not exist' % (repoid,))
if (name != repo.repo_name and
not HasPermissionAnyApi('hg.create.repository')(user=apiuser)
):
raise JSONRPCError('no permission to create (or move) repositories')
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
raise JSONRPCError(
'Only Kallithea admin can specify `owner` param'
)
updates = {
# update function requires this.
'repo_name': repo.repo_name
}
updates = {}
repo_group = group
if not isinstance(repo_group, Optional):
repo_group = get_repo_group_or_error(repo_group)
repo_group = repo_group.group_id
try:
store_update(updates, name, 'repo_name')
store_update(updates, repo_group, 'repo_group')
store_update(updates, owner, 'user')
store_update(updates, description, 'repo_description')
store_update(updates, private, 'repo_private')
store_update(updates, clone_uri, 'clone_uri')
store_update(updates, landing_rev, 'repo_landing_rev')
store_update(updates, enable_statistics, 'repo_enable_statistics')
store_update(updates, enable_locking, 'repo_enable_locking')
store_update(updates, enable_downloads, 'repo_enable_downloads')
RepoModel().update(repo, **updates)
Session().commit()
return dict(
msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
repository=repo.get_api_data()
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError('failed to update repo `%s`' % repoid)
@HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
def fork_repo(self, apiuser, repoid, fork_name,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('rev:tip')):
Creates a fork of given repo. In case of using celery this will
immediately return success message, while fork is going to be created
asynchronous. This command can be executed only using api_key belonging to
user with admin rights or regular user that have fork permission, and at least
read access to forking repository. Regular users cannot specify owner parameter.
:param fork_name:
:param copy_permissions:
@@ -277,132 +277,133 @@ class RepoModel(BaseModel):
defaults['repo_name'] = repo_name
defaults['repo_group'] = getattr(group[-1] if group else None,
'group_id', None)
for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
(1, 'repo_description'), (1, 'repo_enable_locking'),
(1, 'repo_landing_rev'), (0, 'clone_uri'),
(1, 'repo_private'), (1, 'repo_enable_statistics')]:
attr = k
if strip:
attr = remove_prefix(k, 'repo_')
val = defaults[attr]
if k == 'repo_landing_rev':
val = ':'.join(defaults[attr])
defaults[k] = val
if k == 'clone_uri':
defaults['clone_uri_hidden'] = repo_info.clone_uri_hidden
# fill owner
if repo_info.user:
defaults.update({'user': repo_info.user.username})
else:
replacement_user = User.query().filter(User.admin ==
True).first().username
defaults.update({'user': replacement_user})
# fill repository users
for p in repo_info.repo_to_perm:
defaults.update({'u_perm_%s' % p.user.username:
p.permission.permission_name})
# fill repository groups
for p in repo_info.users_group_to_perm:
defaults.update({'g_perm_%s' % p.users_group.users_group_name:
return defaults
def update(self, repo, **kwargs):
cur_repo = self._get_repo(repo)
org_repo_name = cur_repo.repo_name
if 'user' in kwargs:
cur_repo.user = User.get_by_username(kwargs['user'])
if 'repo_group' in kwargs:
cur_repo.group = RepoGroup.get(kwargs['repo_group'])
cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
for k in ['repo_enable_downloads',
'repo_description',
'repo_enable_locking',
'repo_landing_rev',
'repo_private',
'repo_enable_statistics',
]:
if k in kwargs:
setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
clone_uri = kwargs.get('clone_uri')
if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
cur_repo.clone_uri = clone_uri
new_name = cur_repo.get_new_name(kwargs['repo_name'])
cur_repo.repo_name = new_name
if 'repo_name' in kwargs:
cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
#if private flag is set, reset default permission to NONE
if kwargs.get('repo_private'):
EMPTY_PERM = 'repository.none'
RepoModel().grant_user_permission(
repo=cur_repo, user='default', perm=EMPTY_PERM
#handle extra fields
for field in filter(lambda k: k.startswith(RepositoryField.PREFIX),
kwargs):
k = RepositoryField.un_prefix_key(field)
ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
if ex_field:
ex_field.field_value = kwargs[field]
self.sa.add(ex_field)
self.sa.add(cur_repo)
if org_repo_name != new_name:
if org_repo_name != cur_repo.repo_name:
# rename repository
self._rename_filesystem_repo(old=org_repo_name, new=new_name)
self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
return cur_repo
raise
def _create_repo(self, repo_name, repo_type, description, owner,
private=False, clone_uri=None, repo_group=None,
landing_rev='rev:tip', fork_of=None,
copy_fork_permissions=False, enable_statistics=False,
enable_locking=False, enable_downloads=False,
copy_group_permissions=False, state=Repository.STATE_PENDING):
Create repository inside database with PENDING state. This should only be
executed by create() repo, with exception of importing existing repos.
from kallithea.model.scm import ScmModel
owner = self._get_user(owner)
fork_of = self._get_repo(fork_of)
repo_group = self._get_repo_group(repo_group)
repo_name = safe_unicode(repo_name)
description = safe_unicode(description)
# repo name is just a name of repository
# while repo_name_full is a full qualified name that is combined
# with name and path of group
repo_name_full = repo_name
repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
new_repo = Repository()
new_repo.repo_state = state
new_repo.enable_statistics = False
new_repo.repo_name = repo_name_full
new_repo.repo_type = repo_type
new_repo.user = owner
new_repo.group = repo_group
new_repo.description = description or repo_name
new_repo.private = private
new_repo.clone_uri = clone_uri
new_repo.landing_rev = landing_rev
new_repo.enable_statistics = enable_statistics
new_repo.enable_locking = enable_locking
new_repo.enable_downloads = enable_downloads
if repo_group:
@@ -1126,96 +1126,137 @@ class _BaseTestApi(object):
def test_api_create_repo_exception_occurred(self):
repo_name = 'api-repo'
id_, params = _build_data(self.apikey, 'create_repo',
repo_name=repo_name,
owner=TEST_USER_ADMIN_LOGIN,
repo_type=self.REPO_TYPE,)
response = api_call(self, params)
expected = 'failed to create repository `%s`' % repo_name
self._compare_error(id_, expected, given=response.body)
@parameterized.expand([
('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
('description', {'description': 'new description'}),
('active', {'active': True}),
('active', {'active': False}),
('clone_uri', {'clone_uri': 'http://example.com/repo'}),
('clone_uri', {'clone_uri': None}),
('landing_rev', {'landing_rev': 'branch:master'}),
('enable_statistics', {'enable_statistics': True}),
('enable_locking', {'enable_locking': True}),
('enable_downloads', {'enable_downloads': True}),
('name', {'name': 'new_repo_name'}),
('repo_group', {'group': 'test_group_for_update'}),
])
def test_api_update_repo(self, changing_attr, updates):
repo_name = 'api_update_me'
repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
if changing_attr == 'repo_group':
fixture.create_repo_group(updates['group'])
id_, params = _build_data(self.apikey, 'update_repo',
repoid=repo_name, **updates)
if changing_attr == 'name':
repo_name = updates['name']
repo_name = '/'.join([updates['group'], repo_name])
expected = {
'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
'repository': repo.get_api_data()
self._compare_ok(id_, expected, given=response.body)
finally:
fixture.destroy_repo(repo_name)
fixture.destroy_repo_group(updates['group'])
('description', {'description': u'new description'}),
('name', {'name': u'new_repo_name'}),
('repo_group', {'group': u'test_group_for_update'}),
def test_api_update_group_repo(self, changing_attr, updates):
group_name = u'lololo'
fixture.create_repo_group(group_name)
repo_name = u'%s/api_update_me' % group_name
repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
repo_name = u'%s/%s' % (group_name, updates['name'])
repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
fixture.destroy_repo_group(group_name)
def test_api_update_repo_repo_group_does_not_exist(self):
repo_name = 'admin_owned'
fixture.create_repo(repo_name)
updates = {'group': 'test_group_for_update'}
expected = 'repository group `%s` does not exist' % updates['group']
def test_api_update_repo_regular_user_not_allowed(self):
updates = {'active': False}
id_, params = _build_data(self.apikey_regular, 'update_repo',
expected = 'repository `%s` does not exist' % repo_name
@mock.patch.object(RepoModel, 'update', crash)
def test_api_update_repo_exception_occurred(self):
fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
expected = 'failed to update repo `%s`' % repo_name
def test_api_update_repo_regular_user_change_repo_name(self):
new_repo_name = 'new_repo_name'
RepoModel().grant_user_permission(repo=repo_name,
user=self.TEST_USER_LOGIN,
perm='repository.admin')
UserModel().revoke_perm('default', 'hg.create.repository')
UserModel().grant_perm('default', 'hg.create.none')
Status change: