Changeset - e24531aa2449
[Not reviewed]
default
0 2 0
Thomas De Schampheleire - 6 years ago 2020-06-27 20:30:00
thomas.de_schampheleire@nokia.com
api: extend get_pullrequest data with created_on and updated_on fields

The get_pullrequest API call returned most attributes of a pullrequest, but
not created_on and updated_on.

Set microseconds to 0, as done for modified_at in ChangesetStatus. See
commit 71033bd37b4c.
2 files changed with 4 insertions and 0 deletions:
0 comments (0 inline, 0 general)
kallithea/model/db.py
Show inline comments
 
@@ -1950,361 +1950,363 @@ class ChangesetComment(Base, BaseDbModel
 
            text=self.text,
 
        )
 

	
 
    def deletable(self):
 
        return self.created_on > datetime.datetime.now() - datetime.timedelta(minutes=5)
 

	
 

	
 
class ChangesetStatus(Base, BaseDbModel):
 
    __tablename__ = 'changeset_statuses'
 
    __table_args__ = (
 
        Index('cs_revision_idx', 'revision'),
 
        Index('cs_version_idx', 'version'),
 
        Index('cs_pull_request_id_idx', 'pull_request_id'),
 
        Index('cs_changeset_comment_id_idx', 'changeset_comment_id'),
 
        Index('cs_pull_request_id_user_id_version_idx', 'pull_request_id', 'user_id', 'version'),
 
        Index('cs_repo_id_pull_request_id_idx', 'repo_id', 'pull_request_id'),
 
        UniqueConstraint('repo_id', 'revision', 'version'),
 
        _table_args_default_dict,
 
    )
 

	
 
    STATUS_NOT_REVIEWED = DEFAULT = 'not_reviewed'
 
    STATUS_APPROVED = 'approved'
 
    STATUS_REJECTED = 'rejected' # is shown as "Not approved" - TODO: change database content / scheme
 
    STATUS_UNDER_REVIEW = 'under_review'
 

	
 
    STATUSES = [
 
        (STATUS_NOT_REVIEWED, _("Not reviewed")),  # (no icon) and default
 
        (STATUS_UNDER_REVIEW, _("Under review")),
 
        (STATUS_REJECTED, _("Not approved")),
 
        (STATUS_APPROVED, _("Approved")),
 
    ]
 
    STATUSES_DICT = dict(STATUSES)
 

	
 
    changeset_status_id = Column(Integer(), primary_key=True)
 
    repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    revision = Column(String(40), nullable=True)
 
    status = Column(String(128), nullable=False, default=DEFAULT)
 
    comment_id = Column('changeset_comment_id', Integer(), ForeignKey('changeset_comments.comment_id'), nullable=False)
 
    modified_at = Column(DateTime(), nullable=False, default=datetime.datetime.now)
 
    version = Column(Integer(), nullable=False, default=0)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
 

	
 
    author = relationship('User')
 
    repo = relationship('Repository')
 
    comment = relationship('ChangesetComment')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __repr__(self):
 
        return "<%s %r by %r>" % (
 
            self.__class__.__name__,
 
            self.status, self.author
 
        )
 

	
 
    @classmethod
 
    def get_status_lbl(cls, value):
 
        return cls.STATUSES_DICT.get(value)
 

	
 
    @property
 
    def status_lbl(self):
 
        return ChangesetStatus.get_status_lbl(self.status)
 

	
 
    def __json__(self):
 
        return dict(
 
            status=self.status,
 
            modified_at=self.modified_at.replace(microsecond=0),
 
            reviewer=self.author.username,
 
            )
 

	
 

	
 
class PullRequest(Base, BaseDbModel):
 
    __tablename__ = 'pull_requests'
 
    __table_args__ = (
 
        Index('pr_org_repo_id_idx', 'org_repo_id'),
 
        Index('pr_other_repo_id_idx', 'other_repo_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    # values for .status
 
    STATUS_NEW = 'new'
 
    STATUS_CLOSED = 'closed'
 

	
 
    pull_request_id = Column(Integer(), primary_key=True)
 
    title = Column(Unicode(255), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    status = Column(Unicode(255), nullable=False, default=STATUS_NEW) # only for closedness, not approve/reject/etc
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _revisions = Column('revisions', UnicodeText(), nullable=False)
 
    org_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    org_ref = Column(Unicode(255), nullable=False)
 
    other_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
 
    other_ref = Column(Unicode(255), nullable=False)
 

	
 
    @hybrid_property
 
    def revisions(self):
 
        return self._revisions.split(':')
 

	
 
    @revisions.setter
 
    def revisions(self, val):
 
        self._revisions = ':'.join(val)
 

	
 
    @property
 
    def org_ref_parts(self):
 
        return self.org_ref.split(':')
 

	
 
    @property
 
    def other_ref_parts(self):
 
        return self.other_ref.split(':')
 

	
 
    owner = relationship('User')
 
    reviewers = relationship('PullRequestReviewer',
 
                             cascade="all, delete-orphan")
 
    org_repo = relationship('Repository', primaryjoin='PullRequest.org_repo_id==Repository.repo_id')
 
    other_repo = relationship('Repository', primaryjoin='PullRequest.other_repo_id==Repository.repo_id')
 
    statuses = relationship('ChangesetStatus', order_by='ChangesetStatus.changeset_status_id')
 
    comments = relationship('ChangesetComment', order_by='ChangesetComment.comment_id',
 
                             cascade="all, delete-orphan")
 

	
 
    @classmethod
 
    def query(cls, reviewer_id=None, include_closed=True, sorted=False):
 
        """Add PullRequest-specific helpers for common query constructs.
 

	
 
        reviewer_id: only PRs with the specified user added as reviewer.
 

	
 
        include_closed: if False, do not include closed PRs.
 

	
 
        sorted: if True, apply the default ordering (newest first).
 
        """
 
        q = super(PullRequest, cls).query()
 

	
 
        if reviewer_id is not None:
 
            q = q.join(PullRequestReviewer).filter(PullRequestReviewer.user_id == reviewer_id)
 

	
 
        if not include_closed:
 
            q = q.filter(PullRequest.status != PullRequest.STATUS_CLOSED)
 

	
 
        if sorted:
 
            q = q.order_by(PullRequest.created_on.desc())
 

	
 
        return q
 

	
 
    def get_reviewer_users(self):
 
        """Like .reviewers, but actually returning the users"""
 
        return User.query() \
 
            .join(PullRequestReviewer) \
 
            .filter(PullRequestReviewer.pull_request == self) \
 
            .order_by(PullRequestReviewer.pull_request_reviewers_id) \
 
            .all()
 

	
 
    def is_closed(self):
 
        return self.status == self.STATUS_CLOSED
 

	
 
    def user_review_status(self, user_id):
 
        """Return the user's latest status votes on PR"""
 
        # note: no filtering on repo - that would be redundant
 
        status = ChangesetStatus.query() \
 
            .filter(ChangesetStatus.pull_request == self) \
 
            .filter(ChangesetStatus.user_id == user_id) \
 
            .order_by(ChangesetStatus.version) \
 
            .first()
 
        return str(status.status) if status else ''
 

	
 
    @classmethod
 
    def make_nice_id(cls, pull_request_id):
 
        '''Return pull request id nicely formatted for displaying'''
 
        return '#%s' % pull_request_id
 

	
 
    def nice_id(self):
 
        '''Return the id of this pull request, nicely formatted for displaying'''
 
        return self.make_nice_id(self.pull_request_id)
 

	
 
    def get_api_data(self):
 
        return self.__json__()
 

	
 
    def __json__(self):
 
        clone_uri_tmpl = kallithea.CONFIG.get('clone_uri_tmpl') or Repository.DEFAULT_CLONE_URI
 
        return dict(
 
            pull_request_id=self.pull_request_id,
 
            url=self.url(),
 
            reviewers=self.reviewers,
 
            revisions=self.revisions,
 
            owner=self.owner.username,
 
            title=self.title,
 
            description=self.description,
 
            org_repo_url=self.org_repo.clone_url(clone_uri_tmpl=clone_uri_tmpl),
 
            org_ref_parts=self.org_ref_parts,
 
            other_ref_parts=self.other_ref_parts,
 
            status=self.status,
 
            comments=self.comments,
 
            statuses=self.statuses,
 
            created_on=self.created_on.replace(microsecond=0),
 
            updated_on=self.updated_on.replace(microsecond=0),
 
        )
 

	
 
    def url(self, **kwargs):
 
        canonical = kwargs.pop('canonical', None)
 
        import kallithea.lib.helpers as h
 
        b = self.org_ref_parts[1]
 
        if b != self.other_ref_parts[1]:
 
            s = '/_/' + b
 
        else:
 
            s = '/_/' + self.title
 
        kwargs['extra'] = urlreadable(s)
 
        if canonical:
 
            return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                                   pull_request_id=self.pull_request_id, **kwargs)
 
        return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
 
                     pull_request_id=self.pull_request_id, **kwargs)
 

	
 

	
 
class PullRequestReviewer(Base, BaseDbModel):
 
    __tablename__ = 'pull_request_reviewers'
 
    __table_args__ = (
 
        Index('pull_request_reviewers_user_id_idx', 'user_id'),
 
        _table_args_default_dict,
 
    )
 

	
 
    def __init__(self, user=None, pull_request=None):
 
        self.user = user
 
        self.pull_request = pull_request
 

	
 
    pull_request_reviewers_id = Column('pull_requests_reviewers_id', Integer(), primary_key=True)
 
    pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 

	
 
    user = relationship('User')
 
    pull_request = relationship('PullRequest')
 

	
 
    def __json__(self):
 
        return dict(
 
            username=self.user.username if self.user else None,
 
        )
 

	
 

	
 
class Notification(object):
 
    __tablename__ = 'notifications'
 

	
 
class UserNotification(object):
 
    __tablename__ = 'user_to_notification'
 

	
 

	
 
class Gist(Base, BaseDbModel):
 
    __tablename__ = 'gists'
 
    __table_args__ = (
 
        Index('g_gist_access_id_idx', 'gist_access_id'),
 
        Index('g_created_on_idx', 'created_on'),
 
        _table_args_default_dict,
 
    )
 

	
 
    GIST_PUBLIC = 'public'
 
    GIST_PRIVATE = 'private'
 
    DEFAULT_FILENAME = 'gistfile1.txt'
 

	
 
    gist_id = Column(Integer(), primary_key=True)
 
    gist_access_id = Column(Unicode(250), nullable=False)
 
    gist_description = Column(UnicodeText(), nullable=False)
 
    owner_id = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=False)
 
    gist_expires = Column(Float(53), nullable=False)
 
    gist_type = Column(Unicode(128), nullable=False)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 

	
 
    owner = relationship('User')
 

	
 
    @hybrid_property
 
    def is_expired(self):
 
        return (self.gist_expires != -1) & (time.time() > self.gist_expires)
 

	
 
    def __repr__(self):
 
        return "<%s %s %s>" % (
 
            self.__class__.__name__,
 
            self.gist_type, self.gist_access_id)
 

	
 
    @classmethod
 
    def guess_instance(cls, value):
 
        return super(Gist, cls).guess_instance(value, Gist.get_by_access_id)
 

	
 
    @classmethod
 
    def get_or_404(cls, id_):
 
        res = cls.query().filter(cls.gist_access_id == id_).scalar()
 
        if res is None:
 
            raise HTTPNotFound
 
        return res
 

	
 
    @classmethod
 
    def get_by_access_id(cls, gist_access_id):
 
        return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
 

	
 
    def gist_url(self):
 
        alias_url = kallithea.CONFIG.get('gist_alias_url')
 
        if alias_url:
 
            return alias_url.replace('{gistid}', self.gist_access_id)
 

	
 
        import kallithea.lib.helpers as h
 
        return h.canonical_url('gist', gist_id=self.gist_access_id)
 

	
 
    def get_api_data(self):
 
        """
 
        Common function for generating gist related data for API
 
        """
 
        gist = self
 
        data = dict(
 
            gist_id=gist.gist_id,
 
            type=gist.gist_type,
 
            access_id=gist.gist_access_id,
 
            description=gist.gist_description,
 
            url=gist.gist_url(),
 
            expires=gist.gist_expires,
 
            created_on=gist.created_on,
 
        )
 
        return data
 

	
 
    def __json__(self):
 
        data = dict(
 
        )
 
        data.update(self.get_api_data())
 
        return data
 

	
 
    ## SCM functions
 

	
 
    @property
 
    def scm_instance(self):
 
        from kallithea.lib.vcs import get_repo
 
        from kallithea.model.gist import GIST_STORE_LOC
 
        gist_base_path = os.path.join(kallithea.CONFIG['base_path'], GIST_STORE_LOC)
 
        return get_repo(os.path.join(gist_base_path, self.gist_access_id))
 

	
 

	
 
class UserSshKeys(Base, BaseDbModel):
 
    __tablename__ = 'user_ssh_keys'
 
    __table_args__ = (
 
        Index('usk_fingerprint_idx', 'fingerprint'),
 
        _table_args_default_dict
 
    )
 
    __mapper_args__ = {}
 

	
 
    user_ssh_key_id = Column(Integer(), primary_key=True)
 
    user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
 
    _public_key = Column('public_key', UnicodeText(), nullable=False)
 
    description = Column(UnicodeText(), nullable=False)
 
    fingerprint = Column(String(255), nullable=False, unique=True)
 
    created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
 
    last_seen = Column(DateTime(timezone=False), nullable=True)
 

	
 
    user = relationship('User')
 

	
 
    @property
 
    def public_key(self):
 
        return self._public_key
 

	
 
    @public_key.setter
 
    def public_key(self, full_key):
 
        """The full public key is too long to be suitable as database key.
 
        Instead, as a side-effect of setting the public key string, compute the
 
        fingerprints according to https://tools.ietf.org/html/rfc4716#section-4
 
        BUT using sha256 instead of md5, similar to 'ssh-keygen -E sha256 -lf
 
        ~/.ssh/id_rsa.pub' .
 
        """
 
        keytype, key_bytes, comment = ssh.parse_pub_key(full_key)
 
        self._public_key = full_key
 
        self.fingerprint = base64.b64encode(hashlib.sha256(key_bytes).digest()).replace(b'\n', b'').rstrip(b'=').decode()
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -2278,251 +2278,253 @@ class _BaseTestApi(object):
 

	
 
    def test_api_create_gist(self):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
                                  lifetime=10,
 
                                  description='foobar-gist',
 
                                  gist_type='public',
 
                                  files={'foobar': {'content': 'foo'}})
 
        response = api_call(self, params)
 
        expected = {
 
            'gist': {
 
                'access_id': response.json['result']['gist']['access_id'],
 
                'created_on': response.json['result']['gist']['created_on'],
 
                'description': 'foobar-gist',
 
                'expires': response.json['result']['gist']['expires'],
 
                'gist_id': response.json['result']['gist']['gist_id'],
 
                'type': 'public',
 
                'url': response.json['result']['gist']['url']
 
            },
 
            'msg': 'created new gist'
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'create', raise_exception)
 
    def test_api_create_gist_exception_occurred(self):
 
        id_, params = _build_data(self.apikey_regular, 'create_gist',
 
                                  files={})
 
        response = api_call(self, params)
 
        expected = 'failed to create gist'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user(self):
 
        gist_id = fixture.create_gist(owner=self.TEST_USER_LOGIN).gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = {'gist': None, 'msg': 'deleted gist ID:%s' % gist_id}
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_delete_gist_regular_user_no_permission(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey_regular, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'gist `%s` does not exist' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    @mock.patch.object(GistModel, 'delete', raise_exception)
 
    def test_api_delete_gist_exception_occurred(self):
 
        gist_id = fixture.create_gist().gist_access_id
 
        id_, params = _build_data(self.apikey, 'delete_gist',
 
                                  gistid=gist_id)
 
        response = api_call(self, params)
 
        expected = 'failed to delete gist ID:%s' % (gist_id,)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_ip(self):
 
        id_, params = _build_data(self.apikey, 'get_ip')
 
        response = api_call(self, params)
 
        expected = {
 
            'server_ip_addr': '0.0.0.0',
 
            'user_ips': []
 
        }
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_server_info(self):
 
        id_, params = _build_data(self.apikey, 'get_server_info')
 
        response = api_call(self, params)
 
        expected = Setting.get_server_info()
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_changesets(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start=0, end=2)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_max_revisions(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2011-02-24T00:00:00", max_revisions=10)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 10
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_branch(self):
 
        if self.REPO == 'vcs_test_hg':
 
            branch = 'stable'
 
        else:
 
            pytest.skip("skipping due to missing branches in git test repo")
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, branch_name=branch, start_date="2011-02-24T00:00:00")
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 5
 
        assert 'message' in result[0]
 
        assert 'added' not in result[0]
 

	
 
    def test_api_get_changesets_with_file_list(self):
 
        id_, params = _build_data(self.apikey, 'get_changesets',
 
                                  repoid=self.REPO, start_date="2010-04-07T23:30:30", end_date="2010-04-08T00:31:14", with_file_list=True)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert len(result) == 3
 
        assert 'message' in result[0]
 
        assert 'added' in result[0]
 

	
 
    def test_api_get_changeset(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" not in result
 

	
 
    def test_api_get_changeset_with_reviews(self):
 
        reviewobjs = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION,
 
                                  with_reviews=True)
 
        response = api_call(self, params)
 
        result = ext_json.loads(response.body)["result"]
 
        assert result["raw_id"] == self.TEST_REVISION
 
        assert "reviews" in result
 
        assert len(result["reviews"]) == 1
 
        review = result["reviews"][0]
 
        expected = {
 
            'status': 'approved',
 
            'modified_at': reviewobjs[0].modified_at.replace(microsecond=0).isoformat(),
 
            'reviewer': 'test_admin',
 
        }
 
        assert review == expected
 

	
 
    def test_api_get_changeset_that_does_not_exist(self):
 
        """ Fetch changeset status for non-existant changeset.
 
        revision id is the above git hash used in the test above with the
 
        last 3 nibbles replaced with 0xf.  Should not exist for git _or_ hg.
 
        """
 
        id_, params = _build_data(self.apikey, 'get_changeset',
 
                                  repoid=self.REPO, raw_id = '7ab37bc680b4aa72c34d07b230c866c28e9fcfff')
 
        response = api_call(self, params)
 
        expected = 'Changeset %s does not exist' % ('7ab37bc680b4aa72c34d07b230c866c28e9fcfff',)
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_changeset_without_permission(self):
 
        review = fixture.review_changeset(self.REPO, self.TEST_REVISION, "approved")
 
        RepoModel().revoke_user_permission(repo=self.REPO, user=self.TEST_USER_LOGIN)
 
        RepoModel().revoke_user_permission(repo=self.REPO, user="default")
 
        id_, params = _build_data(self.apikey_regular, 'get_changeset',
 
                                  repoid=self.REPO, raw_id=self.TEST_REVISION)
 
        response = api_call(self, params)
 
        expected = 'Access denied to repo %s' % self.REPO
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_get_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'get test')
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": 'get_pullrequest',
 
            "args": {"pullrequest_id": pull_request_id},
 
        }))
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        expected = {
 
            "status": "new",
 
            "pull_request_id": pull_request_id,
 
            "description": "No description",
 
            "url": "/%s/pull-request/%s/_/%s" % (self.REPO, pull_request_id, "stable"),
 
            "reviewers": [{"username": "test_regular"}],
 
            "org_repo_url": "http://localhost:80/%s" % self.REPO,
 
            "org_ref_parts": ["branch", "stable", self.TEST_PR_SRC],
 
            "other_ref_parts": ["branch", "default", self.TEST_PR_DST],
 
            "comments": [{"username": base.TEST_USER_ADMIN_LOGIN, "text": "",
 
                         "comment_id": pullrequest.comments[0].comment_id}],
 
            "owner": base.TEST_USER_ADMIN_LOGIN,
 
            "statuses": [{"status": "under_review", "reviewer": base.TEST_USER_ADMIN_LOGIN, "modified_at": "2000-01-01T00:00:00"} for i in range(0, len(self.TEST_PR_REVISIONS))],
 
            "title": "get test",
 
            "revisions": self.TEST_PR_REVISIONS,
 
            "created_on": "2000-01-01T00:00:00",
 
            "updated_on": "2000-01-01T00:00:00",
 
        }
 
        self._compare_ok(random_id, expected,
 
                         given=re.sub(br"\d\d\d\d\-\d\d\-\d\dT\d\d\:\d\d\:\d\d",
 
                                      b"2000-01-01T00:00:00", response.body))
 

	
 
    def test_api_close_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, 'close test')
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "close_pr": True},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == ''
 
        assert pullrequest.status == PullRequest.STATUS_CLOSED
 
        assert pullrequest.is_closed() == True
 

	
 
    def test_api_status_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "status test")
 

	
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR2_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        }))
 
        response = api_call(self, params)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        self._compare_error(random_id, "No permission to change pull request status. User needs to be admin, owner or reviewer.", given=response.body)
 
        assert ChangesetStatus.STATUS_UNDER_REVIEW == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": User.get_by_username(base.TEST_USER_REGULAR_LOGIN).api_key,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "status": ChangesetStatus.STATUS_APPROVED},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert ChangesetStatus.STATUS_APPROVED == ChangesetStatusModel().calculate_pull_request_result(pullrequest)[2]
 

	
 
    def test_api_comment_pullrequest(self):
 
        pull_request_id = fixture.create_pullrequest(self, self.REPO, self.TEST_PR_SRC, self.TEST_PR_DST, "comment test")
 
        random_id = random.randrange(1, 9999)
 
        params = ascii_bytes(ext_json.dumps({
 
            "id": random_id,
 
            "api_key": self.apikey,
 
            "method": "comment_pullrequest",
 
            "args": {"pull_request_id": pull_request_id, "comment_msg": "Looks good to me"},
 
        }))
 
        response = api_call(self, params)
 
        self._compare_ok(random_id, True, given=response.body)
 
        pullrequest = PullRequest().get(pull_request_id)
 
        assert pullrequest.comments[-1].text == 'Looks good to me'
0 comments (0 inline, 0 general)