@@ -83,49 +83,49 @@ class GitChangeset(BaseChangeset):
for tname, tsha in self.repository.tags.iteritems():
if tsha == self.raw_id:
_tags.append(tname)
return _tags
@LazyProperty
def branch(self):
heads = self.repository._heads(reverse=False)
ref = heads.get(self.raw_id)
if ref:
return safe_unicode(ref)
def _fix_path(self, path):
"""
Paths are stored without trailing slash so we need to get rid off it if
needed.
if path.endswith('/'):
path = path.rstrip('/')
return path
def _get_id_for_path(self, path):
path = safe_str(path)
# FIXME: Please, spare a couple of minutes and make those codes cleaner;
if not path in self._paths:
path = path.strip('/')
# set root tree
tree = self.repository._repo[self._tree_id]
if path == '':
self._paths[''] = tree.id
return tree.id
splitted = path.split('/')
dirs, name = splitted[:-1], splitted[-1]
curdir = ''
# initially extract things from root dir
for item, stat, id in tree.iteritems():
if curdir:
name = '/'.join((curdir, item))
else:
name = item
self._paths[name] = id
self._stat_modes[name] = stat
for dir in dirs:
curdir = '/'.join((curdir, dir))
@@ -133,49 +133,49 @@ class GitChangeset(BaseChangeset):
curdir = dir
dir_id = None
if dir == item:
dir_id = id
if dir_id:
# Update tree
tree = self.repository._repo[dir_id]
if not isinstance(tree, objects.Tree):
raise ChangesetError('%s is not a directory' % curdir)
raise ChangesetError('%s have not been found' % curdir)
# cache all items from the given traversed tree
raise NodeDoesNotExistError("There is no file nor directory "
"at the given path '%s' at revision %s"
% (path, self.short_id))
% (path, safe_str(self.short_id)))
return self._paths[path]
def _get_kind(self, path):
obj = self.repository._repo[self._get_id_for_path(path)]
if isinstance(obj, objects.Blob):
return NodeKind.FILE
elif isinstance(obj, objects.Tree):
return NodeKind.DIR
def _get_filectx(self, path):
path = self._fix_path(path)
if self._get_kind(path) != NodeKind.FILE:
raise ChangesetError("File does not exist for revision %s at "
" '%s'" % (self.raw_id, path))
def _get_file_nodes(self):
return chain(*(t[2] for t in self.walk()))
def parents(self):
Returns list of parents changesets.
@@ -233,48 +233,49 @@ class GitChangeset(BaseChangeset):
except IndexError:
raise ChangesetDoesNotExistError
cs = changeset.repository.get_changeset(prev_rev)
if branch and branch != cs.branch:
return _prev(cs, branch)
return cs
return _prev(self, branch)
def diff(self, ignore_whitespace=True, context=3):
rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
rev2 = self
return ''.join(self.repository.get_diff(rev1, rev2,
ignore_whitespace=ignore_whitespace,
context=context))
def get_file_mode(self, path):
Returns stat mode of the file at the given ``path``.
# ensure path is traversed
self._get_id_for_path(path)
return self._stat_modes[path]
def get_file_content(self, path):
Returns content of the file at given ``path``.
id = self._get_id_for_path(path)
blob = self.repository._repo[id]
return blob.as_pretty_string()
def get_file_size(self, path):
Returns size of the file at given ``path``.
return blob.raw_length()
def get_file_changeset(self, path):
Returns last commit of the file at the given ``path``.
return self.get_file_history(path, limit=1)[0]
@@ -25,49 +25,49 @@ class GitInMemoryChangeset(BaseInMemoryC
``datetime.datetime.now()``.
:param branch: branch name, as string. If none given, default backend's
branch would be used.
:raises ``CommitError``: if any error occurs while committing
self.check_integrity(parents)
from .repository import GitRepository
if branch is None:
branch = GitRepository.DEFAULT_BRANCH_NAME
repo = self.repository._repo
object_store = repo.object_store
ENCODING = "UTF-8"
DIRMOD = 040000
# Create tree and populates it with blobs
commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or\
objects.Tree()
for node in self.added + self.changed:
# Compute subdirs if needed
dirpath, nodename = posixpath.split(node.path)
dirnames = dirpath and dirpath.split('/') or []
dirnames = map(safe_str, dirpath and dirpath.split('/') or [])
parent = commit_tree
ancestors = [('', parent)]
# Tries to dig for the deepest existing tree
while dirnames:
curdir = dirnames.pop(0)
try:
dir_id = parent[curdir][1]
except KeyError:
# put curdir back into dirnames and stops
dirnames.insert(0, curdir)
break
# If found, updates parent
parent = self.repository._repo[dir_id]
ancestors.append((curdir, parent))
# Now parent is deepest existing tree and we need to create subtrees
# for dirnames (in reverse order) [this only applies for nodes from added]
new_trees = []
if not node.is_binary:
content = node.content.encode(ENCODING)
content = node.content
# -*- coding: utf-8 -*-
vcs.nodes
~~~~~~~~~
Module holding everything related to vcs nodes.
:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
import os
import stat
import posixpath
import mimetypes
from rhodecode.lib.vcs.backends.base import EmptyChangeset
from rhodecode.lib.vcs.exceptions import NodeError, RemovedFileNodeError
from rhodecode.lib.vcs.utils.lazy import LazyProperty
from rhodecode.lib.vcs.utils import safe_unicode
from rhodecode.lib.vcs.utils import safe_unicode, safe_str
class NodeKind:
SUBMODULE = -1
DIR = 1
FILE = 2
class NodeState:
ADDED = u'added'
CHANGED = u'changed'
NOT_CHANGED = u'not changed'
REMOVED = u'removed'
class NodeGeneratorBase(object):
Base class for removed added and changed filenodes, it's a lazy generator
class that will create filenodes only on iteration or call
The len method doesn't need to create filenodes at all
def __init__(self, current_paths, cs):
@@ -79,50 +79,50 @@ class RemovedFileNodesGenerator(NodeGene
def __iter__(self):
for p in self.current_paths:
yield RemovedFileNode(path=p)
def __getslice__(self, i, j):
for p in self.current_paths[i:j]:
class Node(object):
Simplest class representing file or directory on repository. SCM backends
should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
directly.
Node's ``path`` cannot start with slash as we operate on *relative* paths
only. Moreover, every single node is identified by the ``path`` attribute,
so it cannot end with slash, too. Otherwise, path could lead to mistakes.
def __init__(self, path, kind):
if path.startswith('/'):
raise NodeError("Cannot initialize Node objects with slash at "
"the beginning as only relative paths are supported")
self.path = path.rstrip('/')
self.path = safe_str(path.rstrip('/')) # we store paths as str
if path == '' and kind != NodeKind.DIR:
raise NodeError("Only DirNode and its subclasses may be "
"initialized with empty path")
self.kind = kind
#self.dirs, self.files = [], []
if self.is_root() and not self.is_dir():
raise NodeError("Root node cannot be FILE kind")
def parent(self):
parent_path = self.get_parent_path()
if parent_path:
if self.changeset:
return self.changeset.get_node(parent_path)
return DirNode(parent_path)
return None
def unicode_path(self):
return safe_unicode(self.path)
def name(self):
@@ -559,50 +559,50 @@ class ScmModel(BaseModel):
author=None, trigger_push_hook=True):
Commits given multiple nodes into repo
:param user: RhodeCode User object or user_id, the commiter
:param repo: RhodeCode Repository object
:param message: commit message
:param nodes: mapping {filename:{'content':content},...}
:param parent_cs: parent changeset, can be empty than it's initial commit
:param author: author of commit, cna be different that commiter only for git
:param trigger_push_hook: trigger push hooks
:returns: new commited changeset
user = self._get_user(user)
scm_instance = repo.scm_instance_no_cache()
processed_nodes = []
for f_path in nodes:
if f_path.startswith('/') or f_path.startswith('.') or '../' in f_path:
raise NonRelativePathError('%s is not an relative path' % f_path)
if f_path:
f_path = os.path.normpath(f_path)
content = nodes[f_path]['content']
f_path = safe_str(f_path)
# decoding here will force that we have proper encoded values
# in any other case this will throw exceptions and deny commit
if isinstance(content, (basestring,)):
content = safe_str(content)
elif isinstance(content, (file, cStringIO.OutputType,)):
content = content.read()
raise Exception('Content is of unrecognized type %s' % (
type(content)
))
processed_nodes.append((f_path, content))
message = safe_unicode(message)
commiter = user.full_contact
author = safe_unicode(author) if author else commiter
IMC = self._get_IMC_module(scm_instance.alias)
imc = IMC(scm_instance)
if not parent_cs:
parent_cs = EmptyChangeset(alias=scm_instance.alias)
if isinstance(parent_cs, EmptyChangeset):
# EmptyChangeset means we we're editing empty repository
# encoding: utf8
from __future__ import with_statement
import time
import datetime
from rhodecode.lib import vcs
from rhodecode.tests.vcs.base import BackendTestMixin
from rhodecode.tests.vcs.conf import SCM_TESTS
from rhodecode.lib.vcs.backends.base import BaseChangeset
from rhodecode.lib.vcs.nodes import (
FileNode, AddedFileNodesGenerator,
ChangedFileNodesGenerator, RemovedFileNodesGenerator
)
from rhodecode.lib.vcs.exceptions import (
BranchDoesNotExistError, ChangesetDoesNotExistError,
RepositoryError, EmptyRepositoryError
from rhodecode.lib.vcs.utils.compat import unittest
from rhodecode.tests.vcs.conf import get_new_dir
class TestBaseChangeset(unittest.TestCase):
def test_as_dict(self):
@@ -281,90 +282,100 @@ class ChangesetsTestCaseMixin(BackendTes
def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
with self.assertRaises(RepositoryError):
last = len(self.repo.revisions)
list(self.repo.get_changesets(start=last-1, end=last-2))
def test_get_changesets_numerical_id_last_zero_error(self):
list(self.repo.get_changesets(start=last-1, end=0))
class ChangesetsChangesTestCaseMixin(BackendTestMixin):
recreate_repo_per_test = False
@classmethod
def _get_commits(cls):
return [
{
'message': u'Initial',
'author': u'Joe Doe <joe.doe@example.com>',
'date': datetime.datetime(2010, 1, 1, 20),
'added': [
FileNode('foo/bar', content='foo'),
FileNode('foo/bał', content='foo'),
FileNode('foobar', content='foo'),
FileNode('qwe', content='foo'),
],
},
'message': u'Massive changes',
'date': datetime.datetime(2010, 1, 1, 22),
'added': [FileNode('fallout', content='War never changes')],
'changed': [
FileNode('foo/bar', content='baz'),
FileNode('foobar', content='baz'),
'removed': [FileNode('qwe')],
]
def test_initial_commit(self):
changeset = self.repo.get_changeset(0)
self.assertItemsEqual(changeset.added, [
changeset.get_node('foo/bar'),
changeset.get_node('foo/bał'),
changeset.get_node('foobar'),
changeset.get_node('qwe'),
])
self.assertItemsEqual(changeset.changed, [])
self.assertItemsEqual(changeset.removed, [])
def test_head_added(self):
changeset = self.repo.get_changeset()
self.assertTrue(isinstance(changeset.added, AddedFileNodesGenerator))
changeset.get_node('fallout'),
self.assertTrue(isinstance(changeset.changed, ChangedFileNodesGenerator))
self.assertItemsEqual(changeset.changed, [
self.assertTrue(isinstance(changeset.removed, RemovedFileNodesGenerator))
self.assertEqual(len(changeset.removed), 1)
self.assertEqual(list(changeset.removed)[0].path, 'qwe')
def test_get_filemode(self):
self.assertEqual(33188, changeset.get_file_mode('foo/bar'))
def test_get_filemode_non_ascii(self):
self.assertEqual(33188, changeset.get_file_mode('foo/bał'))
self.assertEqual(33188, changeset.get_file_mode(u'foo/bał'))
# For each backend create test case class
for alias in SCM_TESTS:
attrs = {
'backend_alias': alias,
}
# tests with additional commits
cls_name = ''.join(('%s changesets with commits test' % alias).title().split())
bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase)
globals()[cls_name] = type(cls_name, bases, attrs)
# tests without additional commits
cls_name = ''.join(('%s changesets test' % alias).title().split())
bases = (ChangesetsTestCaseMixin, unittest.TestCase)
# tests changes
cls_name = ''.join(('%s changesets changes test' % alias).title().split())
bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase)
if __name__ == '__main__':
unittest.main()
Tests so called "in memory changesets" commit API of vcs.
from rhodecode.tests.vcs.conf import SCM_TESTS, get_new_dir
from rhodecode.lib.vcs.exceptions import EmptyRepositoryError
from rhodecode.lib.vcs.exceptions import NodeAlreadyAddedError
from rhodecode.lib.vcs.exceptions import NodeAlreadyExistsError
from rhodecode.lib.vcs.exceptions import NodeAlreadyRemovedError
from rhodecode.lib.vcs.exceptions import NodeAlreadyChangedError
from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError
from rhodecode.lib.vcs.exceptions import NodeNotChangedError
from rhodecode.lib.vcs.nodes import DirNode
from rhodecode.lib.vcs.nodes import FileNode
class InMemoryChangesetTestMixin(object):
This is a backend independent test case class which should be created
with ``type`` method.
It is required to set following attributes at subclass:
- ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
- ``repo_path``: path to the repository which would be created for set of
tests
def get_backend(self):
return vcs.get_backend(self.backend_alias)
def setUp(self):
Backend = self.get_backend()
self.repo_path = get_new_dir(str(time.time()))
self.repo = Backend(self.repo_path, create=True)
self.imc = self.repo.in_memory_changeset
self.nodes = [
FileNode('foobar', content='Foo & bar'),
@@ -91,77 +93,129 @@ class InMemoryChangesetTestMixin(object)
self.imc.add(FileNode('foo/bar/image.png', content='\0'))
self.imc.add(FileNode('foo/README.txt', content='readme!'))
changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
# commit some more files again
to_add = [
FileNode('foo/bar/foobaz/bar', content='foo'),
FileNode('foo/bar/another/bar', content='foo'),
FileNode('foo/baz.txt', content='foo'),
FileNode('foobar/foobaz/file', content='foo'),
FileNode('foobar/barbaz', content='foo'),
self.imc.add(*to_add)
changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
def test_add_non_ascii_files(self):
rev_count = len(self.repo.revisions)
FileNode('żółwik/zwierzątko', content='ćććć'),
FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
for node in to_add:
self.imc.add(node)
message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
author = unicode(self.__class__)
changeset = self.imc.commit(message=message, author=author)
newtip = self.repo.get_changeset()
self.assertEqual(changeset, newtip)
self.assertEqual(rev_count + 1, len(self.repo.revisions))
self.assertEqual(newtip.message, message)
self.assertEqual(newtip.author, author)
self.assertTrue(not any((self.imc.added, self.imc.changed,
self.imc.removed)))
self.assertEqual(newtip.get_node(node.path).content, node.content)
def test_add_raise_already_added(self):
node = FileNode('foobar', content='baz')
self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
def test_check_integrity_raise_already_exist(self):
self.imc.commit(message=u'Added foobar', author=unicode(self))
self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
message='new message',
author=str(self))
def test_change(self):
self.imc.add(FileNode('foo/bar/baz', content='foo'))
self.imc.add(FileNode('foo/fbar', content='foobar'))
tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
# Change node's content
node = FileNode('foo/bar/baz', content='My **changed** content')
self.imc.change(node)
self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
self.assertNotEqual(tip, newtip)
self.assertNotEqual(tip.id, newtip.id)
self.assertEqual(newtip.get_node('foo/bar/baz').content,
'My **changed** content')
def test_change_non_ascii(self):
node = FileNode('żółwik/zwierzątko', content='My **changed** content')
self.imc.commit(u'Changed %s' % safe_unicode(node.path),
u'joe.doe@example.com')
node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
self.assertEqual(newtip.get_node('żółwik/zwierzątko').content,
self.assertEqual(newtip.get_node('żółwik/zwierzątko_uni').content,
def test_change_raise_empty_repository(self):
node = FileNode('foobar')
self.assertRaises(EmptyRepositoryError, self.imc.change, node)
def test_check_integrity_change_raise_node_does_not_exist(self):
node = FileNode('not-foobar', content='')
self.assertRaises(NodeDoesNotExistError, self.imc.commit,
message='Changed not existing node',
def test_change_raise_node_already_changed(self):
node = FileNode('foobar', content='more baz')
self.assertRaises(NodeAlreadyChangedError, self.imc.change, node)
def test_check_integrity_change_raise_node_not_changed(self):
Status change: