"""Helper functions
Consists of functions to typically be used within templates, but also
available to Controllers. This module is available to both as 'h'.
"""
import random
import hashlib
import StringIO
import urllib
import math
import logging
import re
from datetime import datetime
from pygments.formatters.html import HtmlFormatter
from pygments import highlight as code_highlight
from pylons import url, request, config
from pylons.i18n.translation import _, ungettext
from hashlib import md5
from webhelpers.html import literal, HTML, escape
from webhelpers.html.tools import *
from webhelpers.html.builder import make_tag
from webhelpers.html.tags import auto_discovery_link, checkbox, css_classes, \
end_form, file, form, hidden, image, javascript_link, link_to, \
link_to_if, link_to_unless, ol, required_legend, select, stylesheet_link, \
submit, text, password, textarea, title, ul, xml_declaration, radio
from webhelpers.html.tools import auto_link, button_to, highlight, \
js_obfuscate, mail_to, strip_links, strip_tags, tag_re
from webhelpers.number import format_byte_size, format_bit_size
from webhelpers.pylonslib import Flash as _Flash
from webhelpers.pylonslib.secure_form import secure_form
from webhelpers.text import chop_at, collapse, convert_accented_entities, \
convert_misc_entities, lchop, plural, rchop, remove_formatting, \
replace_whitespace, urlify, truncate, wrap_paragraphs
from webhelpers.date import time_ago_in_words
from webhelpers.paginate import Page
from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
convert_boolean_attrs, NotGiven, _make_safe_id_component
from rhodecode.lib.annotate import annotate_highlight
from rhodecode.lib.utils import repo_name_slug
from rhodecode.lib.utils2 import str2bool, safe_unicode, safe_str, \
get_changeset_safe, datetime_to_time, time_to_datetime
from rhodecode.lib.markup_renderer import MarkupRenderer
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
from rhodecode.lib.vcs.backends.base import BaseChangeset
from rhodecode.config.conf import DATE_FORMAT, DATETIME_FORMAT
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import URL_SEP, Permission
log = logging.getLogger(__name__)
html_escape_table = {
"&": "&",
'"': """,
"'": "'",
">": ">",
"<": "<",
}
def html_escape(text):
"""Produce entities within text."""
return "".join(html_escape_table.get(c,c) for c in text)
def shorter(text, size=20):
postfix = '...'
if len(text) > size:
return text[:size - len(postfix)] + postfix
return text
def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
Reset button
_set_input_attrs(attrs, type, name, value)
_set_id_attr(attrs, id, name)
convert_boolean_attrs(attrs, ["disabled"])
return HTML.input(**attrs)
reset = _reset
safeid = _make_safe_id_component
def FID(raw_id, path):
Creates a uniqe ID for filenode based on it's hash of path and revision
it's safe to use in urls
:param raw_id:
:param path:
return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
def get_token():
"""Return the current authentication token, creating one if one doesn't
already exist.
token_key = "_authentication_token"
from pylons import session
if not token_key in session:
try:
token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
except AttributeError: # Python < 2.4
token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
session[token_key] = token
if hasattr(session, 'save'):
session.save()
return session[token_key]
class _GetError(object):
"""Get error from form_errors, and represent it as span wrapped error
message
:param field_name: field to fetch errors for
:param form_errors: form errors dict
def __call__(self, field_name, form_errors):
tmpl = """<span class="error_msg">%s</span>"""
if form_errors and field_name in form_errors:
return literal(tmpl % form_errors.get(field_name))
get_error = _GetError()
class _ToolTip(object):
def __call__(self, tooltip_title, trim_at=50):
Special function just to wrap our text into nice formatted
autowrapped text
:param tooltip_title:
tooltip_title = escape(tooltip_title)
tooltip_title = tooltip_title.replace('<', '<').replace('>', '>')
return tooltip_title
tooltip = _ToolTip()
class _FilesBreadCrumbs(object):
def __call__(self, repo_name, rev, paths):
if isinstance(paths, str):
paths = safe_unicode(paths)
url_l = [link_to(repo_name, url('files_home',
repo_name=repo_name,
revision=rev, f_path=''),
class_='ypjax-link')]
paths_l = paths.split('/')
for cnt, p in enumerate(paths_l):
if p != '':
url_l.append(link_to(p,
url('files_home',
revision=rev,
f_path='/'.join(paths_l[:cnt + 1])
),
class_='ypjax-link'
)
return literal('/'.join(url_l))
files_breadcrumbs = _FilesBreadCrumbs()
class CodeHtmlFormatter(HtmlFormatter):
My code Html Formatter for source codes
def wrap(self, source, outfile):
return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
def _wrap_code(self, source):
for cnt, it in enumerate(source):
i, t = it
t = '<div id="L%s">%s</div>' % (cnt + 1, t)
yield i, t
def _wrap_tablelinenos(self, inner):
dummyoutfile = StringIO.StringIO()
lncount = 0
for t, line in inner:
if t:
lncount += 1
dummyoutfile.write(line)
fl = self.linenostart
mw = len(str(lncount + fl - 1))
sp = self.linenospecial
st = self.linenostep
la = self.lineanchors
aln = self.anchorlinenos
nocls = self.noclasses
if sp:
lines = []
for i in range(fl, fl + lncount):
if i % st == 0:
if i % sp == 0:
if aln:
lines.append('<a href="#%s%d" class="special">%*d</a>' %
(la, i, mw, i))
else:
lines.append('<span class="special">%*d</span>' % (mw, i))
lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
lines.append('%*d' % (mw, i))
lines.append('')
ls = '\n'.join(lines)
# in case you wonder about the seemingly redundant <div> here: since the
# content in the other cell also is wrapped in a div, some browsers in
# some configurations seem to mess up the formatting...
if nocls:
yield 0, ('<table class="%stable">' % self.cssclass +
'<tr><td><div class="linenodiv" '
'style="background-color: #f0f0f0; padding-right: 10px">'
'<pre style="line-height: 125%">' +
ls + '</pre></div></td><td id="hlcode" class="code">')
'<tr><td class="linenos"><div class="linenodiv"><pre>' +
yield 0, dummyoutfile.getvalue()
yield 0, '</td></tr></table>'
def pygmentize(filenode, **kwargs):
"""pygmentize function using pygments
:param filenode:
return literal(code_highlight(filenode.content,
filenode.lexer, CodeHtmlFormatter(**kwargs)))
def pygmentize_annotation(repo_name, filenode, **kwargs):
pygmentize function for annotation
color_dict = {}
def gen_color(n=10000):
"""generator for getting n of evenly distributed colors using
hsv color and golden ratio. It always return same order of colors
:returns: RGB tuple
def hsv_to_rgb(h, s, v):
if s == 0.0:
return v, v, v
i = int(h * 6.0) # XXX assume int() truncates!
f = (h * 6.0) - i
p = v * (1.0 - s)
q = v * (1.0 - s * f)
t = v * (1.0 - s * (1.0 - f))
i = i % 6
if i == 0:
return v, t, p
if i == 1:
return q, v, p
if i == 2:
return p, v, t
if i == 3:
return p, q, v
if i == 4:
return t, p, v
if i == 5:
return v, p, q
golden_ratio = 0.618033988749895
h = 0.22717784590367374
for _ in xrange(n):
h += golden_ratio
h %= 1
HSV_tuple = [h, 0.95, 0.95]
RGB_tuple = hsv_to_rgb(*HSV_tuple)
yield map(lambda x: str(int(x * 256)), RGB_tuple)
cgenerator = gen_color()
def get_color_string(cs):
if cs in color_dict:
col = color_dict[cs]
col = color_dict[cs] = cgenerator.next()
return "color: rgb(%s)! important;" % (', '.join(col))
def url_func(repo_name):
def _url_func(changeset):
author = changeset.author
date = changeset.date
message = tooltip(changeset.message)
tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
" %s<br/><b>Date:</b> %s</b><br/><b>Message:"
"</b> %s<br/></div>")
tooltip_html = tooltip_html % (author, date, message)
lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
short_id(changeset.raw_id))
uri = link_to(
lnk_format,
url('changeset_home', repo_name=repo_name,
revision=changeset.raw_id),
style=get_color_string(changeset.raw_id),
class_='tooltip',
title=tooltip_html
uri += '\n'
return uri
return _url_func
return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
def is_following_repo(repo_name, user_id):
from rhodecode.model.scm import ScmModel
return ScmModel().is_following_repo(repo_name, user_id)
flash = _Flash()
#==============================================================================
# SCM FILTERS available via h.
from rhodecode.lib.vcs.utils import author_name, author_email
from rhodecode.lib.utils2 import credentials_filter, age as _age
from rhodecode.model.db import User, ChangesetStatus
age = lambda x: _age(x)
capitalize = lambda x: x.capitalize()
email = author_email
short_id = lambda x: x[:12]
hide_credentials = lambda x: ''.join(credentials_filter(x))
def fmt_date(date):
if date:
_fmt = _(u"%a, %d %b %Y %H:%M:%S").encode('utf8')
return date.strftime(_fmt).decode('utf8')
return ""
def is_git(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
_type = repository
return _type == 'git'
def is_hg(repository):
return _type == 'hg'
def email_or_none(author):
# extract email from the commit string
_email = email(author)
if _email != '':
# check it against RhodeCode database, and use the MAIN email for this
# user
user = User.get_by_email(_email, case_insensitive=True, cache=True)
if user is not None:
return user.email
return _email
# See if it contains a username we can get an email from
user = User.get_by_username(author_name(author), case_insensitive=True,
cache=True)
# No valid email, not a valid user in the system, none!
return None
def person(author):
def person(author, show_attr="username_and_name"):
# attr to return from fetched user
person_getter = lambda usr: usr.username
person_getter = lambda usr: getattr(usr, show_attr)
# Valid email in the attribute passed, see if they're in the system
return person_getter(user)
# Maybe it's a username?
_author = author_name(author)
user = User.get_by_username(_author, case_insensitive=True,
# Still nothing? Just pass back the author name then
return _author
def person_by_id(id_):
def person_by_id(id_, show_attr="username_and_name"):
#maybe it's an ID ?
if str(id_).isdigit() or isinstance(id_, int):
id_ = int(id_)
user = User.get(id_)
return id_
def desc_stylize(value):
converts tags from value into html equivalent
:param value:
value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'<div class="metatag" tag="see">see => \\1 </div>', value)
value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z\-\/]*)\]',
'<div class="metatag" tag="\\1">\\1 => <a href="/\\2">\\2</a></div>', value)
value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/]*)\]',
'<div class="metatag" tag="lang">\\2</div>', value)
value = re.sub(r'\[([a-z]+)\]',
'<div class="metatag" tag="\\1">\\1</div>', value)
return value
def bool2icon(value):
"""Returns True/False values represented as small html image of true/false
icons
:param value: bool value
if value is True:
return HTML.tag('img', src=url("/images/icons/accept.png"),
alt=_('True'))
if value is False:
return HTML.tag('img', src=url("/images/icons/cancel.png"),
alt=_('False'))
def action_parser(user_log, feed=False):
This helper will action_map the specified string action into translated
fancy names with icons and links
:param user_log: user log instance
:param feed: use output for feeds (no html and fancy icons)
action = user_log.action
action_params = ' '
x = action.split(':')
if len(x) > 1:
action, action_params = x
def get_cs_links():
revs_limit = 3 # display this amount always
revs_top_limit = 50 # show upto this amount of changesets hidden
revs_ids = action_params.split(',')
deleted = user_log.repository is None
if deleted:
return ','.join(revs_ids)
repo_name = user_log.repository.repo_name
repo = user_log.repository.scm_instance
def lnk(rev, repo_name):
if isinstance(rev, BaseChangeset):
lbl = 'r%s:%s' % (rev.revision, rev.short_id)
_url = url('changeset_home', repo_name=repo_name,
revision=rev.raw_id)
title = tooltip(rev.message)
lbl = '%s' % rev
_url = '#'
title = _('Changeset not found')
return link_to(lbl, _url, title=title, class_='tooltip',)
revs = []
if len(filter(lambda v: v != '', revs_ids)) > 0:
for rev in revs_ids[:revs_top_limit]:
rev = repo.get_changeset(rev)
revs.append(rev)
except ChangesetDoesNotExistError:
log.error('cannot find revision %s in this repo' % rev)
continue
cs_links = []
cs_links.append(" " + ', '.join(
[lnk(rev, repo_name) for rev in revs[:revs_limit]]
compare_view = (
' <div class="compare_view tooltip" title="%s">'
'<a href="%s">%s</a> </div>' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0], revs_ids[-1]
revision='%s...%s' % (revs_ids[0], revs_ids[-1])
_('compare view')
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
rev = revs[revs_limit]
cs_links.append(", " + lnk(rev, repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
'<span> %s <a class="show_more" id="_%s" '
'href="#more">%s</a> %s</span>'
if not feed:
cs_links.append(html_tmpl % (
_('and'),
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
_('revisions')
html_tmpl = '<span id="%s" style="display:none">, %s </span>'
html_tmpl = '<span id="%s"> %s </span>'
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
cs_links.append(compare_view)
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
return _('fork name ') + str(link_to(action_params, url('summary_home',
repo_name=repo_name,)))
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
pull_request_id = action_params
return link_to(_('Pull request #%s') % pull_request_id,
url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request_id))
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'database_delete.png'),
'user_created_repo': (_('[created] repository'),
None, 'database_add.png'),
'user_created_fork': (_('[created] repository as fork'),
None, 'arrow_divide.png'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'arrow_divide.png'),
'user_updated_repo': (_('[updated] repository'),
None, 'database_edit.png'),
'admin_deleted_repo': (_('[delete] repository'),
'admin_created_repo': (_('[created] repository'),
'admin_forked_repo': (_('[forked] repository'),
'admin_updated_repo': (_('[updated] repository'),
'admin_created_user': (_('[created] user'),
get_user_name, 'user_add.png'),
'admin_updated_user': (_('[updated] user'),
get_user_name, 'user_edit.png'),
'admin_created_users_group': (_('[created] users group'),
get_users_group, 'group_add.png'),
'admin_updated_users_group': (_('[updated] users group'),
get_users_group, 'group_edit.png'),
'user_commented_revision': (_('[commented] on revision in repository'),
get_cs_links, 'comment_add.png'),
'user_commented_pull_request': (_('[commented] on pull request for'),
get_pull_request, 'comment_add.png'),
'user_closed_pull_request': (_('[closed] pull request for'),
get_pull_request, 'tick.png'),
'push': (_('[pushed] into'),
get_cs_links, 'script_add.png'),
'push_local': (_('[committed via RhodeCode] into repository'),
get_cs_links, 'script_edit.png'),
'push_remote': (_('[pulled from remote] into repository'),
get_cs_links, 'connect.png'),
'pull': (_('[pulled] from'),
None, 'down_16.png'),
'started_following_repo': (_('[started following] repository'),
None, 'heart_add.png'),
'stopped_following_repo': (_('[stopped following] repository'),
None, 'heart_delete.png'),
action_str = action_map.get(action, action)
if feed:
action = action_str[0].replace('[', '').replace(']', '')
action = action_str[0]\
.replace('[', '<span class="journal_highlight">')\
.replace(']', '</span>')
action_params_func = lambda: ""
if callable(action_str[1]):
action_params_func = action_str[1]
def action_parser_icon():
action_params = None
tmpl = """<img src="%s%s" alt="%s"/>"""
ico = action_map.get(action, ['', '', ''])[2]
return literal(tmpl % ((url('/images/icons/')), ico, action))
# returned callbacks we need to call to get
return [lambda: literal(action), action_params_func, action_parser_icon]
# PERMS
from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
HasRepoPermissionAny, HasRepoPermissionAll
# GRAVATAR URL
def gravatar_url(email_address, size=30):
if (not str2bool(config['app_conf'].get('use_gravatar')) or
not email_address or email_address == 'anonymous@rhodecode.org'):
f = lambda a, l: min(l, key=lambda x: abs(x - a))
return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
default = 'identicon'
baseurl_nossl = "http://www.gravatar.com/avatar/"
baseurl_ssl = "https://secure.gravatar.com/avatar/"
baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl
if isinstance(email_address, unicode):
#hashlib crashes on unicode items
email_address = safe_str(email_address)
# construct the url
gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
gravatar_url += urllib.urlencode({'d': default, 's': str(size)})
return gravatar_url
# REPO PAGER, PAGER FOR REPOSITORY
class RepoPage(Page):
def __init__(self, collection, page=1, items_per_page=20,
item_count=None, url=None, **kwargs):
"""Create a "RepoPage" instance. special pager for paging
repository
self._url_generator = url
# Safe the kwargs class-wide so they can be used in the pager() method
self.kwargs = kwargs
# Save a reference to the collection
self.original_collection = collection
self.collection = collection
# The self.page is the number of the current page.
# The first page has the number 1!
self.page = int(page) # make it int() if we get it as a string
except (ValueError, TypeError):
self.page = 1
self.items_per_page = items_per_page
# Unless the user tells us how many items the collections has
# we calculate that ourselves.
if item_count is not None:
self.item_count = item_count
self.item_count = len(self.collection)
# Compute the number of the first and last available page
if self.item_count > 0:
self.first_page = 1
self.page_count = int(math.ceil(float(self.item_count) /
self.items_per_page))
self.last_page = self.first_page + self.page_count - 1
# Make sure that the requested page number is the range of
# valid pages
if self.page > self.last_page:
self.page = self.last_page
elif self.page < self.first_page:
self.page = self.first_page
# Note: the number of items on this page can be less than
# items_per_page if the last page is not full
self.first_item = max(0, (self.item_count) - (self.page *
items_per_page))
self.last_item = ((self.item_count - 1) - items_per_page *
(self.page - 1))
self.items = list(self.collection[self.first_item:self.last_item + 1])
# Links to previous and next page
if self.page > self.first_page:
self.previous_page = self.page - 1
self.previous_page = None
if self.page < self.last_page:
self.next_page = self.page + 1
self.next_page = None
# No items available
self.first_page = None
self.page_count = 0
self.last_page = None
self.first_item = None
self.last_item = None
self.items = []
# This is a subclass of the 'list' type. Initialise the list now.
list.__init__(self, reversed(self.items))
def changed_tooltip(nodes):
Generates a html string for changed nodes in changeset page.
It limits the output to 30 entries
:param nodes: LazyNodesGenerator
if nodes:
pref = ': <br/> '
suf = ''
if len(nodes) > 30:
suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
return literal(pref + '<br/> '.join([safe_unicode(x.path)
for x in nodes[:30]]) + suf)
return ': ' + _('No Files')
def repo_link(groups_and_repos):
Makes a breadcrumbs link to repo within a group
joins » on each group to create a fancy link
ex::
group >> subgroup >> repo
:param groups_and_repos:
groups, repo_name = groups_and_repos
if not groups:
return repo_name
def make_link(group):
return link_to(group.name, url('repos_group_home',
group_name=group.group_name))
return literal(' » '.join(map(make_link, groups)) + \
" » " + repo_name)
def fancy_file_stats(stats):
Displays a fancy two colored bar for number of added/deleted
lines of code on file
:param stats: two element list of added/deleted lines of code
a, d, t = stats[0], stats[1], stats[0] + stats[1]
width = 100
unit = float(width) / (t or 1)
# needs > 9% of width to be visible or 0 to be hidden
a_p = max(9, unit * a) if a > 0 else 0
d_p = max(9, unit * d) if d > 0 else 0
p_sum = a_p + d_p
if p_sum > width:
#adjust the percentage to be == 100% since we adjusted to 9
if a_p > d_p:
a_p = a_p - (p_sum - width)
d_p = d_p - (p_sum - width)
a_v = a if a > 0 else ''
d_v = d if d > 0 else ''
def cgen(l_type):
mapping = {'tr': 'top-right-rounded-corner-mid',
'tl': 'top-left-rounded-corner-mid',
'br': 'bottom-right-rounded-corner-mid',
'bl': 'bottom-left-rounded-corner-mid'}
map_getter = lambda x: mapping[x]
if l_type == 'a' and d_v:
#case when added and deleted are present
return ' '.join(map(map_getter, ['tl', 'bl']))
if l_type == 'a' and not d_v:
return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
if l_type == 'd' and a_v:
return ' '.join(map(map_getter, ['tr', 'br']))
if l_type == 'd' and not a_v:
d_a = '<div class="added %s" style="width:%s%%">%s</div>' % (
cgen('a'), a_p, a_v
d_d = '<div class="deleted %s" style="width:%s%%">%s</div>' % (
cgen('d'), d_p, d_v
return literal('<div style="width:%spx">%s%s</div>' % (width, d_a, d_d))
def urlify_text(text_):
url_pat = re.compile(r'''(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]'''
'''|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)''')
def url_func(match_obj):
url_full = match_obj.groups()[0]
return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
return literal(url_pat.sub(url_func, text_))
def urlify_changesets(text_, repository):
Extract revision ids from changeset and make link from them
:param text_:
:param repository:
URL_PAT = re.compile(r'([0-9a-fA-F]{12,})')
rev = match_obj.groups()[0]
pref = ''
if match_obj.group().startswith(' '):
pref = ' '
tmpl = (
'%(pref)s<a class="%(cls)s" href="%(url)s">'
'%(rev)s'
'</a>'
return tmpl % {
'pref': pref,
'cls': 'revision-link',
'url': url('changeset_home', repo_name=repository, revision=rev),
'rev': rev,
newtext = URL_PAT.sub(url_func, text_)
return newtext
def urlify_commit(text_, repository=None, link_=None):
Parses given text message and makes proper links.
issues are linked to given issue-server, and rest is a changeset link
if link_ is given, in other case it's a plain text
:param link_: changeset link
import traceback
def escaper(string):
return string.replace('<', '<').replace('>', '>')
def linkify_others(t, l):
urls = re.compile(r'(\<a.*?\<\/a\>)',)
links = []
for e in urls.split(t):
if not urls.match(e):
links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
links.append(e)
return ''.join(links)
# urlify changesets - extrac revisions and make link out of them
text_ = urlify_changesets(escaper(text_), repository)
conf = config['app_conf']
URL_PAT = re.compile(r'%s' % conf.get('issue_pat'))
if URL_PAT:
ISSUE_SERVER_LNK = conf.get('issue_server_link')
ISSUE_PREFIX = conf.get('issue_prefix')
issue_id = ''.join(match_obj.groups())
'%(issue-prefix)s%(id-repr)s'
url = ISSUE_SERVER_LNK.replace('{id}', issue_id)
if repository:
url = url.replace('{repo}', repository)
repo_name = repository.split(URL_SEP)[-1]
url = url.replace('{repo_name}', repo_name)
'cls': 'issue-tracker-link',
'url': url,
'id-repr': issue_id,
'issue-prefix': ISSUE_PREFIX,
'serv': ISSUE_SERVER_LNK,
if link_:
# wrap not links into final link => link_
newtext = linkify_others(newtext, link_)
return literal(newtext)
except:
log.error(traceback.format_exc())
pass
return text_
def rst(source):
return literal('<div class="rst-block">%s</div>' %
MarkupRenderer.rst(source))
def rst_w_mentions(source):
Wrapped rst renderer with @mention highlighting
:param source:
MarkupRenderer.rst_with_mentions(source))
def changeset_status(repo, revision):
return ChangesetStatusModel().get_status(repo, revision)
def changeset_status_lbl(changeset_status):
return dict(ChangesetStatus.STATUSES).get(changeset_status)
def get_permission_name(key):
return dict(Permission.PERMS).get(key)
# -*- coding: utf-8 -*-
rhodecode.model.db
~~~~~~~~~~~~~~~~~~
Database Models for RhodeCode
:created_on: Apr 08, 2010
:author: marcink
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import datetime
import time
from collections import defaultdict
from sqlalchemy import *
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
from sqlalchemy.exc import DatabaseError
from beaker.cache import cache_region, region_invalidate
from webob.exc import HTTPNotFound
from pylons.i18n.translation import lazy_ugettext as _
from rhodecode.lib.vcs import get_backend
from rhodecode.lib.vcs.utils.helpers import get_scm
from rhodecode.lib.vcs.exceptions import VCSError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
safe_unicode
from rhodecode.lib.compat import json
from rhodecode.lib.caching_query import FromCache
from rhodecode.model.meta import Base, Session
URL_SEP = '/'
# BASE CLASSES
_hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
class BaseModel(object):
Base Model for all classess
@classmethod
def _get_keys(cls):
"""return column names for this model """
return class_mapper(cls).c.keys()
def get_dict(self):
return dict with keys and values corresponding
to this model data """
d = {}
for k in self._get_keys():
d[k] = getattr(self, k)
# also use __json__() if present to get additional fields
_json_attr = getattr(self, '__json__', None)
if _json_attr:
# update with attributes from __json__
if callable(_json_attr):
_json_attr = _json_attr()
for k, val in _json_attr.iteritems():
d[k] = val
return d
def get_appstruct(self):
"""return list with keys and values tupples corresponding
l = []
l.append((k, getattr(self, k),))
return l
def populate_obj(self, populate_dict):
"""populate model with data from given populate_dict"""
if k in populate_dict:
setattr(self, k, populate_dict[k])
def query(cls):
return Session().query(cls)
def get(cls, id_):
if id_:
return cls.query().get(id_)
def get_or_404(cls, id_):
res = cls.query().get(id_)
if not res:
raise HTTPNotFound
return res
def getAll(cls):
return cls.query().all()
def delete(cls, id_):
obj = cls.query().get(id_)
Session().delete(obj)
def __repr__(self):
if hasattr(self, '__unicode__'):
# python repr needs to return str
return safe_str(self.__unicode__())
return '<DB:%s>' % (self.__class__.__name__)
class RhodeCodeSetting(Base, BaseModel):
__tablename__ = 'rhodecode_settings'
__table_args__ = (
UniqueConstraint('app_settings_name'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'}
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_app_settings_value = Column("app_settings_value", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
if self.app_settings_name == 'ldap_active':
v = str2bool(v)
return v
@app_settings_value.setter
def app_settings_value(self, val):
Setter that will always make sure we use unicode in app_settings_value
:param val:
self._app_settings_value = safe_unicode(val)
def __unicode__(self):
return u"<%s('%s:%s')>" % (
self.__class__.__name__,
self.app_settings_name, self.app_settings_value
def get_by_name(cls, key):
return cls.query()\
.filter(cls.app_settings_name == key).scalar()
def get_by_name_or_create(cls, key):
res = cls.get_by_name(key)
res = cls(key)
def get_app_settings(cls, cache=False):
ret = cls.query()
if cache:
ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
if not ret:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings['rhodecode_' + each.app_settings_name] = \
each.app_settings_value
return settings
def get_ldap_settings(cls, cache=False):
ret = cls.query()\
.filter(cls.app_settings_name.startswith('ldap_')).all()
fd = {}
for row in ret:
fd.update({row.app_settings_name: row.app_settings_value})
return fd
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
UniqueConstraint('ui_key'),
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'changegroup.push_logger'
HOOK_PRE_PUSH = 'prechangegroup.pre_push'
HOOK_PULL = 'outgoing.pull_logger'
HOOK_PRE_PULL = 'preoutgoing.pre_pull'
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
def get_by_key(cls, key):
return cls.query().filter(cls.ui_key == key).scalar()
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
cls.HOOK_PUSH, cls.HOOK_PRE_PUSH,
cls.HOOK_PULL, cls.HOOK_PRE_PULL]))
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
q = q.filter(cls.ui_section == 'hooks')
def get_repos_location(cls):
return cls.get_by_key('/').ui_value
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key) or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session().add(new_ui)
class User(Base, BaseModel):
__tablename__ = 'users'
UniqueConstraint('username'), UniqueConstraint('email'),
Index('u_username_idx', 'username'),
Index('u_email_idx', 'email'),
DEFAULT_USER = 'default'
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=True)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("firstname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_email = Column("email", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade='all')
notifications = relationship('UserNotification', cascade='all')
# notifications assigned to this user
user_created_notifications = relationship('Notification', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
#extra emails for this user
user_emails = relationship('UserEmailMap', cascade='all')
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user==self).all()
return [self.email] + [x.email for x in other]
def username_and_name(self):
return '%s (%s %s)' % (self.username, self.firstname, self.lastname)
def full_name(self):
return '%s %s' % (self.name, self.lastname)
return '%s %s' % (self.firstname, self.lastname)
def full_name_or_username(self):
return ('%s %s' % (self.name, self.lastname)
if (self.name and self.lastname) else self.username)
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
def is_admin(self):
return self.admin
return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
def get_by_username(cls, username, case_insensitive=False, cache=False):
if case_insensitive:
q = cls.query().filter(cls.username.ilike(username))
q = cls.query().filter(cls.username == username)
q = q.options(FromCache(
"sql_cache_short",
"get_user_%s" % _hash_key(username)
return q.scalar()
def get_by_api_key(cls, api_key, cache=False):
q = cls.query().filter(cls.api_key == api_key)
q = q.options(FromCache("sql_cache_short",
"get_api_key_%s" % api_key))
def get_by_email(cls, email, case_insensitive=False, cache=False):
q = cls.query().filter(cls.email.ilike(email))
q = cls.query().filter(cls.email == email)
"get_email_key_%s" % email))
ret = q.scalar()
if ret is None:
q = UserEmailMap.query()
# try fetching in alternate email map
q = q.filter(UserEmailMap.email.ilike(email))
q = q.filter(UserEmailMap.email == email)
q = q.options(joinedload(UserEmailMap.user))
"get_email_map_key_%s" % email))
ret = getattr(q.scalar(), 'user', None)
return ret
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session().add(self)
log.debug('updated user %s lastlogin' % self.username)
def get_api_data(self):
Common function for generating user related data for API
user = self
data = dict(
user_id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
emails=user.emails,
api_key=user.api_key,
active=user.active,
admin=user.admin,
ldap_dn=user.ldap_dn,
last_login=user.last_login,
return data
def __json__(self):
full_name=self.full_name,
full_name_or_username=self.full_name_or_username,
short_contact=self.short_contact,
full_contact=self.full_contact
data.update(self.get_api_data())
class UserEmailMap(Base, BaseModel):
__tablename__ = 'user_email_map'
Index('uem_email_idx', 'email'),
UniqueConstraint('email'),
__mapper_args__ = {}
email_id = Column("email_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
_email = Column("email", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
user = relationship('User', lazy='joined')
@validates('_email')
def validate_email(self, key, email):
# check if this email is not main one
main_email = Session().query(User).filter(User.email == email).scalar()
if main_email is not None:
raise AttributeError('email %s is present is user table' % email)
return email
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
'mysql_charset': 'utf8'},
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column("repository_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", UnicodeText(1200000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository', cascade='')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
users_group_to_perm = relationship('UsersGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
return u'<userGroup(%s)>' % (self.users_group_name)
def get_by_group_name(cls, group_name, cache=False,
case_insensitive=False):
q = cls.query().filter(cls.users_group_name.ilike(group_name))
q = cls.query().filter(cls.users_group_name == group_name)
"get_user_%s" % _hash_key(group_name)
def get(cls, users_group_id, cache=False):
users_group = cls.query()
users_group = users_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % users_group_id))
return users_group.get(users_group_id)
users_group = self
users_group_id=users_group.users_group_id,
group_name=users_group.users_group_name,
active=users_group.users_group_active,
class UsersGroupMember(Base, BaseModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
users_group = relationship('UsersGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
UniqueConstraint('repo_name'),
Index('r_repo_name_idx', 'repo_name'),
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
clone_uri = Column("clone_uri", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
repo_type = Column("repo_type", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
landing_rev = Column("landing_revision", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
enable_locking = Column("enable_locking", Boolean(), nullable=False, unique=None, default=False)
_locked = Column("locked", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('RepoGroup')
repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing',
primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id',
cascade='all')
logs = relationship('UserLog')
comments = relationship('ChangesetComment', cascade="all, delete, delete-orphan")
pull_requests_org = relationship('PullRequest',
primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
cascade="all, delete, delete-orphan")
pull_requests_other = relationship('PullRequest',
primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
self.repo_name)
def locked(self):
# always should return [user_id, timelocked]
if self._locked:
_lock_info = self._locked.split(':')
return int(_lock_info[0]), _lock_info[1]
return [None, None]
@locked.setter
def locked(self, val):
if val and isinstance(val, (list, tuple)):
self._locked = ':'.join(map(str, val))
self._locked = None
def url_sep(cls):
return URL_SEP
def get_by_repo_name(cls, repo_name):
q = Session().query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))
def get_by_full_path(cls, repo_full_path):
repo_name = repo_full_path.split(cls.base_path(), 1)[-1]
return cls.get_by_repo_name(repo_name.strip(URL_SEP))
def get_repo_forks(cls, repo_id):
return cls.query().filter(Repository.fork_id == repo_id)
def base_path(cls):
Returns base path when all repos are stored
:param cls:
q = Session().query(RhodeCodeUi)\
.filter(RhodeCodeUi.ui_key == cls.url_sep())
q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def forks(self):
Return forks of this repo
return Repository.get_repo_forks(self.repo_id)
def parent(self):
Returns fork parent
return self.fork
def just_name(self):
return self.repo_name.split(Repository.url_sep())[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
q = Session().query(RhodeCodeUi).filter(RhodeCodeUi.ui_key ==
Repository.url_sep())
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split(Repository.url_sep())
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return Repository.url_sep().join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from mercurial import ui
from mercurial import config
baseui = ui.ui()
#clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
ret = RhodeCodeUi.query()\
.options(FromCache("sql_cache_short", "repository_repo_ui")).all()
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
ui_.ui_key, ui_.ui_value)
baseui.setconfig(ui_.ui_section, ui_.ui_key, ui_.ui_value)
if ui_.ui_key == 'push_ssl':
# force set push_ssl requirement to False, rhodecode
# handles that
baseui.setconfig(ui_.ui_section, ui_.ui_key, False)
return baseui
def inject_ui(cls, repo, extras={}):
from rhodecode.lib.vcs.backends.hg import MercurialRepository
from rhodecode.lib.vcs.backends.git import GitRepository
required = (MercurialRepository, GitRepository)
if not isinstance(repo, required):
raise Exception('repo must be instance of %s' % required)
# inject ui extra param to log this action via push logger
for k, v in extras.items():
repo._repo.ui.setconfig('rhodecode_extras', k, v)
def is_valid(cls, repo_name):
returns True if given repo name is a valid filesystem repository
:param repo_name:
from rhodecode.lib.utils import is_valid_repo
return is_valid_repo(repo_name, cls.base_path())
Common function for generating repo api data
repo = self
repo_id=repo.repo_id,
repo_name=repo.repo_name,
repo_type=repo.repo_type,
clone_uri=repo.clone_uri,
private=repo.private,
created_on=repo.created_on,
description=repo.description,
landing_rev=repo.landing_rev,
owner=repo.user.username,
fork_of=repo.fork.repo_name if repo.fork else None
def lock(cls, repo, user_id):
repo.locked = [user_id, time.time()]
Session().add(repo)
Session().commit()
def unlock(cls, repo):
repo.locked = None
#==========================================================================
# SCM PROPERTIES
def get_changeset(self, rev=None):
return get_changeset_safe(self.scm_instance, rev)
def get_landing_changeset(self):
Returns landing changeset, or if that doesn't exist returns the tip
cs = self.get_changeset(self.landing_rev) or self.get_changeset()
return cs
def tip(self):
return self.get_changeset('tip')
def author(self):
return self.tip.author
def last_change(self):
return self.scm_instance.last_change
def get_comments(self, revisions=None):
Returns comments for this repository grouped by revisions
:param revisions: filter query by revisions only
cmts = ChangesetComment.query()\
.filter(ChangesetComment.repo == self)
if revisions:
cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
grouped = defaultdict(list)
for cmt in cmts.all():
grouped[cmt.revision].append(cmt)
return grouped
def statuses(self, revisions=None):
Returns statuses for this repository
:param revisions: list of revisions to get statuses for
:type revisions: list
statuses = ChangesetStatus.query()\
.filter(ChangesetStatus.repo == self)\
.filter(ChangesetStatus.version == 0)
statuses = statuses.filter(ChangesetStatus.revision.in_(revisions))
grouped = {}
#maybe we have open new pullrequest without a status ?
stat = ChangesetStatus.STATUS_UNDER_REVIEW
status_lbl = ChangesetStatus.get_status_lbl(stat)
for pr in PullRequest.query().filter(PullRequest.org_repo == self).all():
for rev in pr.revisions:
pr_id = pr.pull_request_id
pr_repo = pr.other_repo.repo_name
grouped[rev] = [stat, status_lbl, pr_id, pr_repo]
for stat in statuses.all():
pr_id = pr_repo = None
if stat.pull_request:
pr_id = stat.pull_request.pull_request_id
pr_repo = stat.pull_request.other_repo.repo_name
grouped[stat.revision] = [str(stat.status), stat.status_lbl,
pr_id, pr_repo]
# SCM CACHE INSTANCE
def invalidate(self):
return CacheInvalidation.invalidate(self.repo_name)
def set_invalidate(self):
set a cache for invalidation for this instance
CacheInvalidation.set_invalidate(self.repo_name)
def scm_instance(self):
return self.__get_instance()
def scm_instance_cached(self, cache_map=None):
@cache_region('long_term')
def _c(repo_name):
rn = self.repo_name
log.debug('Getting cached instance of repo')
if cache_map:
# get using prefilled cache_map
invalidate_repo = cache_map[self.repo_name]
if invalidate_repo:
invalidate_repo = (None if invalidate_repo.cache_active
else invalidate_repo)
# get from invalidate
invalidate_repo = self.invalidate
if invalidate_repo is not None:
region_invalidate(_c, None, rn)
# update our cache
CacheInvalidation.set_valid(invalidate_repo.cache_key)
return _c(rn)
def __get_instance(self):
repo_full_path = self.repo_full_path
alias = get_scm(repo_full_path)[0]
log.debug('Creating instance of %s repository' % alias)
backend = get_backend(alias)
except VCSError:
log.error('Perhaps this repository is in db and not in '
'filesystem run rescan repositories with '
'"destroy old data " option from admin panel')
return
if alias == 'hg':
repo = backend(safe_str(repo_full_path), create=False,
baseui=self._ui)
# skip hidden web repository
if repo._get_hidden():
repo = backend(repo_full_path, create=False)
return repo
class RepoGroup(Base, BaseModel):
__tablename__ = 'groups'
UniqueConstraint('group_name', 'group_parent_id'),
CheckConstraint('group_id != group_parent_id'),
__mapper_args__ = {'order_by': 'group_name'}
group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_name = Column("group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
group_description = Column("group_description", String(10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoGroupToPerm', cascade='all')
parent_group = relationship('RepoGroup', remote_side=group_id)
def __init__(self, group_name='', parent_group=None):
self.group_name = group_name
self.parent_group = parent_group
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
self.group_name)
def groups_choices(cls):
from webhelpers.html import literal as _literal
repo_groups = [('', '')]
sep = ' » '
_name = lambda k: _literal(sep.join(k))
repo_groups.extend([(x.group_id, _name(x.full_path_splitted))
for x in cls.query().all()])
repo_groups = sorted(repo_groups, key=lambda t: t[1].split(sep)[0])
return repo_groups
def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
gr = cls.query()\
.filter(cls.group_name.ilike(group_name))
.filter(cls.group_name == group_name)
gr = gr.options(FromCache(
"get_group_%s" % _hash_key(group_name)
return gr.scalar()
def parents(self):
parents_recursion_limit = 5
if self.parent_group is None:
cur_gr = self.parent_group
cnt = 0
cnt += 1
if cnt == parents_recursion_limit:
# this will prevent accidental infinit loops
log.error('group nested more than %s' %
parents_recursion_limit)
def children(self):
return RepoGroup.query().filter(RepoGroup.parent_group == self)
def name(self):
return self.group_name.split(RepoGroup.url_sep())[-1]
def full_path(self):
return self.group_name
def full_path_splitted(self):
return self.group_name.split(RepoGroup.url_sep())
def repositories(self):
return Repository.query()\
.filter(Repository.group == self)\
.order_by(Repository.repo_name)
def repositories_recursive_count(self):
cnt = self.repositories.count()
def children_count(group):
for child in group.children:
cnt += child.repositories.count()
cnt += children_count(child)
return cnt
return cnt + children_count(self)
def get_new_name(self, group_name):
returns new full group name based on parent and new name
path_prefix = (self.parent_group.full_path_splitted if
self.parent_group else [])
return RepoGroup.url_sep().join(path_prefix + [group_name])
class Permission(Base, BaseModel):
__tablename__ = 'permissions'
Index('p_perm_name_idx', 'permission_name'),
PERMS = [
('repository.none', _('Repository no access')),
('repository.read', _('Repository read access')),
('repository.write', _('Repository write access')),
('repository.admin', _('Repository admin access')),
('group.none', _('Repositories Group no access')),
('group.read', _('Repositories Group read access')),
('group.write', _('Repositories Group write access')),
Status change: