Changeset - 72e0fe2e3278
[Not reviewed]
default
0 10 0
Mads Kiilerich - 11 years ago 2015-07-24 16:37:40
madski@unity3d.com
cleanup: remove unnecessary '%s'%s formatting
10 files changed with 15 insertions and 18 deletions:
0 comments (0 inline, 0 general)
kallithea/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.__init__
 
~~~~~~~~~~~~~~~~~~
 

	
 
Kallithea, a web based repository management based on pylons
 
versioning implementation: http://www.python.org/dev/peps/pep-0386/
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, (C) 2014 Bradley M. Kuhn, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import sys
 
import platform
 

	
 
VERSION = (0, 2, 2)
 
BACKENDS = {
 
    'hg': 'Mercurial repository',
 
    'git': 'Git repository',
 
}
 

	
 
CELERY_ON = False
 
CELERY_EAGER = False
 

	
 
# link to config for pylons
 
CONFIG = {}
 

	
 
# Linked module for extensions
 
EXTENSIONS = {}
 

	
 
# BRAND controls internal references in database and config to the products
 
# own name.
 
#
 
# NOTE: If you want compatibility with a database that was originally created
 
#  for use with the RhodeCode software product, change BRAND to "rhodecode",
 
#  either by editing here or by creating a new file:
 
#  echo "BRAND = 'rhodecode'" > kallithea/brand.py
 

	
 
BRAND = "kallithea"
 
try:
 
    from kallithea.brand import BRAND
 
except ImportError:
 
    pass
 

	
 
# Prefix for the ui and settings table names
 
DB_PREFIX = (BRAND + "_") if BRAND != "kallithea" else ""
 

	
 
# Users.extern_type and .extern_name value for local users
 
EXTERN_TYPE_INTERNAL = BRAND if BRAND != 'kallithea' else 'internal'
 

	
 
# db_migrate_version.repository_id value, same as kallithea/lib/dbmigrate/migrate.cfg
 
DB_MIGRATIONS = BRAND + "_db_migrations"
 

	
 
try:
 
    from kallithea.lib import get_current_revision
 
    _rev = get_current_revision(quiet=True)
 
    if _rev and len(VERSION) > 3:
 
        VERSION += ('%s' % _rev[0],)
 
        VERSION += (_rev[0],)
 
except ImportError:
 
    pass
 

	
 
__version__ = ('.'.join((str(each) for each in VERSION[:3])))
 
__dbversion__ = 31  # defines current db version for migrations
 
__platform__ = platform.system()
 
__license__ = 'GPLv3'
 
__py_version__ = sys.version_info
 
__author__ = "Various Authors"
 
__url__ = 'https://kallithea-scm.org/'
 

	
 
is_windows = __platform__ in ['Windows']
 
is_unix = not is_windows
 

	
 
if len(VERSION) > 3:
 
    __version__ += '.'+VERSION[3]
 

	
 
    if len(VERSION) > 4:
 
        __version__ += VERSION[4]
 
    else:
 
        __version__ += '0'
kallithea/controllers/feed.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.feed
 
~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Feed controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 23, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 

	
 
from pylons import response, tmpl_context as c
 
from pylons.i18n.translation import _
 

	
 
from beaker.cache import cache_region, region_invalidate
 
from webhelpers.feedgenerator import Atom1Feed, Rss201rev2Feed
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import LoginRequired, HasRepoPermissionAnyDecorator
 
from kallithea.lib.base import BaseRepoController
 
from kallithea.lib.diffs import DiffProcessor, LimitedDiffContainer
 
from kallithea.model.db import CacheInvalidation
 
from kallithea.lib.utils2 import safe_int, str2bool, safe_unicode
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FeedController(BaseRepoController):
 

	
 
    @LoginRequired(api_access=True)
 
    @HasRepoPermissionAnyDecorator('repository.read', 'repository.write',
 
                                   'repository.admin')
 
    def __before__(self):
 
        super(FeedController, self).__before__()
 
        #common values for feeds
 
        self.description = _('Changes on %s repository')
 
        self.title = self.title = _('%s %s feed') % (c.site_name, '%s')
 
        self.language = 'en-us'
 
        self.ttl = "5"
 
        import kallithea
 
        CONF = kallithea.CONFIG
 
        self.include_diff = str2bool(CONF.get('rss_include_diff', False))
 
        self.feed_nr = safe_int(CONF.get('rss_items_per_page', 20))
 
        # we need to protect from parsing huge diffs here other way
 
        # we can kill the server
 
        self.feed_diff_limit = safe_int(CONF.get('rss_cut_off_limit', 32 * 1024))
 

	
 
    def _get_title(self, cs):
 
        return "%s" % (
 
            h.shorter(cs.message, 160)
 
        )
 
        return h.shorter(cs.message, 160)
 

	
 
    def __changes(self, cs):
 
        changes = []
 
        diff_processor = DiffProcessor(cs.diff(),
 
                                       diff_limit=self.feed_diff_limit)
 
        _parsed = diff_processor.prepare(inline_diff=False)
 
        limited_diff = False
 
        if isinstance(_parsed, LimitedDiffContainer):
 
            limited_diff = True
 

	
 
        for st in _parsed:
 
            st.update({'added': st['stats']['added'],
 
                       'removed': st['stats']['deleted']})
 
            changes.append('\n %(operation)s %(filename)s '
 
                           '(%(added)s lines added, %(removed)s lines removed)'
 
                            % st)
 
        if limited_diff:
 
            changes = changes + ['\n ' +
 
                                 _('Changeset was too big and was cut off...')]
 
        return diff_processor, changes
 

	
 
    def __get_desc(self, cs):
 
        desc_msg = [(_('%s committed on %s')
 
                     % (h.person(cs.author), h.fmt_date(cs.date))) + '<br/>']
 
        #branches, tags, bookmarks
 
        if cs.branch:
 
            desc_msg.append('branch: %s<br/>' % cs.branch)
 
        if h.is_hg(c.db_repo_scm_instance):
 
            for book in cs.bookmarks:
 
                desc_msg.append('bookmark: %s<br/>' % book)
 
        for tag in cs.tags:
 
            desc_msg.append('tag: %s<br/>' % tag)
 
        diff_processor, changes = self.__changes(cs)
 
        # rev link
 
        _url = h.canonical_url('changeset_home', repo_name=c.db_repo.repo_name,
 
                   revision=cs.raw_id)
 
        desc_msg.append('changeset: <a href="%s">%s</a>' % (_url, cs.raw_id[:8]))
 

	
 
        desc_msg.append('<pre>')
 
        desc_msg.append(h.urlify_text(cs.message))
 
        desc_msg.append('\n')
 
        desc_msg.extend(changes)
 
        if self.include_diff:
 
            desc_msg.append('\n\n')
 
            desc_msg.append(diff_processor.as_raw())
 
        desc_msg.append('</pre>')
 
        return map(safe_unicode, desc_msg)
 

	
 
    def atom(self, repo_name):
 
        """Produce an atom-1.0 feed via feedgenerator module"""
 

	
 
        @cache_region('long_term')
 
        def _get_feed_from_cache(key, kind):
 
            feed = Atom1Feed(
 
                 title=self.title % repo_name,
 
                 link=h.canonical_url('summary_home', repo_name=repo_name),
 
                 description=self.description % repo_name,
 
                 language=self.language,
 
                 ttl=self.ttl
 
            )
 

	
 
            for cs in reversed(list(c.db_repo_scm_instance[-self.feed_nr:])):
 
                feed.add_item(title=self._get_title(cs),
 
                              link=h.canonical_url('changeset_home', repo_name=repo_name,
 
                                       revision=cs.raw_id),
 
                              author_name=cs.author,
 
                              description=''.join(self.__get_desc(cs)),
 
                              pubdate=cs.date,
 
                              )
 

	
 
            response.content_type = feed.mime_type
 
            return feed.writeString('utf-8')
 

	
 
        kind = 'ATOM'
 
        valid = CacheInvalidation.test_and_set_valid(repo_name, kind)
 
        if not valid:
 
            region_invalidate(_get_feed_from_cache, None, repo_name, kind)
 
        return _get_feed_from_cache(repo_name, kind)
 

	
 
    def rss(self, repo_name):
 
        """Produce an rss2 feed via feedgenerator module"""
 

	
 
        @cache_region('long_term')
 
        def _get_feed_from_cache(key, kind):
 
            feed = Rss201rev2Feed(
 
                title=self.title % repo_name,
 
                link=h.canonical_url('summary_home', repo_name=repo_name),
 
                description=self.description % repo_name,
 
                language=self.language,
 
                ttl=self.ttl
 
            )
 

	
 
            for cs in reversed(list(c.db_repo_scm_instance[-self.feed_nr:])):
 
                feed.add_item(title=self._get_title(cs),
 
                              link=h.canonical_url('changeset_home', repo_name=repo_name,
 
                                       revision=cs.raw_id),
 
                              author_name=cs.author,
 
                              description=''.join(self.__get_desc(cs)),
 
                              pubdate=cs.date,
 
                             )
 

	
 
            response.content_type = feed.mime_type
 
            return feed.writeString('utf-8')
 

	
 
        kind = 'RSS'
 
        valid = CacheInvalidation.test_and_set_valid(repo_name, kind)
 
        if not valid:
 
            region_invalidate(_get_feed_from_cache, None, repo_name, kind)
 
        return _get_feed_from_cache(repo_name, kind)
kallithea/lib/diffs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.diffs
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Set of diffing helpers, previously part of vcs
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 4, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import re
 
import difflib
 
import logging
 

	
 
from itertools import tee, imap
 

	
 
from pylons.i18n.translation import _
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode, SubModuleNode
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.helpers import escape
 
from kallithea.lib.utils2 import safe_unicode
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def wrap_to_table(str_):
 
    return '''<table class="code-difftable">
 
                <tr class="line no-comment">
 
                <td class="lineno new"></td>
 
                <td class="code no-comment"><pre>%s</pre></td>
 
                </tr>
 
              </table>''' % str_
 

	
 

	
 
def wrapped_diff(filenode_old, filenode_new, cut_off_limit=None,
 
                ignore_whitespace=True, line_context=3,
 
                enable_comments=False):
 
    """
 
    returns a wrapped diff into a table, checks for cut_off_limit and presents
 
    proper message
 
    """
 

	
 
    if filenode_old is None:
 
        filenode_old = FileNode(filenode_new.path, '', EmptyChangeset())
 

	
 
    if filenode_old.is_binary or filenode_new.is_binary:
 
        diff = wrap_to_table(_('Binary file'))
 
        stats = (0, 0)
 
        size = 0
 

	
 
    elif cut_off_limit != -1 and (cut_off_limit is None or
 
    (filenode_old.size < cut_off_limit and filenode_new.size < cut_off_limit)):
 

	
 
        f_gitdiff = get_gitdiff(filenode_old, filenode_new,
 
                                ignore_whitespace=ignore_whitespace,
 
                                context=line_context)
 
        diff_processor = DiffProcessor(f_gitdiff, format='gitdiff')
 

	
 
        diff = diff_processor.as_html(enable_comments=enable_comments)
 
        stats = diff_processor.stat()
 
        size = len(diff or '')
 
    else:
 
        diff = wrap_to_table(_('Changeset was too big and was cut off, use '
 
                               'diff menu to display this diff'))
 
        stats = (0, 0)
 
        size = 0
 
    if not diff:
 
        submodules = filter(lambda o: isinstance(o, SubModuleNode),
 
                            [filenode_new, filenode_old])
 
        if submodules:
 
            diff = wrap_to_table(escape('Submodule %r' % submodules[0]))
 
        else:
 
            diff = wrap_to_table(_('No changes detected'))
 

	
 
    cs1 = filenode_old.changeset.raw_id
 
    cs2 = filenode_new.changeset.raw_id
 

	
 
    return size, cs1, cs2, diff, stats
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 

	
 
    :param ignore_whitespace: ignore whitespaces in diff
 
    """
 
    # make sure we pass in default context
 
    context = context or 3
 
    submodules = filter(lambda o: isinstance(o, SubModuleNode),
 
                        [filenode_new, filenode_old])
 
    if submodules:
 
        return ''
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    repo = filenode_new.changeset.repository
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 

	
 
    vcs_gitdiff = repo.get_diff(old_raw_id, new_raw_id, filenode_new.path,
 
                                ignore_whitespace, context)
 
    return vcs_gitdiff
 

	
 
NEW_FILENODE = 1
 
DEL_FILENODE = 2
 
MOD_FILENODE = 3
 
RENAMED_FILENODE = 4
 
COPIED_FILENODE = 5
 
CHMOD_FILENODE = 6
 
BIN_FILENODE = 7
 

	
 

	
 
class DiffLimitExceeded(Exception):
 
    pass
 

	
 

	
 
class LimitedDiffContainer(object):
 

	
 
    def __init__(self, diff_limit, cur_diff_size, diff):
 
        self.diff = diff
 
        self.diff_limit = diff_limit
 
        self.cur_diff_size = cur_diff_size
 

	
 
    def __iter__(self):
 
        for l in self.diff:
 
            yield l
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified or git diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _chunk_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 
    _newline_marker = re.compile(r'^\\ No newline at end of file')
 
    _git_header_re = re.compile(r"""
 
        # has already been split on this:
 
        # ^diff[ ]--git
 
            [ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
        (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%\n
 
           ^rename[ ]from[ ](?P<rename_from>.+)\n
 
           ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
        (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
           ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
        (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
        (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
        (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
            \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
        (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
        (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
        (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    """, re.VERBOSE | re.MULTILINE)
 
    _hg_header_re = re.compile(r"""
 
        # has already been split on this:
 
        # ^diff[ ]--git
 
            [ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
        (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
           ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
        (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%(?:\n|$))?
 
        (?:^rename[ ]from[ ](?P<rename_from>.+)\n
 
           ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
        (?:^copy[ ]from[ ](?P<copy_from>.+)\n
 
           ^copy[ ]to[ ](?P<copy_to>.+)(?:\n|$))?
 
        (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
        (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
        (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
            \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
        (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
        (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
        (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    """, re.VERBOSE | re.MULTILINE)
 

	
 
    # Used for inline highlighter word split, must match the substitutions in _escaper
 
    _token_re = re.compile(r'()(&amp;|&lt;|&gt;|<u>\t</u>|<u class="cr"></u>| <i></i>|\W+?)')
 

	
 
    _escape_re = re.compile(r'(&)|(<)|(>)|(\t)|(\r)|(?<=.)( \n| $)')
 

	
 

	
 
    def __init__(self, diff, vcs='hg', format='gitdiff', diff_limit=None):
 
        """
 
        :param diff:   a text in diff format
 
        :param vcs: type of version control hg or git
 
        :param format: format of diff passed, `udiff` or `gitdiff`
 
        :param diff_limit: define the size of diff that is considered "big"
 
            based on that parameter cut off will be triggered, set to None
 
            to show full diff
 
        """
 
        if not isinstance(diff, basestring):
 
            raise Exception('Diff must be a basestring got %s instead' % type(diff))
 

	
 
        self._diff = diff
 
        self._format = format
 
        self.adds = 0
 
        self.removes = 0
 
        # calculate diff size
 
        self.diff_size = len(diff)
 
        self.diff_limit = diff_limit
 
        self.cur_diff_size = 0
 
        self.parsed = False
 
        self.parsed_diff = []
 
        self.vcs = vcs
 

	
 
        if format == 'gitdiff':
 
            self.differ = self._highlight_line_difflib
 
            self._parser = self._parse_gitdiff
 
        else:
 
            self.differ = self._highlight_line_udiff
 
            self._parser = self._parse_udiff
 

	
 
    def _copy_iterator(self):
 
        """
 
        make a fresh copy of generator, we should not iterate thru
 
        an original as it's needed for repeating operations on
 
        this instance of DiffProcessor
 
        """
 
        self.__udiff, iterator_copy = tee(self.__udiff)
 
        return iterator_copy
 

	
 
    def _escaper(self, string):
 
        """
 
        Escaper for diff escapes special chars and checks the diff limit
 

	
 
        :param string:
 
        """
 

	
 
        self.cur_diff_size += len(string)
 

	
 
        # escaper gets iterated on each .next() call and it checks if each
 
        # parsed line doesn't exceed the diff limit
 
        if self.diff_limit is not None and self.cur_diff_size > self.diff_limit:
 
            raise DiffLimitExceeded('Diff Limit Exceeded')
 

	
 
        def substitute(m):
 
            groups = m.groups()
 
            if groups[0]:
 
                return '&amp;'
 
            if groups[1]:
 
                return '&lt;'
 
            if groups[2]:
 
                return '&gt;'
 
            if groups[3]:
 
                return '<u>\t</u>'
 
            if groups[4]:
 
                return '<u class="cr"></u>'
 
            if groups[5]:
 
                return ' <i></i>'
 
            assert False
 

	
 
        return self._escape_re.sub(substitute, safe_unicode(string))
 

	
 
    def _line_counter(self, l):
 
        """
 
        Checks each line and bumps total adds/removes for this diff
 

	
 
        :param l:
 
        """
 
        if l.startswith('+') and not l.startswith('+++'):
 
            self.adds += 1
 
        elif l.startswith('-') and not l.startswith('---'):
 
            self.removes += 1
 
        return safe_unicode(l)
 

	
 
    def _highlight_line_difflib(self, line, next_):
 
        """
 
        Highlight inline changes in both lines.
 
        """
 

	
 
        if line['action'] == 'del':
 
            old, new = line, next_
 
        else:
 
            old, new = next_, line
 

	
 
        oldwords = self._token_re.split(old['line'])
 
        newwords = self._token_re.split(new['line'])
 
        sequence = difflib.SequenceMatcher(None, oldwords, newwords)
 

	
 
        oldfragments, newfragments = [], []
 
        for tag, i1, i2, j1, j2 in sequence.get_opcodes():
 
            oldfrag = ''.join(oldwords[i1:i2])
 
            newfrag = ''.join(newwords[j1:j2])
 
            if tag != 'equal':
 
                if oldfrag:
 
                    oldfrag = '<del>%s</del>' % oldfrag
 
                if newfrag:
 
                    newfrag = '<ins>%s</ins>' % newfrag
 
            oldfragments.append(oldfrag)
 
            newfragments.append(newfrag)
 

	
 
        old['line'] = "".join(oldfragments)
 
        new['line'] = "".join(newfragments)
 

	
 
    def _highlight_line_udiff(self, line, next_):
 
        """
 
        Highlight inline changes in both lines.
 
        """
 
        start = 0
 
        limit = min(len(line['line']), len(next_['line']))
 
        while start < limit and line['line'][start] == next_['line'][start]:
 
            start += 1
 
        end = -1
 
        limit -= start
 
        while -end <= limit and line['line'][end] == next_['line'][end]:
 
            end -= 1
 
        end += 1
 
        if start or end:
 
            def do(l):
 
                last = end + len(l['line'])
 
                if l['action'] == 'add':
 
                    tag = 'ins'
 
                else:
 
                    tag = 'del'
 
                l['line'] = '%s<%s>%s</%s>%s' % (
 
                    l['line'][:start],
 
                    tag,
 
                    l['line'][start:last],
 
                    tag,
 
                    l['line'][last:]
 
                )
 
            do(line)
 
            do(next_)
 

	
 
    def _get_header(self, diff_chunk):
 
        """
 
        parses the diff header, and returns parts, and leftover diff
 
        parts consists of 14 elements::
 

	
 
            a_path, b_path, similarity_index, rename_from, rename_to,
 
            old_mode, new_mode, new_file_mode, deleted_file_mode,
 
            a_blob_id, b_blob_id, b_mode, a_file, b_file
 

	
 
        :param diff_chunk:
 
        """
 

	
 
        match = None
 
        if self.vcs == 'git':
 
            match = self._git_header_re.match(diff_chunk)
 
        elif self.vcs == 'hg':
 
            match = self._hg_header_re.match(diff_chunk)
 
        if match is None:
 
            raise Exception('diff not recognized as valid %s diff' % self.vcs)
 
        groups = match.groupdict()
 
        rest = diff_chunk[match.end():]
 
        if rest and not rest.startswith('@') and not rest.startswith('literal ') and not rest.startswith('delta '):
 
            raise Exception('cannot parse diff header: %r followed by %r' % (diff_chunk[:match.end()], rest[:1000]))
 
        difflines = imap(self._escaper, re.findall(r'.*\n|.+$', rest)) # don't split on \r as str.splitlines do
 
        return groups, difflines
 

	
 
    def _clean_line(self, line, command):
 
        if command in ['+', '-', ' ']:
 
            #only modify the line if it's actually a diff thing
 
            line = line[1:]
 
        return line
 

	
 
    def _parse_gitdiff(self, inline_diff=True):
 
        _files = []
 
        diff_container = lambda arg: arg
 

	
 
        ##split the diff in chunks of separate --git a/file b/file chunks
 
        for raw_diff in ('\n' + self._diff).split('\ndiff --git')[1:]:
 
            head, diff = self._get_header(raw_diff)
 

	
 
            op = None
 
            stats = {
 
                'added': 0,
 
                'deleted': 0,
 
                'binary': False,
 
                'ops': {},
 
            }
 

	
 
            if head['deleted_file_mode']:
 
                op = 'D'
 
                stats['binary'] = True
 
                stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
            elif head['new_file_mode']:
 
                op = 'A'
 
                stats['binary'] = True
 
                stats['ops'][NEW_FILENODE] = 'new file %s' % head['new_file_mode']
 
            else:  # modify operation, can be cp, rename, chmod
 
                # CHMOD
 
                if head['new_mode'] and head['old_mode']:
 
                    op = 'M'
 
                    stats['binary'] = True
 
                    stats['ops'][CHMOD_FILENODE] = ('modified file chmod %s => %s'
 
                                        % (head['old_mode'], head['new_mode']))
 
                # RENAME
 
                if (head['rename_from'] and head['rename_to']
 
                      and head['rename_from'] != head['rename_to']):
 
                    op = 'R'
 
                    stats['binary'] = True
 
                    stats['ops'][RENAMED_FILENODE] = ('file renamed from %s to %s'
 
                                    % (head['rename_from'], head['rename_to']))
 
                # COPY
 
                if head.get('copy_from') and head.get('copy_to'):
 
                    op = 'M'
 
                    stats['binary'] = True
 
                    stats['ops'][COPIED_FILENODE] = ('file copied from %s to %s'
 
                                        % (head['copy_from'], head['copy_to']))
 
                # FALL BACK: detect missed old style add or remove
 
                if op is None:
 
                    if not head['a_file'] and head['b_file']:
 
                        op = 'A'
 
                        stats['binary'] = True
 
                        stats['ops'][NEW_FILENODE] = 'new file'
 

	
 
                    elif head['a_file'] and not head['b_file']:
 
                        op = 'D'
 
                        stats['binary'] = True
 
                        stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
                # it's not ADD not DELETE
 
                if op is None:
 
                    op = 'M'
 
                    stats['binary'] = True
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
            # a real non-binary diff
 
            if head['a_file'] or head['b_file']:
 
                try:
 
                    chunks, _stats = self._parse_lines(diff)
 
                    stats['binary'] = False
 
                    stats['added'] = _stats[0]
 
                    stats['deleted'] = _stats[1]
 
                    # explicit mark that it's a modified file
 
                    if op == 'M':
 
                        stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
                except DiffLimitExceeded:
 
                    diff_container = lambda _diff: \
 
                        LimitedDiffContainer(self.diff_limit,
 
                                            self.cur_diff_size, _diff)
 
                    break
 
            else:  # Git binary patch (or empty diff)
 
                # Git binary patch
 
                if head['bin_patch']:
 
                    stats['ops'][BIN_FILENODE] = 'binary diff not shown'
 
                chunks = []
 

	
 
            if op == 'D' and chunks:
 
                # a way of seeing deleted content could perhaps be nice - but
 
                # not with the current UI
 
                chunks = []
 

	
 
            chunks.insert(0, [{
 
                'old_lineno': '',
 
                'new_lineno': '',
 
                'action':     'context',
 
                'line':       msg,
 
                } for _op, msg in stats['ops'].iteritems()
 
                  if _op not in [MOD_FILENODE]])
 

	
 
            _files.append({
 
                'filename':         head['b_path'],
 
                'old_revision':     head['a_blob_id'],
 
                'new_revision':     head['b_blob_id'],
 
                'chunks':           chunks,
 
                'operation':        op,
 
                'stats':            stats,
 
            })
 

	
 
        if not inline_diff:
 
            return diff_container(_files)
 

	
 
        # highlight inline changes
 
        for diff_data in _files:
 
            for chunk in diff_data['chunks']:
 
                lineiter = iter(chunk)
 
                try:
 
                    while 1:
 
                        line = lineiter.next()
 
                        if line['action'] not in ['unmod', 'context']:
 
                            nextline = lineiter.next()
 
                            if nextline['action'] in ['unmod', 'context'] or \
 
                               nextline['action'] == line['action']:
 
                                continue
 
                            self.differ(line, nextline)
 
                except StopIteration:
 
                    pass
 

	
 
        return diff_container(_files)
 

	
 
    def _parse_udiff(self, inline_diff=True):
 
        raise NotImplementedError()
 

	
 
    def _parse_lines(self, diff):
 
        """
 
        Parse the diff and return data for the template.
 
        """
 

	
 
        stats = [0, 0]
 
        (old_line, old_end, new_line, new_end) = (None, None, None, None)
 

	
 
        try:
 
            chunks = []
 
            line = diff.next()
 

	
 
            while True:
 
                lines = []
 
                chunks.append(lines)
 

	
 
                match = self._chunk_re.match(line)
 

	
 
                if not match:
 
                    raise Exception('error parsing diff @@ line %r' % line)
 

	
 
                gr = match.groups()
 
                (old_line, old_end,
 
                 new_line, new_end) = [int(x or 1) for x in gr[:-1]]
 
                old_line -= 1
 
                new_line -= 1
 

	
 
                context = len(gr) == 5
 
                old_end += old_line
 
                new_end += new_line
 

	
 
                if context:
 
                    # skip context only if it's first line
 
                    if int(gr[0]) > 1:
 
                        lines.append({
 
                            'old_lineno': '...',
 
                            'new_lineno': '...',
 
                            'action':     'context',
 
                            'line':       line,
 
                        })
 

	
 
                line = diff.next()
 

	
 
                while old_line < old_end or new_line < new_end:
 
                    if not line:
 
                        raise Exception('error parsing diff - empty line at -%s+%s' % (old_line, new_line))
 

	
 
                    affects_old = affects_new = False
 

	
 
                    command = line[0]
 
                    if command == '+':
 
                        affects_new = True
 
                        action = 'add'
 
                        stats[0] += 1
 
                    elif command == '-':
 
                        affects_old = True
 
                        action = 'del'
 
                        stats[1] += 1
 
                    elif command == ' ':
 
                        affects_old = affects_new = True
 
                        action = 'unmod'
 
                    else:
 
                        raise Exception('error parsing diff - unknown command in line %r at -%s+%s' % (line, old_line, new_line))
 

	
 
                    if not self._newline_marker.match(line):
 
                        old_line += affects_old
 
                        new_line += affects_new
 
                        lines.append({
 
                            'old_lineno':   affects_old and old_line or '',
 
                            'new_lineno':   affects_new and new_line or '',
 
                            'action':       action,
 
                            'line':         self._clean_line(line, command)
 
                        })
 

	
 
                    line = diff.next()
 

	
 
                    if self._newline_marker.match(line):
 
                        # we need to append to lines, since this is not
 
                        # counted in the line specs of diff
 
                        lines.append({
 
                            'old_lineno':   '...',
 
                            'new_lineno':   '...',
 
                            'action':       'context',
 
                            'line':         self._clean_line(line, command)
 
                        })
 
                        line = diff.next()
 
                if old_line > old_end:
 
                        raise Exception('error parsing diff - more than %s "-" lines at -%s+%s' % (old_end, old_line, new_line))
 
                if new_line > new_end:
 
                        raise Exception('error parsing diff - more than %s "+" lines at -%s+%s' % (new_end, old_line, new_line))
 
        except StopIteration:
 
            pass
 
        if old_line != old_end or new_line != new_end:
 
            raise Exception('diff processing broken when old %s<>%s or new %s<>%s line %r' % (old_line, old_end, new_line, new_end, line))
 

	
 
        return chunks, stats
 

	
 
    def _safe_id(self, idstring):
 
        """Make a string safe for including in an id attribute.
 

	
 
        The HTML spec says that id attributes 'must begin with
 
        a letter ([A-Za-z]) and may be followed by any number
 
        of letters, digits ([0-9]), hyphens ("-"), underscores
 
        ("_"), colons (":"), and periods (".")'. These regexps
 
        are slightly over-zealous, in that they remove colons
 
        and periods unnecessarily.
 

	
 
        Whitespace is transformed into underscores, and then
 
        anything which is not a hyphen or a character that
 
        matches \w (alphanumerics and underscore) is removed.
 

	
 
        """
 
        # Transform all whitespace to underscore
 
        idstring = re.sub(r'\s', "_", '%s' % idstring)
 
        idstring = re.sub(r'\s', "_", idstring)
 
        # Remove everything that is not a hyphen or a member of \w
 
        idstring = re.sub(r'(?!-)\W', "", idstring).lower()
 
        return idstring
 

	
 
    def prepare(self, inline_diff=True):
 
        """
 
        Prepare the passed udiff for HTML rendering. It'l return a list
 
        of dicts with diff information
 
        """
 
        parsed = self._parser(inline_diff=inline_diff)
 
        self.parsed = True
 
        self.parsed_diff = parsed
 
        return parsed
 

	
 
    def as_raw(self, diff_lines=None):
 
        """
 
        Returns raw string diff
 
        """
 
        return self._diff
 
        #return u''.join(imap(self._line_counter, self._diff.splitlines(1)))
 

	
 
    def as_html(self, table_class='code-difftable', line_class='line',
 
                old_lineno_class='lineno old', new_lineno_class='lineno new',
 
                no_lineno_class='lineno',
 
                code_class='code', enable_comments=False, parsed_lines=None):
 
        """
 
        Return given diff as html table with customized css classes
 
        """
 
        def _link_to_if(condition, label, url):
 
            """
 
            Generates a link if condition is meet or just the label if not.
 
            """
 

	
 
            if condition:
 
                return '''<a href="%(url)s">%(label)s</a>''' % {
 
                    'url': url,
 
                    'label': label
 
                }
 
            else:
 
                return label
 
        if not self.parsed:
 
            self.prepare()
 

	
 
        diff_lines = self.parsed_diff
 
        if parsed_lines:
 
            diff_lines = parsed_lines
 

	
 
        _html_empty = True
 
        _html = []
 
        _html.append('''<table class="%(table_class)s">\n''' % {
 
            'table_class': table_class
 
        })
 

	
 
        for diff in diff_lines:
 
            for line in diff['chunks']:
 
                _html_empty = False
 
                for change in line:
 
                    _html.append('''<tr class="%(lc)s %(action)s">\n''' % {
 
                        'lc': line_class,
 
                        'action': change['action']
 
                    })
 
                    anchor_old_id = ''
 
                    anchor_new_id = ''
 
                    anchor_old = "%(filename)s_o%(oldline_no)s" % {
 
                        'filename': self._safe_id(diff['filename']),
 
                        'oldline_no': change['old_lineno']
 
                    }
 
                    anchor_new = "%(filename)s_n%(oldline_no)s" % {
 
                        'filename': self._safe_id(diff['filename']),
 
                        'oldline_no': change['new_lineno']
 
                    }
 
                    cond_old = (change['old_lineno'] != '...' and
 
                                change['old_lineno'])
 
                    cond_new = (change['new_lineno'] != '...' and
 
                                change['new_lineno'])
 
                    no_lineno = (change['old_lineno'] == '...' and
 
                                 change['new_lineno'] == '...')
 
                    if cond_old:
 
                        anchor_old_id = 'id="%s"' % anchor_old
 
                    if cond_new:
 
                        anchor_new_id = 'id="%s"' % anchor_new
 
                    ###########################################################
 
                    # OLD LINE NUMBER
 
                    ###########################################################
 
                    _html.append('''\t<td %(a_id)s class="%(olc)s" %(colspan)s>''' % {
 
                        'a_id': anchor_old_id,
 
                        'olc': no_lineno_class if no_lineno else old_lineno_class,
 
                        'colspan': 'colspan="2"' if no_lineno else ''
 
                    })
 

	
 
                    _html.append('''%(link)s''' % {
 
                        'link': _link_to_if(True, change['old_lineno'],
 
                                            '#%s' % anchor_old)
 
                    })
 
                    _html.append('''</td>\n''')
 
                    ###########################################################
 
                    # NEW LINE NUMBER
 
                    ###########################################################
 

	
 
                    if not no_lineno:
 
                        _html.append('''\t<td %(a_id)s class="%(nlc)s">''' % {
 
                            'a_id': anchor_new_id,
 
                            'nlc': new_lineno_class
 
                        })
 

	
 
                        _html.append('''%(link)s''' % {
 
                            'link': _link_to_if(True, change['new_lineno'],
 
                                                '#%s' % anchor_new)
 
                        })
 
                        _html.append('''</td>\n''')
 
                    ###########################################################
 
                    # CODE
 
                    ###########################################################
 
                    comments = '' if enable_comments else 'no-comment'
 
                    _html.append('''\t<td class="%(cc)s %(inc)s">''' % {
 
                        'cc': code_class,
 
                        'inc': comments
 
                    })
 
                    _html.append('''\n\t\t<div class="add-bubble"><div>&nbsp;</div></div><pre>%(code)s</pre>\n''' % {
 
                        'code': change['line']
 
                    })
 

	
 
                    _html.append('''\t</td>''')
 
                    _html.append('''\n</tr>\n''')
 
        _html.append('''</table>''')
 
        if _html_empty:
 
            return None
 
        return ''.join(_html)
 

	
 
    def stat(self):
 
        """
 
        Returns tuple of added, and removed lines for this instance
 
        """
 
        return self.adds, self.removes
kallithea/lib/helpers.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import StringIO
 
import math
 
import logging
 
import re
 
import urlparse
 
import textwrap
 

	
 
from pygments.formatters.html import HtmlFormatter
 
from pygments import highlight as code_highlight
 
from pylons import url
 
from pylons.i18n.translation import _, ungettext
 

	
 
from webhelpers.html import literal, HTML, escape
 
from webhelpers.html.tools import *
 
from webhelpers.html.builder import make_tag
 
from webhelpers.html.tags import auto_discovery_link, checkbox, css_classes, \
 
    end_form, file, hidden, image, javascript_link, link_to, \
 
    link_to_if, link_to_unless, ol, required_legend, select, stylesheet_link, \
 
    submit, text, password, textarea, title, ul, xml_declaration, radio
 
from webhelpers.html.tools import auto_link, button_to, highlight, \
 
    js_obfuscate, mail_to, strip_links, strip_tags, tag_re
 
from webhelpers.number import format_byte_size, format_bit_size
 
from webhelpers.pylonslib import Flash as _Flash
 
from webhelpers.pylonslib.secure_form import secure_form as form, authentication_token
 
from webhelpers.text import chop_at, collapse, convert_accented_entities, \
 
    convert_misc_entities, lchop, plural, rchop, remove_formatting, \
 
    replace_whitespace, urlify, truncate, wrap_paragraphs
 
from webhelpers.date import time_ago_in_words
 
from webhelpers.paginate import Page as _Page
 
from webhelpers.html.tags import _set_input_attrs, _set_id_attr, \
 
    convert_boolean_attrs, NotGiven, _make_safe_id_component
 

	
 
from kallithea.lib.annotate import annotate_highlight
 
from kallithea.lib.utils import repo_name_slug, get_custom_lexer
 
from kallithea.lib.utils2 import str2bool, safe_unicode, safe_str, \
 
    get_changeset_safe, datetime_to_time, time_to_datetime, AttributeDict,\
 
    safe_int
 
from kallithea.lib.markup_renderer import MarkupRenderer, url_re
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.config.conf import DATE_FORMAT, DATETIME_FORMAT
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.db import URL_SEP, Permission
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1].split('/', 1)[0]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    from kallithea import CONFIG
 
    try:
 
        parts = CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
 

	
 
def html_escape(text):
 
    """Return string with all html escaped.
 
    This is also safe for javascript in html but not necessarily correct.
 
    """
 
    return (text
 
        .replace('&', '&amp;')
 
        .replace(">", "&gt;")
 
        .replace("<", "&lt;")
 
        .replace('"', "&quot;")
 
        .replace("'", "&apos;")
 
        )
 

	
 
def shorter(text, size=20):
 
    postfix = '...'
 
    if len(text) > size:
 
        return text[:size - len(postfix)] + postfix
 
    return text
 

	
 

	
 
def _reset(name, value=None, id=NotGiven, type="reset", **attrs):
 
    """
 
    Reset button
 
    """
 
    _set_input_attrs(attrs, type, name, value)
 
    _set_id_attr(attrs, id, name)
 
    convert_boolean_attrs(attrs, ["disabled"])
 
    return HTML.input(**attrs)
 

	
 
reset = _reset
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 

	
 
    :param raw_id:
 
    :param path:
 
    """
 

	
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_str(path)).hexdigest()[:12])
 

	
 

	
 
class _GetError(object):
 
    """Get error from form_errors, and represent it as span wrapped error
 
    message
 

	
 
    :param field_name: field to fetch errors for
 
    :param form_errors: form errors dict
 
    """
 

	
 
    def __call__(self, field_name, form_errors):
 
        tmpl = """<span class="error_msg">%s</span>"""
 
        if form_errors and field_name in form_errors:
 
            return literal(tmpl % form_errors.get(field_name))
 

	
 
get_error = _GetError()
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        if isinstance(paths, str):
 
            paths = safe_unicode(paths)
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<div id="L%s">%s</div>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        dummyoutfile = StringIO.StringIO()
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            dummyoutfile.write(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv" '
 
                      'style="background-color: #f0f0f0; padding-right: 10px">'
 
                      '<pre style="line-height: 125%">' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv"><pre>' +
 
                      ls + '</pre></div></td><td id="hlcode" class="code">')
 
        yield 0, dummyoutfile.getvalue()
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(filenode.content, lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 

	
 
    color_dict = {}
 

	
 
    def gen_color(n=10000):
 
        """generator for getting n of evenly distributed colors using
 
        hsv color and golden ratio. It always return same order of colors
 

	
 
        :returns: RGB tuple
 
        """
 

	
 
        def hsv_to_rgb(h, s, v):
 
            if s == 0.0:
 
                return v, v, v
 
            i = int(h * 6.0)  # XXX assume int() truncates!
 
            f = (h * 6.0) - i
 
            p = v * (1.0 - s)
 
            q = v * (1.0 - s * f)
 
            t = v * (1.0 - s * (1.0 - f))
 
            i = i % 6
 
            if i == 0:
 
                return v, t, p
 
            if i == 1:
 
                return q, v, p
 
            if i == 2:
 
                return p, v, t
 
            if i == 3:
 
                return p, q, v
 
            if i == 4:
 
                return t, p, v
 
            if i == 5:
 
                return v, p, q
 

	
 
        golden_ratio = 0.618033988749895
 
        h = 0.22717784590367374
 

	
 
        for _unused in xrange(n):
 
            h += golden_ratio
 
            h %= 1
 
            HSV_tuple = [h, 0.95, 0.95]
 
            RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
            yield map(lambda x: str(int(x * 256)), RGB_tuple)
 

	
 
    cgenerator = gen_color()
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = cgenerator.next()
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(repo_name):
 

	
 
        def _url_func(changeset):
 
            author = escape(changeset.author)
 
            date = changeset.date
 
            message = escape(changeset.message)
 
            tooltip_html = ("<div style='font-size:0.8em'><b>Author:</b>"
 
                            " %s<br/><b>Date:</b> %s</b><br/><b>Message:"
 
                            "</b> %s<br/></div>") % (author, date, message)
 

	
 
            lnk_format = show_id(changeset)
 
            uri = link_to(
 
                    lnk_format,
 
                    url('changeset_home', repo_name=repo_name,
 
                        revision=changeset.raw_id),
 
                    style=get_color_string(changeset.raw_id),
 
                    class_='tooltip safe-html-title',
 
                    title=tooltip_html
 
                  )
 

	
 
            uri += '\n'
 
            return uri
 
        return _url_func
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func(repo_name), **kwargs)))
 

	
 

	
 
def is_following_repo(repo_name, user_id):
 
    from kallithea.model.scm import ScmModel
 
    return ScmModel().is_following_repo(repo_name, user_id)
 

	
 
class _Message(object):
 
    """A message returned by ``Flash.pop_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``message``: the message text.
 
    * ``category``: the category specified when the message was created.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 
    def __str__(self):
 
        return self.message
 

	
 
    __unicode__ = __str__
 

	
 
    def __html__(self):
 
        return escape(safe_unicode(self.message))
 

	
 
class Flash(_Flash):
 

	
 
    def __call__(self, message, category=None, ignore_duplicate=False, logf=None):
 
        """
 
        Show a message to the user _and_ log it through the specified function
 

	
 
        category: notice (default), warning, error, success
 
        logf: a custom log function - such as log.debug
 

	
 
        logf defaults to log.info, unless category equals 'success', in which
 
        case logf defaults to log.debug.
 
        """
 
        if logf is None:
 
            logf = log.info
 
            if category == 'success':
 
                logf = log.debug
 

	
 
        logf('Flash %s: %s', category, message)
 

	
 
        super(Flash, self).__call__(message, category, ignore_duplicate)
 

	
 
    def pop_messages(self):
 
        """Return all accumulated messages and delete them from the session.
 

	
 
        The return value is a list of ``Message`` objects.
 
        """
 
        from pylons import session
 
        messages = session.pop(self.session_key, [])
 
        session.save()
 
        return [_Message(*m) for m in messages]
 

	
 
flash = Flash()
 

	
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_name, author_email
 
from kallithea.lib.utils2 import credentials_filter, age as _age
 
from kallithea.model.db import User, ChangesetStatus, PullRequest
 

	
 
age = lambda  x, y=False: _age(x, y)
 
capitalize = lambda x: x.capitalize()
 
email = author_email
 
short_id = lambda x: x[:12]
 
hide_credentials = lambda x: ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    from kallithea import CONFIG
 
    def_len = safe_int(CONFIG.get('show_sha_length', 12))
 
    show_rev = str2bool(CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return '%s' % (raw_id)
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S").decode('utf8')
 

	
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
def user_or_none(author):
 
    email = author_email(author)
 
    if email:
 
        user = User.get_by_email(email, case_insensitive=True, cache=True)
 
        if user is not None:
 
            return user
 

	
 
    user = User.get_by_username(author_name(author), case_insensitive=True, cache=True)
 
    if user is not None:
 
        return user
 

	
 
    return None
 

	
 
def email_or_none(author):
 
    if not author:
 
        return None
 
    user = user_or_none(author)
 
    if user is not None:
 
        return user.email # always use main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, User):
 
        return person_getter(author)
 

	
 
    user = user_or_none(author)
 
    if user is not None:
 
        return person_getter(user)
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    # attr to return from fetched user
 
    person_getter = lambda usr: getattr(usr, show_attr)
 

	
 
    #maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = User.get(id_)
 
        if user is not None:
 
            return person_getter(user)
 
    return id_
 

	
 

	
 
def desc_stylize(value):
 
    """
 
    converts tags from value into html equivalent
 

	
 
    :param value:
 
    """
 
    if not value:
 
        return ''
 

	
 
    value = re.sub(r'\[see\ \=&gt;\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="see">see =&gt; \\1 </div>', value)
 
    value = re.sub(r'\[license\ \=&gt;\ *([a-zA-Z0-9\/\=\?\&\ \:\/\.\-]*)\]',
 
                   '<div class="metatag" tag="license"><a href="http:\/\/www.opensource.org/licenses/\\1">\\1</a></div>', value)
 
    value = re.sub(r'\[(requires|recommends|conflicts|base)\ \=&gt;\ *([a-zA-Z0-9\-\/]*)\]',
 
                   '<div class="metatag" tag="\\1">\\1 =&gt; <a href="/\\2">\\2</a></div>', value)
 
    value = re.sub(r'\[(lang|language)\ \=&gt;\ *([a-zA-Z\-\/\#\+]*)\]',
 
                   '<div class="metatag" tag="lang">\\2</div>', value)
 
    value = re.sub(r'\[([a-z]+)\]',
 
                  '<div class="metatag" tag="\\1">\\1</div>', value)
 

	
 
    return value
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            lazy_cs = False
 
            title = None
 
            _url = '#'
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                if rev.op and rev.ref_name:
 
                    if rev.op == 'delete_branch':
 
                        lbl = '%s' % _('Deleted branch: %s') % rev.ref_name
 
                        lbl = _('Deleted branch: %s') % rev.ref_name
 
                    elif rev.op == 'tag':
 
                        lbl = '%s' % _('Created tag: %s') % rev.ref_name
 
                        lbl = _('Created tag: %s') % rev.ref_name
 
                    else:
 
                        lbl = 'Unknown operation %s' % rev.op
 
                else:
 
                    lazy_cs = True
 
                    lbl = rev.short_id[:8]
 
                    _url = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
            else:
 
                # changeset cannot be found - it might have been stripped or removed
 
                lbl = ('%s' % rev)[:12]
 
                lbl = rev[:12]
 
                title = _('Changeset not found')
 
            if parse_cs:
 
                return link_to(lbl, _url, title=title, class_='tooltip')
 
            return link_to(lbl, _url, raw_id=rev.raw_id, repo_name=repo_name,
 
                           class_='lazy-cs' if lazy_cs else '')
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len(filter(lambda v: v != '', revs_ids)) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op, _name = _get_op(rev)
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo' % rev)
 
                        revs.append(rev)
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = [" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
        )]
 
        _op1, _name1 = _get_op(revs_ids[0])
 
        _op2, _name2 = _get_op(revs_ids[-1])
 

	
 
        _rev = '%s...%s' % (_name1, _name2)
 

	
 
        compare_view = (
 
            ' <div class="compare_view tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0][:12], revs_ids[-1][:12]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=_rev
 
                ),
 
                _('Compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            cs_links.append(", " + lnk(revs[revs_limit], repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        _url = url('summary_home', repo_name=repo_name)
 
        return _('Fork name %s') % link_to(action_params, _url)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        pull_request_id = action_params
 
        nice_id = PullRequest.make_nice_id(pull_request_id)
 

	
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 

	
 
        return link_to(_('Pull request %s') % nice_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
    'user_deleted_repo':           (_('[deleted] repository'),
 
                                    None, 'icon-trashcan'),
 
    'user_created_repo':           (_('[created] repository'),
 
                                    None, 'icon-plus'),
 
    'user_created_fork':           (_('[created] repository as fork'),
 
                                    None, 'icon-fork'),
 
    'user_forked_repo':            (_('[forked] repository'),
 
                                    get_fork_name, 'icon-fork'),
 
    'user_updated_repo':           (_('[updated] repository'),
 
                                    None, 'icon-pencil'),
 
    'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                    get_archive_name, 'icon-download-cloud'),
 
    'admin_deleted_repo':          (_('[delete] repository'),
 
                                    None, 'icon-trashcan'),
 
    'admin_created_repo':          (_('[created] repository'),
 
                                    None, 'icon-plus'),
 
    'admin_forked_repo':           (_('[forked] repository'),
 
                                    None, 'icon-fork'),
 
    'admin_updated_repo':          (_('[updated] repository'),
 
                                    None, 'icon-pencil'),
 
    'admin_created_user':          (_('[created] user'),
 
                                    get_user_name, 'icon-user'),
 
    'admin_updated_user':          (_('[updated] user'),
 
                                    get_user_name, 'icon-user'),
 
    'admin_created_users_group':   (_('[created] user group'),
 
                                    get_users_group, 'icon-pencil'),
 
    'admin_updated_users_group':   (_('[updated] user group'),
 
                                    get_users_group, 'icon-pencil'),
 
    'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                    get_cs_links, 'icon-comment'),
 
    'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                    get_pull_request, 'icon-comment'),
 
    'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                    get_pull_request, 'icon-ok'),
 
    'push':                        (_('[pushed] into'),
 
                                    get_cs_links, 'icon-move-up'),
 
    'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                    get_cs_links, 'icon-pencil'),
 
    'push_remote':                 (_('[pulled from remote] into repository'),
 
                                    get_cs_links, 'icon-move-up'),
 
    'pull':                        (_('[pulled] from'),
 
                                    None, 'icon-move-down'),
 
    'started_following_repo':      (_('[started following] repository'),
 
                                    None, 'icon-heart'),
 
    'stopped_following_repo':      (_('[stopped following] repository'),
 
                                    None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0]\
 
            .replace('[', '<span class="journal_highlight">')\
 
            .replace(']', '</span>')
 

	
 
    action_params_func = lambda: ""
 

	
 
    if callable(action_str[1]):
 
        action_params_func = action_str[1]
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        tmpl = """<i class="%s" alt="%s"></i>"""
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        return literal(tmpl % (ico, action))
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 

	
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasPermissionAll, \
 
HasRepoPermissionAny, HasRepoPermissionAll, HasRepoGroupPermissionAll, \
 
HasRepoGroupPermissionAny
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar(email_address, cls='', size=30, ssl_enabled=True):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    src = gravatar_url(email_address, size*2, ssl_enabled)
 

	
 
    #  here it makes sense to use style="width: ..." (instead of, say, a
 
    # stylesheet) because we using this to generate a high-res (retina) size
 
    tmpl = """<img alt="gravatar" class="{cls}" style="width: {size}px; height: {size}px" src="{src}"/>"""
 

	
 
    # if src is empty then there was no gravatar, so we use a font icon
 
    if not src:
 
        tmpl = """<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 

	
 
    tmpl = tmpl.format(cls=cls, size=size, src=src)
 
    return literal(tmpl)
 

	
 
def gravatar_url(email_address, size=30, ssl_enabled=True):
 
    # doh, we need to re-import those to mock it later
 
    from pylons import url
 
    from pylons import tmpl_context as c
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    _use_gravatar = c.visual.use_gravatar
 
    _gravatar_url = c.visual.gravatar_url or User.DEFAULT_GRAVATAR_URL
 

	
 
    email_address = email_address or _def
 

	
 
    if not _use_gravatar or not email_address or email_address == _def:
 
        return ""
 

	
 
    if _use_gravatar:
 
        _md5 = lambda s: hashlib.md5(s).hexdigest()
 

	
 
        tmpl = _gravatar_url
 
        parsed_url = urlparse.urlparse(url.current(qualified=True))
 
        tmpl = tmpl.replace('{email}', email_address)\
 
                   .replace('{md5email}', _md5(safe_str(email_address).lower())) \
 
                   .replace('{netloc}', parsed_url.netloc)\
 
                   .replace('{scheme}', parsed_url.scheme)\
 
                   .replace('{size}', safe_str(size))
 
        return tmpl
 

	
 
class Page(_Page):
 
    """
 
    Custom pager to match rendering style with YUI paginator
 
    """
 

	
 
    def _get_pos(self, cur_page, max_page, items):
 
        edge = (items / 2) + 1
 
        if (cur_page <= edge):
 
            radius = max(items / 2, items - cur_page)
 
        elif (max_page - cur_page) < edge:
 
            radius = (items - 1) - (max_page - cur_page)
 
        else:
 
            radius = items / 2
 

	
 
        left = max(1, (cur_page - (radius)))
 
        right = min(max_page, cur_page + (radius))
 
        return left, cur_page, right
 

	
 
    def _range(self, regexp_match):
 
        """
 
        Return range of linked pages (e.g. '1 2 [3] 4 5 6 7 8').
 

	
 
        Arguments:
 

	
 
        regexp_match
 
            A "re" (regular expressions) match object containing the
 
            radius of linked pages around the current page in
 
            regexp_match.group(1) as a string
 

	
 
        This function is supposed to be called as a callable in
 
        re.sub.
 

	
 
        """
 
        radius = int(regexp_match.group(1))
 

	
 
        # Compute the first and last page number within the radius
 
        # e.g. '1 .. 5 6 [7] 8 9 .. 12'
 
        # -> leftmost_page  = 5
 
        # -> rightmost_page = 9
 
        leftmost_page, _cur, rightmost_page = self._get_pos(self.page,
 
                                                            self.last_page,
 
                                                            (radius * 2) + 1)
 
        nav_items = []
 

	
 
        # Create a link to the first page (unless we are on the first page
 
        # or there would be no need to insert '..' spacers)
 
        if self.page != self.first_page and self.first_page < leftmost_page:
 
            nav_items.append(self._pagerlink(self.first_page, self.first_page))
 

	
 
        # Insert dots if there are pages between the first page
 
        # and the currently displayed page range
 
        if leftmost_page - self.first_page > 1:
 
            # Wrap in a SPAN tag if nolink_attr is set
 
            text = '..'
 
            if self.dotdot_attr:
 
                text = HTML.span(c=text, **self.dotdot_attr)
 
            nav_items.append(text)
 

	
 
        for thispage in xrange(leftmost_page, rightmost_page + 1):
 
            # Highlight the current page number and do not use a link
 
            text = str(thispage)
 
            if thispage == self.page:
 
                text = '%s' % (thispage,)
 
                # Wrap in a SPAN tag if nolink_attr is set
 
                if self.curpage_attr:
 
                    text = HTML.span(c=text, **self.curpage_attr)
 
                nav_items.append(text)
 
            # Otherwise create just a link to that page
 
            else:
 
                text = '%s' % (thispage,)
 
                nav_items.append(self._pagerlink(thispage, text))
 

	
 
        # Insert dots if there are pages between the displayed
 
        # page numbers and the end of the page range
 
        if self.last_page - rightmost_page > 1:
 
            text = '..'
 
            # Wrap in a SPAN tag if nolink_attr is set
 
            if self.dotdot_attr:
 
                text = HTML.span(c=text, **self.dotdot_attr)
 
            nav_items.append(text)
 

	
 
        # Create a link to the very last page (unless we are on the last
 
        # page or there would be no need to insert '..' spacers)
 
        if self.page != self.last_page and rightmost_page < self.last_page:
 
            nav_items.append(self._pagerlink(self.last_page, self.last_page))
 

	
 
        #_page_link = url.current()
 
        #nav_items.append(literal('<link rel="prerender" href="%s?page=%s">' % (_page_link, str(int(self.page)+1))))
 
        #nav_items.append(literal('<link rel="prefetch" href="%s?page=%s">' % (_page_link, str(int(self.page)+1))))
 
        return self.separator.join(nav_items)
 

	
 
    def pager(self, format='~2~', page_param='page', partial_param='partial',
 
        show_if_single_page=False, separator=' ', onclick=None,
 
        symbol_first='<<', symbol_last='>>',
 
        symbol_previous='<', symbol_next='>',
 
        link_attr={'class': 'pager_link', 'rel': 'prerender'},
 
        curpage_attr={'class': 'pager_curpage'},
 
        dotdot_attr={'class': 'pager_dotdot'}, **kwargs):
 

	
 
        self.curpage_attr = curpage_attr
 
        self.separator = separator
 
        self.pager_kwargs = kwargs
 
        self.page_param = page_param
 
        self.partial_param = partial_param
 
        self.onclick = onclick
 
        self.link_attr = link_attr
 
        self.dotdot_attr = dotdot_attr
 

	
 
        # Don't show navigator if there is no more than one page
 
        if self.page_count == 0 or (self.page_count == 1 and not show_if_single_page):
 
            return ''
 

	
 
        from string import Template
 
        # Replace ~...~ in token format by range of pages
 
        result = re.sub(r'~(\d+)~', self._range, format)
 

	
 
        # Interpolate '%' variables
 
        result = Template(result).safe_substitute({
 
            'first_page': self.first_page,
 
            'last_page': self.last_page,
 
            'page': self.page,
 
            'page_count': self.page_count,
 
            'items_per_page': self.items_per_page,
 
            'first_item': self.first_item,
 
            'last_item': self.last_item,
 
            'item_count': self.item_count,
 
            'link_first': self.page > self.first_page and \
 
                    self._pagerlink(self.first_page, symbol_first) or '',
 
            'link_last': self.page < self.last_page and \
 
                    self._pagerlink(self.last_page, symbol_last) or '',
 
            'link_previous': self.previous_page and \
 
                    self._pagerlink(self.previous_page, symbol_previous) \
 
                    or HTML.span(symbol_previous, class_="yui-pg-previous"),
 
            'link_next': self.next_page and \
 
                    self._pagerlink(self.next_page, symbol_next) \
 
                    or HTML.span(symbol_next, class_="yui-pg-next")
 
        })
 

	
 
        return literal(result)
 

	
 

	
 
#==============================================================================
 
# REPO PAGER, PAGER FOR REPOSITORY
 
#==============================================================================
 
class RepoPage(Page):
 

	
 
    def __init__(self, collection, page=1, items_per_page=20,
 
                 item_count=None, url=None, **kwargs):
 

	
 
        """Create a "RepoPage" instance. special pager for paging
 
        repository
 
        """
 
        self._url_generator = url
 

	
 
        # Safe the kwargs class-wide so they can be used in the pager() method
 
        self.kwargs = kwargs
 

	
 
        # Save a reference to the collection
 
        self.original_collection = collection
 

	
 
        self.collection = collection
 

	
 
        # The self.page is the number of the current page.
 
        # The first page has the number 1!
 
        try:
 
            self.page = int(page)  # make it int() if we get it as a string
 
        except (ValueError, TypeError):
 
            self.page = 1
 

	
 
        self.items_per_page = items_per_page
 

	
 
        # Unless the user tells us how many items the collections has
 
        # we calculate that ourselves.
 
        if item_count is not None:
 
            self.item_count = item_count
 
        else:
 
            self.item_count = len(self.collection)
 

	
 
        # Compute the number of the first and last available page
 
        if self.item_count > 0:
 
            self.first_page = 1
 
            self.page_count = int(math.ceil(float(self.item_count) /
 
                                            self.items_per_page))
 
            self.last_page = self.first_page + self.page_count - 1
 

	
 
            # Make sure that the requested page number is the range of
 
            # valid pages
 
            if self.page > self.last_page:
 
                self.page = self.last_page
 
            elif self.page < self.first_page:
 
                self.page = self.first_page
 

	
 
            # Note: the number of items on this page can be less than
 
            #       items_per_page if the last page is not full
 
            self.first_item = max(0, (self.item_count) - (self.page *
 
                                                          items_per_page))
 
            self.last_item = ((self.item_count - 1) - items_per_page *
 
                              (self.page - 1))
 

	
 
            self.items = list(self.collection[self.first_item:self.last_item + 1])
 

	
 
            # Links to previous and next page
 
            if self.page > self.first_page:
 
                self.previous_page = self.page - 1
 
            else:
 
                self.previous_page = None
 

	
 
            if self.page < self.last_page:
 
                self.next_page = self.page + 1
 
            else:
 
                self.next_page = None
 

	
 
        # No items available
 
        else:
 
            self.first_page = None
 
            self.page_count = 0
 
            self.last_page = None
 
            self.first_item = None
 
            self.last_item = None
 
            self.previous_page = None
 
            self.next_page = None
 
            self.items = []
 

	
 
        # This is a subclass of the 'list' type. Initialise the list now.
 
        list.__init__(self, reversed(self.items))
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([safe_unicode(x.path)
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No Files')
 

	
 

	
 
def repo_link(groups_and_repos):
 
    """
 
    Makes a breadcrumbs link to repo within a group
 
    joins &raquo; on each group to create a fancy link
 

	
 
    ex::
 
        group >> subgroup >> repo
 

	
 
    :param groups_and_repos:
 
    :param last_url:
 
    """
 
    groups, just_name, repo_name = groups_and_repos
 
    last_url = url('summary_home', repo_name=repo_name)
 
    last_link = link_to(just_name, last_url)
 

	
 
    def make_link(group):
 
        return link_to(group.name,
 
                       url('repos_group_home', group_name=group.group_name))
 
    return literal(' &raquo; '.join(map(make_link, groups) + ['<span>%s</span>' % last_link]))
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 
    from kallithea.lib.diffs import NEW_FILENODE, DEL_FILENODE, \
 
        MOD_FILENODE, RENAMED_FILENODE, CHMOD_FILENODE, BIN_FILENODE
 

	
 
    def cgen(l_type, a_v, d_v):
 
        mapping = {'tr': 'top-right-rounded-corner-mid',
 
                   'tl': 'top-left-rounded-corner-mid',
 
                   'br': 'bottom-right-rounded-corner-mid',
 
                   'bl': 'bottom-left-rounded-corner-mid'}
 
        map_getter = lambda x: mapping[x]
 

	
 
        if l_type == 'a' and d_v:
 
            #case when added and deleted are present
 
            return ' '.join(map(map_getter, ['tl', 'bl']))
 

	
 
        if l_type == 'a' and not d_v:
 
            return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
 

	
 
        if l_type == 'd' and a_v:
 
            return ' '.join(map(map_getter, ['tr', 'br']))
 

	
 
        if l_type == 'd' and not a_v:
 
            return ' '.join(map(map_getter, ['tr', 'br', 'tl', 'bl']))
 

	
 
    a, d = stats['added'], stats['deleted']
 
    width = 100
 

	
 
    if stats['binary']:
 
        #binary mode
 
        lbl = ''
 
        bin_op = 1
 

	
 
        if BIN_FILENODE in stats['ops']:
 
            lbl = 'bin+'
 

	
 
        if NEW_FILENODE in stats['ops']:
 
            lbl += _('new file')
 
            bin_op = NEW_FILENODE
 
        elif MOD_FILENODE in stats['ops']:
 
            lbl += _('mod')
 
            bin_op = MOD_FILENODE
 
        elif DEL_FILENODE in stats['ops']:
 
            lbl += _('del')
 
            bin_op = DEL_FILENODE
 
        elif RENAMED_FILENODE in stats['ops']:
 
            lbl += _('rename')
 
            bin_op = RENAMED_FILENODE
 

	
 
        #chmod can go with other operations
 
        if CHMOD_FILENODE in stats['ops']:
 
            _org_lbl = _('chmod')
 
            lbl += _org_lbl if lbl.endswith('+') else '+%s' % _org_lbl
 

	
 
        #import ipdb;ipdb.set_trace()
 
        b_d = '<div class="bin bin%s %s" style="width:100%%">%s</div>' % (bin_op, cgen('a', a_v='', d_v=0), lbl)
 
        b_a = '<div class="bin bin1" style="width:0%%"></div>'
 
        return literal('<div style="width:%spx">%s%s</div>' % (width, b_a, b_d))
 

	
 
    t = stats['added'] + stats['deleted']
 
    unit = float(width) / (t or 1)
 

	
 
    # needs > 9% of width to be visible or 0 to be hidden
 
    a_p = max(9, unit * a) if a > 0 else 0
 
    d_p = max(9, unit * d) if d > 0 else 0
 
    p_sum = a_p + d_p
 

	
 
    if p_sum > width:
 
        #adjust the percentage to be == 100% since we adjusted to 9
 
        if a_p > d_p:
 
            a_p = a_p - (p_sum - width)
 
        else:
 
            d_p = d_p - (p_sum - width)
 

	
 
    a_v = a if a > 0 else ''
 
    d_v = d if d > 0 else ''
 

	
 
    d_a = '<div class="added %s" style="width:%s%%">%s</div>' % (
 
        cgen('a', a_v, d_v), a_p, a_v
 
    )
 
    d_d = '<div class="deleted %s" style="width:%s%%">%s</div>' % (
 
        cgen('d', a_v, d_v), d_p, d_v
 
    )
 
    return literal('<div style="width:%spx">%s%s</div>' % (width, d_a, d_d))
 

	
 

	
 
def _urlify_text(s):
 
    """
 
    Extract urls from text and make html links out of them
 
    """
 
    def url_func(match_obj):
 
        url_full = match_obj.group(1)
 
        return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
    return url_re.sub(url_func, s)
 

	
 
def urlify_text(s, truncate=None, stylize=False, truncatef=truncate):
 
    """
 
    Extract urls from text and make literal html links out of them
 
    """
 
    if truncate is not None:
 
        s = truncatef(s, truncate)
 
    s = html_escape(s)
 
    if stylize:
 
        s = desc_stylize(s)
 
    s = _urlify_text(s)
 
    return literal(s)
 

	
 
def urlify_changesets(text_, repository):
 
    """
 
    Extract revision ids from changeset and make link from them
 

	
 
    :param text_:
 
    :param repository: repo name to build the URL with
 
    """
 
    from pylons import url  # doh, we need to re-import url to mock it later
 

	
 
    def url_func(match_obj):
 
        rev = match_obj.group(0)
 
        return '<a class="revision-link" href="%(url)s">%(rev)s</a>' % {
 
         'url': url('changeset_home', repo_name=repository, revision=rev),
 
         'rev': rev,
 
        }
 

	
 
    return re.sub(r'(?:^|(?<=[\s(),]))([0-9a-fA-F]{12,40})(?=$|\s|[.,:()])', url_func, text_)
 

	
 
def linkify_others(t, l):
 
    urls = re.compile(r'(\<a.*?\<\/a\>)',)
 
    links = []
 
    for e in urls.split(t):
 
        if not urls.match(e):
 
            links.append('<a class="message-link" href="%s">%s</a>' % (l, e))
 
        else:
 
            links.append(e)
 

	
 
    return ''.join(links)
 

	
 
def urlify_commit(text_, repository, link_=None):
 
    """
 
    Parses given text message and makes proper links.
 
    issues are linked to given issue-server, and rest is a changeset link
 
    if link_ is given, in other case it's a plain text
 

	
 
    :param text_:
 
    :param repository:
 
    :param link_: changeset link
 
    """
 
    newtext = html_escape(text_)
 

	
 
    # urlify changesets - extract revisions and make link out of them
 
    newtext = urlify_changesets(newtext, repository)
 

	
 
    # extract http/https links and make them real urls
 
    newtext = _urlify_text(newtext)
 

	
 
    newtext = urlify_issues(newtext, repository, link_)
 

	
 
    return literal(newtext)
 

	
 
def urlify_issues(newtext, repository, link_=None):
 
    from kallithea import CONFIG as conf
 

	
 
    # allow multiple issue servers to be used
 
    valid_indices = [
 
        x.group(1)
 
        for x in map(lambda x: re.match(r'issue_pat(.*)', x), conf.keys())
 
        if x and 'issue_server_link%s' % x.group(1) in conf
 
        and 'issue_prefix%s' % x.group(1) in conf
 
    ]
 

	
 
    if valid_indices:
 
        log.debug('found issue server suffixes `%s` during valuation of: %s'
 
                  % (','.join(valid_indices), newtext))
 

	
 
    for pattern_index in valid_indices:
 
        ISSUE_PATTERN = conf.get('issue_pat%s' % pattern_index)
 
        ISSUE_SERVER_LNK = conf.get('issue_server_link%s' % pattern_index)
 
        ISSUE_PREFIX = conf.get('issue_prefix%s' % pattern_index)
 

	
 
        log.debug('pattern suffix `%s` PAT:%s SERVER_LINK:%s PREFIX:%s'
 
                  % (pattern_index, ISSUE_PATTERN, ISSUE_SERVER_LNK,
 
                     ISSUE_PREFIX))
 

	
 
        URL_PAT = re.compile(r'%s' % ISSUE_PATTERN)
 
        URL_PAT = re.compile(ISSUE_PATTERN)
 

	
 
        def url_func(match_obj):
 
            pref = ''
 
            if match_obj.group().startswith(' '):
 
                pref = ' '
 

	
 
            issue_id = ''.join(match_obj.groups())
 
            issue_url = ISSUE_SERVER_LNK.replace('{id}', issue_id)
 
            if repository:
 
                issue_url = issue_url.replace('{repo}', repository)
 
                repo_name = repository.split(URL_SEP)[-1]
 
                issue_url = issue_url.replace('{repo_name}', repo_name)
 

	
 
            return (
 
                '%(pref)s<a class="%(cls)s" href="%(url)s">'
 
                '%(issue-prefix)s%(id-repr)s'
 
                '</a>'
 
                ) % {
 
                 'pref': pref,
 
                 'cls': 'issue-tracker-link',
 
                 'url': issue_url,
 
                 'id-repr': issue_id,
 
                 'issue-prefix': ISSUE_PREFIX,
 
                 'serv': ISSUE_SERVER_LNK,
 
                }
 
        newtext = URL_PAT.sub(url_func, newtext)
 
        log.debug('processed prefix:`%s` => %s' % (pattern_index, newtext))
 

	
 
    # if we actually did something above
 
    if link_:
 
        # wrap not links into final link => link_
 
        newtext = linkify_others(newtext, link_)
 
    return newtext
 

	
 

	
 
def rst(source):
 
    return literal('<div class="rst-block">%s</div>' %
 
                   MarkupRenderer.rst(source))
 

	
 

	
 
def rst_w_mentions(source):
 
    """
 
    Wrapped rst renderer with @mention highlighting
 

	
 
    :param source:
 
    """
 
    return literal('<div class="rst-block">%s</div>' %
 
                   MarkupRenderer.rst_with_mentions(source))
 

	
 
def short_ref(ref_type, ref_name):
 
    if ref_type == 'rev':
 
        return short_id(ref_name)
 
    return ref_name
 

	
 
def link_to_ref(repo_name, ref_type, ref_name, rev=None):
 
    """
 
    Return full markup for a href to changeset_home for a changeset.
 
    If ref_type is branch it will link to changelog.
 
    ref_name is shortened if ref_type is 'rev'.
 
    if rev is specified show it too, explicitly linking to that revision.
 
    """
 
    txt = short_ref(ref_type, ref_name)
 
    if ref_type == 'branch':
 
        u = url('changelog_home', repo_name=repo_name, branch=ref_name)
 
    else:
 
        u = url('changeset_home', repo_name=repo_name, revision=ref_name)
 
    l = link_to(repo_name + '#' + txt, u)
 
    if rev and ref_type != 'rev':
 
        l = literal('%s (%s)' % (l, link_to(short_id(rev), url('changeset_home', repo_name=repo_name, revision=rev))))
 
    return l
 

	
 
def changeset_status(repo, revision):
 
    return ChangesetStatusModel().get_status(repo, revision)
 

	
 

	
 
def changeset_status_lbl(changeset_status):
 
    return dict(ChangesetStatus.STATUSES).get(changeset_status)
 

	
 

	
 
def get_permission_name(key):
 
    return dict(Permission.PERMS).get(key)
 

	
 

	
 
def journal_filter_help():
 
    return _(textwrap.dedent('''
 
        Example filter terms:
 
            repository:vcs
 
            username:developer
 
            action:*push*
 
            ip:127.0.0.1
 
            date:20120101
 
            date:[20120101100000 TO 20120102]
 

	
 
        Generate wildcards using '*' character:
 
            "repository:vcs*" - search everything starting with 'vcs'
 
            "repository:*vcs*" - search for repository containing 'vcs'
 

	
 
        Optional AND / OR operators in queries
 
            "repository:vcs OR repository:test"
 
            "username:test AND repository:test*"
 
    '''))
 

	
 

	
 
def not_mapped_error(repo_name):
 
    flash(_('%s repository is not mapped to db perhaps'
 
            ' it was created or renamed from the filesystem'
 
            ' please run the application again'
 
            ' in order to rescan repositories') % repo_name, category='error')
 

	
 

	
 
def ip_range(ip_addr):
 
    from kallithea.model.db import UserIpMap
 
    s, e = UserIpMap._get_ip_range(ip_addr)
 
    return '%s - %s' % (s, e)
kallithea/lib/ipaddr.py
Show inline comments
 
# Copyright 2007 Google Inc.
 
#  Licensed to PSF under a Contributor Agreement.
 
#
 
# Licensed under the Apache License, Version 2.0 (the "License");
 
# you may not use this file except in compliance with the License.
 
# You may obtain a copy of the License at
 
#
 
#      http://www.apache.org/licenses/LICENSE-2.0
 
#
 
# Unless required by applicable law or agreed to in writing, software
 
# distributed under the License is distributed on an "AS IS" BASIS,
 
# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or
 
# implied. See the License for the specific language governing
 
# permissions and limitations under the License.
 

	
 
"""A fast, lightweight IPv4/IPv6 manipulation library in Python.
 

	
 
This library is used to create/poke/manipulate IPv4 and IPv6 addresses
 
and networks.
 

	
 
"""
 

	
 
__version__ = 'trunk'
 

	
 
import struct
 

	
 
IPV4LENGTH = 32
 
IPV6LENGTH = 128
 

	
 

	
 
class AddressValueError(ValueError):
 
    """A Value Error related to the address."""
 

	
 

	
 
class NetmaskValueError(ValueError):
 
    """A Value Error related to the netmask."""
 

	
 

	
 
def IPAddress(address, version=None):
 
    """Take an IP string/int and return an object of the correct type.
 

	
 
    Args:
 
        address: A string or integer, the IP address.  Either IPv4 or
 
          IPv6 addresses may be supplied; integers less than 2**32 will
 
          be considered to be IPv4 by default.
 
        version: An Integer, 4 or 6. If set, don't try to automatically
 
          determine what the IP address type is. important for things
 
          like IPAddress(1), which could be IPv4, '0.0.0.1',  or IPv6,
 
          '::1'.
 

	
 
    Returns:
 
        An IPv4Address or IPv6Address object.
 

	
 
    Raises:
 
        ValueError: if the string passed isn't either a v4 or a v6
 
          address.
 

	
 
    """
 
    if version:
 
        if version == 4:
 
            return IPv4Address(address)
 
        elif version == 6:
 
            return IPv6Address(address)
 

	
 
    try:
 
        return IPv4Address(address)
 
    except (AddressValueError, NetmaskValueError):
 
        pass
 

	
 
    try:
 
        return IPv6Address(address)
 
    except (AddressValueError, NetmaskValueError):
 
        pass
 

	
 
    raise ValueError('%r does not appear to be an IPv4 or IPv6 address' %
 
                     address)
 

	
 

	
 
def IPNetwork(address, version=None, strict=False):
 
    """Take an IP string/int and return an object of the correct type.
 

	
 
    Args:
 
        address: A string or integer, the IP address.  Either IPv4 or
 
          IPv6 addresses may be supplied; integers less than 2**32 will
 
          be considered to be IPv4 by default.
 
        version: An Integer, if set, don't try to automatically
 
          determine what the IP address type is. important for things
 
          like IPNetwork(1), which could be IPv4, '0.0.0.1/32', or IPv6,
 
          '::1/128'.
 

	
 
    Returns:
 
        An IPv4Network or IPv6Network object.
 

	
 
    Raises:
 
        ValueError: if the string passed isn't either a v4 or a v6
 
          address. Or if a strict network was requested and a strict
 
          network wasn't given.
 

	
 
    """
 
    if version:
 
        if version == 4:
 
            return IPv4Network(address, strict)
 
        elif version == 6:
 
            return IPv6Network(address, strict)
 

	
 
    try:
 
        return IPv4Network(address, strict)
 
    except (AddressValueError, NetmaskValueError):
 
        pass
 

	
 
    try:
 
        return IPv6Network(address, strict)
 
    except (AddressValueError, NetmaskValueError):
 
        pass
 

	
 
    raise ValueError('%r does not appear to be an IPv4 or IPv6 network' %
 
                     address)
 

	
 

	
 
def v4_int_to_packed(address):
 
    """The binary representation of this address.
 

	
 
    Args:
 
        address: An integer representation of an IPv4 IP address.
 

	
 
    Returns:
 
        The binary representation of this address.
 

	
 
    Raises:
 
        ValueError: If the integer is too large to be an IPv4 IP
 
          address.
 
    """
 
    if address > _BaseV4._ALL_ONES:
 
        raise ValueError('Address too large for IPv4')
 
    return Bytes(struct.pack('!I', address))
 

	
 

	
 
def v6_int_to_packed(address):
 
    """The binary representation of this address.
 

	
 
    Args:
 
        address: An integer representation of an IPv6 IP address.
 

	
 
    Returns:
 
        The binary representation of this address.
 
    """
 
    return Bytes(struct.pack('!QQ', address >> 64, address & (2 ** 64 - 1)))
 

	
 

	
 
def _find_address_range(addresses):
 
    """Find a sequence of addresses.
 

	
 
    Args:
 
        addresses: a list of IPv4 or IPv6 addresses.
 

	
 
    Returns:
 
        A tuple containing the first and last IP addresses in the sequence.
 

	
 
    """
 
    first = last = addresses[0]
 
    for ip in addresses[1:]:
 
        if ip._ip == last._ip + 1:
 
            last = ip
 
        else:
 
            break
 
    return (first, last)
 

	
 

	
 
def _get_prefix_length(number1, number2, bits):
 
    """Get the number of leading bits that are same for two numbers.
 

	
 
    Args:
 
        number1: an integer.
 
        number2: another integer.
 
        bits: the maximum number of bits to compare.
 

	
 
    Returns:
 
        The number of leading bits that are the same for two numbers.
 

	
 
    """
 
    for i in range(bits):
 
        if number1 >> i == number2 >> i:
 
            return bits - i
 
    return 0
 

	
 

	
 
def _count_righthand_zero_bits(number, bits):
 
    """Count the number of zero bits on the right hand side.
 

	
 
    Args:
 
        number: an integer.
 
        bits: maximum number of bits to count.
 

	
 
    Returns:
 
        The number of zero bits on the right hand side of the number.
 

	
 
    """
 
    if number == 0:
 
        return bits
 
    for i in range(bits):
 
        if (number >> i) % 2:
 
            return i
 

	
 

	
 
def summarize_address_range(first, last):
 
    """Summarize a network range given the first and last IP addresses.
 

	
 
    Example:
 
        >>> summarize_address_range(IPv4Address('1.1.1.0'),
 
            IPv4Address('1.1.1.130'))
 
        [IPv4Network('1.1.1.0/25'), IPv4Network('1.1.1.128/31'),
 
        IPv4Network('1.1.1.130/32')]
 

	
 
    Args:
 
        first: the first IPv4Address or IPv6Address in the range.
 
        last: the last IPv4Address or IPv6Address in the range.
 

	
 
    Returns:
 
        The address range collapsed to a list of IPv4Network's or
 
        IPv6Network's.
 

	
 
    Raise:
 
        TypeError:
 
            If the first and last objects are not IP addresses.
 
            If the first and last objects are not the same version.
 
        ValueError:
 
            If the last object is not greater than the first.
 
            If the version is not 4 or 6.
 

	
 
    """
 
    if not (isinstance(first, _BaseIP) and isinstance(last, _BaseIP)):
 
        raise TypeError('first and last must be IP addresses, not networks')
 
    if first.version != last.version:
 
        raise TypeError("%s and %s are not of the same version" % (
 
                str(first), str(last)))
 
    if first > last:
 
        raise ValueError('last IP address must be greater than first')
 

	
 
    networks = []
 

	
 
    if first.version == 4:
 
        ip = IPv4Network
 
    elif first.version == 6:
 
        ip = IPv6Network
 
    else:
 
        raise ValueError('unknown IP version')
 

	
 
    ip_bits = first._max_prefixlen
 
    first_int = first._ip
 
    last_int = last._ip
 
    while first_int <= last_int:
 
        nbits = _count_righthand_zero_bits(first_int, ip_bits)
 
        current = None
 
        while nbits >= 0:
 
            addend = 2 ** nbits - 1
 
            current = first_int + addend
 
            nbits -= 1
 
            if current <= last_int:
 
                break
 
        prefix = _get_prefix_length(first_int, current, ip_bits)
 
        net = ip('%s/%d' % (str(first), prefix))
 
        networks.append(net)
 
        if current == ip._ALL_ONES:
 
            break
 
        first_int = current + 1
 
        first = IPAddress(first_int, version=first._version)
 
    return networks
 

	
 

	
 
def _collapse_address_list_recursive(addresses):
 
    """Loops through the addresses, collapsing concurrent netblocks.
 

	
 
    Example:
 

	
 
        ip1 = IPv4Network('1.1.0.0/24')
 
        ip2 = IPv4Network('1.1.1.0/24')
 
        ip3 = IPv4Network('1.1.2.0/24')
 
        ip4 = IPv4Network('1.1.3.0/24')
 
        ip5 = IPv4Network('1.1.4.0/24')
 
        ip6 = IPv4Network('1.1.0.1/22')
 

	
 
        _collapse_address_list_recursive([ip1, ip2, ip3, ip4, ip5, ip6]) ->
 
          [IPv4Network('1.1.0.0/22'), IPv4Network('1.1.4.0/24')]
 

	
 
        This shouldn't be called directly; it is called via
 
          collapse_address_list([]).
 

	
 
    Args:
 
        addresses: A list of IPv4Network's or IPv6Network's
 

	
 
    Returns:
 
        A list of IPv4Network's or IPv6Network's depending on what we were
 
        passed.
 

	
 
    """
 
    ret_array = []
 
    optimized = False
 

	
 
    for cur_addr in addresses:
 
        if not ret_array:
 
            ret_array.append(cur_addr)
 
            continue
 
        if cur_addr in ret_array[-1]:
 
            optimized = True
 
        elif cur_addr == ret_array[-1].supernet().subnet()[1]:
 
            ret_array.append(ret_array.pop().supernet())
 
            optimized = True
 
        else:
 
            ret_array.append(cur_addr)
 

	
 
    if optimized:
 
        return _collapse_address_list_recursive(ret_array)
 

	
 
    return ret_array
 

	
 

	
 
def collapse_address_list(addresses):
 
    """Collapse a list of IP objects.
 

	
 
    Example:
 
        collapse_address_list([IPv4('1.1.0.0/24'), IPv4('1.1.1.0/24')]) ->
 
          [IPv4('1.1.0.0/23')]
 

	
 
    Args:
 
        addresses: A list of IPv4Network or IPv6Network objects.
 

	
 
    Returns:
 
        A list of IPv4Network or IPv6Network objects depending on what we
 
        were passed.
 

	
 
    Raises:
 
        TypeError: If passed a list of mixed version objects.
 

	
 
    """
 
    i = 0
 
    addrs = []
 
    ips = []
 
    nets = []
 

	
 
    # split IP addresses and networks
 
    for ip in addresses:
 
        if isinstance(ip, _BaseIP):
 
            if ips and ips[-1]._version != ip._version:
 
                raise TypeError("%s and %s are not of the same version" % (
 
                        str(ip), str(ips[-1])))
 
            ips.append(ip)
 
        elif ip._prefixlen == ip._max_prefixlen:
 
            if ips and ips[-1]._version != ip._version:
 
                raise TypeError("%s and %s are not of the same version" % (
 
                        str(ip), str(ips[-1])))
 
            ips.append(ip.ip)
 
        else:
 
            if nets and nets[-1]._version != ip._version:
 
                raise TypeError("%s and %s are not of the same version" % (
 
                        str(ip), str(nets[-1])))
 
            nets.append(ip)
 

	
 
    # sort and dedup
 
    ips = sorted(set(ips))
 
    nets = sorted(set(nets))
 

	
 
    while i < len(ips):
 
        (first, last) = _find_address_range(ips[i:])
 
        i = ips.index(last) + 1
 
        addrs.extend(summarize_address_range(first, last))
 

	
 
    return _collapse_address_list_recursive(sorted(
 
        addrs + nets, key=_BaseNet._get_networks_key))
 

	
 
# backwards compatibility
 
CollapseAddrList = collapse_address_list
 

	
 
# We need to distinguish between the string and packed-bytes representations
 
# of an IP address.  For example, b'0::1' is the IPv4 address 48.58.58.49,
 
# while '0::1' is an IPv6 address.
 
#
 
# In Python 3, the native 'bytes' type already provides this functionality,
 
# so we use it directly.  For earlier implementations where bytes is not a
 
# distinct type, we create a subclass of str to serve as a tag.
 
#
 
# Usage example (Python 2):
 
#   ip = ipaddr.IPAddress(ipaddr.Bytes('xxxx'))
 
#
 
# Usage example (Python 3):
 
#   ip = ipaddr.IPAddress(b'xxxx')
 
try:
 
    if bytes is str:
 
        raise TypeError("bytes is not a distinct type")
 
    Bytes = bytes
 
except (NameError, TypeError):
 
    class Bytes(str):
 
        def __repr__(self):
 
            return 'Bytes(%s)' % str.__repr__(self)
 

	
 

	
 
def get_mixed_type_key(obj):
 
    """Return a key suitable for sorting between networks and addresses.
 

	
 
    Address and Network objects are not sortable by default; they're
 
    fundamentally different so the expression
 

	
 
        IPv4Address('1.1.1.1') <= IPv4Network('1.1.1.1/24')
 

	
 
    doesn't make any sense.  There are some times however, where you may wish
 
    to have ipaddr sort these for you anyway. If you need to do this, you
 
    can use this function as the key= argument to sorted().
 

	
 
    Args:
 
      obj: either a Network or Address object.
 
    Returns:
 
      appropriate key.
 

	
 
    """
 
    if isinstance(obj, _BaseNet):
 
        return obj._get_networks_key()
 
    elif isinstance(obj, _BaseIP):
 
        return obj._get_address_key()
 
    return NotImplemented
 

	
 

	
 
class _IPAddrBase(object):
 

	
 
    """The mother class."""
 

	
 
    def __index__(self):
 
        return self._ip
 

	
 
    def __int__(self):
 
        return self._ip
 

	
 
    def __hex__(self):
 
        return hex(self._ip)
 

	
 
    @property
 
    def exploded(self):
 
        """Return the longhand version of the IP address as a string."""
 
        return self._explode_shorthand_ip_string()
 

	
 
    @property
 
    def compressed(self):
 
        """Return the shorthand version of the IP address as a string."""
 
        return str(self)
 

	
 

	
 
class _BaseIP(_IPAddrBase):
 

	
 
    """A generic IP object.
 

	
 
    This IP class contains the version independent methods which are
 
    used by single IP addresses.
 

	
 
    """
 

	
 
    def __eq__(self, other):
 
        try:
 
            return (self._ip == other._ip
 
                    and self._version == other._version)
 
        except AttributeError:
 
            return NotImplemented
 

	
 
    def __ne__(self, other):
 
        eq = self.__eq__(other)
 
        if eq is NotImplemented:
 
            return NotImplemented
 
        return not eq
 

	
 
    def __le__(self, other):
 
        gt = self.__gt__(other)
 
        if gt is NotImplemented:
 
            return NotImplemented
 
        return not gt
 

	
 
    def __ge__(self, other):
 
        lt = self.__lt__(other)
 
        if lt is NotImplemented:
 
            return NotImplemented
 
        return not lt
 

	
 
    def __lt__(self, other):
 
        if self._version != other._version:
 
            raise TypeError('%s and %s are not of the same version' % (
 
                    str(self), str(other)))
 
        if not isinstance(other, _BaseIP):
 
            raise TypeError('%s and %s are not of the same type' % (
 
                    str(self), str(other)))
 
        if self._ip != other._ip:
 
            return self._ip < other._ip
 
        return False
 

	
 
    def __gt__(self, other):
 
        if self._version != other._version:
 
            raise TypeError('%s and %s are not of the same version' % (
 
                    str(self), str(other)))
 
        if not isinstance(other, _BaseIP):
 
            raise TypeError('%s and %s are not of the same type' % (
 
                    str(self), str(other)))
 
        if self._ip != other._ip:
 
            return self._ip > other._ip
 
        return False
 

	
 
    # Shorthand for Integer addition and subtraction. This is not
 
    # meant to ever support addition/subtraction of addresses.
 
    def __add__(self, other):
 
        if not isinstance(other, int):
 
            return NotImplemented
 
        return IPAddress(int(self) + other, version=self._version)
 

	
 
    def __sub__(self, other):
 
        if not isinstance(other, int):
 
            return NotImplemented
 
        return IPAddress(int(self) - other, version=self._version)
 

	
 
    def __repr__(self):
 
        return '%s(%r)' % (self.__class__.__name__, str(self))
 

	
 
    def __str__(self):
 
        return  '%s' % self._string_from_ip_int(self._ip)
 
        return self._string_from_ip_int(self._ip)
 

	
 
    def __hash__(self):
 
        return hash(hex(long(self._ip)))
 

	
 
    def _get_address_key(self):
 
        return (self._version, self)
 

	
 
    @property
 
    def version(self):
 
        raise NotImplementedError('BaseIP has no version')
 

	
 

	
 
class _BaseNet(_IPAddrBase):
 

	
 
    """A generic IP object.
 

	
 
    This IP class contains the version independent methods which are
 
    used by networks.
 

	
 
    """
 

	
 
    def __init__(self, address):
 
        self._cache = {}
 

	
 
    def __repr__(self):
 
        return '%s(%r)' % (self.__class__.__name__, str(self))
 

	
 
    def iterhosts(self):
 
        """Generate Iterator over usable hosts in a network.
 

	
 
           This is like __iter__ except it doesn't return the network
 
           or broadcast addresses.
 

	
 
        """
 
        cur = int(self.network) + 1
 
        bcast = int(self.broadcast) - 1
 
        while cur <= bcast:
 
            cur += 1
 
            yield IPAddress(cur - 1, version=self._version)
 

	
 
    def __iter__(self):
 
        cur = int(self.network)
 
        bcast = int(self.broadcast)
 
        while cur <= bcast:
 
            cur += 1
 
            yield IPAddress(cur - 1, version=self._version)
 

	
 
    def __getitem__(self, n):
 
        network = int(self.network)
 
        broadcast = int(self.broadcast)
 
        if n >= 0:
 
            if network + n > broadcast:
 
                raise IndexError
 
            return IPAddress(network + n, version=self._version)
 
        else:
 
            n += 1
 
            if broadcast + n < network:
 
                raise IndexError
 
            return IPAddress(broadcast + n, version=self._version)
 

	
 
    def __lt__(self, other):
 
        if self._version != other._version:
 
            raise TypeError('%s and %s are not of the same version' % (
 
                    str(self), str(other)))
 
        if not isinstance(other, _BaseNet):
 
            raise TypeError('%s and %s are not of the same type' % (
 
                    str(self), str(other)))
 
        if self.network != other.network:
 
            return self.network < other.network
 
        if self.netmask != other.netmask:
 
            return self.netmask < other.netmask
 
        return False
 

	
 
    def __gt__(self, other):
 
        if self._version != other._version:
 
            raise TypeError('%s and %s are not of the same version' % (
 
                    str(self), str(other)))
 
        if not isinstance(other, _BaseNet):
 
            raise TypeError('%s and %s are not of the same type' % (
 
                    str(self), str(other)))
 
        if self.network != other.network:
 
            return self.network > other.network
 
        if self.netmask != other.netmask:
 
            return self.netmask > other.netmask
 
        return False
 

	
 
    def __le__(self, other):
 
        gt = self.__gt__(other)
 
        if gt is NotImplemented:
 
            return NotImplemented
 
        return not gt
 

	
 
    def __ge__(self, other):
 
        lt = self.__lt__(other)
 
        if lt is NotImplemented:
 
            return NotImplemented
 
        return not lt
 

	
 
    def __eq__(self, other):
 
        try:
 
            return (self._version == other._version
 
                    and self.network == other.network
 
                    and int(self.netmask) == int(other.netmask))
 
        except AttributeError:
 
            if isinstance(other, _BaseIP):
 
                return (self._version == other._version
 
                        and self._ip == other._ip)
 

	
 
    def __ne__(self, other):
 
        eq = self.__eq__(other)
 
        if eq is NotImplemented:
 
            return NotImplemented
 
        return not eq
 

	
 
    def __str__(self):
 
        return  '%s/%s' % (str(self.ip),
 
                           str(self._prefixlen))
 

	
 
    def __hash__(self):
 
        return hash(int(self.network) ^ int(self.netmask))
 

	
 
    def __contains__(self, other):
 
        # always false if one is v4 and the other is v6.
 
        if self._version != other._version:
 
            return False
 
        # dealing with another network.
 
        if isinstance(other, _BaseNet):
 
            return (self.network <= other.network and
 
                    self.broadcast >= other.broadcast)
 
        # dealing with another address
 
        else:
 
            return (int(self.network) <= int(other._ip) <=
 
                    int(self.broadcast))
 

	
 
    def overlaps(self, other):
 
        """Tell if self is partly contained in other."""
 
        return self.network in other or self.broadcast in other or (
 
            other.network in self or other.broadcast in self)
 

	
 
    @property
 
    def network(self):
 
        x = self._cache.get('network')
 
        if x is None:
 
            x = IPAddress(self._ip & int(self.netmask), version=self._version)
 
            self._cache['network'] = x
 
        return x
 

	
 
    @property
 
    def broadcast(self):
 
        x = self._cache.get('broadcast')
 
        if x is None:
 
            x = IPAddress(self._ip | int(self.hostmask), version=self._version)
 
            self._cache['broadcast'] = x
 
        return x
 

	
 
    @property
 
    def hostmask(self):
 
        x = self._cache.get('hostmask')
 
        if x is None:
 
            x = IPAddress(int(self.netmask) ^ self._ALL_ONES,
 
                          version=self._version)
 
            self._cache['hostmask'] = x
 
        return x
 

	
 
    @property
 
    def with_prefixlen(self):
 
        return '%s/%d' % (str(self.ip), self._prefixlen)
 

	
 
    @property
 
    def with_netmask(self):
 
        return '%s/%s' % (str(self.ip), str(self.netmask))
 

	
 
    @property
 
    def with_hostmask(self):
 
        return '%s/%s' % (str(self.ip), str(self.hostmask))
 

	
 
    @property
 
    def numhosts(self):
 
        """Number of hosts in the current subnet."""
 
        return int(self.broadcast) - int(self.network) + 1
 

	
 
    @property
 
    def version(self):
 
        raise NotImplementedError('BaseNet has no version')
 

	
 
    @property
 
    def prefixlen(self):
 
        return self._prefixlen
 

	
 
    def address_exclude(self, other):
 
        """Remove an address from a larger block.
 

	
 
        For example:
 

	
 
            addr1 = IPNetwork('10.1.1.0/24')
 
            addr2 = IPNetwork('10.1.1.0/26')
 
            addr1.address_exclude(addr2) =
 
                [IPNetwork('10.1.1.64/26'), IPNetwork('10.1.1.128/25')]
 

	
 
        or IPv6:
 

	
 
            addr1 = IPNetwork('::1/32')
 
            addr2 = IPNetwork('::1/128')
 
            addr1.address_exclude(addr2) = [IPNetwork('::0/128'),
 
                IPNetwork('::2/127'),
 
                IPNetwork('::4/126'),
 
                IPNetwork('::8/125'),
 
                ...
 
                IPNetwork('0:0:8000::/33')]
 

	
 
        Args:
 
            other: An IPvXNetwork object of the same type.
 

	
 
        Returns:
 
            A sorted list of IPvXNetwork objects addresses which is self
 
            minus other.
 

	
 
        Raises:
 
            TypeError: If self and other are of differing address
 
              versions, or if other is not a network object.
 
            ValueError: If other is not completely contained by self.
 

	
 
        """
 
        if not self._version == other._version:
 
            raise TypeError("%s and %s are not of the same version" % (
 
                str(self), str(other)))
 

	
 
        if not isinstance(other, _BaseNet):
 
            raise TypeError("%s is not a network object" % str(other))
 

	
 
        if other not in self:
 
            raise ValueError('%s not contained in %s' % (str(other),
 
                                                         str(self)))
 
        if other == self:
 
            return []
 

	
 
        ret_addrs = []
 

	
 
        # Make sure we're comparing the network of other.
 
        other = IPNetwork('%s/%s' % (str(other.network), str(other.prefixlen)),
 
                   version=other._version)
 

	
 
        s1, s2 = self.subnet()
 
        while s1 != other and s2 != other:
 
            if other in s1:
 
                ret_addrs.append(s2)
 
                s1, s2 = s1.subnet()
 
            elif other in s2:
 
                ret_addrs.append(s1)
 
                s1, s2 = s2.subnet()
 
            else:
 
                # If we got here, there's a bug somewhere.
 
                assert True == False, ('Error performing exclusion: '
 
                                       's1: %s s2: %s other: %s' %
 
                                       (str(s1), str(s2), str(other)))
 
        if s1 == other:
 
            ret_addrs.append(s2)
 
        elif s2 == other:
 
            ret_addrs.append(s1)
 
        else:
 
            # If we got here, there's a bug somewhere.
 
            assert True == False, ('Error performing exclusion: '
 
                                   's1: %s s2: %s other: %s' %
 
                                   (str(s1), str(s2), str(other)))
 

	
 
        return sorted(ret_addrs, key=_BaseNet._get_networks_key)
 

	
 
    def compare_networks(self, other):
 
        """Compare two IP objects.
 

	
 
        This is only concerned about the comparison of the integer
 
        representation of the network addresses.  This means that the
 
        host bits aren't considered at all in this method.  If you want
 
        to compare host bits, you can easily enough do a
 
        'HostA._ip < HostB._ip'
 

	
 
        Args:
 
            other: An IP object.
 

	
 
        Returns:
 
            If the IP versions of self and other are the same, returns:
 

	
 
            -1 if self < other:
 
              eg: IPv4('1.1.1.0/24') < IPv4('1.1.2.0/24')
 
              IPv6('1080::200C:417A') < IPv6('1080::200B:417B')
 
            0 if self == other
 
              eg: IPv4('1.1.1.1/24') == IPv4('1.1.1.2/24')
 
              IPv6('1080::200C:417A/96') == IPv6('1080::200C:417B/96')
 
            1 if self > other
 
              eg: IPv4('1.1.1.0/24') > IPv4('1.1.0.0/24')
 
              IPv6('1080::1:200C:417A/112') >
 
              IPv6('1080::0:200C:417A/112')
 

	
 
            If the IP versions of self and other are different, returns:
 

	
 
            -1 if self._version < other._version
 
              eg: IPv4('10.0.0.1/24') < IPv6('::1/128')
 
            1 if self._version > other._version
 
              eg: IPv6('::1/128') > IPv4('255.255.255.0/24')
 

	
 
        """
 
        if self._version < other._version:
 
            return -1
 
        if self._version > other._version:
 
            return 1
 
        # self._version == other._version below here:
 
        if self.network < other.network:
 
            return -1
 
        if self.network > other.network:
 
            return 1
 
        # self.network == other.network below here:
 
        if self.netmask < other.netmask:
 
            return -1
 
        if self.netmask > other.netmask:
 
            return 1
 
        # self.network == other.network and self.netmask == other.netmask
 
        return 0
 

	
 
    def _get_networks_key(self):
 
        """Network-only key function.
 

	
 
        Returns an object that identifies this address' network and
 
        netmask. This function is a suitable "key" argument for sorted()
 
        and list.sort().
 

	
 
        """
 
        return (self._version, self.network, self.netmask)
 

	
 
    def _ip_int_from_prefix(self, prefixlen=None):
 
        """Turn the prefix length netmask into a int for comparison.
 

	
 
        Args:
 
            prefixlen: An integer, the prefix length.
 

	
 
        Returns:
 
            An integer.
 

	
 
        """
 
        if not prefixlen and prefixlen != 0:
 
            prefixlen = self._prefixlen
 
        return self._ALL_ONES ^ (self._ALL_ONES >> prefixlen)
 

	
 
    def _prefix_from_ip_int(self, ip_int, mask=32):
 
        """Return prefix length from the decimal netmask.
 

	
 
        Args:
 
            ip_int: An integer, the IP address.
 
            mask: The netmask.  Defaults to 32.
 

	
 
        Returns:
 
            An integer, the prefix length.
 

	
 
        """
 
        while mask:
 
            if ip_int & 1 == 1:
 
                break
 
            ip_int >>= 1
 
            mask -= 1
 

	
 
        return mask
 

	
 
    def _ip_string_from_prefix(self, prefixlen=None):
 
        """Turn a prefix length into a dotted decimal string.
 

	
 
        Args:
 
            prefixlen: An integer, the netmask prefix length.
 

	
 
        Returns:
 
            A string, the dotted decimal netmask string.
 

	
 
        """
 
        if not prefixlen:
 
            prefixlen = self._prefixlen
 
        return self._string_from_ip_int(self._ip_int_from_prefix(prefixlen))
 

	
 
    def iter_subnets(self, prefixlen_diff=1, new_prefix=None):
 
        """The subnets which join to make the current subnet.
 

	
 
        In the case that self contains only one IP
 
        (self._prefixlen == 32 for IPv4 or self._prefixlen == 128
 
        for IPv6), return a list with just ourself.
 

	
 
        Args:
 
            prefixlen_diff: An integer, the amount the prefix length
 
              should be increased by. This should not be set if
 
              new_prefix is also set.
 
            new_prefix: The desired new prefix length. This must be a
 
              larger number (smaller prefix) than the existing prefix.
 
              This should not be set if prefixlen_diff is also set.
 

	
 
        Returns:
 
            An iterator of IPv(4|6) objects.
 

	
 
        Raises:
 
            ValueError: The prefixlen_diff is too small or too large.
 
                OR
 
            prefixlen_diff and new_prefix are both set or new_prefix
 
              is a smaller number than the current prefix (smaller
 
              number means a larger network)
 

	
 
        """
 
        if self._prefixlen == self._max_prefixlen:
 
            yield self
 
            return
 

	
 
        if new_prefix is not None:
 
            if new_prefix < self._prefixlen:
 
                raise ValueError('new prefix must be longer')
 
            if prefixlen_diff != 1:
 
                raise ValueError('cannot set prefixlen_diff and new_prefix')
 
            prefixlen_diff = new_prefix - self._prefixlen
 

	
 
        if prefixlen_diff < 0:
 
            raise ValueError('prefix length diff must be > 0')
 
        new_prefixlen = self._prefixlen + prefixlen_diff
 

	
 
        if not self._is_valid_netmask(str(new_prefixlen)):
 
            raise ValueError(
 
                'prefix length diff %d is invalid for netblock %s' % (
 
                    new_prefixlen, str(self)))
 

	
 
        first = IPNetwork('%s/%s' % (str(self.network),
 
                                     str(self._prefixlen + prefixlen_diff)),
 
                         version=self._version)
 

	
 
        yield first
 
        current = first
 
        while True:
 
            broadcast = current.broadcast
 
            if broadcast == self.broadcast:
 
                return
 
            new_addr = IPAddress(int(broadcast) + 1, version=self._version)
 
            current = IPNetwork('%s/%s' % (str(new_addr), str(new_prefixlen)),
 
                                version=self._version)
 

	
 
            yield current
 

	
 
    def masked(self):
 
        """Return the network object with the host bits masked out."""
 
        return IPNetwork('%s/%d' % (self.network, self._prefixlen),
 
                         version=self._version)
 

	
 
    def subnet(self, prefixlen_diff=1, new_prefix=None):
 
        """Return a list of subnets, rather than an iterator."""
 
        return list(self.iter_subnets(prefixlen_diff, new_prefix))
 

	
 
    def supernet(self, prefixlen_diff=1, new_prefix=None):
 
        """The supernet containing the current network.
 

	
 
        Args:
 
            prefixlen_diff: An integer, the amount the prefix length of
 
              the network should be decreased by.  For example, given a
 
              /24 network and a prefixlen_diff of 3, a supernet with a
 
              /21 netmask is returned.
 

	
 
        Returns:
 
            An IPv4 network object.
 

	
 
        Raises:
 
            ValueError: If self.prefixlen - prefixlen_diff < 0. I.e., you have a
 
              negative prefix length.
 
                OR
 
            If prefixlen_diff and new_prefix are both set or new_prefix is a
 
              larger number than the current prefix (larger number means a
 
              smaller network)
 

	
 
        """
 
        if self._prefixlen == 0:
 
            return self
 

	
 
        if new_prefix is not None:
 
            if new_prefix > self._prefixlen:
 
                raise ValueError('new prefix must be shorter')
 
            if prefixlen_diff != 1:
 
                raise ValueError('cannot set prefixlen_diff and new_prefix')
 
            prefixlen_diff = self._prefixlen - new_prefix
 

	
 
        if self.prefixlen - prefixlen_diff < 0:
 
            raise ValueError(
 
                'current prefixlen is %d, cannot have a prefixlen_diff of %d' %
 
                (self.prefixlen, prefixlen_diff))
 
        return IPNetwork('%s/%s' % (str(self.network),
 
                                    str(self.prefixlen - prefixlen_diff)),
 
                         version=self._version)
 

	
 
    # backwards compatibility
 
    Subnet = subnet
 
    Supernet = supernet
 
    AddressExclude = address_exclude
 
    CompareNetworks = compare_networks
 
    Contains = __contains__
 

	
 

	
 
class _BaseV4(object):
 

	
 
    """Base IPv4 object.
 

	
 
    The following methods are used by IPv4 objects in both single IP
 
    addresses and networks.
 

	
 
    """
 

	
 
    # Equivalent to 255.255.255.255 or 32 bits of 1's.
 
    _ALL_ONES = (2 ** IPV4LENGTH) - 1
 
    _DECIMAL_DIGITS = frozenset('0123456789')
 

	
 
    def __init__(self, address):
 
        self._version = 4
 
        self._max_prefixlen = IPV4LENGTH
 

	
 
    def _explode_shorthand_ip_string(self):
 
        return str(self)
 

	
 
    def _ip_int_from_string(self, ip_str):
 
        """Turn the given IP string into an integer for comparison.
 

	
 
        Args:
 
            ip_str: A string, the IP ip_str.
 

	
 
        Returns:
 
            The IP ip_str as an integer.
 

	
 
        Raises:
 
            AddressValueError: if ip_str isn't a valid IPv4 Address.
 

	
 
        """
 
        octets = ip_str.split('.')
 
        if len(octets) != 4:
 
            raise AddressValueError(ip_str)
 

	
 
        packed_ip = 0
 
        for oc in octets:
 
            try:
 
                packed_ip = (packed_ip << 8) | self._parse_octet(oc)
 
            except ValueError:
 
                raise AddressValueError(ip_str)
 
        return packed_ip
 

	
 
    def _parse_octet(self, octet_str):
 
        """Convert a decimal octet into an integer.
 

	
 
        Args:
 
            octet_str: A string, the number to parse.
 

	
 
        Returns:
 
            The octet as an integer.
 

	
 
        Raises:
 
            ValueError: if the octet isn't strictly a decimal from [0..255].
 

	
 
        """
 
        # Whitelist the characters, since int() allows a lot of bizarre stuff.
 
        if not self._DECIMAL_DIGITS.issuperset(octet_str):
 
            raise ValueError
 
        octet_int = int(octet_str, 10)
 
        # Disallow leading zeroes, because no clear standard exists on
 
        # whether these should be interpreted as decimal or octal.
 
        if octet_int > 255 or (octet_str[0] == '0' and len(octet_str) > 1):
 
            raise ValueError
 
        return octet_int
 

	
 
    def _string_from_ip_int(self, ip_int):
 
        """Turns a 32-bit integer into dotted decimal notation.
 

	
 
        Args:
 
            ip_int: An integer, the IP address.
 

	
 
        Returns:
 
            The IP address as a string in dotted decimal notation.
 

	
 
        """
 
        octets = []
 
        for _ in xrange(4):
 
            octets.insert(0, str(ip_int & 0xFF))
 
            ip_int >>= 8
 
        return '.'.join(octets)
 

	
 
    @property
 
    def max_prefixlen(self):
 
        return self._max_prefixlen
 

	
 
    @property
 
    def packed(self):
 
        """The binary representation of this address."""
 
        return v4_int_to_packed(self._ip)
 

	
 
    @property
 
    def version(self):
 
        return self._version
 

	
 
    @property
 
    def is_reserved(self):
 
        """Test if the address is otherwise IETF reserved.
 

	
 
         Returns:
 
             A boolean, True if the address is within the
 
             reserved IPv4 Network range.
 

	
 
        """
 
        return self in IPv4Network('240.0.0.0/4')
 

	
 
    @property
 
    def is_private(self):
 
        """Test if this address is allocated for private networks.
 

	
 
        Returns:
 
            A boolean, True if the address is reserved per RFC 1918.
 

	
 
        """
 
        return (self in IPv4Network('10.0.0.0/8') or
 
                self in IPv4Network('172.16.0.0/12') or
 
                self in IPv4Network('192.168.0.0/16'))
 

	
 
    @property
 
    def is_multicast(self):
 
        """Test if the address is reserved for multicast use.
 

	
 
        Returns:
 
            A boolean, True if the address is multicast.
 
            See RFC 3171 for details.
 

	
 
        """
 
        return self in IPv4Network('224.0.0.0/4')
 

	
 
    @property
 
    def is_unspecified(self):
 
        """Test if the address is unspecified.
 

	
 
        Returns:
 
            A boolean, True if this is the unspecified address as defined in
 
            RFC 5735 3.
 

	
 
        """
 
        return self in IPv4Network('0.0.0.0')
 

	
 
    @property
 
    def is_loopback(self):
 
        """Test if the address is a loopback address.
 

	
 
        Returns:
 
            A boolean, True if the address is a loopback per RFC 3330.
 

	
 
        """
 
        return self in IPv4Network('127.0.0.0/8')
 

	
 
    @property
 
    def is_link_local(self):
 
        """Test if the address is reserved for link-local.
 

	
 
        Returns:
 
            A boolean, True if the address is link-local per RFC 3927.
 

	
 
        """
 
        return self in IPv4Network('169.254.0.0/16')
 

	
 

	
 
class IPv4Address(_BaseV4, _BaseIP):
 

	
 
    """Represent and manipulate single IPv4 Addresses."""
 

	
 
    def __init__(self, address):
 

	
 
        """
 
        Args:
 
            address: A string or integer representing the IP
 
              '192.168.1.1'
 

	
 
              Additionally, an integer can be passed, so
 
              IPv4Address('192.168.1.1') == IPv4Address(3232235777).
 
              or, more generally
 
              IPv4Address(int(IPv4Address('192.168.1.1'))) ==
 
                IPv4Address('192.168.1.1')
 

	
 
        Raises:
 
            AddressValueError: If ipaddr isn't a valid IPv4 address.
 

	
 
        """
 
        _BaseV4.__init__(self, address)
 

	
 
        # Efficient constructor from integer.
 
        if isinstance(address, (int, long)):
 
            self._ip = address
 
            if address < 0 or address > self._ALL_ONES:
 
                raise AddressValueError(address)
 
            return
 

	
 
        # Constructing from a packed address
 
        if isinstance(address, Bytes):
 
            try:
 
                self._ip, = struct.unpack('!I', address)
 
            except struct.error:
 
                raise AddressValueError(address)  # Wrong length.
 
            return
 

	
 
        # Assume input argument to be string or any object representation
 
        # which converts into a formatted IP string.
 
        addr_str = str(address)
 
        self._ip = self._ip_int_from_string(addr_str)
 

	
 

	
 
class IPv4Network(_BaseV4, _BaseNet):
 

	
 
    """This class represents and manipulates 32-bit IPv4 networks.
 

	
 
    Attributes: [examples for IPv4Network('1.2.3.4/27')]
 
        ._ip: 16909060
 
        .ip: IPv4Address('1.2.3.4')
 
        .network: IPv4Address('1.2.3.0')
 
        .hostmask: IPv4Address('0.0.0.31')
 
        .broadcast: IPv4Address('1.2.3.31')
 
        .netmask: IPv4Address('255.255.255.224')
 
        .prefixlen: 27
 

	
 
    """
 

	
 
    # the valid octets for host and netmasks. only useful for IPv4.
 
    _valid_mask_octets = set((255, 254, 252, 248, 240, 224, 192, 128, 0))
 

	
 
    def __init__(self, address, strict=False):
 
        """Instantiate a new IPv4 network object.
 

	
 
        Args:
 
            address: A string or integer representing the IP [& network].
 
              '192.168.1.1/24'
 
              '192.168.1.1/255.255.255.0'
 
              '192.168.1.1/0.0.0.255'
 
              are all functionally the same in IPv4. Similarly,
 
              '192.168.1.1'
 
              '192.168.1.1/255.255.255.255'
 
              '192.168.1.1/32'
 
              are also functionaly equivalent. That is to say, failing to
 
              provide a subnetmask will create an object with a mask of /32.
 

	
 
              If the mask (portion after the / in the argument) is given in
 
              dotted quad form, it is treated as a netmask if it starts with a
 
              non-zero field (e.g. /255.0.0.0 == /8) and as a hostmask if it
 
              starts with a zero field (e.g. 0.255.255.255 == /8), with the
 
              single exception of an all-zero mask which is treated as a
 
              netmask == /0. If no mask is given, a default of /32 is used.
 

	
 
              Additionally, an integer can be passed, so
 
              IPv4Network('192.168.1.1') == IPv4Network(3232235777).
 
              or, more generally
 
              IPv4Network(int(IPv4Network('192.168.1.1'))) ==
 
                IPv4Network('192.168.1.1')
 

	
 
            strict: A boolean. If true, ensure that we have been passed
 
              A true network address, eg, 192.168.1.0/24 and not an
 
              IP address on a network, eg, 192.168.1.1/24.
 

	
 
        Raises:
 
            AddressValueError: If ipaddr isn't a valid IPv4 address.
 
            NetmaskValueError: If the netmask isn't valid for
 
              an IPv4 address.
 
            ValueError: If strict was True and a network address was not
 
              supplied.
 

	
 
        """
 
        _BaseNet.__init__(self, address)
 
        _BaseV4.__init__(self, address)
 

	
 
        # Constructing from an integer or packed bytes.
 
        if isinstance(address, (int, long, Bytes)):
 
            self.ip = IPv4Address(address)
 
            self._ip = self.ip._ip
 
            self._prefixlen = self._max_prefixlen
 
            self.netmask = IPv4Address(self._ALL_ONES)
 
            return
kallithea/lib/pidlock.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
from __future__ import with_statement
 
import os
 
import errno
 

	
 
from multiprocessing.util import Finalize
 

	
 
from kallithea.lib.compat import kill
 

	
 

	
 
class LockHeld(Exception):
 
    pass
 

	
 

	
 
class DaemonLock(object):
 
    """daemon locking
 
    USAGE:
 
    try:
 
        l = DaemonLock(file_='/path/tolockfile',desc='test lock')
 
        main()
 
        l.release()
 
    except LockHeld:
 
        sys.exit(1)
 
    """
 

	
 
    def __init__(self, file_=None, callbackfn=None,
 
                 desc='daemon lock', debug=False):
 

	
 
        lock_name = os.path.join(os.path.dirname(__file__), 'running.lock')
 
        self.pidfile = file_ if file_ else lock_name
 
        self.callbackfn = callbackfn
 
        self.desc = desc
 
        self.debug = debug
 
        self.held = False
 
        #run the lock automatically !
 
        self.lock()
 
        self._finalize = Finalize(self, DaemonLock._on_finalize,
 
                                  args=(self, debug), exitpriority=10)
 

	
 
    @staticmethod
 
    def _on_finalize(lock, debug):
 
        if lock.held:
 
            if debug:
 
                print 'lock held finalizing and running lock.release()'
 
            lock.release()
 

	
 
    def lock(self):
 
        """
 
        locking function, if lock is present it
 
        will raise LockHeld exception
 
        """
 
        lockname = '%s' % (os.getpid())
 
        lockname = str(os.getpid())
 
        if self.debug:
 
            print 'running lock'
 
        self.trylock()
 
        self.makelock(lockname, self.pidfile)
 
        return True
 

	
 
    def trylock(self):
 
        running_pid = False
 
        if self.debug:
 
            print 'checking for already running process'
 
        try:
 
            with open(self.pidfile, 'r') as f:
 
                try:
 
                    running_pid = int(f.readline())
 
                except ValueError:
 
                    running_pid = -1
 

	
 
            if self.debug:
 
                print ('lock file present running_pid: %s, '
 
                       'checking for execution' % (running_pid,))
 
            # Now we check the PID from lock file matches to the current
 
            # process PID
 
            if running_pid:
 
                try:
 
                    kill(running_pid, 0)
 
                except OSError, exc:
 
                    if exc.errno in (errno.ESRCH, errno.EPERM):
 
                        print ("Lock File is there but"
 
                               " the program is not running")
 
                        print "Removing lock file for the: %s" % running_pid
 
                        self.release()
 
                    else:
 
                        raise
 
                else:
 
                    print "You already have an instance of the program running"
 
                    print "It is running as process %s" % running_pid
 
                    raise LockHeld()
 

	
 
        except IOError, e:
 
            if e.errno != 2:
 
                raise
 

	
 
    def release(self):
 
        """releases the pid by removing the pidfile
 
        """
 
        if self.debug:
 
            print 'trying to release the pidlock'
 

	
 
        if self.callbackfn:
 
            #execute callback function on release
 
            if self.debug:
 
                print 'executing callback function %s' % self.callbackfn
 
            self.callbackfn()
 
        try:
 
            if self.debug:
 
                print 'removing pidfile %s' % self.pidfile
 
            os.remove(self.pidfile)
 
            self.held = False
 
        except OSError, e:
 
            if self.debug:
 
                print 'removing pidfile failed %s' % e
 
            pass
 

	
 
    def makelock(self, lockname, pidfile):
 
        """
 
        this function will make an actual lock
 

	
 
        :param lockname: actual pid of file
 
        :param pidfile: the file to write the pid in
 
        """
 
        if self.debug:
 
            print 'creating a file %s and pid: %s' % (pidfile, lockname)
 

	
 
        dir_, file_ = os.path.split(pidfile)
 
        if not os.path.isdir(dir_):
 
            os.makedirs(dir_)
 
        with open(self.pidfile, 'wb') as f:
 
            f.write(lockname)
 
        self.held = True
kallithea/lib/vcs/subprocessio.py
Show inline comments
 
"""
 
Module provides a class allowing to wrap communication over subprocess.Popen
 
input, output, error streams into a meaningfull, non-blocking, concurrent
 
stream processor exposing the output data as an iterator fitting to be a
 
return value passed by a WSGI applicaiton to a WSGI server per PEP 3333.
 

	
 
Copyright (c) 2011  Daniel Dotsenko <dotsa[at]hotmail.com>
 

	
 
This file is part of git_http_backend.py Project.
 

	
 
git_http_backend.py Project is free software: you can redistribute it and/or
 
modify it under the terms of the GNU Lesser General Public License as
 
published by the Free Software Foundation, either version 2.1 of the License,
 
or (at your option) any later version.
 

	
 
git_http_backend.py Project is distributed in the hope that it will be useful,
 
but WITHOUT ANY WARRANTY; without even the implied warranty of
 
MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
 
GNU Lesser General Public License for more details.
 

	
 
You should have received a copy of the GNU Lesser General Public License
 
along with git_http_backend.py Project.
 
If not, see <http://www.gnu.org/licenses/>.
 
"""
 
import os
 
import subprocess
 
from kallithea.lib.vcs.utils.compat import deque, Event, Thread, _bytes, _bytearray
 

	
 

	
 
class StreamFeeder(Thread):
 
    """
 
    Normal writing into pipe-like is blocking once the buffer is filled.
 
    This thread allows a thread to seep data from a file-like into a pipe
 
    without blocking the main thread.
 
    We close inpipe once the end of the source stream is reached.
 
    """
 

	
 
    def __init__(self, source):
 
        super(StreamFeeder, self).__init__()
 
        self.daemon = True
 
        filelike = False
 
        self.bytes = _bytes()
 
        if type(source) in (type(''), _bytes, _bytearray):  # string-like
 
            self.bytes = _bytes(source)
 
        else:  # can be either file pointer or file-like
 
            if type(source) in (int, long):  # file pointer it is
 
                ## converting file descriptor (int) stdin into file-like
 
                source = os.fdopen(source, 'rb', 16384)
 
            # let's see if source is file-like by now
 
            filelike = hasattr(source, 'read')
 
        if not filelike and not self.bytes:
 
            raise TypeError("StreamFeeder's source object must be a readable "
 
                            "file-like, a file descriptor, or a string-like.")
 
        self.source = source
 
        self.readiface, self.writeiface = os.pipe()
 

	
 
    def run(self):
 
        t = self.writeiface
 
        if self.bytes:
 
            os.write(t, self.bytes)
 
        else:
 
            s = self.source
 
            b = s.read(4096)
 
            while b:
 
                os.write(t, b)
 
                b = s.read(4096)
 
        os.close(t)
 

	
 
    @property
 
    def output(self):
 
        return self.readiface
 

	
 

	
 
class InputStreamChunker(Thread):
 
    def __init__(self, source, target, buffer_size, chunk_size):
 

	
 
        super(InputStreamChunker, self).__init__()
 

	
 
        self.daemon = True  # die die die.
 

	
 
        self.source = source
 
        self.target = target
 
        self.chunk_count_max = int(buffer_size / chunk_size) + 1
 
        self.chunk_size = chunk_size
 

	
 
        self.data_added = Event()
 
        self.data_added.clear()
 

	
 
        self.keep_reading = Event()
 
        self.keep_reading.set()
 

	
 
        self.EOF = Event()
 
        self.EOF.clear()
 

	
 
        self.go = Event()
 
        self.go.set()
 

	
 
    def stop(self):
 
        self.go.clear()
 
        self.EOF.set()
 
        try:
 
            # this is not proper, but is done to force the reader thread let
 
            # go of the input because, if successful, .close() will send EOF
 
            # down the pipe.
 
            self.source.close()
 
        except:
 
            pass
 

	
 
    def run(self):
 
        s = self.source
 
        t = self.target
 
        cs = self.chunk_size
 
        ccm = self.chunk_count_max
 
        kr = self.keep_reading
 
        da = self.data_added
 
        go = self.go
 

	
 
        try:
 
            b = s.read(cs)
 
        except ValueError:
 
            b = ''
 

	
 
        while b and go.is_set():
 
            if len(t) > ccm:
 
                kr.clear()
 
                kr.wait(2)
 
                # # this only works on 2.7.x and up
 
                # if not kr.wait(10):
 
                #     raise Exception("Timed out while waiting for input to be read.")
 
                # instead we'll use this
 
                if len(t) > ccm + 3:
 
                    raise IOError(
 
                        "Timed out while waiting for input from subprocess.")
 
            t.append(b)
 
            da.set()
 
            try:
 
                b = s.read(cs)
 
            except ValueError: # probably "I/O operation on closed file"
 
                b = ''
 

	
 
        self.EOF.set()
 
        da.set()  # for cases when done but there was no input.
 

	
 

	
 
class BufferedGenerator(object):
 
    """
 
    Class behaves as a non-blocking, buffered pipe reader.
 
    Reads chunks of data (through a thread)
 
    from a blocking pipe, and attaches these to an array (Deque) of chunks.
 
    Reading is halted in the thread when max chunks is internally buffered.
 
    The .next() may operate in blocking or non-blocking fashion by yielding
 
    '' if no data is ready
 
    to be sent or by not returning until there is some data to send
 
    When we get EOF from underlying source pipe we raise the marker to raise
 
    StopIteration after the last chunk of data is yielded.
 
    """
 

	
 
    def __init__(self, source, buffer_size=65536, chunk_size=4096,
 
                 starting_values=[], bottomless=False):
 

	
 
        if bottomless:
 
            maxlen = int(buffer_size / chunk_size)
 
        else:
 
            maxlen = None
 

	
 
        self.data = deque(starting_values, maxlen)
 
        self.worker = InputStreamChunker(source, self.data, buffer_size,
 
                                         chunk_size)
 
        if starting_values:
 
            self.worker.data_added.set()
 
        self.worker.start()
 

	
 
    ####################
 
    # Generator's methods
 
    ####################
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def next(self):
 
        while not len(self.data) and not self.worker.EOF.is_set():
 
            self.worker.data_added.clear()
 
            self.worker.data_added.wait(0.2)
 
        if len(self.data):
 
            self.worker.keep_reading.set()
 
            return _bytes(self.data.popleft())
 
        elif self.worker.EOF.is_set():
 
            raise StopIteration
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if not self.worker.EOF.is_set():
 
            raise type(value)
 

	
 
    def start(self):
 
        self.worker.start()
 

	
 
    def stop(self):
 
        self.worker.stop()
 

	
 
    def close(self):
 
        try:
 
            self.worker.stop()
 
            self.throw(GeneratorExit)
 
        except (GeneratorExit, StopIteration):
 
            pass
 

	
 
    def __del__(self):
 
        self.close()
 

	
 
    ####################
 
    # Threaded reader's infrastructure.
 
    ####################
 
    @property
 
    def input(self):
 
        return self.worker.w
 

	
 
    @property
 
    def data_added_event(self):
 
        return self.worker.data_added
 

	
 
    @property
 
    def data_added(self):
 
        return self.worker.data_added.is_set()
 

	
 
    @property
 
    def reading_paused(self):
 
        return not self.worker.keep_reading.is_set()
 

	
 
    @property
 
    def done_reading_event(self):
 
        """
 
        Done_reading does not mean that the iterator's buffer is empty.
 
        Iterator might have done reading from underlying source, but the read
 
        chunks might still be available for serving through .next() method.
 

	
 
        :returns: An Event class instance.
 
        """
 
        return self.worker.EOF
 

	
 
    @property
 
    def done_reading(self):
 
        """
 
        Done_reading does not mean that the iterator's buffer is empty.
 
        Iterator might have done reading from underlying source, but the read
 
        chunks might still be available for serving through .next() method.
 

	
 
        :returns: An Bool value.
 
        """
 
        return self.worker.EOF.is_set()
 

	
 
    @property
 
    def length(self):
 
        """
 
        returns int.
 

	
 
        This is the length of the queue of chunks, not the length of
 
        the combined contents in those chunks.
 

	
 
        __len__() cannot be meaningfully implemented because this
 
        reader is just flying through a bottomless pit content and
 
        can only know the length of what it already saw.
 

	
 
        If __len__() on WSGI server per PEP 3333 returns a value,
 
        the response's length will be set to that. In order not to
 
        confuse WSGI PEP3333 servers, we will not implement __len__
 
        at all.
 
        """
 
        return len(self.data)
 

	
 
    def prepend(self, x):
 
        self.data.appendleft(x)
 

	
 
    def append(self, x):
 
        self.data.append(x)
 

	
 
    def extend(self, o):
 
        self.data.extend(o)
 

	
 
    def __getitem__(self, i):
 
        return self.data[i]
 

	
 

	
 
class SubprocessIOChunker(object):
 
    """
 
    Processor class wrapping handling of subprocess IO.
 

	
 
    In a way, this is a "communicate()" replacement with a twist.
 

	
 
    - We are multithreaded. Writing in and reading out, err are all sep threads.
 
    - We support concurrent (in and out) stream processing.
 
    - The output is not a stream. It's a queue of read string (bytes, not unicode)
 
      chunks. The object behaves as an iterable. You can "for chunk in obj:" us.
 
    - We are non-blocking in more respects than communicate()
 
      (reading from subprocess out pauses when internal buffer is full, but
 
       does not block the parent calling code. On the flip side, reading from
 
       slow-yielding subprocess may block the iteration until data shows up. This
 
       does not block the parallel inpipe reading occurring parallel thread.)
 

	
 
    The purpose of the object is to allow us to wrap subprocess interactions into
 
    an iterable that can be passed to a WSGI server as the application's return
 
    value. Because of stream-processing-ability, WSGI does not have to read ALL
 
    of the subprocess's output and buffer it, before handing it to WSGI server for
 
    HTTP response. Instead, the class initializer reads just a bit of the stream
 
    to figure out if error occurred or likely to occur and if not, just hands the
 
    further iteration over subprocess output to the server for completion of HTTP
 
    response.
 

	
 
    The real or perceived subprocess error is trapped and raised as one of
 
    EnvironmentError family of exceptions
 

	
 
    Example usage:
 
    #    try:
 
    #        answer = SubprocessIOChunker(
 
    #            cmd,
 
    #            input,
 
    #            buffer_size = 65536,
 
    #            chunk_size = 4096
 
    #            )
 
    #    except (EnvironmentError) as e:
 
    #        print str(e)
 
    #        raise e
 
    #
 
    #    return answer
 

	
 

	
 
    """
 

	
 
    def __init__(self, cmd, inputstream=None, buffer_size=65536,
 
                 chunk_size=4096, starting_values=[], **kwargs):
 
        """
 
        Initializes SubprocessIOChunker
 

	
 
        :param cmd: A Subprocess.Popen style "cmd". Can be string or array of strings
 
        :param inputstream: (Default: None) A file-like, string, or file pointer.
 
        :param buffer_size: (Default: 65536) A size of total buffer per stream in bytes.
 
        :param chunk_size: (Default: 4096) A max size of a chunk. Actual chunk may be smaller.
 
        :param starting_values: (Default: []) An array of strings to put in front of output que.
 
        """
 

	
 
        if inputstream:
 
            input_streamer = StreamFeeder(inputstream)
 
            input_streamer.start()
 
            inputstream = input_streamer.output
 

	
 
        # Note: fragile cmd mangling has been removed for use in Kallithea
 
        assert isinstance(cmd, list), cmd
 

	
 
        _p = subprocess.Popen(cmd, bufsize=-1,
 
                              stdin=inputstream,
 
                              stdout=subprocess.PIPE,
 
                              stderr=subprocess.PIPE,
 
                              **kwargs)
 

	
 
        bg_out = BufferedGenerator(_p.stdout, buffer_size, chunk_size,
 
                                   starting_values)
 
        bg_err = BufferedGenerator(_p.stderr, 16000, 1, bottomless=True)
 

	
 
        while not bg_out.done_reading and not bg_out.reading_paused and not bg_err.length:
 
            # doing this until we reach either end of file, or end of buffer.
 
            bg_out.data_added_event.wait(1)
 
            bg_out.data_added_event.clear()
 

	
 
        # at this point it's still ambiguous if we are done reading or just full buffer.
 
        # Either way, if error (returned by ended process, or implied based on
 
        # presence of stuff in stderr output) we error out.
 
        # Else, we are happy.
 
        _returncode = _p.poll()
 
        if _returncode or (_returncode is None and bg_err.length):
 
            try:
 
                _p.terminate()
 
            except Exception:
 
                pass
 
            bg_out.stop()
 
            out = ''.join(bg_out)
 
            bg_err.stop()
 
            err = ''.join(bg_err)
 
            if (err.strip() == 'fatal: The remote end hung up unexpectedly' and
 
                out.startswith('0034shallow ')):
 
                # hack inspired by https://github.com/schacon/grack/pull/7
 
                bg_out = iter([out])
 
                _p = None
 
            elif err:
 
                raise EnvironmentError(
 
                    "Subprocess exited due to an error:\n" + err)
 
            else:
 
                raise EnvironmentError(
 
                    "Subprocess exited with non 0 ret code:%s" % _returncode)
 
        self.process = _p
 
        self.output = bg_out
 
        self.error = bg_err
 
        self.inputstream = inputstream
 

	
 
    def __iter__(self):
 
        return self
 

	
 
    def next(self):
 
        if self.process and self.process.poll():
 
            err = '%s' % ''.join(self.error)
 
            err = ''.join(self.error)
 
            raise EnvironmentError("Subprocess exited due to an error:\n" + err)
 
        return self.output.next()
 

	
 
    def throw(self, type, value=None, traceback=None):
 
        if self.output.length or not self.output.done_reading:
 
            raise type(value)
 

	
 
    def close(self):
 
        try:
 
            self.process.terminate()
 
        except:
 
            pass
 
        try:
 
            self.output.close()
 
        except:
 
            pass
 
        try:
 
            self.error.close()
 
        except:
 
            pass
 
        try:
 
            os.close(self.inputstream)
 
        except:
 
            pass
 

	
 
    def __del__(self):
 
        self.close()
kallithea/lib/vcs/utils/diffs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# original copyright: 2007-2008 by Armin Ronacher
 
# licensed under the BSD license.
 

	
 
import re
 
import difflib
 
import logging
 
import itertools
 

	
 
from difflib import unified_diff
 

	
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode, NodeError
 
from kallithea.lib.vcs.utils import safe_unicode
 

	
 

	
 
def get_udiff(filenode_old, filenode_new, show_whitespace=True):
 
    """
 
    Returns unified diff between given ``filenode_old`` and ``filenode_new``.
 
    """
 
    try:
 
        filenode_old_date = filenode_old.changeset.date
 
    except NodeError:
 
        filenode_old_date = None
 

	
 
    try:
 
        filenode_new_date = filenode_new.changeset.date
 
    except NodeError:
 
        filenode_new_date = None
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    if filenode_old_date and filenode_new_date:
 
        if not filenode_old_date < filenode_new_date:
 
            logging.debug("Generating udiff for filenodes with not increasing "
 
                "dates")
 

	
 
    vcs_udiff = unified_diff(filenode_old.content.splitlines(True),
 
                               filenode_new.content.splitlines(True),
 
                               filenode_old.name,
 
                               filenode_new.name,
 
                               filenode_old_date,
 
                               filenode_old_date)
 
    return vcs_udiff
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 

	
 
    :param ignore_whitespace: ignore whitespaces in diff
 
    """
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', '0' * 40)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', '0' * 40)
 

	
 
    repo = filenode_new.changeset.repository
 
    vcs_gitdiff = repo.get_diff(old_raw_id, new_raw_id, filenode_new.path,
 
                                 ignore_whitespace)
 

	
 
    return vcs_gitdiff
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _chunk_re = re.compile(r'@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 

	
 
    def __init__(self, diff, differ='diff', format='udiff'):
 
        """
 
        :param diff:   a text in diff format or generator
 
        :param format: format of diff passed, `udiff` or `gitdiff`
 
        """
 
        if isinstance(diff, basestring):
 
            diff = [diff]
 

	
 
        self.__udiff = diff
 
        self.__format = format
 
        self.adds = 0
 
        self.removes = 0
 

	
 
        if isinstance(self.__udiff, basestring):
 
            self.lines = iter(self.__udiff.splitlines(1))
 

	
 
        elif self.__format == 'gitdiff':
 
            udiff_copy = self.copy_iterator()
 
            self.lines = itertools.imap(self.escaper,
 
                                        self._parse_gitdiff(udiff_copy))
 
        else:
 
            udiff_copy = self.copy_iterator()
 
            self.lines = itertools.imap(self.escaper, udiff_copy)
 

	
 
        # Select a differ.
 
        if differ == 'difflib':
 
            self.differ = self._highlight_line_difflib
 
        else:
 
            self.differ = self._highlight_line_udiff
 

	
 
    def escaper(self, string):
 
        return string.replace('<', '&lt;').replace('>', '&gt;')
 

	
 
    def copy_iterator(self):
 
        """
 
        make a fresh copy of generator, we should not iterate thru
 
        an original as it's needed for repeating operations on
 
        this instance of DiffProcessor
 
        """
 
        self.__udiff, iterator_copy = itertools.tee(self.__udiff)
 
        return iterator_copy
 

	
 
    def _extract_rev(self, line1, line2):
 
        """
 
        Extract the filename and revision hint from a line.
 
        """
 

	
 
        try:
 
            if line1.startswith('--- ') and line2.startswith('+++ '):
 
                l1 = line1[4:].split(None, 1)
 
                old_filename = l1[0].lstrip('a/') if len(l1) >= 1 else None
 
                old_rev = l1[1] if len(l1) == 2 else 'old'
 

	
 
                l2 = line2[4:].split(None, 1)
 
                new_filename = l2[0].lstrip('b/') if len(l1) >= 1 else None
 
                new_rev = l2[1] if len(l2) == 2 else 'new'
 

	
 
                filename = old_filename if (old_filename !=
 
                                            'dev/null') else new_filename
 

	
 
                return filename, new_rev, old_rev
 
        except (ValueError, IndexError):
 
            pass
 

	
 
        return None, None, None
 

	
 
    def _parse_gitdiff(self, diffiterator):
 
        def line_decoder(l):
 
            if l.startswith('+') and not l.startswith('+++'):
 
                self.adds += 1
 
            elif l.startswith('-') and not l.startswith('---'):
 
                self.removes += 1
 
            return safe_unicode(l)
 

	
 
        output = list(diffiterator)
 
        size = len(output)
 

	
 
        if size == 2:
 
            l = []
 
            l.extend([output[0]])
 
            l.extend(output[1].splitlines(1))
 
            return map(line_decoder, l)
 
        elif size == 1:
 
            return  map(line_decoder, output[0].splitlines(1))
 
        elif size == 0:
 
            return []
 

	
 
        raise Exception('wrong size of diff %s' % size)
 

	
 
    def _highlight_line_difflib(self, line, next):
 
        """
 
        Highlight inline changes in both lines.
 
        """
 

	
 
        if line['action'] == 'del':
 
            old, new = line, next
 
        else:
 
            old, new = next, line
 

	
 
        oldwords = re.split(r'(\W)', old['line'])
 
        newwords = re.split(r'(\W)', new['line'])
 

	
 
        sequence = difflib.SequenceMatcher(None, oldwords, newwords)
 

	
 
        oldfragments, newfragments = [], []
 
        for tag, i1, i2, j1, j2 in sequence.get_opcodes():
 
            oldfrag = ''.join(oldwords[i1:i2])
 
            newfrag = ''.join(newwords[j1:j2])
 
            if tag != 'equal':
 
                if oldfrag:
 
                    oldfrag = '<del>%s</del>' % oldfrag
 
                if newfrag:
 
                    newfrag = '<ins>%s</ins>' % newfrag
 
            oldfragments.append(oldfrag)
 
            newfragments.append(newfrag)
 

	
 
        old['line'] = "".join(oldfragments)
 
        new['line'] = "".join(newfragments)
 

	
 
    def _highlight_line_udiff(self, line, next):
 
        """
 
        Highlight inline changes in both lines.
 
        """
 
        start = 0
 
        limit = min(len(line['line']), len(next['line']))
 
        while start < limit and line['line'][start] == next['line'][start]:
 
            start += 1
 
        end = -1
 
        limit -= start
 
        while -end <= limit and line['line'][end] == next['line'][end]:
 
            end -= 1
 
        end += 1
 
        if start or end:
 
            def do(l):
 
                last = end + len(l['line'])
 
                if l['action'] == 'add':
 
                    tag = 'ins'
 
                else:
 
                    tag = 'del'
 
                l['line'] = '%s<%s>%s</%s>%s' % (
 
                    l['line'][:start],
 
                    tag,
 
                    l['line'][start:last],
 
                    tag,
 
                    l['line'][last:]
 
                )
 
            do(line)
 
            do(next)
 

	
 
    def _parse_udiff(self):
 
        """
 
        Parse the diff an return data for the template.
 
        """
 
        lineiter = self.lines
 
        files = []
 
        try:
 
            line = lineiter.next()
 
            # skip first context
 
            skipfirst = True
 
            while 1:
 
                # continue until we found the old file
 
                if not line.startswith('--- '):
 
                    line = lineiter.next()
 
                    continue
 

	
 
                chunks = []
 
                filename, old_rev, new_rev = \
 
                    self._extract_rev(line, lineiter.next())
 
                files.append({
 
                    'filename':         filename,
 
                    'old_revision':     old_rev,
 
                    'new_revision':     new_rev,
 
                    'chunks':           chunks
 
                })
 

	
 
                line = lineiter.next()
 
                while line:
 
                    match = self._chunk_re.match(line)
 
                    if not match:
 
                        break
 

	
 
                    lines = []
 
                    chunks.append(lines)
 

	
 
                    old_line, old_end, new_line, new_end = \
 
                        [int(x or 1) for x in match.groups()[:-1]]
 
                    old_line -= 1
 
                    new_line -= 1
 
                    context = len(match.groups()) == 5
 
                    old_end += old_line
 
                    new_end += new_line
 

	
 
                    if context:
 
                        if not skipfirst:
 
                            lines.append({
 
                                'old_lineno': '...',
 
                                'new_lineno': '...',
 
                                'action': 'context',
 
                                'line': line,
 
                            })
 
                        else:
 
                            skipfirst = False
 

	
 
                    line = lineiter.next()
 
                    while old_line < old_end or new_line < new_end:
 
                        if line:
 
                            command, line = line[0], line[1:]
 
                        else:
 
                            command = ' '
 
                        affects_old = affects_new = False
 

	
 
                        # ignore those if we don't expect them
 
                        if command in '#@':
 
                            continue
 
                        elif command == '+':
 
                            affects_new = True
 
                            action = 'add'
 
                        elif command == '-':
 
                            affects_old = True
 
                            action = 'del'
 
                        else:
 
                            affects_old = affects_new = True
 
                            action = 'unmod'
 

	
 
                        old_line += affects_old
 
                        new_line += affects_new
 
                        lines.append({
 
                            'old_lineno':   affects_old and old_line or '',
 
                            'new_lineno':   affects_new and new_line or '',
 
                            'action':       action,
 
                            'line':         line
 
                        })
 
                        line = lineiter.next()
 

	
 
        except StopIteration:
 
            pass
 

	
 
        # highlight inline changes
 
        for file in files:
 
            for chunk in chunks:
 
                lineiter = iter(chunk)
 
                #first = True
 
                try:
 
                    while 1:
 
                        line = lineiter.next()
 
                        if line['action'] != 'unmod':
 
                            nextline = lineiter.next()
 
                            if nextline['action'] == 'unmod' or \
 
                               nextline['action'] == line['action']:
 
                                continue
 
                            self.differ(line, nextline)
 
                except StopIteration:
 
                    pass
 

	
 
        return files
 

	
 
    def prepare(self):
 
        """
 
        Prepare the passed udiff for HTML rendering. It'l return a list
 
        of dicts
 
        """
 
        return self._parse_udiff()
 

	
 
    def _safe_id(self, idstring):
 
        """Make a string safe for including in an id attribute.
 

	
 
        The HTML spec says that id attributes 'must begin with
 
        a letter ([A-Za-z]) and may be followed by any number
 
        of letters, digits ([0-9]), hyphens ("-"), underscores
 
        ("_"), colons (":"), and periods (".")'. These regexps
 
        are slightly over-zealous, in that they remove colons
 
        and periods unnecessarily.
 

	
 
        Whitespace is transformed into underscores, and then
 
        anything which is not a hyphen or a character that
 
        matches \w (alphanumerics and underscore) is removed.
 

	
 
        """
 
        # Transform all whitespace to underscore
 
        idstring = re.sub(r'\s', "_", '%s' % idstring)
 
        idstring = re.sub(r'\s', "_", idstring)
 
        # Remove everything that is not a hyphen or a member of \w
 
        idstring = re.sub(r'(?!-)\W', "", idstring).lower()
 
        return idstring
 

	
 
    def raw_diff(self):
 
        """
 
        Returns raw string as udiff
 
        """
 
        udiff_copy = self.copy_iterator()
 
        if self.__format == 'gitdiff':
 
            udiff_copy = self._parse_gitdiff(udiff_copy)
 
        return u''.join(udiff_copy)
 

	
 
    def as_html(self, table_class='code-difftable', line_class='line',
 
                new_lineno_class='lineno old', old_lineno_class='lineno new',
 
                code_class='code'):
 
        """
 
        Return udiff as html table with customized css classes
 
        """
 
        def _link_to_if(condition, label, url):
 
            """
 
            Generates a link if condition is meet or just the label if not.
 
            """
 

	
 
            if condition:
 
                return '''<a href="%(url)s">%(label)s</a>''' % {'url': url,
 
                                                                'label': label}
 
            else:
 
                return label
 
        diff_lines = self.prepare()
 
        _html_empty = True
 
        _html = []
 
        _html.append('''<table class="%(table_class)s">\n''' \
 
                                            % {'table_class': table_class})
 
        for diff in diff_lines:
 
            for line in diff['chunks']:
 
                _html_empty = False
 
                for change in line:
 
                    _html.append('''<tr class="%(line_class)s %(action)s">\n''' \
 
                        % {'line_class': line_class,
 
                           'action': change['action']})
 
                    anchor_old_id = ''
 
                    anchor_new_id = ''
 
                    anchor_old = "%(filename)s_o%(oldline_no)s" % \
 
                                {'filename': self._safe_id(diff['filename']),
 
                                 'oldline_no': change['old_lineno']}
 
                    anchor_new = "%(filename)s_n%(oldline_no)s" % \
 
                                {'filename': self._safe_id(diff['filename']),
 
                                 'oldline_no': change['new_lineno']}
 
                    cond_old = change['old_lineno'] != '...' and \
 
                                                        change['old_lineno']
 
                    cond_new = change['new_lineno'] != '...' and \
 
                                                        change['new_lineno']
 
                    if cond_old:
 
                        anchor_old_id = 'id="%s"' % anchor_old
 
                    if cond_new:
 
                        anchor_new_id = 'id="%s"' % anchor_new
 
                    ###########################################################
 
                    # OLD LINE NUMBER
 
                    ###########################################################
 
                    _html.append('''\t<td %(a_id)s class="%(old_lineno_cls)s">''' \
 
                                    % {'a_id': anchor_old_id,
 
                                       'old_lineno_cls': old_lineno_class})
 

	
 
                    _html.append('''<pre>%(link)s</pre>''' \
 
                        % {'link':
 
                        _link_to_if(cond_old, change['old_lineno'], '#%s' \
 
                                                                % anchor_old)})
 
                    _html.append('''</td>\n''')
 
                    ###########################################################
 
                    # NEW LINE NUMBER
 
                    ###########################################################
 

	
 
                    _html.append('''\t<td %(a_id)s class="%(new_lineno_cls)s">''' \
 
                                    % {'a_id': anchor_new_id,
 
                                       'new_lineno_cls': new_lineno_class})
 

	
 
                    _html.append('''<pre>%(link)s</pre>''' \
 
                        % {'link':
 
                        _link_to_if(cond_new, change['new_lineno'], '#%s' \
 
                                                                % anchor_new)})
 
                    _html.append('''</td>\n''')
 
                    ###########################################################
 
                    # CODE
 
                    ###########################################################
 
                    _html.append('''\t<td class="%(code_class)s">''' \
 
                                                % {'code_class': code_class})
 
                    _html.append('''\n\t\t<pre>%(code)s</pre>\n''' \
 
                                                % {'code': change['line']})
 
                    _html.append('''\t</td>''')
 
                    _html.append('''\n</tr>\n''')
 
        _html.append('''</table>''')
 
        if _html_empty:
 
            return None
 
        return ''.join(_html)
 

	
 
    def stat(self):
 
        """
 
        Returns tuple of adde,and removed lines for this instance
 
        """
 
        return self.adds, self.removes
kallithea/model/comment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.comment
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
comments model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 11, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 

	
 
from pylons.i18n.translation import _
 
from collections import defaultdict
 

	
 
from kallithea.lib.utils2 import extract_mentioned_users, safe_unicode
 
from kallithea.lib import helpers as h
 
from kallithea.model import BaseModel
 
from kallithea.model.db import ChangesetComment, User, \
 
    Notification, PullRequest
 
from kallithea.model.notification import NotificationModel
 
from kallithea.model.meta import Session
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class ChangesetCommentsModel(BaseModel):
 

	
 
    cls = ChangesetComment
 

	
 
    def __get_changeset_comment(self, changeset_comment):
 
        return self._get_instance(ChangesetComment, changeset_comment)
 

	
 
    def __get_pull_request(self, pull_request):
 
        return self._get_instance(PullRequest, pull_request)
 

	
 
    def _extract_mentions(self, s):
 
        user_objects = []
 
        for username in extract_mentioned_users(s):
 
            user_obj = User.get_by_username(username, case_insensitive=True)
 
            if user_obj:
 
                user_objects.append(user_obj)
 
        return user_objects
 

	
 
    def _get_notification_data(self, repo, comment, user, comment_text,
 
                               line_no=None, revision=None, pull_request=None,
 
                               status_change=None, closing_pr=False):
 
        """
 
        :returns: tuple (subj,body,recipients,notification_type,email_kwargs)
 
        """
 
        # make notification
 
        body = comment_text  # text of the comment
 
        line = ''
 
        if line_no:
 
            line = _('on line %s') % line_no
 

	
 
        #changeset
 
        if revision:
 
            notification_type = Notification.TYPE_CHANGESET_COMMENT
 
            cs = repo.scm_instance.get_changeset(revision)
 
            desc = "%s" % (cs.short_id)
 
            desc = cs.short_id
 

	
 
            threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, h.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no,
 
                                                           h.canonical_hostname()))
 
            comment_url = h.canonical_url('changeset_home',
 
                repo_name=repo.repo_name,
 
                revision=revision,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = safe_unicode(
 
                h.link_to('Re changeset: %(desc)s %(line)s' % \
 
                          {'desc': desc, 'line': line},
 
                          comment_url)
 
            )
 
            # get the current participants of this changeset
 
            recipients = ChangesetComment.get_users(revision=revision)
 
            # add changeset author if it's in kallithea system
 
            cs_author = User.get_from_cs_author(cs.author)
 
            if not cs_author:
 
                #use repo owner if we cannot extract the author correctly
 
                cs_author = repo.user
 
            recipients += [cs_author]
 
            email_kwargs = {
 
                'status_change': status_change,
 
                'cs_comment_user': h.person(user, 'full_name_and_username'),
 
                'cs_target_repo': h.canonical_url('summary_home', repo_name=repo.repo_name),
 
                'cs_comment_url': comment_url,
 
                'raw_id': revision,
 
                'message': cs.message,
 
                'repo_name': repo.repo_name,
 
                'short_id': h.short_id(revision),
 
                'branch': cs.branch,
 
                'comment_username': user.username,
 
                'threading': threading,
 
            }
 
        #pull request
 
        elif pull_request:
 
            notification_type = Notification.TYPE_PULL_REQUEST_COMMENT
 
            desc = comment.pull_request.title
 
            _org_ref_type, org_ref_name, _org_rev = comment.pull_request.org_ref.split(':')
 
            threading = ['%s-pr-%s@%s' % (pull_request.other_repo.repo_name,
 
                                          pull_request.pull_request_id,
 
                                          h.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-pr-%s-line-%s@%s' % (pull_request.other_repo.repo_name,
 
                                                          pull_request.pull_request_id, line_no,
 
                                                          h.canonical_hostname()))
 
            comment_url = pull_request.url(canonical=True,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = safe_unicode(
 
                h.link_to('Re pull request %(pr_nice_id)s: %(desc)s %(line)s' % \
 
                          {'desc': desc,
 
                           'pr_nice_id': comment.pull_request.nice_id(),
 
                           'line': line},
 
                          comment_url)
 
            )
 
            # get the current participants of this pull request
 
            recipients = ChangesetComment.get_users(pull_request_id=
 
                                                pull_request.pull_request_id)
 
            # add pull request author
 
            recipients += [pull_request.author]
 

	
 
            # add the reviewers to notification
 
            recipients += [x.user for x in pull_request.reviewers]
 

	
 
            #set some variables for email notification
 
            email_kwargs = {
 
                'pr_title': pull_request.title,
 
                'pr_nice_id': pull_request.nice_id(),
 
                'status_change': status_change,
 
                'closing_pr': closing_pr,
 
                'pr_comment_url': comment_url,
 
                'pr_comment_user': h.person(user, 'full_name_and_username'),
 
                'pr_target_repo': h.canonical_url('summary_home',
 
                                   repo_name=pull_request.other_repo.repo_name),
 
                'repo_name': pull_request.other_repo.repo_name,
 
                'ref': org_ref_name,
 
                'comment_username': user.username,
 
                'threading': threading,
 
            }
 

	
 
        return subj, body, recipients, notification_type, email_kwargs
 

	
 
    def create(self, text, repo, user, revision=None, pull_request=None,
 
               f_path=None, line_no=None, status_change=None, closing_pr=False,
 
               send_email=True):
 
        """
 
        Creates a new comment for either a changeset or a pull request.
 
        status_change and closing_pr is only for the optional email.
 

	
 
        Returns the created comment.
 
        """
 
        if not status_change and not text:
 
            log.warning('Missing text for comment, skipping...')
 
            return None
 

	
 
        repo = self._get_repo(repo)
 
        user = self._get_user(user)
 
        comment = ChangesetComment()
 
        comment.repo = repo
 
        comment.author = user
 
        comment.text = text
 
        comment.f_path = f_path
 
        comment.line_no = line_no
 

	
 
        if revision:
 
            comment.revision = revision
 
        elif pull_request:
 
            pull_request = self.__get_pull_request(pull_request)
 
            comment.pull_request = pull_request
 
        else:
 
            raise Exception('Please specify revision or pull_request_id')
 

	
 
        Session().add(comment)
 
        Session().flush()
 

	
 
        if send_email:
 
            (subj, body, recipients, notification_type,
 
             email_kwargs) = self._get_notification_data(
 
                                repo, comment, user,
 
                                comment_text=text,
 
                                line_no=line_no,
 
                                revision=revision,
 
                                pull_request=pull_request,
 
                                status_change=status_change,
 
                                closing_pr=closing_pr)
 
            email_kwargs['is_mention'] = False
 
            # create notification objects, and emails
 
            NotificationModel().create(
 
                created_by=user, subject=subj, body=body,
 
                recipients=recipients, type_=notification_type,
 
                email_kwargs=email_kwargs,
 
            )
 

	
 
            mention_recipients = set(self._extract_mentions(body))\
 
                                    .difference(recipients)
 
            if mention_recipients:
 
                email_kwargs['is_mention'] = True
 
                subj = _('[Mention]') + ' ' + subj
 
                NotificationModel().create(
 
                    created_by=user, subject=subj, body=body,
 
                    recipients=mention_recipients,
 
                    type_=notification_type,
 
                    email_kwargs=email_kwargs
 
                )
 

	
 
        return comment
 

	
 
    def delete(self, comment):
 
        comment = self.__get_changeset_comment(comment)
 
        Session().delete(comment)
 

	
 
        return comment
 

	
 
    def get_comments(self, repo_id, revision=None, pull_request=None):
 
        """
 
        Gets general comments for either revision or pull_request.
 

	
 
        Returns a list, ordered by creation date.
 
        """
 
        return self._get_comments(repo_id, revision=revision, pull_request=pull_request,
 
                                  inline=False)
 

	
 
    def get_inline_comments(self, repo_id, revision=None, pull_request=None):
 
        """
 
        Gets inline comments for either revision or pull_request.
 

	
 
        Returns a list of tuples with file path and list of comments per line number.
 
        """
 
        comments = self._get_comments(repo_id, revision=revision, pull_request=pull_request,
 
                                      inline=True)
 

	
 
        paths = defaultdict(lambda: defaultdict(list))
 
        for co in comments:
 
            paths[co.f_path][co.line_no].append(co)
 
        return paths.items()
 

	
 
    def _get_comments(self, repo_id, revision=None, pull_request=None, inline=False):
 
        """
 
        Gets comments for either revision or pull_request_id, either inline or general.
 
        """
 
        q = Session().query(ChangesetComment)\
 
            .filter(ChangesetComment.repo_id == repo_id)
 

	
 
        if inline:
 
            q = q.filter(ChangesetComment.line_no != None)\
 
                .filter(ChangesetComment.f_path != None)
 
        else:
 
            q = q.filter(ChangesetComment.line_no == None)\
 
                .filter(ChangesetComment.f_path == None)
 

	
 
        if revision:
 
            q = q.filter(ChangesetComment.revision == revision)
 
        elif pull_request:
 
            pull_request = self.__get_pull_request(pull_request)
 
            q = q.filter(ChangesetComment.pull_request == pull_request)
 
        else:
 
            raise Exception('Please specify either revision or pull_request')
 

	
 
        return q.order_by(ChangesetComment.created_on).all()
kallithea/tests/functional/test_my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from kallithea.model.db import User, UserFollowing, Repository, UserApiKeys
 
from kallithea.tests import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.lib import helpers as h
 
from kallithea.model.user import UserModel
 
from kallithea.model.meta import Session
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestMyAccountController(TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            Session().commit()
 

	
 
    def test_my_account(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account'))
 

	
 
        response.mustcontain('value="%s' % TEST_USER_ADMIN_LOGIN)
 

	
 
    def test_my_account_my_repos(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_repos'))
 
        cnt = Repository.query().filter(Repository.user ==
 
                           User.get_by_username(TEST_USER_ADMIN_LOGIN)).count()
 
        response.mustcontain('"totalRecords": %s' % cnt)
 

	
 
    def test_my_account_my_watched(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_watched'))
 

	
 
        cnt = UserFollowing.query().filter(UserFollowing.user ==
 
                            User.get_by_username(TEST_USER_ADMIN_LOGIN)).count()
 
        response.mustcontain('"totalRecords": %s' % cnt)
 

	
 
    def test_my_account_my_emails(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
    def test_my_account_my_emails_add_existing_email(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': TEST_USER_REGULAR_EMAIL, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'This e-mail address is already in use')
 

	
 
    def test_my_account_my_emails_add_mising_email_in_form(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 
        response = self.app.post(url('my_account_emails'),)
 
        self.checkSessionFlash(response, 'Please enter an email address')
 

	
 
    def test_my_account_my_emails_add_remove(self):
 
        self.log_user()
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'new_email': 'foo@barz.com', '_authentication_token': self.authentication_token()})
 

	
 
        response = self.app.get(url('my_account_emails'))
 

	
 
        from kallithea.model.db import UserEmailMap
 
        email_id = UserEmailMap.query()\
 
            .filter(UserEmailMap.user == User.get_by_username(TEST_USER_ADMIN_LOGIN))\
 
            .filter(UserEmailMap.email == 'foo@barz.com').one().email_id
 

	
 
        response.mustcontain('foo@barz.com')
 
        response.mustcontain('<input id="del_email_id" name="del_email_id" type="hidden" value="%s" />' % email_id)
 

	
 
        response = self.app.post(url('my_account_emails'),
 
                                 {'del_email_id': email_id, '_method': 'delete', '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'Removed email from user')
 
        response = self.app.get(url('my_account_emails'))
 
        response.mustcontain('No additional emails specified')
 

	
 

	
 
    @parameterized.expand(
 
        [('firstname', {'firstname': 'new_username'}),
 
         ('lastname', {'lastname': 'new_username'}),
 
         ('admin', {'admin': True}),
 
         ('admin', {'admin': False}),
 
         ('extern_type', {'extern_type': 'ldap'}),
 
         ('extern_type', {'extern_type': None}),
 
         #('extern_name', {'extern_name': 'test'}),
 
         #('extern_name', {'extern_name': None}),
 
         ('active', {'active': False}),
 
         ('active', {'active': True}),
 
         ('email', {'email': 'some@email.com'}),
 
        # ('new_password', {'new_password': 'foobar123',
 
        #                   'password_confirmation': 'foobar123'})
 
        ])
 
    def test_my_account_update(self, name, attrs):
 
        usr = fixture.create_user(self.test_user_1, password='qweqwe',
 
                                  email='testme@example.com',
 
                                  extern_type='internal',
 
                                  extern_name=self.test_user_1,
 
                                  skip_if_exists=True)
 
        params = usr.get_api_data(True)  # current user data
 
        user_id = usr.user_id
 
        self.log_user(username=self.test_user_1, password='qweqwe')
 

	
 
        params.update({'password_confirmation': ''})
 
        params.update({'new_password': ''})
 
        params.update({'extern_type': 'internal'})
 
        params.update({'extern_name': self.test_user_1})
 
        params.update({'_authentication_token': self.authentication_token()})
 

	
 
        params.update(attrs)
 
        response = self.app.post(url('my_account'), params)
 

	
 
        self.checkSessionFlash(response,
 
                               'Your account was updated successfully')
 

	
 
        updated_user = User.get_by_username(self.test_user_1)
 
        updated_params = updated_user.get_api_data(True)
 
        updated_params.update({'password_confirmation': ''})
 
        updated_params.update({'new_password': ''})
 

	
 
        params['last_login'] = updated_params['last_login']
 
        if name == 'email':
 
            params['emails'] = [attrs['email']]
 
        if name == 'extern_type':
 
            #cannot update this via form, expected value is original one
 
            params['extern_type'] = "internal"
 
        if name == 'extern_name':
 
            #cannot update this via form, expected value is original one
 
            params['extern_name'] = str(user_id)
 
        if name == 'active':
 
            #my account cannot deactivate account
 
            params['active'] = True
 
        if name == 'admin':
 
            #my account cannot make you an admin !
 
            params['admin'] = False
 

	
 
        params.pop('_authentication_token')
 
        self.assertEqual(params, updated_params)
 

	
 
    def test_my_account_update_err_email_exists(self):
 
        self.log_user()
 

	
 
        new_email = TEST_USER_REGULAR_EMAIL  # already existing email
 
        response = self.app.post(url('my_account'),
 
                                params=dict(
 
                                    username=TEST_USER_ADMIN_LOGIN,
 
                                    new_password=TEST_USER_ADMIN_PASS,
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    email=new_email,
 
                                    _authentication_token=self.authentication_token())
 
                                )
 

	
 
        response.mustcontain('This e-mail address is already in use')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(url('my_account'),
 
                                 params=dict(
 
                                            username=TEST_USER_ADMIN_LOGIN,
 
                                            new_password=TEST_USER_ADMIN_PASS,
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,
 
                                            _authentication_token=self.authentication_token()))
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from kallithea.model import validators
 
        msg = validators.ValidUsername(edit=False, old_data={})\
 
                ._messages['username_exists']
 
        msg = h.html_escape(msg % {'username': TEST_USER_ADMIN_LOGIN})
 
        response.mustcontain(u"%s" % msg)
 
        response.mustcontain(msg)
 

	
 
    def test_my_account_api_keys(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @parameterized.expand([
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_my_account_add_api_keys(self, desc, lifetime):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        try:
 
            response = response.follow()
 
            user = User.get(usr['user_id'])
 
            for api_key in user.api_keys:
 
                response.mustcontain(api_key)
 
        finally:
 
            for api_key in UserApiKeys.query().all():
 
                Session().delete(api_key)
 
                Session().commit()
 

	
 
    def test_my_account_remove_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        response = self.app.post(url('my_account_api_keys'),
 
                                 {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
 
        response = response.follow()
 

	
 
        #now delete our key
 
        keys = UserApiKeys.query().all()
 
        self.assertEqual(1, len(keys))
 

	
 
        response = self.app.post(url('my_account_api_keys'),
 
                 {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully deleted')
 
        keys = UserApiKeys.query().all()
 
        self.assertEqual(0, len(keys))
 

	
 

	
 
    def test_my_account_reset_main_api_key(self):
 
        usr = self.log_user(TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS)
 
        user = User.get(usr['user_id'])
 
        api_key = user.api_key
 
        response = self.app.get(url('my_account_api_keys'))
 
        response.mustcontain(api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
        response = self.app.post(url('my_account_api_keys'),
 
                 {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
 
        self.checkSessionFlash(response, 'API key successfully reset')
 
        response = response.follow()
 
        response.mustcontain(no=[api_key])
0 comments (0 inline, 0 general)