# HG changeset patch # User Thomas De Schampheleire # Date 2020-10-09 19:43:18 # Node ID 4791487dbec133388d2f3feb9c0c9b2e518ea3c1 # Parent 26fcb8c16c2c1def4ef664c7ceb31daa4e828cb6 api: stop using 'Optional', 'OAttr'/'OptionalAttr' classes There does not seem to be a good reason to use the 'Optional' and 'OptionalAttr' classes. Using them makes the code harder to understand. And worse, the 'default value' specified is not always used, which can thus give false information to users. The way Optional was used in the API calls is twofold: 1. either by actually extracting a value, via Optional.extract(param). If 'param' was indeed specified by the user, then this would yield that user-specified value. Otherwise, it would yield the value declared in the parameter declaration, e.g. param=Optional(defaultvalue). 2. or by checking if a variable is an instance of the Optional class. In case a user actually passed a value, this value will not be of the Optional class. So if a parameter is an object of class Optional, we know the user did not pass a value, and we can apply some default. In the declaration of the parameter, the specified default value will only be used if the 'extract' method is used, i.e. method 1 above. A simpler way to address this problem of default values is just with Python default values, using 'None' as a magic value if the default will be calculated inside the method. The docstrings still specify something like: type: Optional(bool) which is human-readable and does not necessarily refer to a class called 'Optional', so such strings are kept. 
diff --git a/kallithea/controllers/api/api.py b/kallithea/controllers/api/api.py --- a/kallithea/controllers/api/api.py +++ b/kallithea/controllers/api/api.py @@ -36,7 +36,6 @@ from kallithea.lib.auth import (AuthUser HasUserGroupPermissionLevel) from kallithea.lib.exceptions import DefaultUserException, UserGroupsAssignedException from kallithea.lib.utils import action_logger, repo2db_mapper -from kallithea.lib.utils2 import OAttr, Optional from kallithea.lib.vcs.backends.base import EmptyChangeset from kallithea.lib.vcs.exceptions import EmptyRepositoryError from kallithea.model.changeset_status import ChangesetStatusModel @@ -57,10 +56,10 @@ log = logging.getLogger(__name__) def store_update(updates, attr, name): """ - Stores param in updates dict if it's not instance of Optional - allows easy updates of passed in params + Stores param in updates dict if it's not None (i.e. if user explicitly set + a parameter). This allows easy updates of passed in params. """ - if not isinstance(attr, Optional): + if attr is not None: updates[name] = attr @@ -161,7 +160,7 @@ class ApiController(JSONRPCController): return args @HasPermissionAnyDecorator('hg.admin') - def pull(self, repoid, clone_uri=Optional(None)): + def pull(self, repoid, clone_uri=None): """ Triggers a pull from remote location on given repo. Can be used to automatically keep remote repos up to date. This command can be executed @@ -197,7 +196,7 @@ class ApiController(JSONRPCController): ScmModel().pull_changes(repo.repo_name, request.authuser.username, request.ip_addr, - clone_uri=Optional.extract(clone_uri)) + clone_uri=clone_uri) return dict( msg='Pulled from `%s`' % repo.repo_name, repository=repo.repo_name @@ -209,7 +208,7 @@ class ApiController(JSONRPCController): ) @HasPermissionAnyDecorator('hg.admin') - def rescan_repos(self, remove_obsolete=Optional(False)): + def rescan_repos(self, remove_obsolete=False): """ Triggers rescan repositories action. 
If remove_obsolete is set than also delete repos that are in database but not in the filesystem. @@ -240,7 +239,7 @@ class ApiController(JSONRPCController): """ try: - rm_obsolete = Optional.extract(remove_obsolete) + rm_obsolete = remove_obsolete added, removed = repo2db_mapper(ScmModel().repo_scan(), remove_obsolete=rm_obsolete) return {'added': added, 'removed': removed} @@ -295,7 +294,7 @@ class ApiController(JSONRPCController): ) @HasPermissionAnyDecorator('hg.admin') - def get_ip(self, userid=Optional(OAttr('apiuser'))): + def get_ip(self, userid=None): """ Shows IP address as seen from Kallithea server, together with all defined IP addresses for given user. If userid is not passed data is @@ -321,7 +320,7 @@ class ApiController(JSONRPCController): } """ - if isinstance(userid, Optional): + if userid is None: userid = request.authuser.user_id user = get_user_or_error(userid) ips = UserIpMap.query().filter(UserIpMap.user == user).all() @@ -352,7 +351,7 @@ class ApiController(JSONRPCController): """ return Setting.get_server_info() - def get_user(self, userid=Optional(OAttr('apiuser'))): + def get_user(self, userid=None): """ Gets a user by username or user_id, Returns empty result if user is not found. 
If userid param is skipped it is set to id of user who is @@ -397,12 +396,12 @@ class ApiController(JSONRPCController): if not HasPermissionAny('hg.admin')(): # make sure normal user does not pass someone else userid, # he is not allowed to do that - if not isinstance(userid, Optional) and userid != request.authuser.user_id: + if userid is not None and userid != request.authuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) - if isinstance(userid, Optional): + if userid is None: userid = request.authuser.user_id user = get_user_or_error(userid) @@ -432,11 +431,11 @@ class ApiController(JSONRPCController): ] @HasPermissionAnyDecorator('hg.admin') - def create_user(self, username, email, password=Optional(''), - firstname=Optional(''), lastname=Optional(''), - active=Optional(True), admin=Optional(False), - extern_type=Optional(User.DEFAULT_AUTH_TYPE), - extern_name=Optional('')): + def create_user(self, username, email, password='', + firstname='', lastname='', + active=True, admin=False, + extern_type=User.DEFAULT_AUTH_TYPE, + extern_name=''): """ Creates new user. Returns new user object. This command can be executed only using api_key belonging to user with admin rights. 
@@ -492,15 +491,15 @@ class ApiController(JSONRPCController): try: user = UserModel().create_or_update( - username=Optional.extract(username), - password=Optional.extract(password), - email=Optional.extract(email), - firstname=Optional.extract(firstname), - lastname=Optional.extract(lastname), - active=Optional.extract(active), - admin=Optional.extract(admin), - extern_type=Optional.extract(extern_type), - extern_name=Optional.extract(extern_name) + username=username, + password=password, + email=email, + firstname=firstname, + lastname=lastname, + active=active, + admin=admin, + extern_type=extern_type, + extern_name=extern_name ) Session().commit() return dict( @@ -512,11 +511,11 @@ class ApiController(JSONRPCController): raise JSONRPCError('failed to create user `%s`' % (username,)) @HasPermissionAnyDecorator('hg.admin') - def update_user(self, userid, username=Optional(None), - email=Optional(None), password=Optional(None), - firstname=Optional(None), lastname=Optional(None), - active=Optional(None), admin=Optional(None), - extern_type=Optional(None), extern_name=Optional(None)): + def update_user(self, userid, username=None, + email=None, password=None, + firstname=None, lastname=None, + active=None, admin=None, + extern_type=None, extern_name=None): """ updates given user if such user exists. This command can be executed only using api_key belonging to user with admin rights. @@ -686,8 +685,8 @@ class ApiController(JSONRPCController): ] @HasPermissionAnyDecorator('hg.admin', 'hg.usergroup.create.true') - def create_user_group(self, group_name, description=Optional(''), - owner=Optional(OAttr('apiuser')), active=Optional(True)): + def create_user_group(self, group_name, description='', + owner=None, active=True): """ Creates new user group. 
This command can be executed only using api_key belonging to user with admin rights or an user who has create user group @@ -727,12 +726,10 @@ class ApiController(JSONRPCController): raise JSONRPCError("user group `%s` already exist" % (group_name,)) try: - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) - active = Optional.extract(active) - description = Optional.extract(description) ug = UserGroupModel().create(name=group_name, description=description, owner=owner, active=active) Session().commit() @@ -745,9 +742,9 @@ class ApiController(JSONRPCController): raise JSONRPCError('failed to create group `%s`' % (group_name,)) # permission check inside - def update_user_group(self, usergroupid, group_name=Optional(''), - description=Optional(''), owner=Optional(None), - active=Optional(True)): + def update_user_group(self, usergroupid, group_name=None, + description=None, owner=None, + active=None): """ Updates given usergroup. This command can be executed only using api_key belonging to user with admin rights or an admin of given user group @@ -786,7 +783,7 @@ class ApiController(JSONRPCController): if not HasUserGroupPermissionLevel('admin')(user_group.users_group_name): raise JSONRPCError('user group `%s` does not exist' % (usergroupid,)) - if not isinstance(owner, Optional): + if owner is not None: owner = get_user_or_error(owner) updates = {} @@ -963,8 +960,8 @@ class ApiController(JSONRPCController): # permission check inside def get_repo(self, repoid, - with_revision_names=Optional(False), - with_pullrequests=Optional(False)): + with_revision_names=False, + with_pullrequests=False): """ Gets an existing repository by it's name or repository_id. Members will return either users_group or user associated to that repository. 
This command can be @@ -1064,8 +1061,8 @@ class ApiController(JSONRPCController): for uf in repo.followers ] - data = repo.get_api_data(with_revision_names=Optional.extract(with_revision_names), - with_pullrequests=Optional.extract(with_pullrequests)) + data = repo.get_api_data(with_revision_names=with_revision_names, + with_pullrequests=with_pullrequests) data['members'] = members data['followers'] = followers return data @@ -1112,7 +1109,7 @@ class ApiController(JSONRPCController): # permission check inside def get_repo_nodes(self, repoid, revision, root_path, - ret_type=Optional('all')): + ret_type='all'): """ returns a list of nodes and it's children in a flat list for a given path at given revision. It's possible to specify ret_type to show only `files` or @@ -1147,7 +1144,7 @@ class ApiController(JSONRPCController): if not HasRepoPermissionLevel('read')(repo.repo_name): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - ret_type = Optional.extract(ret_type) + ret_type = ret_type _map = {} try: _d, _f = ScmModel().get_nodes(repo, revision, root_path, @@ -1168,13 +1165,13 @@ class ApiController(JSONRPCController): ) @HasPermissionAnyDecorator('hg.admin', 'hg.create.repository') - def create_repo(self, repo_name, owner=Optional(OAttr('apiuser')), - repo_type=Optional('hg'), description=Optional(''), - private=Optional(False), clone_uri=Optional(None), - landing_rev=Optional('rev:tip'), - enable_statistics=Optional(False), - enable_downloads=Optional(False), - copy_permissions=Optional(False)): + def create_repo(self, repo_name, owner=None, + repo_type=None, description='', + private=False, clone_uri=None, + landing_rev='rev:tip', + enable_statistics=None, + enable_downloads=None, + copy_permissions=False): """ Creates a repository. The repository name contains the full path, but the parent repository group must exist. 
For example "foo/bar/baz" require the groups @@ -1225,12 +1222,12 @@ class ApiController(JSONRPCController): """ if not HasPermissionAny('hg.admin')(): - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' ) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) @@ -1239,20 +1236,15 @@ class ApiController(JSONRPCController): raise JSONRPCError("repo `%s` already exist" % repo_name) defs = Setting.get_default_repo_settings(strip_prefix=True) - if isinstance(private, Optional): - private = defs.get('repo_private') or Optional.extract(private) - if isinstance(repo_type, Optional): + if private is None: + private = defs.get('repo_private') or False + if repo_type is None: repo_type = defs.get('repo_type') - if isinstance(enable_statistics, Optional): + if enable_statistics is None: enable_statistics = defs.get('repo_enable_statistics') - if isinstance(enable_downloads, Optional): + if enable_downloads is None: enable_downloads = defs.get('repo_enable_downloads') - clone_uri = Optional.extract(clone_uri) - description = Optional.extract(description) - landing_rev = Optional.extract(landing_rev) - copy_permissions = Optional.extract(copy_permissions) - try: repo_name_parts = repo_name.split('/') repo_group = None @@ -1291,13 +1283,13 @@ class ApiController(JSONRPCController): 'failed to create repository `%s`' % (repo_name,)) # permission check inside - def update_repo(self, repoid, name=Optional(None), - owner=Optional(OAttr('apiuser')), - group=Optional(None), - description=Optional(''), private=Optional(False), - clone_uri=Optional(None), landing_rev=Optional('rev:tip'), - enable_statistics=Optional(False), - enable_downloads=Optional(False)): + def update_repo(self, repoid, name=None, + owner=None, + group=None, + description=None, private=None, + clone_uri=None, landing_rev=None, + 
enable_statistics=None, + enable_downloads=None): """ Updates repo @@ -1324,7 +1316,7 @@ class ApiController(JSONRPCController): ): raise JSONRPCError('no permission to create (or move) repositories') - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' @@ -1332,7 +1324,7 @@ class ApiController(JSONRPCController): updates = {} repo_group = group - if not isinstance(repo_group, Optional): + if repo_group is not None: repo_group = get_repo_group_or_error(repo_group) repo_group = repo_group.group_id try: @@ -1358,9 +1350,9 @@ class ApiController(JSONRPCController): @HasPermissionAnyDecorator('hg.admin', 'hg.fork.repository') def fork_repo(self, repoid, fork_name, - owner=Optional(OAttr('apiuser')), - description=Optional(''), copy_permissions=Optional(False), - private=Optional(False), landing_rev=Optional('rev:tip')): + owner=None, + description='', copy_permissions=False, + private=False, landing_rev='rev:tip'): """ Creates a fork of given repo. 
In case of using celery this will immediately return success message, while fork is going to be created @@ -1413,7 +1405,7 @@ class ApiController(JSONRPCController): if HasPermissionAny('hg.admin')(): pass elif HasRepoPermissionLevel('read')(repo.repo_name): - if not isinstance(owner, Optional): + if owner is not None: # forbid setting owner for non-admins raise JSONRPCError( 'Only Kallithea admin can specify `owner` param' @@ -1424,7 +1416,7 @@ class ApiController(JSONRPCController): else: raise JSONRPCError('repository `%s` does not exist' % (repoid,)) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) @@ -1443,10 +1435,10 @@ class ApiController(JSONRPCController): repo_name_full=fork_name, repo_group=repo_group, repo_type=repo.repo_type, - description=Optional.extract(description), - private=Optional.extract(private), - copy_permissions=Optional.extract(copy_permissions), - landing_rev=Optional.extract(landing_rev), + description=description, + private=private, + copy_permissions=copy_permissions, + landing_rev=landing_rev, update_after_clone=False, fork_parent_id=repo.repo_id, ) @@ -1468,7 +1460,7 @@ class ApiController(JSONRPCController): ) # permission check inside - def delete_repo(self, repoid, forks=Optional('')): + def delete_repo(self, repoid, forks=''): """ Deletes a repository. This command can be executed only using api_key belonging to user with admin rights or regular user that have admin access to repository. 
@@ -1497,7 +1489,7 @@ class ApiController(JSONRPCController): raise JSONRPCError('repository `%s` does not exist' % (repoid,)) try: - handle_forks = Optional.extract(forks) + handle_forks = forks _forks_msg = '' _forks = [f for f in repo.forks] if handle_forks == 'detach': @@ -1768,10 +1760,10 @@ class ApiController(JSONRPCController): ] @HasPermissionAnyDecorator('hg.admin') - def create_repo_group(self, group_name, description=Optional(''), - owner=Optional(OAttr('apiuser')), - parent=Optional(None), - copy_permissions=Optional(False)): + def create_repo_group(self, group_name, description='', + owner=None, + parent=None, + copy_permissions=False): """ Creates a repository group. This command can be executed only using api_key belonging to user with admin rights. @@ -1808,14 +1800,13 @@ class ApiController(JSONRPCController): if RepoGroup.get_by_group_name(group_name): raise JSONRPCError("repo group `%s` already exist" % (group_name,)) - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id - group_description = Optional.extract(description) - parent_group = Optional.extract(parent) - if not isinstance(parent, Optional): - parent_group = get_repo_group_or_error(parent_group) + group_description = description + parent_group = None + if parent is not None: + parent_group = get_repo_group_or_error(parent) - copy_permissions = Optional.extract(copy_permissions) try: repo_group = RepoGroupModel().create( group_name=group_name, @@ -1835,10 +1826,10 @@ class ApiController(JSONRPCController): raise JSONRPCError('failed to create repo group `%s`' % (group_name,)) @HasPermissionAnyDecorator('hg.admin') - def update_repo_group(self, repogroupid, group_name=Optional(''), - description=Optional(''), - owner=Optional(OAttr('apiuser')), - parent=Optional(None)): + def update_repo_group(self, repogroupid, group_name=None, + description=None, + owner=None, + parent=None): repo_group = get_repo_group_or_error(repogroupid) updates = {} @@ -1902,7 
+1893,7 @@ class ApiController(JSONRPCController): # permission check inside def grant_user_permission_to_repo_group(self, repogroupid, userid, - perm, apply_to_children=Optional('none')): + perm, apply_to_children='none'): """ Grant permission for user on given repository group, or update existing one if found. This command can be executed only using api_key belonging @@ -1944,7 +1935,6 @@ class ApiController(JSONRPCController): user = get_user_or_error(userid) perm = get_perm_or_error(perm, prefix='group.') - apply_to_children = Optional.extract(apply_to_children) try: RepoGroupModel().add_permission(repo_group=repo_group, @@ -1967,7 +1957,7 @@ class ApiController(JSONRPCController): # permission check inside def revoke_user_permission_from_repo_group(self, repogroupid, userid, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Revoke permission for user on given repository group. This command can be executed only using api_key belonging to user with admin rights, or @@ -2006,7 +1996,6 @@ class ApiController(JSONRPCController): raise JSONRPCError('repository group `%s` does not exist' % (repogroupid,)) user = get_user_or_error(userid) - apply_to_children = Optional.extract(apply_to_children) try: RepoGroupModel().delete_permission(repo_group=repo_group, @@ -2030,7 +2019,7 @@ class ApiController(JSONRPCController): # permission check inside def grant_user_group_permission_to_repo_group( self, repogroupid, usergroupid, perm, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Grant permission for user group on given repository group, or update existing one if found. 
This command can be executed only using @@ -2077,8 +2066,6 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) - apply_to_children = Optional.extract(apply_to_children) - try: RepoGroupModel().add_permission(repo_group=repo_group, obj=user_group, @@ -2105,7 +2092,7 @@ class ApiController(JSONRPCController): # permission check inside def revoke_user_group_permission_from_repo_group( self, repogroupid, usergroupid, - apply_to_children=Optional('none')): + apply_to_children='none'): """ Revoke permission for user group on given repository. This command can be executed only using api_key belonging to user with admin rights, or @@ -2147,8 +2134,6 @@ class ApiController(JSONRPCController): raise JSONRPCError( 'user group `%s` does not exist' % (usergroupid,)) - apply_to_children = Optional.extract(apply_to_children) - try: RepoGroupModel().delete_permission(repo_group=repo_group, obj=user_group, @@ -2182,7 +2167,7 @@ class ApiController(JSONRPCController): raise JSONRPCError('gist `%s` does not exist' % (gistid,)) return gist.get_api_data() - def get_gists(self, userid=Optional(OAttr('apiuser'))): + def get_gists(self, userid=None): """ Get all gists for given user. 
If userid is empty returned gists are for user who called the api @@ -2193,12 +2178,12 @@ class ApiController(JSONRPCController): if not HasPermissionAny('hg.admin')(): # make sure normal user does not pass someone else userid, # he is not allowed to do that - if not isinstance(userid, Optional) and userid != request.authuser.user_id: + if userid is not None and userid != request.authuser.user_id: raise JSONRPCError( 'userid is not the same as your user' ) - if isinstance(userid, Optional): + if userid is None: user_id = request.authuser.user_id else: user_id = get_user_or_error(userid).user_id @@ -2211,9 +2196,9 @@ class ApiController(JSONRPCController): .order_by(Gist.created_on.desc()) ] - def create_gist(self, files, owner=Optional(OAttr('apiuser')), - gist_type=Optional(Gist.GIST_PUBLIC), lifetime=Optional(-1), - description=Optional('')): + def create_gist(self, files, owner=None, + gist_type=Gist.GIST_PUBLIC, lifetime=-1, + description=''): """ Creates new Gist @@ -2250,13 +2235,10 @@ class ApiController(JSONRPCController): """ try: - if isinstance(owner, Optional): + if owner is None: owner = request.authuser.user_id owner = get_user_or_error(owner) - description = Optional.extract(description) - gist_type = Optional.extract(gist_type) - lifetime = Optional.extract(lifetime) gist = GistModel().create(description=description, owner=owner, @@ -2273,12 +2255,6 @@ class ApiController(JSONRPCController): log.error(traceback.format_exc()) raise JSONRPCError('failed to create gist') - # def update_gist(self, gistid, files, owner=Optional(OAttr('apiuser')), - # gist_type=Optional(Gist.GIST_PUBLIC), - # gist_lifetime=Optional(-1), gist_description=Optional('')): - # gist = get_gist_or_error(gistid) - # updates = {} - # permission check inside def delete_gist(self, gistid): """ @@ -2342,7 +2318,7 @@ class ApiController(JSONRPCController): raise JSONRPCError('Repository is empty') # permission check inside - def get_changeset(self, repoid, raw_id, 
with_reviews=Optional(False)): + def get_changeset(self, repoid, raw_id, with_reviews=False): repo = get_repo_or_error(repoid) if not HasRepoPermissionLevel('read')(repo.repo_name): raise JSONRPCError('Access denied to repo %s' % repo.repo_name) @@ -2352,7 +2328,6 @@ class ApiController(JSONRPCController): info = dict(changeset.as_dict()) - with_reviews = Optional.extract(with_reviews) if with_reviews: reviews = ChangesetStatusModel().get_statuses( repo.repo_name, raw_id)