Changeset - a5c17c93d246
[Not reviewed]
default
0 2 0
Mads Kiilerich (mads) - 5 years ago 2020-12-29 16:31:59
mads@kiilerich.com
Grafted from: 94d85877f688
celery: don't return values from task functions - we use ignore_result=True and will never get anything back when running async
2 files changed with 8 insertions and 15 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/celerylib/__init__.py
Show inline comments
 
@@ -69,29 +69,29 @@ def task(f_org):
 
                meta.Session.remove()  # prevent reuse of auto created db sessions
 
                log.info('executed %s task', f_org.__name__)
 
        runner = kallithea.CELERY_APP.task(name=f_org.__name__, ignore_result=True)(f_async)
 

	
 
        def f_wrapped(*args, **kwargs):
 
            t = runner.apply_async(args=args, kwargs=kwargs)
 
            log.info('executing task %s in async mode - id %s', f_org, t.task_id)
 
            return t
 
    else:
 
        def f_wrapped(*args, **kwargs):
 
            log.info('executing task %s in sync', f_org.__name__)
 
            try:
 
                result = f_org(*args, **kwargs)
 
                f_org(*args, **kwargs)
 
            except Exception as e:
 
                log.error('exception executing sync task %s in sync: %r', f_org.__name__, e)
 
                raise # TODO: return this in FakeTask as with async tasks?
 
            return FakeTask(result)
 
            return FakeTask(None)
 

	
 
    return f_wrapped
 

	
 

	
 
def __get_lockkey(func, *fargs, **fkwargs):
 
    params = list(fargs)
 
    params.extend(['%s-%s' % ar for ar in fkwargs.items()])
 

	
 
    func_name = str(func.__name__) if hasattr(func, '__name__') else str(func)
 

	
 
    lockkey = 'task_%s.lock' % \
 
        sha1(safe_bytes(func_name + '-' + '-'.join(str(x) for x in params))).hexdigest()
kallithea/model/async_tasks.py
Show inline comments
 
@@ -78,53 +78,53 @@ def _author_username(author):
 
@celerylib.task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    log.info('running task with lockkey %s', lockkey)
 
    try:
 
        lock = celerylib.DaemonLock(os.path.join(config['cache_dir'], lockkey))
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        db_repo = db.Repository.get_by_repo_name(repo_name)
 
        if db_repo is None:
 
            return True
 
            return
 

	
 
        scm_repo = db_repo.scm_instance
 
        repo_size = scm_repo.count()
 
        # return if repo have no revisions
 
        if repo_size < 1:
 
            lock.release()
 
            return True
 
            return
 

	
 
        skip_date_limit = True
 
        parse_limit = int(config.get('commit_parse_limit'))
 
        last_rev = None
 
        last_cs = None
 
        timegetter = itemgetter('time')
 

	
 
        dbrepo = db.Repository.query() \
 
            .filter(db.Repository.repo_name == repo_name).scalar()
 
        cur_stats = db.Statistics.query() \
 
            .filter(db.Statistics.repository == dbrepo).scalar()
 

	
 
        if cur_stats is not None:
 
            last_rev = cur_stats.stat_on_revision
 

	
 
        if last_rev == scm_repo.get_changeset().revision and repo_size > 1:
 
            # pass silently without any work if we're not on first revision or
 
            # current state of parsing revision(from db marker) is the
 
            # last revision
 
            lock.release()
 
            return True
 
            return
 

	
 
        if cur_stats:
 
            commits_by_day_aggregate = OrderedDict(ext_json.loads(
 
                                        cur_stats.commit_activity_combined))
 
            co_day_auth_aggr = ext_json.loads(cur_stats.commit_activity)
 

	
 
        log.debug('starting parsing %s', parse_limit)
 

	
 
        last_rev = last_rev + 1 if last_rev and last_rev >= 0 else 0
 
        log.debug('Getting revisions from %s to %s',
 
             last_rev, last_rev + parse_limit
 
        )
 
@@ -204,39 +204,38 @@ def get_commits_stats(repo_name, ts_min_
 
            log.debug('getting code trending stats')
 
            stats.languages = ascii_bytes(ext_json.dumps(__get_codes_stats(repo_name)))
 

	
 
        try:
 
            stats.repository = dbrepo
 
            stats.stat_on_revision = last_cs.revision if last_cs else 0
 
            meta.Session().add(stats)
 
            meta.Session().commit()
 
        except:
 
            log.error(traceback.format_exc())
 
            meta.Session().rollback()
 
            lock.release()
 
            return False
 
            return
 

	
 
        # final release
 
        lock.release()
 

	
 
        # execute another task if celery is enabled
 
        if len(scm_repo.revisions) > 1 and kallithea.CELERY_APP and recurse_limit > 0:
 
            get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit - 1)
 
        elif recurse_limit <= 0:
 
            log.debug('Not recursing - limit has been reached')
 
        else:
 
            log.debug('Not recursing')
 
    except celerylib.LockHeld:
 
        log.info('Task with key %s already running', lockkey)
 
        return 'Task with key %s already running' % lockkey
 

	
 

	
 
@celerylib.task
 
def send_email(recipients, subject, body='', html_body='', headers=None, from_name=None):
 
    """
 
    Sends an email with defined parameters from the .ini files.
 

	
 
    :param recipients: list of recipients, if this is None, the defined email
 
        address from field 'email_to' and all admins is used instead
 
    :param subject: subject of the mail
 
    :param body: plain text body of the mail
 
    :param html_body: html version of body
 
@@ -258,25 +257,25 @@ def send_email(recipients, subject, body
 

	
 
    if not recipients:
 
        # if recipients are not defined we send to email_config + all admins
 
        recipients = [u.email for u in db.User.query()
 
                      .filter(db.User.admin == True).all()]
 
        if email_config.get('email_to') is not None:
 
            recipients += email_config.get('email_to').split(',')
 

	
 
        # If there are still no recipients, there are no admins and no address
 
        # configured in email_to, so return.
 
        if not recipients:
 
            log.error("No recipients specified and no fallback available.")
 
            return False
 
            return
 

	
 
        log.warning("No recipients specified for '%s' - sending to admins %s", subject, ' '.join(recipients))
 

	
 
    # SMTP sender
 
    app_email_from = email_config.get('app_email_from', 'Kallithea')
 
    # 'From' header
 
    if from_name is not None:
 
        # set From header based on from_name but with a generic e-mail address
 
        # In case app_email_from is in "Some Name <e-mail>" format, we first
 
        # extract the e-mail address.
 
        envelope_addr = author_email(app_email_from)
 
        headers['From'] = '"%s" <%s>' % (
 
@@ -295,25 +294,25 @@ def send_email(recipients, subject, body
 
              "recipients: %s\n"
 
              "headers: %s\n"
 
              "subject: %s\n"
 
              "body:\n%s\n"
 
              "html:\n%s\n"
 
              % (' '.join(recipients), headers, subject, body, html_body))
 

	
 
    if smtp_server:
 
        log.debug("Sending e-mail. " + logmsg)
 
    else:
 
        log.error("SMTP mail server not configured - cannot send e-mail.")
 
        log.warning(logmsg)
 
        return False
 
        return
 

	
 
    msg = email.message.EmailMessage()
 
    msg['Subject'] = subject
 
    msg['From'] = app_email_from  # fallback - might be overridden by a header
 
    msg['To'] = ', '.join(recipients)
 
    msg['Date'] = email.utils.formatdate(time.time())
 

	
 
    for key, value in headers.items():
 
        del msg[key]  # Delete key first to make sure add_header will replace header (if any), no matter the casing
 
        msg.add_header(key, value)
 

	
 
    msg.set_content(body)
 
@@ -333,26 +332,24 @@ def send_email(recipients, subject, body
 
            smtp_serv.esmtp_features["auth"] = smtp_auth
 

	
 
        if smtp_username and smtp_password is not None:
 
            smtp_serv.login(smtp_username, smtp_password)
 

	
 
        smtp_serv.sendmail(app_email_from, recipients, msg.as_string())
 
        smtp_serv.quit()
 

	
 
        log.info('Mail was sent to: %s' % recipients)
 
    except:
 
        log.error('Mail sending failed')
 
        log.error(traceback.format_exc())
 
        return False
 
    return True
 

	
 

	
 
@celerylib.task
 
def create_repo(form_data, cur_user):
 
    cur_user = db.User.guess_instance(cur_user)
 

	
 
    owner = cur_user
 
    repo_name = form_data['repo_name']
 
    repo_name_full = form_data['repo_name_full']
 
    repo_type = form_data['repo_type']
 
    description = form_data['repo_description']
 
    private = form_data['repo_private']
 
@@ -409,26 +406,24 @@ def create_repo(form_data, cur_user):
 
        meta.Session().commit()
 
    except Exception as e:
 
        log.warning('Exception %s occurred when forking repository, '
 
                    'doing cleanup...' % e)
 
        # rollback things manually !
 
        db_repo = db.Repository.get_by_repo_name(repo_name_full)
 
        if db_repo:
 
            db.Repository.delete(db_repo.repo_id)
 
            meta.Session().commit()
 
            repo.RepoModel()._delete_filesystem_repo(db_repo)
 
        raise
 

	
 
    return True
 

	
 

	
 
@celerylib.task
 
def create_repo_fork(form_data, cur_user):
 
    """
 
    Creates a fork of repository using interval VCS methods
 

	
 
    :param form_data:
 
    :param cur_user:
 
    """
 
    base_path = kallithea.CONFIG['base_path']
 
    cur_user = db.User.guess_instance(cur_user)
 

	
 
@@ -482,26 +477,24 @@ def create_repo_fork(form_data, cur_user
 
        meta.Session().commit()
 
    except Exception as e:
 
        log.warning('Exception %s occurred when forking repository, '
 
                    'doing cleanup...' % e)
 
        # rollback things manually !
 
        db_repo = db.Repository.get_by_repo_name(repo_name_full)
 
        if db_repo:
 
            db.Repository.delete(db_repo.repo_id)
 
            meta.Session().commit()
 
            repo.RepoModel()._delete_filesystem_repo(db_repo)
 
        raise
 

	
 
    return True
 

	
 

	
 
def __get_codes_stats(repo_name):
 
    scm_repo = db.Repository.get_by_repo_name(repo_name).scm_instance
 

	
 
    tip = scm_repo.get_changeset()
 
    code_stats = {}
 

	
 
    for _topnode, _dirnodes, filenodes in tip.walk('/'):
 
        for filenode in filenodes:
 
            ext = filenode.extension.lower()
 
            if ext in conf.LANGUAGES_EXTENSIONS_MAP and not filenode.is_binary:
 
                if ext in code_stats:
0 comments (0 inline, 0 general)