@@ -72,193 +72,193 @@ def recursive_replace(str, replace=' '):
def repo_name_slug(value):
"""Return slug of name of repository
This function is called on each creation/modification
of repository to prevent bad names in repo
"""
slug = remove_formatting(value)
slug = strip_tags(slug)
for c in """=[]\;'"<>,/~!@#$%^&*()+{}|: """:
slug = slug.replace(c, '-')
slug = recursive_replace(slug, '-')
slug = collapse(slug, '-')
return slug
def get_repo_slug(request):
return request.environ['pylons.routes_dict'].get('repo_name')
def action_logger(user, action, repo, ipaddr='', sa=None):
Action logger for various actions made by users
:param user: user that made this action, can be a unique username string or
object containing user_id attribute
:param action: action to log, should be on of predefined unique actions for
easy translations
:param repo: string name of repository or object containing repo_id,
that action was made on
:param ipaddr: optional ip address from what the action was made
:param sa: optional sqlalchemy session
if not sa:
sa = meta.Session()
try:
um = UserModel()
if hasattr(user, 'user_id'):
user_obj = user
elif isinstance(user, basestring):
user_obj = um.get_by_username(user, cache=False)
else:
raise Exception('You have to provide user object or username')
rm = RepoModel()
if hasattr(repo, 'repo_id'):
repo_obj = rm.get(repo.repo_id, cache=False)
repo_name = repo_obj.repo_name
elif isinstance(repo, basestring):
repo_name = repo.lstrip('/')
repo_obj = rm.get_by_repo_name(repo_name, cache=False)
raise Exception('You have to provide repository to action logger')
user_log = UserLog()
user_log.user_id = user_obj.user_id
user_log.action = action
user_log.repository_id = repo_obj.repo_id
user_log.repository_name = repo_name
user_log.action_date = datetime.datetime.now()
user_log.user_ip = ipaddr
sa.add(user_log)
sa.commit()
log.info('Adding user %s, action %s on %s', user_obj, action, repo)
except:
log.error(traceback.format_exc())
sa.rollback()
def get_repos(path, recursive=False):
Scans given path for repos and return (name,(type,path)) tuple
:param path: path to scann for repositories
:param recursive: recursive search and return names with subdirs in front
from vcs.utils.helpers import get_scm
from vcs.exceptions import VCSError
if path.endswith('/'):
#add ending slash for better results
path = path[:-1]
def _get_repos(p):
for dirpath in os.listdir(p):
if os.path.isfile(os.path.join(p, dirpath)):
continue
cur_path = os.path.join(p, dirpath)
scm_info = get_scm(cur_path)
yield scm_info[1].split(path)[-1].lstrip('/'), scm_info
yield scm_info[1].split(path)[-1].lstrip(os.sep), scm_info
except VCSError:
if not recursive:
#check if this dir containts other repos for recursive scan
rec_path = os.path.join(p, dirpath)
if os.path.isdir(rec_path):
for inner_scm in _get_repos(rec_path):
yield inner_scm
return _get_repos(path)
def check_repo_fast(repo_name, base_path):
Check given path for existence of directory
:param repo_name:
:param base_path:
:return False: if this directory is present
if os.path.isdir(os.path.join(base_path, repo_name)):return False
return True
def check_repo(repo_name, base_path, verify=True):
repo_path = os.path.join(base_path, repo_name)
if not check_repo_fast(repo_name, base_path):
return False
r = hg.repository(ui.ui(), repo_path)
if verify:
hg.verify(r)
#here we hnow that repo exists it was verified
log.info('%s repo is already created', repo_name)
except RepoError:
#it means that there is no valid repo there...
log.info('%s repo is free for creation', repo_name)
def ask_ok(prompt, retries=4, complaint='Yes or no, please!'):
while True:
ok = raw_input(prompt)
if ok in ('y', 'ye', 'yes'): return True
if ok in ('n', 'no', 'nop', 'nope'): return False
retries = retries - 1
if retries < 0: raise IOError
print complaint
#propagated from mercurial documentation
ui_sections = ['alias', 'auth',
'decode/encode', 'defaults',
'diff', 'email',
'extensions', 'format',
'merge-patterns', 'merge-tools',
'hooks', 'http_proxy',
'smtp', 'patch',
'paths', 'profiling',
'server', 'trusted',
'ui', 'web', ]
def make_ui(read_from='file', path=None, checkpaths=True):
"""A function that will read python rc files or database
and make an mercurial ui object from read options
:param path: path to mercurial config file
:param checkpaths: check the path
:param read_from: read from 'file' or 'db'
baseui = ui.ui()
#clean the baseui object
baseui._ocfg = config.config()
baseui._ucfg = config.config()
baseui._tcfg = config.config()
if read_from == 'file':
if not os.path.isfile(path):
log.warning('Unable to read config file %s' % path)
log.debug('reading hgrc from %s', path)
cfg = config.config()
cfg.read(path)
for section in ui_sections:
for k, v in cfg.items(section):
log.debug('settings ui from file[%s]%s:%s', section, k, v)
baseui.setconfig(section, k, v)
elif read_from == 'db':
ret = sa.query(RhodeCodeUi)\
.options(FromCache("sql_cache_short",
"get_hg_ui_settings")).all()
Status change: