Changeset - 0c65a8f15e54
[Not reviewed]
default
0 9 0
Mads Kiilerich (mads) - 5 years ago 2020-10-29 14:32:42
mads@kiilerich.com
lib: move canonical_url & co to webutils

This gives less of the unfortunate use of helpers - especially in model.
9 files changed with 66 insertions and 62 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/feed.py
Show inline comments
 
@@ -27,24 +27,25 @@ Original author and date, and relevant c
 

	
 

	
 
import logging
 

	
 
from beaker.cache import cache_region
 
from tg import response
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib import feeds
 
from kallithea.lib import helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import HasRepoPermissionLevelDecorator, LoginRequired
 
from kallithea.lib.base import BaseRepoController
 
from kallithea.lib.diffs import DiffProcessor
 
from kallithea.lib.utils2 import asbool, safe_int, safe_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FeedController(BaseRepoController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
@@ -75,55 +76,55 @@ class FeedController(BaseRepoController)
 

	
 
        for st in diff_processor.parsed:
 
            st.update({'added': st['stats']['added'],
 
                       'removed': st['stats']['deleted']})
 
            changes.append('\n %(operation)s %(filename)s '
 
                           '(%(added)s lines added, %(removed)s lines removed)'
 
                            % st)
 
        if diff_processor.limited_diff:
 
            changes = changes + ['\n ' +
 
                                 _('Changeset was too big and was cut off...')]
 

	
 
        # rev link
 
        _url = h.canonical_url('changeset_home', repo_name=c.db_repo.repo_name,
 
        _url = webutils.canonical_url('changeset_home', repo_name=c.db_repo.repo_name,
 
                   revision=cs.raw_id)
 
        desc_msg.append('changeset: <a href="%s">%s</a>' % (_url, cs.raw_id[:8]))
 

	
 
        desc_msg.append('<pre>')
 
        desc_msg.append(h.urlify_text(cs.message))
 
        desc_msg.append('\n')
 
        desc_msg.extend(changes)
 
        if asbool(kallithea.CONFIG.get('rss_include_diff', False)):
 
            desc_msg.append('\n\n')
 
            desc_msg.append(safe_str(raw_diff))
 
        desc_msg.append('</pre>')
 
        return desc_msg
 

	
 
    def _feed(self, repo_name, feeder):
 
        """Produce a simple feed"""
 

	
 
        @cache_region('long_term_file', '_get_feed_from_cache')
 
        def _get_feed_from_cache(*_cache_keys):  # parameters are not really used - only as caching key
 
            header = dict(
 
                title=_('%s %s feed') % (c.site_name, repo_name),
 
                link=h.canonical_url('summary_home', repo_name=repo_name),
 
                link=webutils.canonical_url('summary_home', repo_name=repo_name),
 
                description=_('Changes on %s repository') % repo_name,
 
            )
 

	
 
            rss_items_per_page = safe_int(kallithea.CONFIG.get('rss_items_per_page', 20))
 
            entries=[]
 
            for cs in reversed(list(c.db_repo_scm_instance[-rss_items_per_page:])):
 
                entries.append(dict(
 
                    title=self._get_title(cs),
 
                    link=h.canonical_url('changeset_home', repo_name=repo_name, revision=cs.raw_id),
 
                    link=webutils.canonical_url('changeset_home', repo_name=repo_name, revision=cs.raw_id),
 
                    author_email=cs.author_email,
 
                    author_name=cs.author_name,
 
                    description=''.join(self.__get_desc(cs)),
 
                    pubdate=cs.date,
 
                ))
 
            return feeder.render(header, entries)
 

	
 
        response.content_type = feeder.content_type
 
        return _get_feed_from_cache(repo_name, feeder.__name__)
 

	
 
    def atom(self, repo_name):
 
        """Produce a simple atom-1.0 feed"""
kallithea/controllers/journal.py
Show inline comments
 
@@ -29,25 +29,25 @@ import logging
 
import traceback
 
from itertools import groupby
 

	
 
from sqlalchemy import or_
 
from sqlalchemy.orm import joinedload
 
from tg import request, response
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.controllers.admin.admin import _journal_filter
 
from kallithea.lib import feeds
 
from kallithea.lib import feeds, webutils
 
from kallithea.lib.auth import LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.page import Page
 
from kallithea.lib.utils2 import AttributeDict, safe_int
 
from kallithea.model import db, meta
 
from kallithea.model.repo import RepoModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
language = 'en-us'
 
@@ -116,56 +116,56 @@ class JournalController(BaseController):
 
        for entry in journal[:feed_nr]:
 
            user = entry.user
 
            if user is None:
 
                # fix deleted users
 
                user = AttributeDict({'short_contact': entry.username,
 
                                      'email': '',
 
                                      'full_contact': ''})
 
            action, action_extra, ico = h.action_parser(entry, feed=True)
 
            title = "%s - %s %s" % (user.short_contact, action(),
 
                                    entry.repository.repo_name)
 
            _url = None
 
            if entry.repository is not None:
 
                _url = h.canonical_url('changelog_home',
 
                _url = webutils.canonical_url('changelog_home',
 
                           repo_name=entry.repository.repo_name)
 

	
 
            entries.append(dict(
 
                title=title,
 
                pubdate=entry.action_date,
 
                link=_url or h.canonical_url(''),
 
                link=_url or webutils.canonical_url(''),
 
                author_email=user.email,
 
                author_name=user.full_name_or_username,
 
                description=action_extra(),
 
            ))
 

	
 
        return feeder.render(header, entries)
 

	
 
    def _atom_feed(self, repos, public=True):
 
        if public:
 
            link = h.canonical_url('public_journal_atom')
 
            link = webutils.canonical_url('public_journal_atom')
 
            desc = '%s %s %s' % (c.site_name, _('Public Journal'),
 
                                  'atom feed')
 
        else:
 
            link = h.canonical_url('journal_atom')
 
            link = webutils.canonical_url('journal_atom')
 
            desc = '%s %s %s' % (c.site_name, _('Journal'), 'atom feed')
 

	
 
        return self._feed(repos, feeds.AtomFeed, link, desc)
 

	
 
    def _rss_feed(self, repos, public=True):
 
        if public:
 
            link = h.canonical_url('public_journal_atom')
 
            link = webutils.canonical_url('public_journal_atom')
 
            desc = '%s %s %s' % (c.site_name, _('Public Journal'),
 
                                  'rss feed')
 
        else:
 
            link = h.canonical_url('journal_atom')
 
            link = webutils.canonical_url('journal_atom')
 
            desc = '%s %s %s' % (c.site_name, _('Journal'), 'rss feed')
 

	
 
        return self._feed(repos, feeds.RssFeed, link, desc)
 

	
 
    @LoginRequired()
 
    def index(self):
 
        # Return a rendered template
 
        p = safe_int(request.GET.get('page'), 1)
 
        c.user = db.User.get(request.authuser.user_id)
 
        c.following = db.UserFollowing.query() \
 
            .filter(db.UserFollowing.user_id == request.authuser.user_id) \
 
            .options(joinedload(db.UserFollowing.follows_repository)) \
kallithea/lib/helpers.py
Show inline comments
 
@@ -46,74 +46,53 @@ from kallithea.lib.annotate import annot
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.diffs import BIN_FILENODE, CHMOD_FILENODE, DEL_FILENODE, MOD_FILENODE, NEW_FILENODE, RENAMED_FILENODE
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import MENTIONS_REGEX, AttributeDict, age, asbool, credentials_filter, safe_bytes, safe_int, safe_str, time_to_datetime
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.webutils import url
 
from kallithea.lib.webutils import canonical_url, url
 
from kallithea.model import db
 
from kallithea.model.changeset_status import ChangesetStatusModel
 

	
 

	
 
# mute pyflakes "imported but unused"
 
assert Option
 
assert checkbox
 
assert end_form
 
assert password
 
assert radio
 
assert submit
 
assert text
 
assert textarea
 
assert format_byte_size
 
assert chop_at
 
assert wrap_paragraphs
 
assert HasPermissionAny
 
assert HasRepoGroupPermissionLevel
 
assert HasRepoPermissionLevel
 
assert age
 
assert time_to_datetime
 
assert EmptyChangeset
 
assert canonical_url
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    try:
 
        parts = kallithea.CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    try:
 
        parts = kallithea.CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
 

	
 

	
 
def html_escape(s):
 
    """Return string with all html escaped.
 
    This is also safe for javascript in html but not necessarily correct.
 
    """
 
    return (s
 
        .replace('&', '&amp;')
 
        .replace(">", "&gt;")
 
        .replace("<", "&lt;")
 
        .replace('"', "&quot;")
 
        .replace("'", "&apos;") # Note: this is HTML5 not HTML4 and might not work in mails
 
        )
 

	
kallithea/lib/webutils.py
Show inline comments
 
@@ -6,31 +6,37 @@
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.webutils
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
Helper functions that rely on the current WSGI request, exposed in the TG2
 
Helper functions that may rely on the current WSGI request, exposed in the TG2
 
thread-local "global" variables. It should have few dependencies so it can be
 
imported anywhere - just like the global variables can be used everywhere.
 
"""
 

	
 
from tg import request
 

	
 
import kallithea
 

	
 

	
 
#
 
# General Kallithea URL handling
 
#
 

	
 
class UrlGenerator(object):
 
    """Emulate pylons.url in providing a wrapper around routes.url
 

	
 
    This code was added during migration from Pylons to Turbogears2. Pylons
 
    already provided a wrapper like this, but Turbogears2 does not.
 

	
 
    When the routing of Kallithea is changed to use less Routes and more
 
    Turbogears2-style routing, this class may disappear or change.
 

	
 
    url() (the __call__ method) returns the URL based on a route name and
 
    arguments.
 
@@ -38,12 +44,34 @@ class UrlGenerator(object):
 

	
 
    Refer to documentation of Routes for details:
 
    https://routes.readthedocs.io/en/latest/generating.html#generation
 
    """
 
    def __call__(self, *args, **kwargs):
 
        return request.environ['routes.url'](*args, **kwargs)
 

	
 
    def current(self, *args, **kwargs):
 
        return request.environ['routes.url'].current(*args, **kwargs)
 

	
 

	
 
url = UrlGenerator()
 

	
 

	
 
def canonical_url(*args, **kargs):
 
    '''Like url(x, qualified=True), but returns url that not only is qualified
 
    but also canonical, as configured in canonical_url'''
 
    try:
 
        parts = kallithea.CONFIG.get('canonical_url', '').split('://', 1)
 
        kargs['host'] = parts[1]
 
        kargs['protocol'] = parts[0]
 
    except IndexError:
 
        kargs['qualified'] = True
 
    return url(*args, **kargs)
 

	
 

	
 
def canonical_hostname():
 
    '''Return canonical hostname of system'''
 
    try:
 
        parts = kallithea.CONFIG.get('canonical_url', '').split('://', 1)
 
        return parts[1].split('/', 1)[0]
 
    except IndexError:
 
        parts = url('home', qualified=True).split('://', 1)
 
        return parts[1].split('/', 1)[0]
kallithea/model/comment.py
Show inline comments
 
@@ -22,24 +22,25 @@ Original author and date, and relevant c
 
:created_on: Nov 11, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
from collections import defaultdict
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.utils import extract_mentioned_users
 
from kallithea.model import db, meta
 
from kallithea.model.notification import NotificationModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _list_changeset_commenters(revision):
 
    return (meta.Session().query(db.User)
 
        .join(db.ChangesetComment.author)
 
        .filter(db.ChangesetComment.revision == revision)
 
@@ -63,103 +64,103 @@ class ChangesetCommentsModel(object):
 
        # make notification
 
        body = comment_text  # text of the comment
 
        line = ''
 
        if line_no:
 
            line = _('on line %s') % line_no
 

	
 
        # changeset
 
        if revision:
 
            notification_type = NotificationModel.TYPE_CHANGESET_COMMENT
 
            cs = repo.scm_instance.get_changeset(revision)
 
            desc = cs.short_id
 

	
 
            threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, h.canonical_hostname())]
 
            threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, webutils.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no,
 
                                                           h.canonical_hostname()))
 
            comment_url = h.canonical_url('changeset_home',
 
                                                           webutils.canonical_hostname()))
 
            comment_url = webutils.canonical_url('changeset_home',
 
                repo_name=repo.repo_name,
 
                revision=revision,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = h.link_to(
 
                'Re changeset: %(desc)s %(line)s' %
 
                          {'desc': desc, 'line': line},
 
                 comment_url)
 
            # get the current participants of this changeset
 
            recipients = _list_changeset_commenters(revision)
 
            # add changeset author if it's known locally
 
            cs_author = db.User.get_from_cs_author(cs.author)
 
            if not cs_author:
 
                # use repo owner if we cannot extract the author correctly
 
                # FIXME: just use committer name even if not a user
 
                cs_author = repo.owner
 
            recipients.append(cs_author)
 

	
 
            email_kwargs = {
 
                'status_change': status_change,
 
                'cs_comment_user': author.full_name_and_username,
 
                'cs_target_repo': h.canonical_url('summary_home', repo_name=repo.repo_name),
 
                'cs_target_repo': webutils.canonical_url('summary_home', repo_name=repo.repo_name),
 
                'cs_comment_url': comment_url,
 
                'cs_url': h.canonical_url('changeset_home', repo_name=repo.repo_name, revision=revision),
 
                'cs_url': webutils.canonical_url('changeset_home', repo_name=repo.repo_name, revision=revision),
 
                'raw_id': revision,
 
                'message': cs.message,
 
                'message_short': h.shorter(cs.message, 50, firstline=True),
 
                'cs_author': cs_author,
 
                'cs_author_username': cs_author.username,
 
                'repo_name': repo.repo_name,
 
                'short_id': revision[:12],
 
                'branch': cs.branch,
 
                'comment_username': author.username,
 
                'threading': threading,
 
            }
 
        # pull request
 
        elif pull_request:
 
            notification_type = NotificationModel.TYPE_PULL_REQUEST_COMMENT
 
            desc = comment.pull_request.title
 
            _org_ref_type, org_ref_name, _org_rev = comment.pull_request.org_ref.split(':')
 
            _other_ref_type, other_ref_name, _other_rev = comment.pull_request.other_ref.split(':')
 
            threading = ['%s-pr-%s@%s' % (pull_request.other_repo.repo_name,
 
                                          pull_request.pull_request_id,
 
                                          h.canonical_hostname())]
 
                                          webutils.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-pr-%s-line-%s@%s' % (pull_request.other_repo.repo_name,
 
                                                          pull_request.pull_request_id, line_no,
 
                                                          h.canonical_hostname()))
 
                                                          webutils.canonical_hostname()))
 
            comment_url = pull_request.url(canonical=True,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = h.link_to(
 
                'Re pull request %(pr_nice_id)s: %(desc)s %(line)s' %
 
                          {'desc': desc,
 
                           'pr_nice_id': comment.pull_request.nice_id(),
 
                           'line': line},
 
                comment_url)
 
            # get the current participants of this pull request
 
            recipients = _list_pull_request_commenters(pull_request)
 
            recipients.append(pull_request.owner)
 
            recipients += pull_request.get_reviewer_users()
 

	
 
            # set some variables for email notification
 
            email_kwargs = {
 
                'pr_title': pull_request.title,
 
                'pr_title_short': h.shorter(pull_request.title, 50),
 
                'pr_nice_id': pull_request.nice_id(),
 
                'status_change': status_change,
 
                'closing_pr': closing_pr,
 
                'pr_comment_url': comment_url,
 
                'pr_url': pull_request.url(canonical=True),
 
                'pr_comment_user': author.full_name_and_username,
 
                'pr_target_repo': h.canonical_url('summary_home',
 
                'pr_target_repo': webutils.canonical_url('summary_home',
 
                                   repo_name=pull_request.other_repo.repo_name),
 
                'pr_target_branch': other_ref_name,
 
                'pr_source_repo': h.canonical_url('summary_home',
 
                'pr_source_repo': webutils.canonical_url('summary_home',
 
                                   repo_name=pull_request.org_repo.repo_name),
 
                'pr_source_branch': org_ref_name,
 
                'pr_owner': pull_request.owner,
 
                'pr_owner_username': pull_request.owner.username,
 
                'repo_name': pull_request.other_repo.repo_name,
 
                'comment_username': author.username,
 
                'threading': threading,
 
            }
 

	
 
        return subj, body, recipients, notification_type, email_kwargs
 

	
 
    def create(self, text, repo, author, revision=None, pull_request=None,
kallithea/model/db.py
Show inline comments
 
@@ -1176,26 +1176,25 @@ class Repository(meta.Base, BaseDbModel)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
 
        if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
 
            log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
 
        elif with_id:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
 
        else:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
 

	
 
        import kallithea.lib.helpers as h
 
        prefix_url = h.canonical_url('home')
 
        prefix_url = webutils.canonical_url('home')
 

	
 
        return get_clone_url(clone_uri_tmpl=clone_uri_tmpl,
 
                             prefix_url=prefix_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id,
 
                             username=username)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 

	
 
    #==========================================================================
 
    # SCM PROPERTIES
 
@@ -2130,33 +2129,32 @@ class PullRequest(meta.Base, BaseDbModel
 
            org_repo_url=self.org_repo.clone_url(clone_uri_tmpl=clone_uri_tmpl),
 
            org_ref_parts=self.org_ref_parts,
 
            other_ref_parts=self.other_ref_parts,
 
            status=self.status,
 
            comments=self.comments,
 
            statuses=self.statuses,
 
            created_on=self.created_on.replace(microsecond=0),
 
            updated_on=self.updated_on.replace(microsecond=0),
 
        )
 

	
 
    def url(self, **kwargs):
 
        canonical = kwargs.pop('canonical', None)
 
        import kallithea.lib.helpers as h
 
        b = self.org_ref_parts[1]
 
        if b != self.other_ref_parts[1]:
 
            s = '/_/' + b
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
            return webutils.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return webutils.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 

	
 
class PullRequestReviewer(meta.Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        UniqueConstraint('pull_request_id', 'user_id'),
 
        _table_args_default_dict,
 
    )
 
@@ -2228,26 +2226,25 @@ class Gist(meta.Base, BaseDbModel):
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        alias_url = kallithea.CONFIG.get('gist_alias_url')
 
        if alias_url:
 
            return alias_url.replace('{gistid}', self.gist_access_id)
 

	
 
        import kallithea.lib.helpers as h
 
        return h.canonical_url('gist', gist_id=self.gist_access_id)
 
        return webutils.canonical_url('gist', gist_id=self.gist_access_id)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
kallithea/model/pull_request.py
Show inline comments
 
@@ -25,24 +25,25 @@ Original author and date, and relevant c
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import re
 

	
 
from tg import request
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import auth
 
from kallithea.lib import helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.hooks import log_create_pullrequest
 
from kallithea.lib.utils import extract_mentioned_users
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model import db, meta
 
from kallithea.model.notification import NotificationModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _assert_valid_reviewers(seq):
 
    """Sanity check: elements are actual User objects, and not the default user."""
 
@@ -71,50 +72,50 @@ class PullRequestModel(object):
 

	
 
            reviewers -= redundant_reviewers
 

	
 
        log.debug('Adding reviewers to pull request %s: %s', pr.pull_request_id, reviewers)
 
        for reviewer in reviewers:
 
            prr = db.PullRequestReviewer(reviewer, pr)
 
            meta.Session().add(prr)
 

	
 
        # notification to reviewers
 
        pr_url = pr.url(canonical=True)
 
        threading = ['%s-pr-%s@%s' % (pr.other_repo.repo_name,
 
                                      pr.pull_request_id,
 
                                      h.canonical_hostname())]
 
                                      webutils.canonical_hostname())]
 
        subject = h.link_to(
 
            _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') %
 
                {'user': user.username,
 
                 'pr_title': pr.title,
 
                 'pr_nice_id': pr.nice_id()},
 
            pr_url)
 
        body = pr.description
 
        _org_ref_type, org_ref_name, _org_rev = pr.org_ref.split(':')
 
        _other_ref_type, other_ref_name, _other_rev = pr.other_ref.split(':')
 
        revision_data = [(x.raw_id, x.message)
 
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
 
        email_kwargs = {
 
            'pr_title': pr.title,
 
            'pr_title_short': h.shorter(pr.title, 50),
 
            'pr_user_created': user.full_name_and_username,
 
            'pr_repo_url': h.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
 
            'pr_repo_url': webutils.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
 
            'pr_url': pr_url,
 
            'pr_revisions': revision_data,
 
            'repo_name': pr.other_repo.repo_name,
 
            'org_repo_name': pr.org_repo.repo_name,
 
            'pr_nice_id': pr.nice_id(),
 
            'pr_target_repo': h.canonical_url('summary_home',
 
            'pr_target_repo': webutils.canonical_url('summary_home',
 
                               repo_name=pr.other_repo.repo_name),
 
            'pr_target_branch': other_ref_name,
 
            'pr_source_repo': h.canonical_url('summary_home',
 
            'pr_source_repo': webutils.canonical_url('summary_home',
 
                               repo_name=pr.org_repo.repo_name),
 
            'pr_source_branch': org_ref_name,
 
            'pr_owner': pr.owner,
 
            'pr_owner_username': pr.owner.username,
 
            'pr_username': user.username,
 
            'threading': threading,
 
            'is_mention': False,
 
            }
 
        if reviewers:
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=reviewers,
 
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
 
@@ -333,45 +334,45 @@ class CreatePullRequestIterationAction(o
 
        new_other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, new_other_rev)
 

	
 
        self.create_action = CreatePullRequestAction(org_repo, other_repo, new_org_ref, new_other_ref, None, None, owner, reviewers)
 

	
 
        # Generate complete title/description
 

	
 
        old_revisions = set(old_pull_request.revisions)
 
        revisions = self.create_action.revisions
 
        new_revisions = [r for r in revisions if r not in old_revisions]
 
        lost = old_revisions.difference(revisions)
 

	
 
        infos = ['This is a new iteration of %s "%s".' %
 
                 (h.canonical_url('pullrequest_show', repo_name=old_pull_request.other_repo.repo_name,
 
                 (webutils.canonical_url('pullrequest_show', repo_name=old_pull_request.other_repo.repo_name,
 
                      pull_request_id=old_pull_request.pull_request_id),
 
                  old_pull_request.title)]
 

	
 
        if lost:
 
            infos.append(_('Missing changesets since the previous iteration:'))
 
            for r in old_pull_request.revisions:
 
                if r in lost:
 
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
 
                    infos.append('  %s %s' % (r[:12], rev_desc))
 

	
 
        if new_revisions:
 
            infos.append(_('New changesets on %s %s since the previous iteration:') % (org_ref_type, org_ref_name))
 
            for r in reversed(revisions):
 
                if r in new_revisions:
 
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
 
                    infos.append('  %s %s' % (r[:12], h.shorter(rev_desc, 80)))
 

	
 
            if self.create_action.other_ref == old_pull_request.other_ref:
 
                infos.append(_("Ancestor didn't change - diff since previous iteration:"))
 
                infos.append(h.canonical_url('compare_url',
 
                infos.append(webutils.canonical_url('compare_url',
 
                                 repo_name=org_repo.repo_name, # other_repo is always same as repo_name
 
                                 org_ref_type='rev', org_ref_name=org_rev[:12], # use old org_rev as base
 
                                 other_ref_type='rev', other_ref_name=new_org_rev[:12],
 
                                 )) # note: linear diff, merge or not doesn't matter
 
            else:
 
                infos.append(_('This iteration is based on another %s revision and there is no simple diff.') % other_ref_name)
 
        else:
 
            infos.append(_('No changes found on %s %s since previous iteration.') % (org_ref_type, org_ref_name))
 
            # TODO: fail?
 

	
 
        v = 2
 
        m = re.match(r'(.*)\(v(\d+)\)\s*$', title)
kallithea/model/user.py
Show inline comments
 
@@ -157,42 +157,41 @@ class UserModel(object):
 
                meta.Session().add(new_user)
 
                meta.Session().flush() # make database assign new_user.user_id
 

	
 
            if not edit:
 
                log_create_user(new_user.get_dict(), cur_user)
 

	
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_registration(self, form_data):
 
        import kallithea.lib.helpers as h
 
        from kallithea.model.notification import NotificationModel
 

	
 
        form_data['admin'] = False
 
        form_data['extern_type'] = db.User.DEFAULT_AUTH_TYPE
 
        form_data['extern_name'] = ''
 
        new_user = self.create(form_data)
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = h.canonical_url('edit_user', id=new_user.user_id)
 
        edit_url = webutils.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
 
            'new_username': new_user.username,
 
            'new_email': new_user.email,
 
            'new_full_name': new_user.full_name}
 
        NotificationModel().create(created_by=new_user, subject=subject,
 
                                   body=body, recipients=None,
 
                                   type_=NotificationModel.TYPE_REGISTRATION,
 
                                   email_kwargs=email_kwargs)
 

	
 
    def update(self, user_id, form_data, skip_attrs=None):
 
        from kallithea.lib.auth import get_crypt_password
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -22,24 +22,25 @@ Original author and date, and relevant c
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 

	
 
import mock
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.utils2 import AttributeDict, safe_bytes
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
@@ -552,49 +553,46 @@ class TestLibs(base.TestController):
 
        ('http://www.example.org', '/abc/xyz', 'http://www.example.org/abc/xyz'),
 
        ('http://www.example.org', '/abc/xyz/', 'http://www.example.org/abc/xyz/'),
 
        ('http://www.example.org', 'abc/xyz/', 'http://www.example.org/abc/xyz/'),
 
        ('http://www.example.org', 'about', 'http://www.example.org/about-page'),
 
        ('http://www.example.org/repos/', 'abc/xyz/', 'http://www.example.org/repos/abc/xyz/'),
 
        ('http://www.example.org/kallithea/repos/', 'abc/xyz/', 'http://www.example.org/kallithea/repos/abc/xyz/'),
 
    ])
 
    def test_canonical_url(self, canonical, test, expected):
 
        # setup url(), used by canonical_url
 
        import routes
 
        from tg import request
 

	
 
        from kallithea.lib.helpers import canonical_url
 
        m = routes.Mapper()
 
        m.connect('about', '/about-page')
 
        url = routes.URLGenerator(m, {'HTTP_HOST': 'http_host.example.org'})
 

	
 
        config_mock = {
 
            'canonical_url': canonical,
 
        }
 

	
 
        with test_context(self.app):
 
            request.environ['routes.url'] = url
 
            with mock.patch('kallithea.CONFIG', config_mock):
 
                assert canonical_url(test) == expected
 
                assert webutils.canonical_url(test) == expected
 

	
 
    @base.parametrize('canonical,expected', [
 
        ('http://www.example.org', 'www.example.org'),
 
        ('http://www.example.org/repos/', 'www.example.org'),
 
        ('http://www.example.org/kallithea/repos/', 'www.example.org'),
 
    ])
 
    def test_canonical_hostname(self, canonical, expected):
 
        import routes
 
        from tg import request
 

	
 
        from kallithea.lib.helpers import canonical_hostname
 

	
 
        # setup url(), used by canonical_hostname
 
        m = routes.Mapper()
 
        url = routes.URLGenerator(m, {'HTTP_HOST': 'http_host.example.org'})
 

	
 
        config_mock = {
 
            'canonical_url': canonical,
 
        }
 

	
 
        with test_context(self.app):
 
            request.environ['routes.url'] = url
 
            with mock.patch('kallithea.CONFIG', config_mock):
 
                assert canonical_hostname() == expected
 
                assert webutils.canonical_hostname() == expected
0 comments (0 inline, 0 general)