@@ -39,386 +39,389 @@ import warnings
from os.path import abspath
from os.path import dirname
from webhelpers.text import collapse, remove_formatting, strip_tags
from beaker.cache import _cache_decorate
from kallithea.lib.vcs.utils.hgcompat import ui, config
from kallithea.lib.vcs.utils.helpers import get_scm
from kallithea.lib.vcs.exceptions import VCSError
from kallithea.model import meta
from kallithea.model.db import Repository, User, Ui, \
UserLog, RepoGroup, Setting, UserGroup
from kallithea.model.repo_group import RepoGroupModel
from kallithea.lib.utils2 import safe_str, safe_unicode, get_current_authuser
from kallithea.lib.vcs.utils.fakemod import create_module
log = logging.getLogger(__name__)
REMOVED_REPO_PAT = re.compile(r'rm__\d{8}_\d{6}_\d{6}_.*')
def recursive_replace(str_, replace=' '):
"""
Recursive replace of given sign to just one instance
:param str_: given string
:param replace: char to find and replace multiple instances
Examples::
>>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
'Mighty-Mighty-Bo-sstones'
if str_.find(replace * 2) == -1:
return str_
else:
str_ = str_.replace(replace * 2, replace)
return recursive_replace(str_, replace)
def repo_name_slug(value):
Return slug of name of repository
This function is called on each creation/modification
of repository to prevent bad names in repo
slug = remove_formatting(value)
slug = strip_tags(slug)
for c in """`?=[]\;'"<>,/~!@#$%^&*()+{}|: """:
slug = slug.replace(c, '-')
slug = recursive_replace(slug, '-')
slug = collapse(slug, '-')
return slug
#==============================================================================
# PERM DECORATOR HELPERS FOR EXTRACTING NAMES FOR PERM CHECKS
def get_repo_slug(request):
_repo = request.environ['pylons.routes_dict'].get('repo_name')
if _repo:
_repo = _repo.rstrip('/')
return _repo
def get_repo_group_slug(request):
_group = request.environ['pylons.routes_dict'].get('group_name')
if _group:
_group = _group.rstrip('/')
return _group
def get_user_group_slug(request):
_group = request.environ['pylons.routes_dict'].get('id')
_group = UserGroup.get(_group)
return _group.users_group_name
return None
def _extract_id_from_repo_name(repo_name):
if repo_name.startswith('/'):
repo_name = repo_name.lstrip('/')
by_id_match = re.match(r'^_(\d{1,})', repo_name)
if by_id_match:
return by_id_match.groups()[0]
def get_repo_by_id(repo_name):
Extracts repo_name by id from special urls. Example url is _11/repo_name
:param repo_name:
:return: repo_name if matched else None
_repo_id = _extract_id_from_repo_name(repo_name)
if _repo_id:
from kallithea.model.db import Repository
repo = Repository.get(_repo_id)
if repo:
# TODO: return repo instead of reponame? or would that be a layering violation?
return repo.repo_name
def action_logger(user, action, repo, ipaddr='', sa=None, commit=False):
Action logger for various actions made by users
:param user: user that made this action, can be a unique username string or
object containing user_id attribute
:param action: action to log, should be on of predefined unique actions for
easy translations
:param repo: string name of repository or object containing repo_id,
that action was made on
:param ipaddr: optional IP address from what the action was made
:param sa: optional sqlalchemy session
if not sa:
sa = meta.Session()
# if we don't get explicit IP address try to get one from registered user
# in tmpl context var
if not ipaddr:
ipaddr = getattr(get_current_authuser(), 'ip_addr', '')
if getattr(user, 'user_id', None):
user_obj = User.get(user.user_id)
elif isinstance(user, basestring):
user_obj = User.get_by_username(user)
raise Exception('You have to provide a user object or a username')
if getattr(repo, 'repo_id', None):
repo_obj = Repository.get(repo.repo_id)
repo_name = repo_obj.repo_name
elif isinstance(repo, basestring):
repo_name = repo.lstrip('/')
repo_obj = Repository.get_by_repo_name(repo_name)
repo_obj = None
repo_name = u''
user_log = UserLog()
user_log.user_id = user_obj.user_id
user_log.username = user_obj.username
user_log.action = safe_unicode(action)
user_log.repository = repo_obj
user_log.repository_name = repo_name
user_log.action_date = datetime.datetime.now()
user_log.user_ip = ipaddr
sa.add(user_log)
log.info('Logging action:%s on %s by user:%s ip:%s',
action, safe_unicode(repo), user_obj, ipaddr)
if commit:
sa.commit()
def get_filesystem_repos(path):
Scans given path for repos and return (name,(type,path)) tuple
:param path: path to scan for repositories
:param recursive: recursive search and return names with subdirs in front
# remove ending slash for better results
path = safe_str(path.rstrip(os.sep))
log.debug('now scanning in %s', path)
def isdir(*n):
return os.path.isdir(os.path.join(*n))
for root, dirs, _files in os.walk(path):
recurse_dirs = []
for subdir in dirs:
# skip removed repos
if REMOVED_REPO_PAT.match(subdir):
continue
#skip .<something> dirs TODO: rly? then we should prevent creating them ...
if subdir.startswith('.'):
cur_path = os.path.join(root, subdir)
if isdir(cur_path, '.git'):
log.warning('ignoring non-bare Git repo: %s', cur_path)
if (isdir(cur_path, '.hg') or
isdir(cur_path, '.git') or
isdir(cur_path, '.svn') or
isdir(cur_path, 'objects') and (isdir(cur_path, 'refs') or
os.path.isfile(os.path.join(cur_path, 'packed-refs')))):
if not os.access(cur_path, os.R_OK) or not os.access(cur_path, os.X_OK):
log.warning('ignoring repo path without access: %s', cur_path)
if not os.access(cur_path, os.W_OK):
log.warning('repo path without write access: %s', cur_path)
try:
scm_info = get_scm(cur_path)
assert cur_path.startswith(path)
repo_path = cur_path[len(path) + 1:]
yield repo_path, scm_info
continue # no recursion
except VCSError:
# We should perhaps ignore such broken repos, but especially
# the bare git detection is unreliable so we dive into it
pass
recurse_dirs.append(subdir)
dirs[:] = recurse_dirs
def is_valid_repo(repo_name, base_path, scm=None):
Returns True if given path is a valid repository False otherwise.
If scm param is given also compare if given scm is the same as expected
from scm parameter
:param base_path:
:param scm:
:return True: if given path is a valid repository
full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
scm_ = get_scm(full_path)
if scm:
return scm_[0] == scm
return True
return False
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
Returns True if given path is a repository group False otherwise
full_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
# check if it's not a repo
if is_valid_repo(repo_group_name, base_path):
# we need to check bare git repos at higher level
# since we might match branches/hooks/info/objects or possible
# other things inside bare git repo
get_scm(os.path.dirname(full_path))
# check if it's a valid path
if skip_path_check or os.path.isdir(full_path):
#propagated from mercurial documentation
ui_sections = ['alias', 'auth',
'decode/encode', 'defaults',
'diff', 'email',
'extensions', 'format',
'merge-patterns', 'merge-tools',
'hooks', 'http_proxy',
'smtp', 'patch',
'paths', 'profiling',
'server', 'trusted',
'ui', 'web', ]
def make_ui(read_from='file', path=None, checkpaths=True, clear_session=True):
A function that will read python rc files or database
and make an mercurial ui object from read options
:param path: path to mercurial config file
:param checkpaths: check the path
:param read_from: read from 'file' or 'db'
baseui = ui.ui()
# clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
if read_from == 'file':
if not os.path.isfile(path):
log.debug('hgrc file is not present at %s, skipping...', path)
log.debug('reading hgrc from %s', path)
cfg = config.config()
cfg.read(path)
for section in ui_sections:
for k, v in cfg.items(section):
log.debug('settings ui from file: [%s] %s=%s', section, k, v)
baseui.setconfig(safe_str(section), safe_str(k), safe_str(v))
elif read_from == 'db':
ret = sa.query(Ui).all()
hg_ui = ret
for ui_ in hg_ui:
if ui_.ui_active:
ui_val = '' if ui_.ui_value is None else safe_str(ui_.ui_value)
log.debug('settings ui from db: [%s] %s=%r', ui_.ui_section,
ui_.ui_key, ui_val)
baseui.setconfig(safe_str(ui_.ui_section), safe_str(ui_.ui_key),
ui_val)
if clear_session:
meta.Session.remove()
# force set push_ssl requirement to False, Kallithea handles that
baseui.setconfig('web', 'push_ssl', False)
baseui.setconfig('web', 'allow_push', '*')
# prevent interactive questions for ssh password / passphrase
ssh = baseui.config('ui', 'ssh', default='ssh')
baseui.setconfig('ui', 'ssh', '%s -oBatchMode=yes -oIdentitiesOnly=yes' % ssh)
return baseui
def set_app_settings(config):
Updates app config with new settings from database
:param config:
hgsettings = Setting.get_app_settings()
for k, v in hgsettings.items():
config[k] = v
def set_vcs_config(config):
Patch VCS config with some Kallithea specific stuff
:param config: kallithea.CONFIG
from kallithea.lib.vcs import conf
from kallithea.lib.utils2 import aslist
conf.settings.BACKENDS = {
'hg': 'kallithea.lib.vcs.backends.hg.MercurialRepository',
'git': 'kallithea.lib.vcs.backends.git.GitRepository',
}
conf.settings.GIT_EXECUTABLE_PATH = config.get('git_path', 'git')
conf.settings.GIT_REV_FILTER = config.get('git_rev_filter', '--all').strip()
conf.settings.DEFAULT_ENCODINGS = aslist(config.get('default_encoding',
'utf8'), sep=',')
def set_indexer_config(config):
Update Whoosh index mapping
from kallithea.config import conf
log.debug('adding extra into INDEX_EXTENSIONS')
conf.INDEX_EXTENSIONS.extend(re.split('\s+', config.get('index.extensions', '')))
log.debug('adding extra into INDEX_FILENAMES')
conf.INDEX_FILENAMES.extend(re.split('\s+', config.get('index.filenames', '')))
Status change: