Changeset - f3fab7b124f2
kallithea/config/middleware/pygrack.py
Show inline comments
 
@@ -27,25 +27,25 @@ This file was forked by the Kallithea pr
 

	
 
import logging
 
import os
 
import socket
 
import traceback
 

	
 
from dulwich.server import update_server_info
 
from dulwich.web import GunzipFilter, LimitedInputFilter
 
from webob import Request, Response, exc
 

	
 
import kallithea
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs import get_repo, subprocessio
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class FileWrapper(object):
 

	
 
    def __init__(self, fd, content_length):
 
        self.fd = fd
 
        self.content_length = content_length
 
        self.remain = content_length
 

	
 
@@ -161,26 +161,24 @@ class GitRepository(object):
 
                cmd,
 
                inputstream=inputstream,
 
                env=gitenv,
 
                cwd=self.content_path,
 
            )
 
        except EnvironmentError as e:
 
            log.error(traceback.format_exc())
 
            raise exc.HTTPExpectationFailed()
 

	
 
        if git_command in ['git-receive-pack']:
 
            # updating refs manually after each push.
 
            # Needed for pre-1.7.0.4 git clients using regular HTTP mode.
 

	
 
            from kallithea.lib.vcs import get_repo
 
            repo = get_repo(self.content_path)
 
            if repo:
 
                update_server_info(repo._repo)
 

	
 
        resp = Response()
 
        resp.content_type = 'application/x-%s-result' % git_command
 
        resp.charset = None
 
        resp.app_iter = out
 
        return resp
 

	
 
    def __call__(self, environ, start_response):
 
        req = Request(environ)
kallithea/controllers/admin/settings.py
Show inline comments
 
@@ -26,24 +26,25 @@ Original author and date, and relevant c
 
"""
 

	
 
import logging
 
import traceback
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import config, request
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPFound
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import HasPermissionAnyDecorator, LoginRequired
 
from kallithea.lib.base import BaseController, render
 
from kallithea.lib.celerylib import tasks
 
from kallithea.lib.utils import repo2db_mapper, set_app_settings
 
from kallithea.lib.utils2 import safe_str
 
from kallithea.lib.vcs import VCSError
 
from kallithea.lib.webutils import url
 
from kallithea.model import db, meta
 
from kallithea.model.forms import ApplicationSettingsForm, ApplicationUiSettingsForm, ApplicationVisualisationForm
 
from kallithea.model.notification import EmailNotificationModel
 
from kallithea.model.scm import ScmModel
 
@@ -300,25 +301,24 @@ class SettingsController(BaseController)
 

	
 
            recipients = [test_email] if test_email else None
 

	
 
            tasks.send_email(recipients, test_email_subj,
 
                             test_email_txt_body, test_email_html_body)
 

	
 
            webutils.flash(_('Send email task created'), category='success')
 
            raise HTTPFound(location=url('admin_settings_email'))
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        import kallithea
 
        c.ini = kallithea.CONFIG
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_hooks(self):
 
        c.active = 'hooks'
 
        if request.POST:
 
@@ -390,23 +390,22 @@ class SettingsController(BaseController)
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
 

	
 
    @HasPermissionAnyDecorator('hg.admin')
 
    def settings_system(self):
 
        c.active = 'system'
 

	
 
        defaults = db.Setting.get_app_settings()
 
        defaults.update(self._get_hg_ui_settings())
 

	
 
        import kallithea
 
        c.ini = kallithea.CONFIG
 
        server_info = db.Setting.get_server_info()
 
        for key, val in server_info.items():
 
            setattr(c, key, val)
 

	
 
        return htmlfill.render(
 
            render('admin/settings/settings.html'),
 
            defaults=defaults,
 
            encoding="UTF-8",
 
            force_defaults=False)
kallithea/controllers/login.py
Show inline comments
 
@@ -31,24 +31,25 @@ import re
 

	
 
import formencode
 
from formencode import htmlfill
 
from tg import request, session
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 
from webob.exc import HTTPBadRequest, HTTPFound
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.auth import AuthUser, HasPermissionAnyDecorator
 
from kallithea.lib.base import BaseController, log_in_user, render
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.recaptcha import submit
 
from kallithea.lib.webutils import url
 
from kallithea.model import db, meta
 
from kallithea.model.forms import LoginForm, PasswordResetConfirmationForm, PasswordResetRequestForm, RegisterForm
 
from kallithea.model.user import UserModel
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class LoginController(BaseController):
 

	
 
    def _validate_came_from(self, came_from,
 
@@ -123,25 +124,24 @@ class LoginController(BaseController):
 
        settings = db.Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            register_form = RegisterForm()()
 
            try:
 
                form_result = register_form.to_python(dict(request.POST))
 
                form_result['active'] = c.auto_active
 

	
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('g-recaptcha-response'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=request.ip_addr)
 
                    if not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 

	
 
                UserModel().create_registration(form_result)
 
                webutils.flash(_('You have successfully registered with %s') % (c.site_name or 'Kallithea'),
 
@@ -168,25 +168,24 @@ class LoginController(BaseController):
 

	
 
    def password_reset(self):
 
        settings = db.Setting.get_app_settings()
 
        captcha_private_key = settings.get('captcha_private_key')
 
        c.captcha_active = bool(captcha_private_key)
 
        c.captcha_public_key = settings.get('captcha_public_key')
 

	
 
        if request.POST:
 
            password_reset_form = PasswordResetRequestForm()()
 
            try:
 
                form_result = password_reset_form.to_python(dict(request.POST))
 
                if c.captcha_active:
 
                    from kallithea.lib.recaptcha import submit
 
                    response = submit(request.POST.get('g-recaptcha-response'),
 
                                      private_key=captcha_private_key,
 
                                      remoteip=request.ip_addr)
 
                    if not response.is_valid:
 
                        _value = form_result
 
                        _msg = _('Bad captcha')
 
                        error_dict = {'recaptcha_field': _msg}
 
                        raise formencode.Invalid(_msg, _value, None,
 
                                                 error_dict=error_dict)
 
                redirect_link = UserModel().send_reset_password_email(form_result)
 
                webutils.flash(_('A password reset confirmation code has been sent'),
 
                            category='success')
kallithea/controllers/routing.py
Show inline comments
 
@@ -13,24 +13,25 @@
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
Routes configuration
 

	
 
The more specific and detailed routes should be defined first so they
 
may take precedent over the more generic routes. For more information
 
refer to the routes manual at http://routes.groovie.org/docs/
 
"""
 

	
 
import routes
 

	
 
import kallithea
 
from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 
from kallithea.lib.utils2 import safe_str
 

	
 

	
 
class Mapper(routes.Mapper):
 
    """
 
    Subclassed Mapper with routematch patched to decode "unicode" str url to
 
    *real* unicode str before applying matches and invoking controller methods.
 
    """
 

	
 
    def routematch(self, url=None, environ=None):
 
        """
 
        routematch that also decode url from "fake bytes" to real unicode
 
@@ -41,26 +42,24 @@ class Mapper(routes.Mapper):
 
        # instead.
 
        url = safe_str(url.encode('latin1'))
 
        return super().routematch(url=url, environ=environ)
 

	
 

	
 
def make_map(config):
 
    """Create, configure and return the routes Mapper"""
 
    rmap = Mapper(directory=config['paths']['controllers'],
 
                  always_scan=config['debug'])
 
    rmap.minimization = False
 
    rmap.explicit = False
 

	
 
    from kallithea.lib.utils import is_valid_repo, is_valid_repo_group
 

	
 
    def check_repo(environ, match_dict):
 
        """
 
        Check for valid repository for proper 404 handling.
 
        Also, a bit of side effect modifying match_dict ...
 
        """
 
        if match_dict.get('f_path'):
 
            # fix for multiple initial slashes that causes errors
 
            match_dict['f_path'] = match_dict['f_path'].lstrip('/')
 

	
 
        return is_valid_repo(match_dict['repo_name'], config['base_path'])
 

	
 
    def check_group(environ, match_dict):
kallithea/lib/helpers.py
Show inline comments
 
@@ -18,24 +18,25 @@ Consists of functions to typically be us
 
available to Controllers. This module is available to both as 'h'.
 
"""
 
import hashlib
 
import json
 
import logging
 
import re
 
import textwrap
 
import urllib.parse
 

	
 
from beaker.cache import cache_region
 
from pygments import highlight as code_highlight
 
from pygments.formatters.html import HtmlFormatter
 
from tg import tmpl_context as c
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib.annotate import annotate_highlight
 
#==============================================================================
 
# PERMS
 
#==============================================================================
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel, HasRepoPermissionLevel
 
from kallithea.lib.diffs import BIN_FILENODE, CHMOD_FILENODE, DEL_FILENODE, MOD_FILENODE, NEW_FILENODE, RENAMED_FILENODE
 
from kallithea.lib.markup_renderer import url_re
 
from kallithea.lib.pygmentsutils import get_custom_lexer
 
from kallithea.lib.utils2 import (MENTIONS_REGEX, AttributeDict, age, asbool, credentials_filter, fmt_date, link_to_ref, safe_bytes, safe_int, safe_str,
 
@@ -762,70 +763,66 @@ def action_parser(user_log, feed=False, 
 
    return [lambda: literal(action), action_params_func, action_parser_icon]
 

	
 

	
 
#==============================================================================
 
# GRAVATAR URL
 
#==============================================================================
 
def gravatar_div(email_address, cls='', size=30, **div_attributes):
 
    """Return an html literal with a span around a gravatar if they are enabled.
 
    Extra keyword parameters starting with 'div_' will get the prefix removed
 
    and '_' changed to '-' and be used as attributes on the div. The default
 
    class is 'gravatar'.
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 
    if 'div_class' not in div_attributes:
 
        div_attributes['div_class'] = "gravatar"
 
    attributes = []
 
    for k, v in sorted(div_attributes.items()):
 
        assert k.startswith('div_'), k
 
        attributes.append(' %s="%s"' % (k[4:].replace('_', '-'), escape(v)))
 
    return literal("""<span%s>%s</span>""" %
 
                   (''.join(attributes),
 
                    gravatar(email_address, cls=cls, size=size)))
 

	
 

	
 
def gravatar(email_address, cls='', size=30):
 
    """return html element of the gravatar
 

	
 
    This method will return an <img> with the resolution double the size (for
 
    retina screens) of the image. If the url returned from gravatar_url is
 
    empty then we fallback to using an icon.
 

	
 
    """
 
    from tg import tmpl_context as c
 
    if not c.visual.use_gravatar:
 
        return ''
 

	
 
    src = gravatar_url(email_address, size * 2)
 

	
 
    if src:
 
        # here it makes sense to use style="width: ..." (instead of, say, a
 
        # stylesheet) because we using this to generate a high-res (retina) size
 
        html = ('<i class="icon-gravatar {cls}"'
 
                ' style="font-size: {size}px;background-size: {size}px;background-image: url(\'{src}\')"'
 
                '></i>').format(cls=cls, size=size, src=src)
 

	
 
    else:
 
        # if src is empty then there was no gravatar, so we use a font icon
 
        html = ("""<i class="icon-user {cls}" style="font-size: {size}px;"></i>"""
 
            .format(cls=cls, size=size))
 

	
 
    return literal(html)
 

	
 

	
 
def gravatar_url(email_address, size=30, default=''):
 
    from tg import tmpl_context as c
 

	
 
    if not c.visual.use_gravatar:
 
        return ""
 

	
 
    _def = 'anonymous@kallithea-scm.org'  # default gravatar
 
    email_address = email_address or _def
 

	
 
    if email_address == _def:
 
        return default
 

	
 
    parsed_url = urllib.parse.urlparse(url.current(qualified=True))
 
    return (c.visual.gravatar_url or db.User.DEFAULT_GRAVATAR_URL) \
 
               .replace('{email}', email_address) \
kallithea/lib/hooks.py
Show inline comments
 
@@ -21,24 +21,25 @@ This file was forked by the Kallithea pr
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Aug 6, 2010
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import os
 
import sys
 
import time
 

	
 
import mercurial.scmutil
 
import paste.deploy
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import UserCreationError
 
from kallithea.lib.utils import action_logger, make_ui
 
from kallithea.lib.utils2 import HookEnvironmentError, ascii_str, get_hook_environment, safe_bytes, safe_str
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model import db
 

	
 

	
 
def _get_scm_size(alias, root_path):
 
    if not alias.startswith('.'):
 
@@ -292,26 +293,24 @@ def log_delete_user(user_dict, deleted_b
 
        callback(deleted_by=deleted_by, **user_dict)
 

	
 

	
 
def _hook_environment(repo_path):
 
    """
 
    Create a light-weight environment for stand-alone scripts and return an UI and the
 
    db repository.
 

	
 
    Git hooks are executed as subprocess of Git while Kallithea is waiting, and
 
    they thus need enough info to be able to create an app environment and
 
    connect to the database.
 
    """
 
    import paste.deploy
 

	
 
    import kallithea.config.application
 

	
 
    extras = get_hook_environment()
 

	
 
    path_to_ini_file = extras['config']
 
    config = paste.deploy.appconfig('config:' + path_to_ini_file)
 
    #logging.config.fileConfig(ini_file_path) # Note: we are in a different process - don't use configured logging
 
    kallithea.config.application.make_app(config.global_conf, **config.local_conf)
 

	
 
    # fix if it's not a bare repo
 
    if repo_path.endswith(os.sep + '.git'):
 
        repo_path = repo_path[:-5]
kallithea/lib/utils2.py
Show inline comments
 
@@ -30,24 +30,25 @@ Original author and date, and relevant c
 
import binascii
 
import datetime
 
import json
 
import os
 
import re
 
import time
 
import urllib.parse
 

	
 
import urlobject
 
from dateutil import relativedelta
 
from sqlalchemy.engine import url as sa_url
 
from sqlalchemy.exc import ArgumentError
 
from tg import tmpl_context
 
from tg.i18n import ugettext as _
 
from tg.i18n import ungettext
 
from tg.support.converters import asbool, aslist
 
from webhelpers2.text import collapse, remove_formatting, strip_tags
 

	
 
import kallithea
 
from kallithea.lib import webutils
 
from kallithea.lib.vcs.backends.base import BaseRepository, EmptyChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, safe_bytes, safe_str  # re-export
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 

	
 
@@ -482,25 +483,24 @@ def set_hook_environment(username, ip_ad
 
        'repository': repo_name,
 
        'scm': repo_alias, # used to pick hack in log_push_action_raw_ids
 
        'config': kallithea.CONFIG['__file__'], # used by git hook to read config
 
    }
 
    os.environ['KALLITHEA_EXTRAS'] = json.dumps(extras)
 

	
 

	
 
def get_current_authuser():
 
    """
 
    Gets kallithea user from threadlocal tmpl_context variable if it's
 
    defined, else returns None.
 
    """
 
    from tg import tmpl_context
 
    try:
 
        return getattr(tmpl_context, 'authuser', None)
 
    except TypeError:  # No object (name: context) has been registered for this thread
 
        return None
 

	
 

	
 
def urlreadable(s, _cleanstringsub=re.compile('[^-a-zA-Z0-9./]+').sub):
 
    return _cleanstringsub('_', s).rstrip('_')
 

	
 

	
 
def recursive_replace(str_, replace=' '):
 
    """
kallithea/lib/vcs/backends/git/inmemory.py
Show inline comments
 
import datetime
 
import posixpath
 
import stat
 
import time
 

	
 
from dulwich import objects
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_str, safe_bytes
 

	
 
from . import repository
 

	
 

	
 
class GitInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
        :param parents: single parent or sequence of parents from which commit
 
          would be derived
 
        :param date: ``datetime.datetime`` instance. Defaults to
 
          ``datetime.datetime.now()``.
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        from .repository import GitRepository
 
        if branch is None:
 
            branch = GitRepository.DEFAULT_BRANCH_NAME
 
            branch = repository.GitRepository.DEFAULT_BRANCH_NAME
 

	
 
        repo = self.repository._repo
 
        object_store = repo.object_store
 

	
 
        ENCODING = b"UTF-8"  # TODO: should probably be kept in sync with safe_str/safe_bytes and vcs/conf/settings.py DEFAULT_ENCODINGS
 

	
 
        # Create tree and populates it with blobs
 
        commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or \
 
            objects.Tree()
 
        for node in self.added + self.changed:
 
            # Compute subdirs if needed
 
            dirpath, nodename = posixpath.split(node.path)
kallithea/lib/vcs/backends/git/repository.py
Show inline comments
 
@@ -27,27 +27,25 @@ from dulwich.repo import NotGitRepositor
 
from dulwich.server import update_server_info
 

	
 
from kallithea.lib.vcs import subprocessio
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.conf import settings
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
 

	
 
from .changeset import GitChangeset
 
from .inmemory import GitInMemoryChangeset
 
from .workdir import GitWorkdir
 
from . import changeset, inmemory, workdir
 

	
 

	
 
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitRepository(BaseRepository):
 
    """
 
    Git repository backend.
 
    """
 
    DEFAULT_BRANCH_NAME = 'master'
 
@@ -458,27 +456,27 @@ class GitRepository(BaseRepository):
 
                if key.startswith(ref_key):
 
                    n = key[len(ref_key):]
 
                    if n not in [b'HEAD']:
 
                        heads[n] = val
 

	
 
        return heads if reverse else dict((y, x) for x, y in heads.items())
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``GitChangeset`` object representing commit from git repository
 
        at the given revision or head (most recent commit) if None given.
 
        """
 
        if isinstance(revision, GitChangeset):
 
        if isinstance(revision, changeset.GitChangeset):
 
            return revision
 
        return GitChangeset(repository=self, revision=self._get_revision(revision))
 
        return changeset.GitChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
           end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``GitChangeset`` objects from start to end (both
 
        are inclusive), in ascending date order (unless ``reverse`` is set).
 

	
 
        :param start: changeset ID, as str; first returned changeset
 
        :param end: changeset ID, as str; last returned changeset
 
        :param start_date: if specified, changesets with commit date less than
 
          ``start_date`` would be filtered out from returned set
 
        :param end_date: if specified, changesets with commit date greater than
 
@@ -660,25 +658,25 @@ class GitRepository(BaseRepository):
 
        # starts)
 
        if rev1 == self.EMPTY_CHANGESET:
 
            parts = stdout.split(b'\ndiff ', 1)
 
            if len(parts) > 1:
 
                stdout = b'diff ' + parts[1]
 
        return stdout
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        """
 
        Returns ``GitInMemoryChangeset`` object for this repository.
 
        """
 
        return GitInMemoryChangeset(self)
 
        return inmemory.GitInMemoryChangeset(self)
 

	
 
    def clone(self, url, update_after_clone=True, bare=False):
 
        """
 
        Tries to clone changes from external location.
 

	
 
        :param update_after_clone: If set to ``False``, git won't checkout
 
          working directory
 
        :param bare: If set to ``True``, repository would be cloned into
 
          *bare* git repository (no working directory at all).
 
        """
 
        url = self._get_url(url)
 
        cmd = ['clone', '-q']
 
@@ -719,25 +717,25 @@ class GitRepository(BaseRepository):
 
            update_server_info(self._repo)
 
        except OSError as e:
 
            if e.errno not in [errno.ENOENT, errno.EROFS]:
 
                raise
 
            # Workaround for dulwich crashing on for example its own dulwich/tests/data/repos/simple_merge.git/info/refs.lock
 
            log.error('Ignoring %s running update-server-info: %s', type(e).__name__, e)
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return GitWorkdir(self)
 
        return workdir.GitWorkdir(self)
 

	
 
    def get_config_value(self, section, name, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
kallithea/lib/vcs/backends/hg/inmemory.py
Show inline comments
 
import datetime
 

	
 
import mercurial.context
 
import mercurial.node
 

	
 
from kallithea.lib.vcs.backends.base import BaseInMemoryChangeset
 
from kallithea.lib.vcs.exceptions import RepositoryError
 
from kallithea.lib.vcs.utils import ascii_str, safe_bytes, safe_str
 

	
 
from . import repository
 

	
 

	
 
class MercurialInMemoryChangeset(BaseInMemoryChangeset):
 

	
 
    def commit(self, message, author, parents=None, branch=None, date=None,
 
               **kwargs):
 
        """
 
        Performs in-memory commit (doesn't check workdir in any way) and
 
        returns newly created ``Changeset``. Updates repository's
 
        ``revisions``.
 

	
 
        :param message: message of the commit
 
        :param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
 
@@ -26,27 +28,26 @@ class MercurialInMemoryChangeset(BaseInM
 
        :param branch: branch name, as string. If none given, default backend's
 
          branch would be used.
 

	
 
        :raises ``CommitError``: if any error occurs while committing
 
        """
 
        self.check_integrity(parents)
 

	
 
        if not isinstance(message, str):
 
            raise RepositoryError('message must be a str - got %r' % type(message))
 
        if not isinstance(author, str):
 
            raise RepositoryError('author must be a str - got %r' % type(author))
 

	
 
        from .repository import MercurialRepository
 
        if branch is None:
 
            branch = MercurialRepository.DEFAULT_BRANCH_NAME
 
            branch = repository.MercurialRepository.DEFAULT_BRANCH_NAME
 
        kwargs[b'branch'] = safe_bytes(branch)
 

	
 
        def filectxfn(_repo, memctx, bytes_path):
 
            """
 
            Callback from Mercurial, returning ctx to commit for the given
 
            path.
 
            """
 
            path = safe_str(bytes_path)
 

	
 
            # check if this path is removed
 
            if path in (node.path for node in self.removed):
 
                return None
kallithea/lib/vcs/backends/hg/repository.py
Show inline comments
 
@@ -35,27 +35,25 @@ import mercurial.tags
 
import mercurial.ui
 
import mercurial.unionrepo
 
import mercurial.util
 

	
 
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
 
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
 
                                          TagDoesNotExistError, VCSError)
 
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, author_email, author_name, date_fromtimestamp, makedate, safe_bytes, safe_str
 
from kallithea.lib.vcs.utils.helpers import get_urllib_request_handlers
 
from kallithea.lib.vcs.utils.lazy import LazyProperty
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 
from .changeset import MercurialChangeset
 
from .inmemory import MercurialInMemoryChangeset
 
from .workdir import MercurialWorkdir
 
from . import changeset, inmemory, workdir
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialRepository(BaseRepository):
 
    """
 
    Mercurial repository backend
 
    """
 
    DEFAULT_BRANCH_NAME = 'default'
 
    scm = 'hg'
 

	
 
@@ -371,25 +369,25 @@ class MercurialRepository(BaseRepository
 
            return mercurial.localrepo.instance(self.baseui, safe_bytes(self.path), create=create)
 
        except (mercurial.error.Abort, mercurial.error.RepoError) as err:
 
            if create:
 
                msg = "Cannot create repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            else:
 
                msg = "Not valid repository at %s. Original error was %s" \
 
                    % (self.name, err)
 
            raise RepositoryError(msg)
 

	
 
    @LazyProperty
 
    def in_memory_changeset(self):
 
        return MercurialInMemoryChangeset(self)
 
        return inmemory.MercurialInMemoryChangeset(self)
 

	
 
    @LazyProperty
 
    def description(self):
 
        _desc = self._repo.ui.config(b'web', b'description', None, untrusted=True)
 
        return safe_str(_desc or b'unknown')
 

	
 
    @LazyProperty
 
    def contact(self):
 
        return safe_str(mercurial.hgweb.common.get_contact(self._repo.ui.config)
 
                            or b'Unknown')
 

	
 
    @LazyProperty
 
@@ -481,25 +479,25 @@ class MercurialRepository(BaseRepository
 
        Returns normalized url. If schema is not given, fall back to
 
        filesystem (``file:///``) schema.
 
        """
 
        if url != 'default' and '://' not in url:
 
            url = "file:" + urllib.request.pathname2url(url)
 
        return url
 

	
 
    def get_changeset(self, revision=None):
 
        """
 
        Returns ``MercurialChangeset`` object representing repository's
 
        changeset at the given ``revision``.
 
        """
 
        return MercurialChangeset(repository=self, revision=self._get_revision(revision))
 
        return changeset.MercurialChangeset(repository=self, revision=self._get_revision(revision))
 

	
 
    def get_changesets(self, start=None, end=None, start_date=None,
 
                       end_date=None, branch_name=None, reverse=False, max_revisions=None):
 
        """
 
        Returns iterator of ``MercurialChangeset`` objects from start to end
 
        (both are inclusive)
 

	
 
        :param start: None, str, int or mercurial lookup format
 
        :param end:  None, str, int or mercurial lookup format
 
        :param start_date:
 
        :param end_date:
 
        :param branch_name:
 
@@ -607,25 +605,25 @@ class MercurialRepository(BaseRepository
 
        other = mercurial.hg.peer(self._repo, {}, safe_bytes(self._get_url(url)))
 
        try:
 
            mercurial.exchange.pull(self._repo, other, heads=None, force=None)
 
        except mercurial.error.Abort as err:
 
            # Propagate error but with vcs's type
 
            raise RepositoryError(str(err))
 

	
 
    @LazyProperty
 
    def workdir(self):
 
        """
 
        Returns ``Workdir`` instance for this repository.
 
        """
 
        return MercurialWorkdir(self)
 
        return workdir.MercurialWorkdir(self)
 

	
 
    def get_config_value(self, section, name=None, config_file=None):
 
        """
 
        Returns configuration value for a given [``section``] and ``name``.
 

	
 
        :param section: Section we want to retrieve value from
 
        :param name: Name of configuration we want to retrieve
 
        :param config_file: A path to file which should be used to retrieve
 
          configuration from (might also be a list of file paths)
 
        """
 
        if config_file is None:
 
            config_file = []
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
@@ -5,24 +5,25 @@ Utilities aimed to help achieve mostly b
 
import datetime
 
import logging
 
import os
 
import re
 
import time
 
import urllib.request
 

	
 
import mercurial.url
 
from pygments import highlight
 
from pygments.formatters import TerminalFormatter
 
from pygments.lexers import ClassNotFound, guess_lexer_for_filename
 

	
 
from kallithea.lib.vcs import backends
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils import safe_str
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
@@ -62,45 +63,44 @@ def get_scm(path, search_up=False, expli
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    from kallithea.lib.vcs.backends import get_backend
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            get_backend(key)(path)
 
            backends.get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
kallithea/model/db.py
Show inline comments
 
@@ -28,36 +28,37 @@ Original author and date, and relevant c
 
import base64
 
import collections
 
import datetime
 
import functools
 
import hashlib
 
import logging
 
import os
 
import time
 
import traceback
 

	
 
import ipaddr
 
import sqlalchemy
 
import urlobject
 
from sqlalchemy import Boolean, Column, DateTime, Float, ForeignKey, Index, Integer, LargeBinary, String, Unicode, UnicodeText, UniqueConstraint
 
from sqlalchemy.ext.hybrid import hybrid_property
 
from sqlalchemy.orm import class_mapper, joinedload, relationship, validates
 
from tg.i18n import lazy_ugettext as _
 
from webob.exc import HTTPNotFound
 

	
 
import kallithea
 
from kallithea.lib import ext_json, ssh, webutils
 
from kallithea.lib.exceptions import DefaultUserException
 
from kallithea.lib.utils2 import asbool, ascii_bytes, aslist, get_changeset_safe, get_clone_url, remove_prefix, safe_bytes, safe_int, safe_str, urlreadable
 
from kallithea.lib.vcs import get_backend, get_repo
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
 
from kallithea.lib.vcs.utils import author_email, author_name
 
from kallithea.lib.vcs.utils.helpers import get_scm
 
from kallithea.model import meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
#==============================================================================
 
# BASE CLASSES
 
#==============================================================================
 

	
 
class BaseDbModel(object):
 
@@ -1162,25 +1163,24 @@ class Repository(meta.Base, BaseDbModel)
 
                data[f.field_key_prefixed] = f.field_value
 

	
 
        return data
 

	
 
    @property
 
    def last_db_change(self):
 
        return self.updated_on
 

	
 
    @property
 
    def clone_uri_hidden(self):
 
        clone_uri = self.clone_uri
 
        if clone_uri:
 
            import urlobject
 
            url_obj = urlobject.URLObject(self.clone_uri)
 
            if url_obj.password:
 
                clone_uri = url_obj.with_password('*****')
 
        return clone_uri
 

	
 
    def clone_url(self, clone_uri_tmpl, with_id=False, username=None):
 
        if '{repo}' not in clone_uri_tmpl and '_{repoid}' not in clone_uri_tmpl:
 
            log.error("Configured clone_uri_tmpl %r has no '{repo}' or '_{repoid}' and cannot toggle to use repo id URLs", clone_uri_tmpl)
 
        elif with_id:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('{repo}', '_{repoid}')
 
        else:
 
            clone_uri_tmpl = clone_uri_tmpl.replace('_{repoid}', '{repo}')
 
@@ -1217,25 +1217,24 @@ class Repository(meta.Base, BaseDbModel)
 
        """
 
        Update cache of last changeset for repository, keys should be::
 

	
 
            short_id
 
            raw_id
 
            revision
 
            message
 
            date
 
            author
 

	
 
        :param cs_cache:
 
        """
 
        from kallithea.lib.vcs.backends.base import BaseChangeset
 
        if cs_cache is None:
 
            cs_cache = EmptyChangeset()
 
            # use no-cache version here
 
            scm_repo = self.scm_instance_no_cache()
 
            if scm_repo:
 
                cs_cache = scm_repo.get_changeset()
 

	
 
        if isinstance(cs_cache, BaseChangeset):
 
            cs_cache = cs_cache.__json__()
 

	
 
        if (not self.changeset_cache or cs_cache['raw_id'] != self.changeset_cache['raw_id']):
 
            _default = datetime.datetime.fromtimestamp(0)
kallithea/model/user.py
Show inline comments
 
@@ -30,25 +30,25 @@ import hashlib
 
import hmac
 
import logging
 
import time
 
import traceback
 

	
 
from sqlalchemy.exc import DatabaseError
 
from tg import config
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib import webutils
 
from kallithea.lib.exceptions import DefaultUserException, UserOwnsReposException
 
from kallithea.lib.utils2 import generate_api_key, get_current_authuser
 
from kallithea.model import db, meta
 
from kallithea.model import db, forms, meta
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class UserModel(object):
 
    password_reset_token_lifetime = 86400 # 24 hours
 

	
 
    def get(self, user_id):
 
        user = db.User.query()
 
        return user.get(user_id)
 

	
 
@@ -442,25 +442,24 @@ class UserModel(object):
 
        db.UserToPerm.query().filter(
 
            db.UserToPerm.user == user,
 
            db.UserToPerm.permission == perm,
 
        ).delete()
 

	
 
    def add_extra_email(self, user, email):
 
        """
 
        Adds email address to UserEmailMap
 

	
 
        :param user:
 
        :param email:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraEmailForm()()
 
        data = form.to_python(dict(email=email))
 
        user = db.User.guess_instance(user)
 

	
 
        obj = db.UserEmailMap()
 
        obj.user = user
 
        obj.email = data['email']
 
        meta.Session().add(obj)
 
        return obj
 

	
 
    def delete_extra_email(self, user, email_id):
 
        """
 
@@ -472,25 +471,24 @@ class UserModel(object):
 
        user = db.User.guess_instance(user)
 
        obj = db.UserEmailMap.query().get(email_id)
 
        if obj is not None:
 
            meta.Session().delete(obj)
 

	
 
    def add_extra_ip(self, user, ip):
 
        """
 
        Adds IP address to UserIpMap
 

	
 
        :param user:
 
        :param ip:
 
        """
 
        from kallithea.model import forms
 
        form = forms.UserExtraIpForm()()
 
        data = form.to_python(dict(ip=ip))
 
        user = db.User.guess_instance(user)
 

	
 
        obj = db.UserIpMap()
 
        obj.user = user
 
        obj.ip_addr = data['ip']
 
        meta.Session().add(obj)
 
        return obj
 

	
 
    def delete_extra_ip(self, user, ip_id):
 
        """
kallithea/model/validators.py
Show inline comments
 
@@ -19,25 +19,25 @@ import logging
 
import os
 
import re
 
from collections import defaultdict
 

	
 
import formencode
 
import ipaddr
 
import sqlalchemy
 
from formencode.validators import CIDR, Bool, Email, FancyValidator, Int, IPAddress, NotEmpty, Number, OneOf, Regex, Set, String, StringBoolean, UnicodeString
 
from sqlalchemy import func
 
from tg.i18n import ugettext as _
 

	
 
import kallithea
 
from kallithea.lib.auth import HasPermissionAny, HasRepoGroupPermissionLevel
 
from kallithea.lib import auth
 
from kallithea.lib.compat import OrderedSet
 
from kallithea.lib.exceptions import InvalidCloneUriException, LdapImportError
 
from kallithea.lib.utils import is_valid_repo_uri
 
from kallithea.lib.utils2 import asbool, aslist, repo_name_slug
 
from kallithea.model import db
 

	
 

	
 
# silence warnings and pylint
 
UnicodeString, OneOf, Int, Number, Regex, Email, Bool, StringBoolean, Set, \
 
    NotEmpty, IPAddress, CIDR, String, FancyValidator
 

	
 
log = logging.getLogger(__name__)
 
@@ -446,30 +446,30 @@ def CanWriteGroup(old_data=None):
 

	
 
        def _convert_to_python(self, value, state):
 
            # root location
 
            if value == -1:
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = db.RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            # create repositories with write permission on group is set to true
 
            group_admin = HasRepoGroupPermissionLevel('admin')(gr_name,
 
            group_admin = auth.HasRepoGroupPermissionLevel('admin')(gr_name,
 
                                            'can write into group validator')
 
            group_write = HasRepoGroupPermissionLevel('write')(gr_name,
 
            group_write = auth.HasRepoGroupPermissionLevel('write')(gr_name,
 
                                            'can write into group validator')
 
            forbidden = not (group_admin or group_write)
 
            can_create_repos = HasPermissionAny('hg.admin', 'hg.create.repository')
 
            can_create_repos = auth.HasPermissionAny('hg.admin', 'hg.create.repository')
 
            gid = (old_data['repo_group'].get('group_id')
 
                   if (old_data and 'repo_group' in old_data) else None)
 
            value_changed = gid != value
 
            new = not old_data
 
            # do check if we changed the value, there's a case that someone got
 
            # revoked write permissions to a repository, he still created, we
 
            # don't need to check permission if he didn't change the value of
 
            # groups in form box
 
            if value_changed or new:
 
                # parent group need to be existing
 
                if gr and forbidden:
 
                    msg = self.message('permission_denied', state)
 
@@ -499,25 +499,25 @@ def CanCreateGroup(can_create_in_root=Fa
 
                return None
 
            return value
 

	
 
        def _validate_python(self, value, state):
 
            gr = db.RepoGroup.get(value)
 
            gr_name = gr.group_name if gr is not None else None # None means ROOT location
 

	
 
            if can_create_in_root and gr is None:
 
                # we can create in root, we're fine no validations required
 
                return
 

	
 
            forbidden_in_root = gr is None and not can_create_in_root
 
            forbidden = not HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            forbidden = not auth.HasRepoGroupPermissionLevel('admin')(gr_name, 'can create group validator')
 
            if forbidden_in_root or forbidden:
 
                msg = self.message('permission_denied', state)
 
                raise formencode.Invalid(msg, value, state,
 
                    error_dict=dict(parent_group_id=msg)
 
                )
 

	
 
    return _validator
 

	
 

	
 
def ValidPerms(type_='repo'):
 
    if type_ == 'repo_group':
 
        EMPTY_PERM = 'group.none'
kallithea/tests/api/api_base.py
Show inline comments
 
@@ -10,24 +10,25 @@
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
Tests for the JSON-RPC web api.
 
"""
 

	
 
import os
 
import random
 
import re
 
import string
 

	
 
import mock
 
import pytest
 

	
 
from kallithea.lib import ext_json
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.utils2 import ascii_bytes
 
from kallithea.model import db, meta
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.pull_request import PullRequestModel
 
from kallithea.model.repo import RepoModel
 
@@ -168,25 +169,24 @@ class _BaseTestApi(object):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": null')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_is_bad(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
 
        params = params.replace('"args": {}', '"args": 1')
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 

	
 
    def test_api_args_different_args(self):
 
        import string
 
        expected = {
 
            'ascii_letters': string.ascii_letters,
 
            'ws': string.whitespace,
 
            'printables': string.printable
 
        }
 
        id_, params = _build_data(self.apikey, 'test', args=expected)
 
        response = api_call(self, params)
 
        assert response.status == '200 OK'
 
        self._compare_ok(id_, expected, response.body)
 

	
 
    def test_api_get_users(self):
 
        id_, params = _build_data(self.apikey, 'get_users', )
kallithea/tests/base.py
Show inline comments
 
@@ -11,24 +11,25 @@
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import datetime
 
import logging
 
import os
 
import re
 
import tempfile
 
import time
 

	
 
import pytest
 
from beaker.cache import cache_managers
 
from webtest import TestApp
 

	
 
from kallithea.lib.utils2 import ascii_str
 
from kallithea.model import db
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
skipif = pytest.mark.skipif
 
parametrize = pytest.mark.parametrize
 

	
 
# Hack: These module global values MUST be set to actual values before running any tests. This is currently done by conftest.py.
 
@@ -117,25 +118,24 @@ try:
 
    pam.PAM_TEXT_INFO
 
    pam_lib_installed = True
 
except ImportError:
 
    pam_lib_installed = False
 

	
 

	
 
def invalidate_all_caches():
 
    """Invalidate all beaker caches currently configured.
 
    Useful when manipulating IP permissions in a test and changes need to take
 
    effect immediately.
 
    Note: Any use of this function is probably a workaround - it should be
 
    replaced with a more specific cache invalidation in code or test."""
 
    from beaker.cache import cache_managers
 
    for cache in cache_managers.values():
 
        cache.clear()
 

	
 

	
 
class NullHandler(logging.Handler):
 
    def emit(self, record):
 
        pass
 

	
 

	
 
class TestController(object):
 
    """Pytest-style test controller"""
 

	
kallithea/tests/conftest.py
Show inline comments
 
@@ -10,24 +10,25 @@ from paste.deploy import loadwsgi
 
from pytest_localserver.http import WSGIServer
 
from routes.util import URLGenerator
 
from tg.util.webtest import test_context
 

	
 
import kallithea.tests.base  # FIXME: needed for setting testapp instance!!!
 
from kallithea.controllers.root import RootController
 
from kallithea.lib import inifile
 
from kallithea.lib.utils import repo2db_mapper
 
from kallithea.model import db, meta
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.tests.base import TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_REGULAR_LOGIN, TESTS_TMP_PATH, invalidate_all_caches
 
from kallithea.tests.fixture import create_test_env, create_test_index
 

	
 

	
 
def pytest_configure():
 
    os.environ['TZ'] = 'UTC'
 
    if not kallithea.is_windows:
 
        time.tzset() # only available on Unix
 

	
 
    path = os.getcwd()
 
    sys.path.insert(0, path)
 
    pkg_resources.working_set.add_entry(path)
 

	
 
    # Disable INFO logging of test database creation, restore with NOTSET
 
@@ -60,25 +61,24 @@ def pytest_configure():
 
    }
 
    create_database = os.environ.get('TEST_DB')  # TODO: rename to 'CREATE_TEST_DB'
 
    if create_database:
 
        ini_settings['[app:main]']['sqlalchemy.url'] = create_database
 
    reuse_database = os.environ.get('REUSE_TEST_DB')
 
    if reuse_database:
 
        ini_settings['[app:main]']['sqlalchemy.url'] = reuse_database
 

	
 
    test_ini_file = os.path.join(TESTS_TMP_PATH, 'test.ini')
 
    inifile.create(test_ini_file, None, ini_settings)
 

	
 
    context = loadwsgi.loadcontext(loadwsgi.APP, 'config:%s' % test_ini_file)
 
    from kallithea.tests.fixture import create_test_env, create_test_index
 

	
 
    # set KALLITHEA_NO_TMP_PATH=1 to disable re-creating the database and test repos
 
    if not int(os.environ.get('KALLITHEA_NO_TMP_PATH', 0)):
 
        create_test_env(TESTS_TMP_PATH, context.config(), reuse_database=bool(reuse_database))
 

	
 
    # set KALLITHEA_WHOOSH_TEST_DISABLE=1 to disable whoosh index during tests
 
    if not int(os.environ.get('KALLITHEA_WHOOSH_TEST_DISABLE', 0)):
 
        create_test_index(TESTS_TMP_PATH, context.config(), True)
 

	
 
    kallithea.tests.base.testapp = context.create()
 
    # do initial repo scan
 
    repo2db_mapper(ScmModel().repo_scan(TESTS_TMP_PATH))
kallithea/tests/fixture.py
Show inline comments
 
@@ -18,38 +18,41 @@ Helpers for fixture generation
 

	
 
import logging
 
import os
 
import shutil
 
import tarfile
 
from os.path import dirname
 

	
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib.auth import AuthUser
 
from kallithea.lib.db_manage import DbManage
 
from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
from kallithea.lib.pidlock import DaemonLock
 
from kallithea.lib.vcs.backends.base import EmptyChangeset
 
from kallithea.model import db, meta
 
from kallithea.model.changeset_status import ChangesetStatusModel
 
from kallithea.model.comment import ChangesetCommentsModel
 
from kallithea.model.gist import GistModel
 
from kallithea.model.pull_request import CreatePullRequestAction  # , CreatePullRequestIterationAction, PullRequestModel
 
from kallithea.model.repo import RepoModel
 
from kallithea.model.repo_group import RepoGroupModel
 
from kallithea.model.scm import ScmModel
 
from kallithea.model.user import UserModel
 
from kallithea.model.user_group import UserGroupModel
 
from kallithea.tests.base import (GIT_REPO, HG_REPO, IP_ADDR, TEST_USER_ADMIN_EMAIL, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS, TEST_USER_REGULAR2_EMAIL,
 
                                  TEST_USER_REGULAR2_LOGIN, TEST_USER_REGULAR2_PASS, TEST_USER_REGULAR_EMAIL, TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS,
 
                                  TESTS_TMP_PATH, invalidate_all_caches)
 
from kallithea.tests.vcs import setup_package
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 
FIXTURES = os.path.join(dirname(dirname(os.path.abspath(__file__))), 'tests', 'fixtures')
 

	
 

	
 
def raise_exception(*args, **kwargs):
 
    raise Exception('raise_exception raised exception')
 

	
 

	
 
class Fixture(object):
 
@@ -395,36 +398,32 @@ def create_test_env(repos_test_path, con
 
        shutil.rmtree(data_path)
 

	
 
    # CREATE DEFAULT TEST REPOS
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_hg.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, HG_REPO))
 
    tar.close()
 

	
 
    tar = tarfile.open(os.path.join(FIXTURES, 'vcs_test_git.tar.gz'))
 
    tar.extractall(os.path.join(TESTS_TMP_PATH, GIT_REPO))
 
    tar.close()
 

	
 
    # LOAD VCS test stuff
 
    from kallithea.tests.vcs import setup_package
 
    setup_package()
 

	
 

	
 
def create_test_index(repo_location, config, full_index):
 
    """
 
    Makes default test index
 
    """
 

	
 
    from kallithea.lib.indexers.daemon import WhooshIndexingDaemon
 
    from kallithea.lib.pidlock import DaemonLock
 

	
 
    index_location = os.path.join(config['index_dir'])
 
    if not os.path.exists(index_location):
 
        os.makedirs(index_location)
 

	
 
    l = DaemonLock(os.path.join(index_location, 'make_index.lock'))
 
    WhooshIndexingDaemon(index_location=index_location,
 
                         repo_location=repo_location) \
 
        .run(full_index=full_index)
 
    l.release()
 

	
 

	
 
def failing_test_hook(ui, repo, **kwargs):
kallithea/tests/functional/test_my_account.py
Show inline comments
 
# -*- coding: utf-8 -*-
 

	
 
from tg.util.webtest import test_context
 

	
 
from kallithea.lib import webutils
 
from kallithea.model import db, meta
 
from kallithea.model import db, meta, validators
 
from kallithea.model.user import UserModel
 
from kallithea.tests import base
 
from kallithea.tests.fixture import Fixture
 

	
 

	
 
fixture = Fixture()
 

	
 

	
 
class TestMyAccountController(base.TestController):
 
    test_user_1 = 'testme'
 

	
 
    @classmethod
 
@@ -173,25 +173,24 @@ class TestMyAccountController(base.TestC
 
        new_email = 'newmail.pl'
 
        response = self.app.post(base.url('my_account'),
 
                                 params=dict(
 
                                            username=base.TEST_USER_ADMIN_LOGIN,
 
                                            new_password=base.TEST_USER_ADMIN_PASS,
 
                                            password_confirmation='test122',
 
                                            firstname='NewName',
 
                                            lastname='NewLastname',
 
                                            email=new_email,
 
                                            _session_csrf_secret_token=self.session_csrf_secret_token()))
 

	
 
        response.mustcontain('An email address must contain a single @')
 
        from kallithea.model import validators
 
        with test_context(self.app):
 
            msg = validators.ValidUsername(edit=False, old_data={}) \
 
                    ._messages['username_exists']
 
        msg = webutils.html_escape(msg % {'username': base.TEST_USER_ADMIN_LOGIN})
 
        response.mustcontain(msg)
 

	
 
    def test_my_account_api_keys(self):
 
        usr = self.log_user(base.TEST_USER_REGULAR2_LOGIN, base.TEST_USER_REGULAR2_PASS)
 
        user = db.User.get(usr['user_id'])
 
        response = self.app.get(base.url('my_account_api_keys'))
 
        response.mustcontain(user.api_key)
 
        response.mustcontain('Expires: Never')
kallithea/tests/other/test_libs.py
Show inline comments
 
@@ -18,31 +18,35 @@ kallithea.tests.other.test_libs
 
Package for testing various lib/helper functions in kallithea
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Jun 9, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 
import datetime
 
import hashlib
 
import re
 

	
 
import mock
 
import routes
 
from dateutil import relativedelta
 
from tg import request
 
from tg.util.webtest import test_context
 

	
 
import kallithea.lib.helpers as h
 
from kallithea.lib import webutils
 
from kallithea.lib.utils2 import AttributeDict, safe_bytes
 
from kallithea.lib.utils2 import AttributeDict, get_clone_url, safe_bytes
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
proto = 'http'
 
TEST_URLS = [
 
    ('%s://127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
    ('%s://username:pass@127.0.0.1' % proto, ['%s://' % proto, '127.0.0.1'],
 
     '%s://127.0.0.1' % proto),
 
@@ -129,72 +133,66 @@ class TestLibs(base.TestController):
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month and 2 days ago'),
 
        (dict(months= -1, days= -20), '1 month and 19 days ago'),
 
        (dict(years= -1, months= -1), '1 year and 1 month ago'),
 
        (dict(years= -1, months= -10), '1 year and 10 months ago'),
 
        (dict(years= -2, months= -4), '2 years and 4 months ago'),
 
        (dict(years= -2, months= -11), '2 years and 11 months ago'),
 
        (dict(years= -3, months= -2), '3 years and 2 months ago'),
 
    ])
 
    def test_age(self, age_args, expected):
 
        from dateutil import relativedelta
 

	
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds= -1), '1 second ago'),
 
        (dict(seconds= -60 * 2), '2 minutes ago'),
 
        (dict(hours= -1), '1 hour ago'),
 
        (dict(hours= -24), '1 day ago'),
 
        (dict(hours= -24 * 5), '5 days ago'),
 
        (dict(months= -1), '1 month ago'),
 
        (dict(months= -1, days= -2), '1 month ago'),
 
        (dict(months= -1, days= -20), '1 month ago'),
 
        (dict(years= -1, months= -1), '13 months ago'),
 
        (dict(years= -1, months= -10), '22 months ago'),
 
        (dict(years= -2, months= -4), '2 years ago'),
 
        (dict(years= -2, months= -11), '3 years ago'),
 
        (dict(years= -3, months= -2), '3 years ago'),
 
        (dict(years= -4, months= -8), '5 years ago'),
 
    ])
 
    def test_age_short(self, age_args, expected):
 
        from dateutil import relativedelta
 

	
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), show_short_version=True, now=n) == expected
 

	
 
    @base.parametrize('age_args,expected', [
 
        (dict(), 'just now'),
 
        (dict(seconds=1), 'in 1 second'),
 
        (dict(seconds=60 * 2), 'in 2 minutes'),
 
        (dict(hours=1), 'in 1 hour'),
 
        (dict(hours=24), 'in 1 day'),
 
        (dict(hours=24 * 5), 'in 5 days'),
 
        (dict(months=1), 'in 1 month'),
 
        (dict(months=1, days=1), 'in 1 month and 1 day'),
 
        (dict(years=1, months=1), 'in 1 year and 1 month')
 
    ])
 
    def test_age_in_future(self, age_args, expected):
 
        from dateutil import relativedelta
 

	
 
        from kallithea.lib.utils2 import age
 
        with test_context(self.app):
 
            n = datetime.datetime(year=2012, month=5, day=17)
 
            delt = lambda *args, **kwargs: relativedelta.relativedelta(*args, **kwargs)
 
            assert age(n + delt(**age_args), now=n) == expected
 

	
 
    def test_tag_extractor(self):
 
        sample = (
 
            "hello pta[tag] gog [[]] [[] sda ero[or]d [me =>>< sa]"
 
            "[requires] [stale] [see<>=>] [see => http://example.com]"
 
            "[requires => url] [lang => python] [just a tag]"
 
            "[,d] [ => ULR ] [obsolete] [desc]]"
 
@@ -212,83 +210,79 @@ class TestLibs(base.TestController):
 

	
 
        # mock tg.tmpl_context
 
        def fake_tmpl_context(_url):
 
            _c = AttributeDict()
 
            _c.visual = AttributeDict()
 
            _c.visual.use_gravatar = True
 
            _c.visual.gravatar_url = _url
 

	
 
            return _c
 

	
 
        with mock.patch('kallithea.lib.webutils.url.current', lambda *a, **b: 'https://example.com'):
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
                    from kallithea.lib.webutils import url
 
                    assert url.current() == 'https://example.com'
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                    assert webutils.url.current() == 'https://example.com'
 
                    grav = h.gravatar_url(email_address='test@example.com', size=24)
 
                    assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                grav = h.gravatar_url(email_address='test@example.com', size=24)
 
                assert grav == 'http://example.com/test@example.com'
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s' % (_md5(em))
 

	
 
            fake = fake_tmpl_context(_url='http://example.com/{md5email}/{size}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'http://example.com/%s/%s' % (_md5(em), 24)
 

	
 
            fake = fake_tmpl_context(_url='{scheme}://{netloc}/{md5email}/{size}')
 
            with mock.patch('tg.tmpl_context', fake):
 
            with mock.patch('kallithea.lib.helpers.c', fake):
 
                em = 'test@example.com'
 
                grav = h.gravatar_url(email_address=em, size=24)
 
                assert grav == 'https://example.com/%s/%s' % (_md5(em), 24)
 

	
 
    @base.parametrize('clone_uri_tmpl,repo_name,username,prefix,expected', [
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '', 'http://vps1:8000/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '', 'http://username@vps1:8000/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', None, '/prefix', 'http://vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix', 'http://username@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'user', '/prefix/', 'http://user@vps1:8000/prefix/group/repo1'),
 
        (db.Repository.DEFAULT_CLONE_URI, 'group/repo1', 'username', '/prefix/', 'http://username@vps1:8000/prefix/group/repo1'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', None, '', 'http://vps1:8000/_23'),
 
        ('{scheme}://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{user}@{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://username@vps1:8000/_23'),
 
        ('http://{netloc}/_{repoid}', 'group/repo1', 'username', '', 'http://vps1:8000/_23'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', 'username', '', 'https://username@proxy1.example.com/group/repo1'),
 
        ('https://{user}@proxy1.example.com/{repo}', 'group/repo1', None, '', 'https://proxy1.example.com/group/repo1'),
 
        ('https://proxy1.example.com/{user}/{repo}', 'group/repo1', 'username', '', 'https://proxy1.example.com/username/group/repo1'),
 
    ])
 
    def test_clone_url_generator(self, clone_uri_tmpl, repo_name, username, prefix, expected):
 
        from kallithea.lib.utils2 import get_clone_url
 
        clone_url = get_clone_url(clone_uri_tmpl=clone_uri_tmpl, prefix_url='http://vps1:8000' + prefix,
 
                                  repo_name=repo_name, repo_id=23, username=username)
 
        assert clone_url == expected
 

	
 
    def _quick_url(self, text, tmpl="""<a class="changeset_hash" href="%s">%s</a>""", url_=None):
 
        """
 
        Changes `some text url[foo]` => `some text <a href="/">foo</a>
 

	
 
        :param text:
 
        """
 
        import re
 

	
 
        # quickly change expected url[] into a link
 
        url_pattern = re.compile(r'(?:url\[)(.+?)(?:\])')
 

	
 
        def url_func(match_obj):
 
            _url = match_obj.groups()[0]
 
            return tmpl % (url_ or '/repo_name/changeset/%s' % _url, _url)
 
        return url_pattern.sub(url_func, text)
 

	
 
    @base.parametrize('sample,expected', [
 
      ("",
 
       ""),
 
      ("git-svn-id: https://svn.apache.org/repos/asf/libcloud/trunk@1441655 13f79535-47bb-0310-9956-ffa450edef68",
 
@@ -544,49 +538,43 @@ class TestLibs(base.TestController):
 

	
 
    @base.parametrize('canonical,test,expected', [
 
        ('http://www.example.org/', '/abc/xyz', 'http://www.example.org/abc/xyz'),
 
        ('http://www.example.org', '/abc/xyz', 'http://www.example.org/abc/xyz'),
 
        ('http://www.example.org', '/abc/xyz/', 'http://www.example.org/abc/xyz/'),
 
        ('http://www.example.org', 'abc/xyz/', 'http://www.example.org/abc/xyz/'),
 
        ('http://www.example.org', 'about', 'http://www.example.org/about-page'),
 
        ('http://www.example.org/repos/', 'abc/xyz/', 'http://www.example.org/repos/abc/xyz/'),
 
        ('http://www.example.org/kallithea/repos/', 'abc/xyz/', 'http://www.example.org/kallithea/repos/abc/xyz/'),
 
    ])
 
    def test_canonical_url(self, canonical, test, expected):
 
        # setup url(), used by canonical_url
 
        import routes
 
        from tg import request
 

	
 
        m = routes.Mapper()
 
        m.connect('about', '/about-page')
 
        url = routes.URLGenerator(m, {'HTTP_HOST': 'http_host.example.org'})
 

	
 
        config_mock = {
 
            'canonical_url': canonical,
 
        }
 

	
 
        with test_context(self.app):
 
            request.environ['routes.url'] = url
 
            with mock.patch('kallithea.CONFIG', config_mock):
 
                assert webutils.canonical_url(test) == expected
 

	
 
    @base.parametrize('canonical,expected', [
 
        ('http://www.example.org', 'www.example.org'),
 
        ('http://www.example.org/repos/', 'www.example.org'),
 
        ('http://www.example.org/kallithea/repos/', 'www.example.org'),
 
    ])
 
    def test_canonical_hostname(self, canonical, expected):
 
        import routes
 
        from tg import request
 

	
 
        # setup url(), used by canonical_hostname
 
        m = routes.Mapper()
 
        url = routes.URLGenerator(m, {'HTTP_HOST': 'http_host.example.org'})
 

	
 
        config_mock = {
 
            'canonical_url': canonical,
 
        }
 

	
 
        with test_context(self.app):
 
            request.environ['routes.url'] = url
 
            with mock.patch('kallithea.CONFIG', config_mock):
 
                assert webutils.canonical_hostname() == expected
kallithea/tests/performance/test_vcs.py
Show inline comments
 
@@ -5,33 +5,33 @@
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import pytest
 

	
 
from kallithea.lib.graphmod import graph_data
 
from kallithea.model import db
 
from kallithea.tests import base
 

	
 

	
 
@pytest.mark.skipif("'TEST_PERFORMANCE' not in os.environ", reason="skipping performance tests, set TEST_PERFORMANCE in environment if desired")
 
class TestVCSPerformance(base.TestController):
 

	
 
    def graphmod(self, repo):
 
        """ Simple test for running the graph_data function for profiling/testing performance. """
 
        from kallithea.lib.graphmod import graph_data
 
        dbr = db.Repository.get_by_repo_name(repo)
 
        scm_inst = dbr.scm_instance
 
        collection = scm_inst.get_changesets(start=0, end=None, branch_name=None)
 
        revs = [x.revision for x in collection]
 
        jsdata = graph_data(scm_inst, revs)
 

	
 
    def test_graphmod_hg(self, benchmark):
 
        benchmark(self.graphmod, base.HG_REPO)
 

	
 
    def test_graphmod_git(self, benchmark):
 
        benchmark(self.graphmod, base.GIT_REPO)
kallithea/tests/scripts/manual_test_concurrency.py
Show inline comments
 
@@ -32,24 +32,25 @@ import shutil
 
import sys
 
import tempfile
 
from os.path import dirname
 
from subprocess import PIPE, Popen
 

	
 
from paste.deploy import appconfig
 
from sqlalchemy import engine_from_config
 

	
 
import kallithea.config.application
 
from kallithea.lib.auth import get_crypt_password
 
from kallithea.model import db, meta
 
from kallithea.model.base import init_model
 
from kallithea.model.repo import RepoModel
 
from kallithea.tests.base import HG_REPO, TEST_USER_ADMIN_LOGIN, TEST_USER_ADMIN_PASS
 

	
 

	
 
rel_path = dirname(dirname(dirname(dirname(os.path.abspath(__file__)))))
 
conf = appconfig('config:development.ini', relative_to=rel_path)
 
kallithea.config.application.make_app(conf.global_conf, **conf.local_conf)
 

	
 
USER = TEST_USER_ADMIN_LOGIN
 
PASS = TEST_USER_ADMIN_PASS
 
HOST = 'server.local'
 
METHOD = 'pull'
 
DEBUG = True
 
@@ -105,25 +106,24 @@ def create_test_user(force=True):
 
        new_usr.name = 'test'
 
        new_usr.lastname = 'lasttestname'
 
        new_usr.active = True
 
        new_usr.admin = True
 
        sa.add(new_usr)
 
        sa.commit()
 

	
 
    print('done')
 

	
 

	
 
def create_test_repo(force=True):
 
    print('creating test repo')
 
    from kallithea.model.repo import RepoModel
 
    sa = get_session()
 

	
 
    user = sa.query(db.User).filter(db.User.username == USER).scalar()
 
    if user is None:
 
        raise Exception('user not found')
 

	
 
    repo = sa.query(db.Repository).filter(db.Repository.repo_name == HG_REPO).scalar()
 

	
 
    if repo is None:
 
        print('repo not found creating')
 

	
 
        form_data = {'repo_name': HG_REPO,
kallithea/tests/vcs/test_repository.py
Show inline comments
 
import copy
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs import TEST_USER_CONFIG_FILE
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class RepositoryBaseTest(_BackendTestMixin):
 
    recreate_repo_per_test = False
 
@@ -22,25 +23,24 @@ class RepositoryBaseTest(_BackendTestMix
 
        assert self.repo.get_config_value('universal', 'nonexist', TEST_USER_CONFIG_FILE) == None
 

	
 
    def test_get_user_name(self):
 
        assert self.repo.get_user_name(TEST_USER_CONFIG_FILE) == 'Foo Bar'
 

	
 
    def test_get_user_email(self):
 
        assert self.repo.get_user_email(TEST_USER_CONFIG_FILE) == 'foo.bar@example.com'
 

	
 
    def test_repo_equality(self):
 
        assert self.repo == self.repo
 

	
 
    def test_repo_equality_broken_object(self):
 
        import copy
 
        _repo = copy.copy(self.repo)
 
        delattr(_repo, 'path')
 
        assert self.repo != _repo
 

	
 
    def test_repo_equality_other_object(self):
 
        class dummy(object):
 
            path = self.repo.path
 
        assert self.repo != dummy()
 

	
 

	
 
class TestGitRepositoryBase(RepositoryBaseTest):
 
    backend_alias = 'git'
kallithea/tests/vcs/test_workdirs.py
Show inline comments
 
import datetime
 

	
 
import pytest
 

	
 
from kallithea.lib.vcs.exceptions import BranchDoesNotExistError
 
from kallithea.lib.vcs.nodes import FileNode
 
from kallithea.tests.vcs.base import _BackendTestMixin
 

	
 

	
 
class WorkdirTestCaseMixin(_BackendTestMixin):
 

	
 
    @classmethod
 
    def _get_commits(cls):
 
        commits = [
 
            {
 
                'message': 'Initial commit',
 
                'author': 'Joe Doe <joe.doe@example.com>',
 
@@ -58,26 +59,24 @@ class WorkdirTestCaseMixin(_BackendTestM
 
            author='joe',
 
            branch='foobar',
 
        )
 
        assert self.repo.workdir.get_branch() == self.default_branch
 
        self.repo.workdir.checkout_branch('foobar')
 
        assert self.repo.workdir.get_changeset() == head
 

	
 
        # Make sure that old head is still there after update to default branch
 
        self.repo.workdir.checkout_branch(self.default_branch)
 
        assert self.repo.workdir.get_changeset() == old_head
 

	
 
    def test_checkout_branch(self):
 
        from kallithea.lib.vcs.exceptions import BranchDoesNotExistError
 

	
 
        # first, 'foobranch' does not exist.
 
        with pytest.raises(BranchDoesNotExistError):
 
            self.repo.workdir.checkout_branch(branch='foobranch')
 
        # create new branch 'foobranch'.
 
        self.imc.add(FileNode('file1', content='blah'))
 
        self.imc.commit(message='asd', author='john', branch='foobranch')
 
        # go back to the default branch
 
        self.repo.workdir.checkout_branch()
 
        assert self.repo.workdir.get_branch() == self.backend_class.DEFAULT_BRANCH_NAME
 
        # checkout 'foobranch'
 
        self.repo.workdir.checkout_branch('foobranch')
 
        assert self.repo.workdir.get_branch() == 'foobranch'
setup.py
Show inline comments
 
#!/usr/bin/env python3
 
# -*- coding: utf-8 -*-
 
import os
 
import platform
 
import re
 
import sys
 

	
 
import setuptools
 
# monkey patch setuptools to use distutils owner/group functionality
 
from setuptools.command import sdist
 

	
 

	
 
if sys.version_info < (3, 6):
 
    raise Exception('Kallithea requires Python 3.6 or later')
 

	
 

	
 
here = os.path.abspath(os.path.dirname(__file__))
 

	
 

	
 
def _get_meta_var(name, data, callback_handler=None):
 
    import re
 
    matches = re.compile(r'(?:%s)\s*=\s*(.*)' % name).search(data)
 
    if matches:
 
        s = eval(matches.groups()[0])
 
        if callable(callback_handler):
 
            return callback_handler(s)
 
        return s
 

	
 
_meta = open(os.path.join(here, 'kallithea', '__init__.py'), 'r')
 
_metadata = _meta.read()
 
_meta.close()
 

	
 
def callback(V):
0 comments (0 inline, 0 general)