import os
import posixpath
import mercurial.archival
import mercurial.node
import mercurial.obsutil
from kallithea.lib.vcs.backends.base import BaseChangeset
from kallithea.lib.vcs.conf import settings
from kallithea.lib.vcs.exceptions import ChangesetDoesNotExistError, ChangesetError, ImproperArchiveTypeError, NodeDoesNotExistError, VCSError
from kallithea.lib.vcs.nodes import (AddedFileNodesGenerator, ChangedFileNodesGenerator, DirNode, FileNode, NodeKind, RemovedFileNodesGenerator, RootNode,
SubModuleNode)
from kallithea.lib.vcs.utils import ascii_bytes, ascii_str, date_fromtimestamp, safe_bytes, safe_str
from kallithea.lib.vcs.utils.lazy import LazyProperty
from kallithea.lib.vcs.utils.paths import get_dirs_for_path
class MercurialChangeset(BaseChangeset):
"""
Represents the state of the repository at a given revision.
"""
def __init__(self, repository, revision):
    """
    Bind this changeset to ``revision`` of ``repository``.

    :param repository: repository object; its ``_repo`` attribute is indexed
        with the ASCII-encoded revision to obtain the underlying context.
    :param revision: revision identifier, must be a ``str``.
    """
    self.repository = repository
    assert isinstance(revision, str), repr(revision)
    ctx = repository._repo[ascii_bytes(revision)]
    self._ctx = ctx
    self.raw_id = ascii_str(ctx.hex())
    self.revision = ctx._rev
    # cache of node objects created for this changeset, keyed by path
    self.nodes = {}
@LazyProperty
def tags(self):
    """Return the tags of the underlying context as a list of ``str``."""
    return list(map(safe_str, self._ctx.tags()))
def branch(self):
    """Return the branch name of the underlying context as ``str``."""
    # NOTE(review): ``next()`` in this class reads ``self.branch`` without
    # calling it - presumably this is wrapped as a lazy property; confirm.
    branch_name = self._ctx.branch()
    return safe_str(branch_name)
def branches(self):
    """Return a one-element list holding the context's branch name as ``str``."""
    branch_name = safe_str(self._ctx.branch())
    return [branch_name]
def closesbranch(self):
    """Return whatever the underlying context's ``closesbranch()`` reports."""
    ctx = self._ctx
    return ctx.closesbranch()
def obsolete(self):
    """Return whatever the underlying context's ``obsolete()`` reports."""
    ctx = self._ctx
    return ctx.obsolete()
def bumped(self):
    """Return the result of the underlying context's ``phasedivergent()``."""
    ctx = self._ctx
    return ctx.phasedivergent()
def divergent(self):
    """Return the result of the underlying context's ``contentdivergent()``."""
    ctx = self._ctx
    return ctx.contentdivergent()
def extinct(self):
    """Return whatever the underlying context's ``extinct()`` reports."""
    ctx = self._ctx
    return ctx.extinct()
def unstable(self):
    """Return the result of the underlying context's ``orphan()``."""
    ctx = self._ctx
    return ctx.orphan()
def phase(self):
    """
    Return a display name for the phase of the underlying context.

    :return: ``'Draft'`` when the context's phase number is 1, ``'Secret'``
        when it is 2, and an empty string for any other value.
    """
    # Query the context once and map the phase number to its label.
    phase_names = {1: 'Draft', 2: 'Secret'}
    return phase_names.get(self._ctx.phase(), '')
def successors(self):
    """
    Return the 12-character hex ids of this changeset's closest successors,
    excluding the changeset's own node.
    """
    own_node = self._ctx.node()
    successor_sets = mercurial.obsutil.successorssets(self._ctx._repo, own_node, closest=True)
    # Flatten the list of sets: this handles both the divergent case
    # (more than one set) and the usual case (a single set).
    short_ids = []
    for successor_set in successor_sets:
        for node in successor_set:
            if node != own_node:
                short_ids.append(safe_str(mercurial.node.hex(node)[:12]))
    return short_ids
def predecessors(self):
    """Return the 12-character hex ids of this changeset's closest predecessors."""
    closest = mercurial.obsutil.closestpredecessors(self._ctx._repo, self._ctx.node())
    short_ids = []
    for node in closest:
        short_ids.append(safe_str(mercurial.node.hex(node)[:12]))
    return short_ids
def bookmarks(self):
    """Return the bookmarks of the underlying context as a list of ``str``."""
    return list(map(safe_str, self._ctx.bookmarks()))
def message(self):
    """Return the description of the underlying context as ``str``."""
    description = self._ctx.description()
    return safe_str(description)
def committer(self):
    """Return the committer, taken from this changeset's ``author``."""
    # NOTE(review): ``self.author`` is read without being called - presumably
    # ``author`` is exposed as a (lazy) property; confirm.
    author = self.author
    return safe_str(author)
def author(self):
    """Return the user recorded on the underlying context as ``str``."""
    user = self._ctx.user()
    return safe_str(user)
def date(self):
    """
    Return the changeset date, built by passing the items of the context's
    ``date()`` value positionally to ``date_fromtimestamp``.
    """
    date_fields = self._ctx.date()
    return date_fromtimestamp(*date_fields)
def _timestamp(self):
return self._ctx.date()[0]
def status(self):
    """
    Returns modified, added, removed, deleted files for current changeset
    """
    # Compare the first parent of this changeset against the changeset itself.
    parent_node = self._ctx.p1().node()
    own_node = self._ctx.node()
    return self.repository._repo.status(parent_node, own_node)
def _file_paths(self):
    """Return every entry obtained by iterating the context, converted to ``str``."""
    # NOTE(review): read as an attribute elsewhere in this class (e.g. in
    # ``_get_kind``) - presumably wrapped as a lazy property; confirm.
    return [safe_str(f) for f in self._ctx]
def _dir_paths(self):
    """
    Return the distinct directories computed by ``get_dirs_for_path`` for all
    file paths, preceded by the empty string for the root.
    """
    unique_dirs = set(get_dirs_for_path(*self._file_paths))
    # '' (the root) always comes first
    return [''] + list(unique_dirs)
def _paths(self):
return self._dir_paths + self._file_paths
def short_id(self):
    """Return the first 12 characters of ``raw_id``."""
    raw_id = self.raw_id
    return raw_id[:12]
def parents(self):
    """
    Returns list of parents changesets.
    """
    found = []
    for parent in self._ctx.parents():
        # contexts reporting a negative revision number are left out
        if parent.rev() >= 0:
            found.append(self.repository.get_changeset(parent.rev()))
    return found
def children(self):
    """
    Returns list of children changesets.
    """
    found = []
    for child in self._ctx.children():
        # contexts reporting a negative revision number are left out
        if child.rev() >= 0:
            found.append(self.repository.get_changeset(child.rev()))
    return found
def next(self, branch=None):
    """
    Return the changeset following this one in ``repository.revisions``.

    :param branch: if given, keep walking forward until a changeset whose
        ``branch`` equals it is found.
    :raise VCSError: if ``branch`` is given and differs from this
        changeset's branch.
    :raise ChangesetDoesNotExistError: if the end of the revision list is
        reached.
    """
    if branch and self.branch != branch:
        raise VCSError('Branch option used on changeset not belonging '
                       'to that branch')

    current = self
    while True:
        try:
            position = current.repository.revisions.index(current.raw_id) + 1
            following_rev = current.repository.revisions[position]
        except IndexError:
            raise ChangesetDoesNotExistError
        current = current.repository.get_changeset(following_rev)
        if not branch or branch == current.branch:
            return current
def prev(self, branch=None):
    """
    Return the changeset preceding this one in ``repository.revisions``.

    Mirrors ``next()``: walks backwards through the revision list.

    :param branch: if given, keep walking backwards until a changeset whose
        ``branch`` equals it is found.
    :raise VCSError: if ``branch`` is given and differs from this
        changeset's branch.
    :raise ChangesetDoesNotExistError: if the start of the revision list is
        reached.
    """
    if branch and self.branch != branch:
        raise VCSError('Branch option used on changeset not belonging '
                       'to that branch')

    cs = self
    while True:
        try:
            prev_ = cs.repository.revisions.index(cs.raw_id) - 1
            if prev_ < 0:
                # a negative index would silently wrap around to the end of
                # the list, so treat it as "no previous revision"
                raise IndexError
            prev_rev = cs.repository.revisions[prev_]
        except IndexError:
            raise ChangesetDoesNotExistError
        cs = cs.repository.get_changeset(prev_rev)
        if not branch or branch == cs.branch:
            return cs
def diff(self):
    """Return the chunks produced by the context's ``diff()`` joined into one ``bytes``."""
    # Only used to feed diffstat
    chunks = self._ctx.diff()
    return b''.join(chunks)
def _get_kind(self, path):
    """
    Return ``NodeKind.FILE`` or ``NodeKind.DIR`` for ``path``.

    Trailing slashes are stripped from ``path`` before the lookup.

    :raise ChangesetError: if the path is neither a known file nor a known
        directory of this changeset.
    """
    normalized = path.rstrip('/')
    if normalized in self._file_paths:
        return NodeKind.FILE
    if normalized in self._dir_paths:
        return NodeKind.DIR
    raise ChangesetError("Node does not exist at the given path '%s'"
                         % (normalized))
def _get_filectx(self, path):
    """
    Return the context's file context for ``path``.

    :raise ChangesetError: if ``path`` is not a file in this changeset.
    """
    is_file = self._get_kind(path) == NodeKind.FILE
    if not is_file:
        raise ChangesetError("File does not exist for revision %s at "
                             " '%s'" % (self.raw_id, path))
    return self._ctx.filectx(safe_bytes(path))
def _extract_submodules(self):
returns a dictionary with submodule information from substate file
of hg repository
return self._ctx.substate
def get_file_mode(self, path):
    """
    Returns stat mode of the file at the given ``path``.
    """
    # 0o100755 when the file context's flags contain b'x', 0o100644 otherwise
    is_executable = b'x' in self._get_filectx(path).flags()
    return 0o100755 if is_executable else 0o100644
def get_file_content(self, path):
    """
    Returns content of the file at given ``path``.

    :raise ChangesetError: propagated from ``_get_filectx`` when ``path`` is
        not a file in this changeset.
    """
    # Look up the file context first; it was previously used without
    # ever being bound.
    fctx = self._get_filectx(path)
    return fctx.data()
def get_file_size(self, path):
    """
    Returns size of the file at given ``path``.

    :raise ChangesetError: propagated from ``_get_filectx`` when ``path`` is
        not a file in this changeset.
    """
    # Look up the file context first; it was previously used without
    # ever being bound.
    fctx = self._get_filectx(path)
    return fctx.size()
def get_file_changeset(self, path):
    """
    Returns last commit of the file at the given ``path``.
    """
    # the history is ordered newest first, so one entry is enough
    newest_only = self.get_file_history(path, limit=1)
    return newest_only[0]
def get_file_history(self, path, limit=None):
    """
    Returns history of file as reversed list of ``Changeset`` objects for
    which file at given ``path`` has been modified.

    :param path: path of the file in this changeset.
    :param limit: if not ``None``, stop after collecting this many entries.
    """
    # Look up the file context first; it was previously used without
    # ever being bound.
    fctx = self._get_filectx(path)
    hist = []
    # walk the file log newest-first, counting entries from 1
    for count, filerev in enumerate(reversed(list(fctx.filelog())), start=1):
        hist.append(mercurial.node.hex(fctx.filectx(filerev).node()))
        if limit is not None and count == limit:
            break
    return [self.repository.get_changeset(node) for node in hist]
def get_file_annotate(self, path):
    """
    Returns a generator of four element tuples with
    lineno, sha, changeset lazy loader and line
    """
    annotated_lines = list(self._get_filectx(path).annotate())
    for lineno, annotated in enumerate(annotated_lines, start=1):
        sha = ascii_str(annotated.fctx.hex())
        # ``sha=sha`` binds the current value so each loader keeps its own id
        loader = lambda sha=sha: self.repository.get_changeset(sha)
        yield (lineno, sha, loader, annotated.text)
def fill_archive(self, stream=None, kind='tgz', prefix=None,
                 subrepos=False):
    """
    Fills up given stream.

    :param stream: file like object.
    :param kind: one of following: ``zip``, ``tgz`` or ``tbz2``.
        Default: ``tgz``.
    :param prefix: name of root directory in archive.
        Default is repository name and changeset's raw_id joined with dash
        (``repo-tip.<KIND>``).
    :param subrepos: include subrepos in this archive.

    :raise ImproperArchiveTypeError: If given kind is wrong.
    :raise VcsError: If given stream is None
    """
    allowed_kinds = settings.ARCHIVE_SPECS
    if kind not in allowed_kinds:
        # the two literals are concatenated, so the separating space must be
        # part of one of them (the message used to read "use oneof ...")
        raise ImproperArchiveTypeError('Archive kind not supported use one '
                                       'of %s' % ' '.join(allowed_kinds))

    if stream is None:
        raise VCSError('You need to pass in a valid stream for filling'
                       ' with archival data')

    if prefix is None:
        prefix = '%s-%s' % (self.repository.name, self.short_id)
    elif prefix.startswith('/'):
        raise VCSError("Prefix cannot start with leading slash")
    elif prefix.strip() == '':
        raise VCSError("Prefix cannot be empty")

    mercurial.archival.archive(self.repository._repo, stream, ascii_bytes(self.raw_id),
                               safe_bytes(kind), prefix=safe_bytes(prefix), subrepos=subrepos)
def get_nodes(self, path):
    """
    Returns combined ``DirNode`` and ``FileNode`` objects list representing
    state of changeset at the given ``path``. If node at the given ``path``
    is not instance of ``DirNode``, ChangesetError would be raised.
    """
    if self._get_kind(path) != NodeKind.DIR:
        raise ChangesetError("Directory does not exist for revision %s at "
                             " '%s'" % (self.revision, path))

    # Repository paths always use '/', so posixpath is used throughout.
    filenodes = [FileNode(f, changeset=self) for f in self._file_paths
                 if posixpath.dirname(f) == path]
    # ``d`` must be non-empty so the root ('') is never listed as a child.
    dirnodes = [DirNode(d, changeset=self) for d in self._dir_paths
                if d and posixpath.dirname(d) == path]

    als = self.repository.alias
    # NOTE(review): every submodule entry is appended regardless of ``path``
    # - confirm this is intended.
    for k, vals in self._extract_submodules().items():
        # vals = url, rev, type
        loc = vals[0]
        cs = vals[1]
        dirnodes.append(SubModuleNode(safe_str(k), url=safe_str(loc), changeset=cs,
                                      alias=als))

    nodes = dirnodes + filenodes
    # cache every node under its path
    for node in nodes:
        self.nodes[node.path] = node
    nodes.sort()
    return nodes
def get_node(self, path):
    """
    Returns ``Node`` object from the given ``path``.

    Nodes are created on first access and cached in ``self.nodes``.

    :raise NodeDoesNotExistError: if there is neither a file nor a
        directory at ``path`` in this changeset.
    """
    # Nodes are cached under their slash-stripped path (see ``get_nodes``),
    # so normalize the lookup key the same way ``_get_kind`` does.
    path = path.rstrip('/')
    if path not in self.nodes:
        if path in self._file_paths:
            node = FileNode(path, changeset=self)
        elif path in self._dir_paths:
            if path == '':
                node = RootNode(changeset=self)
            else:
                node = DirNode(path, changeset=self)
        else:
            raise NodeDoesNotExistError("There is no file nor directory "
                                        "at the given path: '%s' at revision %s"
                                        % (path, self.short_id))
        # cache node
        self.nodes[path] = node
    return self.nodes[path]
def affected_files(self):
    """
    Gets a fast accessible file changes for given changeset
    """
    ctx = self._ctx
    return ctx.files()
@property
def added(self):
    """
    Returns list of added ``FileNode`` objects.
    """
    added_paths = [safe_str(n) for n in self.status.added]
    return AddedFileNodesGenerator(added_paths, self)
def changed(self):
    """
    Returns list of modified ``FileNode`` objects.
    """
    modified_paths = [safe_str(n) for n in self.status.modified]
    return ChangedFileNodesGenerator(modified_paths, self)
def removed(self):
    """
    Returns list of removed ``FileNode`` objects.
    """
    removed_paths = [safe_str(n) for n in self.status.removed]
    return RemovedFileNodesGenerator(removed_paths, self)
def extra(self):
    """Return whatever the underlying context's ``extra()`` returns."""
    ctx = self._ctx
    return ctx.extra()
# -*- coding: utf-8 -*-
"""
vcs.nodes
~~~~~~~~~

Module holding everything related to vcs nodes.

:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
"""
import functools
import mimetypes
import stat
from kallithea.lib.vcs.backends.base import EmptyChangeset
from kallithea.lib.vcs.exceptions import NodeError, RemovedFileNodeError
from kallithea.lib.vcs.utils import safe_bytes, safe_str
class NodeKind:
    """Integer codes identifying what kind of entry a node is."""
    # The numeric order matters: ``Node.__lt__`` compares kinds first.
    SUBMODULE = -1
    DIR = 1
    FILE = 2
class NodeState:
    """String labels describing how a node relates to its changeset."""
    ADDED = 'added'
    CHANGED = 'changed'
    NOT_CHANGED = 'not changed'
    REMOVED = 'removed'
class NodeGeneratorBase(object):
    """
    Base class for removed added and changed filenodes, it's a lazy generator
    class that will create filenodes only on iteration or call

    The len method doesn't need to create filenodes at all
    """

    def __init__(self, current_paths, cs):
        """
        :param current_paths: sequence of paths the nodes are created from.
        :param cs: changeset whose ``get_node`` turns a path into a node.
        """
        self.cs = cs
        self.current_paths = current_paths

    def __getitem__(self, key):
        """Lazily yield the nodes for the paths selected by slice ``key``."""
        assert isinstance(key, slice), key
        for p in self.current_paths[key]:
            yield self.cs.get_node(p)

    def __len__(self):
        """Return the number of paths; no node is created."""
        return len(self.current_paths)

    def __iter__(self):
        """Lazily yield one node per path, in path order."""
        # same per-path lookup as ``__getitem__``, over all paths
        for p in self.current_paths:
            yield self.cs.get_node(p)
class AddedFileNodesGenerator(NodeGeneratorBase):
    """
    Class holding Added files for current changeset
    """
class ChangedFileNodesGenerator(NodeGeneratorBase):
    """
    Class holding Changed files for current changeset
    """
class RemovedFileNodesGenerator(NodeGeneratorBase):
Class holding removed files for current changeset
yield RemovedFileNode(path=p)
@functools.total_ordering
class Node(object):
Simplest class representing file or directory on repository. SCM backends
should use ``FileNode`` and ``DirNode`` subclasses rather than ``Node``
directly.
Node's ``path`` cannot start with slash as we operate on *relative* paths
only. Moreover, every single node is identified by the ``path`` attribute,
so it cannot end with slash, too. Otherwise, path could lead to mistakes.
def __init__(self, path, kind):
if path.startswith('/'):
raise NodeError("Cannot initialize Node objects with slash at "
"the beginning as only relative paths are supported")
self.path = path.rstrip('/')
if path == '' and kind != NodeKind.DIR:
raise NodeError("Only DirNode and its subclasses may be "
"initialized with empty path")
self.kind = kind
#self.dirs, self.files = [], []
if self.is_root() and not self.is_dir():
raise NodeError("Root node cannot be FILE kind")
def parent(self):
parent_path = self.get_parent_path()
if parent_path:
if self.changeset:
return self.changeset.get_node(parent_path)
return DirNode(parent_path)
return None
def name(self):
Returns name of the node so if its path
then only last part is returned.
return self.path.rstrip('/').split('/')[-1]
def __eq__(self, other):
if type(self) is not type(other):
return False
if self.kind != other.kind:
if self.path != other.path:
def __lt__(self, other):
if self.kind < other.kind:
return True
if self.kind > other.kind:
if self.path < other.path:
if self.path > other.path:
def __repr__(self):
return '<%s %r>' % (self.__class__.__name__, self.path)
def get_parent_path(self):
Returns node's parent path or empty string if node is root.
if self.is_root():
return posixpath.dirname(self.path.rstrip('/')) + '/'
def is_file(self):
Returns ``True`` if node's kind is ``NodeKind.FILE``, ``False``
otherwise.
return self.kind == NodeKind.FILE
def is_dir(self):
Returns ``True`` if node's kind is ``NodeKind.DIR``, ``False``
return self.kind == NodeKind.DIR
def is_root(self):
Returns ``True`` if node is a root node and ``False`` otherwise.
return self.kind == NodeKind.DIR and self.path == ''
def is_submodule(self):
Returns ``True`` if node's kind is ``NodeKind.SUBMODULE``, ``False``
return self.kind == NodeKind.SUBMODULE
return self.state is NodeState.ADDED
return self.state is NodeState.CHANGED
def not_changed(self):
return self.state is NodeState.NOT_CHANGED
return self.state is NodeState.REMOVED
class FileNode(Node):
Class representing file nodes.
:attribute: path: path to the node, relative to repository's root
:attribute: content: if given arbitrary sets content of the file
:attribute: changeset: if given, first time content is accessed, callback
:attribute: mode: octal stat mode for a node. Default is 0100644.
def __init__(self, path, content=None, changeset=None, mode=None):
Only one of ``content`` and ``changeset`` may be given. Passing both
would raise ``NodeError`` exception.
:param path: relative path to the node
:param content: content may be passed to constructor
:param changeset: if given, will use it to lazily fetch content
:param mode: octal representation of ST_MODE (i.e. 0100644)
if content and changeset:
raise NodeError("Cannot use both content and changeset")
super(FileNode, self).__init__(path, kind=NodeKind.FILE)
self.changeset = changeset
if not isinstance(content, bytes) and content is not None:
# File content is one thing that inherently must be bytes ... but
# VCS module tries to be "user friendly" and support unicode ...
content = safe_bytes(content)
self._content = content
self._mode = mode or 0o100644
eq = super(FileNode, self).__eq__(other)
if eq is not None:
return eq
return self.content == other.content
lt = super(FileNode, self).__lt__(other)
if lt is not None:
return lt
return self.content < other.content
def mode(self):
Returns lazily mode of the FileNode. If ``changeset`` is not set, would
use value given at initialization or 0100644 (default).
mode = self.changeset.get_file_mode(self.path)
mode = self._mode
return mode
def content(self):
Returns lazily byte content of the FileNode.
content = self.changeset.get_file_content(self.path)
content = self._content
return content
def size(self):
return self.changeset.get_file_size(self.path)
raise NodeError("Cannot retrieve size of the file without related "
"changeset attribute")
return self.last_changeset.message
raise NodeError("Cannot retrieve message of the file without related "
def last_changeset(self):
return self.changeset.get_file_changeset(self.path)
raise NodeError("Cannot retrieve last changeset of the file without "
"related changeset attribute")
def get_mimetype(self):
Mimetype is calculated based on the file's content.
mtype, encoding = mimetypes.guess_type(self.name)
if mtype is None:
if self.is_binary:
mtype = 'application/octet-stream'
encoding = None
mtype = 'text/plain'
# try with pygments
from pygments import lexers
mt = lexers.get_lexer_for_filename(self.name).mimetypes
except lexers.ClassNotFound:
mt = None
if mt:
mtype = mt[0]
return mtype, encoding
def mimetype(self):
Wrapper around full mimetype info. It returns only type of fetched
mimetype without the encoding part. use get_mimetype function to fetch
full set of (type,encoding)
return self.get_mimetype()[0]
def mimetype_main(self):
return self.mimetype.split('/')[0]
def lexer(self):
Returns pygment's lexer class. Would try to guess lexer taking file's
content, name and mimetype.
lexer = lexers.guess_lexer_for_filename(self.name, safe_str(self.content), stripnl=False)
lexer = lexers.TextLexer(stripnl=False)
# returns first alias
return lexer
def lexer_alias(self):
Returns first alias of the lexer guessed for this file.
return self.lexer.aliases[0]
def history(self):
Returns a list of changeset for this file in which the file was changed
if self.changeset is None:
raise NodeError('Unable to get changeset for this FileNode')
return self.changeset.get_file_history(self.path)
def annotate(self):
Returns a list of three element tuples with lineno,changeset and line
return self.changeset.get_file_annotate(self.path)
def state(self):
if not self.changeset:
raise NodeError("Cannot check state of the node if it's not "
"linked with changeset")
elif self.path in (node.path for node in self.changeset.added):
return NodeState.ADDED
elif self.path in (node.path for node in self.changeset.changed):
return NodeState.CHANGED
return NodeState.NOT_CHANGED
def is_binary(self):
Returns True if file has binary content.
return b'\0' in self.content
def is_browser_compatible_image(self):
return self.mimetype in [
"image/gif",
"image/jpeg",
"image/png",
"image/bmp"
]
def extension(self):
"""Returns filenode extension"""
return self.name.split('.')[-1]
def is_executable(self):
Returns ``True`` if file has executable flag turned on.
return bool(self.mode & stat.S_IXUSR)
return '<%s %r @ %s>' % (self.__class__.__name__, self.path,
getattr(self.changeset, 'short_id', ''))
class RemovedFileNode(FileNode):
Dummy FileNode class - trying to access any public attribute except path,
name, kind or state (or methods/attributes checking those two) would raise
RemovedFileNodeError.
ALLOWED_ATTRIBUTES = [
'name', 'path', 'state', 'is_root', 'is_file', 'is_dir', 'kind',
'added', 'changed', 'not_changed', 'removed'
def __init__(self, path):
super(RemovedFileNode, self).__init__(path=path)
def __getattribute__(self, attr):
if attr.startswith('_') or attr in RemovedFileNode.ALLOWED_ATTRIBUTES:
return super(RemovedFileNode, self).__getattribute__(attr)
raise RemovedFileNodeError("Cannot access attribute %s on "
"RemovedFileNode" % attr)
return NodeState.REMOVED
class DirNode(Node):
DirNode stores list of files and directories within this node.
Nodes may be used standalone but within repository context they
lazily fetch data within same repository's changeset.
def __init__(self, path, nodes=(), changeset=None):
Only one of ``nodes`` and ``changeset`` may be given. Passing both
:param nodes: content may be passed to constructor
:param size: always 0 for ``DirNode``
if nodes and changeset:
raise NodeError("Cannot use both nodes and changeset")
super(DirNode, self).__init__(path, NodeKind.DIR)
self._nodes = nodes
eq = super(DirNode, self).__eq__(other)
# check without entering each dir
self_nodes_paths = list(sorted(n.path for n in self.nodes))
other_nodes_paths = list(sorted(n.path for n in self.nodes))
return self_nodes_paths == other_nodes_paths
lt = super(DirNode, self).__lt__(other)
return self_nodes_paths < other_nodes_paths
def nodes(self):
nodes = self.changeset.get_nodes(self.path)
nodes = self._nodes
self._nodes_dict = dict((node.path, node) for node in nodes)
return sorted(nodes)
def files(self):
return sorted((node for node in self.nodes if node.is_file()))
def dirs(self):
return sorted((node for node in self.nodes if node.is_dir()))
for node in self.nodes:
yield node
Returns node from within this particular ``DirNode``, so it is now
allowed to fetch, i.e. node located at 'docs/api/index.rst' from node
'docs'. In order to access deeper nodes one must fetch nodes between
them first - this would work::
docs = root.get_node('docs')
docs.get_node('api').get_node('index.rst')
:param: path - relative to the current node
.. note::
To access lazily (as in example above) node have to be initialized
with related changeset object - without it node is out of
context and may know nothing about anything else than nearest
(located at same level) nodes.
raise NodeError("Cannot retrieve node without path")
self.nodes # access nodes first in order to set _nodes_dict
paths = path.split('/')
if len(paths) == 1:
if not self.is_root():
path = '/'.join((self.path, paths[0]))
path = paths[0]
return self._nodes_dict[path]
elif len(paths) > 1:
raise NodeError("Cannot access deeper "
"nodes without changeset")
path1, path2 = paths[0], '/'.join(paths[1:])
return self.get_node(path1).get_node(path2)
raise KeyError
except KeyError:
raise NodeError("Node does not exist at %s" % path)
raise NodeError("Cannot access state of DirNode")
size = 0
for root, dirs, files in self.changeset.walk(self.path):
for f in files:
size += f.size
return size
class RootNode(DirNode):
DirNode being the root node of the repository.
def __init__(self, nodes=(), changeset=None):
super(RootNode, self).__init__(path='', nodes=nodes,
changeset=changeset)
return '<%s>' % self.__class__.__name__
class SubModuleNode(Node):
represents a SubModule of Git or SubRepo of Mercurial
is_binary = False
def __init__(self, name, url, changeset=None, alias=None):
# Note: Doesn't call Node.__init__!
self.path = name.rstrip('/')
self.kind = NodeKind.SUBMODULE
self.alias = alias
# we have to use emptyChangeset here since this can point to svn/git/hg
# submodules we cannot get from repository
self.changeset = EmptyChangeset(changeset, alias=alias)
self.url = url
org = self.path.rstrip('/').rsplit('/', 1)[-1]
return '%s @ %s' % (org, self.changeset.short_id)
return '%s @ %s' % (org, safe_str(self.changeset.short_id))
Status change: