.. _api:
API
===
Starting from RhodeCode version 1.2 a simple API was implemented.
There's a single schema for calling all api methods. API is implemented
with JSON protocol both ways. An url to send API request in RhodeCode is
<your_server>/_admin/api
API ACCESS FOR WEB VIEWS
++++++++++++++++++++++++
API access can also be turned on for each web view in RhodeCode that is
decorated with `@LoginRequired` decorator. To enable API access simple change
the standard login decorator to `@LoginRequired(api_access=True)`.
After this change, a rhodecode view can be accessed without login by adding a
GET parameter `?api_key=<api_key>` to url. By default this is only
enabled on RSS/ATOM feed views.
API ACCESS
++++++++++
All clients are required to send JSON-RPC spec JSON data::
{
"id:<id>,
"api_key":"<api_key>",
"method":"<method_name>",
"args":{"<arg_key>":"<arg_val>"}
}
Example call for autopulling remotes repos using curl::
curl https://server.com/_admin/api -X POST -H 'content-type:text/plain' --data-binary '{"id":1,"api_key":"xe7cdb2v278e4evbdf5vs04v832v0efvcbcve4a3","method":"pull","args":{"repo":"CPython"}}'
Simply provide
- *id* A value of any type, which is used to match the response with the request that it is replying to.
- *api_key* for access and permission validation.
- *method* is name of method to call
- *args* is an key:value list of arguments to pass to method
.. note::
api_key can be found in your user account page
RhodeCode API will return always a JSON-RPC response::
"id":<id>,
"result": "<result>",
"error": null
All responses from API will be `HTTP/1.0 200 OK`, if there's an error while
calling api *error* key from response will contain failure description
and result will be null.
API METHODS
+++++++++++
pull
----
Pulls given repo from remote location. Can be used to automatically keep
remote repos up to date. This command can be executed only using api_key
belonging to user with admin rights
INPUT::
api_key : "<api_key>"
method : "pull"
args : {
"repo_name" : "<reponame>"
OUTPUT::
result : "Pulled from <reponame>"
error : null
get_user
--------
Get's an user by username, Returns empty result if user is not found.
This command can be executed only using api_key belonging to user with admin
rights.
method : "get_user"
"username" : "<username>"
result: None if user does not exist or
"id" : "<id>",
"username" : "<username>",
"firstname": "<firstname>",
"lastname" : "<lastname>",
"email" : "<email>",
"active" : "<bool>",
"admin" :Â "<bool>",
"ldap" : "<ldap_dn>"
error: null
get_users
---------
Lists all existing users. This command can be executed only using api_key
belonging to user with admin rights.
method : "get_users"
args : { }
result: [
},
…
]
create_user
-----------
Creates new user. This command can
be executed only using api_key belonging to user with admin rights.
method : "create_user"
"password" : "<password>",
"email" : "<useremail>",
"firstname" : "<firstname> = None",
"lastname" : "<lastname> = None",
"active" : "<bool> = True",
"admin" : "<bool> = False",
"ldap_dn" : "<ldap_dn> = None"
result: {
"id" : "<new_user_id>",
"msg" : "created new user <username>"
update_user
updates current one if such user exists. This command can
method : "update_user"
"id" : "<edited_user_id>",
"msg" : "updated user <username>"
get_users_group
---------------
Gets an existing users group. This command can be executed only using api_key
method : "get_users_group"
"group_name" : "<name>"
result : None if group not exist
"group_name" : "<groupname>",
"active": "<bool>",
"members" : [
{ "id" : "<userid>",
get_users_groups
----------------
Lists all existing users groups. This command can be executed only using
api_key belonging to user with admin rights.
method : "get_users_groups"
result : [
"id" : "<userid>",
create_users_group
------------------
Creates new users group. This command can be executed only using api_key
method : "create_users_group"
args: {
"group_name": "<groupname>",
"active":"<bool> = True"
"id": "<newusersgroupid>",
"msg": "created new users group <groupname>"
add_user_to_users_group
-----------------------
Adds a user to a users group. If user exists in that group success will be
`false`. This command can be executed only using api_key
method : "add_user_users_group"
"id": "<newusersgroupmemberid>",
"success": True|False # depends on if member is in group
"msg": "added member <username> to users group <groupname> |
User is already in that group"
remove_user_from_users_group
----------------------------
Removes a user from a users group. If user is not in given group success will
be `false`. This command can be executed only
using api_key belonging to user with admin rights
method : "remove_user_from_users_group"
"success": True|False, # depends on if member is in group
"msg": "removed member <username> from users group <groupname> |
User wasn't in group"
get_repo
Gets an existing repository. This command can be executed only using api_key
method : "get_repo"
result: None if repository does not exist or
"type" : "<type>",
"description" : "<description>",
"ldap" : "<ldap_dn>",
"permission" : "repository.(read|write|admin)"
"id" : "<usersgroupid>",
"name" : "<usersgroupname>",
get_repos
Lists all existing repositories. This command can be executed only using api_key
method : "get_repos"
args: { }
"description" : "<description>"
get_repo_nodes
--------------
returns a list of nodes and it's children in a flat list for a given path
at given revision. It's possible to specify ret_type to show only `files` or
`dirs`. This command can be executed only using api_key belonging to user
with admin rights
method : "get_repo_nodes"
"repo_name" : "<reponame>",
"revision" : "<revision>",
"root_path" : "<root_path>",
"ret_type" : "<ret_type>" = 'all'
"name" : "<name>"
create_repo
Creates a repository. This command can be executed only using api_key
If repository name contains "/", all needed repository groups will be created.
For example "foo/bar/baz" will create groups "foo", "bar" (with "foo" as parent),
and create "baz" repository with "bar" as group.
method : "create_repo"
"owner_name" : "<ownername>",
"description" : "<description> = ''",
"repo_type" : "<type> = 'hg'",
"private" : "<bool> = False"
"id": "<newrepoid>",
"msg": "Created new repository <reponame>",
delete_repo
Deletes a repository. This command can be executed only using api_key
method : "delete_repo"
"msg": "Deleted repository <reponame>",
grant_user_permission
---------------------
Grant permission for user on given repository, or update existing one
if found. This command can be executed only using api_key belonging to user
with admin rights.
method : "grant_user_permission"
"perm" : "(repository.(none|read|write|admin))",
"msg" : "Granted perm: <perm> for user: <username> in repo: <reponame>"
revoke_user_permission
----------------------
Revoke permission for user on given repository. This command can be executed
only using api_key belonging to user with admin rights.
method : "revoke_user_permission"
"msg" : "Revoked perm for user: <suername> in repo: <reponame>"
grant_users_group_permission
Grant permission for users group on given repository, or update
existing one if found. This command can be executed only using
method : "grant_users_group_permission"
"group_name" : "<usersgroupname>",
"msg" : "Granted perm: <perm> for group: <usersgroupname> in repo: <reponame>"
revoke_users_group_permission
-----------------------------
Revoke permission for users group on given repository.This command can be
executed only using api_key belonging to user with admin rights.
method : "revoke_users_group_permission"
"users_group" : "<usersgroupname>",
"msg" : "Revoked perm for group: <usersgroupname> in repo: <reponame>"
\ No newline at end of file
# -*- coding: utf-8 -*-
"""
rhodecode.controllers.api
~~~~~~~~~~~~~~~~~~~~~~~~~
API controller for RhodeCode
:created_on: Aug 20, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
import traceback
import logging
from sqlalchemy.orm.exc import NoResultFound
from rhodecode.controllers.api import JSONRPCController, JSONRPCError
from rhodecode.lib.auth import HasPermissionAllDecorator, \
HasPermissionAnyDecorator
from rhodecode.model.meta import Session
from rhodecode.model.scm import ScmModel
from rhodecode.model.db import User, UsersGroup, RepoGroup, Repository
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
from rhodecode.model.repo_permission import RepositoryPermissionModel
from rhodecode.model.users_group import UsersGroupModel
from rhodecode.model.repos_group import ReposGroupModel
log = logging.getLogger(__name__)
class ApiController(JSONRPCController):
API Controller
Each method needs to have USER as argument this is then based on given
API_KEY propagated as instance of user object
Preferably this should be first argument also
Each function should also **raise** JSONRPCError for any
errors that happens
@HasPermissionAllDecorator('hg.admin')
def pull(self, apiuser, repo_name):
Dispatch pull action on given repo
:param user:
:param repo_name:
if Repository.is_valid(repo_name) is False:
raise JSONRPCError('Unknown repo "%s"' % repo_name)
try:
ScmModel().pull_changes(repo_name, self.rhodecode_user.username)
return 'Pulled from %s' % repo_name
except Exception:
raise JSONRPCError('Unable to pull changes from "%s"' % repo_name)
def get_user(self, apiuser, username):
""""
Get a user by username
:param apiuser:
:param username:
user = User.get_by_username(username)
if user is None:
return user
return dict(
id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
active=user.active,
admin=user.admin,
ldap=user.ldap_dn
)
def get_users(self, apiuser):
Get all users
result = []
for user in User.getAll():
result.append(
dict(
return result
def create_user(self, apiuser, username, password, email, firstname=None,
lastname=None, active=True, admin=False, ldap_dn=None):
Create new user
:param password:
:param email:
:param name:
:param lastname:
:param active:
:param admin:
:param ldap_dn:
if User.get_by_username(username):
raise JSONRPCError("user %s already exist" % username)
usr = UserModel().create_or_update(
username, password, email, firstname,
lastname, active, admin, ldap_dn
Session.commit()
id=usr.user_id,
msg='created new user %s' % username
log.error(traceback.format_exc())
raise JSONRPCError('failed to create user %s' % username)
def update_user(self, apiuser, username, password, email, firstname=None,
Updates given user
if not User.get_by_username(username):
raise JSONRPCError("user %s does not exist" % username)
msg='updated user %s' % username
raise JSONRPCError('failed to update user %s' % username)
def get_users_group(self, apiuser, group_name):
Get users group by name
:param group_name:
users_group = UsersGroup.get_by_group_name(group_name)
if not users_group:
return None
members = []
for user in users_group.members:
user = user.user
members.append(dict(id=user.user_id,
ldap=user.ldap_dn))
return dict(id=users_group.users_group_id,
group_name=users_group.users_group_name,
active=users_group.users_group_active,
members=members)
def get_users_groups(self, apiuser):
Get all users groups
for users_group in UsersGroup.getAll():
result.append(dict(id=users_group.users_group_id,
members=members))
def create_users_group(self, apiuser, group_name, active=True):
Creates an new usergroup
if self.get_users_group(apiuser, group_name):
raise JSONRPCError("users group %s already exist" % group_name)
ug = UsersGroupModel().create(name=group_name, active=active)
return dict(id=ug.users_group_id,
msg='created new users group %s' % group_name)
raise JSONRPCError('failed to create group %s' % group_name)
def add_user_to_users_group(self, apiuser, group_name, username):
Add a user to a group
raise JSONRPCError('unknown users group %s' % group_name)
raise JSONRPCError('unknown user %s' % username)
ugm = UsersGroupModel().add_user_to_group(users_group, user)
success = True if ugm != True else False
msg = 'added member %s to users group %s' % (username, group_name)
msg = msg if success else 'User is already in that group'
id=ugm.users_group_member_id if ugm != True else None,
success=success,
msg=msg
raise JSONRPCError('failed to add users group member')
def remove_user_from_users_group(self, apiuser, group_name, username):
Remove user from a group
:param apiuser
:param group_name
:param username
success = UsersGroupModel().remove_user_from_group(users_group, user)
msg = 'removed member %s from users group %s' % (username, group_name)
msg = msg if success else "User wasn't in group"
return dict(success=success, msg=msg)
raise JSONRPCError('failed to remove user from group')
@HasPermissionAnyDecorator('hg.admin')
def get_repo(self, apiuser, repo_name):
Get repository by name
repo = Repository.get_by_repo_name(repo_name)
if repo is None:
raise JSONRPCError('unknown repository %s' % repo)
for user in repo.repo_to_perm:
perm = user.permission.permission_name
members.append(
type_="user",
ldap=user.ldap_dn,
permission=perm
for users_group in repo.users_group_to_perm:
perm = users_group.permission.permission_name
users_group = users_group.users_group
type_="users_group",
id=users_group.users_group_id,
name=users_group.users_group_name,
id=repo.repo_id,
repo_name=repo.repo_name,
type=repo.repo_type,
description=repo.description,
members=members
def get_repos(self, apiuser):
Get all repositories
for repository in Repository.getAll():
id=repository.repo_id,
repo_name=repository.repo_name,
type=repository.repo_type,
description=repository.description
def get_repo_nodes(self, apiuser, repo_name, revision, root_path,
ret_type='all'):
returns a list of nodes and it's children
for a given path at given revision. It's possible to specify ret_type
to show only files or dirs
:param repo_name: name of repository
:param revision: revision for which listing should be done
:param root_path: path from which start displaying
:param ret_type: return type 'all|files|dirs' nodes
_d, _f = ScmModel().get_nodes(repo_name, revision, root_path,
flat=False)
_map = {
'all': _d + _f,
'files': _f,
'dirs': _d,
return _map[ret_type]
except KeyError:
raise JSONRPCError('ret_type must be one of %s' % _map.keys())
except Exception, e:
raise JSONRPCError(e)
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repo(self, apiuser, repo_name, owner_name, description='',
repo_type='hg', private=False):
Create a repository
:param description:
:param type:
:param private:
:param owner_name:
owner = User.get_by_username(owner_name)
if owner is None:
raise JSONRPCError('unknown user %s' % owner_name)
if Repository.get_by_repo_name(repo_name):
raise JSONRPCError("repo %s already exist" % repo_name)
groups = repo_name.split('/')
real_name = groups[-1]
groups = groups[:-1]
parent_id = None
for g in groups:
group = RepoGroup.get_by_group_name(g)
if not group:
group = ReposGroupModel().create(g, '', parent_id)
parent_id = group.group_id
repo = RepoModel().create(
repo_name=real_name,
repo_name_full=repo_name,
description=description,
private=private,
repo_type=repo_type,
repo_group=parent_id,
clone_uri=None
),
owner
msg="Created new repository %s" % repo.repo_name
raise JSONRPCError('failed to create repository %s' % repo_name)
def delete_repo(self, apiuser, repo_name):
Deletes a given repository
if not Repository.get_by_repo_name(repo_name):
raise JSONRPCError("repo %s does not exist" % repo_name)
RepoModel().delete(repo_name)
msg='Deleted repository %s' % repo_name
raise JSONRPCError('failed to delete repository %s' % repo_name)
def grant_user_permission(self, repo_name, username, perm):
if found
:param perm:
RepoModel().grant_user_permission(repo=repo, user=user, perm=perm)
msg='Granted perm: %s for user: %s in repo: %s' % (
perm, username, repo_name
raise JSONRPCError(
'failed to edit permission %(repo)s for %(user)s' % dict(
user=username, repo=repo_name
def revoke_user_permission(self, repo_name, username):
Revoke permission for user on given repository
RepoModel().revoke_user_permission(repo=repo_name, user=username)
msg='Revoked perm for user: %s in repo: %s' % (
username, repo_name
def grant_users_group_permission(self, repo_name, group_name, perm):
existing one if found
user_group = UsersGroup.get_by_group_name(group_name)
if user_group is None:
raise JSONRPCError('unknown users group %s' % user_group)
RepoModel().grant_users_group_permission(repo=repo_name,
group_name=group_name,
perm=perm)
msg='Granted perm: %s for group: %s in repo: %s' % (
perm, group_name, repo_name
'failed to edit permission %(repo)s for %(usersgr)s' % dict(
usersgr=group_name, repo=repo_name
def revoke_users_group_permission(self, repo_name, group_name):
Revoke permission for users group on given repository
RepoModel().revoke_users_group_permission(repo=repo_name,
group_name=group_name)
msg='Revoked perm for group: %s in repo: %s' % (
group_name, repo_name
rhodecode.model.repo
~~~~~~~~~~~~~~~~~~~~
Repository model for rhodecode
:created_on: Jun 5, 2010
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import shutil
from datetime import datetime
from vcs.backends import get_backend
from rhodecode.lib import LazyProperty
from rhodecode.lib import safe_str, safe_unicode
from rhodecode.lib.caching_query import FromCache
from rhodecode.lib.hooks import log_create_repository
from rhodecode.model import BaseModel
from rhodecode.model.db import Repository, UserRepoToPerm, User, Permission, \
Statistics, UsersGroup, UsersGroupRepoToPerm, RhodeCodeUi, RepoGroup
class RepoModel(BaseModel):
def __get_user(self, user):
return self._get_instance(User, user, callback=User.get_by_username)
def __get_users_group(self, users_group):
return self._get_instance(UsersGroup, users_group,
callback=UsersGroup.get_by_group_name)
def __get_repos_group(self, repos_group):
return self._get_instance(RepoGroup, repos_group,
callback=RepoGroup.get_by_group_name)
def __get_repo(self, repository):
return self._get_instance(Repository, repository,
callback=Repository.get_by_repo_name)
def __get_perm(self, permission):
return self._get_instance(Permission, permission,
callback=Permission.get_by_key)
@LazyProperty
def repos_path(self):
Get's the repositories root path from database
q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
def get(self, repo_id, cache=False):
repo = self.sa.query(Repository)\
.filter(Repository.repo_id == repo_id)
if cache:
repo = repo.options(FromCache("sql_cache_short",
"get_repo_%s" % repo_id))
return repo.scalar()
def get_by_repo_name(self, repo_name, cache=False):
.filter(Repository.repo_name == repo_name)
"get_repo_%s" % repo_name))
def get_users_js(self):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s]' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def get_users_groups_js(self):
users_groups = self.sa.query(UsersGroup)\
.filter(UsersGroup.users_group_active == True).all()
g_tmpl = '''{id:%s, grname:"%s",grmembers:"%s"},'''
users_groups_array = '[%s]' % '\n'.join([g_tmpl % \
(gr.users_group_id, gr.users_group_name,
len(gr.members))
for gr in users_groups])
return users_groups_array
def _get_defaults(self, repo_name):
Get's information about repository, and returns a dict for
usage in forms
repo_info = Repository.get_by_repo_name(repo_name)
if repo_info is None:
defaults = repo_info.get_dict()
group, repo_name = repo_info.groups_and_repo
defaults['repo_name'] = repo_name
defaults['repo_group'] = getattr(group[-1] if group else None,
'group_id', None)
# fill owner
if repo_info.user:
defaults.update({'user': repo_info.user.username})
else:
replacement_user = User.query().filter(User.admin ==
True).first().username
defaults.update({'user': replacement_user})
# fill repository users
for p in repo_info.repo_to_perm:
defaults.update({'u_perm_%s' % p.user.username:
p.permission.permission_name})
# fill repository groups
for p in repo_info.users_group_to_perm:
defaults.update({'g_perm_%s' % p.users_group.users_group_name:
return defaults
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
# update permissions
for member, perm, member_type in form_data['perms_updates']:
if member_type == 'user':
# this updates existing one
RepoModel().grant_user_permission(
repo=cur_repo, user=member, perm=perm
RepoModel().grant_users_group_permission(
repo=cur_repo, group_name=member, perm=perm
# set new permissions
for member, perm, member_type in form_data['perms_new']:
# update current repo
for k, v in form_data.items():
if k == 'user':
cur_repo.user = User.get_by_username(v)
elif k == 'repo_name':
pass
elif k == 'repo_group':
cur_repo.group = RepoGroup.get(v)
setattr(cur_repo, k, v)
new_name = cur_repo.get_new_name(form_data['repo_name'])
cur_repo.repo_name = new_name
self.sa.add(cur_repo)
if repo_name != new_name:
# rename repository
self.__rename_repo(old=repo_name, new=new_name)
return cur_repo
except:
raise
def create(self, form_data, cur_user, just_db=False, fork=False):
if fork:
fork_parent_id = form_data['fork_parent_id']
# repo name is just a name of repository
# while repo_name_full is a full qualified name that is combined
# with name and path of group
repo_name = form_data['repo_name']
repo_name_full = form_data['repo_name_full']
new_repo = Repository()
new_repo.enable_statistics = False
if k == 'repo_name':
v = repo_name_full
if k == 'repo_group':
k = 'group_id'
if k == 'description':
v = v or repo_name
setattr(new_repo, k, v)
parent_repo = Repository.get(fork_parent_id)
new_repo.fork = parent_repo
new_repo.user_id = cur_user.user_id
self.sa.add(new_repo)
def _create_default_perms():
# create default permission
repo_to_perm = UserRepoToPerm()
default = 'repository.read'
for p in User.get_by_username('default').user_perms:
if p.permission.permission_name.startswith('repository.'):
default = p.permission.permission_name
break
default_perm = 'repository.none' if form_data['private'] else default
repo_to_perm.permission_id = self.sa.query(Permission)\
.filter(Permission.permission_name == default_perm)\
.one().permission_id
repo_to_perm.repository = new_repo
repo_to_perm.user_id = User.get_by_username('default').user_id
self.sa.add(repo_to_perm)
if form_data.get('copy_permissions'):
repo = Repository.get(fork_parent_id)
user_perms = UserRepoToPerm.query()\
.filter(UserRepoToPerm.repository == repo).all()
group_perms = UsersGroupRepoToPerm.query()\
.filter(UsersGroupRepoToPerm.repository == repo).all()
for perm in user_perms:
UserRepoToPerm.create(perm.user, new_repo,
perm.permission)
for perm in group_perms:
UsersGroupRepoToPerm.create(perm.users_group, new_repo,
_create_default_perms()
if not just_db:
self.__create_repo(repo_name, form_data['repo_type'],
form_data['repo_group'],
form_data['clone_uri'])
# now automatically start following this repository as owner
ScmModel(self.sa).toggle_following_repo(new_repo.repo_id,
cur_user.user_id)
log_create_repository(new_repo.get_dict(),
created_by=cur_user.username)
return new_repo
def create_fork(self, form_data, cur_user):
Simple wrapper into executing celery task for fork creation
:param form_data:
:param cur_user:
from rhodecode.lib.celerylib import tasks, run_task
run_task(tasks.create_repo_fork, form_data, cur_user)
def delete(self, repo):
repo = self.__get_repo(repo)
self.sa.delete(repo)
self.__delete_repo(repo)
def grant_user_permission(self, repo, user, perm):
:param repo: Instance of Repository, repository_id, or repository name
:param user: Instance of User, user_id or username
:param perm: Instance of Permission, or permission_name
user = self.__get_user(user)
permission = self.__get_perm(perm)
# check if we have that permission already
obj = self.sa.query(UserRepoToPerm)\
.filter(UserRepoToPerm.user == user)\
.filter(UserRepoToPerm.repository == repo)\
.scalar()
if obj is None:
# create new !
obj = UserRepoToPerm()
obj.repository = repo
obj.user = user
obj.permission = permission
self.sa.add(obj)
def revoke_user_permission(self, repo, user):
.one()
self.sa.delete(obj)
def grant_users_group_permission(self, repo, group_name, perm):
:param group_name: Instance of UserGroup, users_group_id,
or users group name
group_name = self.__get_users_group(group_name)
obj = self.sa.query(UsersGroupRepoToPerm)\
.filter(UsersGroupRepoToPerm.users_group == group_name)\
.filter(UsersGroupRepoToPerm.repository == repo)\
# create new
obj = UsersGroupRepoToPerm()
obj.users_group = group_name
def revoke_users_group_permission(self, repo, group_name):
def delete_stats(self, repo_name):
removes stats for given repo
obj = self.sa.query(Statistics)\
.filter(Statistics.repository ==
self.get_by_repo_name(repo_name))\
def __create_repo(self, repo_name, alias, new_parent_id, clone_uri=False):
makes repository on filesystem. It's group aware means it'll create
a repository within a group, and alter the paths accordingly of
group location
:param alias:
:param parent_id:
:param clone_uri:
from rhodecode.lib.utils import is_valid_repo, is_valid_repos_group
if new_parent_id:
paths = RepoGroup.get(new_parent_id)\
.full_path.split(RepoGroup.url_sep())
new_parent_path = os.sep.join(paths)
new_parent_path = ''
# we need to make it str for mercurial
repo_path = os.path.join(*map(lambda x: safe_str(x),
[self.repos_path, new_parent_path, repo_name]))
# check if this path is not a repository
if is_valid_repo(repo_path, self.repos_path):
raise Exception('This path %s is a valid repository' % repo_path)
# check if this path is a group
if is_valid_repos_group(repo_path, self.repos_path):
raise Exception('This path %s is a valid group' % repo_path)
log.info('creating repo %s in %s @ %s' % (
repo_name, safe_unicode(repo_path), clone_uri
backend = get_backend(alias)
backend(repo_path, create=True, src_url=clone_uri)
def __rename_repo(self, old, new):
renames repository on filesystem
:param old: old name
:param new: new name
log.info('renaming repo from %s to %s' % (old, new))
old_path = os.path.join(self.repos_path, old)
new_path = os.path.join(self.repos_path, new)
if os.path.isdir(new_path):
raise Exception(
'Was trying to rename to already existing dir %s' % new_path
shutil.move(old_path, new_path)
def __delete_repo(self, repo):
removes repo from filesystem, the removal is acctually made by
added rm__ prefix into dir, and rename internat .hg/.git dirs so this
repository is no longer valid for rhodecode, can be undeleted later on
by reverting the renames on this repository
:param repo: repo object
rm_path = os.path.join(self.repos_path, repo.repo_name)
log.info("Removing %s" % (rm_path))
# disable hg/git
alias = repo.repo_type
shutil.move(os.path.join(rm_path, '.%s' % alias),
os.path.join(rm_path, 'rm__.%s' % alias))
# disable repo
_d = 'rm__%s__%s' % (datetime.now().strftime('%Y%m%d_%H%M%S_%f'),
repo.repo_name)
shutil.move(rm_path, os.path.join(self.repos_path, _d))
Status change: