@@ -35,794 +35,794 @@ from webhelpers.text import chop_at, col
replace_whitespace, urlify, truncate, wrap_paragraphs
from webhelpers.date import time_ago_in_words
from webhelpers.paginate import Page
from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
convert_boolean_attrs, NotGiven, _make_safe_id_component
from rhodecode.lib.annotate import annotate_highlight
from rhodecode.lib.utils import repo_name_slug
from rhodecode.lib.utils2 import str2bool, safe_unicode, safe_str, \
get_changeset_safe, datetime_to_time, time_to_datetime
from rhodecode.lib.markup_renderer import MarkupRenderer
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
from rhodecode.lib.vcs.backends.base import BaseChangeset
from rhodecode.config.conf import DATE_FORMAT, DATETIME_FORMAT
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import URL_SEP, Permission
log = logging.getLogger(__name__)
html_escape_table = {
"&": "&",
'"': """,
"'": "'",
">": ">",
"<": "<",
}
def html_escape(text):
"""Produce entities within text."""
return "".join(html_escape_table.get(c,c) for c in text)
def shorter(text, size=20):
postfix = '...'
if len(text) > size:
return text[:size - len(postfix)] + postfix
return text
def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
"""
Reset button
_set_input_attrs(attrs, type, name, value)
_set_id_attr(attrs, id, name)
convert_boolean_attrs(attrs, ["disabled"])
return HTML.input(**attrs)
reset = _reset
safeid = _make_safe_id_component
def FID(raw_id, path):
Creates a uniqe ID for filenode based on it's hash of path and revision
it's safe to use in urls
:param raw_id:
:param path:
return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
def get_token():
"""Return the current authentication token, creating one if one doesn't
already exist.
token_key = "_authentication_token"
from pylons import session
if not token_key in session:
try:
token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
except AttributeError: # Python < 2.4
token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
session[token_key] = token
if hasattr(session, 'save'):
session.save()
return session[token_key]
class _GetError(object):
"""Get error from form_errors, and represent it as span wrapped error
message
:param field_name: field to fetch errors for
:param form_errors: form errors dict
def __call__(self, field_name, form_errors):
tmpl = """<span class="error_msg">%s</span>"""
if form_errors and field_name in form_errors:
return literal(tmpl % form_errors.get(field_name))
get_error = _GetError()
class _ToolTip(object):
def __call__(self, tooltip_title, trim_at=50):
Special function just to wrap our text into nice formatted
autowrapped text
:param tooltip_title:
tooltip_title = escape(tooltip_title)
tooltip_title = tooltip_title.replace('<', '<').replace('>', '>')
return tooltip_title
tooltip = _ToolTip()
class _FilesBreadCrumbs(object):
def __call__(self, repo_name, rev, paths):
if isinstance(paths, str):
paths = safe_unicode(paths)
url_l = [link_to(repo_name, url('files_home',
repo_name=repo_name,
revision=rev, f_path=''),
class_='ypjax-link')]
paths_l = paths.split('/')
for cnt, p in enumerate(paths_l):
if p != '':
url_l.append(link_to(p,
url('files_home',
revision=rev,
f_path='/'.join(paths_l[:cnt + 1])
),
class_='ypjax-link'
)
return literal('/'.join(url_l))
files_breadcrumbs = _FilesBreadCrumbs()
class CodeHtmlFormatter(HtmlFormatter):
My code Html Formatter for source codes
def wrap(self, source, outfile):
return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
def _wrap_code(self, source):
for cnt, it in enumerate(source):
i, t = it
t = '<div id="L%s">%s</div>' % (cnt + 1, t)
yield i, t
def _wrap_tablelinenos(self, inner):
dummyoutfile = StringIO.StringIO()
lncount = 0
for t, line in inner:
if t:
lncount += 1
dummyoutfile.write(line)
fl = self.linenostart
mw = len(str(lncount + fl - 1))
sp = self.linenospecial
st = self.linenostep
la = self.lineanchors
aln = self.anchorlinenos
nocls = self.noclasses
if sp:
lines = []
for i in range(fl, fl + lncount):
if i % st == 0:
if i % sp == 0:
if aln:
lines.append('<a href="#%s%d" class="special">%*d</a>' %
(la, i, mw, i))
else:
lines.append('<span class="special">%*d</span>' % (mw, i))
lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
lines.append('%*d' % (mw, i))
lines.append('')
ls = '\n'.join(lines)
# in case you wonder about the seemingly redundant <div> here: since the
# content in the other cell also is wrapped in a div, some browsers in
# some configurations seem to mess up the formatting...
if nocls:
yield 0, ('<table class="%stable">' % self.cssclass +
'<tr><td><div class="linenodiv" '
'style="background-color: #f0f0f0; padding-right: 10px">'
'<pre style="line-height: 125%">' +
ls + '</pre></div></td><td id="hlcode" class="code">')
'<tr><td class="linenos"><div class="linenodiv"><pre>' +
yield 0, dummyoutfile.getvalue()
yield 0, '</td></tr></table>'
def pygmentize(filenode, **kwargs):
"""pygmentize function using pygments
:param filenode:
return literal(code_highlight(filenode.content,
filenode.lexer, CodeHtmlFormatter(**kwargs)))
def pygmentize_annotation(repo_name, filenode, **kwargs):
pygmentize function for annotation
color_dict = {}
def gen_color(n=10000):
"""generator for getting n of evenly distributed colors using
hsv color and golden ratio. It always return same order of colors
:returns: RGB tuple
def hsv_to_rgb(h, s, v):
if s == 0.0:
return v, v, v
i = int(h * 6.0) # XXX assume int() truncates!
f = (h * 6.0) - i
p = v * (1.0 - s)
q = v * (1.0 - s * f)
t = v * (1.0 - s * (1.0 - f))
i = i % 6
if i == 0:
return v, t, p
if i == 1:
return q, v, p
if i == 2:
return p, v, t
if i == 3:
return p, q, v
if i == 4:
return t, p, v
if i == 5:
return v, p, q
golden_ratio = 0.618033988749895
h = 0.22717784590367374
for _ in xrange(n):
h += golden_ratio
h %= 1
HSV_tuple = [h, 0.95, 0.95]
RGB_tuple = hsv_to_rgb(*HSV_tuple)
yield map(lambda x: str(int(x * 256)), RGB_tuple)
cgenerator = gen_color()
def get_color_string(cs):
if cs in color_dict:
col = color_dict[cs]
col = color_dict[cs] = cgenerator.next()
return "color: rgb(%s)! important;" % (', '.join(col))
def url_func(repo_name):
def _url_func(changeset):
author = changeset.author
date = changeset.date
message = tooltip(changeset.message)
tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
" %s<br/><b>Date:</b> %s</b><br/><b>Message:"
"</b> %s<br/></div>")
tooltip_html = tooltip_html % (author, date, message)
lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
short_id(changeset.raw_id))
uri = link_to(
lnk_format,
url('changeset_home', repo_name=repo_name,
revision=changeset.raw_id),
style=get_color_string(changeset.raw_id),
class_='tooltip',
title=tooltip_html
uri += '\n'
return uri
return _url_func
return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
def is_following_repo(repo_name, user_id):
from rhodecode.model.scm import ScmModel
return ScmModel().is_following_repo(repo_name, user_id)
flash = _Flash()
#==============================================================================
# SCM FILTERS available via h.
from rhodecode.lib.vcs.utils import author_name, author_email
from rhodecode.lib.utils2 import credentials_filter, age as _age
from rhodecode.model.db import User, ChangesetStatus
age = lambda x: _age(x)
capitalize = lambda x: x.capitalize()
email = author_email
short_id = lambda x: x[:12]
hide_credentials = lambda x: ''.join(credentials_filter(x))
def fmt_date(date):
if date:
_fmt = _(u"%a, %d %b %Y %H:%M:%S").encode('utf8')
return date.strftime(_fmt).decode('utf8')
return ""
def is_git(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
_type = repository
return _type == 'git'
def is_hg(repository):
return _type == 'hg'
def email_or_none(author):
# extract email from the commit string
_email = email(author)
if _email != '':
# check it against RhodeCode database, and use the MAIN email for this
# user
user = User.get_by_email(_email, case_insensitive=True, cache=True)
if user is not None:
return user.email
return _email
# See if it contains a username we can get an email from
user = User.get_by_username(author_name(author), case_insensitive=True,
cache=True)
# No valid email, not a valid user in the system, none!
return None
def person(author):
def person(author, show_attr="username_and_name"):
# attr to return from fetched user
person_getter = lambda usr: usr.username
person_getter = lambda usr: getattr(usr, show_attr)
# Valid email in the attribute passed, see if they're in the system
return person_getter(user)
# Maybe it's a username?
_author = author_name(author)
user = User.get_by_username(_author, case_insensitive=True,
# Still nothing? Just pass back the author name then
return _author
def person_by_id(id_):
def person_by_id(id_, show_attr="username_and_name"):
#maybe it's an ID ?
if str(id_).isdigit() or isinstance(id_, int):
id_ = int(id_)
user = User.get(id_)
return id_
def desc_stylize(value):
converts tags from value into html equivalent
:param value:
value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'<div class="metatag" tag="see">see => \\1 </div>', value)
value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z\-\/]*)\]',
'<div class="metatag" tag="\\1">\\1 => <a href="/\\2">\\2</a></div>', value)
value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/]*)\]',
'<div class="metatag" tag="lang">\\2</div>', value)
value = re.sub(r'\[([a-z]+)\]',
'<div class="metatag" tag="\\1">\\1</div>', value)
return value
def bool2icon(value):
"""Returns True/False values represented as small html image of true/false
icons
:param value: bool value
if value is True:
return HTML.tag('img', src=url("/images/icons/accept.png"),
alt=_('True'))
if value is False:
return HTML.tag('img', src=url("/images/icons/cancel.png"),
alt=_('False'))
def action_parser(user_log, feed=False):
This helper will action_map the specified string action into translated
fancy names with icons and links
:param user_log: user log instance
:param feed: use output for feeds (no html and fancy icons)
action = user_log.action
action_params = ' '
x = action.split(':')
if len(x) > 1:
action, action_params = x
def get_cs_links():
revs_limit = 3 # display this amount always
revs_top_limit = 50 # show upto this amount of changesets hidden
revs_ids = action_params.split(',')
deleted = user_log.repository is None
if deleted:
return ','.join(revs_ids)
repo_name = user_log.repository.repo_name
repo = user_log.repository.scm_instance
def lnk(rev, repo_name):
if isinstance(rev, BaseChangeset):
lbl = 'r%s:%s' % (rev.revision, rev.short_id)
_url = url('changeset_home', repo_name=repo_name,
revision=rev.raw_id)
title = tooltip(rev.message)
lbl = '%s' % rev
_url = '#'
title = _('Changeset not found')
return link_to(lbl, _url, title=title, class_='tooltip',)
revs = []
if len(filter(lambda v: v != '', revs_ids)) > 0:
for rev in revs_ids[:revs_top_limit]:
rev = repo.get_changeset(rev)
revs.append(rev)
except ChangesetDoesNotExistError:
log.error('cannot find revision %s in this repo' % rev)
continue
cs_links = []
cs_links.append(" " + ', '.join(
[lnk(rev, repo_name) for rev in revs[:revs_limit]]
compare_view = (
' <div class="compare_view tooltip" title="%s">'
'<a href="%s">%s</a> </div>' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0], revs_ids[-1]
revision='%s...%s' % (revs_ids[0], revs_ids[-1])
_('compare view')
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
rev = revs[revs_limit]
cs_links.append(", " + lnk(rev, repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
'<span> %s <a class="show_more" id="_%s" '
'href="#more">%s</a> %s</span>'
if not feed:
cs_links.append(html_tmpl % (
_('and'),
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
_('revisions')
html_tmpl = '<span id="%s" style="display:none">, %s </span>'
html_tmpl = '<span id="%s"> %s </span>'
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
cs_links.append(compare_view)
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
return _('fork name ') + str(link_to(action_params, url('summary_home',
repo_name=repo_name,)))
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
pull_request_id = action_params
return link_to(_('Pull request #%s') % pull_request_id,
url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request_id))
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'database_delete.png'),
'user_created_repo': (_('[created] repository'),
None, 'database_add.png'),
'user_created_fork': (_('[created] repository as fork'),
None, 'arrow_divide.png'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'arrow_divide.png'),
'user_updated_repo': (_('[updated] repository'),
None, 'database_edit.png'),
'admin_deleted_repo': (_('[delete] repository'),
'admin_created_repo': (_('[created] repository'),
'admin_forked_repo': (_('[forked] repository'),
'admin_updated_repo': (_('[updated] repository'),
'admin_created_user': (_('[created] user'),
get_user_name, 'user_add.png'),
'admin_updated_user': (_('[updated] user'),
get_user_name, 'user_edit.png'),
'admin_created_users_group': (_('[created] users group'),
get_users_group, 'group_add.png'),
'admin_updated_users_group': (_('[updated] users group'),
get_users_group, 'group_edit.png'),
'user_commented_revision': (_('[commented] on revision in repository'),
get_cs_links, 'comment_add.png'),
'user_commented_pull_request': (_('[commented] on pull request for'),
get_pull_request, 'comment_add.png'),
'user_closed_pull_request': (_('[closed] pull request for'),
get_pull_request, 'tick.png'),
'push': (_('[pushed] into'),
get_cs_links, 'script_add.png'),
'push_local': (_('[committed via RhodeCode] into repository'),
get_cs_links, 'script_edit.png'),
'push_remote': (_('[pulled from remote] into repository'),
get_cs_links, 'connect.png'),
'pull': (_('[pulled] from'),
None, 'down_16.png'),
'started_following_repo': (_('[started following] repository'),
None, 'heart_add.png'),
'stopped_following_repo': (_('[stopped following] repository'),
None, 'heart_delete.png'),
action_str = action_map.get(action, action)
if feed:
action = action_str[0].replace('[', '').replace(']', '')
action = action_str[0]\
.replace('[', '<span class="journal_highlight">')\
.replace(']', '</span>')
action_params_func = lambda: ""
if callable(action_str[1]):
action_params_func = action_str[1]
def action_parser_icon():
action_params = None
tmpl = """<img src="%s%s" alt="%s"/>"""
ico = action_map.get(action, ['', '', ''])[2]
return literal(tmpl % ((url('/images/icons/')), ico, action))
# returned callbacks we need to call to get
return [lambda: literal(action), action_params_func, action_parser_icon]
# PERMS
from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
HasRepoPermissionAny, HasRepoPermissionAll
# GRAVATAR URL
def gravatar_url(email_address, size=30):
if (not str2bool(config['app_conf'].get('use_gravatar')) or
not email_address or email_address == 'anonymous@rhodecode.org'):
f = lambda a, l: min(l, key=lambda x: abs(x - a))
return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
default = 'identicon'
baseurl_nossl = "http://www.gravatar.com/avatar/"
baseurl_ssl = "https://secure.gravatar.com/avatar/"
baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl
if isinstance(email_address, unicode):
#hashlib crashes on unicode items
email_address = safe_str(email_address)
# construct the url
gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
gravatar_url += urllib.urlencode({'d': default, 's': str(size)})
return gravatar_url
# REPO PAGER, PAGER FOR REPOSITORY
class RepoPage(Page):
def __init__(self, collection, page=1, items_per_page=20,
item_count=None, url=None, **kwargs):
"""Create a "RepoPage" instance. special pager for paging
repository
self._url_generator = url
# Safe the kwargs class-wide so they can be used in the pager() method
self.kwargs = kwargs
# Save a reference to the collection
self.original_collection = collection
self.collection = collection
# The self.page is the number of the current page.
# The first page has the number 1!
self.page = int(page) # make it int() if we get it as a string
except (ValueError, TypeError):
self.page = 1
self.items_per_page = items_per_page
# Unless the user tells us how many items the collections has
# we calculate that ourselves.
if item_count is not None:
self.item_count = item_count
self.item_count = len(self.collection)
# Compute the number of the first and last available page
if self.item_count > 0:
self.first_page = 1
self.page_count = int(math.ceil(float(self.item_count) /
self.items_per_page))
self.last_page = self.first_page + self.page_count - 1
# Make sure that the requested page number is the range of
# valid pages
if self.page > self.last_page:
self.page = self.last_page
elif self.page < self.first_page:
self.page = self.first_page
# Note: the number of items on this page can be less than
# items_per_page if the last page is not full
self.first_item = max(0, (self.item_count) - (self.page *
items_per_page))
self.last_item = ((self.item_count - 1) - items_per_page *
(self.page - 1))
self.items = list(self.collection[self.first_item:self.last_item + 1])
# Links to previous and next page
if self.page > self.first_page:
self.previous_page = self.page - 1
self.previous_page = None
if self.page < self.last_page:
self.next_page = self.page + 1
self.next_page = None
# No items available
self.first_page = None
self.page_count = 0
self.last_page = None
self.first_item = None
self.last_item = None
self.items = []
# This is a subclass of the 'list' type. Initialise the list now.
list.__init__(self, reversed(self.items))
def changed_tooltip(nodes):
Generates a html string for changed nodes in changeset page.
It limits the output to 30 entries
:param nodes: LazyNodesGenerator
if nodes:
# -*- coding: utf-8 -*-
rhodecode.model.db
~~~~~~~~~~~~~~~~~~
Database Models for RhodeCode
:created_on: Apr 08, 2010
:author: marcink
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import logging
import datetime
import traceback
import hashlib
import time
from collections import defaultdict
from sqlalchemy import *
from sqlalchemy.ext.hybrid import hybrid_property
from sqlalchemy.orm import relationship, joinedload, class_mapper, validates
from sqlalchemy.exc import DatabaseError
from beaker.cache import cache_region, region_invalidate
from webob.exc import HTTPNotFound
from pylons.i18n.translation import lazy_ugettext as _
from rhodecode.lib.vcs import get_backend
from rhodecode.lib.vcs.utils.helpers import get_scm
from rhodecode.lib.vcs.exceptions import VCSError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.utils2 import str2bool, safe_str, get_changeset_safe, \
safe_unicode
from rhodecode.lib.compat import json
from rhodecode.lib.caching_query import FromCache
from rhodecode.model.meta import Base, Session
URL_SEP = '/'
# BASE CLASSES
_hash_key = lambda k: hashlib.md5(safe_str(k)).hexdigest()
class BaseModel(object):
Base Model for all classess
@classmethod
def _get_keys(cls):
"""return column names for this model """
return class_mapper(cls).c.keys()
def get_dict(self):
return dict with keys and values corresponding
to this model data """
d = {}
for k in self._get_keys():
d[k] = getattr(self, k)
# also use __json__() if present to get additional fields
_json_attr = getattr(self, '__json__', None)
if _json_attr:
# update with attributes from __json__
if callable(_json_attr):
_json_attr = _json_attr()
for k, val in _json_attr.iteritems():
d[k] = val
return d
def get_appstruct(self):
"""return list with keys and values tupples corresponding
l = []
l.append((k, getattr(self, k),))
return l
def populate_obj(self, populate_dict):
"""populate model with data from given populate_dict"""
if k in populate_dict:
setattr(self, k, populate_dict[k])
def query(cls):
return Session().query(cls)
def get(cls, id_):
if id_:
return cls.query().get(id_)
def get_or_404(cls, id_):
res = cls.query().get(id_)
if not res:
raise HTTPNotFound
return res
def getAll(cls):
return cls.query().all()
def delete(cls, id_):
obj = cls.query().get(id_)
Session().delete(obj)
def __repr__(self):
if hasattr(self, '__unicode__'):
# python repr needs to return str
return safe_str(self.__unicode__())
return '<DB:%s>' % (self.__class__.__name__)
class RhodeCodeSetting(Base, BaseModel):
__tablename__ = 'rhodecode_settings'
__table_args__ = (
UniqueConstraint('app_settings_name'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'}
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_app_settings_value = Column("app_settings_value", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
if self.app_settings_name == 'ldap_active':
v = str2bool(v)
return v
@app_settings_value.setter
def app_settings_value(self, val):
Setter that will always make sure we use unicode in app_settings_value
:param val:
self._app_settings_value = safe_unicode(val)
def __unicode__(self):
return u"<%s('%s:%s')>" % (
self.__class__.__name__,
self.app_settings_name, self.app_settings_value
def get_by_name(cls, key):
return cls.query()\
.filter(cls.app_settings_name == key).scalar()
def get_by_name_or_create(cls, key):
res = cls.get_by_name(key)
res = cls(key)
def get_app_settings(cls, cache=False):
ret = cls.query()
if cache:
ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
if not ret:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings['rhodecode_' + each.app_settings_name] = \
each.app_settings_value
return settings
def get_ldap_settings(cls, cache=False):
ret = cls.query()\
.filter(cls.app_settings_name.startswith('ldap_')).all()
fd = {}
for row in ret:
fd.update({row.app_settings_name: row.app_settings_value})
return fd
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
UniqueConstraint('ui_key'),
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'changegroup.push_logger'
HOOK_PRE_PUSH = 'prechangegroup.pre_push'
HOOK_PULL = 'outgoing.pull_logger'
HOOK_PRE_PULL = 'preoutgoing.pre_pull'
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
def get_by_key(cls, key):
return cls.query().filter(cls.ui_key == key).scalar()
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
cls.HOOK_PUSH, cls.HOOK_PRE_PUSH,
cls.HOOK_PULL, cls.HOOK_PRE_PULL]))
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
q = q.filter(cls.ui_section == 'hooks')
def get_repos_location(cls):
return cls.get_by_key('/').ui_value
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key) or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session().add(new_ui)
class User(Base, BaseModel):
__tablename__ = 'users'
UniqueConstraint('username'), UniqueConstraint('email'),
Index('u_username_idx', 'username'),
Index('u_email_idx', 'email'),
DEFAULT_USER = 'default'
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=True)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("firstname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_email = Column("email", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade='all')
notifications = relationship('UserNotification', cascade='all')
# notifications assigned to this user
user_created_notifications = relationship('Notification', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
#extra emails for this user
user_emails = relationship('UserEmailMap', cascade='all')
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user==self).all()
return [self.email] + [x.email for x in other]
def username_and_name(self):
return '%s (%s %s)' % (self.username, self.firstname, self.lastname)
def full_name(self):
return '%s %s' % (self.name, self.lastname)
return '%s %s' % (self.firstname, self.lastname)
def full_name_or_username(self):
return ('%s %s' % (self.name, self.lastname)
if (self.name and self.lastname) else self.username)
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
def is_admin(self):
return self.admin
return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
def get_by_username(cls, username, case_insensitive=False, cache=False):
if case_insensitive:
q = cls.query().filter(cls.username.ilike(username))
q = cls.query().filter(cls.username == username)
q = q.options(FromCache(
"sql_cache_short",
"get_user_%s" % _hash_key(username)
return q.scalar()
def get_by_api_key(cls, api_key, cache=False):
q = cls.query().filter(cls.api_key == api_key)
q = q.options(FromCache("sql_cache_short",
"get_api_key_%s" % api_key))
def get_by_email(cls, email, case_insensitive=False, cache=False):
q = cls.query().filter(cls.email.ilike(email))
q = cls.query().filter(cls.email == email)
"get_email_key_%s" % email))
ret = q.scalar()
if ret is None:
q = UserEmailMap.query()
# try fetching in alternate email map
q = q.filter(UserEmailMap.email.ilike(email))
q = q.filter(UserEmailMap.email == email)
q = q.options(joinedload(UserEmailMap.user))
"get_email_map_key_%s" % email))
ret = getattr(q.scalar(), 'user', None)
return ret
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session().add(self)
log.debug('updated user %s lastlogin' % self.username)
def get_api_data(self):
Common function for generating user related data for API
user = self
data = dict(
user_id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
emails=user.emails,
api_key=user.api_key,
active=user.active,
admin=user.admin,
ldap_dn=user.ldap_dn,
last_login=user.last_login,
return data
def __json__(self):
full_name=self.full_name,
full_name_or_username=self.full_name_or_username,
short_contact=self.short_contact,
full_contact=self.full_contact
data.update(self.get_api_data())
class UserEmailMap(Base, BaseModel):
__tablename__ = 'user_email_map'
Index('uem_email_idx', 'email'),
UniqueConstraint('email'),
__mapper_args__ = {}
email_id = Column("email_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
_email = Column("email", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
user = relationship('User', lazy='joined')
@validates('_email')
def validate_email(self, key, email):
# check if this email is not main one
main_email = Session().query(User).filter(User.email == email).scalar()
if main_email is not None:
raise AttributeError('email %s is present is user table' % email)
return email
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
'mysql_charset': 'utf8'},
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column("repository_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", UnicodeText(1200000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository', cascade='')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
users_group_to_perm = relationship('UsersGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
return u'<userGroup(%s)>' % (self.users_group_name)
def get_by_group_name(cls, group_name, cache=False,
case_insensitive=False):
q = cls.query().filter(cls.users_group_name.ilike(group_name))
q = cls.query().filter(cls.users_group_name == group_name)
"get_user_%s" % _hash_key(group_name)
def get(cls, users_group_id, cache=False):
users_group = cls.query()
users_group = users_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % users_group_id))
return users_group.get(users_group_id)
users_group = self
users_group_id=users_group.users_group_id,
group_name=users_group.users_group_name,
active=users_group.users_group_active,
class UsersGroupMember(Base, BaseModel):
__tablename__ = 'users_groups_members'
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
users_group = relationship('UsersGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
UniqueConstraint('repo_name'),
Index('r_repo_name_idx', 'repo_name'),
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
clone_uri = Column("clone_uri", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
repo_type = Column("repo_type", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
landing_rev = Column("landing_revision", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
enable_locking = Column("enable_locking", Boolean(), nullable=False, unique=None, default=False)
_locked = Column("locked", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
fork = relationship('Repository', remote_side=repo_id)
group = relationship('RepoGroup')
repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing',
primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id',
cascade='all')
logs = relationship('UserLog')
comments = relationship('ChangesetComment', cascade="all, delete, delete-orphan")
pull_requests_org = relationship('PullRequest',
primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
cascade="all, delete, delete-orphan")
pull_requests_other = relationship('PullRequest',
primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
self.repo_name)
def locked(self):
# always should return [user_id, timelocked]
if self._locked:
_lock_info = self._locked.split(':')
return int(_lock_info[0]), _lock_info[1]
return [None, None]
@locked.setter
def locked(self, val):
if val and isinstance(val, (list, tuple)):
self._locked = ':'.join(map(str, val))
self._locked = None
def url_sep(cls):
return URL_SEP
def get_by_repo_name(cls, repo_name):
q = Session().query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))
def get_by_full_path(cls, repo_full_path):
repo_name = repo_full_path.split(cls.base_path(), 1)[-1]
return cls.get_by_repo_name(repo_name.strip(URL_SEP))
def get_repo_forks(cls, repo_id):
return cls.query().filter(Repository.fork_id == repo_id)
def base_path(cls):
Returns base path when all repos are stored
:param cls:
q = Session().query(RhodeCodeUi)\
.filter(RhodeCodeUi.ui_key == cls.url_sep())
q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def forks(self):
Return forks of this repo
return Repository.get_repo_forks(self.repo_id)
def parent(self):
Returns fork parent
return self.fork
def just_name(self):
return self.repo_name.split(Repository.url_sep())[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
q = Session().query(RhodeCodeUi).filter(RhodeCodeUi.ui_key ==
Repository.url_sep())
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split(Repository.url_sep())
return os.path.join(*p)
Status change: