Changeset - 1b683a4eb9fc
[Not reviewed]
default
0 2 0
Mads Kiilerich (mads) - 5 years ago 2020-12-28 00:43:18
mads@kiilerich.com
Grafted from: b6354f238d1b
TurboGears: drop workaround for < 2.4

Backout a38e05a0c79e and tweak.
2 files changed with 2 insertions and 7 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -56,78 +56,76 @@ class FakeTask(object):
 
    task_id = None
 

	
 

	
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_APP
 
    """
 

	
 
    if kallithea.CELERY_APP:
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                log.info('executed %s task', f_org.__name__)
 
        f_async.__name__ = f_org.__name__
 
        runner = kallithea.CELERY_APP.task(ignore_result=True)(f_async)
 

	
 
        def f_wrapped(*args, **kwargs):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            return t
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            try:
 
                result = f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
 
                raise # TODO: return this in FakeTask as with async tasks?
 
            return FakeTask(result)
 

	
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s.lock' % \
 
        sha1(safe_bytes(func_name + '-' + '-'.join(str(x) for x in params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
 

	
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(os.path.join(lockkey_path, lockkey))
 
            l = DaemonLock(os.path.join(config['cache_dir'], lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
 

	
 

	
 
def get_session():
 
    sa = meta.Session()
 
    return sa
 

	
 

	
 
def dbsession(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        try:
 
            ret = func(*fargs, **fkwargs)
 
            return ret
 
        finally:
 
            if kallithea.CELERY_APP and not kallithea.CELERY_APP.conf.task_always_eager:
 
                meta.Session.remove()
 

	
 
    return decorator(__wrapper, func)
kallithea/model/async_tasks.py
Show inline comments
 
@@ -39,102 +39,99 @@ from time import mktime
 
import celery.utils.log
 
from tg import config
 

	
 
import kallithea
 
from kallithea.lib import celerylib, conf, ext_json, hooks
 
from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
from kallithea.lib.utils2 import asbool, ascii_bytes
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.model import db, repo, userlog
 

	
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 

	
 

	
 
log = celery.utils.log.get_task_logger(__name__)
 

	
 

	
 
@celerylib.task
 
@celerylib.locked_task
 
@celerylib.dbsession
 
def whoosh_index(repo_location, full_index):
 
    celerylib.get_session() # initialize database connection
 

	
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
                         .run(full_index=full_index)
 

	
 

	
 
def _author_username(author):
 
    """Return the username of the user identified by the email part of the 'author' string,
 
    default to the name or email.
 
    Kind of similar to h.person() ."""
 
    email = author_email(author)
 
    if email:
 
        user = db.User.get_by_email(email)
 
        if user is not None:
 
            return user.username
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email
 

	
 

	
 
@celerylib.task
 
@celerylib.dbsession
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    DBS = celerylib.get_session()
 
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    lockkey_path = config.get('cache_dir') or config['app_conf']['cache_dir']  # Backward compatibility for TurboGears < 2.4
 

	
 
    log.info('running task with lockkey %s', lockkey)
 

	
 
    try:
 
        lock = celerylib.DaemonLock(os.path.join(lockkey_path, lockkey))
 
        lock = celerylib.DaemonLock(os.path.join(config['cache_dir'], lockkey))
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        db_repo = db.Repository.get_by_repo_name(repo_name)
 
        if db_repo is None:
 
            return True
 

	
 
        scm_repo = db_repo.scm_instance
 
        repo_size = scm_repo.count()
 
        # return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 

	
 
        skip_date_limit = True
 
        parse_limit = int(config.get('commit_parse_limit'))
 
        last_rev = None
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = DBS.query(db.Repository) \
 
            .filter(db.Repository.repo_name == repo_name).scalar()
 
        cur_stats = DBS.query(db.Statistics) \
 
            .filter(db.Statistics.repository == dbrepo).scalar()
 

	
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
        if last_rev == scm_repo.get_changeset().revision and repo_size > 1:
 
            # pass silently without any work if we're not on first revision or
 
            # current state of parsing revision(from db marker) is the
 
            # last revision
 
            lock.release()
 
            return True
 

	
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(ext_json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)
 

	
 
        log.debug('starting parsing %s', parse_limit)
 

	
 
        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
 
        log.debug('Getting revisions from %s to %s',
 
             last_rev, last_rev + parse_limit
 
        )
 
        usernames_cache = {}
 
        for cs in scm_repo[last_rev:last_rev + parse_limit]:
0 comments (0 inline, 0 general)