@@ -120,8 +120,10 @@ class ReposController(BaseController):
c.repos_list = [('', _('--REMOVE FORK--'))]
c.repos_list += [(x.repo_id, x.repo_name) for x in
Repository.query().order_by(Repository.repo_name).all()]
Repository.query().order_by(Repository.repo_name).all()
if x.repo_id != c.repo_info.repo_id]
defaults['id_fork_of'] = db_repo.fork.repo_id if db_repo.fork else ''
return defaults
@HasPermissionAllDecorator('hg.admin')
@@ -417,11 +419,11 @@ class ReposController(BaseController):
repo = ScmModel().mark_as_fork(repo_name, fork_id,
self.rhodecode_user.username)
fork = repo.fork.repo_name if repo.fork else _('Nothing')
Session.commit()
h.flash(_('Marked repo %s as fork of %s') % (repo_name,fork),
Session().commit()
h.flash(_('Marked repo %s as fork of %s') % (repo_name, fork),
category='success')
except Exception, e:
raise
log.error(traceback.format_exc())
h.flash(_('An error occurred during this operation'),
category='error')
@@ -380,6 +380,8 @@ class ScmModel(BaseModel):
def mark_as_fork(self, repo, fork, user):
repo = self.__get_repo(repo)
fork = self.__get_repo(fork)
if fork and repo.repo_id == fork.repo_id:
raise Exception("Cannot set repository as fork of itself")
repo.fork = fork
self.sa.add(repo)
return repo
@@ -160,5 +160,5 @@ class TestController(TestCase):
if not msg in response.session['flash'][0][1]:
self.fail(
'msg `%s` not found in session flash: got `%s` instead' % (
msg, response.session['flash'][0][1])
msg, response.session['flash'])
)
# -*- coding: utf-8 -*-
from rhodecode.lib.auth import get_crypt_password, check_password
from rhodecode.model.db import User, RhodeCodeSetting
from rhodecode.model.db import User, RhodeCodeSetting, Repository
from rhodecode.tests import *
from rhodecode.lib import helpers as h
from rhodecode.model.user import UserModel
from rhodecode.model.scm import ScmModel
class TestAdminSettingsController(TestController):
@@ -211,3 +212,55 @@ class TestAdminSettingsController(TestCo
old_data={})._messages['username_exists']
msg = h.html_escape(msg % {'username': 'test_admin'})
response.mustcontain(u"%s" % msg)
def test_set_repo_fork_has_no_self_id(self):
self.log_user()
repo = Repository.get_by_repo_name(HG_REPO)
response = self.app.get(url('edit_repo', repo_name=HG_REPO))
opt = """<option value="%s">vcs_test_git</option>""" % repo.repo_id
assert opt not in response.body
def test_set_fork_of_repo(self):
repo2 = Repository.get_by_repo_name(GIT_REPO)
response = self.app.put(url('repo_as_fork', repo_name=HG_REPO),
params=dict(
id_fork_of=repo2.repo_id
))
self.checkSessionFlash(response,
'Marked repo %s as fork of %s' % (repo.repo_name, repo2.repo_name))
assert repo.fork == repo2
response = response.follow()
# check if given repo is selected
opt = """<option value="%s" selected="selected">%s</option>""" % (
repo2.repo_id, repo2.repo_name)
response.mustcontain(opt)
# clean session flash
#response = self.app.get(url('edit_repo', repo_name=HG_REPO))
## mark it as None
id_fork_of=None
'Marked repo %s as fork of %s' % (repo.repo_name, "Nothing"))
assert repo.fork == None
def test_set_fork_of_same_repo(self):
id_fork_of=repo.repo_id
'An error occurred during this operation')
\ No newline at end of file
Status change: