Changeset - 7d5e50517430
[Not reviewed]
default
0 2 0
Mads Kiilerich (mads) - 5 years ago 2021-02-18 12:23:19
mads@kiilerich.com
diffs: change name of high level function name for generating HTML diff to html_diff
2 files changed with 3 insertions and 3 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.files
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Files controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import posixpath
 
import shutil
 
import tempfile
 
import traceback
 
from collections import OrderedDict
 

	
 
from tg import request, response
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
import kallithea
 
import kallithea.lib.helpers as h
 
from kallithea.controllers import base
 
from kallithea.lib import diffs, webutils
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.exceptions import NonRelativePathError
 
from kallithea.lib.utils2 import asbool, convert_line_endings, detect_mode, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, ImproperArchiveTypeError, NodeAlreadyExistsError,
 
                                          NodeDoesNotExistError, NodeError, RepositoryError, VCSError)
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.lib.webutils import url
 
from kallithea.model import userlog
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(base.BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(FilesController, self)._before(*args, **kwargs)
 

	
 
    def __get_cs(self, rev, silent_empty=False):
 
        """
 
        Safe way to get changeset if error occur it redirects to tip with
 
        proper message
 

	
 
        :param rev: revision to fetch
 
        :silent_empty: return None if repository is empty
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            if silent_empty:
 
                return None
 
            url_ = url('files_add_home',
 
                       repo_name=c.repo_name,
 
                       revision=0, f_path='', anchor='edit')
 
            add_new = webutils.link_to(_('Click here to add new file'), url_, class_="alert-link")
 
            webutils.flash(_('There are no files yet.') + ' ' + add_new, category='warning')
 
            raise HTTPNotFound()
 
        except (ChangesetDoesNotExistError, LookupError):
 
            msg = _('Such revision does not exist for this repository')
 
            webutils.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            webutils.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
    def __get_filenode(self, cs, path):
 
        """
 
        Returns file_node or raise HTTP error.
 

	
 
        :param cs: given changeset
 
        :param path: path to lookup
 
        """
 

	
 
        try:
 
            file_node = cs.get_node(path)
 
            if file_node.is_dir():
 
                raise RepositoryError('given path is a directory')
 
        except ChangesetDoesNotExistError:
 
            msg = _('Such revision does not exist for this repository')
 
            webutils.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            webutils.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
        return file_node
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name, revision, f_path, annotate=False):
 
        # redirect to given revision from form if given
 
        post_revision = request.POST.get('at_rev', None)
 
        if post_revision:
 
            cs = self.__get_cs(post_revision) # FIXME - unused!
 

	
 
        c.revision = revision
 
        c.changeset = self.__get_cs(revision)
 
        c.branch = request.GET.get('branch', None)
 
        c.f_path = f_path
 
        c.annotate = annotate
 
        cur_rev = c.changeset.revision
 
        # used in files_source.html:
 
        c.cut_off_limit = self.cut_off_limit
 
        c.fulldiff = request.GET.get('fulldiff')
 

	
 
        # prev link
 
        try:
 
            prev_rev = c.db_repo_scm_instance.get_changeset(cur_rev).prev(c.branch)
 
            c.url_prev = url('files_home', repo_name=c.repo_name,
 
                         revision=prev_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_prev += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_prev = '#'
 

	
 
        # next link
 
        try:
 
            next_rev = c.db_repo_scm_instance.get_changeset(cur_rev).next(c.branch)
 
            c.url_next = url('files_home', repo_name=c.repo_name,
 
                     revision=next_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_next += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_next = '#'
 

	
 
        # files or dirs
 
        try:
 
            c.file = c.changeset.get_node(f_path)
 

	
 
            if c.file.is_submodule():
 
                raise HTTPFound(location=c.file.url)
 
            elif c.file.is_file():
 
                c.load_full_history = False
 
                # determine if we're on branch head
 
                _branches = c.db_repo_scm_instance.branches
 
                c.on_branch_head = revision in _branches or revision in _branches.values()
 
                _hist = []
 
                c.file_history = []
 
                if c.load_full_history:
 
                    c.file_history, _hist = self._get_node_history(c.changeset, f_path)
 

	
 
                c.authors = []
 
                for a in set([x.author for x in _hist]):
 
                    c.authors.append((author_email(a), h.person(a)))
 
            else:
 
                c.authors = c.file_history = []
 
        except RepositoryError as e:
 
            webutils.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return base.render('files/files_ypjax.html')
 

	
 
        # TODO: tags and bookmarks?
 
        c.revision_options = [(c.changeset.raw_id,
 
                              _('%s at %s') % (b, c.changeset.short_id)) for b in c.changeset.branches] + \
 
            [(n, b) for b, n in c.db_repo_scm_instance.branches.items()]
 
        if c.db_repo_scm_instance.closed_branches:
 
            prefix = _('(closed)') + ' '
 
            c.revision_options += [('-', '-')] + \
 
                [(n, prefix + b) for b, n in c.db_repo_scm_instance.closed_branches.items()]
 

	
 
        return base.render('files/files.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    @base.jsonify
 
    def history(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 

	
 
            res = []
 
            for obj in file_history:
 
                res.append({
 
                    'text': obj[1],
 
                    'children': [{'id': o[0], 'text': o[1]} for o in obj[0]]
 
                })
 

	
 
            data = {
 
                'more': False,
 
                'results': res
 
            }
 
            return data
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def authors(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 
            c.authors = []
 
            for a in set([x.author for x in _hist]):
 
                c.authors.append((author_email(a), h.person(a)))
 
            return base.render('files/files_history_box.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def rawfile(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        response.content_disposition = \
 
            'attachment; filename=%s' % f_path.split(kallithea.URL_SEP)[-1]
 

	
 
        response.content_type = file_node.mimetype
 
        return file_node.content
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def raw(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        raw_mimetype_mapping = {
 
            # map original mimetype to a mimetype used for "show as raw"
 
            # you can also provide a content-disposition to override the
 
            # default "attachment" disposition.
 
            # orig_type: (new_type, new_dispo)
 

	
 
            # show images inline:
 
            'image/x-icon': ('image/x-icon', 'inline'),
 
            'image/png': ('image/png', 'inline'),
 
            'image/gif': ('image/gif', 'inline'),
 
            'image/jpeg': ('image/jpeg', 'inline'),
 
            'image/svg+xml': ('image/svg+xml', 'inline'),
 
        }
 

	
 
        mimetype = file_node.mimetype
 
        try:
 
            mimetype, dispo = raw_mimetype_mapping[mimetype]
 
        except KeyError:
 
            # we don't know anything special about this, handle it safely
 
            if file_node.is_binary:
 
                # do same as download raw for binary files
 
                mimetype, dispo = 'application/octet-stream', 'attachment'
 
            else:
 
                # do not just use the original mimetype, but force text/plain,
 
                # otherwise it would serve text/html and that might be unsafe.
 
                # Note: underlying vcs library fakes text/plain mimetype if the
 
                # mimetype can not be determined and it thinks it is not
 
                # binary.This might lead to erroneous text display in some
 
                # cases, but helps in other cases, like with text files
 
                # without extension.
 
                mimetype, dispo = 'text/plain', 'inline'
 

	
 
        if dispo == 'attachment':
 
            dispo = 'attachment; filename=%s' % f_path.split(os.sep)[-1]
 

	
 
        response.content_disposition = dispo
 
        response.content_type = mimetype
 
        return file_node.content
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def delete(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
        # check if revision is a branch name or branch hash
 
        if revision not in _branches and revision not in _branches.values():
 
            webutils.flash(_('You can only delete files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=webutils.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        c.default_message = _('Deleted file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 
        node_path = f_path
 
        author = request.authuser.full_contact
 

	
 
        if r_post:
 
            message = r_post.get('message') or c.default_message
 

	
 
            try:
 
                nodes = {
 
                    node_path: {
 
                        'content': ''
 
                    }
 
                }
 
                self.scm_model.delete_nodes(
 
                    user=request.authuser.user_id,
 
                    ip_addr=request.ip_addr,
 
                    repo=c.db_repo,
 
                    message=message,
 
                    nodes=nodes,
 
                    parent_cs=c.cs,
 
                    author=author,
 
                )
 

	
 
                webutils.flash(_('Successfully deleted file %s') % f_path,
 
                        category='success')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                webutils.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return base.render('files/files_delete.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def edit(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
        # check if revision is a branch name or branch hash
 
        if revision not in _branches and revision not in _branches.values():
 
            webutils.flash(_('You can only edit files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=webutils.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        if c.file.is_binary:
 
            raise HTTPFound(location=url('files_home', repo_name=c.repo_name,
 
                            revision=c.cs.raw_id, f_path=f_path))
 
        c.default_message = _('Edited file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 

	
 
        if r_post:
 
            old_content = safe_str(c.file.content)
 
            sl = old_content.splitlines(1)
 
            first_line = sl[0] if sl else ''
 
            # modes:  0 - Unix, 1 - Mac, 2 - DOS
 
            mode = detect_mode(first_line, 0)
 
            content = convert_line_endings(r_post.get('content', ''), mode)
 

	
 
            message = r_post.get('message') or c.default_message
 
            author = request.authuser.full_contact
 

	
 
            if content == old_content:
 
                webutils.flash(_('No changes'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            try:
 
                self.scm_model.commit_change(repo=c.db_repo_scm_instance,
 
                                             repo_name=repo_name, cs=c.cs,
 
                                             user=request.authuser.user_id,
 
                                             ip_addr=request.ip_addr,
 
                                             author=author, message=message,
 
                                             content=content, f_path=f_path)
 
                webutils.flash(_('Successfully committed to %s') % f_path,
 
                        category='success')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                webutils.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return base.render('files/files_edit.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def add(self, repo_name, revision, f_path):
 

	
 
        repo = c.db_repo
 
        r_post = request.POST
 
        c.cs = self.__get_cs(revision, silent_empty=True)
 
        if c.cs is None:
 
            c.cs = EmptyChangeset(alias=c.db_repo_scm_instance.alias)
 
        c.default_message = (_('Added file via Kallithea'))
 
        c.f_path = f_path
 

	
 
        if r_post:
 
            unix_mode = 0
 
            content = convert_line_endings(r_post.get('content', ''), unix_mode)
 

	
 
            message = r_post.get('message') or c.default_message
 
            filename = r_post.get('filename')
 
            location = r_post.get('location', '')
 
            file_obj = r_post.get('upload_file', None)
 

	
 
            if file_obj is not None and hasattr(file_obj, 'filename'):
 
                filename = file_obj.filename
 
                content = file_obj.file
 

	
 
                if hasattr(content, 'file'):
 
                    # non posix systems store real file under file attr
 
                    content = content.file
 

	
 
            if not content:
 
                webutils.flash(_('No content'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            if not filename:
 
                webutils.flash(_('No filename'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            # strip all crap out of file, just leave the basename
 
            filename = os.path.basename(filename)
 
            node_path = posixpath.join(location, filename)
 
            author = request.authuser.full_contact
 

	
 
            try:
 
                nodes = {
 
                    node_path: {
 
                        'content': content
 
                    }
 
                }
 
                self.scm_model.create_nodes(
 
                    user=request.authuser.user_id,
 
                    ip_addr=request.ip_addr,
 
                    repo=c.db_repo,
 
                    message=message,
 
                    nodes=nodes,
 
                    parent_cs=c.cs,
 
                    author=author,
 
                )
 

	
 
                webutils.flash(_('Successfully committed to %s') % node_path,
 
                        category='success')
 
            except NonRelativePathError as e:
 
                webutils.flash(_('Location must be relative path and must not '
 
                          'contain .. in path'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            except (NodeError, NodeAlreadyExistsError) as e:
 
                webutils.flash(_(e), category='error')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                webutils.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return base.render('files/files_add.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def archivefile(self, repo_name, fname):
 
        fileformat = None
 
        revision = None
 
        ext = None
 
        subrepos = request.GET.get('subrepos') == 'true'
 

	
 
        for a_type, ext_data in settings.ARCHIVE_SPECS.items():
 
            archive_spec = fname.split(ext_data[1])
 
            if len(archive_spec) == 2 and archive_spec[1] == '':
 
                fileformat = a_type or ext_data[1]
 
                revision = archive_spec[0]
 
                ext = ext_data[1]
 

	
 
        try:
 
            dbrepo = RepoModel().get_by_repo_name(repo_name)
 
            if not dbrepo.enable_downloads:
 
                return _('Downloads disabled') # TODO: do something else?
 

	
 
            if c.db_repo_scm_instance.alias == 'hg':
 
                # patch and reset hooks section of UI config to not run any
 
                # hooks on fetching archives with subrepos
 
                for k, v in c.db_repo_scm_instance._repo.ui.configitems('hooks'):
 
                    c.db_repo_scm_instance._repo.ui.setconfig('hooks', k, None)
 

	
 
            cs = c.db_repo_scm_instance.get_changeset(revision)
 
            content_type = settings.ARCHIVE_SPECS[fileformat][0]
 
        except ChangesetDoesNotExistError:
 
            return _('Unknown revision %s') % revision
 
        except EmptyRepositoryError:
 
            return _('Empty repository')
 
        except (ImproperArchiveTypeError, KeyError):
 
            return _('Unknown archive type')
 

	
 
        rev_name = cs.raw_id[:12]
 
        archive_name = '%s-%s%s' % (repo_name.replace('/', '_'), rev_name, ext)
 

	
 
        archive_path = None
 
        cached_archive_path = None
 
        archive_cache_dir = kallithea.CONFIG.get('archive_cache_dir')
 
        if archive_cache_dir and not subrepos: # TODO: subrepo caching?
 
            if not os.path.isdir(archive_cache_dir):
 
                os.makedirs(archive_cache_dir)
 
            cached_archive_path = os.path.join(archive_cache_dir, archive_name)
 
            if os.path.isfile(cached_archive_path):
 
                log.debug('Found cached archive in %s', cached_archive_path)
 
                archive_path = cached_archive_path
 
            else:
 
                log.debug('Archive %s is not yet cached', archive_name)
 

	
 
        if archive_path is None:
 
            # generate new archive
 
            fd, archive_path = tempfile.mkstemp()
 
            log.debug('Creating new temp archive in %s', archive_path)
 
            with os.fdopen(fd, 'wb') as stream:
 
                cs.fill_archive(stream=stream, kind=fileformat, subrepos=subrepos)
 
                # stream (and thus fd) has been closed by cs.fill_archive
 
            if cached_archive_path is not None:
 
                # we generated the archive - move it to cache
 
                log.debug('Storing new archive in %s', cached_archive_path)
 
                shutil.move(archive_path, cached_archive_path)
 
                archive_path = cached_archive_path
 

	
 
        def get_chunked_archive(archive_path):
 
            stream = open(archive_path, 'rb')
 
            while True:
 
                data = stream.read(16 * 1024)
 
                if not data:
 
                    break
 
                yield data
 
            stream.close()
 
            if archive_path != cached_archive_path:
 
                log.debug('Destroying temp archive %s', archive_path)
 
                os.remove(archive_path)
 

	
 
        userlog.action_logger(user=request.authuser,
 
                      action='user_downloaded_archive:%s' % (archive_name),
 
                      repo=repo_name, ipaddr=request.ip_addr, commit=True)
 

	
 
        response.content_disposition = str('attachment; filename=%s' % (archive_name))
 
        response.content_type = str(content_type)
 
        return get_chunked_archive(archive_path)
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def diff(self, repo_name, f_path):
 
        ignore_whitespace_diff = h.get_ignore_whitespace_diff(request.GET)
 
        diff_context_size = h.get_diff_context_size(request.GET)
 
        diff2 = request.GET.get('diff2', '')
 
        diff1 = request.GET.get('diff1', '') or diff2
 
        c.action = request.GET.get('diff')
 
        c.no_changes = diff1 == diff2
 
        c.f_path = f_path
 
        c.big_diff = False
 
        fulldiff = request.GET.get('fulldiff')
 
        c.changes = OrderedDict()
 
        c.changes[diff2] = []
 

	
 
        # special case if we want a show rev only, it's impl here
 
        # to reduce JS and callbacks
 

	
 
        if request.GET.get('show_rev'):
 
            if asbool(request.GET.get('annotate', 'False')):
 
                _url = url('files_annotate_home', repo_name=c.repo_name,
 
                           revision=diff1, f_path=c.f_path)
 
            else:
 
                _url = url('files_home', repo_name=c.repo_name,
 
                           revision=diff1, f_path=c.f_path)
 

	
 
            raise HTTPFound(location=_url)
 
        try:
 
            if diff1 not in ['', None, 'None', '0' * 12, '0' * 40]:
 
                c.changeset_1 = c.db_repo_scm_instance.get_changeset(diff1)
 
                try:
 
                    node1 = c.changeset_1.get_node(f_path)
 
                    if node1.is_dir():
 
                        raise NodeError('%s path is a %s not a file'
 
                                        % (node1, type(node1)))
 
                except NodeDoesNotExistError:
 
                    c.changeset_1 = EmptyChangeset(cs=diff1,
 
                                                   revision=c.changeset_1.revision,
 
                                                   repo=c.db_repo_scm_instance)
 
                    node1 = FileNode(f_path, '', changeset=c.changeset_1)
 
            else:
 
                c.changeset_1 = EmptyChangeset(repo=c.db_repo_scm_instance)
 
                node1 = FileNode(f_path, '', changeset=c.changeset_1)
 

	
 
            if diff2 not in ['', None, 'None', '0' * 12, '0' * 40]:
 
                c.changeset_2 = c.db_repo_scm_instance.get_changeset(diff2)
 
                try:
 
                    node2 = c.changeset_2.get_node(f_path)
 
                    if node2.is_dir():
 
                        raise NodeError('%s path is a %s not a file'
 
                                        % (node2, type(node2)))
 
                except NodeDoesNotExistError:
 
                    c.changeset_2 = EmptyChangeset(cs=diff2,
 
                                                   revision=c.changeset_2.revision,
 
                                                   repo=c.db_repo_scm_instance)
 
                    node2 = FileNode(f_path, '', changeset=c.changeset_2)
 
            else:
 
                c.changeset_2 = EmptyChangeset(repo=c.db_repo_scm_instance)
 
                node2 = FileNode(f_path, '', changeset=c.changeset_2)
 
        except (RepositoryError, NodeError):
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=url('files_home', repo_name=c.repo_name,
 
                                f_path=f_path))
 

	
 
        if c.action == 'download':
 
            raw_diff = diffs.get_gitdiff(node1, node2,
 
                                      ignore_whitespace=ignore_whitespace_diff,
 
                                      context=diff_context_size)
 
            diff_name = '%s_vs_%s.diff' % (diff1, diff2)
 
            response.content_type = 'text/plain'
 
            response.content_disposition = (
 
                'attachment; filename=%s' % diff_name
 
            )
 
            return raw_diff
 

	
 
        elif c.action == 'raw':
 
            raw_diff = diffs.get_gitdiff(node1, node2,
 
                                      ignore_whitespace=ignore_whitespace_diff,
 
                                      context=diff_context_size)
 
            response.content_type = 'text/plain'
 
            return raw_diff
 

	
 
        else:
 
            fid = h.FID(diff2, node2.path)
 
            diff_limit = None if fulldiff else self.cut_off_limit
 
            c.a_rev, c.cs_rev, a_path, diff, st, op = diffs.wrapped_diff(filenode_old=node1,
 
            c.a_rev, c.cs_rev, a_path, diff, st, op = diffs.html_diff(filenode_old=node1,
 
                                         filenode_new=node2,
 
                                         diff_limit=diff_limit,
 
                                         ignore_whitespace=ignore_whitespace_diff,
 
                                         line_context=diff_context_size)
 
            c.file_diff_data = [(fid, fid, op, a_path, node2.path, diff, st)]
 
            return base.render('files/file_diff.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def diff_2way(self, repo_name, f_path):
 
        diff1 = request.GET.get('diff1', '')
 
        diff2 = request.GET.get('diff2', '')
 
        try:
 
            if diff1 not in ['', None, 'None', '0' * 12, '0' * 40]:
 
                c.changeset_1 = c.db_repo_scm_instance.get_changeset(diff1)
 
                try:
 
                    node1 = c.changeset_1.get_node(f_path)
 
                    if node1.is_dir():
 
                        raise NodeError('%s path is a %s not a file'
 
                                        % (node1, type(node1)))
 
                except NodeDoesNotExistError:
 
                    c.changeset_1 = EmptyChangeset(cs=diff1,
 
                                                   revision=c.changeset_1.revision,
 
                                                   repo=c.db_repo_scm_instance)
 
                    node1 = FileNode(f_path, '', changeset=c.changeset_1)
 
            else:
 
                c.changeset_1 = EmptyChangeset(repo=c.db_repo_scm_instance)
 
                node1 = FileNode(f_path, '', changeset=c.changeset_1)
 

	
 
            if diff2 not in ['', None, 'None', '0' * 12, '0' * 40]:
 
                c.changeset_2 = c.db_repo_scm_instance.get_changeset(diff2)
 
                try:
 
                    node2 = c.changeset_2.get_node(f_path)
 
                    if node2.is_dir():
 
                        raise NodeError('%s path is a %s not a file'
 
                                        % (node2, type(node2)))
 
                except NodeDoesNotExistError:
 
                    c.changeset_2 = EmptyChangeset(cs=diff2,
 
                                                   revision=c.changeset_2.revision,
 
                                                   repo=c.db_repo_scm_instance)
 
                    node2 = FileNode(f_path, '', changeset=c.changeset_2)
 
            else:
 
                c.changeset_2 = EmptyChangeset(repo=c.db_repo_scm_instance)
 
                node2 = FileNode(f_path, '', changeset=c.changeset_2)
 
        except ChangesetDoesNotExistError as e:
 
            msg = _('Such revision does not exist for this repository')
 
            webutils.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        c.node1 = node1
 
        c.node2 = node2
 
        c.cs1 = c.changeset_1
 
        c.cs2 = c.changeset_2
 

	
 
        return base.render('files/diff_2way.html')
 

	
 
    def _get_node_history(self, cs, f_path, changesets=None):
 
        """
 
        get changesets history for given node
 

	
 
        :param cs: changeset to calculate history
 
        :param f_path: path for node to calculate history for
 
        :param changesets: if passed don't calculate history and take
 
            changesets defined in this list
 
        """
 
        # calculate history based on tip
 
        tip_cs = c.db_repo_scm_instance.get_changeset()
 
        if changesets is None:
 
            try:
 
                changesets = tip_cs.get_file_history(f_path)
 
            except (NodeDoesNotExistError, ChangesetError):
 
                # this node is not present at tip !
 
                changesets = cs.get_file_history(f_path)
 
        hist_l = []
 

	
 
        changesets_group = ([], _("Changesets"))
 
        branches_group = ([], _("Branches"))
 
        tags_group = ([], _("Tags"))
 
        for chs in changesets:
 
            # TODO: loop over chs.branches ... but that will not give all the bogus None branches for Git ...
 
            _branch = chs.branch
 
            n_desc = '%s (%s)' % (h.show_id(chs), _branch)
 
            changesets_group[0].append((chs.raw_id, n_desc,))
 
        hist_l.append(changesets_group)
 

	
 
        for name, chs in c.db_repo_scm_instance.branches.items():
 
            branches_group[0].append((chs, name),)
 
        hist_l.append(branches_group)
 

	
 
        for name, chs in c.db_repo_scm_instance.tags.items():
 
            tags_group[0].append((chs, name),)
 
        hist_l.append(tags_group)
 

	
 
        return hist_l, changesets
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    @base.jsonify
 
    def nodelist(self, repo_name, revision, f_path):
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            cs = self.__get_cs(revision)
 
            _d, _f = ScmModel().get_nodes(repo_name, cs.raw_id, f_path,
 
                                          flat=False)
 
            return {'nodes': _d + _f}
kallithea/lib/diffs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.diffs
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Set of diffing helpers, previously part of vcs
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 4, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 
import difflib
 
import logging
 
import re
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.exceptions import VCSError
 
from kallithea.lib.vcs.nodes import FileNode, SubModuleNode
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _safe_id(idstring):
 
    r"""Make a string safe for including in an id attribute.
 

	
 
    The HTML spec says that id attributes 'must begin with
 
    a letter ([A-Za-z]) and may be followed by any number
 
    of letters, digits ([0-9]), hyphens ("-"), underscores
 
    ("_"), colons (":"), and periods (".")'. These regexps
 
    are slightly over-zealous, in that they remove colons
 
    and periods unnecessarily.
 

	
 
    Whitespace is transformed into underscores, and then
 
    anything which is not a hyphen or a character that
 
    matches \w (alphanumerics and underscore) is removed.
 

	
 
    """
 
    # Transform all whitespace to underscore
 
    idstring = re.sub(r'\s', "_", idstring)
 
    # Remove everything that is not a hyphen or a member of \w
 
    idstring = re.sub(r'(?!-)\W', "", idstring).lower()
 
    return idstring
 

	
 

	
 
def as_html(table_class='code-difftable', line_class='line',
 
            old_lineno_class='lineno old', new_lineno_class='lineno new',
 
            no_lineno_class='lineno',
 
            code_class='code', parsed_lines=None):
 
    """
 
    Return given diff as html table with customized css classes
 
    """
 
    _html_empty = True
 
    _html = []
 
    _html.append('''<table class="%(table_class)s">\n''' % {
 
        'table_class': table_class
 
    })
 

	
 
    for file_info in parsed_lines:
 
        count_no_lineno = 0  # counter to allow comments on lines without new/old line numbers
 
        for chunk in file_info['chunks']:
 
            _html_empty = False
 
            for change in chunk:
 
                _html.append('''<tr class="%(lc)s %(action)s">\n''' % {
 
                    'lc': line_class,
 
                    'action': change['action']
 
                })
 
                if change['old_lineno'] or change['new_lineno']:
 
                    ###########################################################
 
                    # OLD LINE NUMBER
 
                    ###########################################################
 
                    anchor_old = "%(filename)s_o%(oldline_no)s" % {
 
                        'filename': _safe_id(file_info['filename']),
 
                        'oldline_no': change['old_lineno']
 
                    }
 
                    anchor_old_id = ''
 
                    if change['old_lineno']:
 
                        anchor_old_id = 'id="%s"' % anchor_old
 
                    _html.append('''\t<td %(a_id)s class="%(olc)s">''' % {
 
                        'a_id': anchor_old_id,
 
                        'olc': old_lineno_class,
 
                    })
 
                    _html.append('''<a href="%(url)s" data-pseudo-content="%(label)s"></a>''' % {
 
                        'label': change['old_lineno'],
 
                        'url': '#%s' % anchor_old,
 
                    })
 
                    _html.append('''</td>\n''')
 
                    ###########################################################
 
                    # NEW LINE NUMBER
 
                    ###########################################################
 
                    anchor_new = "%(filename)s_n%(newline_no)s" % {
 
                        'filename': _safe_id(file_info['filename']),
 
                        'newline_no': change['new_lineno']
 
                    }
 
                    anchor_new_id = ''
 
                    if change['new_lineno']:
 
                        anchor_new_id = 'id="%s"' % anchor_new
 
                    _html.append('''\t<td %(a_id)s class="%(nlc)s">''' % {
 
                        'a_id': anchor_new_id,
 
                        'nlc': new_lineno_class
 
                    })
 
                    _html.append('''<a href="%(url)s" data-pseudo-content="%(label)s"></a>''' % {
 
                        'label': change['new_lineno'],
 
                        'url': '#%s' % anchor_new,
 
                    })
 
                    _html.append('''</td>\n''')
 
                else:
 
                    ###########################################################
 
                    # NO LINE NUMBER
 
                    ###########################################################
 
                    anchor = "%(filename)s_%(count_no_lineno)s" % {
 
                        'filename': _safe_id(file_info['filename']),
 
                        'count_no_lineno': count_no_lineno,
 
                    }
 
                    count_no_lineno += 1
 
                    _html.append('''\t<td id="%(anchor)s" class="%(olc)s" colspan="2">''' % {
 
                        'anchor': anchor,
 
                        'olc': no_lineno_class,
 
                    })
 
                    _html.append('''</td>\n''')
 
                ###########################################################
 
                # CODE
 
                ###########################################################
 
                _html.append('''\t<td class="%(cc)s">''' % {
 
                    'cc': code_class,
 
                })
 
                _html.append('''\n\t\t<div class="add-bubble"><div>&nbsp;</div></div><pre>%(code)s</pre>\n''' % {
 
                    'code': change['line']
 
                })
 

	
 
                _html.append('''\t</td>''')
 
                _html.append('''\n</tr>\n''')
 
    _html.append('''</table>''')
 
    if _html_empty:
 
        return None
 
    return ''.join(_html)
 

	
 

	
 
def wrap_to_table(html):
 
    """Given a string with html, return it wrapped in a table, similar to what
 
    as_html returns."""
 
    return '''\
 
              <table class="code-difftable">
 
                <tr class="line">
 
                <td class="lineno new"></td>
 
                <td class="code"><pre>%s</pre></td>
 
                </tr>
 
              </table>''' % html
 

	
 

	
 
def wrapped_diff(filenode_old, filenode_new, diff_limit=None,
 
def html_diff(filenode_old, filenode_new, diff_limit=None,
 
                ignore_whitespace=True, line_context=3):
 
    """
 
    Returns a file diff wrapped into a table.
 
    Returns a file diff as HTML wrapped into a table.
 
    Checks for diff_limit and presents a message if the diff is too big.
 
    """
 
    if filenode_old is None:
 
        filenode_old = FileNode(filenode_new.path, '', EmptyChangeset())
 

	
 
    op = None
 
    a_path = filenode_old.path # default, might be overriden by actual rename in diff
 
    if filenode_old.is_binary or filenode_new.is_binary:
 
        html_diff = wrap_to_table(_('Binary file'))
 
        stats = (0, 0)
 

	
 
    elif diff_limit != -1 and (
 
            diff_limit is None or
 
            (filenode_old.size < diff_limit and filenode_new.size < diff_limit)):
 

	
 
        raw_diff = get_gitdiff(filenode_old, filenode_new,
 
                                ignore_whitespace=ignore_whitespace,
 
                                context=line_context)
 
        diff_processor = DiffProcessor(raw_diff)
 
        if diff_processor.parsed: # there should be exactly one element, for the specified file
 
            f = diff_processor.parsed[0]
 
            op = f['operation']
 
            a_path = f['old_filename']
 

	
 
        html_diff = as_html(parsed_lines=diff_processor.parsed)
 
        stats = diff_processor.stat()
 

	
 
    else:
 
        html_diff = wrap_to_table(_('Changeset was too big and was cut off, use '
 
                               'diff menu to display this diff'))
 
        stats = (0, 0)
 

	
 
    if not html_diff:
 
        submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
        if submodules:
 
            html_diff = wrap_to_table(webutils.escape('Submodule %r' % submodules[0]))
 
        else:
 
            html_diff = wrap_to_table(_('No changes detected'))
 

	
 
    cs1 = filenode_old.changeset.raw_id
 
    cs2 = filenode_new.changeset.raw_id
 

	
 
    return cs1, cs2, a_path, html_diff, stats, op
 

	
 

	
 
def get_gitdiff(filenode_old, filenode_new, ignore_whitespace=True, context=3):
 
    """
 
    Returns git style diff between given ``filenode_old`` and ``filenode_new``.
 
    """
 
    # make sure we pass in default context
 
    context = context or 3
 
    submodules = [o for o in [filenode_new, filenode_old] if isinstance(o, SubModuleNode)]
 
    if submodules:
 
        return b''
 

	
 
    for filenode in (filenode_old, filenode_new):
 
        if not isinstance(filenode, FileNode):
 
            raise VCSError("Given object should be FileNode object, not %s"
 
                % filenode.__class__)
 

	
 
    repo = filenode_new.changeset.repository
 
    old_raw_id = getattr(filenode_old.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 
    new_raw_id = getattr(filenode_new.changeset, 'raw_id', repo.EMPTY_CHANGESET)
 

	
 
    vcs_gitdiff = get_diff(repo, old_raw_id, new_raw_id, filenode_new.path,
 
                           ignore_whitespace, context)
 
    return vcs_gitdiff
 

	
 

	
 
def get_diff(scm_instance, rev1, rev2, path=None, ignore_whitespace=False, context=3):
 
    """
 
    A thin wrapper around vcs lib get_diff.
 
    """
 
    try:
 
        return scm_instance.get_diff(rev1, rev2, path=path,
 
                                     ignore_whitespace=ignore_whitespace, context=context)
 
    except MemoryError:
 
        webutils.flash('MemoryError: Diff is too big', category='error')
 
        return b''
 

	
 

	
 
NEW_FILENODE = 1
 
DEL_FILENODE = 2
 
MOD_FILENODE = 3
 
RENAMED_FILENODE = 4
 
COPIED_FILENODE = 5
 
CHMOD_FILENODE = 6
 
BIN_FILENODE = 7
 

	
 

	
 
class DiffProcessor(object):
 
    """
 
    Give it a unified or git diff and it returns a list of the files that were
 
    mentioned in the diff together with a dict of meta information that
 
    can be used to render it in a HTML template.
 
    """
 
    _diff_git_re = re.compile(b'^diff --git', re.MULTILINE)
 

	
 
    def __init__(self, diff, vcs='hg', diff_limit=None, inline_diff=True):
 
        """
 
        :param diff:   a text in diff format
 
        :param vcs: type of version control hg or git
 
        :param diff_limit: define the size of diff that is considered "big"
 
            based on that parameter cut off will be triggered, set to None
 
            to show full diff
 
        """
 
        if not isinstance(diff, bytes):
 
            raise Exception('Diff must be bytes - got %s' % type(diff))
 

	
 
        self._diff = memoryview(diff)
 
        self.adds = 0
 
        self.removes = 0
 
        self.diff_limit = diff_limit
 
        self.limited_diff = False
 
        self.vcs = vcs
 
        self.parsed = self._parse_gitdiff(inline_diff=inline_diff)
 

	
 
    def _parse_gitdiff(self, inline_diff):
 
        """Parse self._diff and return a list of dicts with meta info and chunks for each file.
 
        Might set limited_diff.
 
        Optionally, do an extra pass and to extra markup of one-liner changes.
 
        """
 
        _files = [] # list of dicts with meta info and chunks
 

	
 
        starts = [m.start() for m in self._diff_git_re.finditer(self._diff)]
 
        starts.append(len(self._diff))
 

	
 
        for start, end in zip(starts, starts[1:]):
 
            if self.diff_limit and end > self.diff_limit:
 
                self.limited_diff = True
 
                continue
 

	
 
            head, diff_lines = _get_header(self.vcs, self._diff[start:end])
 

	
 
            op = None
 
            stats = {
 
                'added': 0,
 
                'deleted': 0,
 
                'binary': False,
 
                'ops': {},
 
            }
 

	
 
            if head['deleted_file_mode']:
 
                op = 'removed'
 
                stats['binary'] = True
 
                stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
            elif head['new_file_mode']:
 
                op = 'added'
 
                stats['binary'] = True
 
                stats['ops'][NEW_FILENODE] = 'new file %s' % head['new_file_mode']
 
            else:  # modify operation, can be cp, rename, chmod
 
                # CHMOD
 
                if head['new_mode'] and head['old_mode']:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][CHMOD_FILENODE] = ('modified file chmod %s => %s'
 
                                        % (head['old_mode'], head['new_mode']))
 
                # RENAME
 
                if (head['rename_from'] and head['rename_to']
 
                      and head['rename_from'] != head['rename_to']):
 
                    op = 'renamed'
 
                    stats['binary'] = True
 
                    stats['ops'][RENAMED_FILENODE] = ('file renamed from %s to %s'
 
                                    % (head['rename_from'], head['rename_to']))
 
                # COPY
 
                if head.get('copy_from') and head.get('copy_to'):
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][COPIED_FILENODE] = ('file copied from %s to %s'
 
                                        % (head['copy_from'], head['copy_to']))
 
                # FALL BACK: detect missed old style add or remove
 
                if op is None:
 
                    if not head['a_file'] and head['b_file']:
 
                        op = 'added'
 
                        stats['binary'] = True
 
                        stats['ops'][NEW_FILENODE] = 'new file'
 

	
 
                    elif head['a_file'] and not head['b_file']:
 
                        op = 'removed'
 
                        stats['binary'] = True
 
                        stats['ops'][DEL_FILENODE] = 'deleted file'
 

	
 
                # it's not ADD not DELETE
 
                if op is None:
 
                    op = 'modified'
 
                    stats['binary'] = True
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 

	
 
            # a real non-binary diff
 
            if head['a_file'] or head['b_file']:
 
                chunks, added, deleted = _parse_lines(diff_lines)
 
                stats['binary'] = False
 
                stats['added'] = added
 
                stats['deleted'] = deleted
 
                # explicit mark that it's a modified file
 
                if op == 'modified':
 
                    stats['ops'][MOD_FILENODE] = 'modified file'
 
            else:  # Git binary patch (or empty diff)
 
                # Git binary patch
 
                if head['bin_patch']:
 
                    stats['ops'][BIN_FILENODE] = 'binary diff not shown'
 
                chunks = []
 

	
 
            if op == 'removed' and chunks:
 
                # a way of seeing deleted content could perhaps be nice - but
 
                # not with the current UI
 
                chunks = []
 

	
 
            chunks.insert(0, [{
 
                'old_lineno': '',
 
                'new_lineno': '',
 
                'action':     'context',
 
                'line':       msg,
 
                } for _op, msg in stats['ops'].items()
 
                  if _op not in [MOD_FILENODE]])
 

	
 
            _files.append({
 
                'old_filename':     head['a_path'],
 
                'filename':         head['b_path'],
 
                'old_revision':     head['a_blob_id'],
 
                'new_revision':     head['b_blob_id'],
 
                'chunks':           chunks,
 
                'operation':        op,
 
                'stats':            stats,
 
            })
 

	
 
        if not inline_diff:
 
            return _files
 

	
 
        # highlight inline changes when one del is followed by one add
 
        for diff_data in _files:
 
            for chunk in diff_data['chunks']:
 
                lineiter = iter(chunk)
 
                try:
 
                    peekline = next(lineiter)
 
                    while True:
 
                        # find a first del line
 
                        while peekline['action'] != 'del':
 
                            peekline = next(lineiter)
 
                        delline = peekline
 
                        peekline = next(lineiter)
 
                        # if not followed by add, eat all following del lines
 
                        if peekline['action'] != 'add':
 
                            while peekline['action'] == 'del':
 
                                peekline = next(lineiter)
 
                            continue
 
                        # found an add - make sure it is the only one
 
                        addline = peekline
 
                        try:
 
                            peekline = next(lineiter)
 
                        except StopIteration:
 
                            # add was last line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                            raise
 
                        if peekline['action'] != 'add':
 
                            # there was only one add line - ok
 
                            _highlight_inline_diff(delline, addline)
 
                except StopIteration:
 
                    pass
 

	
 
        return _files
 

	
 
    def stat(self):
 
        """
 
        Returns tuple of added, and removed lines for this instance
 
        """
 
        return self.adds, self.removes
 

	
 

	
 
_escape_re = re.compile(r'(&)|(<)|(>)|(\t)($)?|(\r)|( $)')
 

	
 

	
 
def _escaper(diff_line):
 
    r"""
 
    Do HTML escaping/markup of a single diff line (including first +/- column)
 

	
 
    >>> _escaper('foobar')
 
    'foobar'
 
    >>> _escaper('@foo & bar')
 
    '@foo &amp; bar'
 
    >>> _escaper('+foo < bar')
 
    '+foo &lt; bar'
 
    >>> _escaper('-foo > bar')
 
    '-foo &gt; bar'
 
    >>> _escaper(' <foo>')
 
    ' &lt;foo&gt;'
 
    >>> _escaper(' foo\tbar')
 
    ' foo<u>\t</u>bar'
 
    >>> _escaper(' foo\rbar\r')
 
    ' foo<u class="cr"></u>bar<u class="cr"></u>'
 
    >>> _escaper(' foo\t')
 
    ' foo<u>\t</u><i></i>'
 
    >>> _escaper(' foo ')
 
    ' foo <i></i>'
 
    >>> _escaper(' foo  ')
 
    ' foo  <i></i>'
 
    >>> _escaper(' ')
 
    ' '
 
    >>> _escaper('  ')
 
    '  <i></i>'
 
    >>> _escaper(' \t')
 
    ' <u>\t</u><i></i>'
 
    >>> _escaper(' \t  ')
 
    ' <u>\t</u>  <i></i>'
 
    >>> _escaper('   \t')
 
    '   <u>\t</u><i></i>'
 
    >>> _escaper(' \t\t  ')
 
    ' <u>\t</u><u>\t</u>  <i></i>'
 
    >>> _escaper('   \t\t')
 
    '   <u>\t</u><u>\t</u><i></i>'
 
    >>> _escaper(' foo&bar<baz>  ')
 
    ' foo&amp;bar&lt;baz&gt;  <i></i>'
 
    """
 

	
 
    def substitute(m):
 
        groups = m.groups()
 
        if groups[0]:
 
            return '&amp;'
 
        if groups[1]:
 
            return '&lt;'
 
        if groups[2]:
 
            return '&gt;'
 
        if groups[3]:
 
            if groups[4] is not None:  # end of line
 
                return '<u>\t</u><i></i>'
 
            return '<u>\t</u>'
 
        if groups[5]:
 
            return '<u class="cr"></u>'
 
        if groups[6]:
 
            if m.start() == 0:
 
                return ' '  # first column space shouldn't make empty lines show up as trailing space
 
            return ' <i></i>'
 
        assert False
 

	
 
    return _escape_re.sub(substitute, diff_line)
 

	
 

	
 
_git_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ](?P<a_path_quote>"?)a/(?P<a_path>.+?)(?P=a_path_quote)[ ](?P<b_path_quote>"?)b/(?P<b_path>.+?)(?P=a_path_quote)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%\n
 
       ^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](?P<a_file_quote>"?)(a/(?P<a_file>.+?)(?P=a_file_quote)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](?P<b_file_quote>"?)(b/(?P<b_file>.+?)(?P=b_file_quote)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_hg_header_re = re.compile(br"""
 
    ^diff[ ]--git[ ]a/(?P<a_path>.+?)[ ]b/(?P<b_path>.+?)\n
 
    (?:^old[ ]mode[ ](?P<old_mode>\d+)\n
 
       ^new[ ]mode[ ](?P<new_mode>\d+)(?:\n|$))?
 
    (?:^similarity[ ]index[ ](?P<similarity_index>\d+)%(?:\n|$))?
 
    (?:^rename[ ]from[ ](?P<rename_from>.+)\n
 
       ^rename[ ]to[ ](?P<rename_to>.+)(?:\n|$))?
 
    (?:^copy[ ]from[ ](?P<copy_from>.+)\n
 
       ^copy[ ]to[ ](?P<copy_to>.+)(?:\n|$))?
 
    (?:^new[ ]file[ ]mode[ ](?P<new_file_mode>.+)(?:\n|$))?
 
    (?:^deleted[ ]file[ ]mode[ ](?P<deleted_file_mode>.+)(?:\n|$))?
 
    (?:^index[ ](?P<a_blob_id>[0-9A-Fa-f]+)
 
        \.\.(?P<b_blob_id>[0-9A-Fa-f]+)[ ]?(?P<b_mode>.+)?(?:\n|$))?
 
    (?:^(?P<bin_patch>GIT[ ]binary[ ]patch)(?:\n|$))?
 
    (?:^---[ ](a/(?P<a_file>.+?)|/dev/null)\t?(?:\n|$))?
 
    (?:^\+\+\+[ ](b/(?P<b_file>.+?)|/dev/null)\t?(?:\n|$))?
 
""", re.VERBOSE | re.MULTILINE)
 

	
 

	
 
_header_next_check = re.compile(br'''(?!@)(?!literal )(?!delta )''')
 

	
 

	
 
_git_bs_escape_re = re.compile(r'\\(?:([^0-9])|([0-9]{3}))')
 

	
 

	
 
_git_bs_escape_dict = {'\\': '\\', '"': '"', 'r': '\r', 'n': '\n', 't': '\t'}
 

	
 

	
 
def _git_bs_unescape_m(m):
 
    c = m.group(1)
 
    if c is not None:
 
        return _git_bs_escape_dict.get(c) or ('\\' + c)
 
    return chr(int(m.group(2), 8))
 

	
 

	
 
def _get_header(vcs, diff_chunk):
 
    """
 
    Parses a Git diff for a single file (header and chunks) and returns a tuple with:
 

	
 
    1. A dict with meta info:
 

	
 
        a_path, b_path, similarity_index, rename_from, rename_to,
 
        old_mode, new_mode, new_file_mode, deleted_file_mode,
 
        a_blob_id, b_blob_id, b_mode, a_file, b_file
 

	
 
    2. An iterator yielding lines with simple HTML markup.
 
    """
 
    match = None
 
    if vcs == 'git':
 
        match = _git_header_re.match(diff_chunk)
 
    elif vcs == 'hg':
 
        match = _hg_header_re.match(diff_chunk)
 
    if match is None:
 
        raise Exception('diff not recognized as valid %s diff: %r' % (vcs, safe_str(bytes(diff_chunk[:1000]))))
 
    meta_info = {k: None if v is None else safe_str(v) for k, v in match.groupdict().items()}
 
    if vcs == 'git':
 
        for k in ['a_path', 'b_path', 'a_file', 'b_file']:
 
            v = meta_info.get(k)
 
            if v:
 
                meta_info[k] = _git_bs_escape_re.sub(_git_bs_unescape_m, v)
 
    rest = diff_chunk[match.end():]
 
    if rest:
 
        if _header_next_check.match(rest):
 
            raise Exception('cannot parse %s diff header: %r followed by %r' % (vcs, safe_str(bytes(diff_chunk[:match.end()])), safe_str(bytes(rest[:1000]))))
 
        if rest[-1:] != b'\n':
 
            # The diff will generally already have trailing \n (and be a memoryview). It might also be huge so we don't want to allocate it twice. But in this very rare case, we don't care.
 
            rest = bytes(rest) + b'\n'
 
    diff_lines = (_escaper(safe_str(m.group(1))) for m in re.finditer(br'(.*)\n', rest))
 
    return meta_info, diff_lines
 

	
 

	
 
_chunk_re = re.compile(r'^@@ -(\d+)(?:,(\d+))? \+(\d+)(?:,(\d+))? @@(.*)')
 
_newline_marker = re.compile(r'^\\ No newline at end of file')
 

	
 

	
 
def _parse_lines(diff_lines):
 
    """
 
    Given an iterator of diff body lines, parse them and return a dict per
 
    line and added/removed totals.
 
    """
 
    added = deleted = 0
 
    old_line = old_end = new_line = new_end = None
 

	
 
    chunks = []
 
    try:
 
        line = next(diff_lines)
 

	
 
        while True:
 
            lines = []
 
            chunks.append(lines)
 

	
 
            match = _chunk_re.match(line)
 

	
 
            if not match:
 
                raise Exception('error parsing diff @@ line %r' % line)
 

	
 
            gr = match.groups()
 
            (old_line, old_end,
 
             new_line, new_end) = [int(x or 1) for x in gr[:-1]]
 
            old_line -= 1
 
            new_line -= 1
 

	
 
            context = len(gr) == 5
 
            old_end += old_line
 
            new_end += new_line
 

	
 
            if context:
 
                # skip context only if it's first line
 
                if int(gr[0]) > 1:
 
                    lines.append({
 
                        'old_lineno': '',
 
                        'new_lineno': '',
 
                        'action':     'context',
 
                        'line':       line,
 
                    })
 

	
 
            line = next(diff_lines)
 

	
 
            while old_line < old_end or new_line < new_end:
 
                if not line:
 
                    raise Exception('error parsing diff - empty line at -%s+%s' % (old_line, new_line))
 

	
 
                affects_old = affects_new = False
 

	
 
                command = line[0]
 
                if command == '+':
 
                    affects_new = True
 
                    action = 'add'
 
                    added += 1
 
                elif command == '-':
 
                    affects_old = True
 
                    action = 'del'
 
                    deleted += 1
 
                elif command == ' ':
 
                    affects_old = affects_new = True
 
                    action = 'unmod'
 
                else:
 
                    raise Exception('error parsing diff - unknown command in line %r at -%s+%s' % (line, old_line, new_line))
 

	
 
                if not _newline_marker.match(line):
 
                    old_line += affects_old
 
                    new_line += affects_new
 
                    lines.append({
 
                        'old_lineno':   affects_old and old_line or '',
 
                        'new_lineno':   affects_new and new_line or '',
 
                        'action':       action,
 
                        'line':         line[1:],
 
                    })
 

	
 
                line = next(diff_lines)
 

	
 
                if _newline_marker.match(line):
 
                    # we need to append to lines, since this is not
 
                    # counted in the line specs of diff
 
                    lines.append({
 
                        'old_lineno':   '',
 
                        'new_lineno':   '',
 
                        'action':       'context',
 
                        'line':         line,
 
                    })
 
                    line = next(diff_lines)
 
            if old_line > old_end:
 
                raise Exception('error parsing diff - more than %s "-" lines at -%s+%s' % (old_end, old_line, new_line))
 
            if new_line > new_end:
 
                raise Exception('error parsing diff - more than %s "+" lines at -%s+%s' % (new_end, old_line, new_line))
 
    except StopIteration:
 
        pass
 
    if old_line != old_end or new_line != new_end:
 
        raise Exception('diff processing broken when old %s<>%s or new %s<>%s line %r' % (old_line, old_end, new_line, new_end, line))
 

	
 
    return chunks, added, deleted
 

	
 
# Used for inline highlighter word split, must match the substitutions in _escaper
 
_token_re = re.compile(r'()(&amp;|&lt;|&gt;|<u>\t</u>|<u class="cr"></u>| <i></i>|\W+?)')
 

	
 

	
 
def _highlight_inline_diff(old, new):
 
    """
 
    Highlight simple add/remove in two lines given as info dicts. They are
 
    modified in place and given markup with <del>/<ins>.
 
    """
 
    assert old['action'] == 'del'
 
    assert new['action'] == 'add'
 

	
 
    oldwords = _token_re.split(old['line'])
 
    newwords = _token_re.split(new['line'])
 
    sequence = difflib.SequenceMatcher(None, oldwords, newwords)
 

	
 
    oldfragments, newfragments = [], []
 
    for tag, i1, i2, j1, j2 in sequence.get_opcodes():
 
        oldfrag = ''.join(oldwords[i1:i2])
 
        newfrag = ''.join(newwords[j1:j2])
 
        if tag != 'equal':
 
            if oldfrag:
 
                oldfrag = '<del>%s</del>' % oldfrag
 
            if newfrag:
 
                newfrag = '<ins>%s</ins>' % newfrag
 
        oldfragments.append(oldfrag)
 
        newfragments.append(newfrag)
 

	
 
    old['line'] = "".join(oldfragments)
 
    new['line'] = "".join(newfragments)
0 comments (0 inline, 0 general)