@@ -135,194 +135,197 @@ def get_repo_by_id(repo_name):
:return: repo_name if matched else None
"""
_repo_id = _extract_id_from_repo_name(repo_name)
if _repo_id:
from kallithea.model.db import Repository
repo = Repository.get(_repo_id)
if repo:
# TODO: return repo instead of reponame? or would that be a layering violation?
return repo.repo_name
return None
def action_logger(user, action, repo, ipaddr='', sa=None, commit=False):
Action logger for various actions made by users
:param user: user that made this action, can be a unique username string or
object containing user_id attribute
:param action: action to log, should be on of predefined unique actions for
easy translations
:param repo: string name of repository or object containing repo_id,
that action was made on
:param ipaddr: optional IP address from what the action was made
:param sa: optional sqlalchemy session
if not sa:
sa = meta.Session()
# if we don't get explicit IP address try to get one from registered user
# in tmpl context var
if not ipaddr:
ipaddr = getattr(get_current_authuser(), 'ip_addr', '')
if getattr(user, 'user_id', None):
user_obj = User.get(user.user_id)
elif isinstance(user, basestring):
user_obj = User.get_by_username(user)
else:
raise Exception('You have to provide a user object or a username')
if getattr(repo, 'repo_id', None):
repo_obj = Repository.get(repo.repo_id)
repo_name = repo_obj.repo_name
elif isinstance(repo, basestring):
repo_name = repo.lstrip('/')
repo_obj = Repository.get_by_repo_name(repo_name)
repo_obj = None
repo_name = u''
user_log = UserLog()
user_log.user_id = user_obj.user_id
user_log.username = user_obj.username
user_log.action = safe_unicode(action)
user_log.repository = repo_obj
user_log.repository_name = repo_name
user_log.action_date = datetime.datetime.now()
user_log.user_ip = ipaddr
sa.add(user_log)
log.info('Logging action:%s on %s by user:%s ip:%s',
action, safe_unicode(repo), user_obj, ipaddr)
if commit:
sa.commit()
def get_filesystem_repos(path):
Scans given path for repos and return (name,(type,path)) tuple
:param path: path to scan for repositories
:param recursive: recursive search and return names with subdirs in front
# remove ending slash for better results
path = safe_str(path.rstrip(os.sep))
log.debug('now scanning in %s', path)
def isdir(*n):
return os.path.isdir(os.path.join(*n))
for root, dirs, _files in os.walk(path):
recurse_dirs = []
for subdir in dirs:
# skip removed repos
if REMOVED_REPO_PAT.match(subdir):
continue
#skip .<something> dirs TODO: rly? then we should prevent creating them ...
if subdir.startswith('.'):
cur_path = os.path.join(root, subdir)
if isdir(cur_path, '.git'):
log.warning('ignoring non-bare Git repo: %s', cur_path)
if (isdir(cur_path, '.hg') or
isdir(cur_path, '.git') or
isdir(cur_path, '.svn') or
isdir(cur_path, 'objects') and (isdir(cur_path, 'refs') or
os.path.isfile(os.path.join(cur_path, 'packed-refs')))):
if not os.access(cur_path, os.R_OK) or not os.access(cur_path, os.X_OK):
log.warning('ignoring repo path without access: %s', cur_path)
if not os.access(cur_path, os.W_OK):
log.warning('repo path without write access: %s', cur_path)
try:
scm_info = get_scm(cur_path)
assert cur_path.startswith(path)
repo_path = cur_path[len(path) + 1:]
yield repo_path, scm_info
continue # no recursion
except VCSError:
# We should perhaps ignore such broken repos, but especially
# the bare git detection is unreliable so we dive into it
pass
recurse_dirs.append(subdir)
dirs[:] = recurse_dirs
def is_valid_repo(repo_name, base_path, scm=None):
Returns True if given path is a valid repository False otherwise.
If scm param is given also compare if given scm is the same as expected
from scm parameter
:param repo_name:
:param base_path:
:param scm:
:return True: if given path is a valid repository
full_path = os.path.join(safe_str(base_path), safe_str(repo_name))
scm_ = get_scm(full_path)
if scm:
return scm_[0] == scm
return True
return False
def is_valid_repo_group(repo_group_name, base_path, skip_path_check=False):
Returns True if given path is a repository group False otherwise
full_path = os.path.join(safe_str(base_path), safe_str(repo_group_name))
# check if it's not a repo
if is_valid_repo(repo_group_name, base_path):
# we need to check bare git repos at higher level
# since we might match branches/hooks/info/objects or possible
# other things inside bare git repo
get_scm(os.path.dirname(full_path))
# check if it's a valid path
if skip_path_check or os.path.isdir(full_path):
#propagated from mercurial documentation
ui_sections = ['alias', 'auth',
'decode/encode', 'defaults',
'diff', 'email',
'extensions', 'format',
'merge-patterns', 'merge-tools',
'hooks', 'http_proxy',
'smtp', 'patch',
'paths', 'profiling',
'server', 'trusted',
'ui', 'web', ]
def make_ui(read_from='file', path=None, checkpaths=True, clear_session=True):
A function that will read python rc files or database
and make an mercurial ui object from read options
Status change: