@@ -26,29 +26,32 @@
import os
import sys
import socket
import traceback
import logging
from os.path import dirname as dn, join as jn
from pylons import config
from hashlib import md5
from decorator import decorator
from vcs.utils.lazy import LazyProperty
from rhodecode import CELERY_ON
from rhodecode.lib import str2bool, safe_str
from rhodecode.lib.pidlock import DaemonLock, LockHeld
from rhodecode.model import init_model
from rhodecode.model import meta
from rhodecode.model.db import Statistics, Repository, User
from sqlalchemy import engine_from_config
from celery.messaging import establish_connection
log = logging.getLogger(__name__)
class ResultWrapper(object):
def __init__(self, task):
self.task = task
@LazyProperty
def result(self):
@@ -100,6 +103,25 @@ def locked_task(func):
return ret
except LockHeld:
log.info('LockHeld')
return 'Task with key %s already running' % lockkey
return decorator(__wrapper, func)
def get_session():
if CELERY_ON:
engine = engine_from_config(config, 'sqlalchemy.db1.')
init_model(engine)
sa = meta.Session
return sa
def dbsession(func):
def __wrapper(func, *fargs, **fkwargs):
try:
ret = func(*fargs, **fkwargs)
finally:
meta.Session.remove()
@@ -38,38 +38,28 @@ from pylons import config, url
from pylons.i18n.translation import _
from vcs import get_backend
from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str
from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \
__get_lockkey, LockHeld, DaemonLock
from rhodecode.lib.celerylib import run_task, locked_task, dbsession, \
str2bool, __get_lockkey, LockHeld, DaemonLock, get_session
from rhodecode.lib.helpers import person
from rhodecode.lib.rcmail.smtp_mailer import SmtpMailer
from rhodecode.lib.utils import add_cache, action_logger
from rhodecode.lib.compat import json, OrderedDict
add_cache(config)
__all__ = ['whoosh_index', 'get_commits_stats',
'reset_user_password', 'send_email']
def get_logger(cls):
log = cls.get_logger()
except:
@@ -78,35 +68,36 @@ def get_logger(cls):
return log
@task(ignore_result=True)
@locked_task
@dbsession
def whoosh_index(repo_location, full_index):
from rhodecode.lib.indexers.daemon import WhooshIndexingDaemon
# log = whoosh_index.get_logger(whoosh_index)
log = whoosh_index.get_logger(whoosh_index)
DBS = get_session()
index_location = config['index_dir']
WhooshIndexingDaemon(index_location=index_location,
repo_location=repo_location, sa=get_session())\
repo_location=repo_location, sa=DBS)\
.run(full_index=full_index)
def get_commits_stats(repo_name, ts_min_y, ts_max_y):
log = get_logger(get_commits_stats)
lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y,
ts_max_y)
lockkey_path = config['here']
log.info('running task with lockkey %s', lockkey)
sa = get_session()
lock = l = DaemonLock(file_=jn(lockkey_path, lockkey))
# for js data compatibilty cleans the key for person from '
akc = lambda k: person(k).replace('"', "")
co_day_auth_aggr = {}
@@ -125,15 +116,15 @@ def get_commits_stats(repo_name, ts_min_
skip_date_limit = True
parse_limit = int(config['app_conf'].get('commit_parse_limit'))
last_rev = None
last_cs = None
timegetter = itemgetter('time')
dbrepo = sa.query(Repository)\
dbrepo = DBS.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
cur_stats = sa.query(Statistics)\
cur_stats = DBS.query(Statistics)\
.filter(Statistics.repository == dbrepo).scalar()
if cur_stats is not None:
last_rev = cur_stats.stat_on_revision
if last_rev == repo.get_changeset().revision and repo_size > 1:
@@ -231,17 +222,17 @@ def get_commits_stats(repo_name, ts_min_
log.debug('getting code trending stats')
stats.languages = json.dumps(__get_codes_stats(repo_name))
stats.repository = dbrepo
stats.stat_on_revision = last_cs.revision if last_cs else 0
sa.add(stats)
sa.commit()
DBS.add(stats)
DBS.commit()
log.error(traceback.format_exc())
sa.rollback()
DBS.rollback()
lock.release()
return False
#final release
@@ -251,19 +242,20 @@ def get_commits_stats(repo_name, ts_min_
return True
def send_password_link(user_email):
from rhodecode.model.notification import EmailNotificationModel
log = get_logger(send_password_link)
user = User.get_by_email(user_email)
if user:
log.debug('password reset user found %s' % user)
link = url('reset_password_confirmation', key=user.api_key,
qualified=True)
reg_type = EmailNotificationModel.TYPE_PASSWORD_RESET
@@ -280,34 +272,35 @@ def send_password_link(user_email):
def reset_user_password(user_email):
from rhodecode.lib import auth
log = get_logger(reset_user_password)
new_passwd = auth.PasswordGenerator().gen_password(8,
auth.PasswordGenerator.ALPHABETS_BIG_SMALL)
user.password = auth.get_crypt_password(new_passwd)
user.api_key = auth.generate_api_key(user.username)
sa.add(user)
DBS.add(user)
log.info('change password for %s', user_email)
if new_passwd is None:
raise Exception('unable to generate new password')
run_task(send_email, user_email,
'Your new password',
'Your new RhodeCode password:%s' % (new_passwd))
log.info('send new password mail to %s', user_email)
@@ -316,24 +309,26 @@ def reset_user_password(user_email):
def send_email(recipients, subject, body, html_body=''):
"""
Sends an email with defined parameters from the .ini files.
:param recipients: list of recipients, it this is empty the defined email
address from field 'email_to' is used instead
:param subject: subject of the mail
:param body: body of the mail
:param html_body: html version of body
log = get_logger(send_email)
email_config = config
subject = "%s %s" % (email_config.get('email_prefix'), subject)
if not recipients:
# if recipients are not defined we send to email_config + all admins
admins = [u.email for u in User.query()
.filter(User.admin == True).all()]
@@ -358,27 +353,28 @@ def send_email(recipients, subject, body
def create_repo_fork(form_data, cur_user):
Creates a fork of repository using interval VCS methods
:param form_data:
:param cur_user:
from rhodecode.model.repo import RepoModel
log = get_logger(create_repo_fork)
Session = get_session()
DBS = create_repo_fork.DBS
base_path = Repository.base_path()
RepoModel(Session).create(form_data, cur_user, just_db=True, fork=True)
RepoModel(DBS).create(form_data, cur_user, just_db=True, fork=True)
alias = form_data['repo_type']
org_repo_name = form_data['org_path']
fork_name = form_data['repo_name_full']
update_after_clone = form_data['update_after_clone']
source_repo_path = os.path.join(base_path, org_repo_name)
@@ -388,18 +384,18 @@ def create_repo_fork(form_data, cur_user
destination_fork_path)
backend = get_backend(alias)
backend(safe_str(destination_fork_path), create=True,
src_url=safe_str(source_repo_path),
update_after_clone=update_after_clone)
action_logger(cur_user, 'user_forked_repo:%s' % fork_name,
org_repo_name, '', Session)
org_repo_name, '', DBS)
action_logger(cur_user, 'user_created_fork:%s' % fork_name,
fork_name, '', Session)
fork_name, '', DBS)
# finally commit at latest possible stage
Session.commit()
def __get_codes_stats(repo_name):
repo = Repository.get_by_repo_name(repo_name).scm_instance
tip = repo.get_changeset()
code_stats = {}
Status change: