@@ -814,392 +814,396 @@ class Repository(Base, BaseModel):
"""
return self.fork
@property
def just_name(self):
return self.repo_name.split(Repository.url_sep())[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
q = Session().query(RhodeCodeUi).filter(RhodeCodeUi.ui_key ==
Repository.url_sep())
q = q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split(Repository.url_sep())
return os.path.join(*p)
def cache_keys(self):
Returns associated cache keys for that repo
return CacheInvalidation.query()\
.filter(CacheInvalidation.cache_args == self.repo_name)\
.order_by(CacheInvalidation.cache_key)\
.all()
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return Repository.url_sep().join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from rhodecode.lib.utils import make_ui
return make_ui('db', clear_session=False)
@classmethod
def inject_ui(cls, repo, extras={}):
from rhodecode.lib.vcs.backends.hg import MercurialRepository
from rhodecode.lib.vcs.backends.git import GitRepository
required = (MercurialRepository, GitRepository)
if not isinstance(repo, required):
raise Exception('repo must be instance of %s' % required)
# inject ui extra param to log this action via push logger
for k, v in extras.items():
repo._repo.ui.setconfig('rhodecode_extras', k, v)
def is_valid(cls, repo_name):
returns True if given repo name is a valid filesystem repository
:param cls:
:param repo_name:
from rhodecode.lib.utils import is_valid_repo
return is_valid_repo(repo_name, cls.base_path())
def get_api_data(self):
Common function for generating repo api data
repo = self
data = dict(
repo_id=repo.repo_id,
repo_name=repo.repo_name,
repo_type=repo.repo_type,
clone_uri=repo.clone_uri,
private=repo.private,
created_on=repo.created_on,
description=repo.description,
landing_rev=repo.landing_rev,
owner=repo.user.username,
fork_of=repo.fork.repo_name if repo.fork else None,
enable_statistics=repo.enable_statistics,
enable_locking=repo.enable_locking,
enable_downloads=repo.enable_downloads,
last_changeset=repo.changeset_cache
)
return data
def lock(cls, repo, user_id):
repo.locked = [user_id, time.time()]
Session().add(repo)
Session().commit()
def unlock(cls, repo):
repo.locked = None
def last_db_change(self):
return self.updated_on
def clone_url(self, **override):
from pylons import url
from urlparse import urlparse
import urllib
parsed_url = urlparse(url('home', qualified=True))
default_clone_uri = '%(scheme)s://%(user)s%(pass)s%(netloc)s%(prefix)s%(path)s'
decoded_path = safe_unicode(urllib.unquote(parsed_url.path))
args = {
'user': '',
'pass': '',
'scheme': parsed_url.scheme,
'netloc': parsed_url.netloc,
'prefix': decoded_path,
'path': self.repo_name
}
args.update(override)
return default_clone_uri % args
#==========================================================================
# SCM PROPERTIES
def get_changeset(self, rev=None):
return get_changeset_safe(self.scm_instance, rev)
def get_landing_changeset(self):
Returns landing changeset, or if that doesn't exist returns the tip
cs = self.get_changeset(self.landing_rev) or self.get_changeset()
return cs
def update_changeset_cache(self, cs_cache=None):
Update cache of last changeset for repository, keys should be::
short_id
raw_id
revision
message
date
author
:param cs_cache:
from rhodecode.lib.vcs.backends.base import BaseChangeset
if cs_cache is None:
cs_cache = self.get_changeset()
if isinstance(cs_cache, BaseChangeset):
cs_cache = cs_cache.__json__()
if cs_cache != self.changeset_cache:
if (cs_cache != self.changeset_cache
or not self.last_change
or not self.changeset_cache):
_default = datetime.datetime.fromtimestamp(0)
last_change = cs_cache.get('date') or self.last_change or _default
log.debug('updated repo %s with new cs cache %s' % (self, cs_cache))
self.updated_on = last_change
self.changeset_cache = cs_cache
Session().add(self)
else:
log.debug('Skipping repo:%s already with latest changes' % self)
def tip(self):
return self.get_changeset('tip')
def author(self):
return self.tip.author
def last_change(self):
return self.scm_instance.last_change
def get_comments(self, revisions=None):
Returns comments for this repository grouped by revisions
:param revisions: filter query by revisions only
cmts = ChangesetComment.query()\
.filter(ChangesetComment.repo == self)
if revisions:
cmts = cmts.filter(ChangesetComment.revision.in_(revisions))
grouped = defaultdict(list)
for cmt in cmts.all():
grouped[cmt.revision].append(cmt)
return grouped
def statuses(self, revisions=None):
Returns statuses for this repository
:param revisions: list of revisions to get statuses for
:type revisions: list
statuses = ChangesetStatus.query()\
.filter(ChangesetStatus.repo == self)\
.filter(ChangesetStatus.version == 0)
statuses = statuses.filter(ChangesetStatus.revision.in_(revisions))
grouped = {}
#maybe we have open new pullrequest without a status ?
stat = ChangesetStatus.STATUS_UNDER_REVIEW
status_lbl = ChangesetStatus.get_status_lbl(stat)
for pr in PullRequest.query().filter(PullRequest.org_repo == self).all():
for rev in pr.revisions:
pr_id = pr.pull_request_id
pr_repo = pr.other_repo.repo_name
grouped[rev] = [stat, status_lbl, pr_id, pr_repo]
for stat in statuses.all():
pr_id = pr_repo = None
if stat.pull_request:
pr_id = stat.pull_request.pull_request_id
pr_repo = stat.pull_request.other_repo.repo_name
grouped[stat.revision] = [str(stat.status), stat.status_lbl,
pr_id, pr_repo]
# SCM CACHE INSTANCE
def invalidate(self):
return CacheInvalidation.invalidate(self.repo_name)
def set_invalidate(self):
set a cache for invalidation for this instance
CacheInvalidation.set_invalidate(repo_name=self.repo_name)
def scm_instance(self):
import rhodecode
full_cache = str2bool(rhodecode.CONFIG.get('vcs_full_cache'))
if full_cache:
return self.scm_instance_cached()
return self.__get_instance()
def scm_instance_cached(self, cache_map=None):
@cache_region('long_term')
def _c(repo_name):
rn = self.repo_name
log.debug('Getting cached instance of repo')
if cache_map:
# get using prefilled cache_map
invalidate_repo = cache_map[self.repo_name]
if invalidate_repo:
invalidate_repo = (None if invalidate_repo.cache_active
else invalidate_repo)
# get from invalidate
invalidate_repo = self.invalidate
if invalidate_repo is not None:
region_invalidate(_c, None, rn)
# update our cache
CacheInvalidation.set_valid(invalidate_repo.cache_key)
return _c(rn)
def __get_instance(self):
repo_full_path = self.repo_full_path
try:
alias = get_scm(repo_full_path)[0]
log.debug('Creating instance of %s repository' % alias)
backend = get_backend(alias)
except VCSError:
log.error(traceback.format_exc())
log.error('Perhaps this repository is in db and not in '
'filesystem run rescan repositories with '
'"destroy old data " option from admin panel')
return
if alias == 'hg':
repo = backend(safe_str(repo_full_path), create=False,
baseui=self._ui)
# skip hidden web repository
if repo._get_hidden():
repo = backend(repo_full_path, create=False)
return repo
class RepoGroup(Base, BaseModel):
__tablename__ = 'groups'
__table_args__ = (
UniqueConstraint('group_name', 'group_parent_id'),
CheckConstraint('group_id != group_parent_id'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'},
__mapper_args__ = {'order_by': 'group_name'}
group_id = Column("group_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
group_name = Column("group_name", String(255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
group_parent_id = Column("group_parent_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=None, default=None)
group_description = Column("group_description", String(10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
enable_locking = Column("enable_locking", Boolean(), nullable=False, unique=None, default=False)
repo_group_to_perm = relationship('UserRepoGroupToPerm', cascade='all', order_by='UserRepoGroupToPerm.group_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoGroupToPerm', cascade='all')
parent_group = relationship('RepoGroup', remote_side=group_id)
def __init__(self, group_name='', parent_group=None):
self.group_name = group_name
self.parent_group = parent_group
def __unicode__(self):
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.group_id,
self.group_name)
def groups_choices(cls, groups=None, check_perms=False, show_empty_group=True):
from webhelpers.html import literal as _literal
from rhodecode.model.scm import ScmModel
if not groups:
groups = cls.query().all()
if check_perms:
#filter group user have access to, it's done
#magically inside ScmModel based on current user
groups = ScmModel().get_repos_groups(groups)
repo_groups = []
if show_empty_group:
repo_groups = [('-1', '-- no parent --')]
sep = ' » '
_name = lambda k: _literal(sep.join(k))
repo_groups.extend([(x.group_id, _name(x.full_path_splitted))
for x in groups])
repo_groups = sorted(repo_groups, key=lambda t: t[1].split(sep)[0])
return repo_groups
def url_sep(cls):
return URL_SEP
def get_by_group_name(cls, group_name, cache=False, case_insensitive=False):
if case_insensitive:
gr = cls.query()\
.filter(cls.group_name.ilike(group_name))
Status change: