@@ -93,426 +93,428 @@ class MercurialRepository(BaseRepository
@LazyProperty
def name(self):
return os.path.basename(self.path)
def branches(self):
return self._get_branches()
def closed_branches(self):
return self._get_branches(normal=False, closed=True)
def allbranches(self):
"""
List all branches, including closed branches.
return self._get_branches(closed=True)
def _get_branches(self, normal=True, closed=False):
Gets branches for this repository
Returns only not closed branches by default
:param closed: return also closed branches for mercurial
:param normal: return also normal branches
if self._empty:
return {}
bt = OrderedDict()
for bn, _heads, tip, isclosed in sorted(self._repo.branchmap().iterbranches()):
if isclosed:
if closed:
bt[safe_unicode(bn)] = hex(tip)
else:
if normal:
return bt
def tags(self):
Gets tags for this repository
return self._get_tags()
def _get_tags(self):
sortkey = lambda ctx: ctx[0] # sort by name
_tags = [(safe_unicode(n), hex(h),) for n, h in
self._repo.tags().items()]
return OrderedDict(sorted(_tags, key=sortkey, reverse=True))
def tag(self, name, user, revision=None, message=None, date=None,
**kwargs):
Creates and returns a tag for the given ``revision``.
:param name: name for new tag
:param user: full username, i.e.: "Joe Doe <joe.doe@example.com>"
:param revision: changeset id for which new tag would be created
:param message: message of the tag's commit
:param date: date of tag's commit
:raises TagAlreadyExistError: if tag with same name already exists
if name in self.tags:
raise TagAlreadyExistError("Tag %s already exists" % name)
changeset = self.get_changeset(revision)
local = kwargs.setdefault('local', False)
if message is None:
message = "Added tag %s for changeset %s" % (name,
changeset.short_id)
if date is None:
date = datetime.datetime.now().ctime()
try:
self._repo.tag(name, changeset._ctx.node(), message, local, user,
date)
except Abort, e:
raise RepositoryError(e.message)
# Reinitialize tags
self.tags = self._get_tags()
tag_id = self.tags[name]
return self.get_changeset(revision=tag_id)
def remove_tag(self, name, user, message=None, date=None):
Removes tag with the given ``name``.
:param name: name of the tag to be removed
:param message: message of the tag's removal commit
:param date: date of tag's removal commit
:raises TagDoesNotExistError: if tag with given name does not exists
if name not in self.tags:
raise TagDoesNotExistError("Tag %s does not exist" % name)
message = "Removed tag %s" % name
local = False
self._repo.tag(name, nullid, message, local, user, date)
def bookmarks(self):
Gets bookmarks for this repository
return self._get_bookmarks()
def _get_bookmarks(self):
_bookmarks = [(safe_unicode(n), hex(h),) for n, h in
self._repo._bookmarks.items()]
return OrderedDict(sorted(_bookmarks, key=sortkey, reverse=True))
def _get_all_revisions(self):
return map(lambda x: hex(x[7]), self._repo.changelog.index)[:-1]
def get_diff(self, rev1, rev2, path='', ignore_whitespace=False,
context=3):
Returns (git like) *diff*, as plain text. Shows changes introduced by
``rev2`` since ``rev1``.
:param rev1: Entry point from which diff is shown. Can be
``self.EMPTY_CHANGESET`` - in this case, patch showing all
the changes since empty state of the repository until ``rev2``
:param rev2: Until which revision changes should be shown.
:param ignore_whitespace: If set to ``True``, would not show whitespace
changes. Defaults to ``False``.
:param context: How many lines before/after changed lines should be
shown. Defaults to ``3``.
if hasattr(rev1, 'raw_id'):
rev1 = getattr(rev1, 'raw_id')
if hasattr(rev2, 'raw_id'):
rev2 = getattr(rev2, 'raw_id')
# Check if given revisions are present at repository (may raise
# ChangesetDoesNotExistError)
if rev1 != self.EMPTY_CHANGESET:
self.get_changeset(rev1)
self.get_changeset(rev2)
if path:
file_filter = match(self.path, '', [path])
file_filter = None
return ''.join(patch.diff(self._repo, rev1, rev2, match=file_filter,
opts=diffopts(git=True,
ignorews=ignore_whitespace,
context=context)))
@classmethod
def _check_url(cls, url, repoui=None):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that mercurial will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, exception is also thrown
when the return code is non 200
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
return True
url_prefix = None
if '+' in url[:url.find('://')]:
url = url[url.find('+') + 1:]
url_prefix, url = url.split('+', 1)
handlers = []
url_obj = hg_url(url)
test_uri, authinfo = url_obj.authinfo()
url_obj.passwd = '*****'
cleaned_uri = str(url_obj)
if authinfo:
#create a password manager
passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((httpbasicauthhandler(passmgr),
httpdigestauthhandler(passmgr)))
o = urllib2.build_opener(*handlers)
o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
('Accept', 'application/mercurial-0.1')]
q = {"cmd": 'between'}
q.update({'pairs': "%s-%s" % ('0' * 40, '0' * 40)})
qs = '?%s' % urllib.urlencode(q)
cu = "%s%s" % (test_uri, qs)
req = urllib2.Request(cu, None, {})
resp = o.open(req)
if resp.code != 200:
raise Exception('Return Code is not 200')
except Exception, e:
# means it cannot be cloned
raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
# now check if it's a proper hg repo
httppeer(repoui or ui.ui(), url).lookup('tip')
raise urllib2.URLError(
"url [%s] does not look like an hg repo org_exc: %s"
% (cleaned_uri, e))
if not url_prefix: # skip svn+http://... (and git+... too)
def _get_repo(self, create, src_url=None, update_after_clone=False):
Function will check for mercurial repository in given path and return
a localrepo object. If there is no repository in that path it will
raise an exception unless ``create`` parameter is set to True - in
that case repository would be created and returned.
If ``src_url`` is given, would try to clone repository from the
location at given clone_point. Additionally it'll make update to
working copy accordingly to ``update_after_clone`` flag
if src_url:
url = str(self._get_url(src_url))
opts = {}
if not update_after_clone:
opts.update({'noupdate': True})
MercurialRepository._check_url(url, self.baseui)
clone(self.baseui, url, self.path, **opts)
# Don't try to create if we've already cloned repo
create = False
return localrepository(self.baseui, self.path, create=create)
except (Abort, RepoError), err:
if create:
msg = "Cannot create repository at %s. Original error was %s"\
% (self.path, err)
msg = "Not valid repository at %s. Original error was %s"\
raise RepositoryError(msg)
def in_memory_changeset(self):
return MercurialInMemoryChangeset(self)
def description(self):
undefined_description = u'unknown'
_desc = self._repo.ui.config('web', 'description', None, untrusted=True)
return safe_unicode(_desc or undefined_description)
def contact(self):
undefined_contact = u'Unknown'
return safe_unicode(get_contact(self._repo.ui.config)
or undefined_contact)
def last_change(self):
Returns last change made on this repository as datetime object
return date_fromtimestamp(self._get_mtime(), makedate()[1])
def _get_mtime(self):
return time.mktime(self.get_changeset().date.timetuple())
except RepositoryError:
#fallback to filesystem
cl_path = os.path.join(self.path, '.hg', "00changelog.i")
st_path = os.path.join(self.path, '.hg', "store")
if os.path.exists(cl_path):
return os.stat(cl_path).st_mtime
return os.stat(st_path).st_mtime
def _get_revision(self, revision):
Gets an ID revision given as str. This will always return a fill
40 char revision number
:param revision: str or int or None
if isinstance(revision, unicode):
revision = safe_str(revision)
raise EmptyRepositoryError("There are no changesets yet")
if revision in [-1, 'tip', None]:
revision = 'tip'
revision = hex(self._repo.lookup(revision))
except (LookupError, ):
msg = ("Ambiguous identifier `%s` for %s" % (revision, self))
raise ChangesetDoesNotExistError(msg)
except (IndexError, ValueError, RepoLookupError, TypeError):
msg = ("Revision %s does not exist for %s" % (revision, self))
return revision
def get_ref_revision(self, ref_type, ref_name):
Returns revision number for the given reference.
ref_name = safe_str(ref_name)
if ref_type == 'rev' and not ref_name.strip('0'):
return self.EMPTY_CHANGESET
# lookup up the exact node id
_revset_predicates = {
'branch': 'branch',
'book': 'bookmark',
'tag': 'tag',
'rev': 'id',
}
# avoid expensive branch(x) iteration over whole repo
rev_spec = "%%s & %s(%%s)" % _revset_predicates[ref_type]
revs = self._repo.revs(rev_spec, ref_name, ref_name)
except LookupError:
msg = ("Ambiguous identifier %s:%s for %s" % (ref_type, ref_name, self.name))
except RepoLookupError:
msg = ("Revision %s:%s does not exist for %s" % (ref_type, ref_name, self.name))
if revs:
revision = revs[-1]
# TODO: just report 'not found'?
revision = ref_name
return self._get_revision(revision)
def _get_archives(self, archive_name='tip'):
allowed = self.baseui.configlist("web", "allow_archive",
untrusted=True)
for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
if i[0] in allowed or self._repo.ui.configbool("web",
"allow" + i[0],
untrusted=True):
yield {"type": i[0], "extension": i[1], "node": archive_name}
def _get_url(self, url):
Returns normalized url. If schema is not given, would fall
to filesystem
(``file:///``) schema.
url = str(url)
if url != 'default' and not '://' in url:
url = "file:" + urllib.pathname2url(url)
return url
def get_hook_location(self):
returns absolute path to location where hooks are stored
return os.path.join(self.path, '.hg', '.hgrc')
def get_changeset(self, revision=None):
Returns ``MercurialChangeset`` object representing repository's
changeset at the given ``revision``.
revision = self._get_revision(revision)
changeset = MercurialChangeset(repository=self, revision=revision)
return changeset
def get_changesets(self, start=None, end=None, start_date=None,
end_date=None, branch_name=None, reverse=False):
Returns iterator of ``MercurialChangeset`` objects from start to end
(both are inclusive)
:param start: None, str, int or mercurial lookup format
:param end: None, str, int or mercurial lookup format
:param start_date:
:param end_date:
:param branch_name:
:param reversed: return changesets in reversed order
start_raw_id = self._get_revision(start)
start_pos = self.revisions.index(start_raw_id) if start else None
end_raw_id = self._get_revision(end)
end_pos = self.revisions.index(end_raw_id) if end else None
if None not in [start, end] and start_pos > end_pos:
raise RepositoryError("Start revision '%s' cannot be "
"after end revision '%s'" % (start, end))
if branch_name and branch_name not in self.allbranches.keys():
msg = ("Branch %s not found in %s" % (branch_name, self))
raise BranchDoesNotExistError(msg)
if end_pos is not None:
end_pos += 1
Status change: