Changeset - 1f92cded1bd2
[Not reviewed]
default
0 5 0
Mads Kiilerich (mads) - 5 years ago 2020-12-29 22:07:41
mads@kiilerich.com
celery: move Whoosh indexing task to whoosh library where it belongs

Avoid bundling everything from many different layers in one big task library.

This is more feasible now when we don't need kallithea.CELERY_APP set at import
time.
5 files changed with 18 insertions and 14 deletions:
0 comments (0 inline, 0 general)
kallithea/controllers/admin/settings.py
Show inline comments
 
@@ -15,48 +15,49 @@
 
kallithea.controllers.admin.settings
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
settings controller for Kallithea admin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import config, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
import kallithea
 
import kallithea.lib.indexers.daemon
 
from kallithea.controllers import base
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.utils import repo2db_mapper, set_app_settings
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs import VCSError
 
from kallithea.lib.webutils import url
 
from kallithea.model import async_tasks, db, meta
 
from kallithea.model.forms import ApplicationSettingsForm, ApplicationUiSettingsForm, ApplicationVisualisationForm
 
from kallithea.model.notification import EmailNotificationModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SettingsController(base.BaseController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(SettingsController, self)._before(*args, **kwargs)
 

	
 
    def _get_hg_ui_settings(self):
 
        ret = db.Ui.query().all()
 
@@ -357,49 +358,49 @@ class SettingsController(base.BaseContro
 
                    log.error(traceback.format_exc())
 
                    webutils.flash(_('Error occurred during hook creation'),
 
                            category='error')
 

	
 
                raise HTTPFound(location=url('admin_settings_hooks'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        c.hooks = db.Ui.get_builtin_hooks()
 
        c.custom_hooks = db.Ui.get_custom_hooks()
 

	
 
        return htmlfill.render(
 
            base.render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_search(self):
 
        c.active = 'search'
 
        if request.POST:
 
            repo_location = self._get_hg_ui_settings()['paths_root_path']
 
            full_index = request.POST.get('full_index', False)
 
            async_tasks.whoosh_index(repo_location, full_index)
 
            kallithea.lib.indexers.daemon.whoosh_index(repo_location, full_index)
 
            webutils.flash(_('Whoosh reindex task scheduled'), category='success')
 
            raise HTTPFound(location=url('admin_settings_search'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            base.render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_system(self):
 
        c.active = 'system'
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        c.ini = kallithea.CONFIG
 
        server_info = db.Setting.get_server_info()
 
        for key, val in server_info.items():
 
            setattr(c, key, val)
 

	
kallithea/lib/celery_app.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
"""
 
Kallithea wrapper of Celery
 

	
 
The Celery configuration is in the Kallithea ini file but must be converted to an
 
entirely different format before Celery can use it.
 

	
 
We read the configuration from tg.config at module import time. This module can
 
thus not be imported in global scope but must be imported on demand in function
 
scope after tg.config has been initialized.
 

	
 
To make sure that the config really has been initialized, we check one of the
 
mandatory settings.
 
"""
 

	
 
import logging
 

	
 

	
 
class CeleryConfig(object):
 
    imports = ['kallithea.model.async_tasks']
 
    imports = [
 
        'kallithea.lib.indexers.daemon',
 
        'kallithea.model.async_tasks',
 
    ]
 
    task_always_eager = False
 

	
 
list_config_names = {'imports', 'accept_content'}
 

	
 

	
 
desupported = set([
 
    'broker.url',
 
    'celery.accept.content',
 
    'celery.always.eager',
 
    'celery.amqp.task.result.expires',
 
    'celeryd.concurrency',
 
    'celeryd.max.tasks.per.child',
 
    'celery.result.backend',  # Note: the .ini template used this instead of 'celery.result_backend' in 0.6
 
    'celery.result.dburi',
 
    'celery.result.serialier',
 
    'celery.result.serializer',
 
    'celery.send.task.error.emails',
 
    'celery.task_always_eager',  # still a valid configuration in celery, but not supported in Kallithea
 
    'celery.task.serializer',
 
])
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
kallithea/lib/indexers/daemon.py
Show inline comments
 
@@ -11,51 +11,53 @@
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.indexers.daemon
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
A daemon will read from task table and run tasks
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 26, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import os
 
import traceback
 
from shutil import rmtree
 
from time import mktime
 

	
 
from tg import config
 
from whoosh.index import create_in, exists_in, open_dir
 
from whoosh.qparser import QueryParser
 

	
 
from kallithea.lib import celerylib
 
from kallithea.lib.conf import INDEX_EXTENSIONS, INDEX_FILENAMES
 
from kallithea.lib.indexers import CHGSET_IDX_NAME, CHGSETS_SCHEMA, IDX_NAME, SCHEMA
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, NodeDoesNotExistError, RepositoryError
 
from kallithea.model import db
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger('whoosh_indexer')
 

	
 

	
 
class WhooshIndexingDaemon(object):
 
    """
 
    Daemon for atomic indexing jobs
 
    """
 

	
 
    def __init__(self, indexname=IDX_NAME, index_location=None,
 
                 repo_location=None, repo_list=None,
 
                 repo_update_list=None):
 
        self.indexname = indexname
 

	
 
        self.index_location = index_location
 
        if not index_location:
 
            raise Exception('You have to provide index location')
 
@@ -423,24 +425,33 @@ class WhooshIndexingDaemon(object):
 
        for repo_name, repo in sorted(self.repo_paths.items()):
 
            log.debug('Updating indices for repo %s', repo_name)
 
            # skip indexing if there aren't any revisions
 
            if len(repo) < 1:
 
                continue
 

	
 
            self.index_files(file_idx_writer, repo_name, repo)
 
            self.index_changesets(chgset_idx_writer, repo_name, repo)
 

	
 
        log.debug('>> COMMITING CHANGES <<')
 
        file_idx_writer.commit(merge=True)
 
        chgset_idx_writer.commit(merge=True)
 
        log.debug('>>> FINISHED BUILDING INDEX <<<')
 

	
 
    def update_indexes(self):
 
        self.update_file_index()
 
        self.update_changeset_index()
 

	
 
    def run(self, full_index=False):
 
        """Run daemon"""
 
        if full_index or self.initial:
 
            self.build_indexes()
 
        else:
 
            self.update_indexes()
 

	
 

	
 
@celerylib.task
 
@celerylib.locked_task
 
def whoosh_index(repo_location, full_index):
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
                         .run(full_index=full_index)
kallithea/model/async_tasks.py
Show inline comments
 
@@ -20,69 +20,59 @@ by celery daemon
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import email.message
 
import email.utils
 
import os
 
import smtplib
 
import time
 
import traceback
 
from collections import OrderedDict
 
from operator import itemgetter
 
from time import mktime
 

	
 
import celery.utils.log
 
from tg import config
 

	
 
import kallithea
 
from kallithea.lib import celerylib, conf, ext_json, hooks
 
from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
from kallithea.lib.utils2 import asbool, ascii_bytes
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.model import db, meta, repo, userlog
 

	
 

	
 
__all__ = ['whoosh_index', 'get_commits_stats', 'send_email']
 
__all__ = ['get_commits_stats', 'send_email']
 

	
 

	
 
log = celery.utils.log.get_task_logger(__name__)
 

	
 

	
 
@celerylib.task
 
@celerylib.locked_task
 
def whoosh_index(repo_location, full_index):
 
    index_location = config['index_dir']
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
                         .run(full_index=full_index)
 

	
 

	
 
def _author_username(author):
 
    """Return the username of the user identified by the email part of the 'author' string,
 
    default to the name or email.
 
    Kind of similar to h.person() ."""
 
    email = author_email(author)
 
    if email:
 
        user = db.User.get_by_email(email)
 
        if user is not None:
 
            return user.username
 
    # Still nothing?  Just pass back the author name if any, else the email
 
    return author_name(author) or email
 

	
 

	
 
@celerylib.task
 
def get_commits_stats(repo_name, ts_min_y, ts_max_y, recurse_limit=100):
 
    lockkey = celerylib.__get_lockkey('get_commits_stats', repo_name, ts_min_y,
 
                            ts_max_y)
 
    log.info('running task with lockkey %s', lockkey)
 
    try:
 
        lock = celerylib.DaemonLock(os.path.join(config['cache_dir'], lockkey))
 

	
 
        co_day_auth_aggr = {}
 
        commits_by_day_aggregate = {}
 
        db_repo = db.Repository.get_by_repo_name(repo_name)
scripts/deps.py
Show inline comments
 
@@ -138,49 +138,48 @@ kallithea.lib.auth_modules
 
kallithea.lib.celerylib
 
kallithea.lib.db_manage
 
kallithea.lib.helpers
 
kallithea.lib.hooks
 
kallithea.lib.indexers
 
kallithea.lib.utils
 
kallithea.lib.utils2
 
kallithea.lib.vcs
 
kallithea.lib.webutils
 
kallithea.model
 
kallithea.model.async_tasks
 
kallithea.model.scm
 
kallithea.templates.py
 
'''.split())
 

	
 
shown_modules = normal_modules | top_modules
 

	
 
# break the chains somehow - this is a cleanup TODO list
 
known_violations = set([
 
('kallithea.lib.auth_modules', 'kallithea.lib.auth'),  # needs base&facade
 
('kallithea.lib.utils', 'kallithea.model'),  # clean up utils
 
('kallithea.lib.utils', 'kallithea.model.db'),
 
('kallithea.lib.utils', 'kallithea.model.scm'),
 
('kallithea.model.async_tasks', 'kallithea.lib.hooks'),
 
('kallithea.model.async_tasks', 'kallithea.lib.indexers'),
 
('kallithea.model.async_tasks', 'kallithea.model'),
 
('kallithea.model', 'kallithea.lib.auth'),  # auth.HasXXX
 
('kallithea.model', 'kallithea.lib.auth_modules'),  # validators
 
('kallithea.model', 'kallithea.lib.hooks'),  # clean up hooks
 
('kallithea.model', 'kallithea.model.scm'),
 
('kallithea.model.scm', 'kallithea.lib.hooks'),
 
])
 

	
 
extra_edges = [
 
('kallithea.config', 'kallithea.controllers'),  # through TG
 
('kallithea.lib.auth', 'kallithea.lib.auth_modules'),  # custom loader
 
]
 

	
 

	
 
def normalize(s):
 
    """Given a string with dot path, return the string it should be shown as."""
 
    parts = s.replace('.__init__', '').split('.')
 
    short_2 = '.'.join(parts[:2])
 
    short_3 = '.'.join(parts[:3])
 
    short_4 = '.'.join(parts[:4])
 
    if parts[0] in ['scripts', 'contributor_data', 'i18n_utils']:
 
        return 'scripts'
 
    if short_3 == 'kallithea.model.meta':
 
        return 'kallithea.model.db'
0 comments (0 inline, 0 general)