@@ -1564,16 +1564,13 @@ class ApiController(JSONRPCController):
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
raise JSONRPCError(
'Only Kallithea admin can specify `owner` param'
)
updates = {
# update function requires this.
'repo_name': repo.repo_name
}
updates = {}
repo_group = group
if not isinstance(repo_group, Optional):
repo_group = get_repo_group_or_error(repo_group)
repo_group = repo_group.group_id
try:
store_update(updates, name, 'repo_name')
@@ -319,12 +319,13 @@ class RepoModel(BaseModel):
org_repo_name = cur_repo.repo_name
if 'user' in kwargs:
cur_repo.user = User.get_by_username(kwargs['user'])
if 'repo_group' in kwargs:
cur_repo.group = RepoGroup.get(kwargs['repo_group'])
cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
for k in ['repo_enable_downloads',
'repo_description',
'repo_enable_locking',
'repo_landing_rev',
'repo_private',
@@ -333,16 +334,16 @@ class RepoModel(BaseModel):
if k in kwargs:
setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
clone_uri = kwargs.get('clone_uri')
if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
cur_repo.clone_uri = clone_uri
new_name = cur_repo.get_new_name(kwargs['repo_name'])
cur_repo.repo_name = new_name
if 'repo_name' in kwargs:
cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
#if private flag is set, reset default permission to NONE
if kwargs.get('repo_private'):
EMPTY_PERM = 'repository.none'
RepoModel().grant_user_permission(
repo=cur_repo, user='default', perm=EMPTY_PERM
#handle extra fields
@@ -352,15 +353,15 @@ class RepoModel(BaseModel):
ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
if ex_field:
ex_field.field_value = kwargs[field]
self.sa.add(ex_field)
self.sa.add(cur_repo)
if org_repo_name != new_name:
if org_repo_name != cur_repo.repo_name:
# rename repository
self._rename_filesystem_repo(old=org_repo_name, new=new_name)
self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
return cur_repo
except Exception:
log.error(traceback.format_exc())
raise
@@ -1168,12 +1168,53 @@ class _BaseTestApi(object):
self._compare_ok(id_, expected, given=response.body)
finally:
fixture.destroy_repo(repo_name)
if changing_attr == 'repo_group':
fixture.destroy_repo_group(updates['group'])
@parameterized.expand([
('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
('description', {'description': u'new description'}),
('active', {'active': True}),
('active', {'active': False}),
('clone_uri', {'clone_uri': 'http://example.com/repo'}),
('clone_uri', {'clone_uri': None}),
('landing_rev', {'landing_rev': 'branch:master'}),
('enable_statistics', {'enable_statistics': True}),
('enable_locking', {'enable_locking': True}),
('enable_downloads', {'enable_downloads': True}),
('name', {'name': u'new_repo_name'}),
('repo_group', {'group': u'test_group_for_update'}),
])
def test_api_update_group_repo(self, changing_attr, updates):
group_name = u'lololo'
fixture.create_repo_group(group_name)
repo_name = u'%s/api_update_me' % group_name
repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
fixture.create_repo_group(updates['group'])
id_, params = _build_data(self.apikey, 'update_repo',
repoid=repo_name, **updates)
response = api_call(self, params)
if changing_attr == 'name':
repo_name = u'%s/%s' % (group_name, updates['name'])
repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
expected = {
'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
'repository': repo.get_api_data()
fixture.destroy_repo_group(group_name)
def test_api_update_repo_repo_group_does_not_exist(self):
repo_name = 'admin_owned'
fixture.create_repo(repo_name)
updates = {'group': 'test_group_for_update'}
Status change: