Changeset - a765b2961eda
[Not reviewed]
default
0 4 0
Mads Kiilerich (mads) - 5 years ago 2020-10-11 00:19:46
mads@kiilerich.com
Grafted from: 97578610caa5
lib: move extract_mentioned_users out of utils2

Avoid model.db reference to keep utils2.py independent of Kallithea classes.
4 files changed with 14 insertions and 14 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/utils.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.utils
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Utilities library for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 18, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import sys
 
import traceback
 
import urllib.error
 
from distutils.version import StrictVersion
 

	
 
import mercurial.config
 
import mercurial.error
 
import mercurial.ui
 

	
 
import kallithea.config.conf
 
from kallithea.lib.exceptions import InvalidCloneUriException
 
from kallithea.lib.utils2 import ascii_bytes, aslist, get_current_authuser, safe_bytes, safe_str
 
from kallithea.lib.utils2 import ascii_bytes, aslist, extract_mentioned_usernames, get_current_authuser, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.git.repository import GitRepository
 
from kallithea.lib.vcs.backends.hg.repository import MercurialRepository
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.fakemod import create_module
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import meta
 
from kallithea.model.db import RepoGroup, Repository, Setting, Ui, User, UserGroup, UserLog
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
 

	
 

	
 
#==============================================================================
 
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
 
#==============================================================================
 
def get_repo_slug(request):
 
    _repo = request.environ['pylons.routes_dict'].get('repo_name')
 
    if _repo:
 
        _repo = _repo.rstrip('/')
 
    return _repo
 

	
 

	
 
def get_repo_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('group_name')
 
    if _group:
 
        _group = _group.rstrip('/')
 
    return _group
 

	
 

	
 
def get_user_group_slug(request):
 
    _group = request.environ['pylons.routes_dict'].get('id')
 
    _group = UserGroup.get(_group)
 
    if _group:
 
        return _group.users_group_name
 
    return None
 

	
 

	
 
def _get_permanent_id(s):
 
    """Helper for decoding stable URLs with repo ID. For a string like '_123'
 
    return 123.
 
    """
 
    by_id_match = re.match(r'^_(\d+)$', s)
 
    if by_id_match is None:
 
        return None
 
    return int(by_id_match.group(1))
 
@@ -546,48 +546,58 @@ git_req_ver = StrictVersion('1.7.4')
 

	
 
def check_git_version():
 
    """
 
    Checks what version of git is installed on the system, and raise a system exit
 
    if it's too old for Kallithea to work properly.
 
    """
 
    if 'git' not in kallithea.BACKENDS:
 
        return None
 

	
 
    if not settings.GIT_EXECUTABLE_PATH:
 
        log.warning('No git executable configured - check "git_path" in the ini file.')
 
        return None
 

	
 
    try:
 
        stdout, stderr = GitRepository._run_git_command(['--version'])
 
    except RepositoryError as e:
 
        # message will already have been logged as error
 
        log.warning('No working git executable found - check "git_path" in the ini file.')
 
        return None
 

	
 
    if stderr:
 
        log.warning('Error/stderr from "%s --version":\n%s', settings.GIT_EXECUTABLE_PATH, safe_str(stderr))
 

	
 
    if not stdout:
 
        log.warning('No working git executable found - check "git_path" in the ini file.')
 
        return None
 

	
 
    output = safe_str(stdout).strip()
 
    m = re.search(r"\d+.\d+.\d+", output)
 
    if m:
 
        ver = StrictVersion(m.group(0))
 
        log.debug('Git executable: "%s", version %s (parsed from: "%s")',
 
                  settings.GIT_EXECUTABLE_PATH, ver, output)
 
        if ver < git_req_ver:
 
            log.error('Kallithea detected %s version %s, which is too old '
 
                      'for the system to function properly. '
 
                      'Please upgrade to version %s or later. '
 
                      'If you strictly need Mercurial repositories, you can '
 
                      'clear the "git_path" setting in the ini file.',
 
                      settings.GIT_EXECUTABLE_PATH, ver, git_req_ver)
 
            log.error("Terminating ...")
 
            sys.exit(1)
 
    else:
 
        ver = StrictVersion('0.0.0')
 
        log.warning('Error finding version number in "%s --version" stdout:\n%s',
 
                    settings.GIT_EXECUTABLE_PATH, output)
 

	
 
    return ver
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
kallithea/lib/utils2.py
Show inline comments
 
@@ -333,107 +333,96 @@ def get_changeset_safe(repo, rev):
 
    Safe version of get_changeset if this changeset doesn't exists for a
 
    repo it returns a Dummy one instead
 

	
 
    :param repo:
 
    :param rev:
 
    """
 
    if not isinstance(repo, BaseRepository):
 
        raise Exception('You must pass an Repository '
 
                        'object as first argument got %s' % type(repo))
 

	
 
    try:
 
        cs = repo.get_changeset(rev)
 
    except (RepositoryError, LookupError):
 
        cs = EmptyChangeset(requested_revision=rev)
 
    return cs
 

	
 

	
 
def datetime_to_time(dt):
 
    if dt:
 
        return time.mktime(dt.timetuple())
 

	
 

	
 
def time_to_datetime(tm):
 
    if tm:
 
        if isinstance(tm, str):
 
            try:
 
                tm = float(tm)
 
            except ValueError:
 
                return
 
        return datetime.datetime.fromtimestamp(tm)
 

	
 

	
 
# Must match regexp in kallithea/public/js/base.js MentionsAutoComplete()
 
# Check char before @ - it must not look like we are in an email addresses.
 
# Matching is greedy so we don't have to look beyond the end.
 
MENTIONS_REGEX = re.compile(r'(?:^|(?<=[^a-zA-Z0-9]))@([a-zA-Z0-9][-_.a-zA-Z0-9]*[a-zA-Z0-9])')
 

	
 

	
 
def extract_mentioned_usernames(text):
 
    r"""
 
    Returns list of (possible) usernames @mentioned in given text.
 

	
 
    >>> extract_mentioned_usernames('@1-2.a_X,@1234 not@not @ddd@not @n @ee @ff @gg, @gg;@hh @n\n@zz,')
 
    ['1-2.a_X', '1234', 'ddd', 'ee', 'ff', 'gg', 'gg', 'hh', 'zz']
 
    """
 
    return MENTIONS_REGEX.findall(text)
 

	
 

	
 
def extract_mentioned_users(text):
 
    """ Returns set of actual database Users @mentioned in given text. """
 
    from kallithea.model.db import User
 
    result = set()
 
    for name in extract_mentioned_usernames(text):
 
        user = User.get_by_username(name, case_insensitive=True)
 
        if user is not None and not user.is_default_user:
 
            result.add(user)
 
    return result
 

	
 

	
 
class AttributeDict(dict):
 
    def __getattr__(self, attr):
 
        return self.get(attr, None)
 
    __setattr__ = dict.__setitem__
 
    __delattr__ = dict.__delitem__
 

	
 

	
 
def obfuscate_url_pw(engine):
 
    try:
 
        _url = sa_url.make_url(engine or '')
 
    except ArgumentError:
 
        return engine
 
    if _url.password:
 
        _url.password = 'XXXXX'
 
    return str(_url)
 

	
 

	
 
class HookEnvironmentError(Exception): pass
 

	
 

	
 
def get_hook_environment():
 
    """
 
    Get hook context by deserializing the global KALLITHEA_EXTRAS environment
 
    variable.
 

	
 
    Called early in Git out-of-process hooks to get .ini config path so the
 
    basic environment can be configured properly. Also used in all hooks to get
 
    information about the action that triggered it.
 
    """
 

	
 
    try:
 
        kallithea_extras = os.environ['KALLITHEA_EXTRAS']
 
    except KeyError:
 
        raise HookEnvironmentError("Environment variable KALLITHEA_EXTRAS not found")
 

	
 
    extras = json.loads(kallithea_extras)
 
    for k in ['username', 'repository', 'scm', 'action', 'ip', 'config']:
 
        try:
 
            extras[k]
 
        except KeyError:
 
            raise HookEnvironmentError('Missing key %s in KALLITHEA_EXTRAS %s' % (k, extras))
 

	
 
    return AttributeDict(extras)
 

	
 

	
 
def set_hook_environment(username, ip_addr, repo_name, repo_alias, action=None):
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
kallithea/model/comment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.comment
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
comments model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 11, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
from collections import defaultdict
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.utils2 import extract_mentioned_users
 
from kallithea.lib.utils import extract_mentioned_users
 
from kallithea.model.db import ChangesetComment, PullRequest, Repository, User
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _list_changeset_commenters(revision):
 
    return (Session().query(User)
 
        .join(ChangesetComment.author)
 
        .filter(ChangesetComment.revision == revision)
 
        .all())
 

	
 
def _list_pull_request_commenters(pull_request):
 
    return (Session().query(User)
 
        .join(ChangesetComment.author)
 
        .filter(ChangesetComment.pull_request_id == pull_request.pull_request_id)
 
        .all())
 

	
 

	
 
class ChangesetCommentsModel(object):
 

	
 
    def _get_notification_data(self, repo, comment, author, comment_text,
 
                               line_no=None, revision=None, pull_request=None,
 
                               status_change=None, closing_pr=False):
 
        """
 
        :returns: tuple (subj,body,recipients,notification_type,email_kwargs)
 
        """
 
        # make notification
 
        body = comment_text  # text of the comment
 
        line = ''
 
        if line_no:
 
            line = _('on line %s') % line_no
 

	
 
        # changeset
 
        if revision:
 
            notification_type = NotificationModel.TYPE_CHANGESET_COMMENT
 
            cs = repo.scm_instance.get_changeset(revision)
 
            desc = cs.short_id
 

	
 
            threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, h.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no,
 
                                                           h.canonical_hostname()))
 
            comment_url = h.canonical_url('changeset_home',
 
                repo_name=repo.repo_name,
 
                revision=revision,
kallithea/model/pull_request.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.pull_request
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull request model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import re
 

	
 
from tg import request
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib.hooks import log_create_pullrequest
 
from kallithea.lib.utils2 import ascii_bytes, extract_mentioned_users
 
from kallithea.lib.utils import extract_mentioned_users
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model.db import ChangesetStatus, PullRequest, PullRequestReviewer, User
 
from kallithea.model.meta import Session
 
from kallithea.model.notification import NotificationModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _assert_valid_reviewers(seq):
 
    """Sanity check: elements are actual User objects, and not the default user."""
 
    assert not any(user.is_default_user for user in seq)
 

	
 

	
 
class PullRequestModel(object):
 

	
 
    def add_reviewers(self, user, pr, reviewers, mention_recipients=None):
 
        """Add reviewer and send notification to them.
 
        """
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 
        if mention_recipients is not None:
 
            mention_recipients = set(mention_recipients) - reviewers
 
            _assert_valid_reviewers(mention_recipients)
 

	
 
        redundant_reviewers = set(User.query() \
 
            .join(PullRequestReviewer) \
 
            .filter(PullRequestReviewer.pull_request == pr) \
 
            .filter(PullRequestReviewer.user_id.in_(r.user_id for r in reviewers))
 
            .all())
 

	
 
        if redundant_reviewers:
 
            log.debug('Following reviewers were already part of pull request %s: %s', pr.pull_request_id, redundant_reviewers)
 

	
 
            reviewers -= redundant_reviewers
 

	
 
        log.debug('Adding reviewers to pull request %s: %s', pr.pull_request_id, reviewers)
 
        for reviewer in reviewers:
 
            prr = PullRequestReviewer(reviewer, pr)
 
            Session().add(prr)
 

	
 
        # notification to reviewers
 
        pr_url = pr.url(canonical=True)
 
        threading = ['%s-pr-%s@%s' % (pr.other_repo.repo_name,
 
                                      pr.pull_request_id,
 
                                      h.canonical_hostname())]
 
        subject = h.link_to(
 
            _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') %
 
                {'user': user.username,
0 comments (0 inline, 0 general)