Changeset - b4749d02192d
[Not reviewed]
default
0 5 0
Thomas De Schampheleire - 5 years ago 2020-11-05 22:41:09
thomas.de_schampheleire@nokia.com
model: remove unused 'subject' parameter of NotificationModel.create()

The subject of emails is determined with EmailNotificationModel._subj_map,
based on the notification type. The 'subject' parameter passed to
NotificationModel.create is completely unused.

Remove this parameter and update its callers, removing code that is now no
longer used.
5 files changed with 8 insertions and 40 deletions:
0 comments (0 inline, 0 general)
kallithea/model/comment.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.comment
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
comments model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 11, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
from collections import defaultdict
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.utils import extract_mentioned_users
 
from kallithea.lib.utils2 import shorter
 
from kallithea.model import db, meta, notification
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _list_changeset_commenters(revision):
 
    return (meta.Session().query(db.User)
 
        .join(db.ChangesetComment.author)
 
        .filter(db.ChangesetComment.revision == revision)
 
        .all())
 

	
 
def _list_pull_request_commenters(pull_request):
 
    return (meta.Session().query(db.User)
 
        .join(db.ChangesetComment.author)
 
        .filter(db.ChangesetComment.pull_request_id == pull_request.pull_request_id)
 
        .all())
 

	
 

	
 
class ChangesetCommentsModel(object):
 

	
 
    def create_notification(self, repo, comment, author, comment_text,
 
                                line_no=None, revision=None, pull_request=None,
 
                                status_change=None, closing_pr=False):
 

	
 
        line = ''
 
        if line_no:
 
            line = _('on line %s') % line_no
 

	
 
        # changeset
 
        if revision:
 
            notification_type = notification.NotificationModel.TYPE_CHANGESET_COMMENT
 
            cs = repo.scm_instance.get_changeset(revision)
 
            desc = cs.short_id
 

	
 
            threading = ['%s-rev-%s@%s' % (repo.repo_name, revision, webutils.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-rev-%s-line-%s@%s' % (repo.repo_name, revision, line_no,
 
                                                           webutils.canonical_hostname()))
 
            comment_url = webutils.canonical_url('changeset_home',
 
                repo_name=repo.repo_name,
 
                revision=revision,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = webutils.link_to(
 
                'Re changeset: %(desc)s %(line)s' %
 
                          {'desc': desc, 'line': line},
 
                 comment_url)
 
            # get the current participants of this changeset
 
            recipients = _list_changeset_commenters(revision)
 
            # add changeset author if it's known locally
 
            cs_author = db.User.get_from_cs_author(cs.author)
 
            if not cs_author:
 
                # use repo owner if we cannot extract the author correctly
 
                # FIXME: just use committer name even if not a user
 
                cs_author = repo.owner
 
            recipients.append(cs_author)
 

	
 
            email_kwargs = {
 
                'status_change': status_change,
 
                'cs_comment_user': author.full_name_and_username,
 
                'cs_target_repo': webutils.canonical_url('summary_home', repo_name=repo.repo_name),
 
                'cs_comment_url': comment_url,
 
                'cs_url': webutils.canonical_url('changeset_home', repo_name=repo.repo_name, revision=revision),
 
                'raw_id': revision,
 
                'message': cs.message,
 
                'message_short': shorter(cs.message, 50, firstline=True),
 
                'cs_author': cs_author,
 
                'cs_author_username': cs_author.username,
 
                'repo_name': repo.repo_name,
 
                'short_id': revision[:12],
 
                'branch': cs.branch,
 
                'comment_username': author.username,
 
                'threading': threading,
 
            }
 
        # pull request
 
        elif pull_request:
 
            notification_type = notification.NotificationModel.TYPE_PULL_REQUEST_COMMENT
 
            desc = comment.pull_request.title
 
            _org_ref_type, org_ref_name, _org_rev = comment.pull_request.org_ref.split(':')
 
            _other_ref_type, other_ref_name, _other_rev = comment.pull_request.other_ref.split(':')
 
            threading = ['%s-pr-%s@%s' % (pull_request.other_repo.repo_name,
 
                                          pull_request.pull_request_id,
 
                                          webutils.canonical_hostname())]
 
            if line_no: # TODO: url to file _and_ line number
 
                threading.append('%s-pr-%s-line-%s@%s' % (pull_request.other_repo.repo_name,
 
                                                          pull_request.pull_request_id, line_no,
 
                                                          webutils.canonical_hostname()))
 
            comment_url = pull_request.url(canonical=True,
 
                anchor='comment-%s' % comment.comment_id)
 
            subj = webutils.link_to(
 
                'Re pull request %(pr_nice_id)s: %(desc)s %(line)s' %
 
                          {'desc': desc,
 
                           'pr_nice_id': comment.pull_request.nice_id(),
 
                           'line': line},
 
                comment_url)
 
            # get the current participants of this pull request
 
            recipients = _list_pull_request_commenters(pull_request)
 
            recipients.append(pull_request.owner)
 
            recipients += pull_request.get_reviewer_users()
 

	
 
            # set some variables for email notification
 
            email_kwargs = {
 
                'pr_title': pull_request.title,
 
                'pr_title_short': shorter(pull_request.title, 50),
 
                'pr_nice_id': pull_request.nice_id(),
 
                'status_change': status_change,
 
                'closing_pr': closing_pr,
 
                'pr_comment_url': comment_url,
 
                'pr_url': pull_request.url(canonical=True),
 
                'pr_comment_user': author.full_name_and_username,
 
                'pr_target_repo': webutils.canonical_url('summary_home',
 
                                   repo_name=pull_request.other_repo.repo_name),
 
                'pr_target_branch': other_ref_name,
 
                'pr_source_repo': webutils.canonical_url('summary_home',
 
                                   repo_name=pull_request.org_repo.repo_name),
 
                'pr_source_branch': org_ref_name,
 
                'pr_owner': pull_request.owner,
 
                'pr_owner_username': pull_request.owner.username,
 
                'repo_name': pull_request.other_repo.repo_name,
 
                'comment_username': author.username,
 
                'threading': threading,
 
            }
 

	
 
        email_kwargs['is_mention'] = False
 
        # create notification objects, and emails
 
        notification.NotificationModel().create(
 
            created_by=author, subject=subj, body=comment_text,
 
            created_by=author, body=comment_text,
 
            recipients=recipients, type_=notification_type,
 
            email_kwargs=email_kwargs,
 
        )
 

	
 
        mention_recipients = extract_mentioned_users(comment_text).difference(recipients)
 
        if mention_recipients:
 
            email_kwargs['is_mention'] = True
 
            subj = _('[Mention]') + ' ' + subj
 
            # FIXME: this subject is wrong and unused!
 
            notification.NotificationModel().create(
 
                created_by=author, subject=subj, body=comment_text,
 
                created_by=author, body=comment_text,
 
                recipients=mention_recipients,
 
                type_=notification_type,
 
                email_kwargs=email_kwargs
 
            )
 

	
 

	
 
    def create(self, text, repo, author, revision=None, pull_request=None,
 
               f_path=None, line_no=None, status_change=None, closing_pr=False,
 
               send_email=True):
 
        """
 
        Creates a new comment for either a changeset or a pull request.
 
        status_change and closing_pr is only for the optional email.
 

	
 
        Returns the created comment.
 
        """
 
        if not status_change and not text:
 
            log.warning('Missing text for comment, skipping...')
 
            return None
 

	
 
        repo = db.Repository.guess_instance(repo)
 
        author = db.User.guess_instance(author)
 
        comment = db.ChangesetComment()
 
        comment.repo = repo
 
        comment.author = author
 
        comment.text = text
 
        comment.f_path = f_path
 
        comment.line_no = line_no
 

	
 
        if revision is not None:
 
            comment.revision = revision
 
        elif pull_request is not None:
 
            pull_request = db.PullRequest.guess_instance(pull_request)
 
            comment.pull_request = pull_request
 
        else:
 
            raise Exception('Please specify revision or pull_request_id')
 

	
 
        meta.Session().add(comment)
 
        meta.Session().flush()
 

	
 
        if send_email:
 
            self.create_notification(
 
                repo, comment, author, text, line_no, revision, pull_request,
 
                status_change, closing_pr
 
            )
 

	
 
        return comment
 

	
 
    def delete(self, comment):
 
        comment = db.ChangesetComment.guess_instance(comment)
 
        meta.Session().delete(comment)
 

	
 
        return comment
 

	
 
    def get_comments(self, repo_id, revision=None, pull_request=None):
 
        """
 
        Gets general comments for either revision or pull_request.
 

	
 
        Returns a list, ordered by creation date.
 
        """
 
        return self._get_comments(repo_id, revision=revision, pull_request=pull_request,
 
                                  inline=False)
 

	
 
    def get_inline_comments(self, repo_id, revision=None, pull_request=None,
 
                f_path=None, line_no=None):
 
        """
 
        Gets inline comments for either revision or pull_request.
 

	
 
        Returns a list of tuples with file path and list of comments per line number.
 
        """
 
        comments = self._get_comments(repo_id, revision=revision, pull_request=pull_request,
 
                                      inline=True, f_path=f_path, line_no=line_no)
 

	
 
        paths = defaultdict(lambda: defaultdict(list))
 
        for co in comments:
 
            paths[co.f_path][co.line_no].append(co)
 
        return sorted(paths.items())
 

	
 
    def _get_comments(self, repo_id, revision=None, pull_request=None,
 
                inline=False, f_path=None, line_no=None):
 
        """
 
        Gets comments for either revision or pull_request_id, either inline or general.
 
        If a file path and optionally line number are given, return only the matching inline comments.
 
        """
 
        if f_path is None and line_no is not None:
 
            raise Exception("line_no only makes sense if f_path is given.")
 

	
 
        if inline is None and f_path is not None:
 
            raise Exception("f_path only makes sense for inline comments.")
 

	
 
        q = meta.Session().query(db.ChangesetComment)
 

	
 
        if inline:
 
            if f_path is not None:
 
                # inline comments for a given file...
 
                q = q.filter(db.ChangesetComment.f_path == f_path)
 
                if line_no is None:
 
                    # ... on any line
 
                    q = q.filter(db.ChangesetComment.line_no != None)
 
                else:
 
                    # ... on specific line
 
                    q = q.filter(db.ChangesetComment.line_no == line_no)
 
            else:
 
                # all inline comments
 
                q = q.filter(db.ChangesetComment.line_no != None) \
 
                    .filter(db.ChangesetComment.f_path != None)
 
        else:
 
            # all general comments
 
            q = q.filter(db.ChangesetComment.line_no == None) \
 
                .filter(db.ChangesetComment.f_path == None)
 

	
 
        if revision is not None:
 
            q = q.filter(db.ChangesetComment.revision == revision) \
 
                .filter(db.ChangesetComment.repo_id == repo_id)
 
        elif pull_request is not None:
 
            pull_request = db.PullRequest.guess_instance(pull_request)
 
            q = q.filter(db.ChangesetComment.pull_request == pull_request)
 
        else:
 
            raise Exception('Please specify either revision or pull_request')
 

	
 
        return q.order_by(db.ChangesetComment.created_on).all()
kallithea/model/notification.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.notification
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Model for notifications
 

	
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 20, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 

	
 
from tg import app_globals
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib.utils2 import fmt_date
 
from kallithea.model import db
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class NotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = 'cs_comment'
 
    TYPE_MESSAGE = 'message'
 
    TYPE_MENTION = 'mention' # not used
 
    TYPE_REGISTRATION = 'registration'
 
    TYPE_PULL_REQUEST = 'pull_request'
 
    TYPE_PULL_REQUEST_COMMENT = 'pull_request_comment'
 

	
 
    def create(self, created_by, subject, body, recipients=None,
 
    def create(self, created_by, body, recipients=None,
 
               type_=TYPE_MESSAGE, with_email=True,
 
               email_kwargs=None, repo_name=None):
 
        """
 

	
 
        Creates notification of given type
 

	
 
        :param created_by: int, str or User instance. User who created this
 
            notification
 
        :param subject:
 
        :param body:
 
        :param recipients: list of int, str or User objects, when None
 
            is given send to all admins
 
        :param type_: type of notification
 
        :param with_email: send email with this notification
 
        :param email_kwargs: additional dict to pass as args to email template
 
        """
 
        import kallithea.lib.helpers as h
 
        from kallithea.lib.celerylib import tasks
 
        email_kwargs = email_kwargs or {}
 
        if recipients and not getattr(recipients, '__iter__', False):
 
            raise Exception('recipients must be a list or iterable')
 

	
 
        created_by_obj = db.User.guess_instance(created_by)
 

	
 
        recipients_objs = set()
 
        if recipients:
 
            for u in recipients:
 
                obj = db.User.guess_instance(u)
 
                if obj is not None:
 
                    recipients_objs.add(obj)
 
                else:
 
                    # TODO: inform user that requested operation couldn't be completed
 
                    log.error('cannot email unknown user %r', u)
 
            log.debug('sending notifications %s to %s',
 
                type_, recipients_objs
 
            )
 
        elif recipients is None:
 
            # empty recipients means to all admins
 
            recipients_objs = db.User.query().filter(db.User.admin == True).all()
 
            log.debug('sending notifications %s to admins: %s',
 
                type_, recipients_objs
 
            )
 
        #else: silently skip notification mails?
 

	
 
        if not with_email:
 
            return
 

	
 
        headers = {}
 
        headers['X-Kallithea-Notification-Type'] = type_
 
        if 'threading' in email_kwargs:
 
            headers['References'] = ' '.join('<%s>' % x for x in email_kwargs['threading'])
 

	
 
        # this is passed into template
 
        created_on = fmt_date(datetime.datetime.now())
 
        html_kwargs = {
 
                  'subject': subject,
 
                  'body': h.render_w_mentions(body, repo_name),
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        txt_kwargs = {
 
                  'subject': subject,
 
                  'body': body,
 
                  'when': created_on,
 
                  'user': created_by_obj.username,
 
                  }
 

	
 
        html_kwargs.update(email_kwargs)
 
        txt_kwargs.update(email_kwargs)
 
        email_subject = EmailNotificationModel() \
 
                            .get_email_description(type_, **txt_kwargs)
 
        email_txt_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'txt', **txt_kwargs)
 
        email_html_body = EmailNotificationModel() \
 
                            .get_email_tmpl(type_, 'html', **html_kwargs)
 

	
 
        # don't send email to the person who caused the notification, except for
 
        # notifications about new pull requests where the author is explicitly
 
        # added.
 
        rec_mails = set(obj.email for obj in recipients_objs)
 
        if type_ == NotificationModel.TYPE_PULL_REQUEST:
 
            rec_mails.add(created_by_obj.email)
 
        else:
 
            rec_mails.discard(created_by_obj.email)
 

	
 
        # send email with notification to participants
 
        for rec_mail in sorted(rec_mails):
 
            tasks.send_email([rec_mail], email_subject, email_txt_body,
 
                     email_html_body, headers,
 
                     from_name=created_by_obj.full_name_or_username)
 

	
 

	
 
class EmailNotificationModel(object):
 

	
 
    TYPE_CHANGESET_COMMENT = NotificationModel.TYPE_CHANGESET_COMMENT
 
    TYPE_MESSAGE = NotificationModel.TYPE_MESSAGE # only used for testing
 
    # NotificationModel.TYPE_MENTION is not used
 
    TYPE_PASSWORD_RESET = 'password_link'
 
    TYPE_REGISTRATION = NotificationModel.TYPE_REGISTRATION
 
    TYPE_PULL_REQUEST = NotificationModel.TYPE_PULL_REQUEST
 
    TYPE_PULL_REQUEST_COMMENT = NotificationModel.TYPE_PULL_REQUEST_COMMENT
 
    TYPE_DEFAULT = 'default'
 

	
 
    def __init__(self):
 
        super(EmailNotificationModel, self).__init__()
 
        self._tmpl_lookup = app_globals.mako_lookup
 
        self.email_types = {
 
            self.TYPE_CHANGESET_COMMENT: 'changeset_comment',
 
            self.TYPE_PASSWORD_RESET: 'password_reset',
 
            self.TYPE_REGISTRATION: 'registration',
 
            self.TYPE_DEFAULT: 'default',
 
            self.TYPE_PULL_REQUEST: 'pull_request',
 
            self.TYPE_PULL_REQUEST_COMMENT: 'pull_request_comment',
 
        }
 
        self._subj_map = {
 
            self.TYPE_CHANGESET_COMMENT: _('[Comment] %(repo_name)s changeset %(short_id)s "%(message_short)s" on %(branch)s by %(cs_author_username)s'),
 
            self.TYPE_MESSAGE: 'Test Message',
 
            # self.TYPE_PASSWORD_RESET
 
            self.TYPE_REGISTRATION: _('New user %(new_username)s registered'),
 
            # self.TYPE_DEFAULT
 
            self.TYPE_PULL_REQUEST: _('[Review] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
            self.TYPE_PULL_REQUEST_COMMENT: _('[Comment] %(repo_name)s PR %(pr_nice_id)s "%(pr_title_short)s" from %(pr_source_branch)s by %(pr_owner_username)s'),
 
        }
 

	
 
    def get_email_description(self, type_, **kwargs):
 
        """
 
        return subject for email based on given type
 
        """
 
        tmpl = self._subj_map[type_]
 
        try:
 
            subj = tmpl % kwargs
 
        except KeyError as e:
 
            log.error('error generating email subject for %r from %s: %s', type_, ', '.join(self._subj_map), e)
 
            raise
 
        # gmail doesn't do proper threading but will ignore leading square
 
        # bracket content ... so that is where we put status info
 
        bracket_tags = []
 
        status_change = kwargs.get('status_change')
 
        if status_change:
 
            bracket_tags.append(str(status_change))  # apply str to evaluate LazyString before .join
 
        if kwargs.get('closing_pr'):
 
            bracket_tags.append(_('Closing'))
 
        if bracket_tags:
 
            if subj.startswith('['):
 
                subj = '[' + ', '.join(bracket_tags) + ': ' + subj[1:]
 
            else:
 
                subj = '[' + ', '.join(bracket_tags) + '] ' + subj
 
        return subj
 

	
 
    def get_email_tmpl(self, type_, content_type, **kwargs):
 
        """
 
        return generated template for email based on given type
 
        """
 
        import kallithea.lib.helpers as h
 

	
 
        base = 'email/' + self.email_types.get(type_, self.email_types[self.TYPE_DEFAULT]) + '.' + content_type
 
        email_template = self._tmpl_lookup.get_template(base)
 
        # translator and helpers inject
 
        _kwargs = {'_': _,
 
                   'h': h,
 
                   'c': c}
 
        _kwargs.update(kwargs)
 
        if content_type == 'html':
 
            _kwargs.update({
 
                "color_text": "#202020",
 
                "color_emph": "#395fa0",
 
                "color_link": "#395fa0",
 
                "color_border": "#ddd",
 
                "color_background_grey": "#f9f9f9",
 
                "color_button": "#395fa0",
 
                "monospace_style": "font-family:Lucida Console,Consolas,Monaco,Inconsolata,Liberation Mono,monospace",
 
                "sans_style": "font-family:Helvetica,Arial,sans-serif",
 
                })
 
            _kwargs.update({
 
                "default_style": "%(sans_style)s;font-weight:200;font-size:14px;line-height:17px;color:%(color_text)s" % _kwargs,
 
                "comment_style": "%(monospace_style)s;white-space:pre-wrap" % _kwargs,
 
                "data_style": "border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                "emph_style": "font-weight:600;color:%(color_emph)s" % _kwargs,
 
                "link_style": "color:%(color_link)s;text-decoration:none" % _kwargs,
 
                "link_text_style": "color:%(color_text)s;text-decoration:none;border:%(color_border)s 1px solid;background:%(color_background_grey)s" % _kwargs,
 
                })
 

	
 
        log.debug('rendering tmpl %s with kwargs %s', base, _kwargs)
 
        return email_template.render_unicode(**_kwargs)
kallithea/model/pull_request.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.pull_request
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
pull request model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 6, 2012
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import logging
 
import re
 

	
 
from tg import request
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import auth, hooks, webutils
 
from kallithea.lib.utils import extract_mentioned_users
 
from kallithea.lib.utils2 import ascii_bytes, short_ref_name, shorter
 
from kallithea.model import changeset_status, comment, db, meta, notification
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def _assert_valid_reviewers(seq):
 
    """Sanity check: elements are actual User objects, and not the default user."""
 
    assert not any(user.is_default_user for user in seq)
 

	
 

	
 
class PullRequestModel(object):
 

	
 
    def add_reviewers(self, user, pr, reviewers, mention_recipients=None):
 
        """Add reviewer and send notification to them.
 
        """
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 
        if mention_recipients is not None:
 
            mention_recipients = set(mention_recipients) - reviewers
 
            _assert_valid_reviewers(mention_recipients)
 

	
 
        redundant_reviewers = set(db.User.query() \
 
            .join(db.PullRequestReviewer) \
 
            .filter(db.PullRequestReviewer.pull_request == pr) \
 
            .filter(db.PullRequestReviewer.user_id.in_(r.user_id for r in reviewers))
 
            .all())
 

	
 
        if redundant_reviewers:
 
            log.debug('Following reviewers were already part of pull request %s: %s', pr.pull_request_id, redundant_reviewers)
 

	
 
            reviewers -= redundant_reviewers
 

	
 
        log.debug('Adding reviewers to pull request %s: %s', pr.pull_request_id, reviewers)
 
        for reviewer in reviewers:
 
            prr = db.PullRequestReviewer(reviewer, pr)
 
            meta.Session().add(prr)
 

	
 
        # notification to reviewers
 
        pr_url = pr.url(canonical=True)
 
        threading = ['%s-pr-%s@%s' % (pr.other_repo.repo_name,
 
                                      pr.pull_request_id,
 
                                      webutils.canonical_hostname())]
 
        subject = webutils.link_to(
 
            _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') %
 
                {'user': user.username,
 
                 'pr_title': pr.title,
 
                 'pr_nice_id': pr.nice_id()},
 
            pr_url)
 
        body = pr.description
 
        _org_ref_type, org_ref_name, _org_rev = pr.org_ref.split(':')
 
        _other_ref_type, other_ref_name, _other_rev = pr.other_ref.split(':')
 
        revision_data = [(x.raw_id, x.message)
 
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
 
        email_kwargs = {
 
            'pr_title': pr.title,
 
            'pr_title_short': shorter(pr.title, 50),
 
            'pr_user_created': user.full_name_and_username,
 
            'pr_repo_url': webutils.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
 
            'pr_url': pr_url,
 
            'pr_revisions': revision_data,
 
            'repo_name': pr.other_repo.repo_name,
 
            'org_repo_name': pr.org_repo.repo_name,
 
            'pr_nice_id': pr.nice_id(),
 
            'pr_target_repo': webutils.canonical_url('summary_home',
 
                               repo_name=pr.other_repo.repo_name),
 
            'pr_target_branch': other_ref_name,
 
            'pr_source_repo': webutils.canonical_url('summary_home',
 
                               repo_name=pr.org_repo.repo_name),
 
            'pr_source_branch': org_ref_name,
 
            'pr_owner': pr.owner,
 
            'pr_owner_username': pr.owner.username,
 
            'pr_username': user.username,
 
            'threading': threading,
 
            'is_mention': False,
 
            }
 
        if reviewers:
 
            notification.NotificationModel().create(created_by=user, subject=subject, body=body,
 
            notification.NotificationModel().create(created_by=user, body=body,
 
                                       recipients=reviewers,
 
                                       type_=notification.NotificationModel.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
        if mention_recipients:
 
            email_kwargs['is_mention'] = True
 
            subject = _('[Mention]') + ' ' + subject
 
            # FIXME: this subject is wrong and unused!
 
            notification.NotificationModel().create(created_by=user, subject=subject, body=body,
 
            notification.NotificationModel().create(created_by=user, body=body,
 
                                       recipients=mention_recipients,
 
                                       type_=notification.NotificationModel.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
        return reviewers, redundant_reviewers
 

	
 
    def mention_from_description(self, user, pr, old_description=''):
 
        mention_recipients = (extract_mentioned_users(pr.description) -
 
                              extract_mentioned_users(old_description))
 

	
 
        log.debug("Mentioning %s", mention_recipients)
 
        self.add_reviewers(user, pr, set(), mention_recipients)
 

	
 
    def remove_reviewers(self, user, pull_request, reviewers):
 
        """Remove specified users from being reviewers of the PR."""
 
        if not reviewers:
 
            return # avoid SQLAlchemy warning about empty sequence for IN-predicate
 

	
 
        db.PullRequestReviewer.query() \
 
            .filter_by(pull_request=pull_request) \
 
            .filter(db.PullRequestReviewer.user_id.in_(r.user_id for r in reviewers)) \
 
            .delete(synchronize_session='fetch') # the default of 'evaluate' is not available
 

	
 
    def delete(self, pull_request):
 
        pull_request = db.PullRequest.guess_instance(pull_request)
 
        meta.Session().delete(pull_request)
 
        if pull_request.org_repo.scm_instance.alias == 'git':
 
            # remove a ref under refs/pull/ so that commits can be garbage-collected
 
            try:
 
                del pull_request.org_repo.scm_instance._repo[b"refs/pull/%d/head" % pull_request.pull_request_id]
 
            except KeyError:
 
                pass
 

	
 
    def close_pull_request(self, pull_request):
 
        pull_request = db.PullRequest.guess_instance(pull_request)
 
        pull_request.status = db.PullRequest.STATUS_CLOSED
 
        pull_request.updated_on = datetime.datetime.now()
 

	
 

	
 
class CreatePullRequestAction(object):
 

	
 
    class ValidationError(Exception):
 
        pass
 

	
 
    class Empty(ValidationError):
 
        pass
 

	
 
    class AmbiguousAncestor(ValidationError):
 
        pass
 

	
 
    class Unauthorized(ValidationError):
 
        pass
 

	
 
    @staticmethod
 
    def is_user_authorized(org_repo, other_repo):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if (auth.HasRepoPermissionLevel('read')(org_repo.repo_name) and
 
            auth.HasRepoPermissionLevel('read')(other_repo.repo_name)
 
        ):
 
            return True
 

	
 
        return False
 

	
 
    def __init__(self, org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers):
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 

	
 
        (org_ref_type,
 
         org_ref_name,
 
         org_rev) = org_ref.split(':')
 
        org_display = short_ref_name(org_ref_type, org_ref_name)
 
        if org_ref_type == 'rev':
 
            cs = org_repo.scm_instance.get_changeset(org_rev)
 
            org_ref = 'branch:%s:%s' % (cs.branch, cs.raw_id)
 

	
 
        (other_ref_type,
 
         other_ref_name,
 
         other_rev) = other_ref.split(':')
 
        if other_ref_type == 'rev':
 
            cs = other_repo.scm_instance.get_changeset(other_rev)
 
            other_ref_name = cs.raw_id[:12]
 
            other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, cs.raw_id)
 
        other_display = short_ref_name(other_ref_type, other_ref_name)
 

	
 
        cs_ranges, _cs_ranges_not, ancestor_revs = \
 
            org_repo.scm_instance.get_diff_changesets(other_rev, org_repo.scm_instance, org_rev) # org and other "swapped"
 
        if not cs_ranges:
 
            raise self.Empty(_('Cannot create empty pull request'))
 

	
 
        if not ancestor_revs:
 
            ancestor_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        elif len(ancestor_revs) == 1:
 
            ancestor_rev = ancestor_revs[0]
 
        else:
 
            raise self.AmbiguousAncestor(
 
                _('Cannot create pull request - criss cross merge detected, please merge a later %s revision to %s')
 
                % (other_ref_name, org_ref_name))
 

	
 
        self.revisions = [cs_.raw_id for cs_ in cs_ranges]
 

	
 
        # hack: ancestor_rev is not an other_rev but we want to show the
 
        # requested destination and have the exact ancestor
 
        other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, ancestor_rev)
 

	
 
        if not title:
 
            if org_repo == other_repo:
 
                title = '%s to %s' % (org_display, other_display)
 
            else:
 
                title = '%s#%s to %s#%s' % (org_repo.repo_name, org_display,
 
                                            other_repo.repo_name, other_display)
 
        description = description or _('No description')
 

	
 
        self.org_repo = org_repo
 
        self.other_repo = other_repo
 
        self.org_ref = org_ref
 
        self.org_rev = org_rev
 
        self.other_ref = other_ref
 
        self.title = title
 
        self.description = description
 
        self.owner = owner
 
        self.reviewers = reviewers
 

	
 
        if not CreatePullRequestAction.is_user_authorized(self.org_repo, self.other_repo):
 
            raise self.Unauthorized(_('You are not authorized to create the pull request'))
 

	
 
    def execute(self):
 
        created_by = db.User.get(request.authuser.user_id)
 

	
 
        pr = db.PullRequest()
 
        pr.org_repo = self.org_repo
 
        pr.org_ref = self.org_ref
 
        pr.other_repo = self.other_repo
 
        pr.other_ref = self.other_ref
 
        pr.revisions = self.revisions
 
        pr.title = self.title
 
        pr.description = self.description
 
        pr.owner = self.owner
 
        meta.Session().add(pr)
 
        meta.Session().flush() # make database assign pull_request_id
 

	
 
        if self.org_repo.scm_instance.alias == 'git':
 
            # create a ref under refs/pull/ so that commits don't get garbage-collected
 
            self.org_repo.scm_instance._repo[b"refs/pull/%d/head" % pr.pull_request_id] = ascii_bytes(self.org_rev)
 

	
 
        # reset state to under-review
 
        new_comment = comment.ChangesetCommentsModel().create(
 
            text='',
 
            repo=self.org_repo,
 
            author=created_by,
 
            pull_request=pr,
 
            send_email=False,
 
            status_change=db.ChangesetStatus.STATUS_UNDER_REVIEW,
 
        )
 
        changeset_status.ChangesetStatusModel().set_status(
 
            self.org_repo,
 
            db.ChangesetStatus.STATUS_UNDER_REVIEW,
 
            created_by,
 
            new_comment,
 
            pull_request=pr,
 
        )
 

	
 
        mention_recipients = extract_mentioned_users(self.description)
 
        PullRequestModel().add_reviewers(created_by, pr, self.reviewers, mention_recipients)
 

	
 
        hooks.log_create_pullrequest(pr.get_dict(), created_by)
 

	
 
        return pr
 

	
 

	
 
class CreatePullRequestIterationAction(object):
 
    @staticmethod
 
    def is_user_authorized(old_pull_request):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if auth.HasPermissionAny('hg.admin')():
 
            return True
 

	
 
        # Authorized to edit the old PR?
 
        if request.authuser.user_id != old_pull_request.owner_id:
 
            return False
 

	
 
        # Authorized to create a new PR?
 
        if not CreatePullRequestAction.is_user_authorized(old_pull_request.org_repo, old_pull_request.other_repo):
 
            return False
 

	
 
        return True
 

	
kallithea/model/user.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.model.user
 
~~~~~~~~~~~~~~~~~~~~
 

	
 
users model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import hashlib
 
import hmac
 
import logging
 
import time
 
import traceback
 

	
 
from sqlalchemy.exc import DatabaseError
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import hooks, webutils
 
from kallithea.lib.exceptions import DefaultUserException, UserOwnsReposException
 
from kallithea.lib.utils2 import check_password, generate_api_key, get_crypt_password, get_current_authuser
 
from kallithea.model import db, forms, meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserModel(object):
 
    password_reset_token_lifetime = 86400 # 24 hours
 

	
 
    def get(self, user_id):
 
        user = db.User.query()
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return db.User.guess_instance(user)
 

	
 
    def create(self, form_data, cur_user=None):
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        _fd = form_data
 
        user_data = {
 
            'username': _fd['username'],
 
            'password': _fd['password'],
 
            'email': _fd['email'],
 
            'firstname': _fd['firstname'],
 
            'lastname': _fd['lastname'],
 
            'active': _fd['active'],
 
            'admin': False
 
        }
 
        # raises UserCreationError if it's not allowed
 
        hooks.check_allowed_create_user(user_data, cur_user)
 

	
 
        new_user = db.User()
 
        for k, v in form_data.items():
 
            if k == 'password':
 
                v = get_crypt_password(v)
 
            if k == 'firstname':
 
                k = 'name'
 
            setattr(new_user, k, v)
 

	
 
        new_user.api_key = generate_api_key()
 
        meta.Session().add(new_user)
 
        meta.Session().flush() # make database assign new_user.user_id
 

	
 
        hooks.log_create_user(new_user.get_dict(), cur_user)
 
        return new_user
 

	
 
    def create_or_update(self, username, password, email, firstname='',
 
                         lastname='', active=True, admin=False,
 
                         extern_type=None, extern_name=None, cur_user=None):
 
        """
 
        Creates a new instance if not found, or updates current one
 

	
 
        :param username:
 
        :param password:
 
        :param email:
 
        :param active:
 
        :param firstname:
 
        :param lastname:
 
        :param active:
 
        :param admin:
 
        :param extern_name:
 
        :param extern_type:
 
        :param cur_user:
 
        """
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        user_data = {
 
            'username': username, 'password': password,
 
            'email': email, 'firstname': firstname, 'lastname': lastname,
 
            'active': active, 'admin': admin
 
        }
 
        # raises UserCreationError if it's not allowed
 
        hooks.check_allowed_create_user(user_data, cur_user)
 

	
 
        log.debug('Checking for %s account in Kallithea database', username)
 
        user = db.User.get_by_username(username, case_insensitive=True)
 
        if user is None:
 
            log.debug('creating new user %s', username)
 
            new_user = db.User()
 
            edit = False
 
        else:
 
            log.debug('updating user %s', username)
 
            new_user = user
 
            edit = True
 

	
 
        try:
 
            new_user.username = username
 
            new_user.admin = admin
 
            new_user.email = email
 
            new_user.active = active
 
            new_user.extern_name = extern_name
 
            new_user.extern_type = extern_type
 
            new_user.name = firstname
 
            new_user.lastname = lastname
 

	
 
            if not edit:
 
                new_user.api_key = generate_api_key()
 

	
 
            # set password only if creating an user or password is changed
 
            password_change = new_user.password and \
 
                not check_password(password, new_user.password)
 
            if not edit or password_change:
 
                reason = 'new password' if edit else 'new user'
 
                log.debug('Updating password reason=>%s', reason)
 
                new_user.password = get_crypt_password(password) \
 
                    if password else ''
 

	
 
            if user is None:
 
                meta.Session().add(new_user)
 
                meta.Session().flush() # make database assign new_user.user_id
 

	
 
            if not edit:
 
                hooks.log_create_user(new_user.get_dict(), cur_user)
 

	
 
            return new_user
 
        except (DatabaseError,):
 
            log.error(traceback.format_exc())
 
            raise
 

	
 
    def create_registration(self, form_data):
 
        from kallithea.model import notification
 

	
 
        form_data['admin'] = False
 
        form_data['extern_type'] = db.User.DEFAULT_AUTH_TYPE
 
        form_data['extern_name'] = ''
 
        new_user = self.create(form_data)
 

	
 
        # notification to admins
 
        subject = _('New user registration')
 
        body = (
 
            'New user registration\n'
 
            '---------------------\n'
 
            '- Username: {user.username}\n'
 
            '- Full Name: {user.full_name}\n'
 
            '- Email: {user.email}\n'
 
            ).format(user=new_user)
 
        edit_url = webutils.canonical_url('edit_user', id=new_user.user_id)
 
        email_kwargs = {
 
            'registered_user_url': edit_url,
 
            'new_username': new_user.username,
 
            'new_email': new_user.email,
 
            'new_full_name': new_user.full_name}
 
        notification.NotificationModel().create(created_by=new_user, subject=subject,
 
        notification.NotificationModel().create(created_by=new_user,
 
                                   body=body, recipients=None,
 
                                   type_=notification.NotificationModel.TYPE_REGISTRATION,
 
                                   email_kwargs=email_kwargs)
 

	
 
    def update(self, user_id, form_data, skip_attrs=None):
 
        skip_attrs = skip_attrs or []
 
        user = self.get(user_id)
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                            _("You can't edit this user since it's "
 
                              "crucial for entire application"))
 

	
 
        for k, v in form_data.items():
 
            if k in skip_attrs:
 
                continue
 
            if k == 'new_password' and v:
 
                user.password = get_crypt_password(v)
 
            else:
 
                # old legacy thing orm models store firstname as name,
 
                # need proper refactor to username
 
                if k == 'firstname':
 
                    k = 'name'
 
                setattr(user, k, v)
 

	
 
    def update_user(self, user, **kwargs):
 
        user = db.User.guess_instance(user)
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                _("You can't edit this user since it's"
 
                  " crucial for entire application")
 
            )
 

	
 
        for k, v in kwargs.items():
 
            if k == 'password' and v:
 
                v = get_crypt_password(v)
 

	
 
            setattr(user, k, v)
 
        return user
 

	
 
    def delete(self, user, cur_user=None):
 
        if cur_user is None:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 
        user = db.User.guess_instance(user)
 

	
 
        if user.is_default_user:
 
            raise DefaultUserException(
 
                _("You can't remove this user since it is"
 
                  " crucial for the entire application"))
 
        if user.repositories:
 
            repos = [x.repo_name for x in user.repositories]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s repositories and cannot be '
 
                  'removed. Switch owners or remove those repositories: %s')
 
                % (user.username, len(repos), ', '.join(repos)))
 
        if user.repo_groups:
 
            repogroups = [x.group_name for x in user.repo_groups]
 
            raise UserOwnsReposException(_(
 
                'User "%s" still owns %s repository groups and cannot be '
 
                'removed. Switch owners or remove those repository groups: %s')
 
                % (user.username, len(repogroups), ', '.join(repogroups)))
 
        if user.user_groups:
 
            usergroups = [x.users_group_name for x in user.user_groups]
 
            raise UserOwnsReposException(
 
                _('User "%s" still owns %s user groups and cannot be '
 
                  'removed. Switch owners or remove those user groups: %s')
 
                % (user.username, len(usergroups), ', '.join(usergroups)))
 
        meta.Session().delete(user)
 

	
 
        hooks.log_delete_user(user.get_dict(), cur_user)
 

	
 
    def can_change_password(self, user):
 
        from kallithea.lib import auth_modules
 
        managed_fields = auth_modules.get_managed_fields(user)
 
        return 'password' not in managed_fields
 

	
 
    def get_reset_password_token(self, user, timestamp, session_id):
 
        """
 
        The token is a 40-digit hexstring, calculated as a HMAC-SHA1.
 

	
 
        In a traditional HMAC scenario, an attacker is unable to know or
 
        influence the secret key, but can know or influence the message
 
        and token. This scenario is slightly different (in particular
 
        since the message sender is also the message recipient), but
 
        sufficiently similar to use an HMAC. Benefits compared to a plain
 
        SHA1 hash includes resistance against a length extension attack.
 

	
 
        The HMAC key consists of the following values (known only to the
 
        server and authorized users):
 

	
 
        * per-application secret (the `app_instance_uuid` setting), without
 
          which an attacker cannot counterfeit tokens
 
        * hashed user password, invalidating the token upon password change
 

	
 
        The HMAC message consists of the following values (potentially known
 
        to an attacker):
 

	
 
        * session ID (the anti-CSRF token), requiring an attacker to have
 
          access to the browser session in which the token was created
 
        * numeric user ID, limiting the token to a specific user (yet allowing
 
          users to be renamed)
 
        * user email address
 
        * time of token issue (a Unix timestamp, to enable token expiration)
 

	
 
        The key and message values are separated by NUL characters, which are
 
        guaranteed not to occur in any of the values.
 
        """
 
        app_secret = config.get('app_instance_uuid')
 
        return hmac.new(
 
            '\0'.join([app_secret, user.password]).encode('utf-8'),
 
            msg='\0'.join([session_id, str(user.user_id), user.email, str(timestamp)]).encode('utf-8'),
 
            digestmod=hashlib.sha1,
 
        ).hexdigest()
 

	
 
    def send_reset_password_email(self, data):
 
        """
 
        Sends email with a password reset token and link to the password
 
        reset confirmation page with all information (including the token)
 
        pre-filled. Also returns URL of that page, only without the token,
 
        allowing users to copy-paste or manually enter the token from the
 
        email.
 
        """
 
        from kallithea.lib.celerylib import tasks
 
        from kallithea.model import notification
 

	
 
        user_email = data['email']
 
        user = db.User.get_by_email(user_email)
 
        timestamp = int(time.time())
 
        if user is not None:
 
            if self.can_change_password(user):
 
                log.debug('password reset user %s found', user)
 
                token = self.get_reset_password_token(user,
 
                                                      timestamp,
 
                                                      webutils.session_csrf_secret_token())
 
                # URL must be fully qualified; but since the token is locked to
 
                # the current browser session, we must provide a URL with the
 
                # current scheme and hostname, rather than the canonical_url.
 
                link = webutils.url('reset_password_confirmation', qualified=True,
 
                             email=user_email,
 
                             timestamp=timestamp,
 
                             token=token)
 
            else:
 
                log.debug('password reset user %s found but was managed', user)
 
                token = link = None
 
            reg_type = notification.EmailNotificationModel.TYPE_PASSWORD_RESET
 
            body = notification.EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'txt',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            html_body = notification.EmailNotificationModel().get_email_tmpl(
 
                reg_type, 'html',
 
                user=user.short_contact,
 
                reset_token=token,
 
                reset_url=link)
 
            log.debug('sending email')
 
            tasks.send_email([user_email], _("Password reset link"), body, html_body)
 
            log.info('send new password mail to %s', user_email)
 
        else:
 
            log.debug("password reset email %s not found", user_email)
 

	
 
        return webutils.url('reset_password_confirmation',
 
                     email=user_email,
 
                     timestamp=timestamp)
 

	
 
    def verify_reset_password_token(self, email, timestamp, token):
 
        user = db.User.get_by_email(email)
 
        if user is None:
 
            log.debug("user with email %s not found", email)
 
            return False
 

	
 
        token_age = int(time.time()) - int(timestamp)
 

	
 
        if token_age < 0:
 
            log.debug('timestamp is from the future')
 
            return False
 

	
 
        if token_age > UserModel.password_reset_token_lifetime:
 
            log.debug('password reset token expired')
 
            return False
 

	
 
        expected_token = self.get_reset_password_token(user,
 
                                                       timestamp,
 
                                                       webutils.session_csrf_secret_token())
 
        log.debug('computed password reset token: %s', expected_token)
 
        log.debug('received password reset token: %s', token)
 
        return expected_token == token
 

	
 
    def reset_password(self, user_email, new_passwd):
 
        from kallithea.lib.celerylib import tasks
 
        user = db.User.get_by_email(user_email)
 
        if user is not None:
 
            if not self.can_change_password(user):
kallithea/tests/models/test_notifications.py
Show inline comments
 
import os
 
import re
 

	
 
import mock
 
from tg.util.webtest import test_context
 

	
 
import kallithea.lib.celerylib
 
import kallithea.lib.celerylib.tasks
 
import kallithea.lib.helpers as h
 
from kallithea.model import db, meta
 
from kallithea.model.notification import EmailNotificationModel, NotificationModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 

	
 

	
 
class TestNotifications(base.TestController):
 

	
 
    def setup_method(self, method):
 
        meta.Session.remove()
 
        u1 = UserModel().create_or_update(username='u1',
 
                                        password='qweqwe',
 
                                        email='u1@example.com',
 
                                        firstname='u1', lastname='u1')
 
        meta.Session().commit()
 
        self.u1 = u1.user_id
 

	
 
        u2 = UserModel().create_or_update(username='u2',
 
                                        password='qweqwe',
 
                                        email='u2@example.com',
 
                                        firstname='u2', lastname='u3')
 
        meta.Session().commit()
 
        self.u2 = u2.user_id
 

	
 
        u3 = UserModel().create_or_update(username='u3',
 
                                        password='qweqwe',
 
                                        email='u3@example.com',
 
                                        firstname='u3', lastname='u3')
 
        meta.Session().commit()
 
        self.u3 = u3.user_id
 

	
 
    def test_create_notification(self):
 
        with test_context(self.app):
 
            usrs = [self.u1, self.u2]
 

	
 
            def send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
 
                assert recipients == ['u2@example.com']
 
                assert subject == 'Test Message'
 
                assert body == "hi there"
 
                assert '>hi there<' in html_body
 
                assert from_name == 'u1 u1'
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                NotificationModel().create(created_by=self.u1,
 
                                                   subject='subj', body='hi there',
 
                                                   body='hi there',
 
                                                   recipients=usrs)
 

	
 
    @mock.patch.object(h, 'canonical_url', (lambda arg, **kwargs: 'http://%s/?%s' % (arg, '&'.join('%s=%s' % (k, v) for (k, v) in sorted(kwargs.items())))))
 
    def test_dump_html_mails(self):
 
        # Exercise all notification types and dump them to one big html file
 
        l = []
 

	
 
        def send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
 
            l.append('<hr/>\n')
 
            l.append('<h1>%s</h1>\n' % desc) # desc is from outer scope
 
            l.append('<pre>\n')
 
            l.append('From: %s <name@example.com>\n' % from_name)
 
            l.append('To: %s\n' % ' '.join(recipients))
 
            l.append('Subject: %s\n' % subject)
 
            l.append('</pre>\n')
 
            l.append('<hr/>\n')
 
            l.append('<pre>%s</pre>\n' % body)
 
            l.append('<hr/>\n')
 
            l.append(html_body)
 
            l.append('<hr/>\n')
 

	
 
        with test_context(self.app):
 
            with mock.patch.object(kallithea.lib.celerylib.tasks, 'send_email', send_email):
 
                pr_kwargs = dict(
 
                    pr_nice_id='#7',
 
                    pr_title='The Title',
 
                    pr_title_short='The Title',
 
                    pr_url='http://pr.org/7',
 
                    pr_target_repo='http://mainline.com/repo',
 
                    pr_target_branch='trunk',
 
                    pr_source_repo='https://dev.org/repo',
 
                    pr_source_branch='devbranch',
 
                    pr_owner=db.User.get(self.u2),
 
                    pr_owner_username='u2'
 
                    )
 

	
 
                for type_, body, kwargs in [
 
                        (NotificationModel.TYPE_CHANGESET_COMMENT,
 
                         'This is the new \'comment\'.\n\n - and here it ends indented.',
 
                         dict(
 
                            short_id='cafe1234',
 
                            raw_id='cafe1234c0ffeecafe',
 
                            branch='brunch',
 
                            cs_comment_user='Opinionated User (jsmith)',
 
                            cs_comment_url='http://comment.org',
 
                            is_mention=[False, True],
 
                            message='This changeset did something clever which is hard to explain',
 
                            message_short='This changeset did something cl...',
 
                            status_change=[None, 'Approved'],
 
                            cs_target_repo='http://example.com/repo_target',
 
                            cs_url='http://changeset.com',
 
                            cs_author_username=db.User.get(self.u2).username,
 
                            cs_author=db.User.get(self.u2))),
 
                        (NotificationModel.TYPE_MESSAGE,
 
                         'This is the \'body\' of the "test" message\n - nothing interesting here except indentation.',
 
                         dict()),
 
                        #(NotificationModel.TYPE_MENTION, '$body', None), # not used
 
                        (NotificationModel.TYPE_REGISTRATION,
 
                         'Registration body',
 
                         dict(
 
                            new_username='newbie',
 
                            registered_user_url='http://newbie.org',
 
                            new_email='new@email.com',
 
                            new_full_name='New Full Name')),
 
                        (NotificationModel.TYPE_PULL_REQUEST,
 
                         'This PR is \'awesome\' because it does <stuff>\n - please approve indented!',
 
                         dict(
 
                            pr_user_created='Requesting User (root)', # pr_owner should perhaps be used for @mention in description ...
 
                            is_mention=[False, True],
 
                            pr_revisions=[('123abc'*7, "Introduce one and two\n\nand that's it"), ('567fed'*7, 'Make one plus two equal tree')],
 
                            org_repo_name='repo_org',
 
                            **pr_kwargs)),
 
                        (NotificationModel.TYPE_PULL_REQUEST_COMMENT,
 
                         'Me too!\n\n - and indented on second line',
 
                         dict(
 
                            closing_pr=[False, True],
 
                            is_mention=[False, True],
 
                            pr_comment_user='Opinionated User (jsmith)',
 
                            pr_comment_url='http://pr.org/comment',
 
                            status_change=[None, 'Under Review'],
 
                            **pr_kwargs)),
 
                        ]:
 
                    kwargs['repo_name'] = 'repo/name'
 
                    params = [(type_, type_, body, kwargs)]
 
                    for param_name in ['is_mention', 'status_change', 'closing_pr']: # TODO: inline/general
 
                        if not isinstance(kwargs.get(param_name), list):
 
                            continue
 
                        new_params = []
 
                        for v in kwargs[param_name]:
 
                            for desc, type_, body, kwargs in params:
 
                                kwargs = dict(kwargs)
 
                                kwargs[param_name] = v
 
                                new_params.append(('%s, %s=%r' % (desc, param_name, v), type_, body, kwargs))
 
                        params = new_params
 

	
 
                    for desc, type_, body, kwargs in params:
 
                        # desc is used as "global" variable
 
                        NotificationModel().create(created_by=self.u1,
 
                                                           subject='unused', body=body, email_kwargs=kwargs,
 
                                                           body=body, email_kwargs=kwargs,
 
                                                           recipients=[self.u2], type_=type_)
 

	
 
                # Email type TYPE_PASSWORD_RESET has no corresponding notification type - test it directly:
 
                desc = 'TYPE_PASSWORD_RESET'
 
                kwargs = dict(user='John Doe', reset_token='decbf64715098db5b0bd23eab44bd792670ab746', reset_url='http://reset.com/decbf64715098db5b0bd23eab44bd792670ab746')
 
                kallithea.lib.celerylib.tasks.send_email(['john@doe.com'],
 
                    "Password reset link",
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'txt', **kwargs),
 
                    EmailNotificationModel().get_email_tmpl(EmailNotificationModel.TYPE_PASSWORD_RESET, 'html', **kwargs),
 
                    from_name=db.User.get(self.u1).full_name_or_username)
 

	
 
        out = '<!doctype html>\n<html lang="en">\n<head><title>Notifications</title><meta http-equiv="Content-Type" content="text/html; charset=UTF-8"></head>\n<body>\n%s\n</body>\n</html>\n' % \
 
            re.sub(r'<(/?(?:!doctype|html|head|title|meta|body)\b[^>]*)>', r'<!--\1-->', ''.join(l))
 

	
 
        outfn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.out.html')
 
        reffn = os.path.join(os.path.dirname(__file__), 'test_dump_html_mails.ref.html')
 
        with open(outfn, 'w') as f:
 
            f.write(out)
 
        with open(reffn) as f:
 
            ref = f.read()
 
        assert ref == out # copy test_dump_html_mails.out.html to test_dump_html_mails.ref.html to update expectations
 
        os.unlink(outfn)
0 comments (0 inline, 0 general)