@@ -97,193 +97,193 @@ class GitRepository(BaseRepository):
"""
return self._get_all_revisions()
@classmethod
def _run_git_command(cls, cmd, cwd=None):
Runs given ``cmd`` as git command and returns output bytes in a tuple
(stdout, stderr) ... or raise RepositoryError.
:param cmd: git command to be executed
:param cwd: passed directly to subprocess
# need to clean fix GIT_DIR !
gitenv = dict(os.environ)
gitenv.pop('GIT_DIR', None)
gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
assert isinstance(cmd, list), cmd
cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
try:
p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
except (EnvironmentError, OSError) as err:
# output from the failing process is in str(EnvironmentError)
msg = ("Couldn't run git command %s.\n"
"Subprocess failed with '%s': %s\n" %
(cmd, type(err).__name__, err)
).strip()
log.error(msg)
raise RepositoryError(msg)
stdout = b''.join(p.output)
stderr = b''.join(p.error)
finally:
p.close()
# TODO: introduce option to make commands fail if they have any stderr output?
if stderr:
log.debug('stderr from %s:\n%s', cmd, stderr)
else:
log.debug('stderr from %s: None', cmd)
return stdout, stderr
def run_git_command(self, cmd):
Runs given ``cmd`` as git command with cwd set to current repo.
Returns stdout as unicode str ... or raise RepositoryError.
cwd = None
if os.path.isdir(self.path):
cwd = self.path
stdout, _stderr = self._run_git_command(cmd, cwd=cwd)
return safe_str(stdout)
def _check_url(cls, url):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that git will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, exception is also thrown
when the return code is non 200
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
return True
if url.startswith('git://'):
if '+' in url[:url.find('://')]:
url = url[url.find('+') + 1:]
handlers = []
url_obj = mercurial.util.url(safe_bytes(url))
test_uri, authinfo = url_obj.authinfo()
if not test_uri.endswith(b'info/refs'):
test_uri = test_uri.rstrip(b'/') + b'/info/refs'
url_obj.passwd = b'*****'
cleaned_uri = str(url_obj)
if authinfo:
# create a password manager
passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((mercurial.url.httpbasicauthhandler(passmgr),
mercurial.url.httpdigestauthhandler(passmgr)))
o = urllib.request.build_opener(*handlers)
o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
req = urllib.request.Request(
"%s?%s" % (
test_uri,
safe_str(test_uri),
urllib.parse.urlencode({"service": 'git-upload-pack'})
))
resp = o.open(req)
if resp.code != 200:
raise Exception('Return Code is not 200')
except Exception as e:
# means it cannot be cloned
raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
# now detect if it's proper git repo
gitdata = resp.read()
if b'service=git-upload-pack' not in gitdata:
raise urllib.error.URLError(
"url [%s] does not look like an git" % cleaned_uri)
def _get_repo(self, create, src_url=None, update_after_clone=False,
bare=False):
if create and os.path.exists(self.path):
raise RepositoryError("Location already exist")
if src_url and not create:
raise RepositoryError("Create should be set to True if src_url is "
"given (clone operation creates repository)")
if create and src_url:
GitRepository._check_url(src_url)
self.clone(src_url, update_after_clone, bare)
return Repo(self.path)
elif create:
os.makedirs(self.path)
if bare:
return Repo.init_bare(self.path)
return Repo.init(self.path)
except (NotGitRepository, OSError) as err:
raise RepositoryError(err)
def _get_all_revisions(self):
# we must check if this repo is not empty, since later command
# fails if it is. And it's cheaper to ask than throw the subprocess
# errors
self._repo.head()
except KeyError:
return []
rev_filter = settings.GIT_REV_FILTER
cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
so = self.run_git_command(cmd)
except RepositoryError:
# Can be raised for empty repositories
return so.splitlines()
def _get_all_revisions2(self):
# alternate implementation using dulwich
includes = [ascii_str(sha) for key, (sha, type_) in self._parsed_refs.items()
if type_ != b'T']
return [c.commit.id for c in self._repo.get_walker(include=includes)]
def _get_revision(self, revision):
Given any revision identifier, returns a 40 char string with revision hash.
if self._empty:
raise EmptyRepositoryError("There are no changesets yet")
if revision in (None, '', 'tip', 'HEAD', 'head', -1):
revision = -1
if isinstance(revision, int):
return self.revisions[revision]
except IndexError:
msg = "Revision %r does not exist for %s" % (revision, self.name)
raise ChangesetDoesNotExistError(msg)
if isinstance(revision, str):
if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
return self.revisions[int(revision)]
msg = "Revision %r does not exist for %s" % (revision, self)
# get by branch/tag name
_ref_revision = self._parsed_refs.get(safe_bytes(revision))
if _ref_revision: # and _ref_revision[1] in [b'H', b'RH', b'T']:
return ascii_str(_ref_revision[0])
Status change: