Changeset - 853717af31d3
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 5 years ago 2021-01-01 18:04:16
mads@kiilerich.com
Grafted from: ec1e77cb37bb
celery: let async tasks choose at runtime if they should use immediate execution or dispatch to the Celery worker

Make it completely safe to use task annotation at import time, before global
config has been set.
1 file changed with 17 insertions and 16 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/__init__.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.celerylib
 
~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
celery libs for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Nov 27, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import os
 
from hashlib import sha1
 

	
 
from decorator import decorator
 
from tg import config
 

	
 
import kallithea
 
from kallithea.lib.pidlock import DaemonLock, LockHeld
 
from kallithea.lib.utils2 import asbool, safe_bytes
 
from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_APP
 
    """Wrapper of celery.task.task, run at import time, before kallithea.CONFIG has been set, and before kallithea.CELERY_APP has been configured.
 
    """
 

	
 
    if asbool(kallithea.CONFIG.get('use_celery')):
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                meta.Session.remove()  # prevent reuse of auto created db sessions
 
                log.info('executed %s task', f_org.__name__)
 
        runner = kallithea.CELERY_APP.task(name=f_org.__name__, ignore_result=True)(f_async)
 
    def f_async(*args, **kwargs):
 
        log.info('executing async task %s', f_org.__name__)
 
        try:
 
            f_org(*args, **kwargs)
 
        finally:
 
            meta.Session.remove()  # prevent reuse of auto created db sessions
 
            log.info('executed async task %s', f_org.__name__)
 

	
 
        def f_wrapped(*args, **kwargs):
 
    runner = kallithea.CELERY_APP.task(name=f_org.__name__, ignore_result=True)(f_async)
 

	
 
    def f_wrapped(*args, **kwargs):
 
        if asbool(kallithea.CONFIG.get('use_celery')):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            log.info('executing async task %s - id %s', f_org, t.task_id)
 
        else:
 
            # invoke f_org directly, without the meta.Session.remove in f_async
 
            log.info('executing sync task %s', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
 
                log.error('exception executing sync task %s: %r', f_org.__name__, e)
 
                raise # TODO: report errors differently ... and consistently between sync and async
 

	
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s.lock' % \
 
        sha1(safe_bytes(func_name + '-' + '-'.join(str(x) for x in params))).hexdigest()
 
    return lockkey
 

	
 

	
 
def locked_task(func):
 
    def __wrapper(func, *fargs, **fkwargs):
 
        lockkey = __get_lockkey(func, *fargs, **fkwargs)
 
        log.info('running task with lockkey %s', lockkey)
 
        try:
 
            l = DaemonLock(os.path.join(config['cache_dir'], lockkey))
 
            ret = func(*fargs, **fkwargs)
 
            l.release()
 
            return ret
 
        except LockHeld:
 
            log.info('LockHeld')
 
            return 'Task with key %s already running' % lockkey
 

	
 
    return decorator(__wrapper, func)
0 comments (0 inline, 0 general)