Changeset - f3fab7b124f2
kallithea/config/middleware/pygrack.py
Show inline comments
 
@@ -15,49 +15,49 @@
 
kallithea.lib.middleware.pygrack
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Python implementation of git-http-backend's Smart HTTP protocol
 

	
 
Based on original code from git_http_backend.py project.
 

	
 
Copyright (c) 2010 Daniel Dotsenko <dotsa@hotmail.com>
 
Copyright (c) 2012 Marcin Kuzminski <marcin@python-works.com>
 

	
 
This file was forked by the Kallithea project in July 2014.
 
"""
 

	
 
import logging
 
import os
 
import socket
 
import traceback
 

	
 
from dulwich.server import update_server_info
 
from dulwich.web import GunzipFilter, LimitedInputFilter
 
from webob import Request, Response, exc
 

	
 
import kallithea
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs import get_repo, subprocessio
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FileWrapper(object):
 

	
 
    def __init__(self, fd, content_length):
 
        self.fd = fd
 
        self.content_length = content_length
 
        self.remain = content_length
 

	
 
    def read(self, size):
 
        if size <= self.remain:
 
            try:
 
                data = self.fd.read(size)
 
            except socket.error:
 
                raise IOError(self)
 
            self.remain -= size
 
        elif self.remain:
 
            data = self.fd.read(self.remain)
 
            self.remain = 0
 
        else:
 
            data = None
 
@@ -149,50 +149,48 @@ class GitRepository(object):
 
            inputstream = FileWrapper(environ['wsgi.input'],
 
                                      req.content_length)
 
        else:
 
            inputstream = environ['wsgi.input']
 

	
 
        gitenv = dict(os.environ)
 
        # forget all configs
 
        gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
 
        cmd = [_git_path, git_command[4:], '--stateless-rpc', self.content_path]
 
        log.debug('handling cmd %s', cmd)
 
        try:
 
            out = subprocessio.SubprocessIOChunker(
 
                cmd,
 
                inputstream=inputstream,
 
                env=gitenv,
 
                cwd=self.content_path,
 
            )
 
        except EnvironmentError as e:
 
            log.error(traceback.format_exc())
 
            raise exc.HTTPExpectationFailed()
 

	
 
        if git_command in ['git-receive-pack']:
 
            # updating refs manually after each push.
 
            # Needed for pre-1.7.0.4 git clients using regular HTTP mode.
 

	
 
            from kallithea.lib.vcs import get_repo
 
            repo = get_repo(self.content_path)
 
            if repo:
 
                update_server_info(repo._repo)
 

	
 
        resp = Response()
 
        resp.content_type = 'application/x-%s-result' % git_command
 
        resp.charset = None
 
        resp.app_iter = out
 
        return resp
 

	
 
    def __call__(self, environ, start_response):
 
        req = Request(environ)
 
        _path = self._get_fixedpath(req.path_info)
 
        if _path.startswith('info/refs'):
 
            app = self.inforefs
 
        elif req.accept.acceptable_offers(self.valid_accepts):
 
            app = self.backend
 
        try:
 
            resp = app(req, environ)
 
        except exc.HTTPException as e:
 
            resp = e
 
            log.error(traceback.format_exc())
 
        except Exception as e:
 
            log.error(traceback.format_exc())
kallithea/controllers/admin/settings.py
Show inline comments
 
@@ -14,48 +14,49 @@
 
"""
 
kallithea.controllers.admin.settings
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
settings controller for Kallithea admin
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jul 14, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import config, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.celerylib import tasks
 
from kallithea.lib.utils import repo2db_mapper, set_app_settings
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs import VCSError
 
from kallithea.lib.webutils import url
 
from kallithea.model import db, meta
 
from kallithea.model.forms import ApplicationSettingsForm, ApplicationUiSettingsForm, ApplicationVisualisationForm
 
from kallithea.model.notification import EmailNotificationModel
 
from kallithea.model.scm import ScmModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SettingsController(BaseController):
 

	
 
    @LoginRequired(allow_default_user=True)
 
    def _before(self, *args, **kwargs):
 
        super(SettingsController, self)._before(*args, **kwargs)
 

	
 
    def _get_hg_ui_settings(self):
 
@@ -288,49 +289,48 @@ class SettingsController(BaseController)
 
            test_body = ('Kallithea Email test, '
 
                               'Kallithea version: %s' % c.kallithea_version)
 
            if not test_email:
 
                webutils.flash(_('Please enter email address'), category='error')
 
                raise HTTPFound(location=url('admin_settings_email'))
 

	
 
            test_email_txt_body = EmailNotificationModel() \
 
                .get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
 
                                'txt', body=test_body)
 
            test_email_html_body = EmailNotificationModel() \
 
                .get_email_tmpl(EmailNotificationModel.TYPE_DEFAULT,
 
                                'html', body=test_body)
 

	
 
            recipients = [test_email] if test_email else None
 

	
 
            tasks.send_email(recipients, test_email_subj,
 
                             test_email_txt_body, test_email_html_body)
 

	
 
            webutils.flash(_('Send email task created'), category='success')
 
            raise HTTPFound(location=url('admin_settings_email'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        import kallithea
 
        c.ini = kallithea.CONFIG
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_hooks(self):
 
        c.active = 'hooks'
 
        if request.POST:
 
            if c.visual.allow_custom_hooks_settings:
 
                ui_key = request.POST.get('new_hook_ui_key')
 
                ui_value = request.POST.get('new_hook_ui_value')
 

	
 
                hook_id = request.POST.get('hook_id')
 

	
 
                try:
 
                    ui_key = ui_key and ui_key.strip()
 
                    if ui_key in (x.ui_key for x in db.Ui.get_custom_hooks()):
 
                        webutils.flash(_('Hook already exists'), category='error')
 
                    elif ui_key in (x.ui_key for x in db.Ui.get_builtin_hooks()):
 
                        webutils.flash(_('Builtin hooks are read-only. Please use another hook name.'), category='error')
 
@@ -378,35 +378,34 @@ class SettingsController(BaseController)
 
        c.active = 'search'
 
        if request.POST:
 
            repo_location = self._get_hg_ui_settings()['paths_root_path']
 
            full_index = request.POST.get('full_index', False)
 
            tasks.whoosh_index(repo_location, full_index)
 
            webutils.flash(_('Whoosh reindex task scheduled'), category='success')
 
            raise HTTPFound(location=url('admin_settings_search'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_system(self):
 
        c.active = 'system'
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        import kallithea
 
        c.ini = kallithea.CONFIG
 
        server_info = db.Setting.get_server_info()
 
        for key, val in server_info.items():
 
            setattr(c, key, val)
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
kallithea/controllers/login.py
Show inline comments
 
@@ -19,48 +19,49 @@ Login controller for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 22, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import re
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import request, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator
 
from kallithea.lib.base import BaseController, log_in_user, render
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.recaptcha import submit
 
from kallithea.lib.webutils import url
 
from kallithea.model import db, meta
 
from kallithea.model.forms import LoginForm, PasswordResetConfirmationForm, PasswordResetRequestForm, RegisterForm
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LoginController(BaseController):
 

	
 
    def _validate_came_from(self, came_from,
 
            _re=re.compile(r"/(?!/)[-!#$%&'()*+,./:;=?@_~0-9A-Za-z]*$")):
 
        """Return True if came_from is valid and can and should be used.
 

	
 
        Determines if a URI reference is valid and relative to the origin;
 
        or in RFC 3986 terms, whether it matches this production:
 

	
 
          origin-relative-ref = path-absolute [ "?" query ] [ "#" fragment ]
 

	
 
        with the exception that '%' escapes are not validated and '#' is
 
        allowed inside the fragment part.
 
        """
 
        return _re.match(came_from) is not None
 
@@ -111,94 +112,92 @@ class LoginController(BaseController):
 
            if not request.authuser.is_anonymous:
 
                raise HTTPFound(location=c.came_from)
 
            # continue to show login to default user
 

	
 
        return render('/login.html')
 

	
 
    @HasPermissionAnyDecorator('hg.admin', 'hg.register.auto_activate',
 
                               'hg.register.manual_activate')
 
    def register(self):
 
        def_user_perms = AuthUser(dbuser=db.User.get_default_user()).global_permissions
 
        c.auto_active = 'hg.register.auto_activate' in def_user_perms
 

	
 
        settings = db.Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            register_form = RegisterForm()()
 
            try:
 
                form_result = register_form.to_python(dict(request.POST))
 
                form_result['active'] = c.auto_active
 

	
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('g-recaptcha-response'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=request.ip_addr)
 
                    if not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 

	
 
                UserModel().create_registration(form_result)
 
                webutils.flash(_('You have successfully registered with %s') % (c.site_name or 'Kallithea'),
 
                        category='success')
 
                meta.Session().commit()
 
                raise HTTPFound(location=url('login_home'))
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/register.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 
            except UserCreationError as e:
 
                # container auth or other auth functions that create users on
 
                # the fly can throw this exception signaling that there's issue
 
                # with user creation, explanation should be provided in
 
                # Exception itself
 
                webutils.flash(e, 'error')
 

	
 
        return render('/register.html')
 

	
 
    def password_reset(self):
 
        settings = db.Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            password_reset_form = PasswordResetRequestForm()()
 
            try:
 
                form_result = password_reset_form.to_python(dict(request.POST))
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('g-recaptcha-response'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=request.ip_addr)
 
                    if not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 
                redirect_link = UserModel().send_reset_password_email(form_result)
 
                webutils.flash(_('A password reset confirmation code has been sent'),
 
                            category='success')
 
                raise HTTPFound(location=redirect_link)
 

	
 
            except formencode.Invalid as errors:
 
                return htmlfill.render(
 
                    render('/password_reset.html'),
 
                    defaults=errors.value,
 
                    errors=errors.error_dict or {},
 
                    prefix_error=False,
 
                    encoding="UTF-8",
 
                    force_defaults=False)
 

	
 
        return render('/password_reset.html')
kallithea/controllers/routing.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Routes configuration
 

	
 
The more specific and detailed routes should be defined first so they
 
may take precedent over the more generic routes. For more information
 
refer to the routes manual at http://routes.groovie.org/docs/
 
"""
 

	
 
import routes
 

	
 
import kallithea
 
from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
class Mapper(routes.Mapper):
 
    """
 
    Subclassed Mapper with routematch patched to decode "unicode" str url to
 
    *real* unicode str before applying matches and invoking controller methods.
 
    """
 

	
 
    def routematch(self, url=None, environ=None):
 
        """
 
        routematch that also decode url from "fake bytes" to real unicode
 
        string before matching and invoking controllers.
 
        """
 
        # Process url like get_path_info does ... but PATH_INFO has already
 
        # been retrieved from environ and is passed, so - let's just use that
 
        # instead.
 
        url = safe_str(url.encode('latin1'))
 
        return super().routematch(url=url, environ=environ)
 

	
 

	
 
def make_map(config):
 
    """Create, configure and return the routes Mapper"""
 
    rmap = Mapper(directory=config['paths']['controllers'],
 
                  always_scan=config['debug'])
 
    rmap.minimization = False
 
    rmap.explicit = False
 

	
 
    from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 

	
 
    def check_repo(environ, match_dict):
 
        """
 
        Check for valid repository for proper 404 handling.
 
        Also, a bit of side effect modifying match_dict ...
 
        """
 
        if match_dict.get('f_path'):
 
            # fix for multiple initial slashes that causes errors
 
            match_dict['f_path'] = match_dict['f_path'].lstrip('/')
 

	
 
        return is_valid_repo(match_dict['repo_name'], config['base_path'])
 

	
 
    def check_group(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling
 

	
 
        :param environ:
 
        :param match_dict:
 
        """
 
        repo_group_name = match_dict.get('group_name')
 
        return is_valid_repo_group(repo_group_name, config['base_path'])
 

	
 
    def check_group_skip_path(environ, match_dict):
 
        """
 
        check for valid repository group for proper 404 handling, but skips
kallithea/lib/helpers.py
Show inline comments
 
@@ -6,48 +6,49 @@
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Helper functions
 

	
 
Consists of functions to typically be used within templates, but also
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import json
 
import logging
 
import re
 
import textwrap
 
import urllib.parse
 

	
 
from beaker.cache import cache_region
 
from pygments import highlight as code_highlight
 
from pygments.formatters.html import HtmlFormatter
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib.annotate import annotate_highlight
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.diffs import BIN_FILENODE, CHMOD_FILENODE, DEL_FILENODE, MOD_FILENODE, NEW_FILENODE, RENAMED_FILENODE
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import (MENTIONS_REGEX, AttributeDict, age, asbool, credentials_filter, fmt_date, link_to_ref, safe_bytes, safe_int, safe_str,
 
                                  shorter, time_to_datetime)
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
#==============================================================================
 
# SCM FILTERS available via h.
 
#==============================================================================
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.webutils import (HTML, Option, canonical_url, checkbox, chop_at, end_form, escape, form, format_byte_size, hidden, html_escape, link_to,
 
                                    literal, password, pop_flash_messages, radio, reset, safeid, select, session_csrf_secret_name, session_csrf_secret_token,
 
                                    submit, text, textarea, truncate, url, wrap_paragraphs)
 
from kallithea.model import db
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
@@ -750,94 +751,90 @@ def action_parser(user_log, feed=False, 
 
        action = user_log.action
 
        action_params = None
 
        x = action.split(':')
 

	
 
        if len(x) > 1:
 
            action, action_params = x
 

	
 
        ico = action_map.get(action, ['', '', ''])[2]
 
        html = """<i class="%s"></i>""" % ico
 
        return literal(html)
 

	
 
    # returned callbacks we need to call to get
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<span%s>%s</span>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<i class="icon-gravatar {cls}"'
 
                ' style="font-size: {size}px;background-size: {size}px;background-image: url(\'{src}\')"'
 
                '></i>').format(cls=cls, size=size, src=src)
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size))
 

	
 
    return literal(html)
 

	
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    from tg import tmpl_context as c
 

	
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urllib.parse.urlparse(url.current(qualified=True))
 
    return (c.visual.gravatar_url or db.User.DEFAULT_GRAVATAR_URL) \
 
               .replace('{email}', email_address) \
 
               .replace('{md5email}', hashlib.md5(safe_bytes(email_address).lower()).hexdigest()) \
 
               .replace('{netloc}', parsed_url.netloc) \
 
               .replace('{scheme}', parsed_url.scheme) \
 
               .replace('{size}', str(size))
 

	
 

	
 
def changed_tooltip(nodes):
 
    """
 
    Generates a html string for changed nodes in changeset page.
 
    It limits the output to 30 entries
 

	
 
    :param nodes: LazyNodesGenerator
kallithea/lib/hooks.py
Show inline comments
 
@@ -9,48 +9,49 @@
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.hooks
 
~~~~~~~~~~~~~~~~~~~
 

	
 
Hooks run by Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import time
 

	
 
import mercurial.scmutil
 
import paste.deploy
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import action_logger, make_ui
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model import db
 

	
 

	
 
def _get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
        alias += '.'
 

	
 
    size_scm, size_root = 0, 0
 
    for path, dirs, files in os.walk(root_path):
 
        if path.find(alias) != -1:
 
            for f in files:
 
                try:
 
                    size_scm += os.path.getsize(os.path.join(path, f))
 
                except OSError:
 
                    pass
 
        else:
 
            for f in files:
 
@@ -280,50 +281,48 @@ def log_delete_user(user_dict, deleted_b
 
     'ldap_dn',
 
     'email',
 
     'api_key',
 
     'last_login',
 
     'full_name',
 
     'active',
 
     'password',
 
     'emails',
 

	
 
    """
 
    callback = getattr(kallithea.EXTENSIONS, 'DELETE_USER_HOOK', None)
 
    if callable(callback):
 
        callback(deleted_by=deleted_by, **user_dict)
 

	
 

	
 
def _hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    import paste.deploy
 

	
 
    import kallithea.config.application
 

	
 
    extras = get_hook_environment()
 

	
 
    path_to_ini_file = extras['config']
 
    config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.application.make_app(config.global_conf, **config.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
 

	
 
    repo = db.Repository.get_by_full_path(repo_path)
 
    if not repo:
 
        raise OSError('Repository %s not found in database' % repo_path)
 

	
 
    baseui = make_ui()
 
    return baseui, repo
 

	
 

	
 
def handle_git_pre_receive(repo_path, git_stdin_lines):
 
    """Called from Git pre-receive hook.
 
    The returned value is used as hook exit code and must be 0.
kallithea/lib/utils2.py
Show inline comments
 
@@ -18,48 +18,49 @@ kallithea.lib.utils2
 
Some simple helper functions.
 
Note: all these functions should be independent of Kallithea classes, i.e.
 
models, controllers, etc.  to prevent import cycles.
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jan 5, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import binascii
 
import datetime
 
import json
 
import os
 
import re
 
import time
 
import urllib.parse
 

	
 
import urlobject
 
from dateutil import relativedelta
 
from sqlalchemy.engine import url as sa_url
 
from sqlalchemy.exc import ArgumentError
 
from tg import tmpl_context
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from tg.support.converters import asbool, aslist
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.vcs.backends.base import BaseRepository, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str  # re-export
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 

	
 
try:
 
    import pwd
 
except ImportError:
 
    pass
 

	
 

	
 
# mute pyflakes "imported but unused"
 
assert asbool
 
assert aslist
 
assert ascii_bytes
 
assert ascii_str
 
@@ -470,49 +471,48 @@ def set_hook_environment(username, ip_ad
 
    """Prepare global context for running hooks by serializing data in the
 
    global KALLITHEA_EXTRAS environment variable.
 

	
 
    Most importantly, this allow Git hooks to do proper logging and updating of
 
    caches after pushes.
 

	
 
    Must always be called before anything with hooks are invoked.
 
    """
 
    extras = {
 
        'ip': ip_addr, # used in log_push/pull_action action_logger
 
        'username': username,
 
        'action': action or 'push_local', # used in log_push_action_raw_ids action_logger
 
        'repository': repo_name,
 
        'scm': repo_alias, # used to pick hack in log_push_action_raw_ids
 
        'config': kallithea.CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from tg import tmpl_context
 
    try:
 
        return getattr(tmpl_context, 'authuser', None)
 
    except TypeError:  # No object (name: context) has been registered for this thread
 
        return None
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', s).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
 
    Recursive replace of given sign to just one instance
 

	
 
    :param str_: given string
 
    :param replace: char to find and replace multiple instances
 

	
 
    Examples::
 
    >>> recursive_replace("Mighty---Mighty-Bo--sstones",'-')
 
    'Mighty-Mighty-Bo-sstones'
 
    """
 

	
 
    if str_.find(replace * 2) == -1:
 
        return str_
kallithea/lib/vcs/backends/git/inmemory.py
Show inline comments
 
import datetime
 
import posixpath
 
import stat
 
import time
 

	
 
from dulwich import objects
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_str, safe_bytes
 

	
 
from . import repository
 

	
 

	
 
class GitInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import GitRepository
 
        if branch is None:
 
            branch = GitRepository.DEFAULT_BRANCH_NAME
 
            branch = repository.GitRepository.DEFAULT_BRANCH_NAME
 

	
 
        repo = self.repository._repo
 
        object_store = repo.object_store
 

	
 
        ENCODING = b"UTF-8"  # TODO: should probably be kept in sync with safe_str/safe_bytes and vcs/conf/settings.py DEFAULT_ENCODINGS
 

	
 
        # Create tree and populates it with blobs
 
        commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or \
 
            objects.Tree()
 
        for node in self.added + self.changed:
 
            # Compute subdirs if needed
 
            dirpath, nodename = posixpath.split(node.path)
 
            dirnames = safe_bytes(dirpath).split(b'/') if dirpath else []
 
            parent = commit_tree
 
            ancestors = [('', parent)]
 

	
 
            # Tries to dig for the deepest existing tree
 
            while dirnames:
 
                curdir = dirnames.pop(0)
 
                try:
 
                    dir_id = parent[curdir][1]
 
                except KeyError:
 
                    # put curdir back into dirnames and stops
 
                    dirnames.insert(0, curdir)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -15,51 +15,49 @@ import os
 
import re
 
import time
 
import urllib.error
 
import urllib.parse
 
import urllib.request
 
from collections import OrderedDict
 

	
 
import mercurial.util  # import url as hg_url
 
from dulwich.client import SubprocessGitClient
 
from dulwich.config import ConfigFile
 
from dulwich.objects import Tag
 
from dulwich.repo import NotGitRepository, Repo
 
from dulwich.server import update_server_info
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .workdir import GitWorkdir
 
from . import changeset, inmemory, workdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
    scm = 'git'
 

	
 
    def __init__(self, repo_path, create=False, src_url=None,
 
                 update_after_clone=False, bare=False):
 

	
 
        self.path = abspath(repo_path)
 
        self.repo = self._get_repo(create, src_url, update_after_clone, bare)
 
        self.bare = self.repo.bare
 

	
 
    @property
 
    def _config_files(self):
 
        return [
 
@@ -446,51 +444,51 @@ class GitRepository(BaseRepository):
 
                        if isinstance(obj, Tag):
 
                            sha = _repo.get_object(sha).object[1]
 
                    _refs[_key] = [sha, type_]
 
                    break
 
        return _refs
 

	
 
    def _heads(self, reverse=False):
 
        refs = self._repo.get_refs()
 
        heads = {}
 

	
 
        for key, val in refs.items():
 
            for ref_key in [b'refs/heads/', b'refs/remotes/origin/']:
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in [b'HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y, x) for x, y in heads.items())
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, GitChangeset):
 
        if isinstance(revision, changeset.GitChangeset):
 
            return revision
 
        return GitChangeset(repository=self, revision=self._get_revision(revision))
 
        return changeset.GitChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
          ``end_date`` would be filtered out from returned set
 
        :param branch_name: if specified, changesets not reachable from given
 
          branch would be filtered out from returned set
 
        :param reverse: if ``True``, returned generator would be reversed
 
          (meaning that returned changesets would have descending date order)
 

	
 
        :raise BranchDoesNotExistError: If given ``branch_name`` does not
 
            exist.
 
        :raise ChangesetDoesNotExistError: If changeset for given ``start`` or
 
          ``end`` could not be found.
 

	
 
        """
 
@@ -648,49 +646,49 @@ class GitRepository(BaseRepository):
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['show'] + flags + [rev2]
 
        else:
 
            rev1 = self.get_changeset(rev1).raw_id
 
            rev2 = self.get_changeset(rev2).raw_id
 
            cmd = ['diff'] + flags + [rev1, rev2]
 

	
 
        if path:
 
            cmd += ['--', path]
 

	
 
        stdout, stderr = self._run_git_command(cmd, cwd=self.path)
 
        # If we used 'show' command, strip first few lines (until actual diff
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            parts = stdout.split(b'\ndiff ', 1)
 
            if len(parts) > 1:
 
                stdout = b'diff ' + parts[1]
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return GitInMemoryChangeset(self)
 
        return inmemory.GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone', '-q']
 
        if bare:
 
            cmd.append('--bare')
 
        elif not update_after_clone:
 
            cmd.append('--no-checkout')
 
        cmd += ['--', url, self.path]
 
        # If error occurs run_git_command raises RepositoryError already
 
        self.run_git_command(cmd)
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
@@ -707,49 +705,49 @@ class GitRepository(BaseRepository):
 
        so = self.run_git_command(['ls-remote', '-h', url])
 
        cmd = ['fetch', url, '--']
 
        for line in (x for x in so.splitlines()):
 
            sha, ref = line.split('\t')
 
            cmd.append('+%s:%s' % (ref, ref))
 
        self.run_git_command(cmd)
 

	
 
    def _update_server_info(self):
 
        """
 
        runs gits update-server-info command in this repo instance
 
        """
 
        try:
 
            update_server_info(self._repo)
 
        except OSError as e:
 
            if e.errno not in [errno.ENOENT, errno.EROFS]:
 
                raise
 
            # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock
 
            log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e)
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return GitWorkdir(self)
 
        return workdir.GitWorkdir(self)
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, str):
 
            config_file = [config_file]
 

	
 
        def gen_configs():
 
            for path in config_file + self._config_files:
 
                try:
 
                    yield ConfigFile.from_path(path)
 
                except (IOError, OSError, ValueError):
 
                    continue
 

	
 
        for config in gen_configs():
 
            try:
kallithea/lib/vcs/backends/hg/inmemory.py
Show inline comments
 
import datetime
 

	
 
import mercurial.context
 
import mercurial.node
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_str, safe_bytes, safe_str
 

	
 
from . import repository
 

	
 

	
 
class MercurialInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        if not isinstance(message, str):
 
            raise RepositoryError('message must be a str - got %r' % type(message))
 
        if not isinstance(author, str):
 
            raise RepositoryError('author must be a str - got %r' % type(author))
 

	
 
        from .repository import MercurialRepository
 
        if branch is None:
 
            branch = MercurialRepository.DEFAULT_BRANCH_NAME
 
            branch = repository.MercurialRepository.DEFAULT_BRANCH_NAME
 
        kwargs[b'branch'] = safe_bytes(branch)
 

	
 
        def filectxfn(_repo, memctx, bytes_path):
 
            """
 
            Callback from Mercurial, returning ctx to commit for the given
 
            path.
 
            """
 
            path = safe_str(bytes_path)
 

	
 
            # check if this path is removed
 
            if path in (node.path for node in self.removed):
 
                return None
 

	
 
            # check if this path is added
 
            for node in self.added:
 
                if node.path == path:
 
                    return mercurial.context.memfilectx(_repo, memctx, path=bytes_path,
 
                        data=node.content,
 
                        islink=False,
 
                        isexec=node.is_executable,
 
                        copysource=False)
 

	
 
            # or changed
 
            for node in self.changed:
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -23,51 +23,49 @@ import mercurial.error
 
import mercurial.exchange
 
import mercurial.hg
 
import mercurial.hgweb
 
import mercurial.httppeer
 
import mercurial.localrepo
 
import mercurial.match
 
import mercurial.mdiff
 
import mercurial.node
 
import mercurial.patch
 
import mercurial.scmutil
 
import mercurial.sshpeer
 
import mercurial.tags
 
import mercurial.ui
 
import mercurial.unionrepo
 
import mercurial.util
 

	
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError, VCSError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, author_email, author_name, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 
from .changeset import MercurialChangeset
 
from .inmemory import MercurialInMemoryChangeset
 
from .workdir import MercurialWorkdir
 
from . import changeset, inmemory, workdir
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialRepository(BaseRepository):
 
    """
 
    Mercurial repository backend
 
    """
 
    DEFAULT_BRANCH_NAME = 'default'
 
    scm = 'hg'
 

	
 
    def __init__(self, repo_path, create=False, baseui=None, src_url=None,
 
                 update_after_clone=False):
 
        """
 
        Raises RepositoryError if repository could not be find at the given
 
        ``repo_path``.
 

	
 
        :param repo_path: local path of the repository
 
        :param create=False: if set to True, would try to create repository if
 
           it does not exist rather than raising exception
 
        :param baseui=None: user data
 
        :param src_url=None: would try to clone repository from given location
 
        :param update_after_clone=False: sets update of working copy after
 
@@ -359,49 +357,49 @@ class MercurialRepository(BaseRepository
 
        """
 
        try:
 
            if src_url:
 
                url = safe_bytes(self._get_url(src_url))
 
                opts = {}
 
                if not update_after_clone:
 
                    opts.update({'noupdate': True})
 
                MercurialRepository._check_url(url, self.baseui)
 
                mercurial.commands.clone(self.baseui, url, safe_bytes(self.path), **opts)
 

	
 
                # Don't try to create if we've already cloned repo
 
                create = False
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return MercurialInMemoryChangeset(self)
 
        return inmemory.MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        return safe_str(mercurial.hgweb.common.get_contact(self._repo.ui.config)
 
                            or b'Unknown')
 

	
 
    @LazyProperty
 
    def last_change(self):
 
        """
 
        Returns last change made on this repository as datetime object
 
        """
 
        return date_fromtimestamp(self._get_mtime(), makedate()[1])
 

	
 
    def _get_mtime(self):
 
        try:
 
            return time.mktime(self.get_changeset().date.timetuple())
 
        except RepositoryError:
 
            # fallback to filesystem
 
            cl_path = os.path.join(self.path, '.hg', "00changelog.i")
 
@@ -469,49 +467,49 @@ class MercurialRepository(BaseRepository
 

	
 
    def _get_archives(self, archive_name='tip'):
 
        allowed = self.baseui.configlist(b"web", b"allow_archive",
 
                                         untrusted=True)
 
        for name, ext in [(b'zip', '.zip'), (b'gz', '.tar.gz'), (b'bz2', '.tar.bz2')]:
 
            if name in allowed or self._repo.ui.configbool(b"web",
 
                                                           b"allow" + name,
 
                                                           untrusted=True):
 
                yield {"type": safe_str(name), "extension": ext, "node": archive_name}
 

	
 
    def _get_url(self, url):
 
        """
 
        Returns normalized url. If schema is not given, fall back to
 
        filesystem (``file:///``) schema.
 
        """
 
        if url != 'default' and '://' not in url:
 
            url = "file:" + urllib.request.pathname2url(url)
 
        return url
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return MercurialChangeset(repository=self, revision=self._get_revision(revision))
 
        return changeset.MercurialChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        (both are inclusive)
 

	
 
        :param start: None, str, int or mercurial lookup format
 
        :param end:  None, str, int or mercurial lookup format
 
        :param start_date:
 
        :param end_date:
 
        :param branch_name:
 
        :param reversed: return changesets in reversed order
 
        """
 
        start_raw_id = self._get_revision(start)
 
        start_pos = None if start is None else self.revisions.index(start_raw_id)
 
        end_raw_id = self._get_revision(end)
 
        end_pos = None if end is None else self.revisions.index(end_raw_id)
 

	
 
        if start_pos is not None and end_pos is not None and start_pos > end_pos:
 
            raise RepositoryError("Start revision '%s' cannot be "
 
                                  "after end revision '%s'" % (start, end))
 

	
 
        if branch_name and branch_name not in self.allbranches:
 
@@ -595,49 +593,49 @@ class MercurialRepository(BaseRepository
 
                self.get_changeset(ascii_str(hgrepo[rev].hex()))
 
                for rev in hgrepo.revs(
 
                    b"ancestors(id(%s)) and not ancestors(id(%s)) and not id(%s)",
 
                    ascii_bytes(org_rev), ascii_bytes(other_rev), ascii_bytes(other_rev))
 
            ]
 

	
 
        return other_changesets, org_changesets, ancestors
 

	
 
    def pull(self, url):
 
        """
 
        Tries to pull changes from external location.
 
        """
 
        other = mercurial.hg.peer(self._repo, {}, safe_bytes(self._get_url(url)))
 
        try:
 
            mercurial.exchange.pull(self._repo, other, heads=None, force=None)
 
        except mercurial.error.Abort as err:
 
            # Propagate error but with vcs's type
 
            raise RepositoryError(str(err))
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return MercurialWorkdir(self)
 
        return workdir.MercurialWorkdir(self)
 

	
 
    def get_config_value(self, section, name=None, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
 
        elif isinstance(config_file, str):
 
            config_file = [config_file]
 

	
 
        config = self._repo.ui
 
        if config_file:
 
            config = mercurial.ui.ui()
 
            for path in config_file:
 
                config.readconfig(safe_bytes(path))
 
        value = config.config(safe_bytes(section), safe_bytes(name))
 
        return value if value is None else safe_str(value)
 

	
 
    def get_user_name(self, config_file=None):
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
"""
 
Utilities aimed to help achieve mostly basic tasks.
 
"""
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import time
 
import urllib.request
 

	
 
import mercurial.url
 
from pygments import highlight
 
from pygments.formatters import TerminalFormatter
 
from pygments.lexers import ClassNotFound, guess_lexer_for_filename
 

	
 
from kallithea.lib.vcs import backends
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
    argument. If no scm-specific directory is found or more than one scm is
 
    found at that directory, ``VCSError`` is raised.
 

	
 
    :param search_up: if set to ``True``, this function would try to
 
      move up to parent directory every time no scm is recognized for the
 
      currently checked path. Default: ``False``.
 
    :param explicit_alias: can be one of available backend aliases, when given
 
      it will return given explicit alias in repositories under more than one
 
      version control, if explicit_alias is different than found it will raise
 
      VCSError
 
    """
 
    if not os.path.isdir(path):
 
@@ -50,69 +51,68 @@ def get_scm(path, search_up=False, expli
 
        path = newpath
 

	
 
    if len(found_scms) > 1:
 
        for scm in found_scms:
 
            if scm[0] == explicit_alias:
 
                return scm
 
        raise VCSError('More than one [%s] scm found at given path %s'
 
                       % (', '.join((x[0] for x in found_scms)), path))
 

	
 
    if len(found_scms) == 0:
 
        raise VCSError('No scm found at given path %s' % path)
 

	
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    from kallithea.lib.vcs.backends import get_backend
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            get_backend(key)(path)
 
            backends.get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
 
    """
 
    If pygments are available on the system
 
    then returned output is colored. Otherwise
 
    unchanged content is returned.
 
    """
 
    try:
 
        lexer = guess_lexer_for_filename(name, code)
 
        formatter = TerminalFormatter()
 
        content = highlight(code, lexer, formatter)
 
    except ClassNotFound:
 
        logging.debug("Couldn't guess Lexer, will not use pygments.")
 
        content = code
kallithea/model/db.py
Show inline comments
 
@@ -16,60 +16,61 @@ kallithea.model.db
 
~~~~~~~~~~~~~~~~~~
 

	
 
Database Models for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 08, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
import urlobject
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json, ssh, webutils
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import asbool, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, urlreadable
 
from kallithea.lib.vcs import get_backend, get_repo
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
class BaseDbModel(object):
 
    """
 
    Base Model for all classes
 
    """
 

	
 
    @classmethod
 
    def _get_keys(cls):
 
        """return column names for this model """
 
        # Note: not a normal dict - iterator gives "users.firstname", but keys gives "firstname"
 
        return class_mapper(cls).c.keys()
 

	
 
    def get_dict(self):
 
        """
 
@@ -1150,49 +1151,48 @@ class Repository(meta.Base, BaseDbModel)
 
            scm_repo = repo.scm_instance_no_cache()
 
            data.update(dict(
 
                tags=scm_repo.tags,
 
                branches=scm_repo.branches,
 
                bookmarks=scm_repo.bookmarks,
 
            ))
 
        if with_pullrequests:
 
            data['pull_requests'] = repo.pull_requests_other
 
        settings = Setting.get_app_settings()
 
        repository_fields = asbool(settings.get('repository_fields'))
 
        if repository_fields:
 
            for f in self.extra_fields:
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    @property
 
    def clone_uri_hidden(self):
 
        clone_uri = self.clone_uri
 
        if clone_uri:
 
            import urlobject
 
            url_obj = urlobject.URLObject(self.clone_uri)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
 
        if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
 
            log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
 
        elif with_id:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
 
        else:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
 

	
 
        prefix_url = webutils.canonical_url('home')
 

	
 
        return get_clone_url(clone_uri_tmpl=clone_uri_tmpl,
 
                             prefix_url=prefix_url,
 
                             repo_name=self.repo_name,
 
                             repo_id=self.repo_id,
 
                             username=username)
 

	
 
    def set_state(self, state):
 
        self.repo_state = state
 

	
 
@@ -1205,49 +1205,48 @@ class Repository(meta.Base, BaseDbModel)
 

	
 
    def get_landing_changeset(self):
 
        """
 
        Returns landing changeset, or if that doesn't exist returns the tip
 
        """
 
        _rev_type, _rev = self.landing_rev
 
        cs = self.get_changeset(_rev)
 
        if isinstance(cs, EmptyChangeset):
 
            return self.get_changeset()
 
        return cs
 

	
 
    def update_changeset_cache(self, cs_cache=None):
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from kallithea.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache()
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
 
            _default = datetime.datetime.fromtimestamp(0)
 
            last_change = cs_cache.get('date') or _default
 
            log.debug('updated repo %s with new cs cache %s',
 
                      self.repo_name, cs_cache)
 
            self.updated_on = last_change
 
            self.changeset_cache = cs_cache
 
            meta.Session().commit()
 
        else:
 
            log.debug('changeset_cache for %s already up to date with %s',
 
                      self.repo_name, cs_cache['raw_id'])
 

	
 
    @property
 
    def tip(self):
kallithea/model/user.py
Show inline comments
 
@@ -18,49 +18,49 @@ kallithea.model.user
 
users model for Kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Apr 9, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import hashlib
 
import hmac
 
import logging
 
import time
 
import traceback
 

	
 
from sqlalchemy.exc import DatabaseError
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import DefaultUserException, UserOwnsReposException
 
from kallithea.lib.utils2 import generate_api_key, get_current_authuser
 
from kallithea.model import db, meta
 
from kallithea.model import db, forms, meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserModel(object):
 
    password_reset_token_lifetime = 86400 # 24 hours
 

	
 
    def get(self, user_id):
 
        user = db.User.query()
 
        return user.get(user_id)
 

	
 
    def get_user(self, user):
 
        return db.User.guess_instance(user)
 

	
 
    def create(self, form_data, cur_user=None):
 
        if not cur_user:
 
            cur_user = getattr(get_current_authuser(), 'username', None)
 

	
 
        from kallithea.lib.hooks import check_allowed_create_user, log_create_user
 
        _fd = form_data
 
        user_data = {
 
            'username': _fd['username'],
 
            'password': _fd['password'],
 
@@ -430,76 +430,74 @@ class UserModel(object):
 
        return new
 

	
 
    def revoke_perm(self, user, perm):
 
        """
 
        Revoke users global permissions
 

	
 
        :param user:
 
        :param perm:
 
        """
 
        user = db.User.guess_instance(user)
 
        perm = db.Permission.guess_instance(perm)
 

	
 
        db.UserToPerm.query().filter(
 
            db.UserToPerm.user == user,
 
            db.UserToPerm.permission == perm,
 
        ).delete()
 

	
 
    def add_extra_email(self, user, email):
 
        """
 
        Adds email address to UserEmailMap
 

	
 
        :param user:
 
        :param email:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraEmailForm()()
 
        data = form.to_python(dict(email=email))
 
        user = db.User.guess_instance(user)
 

	
 
        obj = db.UserEmailMap()
 
        obj.user = user
 
        obj.email = data['email']
 
        meta.Session().add(obj)
 
        return obj
 

	
 
    def delete_extra_email(self, user, email_id):
 
        """
 
        Removes email address from UserEmailMap
 

	
 
        :param user:
 
        :param email_id:
 
        """
 
        user = db.User.guess_instance(user)
 
        obj = db.UserEmailMap.query().get(email_id)
 
        if obj is not None:
 
            meta.Session().delete(obj)
 

	
 
    def add_extra_ip(self, user, ip):
 
        """
 
        Adds IP address to UserIpMap
 

	
 
        :param user:
 
        :param ip:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraIpForm()()
 
        data = form.to_python(dict(ip=ip))
 
        user = db.User.guess_instance(user)
 

	
 
        obj = db.UserIpMap()
 
        obj.user = user
 
        obj.ip_addr = data['ip']
 
        meta.Session().add(obj)
 
        return obj
 

	
 
    def delete_extra_ip(self, user, ip_id):
 
        """
 
        Removes IP address from UserIpMap
 

	
 
        :param user:
 
        :param ip_id:
 
        """
 
        user = db.User.guess_instance(user)
 
        obj = db.UserIpMap.query().get(ip_id)
 
        if obj:
 
            meta.Session().delete(obj)
kallithea/model/validators.py
Show inline comments
 
@@ -7,49 +7,49 @@
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Set of generic validators
 
"""
 

	
 
import logging
 
import os
 
import re
 
from collections import defaultdict
 

	
 
import formencode
 
import ipaddr
 
import sqlalchemy
 
from formencode.validators import CIDR, Bool, Email, FancyValidator, Int, IPAddress, NotEmpty, Number, OneOf, Regex, Set, String, StringBoolean, UnicodeString
 
from sqlalchemy import func
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel
 
from kallithea.lib import auth
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.exceptions import InvalidCloneUriException, LdapImportError
 
from kallithea.lib.utils import is_valid_repo_uri
 
from kallithea.lib.utils2 import asbool, aslist, repo_name_slug
 
from kallithea.model import db
 

	
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
def UniqueListFromString():
 
    class _UniqueListFromString(formencode.FancyValidator):
 
        """
 
        Split value on ',' and make unique while preserving order
 
        """
 
        messages = dict(
 
            empty=_('Value cannot be an empty list'),
 
            missing_value=_('Value cannot be an empty list'),
 
        )
 

	
 
@@ -434,102 +434,102 @@ def ValidForkType(old_data=None):
 
                )
 
    return _validator
 

	
 

	
 
def CanWriteGroup(old_data=None):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create repository in this group"),
 
            'permission_denied_root': _("no permission to create repository "
 
                                        "in root location")
 
        }
 

	
 
        def _convert_to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = db.RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
            group_admin = auth.HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
            group_write = auth.HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or group_write)
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            can_create_repos = auth.HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                # parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 
                ## check if we can write to root location !
 
                elif gr is None and not can_create_repos():
 
                    msg = self.message('permission_denied_root', state)
 
                    raise formencode.Invalid(msg, value, state,
 
                        error_dict=dict(repo_type=msg)
 
                    )
 

	
 
    return _validator
 

	
 

	
 
def CanCreateGroup(can_create_in_root=False):
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'permission_denied': _("You don't have permissions "
 
                                   "to create a group in this location")
 
        }
 

	
 
        def to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = db.RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                # we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            forbidden = not auth.HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
 
    elif type_ == 'repo':
 
        EMPTY_PERM = 'repository.none'
 
    elif type_ == 'user_group':
 
        EMPTY_PERM = 'usergroup.none'
 

	
 
    class _validator(formencode.validators.FancyValidator):
 
        messages = {
 
            'perm_new_member_name':
 
                _('This username or user group name is not valid')
 
        }
 

	
 
        def to_python(self, value, state):
kallithea/tests/api/api_base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Tests for the JSON-RPC web api.
 
"""
 

	
 
import os
 
import random
 
import re
 
import string
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model import db, meta
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.pull_request import PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture, raise_exception
 

	
 

	
 
API_URL = '/_admin/api'
 
TEST_USER_GROUP = 'test_user_group'
 
TEST_REPO_GROUP = 'test_repo_group'
 

	
 
@@ -156,49 +157,48 @@ class _BaseTestApi(object):
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_missing_non_optional_param_args_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_repo')
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 

	
 
        expected = 'Missing non optional `repoid` arg in JSON DATA'
 
        self._compare_error(id_, expected, given=response.body)
 

	
 
    def test_api_args_is_null(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        response = api_call(self, params)
 
        ret_all = []
 
        _users = db.User.query().filter_by(is_default_user=False) \
 
            .order_by(db.User.username).all()
 
        for usr in _users:
 
            ret = usr.get_api_data()
 
            ret_all.append(jsonify(ret))
 
        expected = ret_all
 
        self._compare_ok(id_, expected, given=response.body)
 

	
 
    def test_api_get_user(self):
 
        id_, params = _build_data(self.apikey, 'get_user',
kallithea/tests/base.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import tempfile
 
import time
 

	
 
import pytest
 
from beaker.cache import cache_managers
 
from webtest import TestApp
 

	
 
from kallithea.lib.utils2 import ascii_str
 
from kallithea.model import db
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
skipif = pytest.mark.skipif
 
parametrize = pytest.mark.parametrize
 

	
 
# Hack: These module global values MUST be set to actual values before running any tests. This is currently done by conftest.py.
 
url = None
 
testapp = None
 

	
 
__all__ = [
 
    'skipif', 'parametrize', 'url', 'TestController',
 
    'ldap_lib_installed', 'pam_lib_installed', 'invalidate_all_caches',
 
    'TESTS_TMP_PATH', 'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO',
 
    'HG_FORK', 'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_ADMIN_PASS',
 
    'TEST_USER_ADMIN_EMAIL', 'TEST_USER_REGULAR_LOGIN', 'TEST_USER_REGULAR_PASS',
 
    'TEST_USER_REGULAR_EMAIL', 'TEST_USER_REGULAR2_LOGIN',
 
    'TEST_USER_REGULAR2_PASS', 'TEST_USER_REGULAR2_EMAIL', 'IP_ADDR',
 
    'TEST_HG_REPO', 'TEST_HG_REPO_CLONE', 'TEST_HG_REPO_PULL', 'TEST_GIT_REPO',
 
@@ -105,49 +106,48 @@ TEST_HG_REPO_PULL = os.path.join(TESTS_T
 
# skip ldap tests if LDAP lib is not installed
 
ldap_lib_installed = False
 
try:
 
    import ldap
 
    ldap.API_VERSION
 
    ldap_lib_installed = True
 
except ImportError:
 
    # means that python-ldap is not installed
 
    pass
 

	
 
try:
 
    import pam
 
    pam.PAM_TEXT_INFO
 
    pam_lib_installed = True
 
except ImportError:
 
    pam_lib_installed = False
 

	
 

	
 
def invalidate_all_caches():
 
    """Invalidate all beaker caches currently configured.
 
    Useful when manipulating IP permissions in a test and changes need to take
 
    effect immediately.
 
    Note: Any use of this function is probably a workaround - it should be
 
    replaced with a more specific cache invalidation in code or test."""
 
    from beaker.cache import cache_managers
 
    for cache in cache_managers.values():
 
        cache.clear()
 

	
 

	
 
class NullHandler(logging.Handler):
 
    def emit(self, record):
 
        pass
 

	
 

	
 
class TestController(object):
 
    """Pytest-style test controller"""
 

	
 
    # Note: pytest base classes cannot have an __init__ method
 

	
 
    @pytest.fixture(autouse=True)
 
    def app_fixture(self):
 
        h = NullHandler()
 
        logging.getLogger("kallithea").addHandler(h)
 
        self.app = TestApp(testapp)
 
        return self.app
 

	
 
    def log_user(self, username=TEST_USER_ADMIN_LOGIN,
 
                 password=TEST_USER_ADMIN_PASS):
 
        self._logged_username = username
kallithea/tests/conftest.py
Show inline comments
 
import logging
 
import os
 
import sys
 
import time
 

	
 
import formencode
 
import pkg_resources
 
import pytest
 
from paste.deploy import loadwsgi
 
from pytest_localserver.http import WSGIServer
 
from routes.util import URLGenerator
 
from tg.util.webtest import test_context
 

	
 
import kallithea.tests.base  # FIXME: needed for setting testapp instance!!!
 
from kallithea.controllers.root import RootController
 
from kallithea.lib import inifile
 
from kallithea.lib.utils import repo2db_mapper
 
from kallithea.model import db, meta
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_REGULAR_LOGIN, TESTS_TMP_PATH, invalidate_all_caches
 
from kallithea.tests.fixture import create_test_env, create_test_index
 

	
 

	
 
def pytest_configure():
 
    os.environ['TZ'] = 'UTC'
 
    if not kallithea.is_windows:
 
        time.tzset() # only available on Unix
 

	
 
    path = os.getcwd()
 
    sys.path.insert(0, path)
 
    pkg_resources.working_set.add_entry(path)
 

	
 
    # Disable INFO logging of test database creation, restore with NOTSET
 
    logging.disable(logging.INFO)
 

	
 
    ini_settings = {
 
        '[server:main]': {
 
            'port': '4999',
 
        },
 
        '[app:main]': {
 
            'ssh_enabled': 'true',
 
            # Mainly to safeguard against accidentally overwriting the real one:
 
            'ssh_authorized_keys': os.path.join(TESTS_TMP_PATH, 'authorized_keys'),
 
            #'ssh_locale': 'C',
 
            'app_instance_uuid': 'test',
 
@@ -48,49 +49,48 @@ def pytest_configure():
 
            #'i18n.lang': '',
 
        },
 
        '[handler_console]': {
 
            'formatter': 'color_formatter',
 
        },
 
        # The 'handler_console_sql' block is very similar to the one in
 
        # development.ini, but without the explicit 'level=DEBUG' setting:
 
        # it causes duplicate sqlalchemy debug logs, one through
 
        # handler_console_sql and another through another path.
 
        '[handler_console_sql]': {
 
            'formatter': 'color_formatter_sql',
 
        },
 
    }
 
    create_database = os.environ.get('TEST_DB')  # TODO: rename to 'CREATE_TEST_DB'
 
    if create_database:
 
        ini_settings['[app:main]']['sqlalchemy.url'] = create_database
 
    reuse_database = os.environ.get('REUSE_TEST_DB')
 
    if reuse_database:
 
        ini_settings['[app:main]']['sqlalchemy.url'] = reuse_database
 

	
 
    test_ini_file = os.path.join(TESTS_TMP_PATH, 'test.ini')
 
    inifile.create(test_ini_file, None, ini_settings)
 

	
 
    context = loadwsgi.loadcontext(loadwsgi.APP, 'config:%s' % test_ini_file)
 
    from kallithea.tests.fixture import create_test_env, create_test_index
 

	
 
    # set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and test repos
 
    if not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0)):
 
        create_test_env(TESTS_TMP_PATH, context.config(), reuse_database=bool(reuse_database))
 

	
 
    # set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
    if not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0)):
 
        create_test_index(TESTS_TMP_PATH, context.config(), True)
 

	
 
    kallithea.tests.base.testapp = context.create()
 
    # do initial repo scan
 
    repo2db_mapper(ScmModel().repo_scan(TESTS_TMP_PATH))
 

	
 
    logging.disable(logging.NOTSET)
 

	
 
    kallithea.tests.base.url = URLGenerator(RootController().mapper, {'HTTP_HOST': 'example.com'})
 

	
 
    # set fixed language for form messages, regardless of environment settings
 
    formencode.api.set_stdtranslation(languages=[])
 

	
 

	
 
@pytest.fixture
 
def create_test_user():
 
    """Provide users that automatically disappear after test is over."""
kallithea/tests/fixture.py
Show inline comments
 
@@ -6,62 +6,65 @@
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Helpers for fixture generation
 
"""
 

	
 
import logging
 
import os
 
import shutil
 
import tarfile
 
from os.path import dirname
 

	
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
from kallithea.lib.pidlock import DaemonLock
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model import db, meta
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.pull_request import CreatePullRequestAction  # , CreatePullRequestIterationAction, PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests.base import (GIT_REPO, HG_REPO, IP_ADDR, TEST_USER_ADMIN_EMAIL, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_REGULAR2_EMAIL,
 
                                  TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                                  TESTS_TMP_PATH, invalidate_all_caches)
 
from kallithea.tests.vcs import setup_package
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def raise_exception(*args, **kwargs):
 
    raise Exception('raise_exception raised exception')
 

	
 

	
 
class Fixture(object):
 

	
 
    def __init__(self):
 
        pass
 

	
 
    def anon_access(self, status):
 
        """
 
        Context manager for controlling anonymous access.
 
        Anon access will be set and committed, but restored again when exiting the block.
 

	
 
        Usage:
 

	
 
        fixture = Fixture()
 
@@ -383,59 +386,55 @@ def create_test_env(repos_test_path, con
 
    log.debug('making test vcs repositories')
 

	
 
    idx_path = config['index_dir']
 
    data_path = config['cache_dir']
 

	
 
    # clean index and data
 
    if idx_path and os.path.exists(idx_path):
 
        log.debug('remove %s', idx_path)
 
        shutil.rmtree(idx_path)
 

	
 
    if data_path and os.path.exists(data_path):
 
        log.debug('remove %s', data_path)
 
        shutil.rmtree(data_path)
 

	
 
    # CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    # LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock
 

	
 
    index_location = os.path.join(config['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    l = DaemonLock(os.path.join(index_location, 'make_index.lock'))
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
        .run(full_index=full_index)
 
    l.release()
 

	
 

	
 
def failing_test_hook(ui, repo, **kwargs):
 
    ui.write(b"failing_test_hook failed\n")
 
    return 1
 

	
 

	
 
def exception_test_hook(ui, repo, **kwargs):
 
    raise Exception("exception_test_hook threw an exception")
 

	
 

	
 
def passing_test_hook(ui, repo, **kwargs):
 
    ui.write(b"passing_test_hook succeeded\n")
 
    return 0
kallithea/tests/functional/test_my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib import webutils
 
from kallithea.model import db, meta
 
from kallithea.model import db, meta, validators
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestMyAccountController(base.TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
    def teardown_class(cls):
 
        if db.User.get_by_username(cls.test_user_1):
 
            UserModel().delete(cls.test_user_1)
 
            meta.Session().commit()
 

	
 
    def test_my_account(self):
 
        self.log_user()
 
        response = self.app.get(base.url('my_account'))
 

	
 
        response.mustcontain('value="%s' % base.TEST_USER_ADMIN_LOGIN)
 

	
 
    def test_my_account_my_repos(self):
 
@@ -161,49 +161,48 @@ class TestMyAccountController(base.TestC
 
                                    password_confirmation='test122',
 
                                    firstname='NewName',
 
                                    lastname='NewLastname',
 
                                    email=new_email,
 
                                    _session_csrf_secret_token=self.session_csrf_secret_token())
 
                                )
 

	
 
        response.mustcontain('This email address is already in use')
 

	
 
    def test_my_account_update_err(self):
 
        self.log_user(base.TEST_USER_REGULAR2_LOGIN, base.TEST_USER_REGULAR2_PASS)
 

	
 
        new_email = 'newmail.pl'
 
        response = self.app.post(base.url('my_account'),
 
                                 params=dict(
 
                                            username=base.TEST_USER_ADMIN_LOGIN,
 
                                            new_password=base.TEST_USER_ADMIN_PASS,
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from kallithea.model import validators
 
        with test_context(self.app):
 
            msg = validators.ValidUsername(edit=False, old_data={}) \
 
                    ._messages['username_exists']
 
        msg = webutils.html_escape(msg % {'username': base.TEST_USER_ADMIN_LOGIN})
 
        response.mustcontain(msg)
 

	
 
    def test_my_account_api_keys(self):
 
        usr = self.log_user(base.TEST_USER_REGULAR2_LOGIN, base.TEST_USER_REGULAR2_PASS)
 
        user = db.User.get(usr['user_id'])
 
        response = self.app.get(base.url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
 

	
 
    @base.parametrize('desc,lifetime', [
 
        ('forever', -1),
 
        ('5mins', 60*5),
 
        ('30days', 60*60*24*30),
 
    ])
 
    def test_my_account_add_api_keys(self, desc, lifetime):
 
        usr = self.log_user(base.TEST_USER_REGULAR2_LOGIN, base.TEST_USER_REGULAR2_PASS)
 
        user = db.User.get(usr['user_id'])
 
        response = self.app.post(base.url('my_account_api_keys'),
 
                                 {'description': desc, 'lifetime': lifetime, '_session_csrf_secret_token': self.session_csrf_secret_token()})
 
        self.checkSessionFlash(response, 'API key successfully created')
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -6,55 +6,59 @@
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.tests.other.test_libs
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 
import re
 

	
 
import mock
 
import routes
 
from dateutil import relativedelta
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.utils2 import AttributeDict, safe_bytes
 
from kallithea.lib.utils2 import AttributeDict, get_clone_url, safe_bytes
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://127.0.0.1:8080' % proto, ['%s://' % proto, '127.0.0.1', '8080'],
 
     '%s://127.0.0.1:8080' % proto),
 
    ('%s://example.com' % proto, ['%s://' % proto, 'example.com'],
 
     '%s://example.com' % proto),
 
    ('%s://user:pass@example.com:8080' % proto, ['%s://' % proto, 'example.com',
 
                                                '8080'],
 
     '%s://example.com:8080' % proto),
 
]
 

	
 
proto = 'https'
 
TEST_URLS += [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
@@ -117,190 +121,180 @@ class TestLibs(base.TestController):
 
        )
 

	
 
        expected = set([
 
            '2one_more22', 'first', 'lukaszb', 'one', 'one_more22', 'UPPER', 'cAmEL', 'john',
 
            'marian.user', 'marco-polo', 'marco_polo', 'world'])
 
        assert expected == set(extract_mentioned_usernames(sample))
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), '1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), '1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), '1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), '2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), '2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), '3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from dateutil import relativedelta
 

	
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month ago'),
 
        (dict(months= -1, days= -20), '1 month ago'),
 
        (dict(years= -1, months= -1), '13 months ago'),
 
        (dict(years= -1, months= -10), '22 months ago'),
 
        (dict(years= -2, months= -4), '2 years ago'),
 
        (dict(years= -2, months= -11), '3 years ago'),
 
        (dict(years= -3, months= -2), '3 years ago'),
 
        (dict(years= -4, months= -8), '5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from dateutil import relativedelta
 

	
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds=1), 'in 1 second'),
 
        (dict(seconds=60 * 2), 'in 2 minutes'),
 
        (dict(hours=1), 'in 1 hour'),
 
        (dict(hours=24), 'in 1 day'),
 
        (dict(hours=24 * 5), 'in 5 days'),
 
        (dict(months=1), 'in 1 month'),
 
        (dict(months=1, days=1), 'in 1 month and 1 day'),
 
        (dict(years=1, months=1), 'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from dateutil import relativedelta
 

	
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
        )
 
        res = h.urlify_text(sample, stylize=True)
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 
        assert '<div class="label label-meta" data-tag="obsolete">obsolete</div>' in res
 
        assert '<div class="label label-meta" data-tag="stale">stale</div>' in res
 
        assert '<div class="label label-meta" data-tag="lang">python</div>' in res
 
        assert '<div class="label label-meta" data-tag="requires">requires =&gt; <a href="/url">url</a></div>' in res
 
        assert '<div class="label label-meta" data-tag="tag">tag</div>' in res
 

	
 
    def test_alternative_gravatar(self):
 
        _md5 = lambda s: hashlib.md5(safe_bytes(s)).hexdigest()
 

	
 
        # mock tg.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 
        with mock.patch('kallithea.lib.webutils.url.current', lambda *a, **b: 'https://example.com'):
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                    from kallithea.lib.webutils import url
 
                    assert url.current() == 'https://example.com'
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                    assert webutils.url.current() == 'https://example.com'
 
                    grav = h.gravatar_url(email_address='test@example.com', size=24)
 
                    assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                grav = h.gravatar_url(email_address='test@example.com', size=24)
 
                assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s' % (_md5(em))
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}/{size}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s/%s' % (_md5(em), 24)
 

	
 
            fake = fake_tmpl_context(_url='{scheme}://{netloc}/{md5email}/{size}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'https://example.com/%s/%s' % (_md5(em), 24)
 

	
 
    @base.parametrize('clone_uri_tmpl,repo_name,username,prefix,expected', [
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '', 'http://vps1:8000/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '', 'http://username@vps1:8000/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', None, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', 'username', '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', None, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', 'username', '', 'https://proxy1.example.com/username/group/repo1'),
 
    ])
 
    def test_clone_url_generator(self, clone_uri_tmpl, repo_name, username, prefix, expected):
 
        from kallithea.lib.utils2 import get_clone_url
 
        clone_url = get_clone_url(clone_uri_tmpl=clone_uri_tmpl, prefix_url='http://vps1:8000' + prefix,
 
                                  repo_name=repo_name, repo_id=23, username=username)
 
        assert clone_url == expected
 

	
 
    def _quick_url(self, text, tmpl="""<a class="changeset_hash" href="%s">%s</a>""", url_=None):
 
        """
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        import re
 

	
 
        # quickly change expected url[] into a link
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @base.parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
       """git-svn-id: <a href="https://svn.apache.org/repos/asf/libcloud/trunk@1441655">https://svn.apache.org/repos/asf/libcloud/trunk@1441655</a> 13f79535-47bb-0310-9956-ffa450edef68"""),
 
      ("from rev 000000000000",
 
       """from rev url[000000000000]"""),
 
      ("from rev 000000000000123123 also rev 000000000000",
 
       """from rev url[000000000000123123] also rev url[000000000000]"""),
 
      ("this should-000 00",
 
       """this should-000 00"""),
 
      ("longtextffffffffff rev 123123123123",
 
       """longtextffffffffff rev url[123123123123]"""),
 
      ("rev ffffffffffffffffffffffffffffffffffffffffffffffffff",
 
       """rev ffffffffffffffffffffffffffffffffffffffffffffffffff"""),
 
      ("ffffffffffff some text traalaa",
 
@@ -532,61 +526,55 @@ class TestLibs(base.TestController):
 
      ("_ID", 'NAME'),
 
      ("_ID/", 'NAME/'),
 
      ("_ID/1/2", 'NAME/1/2'),
 
      ("_IDa", '_IDa'),
 
    ])
 
    def test_fix_repo_id_name(self, test, expected):
 
        repo = db.Repository.get_by_repo_name(base.HG_REPO)
 
        test = test.replace('ID', str(repo.repo_id))
 
        expected = expected.replace('NAME', repo.repo_name).replace('ID', str(repo.repo_id))
 
        from kallithea.lib.utils import fix_repo_id_name
 
        replaced = fix_repo_id_name(test)
 
        assert replaced == expected, 'url:%s, got:`%s` expected: `%s`' % (test, replaced, expected)
 

	
 
    @base.parametrize('canonical,test,expected', [
 
        ('http://www.example.org/', '/abc/xyz', 'http://www.example.org/abc/xyz'),
 
        ('http://www.example.org', '/abc/xyz', 'http://www.example.org/abc/xyz'),
 
        ('http://www.example.org', '/abc/xyz/', 'http://www.example.org/abc/xyz/'),
 
        ('http://www.example.org', 'abc/xyz/', 'http://www.example.org/abc/xyz/'),
 
        ('http://www.example.org', 'about', 'http://www.example.org/about-page'),
 
        ('http://www.example.org/repos/', 'abc/xyz/', 'http://www.example.org/repos/abc/xyz/'),
 
        ('http://www.example.org/kallithea/repos/', 'abc/xyz/', 'http://www.example.org/kallithea/repos/abc/xyz/'),
 
    ])
 
    def test_canonical_url(self, canonical, test, expected):
 
        # setup url(), used by canonical_url
 
        import routes
 
        from tg import request
 

	
 
        m = routes.Mapper()
 
        m.connect('about', '/about-page')
 
        url = routes.URLGenerator(m, {'HTTP_HOST': 'http_host.example.org'})
 

	
 
        config_mock = {
 
            'canonical_url': canonical,
 
        }
 

	
 
        with test_context(self.app):
 
            request.environ['routes.url'] = url
 
            with mock.patch('kallithea.CONFIG', config_mock):
 
                assert webutils.canonical_url(test) == expected
 

	
 
    @base.parametrize('canonical,expected', [
 
        ('http://www.example.org', 'www.example.org'),
 
        ('http://www.example.org/repos/', 'www.example.org'),
 
        ('http://www.example.org/kallithea/repos/', 'www.example.org'),
 
    ])
 
    def test_canonical_hostname(self, canonical, expected):
 
        import routes
 
        from tg import request
 

	
 
        # setup url(), used by canonical_hostname
 
        m = routes.Mapper()
 
        url = routes.URLGenerator(m, {'HTTP_HOST': 'http_host.example.org'})
 

	
 
        config_mock = {
 
            'canonical_url': canonical,
 
        }
 

	
 
        with test_context(self.app):
 
            request.environ['routes.url'] = url
 
            with mock.patch('kallithea.CONFIG', config_mock):
 
                assert webutils.canonical_hostname() == expected
kallithea/tests/performance/test_vcs.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
@pytest.mark.skipif("'TEST_PERFORMANCE' not in os.environ", reason="skipping performance tests, set TEST_PERFORMANCE in environment if desired")
 
class TestVCSPerformance(base.TestController):
 

	
 
    def graphmod(self, repo):
 
        """ Simple test for running the graph_data function for profiling/testing performance. """
 
        from kallithea.lib.graphmod import graph_data
 
        dbr = db.Repository.get_by_repo_name(repo)
 
        scm_inst = dbr.scm_instance
 
        collection = scm_inst.get_changesets(start=0, end=None, branch_name=None)
 
        revs = [x.revision for x in collection]
 
        jsdata = graph_data(scm_inst, revs)
 

	
 
    def test_graphmod_hg(self, benchmark):
 
        benchmark(self.graphmod, base.HG_REPO)
 

	
 
    def test_graphmod_git(self, benchmark):
 
        benchmark(self.graphmod, base.GIT_REPO)
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
@@ -20,48 +20,49 @@ Test suite for making push/pull operatio
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Dec 30, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 

	
 
"""
 

	
 
import logging
 
import os
 
import shutil
 
import sys
 
import tempfile
 
from os.path import dirname
 
from subprocess import PIPE, Popen
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
import kallithea.config.application
 
from kallithea.lib.auth import get_crypt_password
 
from kallithea.model import db, meta
 
from kallithea.model.base import init_model
 
from kallithea.model.repo import RepoModel
 
from kallithea.tests.base import HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
 

	
 

	
 
rel_path = dirname(dirname(dirname(dirname(os.path.abspath(__file__)))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
kallithea.config.application.make_app(conf.global_conf, **conf.local_conf)
 

	
 
USER = TEST_USER_ADMIN_LOGIN
 
PASS = TEST_USER_ADMIN_PASS
 
HOST = 'server.local'
 
METHOD = 'pull'
 
DEBUG = True
 
log = logging.getLogger(__name__)
 

	
 

	
 
class Command(object):
 

	
 
    def __init__(self, cwd):
 
        self.cwd = cwd
 

	
 
    def execute(self, cmd, *args):
 
        """Runs command on the system with given ``args``.
 
        """
 

	
 
@@ -93,49 +94,48 @@ def create_test_user(force=True):
 
        print('removing current user')
 
        for repo in sa.query(db.Repository).filter(db.Repository.user == user).all():
 
            sa.delete(repo)
 
        sa.delete(user)
 
        sa.commit()
 

	
 
    if user is None or force:
 
        print('creating new one')
 
        new_usr = db.User()
 
        new_usr.username = USER
 
        new_usr.password = get_crypt_password(PASS)
 
        new_usr.email = 'mail@example.com'
 
        new_usr.name = 'test'
 
        new_usr.lastname = 'lasttestname'
 
        new_usr.active = True
 
        new_usr.admin = True
 
        sa.add(new_usr)
 
        sa.commit()
 

	
 
    print('done')
 

	
 

	
 
def create_test_repo(force=True):
 
    print('creating test repo')
 
    from kallithea.model.repo import RepoModel
 
    sa = get_session()
 

	
 
    user = sa.query(db.User).filter(db.User.username == USER).scalar()
 
    if user is None:
 
        raise Exception('user not found')
 

	
 
    repo = sa.query(db.Repository).filter(db.Repository.repo_name == HG_REPO).scalar()
 

	
 
    if repo is None:
 
        print('repo not found creating')
 

	
 
        form_data = {'repo_name': HG_REPO,
 
                     'repo_type': 'hg',
 
                     'private': False,
 
                     'clone_uri': ''}
 
        rm = RepoModel()
 
        rm.base_path = '/home/hg'
 
        rm.create(form_data, user)
 

	
 
    print('done')
 

	
 

	
 
def set_anonymous_access(enable=True):
 
    sa = get_session()
kallithea/tests/vcs/test_repository.py
Show inline comments
 
import copy
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs import TEST_USER_CONFIG_FILE
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class RepositoryBaseTest(_BackendTestMixin):
 
    recreate_repo_per_test = False
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        return super(RepositoryBaseTest, cls)._get_commits()[:1]
 

	
 
    def test_get_config_value(self):
 
        assert self.repo.get_config_value('universal', 'foo', TEST_USER_CONFIG_FILE) == 'bar'
 

	
 
    def test_get_config_value_defaults_to_None(self):
 
        assert self.repo.get_config_value('universal', 'nonexist', TEST_USER_CONFIG_FILE) == None
 

	
 
    def test_get_user_name(self):
 
        assert self.repo.get_user_name(TEST_USER_CONFIG_FILE) == 'Foo Bar'
 

	
 
    def test_get_user_email(self):
 
        assert self.repo.get_user_email(TEST_USER_CONFIG_FILE) == 'foo.bar@example.com'
 

	
 
    def test_repo_equality(self):
 
        assert self.repo == self.repo
 

	
 
    def test_repo_equality_broken_object(self):
 
        import copy
 
        _repo = copy.copy(self.repo)
 
        delattr(_repo, 'path')
 
        assert self.repo != _repo
 

	
 
    def test_repo_equality_other_object(self):
 
        class dummy(object):
 
            path = self.repo.path
 
        assert self.repo != dummy()
 

	
 

	
 
class TestGitRepositoryBase(RepositoryBaseTest):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgRepositoryBase(RepositoryBaseTest):
 
    backend_alias = 'hg'
 

	
 

	
 
class RepositoryGetDiffTest(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
kallithea/tests/vcs/test_workdirs.py
Show inline comments
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import BranchDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class WorkdirTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 20),
 
                'added': [
 
                    FileNode('foobar', content='Foobar'),
 
                    FileNode('foobar2', content='Foobar II'),
 
                    FileNode('foo/bar/baz', content='baz here!'),
 
                ],
 
            },
 
            {
 
                'message': 'Changes...',
 
                'author': 'Jane Doe <jane.doe@example.com>',
 
                'date': datetime.datetime(2010, 1, 1, 21),
 
                'added': [
 
@@ -46,46 +47,44 @@ class WorkdirTestCaseMixin(_BackendTestM
 
            message='New branch: foobar',
 
            author='joe',
 
            branch='foobar',
 
        )
 
        assert self.repo.workdir.get_branch() == self.default_branch
 

	
 
    def test_get_changeset(self):
 
        old_head = self.repo.get_changeset()
 
        self.imc.add(FileNode('docs/index.txt',
 
            content='Documentation\n'))
 
        head = self.imc.commit(
 
            message='New branch: foobar',
 
            author='joe',
 
            branch='foobar',
 
        )
 
        assert self.repo.workdir.get_branch() == self.default_branch
 
        self.repo.workdir.checkout_branch('foobar')
 
        assert self.repo.workdir.get_changeset() == head
 

	
 
        # Make sure that old head is still there after update to default branch
 
        self.repo.workdir.checkout_branch(self.default_branch)
 
        assert self.repo.workdir.get_changeset() == old_head
 

	
 
    def test_checkout_branch(self):
 
        from kallithea.lib.vcs.exceptions import BranchDoesNotExistError
 

	
 
        # first, 'foobranch' does not exist.
 
        with pytest.raises(BranchDoesNotExistError):
 
            self.repo.workdir.checkout_branch(branch='foobranch')
 
        # create new branch 'foobranch'.
 
        self.imc.add(FileNode('file1', content='blah'))
 
        self.imc.commit(message='asd', author='john', branch='foobranch')
 
        # go back to the default branch
 
        self.repo.workdir.checkout_branch()
 
        assert self.repo.workdir.get_branch() == self.backend_class.DEFAULT_BRANCH_NAME
 
        # checkout 'foobranch'
 
        self.repo.workdir.checkout_branch('foobranch')
 
        assert self.repo.workdir.get_branch() == 'foobranch'
 

	
 

	
 
class TestGitBranch(WorkdirTestCaseMixin):
 
    backend_alias = 'git'
 

	
 

	
 
class TestHgBranch(WorkdirTestCaseMixin):
 
    backend_alias = 'hg'
setup.py
Show inline comments
 
#!/usr/bin/env python3
 
# -*- coding: utf-8 -*-
 
import os
 
import platform
 
import re
 
import sys
 

	
 
import setuptools
 
# monkey patch setuptools to use distutils owner/group functionality
 
from setuptools.command import sdist
 

	
 

	
 
if sys.version_info < (3, 6):
 
    raise Exception('Kallithea requires Python 3.6 or later')
 

	
 

	
 
here = os.path.abspath(os.path.dirname(__file__))
 

	
 

	
 
def _get_meta_var(name, data, callback_handler=None):
 
    import re
 
    matches = re.compile(r'(?:%s)\s*=\s*(.*)' % name).search(data)
 
    if matches:
 
        s = eval(matches.groups()[0])
 
        if callable(callback_handler):
 
            return callback_handler(s)
 
        return s
 

	
 
_meta = open(os.path.join(here, 'kallithea', '__init__.py'), 'r')
 
_metadata = _meta.read()
 
_meta.close()
 

	
 
def callback(V):
 
    return '.'.join(map(str, V[:3])) + '.'.join(V[3:])
 
__version__ = _get_meta_var('VERSION', _metadata, callback)
 
__license__ = _get_meta_var('__license__', _metadata)
 
__author__ = _get_meta_var('__author__', _metadata)
 
__url__ = _get_meta_var('__url__', _metadata)
 
# defines current platform
 
__platform__ = platform.system()
 

	
 
is_windows = __platform__ in ['Windows']
 

	
 
requirements = [
 
    "alembic >= 1.0.10, < 1.5",
0 comments (0 inline, 0 general)