import re
from itertools import chain
from dulwich import objects
from subprocess import Popen, PIPE
from rhodecode.lib.vcs.conf import settings
from rhodecode.lib.vcs.backends.base import BaseChangeset, EmptyChangeset
from rhodecode.lib.vcs.exceptions import (
RepositoryError, ChangesetError, NodeDoesNotExistError, VCSError,
ChangesetDoesNotExistError, ImproperArchiveTypeError
)
from rhodecode.lib.vcs.nodes import (
FileNode, DirNode, NodeKind, RootNode, RemovedFileNode, SubModuleNode,
ChangedFileNodesGenerator, AddedFileNodesGenerator, RemovedFileNodesGenerator
from rhodecode.lib.vcs.utils import (
safe_unicode, safe_str, safe_int, date_fromtimestamp
from rhodecode.lib.vcs.utils.lazy import LazyProperty
class GitChangeset(BaseChangeset):
"""
Represents state of the repository at single revision.
def __init__(self, repository, revision):
self._stat_modes = {}
self.repository = repository
try:
commit = self.repository._repo[revision]
if isinstance(commit, objects.Tag):
revision = commit.object[1]
commit = self.repository._repo.get_object(commit.object[1])
except KeyError:
raise RepositoryError("Cannot get object with id %s" % revision)
self.raw_id = revision
self.id = self.raw_id
self.short_id = self.raw_id[:12]
self._commit = commit
self._tree_id = commit.tree
self._committer_property = 'committer'
self._author_property = 'author'
self._date_property = 'commit_time'
self._date_tz_property = 'commit_timezone'
self.revision = repository.revisions.index(revision)
self.nodes = {}
self._paths = {}
@LazyProperty
def message(self):
return safe_unicode(self._commit.message)
def committer(self):
return safe_unicode(getattr(self._commit, self._committer_property))
def author(self):
return safe_unicode(getattr(self._commit, self._author_property))
def date(self):
return date_fromtimestamp(getattr(self._commit, self._date_property),
getattr(self._commit, self._date_tz_property))
def _timestamp(self):
return getattr(self._commit, self._date_property)
def status(self):
Returns modified, added, removed, deleted files for current changeset
return self.changed, self.added, self.removed
def tags(self):
_tags = []
for tname, tsha in self.repository.tags.iteritems():
if tsha == self.raw_id:
_tags.append(tname)
return _tags
def branch(self):
heads = self.repository._heads(reverse=False)
ref = heads.get(self.raw_id)
if ref:
return safe_unicode(ref)
def _fix_path(self, path):
Paths are stored without trailing slash so we need to get rid off it if
needed.
if path.endswith('/'):
path = path.rstrip('/')
return path
def _get_id_for_path(self, path):
path = safe_str(path)
# FIXME: Please, spare a couple of minutes and make those codes cleaner;
if not path in self._paths:
path = path.strip('/')
# set root tree
tree = self.repository._repo[self._tree_id]
if path == '':
self._paths[''] = tree.id
return tree.id
splitted = path.split('/')
dirs, name = splitted[:-1], splitted[-1]
curdir = ''
# initially extract things from root dir
for item, stat, id in tree.iteritems():
if curdir:
name = '/'.join((curdir, item))
else:
name = item
self._paths[name] = id
self._stat_modes[name] = stat
for dir in dirs:
curdir = '/'.join((curdir, dir))
curdir = dir
dir_id = None
if dir == item:
dir_id = id
if dir_id:
# Update tree
tree = self.repository._repo[dir_id]
if not isinstance(tree, objects.Tree):
raise ChangesetError('%s is not a directory' % curdir)
raise ChangesetError('%s have not been found' % curdir)
# cache all items from the given traversed tree
raise NodeDoesNotExistError("There is no file nor directory "
"at the given path '%s' at revision %s"
% (path, self.short_id))
% (path, safe_str(self.short_id)))
return self._paths[path]
def _get_kind(self, path):
obj = self.repository._repo[self._get_id_for_path(path)]
if isinstance(obj, objects.Blob):
return NodeKind.FILE
elif isinstance(obj, objects.Tree):
return NodeKind.DIR
def _get_filectx(self, path):
path = self._fix_path(path)
if self._get_kind(path) != NodeKind.FILE:
raise ChangesetError("File does not exist for revision %s at "
" '%s'" % (self.raw_id, path))
def _get_file_nodes(self):
return chain(*(t[2] for t in self.walk()))
def parents(self):
Returns list of parents changesets.
return [self.repository.get_changeset(parent)
for parent in self._commit.parents]
def children(self):
Returns list of children changesets.
rev_filter = _git_path = settings.GIT_REV_FILTER
so, se = self.repository.run_git_command(
"rev-list %s --children | grep '^%s'" % (rev_filter, self.raw_id)
children = []
for l in so.splitlines():
childs = l.split(' ')[1:]
children.extend(childs)
return [self.repository.get_changeset(cs) for cs in children]
def next(self, branch=None):
if branch and self.branch != branch:
raise VCSError('Branch option used on changeset not belonging '
'to that branch')
def _next(changeset, branch):
next_ = changeset.revision + 1
next_rev = changeset.repository.revisions[next_]
except IndexError:
raise ChangesetDoesNotExistError
cs = changeset.repository.get_changeset(next_rev)
if branch and branch != cs.branch:
return _next(cs, branch)
return cs
return _next(self, branch)
def prev(self, branch=None):
def _prev(changeset, branch):
prev_ = changeset.revision - 1
if prev_ < 0:
raise IndexError
prev_rev = changeset.repository.revisions[prev_]
cs = changeset.repository.get_changeset(prev_rev)
return _prev(cs, branch)
return _prev(self, branch)
def diff(self, ignore_whitespace=True, context=3):
rev1 = self.parents[0] if self.parents else self.repository.EMPTY_CHANGESET
rev2 = self
return ''.join(self.repository.get_diff(rev1, rev2,
ignore_whitespace=ignore_whitespace,
context=context))
def get_file_mode(self, path):
Returns stat mode of the file at the given ``path``.
# ensure path is traversed
self._get_id_for_path(path)
return self._stat_modes[path]
def get_file_content(self, path):
Returns content of the file at given ``path``.
id = self._get_id_for_path(path)
blob = self.repository._repo[id]
return blob.as_pretty_string()
def get_file_size(self, path):
Returns size of the file at given ``path``.
return blob.raw_length()
def get_file_changeset(self, path):
Returns last commit of the file at the given ``path``.
return self.get_file_history(path, limit=1)[0]
def get_file_history(self, path, limit=None):
Returns history of file as reversed list of ``Changeset`` objects for
which file at given ``path`` has been modified.
TODO: This function now uses os underlying 'git' and 'grep' commands
which is generally not good. Should be replaced with algorithm
iterating commits.
self._get_filectx(path)
cs_id = safe_str(self.id)
f_path = safe_str(path)
if limit:
cmd = 'log -n %s --pretty="format: %%H" -s -p %s -- "%s"' % (
safe_int(limit, 0), cs_id, f_path
cmd = 'log --pretty="format: %%H" -s -p %s -- "%s"' % (
cs_id, f_path
so, se = self.repository.run_git_command(cmd)
ids = re.findall(r'[0-9a-fA-F]{40}', so)
return [self.repository.get_changeset(id) for id in ids]
def get_file_history_2(self, path):
from dulwich.walk import Walker
include = [self.id]
walker = Walker(self.repository._repo.object_store, include,
paths=[path], max_entries=1)
return [self.repository.get_changeset(sha)
for sha in (x.commit.id for x in walker)]
def get_file_annotate(self, path):
Returns a generator of four element tuples with
lineno, sha, changeset lazy loader and line
TODO: This function now uses os underlying 'git' command which is
generally not good. Should be replaced with algorithm iterating
commits.
cmd = 'blame -l --root -r %s -- "%s"' % (self.id, path)
# -l ==> outputs long shas (and we need all 40 characters)
# --root ==> doesn't put '^' character for bounderies
# -r sha ==> blames for the given revision
for i, blame_line in enumerate(so.split('\n')[:-1]):
ln_no = i + 1
sha, line = re.split(r' ', blame_line, 1)
yield (ln_no, sha, lambda: self.repository.get_changeset(sha), line)
def fill_archive(self, stream=None, kind='tgz', prefix=None,
subrepos=False):
Fills up given stream.
:param stream: file like object.
:param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
Default: ``tgz``.
:param prefix: name of root directory in archive.
Default is repository name and changeset's raw_id joined with dash
(``repo-tip.<KIND>``).
:param subrepos: include subrepos in this archive.
:raise ImproperArchiveTypeError: If given kind is wrong.
:raise VcsError: If given stream is None
allowed_kinds = settings.ARCHIVE_SPECS.keys()
if kind not in allowed_kinds:
raise ImproperArchiveTypeError('Archive kind not supported use one'
'of %s', allowed_kinds)
if prefix is None:
prefix = '%s-%s' % (self.repository.name, self.short_id)
elif prefix.startswith('/'):
raise VCSError("Prefix cannot start with leading slash")
elif prefix.strip() == '':
raise VCSError("Prefix cannot be empty")
if kind == 'zip':
frmt = 'zip'
frmt = 'tar'
_git_path = settings.GIT_EXECUTABLE_PATH
cmd = '%s archive --format=%s --prefix=%s/ %s' % (_git_path,
frmt, prefix, self.raw_id)
if kind == 'tgz':
cmd += ' | gzip -9'
elif kind == 'tbz2':
cmd += ' | bzip2 -9'
if stream is None:
raise VCSError('You need to pass in a valid stream for filling'
' with archival data')
popen = Popen(cmd, stdout=PIPE, stderr=PIPE, shell=True,
cwd=self.repository.path)
buffer_size = 1024 * 8
chunk = popen.stdout.read(buffer_size)
while chunk:
stream.write(chunk)
# Make sure all descriptors would be read
popen.communicate()
def get_nodes(self, path):
if self._get_kind(path) != NodeKind.DIR:
raise ChangesetError("Directory does not exist for revision %s at "
" '%s'" % (self.revision, path))
tree = self.repository._repo[id]
dirnodes = []
filenodes = []
als = self.repository.alias
for name, stat, id in tree.iteritems():
if objects.S_ISGITLINK(stat):
dirnodes.append(SubModuleNode(name, url=None, changeset=id,
alias=als))
continue
obj = self.repository._repo.get_object(id)
if path != '':
obj_path = '/'.join((path, name))
obj_path = name
if obj_path not in self._stat_modes:
self._stat_modes[obj_path] = stat
if isinstance(obj, objects.Tree):
dirnodes.append(DirNode(obj_path, changeset=self))
elif isinstance(obj, objects.Blob):
filenodes.append(FileNode(obj_path, changeset=self, mode=stat))
raise ChangesetError("Requested object should be Tree "
"or Blob, is %r" % type(obj))
nodes = dirnodes + filenodes
for node in nodes:
if not node.path in self.nodes:
self.nodes[node.path] = node
nodes.sort()
return nodes
def get_node(self, path):
if isinstance(path, unicode):
path = path.encode('utf-8')
if not path in self.nodes:
id_ = self._get_id_for_path(path)
except ChangesetError:
raise NodeDoesNotExistError("Cannot find one of parents' "
"directories for a given path: %s" % path)
_GL = lambda m: m and objects.S_ISGITLINK(m)
if _GL(self._stat_modes.get(path)):
node = SubModuleNode(path, url=None, changeset=id_,
alias=self.repository.alias)
import time
import datetime
import posixpath
from dulwich.repo import Repo
from rhodecode.lib.vcs.backends.base import BaseInMemoryChangeset
from rhodecode.lib.vcs.exceptions import RepositoryError
from rhodecode.lib.vcs.utils import safe_str
class GitInMemoryChangeset(BaseInMemoryChangeset):
def commit(self, message, author, parents=None, branch=None, date=None,
**kwargs):
Performs in-memory commit (doesn't check workdir in any way) and
returns newly created ``Changeset``. Updates repository's
``revisions``.
:param message: message of the commit
:param author: full username, i.e. "Joe Doe <joe.doe@example.com>"
:param parents: single parent or sequence of parents from which commit
would be derieved
:param date: ``datetime.datetime`` instance. Defaults to
``datetime.datetime.now()``.
:param branch: branch name, as string. If none given, default backend's
branch would be used.
:raises ``CommitError``: if any error occurs while committing
self.check_integrity(parents)
from .repository import GitRepository
if branch is None:
branch = GitRepository.DEFAULT_BRANCH_NAME
repo = self.repository._repo
object_store = repo.object_store
ENCODING = "UTF-8"
DIRMOD = 040000
# Create tree and populates it with blobs
commit_tree = self.parents[0] and repo[self.parents[0]._commit.tree] or\
objects.Tree()
for node in self.added + self.changed:
# Compute subdirs if needed
dirpath, nodename = posixpath.split(node.path)
dirnames = dirpath and dirpath.split('/') or []
dirnames = map(safe_str, dirpath and dirpath.split('/') or [])
parent = commit_tree
ancestors = [('', parent)]
# Tries to dig for the deepest existing tree
while dirnames:
curdir = dirnames.pop(0)
dir_id = parent[curdir][1]
# put curdir back into dirnames and stops
dirnames.insert(0, curdir)
break
# If found, updates parent
parent = self.repository._repo[dir_id]
ancestors.append((curdir, parent))
# Now parent is deepest existing tree and we need to create subtrees
# for dirnames (in reverse order) [this only applies for nodes from added]
new_trees = []
if not node.is_binary:
content = node.content.encode(ENCODING)
content = node.content
blob = objects.Blob.from_string(content)
node_path = node.name.encode(ENCODING)
if dirnames:
# If there are trees which should be created we need to build
# them now (in reverse order)
reversed_dirnames = list(reversed(dirnames))
curtree = objects.Tree()
curtree[node_path] = node.mode, blob.id
new_trees.append(curtree)
for dirname in reversed_dirnames[:-1]:
newtree = objects.Tree()
#newtree.add(DIRMOD, dirname, curtree.id)
newtree[dirname] = DIRMOD, curtree.id
new_trees.append(newtree)
curtree = newtree
parent[reversed_dirnames[-1]] = DIRMOD, curtree.id
parent.add(name=node_path, mode=node.mode, hexsha=blob.id)
new_trees.append(parent)
# Update ancestors
for parent, tree, path in reversed([(a[1], b[1], b[0]) for a, b in
zip(ancestors, ancestors[1:])]):
parent[path] = DIRMOD, tree.id
object_store.add_object(tree)
object_store.add_object(blob)
for tree in new_trees:
for node in self.removed:
paths = node.path.split('/')
tree = commit_tree
trees = [tree]
# Traverse deep into the forest...
for path in paths:
obj = self.repository._repo[tree[path][1]]
trees.append(obj)
tree = obj
# Cut down the blob and all rotten trees on the way back...
for path, tree in reversed(zip(paths, trees)):
del tree[path]
if tree:
# This tree still has elements - don't remove it or any
# of it's parents
object_store.add_object(commit_tree)
# Create commit
commit = objects.Commit()
commit.tree = commit_tree.id
commit.parents = [p._commit.id for p in self.parents if p]
commit.author = commit.committer = safe_str(author)
commit.encoding = ENCODING
commit.message = safe_str(message)
# Compute date
if date is None:
date = time.time()
elif isinstance(date, datetime.datetime):
date = time.mktime(date.timetuple())
author_time = kwargs.pop('author_time', date)
commit.commit_time = int(date)
commit.author_time = int(author_time)
tz = time.timezone
author_tz = kwargs.pop('author_timezone', tz)
commit.commit_timezone = tz
commit.author_timezone = author_tz
object_store.add_object(commit)
ref = 'refs/heads/%s' % branch
repo.refs[ref] = commit.id
# Update vcs repository object & recreate dulwich repo
self.repository.revisions.append(commit.id)
# invalidate parsed refs after commit
self.repository._parsed_refs = self.repository._get_parsed_refs()
tip = self.repository.get_changeset()
self.reset()
return tip
def _get_missing_trees(self, path, root_tree):
Creates missing ``Tree`` objects for the given path.
:param path: path given as a string. It may be a path to a file node
(i.e. ``foo/bar/baz.txt``) or directory path - in that case it must
end with slash (i.e. ``foo/bar/``).
:param root_tree: ``dulwich.objects.Tree`` object from which we start
traversing (should be commit's root tree)
dirpath = posixpath.split(path)[0]
dirs = dirpath.split('/')
if not dirs or dirs == ['']:
return []
def get_tree_for_dir(tree, dirname):
for name, mode, id in tree.iteritems():
if name == dirname:
obj = self.repository._repo[id]
return obj
raise RepositoryError("Cannot create directory %s "
"at tree %s as path is occupied and is not a "
"Tree" % (dirname, tree))
return None
trees = []
parent = root_tree
for dirname in dirs:
tree = get_tree_for_dir(parent, dirname)
if tree is None:
tree = objects.Tree()
dirmode = 040000
parent.add(dirmode, dirname, tree.id)
parent = tree
# Always append tree
trees.append(tree)
return trees
# -*- coding: utf-8 -*-
vcs.nodes
~~~~~~~~~
Module holding everything related to vcs nodes.
:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
import os
import stat
import mimetypes
from rhodecode.lib.vcs.backends.base import EmptyChangeset
from rhodecode.lib.vcs.exceptions import NodeError, RemovedFileNodeError
from rhodecode.lib.vcs.utils import safe_unicode
from rhodecode.lib.vcs.utils import safe_unicode, safe_str
class NodeKind:
SUBMODULE = -1
DIR = 1
FILE = 2
class NodeState:
ADDED = u'added'
CHANGED = u'changed'
NOT_CHANGED = u'not changed'
REMOVED = u'removed'
class NodeGeneratorBase(object):
Base class for removed added and changed filenodes, it's a lazy generator
class that will create filenodes only on iteration or call
The len method doesn't need to create filenodes at all
def __init__(self, current_paths, cs):
self.cs = cs
self.current_paths = current_paths
def __call__(self):
return [n for n in self]
def __getslice__(self, i, j):
for p in self.current_paths[i:j]:
yield self.cs.get_node(p)
def __len__(self):
return len(self.current_paths)
def __iter__(self):
for p in self.current_paths:
class AddedFileNodesGenerator(NodeGeneratorBase):
Class holding Added files for current changeset
pass
class ChangedFileNodesGenerator(NodeGeneratorBase):
Class holding Changed files for current changeset
class RemovedFileNodesGenerator(NodeGeneratorBase):
Class holding removed files for current changeset
yield RemovedFileNode(path=p)
class Node(object):
Simplest class representing file or directory on repository. SCM backends
should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
directly.
Node's ``path`` cannot start with slash as we operate on *relative* paths
only. Moreover, every single node is identified by the ``path`` attribute,
so it cannot end with slash, too. Otherwise, path could lead to mistakes.
def __init__(self, path, kind):
if path.startswith('/'):
raise NodeError("Cannot initialize Node objects with slash at "
"the beginning as only relative paths are supported")
self.path = path.rstrip('/')
self.path = safe_str(path.rstrip('/')) # we store paths as str
if path == '' and kind != NodeKind.DIR:
raise NodeError("Only DirNode and its subclasses may be "
"initialized with empty path")
self.kind = kind
#self.dirs, self.files = [], []
if self.is_root() and not self.is_dir():
raise NodeError("Root node cannot be FILE kind")
def parent(self):
parent_path = self.get_parent_path()
if parent_path:
if self.changeset:
return self.changeset.get_node(parent_path)
return DirNode(parent_path)
def unicode_path(self):
return safe_unicode(self.path)
def name(self):
Returns name of the node so if its path
then only last part is returned.
return safe_unicode(self.path.rstrip('/').split('/')[-1])
def _get_kind(self):
return self._kind
def _set_kind(self, kind):
if hasattr(self, '_kind'):
raise NodeError("Cannot change node's kind")
self._kind = kind
# Post setter check (path's trailing slash)
if self.path.endswith('/'):
raise NodeError("Node's path cannot end with slash")
kind = property(_get_kind, _set_kind)
def __cmp__(self, other):
Comparator using name of the node, needed for quick list sorting.
kind_cmp = cmp(self.kind, other.kind)
if kind_cmp:
return kind_cmp
return cmp(self.name, other.name)
def __eq__(self, other):
for attr in ['name', 'path', 'kind']:
if getattr(self, attr) != getattr(other, attr):
return False
if self.is_file():
if self.content != other.content:
# For DirNode's check without entering each dir
self_nodes_paths = list(sorted(n.path for n in self.nodes))
other_nodes_paths = list(sorted(n.path for n in self.nodes))
if self_nodes_paths != other_nodes_paths:
return True
def __nq__(self, other):
return not self.__eq__(other)
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.path)
def __str__(self):
return self.__repr__()
def __unicode__(self):
return self.name
def get_parent_path(self):
Returns node's parent path or empty string if node is root.
if self.is_root():
return ''
return posixpath.dirname(self.path.rstrip('/')) + '/'
def is_file(self):
Returns ``True`` if node's kind is ``NodeKind.FILE``, ``False``
otherwise.
return self.kind == NodeKind.FILE
def is_dir(self):
Returns ``True`` if node's kind is ``NodeKind.DIR``, ``False``
return self.kind == NodeKind.DIR
def is_root(self):
Returns ``True`` if node is a root node and ``False`` otherwise.
return self.kind == NodeKind.DIR and self.path == ''
def is_submodule(self):
Returns ``True`` if node's kind is ``NodeKind.SUBMODULE``, ``False``
return self.kind == NodeKind.SUBMODULE
def added(self):
return self.state is NodeState.ADDED
def changed(self):
return self.state is NodeState.CHANGED
def not_changed(self):
return self.state is NodeState.NOT_CHANGED
def removed(self):
return self.state is NodeState.REMOVED
class FileNode(Node):
Class representing file nodes.
:attribute: path: path to the node, relative to repostiory's root
:attribute: content: if given arbitrary sets content of the file
:attribute: changeset: if given, first time content is accessed, callback
:attribute: mode: octal stat mode for a node. Default is 0100644.
def __init__(self, path, content=None, changeset=None, mode=None):
Only one of ``content`` and ``changeset`` may be given. Passing both
would raise ``NodeError`` exception.
:param path: relative path to the node
:param content: content may be passed to constructor
:param changeset: if given, will use it to lazily fetch content
:param mode: octal representation of ST_MODE (i.e. 0100644)
if content and changeset:
raise NodeError("Cannot use both content and changeset")
super(FileNode, self).__init__(path, kind=NodeKind.FILE)
self.changeset = changeset
self._content = content
self._mode = mode or 0100644
def mode(self):
Returns lazily mode of the FileNode. If ``changeset`` is not set, would
use value given at initialization or 0100644 (default).
mode = self.changeset.get_file_mode(self.path)
mode = self._mode
return mode
def _get_content(self):
content = self.changeset.get_file_content(self.path)
content = self._content
return content
@property
def content(self):
Returns lazily content of the FileNode. If possible, would try to
decode content from UTF-8.
content = self._get_content()
if bool(content and '\0' in content):
return safe_unicode(content)
def size(self):
@@ -391,360 +391,360 @@ class ScmModel(BaseModel):
log.error(traceback.format_exc())
raise
def is_following_repo(self, repo_name, user_id, cache=False):
r = self.sa.query(Repository)\
.filter(Repository.repo_name == repo_name).scalar()
f = self.sa.query(UserFollowing)\
.filter(UserFollowing.follows_repository == r)\
.filter(UserFollowing.user_id == user_id).scalar()
return f is not None
def is_following_user(self, username, user_id, cache=False):
u = User.get_by_username(username)
.filter(UserFollowing.follows_user == u)\
def get_followers(self, repo):
repo = self._get_repo(repo)
return self.sa.query(UserFollowing)\
.filter(UserFollowing.follows_repository == repo).count()
def get_forks(self, repo):
return self.sa.query(Repository)\
.filter(Repository.fork == repo).count()
def get_pull_requests(self, repo):
return self.sa.query(PullRequest)\
.filter(PullRequest.other_repo == repo)\
.filter(PullRequest.status != PullRequest.STATUS_CLOSED).count()
def mark_as_fork(self, repo, fork, user):
repo = self.__get_repo(repo)
fork = self.__get_repo(fork)
if fork and repo.repo_id == fork.repo_id:
raise Exception("Cannot set repository as fork of itself")
repo.fork = fork
self.sa.add(repo)
return repo
def _handle_rc_scm_extras(self, username, repo_name, repo_alias,
action=None):
from rhodecode import CONFIG
from rhodecode.lib.base import _get_ip_addr
from pylons import request
environ = request.environ
except TypeError:
# we might use this outside of request context, let's fake the
# environ data
from webob import Request
environ = Request.blank('').environ
extras = {
'ip': _get_ip_addr(environ),
'username': username,
'action': action or 'push_local',
'repository': repo_name,
'scm': repo_alias,
'config': CONFIG['__file__'],
'server_url': get_server_url(environ),
'make_lock': None,
'locked_by': [None, None]
}
_set_extras(extras)
def _handle_push(self, repo, username, action, repo_name, revisions):
Triggers push action hooks
:param repo: SCM repo
:param username: username who pushes
:param action: push/push_loca/push_remote
:param repo_name: name of repo
:param revisions: list of revisions that we pushed
self._handle_rc_scm_extras(username, repo_name, repo_alias=repo.alias)
_scm_repo = repo._repo
# trigger push hook
if repo.alias == 'hg':
log_push_action(_scm_repo.ui, _scm_repo, node=revisions[0])
elif repo.alias == 'git':
log_push_action(None, _scm_repo, _git_revs=revisions)
def _get_IMC_module(self, scm_type):
Returns InMemoryCommit class based on scm_type
:param scm_type:
if scm_type == 'hg':
from rhodecode.lib.vcs.backends.hg import \
MercurialInMemoryChangeset as IMC
elif scm_type == 'git':
from rhodecode.lib.vcs.backends.git import \
GitInMemoryChangeset as IMC
return IMC
def pull_changes(self, repo, username):
dbrepo = self.__get_repo(repo)
clone_uri = dbrepo.clone_uri
if not clone_uri:
raise Exception("This repository doesn't have a clone uri")
repo = dbrepo.scm_instance
repo_name = dbrepo.repo_name
if repo.alias == 'git':
repo.fetch(clone_uri)
# git doesn't really have something like post-fetch action
# we fake that now. #TODO: extract fetched revisions somehow
# here
self._handle_push(repo,
username=username,
action='push_remote',
repo_name=repo_name,
revisions=[])
self._handle_rc_scm_extras(username, dbrepo.repo_name,
repo.alias, action='push_remote')
repo.pull(clone_uri)
self.mark_for_invalidation(repo_name)
except Exception:
def commit_change(self, repo, repo_name, cs, user, author, message,
content, f_path):
Commits changes
:param repo: SCM instance
user = self._get_user(user)
IMC = self._get_IMC_module(repo.alias)
# decoding here will force that we have proper encoded values
# in any other case this will throw exceptions and deny commit
content = safe_str(content)
path = safe_str(f_path)
# message and author needs to be unicode
# proper backend should then translate that into required type
message = safe_unicode(message)
author = safe_unicode(author)
imc = IMC(repo)
imc.change(FileNode(path, content, mode=cs.get_file_mode(f_path)))
tip = imc.commit(message=message,
author=author,
parents=[cs], branch=cs.branch)
username=user.username,
action='push_local',
revisions=[tip.raw_id])
def create_nodes(self, user, repo, message, nodes, parent_cs=None,
author=None, trigger_push_hook=True):
Commits given multiple nodes into repo
:param user: RhodeCode User object or user_id, the commiter
:param repo: RhodeCode Repository object
:param message: commit message
:param nodes: mapping {filename:{'content':content},...}
:param parent_cs: parent changeset, can be empty than it's initial commit
:param author: author of commit, cna be different that commiter only for git
:param trigger_push_hook: trigger push hooks
:returns: new commited changeset
scm_instance = repo.scm_instance_no_cache()
processed_nodes = []
for f_path in nodes:
if f_path.startswith('/') or f_path.startswith('.') or '../' in f_path:
raise NonRelativePathError('%s is not an relative path' % f_path)
if f_path:
f_path = os.path.normpath(f_path)
content = nodes[f_path]['content']
f_path = safe_str(f_path)
if isinstance(content, (basestring,)):
elif isinstance(content, (file, cStringIO.OutputType,)):
content = content.read()
raise Exception('Content is of unrecognized type %s' % (
type(content)
))
processed_nodes.append((f_path, content))
commiter = user.full_contact
author = safe_unicode(author) if author else commiter
IMC = self._get_IMC_module(scm_instance.alias)
imc = IMC(scm_instance)
if not parent_cs:
parent_cs = EmptyChangeset(alias=scm_instance.alias)
if isinstance(parent_cs, EmptyChangeset):
# EmptyChangeset means we we're editing empty repository
parents = None
parents = [parent_cs]
# add multiple nodes
for path, content in processed_nodes:
imc.add(FileNode(path, content=content))
parents=parents,
branch=parent_cs.branch)
self.mark_for_invalidation(repo.repo_name)
if trigger_push_hook:
self._handle_push(scm_instance,
repo_name=repo.repo_name,
def get_nodes(self, repo_name, revision, root_path='/', flat=True):
recursive walk in root dir and return a set of all path in that dir
based on repository walk function
:param repo_name: name of repository
:param revision: revision for which to list nodes
:param root_path: root path to list
:param flat: return as a list, if False returns a dict with decription
_files = list()
_dirs = list()
_repo = self.__get_repo(repo_name)
changeset = _repo.scm_instance.get_changeset(revision)
root_path = root_path.lstrip('/')
for topnode, dirs, files in changeset.walk(root_path):
for f in files:
_files.append(f.path if flat else {"name": f.path,
"type": "file"})
for d in dirs:
_dirs.append(d.path if flat else {"name": d.path,
"type": "dir"})
except RepositoryError:
log.debug(traceback.format_exc())
return _dirs, _files
def get_unread_journal(self):
return self.sa.query(UserLog).count()
def get_repo_landing_revs(self, repo=None):
Generates select option with tags branches and bookmarks (for hg only)
grouped by type
:param repo:
hist_l = []
choices = []
hist_l.append(['tip', _('latest tip')])
choices.append('tip')
if not repo:
return choices, hist_l
repo = repo.scm_instance
branches_group = ([(k, k) for k, v in
repo.branches.iteritems()], _("Branches"))
hist_l.append(branches_group)
choices.extend([x[0] for x in branches_group[0]])
bookmarks_group = ([(k, k) for k, v in
repo.bookmarks.iteritems()], _("Bookmarks"))
hist_l.append(bookmarks_group)
choices.extend([x[0] for x in bookmarks_group[0]])
tags_group = ([(k, k) for k, v in
repo.tags.iteritems()], _("Tags"))
hist_l.append(tags_group)
choices.extend([x[0] for x in tags_group[0]])
def install_git_hook(self, repo, force_create=False):
Creates a rhodecode hook inside a git repository
:param repo: Instance of VCS repo
:param force_create: Create even if same name hook exists
loc = jn(repo.path, 'hooks')
if not repo.bare:
loc = jn(repo.path, '.git', 'hooks')
if not os.path.isdir(loc):
os.makedirs(loc)
tmpl_post = pkg_resources.resource_string(
'rhodecode', jn('config', 'post_receive_tmpl.py')
tmpl_pre = pkg_resources.resource_string(
'rhodecode', jn('config', 'pre_receive_tmpl.py')
for h_type, tmpl in [('pre', tmpl_pre), ('post', tmpl_post)]:
_hook_file = jn(loc, '%s-receive' % h_type)
_rhodecode_hook = False
log.debug('Installing git hook in repo %s' % repo)
if os.path.exists(_hook_file):
# let's take a look at this hook, maybe it's rhodecode ?
log.debug('hook exists, checking if it is from rhodecode')
_HOOK_VER_PAT = re.compile(r'^RC_HOOK_VER')
with open(_hook_file, 'rb') as f:
data = f.read()
matches = re.compile(r'(?:%s)\s*=\s*(.*)'
% 'RC_HOOK_VER').search(data)
if matches:
ver = matches.groups()[0]
log.debug('got %s it is rhodecode' % (ver))
_rhodecode_hook = True
# there is no hook in this dir, so we want to create one
if _rhodecode_hook or force_create:
log.debug('writing %s hook file !' % h_type)
with open(_hook_file, 'wb') as f:
tmpl = tmpl.replace('_TMPL_', rhodecode.__version__)
f.write(tmpl)
os.chmod(_hook_file, 0755)
log.debug('skipping writing hook file')
# encoding: utf8
from __future__ import with_statement
from rhodecode.lib import vcs
from rhodecode.tests.vcs.base import BackendTestMixin
from rhodecode.tests.vcs.conf import SCM_TESTS
from rhodecode.lib.vcs.backends.base import BaseChangeset
FileNode, AddedFileNodesGenerator,
ChangedFileNodesGenerator, RemovedFileNodesGenerator
BranchDoesNotExistError, ChangesetDoesNotExistError,
RepositoryError, EmptyRepositoryError
from rhodecode.lib.vcs.utils.compat import unittest
from rhodecode.tests.vcs.conf import get_new_dir
class TestBaseChangeset(unittest.TestCase):
def test_as_dict(self):
changeset = BaseChangeset()
changeset.id = 'ID'
changeset.raw_id = 'RAW_ID'
changeset.short_id = 'SHORT_ID'
changeset.revision = 1009
changeset.date = datetime.datetime(2011, 1, 30, 1, 45)
changeset.message = 'Message of a commit'
changeset.author = 'Joe Doe <joe.doe@example.com>'
changeset.added = [FileNode('foo/bar/baz'), FileNode('foobar')]
changeset.changed = []
changeset.removed = []
self.assertEqual(changeset.as_dict(), {
'id': 'ID',
'raw_id': 'RAW_ID',
'short_id': 'SHORT_ID',
'revision': 1009,
'date': datetime.datetime(2011, 1, 30, 1, 45),
'message': 'Message of a commit',
'author': {
'name': 'Joe Doe',
'email': 'joe.doe@example.com',
},
'added': ['foo/bar/baz', 'foobar'],
'changed': [],
'removed': [],
})
class ChangesetsWithCommitsTestCaseixin(BackendTestMixin):
recreate_repo_per_test = True
@classmethod
def _get_commits(cls):
start_date = datetime.datetime(2010, 1, 1, 20)
for x in xrange(5):
yield {
'message': 'Commit %d' % x,
'author': 'Joe Doe <joe.doe@example.com>',
'date': start_date + datetime.timedelta(hours=12 * x),
'added': [
FileNode('file_%d.txt' % x, content='Foobar %d' % x),
],
def test_new_branch(self):
self.imc.add(vcs.nodes.FileNode('docs/index.txt',
content='Documentation\n'))
foobar_tip = self.imc.commit(
message=u'New branch: foobar',
author=u'joe',
branch='foobar',
self.assertTrue('foobar' in self.repo.branches)
self.assertEqual(foobar_tip.branch, 'foobar')
# 'foobar' should be the only branch that contains the new commit
self.assertNotEqual(*self.repo.branches.values())
def test_new_head_in_default_branch(self):
tip = self.repo.get_changeset()
parents=[tip],
self.imc.change(vcs.nodes.FileNode('docs/index.txt',
content='Documentation\nand more...\n'))
newtip = self.imc.commit(
message=u'At default branch',
branch=foobar_tip.branch,
parents=[foobar_tip],
newest_tip = self.imc.commit(
message=u'Merged with %s' % foobar_tip.raw_id,
branch=self.backend_class.DEFAULT_BRANCH_NAME,
parents=[newtip, foobar_tip],
self.assertEqual(newest_tip.branch,
self.backend_class.DEFAULT_BRANCH_NAME)
def test_get_changesets_respects_branch_name(self):
doc_changeset = self.imc.commit(
message=u'New branch: docs',
branch='docs',
self.imc.add(vcs.nodes.FileNode('newfile', content=''))
self.imc.commit(
message=u'Back in default branch',
default_branch_changesets = self.repo.get_changesets(
branch_name=self.repo.DEFAULT_BRANCH_NAME)
self.assertNotIn(doc_changeset, default_branch_changesets)
def test_get_changeset_by_branch(self):
for branch, sha in self.repo.branches.iteritems():
self.assertEqual(sha, self.repo.get_changeset(branch).raw_id)
def test_get_changeset_by_tag(self):
for tag, sha in self.repo.tags.iteritems():
self.assertEqual(sha, self.repo.get_changeset(tag).raw_id)
class ChangesetsTestCaseMixin(BackendTestMixin):
recreate_repo_per_test = False
'message': u'Commit %d' % x,
'author': u'Joe Doe <joe.doe@example.com>',
def test_simple(self):
self.assertEqual(tip.date, datetime.datetime(2010, 1, 3, 20))
def test_get_changesets_is_ordered_by_date(self):
changesets = list(self.repo.get_changesets())
ordered_by_date = sorted(changesets,
key=lambda cs: cs.date)
self.assertItemsEqual(changesets, ordered_by_date)
def test_get_changesets_respects_start(self):
second_id = self.repo.revisions[1]
changesets = list(self.repo.get_changesets(start=second_id))
self.assertEqual(len(changesets), 4)
def test_get_changesets_numerical_id_respects_start(self):
second_id = 1
def test_get_changesets_includes_start_changeset(self):
self.assertEqual(changesets[0].raw_id, second_id)
def test_get_changesets_respects_end(self):
changesets = list(self.repo.get_changesets(end=second_id))
self.assertEqual(changesets[-1].raw_id, second_id)
self.assertEqual(len(changesets), 2)
def test_get_changesets_numerical_id_respects_end(self):
self.assertEqual(changesets.index(changesets[-1]), second_id)
def test_get_changesets_respects_both_start_and_end(self):
third_id = self.repo.revisions[2]
changesets = list(self.repo.get_changesets(start=second_id,
end=third_id))
def test_get_changesets_numerical_id_respects_both_start_and_end(self):
changesets = list(self.repo.get_changesets(start=2, end=3))
def test_get_changesets_on_empty_repo_raises_EmptyRepository_error(self):
Backend = self.get_backend()
repo_path = get_new_dir(str(time.time()))
repo = Backend(repo_path, create=True)
with self.assertRaises(EmptyRepositoryError):
list(repo.get_changesets(start='foobar'))
def test_get_changesets_includes_end_changeset(self):
def test_get_changesets_respects_start_date(self):
start_date = datetime.datetime(2010, 2, 1)
for cs in self.repo.get_changesets(start_date=start_date):
self.assertGreaterEqual(cs.date, start_date)
def test_get_changesets_respects_end_date(self):
start_date = datetime.datetime(2010, 1, 1)
end_date = datetime.datetime(2010, 2, 1)
for cs in self.repo.get_changesets(start_date=start_date,
end_date=end_date):
self.assertLessEqual(cs.date, end_date)
def test_get_changesets_respects_start_date_and_end_date(self):
for cs in self.repo.get_changesets(end_date=end_date):
def test_get_changesets_respects_reverse(self):
changesets_id_list = [cs.raw_id for cs in
self.repo.get_changesets(reverse=True)]
self.assertItemsEqual(changesets_id_list, reversed(self.repo.revisions))
def test_get_filenodes_generator(self):
filepaths = [node.path for node in tip.get_filenodes_generator()]
self.assertItemsEqual(filepaths, ['file_%d.txt' % x for x in xrange(5)])
def test_size(self):
size = 5 * len('Foobar N') # Size of 5 files
self.assertEqual(tip.size, size)
def test_author(self):
self.assertEqual(tip.author, u'Joe Doe <joe.doe@example.com>')
def test_author_name(self):
self.assertEqual(tip.author_name, u'Joe Doe')
def test_author_email(self):
self.assertEqual(tip.author_email, u'joe.doe@example.com')
def test_get_changesets_raise_changesetdoesnotexist_for_wrong_start(self):
with self.assertRaises(ChangesetDoesNotExistError):
list(self.repo.get_changesets(start='foobar'))
def test_get_changesets_raise_changesetdoesnotexist_for_wrong_end(self):
list(self.repo.get_changesets(end='foobar'))
def test_get_changesets_raise_branchdoesnotexist_for_wrong_branch_name(self):
with self.assertRaises(BranchDoesNotExistError):
list(self.repo.get_changesets(branch_name='foobar'))
def test_get_changesets_raise_repositoryerror_for_wrong_start_end(self):
start = self.repo.revisions[-1]
end = self.repo.revisions[0]
with self.assertRaises(RepositoryError):
list(self.repo.get_changesets(start=start, end=end))
def test_get_changesets_numerical_id_reversed(self):
[x for x in self.repo.get_changesets(start=3, end=2)]
def test_get_changesets_numerical_id_respects_both_start_and_end_last(self):
last = len(self.repo.revisions)
list(self.repo.get_changesets(start=last-1, end=last-2))
def test_get_changesets_numerical_id_last_zero_error(self):
list(self.repo.get_changesets(start=last-1, end=0))
class ChangesetsChangesTestCaseMixin(BackendTestMixin):
return [
{
'message': u'Initial',
'date': datetime.datetime(2010, 1, 1, 20),
FileNode('foo/bar', content='foo'),
FileNode('foo/bał', content='foo'),
FileNode('foobar', content='foo'),
FileNode('qwe', content='foo'),
'message': u'Massive changes',
'date': datetime.datetime(2010, 1, 1, 22),
'added': [FileNode('fallout', content='War never changes')],
'changed': [
FileNode('foo/bar', content='baz'),
FileNode('foobar', content='baz'),
'removed': [FileNode('qwe')],
]
def test_initial_commit(self):
changeset = self.repo.get_changeset(0)
self.assertItemsEqual(changeset.added, [
changeset.get_node('foo/bar'),
changeset.get_node('foo/bał'),
changeset.get_node('foobar'),
changeset.get_node('qwe'),
])
self.assertItemsEqual(changeset.changed, [])
self.assertItemsEqual(changeset.removed, [])
def test_head_added(self):
changeset = self.repo.get_changeset()
self.assertTrue(isinstance(changeset.added, AddedFileNodesGenerator))
changeset.get_node('fallout'),
self.assertTrue(isinstance(changeset.changed, ChangedFileNodesGenerator))
self.assertItemsEqual(changeset.changed, [
self.assertTrue(isinstance(changeset.removed, RemovedFileNodesGenerator))
self.assertEqual(len(changeset.removed), 1)
self.assertEqual(list(changeset.removed)[0].path, 'qwe')
def test_get_filemode(self):
self.assertEqual(33188, changeset.get_file_mode('foo/bar'))
def test_get_filemode_non_ascii(self):
self.assertEqual(33188, changeset.get_file_mode('foo/bał'))
self.assertEqual(33188, changeset.get_file_mode(u'foo/bał'))
# For each backend create test case class
for alias in SCM_TESTS:
attrs = {
'backend_alias': alias,
# tests with additional commits
cls_name = ''.join(('%s changesets with commits test' % alias).title().split())
bases = (ChangesetsWithCommitsTestCaseixin, unittest.TestCase)
globals()[cls_name] = type(cls_name, bases, attrs)
# tests without additional commits
cls_name = ''.join(('%s changesets test' % alias).title().split())
bases = (ChangesetsTestCaseMixin, unittest.TestCase)
# tests changes
cls_name = ''.join(('%s changesets changes test' % alias).title().split())
bases = (ChangesetsChangesTestCaseMixin, unittest.TestCase)
if __name__ == '__main__':
unittest.main()
Tests so called "in memory changesets" commit API of vcs.
from rhodecode.tests.vcs.conf import SCM_TESTS, get_new_dir
from rhodecode.lib.vcs.exceptions import EmptyRepositoryError
from rhodecode.lib.vcs.exceptions import NodeAlreadyAddedError
from rhodecode.lib.vcs.exceptions import NodeAlreadyExistsError
from rhodecode.lib.vcs.exceptions import NodeAlreadyRemovedError
from rhodecode.lib.vcs.exceptions import NodeAlreadyChangedError
from rhodecode.lib.vcs.exceptions import NodeDoesNotExistError
from rhodecode.lib.vcs.exceptions import NodeNotChangedError
from rhodecode.lib.vcs.nodes import DirNode
from rhodecode.lib.vcs.nodes import FileNode
class InMemoryChangesetTestMixin(object):
This is a backend independent test case class which should be created
with ``type`` method.
It is required to set following attributes at subclass:
- ``backend_alias``: alias of used backend (see ``vcs.BACKENDS``)
- ``repo_path``: path to the repository which would be created for set of
tests
def get_backend(self):
return vcs.get_backend(self.backend_alias)
def setUp(self):
self.repo_path = get_new_dir(str(time.time()))
self.repo = Backend(self.repo_path, create=True)
self.imc = self.repo.in_memory_changeset
self.nodes = [
FileNode('foobar', content='Foo & bar'),
FileNode('foobar2', content='Foo & bar, doubled!'),
FileNode('foo bar with spaces', content=''),
FileNode('foo/bar/baz', content='Inside'),
FileNode('foo/bar/file.bin', content='\xd0\xcf\x11\xe0\xa1\xb1\x1a\xe1\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00;\x00\x03\x00\xfe\xff\t\x00\x06\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x00\x01\x00\x00\x00\x1a\x00\x00\x00\x00\x00\x00\x00\x00\x10\x00\x00\x18\x00\x00\x00\x01\x00\x00\x00\xfe\xff\xff\xff\x00\x00\x00\x00\x00\x00\x00\x00\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff\xff'),
def test_add(self):
rev_count = len(self.repo.revisions)
to_add = [FileNode(node.path, content=node.content)
for node in self.nodes]
for node in to_add:
self.imc.add(node)
message = u'Added: %s' % ', '.join((node.path for node in self.nodes))
author = unicode(self.__class__)
changeset = self.imc.commit(message=message, author=author)
newtip = self.repo.get_changeset()
self.assertEqual(changeset, newtip)
self.assertEqual(rev_count + 1, len(self.repo.revisions))
self.assertEqual(newtip.message, message)
self.assertEqual(newtip.author, author)
self.assertTrue(not any((self.imc.added, self.imc.changed,
self.imc.removed)))
self.assertEqual(newtip.get_node(node.path).content, node.content)
def test_add_in_bulk(self):
self.imc.add(*to_add)
def test_add_actually_adds_all_nodes_at_second_commit_too(self):
self.imc.add(FileNode('foo/bar/image.png', content='\0'))
self.imc.add(FileNode('foo/README.txt', content='readme!'))
changeset = self.imc.commit(u'Initial', u'joe.doe@example.com')
self.assertTrue(isinstance(changeset.get_node('foo'), DirNode))
self.assertTrue(isinstance(changeset.get_node('foo/bar'), DirNode))
self.assertEqual(changeset.get_node('foo/bar/image.png').content, '\0')
self.assertEqual(changeset.get_node('foo/README.txt').content, 'readme!')
# commit some more files again
to_add = [
FileNode('foo/bar/foobaz/bar', content='foo'),
FileNode('foo/bar/another/bar', content='foo'),
FileNode('foo/baz.txt', content='foo'),
FileNode('foobar/foobaz/file', content='foo'),
FileNode('foobar/barbaz', content='foo'),
changeset = self.imc.commit(u'Another', u'joe.doe@example.com')
self.assertEqual(changeset.get_node('foo/bar/foobaz/bar').content, 'foo')
self.assertEqual(changeset.get_node('foo/bar/another/bar').content, 'foo')
self.assertEqual(changeset.get_node('foo/baz.txt').content, 'foo')
self.assertEqual(changeset.get_node('foobar/foobaz/file').content, 'foo')
self.assertEqual(changeset.get_node('foobar/barbaz').content, 'foo')
def test_add_non_ascii_files(self):
FileNode('żółwik/zwierzątko', content='ćććć'),
FileNode(u'żółwik/zwierzątko_uni', content=u'ćććć'),
def test_add_raise_already_added(self):
node = FileNode('foobar', content='baz')
self.assertRaises(NodeAlreadyAddedError, self.imc.add, node)
def test_check_integrity_raise_already_exist(self):
self.imc.commit(message=u'Added foobar', author=unicode(self))
self.assertRaises(NodeAlreadyExistsError, self.imc.commit,
message='new message',
author=str(self))
def test_change(self):
self.imc.add(FileNode('foo/bar/baz', content='foo'))
self.imc.add(FileNode('foo/fbar', content='foobar'))
tip = self.imc.commit(u'Initial', u'joe.doe@example.com')
# Change node's content
node = FileNode('foo/bar/baz', content='My **changed** content')
self.imc.change(node)
self.imc.commit(u'Changed %s' % node.path, u'joe.doe@example.com')
self.assertNotEqual(tip, newtip)
self.assertNotEqual(tip.id, newtip.id)
self.assertEqual(newtip.get_node('foo/bar/baz').content,
'My **changed** content')
def test_change_non_ascii(self):
node = FileNode('żółwik/zwierzątko', content='My **changed** content')
self.imc.commit(u'Changed %s' % safe_unicode(node.path),
u'joe.doe@example.com')
node = FileNode(u'żółwik/zwierzątko_uni', content=u'My **changed** content')
self.assertEqual(newtip.get_node('żółwik/zwierzątko').content,
self.assertEqual(newtip.get_node('żółwik/zwierzątko_uni').content,
def test_change_raise_empty_repository(self):
node = FileNode('foobar')
self.assertRaises(EmptyRepositoryError, self.imc.change, node)
def test_check_integrity_change_raise_node_does_not_exist(self):
node = FileNode('not-foobar', content='')
self.assertRaises(NodeDoesNotExistError, self.imc.commit,
message='Changed not existing node',
def test_change_raise_node_already_changed(self):
node = FileNode('foobar', content='more baz')
self.assertRaises(NodeAlreadyChangedError, self.imc.change, node)
def test_check_integrity_change_raise_node_not_changed(self):
self.test_add() # Performs first commit
node = FileNode(self.nodes[0].path, content=self.nodes[0].content)
self.assertRaises(NodeNotChangedError, self.imc.commit,
message=u'Trying to mark node as changed without touching it',
author=unicode(self))
def test_change_raise_node_already_removed(self):
self.imc.remove(FileNode('foobar'))
self.assertRaises(NodeAlreadyRemovedError, self.imc.change, node)
def test_remove(self):
node = self.nodes[0]
self.assertEqual(node.content, tip.get_node(node.path).content)
self.imc.remove(node)
self.imc.commit(message=u'Removed %s' % node.path, author=unicode(self))
self.assertRaises(NodeDoesNotExistError, newtip.get_node, node.path)
def test_remove_last_file_from_directory(self):
node = FileNode('omg/qwe/foo/bar', content='foobar')
self.imc.commit(u'added', u'joe doe')
tip = self.imc.commit(u'removed', u'joe doe')
self.assertRaises(NodeDoesNotExistError, tip.get_node, 'omg/qwe/foo/bar')
def test_remove_raise_node_does_not_exist(self):
self.imc.remove(self.nodes[0])
message='Trying to remove node at empty repository',
def test_check_integrity_remove_raise_node_does_not_exist(self):
node = FileNode('no-such-file')
message=u'Trying to remove not existing node',
def test_remove_raise_node_already_removed(self):
node = FileNode(self.nodes[0].path)
self.assertRaises(NodeAlreadyRemovedError, self.imc.remove, node)
def test_remove_raise_node_already_changed(self):
node = FileNode(self.nodes[0].path, content='Bending time')
self.assertRaises(NodeAlreadyChangedError, self.imc.remove, node)
def test_reset(self):
self.imc.add(FileNode('foo', content='bar'))
#self.imc.change(FileNode('baz', content='new'))
#self.imc.remove(FileNode('qwe'))
self.imc.reset()
def test_multiple_commits(self):
N = 3 # number of commits to perform
last = None
for x in xrange(N):
fname = 'file%s' % str(x).rjust(5, '0')
content = 'foobar\n' * x
node = FileNode(fname, content=content)
commit = self.imc.commit(u"Commit no. %s" % (x + 1), author=u'vcs')
self.assertTrue(last != commit)
last = commit
# Check commit number for same repo
self.assertEqual(len(self.repo.revisions), N)
# Check commit number for recreated repo
backend = self.get_backend()
repo = backend(self.repo_path)
self.assertEqual(len(repo.revisions), N)
def test_date_attr(self):
node = FileNode('foobar.txt', content='Foobared!')
date = datetime.datetime(1985, 1, 30, 1, 45)
commit = self.imc.commit(u"Committed at time when I was born ;-)",
author=u'lb', date=date)
self.assertEqual(commit.date, date)
class BackendBaseTestCase(unittest.TestCase):
Base test class for tests which requires repository.
backend_alias = 'hg'
commits = [
'message': 'Initial commit',
FileNode('foobar', content='Foobar'),
FileNode('foobar2', content='Foobar II'),
FileNode('foo/bar/baz', content='baz here!'),
def get_commits(self):
Returns list of commits which builds repository for each tests.
if hasattr(self, 'commits'):
return self.commits
def get_new_repo_path(self):
Returns newly created repository's directory.
key = '%s-%s' % (backend.alias, str(time.time()))
repo_path = get_new_dir(key)
return repo_path
self.backend_class = Backend
self.repo_path = self.get_new_repo_path()
for commit in self.get_commits():
for node in commit.get('added', []):
self.imc.add(FileNode(node.path, content=node.content))
for node in commit.get('changed', []):
self.imc.change(FileNode(node.path, content=node.content))
for node in commit.get('removed', []):
self.imc.remove(FileNode(node.path))
self.imc.commit(message=unicode(commit['message']),
author=unicode(commit['author']),
date=commit['date'])
self.tip = self.repo.get_changeset()
Status change: