@@ -111,26 +111,28 @@ class ReposController(BaseController):
c.repo_last_rev = repo.count() if repo.revisions else 0
if last_rev == 0 or c.repo_last_rev == 0:
c.stats_percentage = 0
else:
c.stats_percentage = '%.2f' % ((float((last_rev)) /
c.repo_last_rev) * 100)
defaults = RepoModel()._get_defaults(repo_name)
c.repos_list = [('', _('--REMOVE FORK--'))]
c.repos_list += [(x.repo_id, x.repo_name) for x in
Repository.query().order_by(Repository.repo_name).all()]
Repository.query().order_by(Repository.repo_name).all()
if x.repo_id != c.repo_info.repo_id]
defaults['id_fork_of'] = db_repo.fork.repo_id if db_repo.fork else ''
return defaults
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
c.repos_list = ScmModel().get_repos(Repository.query()
.order_by(Repository.repo_name)
.all(), sort_key='name_sort')
return render('admin/repos/repos.html')
@@ -408,29 +410,29 @@ class ReposController(BaseController):
def repo_as_fork(self, repo_name):
"""
Mark given repository as a fork of another
:param repo_name:
try:
fork_id = request.POST.get('id_fork_of')
repo = ScmModel().mark_as_fork(repo_name, fork_id,
self.rhodecode_user.username)
fork = repo.fork.repo_name if repo.fork else _('Nothing')
Session.commit()
h.flash(_('Marked repo %s as fork of %s') % (repo_name,fork),
Session().commit()
h.flash(_('Marked repo %s as fork of %s') % (repo_name, fork),
category='success')
except Exception, e:
raise
log.error(traceback.format_exc())
h.flash(_('An error occurred during this operation'),
category='error')
return redirect(url('edit_repo', repo_name=repo_name))
def show(self, repo_name, format='html'):
"""GET /repos/repo_name: Show a specific item"""
# url('repo', repo_name=ID)
def edit(self, repo_name, format='html'):
@@ -371,24 +371,26 @@ class ScmModel(BaseModel):
repo = self._get_repo(repo)
return self.sa.query(Repository)\
.filter(Repository.fork == repo).count()
def get_pull_requests(self, repo):
return self.sa.query(PullRequest)\
.filter(PullRequest.other_repo == repo).count()
def mark_as_fork(self, repo, fork, user):
repo = self.__get_repo(repo)
fork = self.__get_repo(fork)
if fork and repo.repo_id == fork.repo_id:
raise Exception("Cannot set repository as fork of itself")
repo.fork = fork
self.sa.add(repo)
return repo
def pull_changes(self, repo, username):
dbrepo = self.__get_repo(repo)
clone_uri = dbrepo.clone_uri
if not clone_uri:
raise Exception("This repository doesn't have a clone uri")
repo = dbrepo.scm_instance
@@ -151,14 +151,14 @@ class TestController(TestCase):
self.assertEqual(ses.get('is_authenticated'), True)
return response.session['rhodecode_user']
def _get_logged_user(self):
return User.get_by_username(self._logged_username)
def checkSessionFlash(self, response, msg):
self.assertTrue('flash' in response.session)
if not msg in response.session['flash'][0][1]:
self.fail(
'msg `%s` not found in session flash: got `%s` instead' % (
msg, response.session['flash'][0][1])
msg, response.session['flash'])
)
# -*- coding: utf-8 -*-
from rhodecode.lib.auth import get_crypt_password, check_password
from rhodecode.model.db import User, RhodeCodeSetting
from rhodecode.model.db import User, RhodeCodeSetting, Repository
from rhodecode.tests import *
from rhodecode.lib import helpers as h
from rhodecode.model.user import UserModel
from rhodecode.model.scm import ScmModel
class TestAdminSettingsController(TestController):
def test_index(self):
response = self.app.get(url('admin_settings'))
# Test response...
def test_index_as_xml(self):
response = self.app.get(url('formatted_admin_settings', format='xml'))
def test_create(self):
@@ -202,12 +203,64 @@ class TestAdminSettingsController(TestCo
password_confirmation='test122',
firstname='NewName',
lastname='NewLastname',
email=new_email,)
response.mustcontain('An email address must contain a single @')
from rhodecode.model import validators
msg = validators.ValidUsername(edit=False,
old_data={})._messages['username_exists']
msg = h.html_escape(msg % {'username': 'test_admin'})
response.mustcontain(u"%s" % msg)
def test_set_repo_fork_has_no_self_id(self):
self.log_user()
repo = Repository.get_by_repo_name(HG_REPO)
response = self.app.get(url('edit_repo', repo_name=HG_REPO))
opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
assert opt not in response.body
def test_set_fork_of_repo(self):
repo2 = Repository.get_by_repo_name(GIT_REPO)
response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
params=dict(
id_fork_of=repo2.repo_id
))
self.checkSessionFlash(response,
'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
assert repo.fork == repo2
response = response.follow()
# check if given repo is selected
opt = """<option value="%s" selected="selected">%s</option>""" % (
repo2.repo_id, repo2.repo_name)
response.mustcontain(opt)
# clean session flash
#response = self.app.get(url('edit_repo', repo_name=HG_REPO))
## mark it as None
id_fork_of=None
'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
assert repo.fork == None
def test_set_fork_of_same_repo(self):
id_fork_of=repo.repo_id
'An error occurred during this operation')
\ No newline at end of file
Status change: