# -*- coding: utf-8 -*-
"""
rhodecode.lib.markup_renderer
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
Renderer for markup languages with ability to parse using rst or markdown
:created_on: Oct 27, 2011
:author: marcink
:copyright: (C) 2011-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import re
import logging
from rhodecode.lib.utils2 import safe_unicode
from rhodecode.lib.utils2 import safe_unicode, MENTIONS_REGEX
log = logging.getLogger(__name__)
class MarkupRenderer(object):
RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES = ['include', 'meta', 'raw']
MARKDOWN_PAT = re.compile(r'md|mkdn?|mdown|markdown', re.IGNORECASE)
RST_PAT = re.compile(r're?st', re.IGNORECASE)
PLAIN_PAT = re.compile(r'readme', re.IGNORECASE)
def __detect_renderer(self, source, filename=None):
runs detection of what renderer should be used for generating html
from a markup language
filename can be also explicitly a renderer name
:param source:
:param filename:
if MarkupRenderer.MARKDOWN_PAT.findall(filename):
detected_renderer = 'markdown'
elif MarkupRenderer.RST_PAT.findall(filename):
detected_renderer = 'rst'
elif MarkupRenderer.PLAIN_PAT.findall(filename):
else:
detected_renderer = 'plain'
return getattr(MarkupRenderer, detected_renderer)
def render(self, source, filename=None):
Renders a given filename using detected renderer
it detects renderers based on file extension or mimetype.
At last it will just do a simple html replacing new lines with <br/>
:param file_name:
renderer = self.__detect_renderer(source, filename)
readme_data = renderer(source)
return readme_data
@classmethod
def plain(cls, source):
source = safe_unicode(source)
def urlify_text(text):
url_pat = re.compile(r'(http[s]?://(?:[a-zA-Z]|[0-9]|[$-_@.&+]'
'|[!*\(\),]|(?:%[0-9a-fA-F][0-9a-fA-F]))+)')
def url_func(match_obj):
url_full = match_obj.groups()[0]
return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
return url_pat.sub(url_func, text)
source = urlify_text(source)
return '<br />' + source.replace("\n", '<br />')
def markdown(cls, source):
try:
import markdown as __markdown
return __markdown.markdown(source, ['codehilite'])
except ImportError:
log.warning('Install markdown to use this function')
return cls.plain(source)
def rst(cls, source):
from docutils.core import publish_parts
from docutils.parsers.rst import directives
docutils_settings = dict([(alias, None) for alias in
cls.RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES])
docutils_settings.update({'input_encoding': 'unicode',
'report_level': 4})
for k, v in docutils_settings.iteritems():
directives.register_directive(k, v)
parts = publish_parts(source=source,
writer_name="html4css1",
settings_overrides=docutils_settings)
return parts['html_title'] + parts["fragment"]
log.warning('Install docutils to use this function')
def rst_with_mentions(cls, source):
mention_pat = re.compile(r'(?:^@|\s@)(\w+)')
mention_pat = re.compile(MENTIONS_REGEX)
def wrapp(match_obj):
uname = match_obj.groups()[0]
return ' **@%(uname)s** ' % {'uname':uname}
return ' **@%(uname)s** ' % {'uname': uname}
mention_hl = mention_pat.sub(wrapp, source).strip()
return cls.rst(mention_hl)
@@ -203,203 +203,207 @@ def safe_str(unicode_, to_encoding=None)
return unicode_
if not to_encoding:
import rhodecode
DEFAULT_ENCODING = rhodecode.CONFIG.get('default_encoding','utf8')
to_encoding = DEFAULT_ENCODING
return unicode_.encode(to_encoding)
except UnicodeEncodeError:
pass
import chardet
encoding = chardet.detect(unicode_)['encoding']
print encoding
if encoding is None:
raise UnicodeEncodeError()
return unicode_.encode(encoding)
except (ImportError, UnicodeEncodeError):
return unicode_.encode(to_encoding, 'replace')
return safe_str
def engine_from_config(configuration, prefix='sqlalchemy.', **kwargs):
Custom engine_from_config functions that makes sure we use NullPool for
file based sqlite databases. This prevents errors on sqlite. This only
applies to sqlalchemy versions < 0.7.0
import sqlalchemy
from sqlalchemy import engine_from_config as efc
if int(sqlalchemy.__version__.split('.')[1]) < 7:
# This solution should work for sqlalchemy < 0.7.0, and should use
# proxy=TimerProxy() for execution time profiling
from sqlalchemy.pool import NullPool
url = configuration[prefix + 'url']
if url.startswith('sqlite'):
kwargs.update({'poolclass': NullPool})
return efc(configuration, prefix, **kwargs)
import time
from sqlalchemy import event
from sqlalchemy.engine import Engine
log = logging.getLogger('sqlalchemy.engine')
BLACK, RED, GREEN, YELLOW, BLUE, MAGENTA, CYAN, WHITE = xrange(30, 38)
engine = efc(configuration, prefix, **kwargs)
def color_sql(sql):
COLOR_SEQ = "\033[1;%dm"
COLOR_SQL = YELLOW
normal = '\x1b[0m'
return ''.join([COLOR_SEQ % COLOR_SQL, sql, normal])
if configuration['debug']:
#attach events only for debug configuration
def before_cursor_execute(conn, cursor, statement,
parameters, context, executemany):
context._query_start_time = time.time()
log.info(color_sql(">>>>> STARTING QUERY >>>>>"))
def after_cursor_execute(conn, cursor, statement,
total = time.time() - context._query_start_time
log.info(color_sql("<<<<< TOTAL TIME: %f <<<<<" % total))
event.listen(engine, "before_cursor_execute",
before_cursor_execute)
event.listen(engine, "after_cursor_execute",
after_cursor_execute)
return engine
def age(curdate):
turns a datetime into an age string.
:param curdate: datetime object
:rtype: unicode
:returns: unicode words describing age
from datetime import datetime
from webhelpers.date import time_ago_in_words
_ = lambda s: s
if not curdate:
return ''
agescales = [(_(u"year"), 3600 * 24 * 365),
(_(u"month"), 3600 * 24 * 30),
(_(u"day"), 3600 * 24),
(_(u"hour"), 3600),
(_(u"minute"), 60),
(_(u"second"), 1), ]
age = datetime.now() - curdate
age_seconds = (age.days * agescales[2][1]) + age.seconds
pos = 1
for scale in agescales:
if scale[1] <= age_seconds:
if pos == 6:
pos = 5
return '%s %s' % (time_ago_in_words(curdate,
agescales[pos][0]), _('ago'))
pos += 1
return _(u'just now')
def uri_filter(uri):
Removes user:password from given url string
:param uri:
:returns: filtered list of strings
if not uri:
proto = ''
for pat in ('https://', 'http://'):
if uri.startswith(pat):
uri = uri[len(pat):]
proto = pat
break
# remove passwords and username
uri = uri[uri.find('@') + 1:]
# get the port
cred_pos = uri.find(':')
if cred_pos == -1:
host, port = uri, None
host, port = uri[:cred_pos], uri[cred_pos + 1:]
return filter(None, [proto, host, port])
def credentials_filter(uri):
Returns a url with removed credentials
uri = uri_filter(uri)
#check if we have port
if len(uri) > 2 and uri[2]:
uri[2] = ':' + uri[2]
return ''.join(uri)
def get_changeset_safe(repo, rev):
Safe version of get_changeset if this changeset doesn't exists for a
repo it returns a Dummy one instead
:param repo:
:param rev:
from rhodecode.lib.vcs.backends.base import BaseRepository
from rhodecode.lib.vcs.exceptions import RepositoryError
if not isinstance(repo, BaseRepository):
raise Exception('You must pass an Repository '
'object as first argument got %s', type(repo))
cs = repo.get_changeset(rev)
except RepositoryError:
from rhodecode.lib.utils import EmptyChangeset
cs = EmptyChangeset(requested_revision=rev)
return cs
MENTIONS_REGEX = r'(?:^@|\s@)([a-zA-Z0-9]{1}[a-zA-Z0-9\-\_\.]+)(?:\s{1})'
def extract_mentioned_users(s):
Returns unique usernames from given string s that have @mention
:param s: string to get mentions
usrs = {}
for username in re.findall(r'(?:^@|\s@)(\w+)', s):
usrs[username] = username
usrs = set()
for username in re.findall(MENTIONS_REGEX, s):
usrs.add(username)
return sorted(usrs.keys())
return sorted(list(usrs), key=lambda k: k.lower())
rhodecode.tests.test_libs
~~~~~~~~~~~~~~~~~~~~~~~~~
Package for testing various lib/helper functions in rhodecode
:created_on: Jun 9, 2011
import unittest
from rhodecode.tests import *
proto = 'http'
TEST_URLS = [
('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
'%s://127.0.0.1' % proto),
('%s://marcink@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
('%s://marcink:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
'%s://127.0.0.1:8080' % proto),
('%s://domain.org' % proto, ['%s://' % proto, 'domain.org'],
'%s://domain.org' % proto),
('%s://user:pass@domain.org:8080' % proto, ['%s://' % proto, 'domain.org',
'8080'],
'%s://domain.org:8080' % proto),
]
proto = 'https'
TEST_URLS += [
class TestLibs(unittest.TestCase):
def test_uri_filter(self):
from rhodecode.lib.utils2 import uri_filter
for url in TEST_URLS:
self.assertEqual(uri_filter(url[0]), url[1])
def test_credentials_filter(self):
from rhodecode.lib.utils2 import credentials_filter
self.assertEqual(credentials_filter(url[0]), url[2])
def test_str2bool(self):
from rhodecode.lib.utils2 import str2bool
test_cases = [
('t', True),
('true', True),
('y', True),
('yes', True),
('on', True),
('1', True),
('Y', True),
('yeS', True),
('TRUE', True),
('T', True),
('False', False),
('F', False),
('FALSE', False),
('0', False),
('-1', False),
('', False), ]
for case in test_cases:
self.assertEqual(str2bool(case[0]), case[1])
def test_mention_extractor(self):
from rhodecode.lib.utils2 import extract_mentioned_users
sample = ("@first hi there @marcink here's my email marcin@email.com "
"@lukaszb check it pls @ ttwelve @D[] @one@two@three "
"@MARCIN @maRCiN @2one_more22")
s = ['2one_more22', 'D', 'MARCIN', 'first', 'lukaszb',
'maRCiN', 'marcink', 'one']
sample = (
"@first hi there @marcink here's my email marcin@email.com "
"@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
"@MARCIN @maRCiN @2one_more22 @john please see this http://org.pl "
"@marian.user just do it @marco-polo and next extract @marco_polo "
"user.dot hej ! not-needed maril@domain.org"
)
s = sorted([
'first', 'marcink', 'lukaszb', 'one_more22', 'MARCIN', 'maRCiN', 'john',
'marian.user', 'marco-polo', 'marco_polo'
], key=lambda k: k.lower())
self.assertEqual(s, extract_mentioned_users(sample))
Status change: