diff --git a/kallithea/config/routing.py b/kallithea/config/routing.py
--- a/kallithea/config/routing.py
+++ b/kallithea/config/routing.py
@@ -499,6 +499,7 @@ def make_map(config):
)
#LOGIN/LOGOUT/REGISTER/SIGN IN
+ rmap.connect('authentication_token', '%s/authentication_token' % ADMIN_PREFIX, controller='login', action='authentication_token')
rmap.connect('login_home', '%s/login' % ADMIN_PREFIX, controller='login')
rmap.connect('logout_home', '%s/logout' % ADMIN_PREFIX, controller='login',
action='logout')
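Review bot: The `authentication_token` route is connected unconditionally, so every deployment — not just the test harness — serves the session's CSRF token from a plain GET URL, although the controller docstring added in kallithea/controllers/login.py says it is only intended for testing. A dedicated token endpoint weakens the CSRF defence in depth: a token embedded in a form page is only harvestable by reading that page, whereas any bug or misconfiguration that lets a third party read response bodies from this URL (an over-permissive CORS policy on a fronting proxy, a cacheable response) turns straight into a token leak. Consider connecting the route only when the application is configured for testing, or at least recording the trade-off on the `rmap.connect` line, e.g. `# CSRF token endpoint used by the test client; exposes the session token on GET`. make_map continues outside the hunk.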
diff --git a/kallithea/controllers/login.py b/kallithea/controllers/login.py
--- a/kallithea/controllers/login.py
+++ b/kallithea/controllers/login.py
@@ -270,3 +270,11 @@ class LoginController(BaseController):
session.delete()
log.info('Logging out and deleting session for user')
redirect(url('home'))
+
+ def authentication_token(self):
+ """Return the CSRF protection token for the session - just like it
+ could have been screen scrabed from a page with a form.
+ Only intended for testing but might also be useful for other kinds
+ of automation.
+ """
+ return h.authentication_token()
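Review bot: The docstring has a typo (`scrabed` → `scraped`) and it states a testing-only intent that the code does not enforce: `authentication_token` carries no decorator and hands the token to any GET request that reaches the action, so whether a login is required depends entirely on what BaseController does, which is not visible in the hunk and worth confirming. I would make the contract explicit: `"""Return the CSRF protection token for the current session as plain text, so the test client need not scrape it from a form page. Intended for testing only; any caller holding the session cookie can read it."""`. LoginController starts outside the hunk.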
diff --git a/kallithea/tests/__init__.py b/kallithea/tests/__init__.py
--- a/kallithea/tests/__init__.py
+++ b/kallithea/tests/__init__.py
@@ -213,6 +213,9 @@ class TestController(BaseTestCase):
def _get_logged_user(self):
return User.get_by_username(self._logged_username)
+ def authentication_token(self):
+ return self.app.get(url('authentication_token')).body
+
def checkSessionFlash(self, response, msg, skip=0):
if 'flash' not in response.session:
self.fail(safe_str(u'msg `%s` not found - session has no flash ' % msg))
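Review bot: The new `authentication_token` helper is inserted directly after `_get_logged_user` without the blank line the surrounding methods keep, and it has no docstring, so a reader cannot tell that calling it issues a real request through the test app — which matters for tests holding SQLAlchemy objects, as the hoisted call in test_admin_repos.py shows. Suggest a blank line before the `def` and a docstring such as `"""Fetch the session's CSRF token via GET url('authentication_token'); this is a full request through self.app."""`. It returns the raw response body, so it relies on the test app's default status check to reject error pages rather than passing them on as a token; a short comment saying so would help. None of the updated tests cover the negative path — a POST without `_authentication_token` being rejected — so the CSRF check itself remains unverified by this suite; one such test alongside this helper would close that gap.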
diff --git a/kallithea/tests/functional/test_admin_auth_settings.py b/kallithea/tests/functional/test_admin_auth_settings.py
--- a/kallithea/tests/functional/test_admin_auth_settings.py
+++ b/kallithea/tests/functional/test_admin_auth_settings.py
@@ -6,7 +6,7 @@ class TestAuthSettingsController(TestCon
def _enable_plugins(self, plugins_list):
test_url = url(controller='admin/auth_settings',
action='auth_settings')
- params={'auth_plugins': plugins_list,}
+ params={'auth_plugins': plugins_list, '_authentication_token': self.authentication_token()}
for plugin in plugins_list.split(','):
enable = plugin.partition('kallithea.lib.auth_modules.')[-1]
diff --git a/kallithea/tests/functional/test_admin_defaults.py b/kallithea/tests/functional/test_admin_defaults.py
--- a/kallithea/tests/functional/test_admin_defaults.py
+++ b/kallithea/tests/functional/test_admin_defaults.py
@@ -32,10 +32,12 @@ class TestDefaultsController(TestControl
'default_repo_enable_statistics': True,
'default_repo_private': True,
'default_repo_type': 'hg',
+ '_authentication_token': self.authentication_token(),
}
response = self.app.put(url('default', id='default'), params=params)
self.checkSessionFlash(response, 'Default settings updated successfully')
+ params.pop('_authentication_token')
defs = Setting.get_default_repo_settings()
self.assertEqual(params, defs)
@@ -47,20 +49,23 @@ class TestDefaultsController(TestControl
'default_repo_enable_statistics': False,
'default_repo_private': False,
'default_repo_type': 'git',
+ '_authentication_token': self.authentication_token(),
}
response = self.app.put(url('default', id='default'), params=params)
self.checkSessionFlash(response, 'Default settings updated successfully')
+
+ params.pop('_authentication_token')
defs = Setting.get_default_repo_settings()
self.assertEqual(params, defs)
def test_update_browser_fakeout(self):
- response = self.app.post(url('default', id=1), params=dict(_method='put'))
+ response = self.app.post(url('default', id=1), params=dict(_method='put', _authentication_token=self.authentication_token()))
def test_delete(self):
response = self.app.delete(url('default', id=1))
def test_delete_browser_fakeout(self):
- response = self.app.post(url('default', id=1), params=dict(_method='delete'))
+ response = self.app.post(url('default', id=1), params=dict(_method='delete', _authentication_token=self.authentication_token()))
def test_show(self):
response = self.app.get(url('default', id=1))
diff --git a/kallithea/tests/functional/test_admin_gists.py b/kallithea/tests/functional/test_admin_gists.py
--- a/kallithea/tests/functional/test_admin_gists.py
+++ b/kallithea/tests/functional/test_admin_gists.py
@@ -56,7 +56,8 @@ class TestGistsController(TestController
def test_create_missing_description(self):
self.log_user()
response = self.app.post(url('gists'),
- params={'lifetime': -1}, status=200)
+ params={'lifetime': -1, '_authentication_token': self.authentication_token()},
+ status=200)
response.mustcontain('Missing value')
@@ -66,7 +67,8 @@ class TestGistsController(TestController
params={'lifetime': -1,
'content': 'gist test',
'filename': 'foo',
- 'public': 'public'},
+ 'public': 'public',
+ '_authentication_token': self.authentication_token()},
status=302)
response = response.follow()
response.mustcontain('added file: foo')
@@ -79,7 +81,8 @@ class TestGistsController(TestController
params={'lifetime': -1,
'content': 'gist test',
'filename': '/home/foo',
- 'public': 'public'},
+ 'public': 'public',
+ '_authentication_token': self.authentication_token()},
status=200)
response.mustcontain('Filename cannot be inside a directory')
@@ -98,7 +101,8 @@ class TestGistsController(TestController
params={'lifetime': -1,
'content': 'private gist test',
'filename': 'private-foo',
- 'private': 'private'},
+ 'private': 'private',
+ '_authentication_token': self.authentication_token()},
status=302)
response = response.follow()
response.mustcontain('added file: private-foo<')
@@ -112,7 +116,8 @@ class TestGistsController(TestController
'content': 'gist test',
'filename': 'foo-desc',
'description': 'gist-desc',
- 'public': 'public'},
+ 'public': 'public',
+ '_authentication_token': self.authentication_token()},
status=302)
response = response.follow()
response.mustcontain('added file: foo-desc')
diff --git a/kallithea/tests/functional/test_admin_permissions.py b/kallithea/tests/functional/test_admin_permissions.py
--- a/kallithea/tests/functional/test_admin_permissions.py
+++ b/kallithea/tests/functional/test_admin_permissions.py
@@ -18,7 +18,8 @@ class TestAdminPermissionsController(Tes
self.log_user()
default_user_id = User.get_default_user().user_id
response = self.app.put(url('edit_user_ips', id=default_user_id),
- params=dict(new_ip='127.0.0.0/24'))
+ params=dict(new_ip='127.0.0.0/24',
+ _authentication_token=self.authentication_token()))
response = self.app.get(url('admin_permissions_ips'))
response.mustcontain('127.0.0.0/24')
@@ -31,7 +32,8 @@ class TestAdminPermissionsController(Tes
response = self.app.post(url('edit_user_ips', id=default_user_id),
params=dict(_method='delete',
- del_ip_id=del_ip_id))
+ del_ip_id=del_ip_id,
+ _authentication_token=self.authentication_token()))
response = self.app.get(url('admin_permissions_ips'))
response.mustcontain('All IP addresses are allowed')
diff --git a/kallithea/tests/functional/test_admin_repos.py b/kallithea/tests/functional/test_admin_repos.py
--- a/kallithea/tests/functional/test_admin_repos.py
+++ b/kallithea/tests/functional/test_admin_repos.py
@@ -56,7 +56,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_type=self.REPO_TYPE,
- repo_description=description))
+ repo_description=description,
+ _authentication_token=self.authentication_token()))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name))
self.assertEqual(response.json, {u'result': True})
@@ -96,7 +97,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_type=self.REPO_TYPE,
- repo_description=description))
+ repo_description=description,
+ _authentication_token=self.authentication_token()))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name))
self.assertEqual(response.json, {u'result': True})
@@ -139,7 +141,8 @@ class _BaseTest(TestController):
repo_name=repo_name,
repo_type=self.REPO_TYPE,
repo_description=description,
- repo_group=gr.group_id,))
+ repo_group=gr.group_id,
+ _authentication_token=self.authentication_token()))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
self.assertEqual(response.json, {u'result': True})
@@ -177,6 +180,8 @@ class _BaseTest(TestController):
def test_create_in_group_without_needed_permissions(self):
usr = self.log_user(TEST_USER_REGULAR_LOGIN, TEST_USER_REGULAR_PASS)
+ # avoid spurious RepoGroup DetachedInstanceError ...
+ authentication_token = self.authentication_token()
# revoke
user_model = UserModel()
# disable fork and create on default user
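Review bot: The comment `# avoid spurious RepoGroup DetachedInstanceError ...` trails off and does not say why hoisting the token fetch helps. Presumably `self.authentication_token()` issues a request through the test app that ends the DB session, detaching any RepoGroup instance loaded before the call, so the token is fetched before the groups are created and used; I would spell that out: `# fetch the CSRF token up front: authentication_token() issues a request that closes the DB session and would detach the RepoGroup objects created below`. test_create_in_group_without_needed_permissions continues outside the hunk.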
@@ -213,7 +218,8 @@ class _BaseTest(TestController):
repo_name=repo_name,
repo_type=self.REPO_TYPE,
repo_description=description,
- repo_group=gr.group_id,))
+ repo_group=gr.group_id,
+ _authentication_token=authentication_token))
response.mustcontain('Invalid value')
@@ -226,7 +232,8 @@ class _BaseTest(TestController):
repo_name=repo_name,
repo_type=self.REPO_TYPE,
repo_description=description,
- repo_group=gr_allowed.group_id,))
+ repo_group=gr_allowed.group_id,
+ _authentication_token=authentication_token))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
@@ -287,7 +294,8 @@ class _BaseTest(TestController):
repo_type=self.REPO_TYPE,
repo_description=description,
repo_group=gr.group_id,
- repo_copy_permissions=True))
+ repo_copy_permissions=True,
+ _authentication_token=self.authentication_token()))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name_full))
@@ -338,7 +346,8 @@ class _BaseTest(TestController):
repo_name=repo_name,
repo_type=self.REPO_TYPE,
repo_description=description,
- clone_uri='http://127.0.0.1/repo'))
+ clone_uri='http://127.0.0.1/repo',
+ _authentication_token=self.authentication_token()))
response.mustcontain('invalid clone URL')
@@ -351,7 +360,8 @@ class _BaseTest(TestController):
repo_name=repo_name,
repo_type=self.REPO_TYPE,
repo_description=description,
- clone_uri='svn+http://127.0.0.1/repo'))
+ clone_uri='svn+http://127.0.0.1/repo',
+ _authentication_token=self.authentication_token()))
response.mustcontain('invalid clone URL')
@@ -363,7 +373,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_type=self.REPO_TYPE,
repo_name=repo_name,
- repo_description=description))
+ repo_description=description,
+ _authentication_token=self.authentication_token()))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name))
self.checkSessionFlash(response,
@@ -413,7 +424,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_type=self.REPO_TYPE,
- repo_description=description))
+ repo_description=description,
+ _authentication_token=self.authentication_token()))
## run the check page that triggers the flash message
response = self.app.get(url('repo_check_home', repo_name=repo_name))
self.assertEqual(response.json, {u'result': True})
@@ -457,7 +469,7 @@ class _BaseTest(TestController):
def test_delete_browser_fakeout(self):
response = self.app.post(url('repo', repo_name=self.REPO),
- params=dict(_method='delete'))
+ params=dict(_method='delete', _authentication_token=self.authentication_token()))
def test_show(self):
self.log_user()
@@ -478,7 +490,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=1,
repo_name=self.REPO,
repo_type=self.REPO_TYPE,
- user=TEST_USER_ADMIN_LOGIN))
+ user=TEST_USER_ADMIN_LOGIN,
+ _authentication_token=self.authentication_token()))
self.checkSessionFlash(response,
msg='Repository %s updated successfully' % (self.REPO))
self.assertEqual(Repository.get_by_repo_name(self.REPO).private, True)
@@ -492,7 +505,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_name=self.REPO,
repo_type=self.REPO_TYPE,
- user=TEST_USER_ADMIN_LOGIN))
+ user=TEST_USER_ADMIN_LOGIN,
+ _authentication_token=self.authentication_token()))
self.checkSessionFlash(response,
msg='Repository %s updated successfully' % (self.REPO))
self.assertEqual(Repository.get_by_repo_name(self.REPO).private, False)
@@ -521,7 +535,7 @@ class _BaseTest(TestController):
repo = Repository.get_by_repo_name(self.REPO)
repo2 = Repository.get_by_repo_name(other_repo)
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
- params=dict(id_fork_of=repo2.repo_id))
+ params=dict(id_fork_of=repo2.repo_id, _authentication_token=self.authentication_token()))
repo = Repository.get_by_repo_name(self.REPO)
repo2 = Repository.get_by_repo_name(other_repo)
self.checkSessionFlash(response,
@@ -542,7 +556,7 @@ class _BaseTest(TestController):
repo = Repository.get_by_repo_name(self.REPO)
repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
- params=dict(id_fork_of=repo2.repo_id))
+ params=dict(id_fork_of=repo2.repo_id, _authentication_token=self.authentication_token()))
repo = Repository.get_by_repo_name(self.REPO)
repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
self.checkSessionFlash(response,
@@ -552,7 +566,7 @@ class _BaseTest(TestController):
self.log_user()
## mark it as None
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
- params=dict(id_fork_of=None))
+ params=dict(id_fork_of=None, _authentication_token=self.authentication_token()))
repo = Repository.get_by_repo_name(self.REPO)
repo2 = Repository.get_by_repo_name(self.OTHER_TYPE_REPO)
self.checkSessionFlash(response,
@@ -564,7 +578,7 @@ class _BaseTest(TestController):
self.log_user()
repo = Repository.get_by_repo_name(self.REPO)
response = self.app.put(url('edit_repo_advanced_fork', repo_name=self.REPO),
- params=dict(id_fork_of=repo.repo_id))
+ params=dict(id_fork_of=repo.repo_id, _authentication_token=self.authentication_token()))
self.checkSessionFlash(response,
'An error occurred during this operation')
@@ -594,7 +608,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_type=self.REPO_TYPE,
- repo_description=description))
+ repo_description=description,
+ _authentication_token=self.authentication_token()))
response.mustcontain('no permission to create repository in root location')
@@ -611,7 +626,8 @@ class _BaseTest(TestController):
fixture._get_repo_create_params(repo_private=False,
repo_name=repo_name,
repo_type=self.REPO_TYPE,
- repo_description=description))
+ repo_description=description,
+ _authentication_token=self.authentication_token()))
self.checkSessionFlash(response,
'Error creating repository %s' % repo_name)
diff --git a/kallithea/tests/functional/test_admin_settings.py b/kallithea/tests/functional/test_admin_settings.py
--- a/kallithea/tests/functional/test_admin_settings.py
+++ b/kallithea/tests/functional/test_admin_settings.py
@@ -37,7 +37,8 @@ class TestAdminSettingsController(TestCo
self.log_user()
response = self.app.post(url('admin_settings_hooks'),
params=dict(new_hook_ui_key='test_hooks_1',
- new_hook_ui_value='cd /tmp'))
+ new_hook_ui_value='cd /tmp',
+ _authentication_token=self.authentication_token()))
response = response.follow()
response.mustcontain('test_hooks_1')
@@ -47,7 +48,8 @@ class TestAdminSettingsController(TestCo
self.log_user()
response = self.app.post(url('admin_settings_hooks'),
params=dict(new_hook_ui_key='test_hooks_2',
- new_hook_ui_value='cd /tmp2'))
+ new_hook_ui_value='cd /tmp2',
+ _authentication_token=self.authentication_token()))
response = response.follow()
response.mustcontain('test_hooks_2')
@@ -56,7 +58,7 @@ class TestAdminSettingsController(TestCo
hook_id = Ui.get_by_key('test_hooks_2').ui_id
## delete
self.app.post(url('admin_settings_hooks'),
- params=dict(hook_id=hook_id))
+ params=dict(hook_id=hook_id, _authentication_token=self.authentication_token()))
response = self.app.get(url('admin_settings_hooks'))
response.mustcontain(no=['test_hooks_2'])
response.mustcontain(no=['cd /tmp2'])
@@ -80,6 +82,7 @@ class TestAdminSettingsController(TestCo
ga_code=new_ga_code,
captcha_private_key='',
captcha_public_key='',
+ _authentication_token=self.authentication_token(),
))
self.checkSessionFlash(response, 'Updated application settings')
@@ -101,6 +104,7 @@ class TestAdminSettingsController(TestCo
ga_code=new_ga_code,
captcha_private_key='',
captcha_public_key='',
+ _authentication_token=self.authentication_token(),
))
self.checkSessionFlash(response, 'Updated application settings')
@@ -121,6 +125,7 @@ class TestAdminSettingsController(TestCo
ga_code=new_ga_code,
captcha_private_key='1234567890',
captcha_public_key='1234567890',
+ _authentication_token=self.authentication_token(),
))
self.checkSessionFlash(response, 'Updated application settings')
@@ -141,6 +146,7 @@ class TestAdminSettingsController(TestCo
ga_code=new_ga_code,
captcha_private_key='',
captcha_public_key='1234567890',
+ _authentication_token=self.authentication_token(),
))
self.checkSessionFlash(response, 'Updated application settings')
@@ -163,6 +169,7 @@ class TestAdminSettingsController(TestCo
ga_code='',
captcha_private_key='',
captcha_public_key='',
+ _authentication_token=self.authentication_token(),
))
self.checkSessionFlash(response, 'Updated application settings')
diff --git a/kallithea/tests/functional/test_admin_user_groups.py b/kallithea/tests/functional/test_admin_user_groups.py
--- a/kallithea/tests/functional/test_admin_user_groups.py
+++ b/kallithea/tests/functional/test_admin_user_groups.py
@@ -19,7 +19,8 @@ class TestAdminUsersGroupsController(Tes
response = self.app.post(url('users_groups'),
{'users_group_name': users_group_name,
'user_group_description': 'DESC',
- 'active': True})
+ 'active': True,
+ '_authentication_token': self.authentication_token()})
response.follow()
self.checkSessionFlash(response,
@@ -35,7 +36,7 @@ class TestAdminUsersGroupsController(Tes
def test_update_browser_fakeout(self):
response = self.app.post(url('users_group', id=1),
- params=dict(_method='put'))
+ params=dict(_method='put', _authentication_token=self.authentication_token()))
def test_delete(self):
self.log_user()
@@ -43,7 +44,8 @@ class TestAdminUsersGroupsController(Tes
response = self.app.post(url('users_groups'),
{'users_group_name':users_group_name,
'user_group_description': 'DESC',
- 'active': True})
+ 'active': True,
+ '_authentication_token': self.authentication_token()})
response.follow()
self.checkSessionFlash(response,
@@ -65,7 +67,8 @@ class TestAdminUsersGroupsController(Tes
response = self.app.post(url('users_groups'),
{'users_group_name': users_group_name,
'user_group_description': 'DESC',
- 'active': True})
+ 'active': True,
+ '_authentication_token': self.authentication_token()})
response.follow()
ug = UserGroup.get_by_group_name(users_group_name)
@@ -74,8 +77,8 @@ class TestAdminUsersGroupsController(Tes
## ENABLE REPO CREATE ON A GROUP
response = self.app.put(url('edit_user_group_default_perms',
id=ug.users_group_id),
- {'create_repo_perm': True})
-
+ {'create_repo_perm': True,
+ '_authentication_token': self.authentication_token()})
response.follow()
ug = UserGroup.get_by_group_name(users_group_name)
p = Permission.get_by_key('hg.create.repository')
@@ -135,7 +138,8 @@ class TestAdminUsersGroupsController(Tes
response = self.app.post(url('users_groups'),
{'users_group_name': users_group_name,
'user_group_description': 'DESC',
- 'active': True})
+ 'active': True,
+ '_authentication_token': self.authentication_token()})
response.follow()
ug = UserGroup.get_by_group_name(users_group_name)
@@ -144,7 +148,7 @@ class TestAdminUsersGroupsController(Tes
## ENABLE REPO CREATE ON A GROUP
response = self.app.put(url('edit_user_group_default_perms',
id=ug.users_group_id),
- {'fork_repo_perm': True})
+ {'fork_repo_perm': True, '_authentication_token': self.authentication_token()})
response.follow()
ug = UserGroup.get_by_group_name(users_group_name)
@@ -204,7 +208,7 @@ class TestAdminUsersGroupsController(Tes
def test_delete_browser_fakeout(self):
response = self.app.post(url('users_group', id=1),
- params=dict(_method='delete'))
+ params=dict(_method='delete', _authentication_token=self.authentication_token()))
def test_show(self):
response = self.app.get(url('users_group', id=1))
diff --git a/kallithea/tests/functional/test_admin_users.py b/kallithea/tests/functional/test_admin_users.py
--- a/kallithea/tests/functional/test_admin_users.py
+++ b/kallithea/tests/functional/test_admin_users.py
@@ -58,7 +58,8 @@ class TestAdminUsersController(TestContr
'lastname': lastname,
'extern_name': 'internal',
'extern_type': 'internal',
- 'email': email})
+ 'email': email,
+ '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, '''Created user %s''' % (username))
@@ -89,7 +90,8 @@ class TestAdminUsersController(TestContr
'name': name,
'active': False,
'lastname': lastname,
- 'email': email})
+ 'email': email,
+ '_authentication_token': self.authentication_token()})
msg = validators.ValidUsername(False, {})._messages['system_invalid_username']
msg = h.html_escape(msg % {'username': 'new_user'})
@@ -145,8 +147,10 @@ class TestAdminUsersController(TestContr
# logged in yet his data is not filled
# so we use creation data
+ params.update({'_authentication_token': self.authentication_token()})
response = self.app.put(url('user', id=usr.user_id), params)
self.checkSessionFlash(response, 'User updated successfully')
+ params.pop('_authentication_token')
updated_user = User.get_by_username(self.test_user_1)
updated_params = updated_user.get_api_data(True)
@@ -266,7 +270,8 @@ class TestAdminUsersController(TestContr
response = self.app.post(url('edit_user_perms', id=uid),
params=dict(_method='put',
- create_repo_perm=True))
+ create_repo_perm=True,
+ _authentication_token=self.authentication_token()))
perm_none = Permission.get_by_key('hg.create.none')
perm_create = Permission.get_by_key('hg.create.repository')
@@ -295,7 +300,7 @@ class TestAdminUsersController(TestContr
self.assertEqual(UserModel().has_perm(user, perm_create), False)
response = self.app.post(url('edit_user_perms', id=uid),
- params=dict(_method='put'))
+ params=dict(_method='put', _authentication_token=self.authentication_token()))
perm_none = Permission.get_by_key('hg.create.none')
perm_create = Permission.get_by_key('hg.create.repository')
@@ -325,7 +330,8 @@ class TestAdminUsersController(TestContr
response = self.app.post(url('edit_user_perms', id=uid),
params=dict(_method='put',
- create_repo_perm=True))
+ create_repo_perm=True,
+ _authentication_token=self.authentication_token()))
perm_none = Permission.get_by_key('hg.create.none')
perm_create = Permission.get_by_key('hg.create.repository')
@@ -354,7 +360,7 @@ class TestAdminUsersController(TestContr
self.assertEqual(UserModel().has_perm(user, perm_fork), False)
response = self.app.post(url('edit_user_perms', id=uid),
- params=dict(_method='put'))
+ params=dict(_method='put', _authentication_token=self.authentication_token()))
perm_none = Permission.get_by_key('hg.create.none')
perm_create = Permission.get_by_key('hg.create.repository')
@@ -386,7 +392,7 @@ class TestAdminUsersController(TestContr
user_id = user.user_id
response = self.app.put(url('edit_user_ips', id=user_id),
- params=dict(new_ip=ip))
+ params=dict(new_ip=ip, _authentication_token=self.authentication_token()))
if failure:
self.checkSessionFlash(response, 'Please enter a valid IPv4 or IpV6 address')
@@ -419,7 +425,7 @@ class TestAdminUsersController(TestContr
response.mustcontain(ip_range)
self.app.post(url('edit_user_ips', id=user_id),
- params=dict(_method='delete', del_ip_id=new_ip_id))
+ params=dict(_method='delete', del_ip_id=new_ip_id, _authentication_token=self.authentication_token()))
response = self.app.get(url('edit_user_ips', id=user_id))
response.mustcontain('All IP addresses are allowed')
@@ -445,7 +451,7 @@ class TestAdminUsersController(TestContr
user_id = user.user_id
response = self.app.post(url('edit_user_api_keys', id=user_id),
- {'_method': 'put', 'description': desc, 'lifetime': lifetime})
+ {'_method': 'put', 'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully created')
try:
response = response.follow()
@@ -463,7 +469,7 @@ class TestAdminUsersController(TestContr
user_id = user.user_id
response = self.app.post(url('edit_user_api_keys', id=user_id),
- {'_method': 'put', 'description': 'desc', 'lifetime': -1})
+ {'_method': 'put', 'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully created')
response = response.follow()
@@ -472,7 +478,7 @@ class TestAdminUsersController(TestContr
self.assertEqual(1, len(keys))
response = self.app.post(url('edit_user_api_keys', id=user_id),
- {'_method': 'delete', 'del_api_key': keys[0].api_key})
+ {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully deleted')
keys = UserApiKeys.query().filter(UserApiKeys.user_id == user_id).all()
self.assertEqual(0, len(keys))
@@ -487,7 +493,7 @@ class TestAdminUsersController(TestContr
response.mustcontain('expires: never')
response = self.app.post(url('edit_user_api_keys', id=user_id),
- {'_method': 'delete', 'del_api_key_builtin': api_key})
+ {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully reset')
response = response.follow()
response.mustcontain(no=[api_key])
diff --git a/kallithea/tests/functional/test_changeset_comments.py b/kallithea/tests/functional/test_changeset_comments.py
--- a/kallithea/tests/functional/test_changeset_comments.py
+++ b/kallithea/tests/functional/test_changeset_comments.py
@@ -29,7 +29,7 @@ class TestChangeSetCommentsController(Te
rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
text = u'CommentOnRevision'
- params = {'text': text}
+ params = {'text': text, '_authentication_token': self.authentication_token()}
response = self.app.post(url(controller='changeset', action='comment',
repo_name=HG_REPO, revision=rev),
params=params)
@@ -66,7 +66,7 @@ class TestChangeSetCommentsController(Te
f_path = 'vcs/web/simplevcs/views/repository.py'
line = 'n1'
- params = {'text': text, 'f_path': f_path, 'line': line}
+ params = {'text': text, 'f_path': f_path, 'line': line, '_authentication_token': self.authentication_token()}
response = self.app.post(url(controller='changeset', action='comment',
repo_name=HG_REPO, revision=rev),
params=params)
@@ -106,7 +106,7 @@ class TestChangeSetCommentsController(Te
rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
text = u'@test_regular check CommentOnRevision'
- params = {'text':text}
+ params = {'text': text, '_authentication_token': self.authentication_token()}
response = self.app.post(url(controller='changeset', action='comment',
repo_name=HG_REPO, revision=rev),
params=params)
@@ -134,7 +134,7 @@ class TestChangeSetCommentsController(Te
rev = '27cd5cce30c96924232dffcd24178a07ffeb5dfc'
text = u'CommentOnRevision'
- params = {'text': text}
+ params = {'text': text, '_authentication_token': self.authentication_token()}
response = self.app.post(url(controller='changeset', action='comment',
repo_name=HG_REPO, revision=rev),
params=params)
diff --git a/kallithea/tests/functional/test_files.py b/kallithea/tests/functional/test_files.py
--- a/kallithea/tests/functional/test_files.py
+++ b/kallithea/tests/functional/test_files.py
@@ -328,7 +328,8 @@ removed extra unicode conversion in diff
repo_name=HG_REPO,
revision='tip', f_path='/'),
params={
- 'content': ''
+ 'content': '',
+ '_authentication_token': self.authentication_token(),
},
status=302)
@@ -340,7 +341,8 @@ removed extra unicode conversion in diff
repo_name=HG_REPO,
revision='tip', f_path='/'),
params={
- 'content': "foo"
+ 'content': "foo",
+ '_authentication_token': self.authentication_token(),
},
status=302)
@@ -359,7 +361,8 @@ removed extra unicode conversion in diff
params={
'content': "foo",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
@@ -379,7 +382,8 @@ removed extra unicode conversion in diff
params={
'content': "foo",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
try:
@@ -401,7 +405,8 @@ removed extra unicode conversion in diff
repo_name=GIT_REPO,
revision='tip', f_path='/'),
params={
- 'content': ''
+ 'content': '',
+ '_authentication_token': self.authentication_token(),
},
status=302)
self.checkSessionFlash(response, 'No content')
@@ -412,7 +417,8 @@ removed extra unicode conversion in diff
repo_name=GIT_REPO,
revision='tip', f_path='/'),
params={
- 'content': "foo"
+ 'content': "foo",
+ '_authentication_token': self.authentication_token(),
},
status=302)
@@ -431,7 +437,8 @@ removed extra unicode conversion in diff
params={
'content': "foo",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
@@ -451,7 +458,8 @@ removed extra unicode conversion in diff
params={
'content': "foo",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
try:
@@ -480,7 +488,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -510,7 +519,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -524,6 +534,7 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello world'\n",
'message': 'i commited',
+ '_authentication_token': self.authentication_token(),
},
status=302)
self.checkSessionFlash(response,
@@ -551,7 +562,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -581,7 +593,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -595,6 +608,7 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello world'\n",
'message': 'i commited',
+ '_authentication_token': self.authentication_token(),
},
status=302)
self.checkSessionFlash(response,
@@ -622,7 +636,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -652,7 +667,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -665,6 +681,7 @@ removed extra unicode conversion in diff
f_path='vcs/nodes.py'),
params={
'message': 'i commited',
+ '_authentication_token': self.authentication_token(),
},
status=302)
self.checkSessionFlash(response,
@@ -692,7 +709,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -722,7 +740,8 @@ removed extra unicode conversion in diff
params={
'content': "def py():\n print 'hello'\n",
'filename': filename,
- 'location': location
+ 'location': location,
+ '_authentication_token': self.authentication_token(),
},
status=302)
response.follow()
@@ -735,6 +754,7 @@ removed extra unicode conversion in diff
f_path='vcs/nodes.py'),
params={
'message': 'i commited',
+ '_authentication_token': self.authentication_token(),
},
status=302)
self.checkSessionFlash(response,
diff --git a/kallithea/tests/functional/test_forks.py b/kallithea/tests/functional/test_forks.py
--- a/kallithea/tests/functional/test_forks.py
+++ b/kallithea/tests/functional/test_forks.py
@@ -60,7 +60,7 @@ class _BaseTest(TestController):
# try create a fork
repo_name = self.REPO
self.app.post(url(controller='forks', action='fork_create',
- repo_name=repo_name), {}, status=403)
+ repo_name=repo_name), {'_authentication_token': self.authentication_token()}, status=403)
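Review bot: The `status=403` expectation on this POST now carries a valid CSRF token, and nothing in the test says why, so a future reader may assume the 403 is the CSRF guard firing rather than the permission check the test is actually about. A one-line comment makes the intent explicit: `# send a valid CSRF token so the 403 comes from the permission check, not the CSRF guard`. If the CSRF failure path also answers 403 (I cannot see the guard from here), this comment is the only thing documenting that the two are distinguishable. The enclosing test function starts before this hunk.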
def test_index_with_fork(self):
self.log_user()
@@ -77,7 +77,8 @@ class _BaseTest(TestController):
'repo_type': self.REPO_TYPE,
'description': description,
'private': 'False',
- 'landing_rev': 'rev:tip'}
+ 'landing_rev': 'rev:tip',
+ '_authentication_token': self.authentication_token()}
self.app.post(url(controller='forks', action='fork_create',
repo_name=repo_name), creation_args)
@@ -108,7 +109,8 @@ class _BaseTest(TestController):
'repo_type': self.REPO_TYPE,
'description': description,
'private': 'False',
- 'landing_rev': 'rev:tip'}
+ 'landing_rev': 'rev:tip',
+ '_authentication_token': self.authentication_token()}
self.app.post(url(controller='forks', action='fork_create',
repo_name=repo_name), creation_args)
repo = Repository.get_by_repo_name(fork_name_full)
@@ -150,7 +152,8 @@ class _BaseTest(TestController):
'repo_type': self.REPO_TYPE,
'description': description,
'private': 'False',
- 'landing_rev': 'rev:tip'}
+ 'landing_rev': 'rev:tip',
+ '_authentication_token': self.authentication_token()}
self.app.post(url(controller='forks', action='fork_create',
repo_name=repo_name), creation_args)
repo = Repository.get_by_repo_name(self.REPO_FORK)
diff --git a/kallithea/tests/functional/test_my_account.py b/kallithea/tests/functional/test_my_account.py
--- a/kallithea/tests/functional/test_my_account.py
+++ b/kallithea/tests/functional/test_my_account.py
@@ -50,7 +50,7 @@ class TestMyAccountController(TestContro
response = self.app.get(url('my_account_emails'))
response.mustcontain('No additional emails specified')
response = self.app.post(url('my_account_emails'),
- {'new_email': TEST_USER_REGULAR_EMAIL})
+ {'new_email': TEST_USER_REGULAR_EMAIL, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'This e-mail address is already taken')
def test_my_account_my_emails_add_mising_email_in_form(self):
@@ -66,7 +66,7 @@ class TestMyAccountController(TestContro
response.mustcontain('No additional emails specified')
response = self.app.post(url('my_account_emails'),
- {'new_email': 'foo@barz.com'})
+ {'new_email': 'foo@barz.com', '_authentication_token': self.authentication_token()})
response = self.app.get(url('my_account_emails'))
@@ -79,7 +79,7 @@ class TestMyAccountController(TestContro
response.mustcontain('' % email_id)
response = self.app.post(url('my_account_emails'),
- {'del_email_id': email_id, '_method': 'delete'})
+ {'del_email_id': email_id, '_method': 'delete', '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Removed email from user')
response = self.app.get(url('my_account_emails'))
response.mustcontain('No additional emails specified')
@@ -114,6 +114,7 @@ class TestMyAccountController(TestContro
params.update({'new_password': ''})
params.update({'extern_type': 'internal'})
params.update({'extern_name': self.test_user_1})
+ params.update({'_authentication_token': self.authentication_token()})
params.update(attrs)
response = self.app.post(url('my_account'), params)
@@ -142,6 +143,7 @@ class TestMyAccountController(TestContro
#my account cannot make you an admin !
params['admin'] = False
+ params.pop('_authentication_token')
self.assertEqual(params, updated_params)
def test_my_account_update_err_email_exists(self):
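Review bot: The `params.pop('_authentication_token')` at the `# my account cannot make you an admin !` comparison is doing request plumbing in the middle of an attribute-equality check, and without a comment it reads like it belongs to the admin assertion above it. I would add `# the CSRF token is request plumbing, not a user attribute, so drop it before comparing` above the pop. Using `pop` without a default is fine here since the token is always added earlier in the same test, but it will raise `KeyError` rather than fail the assertion if that setup ever changes; `params.pop('_authentication_token', None)` would keep the failure in the assert. The enclosing test function starts before this hunk.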
@@ -155,7 +157,8 @@ class TestMyAccountController(TestContro
password_confirmation='test122',
firstname='NewName',
lastname='NewLastname',
- email=new_email,)
+ email=new_email,
+ _authentication_token=self.authentication_token())
)
response.mustcontain('This e-mail address is already taken')
@@ -171,7 +174,8 @@ class TestMyAccountController(TestContro
password_confirmation='test122',
firstname='NewName',
lastname='NewLastname',
- email=new_email,))
+ email=new_email,
+ _authentication_token=self.authentication_token()))
response.mustcontain('An email address must contain a single @')
from kallithea.model import validators
@@ -196,7 +200,7 @@ class TestMyAccountController(TestContro
usr = self.log_user('test_regular2', 'test12')
user = User.get(usr['user_id'])
response = self.app.post(url('my_account_api_keys'),
- {'description': desc, 'lifetime': lifetime})
+ {'description': desc, 'lifetime': lifetime, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully created')
try:
response = response.follow()
@@ -212,7 +216,7 @@ class TestMyAccountController(TestContro
usr = self.log_user('test_regular2', 'test12')
user = User.get(usr['user_id'])
response = self.app.post(url('my_account_api_keys'),
- {'description': 'desc', 'lifetime': -1})
+ {'description': 'desc', 'lifetime': -1, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully created')
response = response.follow()
@@ -221,7 +225,7 @@ class TestMyAccountController(TestContro
self.assertEqual(1, len(keys))
response = self.app.post(url('my_account_api_keys'),
- {'_method': 'delete', 'del_api_key': keys[0].api_key})
+ {'_method': 'delete', 'del_api_key': keys[0].api_key, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully deleted')
keys = UserApiKeys.query().all()
self.assertEqual(0, len(keys))
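Review bot: Every hunk in this file only exercises the positive path, posting the state-changing request with a valid `_authentication_token`; nothing asserts that the same POST without the token is rejected, so the CSRF protection these tests were adapted for is never itself tested. API-key deletion is a good candidate for a negative companion test: post the same `{'_method': 'delete', 'del_api_key': ...}` params with no token, assert the status the guard returns (I cannot see the guard here, so do not assume 403 without checking), and assert `UserApiKeys.query().all()` still has the key. Without that, a regression that drops the token check would leave this suite green. The enclosing test function starts before this hunk.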
@@ -236,7 +240,7 @@ class TestMyAccountController(TestContro
response.mustcontain('expires: never')
response = self.app.post(url('my_account_api_keys'),
- {'_method': 'delete', 'del_api_key_builtin': api_key})
+ {'_method': 'delete', 'del_api_key_builtin': api_key, '_authentication_token': self.authentication_token()})
self.checkSessionFlash(response, 'Api key successfully reset')
response = response.follow()
response.mustcontain(no=[api_key])