Files @ 67e5b90801aa
Branch filter:

Location: kallithea/kallithea/model/pull_request.py

mads
lib: move webhelpers2 and friends to webutils

Gives less of the unfortunate use of helpers - especially in low level libs.
  1
  2
  3
  4
  5
  6
  7
  8
  9
 10
 11
 12
 13
 14
 15
 16
 17
 18
 19
 20
 21
 22
 23
 24
 25
 26
 27
 28
 29
 30
 31
 32
 33
 34
 35
 36
 37
 38
 39
 40
 41
 42
 43
 44
 45
 46
 47
 48
 49
 50
 51
 52
 53
 54
 55
 56
 57
 58
 59
 60
 61
 62
 63
 64
 65
 66
 67
 68
 69
 70
 71
 72
 73
 74
 75
 76
 77
 78
 79
 80
 81
 82
 83
 84
 85
 86
 87
 88
 89
 90
 91
 92
 93
 94
 95
 96
 97
 98
 99
100
101
102
103
104
105
106
107
108
109
110
111
112
113
114
115
116
117
118
119
120
121
122
123
124
125
126
127
128
129
130
131
132
133
134
135
136
137
138
139
140
141
142
143
144
145
146
147
148
149
150
151
152
153
154
155
156
157
158
159
160
161
162
163
164
165
166
167
168
169
170
171
172
173
174
175
176
177
178
179
180
181
182
183
184
185
186
187
188
189
190
191
192
193
194
195
196
197
198
199
200
201
202
203
204
205
206
207
208
209
210
211
212
213
214
215
216
217
218
219
220
221
222
223
224
225
226
227
228
229
230
231
232
233
234
235
236
237
238
239
240
241
242
243
244
245
246
247
248
249
250
251
252
253
254
255
256
257
258
259
260
261
262
263
264
265
266
267
268
269
270
271
272
273
274
275
276
277
278
279
280
281
282
283
284
285
286
287
288
289
290
291
292
293
294
295
296
297
298
299
300
301
302
303
304
305
306
307
308
309
310
311
312
313
314
315
316
317
318
319
320
321
322
323
324
325
326
327
328
329
330
331
332
333
334
335
336
337
338
339
340
341
342
343
344
345
346
347
348
349
350
351
352
353
354
355
356
357
358
359
360
361
362
363
364
365
366
367
368
369
370
371
372
373
374
375
376
377
378
379
380
381
382
383
384
385
386
387
388
389
390
391
392
393
394
395
396
397
398
399
400
401
402
403
404
405
406
# -*- coding: utf-8 -*-
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
# GNU General Public License for more details.
#
# You should have received a copy of the GNU General Public License
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
"""
kallithea.model.pull_request
~~~~~~~~~~~~~~~~~~~~~~~~~~~~

pull request model for Kallithea

This file was forked by the Kallithea project in July 2014.
Original author and date, and relevant copyright and licensing information is below:
:created_on: Jun 6, 2012
:author: marcink
:copyright: (c) 2013 RhodeCode GmbH, and others.
:license: GPLv3, see LICENSE.md for more details.
"""

import datetime
import logging
import re

from tg import request
from tg.i18n import ugettext as _

from kallithea.lib import auth
from kallithea.lib import helpers as h
from kallithea.lib import webutils
from kallithea.lib.hooks import log_create_pullrequest
from kallithea.lib.utils import extract_mentioned_users
from kallithea.lib.utils2 import ascii_bytes
from kallithea.model import db, meta
from kallithea.model.notification import NotificationModel


log = logging.getLogger(__name__)


def _assert_valid_reviewers(seq):
    """Sanity check: elements are actual User objects, and not the default user."""
    assert not any(user.is_default_user for user in seq)


class PullRequestModel(object):

    def add_reviewers(self, user, pr, reviewers, mention_recipients=None):
        """Add reviewer and send notification to them.
        """
        reviewers = set(reviewers)
        _assert_valid_reviewers(reviewers)
        if mention_recipients is not None:
            mention_recipients = set(mention_recipients) - reviewers
            _assert_valid_reviewers(mention_recipients)

        redundant_reviewers = set(db.User.query() \
            .join(db.PullRequestReviewer) \
            .filter(db.PullRequestReviewer.pull_request == pr) \
            .filter(db.PullRequestReviewer.user_id.in_(r.user_id for r in reviewers))
            .all())

        if redundant_reviewers:
            log.debug('Following reviewers were already part of pull request %s: %s', pr.pull_request_id, redundant_reviewers)

            reviewers -= redundant_reviewers

        log.debug('Adding reviewers to pull request %s: %s', pr.pull_request_id, reviewers)
        for reviewer in reviewers:
            prr = db.PullRequestReviewer(reviewer, pr)
            meta.Session().add(prr)

        # notification to reviewers
        pr_url = pr.url(canonical=True)
        threading = ['%s-pr-%s@%s' % (pr.other_repo.repo_name,
                                      pr.pull_request_id,
                                      webutils.canonical_hostname())]
        subject = webutils.link_to(
            _('%(user)s wants you to review pull request %(pr_nice_id)s: %(pr_title)s') %
                {'user': user.username,
                 'pr_title': pr.title,
                 'pr_nice_id': pr.nice_id()},
            pr_url)
        body = pr.description
        _org_ref_type, org_ref_name, _org_rev = pr.org_ref.split(':')
        _other_ref_type, other_ref_name, _other_rev = pr.other_ref.split(':')
        revision_data = [(x.raw_id, x.message)
                         for x in map(pr.org_repo.get_changeset, pr.revisions)]
        email_kwargs = {
            'pr_title': pr.title,
            'pr_title_short': h.shorter(pr.title, 50),
            'pr_user_created': user.full_name_and_username,
            'pr_repo_url': webutils.canonical_url('summary_home', repo_name=pr.other_repo.repo_name),
            'pr_url': pr_url,
            'pr_revisions': revision_data,
            'repo_name': pr.other_repo.repo_name,
            'org_repo_name': pr.org_repo.repo_name,
            'pr_nice_id': pr.nice_id(),
            'pr_target_repo': webutils.canonical_url('summary_home',
                               repo_name=pr.other_repo.repo_name),
            'pr_target_branch': other_ref_name,
            'pr_source_repo': webutils.canonical_url('summary_home',
                               repo_name=pr.org_repo.repo_name),
            'pr_source_branch': org_ref_name,
            'pr_owner': pr.owner,
            'pr_owner_username': pr.owner.username,
            'pr_username': user.username,
            'threading': threading,
            'is_mention': False,
            }
        if reviewers:
            NotificationModel().create(created_by=user, subject=subject, body=body,
                                       recipients=reviewers,
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
                                       email_kwargs=email_kwargs)

        if mention_recipients:
            email_kwargs['is_mention'] = True
            subject = _('[Mention]') + ' ' + subject
            # FIXME: this subject is wrong and unused!
            NotificationModel().create(created_by=user, subject=subject, body=body,
                                       recipients=mention_recipients,
                                       type_=NotificationModel.TYPE_PULL_REQUEST,
                                       email_kwargs=email_kwargs)

        return reviewers, redundant_reviewers

    def mention_from_description(self, user, pr, old_description=''):
        mention_recipients = (extract_mentioned_users(pr.description) -
                              extract_mentioned_users(old_description))

        log.debug("Mentioning %s", mention_recipients)
        self.add_reviewers(user, pr, set(), mention_recipients)

    def remove_reviewers(self, user, pull_request, reviewers):
        """Remove specified users from being reviewers of the PR."""
        if not reviewers:
            return # avoid SQLAlchemy warning about empty sequence for IN-predicate

        db.PullRequestReviewer.query() \
            .filter_by(pull_request=pull_request) \
            .filter(db.PullRequestReviewer.user_id.in_(r.user_id for r in reviewers)) \
            .delete(synchronize_session='fetch') # the default of 'evaluate' is not available

    def delete(self, pull_request):
        pull_request = db.PullRequest.guess_instance(pull_request)
        meta.Session().delete(pull_request)
        if pull_request.org_repo.scm_instance.alias == 'git':
            # remove a ref under refs/pull/ so that commits can be garbage-collected
            try:
                del pull_request.org_repo.scm_instance._repo[b"refs/pull/%d/head" % pull_request.pull_request_id]
            except KeyError:
                pass

    def close_pull_request(self, pull_request):
        pull_request = db.PullRequest.guess_instance(pull_request)
        pull_request.status = db.PullRequest.STATUS_CLOSED
        pull_request.updated_on = datetime.datetime.now()


class CreatePullRequestAction(object):

    class ValidationError(Exception):
        pass

    class Empty(ValidationError):
        pass

    class AmbiguousAncestor(ValidationError):
        pass

    class Unauthorized(ValidationError):
        pass

    @staticmethod
    def is_user_authorized(org_repo, other_repo):
        """Performs authorization check with only the minimum amount of
        information needed for such a check, rather than a full command
        object.
        """
        if (auth.HasRepoPermissionLevel('read')(org_repo.repo_name) and
            auth.HasRepoPermissionLevel('read')(other_repo.repo_name)
        ):
            return True

        return False

    def __init__(self, org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers):
        reviewers = set(reviewers)
        _assert_valid_reviewers(reviewers)

        (org_ref_type,
         org_ref_name,
         org_rev) = org_ref.split(':')
        org_display = h.short_ref(org_ref_type, org_ref_name)
        if org_ref_type == 'rev':
            cs = org_repo.scm_instance.get_changeset(org_rev)
            org_ref = 'branch:%s:%s' % (cs.branch, cs.raw_id)

        (other_ref_type,
         other_ref_name,
         other_rev) = other_ref.split(':')
        if other_ref_type == 'rev':
            cs = other_repo.scm_instance.get_changeset(other_rev)
            other_ref_name = cs.raw_id[:12]
            other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, cs.raw_id)
        other_display = h.short_ref(other_ref_type, other_ref_name)

        cs_ranges, _cs_ranges_not, ancestor_revs = \
            org_repo.scm_instance.get_diff_changesets(other_rev, org_repo.scm_instance, org_rev) # org and other "swapped"
        if not cs_ranges:
            raise self.Empty(_('Cannot create empty pull request'))

        if not ancestor_revs:
            ancestor_rev = org_repo.scm_instance.EMPTY_CHANGESET
        elif len(ancestor_revs) == 1:
            ancestor_rev = ancestor_revs[0]
        else:
            raise self.AmbiguousAncestor(
                _('Cannot create pull request - criss cross merge detected, please merge a later %s revision to %s')
                % (other_ref_name, org_ref_name))

        self.revisions = [cs_.raw_id for cs_ in cs_ranges]

        # hack: ancestor_rev is not an other_rev but we want to show the
        # requested destination and have the exact ancestor
        other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, ancestor_rev)

        if not title:
            if org_repo == other_repo:
                title = '%s to %s' % (org_display, other_display)
            else:
                title = '%s#%s to %s#%s' % (org_repo.repo_name, org_display,
                                            other_repo.repo_name, other_display)
        description = description or _('No description')

        self.org_repo = org_repo
        self.other_repo = other_repo
        self.org_ref = org_ref
        self.org_rev = org_rev
        self.other_ref = other_ref
        self.title = title
        self.description = description
        self.owner = owner
        self.reviewers = reviewers

        if not CreatePullRequestAction.is_user_authorized(self.org_repo, self.other_repo):
            raise self.Unauthorized(_('You are not authorized to create the pull request'))

    def execute(self):
        created_by = db.User.get(request.authuser.user_id)

        pr = db.PullRequest()
        pr.org_repo = self.org_repo
        pr.org_ref = self.org_ref
        pr.other_repo = self.other_repo
        pr.other_ref = self.other_ref
        pr.revisions = self.revisions
        pr.title = self.title
        pr.description = self.description
        pr.owner = self.owner
        meta.Session().add(pr)
        meta.Session().flush() # make database assign pull_request_id

        if self.org_repo.scm_instance.alias == 'git':
            # create a ref under refs/pull/ so that commits don't get garbage-collected
            self.org_repo.scm_instance._repo[b"refs/pull/%d/head" % pr.pull_request_id] = ascii_bytes(self.org_rev)

        # reset state to under-review
        from kallithea.model.changeset_status import ChangesetStatusModel
        from kallithea.model.comment import ChangesetCommentsModel
        comment = ChangesetCommentsModel().create(
            text='',
            repo=self.org_repo,
            author=created_by,
            pull_request=pr,
            send_email=False,
            status_change=db.ChangesetStatus.STATUS_UNDER_REVIEW,
        )
        ChangesetStatusModel().set_status(
            self.org_repo,
            db.ChangesetStatus.STATUS_UNDER_REVIEW,
            created_by,
            comment,
            pull_request=pr,
        )

        mention_recipients = extract_mentioned_users(self.description)
        PullRequestModel().add_reviewers(created_by, pr, self.reviewers, mention_recipients)

        log_create_pullrequest(pr.get_dict(), created_by)

        return pr


class CreatePullRequestIterationAction(object):
    @staticmethod
    def is_user_authorized(old_pull_request):
        """Performs authorization check with only the minimum amount of
        information needed for such a check, rather than a full command
        object.
        """
        if auth.HasPermissionAny('hg.admin')():
            return True

        # Authorized to edit the old PR?
        if request.authuser.user_id != old_pull_request.owner_id:
            return False

        # Authorized to create a new PR?
        if not CreatePullRequestAction.is_user_authorized(old_pull_request.org_repo, old_pull_request.other_repo):
            return False

        return True

    def __init__(self, old_pull_request, new_org_rev, new_other_rev, title, description, owner, reviewers):
        self.old_pull_request = old_pull_request

        org_repo = old_pull_request.org_repo
        org_ref_type, org_ref_name, org_rev = old_pull_request.org_ref.split(':')

        other_repo = old_pull_request.other_repo
        other_ref_type, other_ref_name, other_rev = old_pull_request.other_ref.split(':') # other_rev is ancestor
        #assert other_ref_type == 'branch', other_ref_type # TODO: what if not?

        new_org_ref = '%s:%s:%s' % (org_ref_type, org_ref_name, new_org_rev)
        new_other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, new_other_rev)

        self.create_action = CreatePullRequestAction(org_repo, other_repo, new_org_ref, new_other_ref, None, None, owner, reviewers)

        # Generate complete title/description

        old_revisions = set(old_pull_request.revisions)
        revisions = self.create_action.revisions
        new_revisions = [r for r in revisions if r not in old_revisions]
        lost = old_revisions.difference(revisions)

        infos = ['This is a new iteration of %s "%s".' %
                 (webutils.canonical_url('pullrequest_show', repo_name=old_pull_request.other_repo.repo_name,
                      pull_request_id=old_pull_request.pull_request_id),
                  old_pull_request.title)]

        if lost:
            infos.append(_('Missing changesets since the previous iteration:'))
            for r in old_pull_request.revisions:
                if r in lost:
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
                    infos.append('  %s %s' % (r[:12], rev_desc))

        if new_revisions:
            infos.append(_('New changesets on %s %s since the previous iteration:') % (org_ref_type, org_ref_name))
            for r in reversed(revisions):
                if r in new_revisions:
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
                    infos.append('  %s %s' % (r[:12], h.shorter(rev_desc, 80)))

            if self.create_action.other_ref == old_pull_request.other_ref:
                infos.append(_("Ancestor didn't change - diff since previous iteration:"))
                infos.append(webutils.canonical_url('compare_url',
                                 repo_name=org_repo.repo_name, # other_repo is always same as repo_name
                                 org_ref_type='rev', org_ref_name=org_rev[:12], # use old org_rev as base
                                 other_ref_type='rev', other_ref_name=new_org_rev[:12],
                                 )) # note: linear diff, merge or not doesn't matter
            else:
                infos.append(_('This iteration is based on another %s revision and there is no simple diff.') % other_ref_name)
        else:
            infos.append(_('No changes found on %s %s since previous iteration.') % (org_ref_type, org_ref_name))
            # TODO: fail?

        v = 2
        m = re.match(r'(.*)\(v(\d+)\)\s*$', title)
        if m is not None:
            title = m.group(1)
            v = int(m.group(2)) + 1
        self.create_action.title = '%s (v%s)' % (title.strip(), v)

        # using a mail-like separator, insert new iteration info in description with latest first
        descriptions = description.replace('\r\n', '\n').split('\n-- \n', 1)
        description = descriptions[0].strip() + '\n\n-- \n' + '\n'.join(infos)
        if len(descriptions) > 1:
            description += '\n\n' + descriptions[1].strip()
        self.create_action.description = description

        if not CreatePullRequestIterationAction.is_user_authorized(self.old_pull_request):
            raise CreatePullRequestAction.Unauthorized(_('You are not authorized to create the pull request'))

    def execute(self):
        pull_request = self.create_action.execute()

        # Close old iteration
        from kallithea.model.comment import ChangesetCommentsModel
        ChangesetCommentsModel().create(
            text=_('Closed, next iteration: %s .') % pull_request.url(canonical=True),
            repo=self.old_pull_request.other_repo_id,
            author=request.authuser.user_id,
            pull_request=self.old_pull_request.pull_request_id,
            closing_pr=True)
        PullRequestModel().close_pull_request(self.old_pull_request.pull_request_id)
        return pull_request