# -*- coding: utf-8 -*-
"""
rhodecode.controllers.admin.admin
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Controller for Admin panel of Rhodecode
:created_on: Apr 7, 2010
:author: marcink
:copyright: (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
#
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import logging
from pylons import request, tmpl_context as c
from rhodecode.lib.base import BaseController, render
from rhodecode.model.db import UserLog
from webhelpers.paginate import Page
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator
log = logging.getLogger(__name__)
class AdminController(BaseController):
@LoginRequired()
def __before__(self):
super(AdminController, self).__before__()
@HasPermissionAllDecorator('hg.admin')
def index(self):
users_log = self.sa.query(UserLog).order_by(UserLog.action_date.desc())
p = int(request.params.get('page', 1))
c.users_log = Page(users_log, page=p, items_per_page=10)
c.log_data = render('admin/admin_log.html')
if request.params.get('partial'):
return c.log_data
return render('admin/admin.html')
rhodecode.controllers.admin.ldap_settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
ldap controller for RhodeCode
:created_on: Nov 26, 2010
import formencode
import traceback
from formencode import htmlfill
from pylons import request, response, session, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from rhodecode.lib import helpers as h
from rhodecode.lib.auth_ldap import LdapImportError
from rhodecode.model.settings import SettingsModel
from rhodecode.model.forms import LdapSettingsForm
from sqlalchemy.exc import DatabaseError
class LdapSettingsController(BaseController):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
super(LdapSettingsController, self).__before__()
defaults = SettingsModel().get_ldap_settings()
return htmlfill.render(
render('admin/ldap/ldap.html'),
defaults=defaults,
encoding="UTF-8",
force_defaults=True,)
def ldap_settings(self):
"""POST ldap create and store ldap settings"""
settings_model = SettingsModel()
post_data = dict(request.POST)
_form = LdapSettingsForm(post_data.get('ldap_active'))()
try:
form_result = _form.to_python(post_data)
for k, v in form_result.items():
if k.startswith('ldap_'):
setting = settings_model.get(k)
setting.app_settings_value = v
self.sa.add(setting)
self.sa.commit()
h.flash(_('Ldap settings updated successfully'),
category='success')
except (DatabaseError,):
raise
except LdapImportError:
h.flash(_('Unable to activate ldap. The "python-ldap" library '
'is missing.'), category='warning')
except formencode.Invalid, errors:
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
h.flash(_('error occurred during update of ldap settings'),
category='error')
return redirect(url('ldap_home'))
rhodecode.controllers.admin.permissions
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
permissions controller for Rhodecode
:created_on: Apr 27, 2010
from pylons import request, session, tmpl_context as c, url
from rhodecode.model.forms import LdapSettingsForm, DefaultPermissionsForm
from rhodecode.model.permission import PermissionModel
from rhodecode.model.user import UserModel
class PermissionsController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('permission', 'permissions')
super(PermissionsController, self).__before__()
self.perms_choices = [('repository.none', _('None'),),
('repository.read', _('Read'),),
('repository.write', _('Write'),),
('repository.admin', _('Admin'),)]
self.register_choices = [
('hg.register.none',
_('disabled')),
('hg.register.manual_activate',
_('allowed with manual account activation')),
('hg.register.auto_activate',
_('allowed with automatic account activation')), ]
self.create_choices = [('hg.create.none', _('Disabled')),
('hg.create.repository', _('Enabled'))]
def index(self, format='html'):
"""GET /permissions: All items in the collection"""
# url('permissions')
def create(self):
"""POST /permissions: Create a new item"""
def new(self, format='html'):
"""GET /permissions/new: Form to create a new item"""
# url('new_permission')
def update(self, id):
"""PUT /permissions/id: Update an existing item"""
# Forms posted to this method should contain a hidden field:
# <input type="hidden" name="_method" value="PUT" />
# Or using helpers:
# h.form(url('permission', id=ID),
# method='put')
# url('permission', id=ID)
permission_model = PermissionModel()
_form = DefaultPermissionsForm([x[0] for x in self.perms_choices],
[x[0] for x in self.register_choices],
[x[0] for x in self.create_choices])()
form_result = _form.to_python(dict(request.POST))
form_result.update({'perm_user_name':id})
permission_model.update(form_result)
h.flash(_('Default permissions updated successfully'),
c.perms_choices = self.perms_choices
c.register_choices = self.register_choices
c.create_choices = self.create_choices
defaults = errors.value
render('admin/permissions/permissions.html'),
rhodecode.controllers.admin.repos
Admin controller for RhodeCode
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
from operator import itemgetter
from paste.httpexceptions import HTTPInternalServerError
from rhodecode.lib.auth import LoginRequired, HasPermissionAllDecorator, \
HasPermissionAnyDecorator
from rhodecode.lib.utils import invalidate_cache, action_logger
from rhodecode.model.db import User
from rhodecode.model.forms import RepoForm
from rhodecode.model.scm import ScmModel
from rhodecode.model.repo import RepoModel
class ReposController(BaseController):
# map.resource('repo', 'repos')
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
super(ReposController, self).__before__()
"""GET /repos: All items in the collection"""
# url('repos')
cached_repo_list = ScmModel().get_repos()
c.repos_list = sorted(cached_repo_list, key=itemgetter('name_sort'))
return render('admin/repos/repos.html')
"""POST /repos: Create a new item"""
repo_model = RepoModel()
_form = RepoForm()()
form_result = {}
repo_model.create(form_result, c.rhodecode_user)
h.flash(_('created repository %s') % form_result['repo_name'],
if request.POST.get('user_created'):
action_logger(self.rhodecode_user, 'user_created_repo',
form_result['repo_name'], '', self.sa)
else:
action_logger(self.rhodecode_user, 'admin_created_repo',
c.new_repo = errors.value['repo_name']
r = render('admin/repos/repo_add_create_repository.html')
r = render('admin/repos/repo_add.html')
r,
msg = _('error occurred during creation of repository %s') \
% form_result.get('repo_name')
h.flash(msg, category='error')
return redirect(url('home'))
return redirect(url('repos'))
"""GET /repos/new: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = h.repo_name_slug(new_repo)
rhodecode.controllers.admin.settings
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
settings controller for rhodecode admin
:created_on: Jul 14, 2010
from pylons import request, session, tmpl_context as c, url, app_globals as g, \
config
HasPermissionAnyDecorator, NotAnonymous
from rhodecode.lib.celerylib import tasks, run_task
from rhodecode.lib.utils import repo2db_mapper, invalidate_cache, \
set_rhodecode_config
from rhodecode.model.db import RhodeCodeUi, Repository
from rhodecode.model.forms import UserForm, ApplicationSettingsForm, \
ApplicationUiSettingsForm
from sqlalchemy import func
class SettingsController(BaseController):
# map.resource('setting', 'settings', controller='admin/settings',
# path_prefix='/admin', name_prefix='admin_')
super(SettingsController, self).__before__()
"""GET /admin/settings: All items in the collection"""
# url('admin_settings')
defaults = SettingsModel().get_app_settings()
defaults.update(self.get_hg_ui_settings())
render('admin/settings/settings.html'),
force_defaults=False
)
"""POST /admin/settings: Create a new item"""
"""GET /admin/settings/new: Form to create a new item"""
# url('admin_new_setting')
def update(self, setting_id):
"""PUT /admin/settings/setting_id: Update an existing item"""
# h.form(url('admin_setting', setting_id=ID),
# url('admin_setting', setting_id=ID)
if setting_id == 'mapping':
rm_obsolete = request.POST.get('destroy', False)
log.debug('Rescanning directories with destroy=%s', rm_obsolete)
initial = ScmModel().repo_scan(g.paths[0][1], g.baseui)
for repo_name in initial.keys():
invalidate_cache('get_repo_cached_%s' % repo_name)
repo2db_mapper(initial, rm_obsolete)
h.flash(_('Repositories successfully rescanned'), category='success')
if setting_id == 'whoosh':
repo_location = self.get_hg_ui_settings()['paths_root_path']
full_index = request.POST.get('full_index', False)
task = run_task(tasks.whoosh_index, repo_location, full_index)
h.flash(_('Whoosh reindex task scheduled'), category='success')
if setting_id == 'global':
application_form = ApplicationSettingsForm()()
rhodecode.controllers.admin.users
Users crud controller for pylons
:created_on: Apr 4, 2010
from rhodecode.lib.exceptions import *
from rhodecode.model.forms import UserForm
class UsersController(BaseController):
# map.resource('user', 'users')
super(UsersController, self).__before__()
"""GET /users: All items in the collection"""
# url('users')
c.users_list = self.sa.query(User).all()
return render('admin/users/users.html')
"""POST /users: Create a new item"""
user_model = UserModel()
login_form = UserForm()()
form_result = login_form.to_python(dict(request.POST))
user_model.create(form_result)
h.flash(_('created user %s') % form_result['username'],
#action_logger(self.rhodecode_user, 'new_user', '', '', self.sa)
render('admin/users/user_add.html'),
h.flash(_('error occurred during creation of user %s') \
% request.POST.get('username'), category='error')
return redirect(url('users'))
"""GET /users/new: Form to create a new item"""
# url('new_user')
return render('admin/users/user_add.html')
"""PUT /users/id: Update an existing item"""
# h.form(url('user', id=ID),
# url('user', id=ID)
c.user = user_model.get(id)
_form = UserForm(edit=True, old_data={'user_id':id,
'email':c.user.email})()
user_model.update(id, form_result)
h.flash(_('User updated succesfully'), category='success')
render('admin/users/user_edit.html'),
rhodecode.controllers.branches
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
branches controller for rhodecode
:created_on: Apr 21, 2010
from pylons import tmpl_context as c
from rhodecode.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
from rhodecode.lib.utils import OrderedDict
class BranchesController(BaseController):
@HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
'repository.admin')
super(BranchesController, self).__before__()
hg_model = ScmModel()
c.repo_info = hg_model.get_repo(c.repo_name)
c.repo_branches = OrderedDict()
for name, hash_ in c.repo_info.branches.items():
c.repo_branches[name] = c.repo_info.get_changeset(hash_)
return render('branches/branches.html')
rhodecode.controllers.changelog
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
changelog controller for rhodecode
import json
except ImportError:
#python 2.5 compatibility
import simplejson as json
from mercurial.graphmod import colored, CHANGESET, revisions as graph_rev
from pylons import request, session, tmpl_context as c
class ChangelogController(BaseController):
super(ChangelogController, self).__before__()
limit = 100
default = 20
if request.params.get('size'):
int_size = int(request.params.get('size'))
except ValueError:
int_size = default
int_size = int_size if int_size <= limit else limit
c.size = int_size
session['changelog_size'] = c.size
session.save()
c.size = int(session.get('changelog_size', default))
changesets = ScmModel().get_repo(c.repo_name)
c.total_cs = len(changesets)
c.pagination = Page(changesets, page=p, item_count=c.total_cs,
items_per_page=c.size)
self._graph(changesets, c.size, p)
return render('changelog/changelog.html')
def _graph(self, repo, size, p):
revcount = size
if not repo.revisions or repo.alias == 'git':
c.jsdata = json.dumps([])
return
max_rev = repo.revisions[-1]
offset = 1 if p == 1 else ((p - 1) * revcount + 1)
rev_start = repo.revisions[(-1 * offset)]
revcount = min(max_rev, revcount)
rev_end = max(0, rev_start - revcount)
dag = graph_rev(repo.repo, rev_start, rev_end)
c.dag = tree = list(colored(dag))
data = []
for (id, type, ctx, vtx, edges) in tree:
if type != CHANGESET:
continue
data.append(('', vtx, edges))
c.jsdata = json.dumps(data)
rhodecode.controllers.changeset
changeset controller for pylons
:created_on: Apr 25, 2010
from pylons import tmpl_context as c, url, request, response
from pylons.controllers.util import redirect
import rhodecode.lib.helpers as h
from rhodecode.lib.utils import EmptyChangeset
from vcs.exceptions import RepositoryError, ChangesetError
from vcs.nodes import FileNode
from vcs.utils import diffs as differ
class ChangesetController(BaseController):
super(ChangesetController, self).__before__()
def index(self, revision):
def wrap_to_table(str):
return '''<table class="code-difftable">
<tr class="line">
<td class="lineno new"></td>
<td class="code"><pre>%s</pre></td>
</tr>
</table>''' % str
c.changeset = hg_model.get_repo(c.repo_name).get_changeset(revision)
except RepositoryError, e:
h.flash(str(e), category='warning')
c.changeset_old = c.changeset.parents[0]
except IndexError:
c.changeset_old = None
c.changes = []
#===================================================================
# ADDED FILES
c.sum_added = 0
for node in c.changeset.added:
filenode_old = FileNode(node.path, '', EmptyChangeset())
if filenode_old.is_binary or node.is_binary:
diff = wrap_to_table(_('binary file'))
c.sum_added += node.size
if c.sum_added < self.cut_off_limit:
f_udiff = differ.get_udiff(filenode_old, node)
diff = differ.DiffProcessor(f_udiff).as_html()
diff = wrap_to_table(_('Changeset is to big and was cut'
' off, see raw changeset instead'))
cs1 = None
cs2 = node.last_changeset.raw_id
c.changes.append(('added', node, diff, cs1, cs2))
# CHANGED FILES
c.sum_removed = 0
for node in c.changeset.changed:
filenode_old = c.changeset_old.get_node(node.path)
except ChangesetError:
if c.sum_removed < self.cut_off_limit:
if diff:
c.sum_removed += len(diff)
package.rhodecode.controllers.error
~~~~~~~~~~~~~~
RhodeCode error controller
:created_on: Dec 8, 2010
import os
import cgi
import paste.fileapp
from pylons import tmpl_context as c, request, config
from pylons.middleware import media_path
class ErrorController(BaseController):
"""Generates error documents as and when they are required.
The ErrorDocuments middleware forwards to ErrorController when error
related status codes are returned from the application.
This behavior can be altered by changing the parameters to the
ErrorDocuments middleware in your config/middleware.py file.
c.rhodecode_name = config.get('rhodecode_title')
def document(self):
resp = request.environ.get('pylons.original_response')
log.debug('### %s ###', resp.status)
e = request.environ
c.serv_p = r'%(protocol)s://%(host)s/' % {
'protocol': e.get('wsgi.url_scheme'),
'host':e.get('HTTP_HOST'),
}
c.error_message = cgi.escape(request.GET.get('code', str(resp.status)))
c.error_explanation = self.get_error_explanation(resp.status_int)
#redirect to when error with given seconds
c.redirect_time = 0
c.redirect_module = _('Home page')# name to what your going to be redirected
c.url_redirect = "/"
return render('/errors/error_document.html')
def img(self, id):
"""Serve Pylons' stock images"""
return self._serve_file(os.path.join(media_path, 'img', id))
def style(self, id):
"""Serve Pylons' stock stylesheets"""
return self._serve_file(os.path.join(media_path, 'style', id))
def _serve_file(self, path):
"""Call Paste's FileApp (a WSGI application) to serve the file
at the specified path
fapp = paste.fileapp.FileApp(path)
return fapp(request.environ, self.start_response)
def get_error_explanation(self, code):
''' get the error explanations of int codes
[400, 401, 403, 404, 500]'''
code = int(code)
except:
code = 500
if code == 400:
return _('The request could not be understood by the server due to malformed syntax.')
if code == 401:
return _('Unauthorized access to resource')
if code == 403:
return _("You don't have permission to view this page")
if code == 404:
return _('The resource could not be found')
if code == 500:
return _('The server encountered an unexpected condition which prevented it from fulfilling the request.')
rhodecode.controllers.feed
~~~~~~~~~~~~~~~~~~~~~~~~~~
Feed controller for rhodecode
:created_on: Apr 23, 2010
from pylons import url, response
from rhodecode.lib.base import BaseController
from webhelpers.feedgenerator import Atom1Feed, Rss201rev2Feed
class FeedController(BaseController):
super(FeedController, self).__before__()
#common values for feeds
self.description = 'Changes on %s repository'
self.title = "%s feed"
self.language = 'en-us'
self.ttl = "5"
self.feed_nr = 10
def atom(self, repo_name):
"""Produce an atom-1.0 feed via feedgenerator module"""
feed = Atom1Feed(title=self.title % repo_name,
link=url('summary_home', repo_name=repo_name, qualified=True),
description=self.description % repo_name,
language=self.language,
ttl=self.ttl)
changesets = ScmModel().get_repo(repo_name)
for cs in changesets[:self.feed_nr]:
feed.add_item(title=cs.message,
link=url('changeset_home', repo_name=repo_name,
revision=cs.raw_id, qualified=True),
description=str(cs.date))
response.content_type = feed.mime_type
return feed.writeString('utf-8')
def rss(self, repo_name):
"""Produce an rss2 feed via feedgenerator module"""
feed = Rss201rev2Feed(title=self.title % repo_name,
rhodecode.controllers.files
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Files controller for RhodeCode
import tempfile
from mercurial import archival
from vcs.exceptions import RepositoryError, ChangesetError, \
ChangesetDoesNotExistError, EmptyRepositoryError
class FilesController(BaseController):
super(FilesController, self).__before__()
c.cut_off_limit = self.cut_off_limit
def __get_cs_or_redirect(self, rev, repo_name):
Safe way to get changeset if error occur it redirects to tip with
proper message
:param rev: revision to fetch
:param repo_name: repo name to redirect after
_repo = ScmModel().get_repo(c.repo_name)
return _repo.get_changeset(rev)
except EmptyRepositoryError, e:
h.flash(_('There are no files yet'), category='warning')
redirect(h.url('summary_home', repo_name=repo_name))
redirect(h.url('files_home', repo_name=repo_name, revision='tip'))
def index(self, repo_name, revision, f_path):
cs = self.__get_cs_or_redirect(revision, repo_name)
c.repo = ScmModel().get_repo(c.repo_name)
revision = request.POST.get('at_rev', None) or revision
def get_next_rev(cur):
max_rev = len(c.repo.revisions) - 1
r = cur + 1
if r > max_rev:
r = max_rev
return r
def get_prev_rev(cur):
r = cur - 1
c.f_path = f_path
c.changeset = cs
cur_rev = c.changeset.revision
prev_rev = c.repo.get_changeset(get_prev_rev(cur_rev)).raw_id
next_rev = c.repo.get_changeset(get_next_rev(cur_rev)).raw_id
c.url_prev = url('files_home', repo_name=c.repo_name,
revision=prev_rev, f_path=f_path)
c.url_next = url('files_home', repo_name=c.repo_name,
revision=next_rev, f_path=f_path)
c.files_list = c.changeset.get_node(f_path)
c.file_history = self._get_history(c.repo, c.files_list, f_path)
redirect(h.url('files_home', repo_name=repo_name,
revision=revision))
return render('files/files.html')
def rawfile(self, repo_name, revision, f_path):
file_node = cs.get_node(f_path)
rhodecode.controllers.home
Home controller for Rhodecode
:created_on: Feb 18, 2010
from pylons import tmpl_context as c, request
from rhodecode.lib.auth import LoginRequired
class HomeController(BaseController):
super(HomeController, self).__before__()
sortables = ['name', 'description', 'last_change', 'tip', 'owner']
current_sort = request.GET.get('sort', 'name')
current_sort_slug = current_sort.replace('-', '')
if current_sort_slug not in sortables:
c.sort_by = 'name'
current_sort_slug = c.sort_by
c.sort_by = current_sort
c.sort_slug = current_sort_slug
sort_key = current_sort_slug + '_sort'
if c.sort_by.startswith('-'):
c.repos_list = sorted(cached_repo_list, key=itemgetter(sort_key),
reverse=True)
reverse=False)
return render('/index.html')
rhodecode.controllers.journal
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Journal controller for pylons
:created_on: Nov 21, 2010
from paste.httpexceptions import HTTPInternalServerError, HTTPBadRequest
from sqlalchemy import or_
from rhodecode.lib.auth import LoginRequired, NotAnonymous
from rhodecode.lib.helpers import get_token
from rhodecode.model.db import UserLog, UserFollowing
class JournalController(BaseController):
@NotAnonymous()
super(JournalController, self).__before__()
# Return a rendered template
c.following = self.sa.query(UserFollowing)\
.filter(UserFollowing.user_id == c.rhodecode_user.user_id).all()
repo_ids = [x.follows_repository.repo_id for x in c.following
if x.follows_repository is not None]
user_ids = [x.follows_user.user_id for x in c.following
if x.follows_user is not None]
c.journal = self.sa.query(UserLog)\
.filter(or_(
UserLog.repository_id.in_(repo_ids),
UserLog.user_id.in_(user_ids),
))\
.order_by(UserLog.action_date.desc())\
.limit(20)\
.all()
return render('/journal.html')
def toggle_following(self):
if request.POST.get('auth_token') == get_token():
scm_model = ScmModel()
user_id = request.POST.get('follows_user_id')
if user_id:
scm_model.toggle_following_user(user_id,
c.rhodecode_user.user_id)
return 'ok'
raise HTTPInternalServerError()
repo_id = request.POST.get('follows_repo_id')
if repo_id:
scm_model.toggle_following_repo(repo_id,
raise HTTPBadRequest()
rhodecode.controllers.login
Login controller for rhodeocode
:created_on: Apr 22, 2010
from rhodecode.lib.auth import AuthUser, HasPermissionAnyDecorator
from rhodecode.model.forms import LoginForm, RegisterForm, PasswordResetForm
class LoginController(BaseController):
super(LoginController, self).__before__()
#redirect if already logged in
c.came_from = request.GET.get('came_from', None)
if c.rhodecode_user.is_authenticated \
and c.rhodecode_user.username != 'default':
if request.POST:
#import Login Form validator class
login_form = LoginForm()
c.form_result = login_form.to_python(dict(request.POST))
username = c.form_result['username']
user = UserModel().get_by_username(username, case_insensitive=True)
auth_user = AuthUser()
auth_user.username = user.username
auth_user.is_authenticated = True
auth_user.is_admin = user.admin
auth_user.user_id = user.user_id
auth_user.name = user.name
auth_user.lastname = user.lastname
session['rhodecode_user'] = auth_user
log.info('user %s is now authenticated', username)
user.update_lastlogin()
if c.came_from:
return redirect(c.came_from)
render('/login.html'),
return render('/login.html')
@HasPermissionAnyDecorator('hg.admin', 'hg.register.auto_activate',
'hg.register.manual_activate')
def register(self):
c.auto_active = False
for perm in user_model.get_by_username('default', cache=False).user_perms:
if perm.permission.permission_name == 'hg.register.auto_activate':
c.auto_active = True
break
register_form = RegisterForm()()
form_result = register_form.to_python(dict(request.POST))
form_result['active'] = c.auto_active
user_model.create_registration(form_result)
h.flash(_('You have successfully registered into rhodecode'),
return redirect(url('login_home'))
render('/register.html'),
rhodecode.controllers.search
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Search controller for rhodecode
:created_on: Aug 7, 2010
from pylons import request, response, config, session, tmpl_context as c, url
from rhodecode.lib.indexers import SCHEMA, IDX_NAME, ResultWrapper
from webhelpers.util import update_params
from whoosh.index import open_dir, EmptyIndexError
from whoosh.qparser import QueryParser, QueryParserError
from whoosh.query import Phrase
class SearchController(BaseController):
super(SearchController, self).__before__()
def index(self, search_repo=None):
c.repo_name = search_repo
c.formated_results = []
c.runtime = ''
c.cur_query = request.GET.get('q', None)
c.cur_type = request.GET.get('type', 'source')
c.cur_search = search_type = {'content':'content',
'commit':'content',
'path':'path',
'repository':'repository'}\
.get(c.cur_type, 'content')
if c.cur_query:
cur_query = c.cur_query.lower()
highlight_items = set()
idx = open_dir(config['app_conf']['index_dir']
, indexname=IDX_NAME)
searcher = idx.searcher()
qp = QueryParser(search_type, schema=SCHEMA)
if c.repo_name:
cur_query = u'repository:%s %s' % (c.repo_name, cur_query)
query = qp.parse(unicode(cur_query))
if isinstance(query, Phrase):
highlight_items.update(query.words)
for i in query.all_terms():
if i[0] == 'content':
highlight_items.add(i[1])
matcher = query.matcher(searcher)
log.debug(query)
log.debug(highlight_items)
results = searcher.search(query)
res_ln = len(results)
c.runtime = '%s results (%.3f seconds)' \
% (res_ln, results.runtime)
def url_generator(**kw):
return update_params("?q=%s&type=%s" \
% (c.cur_query, c.cur_search), **kw)
c.formated_results = Page(
ResultWrapper(search_type, searcher, matcher,
highlight_items),
page=p, item_count=res_ln,
items_per_page=10, url=url_generator)
except QueryParserError:
c.runtime = _('Invalid search query. Try quoting it.')
searcher.close()
except (EmptyIndexError, IOError):
log.error('Empty Index data')
c.runtime = _('There is no index to search in. '
'Please run whoosh indexer')
return render('/search/search.html')
rhodecode.controllers.settings
Settings controller for rhodecode
:created_on: Jun 30, 2010
from pylons import tmpl_context as c, request, url
from rhodecode.lib.auth import LoginRequired, HasRepoPermissionAllDecorator, \
HasRepoPermissionAnyDecorator, NotAnonymous
from rhodecode.model.forms import RepoSettingsForm, RepoForkForm
@HasRepoPermissionAllDecorator('repository.admin')
def index(self, repo_name):
c.repo_info = repo = repo_model.get_by_repo_name(repo_name)
if not repo:
h.flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the file system'
' please run the application again'
' in order to rescan repositories') % repo_name,
defaults = c.repo_info.get_dict()
defaults.update({'user':c.repo_info.user.username})
c.users_array = repo_model.get_users_js()
for p in c.repo_info.repo_to_perm:
defaults.update({'perm_%s' % p.user.username:
p.permission.permission_name})
render('settings/repo_settings.html'),
def update(self, repo_name):
changed_name = repo_name
_form = RepoSettingsForm(edit=True, old_data={'repo_name':repo_name})()
repo_model.update(repo_name, form_result)
h.flash(_('Repository %s updated successfully' % repo_name),
changed_name = form_result['repo_name']
action_logger(self.rhodecode_user, 'user_updated_repo',
changed_name, '', self.sa)
c.repo_info = repo_model.get_by_repo_name(repo_name)
errors.value.update({'user':c.repo_info.user.username})
h.flash(_('error occurred during update of repository %s') \
% repo_name, category='error')
return redirect(url('repo_settings_home', repo_name=changed_name))
def delete(self, repo_name):
"""DELETE /repos/repo_name: Delete an existing item"""
# <input type="hidden" name="_method" value="DELETE" />
# h.form(url('repo_settings_delete', repo_name=ID),
# method='delete')
# url('repo_settings_delete', repo_name=ID)
rhodecode.controllers.shortlog
Shortlog controller for rhodecode
:created_on: Apr 18, 2010
class ShortlogController(BaseController):
super(ShortlogController, self).__before__()
repo = ScmModel().get_repo(c.repo_name)
c.repo_changesets = Page(repo, page=p, items_per_page=20)
c.shortlog_data = render('shortlog/shortlog_data.html')
return c.shortlog_data
r = render('shortlog/shortlog.html')
rhodecode.controllers.summary
Summary controller for Rhodecode
import calendar
from time import mktime
from datetime import datetime, timedelta, date
from vcs.exceptions import ChangesetError
from rhodecode.model.db import Statistics
from rhodecode.lib.utils import OrderedDict, EmptyChangeset
from rhodecode.lib.celerylib import run_task
from rhodecode.lib.celerylib.tasks import get_commits_stats
class SummaryController(BaseController):
super(SummaryController, self).__before__()
c.repo_info = scm_model.get_repo(c.repo_name)
c.following = scm_model.is_following_repo(c.repo_name,
return url('shortlog_home', repo_name=c.repo_name, **kw)
c.repo_changesets = Page(c.repo_info, page=1, items_per_page=10,
url=url_generator)
if self.rhodecode_user.username == 'default':
password = ':default'
password = ''
uri = u'%(protocol)s://%(user)s%(password)s@%(host)s%(prefix)s/%(repo_name)s' % {
'user':str(c.rhodecode_user.username),
'password':password,
'prefix':e.get('SCRIPT_NAME'),
'repo_name':c.repo_name, }
c.clone_repo_url = uri
c.repo_tags = OrderedDict()
for name, hash in c.repo_info.tags.items()[:10]:
c.repo_tags[name] = c.repo_info.get_changeset(hash)
c.repo_tags[name] = EmptyChangeset(hash)
for name, hash in c.repo_info.branches.items()[:10]:
c.repo_branches[name] = c.repo_info.get_changeset(hash)
c.repo_branches[name] = EmptyChangeset(hash)
td = date.today() + timedelta(days=1)
td_1m = td - timedelta(days=calendar.mdays[td.month])
td_1y = td - timedelta(days=365)
ts_min_m = mktime(td_1m.timetuple())
ts_min_y = mktime(td_1y.timetuple())
ts_max_y = mktime(td.timetuple())
if c.repo_info.dbrepo.enable_statistics:
c.no_data_msg = _('No data loaded yet')
run_task(get_commits_stats, c.repo_info.name, ts_min_y, ts_max_y)
c.no_data_msg = _('Statistics update are disabled for this repository')
c.ts_min = ts_min_m
c.ts_max = ts_max_y
stats = self.sa.query(Statistics)\
.filter(Statistics.repository == c.repo_info.dbrepo)\
rhodecode.controllers.tags
Tags controller for rhodecode
class TagsController(BaseController):
super(TagsController, self).__before__()
for name, hash_ in c.repo_info.tags.items():
c.repo_tags[name] = c.repo_info.get_changeset(hash_)
return render('tags/tags.html')
rhodecode.lib.auth
~~~~~~~~~~~~~~~~~~
authentication and permission libraries
:copyright: (c) 2010 by marcink.
:license: LICENSE_NAME, see LICENSE_FILE for more details.
import random
from decorator import decorator
from pylons import config, session, url, request
from rhodecode import __platform__, PLATFORM_WIN, PLATFORM_OTHERS
if __platform__ in PLATFORM_WIN:
from hashlib import sha256
if __platform__ in PLATFORM_OTHERS:
import bcrypt
from rhodecode.lib import str2bool
from rhodecode.lib.exceptions import LdapPasswordError, LdapUsernameError
from rhodecode.lib.utils import get_repo_slug
from rhodecode.lib.auth_ldap import AuthLdap
from rhodecode.model import meta
from rhodecode.model.db import Permission, RepoToPerm, Repository, \
User, UserToPerm
class PasswordGenerator(object):
"""This is a simple class for generating password from
different sets of characters
usage:
passwd_gen = PasswordGenerator()
#print 8-letter password containing only big and small letters of alphabet
print passwd_gen.gen_password(8, passwd_gen.ALPHABETS_BIG_SMALL)
ALPHABETS_NUM = r'''1234567890'''#[0]
ALPHABETS_SMALL = r'''qwertyuiopasdfghjklzxcvbnm'''#[1]
ALPHABETS_BIG = r'''QWERTYUIOPASDFGHJKLZXCVBNM'''#[2]
ALPHABETS_SPECIAL = r'''`-=[]\;',./~!@#$%^&*()_+{}|:"<>?''' #[3]
ALPHABETS_FULL = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM + ALPHABETS_SPECIAL#[4]
ALPHABETS_ALPHANUM = ALPHABETS_BIG + ALPHABETS_SMALL + ALPHABETS_NUM#[5]
ALPHABETS_BIG_SMALL = ALPHABETS_BIG + ALPHABETS_SMALL
ALPHABETS_ALPHANUM_BIG = ALPHABETS_BIG + ALPHABETS_NUM#[6]
ALPHABETS_ALPHANUM_SMALL = ALPHABETS_SMALL + ALPHABETS_NUM#[7]
def __init__(self, passwd=''):
self.passwd = passwd
def gen_password(self, len, type):
self.passwd = ''.join([random.choice(type) for _ in xrange(len)])
return self.passwd
class RhodeCodeCrypto(object):
@classmethod
def hash_string(cls, str_):
Cryptographic function used for password hashing based on pybcrypt
or pycrypto in windows
:param password: password to hash
return sha256(str_).hexdigest()
elif __platform__ in PLATFORM_OTHERS:
return bcrypt.hashpw(str_, bcrypt.gensalt(10))
raise Exception('Unknown or unsupported platform %s' % __platform__)
def hash_check(cls, password, hashed):
Checks matching password with it's hashed value, runs different
implementation based on platform it runs on
:param password: password
:param hashed: password in hashed form
return sha256(password).hexdigest() == hashed
return bcrypt.hashpw(password, hashed) == hashed
def get_crypt_password(password):
return RhodeCodeCrypto.hash_string(password)
def check_password(password, hashed):
return RhodeCodeCrypto.hash_check(password, hashed)
#!/usr/bin/env python
# encoding: utf-8
# ldap authentication lib
# Copyright (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
Created on Nov 17, 2010
@author: marcink
import ldap
pass
class AuthLdap(object):
def __init__(self, server, base_dn, port=389, bind_dn='', bind_pass='',
use_ldaps=False, ldap_version=3):
self.ldap_version = ldap_version
if use_ldaps:
port = port or 689
self.LDAP_USE_LDAPS = use_ldaps
self.LDAP_SERVER_ADDRESS = server
self.LDAP_SERVER_PORT = port
#USE FOR READ ONLY BIND TO LDAP SERVER
self.LDAP_BIND_DN = bind_dn
self.LDAP_BIND_PASS = bind_pass
ldap_server_type = 'ldap'
if self.LDAP_USE_LDAPS:ldap_server_type = ldap_server_type + 's'
self.LDAP_SERVER = "%s://%s:%s" % (ldap_server_type,
self.LDAP_SERVER_ADDRESS,
self.LDAP_SERVER_PORT)
self.BASE_DN = base_dn
def authenticate_ldap(self, username, password):
"""Authenticate a user via LDAP and return his/her LDAP properties.
Raises AuthenticationError if the credentials are rejected, or
EnvironmentError if the LDAP server can't be reached.
:param username: username
from rhodecode.lib.helpers import chop_at
uid = chop_at(username, "@%s" % self.LDAP_SERVER_ADDRESS)
if "," in username:
raise LdapUsernameError("invalid character in username: ,")
ldap.set_option(ldap.OPT_X_TLS_CACERTDIR, '/etc/openldap/cacerts')
ldap.set_option(ldap.OPT_NETWORK_TIMEOUT, 10)
server = ldap.initialize(self.LDAP_SERVER)
if self.ldap_version == 2:
server.protocol = ldap.VERSION2
server.protocol = ldap.VERSION3
if self.LDAP_BIND_DN and self.LDAP_BIND_PASS:
server.simple_bind_s(self.LDAP_BIND_DN, self.LDAP_BIND_PASS)
dn = self.BASE_DN % {'user':uid}
log.debug("Authenticating %r at %s", dn, self.LDAP_SERVER)
server.simple_bind_s(dn, password)
properties = server.search_s(dn, ldap.SCOPE_SUBTREE)
if not properties:
raise ldap.NO_SUCH_OBJECT()
except ldap.NO_SUCH_OBJECT, e:
log.debug("LDAP says no such user '%s' (%s)", uid, username)
raise LdapUsernameError()
except ldap.INVALID_CREDENTIALS, e:
log.debug("LDAP rejected password for user '%s' (%s)", uid, username)
raise LdapPasswordError()
except ldap.SERVER_DOWN, e:
raise LdapConnectionError("LDAP can't access authentication server")
return properties[0]
# mercurial repository backup manager
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
Created on Feb 28, 2010
Mercurial repositories backup manager
import tarfile
import datetime
import sys
import subprocess
logging.basicConfig(level=logging.DEBUG,
format="%(asctime)s %(levelname)-5.5s %(message)s")
class BackupManager(object):
def __init__(self, repos_location, rsa_key, backup_server):
today = datetime.datetime.now().weekday() + 1
self.backup_file_name = "mercurial_repos.%s.tar.gz" % today
self.id_rsa_path = self.get_id_rsa(rsa_key)
self.repos_path = self.get_repos_path(repos_location)
self.backup_server = backup_server
self.backup_file_path = '/tmp'
logging.info('starting backup for %s', self.repos_path)
logging.info('backup target %s', self.backup_file_path)
def get_id_rsa(self, rsa_key):
if not os.path.isfile(rsa_key):
logging.error('Could not load id_rsa key file in %s', rsa_key)
sys.exit()
return rsa_key
def get_repos_path(self, path):
if not os.path.isdir(path):
logging.error('Wrong location for repositories in %s', path)
return path
def backup_repos(self):
bckp_file = os.path.join(self.backup_file_path, self.backup_file_name)
tar = tarfile.open(bckp_file, "w:gz")
for dir_name in os.listdir(self.repos_path):
logging.info('backing up %s', dir_name)
tar.add(os.path.join(self.repos_path, dir_name), dir_name)
tar.close()
logging.info('finished backup of mercurial repositories')
def transfer_files(self):
params = {
'id_rsa_key': self.id_rsa_path,
'backup_file':os.path.join(self.backup_file_path,
self.backup_file_name),
'backup_server':self.backup_server
cmd = ['scp', '-l', '40000', '-i', '%(id_rsa_key)s' % params,
'%(backup_file)s' % params,
'%(backup_server)s' % params]
subprocess.call(cmd)
logging.info('Transfered file %s to %s', self.backup_file_name, cmd[4])
def rm_file(self):
logging.info('Removing file %s', self.backup_file_name)
os.remove(os.path.join(self.backup_file_path, self.backup_file_name))
if __name__ == "__main__":
repo_location = '/home/repo_path'
backup_server = 'root@192.168.1.100:/backups/mercurial'
rsa_key = '/home/id_rsa'
B_MANAGER = BackupManager(repo_location, rsa_key, backup_server)
B_MANAGER.backup_repos()
B_MANAGER.transfer_files()
B_MANAGER.rm_file()
package.rhodecode.lib.celerylib.__init__
celery libs for RhodeCode
:created_on: Nov 27, 2010
import socket
from hashlib import md5
from vcs.utils.lazy import LazyProperty
from rhodecode.lib.pidlock import DaemonLock, LockHeld
from pylons import config
def str2bool(v):
return v.lower() in ["yes", "true", "t", "1"] if v else None
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
except KeyError:
CELERY_ON = False
class ResultWrapper(object):
def __init__(self, task):
self.task = task
@LazyProperty
def result(self):
return self.task
def run_task(task, *args, **kwargs):
if CELERY_ON:
t = task.delay(*args, **kwargs)
log.info('running task %s:%s', t.task_id, task)
return t
except socket.error, e:
if e.errno == 111:
log.debug('Unable to connect to celeryd. Sync execution')
except KeyError, e:
except Exception, e:
log.debug('executing task %s in sync mode', task)
return ResultWrapper(task(*args, **kwargs))
def locked_task(func):
def __wrapper(func, *fargs, **fkwargs):
params = list(fargs)
params.extend(['%s-%s' % ar for ar in fkwargs.items()])
lockkey = 'task_%s' % \
md5(str(func.__name__) + '-' + \
'-'.join(map(str, params))).hexdigest()
log.info('running task with lockkey %s', lockkey)
l = DaemonLock(lockkey)
ret = func(*fargs, **fkwargs)
l.release()
return ret
except LockHeld:
log.info('LockHeld')
return 'Task with key %s already running' % lockkey
return decorator(__wrapper, func)
rhodecode.lib.celerylib.tasks
RhodeCode task modules, containing all task that suppose to be run
by celery daemon
:created_on: Oct 6, 2010
from celery.decorators import task
from rhodecode.lib.celerylib import run_task, locked_task, str2bool
from rhodecode.lib.helpers import person
from rhodecode.lib.smtp_mailer import SmtpMailer
from rhodecode.lib.utils import OrderedDict, add_cache
from rhodecode.model import init_model
from rhodecode.model.db import RhodeCodeUi
from vcs.backends import get_repo
from sqlalchemy import engine_from_config
add_cache(config)
__all__ = ['whoosh_index', 'get_commits_stats',
'reset_user_password', 'send_email']
def get_session():
engine = engine_from_config(config, 'sqlalchemy.db1.')
init_model(engine)
sa = meta.Session()
return sa
def get_repos_path():
sa = get_session()
q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
@task(ignore_result=True)
@locked_task
def whoosh_index(repo_location, full_index):
#log = whoosh_index.get_logger()
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
index_location = config['index_dir']
WhooshIndexingDaemon(index_location=index_location,
repo_location=repo_location, sa=get_session())\
.run(full_index=full_index)
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
log = get_commits_stats.get_logger()
from rhodecode.model.db import Statistics, Repository
#for js data compatibilty
author_key_cleaner = lambda k: person(k).replace('"', "")
commits_by_day_author_aggregate = {}
commits_by_day_aggregate = {}
repos_path = get_repos_path()
p = os.path.join(repos_path, repo_name)
repo = get_repo(p)
skip_date_limit = True
parse_limit = 250 #limit for single task changeset parsing optimal for
last_rev = 0
last_cs = None
timegetter = itemgetter('time')
dbrepo = sa.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
cur_stats = sa.query(Statistics)\
.filter(Statistics.repository == dbrepo).scalar()
if cur_stats:
last_rev = cur_stats.stat_on_revision
if not repo.revisions:
return True
if last_rev == repo.revisions[-1] and len(repo.revisions) > 1:
rhodecode.lib.dbmigrate.__init__
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Database migration modules
:created_on: Dec 11, 2010
from rhodecode.lib.utils import BasePasterCommand, Command, add_cache
from rhodecode.lib.db_manage import DbManage
class UpgradeDb(BasePasterCommand):
"""Command used for paster to upgrade our database to newer version
max_args = 1
min_args = 1
usage = "CONFIG_FILE"
summary = "Upgrades current db to newer version given configuration file"
group_name = "RhodeCode"
parser = Command.standard_parser(verbose=True)
def command(self):
db_uri = config['sqlalchemy.db1.url']
dbmanage = DbManage(log_sql=True, dbconf=db_uri,
root=config['here'], tests=False)
dbmanage.upgrade()
def update_parser(self):
self.parser.add_option('--sql',
action='store_true',
dest='just_sql',
help="Prints upgrade sql for further investigation",
default=False)
rhodecode.lib.dbmigrate.versions.__init__
Package containing new versions of database models
# Custom Exceptions modules
Custom Exceptions modules
class LdapUsernameError(Exception):pass
class LdapPasswordError(Exception):pass
class LdapConnectionError(Exception):pass
class LdapImportError(Exception):pass
class DefaultUserException(Exception):pass
class UserOwnsReposException(Exception):pass
rhodecode.lib.hooks
~~~~~~~~~~~~~~~~~~~
Hooks runned by rhodecode
:created_on: Aug 6, 2010
import getpass
from mercurial.cmdutil import revrange
from mercurial.node import nullrev
from rhodecode.lib.utils import action_logger
def repo_size(ui, repo, hooktype=None, **kwargs):
"""Presents size of repository after push
:param ui:
:param repo:
:param hooktype:
if hooktype != 'changegroup':
return False
size_hg, size_root = 0, 0
for path, dirs, files in os.walk(repo.root):
if path.find('.hg') != -1:
for f in files:
size_hg += os.path.getsize(os.path.join(path, f))
except OSError:
size_root += os.path.getsize(os.path.join(path, f))
size_hg_f = h.format_byte_size(size_hg)
size_root_f = h.format_byte_size(size_root)
size_total_f = h.format_byte_size(size_root + size_hg)
sys.stdout.write('Repository size .hg:%s repo:%s total:%s\n' \
% (size_hg_f, size_root_f, size_total_f))
def log_pull_action(ui, repo, **kwargs):
"""Logs user last pull action
extra_params = dict(repo.ui.configitems('rhodecode_extras'))
username = extra_params['username']
repository = extra_params['repository']
action = 'pull'
action_logger(username, action, repository, extra_params['ip'])
return 0
def log_push_action(ui, repo, **kwargs):
"""Maps user last push action to new changeset id, from mercurial
action = 'push:%s'
node = kwargs['node']
def get_revs(repo, rev_opt):
if rev_opt:
revs = revrange(repo, rev_opt)
if len(revs) == 0:
return (nullrev, nullrev)
return (max(revs), min(revs))
return (len(repo) - 1, 0)
stop, start = get_revs(repo, [node + ':'])
revs = (str(repo[r]) for r in xrange(start, stop + 1))
action = action % ','.join(revs)
# whoosh indexer daemon for rhodecode
Created on Jan 26, 2010
A deamon will read from task table and run tasks
from os.path import dirname as dn
from os.path import join as jn
#to get the rhodecode import
project_path = dn(dn(dn(dn(os.path.realpath(__file__)))))
sys.path.append(project_path)
from rhodecode.lib.helpers import safe_unicode
from whoosh.index import create_in, open_dir
from shutil import rmtree
from rhodecode.lib.indexers import INDEX_EXTENSIONS, SCHEMA, IDX_NAME
from vcs.exceptions import ChangesetError, RepositoryError
log = logging.getLogger('whooshIndexer')
# create logger
log.setLevel(logging.DEBUG)
log.propagate = False
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s - %(levelname)s - %(message)s")
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
log.addHandler(ch)
class WhooshIndexingDaemon(object):
Deamon for atomic jobs
def __init__(self, indexname='HG_INDEX', index_location=None,
repo_location=None, sa=None):
self.indexname = indexname
self.index_location = index_location
if not index_location:
raise Exception('You have to provide index location')
self.repo_location = repo_location
if not repo_location:
raise Exception('You have to provide repositories location')
self.repo_paths = ScmModel(sa).repo_scan(self.repo_location, None)
self.initial = False
if not os.path.isdir(self.index_location):
os.makedirs(self.index_location)
log.info('Cannot run incremental index since it does not'
' yet exist running full build')
self.initial = True
def get_paths(self, repo):
"""recursive walk in root dir and return a set of all path in that dir
based on repository walk function
index_paths_ = set()
for topnode, dirs, files in repo.walk('/', 'tip'):
index_paths_.add(jn(repo.path, f.path))
for dir in dirs:
except RepositoryError:
return index_paths_
def get_node(self, repo, path):
n_path = path[len(repo.path) + 1:]
node = repo.get_changeset().get_node(n_path)
return node
def get_node_mtime(self, node):
return mktime(node.last_changeset.date.timetuple())
def add_doc(self, writer, path, repo):
"""Adding doc to writer this function itself fetches data from
rhodecode.lib.middleware.https_fixup
middleware to handle https correctly
:created_on: May 23, 2010
class HttpsFixup(object):
def __init__(self, app, config):
self.application = app
self.config = config
def __call__(self, environ, start_response):
self.__fixup(environ)
return self.application(environ, start_response)
def __fixup(self, environ):
"""Function to fixup the environ as needed. In order to use this
middleware you should set this header inside your
proxy ie. nginx, apache etc.
proto = environ.get('HTTP_X_URL_SCHEME')
if str2bool(self.config.get('force_https')):
proto = 'https'
if proto == 'https':
environ['wsgi.url_scheme'] = proto
environ['wsgi.url_scheme'] = 'http'
return None
# middleware to handle git api calls
Created on 2010-04-28
SimpleGit middleware for handling git protocol request (push/clone etc.)
It's implemented with basic auth function
from dulwich import server as dulserver
class SimpleGitUploadPackHandler(dulserver.UploadPackHandler):
def handle(self):
write = lambda x: self.proto.write_sideband(1, x)
graph_walker = dulserver.ProtocolGraphWalker(self, self.repo.object_store,
self.repo.get_peeled)
objects_iter = self.repo.fetch_objects(
graph_walker.determine_wants, graph_walker, self.progress,
get_tagged=self.get_tagged)
# Do they want any objects?
if len(objects_iter) == 0:
self.progress("counting objects: %d, done.\n" % len(objects_iter))
dulserver.write_pack_data(dulserver.ProtocolFile(None, write), objects_iter,
len(objects_iter))
messages = []
messages.append('thank you for using rhodecode')
for msg in messages:
self.progress(msg + "\n")
# we are done
self.proto.write("0000")
dulserver.DEFAULT_HANDLERS = {
'git-upload-pack': SimpleGitUploadPackHandler,
'git-receive-pack': dulserver.ReceivePackHandler,
from dulwich.repo import Repo
from dulwich.web import HTTPGitApplication
from paste.auth.basic import AuthBasicAuthenticator
from paste.httpheaders import REMOTE_USER, AUTH_TYPE
from rhodecode.lib.auth import authfunc, HasPermissionAnyMiddleware
from rhodecode.lib.utils import invalidate_cache, check_repo_fast
from webob.exc import HTTPNotFound, HTTPForbidden, HTTPInternalServerError
def is_git(environ):
Returns True if request's target is git server. ``HTTP_USER_AGENT`` would
then have git client version given.
:param environ:
http_user_agent = environ.get('HTTP_USER_AGENT')
if http_user_agent and http_user_agent.startswith('git'):
class SimpleGit(object):
def __init__(self, application, config):
self.application = application
#authenticate this git request using
self.authenticate = AuthBasicAuthenticator('', authfunc)
self.ipaddr = '0.0.0.0'
self.repository = None
self.username = None
self.action = None
if not is_git(environ):
proxy_key = 'HTTP_X_REAL_IP'
def_key = 'REMOTE_ADDR'
self.ipaddr = environ.get(proxy_key, environ.get(def_key, '0.0.0.0'))
# AUTHENTICATE THIS GIT REQUEST
username = REMOTE_USER(environ)
if not username:
self.authenticate.realm = self.config['rhodecode_realm']
result = self.authenticate(environ)
if isinstance(result, str):
AUTH_TYPE.update(environ, 'basic')
# middleware to handle mercurial api calls
SimpleHG middleware for handling mercurial protocol request (push/clone etc.)
from mercurial.error import RepoError
from mercurial.hgweb import hgweb
from mercurial.hgweb.request import wsgiapplication
from rhodecode.lib.utils import make_ui, invalidate_cache, \
check_repo_fast, ui_sections
def is_mercurial(environ):
Returns True if request's target is mercurial server - header
``HTTP_ACCEPT`` of such request would start with ``application/mercurial``.
http_accept = environ.get('HTTP_ACCEPT')
if http_accept and http_accept.startswith('application/mercurial'):
class SimpleHg(object):
#authenticate this mercurial request using authfunc
if not is_mercurial(environ):
# skip passing error to error controller
environ['pylons.status_code_redirect'] = True
# AUTHENTICATE THIS MERCURIAL REQUEST
self.authenticate.realm = str(self.config['rhodecode_realm'])
REMOTE_USER.update(environ, result)
return result.wsgi_application(environ, start_response)
#=======================================================================
# GET REPOSITORY
repo_name = '/'.join(environ['PATH_INFO'].split('/')[1:])
if repo_name.endswith('/'):
repo_name = repo_name.rstrip('/')
self.repository = repo_name
return HTTPInternalServerError()(environ, start_response)
# CHECK PERMISSIONS FOR THIS REQUEST
self.action = self.__get_action(environ)
if self.action:
username = self.__get_environ_user(environ)
user = self.__get_user(username)
self.username = user.username
#check permissions for this repository
if self.action == 'push':
rhodecode.lib.utils
Utilities library for RhodeCode
from UserDict import DictMixin
from mercurial import ui, config, hg
import paste
import beaker
from paste.script.command import Command, BadCommand
from vcs.backends.base import BaseChangeset
from rhodecode.model.caching_query import FromCache
from rhodecode.model.db import Repository, User, RhodeCodeUi, UserLog
def get_repo_slug(request):
return request.environ['pylons.routes_dict'].get('repo_name')
def action_logger(user, action, repo, ipaddr='', sa=None):
Action logger for various actions made by users
:param user: user that made this action, can be a unique username string or
object containing user_id attribute
:param action: action to log, should be on of predefined unique actions for
easy translations
:param repo: string name of repository or object containing repo_id,
that action was made on
:param ipaddr: optional ip address from what the action was made
:param sa: optional sqlalchemy session
if not sa:
um = UserModel()
if hasattr(user, 'user_id'):
user_obj = user
elif isinstance(user, basestring):
user_obj = um.get_by_username(user, cache=False)
raise Exception('You have to provide user object or username')
rm = RepoModel()
if hasattr(repo, 'repo_id'):
repo_obj = rm.get(repo.repo_id, cache=False)
repo_name = repo_obj.repo_name
elif isinstance(repo, basestring):
repo_name = repo.lstrip('/')
repo_obj = rm.get_by_repo_name(repo_name, cache=False)
raise Exception('You have to provide repository to action logger')
user_log = UserLog()
user_log.user_id = user_obj.user_id
user_log.action = action
user_log.repository_id = repo_obj.repo_id
user_log.repository_name = repo_name
user_log.action_date = datetime.datetime.now()
user_log.user_ip = ipaddr
sa.add(user_log)
sa.commit()
log.info('Adding user %s, action %s on %s', user_obj, action, repo)
sa.rollback()
def get_repos(path, recursive=False, initial=False):
Scans given path for repos and return (name,(type,path)) tuple
:param prefix:
:param path:
:param recursive:
:param initial:
from vcs.utils.helpers import get_scm
rhodecode.model.__init__
~~~~~~~~~~~~~~~~~~~~~~~~
The application's model objects
:created_on: Nov 25, 2010
:example:
.. code-block:: python
from paste.deploy import appconfig
from rhodecode.config.environment import load_environment
conf = appconfig('config:development.ini', relative_to = './../../')
load_environment(conf.global_conf, conf.local_conf)
engine = engine_from_config(config, 'sqlalchemy.')
# RUN YOUR CODE HERE
def init_model(engine):
"""Initializes db session, bind the engine with the metadata,
Call this before using any of the tables or classes in the model, preferably
once in application start
:param engine: engine to bind to
log.info("initializing db models for %s", engine)
meta.Base.metadata.bind = engine
class BaseModel(object):
"""Base Model for all RhodeCode models, it adds sql alchemy session
into instance of model
:param sa: If passed it reuses this session instead of creating a new one
def __init__(self, sa=None):
if sa is not None:
self.sa = sa
self.sa = meta.Session()
rhodecode.model.db
Database Models for RhodeCode
:created_on: Apr 08, 2010
from datetime import date
from sqlalchemy import *
from sqlalchemy.orm import relationship, backref
from sqlalchemy.orm.interfaces import MapperExtension
from rhodecode.model.meta import Base, Session
class RhodeCodeSettings(Base):
__tablename__ = 'rhodecode_settings'
__table_args__ = (UniqueConstraint('app_settings_name'), {'useexisting':True})
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
app_settings_value = Column("app_settings_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
def __repr__(self):
return "<%s('%s:%s')>" % (self.__class__.__name__,
self.app_settings_name, self.app_settings_value)
class RhodeCodeUi(Base):
__tablename__ = 'rhodecode_ui'
__table_args__ = {'useexisting':True}
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
class User(Base):
__tablename__ = 'users'
__table_args__ = (UniqueConstraint('username'), UniqueConstraint('email'), {'useexisting':True})
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
is_ldap = Column("is_ldap", Boolean(), nullable=False, unique=None, default=False)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
@property
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
def short_contact(self):
return '%s %s' % (self.name, self.lastname)
def is_admin(self):
return self.admin
return "<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
def by_username(cls, username):
return Session.query(cls).filter(cls.username == username).one()
def update_lastlogin(self):
"""Update user lastlogin"""
session = Session.object_session(self)
self.last_login = datetime.datetime.now()
session.add(self)
session.commit()
log.debug('updated user %s lastlogin', self.username)
session.rollback()
class UserLog(Base):
__tablename__ = 'user_logs'
rhodecode.model.permission
permissions model for RhodeCode
:created_on: Aug 20, 2010
from rhodecode.model import BaseModel
from rhodecode.model.db import User, Permission, UserToPerm, RepoToPerm
class PermissionModel(BaseModel):
"""Permissions model for RhodeCode
def get_permission(self, permission_id, cache=False):
"""Get's permissions by id
:param permission_id: id of permission to get from database
:param cache: use Cache for this query
perm = self.sa.query(Permission)
if cache:
perm = perm.options(FromCache("sql_cache_short",
"get_permission_%s" % permission_id))
return perm.get(permission_id)
def get_permission_by_name(self, name, cache=False):
"""Get's permissions by given name
:param name: name to fetch
:param cache: Use cache for this query
perm = self.sa.query(Permission)\
.filter(Permission.permission_name == name)
"get_permission_%s" % name))
return perm.scalar()
def update(self, form_result):
perm_user = self.sa.query(User)\
.filter(User.username == form_result['perm_user_name']).scalar()
u2p = self.sa.query(UserToPerm).filter(UserToPerm.user == perm_user).all()
if len(u2p) != 3:
raise Exception('Defined: %s should be 3 permissions for default'
' user. This should not happen please verify'
' your database' % len(u2p))
#stage 1 change defaults
for p in u2p:
if p.permission.permission_name.startswith('repository.'):
p.permission = self.get_permission_by_name(
form_result['default_perm'])
self.sa.add(p)
if p.permission.permission_name.startswith('hg.register.'):
form_result['default_register'])
if p.permission.permission_name.startswith('hg.create.'):
form_result['default_create'])
#stage 2 update all default permissions for repos if checked
if form_result['overwrite_default'] == True:
for r2p in self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == perm_user).all():
r2p.permission = self.get_permission_by_name(
self.sa.add(r2p)
#stage 3 set anonymous access
if perm_user.username == 'default':
perm_user.active = bool(form_result['anonymous'])
self.sa.add(perm_user)
self.sa.rollback()
rhodecode.model.repo
~~~~~~~~~~~~~~~~~~~~
Repository model for rhodecode
:created_on: Jun 5, 2010
import shutil
from datetime import datetime
from sqlalchemy.orm import joinedload, make_transient
from vcs.backends import get_backend
from rhodecode.model.db import Repository, RepoToPerm, User, Permission, \
Statistics, RhodeCodeUi
class RepoModel(BaseModel):
def repos_path(self):
"""Get's the repositories root path from database
q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
def get(self, repo_id, cache=False):
repo = self.sa.query(Repository)\
.filter(Repository.repo_id == repo_id)
repo = repo.options(FromCache("sql_cache_short",
"get_repo_%s" % repo_id))
return repo.scalar()
def get_by_repo_name(self, repo_name, cache=False):
.filter(Repository.repo_name == repo_name)
"get_repo_%s" % repo_name))
def get_users_js(self):
users = self.sa.query(User).filter(User.active == True).all()
u_tmpl = '''{id:%s, fname:"%s", lname:"%s", nname:"%s"},'''
users_array = '[%s];' % '\n'.join([u_tmpl % (u.user_id, u.name,
u.lastname, u.username)
for u in users])
return users_array
def update(self, repo_name, form_data):
cur_repo = self.get_by_repo_name(repo_name, cache=False)
user_model = UserModel(self.sa)
#update permissions
for username, perm in form_data['perms_updates']:
r2p = self.sa.query(RepoToPerm)\
.filter(RepoToPerm.user == user_model.get_by_username(username))\
.filter(RepoToPerm.repository == cur_repo)\
.one()
r2p.permission = self.sa.query(Permission)\
.filter(Permission.permission_name == perm)\
.scalar()
#set new permissions
for username, perm in form_data['perms_new']:
r2p = RepoToPerm()
r2p.repository = cur_repo
r2p.user = user_model.get_by_username(username, cache=False)
#update current repo
for k, v in form_data.items():
if k == 'user':
cur_repo.user = user_model.get(v)
setattr(cur_repo, k, v)
self.sa.add(cur_repo)
if repo_name != form_data['repo_name']:
rhodecode.model.settings
Settings model for RhodeCode
:created on Nov 17, 2010
from rhodecode.model.db import RhodeCodeSettings
class SettingsModel(BaseModel):
Settings model
def get(self, settings_key, cache=False):
r = self.sa.query(RhodeCodeSettings)\
.filter(RhodeCodeSettings.app_settings_name == settings_key).scalar()
r = r.options(FromCache("sql_cache_short",
"get_setting_%s" % settings_key))
def get_app_settings(self, cache=False):
"""Get's config from database, each config key is prefixed with
'rhodecode_' prefix, than global pylons config is updated with such
keys
ret = self.sa.query(RhodeCodeSettings)
ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
if not ret:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings['rhodecode_' + each.app_settings_name] = each.app_settings_value
return settings
def get_ldap_settings(self):
Returns ldap settings from database
:returns:
ldap_active
ldap_host
ldap_port
ldap_ldaps
ldap_tls_reqcert
ldap_dn_user
ldap_dn_pass
ldap_base_dn
ldap_filter
ldap_search_scope
ldap_attr_login
ldap_attr_firstname
ldap_attr_lastname
ldap_attr_email
# ldap_search_scope
.filter(RhodeCodeSettings.app_settings_name\
.startswith('ldap_'))\
fd = {}
for row in r:
v = row.app_settings_value
if v in ['true', 'yes', 'on', 'y', 't', '1']:
v = True
elif v in ['false', 'no', 'off', 'n', 'f', '0']:
v = False
fd.update({row.app_settings_name:v})
return fd
rhodecode.model.user
users model for RhodeCode
:created_on: Apr 9, 2010
from rhodecode.lib.exceptions import DefaultUserException, UserOwnsReposException
class UserModel(BaseModel):
def get(self, user_id, cache=False):
user = self.sa.query(User)
user = user.options(FromCache("sql_cache_short",
"get_user_%s" % user_id))
return user.get(user_id)
def get_by_username(self, username, cache=False, case_insensitive=False):
if case_insensitive:
user = self.sa.query(User).filter(User.username.ilike(username))
user = self.sa.query(User)\
.filter(User.username == username)
"get_user_%s" % username))
return user.scalar()
def create(self, form_data):
new_user = User()
setattr(new_user, k, v)
self.sa.add(new_user)
def create_ldap(self, username, password):
Checks if user is in database, if not creates this user marked
as ldap user
:param username:
:param password:
from rhodecode.lib.auth import get_crypt_password
log.debug('Checking for such ldap account in RhodeCode database')
if self.get_by_username(username, case_insensitive=True) is None:
new_user.username = username.lower()#add ldap account always lowercase
new_user.password = get_crypt_password(password)
new_user.email = '%s@ldap.server' % username
new_user.active = True
new_user.is_ldap = True
new_user.name = '%s@ldap' % username
new_user.lastname = ''
log.debug('this %s user exists skipping creation of ldap account',
username)
def create_registration(self, form_data):
if k != 'admin':
body = ('New user registration\n'
'username: %s\n'
'email: %s\n')
Status change: