@@ -237,138 +237,140 @@ class MercurialRepository(BaseRepository
Returns (git like) *diff*, as plain text. Shows changes introduced by
``rev2`` since ``rev1``.
:param rev1: Entry point from which diff is shown. Can be
``self.EMPTY_CHANGESET`` - in this case, patch showing all
the changes since empty state of the repository until ``rev2``
:param rev2: Until which revision changes should be shown.
:param ignore_whitespace: If set to ``True``, would not show whitespace
changes. Defaults to ``False``.
:param context: How many lines before/after changed lines should be
shown. Defaults to ``3``.
"""
if hasattr(rev1, 'raw_id'):
rev1 = getattr(rev1, 'raw_id')
if hasattr(rev2, 'raw_id'):
rev2 = getattr(rev2, 'raw_id')
# Check if given revisions are present at repository (may raise
# ChangesetDoesNotExistError)
if rev1 != self.EMPTY_CHANGESET:
self.get_changeset(rev1)
self.get_changeset(rev2)
if path:
file_filter = match(self.path, '', [path])
else:
file_filter = None
return ''.join(patch.diff(self._repo, rev1, rev2, match=file_filter,
opts=diffopts(git=True,
ignorews=ignore_whitespace,
context=context)))
@classmethod
def _check_url(cls, url, repoui=None):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that mercurial will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, exception is also thrown
when the return code is non 200
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
return True
url_prefix = None
if '+' in url[:url.find('://')]:
url = url[url.find('+') + 1:]
url_prefix, url = url.split('+', 1)
handlers = []
url_obj = hg_url(url)
test_uri, authinfo = url_obj.authinfo()
url_obj.passwd = '*****'
cleaned_uri = str(url_obj)
if authinfo:
#create a password manager
passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((httpbasicauthhandler(passmgr),
httpdigestauthhandler(passmgr)))
o = urllib2.build_opener(*handlers)
o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
('Accept', 'application/mercurial-0.1')]
q = {"cmd": 'between'}
q.update({'pairs': "%s-%s" % ('0' * 40, '0' * 40)})
qs = '?%s' % urllib.urlencode(q)
cu = "%s%s" % (test_uri, qs)
req = urllib2.Request(cu, None, {})
try:
resp = o.open(req)
if resp.code != 200:
raise Exception('Return Code is not 200')
except Exception, e:
# means it cannot be cloned
raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
# now check if it's a proper hg repo
httppeer(repoui or ui.ui(), url).lookup('tip')
raise urllib2.URLError(
"url [%s] does not look like an hg repo org_exc: %s"
% (cleaned_uri, e))
if not url_prefix: # skip svn+http://... (and git+... too)
def _get_repo(self, create, src_url=None, update_after_clone=False):
Function will check for mercurial repository in given path and return
a localrepo object. If there is no repository in that path it will
raise an exception unless ``create`` parameter is set to True - in
that case repository would be created and returned.
If ``src_url`` is given, would try to clone repository from the
location at given clone_point. Additionally it'll make update to
working copy accordingly to ``update_after_clone`` flag
if src_url:
url = str(self._get_url(src_url))
opts = {}
if not update_after_clone:
opts.update({'noupdate': True})
MercurialRepository._check_url(url, self.baseui)
clone(self.baseui, url, self.path, **opts)
# Don't try to create if we've already cloned repo
create = False
return localrepository(self.baseui, self.path, create=create)
except (Abort, RepoError), err:
if create:
msg = "Cannot create repository at %s. Original error was %s"\
% (self.path, err)
msg = "Not valid repository at %s. Original error was %s"\
raise RepositoryError(msg)
@LazyProperty
def in_memory_changeset(self):
return MercurialInMemoryChangeset(self)
def description(self):
undefined_description = u'unknown'
_desc = self._repo.ui.config('web', 'description', None, untrusted=True)
return safe_unicode(_desc or undefined_description)
def contact(self):
undefined_contact = u'Unknown'
Status change: