diff --git a/kallithea/controllers/api/api.py b/kallithea/controllers/api/api.py --- a/kallithea/controllers/api/api.py +++ b/kallithea/controllers/api/api.py @@ -36,7 +36,6 @@ from kallithea.lib.auth import (AuthUser HasUserGroupPermissionLevel) from kallithea.lib.exceptions import DefaultUserException, UserGroupsAssignedException from kallithea.lib.utils import action_logger, repo2db_mapper -from kallithea.lib.utils2 import OAttr, Optional from kallithea.lib.vcs.backends.base import EmptyChangeset from kallithea.lib.vcs.exceptions import EmptyRepositoryError from kallithea.model.changeset_status import ChangesetStatusModel @@ -57,10 +56,10 @@ log = logging.getLogger(__name__) def store_update(updates, attr, name): """ - Stores param in updates dict if it's not instance of Optional - allows easy updates of passed in params + Stores param in updates dict if it's not None (i.e. if user explicitly set + a parameter). This allows easy updates of passed in params. """ - if not isinstance(attr, Optional): + if attr is not None: updates[name] = attr @@ -161,7 +160,7 @@ class ApiController(JSONRPCController): return args @HasPermissionAnyDecorator('hg.admin') - def pull(self, repoid, clone_uri=Optional(None)): + def pull(self, repoid, clone_uri=None): """ Triggers a pull from remote location on given repo. Can be used to automatically keep remote repos up to date. This command can be executed @@ -197,7 +196,7 @@ class ApiController(JSONRPCController): ScmModel().pull_changes(repo.repo_name, request.authuser.username, request.ip_addr, - clone_uri=Optional.extract(clone_uri)) + clone_uri=clone_uri) return dict( msg='Pulled from `%s`' % repo.repo_name, repository=repo.repo_name @@ -209,7 +208,7 @@ class ApiController(JSONRPCController): ) @HasPermissionAnyDecorator('hg.admin') - def rescan_repos(self, remove_obsolete=Optional(False)): + def rescan_repos(self, remove_obsolete=False): """ Triggers rescan repositories action. If remove_obsolete is set than also delete repos that are in database but not in the filesystem. @@ -240,7 +239,7 @@ class ApiController(JSONRPCController): """ try: - rm_obsolete = Optional.extract(remove_obsolete) + rm_obsolete = remove_obsolete added, removed = repo2db_mapper(ScmModel().repo_scan(), remove_obsolete=rm_obsolete) return {'added': added, 'removed': removed} @@ -295,7 +294,7 @@ class ApiController(JSONRPCController): ) @HasPermissionAnyDecorator('hg.admin') - def get_ip(self, userid=Optional(OAttr('apiuser'))): + def get_ip(self, userid=None): """ Shows IP address as seen from Kallithea server, together with all defined IP addresses for given user. If userid is not passed data is @@ -321,7 +320,7 @@ class ApiController(JSONRPCController): } """ - if isinstance(userid, Optional): + if userid is None: userid = request.authuser.user_id user = get_user_or_error(userid) ips = UserIpMap.query().filter(UserIpMap.user == user).all() @@ -352,7 +351,7 @@ class ApiController(JSONRPCController): """ return Setting.get_server_info() - def get_user(self, userid=Optional(OAttr('apiuser'))): + def get_user(self, userid=None): """ Gets a user by username or user_id, Returns empty result if user is not found. If userid param is skipped it is set to id of user who is @@ -397,12 +396,12 @@ class ApiController(JSONRPCController): if not HasPermissionAny('hg.admin')(): # make sure normal user does not pass someone else userid, # he is not allowed to do that - if not isinstance(userid, Optional) and userid != request.authuser.user_id: + if userid is not None and userid != request.authuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) - if isinstance(userid, Optional): + if userid is None: userid = request.authuser.user_id user = get_user_or_error(userid) @@ -432,11 +431,11 @@ class ApiController(JSONRPCController): ] @HasPermissionAnyDecorator('hg.admin') - def create_user(self, username, email, password=Optional(''), - firstname=Optional(''), lastname=Optional(''), - active=Optional(True), admin=Optional(False), - extern_type=Optional(User.DEFAULT_AUTH_TYPE), - extern_name=Optional('')): + def create_user(self, username, email, password='', + firstname='', lastname='', + active=True, admin=False, + extern_type=User.DEFAULT_AUTH_TYPE, + extern_name=''): """ Creates new user. Returns new user object. This command can be executed only using api_key belonging to user with admin rights. @@ -492,15 +491,15 @@ class ApiController(JSONRPCController): try: user = UserModel().create_or_update( - username=Optional.extract(username), - password=Optional.extract(password), - email=Optional.extract(email), - firstname=Optional.extract(firstname), - lastname=Optional.extract(lastname), - active=Optional.extract(active), - admin=Optional.extract(admin), - extern_type=Optional.extract(extern_type), - extern_name=Optional.extract(extern_name) + username=username, + password=password, + email=email, + firstname=firstname, + lastname=lastname, + active=active, + admin=admin, + extern_type=extern_type, + extern_name=extern_name ) Session().commit() return dict( @@ -512,11 +511,11 @@ class ApiController(JSONRPCController): raise JSONRPCError('failed to create user `%s`' % (username,)) @HasPermissionAnyDecorator('hg.admin') - def update_user(self, userid, username=Optional(None), - email=Optional(None), password=Optional(None), - firstname=Optional(None), lastname=Optional(None), - active=Optional(None), admin=Optional(None), - extern_type=Optional(None), extern_name=Optional(None)): + def update_user(self, userid, username=None, + email=None, password=None, + firstname=None, lastname=None, + active=None, admin=None, + extern_type=None, extern_name=None): """ updates given user if such user exists. This command can be executed only using api_key belonging to user with admin rights. @@ -686,8 +685,8 @@ class ApiController(JSONRPCController): ] @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true') - def create_user_group(self, group_name, description=Optional(''), - owner=Optional(OAttr('apiuser')), active=Optional(True)): + def create_user_group(self, group_name, description='', + owner=None, active=True): """ Creates new user group. This command can be executed only using api_key belonging to user with admin rights or an user who has create user group @@ -727,12 +726,10 @@ class ApiController(JSONRPCController): raise JSONRPCError("user group `%s` already exist" % (group_name,)) try: - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) - active = Optional.extract(active) - description = Optional.extract(description) ug = UserGroupModel().create(name=group_name, description=description, owner=owner, active=active) Session().commit() @@ -745,9 +742,9 @@ class ApiController(JSONRPCController): raise JSONRPCError('failed to create group `%s`' % (group_name,)) # permission check inside - def update_user_group(self, usergroupid, group_name=Optional(''), - description=Optional(''), owner=Optional(None), - active=Optional(True)): + def update_user_group(self, usergroupid, group_name=None, + description=None, owner=None, + active=None): """ Updates given usergroup. This command can be executed only using api_key belonging to user with admin rights or an admin of given user group @@ -786,7 +783,7 @@ class ApiController(JSONRPCController): if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) - if not isinstance(owner, Optional): + if owner is not None: owner = get_user_or_error(owner) updates = {} @@ -963,8 +960,8 @@ class ApiController(JSONRPCController): # permission check inside def get_repo(self, repoid, - with_revision_names=Optional(False), - with_pullrequests=Optional(False)): + with_revision_names=False, + with_pullrequests=False): """ Gets an existing repository by it's name or repository_id. Members will return either users_group or user associated to that repository. This command can be @@ -1064,8 +1061,8 @@ class ApiController(JSONRPCController): for uf in repo.followers ] - data = repo.get_api_data(with_revision_names=Optional.extract(with_revision_names), - with_pullrequests=Optional.extract(with_pullrequests)) + data = repo.get_api_data(with_revision_names=with_revision_names, + with_pullrequests=with_pullrequests) data['members'] = members data['followers'] = followers return data @@ -1112,7 +1109,7 @@ class ApiController(JSONRPCController): # permission check inside def get_repo_nodes(self, repoid, revision, root_path, - ret_type=Optional('all')): + ret_type='all'): """ returns a list of nodes and it's children in a flat list for a given path at given revision. It's possible to specify ret_type to show only `files` or @@ -1147,7 +1144,7 @@ class ApiController(JSONRPCController): if not HasRepoPermissionLevel('read')(repo.repo_name): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - ret_type = Optional.extract(ret_type) + ret_type = ret_type _map = {} try: _d, _f = ScmModel().get_nodes(repo, revision, root_path, @@ -1168,13 +1165,13 @@ class ApiController(JSONRPCController): ) @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository') - def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')), - repo_type=Optional('hg'), description=Optional(''), - private=Optional(False), clone_uri=Optional(None), - landing_rev=Optional('rev:tip'), - enable_statistics=Optional(False), - enable_downloads=Optional(False), - copy_permissions=Optional(False)): + def create_repo(self, repo_name, owner=None, + repo_type=None, description='', + private=False, clone_uri=None, + landing_rev='rev:tip', + enable_statistics=None, + enable_downloads=None, + copy_permissions=False): """ Creates a repository. The repository name contains the full path, but the parent repository group must exist. For example "foo/bar/baz" require the groups @@ -1225,12 +1222,12 @@ class ApiController(JSONRPCController): """ if not HasPermissionAny('hg.admin')(): - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' ) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) @@ -1239,20 +1236,15 @@ class ApiController(JSONRPCController): raise JSONRPCError("repo `%s` already exist" % repo_name) defs = Setting.get_default_repo_settings(strip_prefix=True) - if isinstance(private, Optional): - private = defs.get('repo_private') or Optional.extract(private) - if isinstance(repo_type, Optional): + if private is None: + private = defs.get('repo_private') or False + if repo_type is None: repo_type = defs.get('repo_type') - if isinstance(enable_statistics, Optional): + if enable_statistics is None: enable_statistics = defs.get('repo_enable_statistics') - if isinstance(enable_downloads, Optional): + if enable_downloads is None: enable_downloads = defs.get('repo_enable_downloads') - clone_uri = Optional.extract(clone_uri) - description = Optional.extract(description) - landing_rev = Optional.extract(landing_rev) - copy_permissions = Optional.extract(copy_permissions) - try: repo_name_parts = repo_name.split('/') repo_group = None @@ -1291,13 +1283,13 @@ class ApiController(JSONRPCController): 'failed to create repository `%s`' % (repo_name,)) # permission check inside - def update_repo(self, repoid, name=Optional(None), - owner=Optional(OAttr('apiuser')), - group=Optional(None), - description=Optional(''), private=Optional(False), - clone_uri=Optional(None), landing_rev=Optional('rev:tip'), - enable_statistics=Optional(False), - enable_downloads=Optional(False)): + def update_repo(self, repoid, name=None, + owner=None, + group=None, + description=None, private=None, + clone_uri=None, landing_rev=None, + enable_statistics=None, + enable_downloads=None): """ Updates repo @@ -1324,7 +1316,7 @@ class ApiController(JSONRPCController): ): raise JSONRPCError('no permission to create (or move) repositories') - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' @@ -1332,7 +1324,7 @@ class ApiController(JSONRPCController): updates = {} repo_group = group - if not isinstance(repo_group, Optional): + if repo_group is not None: repo_group = get_repo_group_or_error(repo_group) repo_group = repo_group.group_id try: @@ -1358,9 +1350,9 @@ class ApiController(JSONRPCController): @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository') def fork_repo(self, repoid, fork_name, - owner=Optional(OAttr('apiuser')), - description=Optional(''), copy_permissions=Optional(False), - private=Optional(False), landing_rev=Optional('rev:tip')): + owner=None, + description='', copy_permissions=False, + private=False, landing_rev='rev:tip'): """ Creates a fork of given repo. In case of using celery this will immediately return success message, while fork is going to be created @@ -1413,7 +1405,7 @@ class ApiController(JSONRPCController): if HasPermissionAny('hg.admin')(): pass elif HasRepoPermissionLevel('read')(repo.repo_name): - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' @@ -1424,7 +1416,7 @@ class ApiController(JSONRPCController): else: raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) @@ -1443,10 +1435,10 @@ class ApiController(JSONRPCController): repo_name_full=fork_name, repo_group=repo_group, repo_type=repo.repo_type, - description=Optional.extract(description), - private=Optional.extract(private), - copy_permissions=Optional.extract(copy_permissions), - landing_rev=Optional.extract(landing_rev), + description=description, + private=private, + copy_permissions=copy_permissions, + landing_rev=landing_rev, update_after_clone=False, fork_parent_id=repo.repo_id, ) @@ -1468,7 +1460,7 @@ class ApiController(JSONRPCController): ) # permission check inside - def delete_repo(self, repoid, forks=Optional('')): + def delete_repo(self, repoid, forks=''): """ Deletes a repository. This command can be executed only using api_key belonging to user with admin rights or regular user that have admin access to repository. @@ -1497,7 +1489,7 @@ class ApiController(JSONRPCController): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) try: - handle_forks = Optional.extract(forks) + handle_forks = forks _forks_msg = '' _forks = [f for f in repo.forks] if handle_forks == 'detach': @@ -1768,10 +1760,10 @@ class ApiController(JSONRPCController): ] @HasPermissionAnyDecorator('hg.admin') - def create_repo_group(self, group_name, description=Optional(''), - owner=Optional(OAttr('apiuser')), - parent=Optional(None), - copy_permissions=Optional(False)): + def create_repo_group(self, group_name, description='', + owner=None, + parent=None, + copy_permissions=False): """ Creates a repository group. This command can be executed only using api_key belonging to user with admin rights. @@ -1808,14 +1800,13 @@ class ApiController(JSONRPCController): if RepoGroup.get_by_group_name(group_name): raise JSONRPCError("repo group `%s` already exist" % (group_name,)) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id - group_description = Optional.extract(description) - parent_group = Optional.extract(parent) - if not isinstance(parent, Optional): - parent_group = get_repo_group_or_error(parent_group) + group_description = description + parent_group = None + if parent is not None: + parent_group = get_repo_group_or_error(parent) - copy_permissions = Optional.extract(copy_permissions) try: repo_group = RepoGroupModel().create( group_name=group_name, @@ -1835,10 +1826,10 @@ class ApiController(JSONRPCController): raise JSONRPCError('failed to create repo group `%s`' % (group_name,)) @HasPermissionAnyDecorator('hg.admin') - def update_repo_group(self, repogroupid, group_name=Optional(''), - description=Optional(''), - owner=Optional(OAttr('apiuser')), - parent=Optional(None)): + def update_repo_group(self, repogroupid, group_name=None, + description=None, + owner=None, + parent=None): repo_group = get_repo_group_or_error(repogroupid) updates = {} @@ -1902,7 +1893,7 @@ class ApiController(JSONRPCController): # permission check inside def grant_user_permission_to_repo_group(self, repogroupid, userid, - perm, apply_to_children=Optional('none')): + perm, apply_to_children='none'): """ Grant permission for user on given repository group, or update existing one if found. This command can be executed only using api_key belonging @@ -1944,7 +1935,6 @@ class ApiController(JSONRPCController): user = get_user_or_error(userid) perm = get_perm_or_error(perm, prefix='group.') - apply_to_children = Optional.extract(apply_to_children) try: RepoGroupModel().add_permission(repo_group=repo_group, @@ -1967,7 +1957,7 @@ class ApiController(JSONRPCController): # permission check inside def revoke_user_permission_from_repo_group(self, repogroupid, userid, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Revoke permission for user on given repository group. This command can be executed only using api_key belonging to user with admin rights, or @@ -2006,7 +1996,6 @@ class ApiController(JSONRPCController): raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,)) user = get_user_or_error(userid) - apply_to_children = Optional.extract(apply_to_children) try: RepoGroupModel().delete_permission(repo_group=repo_group, @@ -2030,7 +2019,7 @@ class ApiController(JSONRPCController): # permission check inside def grant_user_group_permission_to_repo_group( self, repogroupid, usergroupid, perm, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Grant permission for user group on given repository group, or update existing one if found. This command can be executed only using @@ -2077,8 +2066,6 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) - apply_to_children = Optional.extract(apply_to_children) - try: RepoGroupModel().add_permission(repo_group=repo_group, obj=user_group, @@ -2105,7 +2092,7 @@ class ApiController(JSONRPCController): # permission check inside def revoke_user_group_permission_from_repo_group( self, repogroupid, usergroupid, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Revoke permission for user group on given repository. This command can be executed only using api_key belonging to user with admin rights, or @@ -2147,8 +2134,6 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) - apply_to_children = Optional.extract(apply_to_children) - try: RepoGroupModel().delete_permission(repo_group=repo_group, obj=user_group, @@ -2182,7 +2167,7 @@ class ApiController(JSONRPCController): raise JSONRPCError('gist `%s` does not exist' % (gistid,)) return gist.get_api_data() - def get_gists(self, userid=Optional(OAttr('apiuser'))): + def get_gists(self, userid=None): """ Get all gists for given user. If userid is empty returned gists are for user who called the api @@ -2193,12 +2178,12 @@ class ApiController(JSONRPCController): if not HasPermissionAny('hg.admin')(): # make sure normal user does not pass someone else userid, # he is not allowed to do that - if not isinstance(userid, Optional) and userid != request.authuser.user_id: + if userid is not None and userid != request.authuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) - if isinstance(userid, Optional): + if userid is None: user_id = request.authuser.user_id else: user_id = get_user_or_error(userid).user_id @@ -2211,9 +2196,9 @@ class ApiController(JSONRPCController): .order_by(Gist.created_on.desc()) ] - def create_gist(self, files, owner=Optional(OAttr('apiuser')), - gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1), - description=Optional('')): + def create_gist(self, files, owner=None, + gist_type=Gist.GIST_PUBLIC, lifetime=-1, + description=''): """ Creates new Gist @@ -2250,13 +2235,10 @@ class ApiController(JSONRPCController): """ try: - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) - description = Optional.extract(description) - gist_type = Optional.extract(gist_type) - lifetime = Optional.extract(lifetime) gist = GistModel().create(description=description, owner=owner, @@ -2273,12 +2255,6 @@ class ApiController(JSONRPCController): log.error(traceback.format_exc()) raise JSONRPCError('failed to create gist') - # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')), - # gist_type=Optional(Gist.GIST_PUBLIC), - # gist_lifetime=Optional(-1), gist_description=Optional('')): - # gist = get_gist_or_error(gistid) - # updates = {} - # permission check inside def delete_gist(self, gistid): """ @@ -2342,7 +2318,7 @@ class ApiController(JSONRPCController): raise JSONRPCError('Repository is empty') # permission check inside - def get_changeset(self, repoid, raw_id, with_reviews=Optional(False)): + def get_changeset(self, repoid, raw_id, with_reviews=False): repo = get_repo_or_error(repoid) if not HasRepoPermissionLevel('read')(repo.repo_name): raise JSONRPCError('Access denied to repo %s' % repo.repo_name) @@ -2352,7 +2328,6 @@ class ApiController(JSONRPCController): info = dict(changeset.as_dict()) - with_reviews = Optional.extract(with_reviews) if with_reviews: reviews = ChangesetStatusModel().get_statuses( repo.repo_name, raw_id)