Changeset - 0452c45a546c
[Not reviewed]
default
0 3 0
Søren Løvborg - 9 years ago 2017-04-11 14:37:43
sorenl@unity3d.com
pullrequests: fix broken "new PR iteration" handling of ancestor changes

An earlier refactor (5d60c9a391cd) flubbed this code and accidentally
assumed the most recent common ancestor would not change when iterating,
which is of course only the case when there are no merges from 'other' to
'org' among the new revisions.

This was not only not caught during manual testing (nor review), but
neither did the test suite test this. That has now also been rectified.
3 files changed with 87 insertions and 4 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/pullrequests.py
Show inline comments
 
@@ -173,386 +173,387 @@ class PullrequestsController(BaseRepoCon
 
                else:
 
                    selected = 'tag:null:' + repo.EMPTY_CHANGESET
 
                    tags.append((selected, 'null'))
 
            else:
 
                if 'master' in repo.branches:
 
                    selected = 'branch:master:%s' % repo.branches['master']
 
                else:
 
                    k, v = repo.branches.items()[0]
 
                    selected = 'branch:%s:%s' % (k, v)
 

	
 
        groups = [(specials, _("Special")),
 
                  (peers, _("Peer branches")),
 
                  (bookmarks, _("Bookmarks")),
 
                  (branches, _("Branches")),
 
                  (tags, _("Tags")),
 
                  ]
 
        return [g for g in groups if g[0]], selected
 

	
 
    def _get_is_allowed_change_status(self, pull_request):
 
        if pull_request.is_closed():
 
            return False
 

	
 
        owner = request.authuser.user_id == pull_request.owner_id
 
        reviewer = PullRequestReviewer.query() \
 
            .filter(PullRequestReviewer.pull_request == pull_request) \
 
            .filter(PullRequestReviewer.user_id == request.authuser.user_id) \
 
            .count() != 0
 

	
 
        return request.authuser.admin or owner or reviewer
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show_all(self, repo_name):
 
        c.from_ = request.GET.get('from_') or ''
 
        c.closed = request.GET.get('closed') or ''
 
        p = safe_int(request.GET.get('page'), 1)
 

	
 
        q = PullRequest.query(include_closed=c.closed, sorted=True)
 
        if c.from_:
 
            q = q.filter_by(org_repo=c.db_repo)
 
        else:
 
            q = q.filter_by(other_repo=c.db_repo)
 
        c.pull_requests = q.all()
 

	
 
        c.pullrequests_pager = Page(c.pull_requests, page=p, items_per_page=100)
 

	
 
        return render('/pullrequests/pullrequest_show_all.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    def show_my(self):
 
        c.closed = request.GET.get('closed') or ''
 

	
 
        c.my_pull_requests = PullRequest.query(
 
            include_closed=c.closed,
 
            sorted=True,
 
        ).filter_by(owner_id=request.authuser.user_id).all()
 

	
 
        c.participate_in_pull_requests = []
 
        c.participate_in_pull_requests_todo = []
 
        done_status = set([ChangesetStatus.STATUS_APPROVED, ChangesetStatus.STATUS_REJECTED])
 
        for pr in PullRequest.query(
 
            include_closed=c.closed,
 
            reviewer_id=request.authuser.user_id,
 
            sorted=True,
 
        ):
 
            status = pr.user_review_status(request.authuser.user_id) # very inefficient!!!
 
            if status in done_status:
 
                c.participate_in_pull_requests.append(pr)
 
            else:
 
                c.participate_in_pull_requests_todo.append(pr)
 

	
 
        return render('/pullrequests/pullrequest_show_my.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def index(self):
 
        org_repo = c.db_repo
 
        org_scm_instance = org_repo.scm_instance
 
        try:
 
            org_scm_instance.get_changeset()
 
        except EmptyRepositoryError as e:
 
            h.flash(h.literal(_('There are no changesets yet')),
 
                    category='warning')
 
            raise HTTPFound(location=url('summary_home', repo_name=org_repo.repo_name))
 

	
 
        org_rev = request.GET.get('rev_end')
 
        # rev_start is not directly useful - its parent could however be used
 
        # as default for other and thus give a simple compare view
 
        rev_start = request.GET.get('rev_start')
 
        other_rev = None
 
        if rev_start:
 
            starters = org_repo.get_changeset(rev_start).parents
 
            if starters:
 
                other_rev = starters[0].raw_id
 
            else:
 
                other_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        branch = request.GET.get('branch')
 

	
 
        c.cs_repos = [(org_repo.repo_name, org_repo.repo_name)]
 
        c.default_cs_repo = org_repo.repo_name
 
        c.cs_refs, c.default_cs_ref = self._get_repo_refs(org_scm_instance, rev=org_rev, branch=branch)
 

	
 
        default_cs_ref_type, default_cs_branch, default_cs_rev = c.default_cs_ref.split(':')
 
        if default_cs_ref_type != 'branch':
 
            default_cs_branch = org_repo.get_changeset(default_cs_rev).branch
 

	
 
        # add org repo to other so we can open pull request against peer branches on itself
 
        c.a_repos = [(org_repo.repo_name, '%s (self)' % org_repo.repo_name)]
 

	
 
        if org_repo.parent:
 
            # add parent of this fork also and select it.
 
            # use the same branch on destination as on source, if available.
 
            c.a_repos.append((org_repo.parent.repo_name, '%s (parent)' % org_repo.parent.repo_name))
 
            c.a_repo = org_repo.parent
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(
 
                    org_repo.parent.scm_instance, branch=default_cs_branch, rev=other_rev)
 

	
 
        else:
 
            c.a_repo = org_repo
 
            c.a_refs, c.default_a_ref = self._get_repo_refs(org_scm_instance, rev=other_rev)
 

	
 
        # gather forks and add to this list ... even though it is rare to
 
        # request forks to pull from their parent
 
        for fork in org_repo.forks:
 
            c.a_repos.append((fork.repo_name, fork.repo_name))
 

	
 
        return render('/pullrequests/pullrequest.html')
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def repo_info(self, repo_name):
 
        repo = c.db_repo
 
        refs, selected_ref = self._get_repo_refs(repo.scm_instance)
 
        return {
 
            'description': repo.description.split('\n', 1)[0],
 
            'selected_ref': selected_ref,
 
            'refs': refs,
 
            }
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def create(self, repo_name):
 
        repo = c.db_repo
 
        try:
 
            _form = PullRequestForm(repo.repo_id)().to_python(request.POST)
 
        except formencode.Invalid as errors:
 
            log.error(traceback.format_exc())
 
            log.error(str(errors))
 
            msg = _('Error creating pull request: %s') % errors.msg
 
            h.flash(msg, 'error')
 
            raise HTTPBadRequest
 

	
 
        # heads up: org and other might seem backward here ...
 
        org_ref = _form['org_ref'] # will have merge_rev as rev but symbolic name
 
        org_repo = Repository.guess_instance(_form['org_repo'])
 

	
 
        other_ref = _form['other_ref'] # will have symbolic name and head revision
 
        other_repo = Repository.guess_instance(_form['other_repo'])
 

	
 
        reviewers = []
 

	
 
        title = _form['pullrequest_title']
 
        description = _form['pullrequest_desc'].strip()
 
        owner = User.get(request.authuser.user_id)
 

	
 
        try:
 
            cmd = CreatePullRequestAction(org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers)
 
        except CreatePullRequestAction.ValidationError as e:
 
            h.flash(str(e), category='error', logf=log.error)
 
            raise HTTPNotFound
 

	
 
        try:
 
            pull_request = cmd.execute()
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('Error occurred while creating pull request'),
 
                    category='error')
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=url('pullrequest_home', repo_name=repo_name))
 

	
 
        h.flash(_('Successfully opened new pull request'),
 
                category='success')
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    def create_new_iteration(self, old_pull_request, new_rev, title, description, reviewers):
 
        owner = User.get(request.authuser.user_id)
 
        new_org_rev = self._get_ref_rev(old_pull_request.org_repo, 'rev', new_rev)
 
        new_other_rev = self._get_ref_rev(old_pull_request.other_repo, old_pull_request.other_ref_parts[0], old_pull_request.other_ref_parts[1])
 
        try:
 
            cmd = CreatePullRequestIterationAction(old_pull_request, new_org_rev, title, description, owner, reviewers)
 
            cmd = CreatePullRequestIterationAction(old_pull_request, new_org_rev, new_other_rev, title, description, owner, reviewers)
 
        except CreatePullRequestAction.ValidationError as e:
 
            h.flash(str(e), category='error', logf=log.error)
 
            raise HTTPNotFound
 

	
 
        try:
 
            pull_request = cmd.execute()
 
            Session().commit()
 
        except Exception:
 
            h.flash(_('Error occurred while creating pull request'),
 
                    category='error')
 
            log.error(traceback.format_exc())
 
            raise HTTPFound(location=old_pull_request.url())
 

	
 
        h.flash(_('New pull request iteration created'),
 
                category='success')
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    # pullrequest_post for PR editing
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def post(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        if pull_request.is_closed():
 
            raise HTTPForbidden()
 
        assert pull_request.other_repo.repo_name == repo_name
 
        #only owner or admin can update it
 
        owner = pull_request.owner_id == request.authuser.user_id
 
        repo_admin = h.HasRepoPermissionLevel('admin')(c.repo_name)
 
        if not (h.HasPermissionAny('hg.admin')() or repo_admin or owner):
 
            raise HTTPForbidden()
 

	
 
        _form = PullRequestPostForm()().to_python(request.POST)
 

	
 
        cur_reviewers = set(pull_request.get_reviewer_users())
 
        new_reviewers = set(_get_reviewer(s) for s in _form['review_members'])
 
        old_reviewers = set(_get_reviewer(s) for s in _form['org_review_members'])
 

	
 
        other_added = cur_reviewers - old_reviewers
 
        other_removed = old_reviewers - cur_reviewers
 

	
 
        if other_added:
 
            h.flash(_('Meanwhile, the following reviewers have been added: %s') %
 
                    (', '.join(u.username for u in other_added)),
 
                    category='warning')
 
        if other_removed:
 
            h.flash(_('Meanwhile, the following reviewers have been removed: %s') %
 
                    (', '.join(u.username for u in other_removed)),
 
                    category='warning')
 

	
 
        if _form['updaterev']:
 
            return self.create_new_iteration(pull_request,
 
                                      _form['updaterev'],
 
                                      _form['pullrequest_title'],
 
                                      _form['pullrequest_desc'],
 
                                      new_reviewers)
 

	
 
        added_reviewers = new_reviewers - old_reviewers - cur_reviewers
 
        removed_reviewers = (old_reviewers - new_reviewers) & cur_reviewers
 

	
 
        old_description = pull_request.description
 
        pull_request.title = _form['pullrequest_title']
 
        pull_request.description = _form['pullrequest_desc'].strip() or _('No description')
 
        pull_request.owner = User.get_by_username(_form['owner'])
 
        user = User.get(request.authuser.user_id)
 

	
 
        PullRequestModel().mention_from_description(user, pull_request, old_description)
 
        PullRequestModel().add_reviewers(user, pull_request, added_reviewers)
 
        PullRequestModel().remove_reviewers(user, pull_request, removed_reviewers)
 

	
 
        Session().commit()
 
        h.flash(_('Pull request updated'), category='success')
 

	
 
        raise HTTPFound(location=pull_request.url())
 

	
 
    @LoginRequired()
 
    @NotAnonymous()
 
    @HasRepoPermissionLevelDecorator('read')
 
    @jsonify
 
    def delete(self, repo_name, pull_request_id):
 
        pull_request = PullRequest.get_or_404(pull_request_id)
 
        #only owner can delete it !
 
        if pull_request.owner_id == request.authuser.user_id:
 
            PullRequestModel().delete(pull_request)
 
            Session().commit()
 
            h.flash(_('Successfully deleted pull request'),
 
                    category='success')
 
            raise HTTPFound(location=url('my_pullrequests'))
 
        raise HTTPForbidden()
 

	
 
    @LoginRequired()
 
    @HasRepoPermissionLevelDecorator('read')
 
    def show(self, repo_name, pull_request_id, extra=None):
 
        repo_model = RepoModel()
 
        c.users_array = repo_model.get_users_js()
 
        c.user_groups_array = repo_model.get_user_groups_js()
 
        c.pull_request = PullRequest.get_or_404(pull_request_id)
 
        c.allowed_to_change_status = self._get_is_allowed_change_status(c.pull_request)
 
        cc_model = ChangesetCommentsModel()
 
        cs_model = ChangesetStatusModel()
 

	
 
        # pull_requests repo_name we opened it against
 
        # ie. other_repo must match
 
        if repo_name != c.pull_request.other_repo.repo_name:
 
            raise HTTPNotFound
 

	
 
        # load compare data into template context
 
        c.cs_repo = c.pull_request.org_repo
 
        (c.cs_ref_type,
 
         c.cs_ref_name,
 
         c.cs_rev) = c.pull_request.org_ref.split(':')
 

	
 
        c.a_repo = c.pull_request.other_repo
 
        (c.a_ref_type,
 
         c.a_ref_name,
 
         c.a_rev) = c.pull_request.other_ref.split(':') # a_rev is ancestor
 

	
 
        org_scm_instance = c.cs_repo.scm_instance # property with expensive cache invalidation check!!!
 
        try:
 
            c.cs_ranges = [org_scm_instance.get_changeset(x)
 
                           for x in c.pull_request.revisions]
 
        except ChangesetDoesNotExistError:
 
            c.cs_ranges = []
 
            h.flash(_('Revision %s not found in %s') % (x, c.cs_repo.repo_name),
 
                'error')
 
        c.cs_ranges_org = None # not stored and not important and moving target - could be calculated ...
 
        revs = [ctx.revision for ctx in reversed(c.cs_ranges)]
 
        c.jsdata = graph_data(org_scm_instance, revs)
 

	
 
        c.is_range = False
 
        try:
 
            if c.a_ref_type == 'rev': # this looks like a free range where target is ancestor
 
                cs_a = org_scm_instance.get_changeset(c.a_rev)
 
                root_parents = c.cs_ranges[0].parents
 
                c.is_range = cs_a in root_parents
 
                #c.merge_root = len(root_parents) > 1 # a range starting with a merge might deserve a warning
 
        except ChangesetDoesNotExistError: # probably because c.a_rev not found
 
            pass
 
        except IndexError: # probably because c.cs_ranges is empty, probably because revisions are missing
 
            pass
 

	
 
        avail_revs = set()
 
        avail_show = []
 
        c.cs_branch_name = c.cs_ref_name
 
        c.a_branch_name = None
 
        other_scm_instance = c.a_repo.scm_instance
 
        c.update_msg = ""
 
        c.update_msg_other = ""
 
        try:
 
            if not c.cs_ranges:
 
                c.update_msg = _('Error: changesets not found when displaying pull request from %s.') % c.cs_rev
 
            elif org_scm_instance.alias == 'hg' and c.a_ref_name != 'ancestor':
 
                if c.cs_ref_type != 'branch':
 
                    c.cs_branch_name = org_scm_instance.get_changeset(c.cs_ref_name).branch # use ref_type ?
 
                c.a_branch_name = c.a_ref_name
 
                if c.a_ref_type != 'branch':
 
                    try:
 
                        c.a_branch_name = other_scm_instance.get_changeset(c.a_ref_name).branch # use ref_type ?
 
                    except EmptyRepositoryError:
 
                        c.a_branch_name = 'null' # not a branch name ... but close enough
 
                # candidates: descendants of old head that are on the right branch
 
                #             and not are the old head itself ...
 
                #             and nothing at all if old head is a descendant of target ref name
 
                if not c.is_range and other_scm_instance._repo.revs('present(%s)::&%s', c.cs_ranges[-1].raw_id, c.a_branch_name):
 
                    c.update_msg = _('This pull request has already been merged to %s.') % c.a_branch_name
 
                elif c.pull_request.is_closed():
 
                    c.update_msg = _('This pull request has been closed and can not be updated.')
 
                else: # look for descendants of PR head on source branch in org repo
 
                    avail_revs = org_scm_instance._repo.revs('%s:: & branch(%s)',
 
                                                             revs[0], c.cs_branch_name)
 
                    if len(avail_revs) > 1: # more than just revs[0]
 
                        # also show changesets that not are descendants but would be merged in
 
                        targethead = other_scm_instance.get_changeset(c.a_branch_name).raw_id
 
                        if org_scm_instance.path != other_scm_instance.path:
 
                            # Note: org_scm_instance.path must come first so all
 
                            # valid revision numbers are 100% org_scm compatible
 
                            # - both for avail_revs and for revset results
 
                            hgrepo = unionrepo.unionrepository(org_scm_instance.baseui,
 
                                                               org_scm_instance.path,
 
                                                               other_scm_instance.path)
 
                        else:
 
                            hgrepo = org_scm_instance._repo
 
                        show = set(hgrepo.revs('::%ld & !::parents(%s) & !::%s',
 
                                               avail_revs, revs[0], targethead))
 
                        c.update_msg = _('The following additional changes are available on %s:') % c.cs_branch_name
 
                    else:
 
                        show = set()
 
                        avail_revs = set() # drop revs[0]
 
                        c.update_msg = _('No additional changesets found for iterating on this pull request.')
 

	
 
                    # TODO: handle branch heads that not are tip-most
 
                    brevs = org_scm_instance._repo.revs('%s - %ld - %s', c.cs_branch_name, avail_revs, revs[0])
kallithea/model/pull_request.py
Show inline comments
 
@@ -108,275 +108,276 @@ class PullRequestModel(object):
 
            }
 
        if reviewers:
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=reviewers,
 
                                       type_=Notification.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
        if mention_recipients:
 
            email_kwargs['is_mention'] = True
 
            subject = _('[Mention]') + ' ' + subject
 
            # FIXME: this subject is wrong and unused!
 
            NotificationModel().create(created_by=user, subject=subject, body=body,
 
                                       recipients=mention_recipients,
 
                                       type_=Notification.TYPE_PULL_REQUEST,
 
                                       email_kwargs=email_kwargs)
 

	
 
    def mention_from_description(self, user, pr, old_description=''):
 
        mention_recipients = (extract_mentioned_users(pr.description) -
 
                              extract_mentioned_users(old_description))
 

	
 
        log.debug("Mentioning %s", mention_recipients)
 
        self.add_reviewers(user, pr, set(), mention_recipients)
 

	
 
    def remove_reviewers(self, user, pull_request, reviewers):
 
        """Remove specified users from being reviewers of the PR."""
 

	
 
        PullRequestReviewer.query() \
 
            .filter_by(pull_request=pull_request) \
 
            .filter(PullRequestReviewer.user_id.in_(r.user_id for r in reviewers)) \
 
            .delete(synchronize_session='fetch') # the default of 'evaluate' is not available
 

	
 
    def delete(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        Session().delete(pull_request)
 

	
 
    def close_pull_request(self, pull_request):
 
        pull_request = PullRequest.guess_instance(pull_request)
 
        pull_request.status = PullRequest.STATUS_CLOSED
 
        pull_request.updated_on = datetime.datetime.now()
 

	
 

	
 
class CreatePullRequestAction(object):
 

	
 
    class ValidationError(Exception):
 
        pass
 

	
 
    class Empty(ValidationError):
 
        pass
 

	
 
    class AmbiguousAncestor(ValidationError):
 
        pass
 

	
 
    class Unauthorized(ValidationError):
 
        pass
 

	
 
    @staticmethod
 
    def is_user_authorized(org_repo, other_repo):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if (h.HasRepoPermissionLevel('read')(org_repo.repo_name) and
 
            h.HasRepoPermissionLevel('read')(other_repo.repo_name)):
 
            return True
 

	
 
        return False
 

	
 
    def __init__(self, org_repo, other_repo, org_ref, other_ref, title, description, owner, reviewers):
 
        from kallithea.controllers.compare import CompareController
 
        reviewers = set(reviewers)
 
        _assert_valid_reviewers(reviewers)
 

	
 
        (org_ref_type,
 
         org_ref_name,
 
         org_rev) = org_ref.split(':')
 
        org_display = h.short_ref(org_ref_type, org_ref_name)
 
        if org_ref_type == 'rev':
 
            cs = org_repo.scm_instance.get_changeset(org_rev)
 
            org_ref = 'branch:%s:%s' % (cs.branch, cs.raw_id)
 

	
 
        (other_ref_type,
 
         other_ref_name,
 
         other_rev) = other_ref.split(':')
 
        if other_ref_type == 'rev':
 
            cs = other_repo.scm_instance.get_changeset(other_rev)
 
            other_ref_name = cs.raw_id[:12]
 
            other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, cs.raw_id)
 
        other_display = h.short_ref(other_ref_type, other_ref_name)
 

	
 
        cs_ranges, _cs_ranges_not, ancestor_revs = \
 
            CompareController._get_changesets(org_repo.scm_instance.alias,
 
                                              other_repo.scm_instance, other_rev, # org and other "swapped"
 
                                              org_repo.scm_instance, org_rev,
 
                                              )
 
        if not cs_ranges:
 
            raise self.Empty(_('Cannot create empty pull request'))
 

	
 
        if not ancestor_revs:
 
            ancestor_rev = org_repo.scm_instance.EMPTY_CHANGESET
 
        elif len(ancestor_revs) == 1:
 
            ancestor_rev = ancestor_revs[0]
 
        else:
 
            raise self.AmbiguousAncestor(
 
                _('Cannot create pull request - criss cross merge detected, please merge a later %s revision to %s')
 
                % (other_ref_name, org_ref_name))
 

	
 
        self.revisions = [cs_.raw_id for cs_ in cs_ranges]
 

	
 
        # hack: ancestor_rev is not an other_rev but we want to show the
 
        # requested destination and have the exact ancestor
 
        other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, ancestor_rev)
 

	
 
        if not title:
 
            if org_repo == other_repo:
 
                title = '%s to %s' % (org_display, other_display)
 
            else:
 
                title = '%s#%s to %s#%s' % (org_repo.repo_name, org_display,
 
                                            other_repo.repo_name, other_display)
 
        description = description or _('No description')
 

	
 
        self.org_repo = org_repo
 
        self.other_repo = other_repo
 
        self.org_ref = org_ref
 
        self.other_ref = other_ref
 
        self.title = title
 
        self.description = description
 
        self.owner = owner
 
        self.reviewers = reviewers
 

	
 
        if not CreatePullRequestAction.is_user_authorized(self.org_repo, self.other_repo):
 
            raise self.Unauthorized(_('You are not authorized to create the pull request'))
 

	
 
    def execute(self):
 
        created_by = User.get(request.authuser.user_id)
 

	
 
        pr = PullRequest()
 
        pr.org_repo = self.org_repo
 
        pr.org_ref = self.org_ref
 
        pr.other_repo = self.other_repo
 
        pr.other_ref = self.other_ref
 
        pr.revisions = self.revisions
 
        pr.title = self.title
 
        pr.description = self.description
 
        pr.owner = self.owner
 
        Session().add(pr)
 
        Session().flush() # make database assign pull_request_id
 

	
 
        #reset state to under-review
 
        from kallithea.model.changeset_status import ChangesetStatusModel
 
        from kallithea.model.comment import ChangesetCommentsModel
 
        comment = ChangesetCommentsModel().create(
 
            text=u'',
 
            repo=self.org_repo,
 
            author=created_by,
 
            pull_request=pr,
 
            send_email=False,
 
            status_change=ChangesetStatus.STATUS_UNDER_REVIEW,
 
        )
 
        ChangesetStatusModel().set_status(
 
            self.org_repo,
 
            ChangesetStatus.STATUS_UNDER_REVIEW,
 
            created_by,
 
            comment,
 
            pull_request=pr,
 
        )
 

	
 
        mention_recipients = extract_mentioned_users(self.description)
 
        PullRequestModel().add_reviewers(created_by, pr, self.reviewers, mention_recipients)
 

	
 
        return pr
 

	
 

	
 
class CreatePullRequestIterationAction(object):
 
    @staticmethod
 
    def is_user_authorized(old_pull_request):
 
        """Performs authorization check with only the minimum amount of
 
        information needed for such a check, rather than a full command
 
        object.
 
        """
 
        if h.HasPermissionAny('hg.admin')():
 
            return True
 

	
 
        # Authorized to edit the old PR?
 
        if request.authuser.user_id != old_pull_request.owner_id:
 
            return False
 

	
 
        # Authorized to create a new PR?
 
        if not CreatePullRequestAction.is_user_authorized(old_pull_request.org_repo, old_pull_request.other_repo):
 
            return False
 

	
 
        return True
 

	
 
    def __init__(self, old_pull_request, new_org_rev, title, description, owner, reviewers):
 
    def __init__(self, old_pull_request, new_org_rev, new_other_rev, title, description, owner, reviewers):
 
        self.old_pull_request = old_pull_request
 

	
 
        org_repo = old_pull_request.org_repo
 
        org_ref_type, org_ref_name, org_rev = old_pull_request.org_ref.split(':')
 

	
 
        other_repo = old_pull_request.other_repo
 
        other_ref_type, other_ref_name, other_rev = old_pull_request.other_ref.split(':') # other_rev is ancestor
 
        #assert other_ref_type == 'branch', other_ref_type # TODO: what if not?
 

	
 
        new_org_ref = '%s:%s:%s' % (org_ref_type, org_ref_name, new_org_rev)
 
        new_other_ref = '%s:%s:%s' % (other_ref_type, other_ref_name, new_other_rev)
 

	
 
        self.create_action = CreatePullRequestAction(org_repo, other_repo, new_org_ref, old_pull_request.other_ref, None, None, owner, reviewers)
 
        self.create_action = CreatePullRequestAction(org_repo, other_repo, new_org_ref, new_other_ref, None, None, owner, reviewers)
 

	
 
        # Generate complete title/description
 

	
 
        old_revisions = set(old_pull_request.revisions)
 
        revisions = self.create_action.revisions
 
        new_revisions = [r for r in revisions if r not in old_revisions]
 
        lost = old_revisions.difference(revisions)
 

	
 
        infos = ['This is a new iteration of %s "%s".' %
 
                 (h.canonical_url('pullrequest_show', repo_name=old_pull_request.other_repo.repo_name,
 
                      pull_request_id=old_pull_request.pull_request_id),
 
                  old_pull_request.title)]
 

	
 
        if lost:
 
            infos.append(_('Missing changesets since the previous iteration:'))
 
            for r in old_pull_request.revisions:
 
                if r in lost:
 
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
 
                    infos.append('  %s %s' % (h.short_id(r), rev_desc))
 

	
 
        if new_revisions:
 
            infos.append(_('New changesets on %s %s since the previous iteration:') % (org_ref_type, org_ref_name))
 
            for r in reversed(revisions):
 
                if r in new_revisions:
 
                    rev_desc = org_repo.get_changeset(r).message.split('\n')[0]
 
                    infos.append('  %s %s' % (h.short_id(r), h.shorter(rev_desc, 80)))
 

	
 
            if self.create_action.other_ref == old_pull_request.other_ref:
 
                infos.append(_("Ancestor didn't change - diff since previous iteration:"))
 
                infos.append(h.canonical_url('compare_url',
 
                                 repo_name=org_repo.repo_name, # other_repo is always same as repo_name
 
                                 org_ref_type='rev', org_ref_name=h.short_id(org_rev), # use old org_rev as base
 
                                 other_ref_type='rev', other_ref_name=h.short_id(new_org_rev),
 
                                 )) # note: linear diff, merge or not doesn't matter
 
            else:
 
                infos.append(_('This iteration is based on another %s revision and there is no simple diff.') % other_ref_name)
 
        else:
 
           infos.append(_('No changes found on %s %s since previous iteration.') % (org_ref_type, org_ref_name))
 
           # TODO: fail?
 

	
 
        try:
 
            title, old_v = re.match(r'(.*)\(v(\d+)\)\s*$', title).groups()
 
            v = int(old_v) + 1
 
        except (AttributeError, ValueError):
 
            v = 2
 
        self.create_action.title = '%s (v%s)' % (title.strip(), v)
 

	
 
        # using a mail-like separator, insert new iteration info in description with latest first
 
        descriptions = description.replace('\r\n', '\n').split('\n-- \n', 1)
 
        description = descriptions[0].strip() + '\n\n-- \n' + '\n'.join(infos)
 
        if len(descriptions) > 1:
 
            description += '\n\n' + descriptions[1].strip()
 
        self.create_action.description = description
 

	
 
        if not CreatePullRequestIterationAction.is_user_authorized(self.old_pull_request):
 
            raise CreatePullRequestAction.Unauthorized(_('You are not authorized to create the pull request'))
 

	
 
    def execute(self):
 
        pull_request = self.create_action.execute()
 

	
 
        # Close old iteration
 
        from kallithea.model.comment import ChangesetCommentsModel
 
        ChangesetCommentsModel().create(
 
            text=_('Closed, next iteration: %s .') % pull_request.url(canonical=True),
 
            repo=self.old_pull_request.other_repo_id,
 
            author=request.authuser.user_id,
 
            pull_request=self.old_pull_request.pull_request_id,
 
            closing_pr=True)
 
        PullRequestModel().close_pull_request(self.old_pull_request.pull_request_id)
 
        return pull_request
kallithea/tests/functional/test_pullrequests.py
Show inline comments
 
import re
 
import pytest
 

	
 
from tg.util.webtest import test_context
 

	
 
from kallithea.tests.base import *
 
from kallithea.tests.fixture import Fixture
 
from kallithea.model.db import User
 
from kallithea.model.db import PullRequest, User
 
from kallithea.model.meta import Session
 

	
 
from kallithea.controllers.pullrequests import PullrequestsController
 

	
 
fixture = Fixture()
 

	
 
class TestPullrequestsController(TestController):
 

	
 
    def test_index(self):
 
        self.log_user()
 
        response = self.app.get(url(controller='pullrequests', action='index',
 
                                    repo_name=HG_REPO))
 

	
 
    def test_create_trivial(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response = response.follow()
 
        assert response.status == '200 OK'
 
        response.mustcontain('Successfully opened new pull request')
 
        response.mustcontain('No additional changesets found for iterating on this pull request')
 
        response.mustcontain('href="/vcs_test_hg/changeset/4f7e2131323e0749a740c0a56ab68ae9269c562a"')
 

	
 
    def test_available(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response = response.follow()
 
        assert response.status == '200 OK'
 
        response.mustcontain(no='No additional changesets found for iterating on this pull request')
 
        response.mustcontain('The following additional changes are available on stable:')
 
        response.mustcontain('<input id="updaterev_4f7e2131323e0749a740c0a56ab68ae9269c562a" name="updaterev" type="radio" value="4f7e2131323e0749a740c0a56ab68ae9269c562a" />')
 
        response.mustcontain('href="/vcs_test_hg/changeset/4f7e2131323e0749a740c0a56ab68ae9269c562a"') # as update
 

	
 
    def test_range(self):
 
        self.log_user()
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        response = response.follow()
 
        assert response.status == '200 OK'
 
        response.mustcontain('No additional changesets found for iterating on this pull request')
 
        response.mustcontain('href="/vcs_test_hg/changeset/4f7e2131323e0749a740c0a56ab68ae9269c562a"')
 

	
 
    def test_update_reviewers(self):
 
        self.log_user()
 
        regular_user = User.get_by_username(TEST_USER_REGULAR_LOGIN)
 
        regular_user2 = User.get_by_username(TEST_USER_REGULAR2_LOGIN)
 
        admin_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
 

	
 
        # create initial PR
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {'org_repo': HG_REPO,
 
                                  'org_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 },
 
                                 status=302)
 
        pull_request1_id = re.search('/pull-request/(\d+)/', response.location).group(1)
 
        assert response.location == 'http://localhost/%s/pull-request/%s/_/stable' % (HG_REPO, pull_request1_id)
 

	
 
        # create new iteration
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request1_id),
 
                                 {
 
                                  'updaterev': '4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': [regular_user.user_id],
 
                                 },
 
                                 status=302)
 
        pull_request2_id = re.search('/pull-request/(\d+)/', response.location).group(1)
 
        assert pull_request2_id != pull_request1_id
 
        assert response.location == 'http://localhost/%s/pull-request/%s/_/stable' % (HG_REPO, pull_request2_id)
 
        response = response.follow()
 
        # verify reviewer was added
 
        response.mustcontain('<input type="hidden" value="%s" name="review_members" />' % regular_user.user_id)
 

	
 
        # update without creating new iteration
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request2_id),
 
                                 {
 
                                  'pullrequest_title': 'Title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'org_review_members': [admin_user.user_id], # fake - just to get some 'meanwhile' warning ... but it is also added ...
 
                                  'review_members': [regular_user2.user_id, admin_user.user_id],
 
                                 },
 
                                 status=302)
 
        assert response.location == 'http://localhost/%s/pull-request/%s/_/stable' % (HG_REPO, pull_request2_id)
 
        response = response.follow()
 
        # verify reviewers were added / removed
 
        response.mustcontain('Meanwhile, the following reviewers have been added: test_regular')
 
        response.mustcontain('Meanwhile, the following reviewers have been removed: test_admin')
 
        response.mustcontain('<input type="hidden" value="%s" name="review_members" />' % regular_user.user_id)
 
        response.mustcontain('<input type="hidden" value="%s" name="review_members" />' % regular_user2.user_id)
 
        response.mustcontain(no='<input type="hidden" value="%s" name="review_members" />' % admin_user.user_id)
 

	
 
    def test_update_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'rev:94f45ed825a1:94f45ed825a113e61af7e141f44ca578374abef0',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 },
 
                                status=302)
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        assert m != None
 
        pull_request_id = m.group(1)
 

	
 
        # update it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'updaterev': '4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': [str(invalid_user_id)],
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 
    def test_edit_with_invalid_reviewer(self):
 
        invalid_user_id = 99999
 
        self.log_user()
 
        # create a valid pull request
 
        response = self.app.post(url(controller='pullrequests', action='create',
 
                                     repo_name=HG_REPO),
 
                                 {
 
                                  'org_repo': HG_REPO,
 
                                  'org_ref': 'branch:stable:4f7e2131323e0749a740c0a56ab68ae9269c562a',
 
                                  'other_repo': HG_REPO,
 
                                  'other_ref': 'branch:default:96507bd11ecc815ebc6270fdf6db110928c09c1e',
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  '_authentication_token': self.authentication_token(),
 
                                 },
 
                                status=302)
 
        # location is of the form:
 
        # http://localhost/vcs_test_hg/pull-request/54/_/title
 
        m = re.search('/pull-request/(\d+)/', response.location)
 
        assert m != None
 
        pull_request_id = m.group(1)
 

	
 
        # edit it
 
        response = self.app.post(url(controller='pullrequests', action='post',
 
                                     repo_name=HG_REPO, pull_request_id=pull_request_id),
 
                                 {
 
                                  'pullrequest_title': 'title',
 
                                  'pullrequest_desc': 'description',
 
                                  'owner': TEST_USER_ADMIN_LOGIN,
 
                                  '_authentication_token': self.authentication_token(),
 
                                  'review_members': [str(invalid_user_id)],
 
                                 },
 
                                 status=400)
 
        response.mustcontain('Invalid reviewer &#34;%s&#34; specified' % invalid_user_id)
 

	
 

	
 
    def test_iteration_refs(self):
 
        # Repo graph excerpt:
 
        #   o   fb95b340e0d0 webvcs
 
        #  /:
 
        # o :   41d2568309a0 default
 
        # : :
 
        # : o   5ec21f21aafe webvcs
 
        # : :
 
        # : o   9e6119747791 webvcs
 
        # : :
 
        # o :   3d1091ee5a53 default
 
        # :/
 
        # o     948da46b29c1 default
 

	
 
        self.log_user()
 

	
 
        # create initial PR
 
        response = self.app.post(
 
            url(controller='pullrequests', action='create', repo_name=HG_REPO),
 
            {
 
                'org_repo': HG_REPO,
 
                'org_ref': 'rev:9e6119747791:9e6119747791ff886a5abe1193a730b6bf874e1c',
 
                'other_repo': HG_REPO,
 
                'other_ref': 'branch:default:3d1091ee5a533b1f4577ec7d8a226bb315fb1336',
 
                'pullrequest_title': 'title',
 
                'pullrequest_desc': 'description',
 
                '_authentication_token': self.authentication_token(),
 
            },
 
            status=302)
 
        pr1_id = int(re.search('/pull-request/(\d+)/', response.location).group(1))
 
        pr1 = PullRequest.get(pr1_id)
 

	
 
        assert pr1.org_ref == 'branch:webvcs:9e6119747791ff886a5abe1193a730b6bf874e1c'
 
        assert pr1.other_ref == 'branch:default:948da46b29c125838a717f6a8496eb409717078d'
 

	
 
        Session().rollback() # invalidate loaded PR objects before issuing next request.
 

	
 
        # create PR 2 (new iteration with same ancestor)
 
        response = self.app.post(
 
            url(controller='pullrequests', action='post', repo_name=HG_REPO, pull_request_id=pr1_id),
 
            {
 
                'updaterev': '5ec21f21aafe95220f1fc4843a4a57c378498b71',
 
                'pullrequest_title': 'title',
 
                'pullrequest_desc': 'description',
 
                'owner': TEST_USER_REGULAR_LOGIN,
 
                '_authentication_token': self.authentication_token(),
 
             },
 
             status=302)
 
        pr2_id = int(re.search('/pull-request/(\d+)/', response.location).group(1))
 
        pr1 = PullRequest.get(pr1_id)
 
        pr2 = PullRequest.get(pr2_id)
 

	
 
        assert pr2_id != pr1_id
 
        assert pr1.status == PullRequest.STATUS_CLOSED
 
        assert pr2.org_ref == 'branch:webvcs:5ec21f21aafe95220f1fc4843a4a57c378498b71'
 
        assert pr2.other_ref == pr1.other_ref
 

	
 
        Session().rollback() # invalidate loaded PR objects before issuing next request.
 

	
 
        # create PR 3 (new iteration with new ancestor)
 
        response = self.app.post(
 
            url(controller='pullrequests', action='post', repo_name=HG_REPO, pull_request_id=pr2_id),
 
            {
 
                'updaterev': 'fb95b340e0d03fa51f33c56c991c08077c99303e',
 
                'pullrequest_title': 'title',
 
                'pullrequest_desc': 'description',
 
                'owner': TEST_USER_REGULAR_LOGIN,
 
                '_authentication_token': self.authentication_token(),
 
             },
 
             status=302)
 
        pr3_id = int(re.search('/pull-request/(\d+)/', response.location).group(1))
 
        pr2 = PullRequest.get(pr2_id)
 
        pr3 = PullRequest.get(pr3_id)
 

	
 
        assert pr3_id != pr2_id
 
        assert pr2.status == PullRequest.STATUS_CLOSED
 
        assert pr3.org_ref == 'branch:webvcs:fb95b340e0d03fa51f33c56c991c08077c99303e'
 
        assert pr3.other_ref == 'branch:default:41d2568309a05f422cffb8008e599d385f8af439'
 

	
 

	
 
@pytest.mark.usefixtures("test_context_fixture") # apply fixture for all test methods
 
class TestPullrequestsGetRepoRefs(TestController):
 

	
 
    def setup_method(self, method):
 
        self.repo_name = u'main'
 
        repo = fixture.create_repo(self.repo_name, repo_type='hg')
 
        self.repo_scm_instance = repo.scm_instance
 
        Session.commit()
 
        self.c = PullrequestsController()
 

	
 
    def teardown_method(self, method):
 
        fixture.destroy_repo(u'main')
 
        Session.commit()
 
        Session.remove()
 

	
 
    def test_repo_refs_empty_repo(self):
 
        # empty repo with no commits, no branches, no bookmarks, just one tag
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance)
 
        assert default == 'tag:null:0000000000000000000000000000000000000000'
 

	
 
    def test_repo_refs_one_commit_no_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance)
 
        assert default == 'branch:default:%s' % cs0.raw_id
 
        assert ([('branch:default:%s' % cs0.raw_id, 'default (current tip)')],
 
                'Branches') in refs
 

	
 
    def test_repo_refs_one_commit_rev_hint(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, rev=cs0.raw_id)
 
        expected = 'branch:default:%s' % cs0.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_no_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_rev_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, rev=cs0.raw_id)
 
        expected = 'rev:%s:%s' % (cs0.raw_id, cs0.raw_id)
 
        assert default == expected
 
        assert ([(expected, 'Changeset: %s' % cs0.raw_id[0:12])], 'Special') in refs
 
        assert ([('branch:default:%s' % cs1.raw_id, 'default (current tip)')], 'Branches') in refs
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, rev=cs1.raw_id)
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_two_commits_branch_hint(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        cs1 = fixture.commit_change(self.repo_name, filename='file2',
 
                content='line2\n', message='commit2', vcs_type='hg',
 
                parent=None, newfile=True)
 

	
 
        refs, default = self.c._get_repo_refs(self.repo_scm_instance, branch='default')
 
        expected = 'branch:default:%s' % cs1.raw_id
 
        assert default == expected
 
        assert ([(expected, 'default (current tip)')], 'Branches') in refs
 

	
 
    def test_repo_refs_one_branch_no_hints(self):
 
        cs0 = fixture.commit_change(self.repo_name, filename='file1',
 
                content='line1\n', message='commit1', vcs_type='hg',
 
                parent=None, newfile=True)
 
        # TODO
0 comments (0 inline, 0 general)