@@ -1377,1176 +1377,1182 @@ class Repository(Base, BaseModel):
:param revisions: filter query by revisions only
"""
cmts = ChangesetComment.query()\
.filter(ChangesetComment.repo == self)
if revisions is not None:
if not revisions:
return [] # don't use sql 'in' on empty set
cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
grouped = collections.defaultdict(list)
for cmt in cmts.all():
grouped[cmt.revision].append(cmt)
return grouped
def statuses(self, revisions):
Returns statuses for this repository.
PRs without any votes do _not_ show up as unreviewed.
:param revisions: list of revisions to get statuses for
return {}
statuses = ChangesetStatus.query()\
.filter(ChangesetStatus.repo == self)\
.filter(ChangesetStatus.version == 0)\
.filter(ChangesetStatus.revision.in_(revisions))
grouped = {}
for stat in statuses.all():
pr_id = pr_nice_id = pr_repo = None
if stat.pull_request:
pr_id = stat.pull_request.pull_request_id
pr_nice_id = PullRequest.make_nice_id(pr_id)
pr_repo = stat.pull_request.other_repo.repo_name
grouped[stat.revision] = [str(stat.status), stat.status_lbl,
pr_id, pr_repo, pr_nice_id]
def _repo_size(self):
from kallithea.lib import helpers as h
log.debug('calculating repository size...')
return h.format_byte_size(self.scm_instance.size)
#==========================================================================
# SCM CACHE INSTANCE
def set_invalidate(self):
Mark caches of this repo as invalid.
CacheInvalidation.set_invalidate(self.repo_name)
def scm_instance_no_cache(self):
return self.__get_instance()
@property
def scm_instance(self):
import kallithea
full_cache = str2bool(kallithea.CONFIG.get('vcs_full_cache'))
if full_cache:
return self.scm_instance_cached()
def scm_instance_cached(self, valid_cache_keys=None):
@cache_region('long_term')
def _c(repo_name):
rn = self.repo_name
valid = CacheInvalidation.test_and_set_valid(rn, None, valid_cache_keys=valid_cache_keys)
if not valid:
log.debug('Cache for %s invalidated, getting new object', rn)
region_invalidate(_c, None, rn)
else:
log.debug('Getting scm_instance of %s from cache', rn)
return _c(rn)
def __get_instance(self):
repo_full_path = self.repo_full_path
alias = get_scm(repo_full_path)[0]
log.debug('Creating instance of %s repository from %s',
alias, repo_full_path)
backend = get_backend(alias)
if alias == 'hg':
repo = backend(safe_str(repo_full_path), create=False,
baseui=self._ui)
repo = backend(repo_full_path, create=False)
return repo
def __json__(self):
return dict(landing_rev = self.landing_rev)
class RepoGroup(Base, BaseModel):
__tablename__ = 'groups'
__table_args__ = (
UniqueConstraint('group_name', 'group_parent_id'),
CheckConstraint('group_id != group_parent_id'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8', 'sqlite_autoincrement': True},
)
__mapper_args__ = {'order_by': 'group_name'}
SEP = ' » '
group_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
group_name = Column(String(255, convert_unicode=False), nullable=False, unique=True, default=None)
group_parent_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
group_description = Column(String(10000, convert_unicode=False), nullable=True, unique=None, default=None)
enable_locking = Column(Boolean(), nullable=False, unique=None, default=False)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
created_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
users_group_to_perm = relationship('UserGroupRepoGroupToPerm', cascade='all')
parent_group = relationship('RepoGroup', remote_side=group_id)
user = relationship('User')
def __init__(self, group_name='', parent_group=None):
self.group_name = group_name
self.parent_group = parent_group
def __unicode__(self):
return u"<%s('id:%s:%s')>" % (self.__class__.__name__, self.group_id,
self.group_name)
@classmethod
def _generate_choice(cls, repo_group):
"""Return tuple with group_id and name as html literal"""
from webhelpers.html import literal
if repo_group is None:
return (-1, u'-- %s --' % _('top level'))
return repo_group.group_id, literal(cls.SEP.join(repo_group.full_path_splitted))
def groups_choices(cls, groups):
"""Return tuples with group_id and name as html literal."""
return sorted((cls._generate_choice(g) for g in groups),
key=lambda c: c[1].split(cls.SEP))
def url_sep(cls):
return URL_SEP
def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
if case_insensitive:
gr = cls.query()\
.filter(cls.group_name.ilike(group_name))
.filter(cls.group_name == group_name)
if cache:
gr = gr.options(FromCache(
"sql_cache_short",
"get_group_%s" % _hash_key(group_name)
return gr.scalar()
def parents(self):
parents_recursion_limit = 10
groups = []
if self.parent_group is None:
return groups
cur_gr = self.parent_group
groups.insert(0, cur_gr)
cnt = 0
while 1:
cnt += 1
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
if cnt == parents_recursion_limit:
# this will prevent accidental infinite loops
log.error(('more than %s parents found for group %s, stopping '
'recursive parent fetching' % (parents_recursion_limit, self)))
groups.insert(0, gr)
def children(self):
return RepoGroup.query().filter(RepoGroup.parent_group == self)
def name(self):
return self.group_name.split(RepoGroup.url_sep())[-1]
def full_path(self):
return self.group_name
def full_path_splitted(self):
return self.group_name.split(RepoGroup.url_sep())
def repositories(self):
return Repository.query()\
.filter(Repository.group == self)\
.order_by(Repository.repo_name)
def repositories_recursive_count(self):
cnt = self.repositories.count()
def children_count(group):
for child in group.children:
cnt += child.repositories.count()
cnt += children_count(child)
return cnt
return cnt + children_count(self)
def _recursive_objects(self, include_repos=True):
all_ = []
def _get_members(root_gr):
if include_repos:
for r in root_gr.repositories:
all_.append(r)
childs = root_gr.children.all()
if childs:
for gr in childs:
all_.append(gr)
_get_members(gr)
_get_members(self)
return [self] + all_
def recursive_groups_and_repos(self):
Recursive return all groups, with repositories in those groups
return self._recursive_objects()
def recursive_groups(self):
Returns all children groups for this group including children of children
return self._recursive_objects(include_repos=False)
def get_new_name(self, group_name):
returns new full group name based on parent and new name
:param group_name:
path_prefix = (self.parent_group.full_path_splitted if
self.parent_group else [])
return RepoGroup.url_sep().join(path_prefix + [group_name])
def get_api_data(self):
Common function for generating api data
group = self
data = dict(
group_id=group.group_id,
group_name=group.group_name,
group_description=group.group_description,
parent_group=group.parent_group.group_name if group.parent_group else None,
repositories=[x.repo_name for x in group.repositories],
owner=group.user.username
return data
class Permission(Base, BaseModel):
__tablename__ = 'permissions'
Index('p_perm_name_idx', 'permission_name'),
PERMS = [
('hg.admin', _('Kallithea Administrator')),
('repository.none', _('Default user has no access to new repositories')),
('repository.read', _('Default user has read access to new repositories')),
('repository.write', _('Default user has write access to new repositories')),
('repository.admin', _('Default user has admin access to new repositories')),
('group.none', _('Default user has no access to new repository groups')),
('group.read', _('Default user has read access to new repository groups')),
('group.write', _('Default user has write access to new repository groups')),
('group.admin', _('Default user has admin access to new repository groups')),
('usergroup.none', _('Default user has no access to new user groups')),
('usergroup.read', _('Default user has read access to new user groups')),
('usergroup.write', _('Default user has write access to new user groups')),
('usergroup.admin', _('Default user has admin access to new user groups')),
('hg.repogroup.create.false', _('Only admins can create repository groups')),
('hg.repogroup.create.true', _('Non-admins can create repository groups')),
('hg.usergroup.create.false', _('Only admins can create user groups')),
('hg.usergroup.create.true', _('Non-admins can create user groups')),
('hg.create.none', _('Only admins can create top level repositories')),
('hg.create.repository', _('Non-admins can create top level repositories')),
('hg.create.write_on_repogroup.true', _('Repository creation enabled with write permission to a repository group')),
('hg.create.write_on_repogroup.false', _('Repository creation disabled with write permission to a repository group')),
('hg.fork.none', _('Only admins can fork repositories')),
('hg.fork.repository', _('Non-admins can fork repositories')),
('hg.register.none', _('Registration disabled')),
('hg.register.manual_activate', _('User registration with manual account activation')),
('hg.register.auto_activate', _('User registration with automatic account activation')),
('hg.extern_activate.manual', _('Manual activation of external account')),
('hg.extern_activate.auto', _('Automatic activation of external account')),
]
#definition of system default permissions for DEFAULT user
DEFAULT_USER_PERMISSIONS = [
'repository.read',
'group.read',
'usergroup.read',
'hg.create.repository',
'hg.create.write_on_repogroup.true',
'hg.fork.repository',
'hg.register.manual_activate',
'hg.extern_activate.auto',
# defines which permissions are more important higher the more important
# Weight defines which permissions are more important.
# The higher number the more important.
PERM_WEIGHTS = {
'repository.none': 0,
'repository.read': 1,
'repository.write': 3,
'repository.admin': 4,
'group.none': 0,
'group.read': 1,
'group.write': 3,
'group.admin': 4,
'usergroup.none': 0,
'usergroup.read': 1,
'usergroup.write': 3,
'usergroup.admin': 4,
'hg.repogroup.create.false': 0,
'hg.repogroup.create.true': 1,
'hg.usergroup.create.false': 0,
'hg.usergroup.create.true': 1,
'hg.fork.none': 0,
'hg.fork.repository': 1,
'hg.create.none': 0,
'hg.create.repository': 1
}
permission_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
permission_name = Column(String(255, convert_unicode=False), nullable=True, unique=None, default=None)
permission_longname = Column(String(255, convert_unicode=False), nullable=True, unique=None, default=None)
return u"<%s('%s:%s')>" % (
self.__class__.__name__, self.permission_id, self.permission_name
def get_by_key(cls, key):
return cls.query().filter(cls.permission_name == key).scalar()
def get_default_perms(cls, default_user_id):
q = Session().query(UserRepoToPerm, Repository, cls)\
.join((Repository, UserRepoToPerm.repository_id == Repository.repo_id))\
.join((cls, UserRepoToPerm.permission_id == cls.permission_id))\
.filter(UserRepoToPerm.user_id == default_user_id)
return q.all()
def get_default_group_perms(cls, default_user_id):
q = Session().query(UserRepoGroupToPerm, RepoGroup, cls)\
.join((RepoGroup, UserRepoGroupToPerm.group_id == RepoGroup.group_id))\
.join((cls, UserRepoGroupToPerm.permission_id == cls.permission_id))\
.filter(UserRepoGroupToPerm.user_id == default_user_id)
def get_default_user_group_perms(cls, default_user_id):
q = Session().query(UserUserGroupToPerm, UserGroup, cls)\
.join((UserGroup, UserUserGroupToPerm.user_group_id == UserGroup.users_group_id))\
.join((cls, UserUserGroupToPerm.permission_id == cls.permission_id))\
.filter(UserUserGroupToPerm.user_id == default_user_id)
class UserRepoToPerm(Base, BaseModel):
__tablename__ = 'repo_to_perm'
UniqueConstraint('user_id', 'repository_id', 'permission_id'),
'mysql_charset': 'utf8', 'sqlite_autoincrement': True}
repo_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
permission_id = Column(Integer(), ForeignKey('permissions.permission_id'), nullable=False, unique=None, default=None)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=None, default=None)
repository = relationship('Repository')
permission = relationship('Permission')
def create(cls, user, repository, permission):
n = cls()
n.user = user
n.repository = repository
n.permission = permission
Session().add(n)
return n
return u'<%s => %s >' % (self.user, self.repository)
class UserUserGroupToPerm(Base, BaseModel):
__tablename__ = 'user_user_group_to_perm'
UniqueConstraint('user_id', 'user_group_id', 'permission_id'),
user_user_group_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
user_group = relationship('UserGroup')
def create(cls, user, user_group, permission):
n.user_group = user_group
return u'<%s => %s >' % (self.user, self.user_group)
class UserToPerm(Base, BaseModel):
__tablename__ = 'user_to_perm'
UniqueConstraint('user_id', 'permission_id'),
user_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
return u'<%s => %s >' % (self.user, self.permission)
class UserGroupRepoToPerm(Base, BaseModel):
__tablename__ = 'users_group_repo_to_perm'
UniqueConstraint('repository_id', 'users_group_id', 'permission_id'),
users_group_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
users_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
users_group = relationship('UserGroup')
def create(cls, users_group, repository, permission):
n.users_group = users_group
return u'<UserGroupRepoToPerm:%s => %s >' % (self.users_group, self.repository)
class UserGroupUserGroupToPerm(Base, BaseModel):
__tablename__ = 'user_group_user_group_to_perm'
UniqueConstraint('target_user_group_id', 'user_group_id', 'permission_id'),
CheckConstraint('target_user_group_id != user_group_id'),
user_group_user_group_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
target_user_group_id = Column(Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
target_user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.target_user_group_id==UserGroup.users_group_id')
user_group = relationship('UserGroup', primaryjoin='UserGroupUserGroupToPerm.user_group_id==UserGroup.users_group_id')
def create(cls, target_user_group, user_group, permission):
n.target_user_group = target_user_group
return u'<UserGroupUserGroup:%s => %s >' % (self.target_user_group, self.user_group)
class UserGroupToPerm(Base, BaseModel):
__tablename__ = 'users_group_to_perm'
UniqueConstraint('users_group_id', 'permission_id',),
class UserRepoGroupToPerm(Base, BaseModel):
__tablename__ = 'user_repo_group_to_perm'
UniqueConstraint('user_id', 'group_id', 'permission_id'),
group_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
group_id = Column(Integer(), ForeignKey('groups.group_id'), nullable=False, unique=None, default=None)
group = relationship('RepoGroup')
def create(cls, user, repository_group, permission):
n.group = repository_group
class UserGroupRepoGroupToPerm(Base, BaseModel):
__tablename__ = 'users_group_repo_group_to_perm'
UniqueConstraint('users_group_id', 'group_id'),
users_group_repo_group_to_perm_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
def create(cls, user_group, repository_group, permission):
n.users_group = user_group
class Statistics(Base, BaseModel):
__tablename__ = 'statistics'
UniqueConstraint('repository_id'),
stat_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
repository_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False, unique=True, default=None)
stat_on_revision = Column(Integer(), nullable=False)
commit_activity = Column(LargeBinary(1000000), nullable=False)#JSON data
commit_activity_combined = Column(LargeBinary(), nullable=False)#JSON data
languages = Column(LargeBinary(1000000), nullable=False)#JSON data
repository = relationship('Repository', single_parent=True)
class UserFollowing(Base, BaseModel):
__tablename__ = 'user_followings'
UniqueConstraint('user_id', 'follows_repository_id'),
UniqueConstraint('user_id', 'follows_user_id'),
user_following_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
follows_repo_id = Column("follows_repository_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=None, default=None)
follows_user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True, unique=None, default=None)
follows_from = Column(DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
user = relationship('User', primaryjoin='User.user_id==UserFollowing.user_id')
follows_user = relationship('User', primaryjoin='User.user_id==UserFollowing.follows_user_id')
follows_repository = relationship('Repository', order_by='Repository.repo_name')
def get_repo_followers(cls, repo_id):
return cls.query().filter(cls.follows_repo_id == repo_id)
class CacheInvalidation(Base, BaseModel):
__tablename__ = 'cache_invalidation'
UniqueConstraint('cache_key'),
Index('key_idx', 'cache_key'),
# cache_id, not used
cache_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
# cache_key as created by _get_cache_key
cache_key = Column(String(255, convert_unicode=False))
# cache_args is a repo_name
cache_args = Column(String(255, convert_unicode=False))
# instance sets cache_active True when it is caching, other instances set
# cache_active to False to indicate that this cache is invalid
cache_active = Column(Boolean(), nullable=True, unique=None, default=False)
def __init__(self, cache_key, repo_name=''):
self.cache_key = cache_key
self.cache_args = repo_name
self.cache_active = False
return u"<%s('%s:%s[%s]')>" % (
self.__class__.__name__,
self.cache_id, self.cache_key, self.cache_active)
def _cache_key_partition(self):
prefix, repo_name, suffix = self.cache_key.partition(self.cache_args)
return prefix, repo_name, suffix
def get_prefix(self):
get prefix that might have been used in _get_cache_key to
generate self.cache_key. Only used for informational purposes
in repo_edit.html.
# prefix, repo_name, suffix
return self._cache_key_partition()[0]
def get_suffix(self):
get suffix that might have been used in _get_cache_key to
return self._cache_key_partition()[2]
def clear_cache(cls):
Delete all cache keys from database.
Should only be run when all instances are down and all entries thus stale.
cls.query().delete()
Session().commit()
def _get_cache_key(cls, key):
Wrapper for generating a unique cache key for this instance and "key".
key must / will start with a repo_name which will be stored in .cache_args .
prefix = kallithea.CONFIG.get('instance_id', '')
return "%s%s" % (prefix, key)
def set_invalidate(cls, repo_name, delete=False):
Mark all caches of a repo as invalid in the database.
inv_objs = Session().query(cls).filter(cls.cache_args == repo_name).all()
log.debug('for repo %s got %s invalidation objects',
safe_str(repo_name), inv_objs)
for inv_obj in inv_objs:
log.debug('marking %s key for invalidation based on repo_name=%s',
inv_obj, safe_str(repo_name))
if delete:
Session().delete(inv_obj)
inv_obj.cache_active = False
Session().add(inv_obj)
def test_and_set_valid(cls, repo_name, kind, valid_cache_keys=None):
Mark this cache key as active and currently cached.
Return True if the existing cache registration still was valid.
Return False to indicate that it had been invalidated and caches should be refreshed.
key = (repo_name + '_' + kind) if kind else repo_name
cache_key = cls._get_cache_key(key)
if valid_cache_keys and cache_key in valid_cache_keys:
return True
inv_obj = cls.query().filter(cls.cache_key == cache_key).scalar()
if not inv_obj:
inv_obj = CacheInvalidation(cache_key, repo_name)
if inv_obj.cache_active:
inv_obj.cache_active = True
try:
except exc.IntegrityError:
raise
# TOCTOU - another thread added the key at the same time; no further action required
return False
def get_valid_cache_keys(cls):
Return opaque object with information of which caches still are valid
and can be used without checking for invalidation.
return set(inv_obj.cache_key for inv_obj in cls.query().filter(cls.cache_active).all())
class ChangesetComment(Base, BaseModel):
__tablename__ = 'changeset_comments'
Index('cc_revision_idx', 'revision'),
Index('cc_pull_request_id_idx', 'pull_request_id'),
comment_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
revision = Column(String(40))
pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'))
line_no = Column(Unicode(10))
f_path = Column(Unicode(1000))
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False)
text = Column(UnicodeText(25000), nullable=False)
modified_at = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
author = relationship('User')
repo = relationship('Repository')
# status_change is frequently used directly in templates - make it a lazy
# join to avoid fetching each related ChangesetStatus on demand.
# There will only be one ChangesetStatus referencing each comment so the join will not explode.
status_change = relationship('ChangesetStatus',
cascade="all, delete-orphan", lazy='joined')
pull_request = relationship('PullRequest')
def get_users(cls, revision=None, pull_request_id=None):
Returns user associated with this ChangesetComment. ie those
who actually commented
:param cls:
:param revision:
q = Session().query(User)\
.join(ChangesetComment.author)
if revision is not None:
q = q.filter(cls.revision == revision)
elif pull_request_id is not None:
q = q.filter(cls.pull_request_id == pull_request_id)
def url(self):
anchor = "comment-%s" % self.comment_id
import kallithea.lib.helpers as h
if self.revision:
return h.url('changeset_home', repo_name=self.repo.repo_name, revision=self.revision, anchor=anchor)
elif self.pull_request_id is not None:
return self.pull_request.url(anchor=anchor)
class ChangesetStatus(Base, BaseModel):
__tablename__ = 'changeset_statuses'
Index('cs_revision_idx', 'revision'),
Index('cs_version_idx', 'version'),
Index('cs_pull_request_id_idx', 'pull_request_id'),
Index('cs_changeset_comment_id_idx', 'changeset_comment_id'),
Index('cs_pull_request_id_user_id_version_idx', 'pull_request_id', 'user_id', 'version'),
UniqueConstraint('repo_id', 'revision', 'version'),
STATUS_NOT_REVIEWED = DEFAULT = 'not_reviewed'
STATUS_APPROVED = 'approved'
STATUS_REJECTED = 'rejected'
STATUS_UNDER_REVIEW = 'under_review'
STATUSES = [
(STATUS_NOT_REVIEWED, _("Not reviewed")), # (no icon) and default
(STATUS_APPROVED, _("Approved")),
(STATUS_REJECTED, _("Rejected")),
(STATUS_UNDER_REVIEW, _("Under review")),
changeset_status_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=False, unique=None)
revision = Column(String(40), nullable=False)
status = Column(String(128), nullable=False, default=DEFAULT)
changeset_comment_id = Column(Integer(), ForeignKey('changeset_comments.comment_id'), nullable=False)
modified_at = Column(DateTime(), nullable=False, default=datetime.datetime.now)
version = Column(Integer(), nullable=False, default=0)
pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=True)
comment = relationship('ChangesetComment')
self.status, self.author
def get_status_lbl(cls, value):
return dict(cls.STATUSES).get(value)
def status_lbl(self):
return ChangesetStatus.get_status_lbl(self.status)
class PullRequest(Base, BaseModel):
__tablename__ = 'pull_requests'
Index('pr_org_repo_id_idx', 'org_repo_id'),
Index('pr_other_repo_id_idx', 'other_repo_id'),
# values for .status
STATUS_NEW = u'new'
STATUS_CLOSED = u'closed'
pull_request_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
title = Column(Unicode(255), nullable=True)
description = Column(UnicodeText(10240))
status = Column(Unicode(255), nullable=False, default=STATUS_NEW) # only for closedness, not approve/reject/etc
updated_on = Column(DateTime(timezone=False), nullable=False, default=datetime.datetime.now)
_revisions = Column('revisions', UnicodeText(20500)) # 500 revisions max
org_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
org_ref = Column(Unicode(255), nullable=False)
other_repo_id = Column(Integer(), ForeignKey('repositories.repo_id'), nullable=False)
other_ref = Column(Unicode(255), nullable=False)
@hybrid_property
def revisions(self):
return self._revisions.split(':')
@revisions.setter
def revisions(self, val):
self._revisions = safe_unicode(':'.join(val))
def org_ref_parts(self):
return self.org_ref.split(':')
def other_ref_parts(self):
return self.other_ref.split(':')
owner = relationship('User')
reviewers = relationship('PullRequestReviewers',
cascade="all, delete-orphan")
org_repo = relationship('Repository', primaryjoin='PullRequest.org_repo_id==Repository.repo_id')
other_repo = relationship('Repository', primaryjoin='PullRequest.other_repo_id==Repository.repo_id')
statuses = relationship('ChangesetStatus')
comments = relationship('ChangesetComment',
def is_closed(self):
return self.status == self.STATUS_CLOSED
def user_review_status(self, user_id):
"""Return the user's latest status votes on PR"""
# note: no filtering on repo - that would be redundant
status = ChangesetStatus.query()\
.filter(ChangesetStatus.pull_request == self)\
.filter(ChangesetStatus.user_id == user_id)\
.order_by(ChangesetStatus.version)\
.first()
return str(status.status) if status else ''
def make_nice_id(cls, pull_request_id):
'''Return pull request id nicely formatted for displaying'''
return '#%s' % pull_request_id
def nice_id(self):
'''Return the id of this pull request, nicely formatted for displaying'''
return self.make_nice_id(self.pull_request_id)
return dict(
revisions=self.revisions
def url(self, **kwargs):
canonical = kwargs.pop('canonical', None)
b = self.org_ref_parts[1]
if b != self.other_ref_parts[1]:
s = '/_/' + b
s = '/_/' + self.title
kwargs['extra'] = urlreadable(s)
if canonical:
return h.canonical_url('pullrequest_show', repo_name=self.other_repo.repo_name,
pull_request_id=self.pull_request_id, **kwargs)
return h.url('pullrequest_show', repo_name=self.other_repo.repo_name,
class PullRequestReviewers(Base, BaseModel):
__tablename__ = 'pull_request_reviewers'
def __init__(self, user=None, pull_request=None):
self.user = user
self.pull_request = pull_request
pull_requests_reviewers_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
pull_request_id = Column(Integer(), ForeignKey('pull_requests.pull_request_id'), nullable=False)
user_id = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
class Notification(Base, BaseModel):
__tablename__ = 'notifications'
Index('notification_type_idx', 'type'),
TYPE_CHANGESET_COMMENT = u'cs_comment'
TYPE_MESSAGE = u'message'
TYPE_MENTION = u'mention'
TYPE_REGISTRATION = u'registration'
TYPE_PULL_REQUEST = u'pull_request'
TYPE_PULL_REQUEST_COMMENT = u'pull_request_comment'
notification_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
subject = Column(Unicode(512), nullable=True)
body = Column(UnicodeText(50000), nullable=True)
created_by = Column(Integer(), ForeignKey('users.user_id'), nullable=True)
type_ = Column('type', Unicode(255))
created_by_user = relationship('User')
notifications_to_users = relationship('UserNotification', cascade="all, delete-orphan")
def recipients(self):
return [x.user for x in UserNotification.query()
.filter(UserNotification.notification == self)
.order_by(UserNotification.user_id.asc()).all()]
def create(cls, created_by, subject, body, recipients, type_=None):
if type_ is None:
type_ = Notification.TYPE_MESSAGE
notification = cls()
notification.created_by_user = created_by
notification.subject = subject
notification.body = body
notification.type_ = type_
notification.created_on = datetime.datetime.now()
for u in recipients:
assoc = UserNotification()
assoc.notification = notification
assoc.user_id = u.user_id
Session().add(assoc)
Session().add(notification)
Session().flush() # assign notificaiton.notification_id
return notification
def description(self):
from kallithea.model.notification import NotificationModel
return NotificationModel().make_description(self)
class UserNotification(Base, BaseModel):
__tablename__ = 'user_to_notification'
UniqueConstraint('user_id', 'notification_id'),
user_id = Column(Integer(), ForeignKey('users.user_id'), primary_key=True)
notification_id = Column(Integer(), ForeignKey('notifications.notification_id'), primary_key=True)
read = Column(Boolean, default=False)
sent_on = Column(DateTime(timezone=False), nullable=True, unique=None)
notification = relationship('Notification')
def mark_as_read(self):
self.read = True
Session().add(self)
class Gist(Base, BaseModel):
__tablename__ = 'gists'
Index('g_gist_access_id_idx', 'gist_access_id'),
Index('g_created_on_idx', 'created_on'),
GIST_PUBLIC = u'public'
GIST_PRIVATE = u'private'
DEFAULT_FILENAME = u'gistfile1.txt'
gist_id = Column(Integer(), nullable=False, unique=True, primary_key=True)
gist_access_id = Column(Unicode(250))
gist_description = Column(UnicodeText(1024))
gist_owner = Column('user_id', Integer(), ForeignKey('users.user_id'), nullable=True)
gist_expires = Column(Float(53), nullable=False)
gist_type = Column(Unicode(128), nullable=False)
def __repr__(self):
return '<Gist:[%s]%s>' % (self.gist_type, self.gist_access_id)
def get_or_404(cls, id_):
res = cls.query().filter(cls.gist_access_id == id_).scalar()
if res is None:
raise HTTPNotFound
return res
def get_by_access_id(cls, gist_access_id):
return cls.query().filter(cls.gist_access_id == gist_access_id).scalar()
def gist_url(self):
alias_url = kallithea.CONFIG.get('gist_alias_url')
if alias_url:
return alias_url.replace('{gistid}', self.gist_access_id)
return h.canonical_url('gist', gist_id=self.gist_access_id)
def base_path(cls):
Returns base path where all gists are stored
from kallithea.model.gist import GIST_STORE_LOC
q = Session().query(Ui)\
.filter(Ui.ui_key == URL_SEP)
q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
return os.path.join(q.one().ui_value, GIST_STORE_LOC)
Common function for generating gist related data for API
gist = self
gist_id=gist.gist_id,
type=gist.gist_type,
access_id=gist.gist_access_id,
description=gist.gist_description,
url=gist.gist_url(),
expires=gist.gist_expires,
created_on=gist.created_on,
data.update(self.get_api_data())
## SCM functions
from kallithea.lib.vcs import get_repo
base_path = self.base_path()
return get_repo(os.path.join(*map(safe_str,
[base_path, self.gist_access_id])))
class DbMigrateVersion(Base, BaseModel):
__tablename__ = 'db_migrate_version'
repository_id = Column(String(250), nullable=False, unique=True, primary_key=True)
repository_path = Column(Text)
version = Column(Integer)
Status change: