Changeset - 853717af31d3
[Not reviewed]
default
0 1 0
Mads Kiilerich (mads) - 5 years ago 2021-01-01 18:04:16
mads@kiilerich.com
Grafted from: ec1e77cb37bb
celery: let async tasks choose at runtime if they should use immediate execution or dispatch to the Celery worker

Make it completely safe to use task annotation at import time, before global
config has been set.
1 file changed with 9 insertions and 8 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -40,35 +40,36 @@ from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def task(f_org):
 
    """Wrapper of celery.task.task, running async if CELERY_APP
 
    """Wrapper of celery.task.task, run at import time, before kallithea.CONFIG has been set, and before kallithea.CELERY_APP has been configured.
 
    """
 

	
 
    if asbool(kallithea.CONFIG.get('use_celery')):
 
        def f_async(*args, **kwargs):
 
            log.info('executing %s task', f_org.__name__)
 
        log.info('executing async task %s', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            finally:
 
                meta.Session.remove()  # prevent reuse of auto created db sessions
 
                log.info('executed %s task', f_org.__name__)
 
            log.info('executed async task %s', f_org.__name__)
 

	
 
        runner = kallithea.CELERY_APP.task(name=f_org.__name__, ignore_result=True)(f_async)
 

	
 
        def f_wrapped(*args, **kwargs):
 
        if asbool(kallithea.CONFIG.get('use_celery')):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            log.info('executing async task %s - id %s', f_org, t.task_id)
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            # invoke f_org directly, without the meta.Session.remove in f_async
 
            log.info('executing sync task %s', f_org.__name__)
 
            try:
 
                f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
 
                log.error('exception executing sync task %s: %r', f_org.__name__, e)
 
                raise # TODO: report errors differently ... and consistently between sync and async
 

	
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
0 comments (0 inline, 0 general)