@@ -29,6 +29,7 @@ import socket
import traceback
import logging
from os.path import dirname as dn, join as jn
from pylons import config
from hashlib import md5
from decorator import decorator
@@ -37,15 +38,17 @@ from vcs.utils.lazy import LazyProperty
from rhodecode import CELERY_ON
from rhodecode.lib import str2bool, safe_str
from rhodecode.lib.pidlock import DaemonLock, LockHeld
from rhodecode.model import init_model
from rhodecode.model import meta
from rhodecode.model.db import Statistics, Repository, User
from sqlalchemy import engine_from_config
from celery.messaging import establish_connection
log = logging.getLogger(__name__)
class ResultWrapper(object):
def __init__(self, task):
self.task = task
@@ -103,3 +106,22 @@ def locked_task(func):
return 'Task with key %s already running' % lockkey
return decorator(__wrapper, func)
def get_session():
if CELERY_ON:
engine = engine_from_config(config, 'sqlalchemy.db1.')
init_model(engine)
sa = meta.Session
return sa
def dbsession(func):
def __wrapper(func, *fargs, **fkwargs):
try:
ret = func(*fargs, **fkwargs)
return ret
finally:
meta.Session.remove()
@@ -41,18 +41,15 @@ from vcs import get_backend
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
__get_lockkey, LockHeld, DaemonLock
from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
from rhodecode.lib.helpers import person
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
from rhodecode.lib.utils import add_cache, action_logger
from rhodecode.lib.compat import json, OrderedDict
add_cache(config)
@@ -60,13 +57,6 @@ __all__ = ['whoosh_index', 'get_commits_
'reset_user_password', 'send_email']
def get_logger(cls):
@@ -81,21 +71,23 @@ def get_logger(cls):
@task(ignore_result=True)
@locked_task
@dbsession
def whoosh_index(repo_location, full_index):
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
# log = whoosh_index.get_logger(whoosh_index)
log = whoosh_index.get_logger(whoosh_index)
DBS = get_session()
index_location = config['index_dir']
WhooshIndexingDaemon(index_location=index_location,
repo_location=repo_location, sa=get_session())\
repo_location=repo_location, sa=DBS)\
.run(full_index=full_index)
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
log = get_logger(get_commits_stats)
lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
ts_max_y)
lockkey_path = config['here']
@@ -103,7 +95,6 @@ def get_commits_stats(repo_name, ts_min_
log.info('running task with lockkey %s', lockkey)
sa = get_session()
lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
# for js data compatibilty cleans the key for person from '
@@ -128,9 +119,9 @@ def get_commits_stats(repo_name, ts_min_
last_cs = None
timegetter = itemgetter('time')
dbrepo = sa.query(Repository)\
dbrepo = DBS.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
cur_stats = sa.query(Statistics)\
cur_stats = DBS.query(Statistics)\
.filter(Statistics.repository == dbrepo).scalar()
if cur_stats is not None:
@@ -234,11 +225,11 @@ def get_commits_stats(repo_name, ts_min_
stats.repository = dbrepo
stats.stat_on_revision = last_cs.revision if last_cs else 0
sa.add(stats)
sa.commit()
DBS.add(stats)
DBS.commit()
except:
log.error(traceback.format_exc())
sa.rollback()
DBS.rollback()
lock.release()
return False
@@ -254,13 +245,14 @@ def get_commits_stats(repo_name, ts_min_
def send_password_link(user_email):
from rhodecode.model.notification import EmailNotificationModel
log = get_logger(send_password_link)
user = User.get_by_email(user_email)
if user:
log.debug('password reset user found %s' % user)
@@ -283,28 +275,29 @@ def send_password_link(user_email):
return True
def reset_user_password(user_email):
from rhodecode.lib import auth
log = get_logger(reset_user_password)
new_passwd = auth.PasswordGenerator().gen_password(8,
auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
user.password = auth.get_crypt_password(new_passwd)
user.api_key = auth.generate_api_key(user.username)
sa.add(user)
DBS.add(user)
log.info('change password for %s', user_email)
if new_passwd is None:
raise Exception('unable to generate new password')
run_task(send_email, user_email,
'Your new password',
@@ -319,6 +312,7 @@ def reset_user_password(user_email):
def send_email(recipients, subject, body, html_body=''):
"""
Sends an email with defined parameters from the .ini files.
@@ -330,7 +324,8 @@ def send_email(recipients, subject, body
:param html_body: html version of body
log = get_logger(send_email)
email_config = config
subject = "%s %s" % (email_config.get('email_prefix'), subject)
if not recipients:
@@ -361,6 +356,7 @@ def send_email(recipients, subject, body
def create_repo_fork(form_data, cur_user):
Creates a fork of repository using interval VCS methods
@@ -371,11 +367,11 @@ def create_repo_fork(form_data, cur_user
from rhodecode.model.repo import RepoModel
log = get_logger(create_repo_fork)
Session = get_session()
DBS = create_repo_fork.DBS
base_path = Repository.base_path()
RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
alias = form_data['repo_type']
org_repo_name = form_data['org_path']
@@ -391,12 +387,12 @@ def create_repo_fork(form_data, cur_user
src_url=safe_str(source_repo_path),
update_after_clone=update_after_clone)
action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
org_repo_name, '', Session)
org_repo_name, '', DBS)
action_logger(cur_user, 'user_created_fork:%s' % fork_name,
fork_name, '', Session)
fork_name, '', DBS)
# finally commit at latest possible stage
Session.commit()
def __get_codes_stats(repo_name):
repo = Repository.get_by_repo_name(repo_name).scm_instance
Status change: