@@ -1474,196 +1474,193 @@ class ApiController(JSONRPCController):
repo_type = defs.get('repo_type')
if isinstance(enable_statistics, Optional):
enable_statistics = defs.get('repo_enable_statistics')
if isinstance(enable_locking, Optional):
enable_locking = defs.get('repo_enable_locking')
if isinstance(enable_downloads, Optional):
enable_downloads = defs.get('repo_enable_downloads')
clone_uri = Optional.extract(clone_uri)
description = Optional.extract(description)
landing_rev = Optional.extract(landing_rev)
copy_permissions = Optional.extract(copy_permissions)
try:
repo_name_cleaned = repo_name.split('/')[-1]
# create structure of groups and return the last group
repo_group = map_groups(repo_name)
data = dict(
repo_name=repo_name_cleaned,
repo_name_full=repo_name,
repo_type=repo_type,
repo_description=description,
owner=owner,
repo_private=private,
clone_uri=clone_uri,
repo_group=repo_group,
repo_landing_rev=landing_rev,
enable_statistics=enable_statistics,
enable_locking=enable_locking,
enable_downloads=enable_downloads,
repo_copy_permissions=copy_permissions,
)
task = RepoModel().create(form_data=data, cur_user=owner)
from celery.result import BaseAsyncResult
task_id = None
if isinstance(task, BaseAsyncResult):
task_id = task.task_id
# no commit, it's done in RepoModel, or async via celery
return dict(
msg="Created new repository `%s`" % (repo_name,),
success=True, # cannot return the repo data here since fork
# can be done async
task=task_id
except Exception:
log.error(traceback.format_exc())
raise JSONRPCError(
'failed to create repository `%s`' % (repo_name,))
# permission check inside
def update_repo(self, apiuser, repoid, name=Optional(None),
owner=Optional(OAttr('apiuser')),
group=Optional(None),
description=Optional(''), private=Optional(False),
clone_uri=Optional(None), landing_rev=Optional('rev:tip'),
enable_statistics=Optional(False),
enable_locking=Optional(False),
enable_downloads=Optional(False)):
"""
Updates repo
:param apiuser: filled automatically from apikey
:type apiuser: AuthUser
:param repoid: repository name or repository id
:type repoid: str or int
:param name:
:param owner:
:param group:
:param description:
:param private:
:param clone_uri:
:param landing_rev:
:param enable_statistics:
:param enable_locking:
:param enable_downloads:
repo = get_repo_or_error(repoid)
if not HasPermissionAnyApi('hg.admin')(user=apiuser):
# check if we have admin permission for this repo !
if not HasRepoPermissionAnyApi('repository.admin')(user=apiuser,
repo_name=repo.repo_name):
raise JSONRPCError('repository `%s` does not exist' % (repoid,))
if (name != repo.repo_name and
not HasPermissionAnyApi('hg.create.repository')(user=apiuser)
):
raise JSONRPCError('no permission to create (or move) repositories')
if not isinstance(owner, Optional):
#forbid setting owner for non-admins
'Only Kallithea admin can specify `owner` param'
updates = {
# update function requires this.
'repo_name': repo.repo_name
}
updates = {}
repo_group = group
if not isinstance(repo_group, Optional):
repo_group = get_repo_group_or_error(repo_group)
repo_group = repo_group.group_id
store_update(updates, name, 'repo_name')
store_update(updates, repo_group, 'repo_group')
store_update(updates, owner, 'user')
store_update(updates, description, 'repo_description')
store_update(updates, private, 'repo_private')
store_update(updates, clone_uri, 'clone_uri')
store_update(updates, landing_rev, 'repo_landing_rev')
store_update(updates, enable_statistics, 'repo_enable_statistics')
store_update(updates, enable_locking, 'repo_enable_locking')
store_update(updates, enable_downloads, 'repo_enable_downloads')
RepoModel().update(repo, **updates)
Session().commit()
msg='updated repo ID:%s %s' % (repo.repo_id, repo.repo_name),
repository=repo.get_api_data()
raise JSONRPCError('failed to update repo `%s`' % repoid)
@HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository')
def fork_repo(self, apiuser, repoid, fork_name,
description=Optional(''), copy_permissions=Optional(False),
private=Optional(False), landing_rev=Optional('rev:tip')):
Creates a fork of given repo. In case of using celery this will
immediately return success message, while fork is going to be created
asynchronous. This command can be executed only using api_key belonging to
user with admin rights or regular user that have fork permission, and at least
read access to forking repository. Regular users cannot specify owner parameter.
:param fork_name:
:param copy_permissions:
INPUT::
id : <id_for_response>
api_key : "<api_key>"
args: {
"repoid" : "<reponame or repo_id>",
"fork_name": "<forkname>",
"owner": "<username or user_id = Optional(=apiuser)>",
"description": "<description>",
"copy_permissions": "<bool>",
"private": "<bool>",
"landing_rev": "<landing_rev>"
OUTPUT::
id : <id_given_in_input>
result: {
"msg": "Created fork of `<reponame>` as `<forkname>`",
"success": true,
"task": "<celery task id or None if done sync>"
error: null
repo_name = repo.repo_name
_repo = RepoModel().get_by_repo_name(fork_name)
if _repo:
type_ = 'fork' if _repo.fork else 'repo'
raise JSONRPCError("%s `%s` already exist" % (type_, fork_name))
if HasPermissionAnyApi('hg.admin')(user=apiuser):
pass
elif HasRepoPermissionAnyApi('repository.admin',
'repository.write',
'repository.read')(user=apiuser,
if not HasPermissionAnyApi('hg.create.repository')(user=apiuser):
raise JSONRPCError('no permission to create repositories')
@@ -229,228 +229,229 @@ class RepoModel(BaseModel):
continue
cs_cache = repo.changeset_cache
row = {
"menu": quick_menu(repo.repo_name),
"raw_name": repo.repo_name.lower(),
"name": repo_lnk(repo.repo_name, repo.repo_type,
repo.repo_state, repo.private, repo.fork),
"last_change": last_change(repo.last_db_change),
"last_changeset": last_rev(repo.repo_name, cs_cache),
"last_rev_raw": cs_cache.get('revision'),
"desc": desc(repo.description),
"owner": h.person(repo.user),
"state": state(repo.repo_state),
"rss": rss_lnk(repo.repo_name),
"atom": atom_lnk(repo.repo_name),
if admin:
row.update({
"action": repo_actions(repo.repo_name),
"owner": owner_actions(repo.user.user_id,
h.person(repo.user))
})
repos_data.append(row)
return {
"totalRecords": len(repos_list),
"startIndex": 0,
"sort": "name",
"dir": "asc",
"records": repos_data
def _get_defaults(self, repo_name):
Gets information about repository, and returns a dict for
usage in forms
:param repo_name:
repo_info = Repository.get_by_repo_name(repo_name)
if repo_info is None:
return None
defaults = repo_info.get_dict()
group, repo_name, repo_name_full = repo_info.groups_and_repo
defaults['repo_name'] = repo_name
defaults['repo_group'] = getattr(group[-1] if group else None,
'group_id', None)
for strip, k in [(0, 'repo_type'), (1, 'repo_enable_downloads'),
(1, 'repo_description'), (1, 'repo_enable_locking'),
(1, 'repo_landing_rev'), (0, 'clone_uri'),
(1, 'repo_private'), (1, 'repo_enable_statistics')]:
attr = k
if strip:
attr = remove_prefix(k, 'repo_')
val = defaults[attr]
if k == 'repo_landing_rev':
val = ':'.join(defaults[attr])
defaults[k] = val
if k == 'clone_uri':
defaults['clone_uri_hidden'] = repo_info.clone_uri_hidden
# fill owner
if repo_info.user:
defaults.update({'user': repo_info.user.username})
else:
replacement_user = User.query().filter(User.admin ==
True).first().username
defaults.update({'user': replacement_user})
# fill repository users
for p in repo_info.repo_to_perm:
defaults.update({'u_perm_%s' % p.user.username:
p.permission.permission_name})
# fill repository groups
for p in repo_info.users_group_to_perm:
defaults.update({'g_perm_%s' % p.users_group.users_group_name:
return defaults
def update(self, repo, **kwargs):
cur_repo = self._get_repo(repo)
org_repo_name = cur_repo.repo_name
if 'user' in kwargs:
cur_repo.user = User.get_by_username(kwargs['user'])
if 'repo_group' in kwargs:
cur_repo.group = RepoGroup.get(kwargs['repo_group'])
cur_repo.repo_name = cur_repo.get_new_name(cur_repo.just_name)
log.debug('Updating repo %s with params:%s', cur_repo, kwargs)
for k in ['repo_enable_downloads',
'repo_description',
'repo_enable_locking',
'repo_landing_rev',
'repo_private',
'repo_enable_statistics',
]:
if k in kwargs:
setattr(cur_repo, remove_prefix(k, 'repo_'), kwargs[k])
clone_uri = kwargs.get('clone_uri')
if clone_uri is not None and clone_uri != cur_repo.clone_uri_hidden:
cur_repo.clone_uri = clone_uri
new_name = cur_repo.get_new_name(kwargs['repo_name'])
cur_repo.repo_name = new_name
if 'repo_name' in kwargs:
cur_repo.repo_name = cur_repo.get_new_name(kwargs['repo_name'])
#if private flag is set, reset default permission to NONE
if kwargs.get('repo_private'):
EMPTY_PERM = 'repository.none'
RepoModel().grant_user_permission(
repo=cur_repo, user='default', perm=EMPTY_PERM
#handle extra fields
for field in filter(lambda k: k.startswith(RepositoryField.PREFIX),
kwargs):
k = RepositoryField.un_prefix_key(field)
ex_field = RepositoryField.get_by_key_name(key=k, repo=cur_repo)
if ex_field:
ex_field.field_value = kwargs[field]
self.sa.add(ex_field)
self.sa.add(cur_repo)
if org_repo_name != new_name:
if org_repo_name != cur_repo.repo_name:
# rename repository
self._rename_filesystem_repo(old=org_repo_name, new=new_name)
self._rename_filesystem_repo(old=org_repo_name, new=cur_repo.repo_name)
return cur_repo
raise
def _create_repo(self, repo_name, repo_type, description, owner,
private=False, clone_uri=None, repo_group=None,
landing_rev='rev:tip', fork_of=None,
copy_fork_permissions=False, enable_statistics=False,
enable_locking=False, enable_downloads=False,
copy_group_permissions=False, state=Repository.STATE_PENDING):
Create repository inside database with PENDING state. This should only be
executed by create() repo, with exception of importing existing repos.
from kallithea.model.scm import ScmModel
owner = self._get_user(owner)
fork_of = self._get_repo(fork_of)
repo_group = self._get_repo_group(repo_group)
repo_name = safe_unicode(repo_name)
description = safe_unicode(description)
# repo name is just a name of repository
# while repo_name_full is a full qualified name that is combined
# with name and path of group
repo_name_full = repo_name
repo_name = repo_name.split(self.URL_SEPARATOR)[-1]
new_repo = Repository()
new_repo.repo_state = state
new_repo.enable_statistics = False
new_repo.repo_name = repo_name_full
new_repo.repo_type = repo_type
new_repo.user = owner
new_repo.group = repo_group
new_repo.description = description or repo_name
new_repo.private = private
new_repo.clone_uri = clone_uri
new_repo.landing_rev = landing_rev
new_repo.enable_statistics = enable_statistics
new_repo.enable_locking = enable_locking
new_repo.enable_downloads = enable_downloads
if repo_group:
new_repo.enable_locking = repo_group.enable_locking
if fork_of:
parent_repo = fork_of
new_repo.fork = parent_repo
self.sa.add(new_repo)
if fork_of and copy_fork_permissions:
repo = fork_of
user_perms = UserRepoToPerm.query() \
.filter(UserRepoToPerm.repository == repo).all()
group_perms = UserGroupRepoToPerm.query() \
.filter(UserGroupRepoToPerm.repository == repo).all()
for perm in user_perms:
UserRepoToPerm.create(perm.user, new_repo, perm.permission)
for perm in group_perms:
UserGroupRepoToPerm.create(perm.users_group, new_repo,
perm.permission)
elif repo_group and copy_group_permissions:
user_perms = UserRepoGroupToPerm.query() \
.filter(UserRepoGroupToPerm.group == repo_group).all()
group_perms = UserGroupRepoGroupToPerm.query() \
.filter(UserGroupRepoGroupToPerm.group == repo_group).all()
perm_name = perm.permission.permission_name.replace('group.', 'repository.')
perm_obj = Permission.get_by_key(perm_name)
UserRepoToPerm.create(perm.user, new_repo, perm_obj)
UserGroupRepoToPerm.create(perm.users_group, new_repo, perm_obj)
perm_obj = self._create_default_perms(new_repo, private)
self.sa.add(perm_obj)
# now automatically start following this repository as owner
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
owner.user_id)
# we need to flush here, in order to check if database won't
@@ -1078,192 +1078,233 @@ class _BaseTestApi(object):
expected = ret
self._compare_ok(id_, expected, given=response.body)
fixture.destroy_repo(repo_name)
def test_api_create_repo_by_non_admin(self):
repo_name = 'api-repo'
owner = 'i-dont-exist'
id_, params = _build_data(self.apikey_regular, 'create_repo',
repo_name=repo_name,
repo_type=self.REPO_TYPE,
response = api_call(self, params)
repo = RepoModel().get_by_repo_name(repo_name)
self.assertNotEqual(repo, None)
ret = {
'msg': 'Created new repository `%s`' % repo_name,
'success': True,
'task': None,
def test_api_create_repo_by_non_admin_specify_owner(self):
owner=owner)
expected = 'Only Kallithea admin can specify `owner` param'
self._compare_error(id_, expected, given=response.body)
def test_api_create_repo_exists(self):
repo_name = self.REPO
id_, params = _build_data(self.apikey, 'create_repo',
owner=TEST_USER_ADMIN_LOGIN,
repo_type=self.REPO_TYPE,)
expected = "repo `%s` already exist" % repo_name
@mock.patch.object(RepoModel, 'create', crash)
def test_api_create_repo_exception_occurred(self):
expected = 'failed to create repository `%s`' % repo_name
@parameterized.expand([
('owner', {'owner': TEST_USER_REGULAR_LOGIN}),
('description', {'description': 'new description'}),
('active', {'active': True}),
('active', {'active': False}),
('clone_uri', {'clone_uri': 'http://example.com/repo'}),
('clone_uri', {'clone_uri': None}),
('landing_rev', {'landing_rev': 'branch:master'}),
('enable_statistics', {'enable_statistics': True}),
('enable_locking', {'enable_locking': True}),
('enable_downloads', {'enable_downloads': True}),
('name', {'name': 'new_repo_name'}),
('repo_group', {'group': 'test_group_for_update'}),
])
def test_api_update_repo(self, changing_attr, updates):
repo_name = 'api_update_me'
repo = fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
if changing_attr == 'repo_group':
fixture.create_repo_group(updates['group'])
id_, params = _build_data(self.apikey, 'update_repo',
repoid=repo_name, **updates)
if changing_attr == 'name':
repo_name = updates['name']
repo_name = '/'.join([updates['group'], repo_name])
expected = {
'msg': 'updated repo ID:%s %s' % (repo.repo_id, repo_name),
'repository': repo.get_api_data()
finally:
fixture.destroy_repo_group(updates['group'])
('description', {'description': u'new description'}),
('name', {'name': u'new_repo_name'}),
('repo_group', {'group': u'test_group_for_update'}),
def test_api_update_group_repo(self, changing_attr, updates):
group_name = u'lololo'
fixture.create_repo_group(group_name)
repo_name = u'%s/api_update_me' % group_name
repo = fixture.create_repo(repo_name, repo_group=group_name, repo_type=self.REPO_TYPE)
repo_name = u'%s/%s' % (group_name, updates['name'])
repo_name = u'/'.join([updates['group'], repo_name.rsplit('/', 1)[-1]])
fixture.destroy_repo_group(group_name)
def test_api_update_repo_repo_group_does_not_exist(self):
repo_name = 'admin_owned'
fixture.create_repo(repo_name)
updates = {'group': 'test_group_for_update'}
expected = 'repository group `%s` does not exist' % updates['group']
def test_api_update_repo_regular_user_not_allowed(self):
updates = {'active': False}
id_, params = _build_data(self.apikey_regular, 'update_repo',
expected = 'repository `%s` does not exist' % repo_name
@mock.patch.object(RepoModel, 'update', crash)
def test_api_update_repo_exception_occurred(self):
fixture.create_repo(repo_name, repo_type=self.REPO_TYPE)
repoid=repo_name, owner=TEST_USER_ADMIN_LOGIN,)
expected = 'failed to update repo `%s`' % repo_name
def test_api_update_repo_regular_user_change_repo_name(self):
new_repo_name = 'new_repo_name'
RepoModel().grant_user_permission(repo=repo_name,
user=self.TEST_USER_LOGIN,
perm='repository.admin')
UserModel().revoke_perm('default', 'hg.create.repository')
UserModel().grant_perm('default', 'hg.create.none')
updates = {'name': new_repo_name}
expected = 'no permission to create (or move) repositories'
fixture.destroy_repo(new_repo_name)
def test_api_update_repo_regular_user_change_repo_name_allowed(self):
UserModel().revoke_perm('default', 'hg.create.none')
UserModel().grant_perm('default', 'hg.create.repository')
'msg': 'updated repo ID:%s %s' % (repo.repo_id, new_repo_name),
def test_api_update_repo_regular_user_change_owner(self):
updates = {'owner': TEST_USER_ADMIN_LOGIN}
Status change: