@@ -482,124 +482,130 @@ class Repository(Base, BaseModel):
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
user = relationship('User')
fork = relationship('Repository', remote_side=repo_id)
group = relationship('Group')
repo_to_perm = relationship('RepoToPerm', cascade='all', order_by='RepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog', cascade='all')
def __repr__(self):
return "<%s('%s:%s')>" % (self.__class__.__name__,
self.repo_id, self.repo_name)
@classmethod
def url_sep(cls):
return '/'
def get_by_repo_name(cls, repo_name):
q = Session.query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))\
return q.one()
def get_repo_forks(cls, repo_id):
return Session.query(cls).filter(Repository.fork_id == repo_id)
def base_path(cls):
"""
Returns base path when all repos are stored
:param cls:
q = Session.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/')
q = Session.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key ==
cls.url_sep())
q.options(FromCache("sql_cache_short", "repository_repo_path"))
return q.one().ui_value
@property
def just_name(self):
return self.repo_name.split(os.sep)[-1]
def groups_with_parents(self):
groups = []
if self.group is None:
return groups
cur_gr = self.group
groups.insert(0, cur_gr)
while 1:
gr = getattr(cur_gr, 'parent_group', None)
cur_gr = cur_gr.parent_group
if gr is None:
break
groups.insert(0, gr)
def groups_and_repo(self):
return self.groups_with_parents, self.just_name
@LazyProperty
def repo_path(self):
Returns base full path for that repository means where it actually
exists on a filesystem
Repository.url_sep())
def repo_full_path(self):
p = [self.repo_path]
# we need to split the name by / since this is how we store the
# names in the database, but that eventually needs to be converted
# into a valid system path
p += self.repo_name.split('/')
p += self.repo_name.split(Repository.url_sep())
return os.path.join(*p)
def get_new_name(self, repo_name):
returns new full repository name based on assigned group and new new
:param group_name:
path_prefix = self.group.full_path_splitted if self.group else []
return '/'.join(path_prefix + [repo_name])
return Repository.url_sep().join(path_prefix + [repo_name])
def _ui(self):
Creates an db based ui object for this repository
from mercurial import ui
from mercurial import config
baseui = ui.ui()
#clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
ret = Session.query(RhodeCodeUi)\
.options(FromCache("sql_cache_short", "repository_repo_ui")).all()
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
log.debug('settings ui from db[%s]%s:%s', ui_.ui_section,
ui_.ui_key, ui_.ui_value)
# -*- coding: utf-8 -*-
rhodecode.model.scm
~~~~~~~~~~~~~~~~~~~
Scm model for RhodeCode
:created_on: Apr 9, 2010
:author: marcink
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import traceback
import logging
from sqlalchemy.exc import DatabaseError
from vcs import get_backend
from vcs.exceptions import RepositoryError
from vcs.utils.lazy import LazyProperty
from vcs.nodes import FileNode
from rhodecode import BACKENDS
from rhodecode.lib import helpers as h
from rhodecode.lib import safe_str
from rhodecode.lib.auth import HasRepoPermissionAny
from rhodecode.lib.utils import get_repos as get_filesystem_repos, make_ui, \
action_logger, EmptyChangeset
from rhodecode.model import BaseModel
from rhodecode.model.db import Repository, RhodeCodeUi, CacheInvalidation, \
UserFollowing, UserLog, User
log = logging.getLogger(__name__)
@@ -125,52 +126,57 @@ class ScmModel(BaseModel):
def repos_path(self):
"""Get's the repositories root path from database
q = self.sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
def repo_scan(self, repos_path=None):
"""Listing of repositories in given path. This path should not be a
repository itself. Return a dictionary of repository objects
:param repos_path: path to directory containing repositories
log.info('scanning for repositories in %s', repos_path)
if repos_path is None:
repos_path = self.repos_path
baseui = make_ui('db')
repos_list = {}
for name, path in get_filesystem_repos(repos_path, recursive=True):
# name need to be decomposed and put back together using the /
# since this is internal storage separator for rhodecode
name = Repository.url_sep().join(name.split(os.sep))
try:
if name in repos_list:
raise RepositoryError('Duplicate repository name %s '
'found in %s' % (name, path))
else:
klass = get_backend(path[0])
if path[0] == 'hg' and path[0] in BACKENDS.keys():
# for mercurial we need to have an str path
repos_list[name] = klass(safe_str(path[1]),
baseui=baseui)
if path[0] == 'git' and path[0] in BACKENDS.keys():
repos_list[name] = klass(path[1])
except OSError:
continue
return repos_list
def get_repos(self, all_repos=None, sort_key=None):
Get all repos from db and for each repo create it's
backend instance and fill that backed with information from database
:param all_repos: list of repository names as strings
give specific repositories list, good for filtering
Status change: