Changeset - b095a92a4312
[Not reviewed]
default
0 3 0
Mads Kiilerich (mads) - 5 years ago 2020-10-29 11:38:10
mads@kiilerich.com
Grafted from: ef144335e79a
lib: drop h.email alias - use vcs.utils directly
3 files changed with 6 insertions and 10 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/files.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.controllers.files
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Files controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 21, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import os
 
import posixpath
 
import shutil
 
import tempfile
 
import traceback
 
from collections import OrderedDict
 

	
 
from tg import request, response
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound, HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import diffs
 
from kallithea.lib import helpers as h
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController, jsonify, render
 
from kallithea.lib.exceptions import NonRelativePathError
 
from kallithea.lib.utils import action_logger
 
from kallithea.lib.utils2 import asbool, convert_line_endings, detect_mode, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (ChangesetDoesNotExistError, ChangesetError, EmptyRepositoryError, ImproperArchiveTypeError, NodeAlreadyExistsError,
 
                                          NodeDoesNotExistError, NodeError, RepositoryError, VCSError)
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.lib.vcs.utils import author_email
 
from kallithea.lib.webutils import url
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FilesController(BaseRepoController):
 

	
 
    def _before(self, *args, **kwargs):
 
        super(FilesController, self)._before(*args, **kwargs)
 

	
 
    def __get_cs(self, rev, silent_empty=False):
 
        """
 
        Safe way to get changeset if error occur it redirects to tip with
 
        proper message
 

	
 
        :param rev: revision to fetch
 
        :silent_empty: return None if repository is empty
 
        """
 

	
 
        try:
 
            return c.db_repo_scm_instance.get_changeset(rev)
 
        except EmptyRepositoryError as e:
 
            if silent_empty:
 
                return None
 
            url_ = url('files_add_home',
 
                       repo_name=c.repo_name,
 
                       revision=0, f_path='', anchor='edit')
 
            add_new = h.link_to(_('Click here to add new file'), url_, class_="alert-link")
 
            h.flash(_('There are no files yet.') + ' ' + add_new, category='warning')
 
            raise HTTPNotFound()
 
        except (ChangesetDoesNotExistError, LookupError):
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
    def __get_filenode(self, cs, path):
 
        """
 
        Returns file_node or raise HTTP error.
 

	
 
        :param cs: given changeset
 
        :param path: path to lookup
 
        """
 

	
 
        try:
 
            file_node = cs.get_node(path)
 
            if file_node.is_dir():
 
                raise RepositoryError('given path is a directory')
 
        except ChangesetDoesNotExistError:
 
            msg = _('Such revision does not exist for this repository')
 
            h.flash(msg, category='error')
 
            raise HTTPNotFound()
 
        except RepositoryError as e:
 
            h.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
        return file_node
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self, repo_name, revision, f_path, annotate=False):
 
        # redirect to given revision from form if given
 
        post_revision = request.POST.get('at_rev', None)
 
        if post_revision:
 
            cs = self.__get_cs(post_revision) # FIXME - unused!
 

	
 
        c.revision = revision
 
        c.changeset = self.__get_cs(revision)
 
        c.branch = request.GET.get('branch', None)
 
        c.f_path = f_path
 
        c.annotate = annotate
 
        cur_rev = c.changeset.revision
 
        # used in files_source.html:
 
        c.cut_off_limit = self.cut_off_limit
 
        c.fulldiff = request.GET.get('fulldiff')
 

	
 
        # prev link
 
        try:
 
            prev_rev = c.db_repo_scm_instance.get_changeset(cur_rev).prev(c.branch)
 
            c.url_prev = url('files_home', repo_name=c.repo_name,
 
                         revision=prev_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_prev += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_prev = '#'
 

	
 
        # next link
 
        try:
 
            next_rev = c.db_repo_scm_instance.get_changeset(cur_rev).next(c.branch)
 
            c.url_next = url('files_home', repo_name=c.repo_name,
 
                     revision=next_rev.raw_id, f_path=f_path)
 
            if c.branch:
 
                c.url_next += '?branch=%s' % c.branch
 
        except (ChangesetDoesNotExistError, VCSError):
 
            c.url_next = '#'
 

	
 
        # files or dirs
 
        try:
 
            c.file = c.changeset.get_node(f_path)
 

	
 
            if c.file.is_submodule():
 
                raise HTTPFound(location=c.file.url)
 
            elif c.file.is_file():
 
                c.load_full_history = False
 
                # determine if we're on branch head
 
                _branches = c.db_repo_scm_instance.branches
 
                c.on_branch_head = revision in _branches or revision in _branches.values()
 
                _hist = []
 
                c.file_history = []
 
                if c.load_full_history:
 
                    c.file_history, _hist = self._get_node_history(c.changeset, f_path)
 

	
 
                c.authors = []
 
                for a in set([x.author for x in _hist]):
 
                    c.authors.append((h.email(a), h.person(a)))
 
                    c.authors.append((author_email(a), h.person(a)))
 
            else:
 
                c.authors = c.file_history = []
 
        except RepositoryError as e:
 
            h.flash(e, category='error')
 
            raise HTTPNotFound()
 

	
 
        if request.environ.get('HTTP_X_PARTIAL_XHR'):
 
            return render('files/files_ypjax.html')
 

	
 
        # TODO: tags and bookmarks?
 
        c.revision_options = [(c.changeset.raw_id,
 
                              _('%s at %s') % (b, h.short_id(c.changeset.raw_id))) for b in c.changeset.branches] + \
 
            [(n, b) for b, n in c.db_repo_scm_instance.branches.items()]
 
        if c.db_repo_scm_instance.closed_branches:
 
            prefix = _('(closed)') + ' '
 
            c.revision_options += [('-', '-')] + \
 
                [(n, prefix + b) for b, n in c.db_repo_scm_instance.closed_branches.items()]
 

	
 
        return render('files/files.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def history(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 

	
 
            res = []
 
            for obj in file_history:
 
                res.append({
 
                    'text': obj[1],
 
                    'children': [{'id': o[0], 'text': o[1]} for o in obj[0]]
 
                })
 

	
 
            data = {
 
                'more': False,
 
                'results': res
 
            }
 
            return data
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def authors(self, repo_name, revision, f_path):
 
        changeset = self.__get_cs(revision)
 
        _file = changeset.get_node(f_path)
 
        if _file.is_file():
 
            file_history, _hist = self._get_node_history(changeset, f_path)
 
            c.authors = []
 
            for a in set([x.author for x in _hist]):
 
                c.authors.append((h.email(a), h.person(a)))
 
                c.authors.append((author_email(a), h.person(a)))
 
            return render('files/files_history_box.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def rawfile(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        response.content_disposition = \
 
            'attachment; filename=%s' % f_path.split(kallithea.URL_SEP)[-1]
 

	
 
        response.content_type = file_node.mimetype
 
        return file_node.content
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def raw(self, repo_name, revision, f_path):
 
        cs = self.__get_cs(revision)
 
        file_node = self.__get_filenode(cs, f_path)
 

	
 
        raw_mimetype_mapping = {
 
            # map original mimetype to a mimetype used for "show as raw"
 
            # you can also provide a content-disposition to override the
 
            # default "attachment" disposition.
 
            # orig_type: (new_type, new_dispo)
 

	
 
            # show images inline:
 
            'image/x-icon': ('image/x-icon', 'inline'),
 
            'image/png': ('image/png', 'inline'),
 
            'image/gif': ('image/gif', 'inline'),
 
            'image/jpeg': ('image/jpeg', 'inline'),
 
            'image/svg+xml': ('image/svg+xml', 'inline'),
 
        }
 

	
 
        mimetype = file_node.mimetype
 
        try:
 
            mimetype, dispo = raw_mimetype_mapping[mimetype]
 
        except KeyError:
 
            # we don't know anything special about this, handle it safely
 
            if file_node.is_binary:
 
                # do same as download raw for binary files
 
                mimetype, dispo = 'application/octet-stream', 'attachment'
 
            else:
 
                # do not just use the original mimetype, but force text/plain,
 
                # otherwise it would serve text/html and that might be unsafe.
 
                # Note: underlying vcs library fakes text/plain mimetype if the
 
                # mimetype can not be determined and it thinks it is not
 
                # binary.This might lead to erroneous text display in some
 
                # cases, but helps in other cases, like with text files
 
                # without extension.
 
                mimetype, dispo = 'text/plain', 'inline'
 

	
 
        if dispo == 'attachment':
 
            dispo = 'attachment; filename=%s' % f_path.split(os.sep)[-1]
 

	
 
        response.content_disposition = dispo
 
        response.content_type = mimetype
 
        return file_node.content
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def delete(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
        # check if revision is a branch name or branch hash
 
        if revision not in _branches and revision not in _branches.values():
 
            h.flash(_('You can only delete files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=h.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        c.default_message = _('Deleted file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 
        node_path = f_path
 
        author = request.authuser.full_contact
 

	
 
        if r_post:
 
            message = r_post.get('message') or c.default_message
 

	
 
            try:
 
                nodes = {
 
                    node_path: {
 
                        'content': ''
 
                    }
 
                }
 
                self.scm_model.delete_nodes(
 
                    user=request.authuser.user_id,
 
                    ip_addr=request.ip_addr,
 
                    repo=c.db_repo,
 
                    message=message,
 
                    nodes=nodes,
 
                    parent_cs=c.cs,
 
                    author=author,
 
                )
 

	
 
                h.flash(_('Successfully deleted file %s') % f_path,
 
                        category='success')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return render('files/files_delete.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def edit(self, repo_name, revision, f_path):
 
        repo = c.db_repo
 
        # check if revision is a branch identifier- basically we cannot
 
        # create multiple heads via file editing
 
        _branches = repo.scm_instance.branches
 
        # check if revision is a branch name or branch hash
 
        if revision not in _branches and revision not in _branches.values():
 
            h.flash(_('You can only edit files with revision '
 
                      'being a valid branch'), category='warning')
 
            raise HTTPFound(location=h.url('files_home',
 
                                  repo_name=repo_name, revision='tip',
 
                                  f_path=f_path))
 

	
 
        r_post = request.POST
 

	
 
        c.cs = self.__get_cs(revision)
 
        c.file = self.__get_filenode(c.cs, f_path)
 

	
 
        if c.file.is_binary:
 
            raise HTTPFound(location=url('files_home', repo_name=c.repo_name,
 
                            revision=c.cs.raw_id, f_path=f_path))
 
        c.default_message = _('Edited file %s via Kallithea') % (f_path)
 
        c.f_path = f_path
 

	
 
        if r_post:
 
            old_content = safe_str(c.file.content)
 
            sl = old_content.splitlines(1)
 
            first_line = sl[0] if sl else ''
 
            # modes:  0 - Unix, 1 - Mac, 2 - DOS
 
            mode = detect_mode(first_line, 0)
 
            content = convert_line_endings(r_post.get('content', ''), mode)
 

	
 
            message = r_post.get('message') or c.default_message
 
            author = request.authuser.full_contact
 

	
 
            if content == old_content:
 
                h.flash(_('No changes'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            try:
 
                self.scm_model.commit_change(repo=c.db_repo_scm_instance,
 
                                             repo_name=repo_name, cs=c.cs,
 
                                             user=request.authuser.user_id,
 
                                             ip_addr=request.ip_addr,
 
                                             author=author, message=message,
 
                                             content=content, f_path=f_path)
 
                h.flash(_('Successfully committed to %s') % f_path,
 
                        category='success')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return render('files/files_edit.html')
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('write')
 
    def add(self, repo_name, revision, f_path):
 

	
 
        repo = c.db_repo
 
        r_post = request.POST
 
        c.cs = self.__get_cs(revision, silent_empty=True)
 
        if c.cs is None:
 
            c.cs = EmptyChangeset(alias=c.db_repo_scm_instance.alias)
 
        c.default_message = (_('Added file via Kallithea'))
 
        c.f_path = f_path
 

	
 
        if r_post:
 
            unix_mode = 0
 
            content = convert_line_endings(r_post.get('content', ''), unix_mode)
 

	
 
            message = r_post.get('message') or c.default_message
 
            filename = r_post.get('filename')
 
            location = r_post.get('location', '')
 
            file_obj = r_post.get('upload_file', None)
 

	
 
            if file_obj is not None and hasattr(file_obj, 'filename'):
 
                filename = file_obj.filename
 
                content = file_obj.file
 

	
 
                if hasattr(content, 'file'):
 
                    # non posix systems store real file under file attr
 
                    content = content.file
 

	
 
            if not content:
 
                h.flash(_('No content'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            if not filename:
 
                h.flash(_('No filename'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            # strip all crap out of file, just leave the basename
 
            filename = os.path.basename(filename)
 
            node_path = posixpath.join(location, filename)
 
            author = request.authuser.full_contact
 

	
 
            try:
 
                nodes = {
 
                    node_path: {
 
                        'content': content
 
                    }
 
                }
 
                self.scm_model.create_nodes(
 
                    user=request.authuser.user_id,
 
                    ip_addr=request.ip_addr,
 
                    repo=c.db_repo,
 
                    message=message,
 
                    nodes=nodes,
 
                    parent_cs=c.cs,
 
                    author=author,
 
                )
 

	
 
                h.flash(_('Successfully committed to %s') % node_path,
 
                        category='success')
 
            except NonRelativePathError as e:
 
                h.flash(_('Location must be relative path and must not '
 
                          'contain .. in path'), category='warning')
 
                raise HTTPFound(location=url('changeset_home', repo_name=c.repo_name,
 
                                    revision='tip'))
 
            except (NodeError, NodeAlreadyExistsError) as e:
 
                h.flash(_(e), category='error')
 
            except Exception:
 
                log.error(traceback.format_exc())
 
                h.flash(_('Error occurred during commit'), category='error')
 
            raise HTTPFound(location=url('changeset_home',
 
                                repo_name=c.repo_name, revision='tip'))
 

	
 
        return render('files/files_add.html')
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def archivefile(self, repo_name, fname):
 
        fileformat = None
 
        revision = None
 
        ext = None
 
        subrepos = request.GET.get('subrepos') == 'true'
 

	
 
        for a_type, ext_data in settings.ARCHIVE_SPECS.items():
 
            archive_spec = fname.split(ext_data[1])
 
            if len(archive_spec) == 2 and archive_spec[1] == '':
 
                fileformat = a_type or ext_data[1]
 
                revision = archive_spec[0]
 
                ext = ext_data[1]
 

	
 
        try:
 
            dbrepo = RepoModel().get_by_repo_name(repo_name)
 
            if not dbrepo.enable_downloads:
 
                return _('Downloads disabled') # TODO: do something else?
 

	
 
            if c.db_repo_scm_instance.alias == 'hg':
 
                # patch and reset hooks section of UI config to not run any
 
                # hooks on fetching archives with subrepos
 
                for k, v in c.db_repo_scm_instance._repo.ui.configitems('hooks'):
 
                    c.db_repo_scm_instance._repo.ui.setconfig('hooks', k, None)
 

	
 
            cs = c.db_repo_scm_instance.get_changeset(revision)
 
            content_type = settings.ARCHIVE_SPECS[fileformat][0]
 
        except ChangesetDoesNotExistError:
 
            return _('Unknown revision %s') % revision
 
        except EmptyRepositoryError:
 
            return _('Empty repository')
 
        except (ImproperArchiveTypeError, KeyError):
 
            return _('Unknown archive type')
 

	
 
        rev_name = cs.raw_id[:12]
 
        archive_name = '%s-%s%s' % (repo_name.replace('/', '_'), rev_name, ext)
 

	
 
        archive_path = None
 
        cached_archive_path = None
 
        archive_cache_dir = kallithea.CONFIG.get('archive_cache_dir')
 
        if archive_cache_dir and not subrepos: # TODO: subrepo caching?
 
            if not os.path.isdir(archive_cache_dir):
 
                os.makedirs(archive_cache_dir)
 
            cached_archive_path = os.path.join(archive_cache_dir, archive_name)
 
            if os.path.isfile(cached_archive_path):
 
                log.debug('Found cached archive in %s', cached_archive_path)
 
                archive_path = cached_archive_path
 
            else:
 
                log.debug('Archive %s is not yet cached', archive_name)
 

	
 
        if archive_path is None:
 
            # generate new archive
 
            fd, archive_path = tempfile.mkstemp()
 
            log.debug('Creating new temp archive in %s', archive_path)
 
            with os.fdopen(fd, 'wb') as stream:
 
                cs.fill_archive(stream=stream, kind=fileformat, subrepos=subrepos)
 
                # stream (and thus fd) has been closed by cs.fill_archive
 
            if cached_archive_path is not None:
 
                # we generated the archive - move it to cache
 
                log.debug('Storing new archive in %s', cached_archive_path)
 
                shutil.move(archive_path, cached_archive_path)
 
                archive_path = cached_archive_path
 

	
 
        def get_chunked_archive(archive_path):
 
            stream = open(archive_path, 'rb')
 
            while True:
 
                data = stream.read(16 * 1024)
 
                if not data:
 
                    break
 
                yield data
 
            stream.close()
 
            if archive_path != cached_archive_path:
 
                log.debug('Destroying temp archive %s', archive_path)
 
                os.remove(archive_path)
 

	
 
        action_logger(user=request.authuser,
 
                      action='user_downloaded_archive:%s' % (archive_name),
 
                      repo=repo_name, ipaddr=request.ip_addr, commit=True)
 

	
 
        response.content_disposition = str('attachment; filename=%s' % (archive_name))
 
        response.content_type = str(content_type)
 
        return get_chunked_archive(archive_path)
 

	
 
    @LoginRequired(allow_default_user=True)
 
    @HasRepoPermissionLevelDecorator('read')
 
    def diff(self, repo_name, f_path):
 
        ignore_whitespace_diff = h.get_ignore_whitespace_diff(request.GET)
 
        diff_context_size = h.get_diff_context_size(request.GET)
 
        diff2 = request.GET.get('diff2', '')
 
        diff1 = request.GET.get('diff1', '') or diff2
 
        c.action = request.GET.get('diff')
 
        c.no_changes = diff1 == diff2
 
        c.f_path = f_path
 
        c.big_diff = False
 
        fulldiff = request.GET.get('fulldiff')
 
        c.changes = OrderedDict()
 
        c.changes[diff2] = []
 

	
 
        # special case if we want a show rev only, it's impl here
 
        # to reduce JS and callbacks
 

	
 
        if request.GET.get('show_rev'):
 
            if asbool(request.GET.get('annotate', 'False')):
 
                _url = url('files_annotate_home', repo_name=c.repo_name,
 
                           revision=diff1, f_path=c.f_path)
 
            else:
 
                _url = url('files_home', repo_name=c.repo_name,
 
                           revision=diff1, f_path=c.f_path)
 

	
 
            raise HTTPFound(location=_url)
 
        try:
 
            if diff1 not in ['', None, 'None', '0' * 12, '0' * 40]:
 
                c.changeset_1 = c.db_repo_scm_instance.get_changeset(diff1)
 
                try:
 
                    node1 = c.changeset_1.get_node(f_path)
 
                    if node1.is_dir():
 
                        raise NodeError('%s path is a %s not a file'
 
                                        % (node1, type(node1)))
 
                except NodeDoesNotExistError:
 
                    c.changeset_1 = EmptyChangeset(cs=diff1,
 
                                                   revision=c.changeset_1.revision,
 
                                                   repo=c.db_repo_scm_instance)
 
                    node1 = FileNode(f_path, '', changeset=c.changeset_1)
 
            else:
 
                c.changeset_1 = EmptyChangeset(repo=c.db_repo_scm_instance)
 
                node1 = FileNode(f_path, '', changeset=c.changeset_1)
 

	
 
            if diff2 not in ['', None, 'None', '0' * 12, '0' * 40]:
 
                c.changeset_2 = c.db_repo_scm_instance.get_changeset(diff2)
 
                try:
 
                    node2 = c.changeset_2.get_node(f_path)
 
                    if node2.is_dir():
 
                        raise NodeError('%s path is a %s not a file'
 
                                        % (node2, type(node2)))
 
                except NodeDoesNotExistError:
 
                    c.changeset_2 = EmptyChangeset(cs=diff2,
 
                                                   revision=c.changeset_2.revision,
kallithea/lib/helpers.py
Show inline comments
 
@@ -161,863 +161,861 @@ def jshtml(val):
 
    """
 
    return js(escape(val))
 

	
 

	
 
def shorter(s, size=20, firstline=False, postfix='...'):
 
    """Truncate s to size, including the postfix string if truncating.
 
    If firstline, truncate at newline.
 
    """
 
    if firstline:
 
        s = s.split('\n', 1)[0].rstrip()
 
    if len(s) > size:
 
        return s[:size - len(postfix)] + postfix
 
    return s
 

	
 

	
 
def reset(name, value, id=NotGiven, **attrs):
 
    """Create a reset button, similar to webhelpers2.html.tags.submit ."""
 
    return _input("reset", name, value, id, attrs)
 

	
 

	
 
def select(name, selected_values, options, id=NotGiven, **attrs):
 
    """Convenient wrapper of webhelpers2 to let it accept options as a tuple list"""
 
    if isinstance(options, list):
 
        option_list = options
 
        # Handle old value,label lists ... where value also can be value,label lists
 
        options = Options()
 
        for x in option_list:
 
            if isinstance(x, tuple) and len(x) == 2:
 
                value, label = x
 
            elif isinstance(x, str):
 
                value = label = x
 
            else:
 
                log.error('invalid select option %r', x)
 
                raise
 
            if isinstance(value, list):
 
                og = options.add_optgroup(label)
 
                for x in value:
 
                    if isinstance(x, tuple) and len(x) == 2:
 
                        group_value, group_label = x
 
                    elif isinstance(x, str):
 
                        group_value = group_label = x
 
                    else:
 
                        log.error('invalid select option %r', x)
 
                        raise
 
                    og.add_option(group_label, group_value)
 
            else:
 
                options.add_option(label, value)
 
    return webhelpers2_select(name, selected_values, options, id=id, **attrs)
 

	
 

	
 
safeid = _make_safe_id_component
 

	
 

	
 
def FID(raw_id, path):
 
    """
 
    Creates a unique ID for filenode based on it's hash of path and revision
 
    it's safe to use in urls
 
    """
 
    return 'C-%s-%s' % (short_id(raw_id), hashlib.md5(safe_bytes(path)).hexdigest()[:12])
 

	
 

	
 
def get_ignore_whitespace_diff(GET):
 
    """Return true if URL requested whitespace to be ignored"""
 
    return bool(GET.get('ignorews'))
 

	
 

	
 
def ignore_whitespace_link(GET, anchor=None):
 
    """Return snippet with link to current URL with whitespace ignoring toggled"""
 
    params = dict(GET)  # ignoring duplicates
 
    if get_ignore_whitespace_diff(GET):
 
        params.pop('ignorews')
 
        title = _("Show whitespace changes")
 
    else:
 
        params['ignorews'] = '1'
 
        title = _("Ignore whitespace changes")
 
    params['anchor'] = anchor
 
    return link_to(
 
        literal('<i class="icon-strike"></i>'),
 
        url.current(**params),
 
        title=title,
 
        **{'data-toggle': 'tooltip'})
 

	
 

	
 
def get_diff_context_size(GET):
 
    """Return effective context size requested in URL"""
 
    return safe_int(GET.get('context'), default=3)
 

	
 

	
 
def increase_context_link(GET, anchor=None):
 
    """Return snippet with link to current URL with double context size"""
 
    context = get_diff_context_size(GET) * 2
 
    params = dict(GET)  # ignoring duplicates
 
    params['context'] = str(context)
 
    params['anchor'] = anchor
 
    return link_to(
 
        literal('<i class="icon-sort"></i>'),
 
        url.current(**params),
 
        title=_('Increase diff context to %(num)s lines') % {'num': context},
 
        **{'data-toggle': 'tooltip'})
 

	
 

	
 
class _FilesBreadCrumbs(object):
 

	
 
    def __call__(self, repo_name, rev, paths):
 
        url_l = [link_to(repo_name, url('files_home',
 
                                        repo_name=repo_name,
 
                                        revision=rev, f_path=''),
 
                         class_='ypjax-link')]
 
        paths_l = paths.split('/')
 
        for cnt, p in enumerate(paths_l):
 
            if p != '':
 
                url_l.append(link_to(p,
 
                                     url('files_home',
 
                                         repo_name=repo_name,
 
                                         revision=rev,
 
                                         f_path='/'.join(paths_l[:cnt + 1])
 
                                         ),
 
                                     class_='ypjax-link'
 
                                     )
 
                             )
 

	
 
        return literal('/'.join(url_l))
 

	
 

	
 
files_breadcrumbs = _FilesBreadCrumbs()
 

	
 

	
 
class CodeHtmlFormatter(HtmlFormatter):
 
    """
 
    My code Html Formatter for source codes
 
    """
 

	
 
    def wrap(self, source, outfile):
 
        return self._wrap_div(self._wrap_pre(self._wrap_code(source)))
 

	
 
    def _wrap_code(self, source):
 
        for cnt, it in enumerate(source):
 
            i, t = it
 
            t = '<span id="L%s">%s</span>' % (cnt + 1, t)
 
            yield i, t
 

	
 
    def _wrap_tablelinenos(self, inner):
 
        inner_lines = []
 
        lncount = 0
 
        for t, line in inner:
 
            if t:
 
                lncount += 1
 
            inner_lines.append(line)
 

	
 
        fl = self.linenostart
 
        mw = len(str(lncount + fl - 1))
 
        sp = self.linenospecial
 
        st = self.linenostep
 
        la = self.lineanchors
 
        aln = self.anchorlinenos
 
        nocls = self.noclasses
 
        if sp:
 
            lines = []
 

	
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if i % sp == 0:
 
                        if aln:
 
                            lines.append('<a href="#%s%d" class="special">%*d</a>' %
 
                                         (la, i, mw, i))
 
                        else:
 
                            lines.append('<span class="special">%*d</span>' % (mw, i))
 
                    else:
 
                        if aln:
 
                            lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                        else:
 
                            lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 
        else:
 
            lines = []
 
            for i in range(fl, fl + lncount):
 
                if i % st == 0:
 
                    if aln:
 
                        lines.append('<a href="#%s%d">%*d</a>' % (la, i, mw, i))
 
                    else:
 
                        lines.append('%*d' % (mw, i))
 
                else:
 
                    lines.append('')
 
            ls = '\n'.join(lines)
 

	
 
        # in case you wonder about the seemingly redundant <div> here: since the
 
        # content in the other cell also is wrapped in a div, some browsers in
 
        # some configurations seem to mess up the formatting...
 
        if nocls:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        else:
 
            yield 0, ('<table class="%stable">' % self.cssclass +
 
                      '<tr><td class="linenos"><div class="linenodiv">'
 
                      '<pre>' + ls + '</pre></div></td>'
 
                      '<td id="hlcode" class="code">')
 
        yield 0, ''.join(inner_lines)
 
        yield 0, '</td></tr></table>'
 

	
 

	
 
_whitespace_re = re.compile(r'(\t)|( )(?=\n|</div>)')
 

	
 

	
 
def _markup_whitespace(m):
 
    groups = m.groups()
 
    if groups[0]:
 
        return '<u>\t</u>'
 
    if groups[1]:
 
        return ' <i></i>'
 

	
 

	
 
def markup_whitespace(s):
 
    return _whitespace_re.sub(_markup_whitespace, s)
 

	
 

	
 
def pygmentize(filenode, **kwargs):
 
    """
 
    pygmentize function using pygments
 

	
 
    :param filenode:
 
    """
 
    lexer = get_custom_lexer(filenode.extension) or filenode.lexer
 
    return literal(markup_whitespace(
 
        code_highlight(safe_str(filenode.content), lexer, CodeHtmlFormatter(**kwargs))))
 

	
 

	
 
def hsv_to_rgb(h, s, v):
 
    if s == 0.0:
 
        return v, v, v
 
    i = int(h * 6.0)  # XXX assume int() truncates!
 
    f = (h * 6.0) - i
 
    p = v * (1.0 - s)
 
    q = v * (1.0 - s * f)
 
    t = v * (1.0 - s * (1.0 - f))
 
    i = i % 6
 
    if i == 0:
 
        return v, t, p
 
    if i == 1:
 
        return q, v, p
 
    if i == 2:
 
        return p, v, t
 
    if i == 3:
 
        return p, q, v
 
    if i == 4:
 
        return t, p, v
 
    if i == 5:
 
        return v, p, q
 

	
 

	
 
def gen_color(n=10000):
 
    """generator for getting n of evenly distributed colors using
 
    hsv color and golden ratio. It always return same order of colors
 

	
 
    :returns: RGB tuple
 
    """
 

	
 
    golden_ratio = 0.618033988749895
 
    h = 0.22717784590367374
 

	
 
    for _unused in range(n):
 
        h += golden_ratio
 
        h %= 1
 
        HSV_tuple = [h, 0.95, 0.95]
 
        RGB_tuple = hsv_to_rgb(*HSV_tuple)
 
        yield [str(int(x * 256)) for x in RGB_tuple]
 

	
 

	
 
def pygmentize_annotation(repo_name, filenode, **kwargs):
 
    """
 
    pygmentize function for annotation
 

	
 
    :param filenode:
 
    """
 
    cgenerator = gen_color()
 
    color_dict = {}
 

	
 
    def get_color_string(cs):
 
        if cs in color_dict:
 
            col = color_dict[cs]
 
        else:
 
            col = color_dict[cs] = next(cgenerator)
 
        return "color: rgb(%s)! important;" % (', '.join(col))
 

	
 
    def url_func(changeset):
 
        author = escape(changeset.author)
 
        date = changeset.date
 
        message = escape(changeset.message)
 
        tooltip_html = ("<b>Author:</b> %s<br/>"
 
                        "<b>Date:</b> %s</b><br/>"
 
                        "<b>Message:</b> %s") % (author, date, message)
 

	
 
        lnk_format = show_id(changeset)
 
        uri = link_to(
 
                lnk_format,
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=changeset.raw_id),
 
                style=get_color_string(changeset.raw_id),
 
                **{'data-toggle': 'popover',
 
                   'data-content': tooltip_html}
 
              )
 

	
 
        uri += '\n'
 
        return uri
 

	
 
    return literal(markup_whitespace(annotate_highlight(filenode, url_func, **kwargs)))
 

	
 

	
 
class _Message(object):
 
    """A message returned by ``pop_flash_messages()``.
 

	
 
    Converting the message to a string returns the message text. Instances
 
    also have the following attributes:
 

	
 
    * ``category``: the category specified when the message was created.
 
    * ``message``: the html-safe message text.
 
    """
 

	
 
    def __init__(self, category, message):
 
        self.category = category
 
        self.message = message
 

	
 

	
 
def _session_flash_messages(append=None, clear=False):
 
    """Manage a message queue in tg.session: return the current message queue
 
    after appending the given message, and possibly clearing the queue."""
 
    key = 'flash'
 
    if key in session:
 
        flash_messages = session[key]
 
    else:
 
        if append is None:  # common fast path - also used for clearing empty queue
 
            return []  # don't bother saving
 
        flash_messages = []
 
        session[key] = flash_messages
 
    if append is not None and append not in flash_messages:
 
        flash_messages.append(append)
 
    if clear:
 
        session.pop(key, None)
 
    session.save()
 
    return flash_messages
 

	
 

	
 
def flash(message, category, logf=None):
 
    """
 
    Show a message to the user _and_ log it through the specified function
 

	
 
    category: notice (default), warning, error, success
 
    logf: a custom log function - such as log.debug
 

	
 
    logf defaults to log.info, unless category equals 'success', in which
 
    case logf defaults to log.debug.
 
    """
 
    assert category in ('error', 'success', 'warning'), category
 
    if hasattr(message, '__html__'):
 
        # render to HTML for storing in cookie
 
        safe_message = str(message)
 
    else:
 
        # Apply str - the message might be an exception with __str__
 
        # Escape, so we can trust the result without further escaping, without any risk of injection
 
        safe_message = html_escape(str(message))
 
    if logf is None:
 
        logf = log.info
 
        if category == 'success':
 
            logf = log.debug
 

	
 
    logf('Flash %s: %s', category, safe_message)
 

	
 
    _session_flash_messages(append=(category, safe_message))
 

	
 

	
 
def pop_flash_messages():
 
    """Return all accumulated messages and delete them from the session.
 

	
 
    The return value is a list of ``Message`` objects.
 
    """
 
    return [_Message(category, message) for category, message in _session_flash_messages(clear=True)]
 

	
 

	
 
def capitalize(x):
 
    return x.capitalize()
 

	
 
email = author_email
 

	
 
def short_id(x):
 
    return x[:12]
 

	
 
def hide_credentials(x):
 
    return ''.join(credentials_filter(x))
 

	
 

	
 
def show_id(cs):
 
    """
 
    Configurable function that shows ID
 
    by default it's r123:fffeeefffeee
 

	
 
    :param cs: changeset instance
 
    """
 
    def_len = safe_int(kallithea.CONFIG.get('show_sha_length', 12))
 
    show_rev = asbool(kallithea.CONFIG.get('show_revision_number', False))
 

	
 
    raw_id = cs.raw_id[:def_len]
 
    if show_rev:
 
        return 'r%s:%s' % (cs.revision, raw_id)
 
    else:
 
        return raw_id
 

	
 

	
 
def fmt_date(date):
 
    if date:
 
        return date.strftime("%Y-%m-%d %H:%M:%S")
 
    return ""
 

	
 

	
 
def is_git(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'git'
 

	
 

	
 
def is_hg(repository):
 
    if hasattr(repository, 'alias'):
 
        _type = repository.alias
 
    elif hasattr(repository, 'repo_type'):
 
        _type = repository.repo_type
 
    else:
 
        _type = repository
 
    return _type == 'hg'
 

	
 

	
 
@cache_region('long_term', 'user_attr_or_none')
 
def user_attr_or_none(author, show_attr):
 
    """Try to match email part of VCS committer string with a local user and return show_attr
 
    - or return None if user not found"""
 
    email = author_email(author)
 
    if email:
 
        user = db.User.get_by_email(email)
 
        if user is not None:
 
            return getattr(user, show_attr)
 
    return None
 

	
 

	
 
def email_or_none(author):
 
    """Try to match email part of VCS committer string with a local user.
 
    Return primary email of user, email part of the specified author name, or None."""
 
    if not author:
 
        return None
 
    email = user_attr_or_none(author, 'email')
 
    if email is not None:
 
        return email # always use user's main email address - not necessarily the one used to find user
 

	
 
    # extract email from the commit string
 
    email = author_email(author)
 
    if email:
 
        return email
 

	
 
    # No valid email, not a valid user in the system, none!
 
    return None
 

	
 

	
 
def person(author, show_attr="username"):
 
    """Find the user identified by 'author', return one of the users attributes,
 
    default to the username attribute, None if there is no user"""
 
    # if author is already an instance use it for extraction
 
    if isinstance(author, db.User):
 
        return getattr(author, show_attr)
 

	
 
    value = user_attr_or_none(author, show_attr)
 
    if value is not None:
 
        return value
 

	
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email(author)
 
    return author_name(author) or author_email(author)
 

	
 

	
 
def person_by_id(id_, show_attr="username"):
 
    # maybe it's an ID ?
 
    if str(id_).isdigit() or isinstance(id_, int):
 
        id_ = int(id_)
 
        user = db.User.get(id_)
 
        if user is not None:
 
            return getattr(user, show_attr)
 
    return id_
 

	
 

	
 
def boolicon(value):
 
    """Returns boolean value of a value, represented as small html image of true/false
 
    icons
 

	
 
    :param value: value
 
    """
 

	
 
    if value:
 
        return HTML.tag('i', class_="icon-ok")
 
    else:
 
        return HTML.tag('i', class_="icon-minus-circled")
 

	
 

	
 
def action_parser(user_log, feed=False, parse_cs=False):
 
    """
 
    This helper will action_map the specified string action into translated
 
    fancy names with icons and links
 

	
 
    :param user_log: user log instance
 
    :param feed: use output for feeds (no html and fancy icons)
 
    :param parse_cs: parse Changesets into VCS instances
 
    """
 

	
 
    action = user_log.action
 
    action_params = ' '
 

	
 
    x = action.split(':')
 

	
 
    if len(x) > 1:
 
        action, action_params = x
 

	
 
    def get_cs_links():
 
        revs_limit = 3  # display this amount always
 
        revs_top_limit = 50  # show upto this amount of changesets hidden
 
        revs_ids = action_params.split(',')
 
        deleted = user_log.repository is None
 
        if deleted:
 
            return ','.join(revs_ids)
 

	
 
        repo_name = user_log.repository.repo_name
 

	
 
        def lnk(rev, repo_name):
 
            lazy_cs = False
 
            title_ = None
 
            url_ = '#'
 
            if isinstance(rev, BaseChangeset) or isinstance(rev, AttributeDict):
 
                if rev.op and rev.ref_name:
 
                    if rev.op == 'delete_branch':
 
                        lbl = _('Deleted branch: %s') % rev.ref_name
 
                    elif rev.op == 'tag':
 
                        lbl = _('Created tag: %s') % rev.ref_name
 
                    else:
 
                        lbl = 'Unknown operation %s' % rev.op
 
                else:
 
                    lazy_cs = True
 
                    lbl = rev.short_id[:8]
 
                    url_ = url('changeset_home', repo_name=repo_name,
 
                               revision=rev.raw_id)
 
            else:
 
                # changeset cannot be found - it might have been stripped or removed
 
                lbl = rev[:12]
 
                title_ = _('Changeset %s not found') % lbl
 
            if parse_cs:
 
                return link_to(lbl, url_, title=title_, **{'data-toggle': 'tooltip'})
 
            return link_to(lbl, url_, class_='lazy-cs' if lazy_cs else '',
 
                           **{'data-raw_id': rev.raw_id, 'data-repo_name': repo_name})
 

	
 
        def _get_op(rev_txt):
 
            _op = None
 
            _name = rev_txt
 
            if len(rev_txt.split('=>')) == 2:
 
                _op, _name = rev_txt.split('=>')
 
            return _op, _name
 

	
 
        revs = []
 
        if len([v for v in revs_ids if v != '']) > 0:
 
            repo = None
 
            for rev in revs_ids[:revs_top_limit]:
 
                _op, _name = _get_op(rev)
 

	
 
                # we want parsed changesets, or new log store format is bad
 
                if parse_cs:
 
                    try:
 
                        if repo is None:
 
                            repo = user_log.repository.scm_instance
 
                        _rev = repo.get_changeset(rev)
 
                        revs.append(_rev)
 
                    except ChangesetDoesNotExistError:
 
                        log.error('cannot find revision %s in this repo', rev)
 
                        revs.append(rev)
 
                else:
 
                    _rev = AttributeDict({
 
                        'short_id': rev[:12],
 
                        'raw_id': rev,
 
                        'message': '',
 
                        'op': _op,
 
                        'ref_name': _name
 
                    })
 
                    revs.append(_rev)
 
        cs_links = [" " + ', '.join(
 
            [lnk(rev, repo_name) for rev in revs[:revs_limit]]
 
        )]
 
        _op1, _name1 = _get_op(revs_ids[0])
 
        _op2, _name2 = _get_op(revs_ids[-1])
 

	
 
        _rev = '%s...%s' % (_name1, _name2)
 

	
 
        compare_view = (
 
            ' <div class="compare_view" data-toggle="tooltip" title="%s">'
 
            '<a href="%s">%s</a> </div>' % (
 
                _('Show all combined changesets %s->%s') % (
 
                    revs_ids[0][:12], revs_ids[-1][:12]
 
                ),
 
                url('changeset_home', repo_name=repo_name,
 
                    revision=_rev
 
                ),
 
                _('Compare view')
 
            )
 
        )
 

	
 
        # if we have exactly one more than normally displayed
 
        # just display it, takes less space than displaying
 
        # "and 1 more revisions"
 
        if len(revs_ids) == revs_limit + 1:
 
            cs_links.append(", " + lnk(revs[revs_limit], repo_name))
 

	
 
        # hidden-by-default ones
 
        if len(revs_ids) > revs_limit + 1:
 
            uniq_id = revs_ids[0]
 
            html_tmpl = (
 
                '<span> %s <a class="show_more" id="_%s" '
 
                'href="#more">%s</a> %s</span>'
 
            )
 
            if not feed:
 
                cs_links.append(html_tmpl % (
 
                      _('and'),
 
                      uniq_id, _('%s more') % (len(revs_ids) - revs_limit),
 
                      _('revisions')
 
                    )
 
                )
 

	
 
            if not feed:
 
                html_tmpl = '<span id="%s" style="display:none">, %s </span>'
 
            else:
 
                html_tmpl = '<span id="%s"> %s </span>'
 

	
 
            morelinks = ', '.join(
 
              [lnk(rev, repo_name) for rev in revs[revs_limit:]]
 
            )
 

	
 
            if len(revs_ids) > revs_top_limit:
 
                morelinks += ', ...'
 

	
 
            cs_links.append(html_tmpl % (uniq_id, morelinks))
 
        if len(revs) > 1:
 
            cs_links.append(compare_view)
 
        return ''.join(cs_links)
 

	
 
    def get_fork_name():
 
        repo_name = action_params
 
        url_ = url('summary_home', repo_name=repo_name)
 
        return _('Fork name %s') % link_to(action_params, url_)
 

	
 
    def get_user_name():
 
        user_name = action_params
 
        return user_name
 

	
 
    def get_users_group():
 
        group_name = action_params
 
        return group_name
 

	
 
    def get_pull_request():
 
        pull_request_id = action_params
 
        nice_id = db.PullRequest.make_nice_id(pull_request_id)
 

	
 
        deleted = user_log.repository is None
 
        if deleted:
 
            repo_name = user_log.repository_name
 
        else:
 
            repo_name = user_log.repository.repo_name
 

	
 
        return link_to(_('Pull request %s') % nice_id,
 
                    url('pullrequest_show', repo_name=repo_name,
 
                    pull_request_id=pull_request_id))
 

	
 
    def get_archive_name():
 
        archive_name = action_params
 
        return archive_name
 

	
 
    # action : translated str, callback(extractor), icon
 
    action_map = {
 
        'user_deleted_repo':           (_('[deleted] repository'),
 
                                        None, 'icon-trashcan'),
 
        'user_created_repo':           (_('[created] repository'),
 
                                        None, 'icon-plus'),
 
        'user_created_fork':           (_('[created] repository as fork'),
 
                                        None, 'icon-fork'),
 
        'user_forked_repo':            (_('[forked] repository'),
 
                                        get_fork_name, 'icon-fork'),
 
        'user_updated_repo':           (_('[updated] repository'),
 
                                        None, 'icon-pencil'),
 
        'user_downloaded_archive':      (_('[downloaded] archive from repository'),
 
                                        get_archive_name, 'icon-download-cloud'),
 
        'admin_deleted_repo':          (_('[delete] repository'),
 
                                        None, 'icon-trashcan'),
 
        'admin_created_repo':          (_('[created] repository'),
 
                                        None, 'icon-plus'),
 
        'admin_forked_repo':           (_('[forked] repository'),
 
                                        None, 'icon-fork'),
 
        'admin_updated_repo':          (_('[updated] repository'),
 
                                        None, 'icon-pencil'),
 
        'admin_created_user':          (_('[created] user'),
 
                                        get_user_name, 'icon-user'),
 
        'admin_updated_user':          (_('[updated] user'),
 
                                        get_user_name, 'icon-user'),
 
        'admin_created_users_group':   (_('[created] user group'),
 
                                        get_users_group, 'icon-pencil'),
 
        'admin_updated_users_group':   (_('[updated] user group'),
 
                                        get_users_group, 'icon-pencil'),
 
        'user_commented_revision':     (_('[commented] on revision in repository'),
 
                                        get_cs_links, 'icon-comment'),
 
        'user_commented_pull_request': (_('[commented] on pull request for'),
 
                                        get_pull_request, 'icon-comment'),
 
        'user_closed_pull_request':    (_('[closed] pull request for'),
 
                                        get_pull_request, 'icon-ok'),
 
        'push':                        (_('[pushed] into'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'push_local':                  (_('[committed via Kallithea] into repository'),
 
                                        get_cs_links, 'icon-pencil'),
 
        'push_remote':                 (_('[pulled from remote] into repository'),
 
                                        get_cs_links, 'icon-move-up'),
 
        'pull':                        (_('[pulled] from'),
 
                                        None, 'icon-move-down'),
 
        'started_following_repo':      (_('[started following] repository'),
 
                                        None, 'icon-heart'),
 
        'stopped_following_repo':      (_('[stopped following] repository'),
 
                                        None, 'icon-heart-empty'),
 
    }
 

	
 
    action_str = action_map.get(action, action)
 
    if feed:
 
        action = action_str[0].replace('[', '').replace(']', '')
 
    else:
 
        action = action_str[0] \
 
            .replace('[', '<b>') \
 
            .replace(']', '</b>')
 

	
 
    action_params_func = action_str[1] if callable(action_str[1]) else (lambda: "")
 

	
 
    def action_parser_icon():
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<span%s>%s</span>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<i class="icon-gravatar {cls}"'
 
                ' style="font-size: {size}px;background-size: {size}px;background-image: url(\'{src}\')"'
 
                '></i>').format(cls=cls, size=size, src=src)
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size))
 

	
 
    return literal(html)
 

	
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    from tg import tmpl_context as c
 

	
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urllib.parse.urlparse(url.current(qualified=True))
 
    return (c.visual.gravatar_url or db.User.DEFAULT_GRAVATAR_URL) \
 
               .replace('{email}', email_address) \
 
               .replace('{md5email}', hashlib.md5(safe_bytes(email_address).lower()).hexdigest()) \
 
               .replace('{netloc}', parsed_url.netloc) \
 
               .replace('{scheme}', parsed_url.scheme) \
 
               .replace('{size}', str(size))
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
 
    """
 
    if nodes:
 
        pref = ': <br/> '
 
        suf = ''
 
        if len(nodes) > 30:
 
            suf = '<br/>' + _(' and %s more') % (len(nodes) - 30)
 
        return literal(pref + '<br/> '.join([x.path
 
                                             for x in nodes[:30]]) + suf)
 
    else:
 
        return ': ' + _('No files')
 

	
 

	
 
def fancy_file_stats(stats):
 
    """
 
    Displays a fancy two colored bar for number of added/deleted
 
    lines of code on file
 

	
 
    :param stats: two element list of added/deleted lines of code
 
    """
 

	
 
    a, d = stats['added'], stats['deleted']
 
    width = 100
 

	
 
    if stats['binary']:
 
        # binary mode
 
        lbl = ''
 
        bin_op = 1
 

	
kallithea/model/db.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json, ssh
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import asbool, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, urlreadable
 
from kallithea.lib.vcs import get_backend
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
        return dict with keys and values corresponding
 
        to this model data """
 

	
 
        d = {}
 
        for k in self._get_keys():
 
            d[k] = getattr(self, k)
 

	
 
        # also use __json__() if present to get additional fields
 
        _json_attr = getattr(self, '__json__', None)
 
        if _json_attr:
 
            # update with attributes from __json__
 
            if callable(_json_attr):
 
                _json_attr = _json_attr()
 
            for k, val in _json_attr.items():
 
                d[k] = val
 
        return d
 

	
 
    def get_appstruct(self):
 
        """return list with keys and values tuples corresponding
 
        to this model data """
 

	
 
        return [
 
            (k, getattr(self, k))
 
            for k in self._get_keys()
 
        ]
 

	
 
    def populate_obj(self, populate_dict):
 
        """populate model with data from given populate_dict"""
 

	
 
        for k in self._get_keys():
 
            if k in populate_dict:
 
                setattr(self, k, populate_dict[k])
 

	
 
    @classmethod
 
    def query(cls):
 
        return meta.Session().query(cls)
 

	
 
    @classmethod
 
    def get(cls, id_):
 
        if id_:
 
            return cls.query().get(id_)
 

	
 
    @classmethod
 
    def guess_instance(cls, value, callback=None):
 
        """Haphazardly attempt to convert `value` to a `cls` instance.
 

	
 
        If `value` is None or already a `cls` instance, return it. If `value`
 
        is a number (or looks like one if you squint just right), assume it's
 
        a database primary key and let SQLAlchemy sort things out. Otherwise,
 
        fall back to resolving it using `callback` (if specified); this could
 
        e.g. be a function that looks up instances by name (though that won't
 
        work if the name begins with a digit). Otherwise, raise Exception.
 
        """
 

	
 
        if value is None:
 
            return None
 
        if isinstance(value, cls):
 
            return value
 
        if isinstance(value, int):
 
            return cls.get(value)
 
        if isinstance(value, str) and value.isdigit():
 
            return cls.get(int(value))
 
        if callback is not None:
 
            return callback(value)
 

	
 
        raise Exception(
 
            'given object must be int, long or Instance of %s '
 
            'got %s, no callback provided' % (cls, type(value))
 
        )
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        try:
 
            id_ = int(id_)
 
        except (TypeError, ValueError):
 
            raise HTTPNotFound
 

	
 
        res = cls.query().get(id_)
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def delete(cls, id_):
 
        obj = cls.query().get(id_)
 
        meta.Session().delete(obj)
 

	
 
    def __repr__(self):
 
        return '<DB:%s>' % (self.__class__.__name__)
 

	
 

	
 
_table_args_default_dict = {'extend_existing': True,
 
                            'mysql_engine': 'InnoDB',
 
                            'sqlite_autoincrement': True,
 
                           }
 

	
 
class Setting(meta.Base, BaseDbModel):
 
    __tablename__ = 'settings'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    SETTINGS_TYPES = {
 
        'str': safe_bytes,
 
        'int': safe_int,
 
        'unicode': safe_str,
 
        'bool': asbool,
 
        'list': functools.partial(aslist, sep=',')
 
    }
 

	
 
    app_settings_id = Column(Integer(), primary_key=True)
 
    app_settings_name = Column(String(255), nullable=False, unique=True)
 
    _app_settings_value = Column("app_settings_value", Unicode(4096), nullable=False)
 
    _app_settings_type = Column("app_settings_type", String(255), nullable=True) # FIXME: not nullable?
 

	
 
    def __init__(self, key='', val='', type='unicode'):
 
        self.app_settings_name = key
 
        self.app_settings_value = val
 
        self.app_settings_type = type
 

	
 
    @validates('_app_settings_value')
 
    def validate_settings_value(self, key, val):
 
        assert isinstance(val, str)
 
        return val
 

	
 
    @hybrid_property
 
    def app_settings_value(self):
 
        v = self._app_settings_value
 
        _type = self.app_settings_type
 
        converter = self.SETTINGS_TYPES.get(_type) or self.SETTINGS_TYPES['unicode']
 
        return converter(v)
 

	
 
    @app_settings_value.setter
 
    def app_settings_value(self, val):
 
        """
 
        Setter that will always make sure we use str in app_settings_value
 
        """
 
        self._app_settings_value = safe_str(val)
 

	
 
    @hybrid_property
 
    def app_settings_type(self):
 
        return self._app_settings_type
 

	
 
    @app_settings_type.setter
 
    def app_settings_type(self, val):
 
        if val not in self.SETTINGS_TYPES:
 
            raise Exception('type must be one of %s got %s'
 
                            % (list(self.SETTINGS_TYPES), val))
 
        self._app_settings_type = val
 

	
 
    def __repr__(self):
 
        return "<%s %s.%s=%r>" % (
 
            self.__class__.__name__,
 
            self.app_settings_name, self.app_settings_type, self.app_settings_value
 
        )
 

	
 
    @classmethod
 
    def get_by_name(cls, key):
 
        return cls.query() \
 
            .filter(cls.app_settings_name == key).scalar()
 

	
 
    @classmethod
 
    def get_by_name_or_create(cls, key, val='', type='unicode'):
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            res = cls(key, val, type)
 
        return res
 

	
 
    @classmethod
 
    def create_or_update(cls, key, val=None, type=None):
 
        """
 
        Creates or updates Kallithea setting. If updates are triggered, it will only
 
        update parameters that are explicitly set. 'None' values will be skipped.
 

	
 
        :param key:
 
        :param val:
 
        :param type:
 
        :return:
 
        """
 
        res = cls.get_by_name(key)
 
        if res is None:
 
            # new setting
 
            val = val if val is not None else ''
 
            type = type if type is not None else 'unicode'
 
            res = cls(key, val, type)
 
            meta.Session().add(res)
 
        else:
 
            if val is not None:
 
                # update if set
 
                res.app_settings_value = val
 
            if type is not None:
 
                # update if set
 
                res.app_settings_type = type
 
        return res
 

	
 
    @classmethod
 
    def get_app_settings(cls):
 

	
 
        ret = cls.query()
 
        if ret is None:
 
            raise Exception('Could not get application settings !')
 
        settings = {}
 
        for each in ret:
 
            settings[each.app_settings_name] = \
 
                each.app_settings_value
 

	
 
        return settings
 

	
 
    @classmethod
 
    def get_auth_settings(cls):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('auth_')).all()
 
        fd = {}
 
        for row in ret:
 
            fd[row.app_settings_name] = row.app_settings_value
 
        return fd
 

	
 
    @classmethod
 
    def get_default_repo_settings(cls, strip_prefix=False):
 
        ret = cls.query() \
 
                .filter(cls.app_settings_name.startswith('default_')).all()
 
        fd = {}
 
        for row in ret:
 
            key = row.app_settings_name
 
            if strip_prefix:
 
                key = remove_prefix(key, prefix='default_')
 
            fd.update({key: row.app_settings_value})
 

	
 
        return fd
 

	
 
    @classmethod
 
    def get_server_info(cls):
 
        import platform
 

	
 
        import pkg_resources
 

	
 
        from kallithea.lib.utils import check_git_version
 
        mods = [(p.project_name, p.version) for p in pkg_resources.working_set]
 
        info = {
 
            'modules': sorted(mods, key=lambda k: k[0].lower()),
 
            'py_version': platform.python_version(),
 
            'platform': platform.platform(),
 
            'kallithea_version': kallithea.__version__,
 
            'git_version': str(check_git_version()),
 
            'git_path': kallithea.CONFIG.get('git_path')
 
        }
 
        return info
 

	
 

	
 
class Ui(meta.Base, BaseDbModel):
 
    __tablename__ = 'ui'
 
    __table_args__ = (
 
        Index('ui_ui_section_ui_key_idx', 'ui_section', 'ui_key'),
 
        UniqueConstraint('ui_section', 'ui_key'),
 
        _table_args_default_dict,
 
    )
 

	
 
    HOOK_UPDATE = 'changegroup.update'
 
    HOOK_REPO_SIZE = 'changegroup.repo_size'
 

	
 
    ui_id = Column(Integer(), primary_key=True)
 
    ui_section = Column(String(255), nullable=False)
 
    ui_key = Column(String(255), nullable=False)
 
    ui_value = Column(String(255), nullable=True) # FIXME: not nullable?
 
    ui_active = Column(Boolean(), nullable=False, default=True)
 

	
 
    @classmethod
 
    def get_by_key(cls, section, key):
 
        """ Return specified Ui object, or None if not found. """
 
        return cls.query().filter_by(ui_section=section, ui_key=key).scalar()
 

	
 
    @classmethod
 
    def get_or_create(cls, section, key):
 
        """ Return specified Ui object, creating it if necessary. """
 
        setting = cls.get_by_key(section, key)
 
        if setting is None:
 
            setting = cls(ui_section=section, ui_key=key)
 
            meta.Session().add(setting)
 
        return setting
 

	
 
    @classmethod
 
    def get_builtin_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_custom_hooks(cls):
 
        q = cls.query()
 
        q = q.filter(~cls.ui_key.in_([cls.HOOK_UPDATE, cls.HOOK_REPO_SIZE]))
 
        q = q.filter(cls.ui_section == 'hooks')
 
        q = q.order_by(cls.ui_section, cls.ui_key)
 
        return q.all()
 

	
 
    @classmethod
 
    def get_repos_location(cls):
 
        return cls.get_by_key('paths', '/').ui_value
 

	
 
    @classmethod
 
    def create_or_update_hook(cls, key, val):
 
        new_ui = cls.get_or_create('hooks', key)
 
        new_ui.ui_active = True
 
        new_ui.ui_value = val
 

	
 
    def __repr__(self):
 
        return '<%s %s.%s=%r>' % (
 
            self.__class__.__name__,
 
            self.ui_section, self.ui_key, self.ui_value)
 

	
 

	
 
class User(meta.Base, BaseDbModel):
 
    __tablename__ = 'users'
 
    __table_args__ = (
 
        Index('u_username_idx', 'username'),
 
        Index('u_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_USER_NAME = 'default'
 
    DEFAULT_GRAVATAR_URL = 'https://secure.gravatar.com/avatar/{md5email}?d=identicon&s={size}'
 
    # The name of the default auth type in extern_type, 'internal' lives in auth_internal.py
 
    DEFAULT_AUTH_TYPE = 'internal'
 

	
 
    user_id = Column(Integer(), primary_key=True)
 
    username = Column(String(255), nullable=False, unique=True)
 
    password = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    admin = Column(Boolean(), nullable=False, default=False)
 
    name = Column("firstname", Unicode(255), nullable=False)
 
    lastname = Column(Unicode(255), nullable=False)
 
    _email = Column("email", String(255), nullable=True, unique=True) # FIXME: not nullable?
 
    last_login = Column(DateTime(timezone=False), nullable=True)
 
    extern_type = Column(String(255), nullable=True) # FIXME: not nullable?
 
    extern_name = Column(String(255), nullable=True) # FIXME: not nullable?
 
    api_key = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _user_data = Column("user_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    user_log = relationship('UserLog')
 
    user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
 

	
 
    repositories = relationship('Repository')
 
    repo_groups = relationship('RepoGroup')
 
    user_groups = relationship('UserGroup')
 
    user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
 
    followings = relationship('UserFollowing', primaryjoin='UserFollowing.user_id==User.user_id', cascade='all')
 

	
 
    repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
 
    repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
 

	
 
    group_member = relationship('UserGroupMember', cascade='all')
 

	
 
    # comments created by this user
 
    user_comments = relationship('ChangesetComment', cascade='all')
 
    # extra emails for this user
 
    user_emails = relationship('UserEmailMap', cascade='all')
 
    # extra API keys
 
    user_api_keys = relationship('UserApiKeys', cascade='all')
 
    ssh_keys = relationship('UserSshKeys', cascade='all')
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 
    @property
 
    def firstname(self):
 
        # alias for future
 
        return self.name
 

	
 
    @property
 
    def emails(self):
 
        other = UserEmailMap.query().filter(UserEmailMap.user == self).all()
 
        return [self.email] + [x.email for x in other]
 

	
 
    @property
 
    def api_keys(self):
 
        other = UserApiKeys.query().filter(UserApiKeys.user == self).all()
 
        return [self.api_key] + [x.api_key for x in other]
 

	
 
    @property
 
    def ip_addresses(self):
 
        ret = UserIpMap.query().filter(UserIpMap.user == self).all()
 
        return [x.ip_addr for x in ret]
 

	
 
    @property
 
    def full_name(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def full_name_or_username(self):
 
        """
 
        Show full name.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s' % (self.firstname, self.lastname)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_name_and_username(self):
 
        """
 
        Show full name and username as 'Firstname Lastname (username)'.
 
        If full name is not set, fall back to username.
 
        """
 
        return ('%s %s (%s)' % (self.firstname, self.lastname, self.username)
 
                if (self.firstname and self.lastname) else self.username)
 

	
 
    @property
 
    def full_contact(self):
 
        return '%s %s <%s>' % (self.firstname, self.lastname, self.email)
 

	
 
    @property
 
    def short_contact(self):
 
        return '%s %s' % (self.firstname, self.lastname)
 

	
 
    @property
 
    def is_admin(self):
 
        return self.admin
 

	
 
    @hybrid_property
 
    def is_default_user(self):
 
        return self.username == User.DEFAULT_USER_NAME
 

	
 
    @hybrid_property
 
    def user_data(self):
 
        if not self._user_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._user_data)
 
        except TypeError:
 
            return {}
 

	
 
    @user_data.setter
 
    def user_data(self, val):
 
        try:
 
            self._user_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__, self.user_id, self.username)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(User, cls).guess_instance(value, User.get_by_username)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_, allow_default=True):
 
        '''
 
        Overridden version of BaseDbModel.get_or_404, with an extra check on
 
        the default user.
 
        '''
 
        user = super(User, cls).get_or_404(id_)
 
        if not allow_default and user.is_default_user:
 
            raise DefaultUserException()
 
        return user
 

	
 
    @classmethod
 
    def get_by_username_or_email(cls, username_or_email, case_insensitive=True):
 
        """
 
        For anything that looks like an email address, look up by the email address (matching
 
        case insensitively).
 
        For anything else, try to look up by the user name.
 

	
 
        This assumes no normal username can have '@' symbol.
 
        """
 
        if '@' in username_or_email:
 
            return User.get_by_email(username_or_email)
 
        else:
 
            return User.get_by_username(username_or_email, case_insensitive=case_insensitive)
 

	
 
    @classmethod
 
    def get_by_username(cls, username, case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(sqlalchemy.func.lower(cls.username) == sqlalchemy.func.lower(username))
 
        else:
 
            q = cls.query().filter(cls.username == username)
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get_by_api_key(cls, api_key, fallback=True):
 
        if len(api_key) != 40 or not api_key.isalnum():
 
            return None
 

	
 
        q = cls.query().filter(cls.api_key == api_key)
 
        res = q.scalar()
 

	
 
        if fallback and not res:
 
            # fallback to additional keys
 
            _res = UserApiKeys.query().filter_by(api_key=api_key, is_expired=False).first()
 
            if _res:
 
                res = _res.user
 
        if res is None or not res.active or res.is_default_user:
 
            return None
 
        return res
 

	
 
    @classmethod
 
    def get_by_email(cls, email, cache=False):
 
        q = cls.query().filter(sqlalchemy.func.lower(cls.email) == sqlalchemy.func.lower(email))
 
        ret = q.scalar()
 
        if ret is None:
 
            q = UserEmailMap.query()
 
            # try fetching in alternate email map
 
            q = q.filter(sqlalchemy.func.lower(UserEmailMap.email) == sqlalchemy.func.lower(email))
 
            q = q.options(joinedload(UserEmailMap.user))
 
            ret = getattr(q.scalar(), 'user', None)
 

	
 
        return ret
 

	
 
    @classmethod
 
    def get_from_cs_author(cls, author):
 
        """
 
        Tries to get User objects out of commit author string
 

	
 
        :param author:
 
        """
 
        from kallithea.lib.helpers import author_name, email
 

	
 
        # Valid email in the attribute passed, see if they're in the system
 
        _email = email(author)
 
        _email = author_email(author)
 
        if _email:
 
            user = cls.get_by_email(_email)
 
            if user is not None:
 
                return user
 
        # Maybe we can match by username?
 
        _author = author_name(author)
 
        user = cls.get_by_username(_author, case_insensitive=True)
 
        if user is not None:
 
            return user
 

	
 
    def update_lastlogin(self):
 
        """Update user lastlogin"""
 
        self.last_login = datetime.datetime.now()
 
        log.debug('updated user %s lastlogin', self.username)
 

	
 
    @classmethod
 
    def get_first_admin(cls):
 
        user = User.query().filter(User.admin == True).first()
 
        if user is None:
 
            raise Exception('Missing administrative account!')
 
        return user
 

	
 
    @classmethod
 
    def get_default_user(cls):
 
        user = User.get_by_username(User.DEFAULT_USER_NAME)
 
        if user is None:
 
            raise Exception('Missing default account!')
 
        return user
 

	
 
    def get_api_data(self, details=False):
 
        """
 
        Common function for generating user related data for API
 
        """
 
        user = self
 
        data = dict(
 
            user_id=user.user_id,
 
            username=user.username,
 
            firstname=user.name,
 
            lastname=user.lastname,
 
            email=user.email,
 
            emails=user.emails,
 
            active=user.active,
 
            admin=user.admin,
 
        )
 
        if details:
 
            data.update(dict(
 
                extern_type=user.extern_type,
 
                extern_name=user.extern_name,
 
                api_key=user.api_key,
 
                api_keys=user.api_keys,
 
                last_login=user.last_login,
 
                ip_addresses=user.ip_addresses
 
                ))
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
            full_name=self.full_name,
 
            full_name_or_username=self.full_name_or_username,
 
            short_contact=self.short_contact,
 
            full_contact=self.full_contact
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 

	
 
class UserApiKeys(meta.Base, BaseDbModel):
 
    __tablename__ = 'user_api_keys'
 
    __table_args__ = (
 
        Index('uak_api_key_idx', 'api_key'),
 
        Index('uak_api_key_expires_idx', 'api_key', 'expires'),
 
        _table_args_default_dict,
 
    )
 

	
 
    user_api_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    api_key = Column(String(255), nullable=False, unique=True)
 
    description = Column(UnicodeText(), nullable=False)
 
    expires = Column(Float(53), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    user = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.expires != -1) & (time.time() > self.expires)
 

	
 

	
 
class UserEmailMap(meta.Base, BaseDbModel):
 
    __tablename__ = 'user_email_map'
 
    __table_args__ = (
 
        Index('uem_email_idx', 'email'),
 
        _table_args_default_dict,
 
    )
 

	
 
    email_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _email = Column("email", String(255), nullable=False, unique=True)
 
    user = relationship('User')
 

	
 
    @validates('_email')
 
    def validate_email(self, key, email):
 
        # check if this email is not main one
 
        main_email = meta.Session().query(User).filter(User.email == email).scalar()
 
        if main_email is not None:
 
            raise AttributeError('email %s is present is user table' % email)
 
        return email
 

	
 
    @hybrid_property
 
    def email(self):
 
        return self._email
 

	
 
    @email.setter
 
    def email(self, val):
 
        self._email = val.lower() if val else None
 

	
 

	
 
class UserIpMap(meta.Base, BaseDbModel):
 
    __tablename__ = 'user_ip_map'
 
    __table_args__ = (
 
        UniqueConstraint('user_id', 'ip_addr'),
 
        _table_args_default_dict,
 
    )
 

	
 
    ip_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    ip_addr = Column(String(255), nullable=False)
 
    active = Column(Boolean(), nullable=False, default=True)
 
    user = relationship('User')
 

	
 
    @classmethod
 
    def _get_ip_range(cls, ip_addr):
 
        net = ipaddr.IPNetwork(address=ip_addr)
 
        return [str(net.network), str(net.broadcast)]
 

	
 
    def __json__(self):
 
        return dict(
 
          ip_addr=self.ip_addr,
 
          ip_range=self._get_ip_range(self.ip_addr)
 
        )
 

	
 
    def __repr__(self):
 
        return "<%s %s: %s>" % (self.__class__.__name__, self.user_id, self.ip_addr)
 

	
 

	
 
class UserLog(meta.Base, BaseDbModel):
 
    __tablename__ = 'user_logs'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    user_log_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
 
    username = Column(String(255), nullable=False)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    repository_name = Column(Unicode(255), nullable=False)
 
    user_ip = Column(String(255), nullable=True)
 
    action = Column(UnicodeText(), nullable=False)
 
    action_date = Column(DateTime(timezone=False), nullable=False)
 

	
 
    def __repr__(self):
 
        return "<%s %r: %r>" % (self.__class__.__name__,
 
                                  self.repository_name,
 
                                  self.action)
 

	
 
    @property
 
    def action_as_day(self):
 
        return datetime.date(*self.action_date.timetuple()[:3])
 

	
 
    user = relationship('User')
 
    repository = relationship('Repository', cascade='')
 

	
 

	
 
class UserGroup(meta.Base, BaseDbModel):
 
    __tablename__ = 'users_groups'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_id = Column(Integer(), primary_key=True)
 
    users_group_name = Column(Unicode(255), nullable=False, unique=True)
 
    user_group_description = Column(Unicode(10000), nullable=True) # FIXME: not nullable?
 
    users_group_active = Column(Boolean(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _group_data = Column("group_data", LargeBinary(), nullable=True)  # JSON data # FIXME: not nullable?
 

	
 
    members = relationship('UserGroupMember', cascade="all, delete-orphan")
 
    users_group_to_perm = relationship('UserGroupToPerm', cascade='all')
 
    users_group_repo_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    users_group_repo_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
 
    user_user_group_to_perm = relationship('UserUserGroupToPerm', cascade='all')
 
    user_group_user_group_to_perm = relationship('UserGroupUserGroupToPerm', primaryjoin="UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id", cascade='all')
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def group_data(self):
 
        if not self._group_data:
 
            return {}
 

	
 
        try:
 
            return ext_json.loads(self._group_data)
 
        except TypeError:
 
            return {}
 

	
 
    @group_data.setter
 
    def group_data(self, val):
 
        try:
 
            self._group_data = ascii_bytes(ext_json.dumps(val))
 
        except Exception:
 
            log.error(traceback.format_exc())
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__,
 
                                  self.users_group_id,
 
                                  self.users_group_name)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(UserGroup, cls).guess_instance(value, UserGroup.get_by_group_name)
 

	
 
    @classmethod
 
    def get_by_group_name(cls, group_name, case_insensitive=False):
 
        if case_insensitive:
 
            q = cls.query().filter(sqlalchemy.func.lower(cls.users_group_name) == sqlalchemy.func.lower(group_name))
 
        else:
 
            q = cls.query().filter(cls.users_group_name == group_name)
 
        return q.scalar()
 

	
 
    @classmethod
 
    def get(cls, user_group_id):
 
        user_group = cls.query()
 
        return user_group.get(user_group_id)
 

	
 
    def get_api_data(self, with_members=True):
 
        user_group = self
 

	
 
        data = dict(
 
            users_group_id=user_group.users_group_id,
 
            group_name=user_group.users_group_name,
 
            group_description=user_group.user_group_description,
 
            active=user_group.users_group_active,
 
            owner=user_group.owner.username,
 
        )
 
        if with_members:
 
            data['members'] = [
 
                ugm.user.get_api_data()
 
                for ugm in user_group.members
 
            ]
 

	
 
        return data
 

	
 

	
 
class UserGroupMember(meta.Base, BaseDbModel):
 
    __tablename__ = 'users_groups_members'
 
    __table_args__ = (
 
        _table_args_default_dict,
 
    )
 

	
 
    users_group_member_id = Column(Integer(), primary_key=True)
 
    users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    users_group = relationship('UserGroup')
 

	
 
    def __init__(self, gr_id='', u_id=''):
 
        self.users_group_id = gr_id
 
        self.user_id = u_id
 

	
 

	
 
class RepositoryField(meta.Base, BaseDbModel):
 
    __tablename__ = 'repositories_fields'
 
    __table_args__ = (
 
        UniqueConstraint('repository_id', 'field_key'),  # no-multi field
 
        _table_args_default_dict,
 
    )
 

	
 
    PREFIX = 'ex_'  # prefix used in form to not conflict with already existing fields
 

	
 
    repo_field_id = Column(Integer(), primary_key=True)
 
    repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    field_key = Column(String(250), nullable=False)
 
    field_label = Column(String(1024), nullable=False)
 
    field_value = Column(String(10000), nullable=False)
 
    field_desc = Column(String(1024), nullable=False)
 
    field_type = Column(String(255), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    repository = relationship('Repository')
 

	
 
    @property
 
    def field_key_prefixed(self):
 
        return 'ex_%s' % self.field_key
 

	
 
    @classmethod
 
    def un_prefix_key(cls, key):
 
        if key.startswith(cls.PREFIX):
 
            return key[len(cls.PREFIX):]
 
        return key
 

	
 
    @classmethod
 
    def get_by_key_name(cls, key, repo):
 
        row = cls.query() \
 
                .filter(cls.repository == repo) \
 
                .filter(cls.field_key == key).scalar()
 
        return row
 

	
 

	
 
class Repository(meta.Base, BaseDbModel):
 
    __tablename__ = 'repositories'
 
    __table_args__ = (
 
        Index('r_repo_name_idx', 'repo_name'),
 
        _table_args_default_dict,
 
    )
 

	
 
    DEFAULT_CLONE_URI = '{scheme}://{user}@{netloc}/{repo}'
 
    DEFAULT_CLONE_SSH = 'ssh://{system_user}@{hostname}/{repo}'
 

	
 
    STATE_CREATED = 'repo_state_created'
 
    STATE_PENDING = 'repo_state_pending'
 
    STATE_ERROR = 'repo_state_error'
 

	
 
    repo_id = Column(Integer(), primary_key=True)
 
    repo_name = Column(Unicode(255), nullable=False, unique=True)
 
    repo_state = Column(String(255), nullable=False)
 

	
 
    clone_uri = Column(String(255), nullable=True) # FIXME: not nullable?
 
    repo_type = Column(String(255), nullable=False) # 'hg' or 'git'
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    private = Column(Boolean(), nullable=False)
 
    enable_statistics = Column("statistics", Boolean(), nullable=False, default=True)
 
    enable_downloads = Column("downloads", Boolean(), nullable=False, default=True)
 
    description = Column(Unicode(10000), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    _landing_revision = Column("landing_revision", String(255), nullable=False)
 
    _changeset_cache = Column("changeset_cache", LargeBinary(), nullable=True) # JSON data # FIXME: not nullable?
 

	
 
    fork_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=True)
 
    group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True)
 

	
 
    owner = relationship('User')
 
    fork = relationship('Repository', remote_side=repo_id)
 
    group = relationship('RepoGroup')
 
    repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
 
    users_group_to_perm = relationship('UserGroupRepoToPerm', cascade='all')
 
    stats = relationship('Statistics', cascade='all', uselist=False)
 

	
 
    followers = relationship('UserFollowing',
 
                             primaryjoin='UserFollowing.follows_repository_id==Repository.repo_id',
 
                             cascade='all')
 
    extra_fields = relationship('RepositoryField',
 
                                cascade="all, delete-orphan")
 

	
 
    logs = relationship('UserLog')
 
    comments = relationship('ChangesetComment', cascade="all, delete-orphan")
 

	
 
    pull_requests_org = relationship('PullRequest',
 
                    primaryjoin='PullRequest.org_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    pull_requests_other = relationship('PullRequest',
 
                    primaryjoin='PullRequest.other_repo_id==Repository.repo_id',
 
                    cascade="all, delete-orphan")
 

	
 
    def __repr__(self):
 
        return "<%s %s: %r>" % (self.__class__.__name__,
 
                                  self.repo_id, self.repo_name)
 

	
 
    @hybrid_property
 
    def landing_rev(self):
 
        # always should return [rev_type, rev]
 
        if self._landing_revision:
 
            _rev_info = self._landing_revision.split(':')
 
            if len(_rev_info) < 2:
 
                _rev_info.insert(0, 'rev')
 
            return [_rev_info[0], _rev_info[1]]
 
        return [None, None]
 

	
 
    @landing_rev.setter
 
    def landing_rev(self, val):
 
        if ':' not in val:
0 comments (0 inline, 0 general)