################################################################################
# RhodeCode - Pylons environment configuration #
# #
# The %(here)s variable will be replaced with the parent directory of this file#
[DEFAULT]
debug = true
pdebug = false
## Uncomment and replace with the address which should receive ##
## any error reports after application crash ##
## Additionally those settings will be used by RhodeCode mailing system ##
#email_to = admin@localhost
#error_email_from = paste_error@localhost
#app_email_from = rhodecode-noreply@localhost
#error_message =
#email_prefix = [RhodeCode]
#smtp_server = mail.server.com
#smtp_username =
#smtp_password =
#smtp_port =
#smtp_use_tls = false
#smtp_use_ssl = true
# Specify available auth parameters here (e.g. LOGIN PLAIN CRAM-MD5, etc.)
#smtp_auth =
[server:main]
##nr of threads to spawn
#threadpool_workers = 5
##max request before thread respawn
#threadpool_max_requests = 10
##option to use threads of process
#use_threadpool = true
#use = egg:Paste#http
use = egg:waitress#main
host = 0.0.0.0
port = 5000
[filter:proxy-prefix]
# prefix middleware for rc
use = egg:PasteDeploy#prefix
prefix = /<your-prefix>
[app:main]
use = egg:rhodecode
#filter-with = proxy-prefix
full_stack = true
static_files = true
# Optional Languages
# en, fr, ja, pt_BR, zh_CN, zh_TW
lang = en
cache_dir = %(here)s/data
index_dir = %(here)s/data/index
app_instance_uuid = rc-develop
cut_off_limit = 256000
force_https = false
commit_parse_limit = 25
use_gravatar = true
## alternative_gravatar_url allows you to use your own avatar server application
## the following parts of the URL will be replaced
## {email} user email
## {md5email} md5 hash of the user email (like at gravatar.com)
## {size} size of the image that is expected from the server application
## {scheme} http/https from RhodeCode server
## {netloc} network location from RhodeCode server
#alternative_gravatar_url = http://myavatarserver.com/getbyemail/{email}/{size}
#alternative_gravatar_url = http://myavatarserver.com/getbymd5/{md5email}?s={size}
container_auth_enabled = false
proxypass_auth_enabled = false
default_encoding = utf8
## overwrite schema of clone url
## available vars:
## scheme - http/https
## user - current user
## pass - password
## netloc - network location
## path - usually repo_name
#clone_uri = {scheme}://{user}{pass}{netloc}{path}
## issue tracking mapping for commits messages
## comment out issue_pat, issue_server, issue_prefix to enable
## pattern to get the issues from commit messages
## default one used here is #<numbers> with a regex passive group for `#`
## {id} will be all groups matched from this pattern
issue_pat = (?:\s*#)(\d+)
## server url to the issue, each {id} will be replaced with match
## fetched from the regex and {repo} is replaced with full repository name
## including groups {repo_name} is replaced with just name of repo
issue_server_link = https://myissueserver.com/{repo}/issue/{id}
## prefix to add to link to indicate it's an url
## #314 will be replaced by <issue_prefix><id>
issue_prefix = #
## instance-id prefix
## a prefix key for this instance used for cache invalidation when running
## multiple instances of rhodecode, make sure it's globally unique for
## all running rhodecode instances. Leave empty if you don't use it
instance_id =
## alternative return HTTP header for failed authentication. Default HTTP
## response is 401 HTTPUnauthorized. Currently HG clients have troubles with
## handling that. Set this variable to 403 to return HTTPForbidden
auth_ret_code =
####################################
### CELERY CONFIG ####
use_celery = false
broker.host = localhost
broker.vhost = rabbitmqhost
broker.port = 5672
broker.user = rabbitmq
broker.password = qweqwe
celery.imports = rhodecode.lib.celerylib.tasks
celery.result.backend = amqp
celery.result.dburi = amqp://
celery.result.serialier = json
#celery.send.task.error.emails = true
#celery.amqp.task.result.expires = 18000
celeryd.concurrency = 2
#celeryd.log.file = celeryd.log
celeryd.log.level = debug
celeryd.max.tasks.per.child = 1
#tasks will never be sent to the queue, but executed locally instead.
celery.always.eager = false
### BEAKER CACHE ####
beaker.cache.data_dir=%(here)s/data/cache/data
beaker.cache.lock_dir=%(here)s/data/cache/lock
beaker.cache.regions=super_short_term,short_term,long_term,sql_cache_short,sql_cache_med,sql_cache_long
beaker.cache.super_short_term.type=memory
beaker.cache.super_short_term.expire=10
beaker.cache.super_short_term.key_length = 256
beaker.cache.short_term.type=memory
beaker.cache.short_term.expire=60
beaker.cache.short_term.key_length = 256
beaker.cache.long_term.type=memory
beaker.cache.long_term.expire=36000
beaker.cache.long_term.key_length = 256
beaker.cache.sql_cache_short.type=memory
beaker.cache.sql_cache_short.expire=10
beaker.cache.sql_cache_short.key_length = 256
beaker.cache.sql_cache_med.type=memory
beaker.cache.sql_cache_med.expire=360
beaker.cache.sql_cache_med.key_length = 256
beaker.cache.sql_cache_long.type=file
beaker.cache.sql_cache_long.expire=3600
beaker.cache.sql_cache_long.key_length = 256
### BEAKER SESSION ####
## Type of storage used for the session, current types are
## dbm, file, memcached, database, and memory.
## The storage uses the Container API
## that is also used by the cache system.
## db session ##
#beaker.session.type = ext:database
#beaker.session.sa.url = postgresql://postgres:qwe@localhost/rhodecode
#beaker.session.table_name = db_session
## encrypted cookie client side session, good for many instances ##
#beaker.session.type = cookie
## file based cookies (default) ##
#beaker.session.type = file
beaker.session.key = rhodecode
## secure cookie requires AES python libraries ##
#beaker.session.encrypt_key = g654dcno0-9873jhgfreyu
#beaker.session.validate_key = 9712sds2212c--zxc123
## sets session as invalid if it haven't been accessed for given amount of time
beaker.session.timeout = 2592000
beaker.session.httponly = true
#beaker.session.cookie_path = /<your-prefix>
## uncomment for https secure cookie ##
beaker.session.secure = false
## auto save the session to not to use .save() ##
beaker.session.auto = False
## default cookie expiration time in seconds `true` expire at browser close ##
#beaker.session.cookie_expires = 3600
## WARNING: *THE LINE BELOW MUST BE UNCOMMENTED ON A PRODUCTION ENVIRONMENT* ##
## Debug mode will enable the interactive debugging tool, allowing ANYONE to ##
## execute malicious code after an exception is raised. ##
#set debug = false
##################################
### LOGVIEW CONFIG ###
logview.sqlalchemy = #faa
logview.pylons.templating = #bfb
logview.pylons.util = #eee
#########################################################
### DB CONFIGS - EACH DB WILL HAVE IT'S OWN CONFIG ###
#sqlalchemy.db1.url = sqlite:///%(here)s/rhodecode.db
sqlalchemy.db1.url = postgresql://postgres:qwe@localhost/rhodecode
sqlalchemy.db1.echo = false
sqlalchemy.db1.pool_recycle = 3600
sqlalchemy.db1.convert_unicode = true
################################
### LOGGING CONFIGURATION ####
[loggers]
keys = root, routes, rhodecode, sqlalchemy, beaker, templates, whoosh_indexer
[handlers]
keys = console, console_sql
[formatters]
keys = generic, color_formatter, color_formatter_sql
#############
## LOGGERS ##
[logger_root]
level = NOTSET
handlers = console
[logger_routes]
level = DEBUG
handlers =
qualname = routes.middleware
host = 127.0.0.1
port = 8001
app_instance_uuid = rc-production
commit_parse_limit = 50
set debug = false
app_instance_uuid = ${app_instance_uuid}
# SQLITE [default]
sqlalchemy.db1.url = sqlite:///%(here)s/rhodecode.db
# POSTGRESQL
# sqlalchemy.db1.url = postgresql://user:pass@localhost/rhodecode
# MySQL
# sqlalchemy.db1.url = mysql://user:pass@localhost/rhodecode
# see sqlalchemy docs for others
"""Helper functions
Consists of functions to typically be used within templates, but also
available to Controllers. This module is available to both as 'h'.
"""
import random
import hashlib
import StringIO
import urllib
import math
import logging
import re
import urlparse
from datetime import datetime
from pygments.formatters.html import HtmlFormatter
from pygments import highlight as code_highlight
from pylons import url, request, config
from pylons.i18n.translation import _, ungettext
from hashlib import md5
from webhelpers.html import literal, HTML, escape
from webhelpers.html.tools import *
from webhelpers.html.builder import make_tag
from webhelpers.html.tags import auto_discovery_link, checkbox, css_classes, \
end_form, file, form, hidden, image, javascript_link, link_to, \
link_to_if, link_to_unless, ol, required_legend, select, stylesheet_link, \
submit, text, password, textarea, title, ul, xml_declaration, radio
from webhelpers.html.tools import auto_link, button_to, highlight, \
js_obfuscate, mail_to, strip_links, strip_tags, tag_re
from webhelpers.number import format_byte_size, format_bit_size
from webhelpers.pylonslib import Flash as _Flash
from webhelpers.pylonslib.secure_form import secure_form
from webhelpers.text import chop_at, collapse, convert_accented_entities, \
convert_misc_entities, lchop, plural, rchop, remove_formatting, \
replace_whitespace, urlify, truncate, wrap_paragraphs
from webhelpers.date import time_ago_in_words
from webhelpers.paginate import Page
from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
convert_boolean_attrs, NotGiven, _make_safe_id_component
from rhodecode.lib.annotate import annotate_highlight
from rhodecode.lib.utils import repo_name_slug
from rhodecode.lib.utils2 import str2bool, safe_unicode, safe_str, \
get_changeset_safe, datetime_to_time, time_to_datetime
from rhodecode.lib.markup_renderer import MarkupRenderer
from rhodecode.lib.vcs.exceptions import ChangesetDoesNotExistError
from rhodecode.lib.vcs.backends.base import BaseChangeset
from rhodecode.config.conf import DATE_FORMAT, DATETIME_FORMAT
from rhodecode.model.changeset_status import ChangesetStatusModel
from rhodecode.model.db import URL_SEP, Permission
log = logging.getLogger(__name__)
html_escape_table = {
"&": "&",
'"': """,
"'": "'",
">": ">",
"<": "<",
}
def html_escape(text):
"""Produce entities within text."""
return "".join(html_escape_table.get(c,c) for c in text)
def shorter(text, size=20):
postfix = '...'
if len(text) > size:
return text[:size - len(postfix)] + postfix
return text
def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
Reset button
_set_input_attrs(attrs, type, name, value)
_set_id_attr(attrs, id, name)
convert_boolean_attrs(attrs, ["disabled"])
return HTML.input(**attrs)
reset = _reset
safeid = _make_safe_id_component
def FID(raw_id, path):
Creates a uniqe ID for filenode based on it's hash of path and revision
it's safe to use in urls
:param raw_id:
:param path:
return 'C-%s-%s' % (short_id(raw_id), md5(safe_str(path)).hexdigest()[:12])
def get_token():
"""Return the current authentication token, creating one if one doesn't
already exist.
token_key = "_authentication_token"
from pylons import session
if not token_key in session:
try:
token = hashlib.sha1(str(random.getrandbits(128))).hexdigest()
except AttributeError: # Python < 2.4
token = hashlib.sha1(str(random.randrange(2 ** 128))).hexdigest()
session[token_key] = token
if hasattr(session, 'save'):
session.save()
return session[token_key]
class _GetError(object):
"""Get error from form_errors, and represent it as span wrapped error
message
:param field_name: field to fetch errors for
:param form_errors: form errors dict
def __call__(self, field_name, form_errors):
tmpl = """<span class="error_msg">%s</span>"""
if form_errors and field_name in form_errors:
return literal(tmpl % form_errors.get(field_name))
get_error = _GetError()
class _ToolTip(object):
def __call__(self, tooltip_title, trim_at=50):
Special function just to wrap our text into nice formatted
autowrapped text
:param tooltip_title:
tooltip_title = escape(tooltip_title)
tooltip_title = tooltip_title.replace('<', '<').replace('>', '>')
return tooltip_title
tooltip = _ToolTip()
class _FilesBreadCrumbs(object):
def __call__(self, repo_name, rev, paths):
if isinstance(paths, str):
paths = safe_unicode(paths)
url_l = [link_to(repo_name, url('files_home',
repo_name=repo_name,
revision=rev, f_path=''),
class_='ypjax-link')]
paths_l = paths.split('/')
for cnt, p in enumerate(paths_l):
if p != '':
url_l.append(link_to(p,
url('files_home',
revision=rev,
f_path='/'.join(paths_l[:cnt + 1])
),
class_='ypjax-link'
)
return literal('/'.join(url_l))
files_breadcrumbs = _FilesBreadCrumbs()
class CodeHtmlFormatter(HtmlFormatter):
My code Html Formatter for source codes
def wrap(self, source, outfile):
return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
def _wrap_code(self, source):
for cnt, it in enumerate(source):
i, t = it
t = '<div id="L%s">%s</div>' % (cnt + 1, t)
yield i, t
def _wrap_tablelinenos(self, inner):
dummyoutfile = StringIO.StringIO()
lncount = 0
for t, line in inner:
if t:
lncount += 1
dummyoutfile.write(line)
fl = self.linenostart
mw = len(str(lncount + fl - 1))
sp = self.linenospecial
st = self.linenostep
la = self.lineanchors
aln = self.anchorlinenos
nocls = self.noclasses
@@ -522,389 +523,393 @@ def action_parser(user_log, feed=False):
def lnk(rev, repo_name):
if isinstance(rev, BaseChangeset):
lbl = 'r%s:%s' % (rev.revision, rev.short_id)
_url = url('changeset_home', repo_name=repo_name,
revision=rev.raw_id)
title = tooltip(rev.message)
else:
lbl = '%s' % rev
_url = '#'
title = _('Changeset not found')
return link_to(lbl, _url, title=title, class_='tooltip',)
revs = []
if len(filter(lambda v: v != '', revs_ids)) > 0:
for rev in revs_ids[:revs_top_limit]:
rev = repo.get_changeset(rev)
revs.append(rev)
except ChangesetDoesNotExistError:
log.error('cannot find revision %s in this repo' % rev)
continue
cs_links = []
cs_links.append(" " + ', '.join(
[lnk(rev, repo_name) for rev in revs[:revs_limit]]
compare_view = (
' <div class="compare_view tooltip" title="%s">'
'<a href="%s">%s</a> </div>' % (
_('Show all combined changesets %s->%s') % (
revs_ids[0], revs_ids[-1]
url('changeset_home', repo_name=repo_name,
revision='%s...%s' % (revs_ids[0], revs_ids[-1])
_('compare view')
# if we have exactly one more than normally displayed
# just display it, takes less space than displaying
# "and 1 more revisions"
if len(revs_ids) == revs_limit + 1:
rev = revs[revs_limit]
cs_links.append(", " + lnk(rev, repo_name))
# hidden-by-default ones
if len(revs_ids) > revs_limit + 1:
uniq_id = revs_ids[0]
html_tmpl = (
'<span> %s <a class="show_more" id="_%s" '
'href="#more">%s</a> %s</span>'
if not feed:
cs_links.append(html_tmpl % (
_('and'),
uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
_('revisions')
html_tmpl = '<span id="%s" style="display:none">, %s </span>'
html_tmpl = '<span id="%s"> %s </span>'
morelinks = ', '.join(
[lnk(rev, repo_name) for rev in revs[revs_limit:]]
if len(revs_ids) > revs_top_limit:
morelinks += ', ...'
cs_links.append(html_tmpl % (uniq_id, morelinks))
if len(revs) > 1:
cs_links.append(compare_view)
return ''.join(cs_links)
def get_fork_name():
repo_name = action_params
return _('fork name ') + str(link_to(action_params, url('summary_home',
repo_name=repo_name,)))
def get_user_name():
user_name = action_params
return user_name
def get_users_group():
group_name = action_params
return group_name
def get_pull_request():
pull_request_id = action_params
repo_name = user_log.repository.repo_name
return link_to(_('Pull request #%s') % pull_request_id,
url('pullrequest_show', repo_name=repo_name,
pull_request_id=pull_request_id))
# action : translated str, callback(extractor), icon
action_map = {
'user_deleted_repo': (_('[deleted] repository'),
None, 'database_delete.png'),
'user_created_repo': (_('[created] repository'),
None, 'database_add.png'),
'user_created_fork': (_('[created] repository as fork'),
None, 'arrow_divide.png'),
'user_forked_repo': (_('[forked] repository'),
get_fork_name, 'arrow_divide.png'),
'user_updated_repo': (_('[updated] repository'),
None, 'database_edit.png'),
'admin_deleted_repo': (_('[delete] repository'),
'admin_created_repo': (_('[created] repository'),
'admin_forked_repo': (_('[forked] repository'),
'admin_updated_repo': (_('[updated] repository'),
'admin_created_user': (_('[created] user'),
get_user_name, 'user_add.png'),
'admin_updated_user': (_('[updated] user'),
get_user_name, 'user_edit.png'),
'admin_created_users_group': (_('[created] users group'),
get_users_group, 'group_add.png'),
'admin_updated_users_group': (_('[updated] users group'),
get_users_group, 'group_edit.png'),
'user_commented_revision': (_('[commented] on revision in repository'),
get_cs_links, 'comment_add.png'),
'user_commented_pull_request': (_('[commented] on pull request for'),
get_pull_request, 'comment_add.png'),
'user_closed_pull_request': (_('[closed] pull request for'),
get_pull_request, 'tick.png'),
'push': (_('[pushed] into'),
get_cs_links, 'script_add.png'),
'push_local': (_('[committed via RhodeCode] into repository'),
get_cs_links, 'script_edit.png'),
'push_remote': (_('[pulled from remote] into repository'),
get_cs_links, 'connect.png'),
'pull': (_('[pulled] from'),
None, 'down_16.png'),
'started_following_repo': (_('[started following] repository'),
None, 'heart_add.png'),
'stopped_following_repo': (_('[stopped following] repository'),
None, 'heart_delete.png'),
action_str = action_map.get(action, action)
if feed:
action = action_str[0].replace('[', '').replace(']', '')
action = action_str[0]\
.replace('[', '<span class="journal_highlight">')\
.replace(']', '</span>')
action_params_func = lambda: ""
if callable(action_str[1]):
action_params_func = action_str[1]
def action_parser_icon():
action = user_log.action
action_params = None
x = action.split(':')
if len(x) > 1:
action, action_params = x
tmpl = """<img src="%s%s" alt="%s"/>"""
ico = action_map.get(action, ['', '', ''])[2]
return literal(tmpl % ((url('/images/icons/')), ico, action))
# returned callbacks we need to call to get
return [lambda: literal(action), action_params_func, action_parser_icon]
#==============================================================================
# PERMS
from rhodecode.lib.auth import HasPermissionAny, HasPermissionAll, \
HasRepoPermissionAny, HasRepoPermissionAll
# GRAVATAR URL
def gravatar_url(email_address, size=30):
from pylons import url ## doh, we need to re-import url to mock it later
if(str2bool(config['app_conf'].get('use_gravatar')) and
config['app_conf'].get('alternative_gravatar_url')):
tmpl = config['app_conf'].get('alternative_gravatar_url', '')
parsed_url = urlparse.urlparse(url.current(qualified=True))
tmpl = tmpl.replace('{email}', email_address)\
.replace('{md5email}', hashlib.md5(email_address.lower()).hexdigest())\
.replace('{md5email}', hashlib.md5(email_address.lower()).hexdigest()) \
.replace('{netloc}', parsed_url.netloc)\
.replace('{scheme}', parsed_url.scheme)\
.replace('{size}', str(size))
return tmpl
if (not str2bool(config['app_conf'].get('use_gravatar')) or
not email_address or email_address == 'anonymous@rhodecode.org'):
f = lambda a, l: min(l, key=lambda x: abs(x - a))
return url("/images/user%s.png" % f(size, [14, 16, 20, 24, 30]))
ssl_enabled = 'https' == request.environ.get('wsgi.url_scheme')
default = 'identicon'
baseurl_nossl = "http://www.gravatar.com/avatar/"
baseurl_ssl = "https://secure.gravatar.com/avatar/"
baseurl = baseurl_ssl if ssl_enabled else baseurl_nossl
if isinstance(email_address, unicode):
#hashlib crashes on unicode items
email_address = safe_str(email_address)
# construct the url
gravatar_url = baseurl + hashlib.md5(email_address.lower()).hexdigest() + "?"
gravatar_url += urllib.urlencode({'d': default, 's': str(size)})
return gravatar_url
# REPO PAGER, PAGER FOR REPOSITORY
class RepoPage(Page):
def __init__(self, collection, page=1, items_per_page=20,
item_count=None, url=None, **kwargs):
"""Create a "RepoPage" instance. special pager for paging
repository
self._url_generator = url
# Safe the kwargs class-wide so they can be used in the pager() method
self.kwargs = kwargs
# Save a reference to the collection
self.original_collection = collection
self.collection = collection
# The self.page is the number of the current page.
# The first page has the number 1!
self.page = int(page) # make it int() if we get it as a string
except (ValueError, TypeError):
self.page = 1
self.items_per_page = items_per_page
# Unless the user tells us how many items the collections has
# we calculate that ourselves.
if item_count is not None:
self.item_count = item_count
self.item_count = len(self.collection)
# Compute the number of the first and last available page
if self.item_count > 0:
self.first_page = 1
self.page_count = int(math.ceil(float(self.item_count) /
self.items_per_page))
self.last_page = self.first_page + self.page_count - 1
# Make sure that the requested page number is the range of
# valid pages
if self.page > self.last_page:
self.page = self.last_page
elif self.page < self.first_page:
self.page = self.first_page
# Note: the number of items on this page can be less than
# items_per_page if the last page is not full
self.first_item = max(0, (self.item_count) - (self.page *
items_per_page))
self.last_item = ((self.item_count - 1) - items_per_page *
(self.page - 1))
self.items = list(self.collection[self.first_item:self.last_item + 1])
# Links to previous and next page
if self.page > self.first_page:
self.previous_page = self.page - 1
self.previous_page = None
if self.page < self.last_page:
self.next_page = self.page + 1
self.next_page = None
# No items available
self.first_page = None
self.page_count = 0
self.last_page = None
self.first_item = None
self.last_item = None
self.items = []
# This is a subclass of the 'list' type. Initialise the list now.
list.__init__(self, reversed(self.items))
def changed_tooltip(nodes):
Generates a html string for changed nodes in changeset page.
It limits the output to 30 entries
:param nodes: LazyNodesGenerator
if nodes:
pref = ': <br/> '
suf = ''
if len(nodes) > 30:
suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
return literal(pref + '<br/> '.join([safe_unicode(x.path)
for x in nodes[:30]]) + suf)
return ': ' + _('No Files')
def repo_link(groups_and_repos):
Makes a breadcrumbs link to repo within a group
joins » on each group to create a fancy link
ex::
group >> subgroup >> repo
:param groups_and_repos:
groups, repo_name = groups_and_repos
if not groups:
return repo_name
def make_link(group):
return link_to(group.name, url('repos_group_home',
group_name=group.group_name))
return literal(' » '.join(map(make_link, groups)) + \
" » " + repo_name)
def fancy_file_stats(stats):
Displays a fancy two colored bar for number of added/deleted
lines of code on file
:param stats: two element list of added/deleted lines of code
a, d, t = stats[0], stats[1], stats[0] + stats[1]
width = 100
unit = float(width) / (t or 1)
# needs > 9% of width to be visible or 0 to be hidden
a_p = max(9, unit * a) if a > 0 else 0
d_p = max(9, unit * d) if d > 0 else 0
p_sum = a_p + d_p
if p_sum > width:
#adjust the percentage to be == 100% since we adjusted to 9
if a_p > d_p:
a_p = a_p - (p_sum - width)
d_p = d_p - (p_sum - width)
a_v = a if a > 0 else ''
d_v = d if d > 0 else ''
def cgen(l_type):
mapping = {'tr': 'top-right-rounded-corner-mid',
'tl': 'top-left-rounded-corner-mid',
'br': 'bottom-right-rounded-corner-mid',
'bl': 'bottom-left-rounded-corner-mid'}
map_getter = lambda x: mapping[x]
if l_type == 'a' and d_v:
#case when added and deleted are present
return ' '.join(map(map_getter, ['tl', 'bl']))
if l_type == 'a' and not d_v:
return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
if l_type == 'd' and a_v:
# -*- coding: utf-8 -*-
rhodecode.tests.test_libs
~~~~~~~~~~~~~~~~~~~~~~~~~
Package for testing various lib/helper functions in rhodecode
:created_on: Jun 9, 2011
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
from __future__ import with_statement
import unittest
import datetime
import mock
from rhodecode.tests import *
proto = 'http'
TEST_URLS = [
('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
'%s://127.0.0.1' % proto),
('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
'%s://127.0.0.1:8080' % proto),
('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
'%s://domain.org' % proto),
('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
'8080'],
'%s://domain.org:8080' % proto),
]
proto = 'https'
TEST_URLS += [
class TestLibs(unittest.TestCase):
def test_uri_filter(self):
from rhodecode.lib.utils2 import uri_filter
for url in TEST_URLS:
self.assertEqual(uri_filter(url[0]), url[1])
def test_credentials_filter(self):
from rhodecode.lib.utils2 import credentials_filter
self.assertEqual(credentials_filter(url[0]), url[2])
def test_str2bool(self):
from rhodecode.lib.utils2 import str2bool
test_cases = [
('t', True),
('true', True),
('y', True),
('yes', True),
('on', True),
('1', True),
('Y', True),
('yeS', True),
('TRUE', True),
('T', True),
('False', False),
('F', False),
('FALSE', False),
('0', False),
('-1', False),
('', False), ]
for case in test_cases:
self.assertEqual(str2bool(case[0]), case[1])
def test_mention_extractor(self):
from rhodecode.lib.utils2 import extract_mentioned_users
sample = (
"@first hi there @marcink here's my email marcin@email.com "
"@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
"@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl "
"@marian.user just do it @marco-polo and next extract @marco_polo "
"user.dot hej ! not-needed maril@domain.org"
s = sorted([
'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
'marian.user', 'marco-polo', 'marco_polo'
], key=lambda k: k.lower())
self.assertEqual(s, extract_mentioned_users(sample))
def test_age(self):
import calendar
from rhodecode.lib.utils2 import age
n = datetime.datetime.now()
delt = lambda *args, **kwargs: datetime.timedelta(*args, **kwargs)
self.assertEqual(age(n), u'just now')
self.assertEqual(age(n - delt(seconds=1)), u'1 second ago')
self.assertEqual(age(n - delt(seconds=60 * 2)), u'2 minutes ago')
self.assertEqual(age(n - delt(hours=1)), u'1 hour ago')
self.assertEqual(age(n - delt(hours=24)), u'1 day ago')
self.assertEqual(age(n - delt(hours=24 * 5)), u'5 days ago')
self.assertEqual(age(n - delt(hours=24 * (calendar.mdays[n.month-1] + 2))),
u'1 month and 2 days ago')
self.assertEqual(age(n - delt(hours=24 * 400)), u'1 year and 1 month ago')
def test_tag_exctrator(self):
"hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
"[requires] [stale] [see<>=>] [see => http://url.com]"
"[requires => url] [lang => python] [just a tag]"
"[,d] [ => ULR ] [obsolete] [desc]]"
from rhodecode.lib.helpers import desc_stylize
res = desc_stylize(sample)
self.assertTrue('<div class="metatag" tag="tag">tag</div>' in res)
self.assertTrue('<div class="metatag" tag="obsolete">obsolete</div>' in res)
self.assertTrue('<div class="metatag" tag="stale">stale</div>' in res)
self.assertTrue('<div class="metatag" tag="lang">python</div>' in res)
self.assertTrue('<div class="metatag" tag="requires">requires => <a href="/url">url</a></div>' in res)
def test_alternative_gravatar(self):
from rhodecode.lib.helpers import gravatar_url
_md5 = lambda s: hashlib.md5(s).hexdigest()
def fake_conf(**kwargs):
from pylons import config
config['app_conf'] = {}
config['app_conf']['use_gravatar'] = True
config['app_conf'].update(kwargs)
return config
fake = fake_conf(alternative_gravatar_url='http://test.com/{email}')
with mock.patch('pylons.config', fake):
grav = gravatar_url(email_address='test@foo.com', size=24)
assert grav == 'http://test.com/test@foo.com'
class fake_url():
@classmethod
def current(cls, *args, **kwargs):
return 'https://server.com'
with mock.patch('pylons.url', fake_url):
from pylons import url
assert url.current() == 'http://server.com'
fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}')
em = 'test@foo.com'
grav = gravatar_url(email_address=em, size=24)
assert grav == 'http://test.com/%s' % (_md5(em))
fake = fake_conf(alternative_gravatar_url='http://test.com/{md5email}/{size}')
assert grav == 'http://test.com/%s/%s' % (_md5(em), 24)
fake = fake_conf(alternative_gravatar_url='{scheme}://{netloc}/{md5email}/{size}')
assert grav == 'https://server.com/%s/%s' % (_md5(em), 24)
Status change: