@@ -67,12 +67,13 @@ class ReposController(BaseController):
c.repo_groups = RepoGroup.groups_choices()
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
repo_model = RepoModel()
c.users_array = repo_model.get_users_js()
c.users_groups_array = repo_model.get_users_groups_js()
c.landing_revs = ScmModel().get_repo_landing_revs()
def __load_data(self, repo_name=None):
"""
Load defaults settings for edit, and update
:param repo_name:
@@ -88,12 +89,13 @@ class ReposController(BaseController):
' please run the application again'
' in order to rescan repositories') % repo_name,
category='error')
return redirect(url('repos'))
c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
c.default_user_id = User.get_by_username('default').user_id
c.in_public_journal = UserFollowing.query()\
.filter(UserFollowing.user_id == c.default_user_id)\
.filter(UserFollowing.follows_repository == c.repo_info).scalar()
if c.repo_info.stats:
@@ -113,12 +115,13 @@ class ReposController(BaseController):
defaults = RepoModel()._get_defaults(repo_name)
c.repos_list = [('', _('--REMOVE FORK--'))]
c.repos_list += [(x.repo_id, x.repo_name) for x in
Repository.query().order_by(Repository.repo_name).all()]
return defaults
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
@@ -394,12 +394,13 @@ class SettingsController(BaseController)
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def create_repository(self):
"""GET /_admin/create_repository: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
return render('admin/repos/repo_add_create_repository.html')
@@ -445,13 +445,14 @@ def repo2db_mapper(initial_repo_list, re
'repo_name': name,
'repo_name_full': name,
'repo_type': repo.alias,
'description': repo.description \
if repo.description != 'unknown' else '%s repository' % name,
'private': False,
'group_id': getattr(group, 'group_id', None)
'group_id': getattr(group, 'group_id', None),
'landing_rev': repo.DEFAULT_BRANCH_NAME
}
rm.create(form_data, user, just_db=True)
sa.commit()
removed = []
if remove_obsolete:
# remove from database those repositories that are not in the filesystem
@@ -555,13 +556,13 @@ def create_test_index(repo_location, con
def create_test_env(repos_test_path, config):
Makes a fresh database and
install test repository into tmp dir
from rhodecode.lib.db_manage import DbManage
from rhodecode.tests import HG_REPO, TESTS_TMP_PATH
from rhodecode.tests import HG_REPO, GIT_REPO, TESTS_TMP_PATH
# PART ONE create db
dbconf = config['sqlalchemy.db1.url']
log.debug('making test db %s' % dbconf)
# create test dir if it doesn't exist
@@ -590,18 +591,23 @@ def create_test_env(repos_test_path, con
shutil.rmtree(idx_path)
if data_path and os.path.exists(data_path):
log.debug('remove %s' % data_path)
shutil.rmtree(data_path)
#CREATE DEFAULT HG REPOSITORY
#CREATE DEFAULT TEST REPOS
cur_dir = dn(dn(abspath(__file__)))
tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
tar.close()
tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
#LOAD VCS test stuff
from rhodecode.tests.vcs import setup_package
setup_package()
#==============================================================================
@@ -292,13 +292,13 @@ class User(Base, BaseModel):
)
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
name = Column("firstname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
@@ -497,19 +497,20 @@ class Repository(Base, BaseModel):
'mysql_charset': 'utf8'},
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
clone_uri = Column("clone_uri", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
landing_rev = Column("landing_revision", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
user = relationship('User')
fork = relationship('Repository', remote_side=repo_id)
@@ -520,13 +521,13 @@ class Repository(Base, BaseModel):
followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog')
def __unicode__(self):
return u"<%s('%s:%s')>" % (self.__class__.__name__,self.repo_id,
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
self.repo_name)
@classmethod
def url_sep(cls):
return URL_SEP
@@ -659,29 +659,30 @@ def RepoForm(edit=False, old_data={}, su
filter_extra_fields = False
repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
SlugifyName())
clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False))
repo_group = OneOf(repo_groups, hideList=True)
repo_type = OneOf(supported_backends)
description = UnicodeString(strip=True, min=1, not_empty=True)
description = UnicodeString(strip=True, min=1, not_empty=False)
private = StringBoolean(if_missing=False)
enable_statistics = StringBoolean(if_missing=False)
enable_downloads = StringBoolean(if_missing=False)
landing_rev = UnicodeString(strip=True, min=1, not_empty=True)
if edit:
#this is repo owner
user = All(UnicodeString(not_empty=True), ValidRepoUser)
chained_validators = [ValidCloneUri()(),
ValidRepoName(edit, old_data),
ValidPerms()]
return _RepoForm
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
repo_groups=[]):
def RepoForkForm(edit=False, old_data={},
supported_backends=BACKENDS.keys(), repo_groups=[]):
class _RepoForkForm(formencode.Schema):
allow_extra_fields = True
@@ -693,23 +694,23 @@ def RepoForkForm(edit=False, old_data={}
fork_parent_id = UnicodeString()
chained_validators = [ValidForkName(edit, old_data)]
return _RepoForkForm
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
def RepoSettingsForm(edit=False, old_data={},
class _RepoForm(formencode.Schema):
chained_validators = [ValidRepoName(edit, old_data), ValidPerms(),
ValidSettings]
def ApplicationSettingsForm():
@@ -26,12 +26,13 @@ import os
import time
import traceback
import logging
import cStringIO
from sqlalchemy import func
from pylons.i18n.translation import _
from rhodecode.lib.vcs import get_backend
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.vcs.nodes import FileNode
@@ -469,6 +470,36 @@ class ScmModel(BaseModel):
raise
return _dirs, _files
def get_unread_journal(self):
return self.sa.query(UserLog).count()
def get_repo_landing_revs(self, repo=None):
Generates select option with tags branches and bookmarks (for hg only)
grouped by type
:param repo:
:type repo:
hist_l = []
repo = self.__get_repo(repo)
hist_l.append(['tip', _('latest tip')])
if not repo:
return hist_l
repo = repo.scm_instance
branches_group = ([(k, k) for k, v in
repo.branches.iteritems()], _("Branches"))
hist_l.append(branches_group)
if repo.alias == 'hg':
bookmarks_group = ([(k, k) for k, v in
repo.bookmarks.iteritems()], _("Bookmarks"))
hist_l.append(bookmarks_group)
tags_group = ([(k, k) for k, v in
repo.tags.iteritems()], _("Tags"))
hist_l.append(tags_group)
@@ -39,12 +39,21 @@ ${h.form(url('repos'))}
</div>
<div class="input">
${h.select('repo_type','hg',c.backends,class_="small")}
<span class="help-block">${_('Type of repository to create.')}</span>
<div class="field">
<div class="label">
<label for="landing_rev">${_('Landing revision')}:</label>
${h.select('landing_rev','',c.landing_revs,class_="medium")}
<span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
<div class="label label-textarea">
<label for="description">${_('Description')}:</label>
<div class="textarea text-area editor">
${h.textarea('description')}
@@ -59,12 +59,21 @@
${h.select('repo_type','hg',c.backends,class_="medium")}
<span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
@@ -207,22 +216,22 @@
</li>
</ul>
${h.end_form()}
<h3>${_('Set as fork')}</h3>
<h3>${_('Set as fork of')}</h3>
${h.form(url('repo_as_fork', repo_name=c.repo_info.repo_name),method='put')}
<div class="form">
<div class="fields">
${h.select('id_fork_of','',c.repos_list,class_="medium")}
${h.submit('set_as_fork_%s' % c.repo_info.repo_name,_('set'),class_="ui-btn",)}
<div class="field" style="border:none;color:#888">
<ul>
<li>${_('''Manually set this repository as a fork of another''')}</li>
<li>${_('''Manually set this repository as a fork of another from the list''')}</li>
@@ -35,19 +35,20 @@ os.environ['TZ'] = 'UTC'
if not is_windows:
time.tzset()
log = logging.getLogger(__name__)
__all__ = [
'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
'environ', 'url', 'get_new_dir', 'TestController', 'TESTS_TMP_PATH',
'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK',
'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO',
'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
]
# Invoke websetup with the current config file
# SetupCommand('setup-app').run([config_file])
##RUNNING DESIRED TESTS
@@ -80,28 +81,33 @@ HG_FORK = 'vcs_test_hg_fork'
GIT_FORK = 'vcs_test_git_fork'
## VCS
SCM_TESTS = ['hg', 'git']
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
THIS = os.path.abspath(os.path.dirname(__file__))
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg')
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
TEST_DIR = tempfile.gettempdir()
TEST_REPO_PREFIX = 'vcs-test'
# cached repos if any !
# comment out to get some other repos from bb or github
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
def get_new_dir(title):
Returns always new directory path.
from rhodecode.tests.vcs.utils import get_normalized_path
@@ -111,18 +117,12 @@ def get_new_dir(title):
hex = hashlib.sha1(str(time.time())).hexdigest()
name = '-'.join((name, hex))
path = os.path.join(TEST_DIR, name)
return get_normalized_path(path)
PACKAGE_DIR = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..'))
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
class TestController(TestCase):
def __init__(self, *args, **kwargs):
wsgiapp = pylons.test.pylonsapp
config = wsgiapp.config
@@ -3,12 +3,13 @@
import os
from rhodecode.lib import vcs
from rhodecode.model.db import Repository
from rhodecode.tests import *
class TestAdminReposController(TestController):
def __make_repo(self):
pass
def test_index(self):
@@ -21,113 +22,148 @@ class TestAdminReposController(TestContr
def test_create_hg(self):
self.log_user()
repo_name = NEW_HG_REPO
description = 'description for newly created repo'
private = False
response = self.app.post(url('repos'), {'repo_name':repo_name,
'repo_type':'hg',
'clone_uri':'',
'repo_group':'',
'description':description,
'private':private})
self.checkSessionFlash(response, 'created repository %s' % (repo_name))
response = self.app.post(url('repos'), {'repo_name': repo_name,
'repo_type': 'hg',
'clone_uri': '',
'repo_group': '',
'description': description,
'private': private,
'landing_rev': 'tip'})
self.checkSessionFlash(response,
'created repository %s' % (repo_name))
#test if the repo was created in the database
new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
repo_name).one()
new_repo = self.Session.query(Repository)\
.filter(Repository.repo_name == repo_name).one()
self.assertEqual(new_repo.repo_name, repo_name)
self.assertEqual(new_repo.description, description)
#test if repository is visible in the list ?
response = response.follow()
self.assertTrue(repo_name in response.body)
response.mustcontain(repo_name)
#test if repository was created on filesystem
try:
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
except:
self.fail('no repo in filesystem')
self.fail('no repo %s in filesystem' % repo_name)
def test_create_hg_non_ascii(self):
non_ascii = "ąęł"
repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
repo_name_unicode = repo_name.decode('utf8')
description = 'description for newly created repo' + non_ascii
description_unicode = description.decode('utf8')
'created repository %s' % (repo_name_unicode))
repo_name_unicode).one()
.filter(Repository.repo_name == repo_name_unicode).one()
self.assertEqual(new_repo.repo_name, repo_name_unicode)
self.assertEqual(new_repo.description, description_unicode)
def test_create_hg_in_group(self):
#TODO: write test !
def test_create_git(self):
return
repo_name = NEW_GIT_REPO
'repo_type':'git',
'repo_type': 'git',
#test if we have a message for that repository
assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
#test if the fork was created in the database
new_repo = self.Session.query(Repository).filter(Repository.repo_name == repo_name).one()
assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
assert new_repo.description == description, 'wrong description'
assert repo_name in response.body, 'missing new repo from the main repos list'
assert False , 'no repo in filesystem'
def test_create_git_non_ascii(self):
repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
def test_new(self):
response = self.app.get(url('new_repo'))
def test_new_as_xml(self):
@@ -137,74 +173,123 @@ class TestAdminReposController(TestContr
response = self.app.put(url('repo', repo_name=HG_REPO))
def test_update_browser_fakeout(self):
response = self.app.post(url('repo', repo_name=HG_REPO),
params=dict(_method='put'))
def test_delete(self):
def test_delete_hg(self):
repo_name = 'vcs_test_new_to_delete'
self.assertTrue('flash' in response.session)
self.assertTrue('''created repository %s''' % (repo_name) in
response.session['flash'][0])
response = self.app.delete(url('repo', repo_name=repo_name))
self.assertTrue('''deleted repository %s''' % (repo_name) in
response.follow()
#check if repo was deleted from db
deleted_repo = self.Session.query(Repository).filter(Repository.repo_name
== repo_name).scalar()
deleted_repo = self.Session.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
self.assertEqual(deleted_repo, None)
self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
False)
def test_delete_git(self):
def test_delete_repo_with_group(self):
#TODO:
def test_delete_browser_fakeout(self):
params=dict(_method='delete'))
def test_show(self):
def test_show_hg(self):
response = self.app.get(url('repo', repo_name=HG_REPO))
def test_show_as_xml(self):
response = self.app.get(url('formatted_repo', repo_name=HG_REPO,
format='xml'))
def test_show_git(self):
response = self.app.get(url('repo', repo_name=GIT_REPO))
def test_edit(self):
response = self.app.get(url('edit_repo', repo_name=HG_REPO))
def test_edit_as_xml(self):
response = self.app.get(url('formatted_edit_repo', repo_name=HG_REPO,
@@ -25,36 +25,66 @@ class TestForksController(TestController
repo_name = HG_REPO
response = self.app.get(url(controller='forks', action='forks',
repo_name=repo_name))
self.assertTrue("""There are no forks yet""" in response.body)
def test_index_with_fork(self):
def test_index_with_fork_hg(self):
# create a fork
fork_name = HG_FORK
description = 'fork of vcs test'
org_repo = Repository.get_by_repo_name(repo_name)
response = self.app.post(url(controller='forks',
action='fork_create',
repo_name=repo_name),
{'repo_name':fork_name,
'fork_parent_id':org_repo.repo_id,
'private':'False'})
{'repo_name': fork_name,
'fork_parent_id': org_repo.repo_id,
'private': 'False',
self.assertTrue("""<a href="/%s/summary">"""
"""vcs_test_hg_fork</a>""" % fork_name
in response.body)
response.mustcontain(
"""<a href="/%s/summary">%s</a>""" % (fork_name, fork_name)
#remove this fork
response = self.app.delete(url('repo', repo_name=fork_name))
def test_index_with_fork_git(self):
fork_name = GIT_FORK
repo_name = GIT_REPO
def test_z_fork_create(self):
@@ -66,32 +96,29 @@ class TestForksController(TestController
'private':'False',
#test if we have a message that fork is ok
self.assertTrue('forked %s repository as %s' \
% (repo_name, fork_name) in response.session['flash'][0])
'forked %s repository as %s' % (repo_name, fork_name))
fork_repo = self.Session.query(Repository)\
.filter(Repository.repo_name == fork_name).one()
self.assertEqual(fork_repo.repo_name, fork_name)
self.assertEqual(fork_repo.fork.repo_name, repo_name)
#test if fork is visible in the list ?
# check if fork is marked as fork
# wait for cache to expire
time.sleep(10)
response = self.app.get(url(controller='summary', action='index',
repo_name=fork_name))
self.assertTrue('Fork of %s' % repo_name in response.body)
def test_zz_fork_permission_page(self):
from nose.plugins.skip import SkipTest
class TestSearchController(TestController):
@@ -24,16 +24,14 @@ class TestSearchController(TestControlle
'Please run whoosh indexer' in response.body)
def test_normal_search(self):
response = self.app.get(url(controller='search', action='index'),
{'q': 'def repo'})
response.mustcontain('10 results')
response.mustcontain('Permission denied')
response.mustcontain('39 results')
def test_repo_search(self):
{'q': 'repository:%s def test' % HG_REPO})
response.mustcontain('4 results')
@@ -89,13 +89,13 @@ class TestSummaryController(TestControll
action='index',
repo_name='_%s' % ID))
#repo type
response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
"""title="Git repository" alt="Git """
"""repository" src="/images/icons/hgicon.png"/>""")
"""repository" src="/images/icons/giticon.png"/>""")
"""title="public repository" alt="public """
"""repository" src="/images/icons/lock_open.png"/>""")
def _enable_stats(self):
r = Repository.get_by_repo_name(HG_REPO)
@@ -133,13 +133,14 @@ class TestReposGroups(unittest.TestCase)
repo_name_full='john',
fork_name=None,
description=None,
repo_group=None,
private=False,
repo_type='hg',
clone_uri=None)
clone_uri=None,
landing_rev='tip')
cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
self.assertEqual(r.repo_name, 'john')
# put repo into group
@@ -605,30 +606,30 @@ class TestPermissions(unittest.TestCase)
user=self.anon,
perm='group.none')
ReposGroupModel().grant_user_permission(repos_group=self.g2,
u1_auth = AuthUser(user_id=self.u1.user_id)
self.assertEqual(u1_auth.permissions['repositories_groups'],
{u'group1': u'group.none', u'group2': u'group.none'})
a1_auth = AuthUser(user_id=self.anon.user_id)
self.assertEqual(a1_auth.permissions['repositories_groups'],
# add repo to group
form_data = {
'repo_name':HG_REPO,
'repo_name_full':RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
'repo_group':self.g1.group_id,
'description':'desc',
'private':False
'repo_name': HG_REPO,
'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
'repo_group': self.g1.group_id,
'description': 'desc',
'landing_rev': 'tip'
self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
Session.commit()
@@ -49,8 +49,8 @@ def setup_package():
for scm, fetcher_info in fetchers.items():
fetcher = SCMFetcher(**fetcher_info)
fetcher.setup()
except VCSTestError, err:
raise RuntimeError(str(err))
start_dir = os.path.abspath(os.path.dirname(__file__))
unittest.defaultTestLoader.discover(start_dir)
#start_dir = os.path.abspath(os.path.dirname(__file__))
#unittest.defaultTestLoader.discover(start_dir)
Unit tests configuration module for vcs.
import hashlib
import tempfile
import datetime
from utils import get_normalized_path
from os.path import join as jn
__all__ = (
'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
'SCM_TESTS',
TEST_TMP_PATH = TESTS_TMP_PATH
#__all__ = (
# 'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
# 'SCM_TESTS',
#)
#
#SCM_TESTS = ['hg', 'git']
#uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
jn(TEST_TMP_PATH, 'vcs-git'))
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
jn(TEST_TMP_PATH, 'vcs-hg'))
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
name = TEST_REPO_PREFIX
if title:
name = '-'.join((name, title))
#GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
#TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
#TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
# jn(TEST_TMP_PATH, 'vcs-git'))
#TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
# jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
#TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
# jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
#HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
#TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
# jn(TEST_TMP_PATH, 'vcs-hg'))
#TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
# jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
#TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
# jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
#TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
#TEST_REPO_PREFIX = 'vcs-test'
#def get_new_dir(title):
# """
# Returns always new directory path.
# name = TEST_REPO_PREFIX
# if title:
# name = '-'.join((name, title))
# hex = hashlib.sha1(str(time.time())).hexdigest()
# name = '-'.join((name, hex))
# path = os.path.join(TEST_DIR, name)
# return get_normalized_path(path)
@@ -153,20 +153,16 @@ class MercurialRepositoryTest(unittest.T
def test_branches(self):
# TODO: Need more tests here
#active branches
self.assertTrue('default' in self.repo.branches)
#closed branches
self.assertFalse('web' in self.repo.branches)
self.assertFalse('git' in self.repo.branches)
self.assertTrue('git' in self.repo.branches)
# closed
self.assertTrue('workdir' in self.repo._get_branches(closed=True))
self.assertTrue('webvcs' in self.repo._get_branches(closed=True))
self.assertTrue('web' in self.repo._get_branches(closed=True))
for name, id in self.repo.branches.items():
self.assertTrue(isinstance(
self.repo.get_changeset(id), MercurialChangeset))
def test_tip_in_tags(self):
new file 100644
binary diff not shown
Status change: