diff --git a/rhodecode/lib/celerylib/tasks.py b/rhodecode/lib/celerylib/tasks.py --- a/rhodecode/lib/celerylib/tasks.py +++ b/rhodecode/lib/celerylib/tasks.py @@ -5,10 +5,10 @@ RhodeCode task modules, containing all task that suppose to be run by celery daemon - + :created_on: Oct 6, 2010 :author: marcink - :copyright: (C) 2009-2011 Marcin Kuzminski + :copyright: (C) 2009-2011 Marcin Kuzminski :license: GPLv3, see COPYING for more details. """ # This program is free software: you can redistribute it and/or modify @@ -28,20 +28,25 @@ from celery.decorators import task import os import traceback import logging +from os.path import dirname as dn, join as jn from time import mktime from operator import itemgetter +from string import lower -from pylons import config +from pylons import config, url from pylons.i18n.translation import _ -from rhodecode.lib.celerylib import run_task, locked_task, str2bool +from rhodecode.lib import LANGUAGES_EXTENSIONS_MAP, safe_str +from rhodecode.lib.celerylib import run_task, locked_task, str2bool, \ + __get_lockkey, LockHeld, DaemonLock from rhodecode.lib.helpers import person from rhodecode.lib.smtp_mailer import SmtpMailer -from rhodecode.lib.utils import OrderedDict, add_cache +from rhodecode.lib.utils import add_cache +from rhodecode.lib.odict import OrderedDict from rhodecode.model import init_model from rhodecode.model import meta -from rhodecode.model.db import RhodeCodeUi +from rhodecode.model.db import RhodeCodeUi, Statistics, Repository from vcs.backends import get_repo @@ -60,6 +65,7 @@ __all__ = ['whoosh_index', 'get_commits_ CELERY_ON = str2bool(config['app_conf'].get('use_celery')) + def get_session(): if CELERY_ON: engine = engine_from_config(config, 'sqlalchemy.db1.') @@ -67,11 +73,13 @@ def get_session(): sa = meta.Session() return sa + def get_repos_path(): sa = get_session() q = sa.query(RhodeCodeUi).filter(RhodeCodeUi.ui_key == '/').one() return q.ui_value + @task(ignore_result=True) @locked_task def whoosh_index(repo_location, full_index): @@ -82,143 +90,202 @@ def whoosh_index(repo_location, full_ind repo_location=repo_location, sa=get_session())\ .run(full_index=full_index) + @task(ignore_result=True) -@locked_task def get_commits_stats(repo_name, ts_min_y, ts_max_y): try: log = get_commits_stats.get_logger() except: log = logging.getLogger(__name__) - from rhodecode.model.db import Statistics, Repository + lockkey = __get_lockkey('get_commits_stats', repo_name, ts_min_y, + ts_max_y) + lockkey_path = dn(dn(dn(dn(os.path.abspath(__file__))))) + log.info('running task with lockkey %s', lockkey) + try: + lock = l = DaemonLock(jn(lockkey_path, lockkey)) - #for js data compatibilty - author_key_cleaner = lambda k: person(k).replace('"', "") + #for js data compatibilty cleans the key for person from ' + akc = lambda k: person(k).replace('"', "") - commits_by_day_author_aggregate = {} - commits_by_day_aggregate = {} - repos_path = get_repos_path() - p = os.path.join(repos_path, repo_name) - repo = get_repo(p) + co_day_auth_aggr = {} + commits_by_day_aggregate = {} + repos_path = get_repos_path() + repo = get_repo(safe_str(os.path.join(repos_path, repo_name))) + repo_size = len(repo.revisions) + #return if repo have no revisions + if repo_size < 1: + lock.release() + return True - skip_date_limit = True - parse_limit = 250 #limit for single task changeset parsing optimal for - last_rev = 0 - last_cs = None - timegetter = itemgetter('time') + skip_date_limit = True + parse_limit = int(config['app_conf'].get('commit_parse_limit')) + last_rev = 0 + last_cs = None + timegetter = itemgetter('time') - sa = get_session() + sa = get_session() - dbrepo = sa.query(Repository)\ - .filter(Repository.repo_name == repo_name).scalar() - cur_stats = sa.query(Statistics)\ - .filter(Statistics.repository == dbrepo).scalar() - if cur_stats: - last_rev = cur_stats.stat_on_revision - if not repo.revisions: - return True + dbrepo = sa.query(Repository)\ + .filter(Repository.repo_name == repo_name).scalar() + cur_stats = sa.query(Statistics)\ + .filter(Statistics.repository == dbrepo).scalar() - if last_rev == repo.revisions[-1] and len(repo.revisions) > 1: - #pass silently without any work if we're not on first revision or - #current state of parsing revision(from db marker) is the last revision - return True + if cur_stats is not None: + last_rev = cur_stats.stat_on_revision + + if last_rev == repo.get_changeset().revision and repo_size > 1: + #pass silently without any work if we're not on first revision or + #current state of parsing revision(from db marker) is the + #last revision + lock.release() + return True - if cur_stats: - commits_by_day_aggregate = OrderedDict( - json.loads( + if cur_stats: + commits_by_day_aggregate = OrderedDict(json.loads( cur_stats.commit_activity_combined)) - commits_by_day_author_aggregate = json.loads(cur_stats.commit_activity) + co_day_auth_aggr = json.loads(cur_stats.commit_activity) - log.debug('starting parsing %s', parse_limit) - lmktime = mktime + log.debug('starting parsing %s', parse_limit) + lmktime = mktime + + last_rev = last_rev + 1 if last_rev > 0 else last_rev - last_rev = last_rev + 1 if last_rev > 0 else last_rev - for rev in repo.revisions[last_rev:last_rev + parse_limit]: - last_cs = cs = repo.get_changeset(rev) - k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1], - cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0]) + for cs in repo[last_rev:last_rev + parse_limit]: + last_cs = cs # remember last parsed changeset + k = lmktime([cs.date.timetuple()[0], cs.date.timetuple()[1], + cs.date.timetuple()[2], 0, 0, 0, 0, 0, 0]) - if commits_by_day_author_aggregate.has_key(author_key_cleaner(cs.author)): - try: - l = [timegetter(x) for x in commits_by_day_author_aggregate\ - [author_key_cleaner(cs.author)]['data']] - time_pos = l.index(k) - except ValueError: - time_pos = False + if akc(cs.author) in co_day_auth_aggr: + try: + l = [timegetter(x) for x in + co_day_auth_aggr[akc(cs.author)]['data']] + time_pos = l.index(k) + except ValueError: + time_pos = False + + if time_pos >= 0 and time_pos is not False: + + datadict = \ + co_day_auth_aggr[akc(cs.author)]['data'][time_pos] - if time_pos >= 0 and time_pos is not False: + datadict["commits"] += 1 + datadict["added"] += len(cs.added) + datadict["changed"] += len(cs.changed) + datadict["removed"] += len(cs.removed) - datadict = commits_by_day_author_aggregate\ - [author_key_cleaner(cs.author)]['data'][time_pos] + else: + if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - datadict["commits"] += 1 - datadict["added"] += len(cs.added) - datadict["changed"] += len(cs.changed) - datadict["removed"] += len(cs.removed) + datadict = {"time": k, + "commits": 1, + "added": len(cs.added), + "changed": len(cs.changed), + "removed": len(cs.removed), + } + co_day_auth_aggr[akc(cs.author)]['data']\ + .append(datadict) else: if k >= ts_min_y and k <= ts_max_y or skip_date_limit: + co_day_auth_aggr[akc(cs.author)] = { + "label": akc(cs.author), + "data": [{"time":k, + "commits":1, + "added":len(cs.added), + "changed":len(cs.changed), + "removed":len(cs.removed), + }], + "schema": ["commits"], + } - datadict = {"time":k, - "commits":1, - "added":len(cs.added), - "changed":len(cs.changed), - "removed":len(cs.removed), - } - commits_by_day_author_aggregate\ - [author_key_cleaner(cs.author)]['data'].append(datadict) + #gather all data by day + if k in commits_by_day_aggregate: + commits_by_day_aggregate[k] += 1 + else: + commits_by_day_aggregate[k] = 1 - else: - if k >= ts_min_y and k <= ts_max_y or skip_date_limit: - commits_by_day_author_aggregate[author_key_cleaner(cs.author)] = { - "label":author_key_cleaner(cs.author), - "data":[{"time":k, - "commits":1, - "added":len(cs.added), - "changed":len(cs.changed), - "removed":len(cs.removed), - }], - "schema":["commits"], - } + overview_data = sorted(commits_by_day_aggregate.items(), + key=itemgetter(0)) + + if not co_day_auth_aggr: + co_day_auth_aggr[akc(repo.contact)] = { + "label": akc(repo.contact), + "data": [0, 1], + "schema": ["commits"], + } + + stats = cur_stats if cur_stats else Statistics() + stats.commit_activity = json.dumps(co_day_auth_aggr) + stats.commit_activity_combined = json.dumps(overview_data) + + log.debug('last revison %s', last_rev) + leftovers = len(repo.revisions[last_rev:]) + log.debug('revisions to parse %s', leftovers) - #gather all data by day - if commits_by_day_aggregate.has_key(k): - commits_by_day_aggregate[k] += 1 - else: - commits_by_day_aggregate[k] = 1 + if last_rev == 0 or leftovers < parse_limit: + log.debug('getting code trending stats') + stats.languages = json.dumps(__get_codes_stats(repo_name)) - overview_data = sorted(commits_by_day_aggregate.items(), key=itemgetter(0)) - if not commits_by_day_author_aggregate: - commits_by_day_author_aggregate[author_key_cleaner(repo.contact)] = { - "label":author_key_cleaner(repo.contact), - "data":[0, 1], - "schema":["commits"], - } + try: + stats.repository = dbrepo + stats.stat_on_revision = last_cs.revision if last_cs else 0 + sa.add(stats) + sa.commit() + except: + log.error(traceback.format_exc()) + sa.rollback() + lock.release() + return False + + #final release + lock.release() - stats = cur_stats if cur_stats else Statistics() - stats.commit_activity = json.dumps(commits_by_day_author_aggregate) - stats.commit_activity_combined = json.dumps(overview_data) + #execute another task if celery is enabled + if len(repo.revisions) > 1 and CELERY_ON: + run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y) + return True + except LockHeld: + log.info('LockHeld') + return 'Task with key %s already running' % lockkey - log.debug('last revison %s', last_rev) - leftovers = len(repo.revisions[last_rev:]) - log.debug('revisions to parse %s', leftovers) +@task(ignore_result=True) +def send_password_link(user_email): + try: + log = reset_user_password.get_logger() + except: + log = logging.getLogger(__name__) - if last_rev == 0 or leftovers < parse_limit: - log.debug('getting code trending stats') - stats.languages = json.dumps(__get_codes_stats(repo_name)) - - stats.repository = dbrepo - stats.stat_on_revision = last_cs.revision if last_cs else 0 + from rhodecode.lib import auth + from rhodecode.model.db import User try: - sa.add(stats) - sa.commit() + sa = get_session() + user = sa.query(User).filter(User.email == user_email).scalar() + + if user: + link = url('reset_password_confirmation', key=user.api_key, + qualified=True) + tmpl = """ +Hello %s + +We received a request to create a new password for your account. + +You can generate it by clicking following URL: + +%s + +If you didn't request new password please ignore this email. + """ + run_task(send_email, user_email, + "RhodeCode password reset link", + tmpl % (user.short_contact, link)) + log.info('send new password mail to %s', user_email) + except: + log.error('Failed to update user password') log.error(traceback.format_exc()) - sa.rollback() return False - if len(repo.revisions) > 1: - run_task(get_commits_stats, repo_name, ts_min_y, ts_max_y) return True @@ -240,6 +307,7 @@ def reset_user_password(user_email): auth.PasswordGenerator.ALPHABETS_BIG_SMALL) if user: user.password = auth.get_crypt_password(new_passwd) + user.api_key = auth.generate_api_key(user.username) sa.add(user) sa.commit() log.info('change password for %s', user_email) @@ -251,23 +319,22 @@ def reset_user_password(user_email): sa.rollback() run_task(send_email, user_email, - "Your new rhodecode password", - 'Your new rhodecode password:%s' % (new_passwd)) + "Your new RhodeCode password", + 'Your new RhodeCode password:%s' % (new_passwd)) log.info('send new password mail to %s', user_email) - except: log.error('Failed to update user password') log.error(traceback.format_exc()) return True + @task(ignore_result=True) def send_email(recipients, subject, body): """ Sends an email with defined parameters from the .ini files. - - + :param recipients: list of recipients, it this is empty the defined email address from field 'email_to' is used instead :param subject: subject of the mail @@ -290,10 +357,11 @@ def send_email(recipients, subject, body mail_port = email_config.get('smtp_port') tls = str2bool(email_config.get('smtp_use_tls')) ssl = str2bool(email_config.get('smtp_use_ssl')) + debug = str2bool(config.get('debug')) try: m = SmtpMailer(mail_from, user, passwd, mail_server, - mail_port, ssl, tls) + mail_port, ssl, tls, debug=debug) m.send(recipients, subject, body) except: log.error('Mail sending failed') @@ -301,16 +369,17 @@ def send_email(recipients, subject, body return False return True + @task(ignore_result=True) def create_repo_fork(form_data, cur_user): + from rhodecode.model.repo import RepoModel + from vcs import get_backend + try: log = create_repo_fork.get_logger() except: log = logging.getLogger(__name__) - from rhodecode.model.repo import RepoModel - from vcs import get_backend - repo_model = RepoModel(get_session()) repo_model.create(form_data, cur_user, just_db=True, fork=True) repo_name = form_data['repo_name'] @@ -323,81 +392,22 @@ def create_repo_fork(form_data, cur_user backend = get_backend(alias) backend(str(repo_fork_path), create=True, src_url=str(repo_path)) + def __get_codes_stats(repo_name): - LANGUAGES_EXTENSIONS_MAP = {'scm': 'Scheme', 'asmx': 'VbNetAspx', 'Rout': - 'RConsole', 'rest': 'Rst', 'abap': 'ABAP', 'go': 'Go', 'phtml': 'HtmlPhp', - 'ns2': 'Newspeak', 'xml': 'EvoqueXml', 'sh-session': 'BashSession', 'ads': - 'Ada', 'clj': 'Clojure', 'll': 'Llvm', 'ebuild': 'Bash', 'adb': 'Ada', - 'ada': 'Ada', 'c++-objdump': 'CppObjdump', 'aspx': - 'VbNetAspx', 'ksh': 'Bash', 'coffee': 'CoffeeScript', 'vert': 'GLShader', - 'Makefile.*': 'Makefile', 'di': 'D', 'dpatch': 'DarcsPatch', 'rake': - 'Ruby', 'moo': 'MOOCode', 'erl-sh': 'ErlangShell', 'geo': 'GLShader', - 'pov': 'Povray', 'bas': 'VbNet', 'bat': 'Batch', 'd': 'D', 'lisp': - 'CommonLisp', 'h': 'C', 'rbx': 'Ruby', 'tcl': 'Tcl', 'c++': 'Cpp', 'md': - 'MiniD', '.vimrc': 'Vim', 'xsd': 'Xml', 'ml': 'Ocaml', 'el': 'CommonLisp', - 'befunge': 'Befunge', 'xsl': 'Xslt', 'pyx': 'Cython', 'cfm': - 'ColdfusionHtml', 'evoque': 'Evoque', 'cfg': 'Ini', 'htm': 'Html', - 'Makefile': 'Makefile', 'cfc': 'ColdfusionHtml', 'tex': 'Tex', 'cs': - 'CSharp', 'mxml': 'Mxml', 'patch': 'Diff', 'apache.conf': 'ApacheConf', - 'scala': 'Scala', 'applescript': 'AppleScript', 'GNUmakefile': 'Makefile', - 'c-objdump': 'CObjdump', 'lua': 'Lua', 'apache2.conf': 'ApacheConf', 'rb': - 'Ruby', 'gemspec': 'Ruby', 'rl': 'RagelObjectiveC', 'vala': 'Vala', 'tmpl': - 'Cheetah', 'bf': 'Brainfuck', 'plt': 'Gnuplot', 'G': 'AntlrRuby', 'xslt': - 'Xslt', 'flxh': 'Felix', 'asax': 'VbNetAspx', 'Rakefile': 'Ruby', 'S': 'S', - 'wsdl': 'Xml', 'js': 'Javascript', 'autodelegate': 'Myghty', 'properties': - 'Ini', 'bash': 'Bash', 'c': 'C', 'g': 'AntlrRuby', 'r3': 'Rebol', 's': - 'Gas', 'ashx': 'VbNetAspx', 'cxx': 'Cpp', 'boo': 'Boo', 'prolog': 'Prolog', - 'sqlite3-console': 'SqliteConsole', 'cl': 'CommonLisp', 'cc': 'Cpp', 'pot': - 'Gettext', 'vim': 'Vim', 'pxi': 'Cython', 'yaml': 'Yaml', 'SConstruct': - 'Python', 'diff': 'Diff', 'txt': 'Text', 'cw': 'Redcode', 'pxd': 'Cython', - 'plot': 'Gnuplot', 'java': 'Java', 'hrl': 'Erlang', 'py': 'Python', - 'makefile': 'Makefile', 'squid.conf': 'SquidConf', 'asm': 'Nasm', 'toc': - 'Tex', 'kid': 'Genshi', 'rhtml': 'Rhtml', 'po': 'Gettext', 'pl': 'Prolog', - 'pm': 'Perl', 'hx': 'Haxe', 'ascx': 'VbNetAspx', 'ooc': 'Ooc', 'asy': - 'Asymptote', 'hs': 'Haskell', 'SConscript': 'Python', 'pytb': - 'PythonTraceback', 'myt': 'Myghty', 'hh': 'Cpp', 'R': 'S', 'aux': 'Tex', - 'rst': 'Rst', 'cpp-objdump': 'CppObjdump', 'lgt': 'Logtalk', 'rss': 'Xml', - 'flx': 'Felix', 'b': 'Brainfuck', 'f': 'Fortran', 'rbw': 'Ruby', - '.htaccess': 'ApacheConf', 'cxx-objdump': 'CppObjdump', 'j': 'ObjectiveJ', - 'mll': 'Ocaml', 'yml': 'Yaml', 'mu': 'MuPAD', 'r': 'Rebol', 'ASM': 'Nasm', - 'erl': 'Erlang', 'mly': 'Ocaml', 'mo': 'Modelica', 'def': 'Modula2', 'ini': - 'Ini', 'control': 'DebianControl', 'vb': 'VbNet', 'vapi': 'Vala', 'pro': - 'Prolog', 'spt': 'Cheetah', 'mli': 'Ocaml', 'as': 'ActionScript3', 'cmd': - 'Batch', 'cpp': 'Cpp', 'io': 'Io', 'tac': 'Python', 'haml': 'Haml', 'rkt': - 'Racket', 'st':'Smalltalk', 'inc': 'Povray', 'pas': 'Delphi', 'cmake': - 'CMake', 'csh':'Tcsh', 'hpp': 'Cpp', 'feature': 'Gherkin', 'html': 'Html', - 'php':'Php', 'php3':'Php', 'php4':'Php', 'php5':'Php', 'xhtml': 'Html', - 'hxx': 'Cpp', 'eclass': 'Bash', 'css': 'Css', - 'frag': 'GLShader', 'd-objdump': 'DObjdump', 'weechatlog': 'IrcLogs', - 'tcsh': 'Tcsh', 'objdump': 'Objdump', 'pyw': 'Python', 'h++': 'Cpp', - 'py3tb': 'Python3Traceback', 'jsp': 'Jsp', 'sql': 'Sql', 'mak': 'Makefile', - 'php': 'Php', 'mao': 'Mako', 'man': 'Groff', 'dylan': 'Dylan', 'sass': - 'Sass', 'cfml': 'ColdfusionHtml', 'darcspatch': 'DarcsPatch', 'tpl': - 'Smarty', 'm': 'ObjectiveC', 'f90': 'Fortran', 'mod': 'Modula2', 'sh': - 'Bash', 'lhs': 'LiterateHaskell', 'sources.list': 'SourcesList', 'axd': - 'VbNetAspx', 'sc': 'Python'} - repos_path = get_repos_path() - p = os.path.join(repos_path, repo_name) - repo = get_repo(p) + repo = get_repo(safe_str(os.path.join(repos_path, repo_name))) tip = repo.get_changeset() code_stats = {} def aggregate(cs): for f in cs[2]: - ext = f.extension - key = LANGUAGES_EXTENSIONS_MAP.get(ext, ext) - key = key or ext + ext = lower(f.extension) if ext in LANGUAGES_EXTENSIONS_MAP.keys() and not f.is_binary: - if code_stats.has_key(key): - code_stats[key] += 1 + if ext in code_stats: + code_stats[ext] += 1 else: - code_stats[key] = 1 + code_stats[ext] = 1 map(aggregate, tip.walk('/')) return code_stats or {} - - - -