@@ -156,97 +156,97 @@ class BasicAuth(paste.auth.basic.AuthBas
return paste.httpexceptions.HTTPUnauthorized(headers=head)
def authenticate(self, environ):
authorization = paste.httpheaders.AUTHORIZATION(environ)
if not authorization:
return self.build_authentication()
(authmeth, auth) = authorization.split(' ', 1)
if 'basic' != authmeth.lower():
auth = auth.strip().decode('base64')
_parts = auth.split(':', 1)
if len(_parts) == 2:
username, password = _parts
if self.authfunc(username, password, environ) is not None:
return username
__call__ = authenticate
class BaseVCSController(object):
def __init__(self, application, config):
self.application = application
self.config = config
# base path of repo locations
self.basepath = self.config['base_path']
# authenticate this VCS request using the authentication modules
self.authenticate = BasicAuth('', auth_modules.authenticate,
config.get('auth_ret_code'))
self.ip_addr = '0.0.0.0'
def _handle_request(self, environ, start_response):
raise NotImplementedError()
def _get_by_id(self, repo_name):
"""
Gets a special pattern _<ID> from clone url and tries to replace it
with a repository_name for support of _<ID> permanent URLs
:param repo_name:
data = repo_name.split('/')
if len(data) >= 2:
from kallithea.lib.utils import get_repo_by_id
by_id_match = get_repo_by_id(repo_name)
if by_id_match:
data[1] = by_id_match
data[1] = safe_str(by_id_match)
return '/'.join(data)
def _invalidate_cache(self, repo_name):
Sets cache for this repository for invalidation on next access
:param repo_name: full repo name, also a cache key
ScmModel().mark_for_invalidation(repo_name)
def _check_permission(self, action, user, repo_name, ip_addr=None):
Checks permissions using action (push/pull) user and repository
name
:param action: push or pull action
:param user: `User` instance
:param repo_name: repository name
# check IP
ip_allowed = AuthUser.check_ip_allowed(user, ip_addr)
if ip_allowed:
log.info('Access for IP:%s allowed', ip_addr)
else:
return False
if action == 'push':
if not HasPermissionAnyMiddleware('repository.write',
'repository.admin')(user,
repo_name):
#any other action need at least read permission
if not HasPermissionAnyMiddleware('repository.read',
'repository.write',
return True
def _get_ip_addr(self, environ):
return _get_ip_addr(environ)
def _check_ssl(self, environ):
@@ -39,97 +39,97 @@ from kallithea.model.db import User
from kallithea.lib.utils2 import safe_str, safe_unicode, fix_PATH, get_server_url,\
_set_extras
from kallithea.lib.base import BaseVCSController, WSGIResultCloseCallback
from kallithea.lib.utils import make_ui, is_valid_repo, ui_sections
from kallithea.lib.vcs.utils.hgcompat import RepoError, hgweb_mod
from kallithea.lib.exceptions import HTTPLockedRC
from kallithea.lib import auth_modules
log = logging.getLogger(__name__)
def is_mercurial(environ):
Returns True if request's target is mercurial server - header
``HTTP_ACCEPT`` of such request would start with ``application/mercurial``.
http_accept = environ.get('HTTP_ACCEPT')
path_info = environ['PATH_INFO']
if http_accept and http_accept.startswith('application/mercurial'):
ishg_path = True
ishg_path = False
log.debug('pathinfo: %s detected as Mercurial %s',
path_info, ishg_path
)
return ishg_path
class SimpleHg(BaseVCSController):
if not is_mercurial(environ):
return self.application(environ, start_response)
if not self._check_ssl(environ):
return HTTPNotAcceptable('SSL REQUIRED !')(environ, start_response)
ip_addr = self._get_ip_addr(environ)
username = None
# skip passing error to error controller
environ['pylons.status_code_redirect'] = True
#======================================================================
# EXTRACT REPOSITORY NAME FROM ENV
try:
str_repo_name = environ['REPO_NAME'] = self.__get_repository(environ)
assert isinstance(str_repo_name, str)
assert isinstance(str_repo_name, str), str_repo_name
repo_name = safe_unicode(str_repo_name)
assert safe_str(repo_name) == str_repo_name, (str_repo_name, repo_name)
log.debug('Extracted repo name is %s', repo_name)
except Exception as e:
log.error('error extracting repo_name: %r', e)
return HTTPInternalServerError()(environ, start_response)
# quick check if that dir exists...
if not is_valid_repo(repo_name, self.basepath, 'hg'):
return HTTPNotFound()(environ, start_response)
# GET ACTION PULL or PUSH
action = self.__get_action(environ)
# CHECK ANONYMOUS PERMISSION
if action in ['pull', 'push']:
anonymous_user = self.__get_user('default')
username = anonymous_user.username
if anonymous_user.active:
# ONLY check permissions if the user is activated
anonymous_perm = self._check_permission(action, anonymous_user,
repo_name, ip_addr)
anonymous_perm = False
if not anonymous_user.active or not anonymous_perm:
if not anonymous_user.active:
log.debug('Anonymous access is disabled, running '
'authentication')
if not anonymous_perm:
log.debug('Not enough credentials to access this '
'repository as anonymous user')
#==============================================================
# DEFAULT PERM FAILED OR ANONYMOUS ACCESS IS DISABLED SO WE
# NEED TO AUTHENTICATE AND ASK FOR AUTH USER PERMISSIONS
# try to auth based on environ, container auth methods
log.debug('Running PRE-AUTH for container based authentication')
pre_auth = auth_modules.authenticate('', '', environ)
if pre_auth is not None and pre_auth.get('username'):
# -*- coding: utf-8 -*-
import os
import mock
import urllib
import pytest
from kallithea.lib import vcs
from kallithea.model.db import Repository, RepoGroup, UserRepoToPerm, User,\
Permission
from kallithea.model.user import UserModel
from kallithea.tests import *
from kallithea.model.repo_group import RepoGroupModel
from kallithea.model.repo import RepoModel
from kallithea.model.meta import Session
from kallithea.tests.fixture import Fixture, error_function
fixture = Fixture()
def _get_permission_for_user(user, repo):
perm = UserRepoToPerm.query()\
.filter(UserRepoToPerm.repository ==
Repository.get_by_repo_name(repo))\
.filter(UserRepoToPerm.user == User.get_by_username(user))\
.all()
return perm
class _BaseTest(object):
Write all tests here
REPO = None
REPO_TYPE = None
NEW_REPO = None
OTHER_TYPE_REPO = None
OTHER_TYPE = None
@classmethod
def setup_class(cls):
pass
def teardown_class(cls):
def test_index(self):
self.log_user()
response = self.app.get(url('repos'))
def test_create(self):
repo_name = self.NEW_REPO
description = 'description for newly created repo'
@@ -609,48 +611,53 @@ class _BaseTest(object):
response = self.app.post(url('repos'),
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_type=self.REPO_TYPE,
repo_description=description,
_authentication_token=self.authentication_token()))
response.mustcontain('<span class="error-message">Invalid value</span>')
RepoModel().delete(repo_name)
Session().commit()
@mock.patch.object(RepoModel, '_create_filesystem_repo', error_function)
def test_create_repo_when_filesystem_op_fails(self):
self.checkSessionFlash(response,
'Error creating repository %s' % repo_name)
# repo must not be in db
repo = Repository.get_by_repo_name(repo_name)
self.assertEqual(repo, None)
# repo must not be in filesystem !
self.assertFalse(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)))
class TestAdminReposControllerGIT(TestController, _BaseTest):
REPO = GIT_REPO
REPO_TYPE = 'git'
NEW_REPO = NEW_GIT_REPO
OTHER_TYPE_REPO = HG_REPO
OTHER_TYPE = 'hg'
class TestAdminReposControllerHG(TestController, _BaseTest):
REPO = HG_REPO
REPO_TYPE = 'hg'
NEW_REPO = NEW_HG_REPO
OTHER_TYPE_REPO = GIT_REPO
OTHER_TYPE = 'git'
def test_permanent_url_protocol_access(self):
with pytest.raises(Exception) as e:
self.app.get(url('summary_home', repo_name='_1'), extra_environ={'HTTP_ACCEPT': 'application/mercurial'})
assert 'Unable to detect pull/push action' in str(e)
Status change: