@@ -225,96 +225,99 @@ class MercurialRepository(BaseRepository
:param rev1: Entry point from which diff is shown. Can be
``self.EMPTY_CHANGESET`` - in this case, patch showing all
the changes since empty state of the repository until ``rev2``
:param rev2: Until which revision changes should be shown.
:param ignore_whitespace: If set to ``True``, would not show whitespace
changes. Defaults to ``False``.
:param context: How many lines before/after changed lines should be
shown. Defaults to ``3``.
"""
if hasattr(rev1, 'raw_id'):
rev1 = getattr(rev1, 'raw_id')
if hasattr(rev2, 'raw_id'):
rev2 = getattr(rev2, 'raw_id')
# Check if given revisions are present at repository (may raise
# ChangesetDoesNotExistError)
if rev1 != self.EMPTY_CHANGESET:
self.get_changeset(rev1)
self.get_changeset(rev2)
file_filter = match(self.path, '', [path])
return ''.join(patch.diff(self._repo, rev1, rev2, match=file_filter,
opts=diffopts(git=True,
ignorews=ignore_whitespace,
context=context)))
def _check_url(self, url):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that mercurial will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, return code 200 if url
is valid or True if it's a local path
from mercurial.util import url as Url
# those authnadlers are patched for python 2.6.5 bug an
# infinit looping when given invalid resources
from mercurial.url import httpbasicauthhandler, httpdigestauthhandler
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
return True
if('+' in url[:url.find('://')]):
url = url[url.find('+')+1:]
handlers = []
test_uri, authinfo = Url(url).authinfo()
if authinfo:
#create a password manager
passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((httpbasicauthhandler(passmgr),
httpdigestauthhandler(passmgr)))
o = urllib2.build_opener(*handlers)
o.addheaders = [('Content-Type', 'application/mercurial-0.1'),
('Accept', 'application/mercurial-0.1')]
q = {"cmd": 'between'}
q.update({'pairs': "%s-%s" % ('0' * 40, '0' * 40)})
qs = '?%s' % urllib.urlencode(q)
cu = "%s%s" % (test_uri, qs)
req = urllib2.Request(cu, None, {})
try:
resp = o.open(req)
return resp.code == 200
except Exception, e:
# means it cannot be cloned
raise urllib2.URLError(e)
def _get_repo(self, create, src_url=None, update_after_clone=False):
Function will check for mercurial repository in given path and return
a localrepo object. If there is no repository in that path it will
raise an exception unless ``create`` parameter is set to True - in
that case repository would be created and returned.
If ``src_url`` is given, would try to clone repository from the
location at given clone_point. Additionally it'll make update to
working copy accordingly to ``update_after_clone`` flag
if src_url:
url = str(self._get_url(src_url))
opts = {}
if not update_after_clone:
opts.update({'noupdate': True})
self._check_url(url)
clone(self.baseui, url, self.path, **opts)
# except urllib2.URLError:
@@ -325,135 +325,132 @@ def ValidRepoName(edit=False, old_data={
rename = old_data.get('repo_name') != repo_name_full
create = not edit
if rename or create:
if group_path != '':
if Repository.get_by_repo_name(repo_name_full):
msg = M(self, 'repository_in_group_exists', state,
repo=repo_name, group=group_name)
raise formencode.Invalid(msg, value, state,
error_dict=dict(repo_name=msg)
)
elif RepoGroup.get_by_group_name(repo_name_full):
msg = M(self, 'same_group_exists', state,
repo=repo_name)
elif Repository.get_by_repo_name(repo_name_full):
msg = M(self, 'repository_exists', state,
return value
return _validator
def ValidForkName(*args, **kwargs):
return ValidRepoName(*args, **kwargs)
def SlugifyName():
class _validator(formencode.validators.FancyValidator):
def _to_python(self, value, state):
return repo_name_slug(value)
def validate_python(self, value, state):
pass
def ValidCloneUri():
from rhodecode.lib.utils import make_ui
def url_handler(repo_type, url, proto, ui=None):
def url_handler(repo_type, url, ui=None):
if repo_type == 'hg':
from mercurial.httprepo import httprepository, httpsrepository
if proto == 'https':
if url.startswith('https'):
httpsrepository(make_ui('db'), url).capabilities
elif proto == 'http':
elif url.startswith('http'):
httprepository(make_ui('db'), url).capabilities
elif url.startswith('svn+http'):
from hgsubversion.svnrepo import svnremoterepo
svnremoterepo(make_ui('db'), url).capabilities
elif repo_type == 'git':
#TODO: write a git url validator
messages = {
'clone_uri': _(u'invalid clone url'),
'invalid_clone_uri': _(u'Invalid clone url, provide a '
'valid clone http\s url')
'valid clone http(s)/svn+http(s) url')
}
repo_type = value.get('repo_type')
url = value.get('clone_uri')
if not url:
elif url.startswith('https') or url.startswith('http'):
_type = 'https' if url.startswith('https') else 'http'
else:
url_handler(repo_type, url, _type, make_ui('db'))
url_handler(repo_type, url, make_ui('db'))
except Exception:
log.exception('Url validation failed')
msg = M(self, 'clone_uri')
error_dict=dict(clone_uri=msg)
msg = M(self, 'invalid_clone_uri', state)
def ValidForkType(old_data={}):
'invalid_fork_type': _(u'Fork have to be the same type as parent')
if old_data['repo_type'] != value:
msg = M(self, 'invalid_fork_type', state)
error_dict=dict(repo_type=msg)
def ValidPerms(type_='repo'):
if type_ == 'group':
EMPTY_PERM = 'group.none'
elif type_ == 'repo':
EMPTY_PERM = 'repository.none'
'perm_new_member_name':
_(u'This username or users group name is not valid')
def to_python(self, value, state):
perms_update = []
perms_new = []
# build a list of permission to update and new permission to create
for k, v in value.items():
# means new added member to permissions
if k.startswith('perm_new_member'):
new_perm = value.get('perm_new_member', False)
new_member = value.get('perm_new_member_name', False)
new_type = value.get('perm_new_member_type')
if new_member and new_perm:
if (new_member, new_perm, new_type) not in perms_new:
perms_new.append((new_member, new_perm, new_type))
elif k.startswith('u_perm_') or k.startswith('g_perm_'):
member = k[7:]
t = {'u': 'user',
'g': 'users_group'
Status change: