@@ -227,410 +227,410 @@ class CodeHtmlFormatter(HtmlFormatter):
if i % st == 0:
if aln:
lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
else:
lines.append('%*d' % (mw, i))
lines.append('')
ls = '\n'.join(lines)
# in case you wonder about the seemingly redundant <div> here: since the
# content in the other cell also is wrapped in a div, some browsers in
# some configurations seem to mess up the formatting...
if nocls:
yield 0, ('<table class="%stable">' % self.cssclass +
'<tr><td><div class="linenodiv" '
'style="background-color: #f0f0f0; padding-right: 10px">'
'<pre style="line-height: 125%">' +
ls + '</pre></div></td><td id="hlcode" class="code">')
'<tr><td class="linenos"><div class="linenodiv"><pre>' +
yield 0, dummyoutfile.getvalue()
yield 0, '</td></tr></table>'
def pygmentize(filenode, **kwargs):
"""pygmentize function using pygments
:param filenode:
"""
return literal(code_highlight(filenode.content,
filenode.lexer, CodeHtmlFormatter(**kwargs)))
def pygmentize_annotation(repo_name, filenode, **kwargs):
pygmentize function for annotation
color_dict = {}
def gen_color(n=10000):
"""generator for getting n of evenly distributed colors using
hsv color and golden ratio. It always return same order of colors
:returns: RGB tuple
def hsv_to_rgb(h, s, v):
if s == 0.0:
return v, v, v
i = int(h * 6.0) # XXX assume int() truncates!
f = (h * 6.0) - i
p = v * (1.0 - s)
q = v * (1.0 - s * f)
t = v * (1.0 - s * (1.0 - f))
i = i % 6
if i == 0:
return v, t, p
if i == 1:
return q, v, p
if i == 2:
return p, v, t
if i == 3:
return p, q, v
if i == 4:
return t, p, v
if i == 5:
return v, p, q
golden_ratio = 0.618033988749895
h = 0.22717784590367374
for _ in xrange(n):
h += golden_ratio
h %= 1
HSV_tuple = [h, 0.95, 0.95]
RGB_tuple = hsv_to_rgb(*HSV_tuple)
yield map(lambda x: str(int(x * 256)), RGB_tuple)
cgenerator = gen_color()
def get_color_string(cs):
if cs in color_dict:
col = color_dict[cs]
col = color_dict[cs] = cgenerator.next()
return "color: rgb(%s)! important;" % (', '.join(col))
def url_func(repo_name):
def _url_func(changeset):
author = changeset.author
date = changeset.date
message = tooltip(changeset.message)
tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
" %s<br/><b>Date:</b> %s</b><br/><b>Message:"
"</b> %s<br/></div>")
tooltip_html = tooltip_html % (author, date, message)
lnk_format = '%5s:%s' % ('r%s' % changeset.revision,
short_id(changeset.raw_id))
uri = link_to(
lnk_format,
url('changeset_home', repo_name=repo_name,
revision=changeset.raw_id),
style=get_color_string(changeset.raw_id),
class_='tooltip',
title=tooltip_html
)
uri += '\n'
return uri
return _url_func
return literal(annotate_highlight(filenode, url_func(repo_name), **kwargs))
def is_following_repo(repo_name, user_id):
from rhodecode.model.scm import ScmModel
return ScmModel().is_following_repo(repo_name, user_id)
flash = _Flash()
#==============================================================================
# SCM FILTERS available via h.
from rhodecode.lib.vcs.utils import author_name, author_email
from rhodecode.lib.utils2 import credentials_filter, age as _age
from rhodecode.model.db import User, ChangesetStatus
age = lambda x: _age(x)
capitalize = lambda x: x.capitalize()
email = author_email
short_id = lambda x: x[:12]
hide_credentials = lambda x: ''.join(credentials_filter(x))
def fmt_date(date):
if date:
_fmt = _(u"%a, %d %b %Y %H:%M:%S").encode('utf8')
return date.strftime(_fmt).decode('utf8')
return ""
def is_git(repository):
if hasattr(repository, 'alias'):
_type = repository.alias
elif hasattr(repository, 'repo_type'):
_type = repository.repo_type
_type = repository
return _type == 'git'
def is_hg(repository):
return _type == 'hg'
def email_or_none(author):
# extract email from the commit string
_email = email(author)
if _email != '':
# check it against RhodeCode database, and use the MAIN email for this
# user
user = User.get_by_email(_email, case_insensitive=True, cache=True)
if user is not None:
return user.email
return _email
# See if it contains a username we can get an email from
user = User.get_by_username(author_name(author), case_insensitive=True,
cache=True)
# No valid email, not a valid user in the system, none!
return None
def person(author):
def person(author, show_attr="username_and_name"):
# attr to return from fetched user
person_getter = lambda usr: usr.username
person_getter = lambda usr: getattr(usr, show_attr)
# Valid email in the attribute passed, see if they're in the system
return person_getter(user)
# Maybe it's a username?
_author = author_name(author)
user = User.get_by_username(_author, case_insensitive=True,
# Still nothing? Just pass back the author name then
return _author
def person_by_id(id_):
def person_by_id(id_, show_attr="username_and_name"):
#maybe it's an ID ?
if str(id_).isdigit() or isinstance(id_, int):
id_ = int(id_)
user = User.get(id_)
return id_
def desc_stylize(value):
converts tags from value into html equivalent
:param value:
value = re.sub(r'\[see\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'<div class="metatag" tag="see">see => \\1 </div>', value)
value = re.sub(r'\[license\ \=\>\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
'<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=\>\ *([a-zA-Z\-\/]*)\]',
'<div class="metatag" tag="\\1">\\1 => <a href="/\\2">\\2</a></div>', value)
value = re.sub(r'\[(lang|language)\ \=\>\ *([a-zA-Z\-\/]*)\]',
'<div class="metatag" tag="lang">\\2</div>', value)
value = re.sub(r'\[([a-z]+)\]',
'<div class="metatag" tag="\\1">\\1</div>', value)
return value
def bool2icon(value):
"""Returns True/False values represented as small html image of true/false
icons
:param value: bool value
if value is True:
return HTML.tag('img', src=url("/images/icons/accept.png"),
alt=_('True'))
if value is False:
return HTML.tag('img', src=url("/images/icons/cancel.png"),
alt=_('False'))
def action_parser(user_log, feed=False):
This helper will action_map the specified string action into translated
fancy names with icons and links
:param user_log: user log instance
:param feed: use output for feeds (no html and fancy icons)
action = user_log.action
action_params = ' '
x = action.split(':')
if len(x) > 1:
action, action_params = x
def get_cs_links():
revs_limit = 3 # display this amount always
revs_top_limit = 50 # show upto this amount of changesets hidden
revs_ids = action_params.split(',')
deleted = user_log.repository is None
if deleted:
return ','.join(revs_ids)
repo_name = user_log.repository.repo_name
repo = user_log.repository.scm_instance
def lnk(rev, repo_name):
if isinstance(rev, BaseChangeset):
lbl = 'r%s:%s' % (rev.revision, rev.short_id)
_url = url('changeset_home', repo_name=repo_name,
revision=rev.raw_id)
title = tooltip(rev.message)
lbl = '%s' % rev
_url = '#'
title = _('Changeset not found')
return link_to(lbl, _url, title=title, class_='tooltip',)
revs = []
if len(filter(lambda v: v != '', revs_ids)) > 0:
for rev in revs_ids[:revs_top_limit]:
try:
rev = repo.get_changeset(rev)
revs.append(rev)
except ChangesetDoesNotExistError:
log.error('cannot find revision %s in this repo' % rev)
continue
cs_links = []
cs_links.append(" " + ', '.join(
[lnk(rev, repo_name) for rev in revs[:revs_limit]]
compare_view = (
' <div class="compare_view tooltip" title="%s">'
'<a href="%s">%s</a> </div>' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0], revs_ids[-1]
),
revision='%s...%s' % (revs_ids[0], revs_ids[-1])
_('compare view')
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
rev = revs[revs_limit]
cs_links.append(", " + lnk(rev, repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
'<span> %s <a class="show_more" id="_%s" '
'href="#more">%s</a> %s</span>'
if not feed:
cs_links.append(html_tmpl % (
_('and'),
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
_('revisions')
html_tmpl = '<span id="%s" style="display:none">, %s </span>'
html_tmpl = '<span id="%s"> %s </span>'
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
cs_links.append(compare_view)
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
return _('fork name ') + str(link_to(action_params, url('summary_home',
repo_name=repo_name,)))
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
pull_request_id = action_params
return link_to(_('Pull request #%s') % pull_request_id,
url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request_id))
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'database_delete.png'),
'user_created_repo': (_('[created] repository'),
None, 'database_add.png'),
'user_created_fork': (_('[created] repository as fork'),
None, 'arrow_divide.png'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'arrow_divide.png'),
'user_updated_repo': (_('[updated] repository'),
None, 'database_edit.png'),
'admin_deleted_repo': (_('[delete] repository'),
@@ -141,404 +141,413 @@ class BaseModel(object):
class RhodeCodeSetting(Base, BaseModel):
__tablename__ = 'rhodecode_settings'
__table_args__ = (
UniqueConstraint('app_settings_name'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'}
app_settings_id = Column("app_settings_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
app_settings_name = Column("app_settings_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_app_settings_value = Column("app_settings_value", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
def __init__(self, k='', v=''):
self.app_settings_name = k
self.app_settings_value = v
@validates('_app_settings_value')
def validate_settings_value(self, key, val):
assert type(val) == unicode
return val
@hybrid_property
def app_settings_value(self):
v = self._app_settings_value
if self.app_settings_name == 'ldap_active':
v = str2bool(v)
return v
@app_settings_value.setter
def app_settings_value(self, val):
Setter that will always make sure we use unicode in app_settings_value
:param val:
self._app_settings_value = safe_unicode(val)
def __unicode__(self):
return u"<%s('%s:%s')>" % (
self.__class__.__name__,
self.app_settings_name, self.app_settings_value
@classmethod
def get_by_name(cls, key):
return cls.query()\
.filter(cls.app_settings_name == key).scalar()
def get_by_name_or_create(cls, key):
res = cls.get_by_name(key)
if not res:
res = cls(key)
return res
def get_app_settings(cls, cache=False):
ret = cls.query()
if cache:
ret = ret.options(FromCache("sql_cache_short", "get_hg_settings"))
if not ret:
raise Exception('Could not get application settings !')
settings = {}
for each in ret:
settings['rhodecode_' + each.app_settings_name] = \
each.app_settings_value
return settings
def get_ldap_settings(cls, cache=False):
ret = cls.query()\
.filter(cls.app_settings_name.startswith('ldap_')).all()
fd = {}
for row in ret:
fd.update({row.app_settings_name: row.app_settings_value})
return fd
class RhodeCodeUi(Base, BaseModel):
__tablename__ = 'rhodecode_ui'
UniqueConstraint('ui_key'),
HOOK_UPDATE = 'changegroup.update'
HOOK_REPO_SIZE = 'changegroup.repo_size'
HOOK_PUSH = 'changegroup.push_logger'
HOOK_PRE_PUSH = 'prechangegroup.pre_push'
HOOK_PULL = 'outgoing.pull_logger'
HOOK_PRE_PULL = 'preoutgoing.pre_pull'
ui_id = Column("ui_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
ui_section = Column("ui_section", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_key = Column("ui_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_value = Column("ui_value", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
ui_active = Column("ui_active", Boolean(), nullable=True, unique=None, default=True)
def get_by_key(cls, key):
return cls.query().filter(cls.ui_key == key).scalar()
def get_builtin_hooks(cls):
q = cls.query()
q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
cls.HOOK_PUSH, cls.HOOK_PRE_PUSH,
cls.HOOK_PULL, cls.HOOK_PRE_PULL]))
return q.all()
def get_custom_hooks(cls):
q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE,
q = q.filter(cls.ui_section == 'hooks')
def get_repos_location(cls):
return cls.get_by_key('/').ui_value
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key) or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session().add(new_ui)
class User(Base, BaseModel):
__tablename__ = 'users'
UniqueConstraint('username'), UniqueConstraint('email'),
Index('u_username_idx', 'username'),
Index('u_email_idx', 'email'),
DEFAULT_USER = 'default'
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=True)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("firstname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_email = Column("email", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade='all')
notifications = relationship('UserNotification', cascade='all')
# notifications assigned to this user
user_created_notifications = relationship('Notification', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
#extra emails for this user
user_emails = relationship('UserEmailMap', cascade='all')
def email(self):
return self._email
@email.setter
def email(self, val):
self._email = val.lower() if val else None
@property
def firstname(self):
# alias for future
return self.name
def emails(self):
other = UserEmailMap.query().filter(UserEmailMap.user==self).all()
return [self.email] + [x.email for x in other]
def username_and_name(self):
return '%s (%s %s)' % (self.username, self.firstname, self.lastname)
def full_name(self):
return '%s %s' % (self.name, self.lastname)
return '%s %s' % (self.firstname, self.lastname)
def full_name_or_username(self):
return ('%s %s' % (self.name, self.lastname)
if (self.name and self.lastname) else self.username)
return ('%s %s' % (self.firstname, self.lastname)
if (self.firstname and self.lastname) else self.username)
def full_contact(self):
return '%s %s <%s>' % (self.name, self.lastname, self.email)
return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
def short_contact(self):
def is_admin(self):
return self.admin
return u"<%s('id:%s:%s')>" % (self.__class__.__name__,
self.user_id, self.username)
def get_by_username(cls, username, case_insensitive=False, cache=False):
if case_insensitive:
q = cls.query().filter(cls.username.ilike(username))
q = cls.query().filter(cls.username == username)
q = q.options(FromCache(
"sql_cache_short",
"get_user_%s" % _hash_key(username)
return q.scalar()
def get_by_api_key(cls, api_key, cache=False):
q = cls.query().filter(cls.api_key == api_key)
q = q.options(FromCache("sql_cache_short",
"get_api_key_%s" % api_key))
def get_by_email(cls, email, case_insensitive=False, cache=False):
q = cls.query().filter(cls.email.ilike(email))
q = cls.query().filter(cls.email == email)
"get_email_key_%s" % email))
ret = q.scalar()
if ret is None:
q = UserEmailMap.query()
# try fetching in alternate email map
q = q.filter(UserEmailMap.email.ilike(email))
q = q.filter(UserEmailMap.email == email)
q = q.options(joinedload(UserEmailMap.user))
"get_email_map_key_%s" % email))
ret = getattr(q.scalar(), 'user', None)
return ret
def update_lastlogin(self):
"""Update user lastlogin"""
self.last_login = datetime.datetime.now()
Session().add(self)
log.debug('updated user %s lastlogin' % self.username)
def get_api_data(self):
Common function for generating user related data for API
user = self
data = dict(
user_id=user.user_id,
username=user.username,
firstname=user.name,
lastname=user.lastname,
email=user.email,
emails=user.emails,
api_key=user.api_key,
active=user.active,
admin=user.admin,
ldap_dn=user.ldap_dn,
last_login=user.last_login,
return data
def __json__(self):
full_name=self.full_name,
full_name_or_username=self.full_name_or_username,
short_contact=self.short_contact,
full_contact=self.full_contact
data.update(self.get_api_data())
class UserEmailMap(Base, BaseModel):
__tablename__ = 'user_email_map'
Index('uem_email_idx', 'email'),
UniqueConstraint('email'),
__mapper_args__ = {}
email_id = Column("email_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
_email = Column("email", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
user = relationship('User', lazy='joined')
@validates('_email')
def validate_email(self, key, email):
# check if this email is not main one
main_email = Session().query(User).filter(User.email == email).scalar()
if main_email is not None:
raise AttributeError('email %s is present is user table' % email)
return email
class UserLog(Base, BaseModel):
__tablename__ = 'user_logs'
'mysql_charset': 'utf8'},
user_log_id = Column("user_log_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
repository_id = Column("repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True)
repository_name = Column("repository_name", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_ip = Column("user_ip", String(255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action = Column("action", UnicodeText(1200000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
action_date = Column("action_date", DateTime(timezone=False), nullable=True, unique=None, default=None)
def action_as_day(self):
return datetime.date(*self.action_date.timetuple()[:3])
user = relationship('User')
repository = relationship('Repository', cascade='')
class UsersGroup(Base, BaseModel):
__tablename__ = 'users_groups'
users_group_id = Column("users_group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_name = Column("users_group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
users_group_active = Column("users_group_active", Boolean(), nullable=True, unique=None, default=None)
inherit_default_permissions = Column("users_group_inherit_default_permissions", Boolean(), nullable=False, unique=None, default=True)
members = relationship('UsersGroupMember', cascade="all, delete, delete-orphan", lazy="joined")
users_group_to_perm = relationship('UsersGroupToPerm', cascade='all')
users_group_repo_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
return u'<userGroup(%s)>' % (self.users_group_name)
def get_by_group_name(cls, group_name, cache=False,
case_insensitive=False):
q = cls.query().filter(cls.users_group_name.ilike(group_name))
q = cls.query().filter(cls.users_group_name == group_name)
"get_user_%s" % _hash_key(group_name)
def get(cls, users_group_id, cache=False):
users_group = cls.query()
users_group = users_group.options(FromCache("sql_cache_short",
"get_users_group_%s" % users_group_id))
Status change: