@@ -49,94 +49,97 @@ from rhodecode.model.repo import RepoMod
log = logging.getLogger(__name__)
class ReposController(BaseController):
"""
REST Controller styled on the Atom Publishing Protocol"""
# To properly map this controller, ensure your config/routing.py
# file has a resource setup:
# map.resource('repo', 'repos')
@LoginRequired()
@HasPermissionAnyDecorator('hg.admin', 'hg.create.repository')
def __before__(self):
c.admin_user = session.get('admin_user')
c.admin_username = session.get('admin_username')
super(ReposController, self).__before__()
def __load_defaults(self):
c.repo_groups = RepoGroup.groups_choices()
c.repo_groups_choices = map(lambda k: unicode(k[0]), c.repo_groups)
repo_model = RepoModel()
c.users_array = repo_model.get_users_js()
c.users_groups_array = repo_model.get_users_groups_js()
c.landing_revs = ScmModel().get_repo_landing_revs()
def __load_data(self, repo_name=None):
Load defaults settings for edit, and update
:param repo_name:
self.__load_defaults()
c.repo_info = db_repo = Repository.get_by_repo_name(repo_name)
repo = db_repo.scm_instance
if c.repo_info is None:
h.flash(_('%s repository is not mapped to db perhaps'
' it was created or renamed from the filesystem'
' please run the application again'
' in order to rescan repositories') % repo_name,
category='error')
return redirect(url('repos'))
c.landing_revs = ScmModel().get_repo_landing_revs(c.repo_info)
c.default_user_id = User.get_by_username('default').user_id
c.in_public_journal = UserFollowing.query()\
.filter(UserFollowing.user_id == c.default_user_id)\
.filter(UserFollowing.follows_repository == c.repo_info).scalar()
if c.repo_info.stats:
# this is on what revision we ended up so we add +1 for count
last_rev = c.repo_info.stats.stat_on_revision + 1
else:
last_rev = 0
c.stats_revision = last_rev
c.repo_last_rev = repo.count() if repo.revisions else 0
if last_rev == 0 or c.repo_last_rev == 0:
c.stats_percentage = 0
c.stats_percentage = '%.2f' % ((float((last_rev)) /
c.repo_last_rev) * 100)
defaults = RepoModel()._get_defaults(repo_name)
c.repos_list = [('', _('--REMOVE FORK--'))]
c.repos_list += [(x.repo_id, x.repo_name) for x in
Repository.query().order_by(Repository.repo_name).all()]
return defaults
@HasPermissionAllDecorator('hg.admin')
def index(self, format='html'):
"""GET /repos: All items in the collection"""
# url('repos')
c.repos_list = ScmModel().get_repos(Repository.query()
.order_by(Repository.repo_name)
.all(), sort_key='name_sort')
return render('admin/repos/repos.html')
def create(self):
POST /repos: Create a new item"""
form_result = {}
try:
form_result = RepoForm(repo_groups=c.repo_groups_choices)()\
.to_python(dict(request.POST))
RepoModel().create(form_result, self.rhodecode_user)
@@ -376,48 +376,49 @@ class SettingsController(BaseController)
.all()
c.user_repos = ScmModel().get_repos(all_repos)
c.form = htmlfill.render(
render('admin/users/user_edit_my_account_form.html'),
defaults=errors.value,
errors=errors.error_dict or {},
prefix_error=False,
encoding="UTF-8")
return render('admin/users/user_edit_my_account.html')
except Exception:
log.error(traceback.format_exc())
h.flash(_('error occurred during update of user %s') \
% form_result.get('username'), category='error')
return redirect(url('my_account'))
@NotAnonymous()
def create_repository(self):
"""GET /_admin/create_repository: Form to create a new item"""
new_repo = request.GET.get('repo', '')
c.new_repo = repo_name_slug(new_repo)
return render('admin/repos/repo_add_create_repository.html')
def get_hg_ui_settings(self):
ret = self.sa.query(RhodeCodeUi).all()
if not ret:
raise Exception('Could not get application ui settings !')
settings = {}
for each in ret:
k = each.ui_key
v = each.ui_value
if k == '/':
k = 'root_path'
if k.find('.') != -1:
k = k.replace('.', '_')
if each.ui_section == 'hooks':
v = each.ui_active
@@ -427,49 +427,50 @@ def repo2db_mapper(initial_repo_list, re
:param initial_repo_list: list of repositories found by scanning methods
:param remove_obsolete: check for obsolete entries in database
from rhodecode.model.repo import RepoModel
sa = meta.Session
rm = RepoModel()
user = sa.query(User).filter(User.admin == True).first()
if user is None:
raise Exception('Missing administrative account !')
added = []
for name, repo in initial_repo_list.items():
group = map_groups(name)
if not rm.get_by_repo_name(name, cache=False):
log.info('repository %s not found creating default' % name)
added.append(name)
form_data = {
'repo_name': name,
'repo_name_full': name,
'repo_type': repo.alias,
'description': repo.description \
if repo.description != 'unknown' else '%s repository' % name,
'private': False,
'group_id': getattr(group, 'group_id', None)
'group_id': getattr(group, 'group_id', None),
'landing_rev': repo.DEFAULT_BRANCH_NAME
}
rm.create(form_data, user, just_db=True)
sa.commit()
removed = []
if remove_obsolete:
# remove from database those repositories that are not in the filesystem
for repo in sa.query(Repository).all():
if repo.repo_name not in initial_repo_list.keys():
log.debug("Removing non existing repository found in db %s" %
repo.repo_name)
removed.append(repo.repo_name)
sa.delete(repo)
# clear cache keys
log.debug("Clearing cache keys now...")
CacheInvalidation.clear_cache()
return added, removed
# set cache regions for beaker so celery can utilise it
def add_cache(settings):
cache_settings = {'regions': None}
@@ -537,89 +538,94 @@ def create_test_index(repo_location, con
from rhodecode.lib.pidlock import DaemonLock, LockHeld
repo_location = repo_location
index_location = os.path.join(config['app_conf']['index_dir'])
if not os.path.exists(index_location):
os.makedirs(index_location)
l = DaemonLock(file_=jn(dn(index_location), 'make_index.lock'))
WhooshIndexingDaemon(index_location=index_location,
repo_location=repo_location)\
.run(full_index=full_index)
l.release()
except LockHeld:
pass
def create_test_env(repos_test_path, config):
Makes a fresh database and
install test repository into tmp dir
from rhodecode.lib.db_manage import DbManage
from rhodecode.tests import HG_REPO, TESTS_TMP_PATH
from rhodecode.tests import HG_REPO, GIT_REPO, TESTS_TMP_PATH
# PART ONE create db
dbconf = config['sqlalchemy.db1.url']
log.debug('making test db %s' % dbconf)
# create test dir if it doesn't exist
if not os.path.isdir(repos_test_path):
log.debug('Creating testdir %s' % repos_test_path)
os.makedirs(repos_test_path)
dbmanage = DbManage(log_sql=True, dbconf=dbconf, root=config['here'],
tests=True)
dbmanage.create_tables(override=True)
dbmanage.create_settings(dbmanage.config_prompt(repos_test_path))
dbmanage.create_default_user()
dbmanage.admin_prompt()
dbmanage.create_permissions()
dbmanage.populate_default_permissions()
Session.commit()
# PART TWO make test repo
log.debug('making test vcs repositories')
idx_path = config['app_conf']['index_dir']
data_path = config['app_conf']['cache_dir']
#clean index and data
if idx_path and os.path.exists(idx_path):
log.debug('remove %s' % idx_path)
shutil.rmtree(idx_path)
if data_path and os.path.exists(data_path):
log.debug('remove %s' % data_path)
shutil.rmtree(data_path)
#CREATE DEFAULT HG REPOSITORY
#CREATE DEFAULT TEST REPOS
cur_dir = dn(dn(abspath(__file__)))
tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_hg.tar.gz"))
tar.extractall(jn(TESTS_TMP_PATH, HG_REPO))
tar.close()
tar = tarfile.open(jn(cur_dir, 'tests', "vcs_test_git.tar.gz"))
tar.extractall(jn(TESTS_TMP_PATH, GIT_REPO))
#LOAD VCS test stuff
from rhodecode.tests.vcs import setup_package
setup_package()
#==============================================================================
# PASTER COMMANDS
class BasePasterCommand(Command):
Abstract Base Class for paster commands.
The celery commands are somewhat aggressive about loading
celery.conf, and since our module sets the `CELERY_LOADER`
environment variable to our loader, we have to bootstrap a bit and
make sure we've had a chance to load the pylons config off of the
command line, otherwise everything fails.
min_args = 1
min_args_error = "Please provide a paster config file as an argument."
takes_config_file = 1
requires_config_file = True
def notify_msg(self, msg, log=False):
@@ -274,49 +274,49 @@ class RhodeCodeUi(Base, BaseModel):
@classmethod
def create_or_update_hook(cls, key, val):
new_ui = cls.get_by_key(key).scalar() or cls()
new_ui.ui_section = 'hooks'
new_ui.ui_active = True
new_ui.ui_key = key
new_ui.ui_value = val
Session.add(new_ui)
class User(Base, BaseModel):
__tablename__ = 'users'
__table_args__ = (
UniqueConstraint('username'), UniqueConstraint('email'),
{'extend_existing': True, 'mysql_engine': 'InnoDB',
'mysql_charset': 'utf8'}
)
user_id = Column("user_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
username = Column("username", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
password = Column("password", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
active = Column("active", Boolean(), nullable=True, unique=None, default=None)
admin = Column("admin", Boolean(), nullable=True, unique=None, default=False)
name = Column("name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
name = Column("firstname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
lastname = Column("lastname", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
_email = Column("email", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
last_login = Column("last_login", DateTime(timezone=False), nullable=True, unique=None, default=None)
ldap_dn = Column("ldap_dn", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
api_key = Column("api_key", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
user_log = relationship('UserLog', cascade='all')
user_perms = relationship('UserToPerm', primaryjoin="User.user_id==UserToPerm.user_id", cascade='all')
repositories = relationship('Repository')
user_followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_user_id==User.user_id', cascade='all')
repo_to_perm = relationship('UserRepoToPerm', primaryjoin='UserRepoToPerm.user_id==User.user_id', cascade='all')
repo_group_to_perm = relationship('UserRepoGroupToPerm', primaryjoin='UserRepoGroupToPerm.user_id==User.user_id', cascade='all')
group_member = relationship('UsersGroupMember', cascade='all')
notifications = relationship('UserNotification', cascade='all')
# notifications assigned to this user
user_created_notifications = relationship('Notification', cascade='all')
# comments created by this user
user_comments = relationship('ChangesetComment', cascade='all')
@hybrid_property
def email(self):
@@ -479,72 +479,73 @@ class UsersGroupMember(Base, BaseModel):
users_group_member_id = Column("users_group_member_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
users_group_id = Column("users_group_id", Integer(), ForeignKey('users_groups.users_group_id'), nullable=False, unique=None, default=None)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=None, default=None)
user = relationship('User', lazy='joined')
users_group = relationship('UsersGroup')
def __init__(self, gr_id='', u_id=''):
self.users_group_id = gr_id
self.user_id = u_id
class Repository(Base, BaseModel):
__tablename__ = 'repositories'
UniqueConstraint('repo_name'),
'mysql_charset': 'utf8'},
repo_id = Column("repo_id", Integer(), nullable=False, unique=True, default=None, primary_key=True)
repo_name = Column("repo_name", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=True, default=None)
clone_uri = Column("clone_uri", String(length=255, convert_unicode=False, assert_unicode=None), nullable=True, unique=False, default=None)
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default='hg')
repo_type = Column("repo_type", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
user_id = Column("user_id", Integer(), ForeignKey('users.user_id'), nullable=False, unique=False, default=None)
private = Column("private", Boolean(), nullable=True, unique=None, default=None)
enable_statistics = Column("statistics", Boolean(), nullable=True, unique=None, default=True)
enable_downloads = Column("downloads", Boolean(), nullable=True, unique=None, default=True)
description = Column("description", String(length=10000, convert_unicode=False, assert_unicode=None), nullable=True, unique=None, default=None)
created_on = Column('created_on', DateTime(timezone=False), nullable=True, unique=None, default=datetime.datetime.now)
landing_rev = Column("landing_revision", String(length=255, convert_unicode=False, assert_unicode=None), nullable=False, unique=False, default=None)
fork_id = Column("fork_id", Integer(), ForeignKey('repositories.repo_id'), nullable=True, unique=False, default=None)
group_id = Column("group_id", Integer(), ForeignKey('groups.group_id'), nullable=True, unique=False, default=None)
user = relationship('User')
fork = relationship('Repository', remote_side=repo_id)
group = relationship('RepoGroup')
repo_to_perm = relationship('UserRepoToPerm', cascade='all', order_by='UserRepoToPerm.repo_to_perm_id')
users_group_to_perm = relationship('UsersGroupRepoToPerm', cascade='all')
stats = relationship('Statistics', cascade='all', uselist=False)
followers = relationship('UserFollowing', primaryjoin='UserFollowing.follows_repo_id==Repository.repo_id', cascade='all')
logs = relationship('UserLog')
def __unicode__(self):
return u"<%s('%s:%s')>" % (self.__class__.__name__,self.repo_id,
return u"<%s('%s:%s')>" % (self.__class__.__name__, self.repo_id,
self.repo_name)
def url_sep(cls):
return URL_SEP
def get_by_repo_name(cls, repo_name):
q = Session.query(cls).filter(cls.repo_name == repo_name)
q = q.options(joinedload(Repository.fork))\
.options(joinedload(Repository.user))\
.options(joinedload(Repository.group))
return q.scalar()
def get_by_full_path(cls, repo_full_path):
repo_name = repo_full_path.split(cls.base_path(), 1)[-1]
return cls.get_by_repo_name(repo_name.strip(URL_SEP))
def get_repo_forks(cls, repo_id):
return cls.query().filter(Repository.fork_id == repo_id)
@@ -641,93 +641,94 @@ def RegisterForm(edit=False, old_data={}
chained_validators = [ValidPasswordsMatch, ValidPassword]
return _RegisterForm
def PasswordResetForm():
class _PasswordResetForm(formencode.Schema):
allow_extra_fields = True
filter_extra_fields = True
email = All(ValidSystemEmail(), Email(not_empty=True))
return _PasswordResetForm
def RepoForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
repo_groups=[]):
class _RepoForm(formencode.Schema):
filter_extra_fields = False
repo_name = All(UnicodeString(strip=True, min=1, not_empty=True),
SlugifyName())
clone_uri = All(UnicodeString(strip=True, min=1, not_empty=False))
repo_group = OneOf(repo_groups, hideList=True)
repo_type = OneOf(supported_backends)
description = UnicodeString(strip=True, min=1, not_empty=True)
description = UnicodeString(strip=True, min=1, not_empty=False)
private = StringBoolean(if_missing=False)
enable_statistics = StringBoolean(if_missing=False)
enable_downloads = StringBoolean(if_missing=False)
landing_rev = UnicodeString(strip=True, min=1, not_empty=True)
if edit:
#this is repo owner
user = All(UnicodeString(not_empty=True), ValidRepoUser)
chained_validators = [ValidCloneUri()(),
ValidRepoName(edit, old_data),
ValidPerms()]
return _RepoForm
def RepoForkForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
def RepoForkForm(edit=False, old_data={},
supported_backends=BACKENDS.keys(), repo_groups=[]):
class _RepoForkForm(formencode.Schema):
repo_type = All(ValidForkType(old_data), OneOf(supported_backends))
copy_permissions = StringBoolean(if_missing=False)
update_after_clone = StringBoolean(if_missing=False)
fork_parent_id = UnicodeString()
chained_validators = [ValidForkName(edit, old_data)]
return _RepoForkForm
def RepoSettingsForm(edit=False, old_data={}, supported_backends=BACKENDS.keys(),
def RepoSettingsForm(edit=False, old_data={},
chained_validators = [ValidRepoName(edit, old_data), ValidPerms(),
ValidSettings]
def ApplicationSettingsForm():
class _ApplicationSettingsForm(formencode.Schema):
rhodecode_title = UnicodeString(strip=True, min=1, not_empty=True)
rhodecode_realm = UnicodeString(strip=True, min=1, not_empty=True)
rhodecode_ga_code = UnicodeString(strip=True, min=1, not_empty=False)
return _ApplicationSettingsForm
def ApplicationUiSettingsForm():
class _ApplicationUiSettingsForm(formencode.Schema):
web_push_ssl = OneOf(['true', 'false'], if_missing='false')
paths_root_path = All(ValidPath(), UnicodeString(strip=True, min=1, not_empty=True))
hooks_changegroup_update = OneOf(['True', 'False'], if_missing=False)
hooks_changegroup_repo_size = OneOf(['True', 'False'], if_missing=False)
@@ -8,48 +8,49 @@
:created_on: Apr 9, 2010
:author: marcink
:copyright: (C) 2010-2012 Marcin Kuzminski <marcin@python-works.com>
:license: GPLv3, see COPYING for more details.
# This program is free software: you can redistribute it and/or modify
# it under the terms of the GNU General Public License as published by
# the Free Software Foundation, either version 3 of the License, or
# (at your option) any later version.
#
# This program is distributed in the hope that it will be useful,
# but WITHOUT ANY WARRANTY; without even the implied warranty of
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE. See the
# GNU General Public License for more details.
# You should have received a copy of the GNU General Public License
# along with this program. If not, see <http://www.gnu.org/licenses/>.
import os
import time
import traceback
import logging
import cStringIO
from sqlalchemy import func
from pylons.i18n.translation import _
from rhodecode.lib.vcs import get_backend
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.vcs.nodes import FileNode
from rhodecode import BACKENDS
from rhodecode.lib import helpers as h
from rhodecode.lib.utils2 import safe_str, safe_unicode
from rhodecode.lib.auth import HasRepoPermissionAny, HasReposGroupPermissionAny
from rhodecode.lib.utils import get_repos as get_filesystem_repos, make_ui, \
action_logger, EmptyChangeset, REMOVED_REPO_PAT
from rhodecode.model import BaseModel
from rhodecode.model.db import Repository, RhodeCodeUi, CacheInvalidation, \
UserFollowing, UserLog, User, RepoGroup
class UserTemp(object):
def __init__(self, user_id):
self.user_id = user_id
def __repr__(self):
@@ -451,24 +452,54 @@ class ScmModel(BaseModel):
:param flat: return as a list, if False returns a dict with decription
_files = list()
_dirs = list()
_repo = self.__get_repo(repo_name)
changeset = _repo.scm_instance.get_changeset(revision)
root_path = root_path.lstrip('/')
for topnode, dirs, files in changeset.walk(root_path):
for f in files:
_files.append(f.path if flat else {"name": f.path,
"type": "file"})
for d in dirs:
_dirs.append(d.path if flat else {"name": d.path,
"type": "dir"})
except RepositoryError:
log.debug(traceback.format_exc())
raise
return _dirs, _files
def get_unread_journal(self):
return self.sa.query(UserLog).count()
def get_repo_landing_revs(self, repo=None):
Generates select option with tags branches and bookmarks (for hg only)
grouped by type
:param repo:
:type repo:
hist_l = []
repo = self.__get_repo(repo)
hist_l.append(['tip', _('latest tip')])
if not repo:
return hist_l
repo = repo.scm_instance
branches_group = ([(k, k) for k, v in
repo.branches.iteritems()], _("Branches"))
hist_l.append(branches_group)
if repo.alias == 'hg':
bookmarks_group = ([(k, k) for k, v in
repo.bookmarks.iteritems()], _("Bookmarks"))
hist_l.append(bookmarks_group)
tags_group = ([(k, k) for k, v in
repo.tags.iteritems()], _("Tags"))
hist_l.append(tags_group)
@@ -21,48 +21,57 @@ ${h.form(url('repos'))}
</div>
<div class="input">
${h.text('clone_uri',class_="small")}
<span class="help-block">${_('Optional http[s] url from which repository should be cloned.')}</span>
<div class="field">
<div class="label">
<label for="repo_group">${_('Repository group')}:</label>
${h.select('repo_group',request.GET.get('parent_group'),c.repo_groups,class_="medium")}
<span class="help-block">${_('Optional select a group to put this repository into.')}</span>
<label for="repo_type">${_('Type')}:</label>
${h.select('repo_type','hg',c.backends,class_="small")}
<span class="help-block">${_('Type of repository to create.')}</span>
<label for="landing_rev">${_('Landing revision')}:</label>
${h.select('landing_rev','',c.landing_revs,class_="medium")}
<span class="help-block">${_('Default revision for files page, downloads, whoosh and readme')}</span>
<div class="label label-textarea">
<label for="description">${_('Description')}:</label>
<div class="textarea text-area editor">
${h.textarea('description')}
<span class="help-block">${_('Keep it short and to the point. Use a README file for longer descriptions.')}</span>
<div class="label label-checkbox">
<label for="private">${_('Private repository')}:</label>
<div class="checkboxes">
${h.checkbox('private',value="True")}
<span class="help-block">${_('Private repositories are only visible to people explicitly added as collaborators.')}</span>
<div class="buttons">
${h.submit('add',_('add'),class_="ui-button")}
${h.end_form()}
@@ -41,48 +41,57 @@
${h.text('clone_uri',class_="medium")}
${h.select('repo_group','',c.repo_groups,class_="medium")}
${h.select('repo_type','hg',c.backends,class_="medium")}
<label for="enable_statistics">${_('Enable statistics')}:</label>
${h.checkbox('enable_statistics',value="True")}
@@ -189,43 +198,43 @@
<ul>
<li>${_('''All actions made on this repository will be accessible to everyone in public journal''')}
</li>
</ul>
<h3>${_('Delete')}</h3>
${h.form(url('repo', repo_name=c.repo_info.repo_name),method='delete')}
<div class="form">
<div class="fields">
${h.submit('remove_%s' % c.repo_info.repo_name,_('Remove this repository'),class_="ui-btn red",onclick="return confirm('"+_('Confirm to delete this repository')+"');")}
<div class="field" style="border:none;color:#888">
<li>${_('''This repository will be renamed in a special way in order to be unaccesible for RhodeCode and VCS systems.
If you need fully delete it from filesystem please do it manually''')}
<h3>${_('Set as fork')}</h3>
<h3>${_('Set as fork of')}</h3>
${h.form(url('repo_as_fork', repo_name=c.repo_info.repo_name),method='put')}
${h.select('id_fork_of','',c.repos_list,class_="medium")}
${h.submit('set_as_fork_%s' % c.repo_info.repo_name,_('set'),class_="ui-btn",)}
<li>${_('''Manually set this repository as a fork of another''')}</li>
<li>${_('''Manually set this repository as a fork of another from the list''')}</li>
</%def>
@@ -17,130 +17,130 @@ from os.path import join as jn
from unittest import TestCase
from tempfile import _RandomNameSequence
from paste.deploy import loadapp
from paste.script.appinstall import SetupCommand
from pylons import config, url
from routes.util import URLGenerator
from webtest import TestApp
from rhodecode import is_windows
from rhodecode.model.meta import Session
from rhodecode.model.db import User
import pylons.test
os.environ['TZ'] = 'UTC'
if not is_windows:
time.tzset()
__all__ = [
'environ', 'url', 'TestController', 'TESTS_TMP_PATH', 'HG_REPO',
'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK', 'GIT_FORK',
'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
'environ', 'url', 'get_new_dir', 'TestController', 'TESTS_TMP_PATH',
'HG_REPO', 'GIT_REPO', 'NEW_HG_REPO', 'NEW_GIT_REPO', 'HG_FORK',
'GIT_FORK', 'TEST_USER_ADMIN_LOGIN', 'TEST_USER_REGULAR_LOGIN',
'TEST_USER_REGULAR_PASS', 'TEST_USER_REGULAR_EMAIL',
'TEST_USER_REGULAR2_LOGIN', 'TEST_USER_REGULAR2_PASS',
'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_GIT_REPO',
'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
'TEST_USER_REGULAR2_EMAIL', 'TEST_HG_REPO', 'TEST_HG_REPO_CLONE',
'TEST_HG_REPO_PULL', 'TEST_GIT_REPO', 'TEST_GIT_REPO_CLONE',
'TEST_GIT_REPO_PULL', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO', 'SCM_TESTS',
]
# Invoke websetup with the current config file
# SetupCommand('setup-app').run([config_file])
##RUNNING DESIRED TESTS
# nosetests -x rhodecode.tests.functional.test_admin_settings:TestSettingsController.test_my_account
# nosetests --pdb --pdb-failures
environ = {}
#SOME GLOBALS FOR TESTS
TESTS_TMP_PATH = jn('/', 'tmp', 'rc_test_%s' % _RandomNameSequence().next())
TEST_USER_ADMIN_LOGIN = 'test_admin'
TEST_USER_ADMIN_PASS = 'test12'
TEST_USER_ADMIN_EMAIL = 'test_admin@mail.com'
TEST_USER_REGULAR_LOGIN = 'test_regular'
TEST_USER_REGULAR_PASS = 'test12'
TEST_USER_REGULAR_EMAIL = 'test_regular@mail.com'
TEST_USER_REGULAR2_LOGIN = 'test_regular2'
TEST_USER_REGULAR2_PASS = 'test12'
TEST_USER_REGULAR2_EMAIL = 'test_regular2@mail.com'
HG_REPO = 'vcs_test_hg'
GIT_REPO = 'vcs_test_git'
NEW_HG_REPO = 'vcs_test_hg_new'
NEW_GIT_REPO = 'vcs_test_git_new'
HG_FORK = 'vcs_test_hg_fork'
GIT_FORK = 'vcs_test_git_fork'
## VCS
SCM_TESTS = ['hg', 'git']
uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
THIS = os.path.abspath(os.path.dirname(__file__))
GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
TEST_GIT_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
TEST_GIT_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcsgitclone%s' % uniq_suffix)
TEST_GIT_REPO_PULL = jn(TESTS_TMP_PATH, 'vcsgitpull%s' % uniq_suffix)
HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
TEST_HG_REPO = jn(TESTS_TMP_PATH, 'vcs-hg')
TEST_HG_REPO = jn(TESTS_TMP_PATH, HG_REPO)
TEST_HG_REPO_CLONE = jn(TESTS_TMP_PATH, 'vcshgclone%s' % uniq_suffix)
TEST_HG_REPO_PULL = jn(TESTS_TMP_PATH, 'vcshgpull%s' % uniq_suffix)
TEST_DIR = tempfile.gettempdir()
TEST_REPO_PREFIX = 'vcs-test'
# cached repos if any !
# comment out to get some other repos from bb or github
GIT_REMOTE_REPO = jn(TESTS_TMP_PATH, GIT_REPO)
HG_REMOTE_REPO = jn(TESTS_TMP_PATH, HG_REPO)
def get_new_dir(title):
Returns always new directory path.
from rhodecode.tests.vcs.utils import get_normalized_path
name = TEST_REPO_PREFIX
if title:
name = '-'.join((name, title))
hex = hashlib.sha1(str(time.time())).hexdigest()
name = '-'.join((name, hex))
path = os.path.join(TEST_DIR, name)
return get_normalized_path(path)
PACKAGE_DIR = os.path.abspath(os.path.join(
os.path.dirname(__file__), '..'))
TEST_USER_CONFIG_FILE = jn(THIS, 'aconfig')
class TestController(TestCase):
def __init__(self, *args, **kwargs):
wsgiapp = pylons.test.pylonsapp
config = wsgiapp.config
self.app = TestApp(wsgiapp)
url._push_object(URLGenerator(config['routes.map'], environ))
self.Session = Session
self.index_location = config['app_conf']['index_dir']
TestCase.__init__(self, *args, **kwargs)
def log_user(self, username=TEST_USER_ADMIN_LOGIN,
password=TEST_USER_ADMIN_PASS):
self._logged_username = username
response = self.app.post(url(controller='login', action='index'),
{'username': username,
'password': password})
if 'invalid user name' in response.body:
self.fail('could not login using %s %s' % (username, password))
self.assertEqual(response.status, '302 Found')
ses = response.session['rhodecode_user']
# -*- coding: utf-8 -*-
from rhodecode.lib import vcs
from rhodecode.model.db import Repository
from rhodecode.tests import *
class TestAdminReposController(TestController):
def __make_repo(self):
def test_index(self):
self.log_user()
response = self.app.get(url('repos'))
# Test response...
def test_index_as_xml(self):
response = self.app.get(url('formatted_repos', format='xml'))
def test_create_hg(self):
repo_name = NEW_HG_REPO
description = 'description for newly created repo'
private = False
response = self.app.post(url('repos'), {'repo_name':repo_name,
'repo_type':'hg',
'clone_uri':'',
'repo_group':'',
'description':description,
'private':private})
self.checkSessionFlash(response, 'created repository %s' % (repo_name))
response = self.app.post(url('repos'), {'repo_name': repo_name,
'repo_type': 'hg',
'clone_uri': '',
'repo_group': '',
'description': description,
'private': private,
'landing_rev': 'tip'})
self.checkSessionFlash(response,
'created repository %s' % (repo_name))
#test if the repo was created in the database
new_repo = self.Session.query(Repository).filter(Repository.repo_name ==
repo_name).one()
new_repo = self.Session.query(Repository)\
.filter(Repository.repo_name == repo_name).one()
self.assertEqual(new_repo.repo_name, repo_name)
self.assertEqual(new_repo.description, description)
#test if repository is visible in the list ?
response = response.follow()
self.assertTrue(repo_name in response.body)
response.mustcontain(repo_name)
#test if repository was created on filesystem
vcs.get_repo(os.path.join(TESTS_TMP_PATH, repo_name))
except:
self.fail('no repo in filesystem')
self.fail('no repo %s in filesystem' % repo_name)
def test_create_hg_non_ascii(self):
non_ascii = "ąęł"
repo_name = "%s%s" % (NEW_HG_REPO, non_ascii)
repo_name_unicode = repo_name.decode('utf8')
description = 'description for newly created repo' + non_ascii
description_unicode = description.decode('utf8')
'created repository %s' % (repo_name_unicode))
repo_name_unicode).one()
.filter(Repository.repo_name == repo_name_unicode).one()
self.assertEqual(new_repo.repo_name, repo_name_unicode)
self.assertEqual(new_repo.description, description_unicode)
def test_create_hg_in_group(self):
#TODO: write test !
def test_create_git(self):
return
repo_name = NEW_GIT_REPO
'repo_type':'git',
'repo_type': 'git',
#test if we have a message for that repository
assert '''created repository %s''' % (repo_name) in response.session['flash'][0], 'No flash message about new repo'
#test if the fork was created in the database
new_repo = self.Session.query(Repository).filter(Repository.repo_name == repo_name).one()
assert new_repo.repo_name == repo_name, 'wrong name of repo name in db'
assert new_repo.description == description, 'wrong description'
assert repo_name in response.body, 'missing new repo from the main repos list'
assert False , 'no repo in filesystem'
def test_create_git_non_ascii(self):
repo_name = "%s%s" % (NEW_GIT_REPO, non_ascii)
def test_new(self):
response = self.app.get(url('new_repo'))
def test_new_as_xml(self):
response = self.app.get(url('formatted_new_repo', format='xml'))
def test_update(self):
response = self.app.put(url('repo', repo_name=HG_REPO))
def test_update_browser_fakeout(self):
response = self.app.post(url('repo', repo_name=HG_REPO),
params=dict(_method='put'))
def test_delete(self):
def test_delete_hg(self):
repo_name = 'vcs_test_new_to_delete'
self.assertTrue('flash' in response.session)
self.assertTrue('''created repository %s''' % (repo_name) in
response.session['flash'][0])
response = self.app.delete(url('repo', repo_name=repo_name))
self.assertTrue('''deleted repository %s''' % (repo_name) in
response.follow()
#check if repo was deleted from db
deleted_repo = self.Session.query(Repository).filter(Repository.repo_name
== repo_name).scalar()
deleted_repo = self.Session.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
self.assertEqual(deleted_repo, None)
self.assertEqual(os.path.isdir(os.path.join(TESTS_TMP_PATH, repo_name)),
False)
def test_delete_git(self):
def test_delete_repo_with_group(self):
#TODO:
def test_delete_browser_fakeout(self):
params=dict(_method='delete'))
def test_show(self):
def test_show_hg(self):
response = self.app.get(url('repo', repo_name=HG_REPO))
def test_show_as_xml(self):
response = self.app.get(url('formatted_repo', repo_name=HG_REPO,
format='xml'))
def test_show_git(self):
response = self.app.get(url('repo', repo_name=GIT_REPO))
def test_edit(self):
response = self.app.get(url('edit_repo', repo_name=HG_REPO))
def test_edit_as_xml(self):
response = self.app.get(url('formatted_edit_repo', repo_name=HG_REPO,
@@ -7,109 +7,136 @@ from rhodecode.model.user import UserMod
class TestForksController(TestController):
def setUp(self):
self.username = u'forkuser'
self.password = u'qweqwe'
self.u1 = UserModel().create_or_update(
username=self.username, password=self.password,
email=u'fork_king@rhodecode.org', name=u'u1', lastname=u'u1'
self.Session.commit()
def tearDown(self):
self.Session.delete(self.u1)
repo_name = HG_REPO
response = self.app.get(url(controller='forks', action='forks',
repo_name=repo_name))
self.assertTrue("""There are no forks yet""" in response.body)
def test_index_with_fork(self):
def test_index_with_fork_hg(self):
# create a fork
fork_name = HG_FORK
description = 'fork of vcs test'
org_repo = Repository.get_by_repo_name(repo_name)
response = self.app.post(url(controller='forks',
action='fork_create',
repo_name=repo_name),
{'repo_name':fork_name,
'fork_parent_id':org_repo.repo_id,
'private':'False'})
{'repo_name': fork_name,
'fork_parent_id': org_repo.repo_id,
'private': 'False',
self.assertTrue("""<a href="/%s/summary">"""
"""vcs_test_hg_fork</a>""" % fork_name
in response.body)
response.mustcontain(
"""<a href="/%s/summary">%s</a>""" % (fork_name, fork_name)
#remove this fork
response = self.app.delete(url('repo', repo_name=fork_name))
def test_index_with_fork_git(self):
fork_name = GIT_FORK
repo_name = GIT_REPO
def test_z_fork_create(self):
response = self.app.post(url(controller='forks', action='fork_create',
'private':'False',
#test if we have a message that fork is ok
self.assertTrue('forked %s repository as %s' \
% (repo_name, fork_name) in response.session['flash'][0])
'forked %s repository as %s' % (repo_name, fork_name))
fork_repo = self.Session.query(Repository)\
.filter(Repository.repo_name == fork_name).one()
self.assertEqual(fork_repo.repo_name, fork_name)
self.assertEqual(fork_repo.fork.repo_name, repo_name)
#test if fork is visible in the list ?
# check if fork is marked as fork
# wait for cache to expire
time.sleep(10)
response = self.app.get(url(controller='summary', action='index',
repo_name=fork_name))
self.assertTrue('Fork of %s' % repo_name in response.body)
def test_zz_fork_permission_page(self):
usr = self.log_user(self.username, self.password)['user_id']
forks = self.Session.query(Repository)\
.filter(Repository.fork_id != None)\
self.assertEqual(1, len(forks))
# set read permissions for this
RepoModel().grant_user_permission(repo=forks[0],
user=usr,
perm='repository.read')
response.mustcontain('<div style="padding:5px 3px 3px 42px;">fork of vcs test</div>')
from nose.plugins.skip import SkipTest
class TestSearchController(TestController):
response = self.app.get(url(controller='search', action='index'))
self.assertTrue('class="small" id="q" name="q" type="text"' in
response.body)
def test_empty_search(self):
if os.path.isdir(self.index_location):
raise SkipTest('skipped due to existing index')
response = self.app.get(url(controller='search', action='index'),
{'q': HG_REPO})
self.assertTrue('There is no index to search in. '
'Please run whoosh indexer' in response.body)
def test_normal_search(self):
{'q': 'def repo'})
response.mustcontain('10 results')
response.mustcontain('Permission denied')
response.mustcontain('39 results')
def test_repo_search(self):
{'q': 'repository:%s def test' % HG_REPO})
response.mustcontain('4 results')
@@ -71,34 +71,34 @@ class TestSummaryController(TestControll
ID = Repository.get_by_repo_name(HG_REPO).repo_id
response = self.app.get(url(controller='summary',
action='index',
repo_name='_%s' % ID))
#repo type
response.mustcontain("""<img style="margin-bottom:2px" class="icon" """
"""title="Mercurial repository" alt="Mercurial """
"""repository" src="/images/icons/hgicon.png"/>""")
"""title="public repository" alt="public """
"""repository" src="/images/icons/lock_open.png"/>""")
def test_index_by_id_git(self):
ID = Repository.get_by_repo_name(GIT_REPO).repo_id
"""title="Git repository" alt="Git """
"""repository" src="/images/icons/giticon.png"/>""")
def _enable_stats(self):
r = Repository.get_by_repo_name(HG_REPO)
r.enable_statistics = True
self.Session.add(r)
@@ -115,49 +115,50 @@ class TestReposGroups(unittest.TestCase)
self.assertEqual(RepoGroup.get_by_group_name('test1/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'after', parent_id=self.g3.group_id)
self.assertTrue(self.__check_path('test3', 'after'))
self.assertEqual(RepoGroup.get_by_group_name('test3/initial'), None)
new_sg1 = self.__update_group(sg1.group_id, 'hello')
self.assertTrue(self.__check_path('hello'))
self.assertEqual(RepoGroup.get_by_group_name('hello'), new_sg1)
def test_subgrouping_with_repo(self):
g1 = _make_group('g1')
g2 = _make_group('g2')
# create new repo
form_data = dict(repo_name='john',
repo_name_full='john',
fork_name=None,
description=None,
repo_group=None,
private=False,
repo_type='hg',
clone_uri=None)
clone_uri=None,
landing_rev='tip')
cur_user = User.get_by_username(TEST_USER_ADMIN_LOGIN)
r = RepoModel().create(form_data, cur_user)
self.assertEqual(r.repo_name, 'john')
# put repo into group
form_data = form_data
form_data['repo_group'] = g1.group_id
form_data['perms_new'] = []
form_data['perms_updates'] = []
RepoModel().update(r.repo_name, form_data)
self.assertEqual(r.repo_name, 'g1/john')
self.__update_group(g1.group_id, 'g1', parent_id=g2.group_id)
self.assertTrue(self.__check_path('g2', 'g1'))
# test repo
self.assertEqual(r.repo_name, RepoGroup.url_sep().join(['g2', 'g1', r.just_name]))
def test_move_to_root(self):
g1 = _make_group('t11')
g2 = _make_group('t22', parent_id=g1.group_id)
@@ -587,66 +588,66 @@ class TestPermissions(unittest.TestCase)
self.assertEqual(u1_auth.permissions['repositories_groups'],
perms['repositories_groups'])
def test_repo_in_group_permissions(self):
self.g1 = _make_group('group1', skip_if_exists=True)
self.g2 = _make_group('group2', skip_if_exists=True)
# both perms should be read !
u1_auth = AuthUser(user_id=self.u1.user_id)
{u'group1': u'group.read', u'group2': u'group.read'})
a1_auth = AuthUser(user_id=self.anon.user_id)
self.assertEqual(a1_auth.permissions['repositories_groups'],
#Change perms to none for both groups
ReposGroupModel().grant_user_permission(repos_group=self.g1,
user=self.anon,
perm='group.none')
ReposGroupModel().grant_user_permission(repos_group=self.g2,
{u'group1': u'group.none', u'group2': u'group.none'})
# add repo to group
'repo_name':HG_REPO,
'repo_name_full':RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
'repo_group':self.g1.group_id,
'description':'desc',
'private':False
'repo_name': HG_REPO,
'repo_name_full': RepoGroup.url_sep().join([self.g1.group_name,HG_REPO]),
'repo_group': self.g1.group_id,
'description': 'desc',
'landing_rev': 'tip'
self.test_repo = RepoModel().create(form_data, cur_user=self.u1)
#grant permission for u2 !
user=self.u2,
perm='group.read')
self.assertNotEqual(self.u1, self.u2)
#u1 and anon should have not change perms while u2 should !
@@ -31,26 +31,26 @@ def setup_package():
Prepares whole package for tests which mainly means it would try to fetch
test repositories or use already existing ones.
fetchers = {
'hg': {
'alias': 'hg',
'test_repo_path': TEST_HG_REPO,
'remote_repo': HG_REMOTE_REPO,
'clone_cmd': 'hg clone',
},
'git': {
'alias': 'git',
'test_repo_path': TEST_GIT_REPO,
'remote_repo': GIT_REMOTE_REPO,
'clone_cmd': 'git clone --bare',
for scm, fetcher_info in fetchers.items():
fetcher = SCMFetcher(**fetcher_info)
fetcher.setup()
except VCSTestError, err:
raise RuntimeError(str(err))
start_dir = os.path.abspath(os.path.dirname(__file__))
unittest.defaultTestLoader.discover(start_dir)
#start_dir = os.path.abspath(os.path.dirname(__file__))
#unittest.defaultTestLoader.discover(start_dir)
Unit tests configuration module for vcs.
import hashlib
import tempfile
import datetime
from utils import get_normalized_path
from os.path import join as jn
__all__ = (
'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
'SCM_TESTS',
TEST_TMP_PATH = TESTS_TMP_PATH
#__all__ = (
# 'TEST_HG_REPO', 'TEST_GIT_REPO', 'HG_REMOTE_REPO', 'GIT_REMOTE_REPO',
# 'SCM_TESTS',
#)
#SCM_TESTS = ['hg', 'git']
#uniq_suffix = str(int(time.mktime(datetime.datetime.now().timetuple())))
TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
jn(TEST_TMP_PATH, 'vcs-git'))
TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
jn(TEST_TMP_PATH, 'vcs-hg'))
TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
#GIT_REMOTE_REPO = 'git://github.com/codeinn/vcs.git'
#TEST_TMP_PATH = os.environ.get('VCS_TEST_ROOT', '/tmp')
#TEST_GIT_REPO = os.environ.get('VCS_TEST_GIT_REPO',
# jn(TEST_TMP_PATH, 'vcs-git'))
#TEST_GIT_REPO_CLONE = os.environ.get('VCS_TEST_GIT_REPO_CLONE',
# jn(TEST_TMP_PATH, 'vcsgitclone%s' % uniq_suffix))
#TEST_GIT_REPO_PULL = os.environ.get('VCS_TEST_GIT_REPO_PULL',
# jn(TEST_TMP_PATH, 'vcsgitpull%s' % uniq_suffix))
#HG_REMOTE_REPO = 'http://bitbucket.org/marcinkuzminski/vcs'
#TEST_HG_REPO = os.environ.get('VCS_TEST_HG_REPO',
# jn(TEST_TMP_PATH, 'vcs-hg'))
#TEST_HG_REPO_CLONE = os.environ.get('VCS_TEST_HG_REPO_CLONE',
# jn(TEST_TMP_PATH, 'vcshgclone%s' % uniq_suffix))
#TEST_HG_REPO_PULL = os.environ.get('VCS_TEST_HG_REPO_PULL',
# jn(TEST_TMP_PATH, 'vcshgpull%s' % uniq_suffix))
#TEST_DIR = os.environ.get('VCS_TEST_ROOT', tempfile.gettempdir())
#TEST_REPO_PREFIX = 'vcs-test'
#def get_new_dir(title):
# """
# Returns always new directory path.
# name = TEST_REPO_PREFIX
# if title:
# name = '-'.join((name, title))
# hex = hashlib.sha1(str(time.time())).hexdigest()
# name = '-'.join((name, hex))
# path = os.path.join(TEST_DIR, name)
# return get_normalized_path(path)
@@ -135,56 +135,52 @@ class MercurialRepositoryTest(unittest.T
'ff5ab059786ebc7411e559a2cc309dfae3625a3b',
'6b6ad5f82ad5bb6190037671bd254bd4e1f4bf08',
'ee87846a61c12153b51543bf860e1026c6d3dcba', ]
self.assertEqual(org, self.repo.revisions[:31])
def test_iter_slice(self):
sliced = list(self.repo[:10])
itered = list(self.repo)[:10]
self.assertEqual(sliced, itered)
def test_slicing(self):
#4 1 5 10 95
for sfrom, sto, size in [(0, 4, 4), (1, 2, 1), (10, 15, 5),
(10, 20, 10), (5, 100, 95)]:
revs = list(self.repo[sfrom:sto])
self.assertEqual(len(revs), size)
self.assertEqual(revs[0], self.repo.get_changeset(sfrom))
self.assertEqual(revs[-1], self.repo.get_changeset(sto - 1))
def test_branches(self):
# TODO: Need more tests here
#active branches
self.assertTrue('default' in self.repo.branches)
#closed branches
self.assertFalse('web' in self.repo.branches)
self.assertFalse('git' in self.repo.branches)
self.assertTrue('git' in self.repo.branches)
# closed
self.assertTrue('workdir' in self.repo._get_branches(closed=True))
self.assertTrue('webvcs' in self.repo._get_branches(closed=True))
self.assertTrue('web' in self.repo._get_branches(closed=True))
for name, id in self.repo.branches.items():
self.assertTrue(isinstance(
self.repo.get_changeset(id), MercurialChangeset))
def test_tip_in_tags(self):
# tip is always a tag
self.assertIn('tip', self.repo.tags)
def test_tip_changeset_in_tags(self):
tip = self.repo.get_changeset()
self.assertEqual(self.repo.tags['tip'], tip.raw_id)
def test_initial_changeset(self):
init_chset = self.repo.get_changeset(0)
self.assertEqual(init_chset.message, 'initial import')
self.assertEqual(init_chset.author,
'Marcin Kuzminski <marcin@python-blog.com>')
self.assertEqual(sorted(init_chset._file_paths),
sorted([
'vcs/__init__.py',
'vcs/backends/BaseRepository.py',
'vcs/backends/__init__.py',
new file 100644
binary diff not shown
Status change: