# -*- coding: utf-8 -*-
"""
rhodecode.lib.celerylib.__init__
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
celery libs for RhodeCode
:created_on: Nov 27, 2010
:author: marcink
:copyright: (C) 2009-2011 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import sys
import socket
import traceback
import logging
from os.path import dirname as dn, join as jn
from hashlib import md5
from decorator import decorator
from pylons import config
from vcs.utils.lazy import LazyProperty
from rhodecode.lib import str2bool
from rhodecode.lib.pidlock import DaemonLock, LockHeld
from celery.messaging import establish_connection
log = logging.getLogger(__name__)
try:
CELERY_ON = str2bool(config['app_conf'].get('use_celery'))
except KeyError:
CELERY_ON = False
class ResultWrapper(object):
def __init__(self, task):
self.task = task
@LazyProperty
def result(self):
return self.task
def run_task(task, *args, **kwargs):
if CELERY_ON:
t = task.apply_async(args=args, kwargs=kwargs)
log.info('running task %s:%s', t.task_id, task)
return t
except socket.error, e:
if e.errno == 111:
log.debug('Unable to connect to celeryd. Sync execution')
else:
log.error(traceback.format_exc())
except KeyError, e:
except Exception, e:
log.debug('executing task %s in sync mode', task)
return ResultWrapper(task(*args, **kwargs))
def __get_lockkey(func, *fargs, **fkwargs):
params = list(fargs)
params.extend(['%s-%s' % ar for ar in fkwargs.items()])
func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
lockkey = 'task_%s' % \
lockkey = 'task_%s.lock' % \
md5(func_name + '-' + '-'.join(map(str, params))).hexdigest()
return lockkey
def locked_task(func):
def __wrapper(func, *fargs, **fkwargs):
lockkey = __get_lockkey(func, *fargs, **fkwargs)
lockkey_path = dn(dn(dn(os.path.abspath(__file__))))
log.info('running task with lockkey %s', lockkey)
l = DaemonLock(lockkey)
l = DaemonLock(jn(lockkey_path, lockkey))
ret = func(*fargs, **fkwargs)
l.release()
return ret
except LockHeld:
log.info('LockHeld')
return 'Task with key %s already running' % lockkey
return decorator(__wrapper, func)
rhodecode.lib.celerylib.tasks
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
RhodeCode task modules, containing all task that suppose to be run
by celery daemon
:created_on: Oct 6, 2010
from celery.decorators import task
from time import mktime
from operator import itemgetter
from string import lower
from pylons.i18n.translation import _
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
__get_lockkey, LockHeld, DaemonLock
from rhodecode.lib.helpers import person
from rhodecode.lib.smtp_mailer import SmtpMailer
from rhodecode.lib.utils import add_cache
from rhodecode.lib.odict import OrderedDict
from rhodecode.model import init_model
from rhodecode.model import meta
from rhodecode.model.db import RhodeCodeUi, Statistics, Repository
from vcs.backends import get_repo
from sqlalchemy import engine_from_config
add_cache(config)
import json
except ImportError:
#python 2.5 compatibility
import simplejson as json
__all__ = ['whoosh_index', 'get_commits_stats',
'reset_user_password', 'send_email']
def get_session():
engine = engine_from_config(config, 'sqlalchemy.db1.')
init_model(engine)
sa = meta.Session()
return sa
def get_repos_path():
sa = get_session()
q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one()
return q.ui_value
@task(ignore_result=True)
@locked_task
def whoosh_index(repo_location, full_index):
#log = whoosh_index.get_logger()
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
index_location = config['index_dir']
WhooshIndexingDaemon(index_location=index_location,
repo_location=repo_location, sa=get_session())\
.run(full_index=full_index)
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
log = get_commits_stats.get_logger()
except:
lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
ts_max_y)
lockkey_path = dn(dn(dn(dn(os.path.abspath(__file__)))))
print jn(lockkey_path, lockkey)
lock = DaemonLock(lockkey)
lock = l = DaemonLock(jn(lockkey_path, lockkey))
#for js data compatibilty cleans the key for person from '
akc = lambda k: person(k).replace('"', "")
co_day_auth_aggr = {}
commits_by_day_aggregate = {}
repos_path = get_repos_path()
p = os.path.join(repos_path, repo_name)
repo = get_repo(p)
repo_size = len(repo.revisions)
#return if repo have no revisions
if repo_size < 1:
lock.release()
return True
skip_date_limit = True
parse_limit = int(config['app_conf'].get('commit_parse_limit'))
last_rev = 0
last_cs = None
timegetter = itemgetter('time')
dbrepo = sa.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
cur_stats = sa.query(Statistics)\
.filter(Statistics.repository == dbrepo).scalar()
if cur_stats is not None:
last_rev = cur_stats.stat_on_revision
if last_rev == repo.get_changeset().revision and repo_size > 1:
#pass silently without any work if we're not on first revision or
#current state of parsing revision(from db marker) is the
#last revision
if cur_stats:
commits_by_day_aggregate = OrderedDict(json.loads(
cur_stats.commit_activity_combined))
co_day_auth_aggr = json.loads(cur_stats.commit_activity)
log.debug('starting parsing %s', parse_limit)
lmktime = mktime
last_rev = last_rev + 1 if last_rev > 0 else last_rev
@@ -54,97 +54,97 @@ INDEX_EXTENSIONS = LANGUAGES_EXTENSIONS_
#CUSTOM ANALYZER wordsplit + lowercase filter
ANALYZER = RegexTokenizer(expression=r"\w+") | LowercaseFilter()
#INDEX SCHEMA DEFINITION
SCHEMA = Schema(owner=TEXT(),
repository=TEXT(stored=True),
path=TEXT(stored=True),
content=FieldType(format=Characters(ANALYZER),
scorable=True, stored=True),
modtime=STORED(), extension=TEXT(stored=True))
IDX_NAME = 'HG_INDEX'
FORMATTER = HtmlFormatter('span', between='\n<span class="break">...</span>\n')
FRAGMENTER = SimpleFragmenter(200)
class MakeIndex(BasePasterCommand):
max_args = 1
min_args = 1
usage = "CONFIG_FILE"
summary = "Creates index for full text search given configuration file"
group_name = "RhodeCode"
takes_config_file = -1
parser = Command.standard_parser(verbose=True)
def command(self):
repo_location = self.options.repo_location
repo_list = map(strip, self.options.repo_list.split(',')) \
if self.options.repo_list else None
#======================================================================
# WHOOSH DAEMON
from rhodecode.lib.pidlock import LockHeld, DaemonLock
l = DaemonLock()
l = DaemonLock(file=jn(dn(dn(index_location)), 'make_index.lock'))
repo_location=repo_location,
repo_list=repo_list)\
.run(full_index=self.options.full_index)
sys.exit(1)
def update_parser(self):
self.parser.add_option('--repo-location',
action='store',
dest='repo_location',
help="Specifies repositories location to index REQUIRED",
)
self.parser.add_option('--index-only',
dest='repo_list',
help="Specifies a comma separated list of repositores "
"to build index on OPTIONAL",
self.parser.add_option('-f',
action='store_true',
dest='full_index',
help="Specifies that index should be made full i.e"
" destroy old and build from scratch",
default=False)
def __init__(self, search_type, searcher, matcher, highlight_items):
self.search_type = search_type
self.searcher = searcher
self.matcher = matcher
self.highlight_items = highlight_items
self.fragment_size = 200 / 2
def doc_ids(self):
docs_id = []
while self.matcher.is_active():
docnum = self.matcher.id()
chunks = [offsets for offsets in self.get_chunks()]
docs_id.append([docnum, chunks])
self.matcher.next()
return docs_id
def __str__(self):
return '<%s at %s>' % (self.__class__.__name__, len(self.doc_ids))
rhodecode.lib.utils
~~~~~~~~~~~~~~~~~~~
Utilities library for RhodeCode
:created_on: Apr 18, 2010
import datetime
import paste
import beaker
from paste.script.command import Command, BadCommand
from UserDict import DictMixin
from mercurial import ui, config, hg
from mercurial.error import RepoError
from webhelpers.text import collapse, remove_formatting, strip_tags
from vcs.backends.base import BaseChangeset
from rhodecode.model.caching_query import FromCache
from rhodecode.model.db import Repository, User, RhodeCodeUi, UserLog, Group, \
RhodeCodeSettings
from rhodecode.model.repo import RepoModel
from rhodecode.model.user import UserModel
def recursive_replace(str, replace=' '):
"""Recursive replace of given sign to just one instance
:param str: given string
:param replace: char to find and replace multiple instances
Examples::
>>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
'Mighty-Mighty-Bo-sstones'
if str.find(replace * 2) == -1:
return str
str = str.replace(replace * 2, replace)
return recursive_replace(str, replace)
def repo_name_slug(value):
"""Return slug of name of repository
This function is called on each creation/modification
of repository to prevent bad names in repo
slug = remove_formatting(value)
@@ -425,97 +426,97 @@ def add_cache(settings):
region_settings.setdefault('lock_dir',
cache_settings.get('lock_dir'))
region_settings.setdefault('data_dir',
cache_settings.get('data_dir'))
if 'type' not in region_settings:
region_settings['type'] = cache_settings.get('type',
'memory')
beaker.cache.cache_regions[region] = region_settings
def get_current_revision():
"""Returns tuple of (number, id) from repository containing this package
or None if repository could not be found.
from vcs import get_repo
from vcs.utils.helpers import get_scm
from vcs.exceptions import RepositoryError, VCSError
repopath = os.path.join(os.path.dirname(__file__), '..', '..')
scm = get_scm(repopath)[0]
repo = get_repo(path=repopath, alias=scm)
tip = repo.get_changeset()
return (tip.revision, tip.short_id)
except (ImportError, RepositoryError, VCSError), err:
logging.debug("Cannot retrieve rhodecode's revision. Original error "
"was: %s" % err)
return None
#==============================================================================
# TEST FUNCTIONS AND CREATORS
def create_test_index(repo_location, full_index):
"""Makes default test index
:param repo_location:
:param full_index:
import shutil
index_location = os.path.join(repo_location, 'index')
if os.path.exists(index_location):
shutil.rmtree(index_location)
repo_location=repo_location)\
pass
def create_test_env(repos_test_path, config):
"""Makes a fresh database and
install test repository into tmp dir
from rhodecode.lib.db_manage import DbManage
from rhodecode.tests import HG_REPO, GIT_REPO, NEW_HG_REPO, NEW_GIT_REPO, \
HG_FORK, GIT_FORK, TESTS_TMP_PATH
import tarfile
from os.path import dirname as dn, join as jn, abspath
log = logging.getLogger('TestEnvCreator')
# create logger
log.setLevel(logging.DEBUG)
log.propagate = True
# create console handler and set level to debug
ch = logging.StreamHandler()
ch.setLevel(logging.DEBUG)
# create formatter
formatter = logging.Formatter("%(asctime)s - %(name)s -"
" %(levelname)s - %(message)s")
# add formatter to ch
ch.setFormatter(formatter)
# add ch to logger
log.addHandler(ch)
#PART ONE create db
dbconf = config['sqlalchemy.db1.url']
log.debug('making test db %s', dbconf)
dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
tests=True)
dbmanage.create_tables(override=True)
dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
dbmanage.create_default_user()
dbmanage.admin_prompt()
dbmanage.create_permissions()
Status change: