@@ -59,147 +59,147 @@ class GitChangeset(BaseChangeset):
@LazyProperty
def author(self):
return safe_unicode(getattr(self._commit, self._author_property))
def date(self):
return date_fromtimestamp(getattr(self._commit, self._date_property),
getattr(self._commit, self._date_tz_property))
def _timestamp(self):
return getattr(self._commit, self._date_property)
def status(self):
"""
Returns modified, added, removed, deleted files for current changeset
return self.changed, self.added, self.removed
def tags(self):
_tags = []
for tname, tsha in self.repository.tags.iteritems():
if tsha == self.raw_id:
_tags.append(tname)
return _tags
def branch(self):
heads = self.repository._heads(reverse=False)
ref = heads.get(self.raw_id)
if ref:
return safe_unicode(ref)
def _fix_path(self, path):
Paths are stored without trailing slash so we need to get rid off it if
needed.
if path.endswith('/'):
path = path.rstrip('/')
return path
def _get_id_for_path(self, path):
path = safe_str(path)
# FIXME: Please, spare a couple of minutes and make those codes cleaner;
if not path in self._paths:
path = path.strip('/')
# set root tree
tree = self.repository._repo[self._tree_id]
if path == '':
self._paths[''] = tree.id
return tree.id
splitted = path.split('/')
dirs, name = splitted[:-1], splitted[-1]
curdir = ''
# initially extract things from root dir
for item, stat, id in tree.iteritems():
if curdir:
name = '/'.join((curdir, item))
else:
name = item
self._paths[name] = id
self._stat_modes[name] = stat
for dir in dirs:
curdir = '/'.join((curdir, dir))
curdir = dir
dir_id = None
if dir == item:
dir_id = id
if dir_id:
# Update tree
tree = self.repository._repo[dir_id]
if not isinstance(tree, objects.Tree):
raise ChangesetError('%s is not a directory' % curdir)
raise ChangesetError('%s have not been found' % curdir)
# cache all items from the given traversed tree
raise NodeDoesNotExistError("There is no file nor directory "
"at the given path '%s' at revision %s"
% (path, self.short_id))
% (path, safe_str(self.short_id)))
return self._paths[path]
def _get_kind(self, path):
obj = self.repository._repo[self._get_id_for_path(path)]
if isinstance(obj, objects.Blob):
return NodeKind.FILE
elif isinstance(obj, objects.Tree):
return NodeKind.DIR
def _get_filectx(self, path):
path = self._fix_path(path)
if self._get_kind(path) != NodeKind.FILE:
raise ChangesetError("File does not exist for revision %s at "
" '%s'" % (self.raw_id, path))
def _get_file_nodes(self):
return chain(*(t[2] for t in self.walk()))
def parents(self):
Returns list of parents changesets.
return [self.repository.get_changeset(parent)
for parent in self._commit.parents]
def children(self):
Returns list of children changesets.
rev_filter = _git_path = settings.GIT_REV_FILTER
so, se = self.repository.run_git_command(
"rev-list %s --children | grep '^%s'" % (rev_filter, self.raw_id)
)
children = []
for l in so.splitlines():
childs = l.split(' ')[1:]
children.extend(childs)
return [self.repository.get_changeset(cs) for cs in children]
def next(self, branch=None):
if branch and self.branch != branch:
raise VCSError('Branch option used on changeset not belonging '
'to that branch')
@@ -209,96 +209,97 @@ class GitChangeset(BaseChangeset):
next_ = changeset.revision + 1
next_rev = changeset.repository.revisions[next_]
except IndexError:
raise ChangesetDoesNotExistError
cs = changeset.repository.get_changeset(next_rev)
if branch and branch != cs.branch:
return _next(cs, branch)
return cs
return _next(self, branch)
def prev(self, branch=None):
def _prev(changeset, branch):
try:
prev_ = changeset.revision - 1
if prev_ < 0:
raise IndexError
prev_rev = changeset.repository.revisions[prev_]
cs = changeset.repository.get_changeset(prev_rev)
return _prev(cs, branch)
return _prev(self, branch)
def diff(self, ignore_whitespace=True, context=3):
rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
rev2 = self
return ''.join(self.repository.get_diff(rev1, rev2,
ignore_whitespace=ignore_whitespace,
context=context))
def get_file_mode(self, path):
Returns stat mode of the file at the given ``path``.
# ensure path is traversed
self._get_id_for_path(path)
return self._stat_modes[path]
def get_file_content(self, path):
Returns content of the file at given ``path``.
id = self._get_id_for_path(path)
blob = self.repository._repo[id]
return blob.as_pretty_string()
def get_file_size(self, path):
Returns size of the file at given ``path``.
return blob.raw_length()
def get_file_changeset(self, path):
Returns last commit of the file at the given ``path``.
return self.get_file_history(path, limit=1)[0]
def get_file_history(self, path, limit=None):
Returns history of file as reversed list of ``Changeset`` objects for
which file at given ``path`` has been modified.
TODO: This function now uses os underlying 'git' and 'grep' commands
which is generally not good. Should be replaced with algorithm
iterating commits.
self._get_filectx(path)
cs_id = safe_str(self.id)
f_path = safe_str(path)
if limit:
cmd = 'log -n %s --pretty="format: %%H" -s -p %s -- "%s"' % (
safe_int(limit, 0), cs_id, f_path
cmd = 'log --pretty="format: %%H" -s -p %s -- "%s"' % (
cs_id, f_path
so, se = self.repository.run_git_command(cmd)
import time
import datetime
import posixpath
from dulwich import objects
from dulwich.repo import Repo
from rhodecode.lib.vcs.backends.base import BaseInMemoryChangeset
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.utils import safe_str
class GitInMemoryChangeset(BaseInMemoryChangeset):
def commit(self, message, author, parents=None, branch=None, date=None,
**kwargs):
Performs in-memory commit (doesn't check workdir in any way) and
returns newly created ``Changeset``. Updates repository's
``revisions``.
:param message: message of the commit
:param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
:param parents: single parent or sequence of parents from which commit
would be derieved
:param date: ``datetime.datetime`` instance. Defaults to
``datetime.datetime.now()``.
:param branch: branch name, as string. If none given, default backend's
branch would be used.
:raises ``CommitError``: if any error occurs while committing
self.check_integrity(parents)
from .repository import GitRepository
if branch is None:
branch = GitRepository.DEFAULT_BRANCH_NAME
repo = self.repository._repo
object_store = repo.object_store
ENCODING = "UTF-8"
DIRMOD = 040000
# Create tree and populates it with blobs
commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or\
objects.Tree()
for node in self.added + self.changed:
# Compute subdirs if needed
dirpath, nodename = posixpath.split(node.path)
dirnames = dirpath and dirpath.split('/') or []
dirnames = map(safe_str, dirpath and dirpath.split('/') or [])
parent = commit_tree
ancestors = [('', parent)]
# Tries to dig for the deepest existing tree
while dirnames:
curdir = dirnames.pop(0)
dir_id = parent[curdir][1]
except KeyError:
# put curdir back into dirnames and stops
dirnames.insert(0, curdir)
break
# If found, updates parent
parent = self.repository._repo[dir_id]
ancestors.append((curdir, parent))
# Now parent is deepest existing tree and we need to create subtrees
# for dirnames (in reverse order) [this only applies for nodes from added]
new_trees = []
if not node.is_binary:
content = node.content.encode(ENCODING)
content = node.content
blob = objects.Blob.from_string(content)
node_path = node.name.encode(ENCODING)
if dirnames:
# If there are trees which should be created we need to build
# them now (in reverse order)
reversed_dirnames = list(reversed(dirnames))
curtree = objects.Tree()
curtree[node_path] = node.mode, blob.id
new_trees.append(curtree)
for dirname in reversed_dirnames[:-1]:
newtree = objects.Tree()
#newtree.add(DIRMOD, dirname, curtree.id)
newtree[dirname] = DIRMOD, curtree.id
new_trees.append(newtree)
curtree = newtree
parent[reversed_dirnames[-1]] = DIRMOD, curtree.id
parent.add(name=node_path, mode=node.mode, hexsha=blob.id)
new_trees.append(parent)
# Update ancestors
for parent, tree, path in reversed([(a[1], b[1], b[0]) for a, b in
zip(ancestors, ancestors[1:])]):
# -*- coding: utf-8 -*-
vcs.nodes
~~~~~~~~~
Module holding everything related to vcs nodes.
:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
import os
import stat
import mimetypes
from rhodecode.lib.vcs.backends.base import EmptyChangeset
from rhodecode.lib.vcs.exceptions import NodeError, RemovedFileNodeError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.vcs.utils import safe_unicode
from rhodecode.lib.vcs.utils import safe_unicode, safe_str
class NodeKind:
SUBMODULE = -1
DIR = 1
FILE = 2
class NodeState:
ADDED = u'added'
CHANGED = u'changed'
NOT_CHANGED = u'not changed'
REMOVED = u'removed'
class NodeGeneratorBase(object):
Base class for removed added and changed filenodes, it's a lazy generator
class that will create filenodes only on iteration or call
The len method doesn't need to create filenodes at all
def __init__(self, current_paths, cs):
self.cs = cs
self.current_paths = current_paths
def __call__(self):
return [n for n in self]
def __getslice__(self, i, j):
for p in self.current_paths[i:j]:
yield self.cs.get_node(p)
def __len__(self):
return len(self.current_paths)
def __iter__(self):
for p in self.current_paths:
class AddedFileNodesGenerator(NodeGeneratorBase):
Class holding Added files for current changeset
pass
class ChangedFileNodesGenerator(NodeGeneratorBase):
Class holding Changed files for current changeset
class RemovedFileNodesGenerator(NodeGeneratorBase):
Class holding removed files for current changeset
yield RemovedFileNode(path=p)
class Node(object):
Simplest class representing file or directory on repository. SCM backends
should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
directly.
Node's ``path`` cannot start with slash as we operate on *relative* paths
only. Moreover, every single node is identified by the ``path`` attribute,
so it cannot end with slash, too. Otherwise, path could lead to mistakes.
def __init__(self, path, kind):
if path.startswith('/'):
raise NodeError("Cannot initialize Node objects with slash at "
"the beginning as only relative paths are supported")
self.path = path.rstrip('/')
self.path = safe_str(path.rstrip('/')) # we store paths as str
if path == '' and kind != NodeKind.DIR:
raise NodeError("Only DirNode and its subclasses may be "
"initialized with empty path")
self.kind = kind
#self.dirs, self.files = [], []
if self.is_root() and not self.is_dir():
raise NodeError("Root node cannot be FILE kind")
def parent(self):
parent_path = self.get_parent_path()
if parent_path:
if self.changeset:
return self.changeset.get_node(parent_path)
return DirNode(parent_path)
return None
def unicode_path(self):
return safe_unicode(self.path)
def name(self):
Returns name of the node so if its path
then only last part is returned.
return safe_unicode(self.path.rstrip('/').split('/')[-1])
def _get_kind(self):
return self._kind
def _set_kind(self, kind):
if hasattr(self, '_kind'):
raise NodeError("Cannot change node's kind")
self._kind = kind
# Post setter check (path's trailing slash)
if self.path.endswith('/'):
raise NodeError("Node's path cannot end with slash")
kind = property(_get_kind, _set_kind)
def __cmp__(self, other):
Comparator using name of the node, needed for quick list sorting.
kind_cmp = cmp(self.kind, other.kind)
@@ -535,98 +535,98 @@ class ScmModel(BaseModel):
# decoding here will force that we have proper encoded values
# in any other case this will throw exceptions and deny commit
content = safe_str(content)
path = safe_str(f_path)
# message and author needs to be unicode
# proper backend should then translate that into required type
message = safe_unicode(message)
author = safe_unicode(author)
imc = IMC(repo)
imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
tip = imc.commit(message=message,
author=author,
parents=[cs], branch=cs.branch)
self.mark_for_invalidation(repo_name)
self._handle_push(repo,
username=user.username,
action='push_local',
repo_name=repo_name,
revisions=[tip.raw_id])
return tip
def create_nodes(self, user, repo, message, nodes, parent_cs=None,
author=None, trigger_push_hook=True):
Commits given multiple nodes into repo
:param user: RhodeCode User object or user_id, the commiter
:param repo: RhodeCode Repository object
:param message: commit message
:param nodes: mapping {filename:{'content':content},...}
:param parent_cs: parent changeset, can be empty than it's initial commit
:param author: author of commit, cna be different that commiter only for git
:param trigger_push_hook: trigger push hooks
:returns: new commited changeset
user = self._get_user(user)
scm_instance = repo.scm_instance_no_cache()
processed_nodes = []
for f_path in nodes:
if f_path.startswith('/') or f_path.startswith('.') or '../' in f_path:
raise NonRelativePathError('%s is not an relative path' % f_path)
if f_path:
f_path = os.path.normpath(f_path)
content = nodes[f_path]['content']
f_path = safe_str(f_path)
if isinstance(content, (basestring,)):
elif isinstance(content, (file, cStringIO.OutputType,)):
content = content.read()
raise Exception('Content is of unrecognized type %s' % (
type(content)
))
processed_nodes.append((f_path, content))
commiter = user.full_contact
author = safe_unicode(author) if author else commiter
IMC = self._get_IMC_module(scm_instance.alias)
imc = IMC(scm_instance)
if not parent_cs:
parent_cs = EmptyChangeset(alias=scm_instance.alias)
if isinstance(parent_cs, EmptyChangeset):
# EmptyChangeset means we we're editing empty repository
parents = None
parents = [parent_cs]
# add multiple nodes
for path, content in processed_nodes:
imc.add(FileNode(path, content=content))
parents=parents,
branch=parent_cs.branch)
self.mark_for_invalidation(repo.repo_name)
if trigger_push_hook:
self._handle_push(scm_instance,
repo_name=repo.repo_name,
def get_nodes(self, repo_name, revision, root_path='/', flat=True):
recursive walk in root dir and return a set of all path in that dir
# encoding: utf8
from __future__ import with_statement
from rhodecode.lib import vcs
from rhodecode.tests.vcs.base import BackendTestMixin
from rhodecode.tests.vcs.conf import SCM_TESTS
from rhodecode.lib.vcs.backends.base import BaseChangeset
from rhodecode.lib.vcs.nodes import (
FileNode, AddedFileNodesGenerator,
ChangedFileNodesGenerator, RemovedFileNodesGenerator
from rhodecode.lib.vcs.exceptions import (
BranchDoesNotExistError, ChangesetDoesNotExistError,
RepositoryError, EmptyRepositoryError
from rhodecode.lib.vcs.utils.compat import unittest
from rhodecode.tests.vcs.conf import get_new_dir
class TestBaseChangeset(unittest.TestCase):
def test_as_dict(self):
changeset = BaseChangeset()
changeset.id = 'ID'
changeset.raw_id = 'RAW_ID'
changeset.short_id = 'SHORT_ID'
changeset.revision = 1009
changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
changeset.message = 'Message of a commit'
changeset.author = 'Joe Doe <joe.doe@example.com>'
changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
changeset.changed = []
changeset.removed = []
self.assertEqual(changeset.as_dict(), {
'id': 'ID',
'raw_id': 'RAW_ID',
'short_id': 'SHORT_ID',
'revision': 1009,
'date': datetime.datetime(2011, 1, 30, 1, 45),
'message': 'Message of a commit',
'author': {
'name': 'Joe Doe',
'email': 'joe.doe@example.com',
},
'added': ['foo/bar/baz', 'foobar'],
'changed': [],
@@ -257,114 +258,124 @@ class ChangesetsTestCaseMixin(BackendTes
tip = self.repo.get_changeset()
self.assertEqual(tip.author_email, u'joe.doe@example.com')
def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
with self.assertRaises(ChangesetDoesNotExistError):
list(self.repo.get_changesets(start='foobar'))
def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
list(self.repo.get_changesets(end='foobar'))
def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
with self.assertRaises(BranchDoesNotExistError):
list(self.repo.get_changesets(branch_name='foobar'))
def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
start = self.repo.revisions[-1]
end = self.repo.revisions[0]
with self.assertRaises(RepositoryError):
list(self.repo.get_changesets(start=start, end=end))
def test_get_changesets_numerical_id_reversed(self):
[x for x in self.repo.get_changesets(start=3, end=2)]
def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
last = len(self.repo.revisions)
list(self.repo.get_changesets(start=last-1, end=last-2))
def test_get_changesets_numerical_id_last_zero_error(self):
list(self.repo.get_changesets(start=last-1, end=0))
class ChangesetsChangesTestCaseMixin(BackendTestMixin):
recreate_repo_per_test = False
@classmethod
def _get_commits(cls):
return [
{
'message': u'Initial',
'author': u'Joe Doe <joe.doe@example.com>',
'date': datetime.datetime(2010, 1, 1, 20),
'added': [
FileNode('foo/bar', content='foo'),
FileNode('foo/bał', content='foo'),
FileNode('foobar', content='foo'),
FileNode('qwe', content='foo'),
],
'message': u'Massive changes',
'date': datetime.datetime(2010, 1, 1, 22),
'added': [FileNode('fallout', content='War never changes')],
'changed': [
FileNode('foo/bar', content='baz'),
FileNode('foobar', content='baz'),
'removed': [FileNode('qwe')],
]
def test_initial_commit(self):
changeset = self.repo.get_changeset(0)
self.assertItemsEqual(changeset.added, [
changeset.get_node('foo/bar'),
changeset.get_node('foo/bał'),
changeset.get_node('foobar'),
changeset.get_node('qwe'),
])
self.assertItemsEqual(changeset.changed, [])
self.assertItemsEqual(changeset.removed, [])
def test_head_added(self):
changeset = self.repo.get_changeset()
self.assertTrue(isinstance(changeset.added, AddedFileNodesGenerator))
changeset.get_node('fallout'),
self.assertTrue(isinstance(changeset.changed, ChangedFileNodesGenerator))
self.assertItemsEqual(changeset.changed, [
self.assertTrue(isinstance(changeset.removed, RemovedFileNodesGenerator))
self.assertEqual(len(changeset.removed), 1)
self.assertEqual(list(changeset.removed)[0].path, 'qwe')
def test_get_filemode(self):
self.assertEqual(33188, changeset.get_file_mode('foo/bar'))
def test_get_filemode_non_ascii(self):
self.assertEqual(33188, changeset.get_file_mode('foo/bał'))
self.assertEqual(33188, changeset.get_file_mode(u'foo/bał'))
# For each backend create test case class
for alias in SCM_TESTS:
attrs = {
'backend_alias': alias,
}
# tests with additional commits
cls_name = ''.join(('%s changesets with commits test' % alias).title().split())
bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase)
globals()[cls_name] = type(cls_name, bases, attrs)
# tests without additional commits
cls_name = ''.join(('%s changesets test' % alias).title().split())
bases = (ChangesetsTestCaseMixin, unittest.TestCase)
# tests changes
cls_name = ''.join(('%s changesets changes test' % alias).title().split())
bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase)
if __name__ == '__main__':
unittest.main()
Tests so called "in memory changesets" commit API of vcs.
from rhodecode.tests.vcs.conf import SCM_TESTS, get_new_dir
from rhodecode.lib.vcs.exceptions import EmptyRepositoryError
from rhodecode.lib.vcs.exceptions import NodeAlreadyAddedError
from rhodecode.lib.vcs.exceptions import NodeAlreadyExistsError
from rhodecode.lib.vcs.exceptions import NodeAlreadyRemovedError
from rhodecode.lib.vcs.exceptions import NodeAlreadyChangedError
from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError
from rhodecode.lib.vcs.exceptions import NodeNotChangedError
from rhodecode.lib.vcs.nodes import DirNode
from rhodecode.lib.vcs.nodes import FileNode
class InMemoryChangesetTestMixin(object):
This is a backend independent test case class which should be created
with ``type`` method.
It is required to set following attributes at subclass:
- ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
- ``repo_path``: path to the repository which would be created for set of
tests
def get_backend(self):
return vcs.get_backend(self.backend_alias)
def setUp(self):
Backend = self.get_backend()
self.repo_path = get_new_dir(str(time.time()))
self.repo = Backend(self.repo_path, create=True)
self.imc = self.repo.in_memory_changeset
self.nodes = [
FileNode('foobar', content='Foo & bar'),
FileNode('foobar2', content='Foo & bar, doubled!'),
FileNode('foo bar with spaces', content=''),
FileNode('foo/bar/baz', content='Inside'),
FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
def test_add(self):
rev_count = len(self.repo.revisions)
to_add = [FileNode(node.path, content=node.content)
for node in self.nodes]
for node in to_add:
self.imc.add(node)
message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
author = unicode(self.__class__)
changeset = self.imc.commit(message=message, author=author)
newtip = self.repo.get_changeset()
self.assertEqual(changeset, newtip)
self.assertEqual(rev_count + 1, len(self.repo.revisions))
self.assertEqual(newtip.message, message)
self.assertEqual(newtip.author, author)
self.assertTrue(not any((self.imc.added, self.imc.changed,
self.imc.removed)))
self.assertEqual(newtip.get_node(node.path).content, node.content)
def test_add_in_bulk(self):
self.imc.add(*to_add)
def test_add_actually_adds_all_nodes_at_second_commit_too(self):
self.imc.add(FileNode('foo/bar/image.png', content='\0'))
self.imc.add(FileNode('foo/README.txt', content='readme!'))
changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
# commit some more files again
to_add = [
FileNode('foo/bar/foobaz/bar', content='foo'),
FileNode('foo/bar/another/bar', content='foo'),
FileNode('foo/baz.txt', content='foo'),
FileNode('foobar/foobaz/file', content='foo'),
FileNode('foobar/barbaz', content='foo'),
changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
def test_add_non_ascii_files(self):
FileNode('żółwik/zwierzątko', content='ćććć'),
FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
def test_add_raise_already_added(self):
node = FileNode('foobar', content='baz')
self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
def test_check_integrity_raise_already_exist(self):
self.imc.commit(message=u'Added foobar', author=unicode(self))
self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
message='new message',
author=str(self))
def test_change(self):
self.imc.add(FileNode('foo/bar/baz', content='foo'))
self.imc.add(FileNode('foo/fbar', content='foobar'))
tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
# Change node's content
node = FileNode('foo/bar/baz', content='My **changed** content')
self.imc.change(node)
self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
self.assertNotEqual(tip, newtip)
self.assertNotEqual(tip.id, newtip.id)
self.assertEqual(newtip.get_node('foo/bar/baz').content,
'My **changed** content')
def test_change_non_ascii(self):
node = FileNode('żółwik/zwierzątko', content='My **changed** content')
self.imc.commit(u'Changed %s' % safe_unicode(node.path),
u'joe.doe@example.com')
node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
self.assertEqual(newtip.get_node('żółwik/zwierzątko').content,
self.assertEqual(newtip.get_node('żółwik/zwierzątko_uni').content,
def test_change_raise_empty_repository(self):
node = FileNode('foobar')
self.assertRaises(EmptyRepositoryError, self.imc.change, node)
def test_check_integrity_change_raise_node_does_not_exist(self):
node = FileNode('not-foobar', content='')
self.assertRaises(NodeDoesNotExistError, self.imc.commit,
message='Changed not existing node',
def test_change_raise_node_already_changed(self):
node = FileNode('foobar', content='more baz')
self.assertRaises(NodeAlreadyChangedError, self.imc.change, node)
def test_check_integrity_change_raise_node_not_changed(self):
self.test_add() # Performs first commit
node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
self.assertRaises(NodeNotChangedError, self.imc.commit,
message=u'Trying to mark node as changed without touching it',
author=unicode(self))
def test_change_raise_node_already_removed(self):
self.imc.remove(FileNode('foobar'))
self.assertRaises(NodeAlreadyRemovedError, self.imc.change, node)
def test_remove(self):
node = self.nodes[0]
self.assertEqual(node.content, tip.get_node(node.path).content)
self.imc.remove(node)
self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
Status change: