#!/usr/bin/env python
# encoding: utf-8
# repos controller for pylons
# Copyright (C) 2009-2010 Marcin Kuzminski <marcin@python-works.com>
#
# This program is free software; you can redistribute it and/or
# modify it under the terms of the GNU General Public License
# as published by the Free Software Foundation; version 2
# of the License or (at your opinion) any later version of the license.
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program; if not, write to the Free Software
# Foundation, Inc., 51 Franklin Street, Fifth Floor, Boston,
# MA 02110-1301, USA.
"""
Created on April 7, 2010
admin controller for pylons
@author: marcink
from formencode import htmlfill
from operator import itemgetter
from paste.httpexceptions import HTTPInternalServerError
from pylons import request, response, session, tmpl_context as c, url
from pylons.controllers.util import abort, redirect
from pylons.i18n.translation import _
from pylons_app.lib import helpers as h
from pylons_app.lib.auth import LoginRequired, HasPermissionAllDecorator
from pylons_app.lib.base import BaseController, render
from pylons_app.lib.utils import invalidate_cache
from pylons_app.model.db import User
from pylons_app.model.forms import RepoForm
from pylons_app.model.hg_model import HgModel
from pylons_app.model.repo_model import RepoModel
import formencode
import logging
import traceback
log = logging.getLogger(__name__)
class ReposController(BaseController):
"""REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('repo', 'repos')
@LoginRequired()
@HasPermissionAllDecorator('hg.admin')
def __before__(self):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
super(ReposController, self).__before__()
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
cached_repo_list = HgModel().get_repos()
c.repos_list = sorted(cached_repo_list, key=itemgetter('name_sort'))
return render('admin/repos/repos.html')
def create(self):
"""POST /repos: Create a new item"""
repo_model = RepoModel()
_form = RepoForm()()
form_result = {}
try:
form_result = _form.to_python(dict(request.POST))
repo_model.create(form_result, c.hg_app_user)
invalidate_cache('cached_repo_list')
h.flash(_('created repository %s') % form_result['repo_name'],
category='success')
except formencode.Invalid as errors:
c.new_repo = errors.value['repo_name']
return htmlfill.render(
render('admin/repos/repo_add.html'),
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
except Exception:
log.error(traceback.format_exc())
msg = _('error occured during creation of repository %s') \
@@ -151,61 +152,67 @@ class ReposController(BaseController):
' it was moved or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name,
category='error')
return redirect(url('repos'))
repo_model.delete(repo)
h.flash(_('deleted repository %s') % repo_name, category='success')
h.flash(_('An error occured during deletion of %s') % repo_name,
def delete_perm_user(self, repo_name):
DELETE an existing repository permission user
@param repo_name:
repo_model.delete_perm_user(request.POST, repo_name)
except Exception as e:
h.flash(_('An error occured during deletion of repository user'),
raise HTTPInternalServerError()
def show(self, repo_name, format='html'):
"""GET /repos/repo_name: Show a specific item"""
# url('repo', repo_name=ID)
def edit(self, repo_name, format='html'):
"""GET /repos/repo_name/edit: Form to edit an existing item"""
# url('edit_repo', repo_name=ID)
c.repo_info = repo = repo_model.get(repo_name)
if not repo:
h.flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
defaults = c.repo_info.__dict__
defaults.update({'user':c.repo_info.user.username})
if c.repo_info.user:
else:
replacement_user = self.sa.query(User)\
.filter(User.admin == True).first().username
defaults.update({'user':replacement_user})
c.users_array = repo_model.get_users_js()
for p in c.repo_info.repo2perm:
defaults.update({'perm_%s' % p.user.username:
p.permission.permission_name})
render('admin/repos/repo_edit.html'),
defaults=defaults,
encoding="UTF-8",
force_defaults=False
)
# Model for hg app
Created on April 9, 2010
Model for hg app
from beaker.cache import cache_region
from mercurial import ui
from mercurial.hgweb.hgwebdir_mod import findrepos
from pylons_app.lib.auth import HasRepoPermissionAny
from pylons_app.model import meta
from pylons_app.model.db import Repository
from pylons_app.model.db import Repository, User
from sqlalchemy.orm import joinedload
from vcs.exceptions import RepositoryError, VCSError
import os
import sys
from vcs.backends.hg import MercurialRepository
except ImportError:
sys.stderr.write('You have to import vcs module')
raise Exception('Unable to import vcs')
def _get_repos_cached_initial(app_globals, initial):
return cached dict with repos
g = app_globals
return HgModel.repo_scan(g.paths[0][0], g.paths[0][1], g.baseui, initial)
@cache_region('long_term', 'cached_repo_list')
def _get_repos_cached():
log.info('getting all repositories list')
from pylons import app_globals as g
return HgModel.repo_scan(g.paths[0][0], g.paths[0][1], g.baseui)
@cache_region('super_short_term', 'cached_repos_switcher_list')
def _get_repos_switcher_cached(cached_repo_list):
repos_lst = []
for repo in sorted(x.name.lower() for x in cached_repo_list.values()):
if HasRepoPermissionAny('repository.write', 'repository.read', 'repository.admin')(repo, 'main page check'):
repos_lst.append(repo)
return repos_lst
@cache_region('long_term', 'full_changelog')
def _full_changelog_cached(repo_name):
log.info('getting full changelog for %s', repo_name)
return list(reversed(list(HgModel().get_repo(repo_name))))
class HgModel(object):
Mercurial Model
@@ -89,86 +89,90 @@ class HgModel(object):
repository itself. Return a dictionary of repository objects
:param repos_path: path to directory it could take syntax with
* or ** for deep recursive displaying repositories
sa = meta.Session()
def check_repo_dir(path):
Checks the repository
:param path:
repos_path = path.split('/')
if repos_path[-1] in ['*', '**']:
repos_path = repos_path[:-1]
if repos_path[0] != '/':
repos_path[0] = '/'
if not os.path.isdir(os.path.join(*repos_path)):
raise RepositoryError('Not a valid repository in %s' % path[0][1])
if not repos_path.endswith('*'):
raise VCSError('You need to specify * or ** at the end of path '
'for recursive scanning')
check_repo_dir(repos_path)
log.info('scanning for repositories in %s', repos_path)
repos = findrepos([(repos_prefix, repos_path)])
if not isinstance(baseui, ui.ui):
baseui = ui.ui()
repos_list = {}
for name, path in repos:
#name = name.split('/')[-1]
if repos_list.has_key(name):
raise RepositoryError('Duplicate repository name %s found in'
' %s' % (name, path))
repos_list[name] = MercurialRepository(path, baseui=baseui)
repos_list[name].name = name
dbrepo = None
if not initial:
dbrepo = sa.query(Repository)\
.filter(Repository.repo_name == name).scalar()
if dbrepo:
log.info('Adding db instance to cached list')
repos_list[name].dbrepo = dbrepo
repos_list[name].description = dbrepo.description
repos_list[name].contact = dbrepo.user.full_contact
if dbrepo.user:
repos_list[name].contact = sa.query(User)\
.filter(User.admin == True).first().full_contact
except OSError:
continue
meta.Session.remove()
return repos_list
def get_repos(self):
for name, repo in _get_repos_cached().items():
if repo._get_hidden():
#skip hidden web repository
last_change = repo.last_change
tip = repo.get_changeset('tip')
except RepositoryError:
from pylons_app.lib.utils import EmptyChangeset
tip = EmptyChangeset()
tmp_d = {}
tmp_d['name'] = repo.name
tmp_d['name_sort'] = tmp_d['name'].lower()
tmp_d['description'] = repo.description
tmp_d['description_sort'] = tmp_d['description']
tmp_d['last_change'] = last_change
tmp_d['last_change_sort'] = last_change[1] - last_change[0]
tmp_d['tip'] = tip.raw_id
tmp_d['tip_sort'] = tip.revision
tmp_d['rev'] = tip.revision
tmp_d['contact'] = repo.contact
tmp_d['contact_sort'] = tmp_d['contact']
tmp_d['repo_archives'] = list(repo._get_archives())
tmp_d['last_msg'] = tip.message
tmp_d['repo'] = repo
yield tmp_d
def get_repo(self, repo_name):
return _get_repos_cached()[repo_name]
Status change: