Changeset - f375751fe3fa
[Not reviewed]
default
0 6 0
Mads Kiilerich (mads) - 5 years ago 2020-11-09 16:42:43
mads@kiilerich.com
Grafted from: 788b5492ab35
lib: move extract_mentioned_usernames and MENTIONS_REGEX to webutils
6 files changed with 28 insertions and 26 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/helpers.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import logging
 
import re
 
import textwrap
 
import urllib.parse
 

	
 
from beaker.cache import cache_region
 
from pygments import highlight as code_highlight
 
from pygments.formatters.html import HtmlFormatter
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib.annotate import annotate_highlight
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.diffs import BIN_FILENODE, CHMOD_FILENODE, DEL_FILENODE, MOD_FILENODE, NEW_FILENODE, RENAMED_FILENODE
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import (MENTIONS_REGEX, AttributeDict, age, asbool, credentials_filter, fmt_date, link_to_ref, safe_bytes, safe_int, safe_str,
 
                                  shorter, time_to_datetime)
 
from kallithea.lib.utils2 import (AttributeDict, age, asbool, credentials_filter, fmt_date, link_to_ref, safe_bytes, safe_int, safe_str, shorter,
 
                                  time_to_datetime)
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.webutils import (HTML, Option, canonical_url, checkbox, chop_at, end_form, escape, form, format_byte_size, hidden, html_escape, js, jshtml,
 
                                    link_to, literal, password, pop_flash_messages, radio, reset, safeid, select, session_csrf_secret_name,
 
                                    session_csrf_secret_token, submit, text, textarea, truncate, url, wrap_paragraphs)
 
from kallithea.lib.webutils import (HTML, MENTIONS_REGEX, Option, canonical_url, checkbox, chop_at, end_form, escape, form, format_byte_size, hidden,
 
                                    html_escape, js, jshtml, link_to, literal, password, pop_flash_messages, radio, reset, safeid, select,
 
                                    session_csrf_secret_name, session_csrf_secret_token, submit, text, textarea, truncate, url, wrap_paragraphs)
 
from kallithea.model import db
 
from kallithea.model.changeset_status import ChangesetStatusModel
 

	
 

	
 
# mute pyflakes "imported but unused"
 
# from webutils
 
assert HTML
 
assert Option
 
assert canonical_url
 
assert checkbox
 
assert chop_at
 
assert end_form
 
assert form
 
assert format_byte_size
 
assert hidden
 
assert js
 
assert jshtml
 
assert password
 
assert pop_flash_messages
 
assert radio
 
assert reset
 
assert safeid
 
assert select
 
assert session_csrf_secret_name
 
assert session_csrf_secret_token
 
assert submit
 
assert text
 
assert textarea
 
assert wrap_paragraphs
 
# from kallithea.lib.auth
 
assert HasPermissionAny
 
assert HasRepoGroupPermissionLevel
 
assert HasRepoPermissionLevel
 
# from utils2
 
assert age
 
assert fmt_date
 
assert link_to_ref
 
assert shorter
 
assert time_to_datetime
 
# from vcs
 
assert EmptyChangeset
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 
    """
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_bytes(path)).hexdigest()[:12])
 

	
 

	
 
def get_ignore_whitespace_diff(GET):
 
    """Return true if URL requested whitespace to be ignored"""
 
    return bool(GET.get('ignorews'))
 

	
 

	
 
def ignore_whitespace_link(GET, anchor=None):
 
    """Return snippet with link to current URL with whitespace ignoring toggled"""
 
    params = dict(GET)  # ignoring duplicates
 
    if get_ignore_whitespace_diff(GET):
 
        params.pop('ignorews')
 
        title = _("Show whitespace changes")
 
    else:
 
        params['ignorews'] = '1'
 
        title = _("Ignore whitespace changes")
 
    params['anchor'] = anchor
 
    return link_to(
 
        literal('<i class="icon-strike"></i>'),
 
        url.current(**params),
 
        title=title,
 
        **{'data-toggle': 'tooltip'})
 

	
 

	
 
def get_diff_context_size(GET):
 
    """Return effective context size requested in URL"""
 
    return safe_int(GET.get('context'), default=3)
 

	
 

	
 
def increase_context_link(GET, anchor=None):
 
    """Return snippet with link to current URL with double context size"""
 
    context = get_diff_context_size(GET) * 2
 
    params = dict(GET)  # ignoring duplicates
 
    params['context'] = str(context)
 
    params['anchor'] = anchor
 
    return link_to(
 
        literal('<i class="icon-sort"></i>'),
 
        url.current(**params),
 
        title=_('Increase diff context to %(num)s lines') % {'num': context},
 
        **{'data-toggle': 'tooltip'})
 

	
 

	
 
def files_breadcrumbs(repo_name, rev, paths):
 
    url_l = [link_to(repo_name, url('files_home',
kallithea/lib/markup_renderer.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.markup_renderer
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Renderer for markup languages with ability to parse using rst or markdown
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 27, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import hashlib
 
import logging
 
import re
 
import traceback
 

	
 
import bleach
 
import markdown as markdown_mod
 
from docutils.core import publish_parts
 
from docutils.parsers.rst import directives
 

	
 
from kallithea.lib.utils2 import MENTIONS_REGEX
 
from kallithea.lib import webutils
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
url_re = re.compile(r'''\bhttps?://(?:[\da-zA-Z0-9@:.-]+)'''
 
                    r'''(?:[/a-zA-Z0-9_=@#~&+%.,:;?!*()-]*[/a-zA-Z0-9_=@#~])?''')
 

	
 

	
 
class MarkupRenderer(object):
 
    RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES = ['include', 'meta', 'raw']
 

	
 
    MARKDOWN_PAT = re.compile(r'md|mkdn?|mdown|markdown', re.IGNORECASE)
 
    RST_PAT = re.compile(r're?st', re.IGNORECASE)
 
    PLAIN_PAT = re.compile(r'readme', re.IGNORECASE)
 

	
 
    @classmethod
 
    def _detect_renderer(cls, source, filename):
 
        """
 
        runs detection of what renderer should be used for generating html
 
        from a markup language
 

	
 
        filename can be also explicitly a renderer name
 
        """
 

	
 
        if cls.MARKDOWN_PAT.findall(filename):
 
            return cls.markdown
 
        elif cls.RST_PAT.findall(filename):
 
            return cls.rst
 
        elif cls.PLAIN_PAT.findall(filename):
 
            return cls.rst
 
        return cls.plain
 

	
 
    @classmethod
 
    def _flavored_markdown(cls, text):
 
        """
 
        Github style flavored markdown
 

	
 
        :param text:
 
        """
 

	
 
        # Extract pre blocks.
 
        extractions = {}
 

	
 
        def pre_extraction_callback(matchobj):
 
            digest = hashlib.sha1(matchobj.group(0)).hexdigest()
 
            extractions[digest] = matchobj.group(0)
 
            return "{gfm-extraction-%s}" % digest
 
        pattern = re.compile(r'<pre>.*?</pre>', re.MULTILINE | re.DOTALL)
 
        text = re.sub(pattern, pre_extraction_callback, text)
 

	
 
        # Prevent foo_bar_baz from ending up with an italic word in the middle.
 
        def italic_callback(matchobj):
 
            s = matchobj.group(0)
 
            if list(s).count('_') >= 2:
 
                return s.replace('_', r'\_')
 
            return s
 
        text = re.sub(r'^(?! {4}|\t)\w+_\w+_\w[\w_]*', italic_callback, text)
 

	
 
        # In very clear cases, let newlines become <br /> tags.
 
        def newline_callback(matchobj):
 
            if len(matchobj.group(1)) == 1:
 
                return matchobj.group(0).rstrip() + '  \n'
 
            else:
 
                return matchobj.group(0)
 
        pattern = re.compile(r'^[\w\<][^\n]*(\n+)', re.MULTILINE)
 
        text = re.sub(pattern, newline_callback, text)
 

	
 
        # Insert pre block extractions.
 
        def pre_insert_callback(matchobj):
 
            return '\n\n' + extractions[matchobj.group(1)]
 
        text = re.sub(r'{gfm-extraction-([0-9a-f]{32})\}',
 
                      pre_insert_callback, text)
 

	
 
        return text
 

	
 
    @classmethod
 
    def render(cls, source, filename=None):
 
        """
 
        Renders a given filename using detected renderer
 
        it detects renderers based on file extension or mimetype.
 
        At last it will just do a simple html replacing new lines with <br/>
 

	
 
        >>> MarkupRenderer.render('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''', '.md')
 
        '<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        >>> MarkupRenderer.render('''<img class="c d" src="file://localhost/test.jpg">''', 'b.mkd')
 
        '<p><img class="c d"></p>'
 
        >>> MarkupRenderer.render('''<a href="foo">foo</a>''', 'c.mkdn')
 
        '<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.render('''<script>alert(1)</script>''', 'd.mdown')
 
        '&lt;script&gt;alert(1)&lt;/script&gt;'
 
        >>> MarkupRenderer.render('''<div onclick="alert(2)">yo</div>''', 'markdown')
 
        '<div>yo</div>'
 
        >>> MarkupRenderer.render('''<a href="javascript:alert(3)">yo</a>''', 'md')
 
        '<p><a>yo</a></p>'
 
        """
 
@@ -149,98 +149,98 @@ class MarkupRenderer(object):
 
            styles=['color'],
 
            protocols=['http', 'https', 'mailto'],
 
            )
 

	
 
    @classmethod
 
    def plain(cls, source, universal_newline=True):
 
        """
 
        >>> MarkupRenderer.plain('https://example.com/')
 
        '<br /><a href="https://example.com/">https://example.com/</a>'
 
        """
 
        if universal_newline:
 
            newline = '\n'
 
            source = newline.join(source.splitlines())
 

	
 
        def url_func(match_obj):
 
            url_full = match_obj.group(0)
 
            return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
        source = url_re.sub(url_func, source)
 
        return '<br />' + source.replace("\n", '<br />')
 

	
 
    @classmethod
 
    def markdown(cls, source, safe=True, flavored=False):
 
        """
 
        Convert Markdown (possibly GitHub Flavored) to INSECURE HTML, possibly
 
        with "safe" fall-back to plaintext. Output from this method should be sanitized before use.
 

	
 
        >>> MarkupRenderer.markdown('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''')
 
        '<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<img class="c d" src="file://localhost/test.jpg">''')
 
        '<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<a href="foo">foo</a>''')
 
        '<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.markdown('''<script>alert(1)</script>''')
 
        '<script>alert(1)</script>'
 
        >>> MarkupRenderer.markdown('''<div onclick="alert(2)">yo</div>''')
 
        '<div onclick="alert(2)">yo</div>'
 
        >>> MarkupRenderer.markdown('''<a href="javascript:alert(3)">yo</a>''')
 
        '<p><a href="javascript:alert(3)">yo</a></p>'
 
        >>> MarkupRenderer.markdown('''## Foo''')
 
        '<h2>Foo</h2>'
 
        >>> print(MarkupRenderer.markdown('''
 
        ...     #!/bin/bash
 
        ...     echo "hello"
 
        ... '''))
 
        <table class="code-highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre>1
 
        2</pre></div></td><td class="code"><div class="code-highlight"><pre><span></span><span class="ch">#!/bin/bash</span>
 
        <span class="nb">echo</span> <span class="s2">&quot;hello&quot;</span>
 
        </pre></div>
 
        </td></tr></table>
 
        """
 
        try:
 
            if flavored:
 
                source = cls._flavored_markdown(source)
 
            return markdown_mod.markdown(
 
                source,
 
                extensions=['markdown.extensions.codehilite', 'markdown.extensions.extra'],
 
                extension_configs={'markdown.extensions.codehilite': {'css_class': 'code-highlight'}})
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst(cls, source, safe=True):
 
        try:
 
            docutils_settings = dict([(alias, None) for alias in
 
                                cls.RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES])
 

	
 
            docutils_settings.update({'input_encoding': 'unicode',
 
                                      'report_level': 4})
 

	
 
            for k, v in docutils_settings.items():
 
                directives.register_directive(k, v)
 

	
 
            parts = publish_parts(source=source,
 
                                  writer_name="html4css1",
 
                                  settings_overrides=docutils_settings)
 

	
 
            return parts['html_title'] + parts["fragment"]
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst_with_mentions(cls, source):
 

	
 
        def wrapp(match_obj):
 
            uname = match_obj.groups()[0]
 
            return r'\ **@%(uname)s**\ ' % {'uname': uname}
 
        mention_hl = MENTIONS_REGEX.sub(wrapp, source).strip()
 
        mention_hl = webutils.MENTIONS_REGEX.sub(wrapp, source).strip()
 
        return cls.rst(mention_hl)
kallithea/lib/utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import re
 
import traceback
 
import urllib.error
 

	
 
import mercurial.config
 
import mercurial.error
 
import mercurial.ui
 

	
 
import kallithea.lib.conf
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import InvalidCloneUriException
 
from kallithea.lib.utils2 import ascii_bytes, aslist, extract_mentioned_usernames, safe_bytes, safe_str
 
from kallithea.lib.utils2 import ascii_bytes, aslist, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.git.repository import GitRepository
 
from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.utils.fakemod import create_module
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import db, meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = db.UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _get_permanent_id(s):
 
    """Helper for decoding stable URLs with repo ID. For a string like '_123'
 
    return 123.
 
    """
 
    by_id_match = re.match(r'^_(\d+)$', s)
 
    if by_id_match is None:
 
        return None
 
    return int(by_id_match.group(1))
 

	
 

	
 
def fix_repo_id_name(path):
 
    """
 
    Rewrite repo_name for _<ID> permanent URLs.
 

	
 
    Given a path, if the first path element is like _<ID>, return the path with
 
    this part expanded to the corresponding full repo name, else return the
 
    provided path.
 
    """
 
    first, rest = path, ''
 
    if '/' in path:
 
        first, rest_ = path.split('/', 1)
 
        rest = '/' + rest_
 
    repo_id = _get_permanent_id(first)
 
    if repo_id is not None:
 
        repo = db.Repository.get(repo_id)
 
        if repo is not None:
 
            return repo.repo_name + rest
 
    return path
 

	
 

	
 
def get_filesystem_repos(path):
 
    """
 
    Scans given path for repos and return (name,(type,path)) tuple
 

	
 
    :param path: path to scan for repositories
 
    :param recursive: recursive search and return names with subdirs in front
 
    """
 

	
 
    # remove ending slash for better results
 
    path = path.rstrip(os.sep)
 
    log.debug('now scanning in %s', path)
 

	
 
    def isdir(*n):
 
        return os.path.isdir(os.path.join(*n))
 

	
 
    for root, dirs, _files in os.walk(path):
 
        recurse_dirs = []
 
        for subdir in dirs:
 
            # skip removed repos
 
            if REMOVED_REPO_PAT.match(subdir):
 
                continue
 

	
 
            # skip .<something> dirs TODO: rly? then we should prevent creating them ...
 
            if subdir.startswith('.'):
 
                continue
 

	
 
            cur_path = os.path.join(root, subdir)
 
@@ -401,101 +402,101 @@ def repo2db_mapper(initial_repo_dict, re
 
    enable_statistics = defs.get('repo_enable_statistics')
 
    enable_downloads = defs.get('repo_enable_downloads')
 
    private = defs.get('repo_private')
 

	
 
    for name, repo in sorted(initial_repo_dict.items()):
 
        group = map_groups(name)
 
        db_repo = repo_model.get_by_repo_name(name)
 
        # found repo that is on filesystem not in Kallithea database
 
        if not db_repo:
 
            log.info('repository %s not found, creating now', name)
 
            added.append(name)
 
            desc = (repo.description
 
                    if repo.description != 'unknown'
 
                    else '%s repository' % name)
 

	
 
            try:
 
                new_repo = repo_model._create_repo(
 
                    repo_name=name,
 
                    repo_type=repo.alias,
 
                    description=desc,
 
                    repo_group=getattr(group, 'group_id', None),
 
                    owner=user,
 
                    enable_downloads=enable_downloads,
 
                    enable_statistics=enable_statistics,
 
                    private=private,
 
                    state=db.Repository.STATE_CREATED
 
                )
 
            except Exception as e:
 
                log.error('error creating %r: %s: %s', name, type(e).__name__, e)
 
                sa.rollback()
 
                continue
 
            sa.commit()
 
            # we added that repo just now, and make sure it has githook
 
            # installed, and updated server info
 
            if new_repo.repo_type == 'git':
 
                git_repo = new_repo.scm_instance
 
                ScmModel().install_git_hooks(git_repo)
 
                # update repository server-info
 
                log.debug('Running update server info')
 
                git_repo._update_server_info()
 
            new_repo.update_changeset_cache()
 
        elif install_git_hooks or overwrite_git_hooks:
 
            if db_repo.repo_type == 'git':
 
                ScmModel().install_git_hooks(db_repo.scm_instance, force=overwrite_git_hooks)
 

	
 
    removed = []
 
    # remove from database those repositories that are not in the filesystem
 
    for repo in sa.query(db.Repository).all():
 
        if repo.repo_name not in initial_repo_dict:
 
            if remove_obsolete:
 
                log.debug("Removing non-existing repository found in db `%s`",
 
                          repo.repo_name)
 
                try:
 
                    RepoModel().delete(repo, forks='detach', fs_remove=False)
 
                    sa.commit()
 
                except Exception:
 
                    #don't hold further removals on error
 
                    log.error(traceback.format_exc())
 
                    sa.rollback()
 
            removed.append(repo.repo_name)
 
    return added, removed
 

	
 

	
 
def load_extensions(root_path):
 
    try:
 
        ext = create_module('extensions', os.path.join(root_path, 'extensions.py'))
 
    except FileNotFoundError:
 
        try:
 
            ext = create_module('rc', os.path.join(root_path, 'rcextensions', '__init__.py'))
 
            log.warning('The name "rcextensions" is deprecated. Please use a file `extensions.py` instead of a directory `rcextensions`.')
 
        except FileNotFoundError:
 
            return
 

	
 
    log.info('Loaded Kallithea extensions from %s', ext)
 
    kallithea.EXTENSIONS = ext
 

	
 
    # Additional mappings that are not present in the pygments lexers
 
    kallithea.lib.conf.LANGUAGES_EXTENSIONS_MAP.update(getattr(ext, 'EXTRA_MAPPINGS', {}))
 

	
 
    # Override any INDEX_EXTENSIONS
 
    if getattr(ext, 'INDEX_EXTENSIONS', []):
 
        log.debug('settings custom INDEX_EXTENSIONS')
 
        kallithea.lib.conf.INDEX_EXTENSIONS = getattr(ext, 'INDEX_EXTENSIONS', [])
 

	
 
    # Additional INDEX_EXTENSIONS
 
    log.debug('adding extra into INDEX_EXTENSIONS')
 
    kallithea.lib.conf.INDEX_EXTENSIONS.extend(getattr(ext, 'EXTRA_INDEX_EXTENSIONS', []))
 

	
 

	
 
#==============================================================================
 
# MISC
 
#==============================================================================
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
    for name in webutils.extract_mentioned_usernames(text):
 
        user = db.User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
kallithea/lib/utils2.py
Show inline comments
 
@@ -329,208 +329,192 @@ def credentials_filter(uri):
 
    """
 

	
 
    uri = uri_filter(uri)
 
    # check if we have port
 
    if len(uri) > 2 and uri[2]:
 
        uri[2] = ':' + uri[2]
 

	
 
    return ''.join(uri)
 

	
 

	
 
def get_clone_url(clone_uri_tmpl, prefix_url, repo_name, repo_id, username=None):
 
    parsed_url = urlobject.URLObject(prefix_url)
 
    prefix = urllib.parse.unquote(parsed_url.path.rstrip('/'))
 
    try:
 
        system_user = pwd.getpwuid(os.getuid()).pw_name
 
    except NameError: # TODO: support all systems - especially Windows
 
        system_user = 'kallithea' # hardcoded default value ...
 
    args = {
 
        'scheme': parsed_url.scheme,
 
        'user': urllib.parse.quote(username or ''),
 
        'netloc': parsed_url.netloc + prefix,  # like "hostname:port/prefix" (with optional ":port" and "/prefix")
 
        'prefix': prefix, # undocumented, empty or starting with /
 
        'repo': repo_name,
 
        'repoid': str(repo_id),
 
        'system_user': system_user,
 
        'hostname': parsed_url.hostname,
 
    }
 
    url = re.sub('{([^{}]+)}', lambda m: args.get(m.group(1), m.group(0)), clone_uri_tmpl)
 

	
 
    # remove leading @ sign if it's present. Case of empty user
 
    url_obj = urlobject.URLObject(url)
 
    if not url_obj.username:
 
        url_obj = url_obj.with_username(None)
 

	
 
    return str(url_obj)
 

	
 

	
 
def short_ref_name(ref_type, ref_name):
 
    """Return short description of PR ref - revs will be truncated"""
 
    if ref_type == 'rev':
 
        return ref_name[:12]
 
    return ref_name
 

	
 

	
 
def link_to_ref(repo_name, ref_type, ref_name, rev=None):
 
    """
 
    Return full markup for a PR ref to changeset_home for a changeset.
 
    If ref_type is 'branch', it will link to changelog.
 
    ref_name is shortened if ref_type is 'rev'.
 
    if rev is specified, show it too, explicitly linking to that revision.
 
    """
 
    txt = short_ref_name(ref_type, ref_name)
 
    if ref_type == 'branch':
 
        u = webutils.url('changelog_home', repo_name=repo_name, branch=ref_name)
 
    else:
 
        u = webutils.url('changeset_home', repo_name=repo_name, revision=ref_name)
 
    l = webutils.link_to(repo_name + '#' + txt, u)
 
    if rev and ref_type != 'rev':
 
        l = webutils.literal('%s (%s)' % (l, webutils.link_to(rev[:12], webutils.url('changeset_home', repo_name=repo_name, revision=rev))))
 
    return l
 

	
 

	
 
def get_changeset_safe(repo, rev):
 
    """
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s' % type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, str):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
class HookEnvironmentError(Exception): pass
 

	
 

	
 
def get_hook_environment():
 
    """
 
    Get hook context by deserializing the global KALLITHEA_EXTRAS environment
 
    variable.
 

	
 
    Called early in Git out-of-process hooks to get .ini config path so the
 
    basic environment can be configured properly. Also used in all hooks to get
 
    information about the action that triggered it.
 
    """
 

	
 
    try:
 
        kallithea_extras = os.environ['KALLITHEA_EXTRAS']
 
    except KeyError:
 
        raise HookEnvironmentError("Environment variable KALLITHEA_EXTRAS not found")
 

	
 
    extras = json.loads(kallithea_extras)
 
    for k in ['username', 'repository', 'scm', 'action', 'ip', 'config']:
 
        try:
 
            extras[k]
 
        except KeyError:
 
            raise HookEnvironmentError('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
 

	
 
    Most importantly, this allow Git hooks to do proper logging and updating of
 
    caches after pushes.
 

	
 
    Must always be called before anything with hooks are invoked.
 
    """
 
    extras = {
 
        'ip': ip_addr, # used in action_logger
 
        'username': username,
 
        'action': action or 'push_local', # used in process_pushed_raw_ids action_logger
 
        'repository': repo_name,
 
        'scm': repo_alias,
 
        'config': kallithea.CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    try:
 
        return getattr(tmpl_context, 'authuser', None)
 
    except TypeError:  # No object (name: context) has been registered for this thread
 
        return None
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', s).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
 
    else:
 
        str_ = str_.replace(replace * 2, replace)
kallithea/lib/webutils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.webutils
 
~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Helper functions that may rely on the current WSGI request, exposed in the TG2
 
thread-local "global" variables. It should have few dependencies so it can be
 
imported anywhere - just like the global variables can be used everywhere.
 
"""
 

	
 
import json
 
import logging
 
import random
 
import re
 

	
 
from tg import request, session
 
from webhelpers2.html import HTML, escape, literal
 
from webhelpers2.html.tags import NotGiven, Option, Options, _input
 
from webhelpers2.html.tags import _make_safe_id_component as safeid
 
from webhelpers2.html.tags import checkbox, end_form
 
from webhelpers2.html.tags import form as insecure_form
 
from webhelpers2.html.tags import hidden, link_to, password, radio
 
from webhelpers2.html.tags import select as webhelpers2_select
 
from webhelpers2.html.tags import submit, text, textarea
 
from webhelpers2.number import format_byte_size
 
from webhelpers2.text import chop_at, truncate, wrap_paragraphs
 

	
 
import kallithea
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
# mute pyflakes "imported but unused"
 
assert Option
 
assert checkbox
 
assert chop_at
 
assert end_form
 
assert escape
 
assert format_byte_size
 
assert link_to
 
assert literal
 
assert password
 
assert radio
 
assert safeid
 
assert submit
 
assert text
 
assert textarea
 
assert truncate
 
assert wrap_paragraphs
 

	
 

	
 
#
 
# General Kallithea URL handling
 
#
 

	
 
class UrlGenerator(object):
 
    """Emulate pylons.url in providing a wrapper around routes.url
 

	
 
    This code was added during migration from Pylons to Turbogears2. Pylons
 
    already provided a wrapper like this, but Turbogears2 does not.
 

	
 
    When the routing of Kallithea is changed to use less Routes and more
 
    Turbogears2-style routing, this class may disappear or change.
 

	
 
    url() (the __call__ method) returns the URL based on a route name and
 
    arguments.
 
    url.current() returns the URL of the current page with arguments applied.
 

	
 
    Refer to documentation of Routes for details:
 
    https://routes.readthedocs.io/en/latest/generating.html#generation
 
    """
 
    def __call__(self, *args, **kwargs):
 
        return request.environ['routes.url'](*args, **kwargs)
 

	
 
    def current(self, *args, **kwargs):
 
        return request.environ['routes.url'].current(*args, **kwargs)
 

	
 

	
 
url = UrlGenerator()
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    try:
 
        parts = kallithea.CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    try:
 
        parts = kallithea.CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
 

	
 

	
 
#
 
# Custom Webhelpers2 stuff
 
#
 

	
 
def html_escape(s):
 
    """Return string with all html escaped.
 
@@ -213,96 +214,112 @@ def _session_flash_messages(append=None,
 
    else:
 
        if append is None:  # common fast path - also used for clearing empty queue
 
            return []  # don't bother saving
 
        flash_messages = []
 
        session[key] = flash_messages
 
    if append is not None and append not in flash_messages:
 
        flash_messages.append(append)
 
    if clear:
 
        session.pop(key, None)
 
    session.save()
 
    return flash_messages
 

	
 

	
 
def flash(message, category, logf=None):
 
    """
 
    Show a message to the user _and_ log it through the specified function
 

	
 
    category: notice (default), warning, error, success
 
    logf: a custom log function - such as log.debug
 

	
 
    logf defaults to log.info, unless category equals 'success', in which
 
    case logf defaults to log.debug.
 
    """
 
    assert category in ('error', 'success', 'warning'), category
 
    if hasattr(message, '__html__'):
 
        # render to HTML for storing in cookie
 
        safe_message = str(message)
 
    else:
 
        # Apply str - the message might be an exception with __str__
 
        # Escape, so we can trust the result without further escaping, without any risk of injection
 
        safe_message = html_escape(str(message))
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, safe_message)
 

	
 
    _session_flash_messages(append=(category, safe_message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(category, message) for category, message in _session_flash_messages(clear=True)]
 

	
 

	
 
#
 
# Generic-ish formatting and markup
 
#
 

	
 
def js(value):
 
    """Convert Python value to the corresponding JavaScript representation.
 

	
 
    This is necessary to safely insert arbitrary values into HTML <script>
 
    sections e.g. using Mako template expression substitution.
 

	
 
    Note: Rather than using this function, it's preferable to avoid the
 
    insertion of values into HTML <script> sections altogether. Instead,
 
    data should (to the extent possible) be passed to JavaScript using
 
    data attributes or AJAX calls, eliminating the need for JS specific
 
    escaping.
 

	
 
    Note: This is not safe for use in attributes (e.g. onclick), because
 
    quotes are not escaped.
 

	
 
    Because the rules for parsing <script> varies between XHTML (where
 
    normal rules apply for any special characters) and HTML (where
 
    entities are not interpreted, but the literal string "</script>"
 
    is forbidden), the function ensures that the result never contains
 
    '&', '<' and '>', thus making it safe in both those contexts (but
 
    not in attributes).
 
    """
 
    return literal(
 
        ('(' + json.dumps(value) + ')')
 
        # In JSON, the following can only appear in string literals.
 
        .replace('&', r'\x26')
 
        .replace('<', r'\x3c')
 
        .replace('>', r'\x3e')
 
    )
 

	
 

	
 
def jshtml(val):
 
    """HTML escapes a string value, then converts the resulting string
 
    to its corresponding JavaScript representation (see `js`).
 

	
 
    This is used when a plain-text string (possibly containing special
 
    HTML characters) will be used by a script in an HTML context (e.g.
 
    element.innerHTML or jQuery's 'html' method).
 

	
 
    If in doubt, err on the side of using `jshtml` over `js`, since it's
 
    better to escape too much than too little.
 
    """
 
    return js(escape(val))
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -18,193 +18,193 @@ kallithea.tests.other.test_libs
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 
import re
 

	
 
import mock
 
import routes
 
from dateutil import relativedelta
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.utils2 import AttributeDict, get_clone_url, safe_bytes
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 

	
 
class TestLibs(base.TestController):
 

	
 
    @base.parametrize('test_url,expected,expected_creds', TEST_URLS)
 
    def test_uri_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import uri_filter
 
        assert uri_filter(test_url) == expected
 

	
 
    @base.parametrize('test_url,expected,expected_creds', TEST_URLS)
 
    def test_credentials_filter(self, test_url, expected, expected_creds):
 
        from kallithea.lib.utils2 import credentials_filter
 
        assert credentials_filter(test_url) == expected_creds
 

	
 
    @base.parametrize('str_bool,expected', [
 
                           ('t', True),
 
                           ('true', True),
 
                           ('y', True),
 
                           ('yes', True),
 
                           ('on', True),
 
                           ('1', True),
 
                           ('Y', True),
 
                           ('yeS', True),
 
                           ('Y', True),
 
                           ('TRUE', True),
 
                           ('T', True),
 
                           ('False', False),
 
                           ('F', False),
 
                           ('FALSE', False),
 
                           ('0', False),
 
    ])
 
    def test_asbool(self, str_bool, expected):
 
        from kallithea.lib.utils2 import asbool
 
        assert asbool(str_bool) == expected
 

	
 
    def test_mention_extractor(self):
 
        from kallithea.lib.utils2 import extract_mentioned_usernames
 
        from kallithea.lib.webutils import extract_mentioned_usernames
 
        sample = (
 
            "@first hi there @world here's my email username@example.com "
 
            "@lukaszb check @one_more22 it pls @ ttwelve @D[] @one@two@three "
 
            "@UPPER    @cAmEL @2one_more22 @john please see this http://org.pl "
 
            "@marian.user just do it @marco-polo and next extract @marco_polo "
 
            "user.dot  hej ! not-needed maril@example.com"
 
        )
 

	
 
        expected = set([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'])
 
        assert expected == set(extract_mentioned_usernames(sample))
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), '1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), '1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), '1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), '2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), '2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), '3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month ago'),
 
        (dict(months= -1, days= -20), '1 month ago'),
 
        (dict(years= -1, months= -1), '13 months ago'),
 
        (dict(years= -1, months= -10), '22 months ago'),
 
        (dict(years= -2, months= -4), '2 years ago'),
 
        (dict(years= -2, months= -11), '3 years ago'),
 
        (dict(years= -3, months= -2), '3 years ago'),
 
        (dict(years= -4, months= -8), '5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds=1), 'in 1 second'),
 
        (dict(seconds=60 * 2), 'in 2 minutes'),
 
        (dict(hours=1), 'in 1 hour'),
 
        (dict(hours=24), 'in 1 day'),
 
        (dict(hours=24 * 5), 'in 5 days'),
 
        (dict(months=1), 'in 1 month'),
 
        (dict(months=1, days=1), 'in 1 month and 1 day'),
 
        (dict(years=1, months=1), 'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        res = h.urlify_text(sample, stylize=True)
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 
        assert '<div class="label label-meta" data-tag="obsolete">obsolete</div>' in res
 
        assert '<div class="label label-meta" data-tag="stale">stale</div>' in res
 
        assert '<div class="label label-meta" data-tag="lang">python</div>' in res
 
        assert '<div class="label label-meta" data-tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 

	
 
    def test_alternative_gravatar(self):
 
        _md5 = lambda s: hashlib.md5(safe_bytes(s)).hexdigest()
 

	
0 comments (0 inline, 0 general)