# -*- coding: utf-8 -*-
"""
vcs.backends.git.repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~

Git repository implementation.

:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
"""
import errno
import logging
import os
import re
import time
import urllib.error
import urllib.parse
import urllib.request
from collections import OrderedDict
import mercurial.url # import httpbasicauthhandler, httpdigestauthhandler
import mercurial.util # import url as hg_url
from dulwich.config import ConfigFile
from dulwich.objects import Tag
from dulwich.repo import NotGitRepository, Repo
from kallithea.lib.vcs import subprocessio
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
from kallithea.lib.vcs.conf import settings
from kallithea.lib.vcs.exceptions import (BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError, RepositoryError, TagAlreadyExistError,
TagDoesNotExistError)
from kallithea.lib.vcs.utils import ascii_str, date_fromtimestamp, makedate, safe_bytes, safe_str
from kallithea.lib.vcs.utils.lazy import LazyProperty
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
from .changeset import GitChangeset
from .inmemory import GitInMemoryChangeset
from .workdir import GitWorkdir
# Matches a string made of exactly 12 or exactly 40 hexadecimal digits.
SHA_PATTERN = re.compile(r'^([0-9a-fA-F]{12}|[0-9a-fA-F]{40})$')
# Logger named after this module.
log = logging.getLogger(__name__)
class GitRepository(BaseRepository):
"""
Git repository backend.
"""
DEFAULT_BRANCH_NAME = 'master'
scm = 'git'
def __init__(self, repo_path, create=False, src_url=None,
             update_after_clone=False, bare=False):
    """
    Set up the repository object for ``repo_path``.

    :param repo_path: location of the repository; stored as an absolute
        path in ``self.path``
    :param create: forwarded to ``_get_repo``
    :param src_url: forwarded to ``_get_repo``
    :param update_after_clone: forwarded to ``_get_repo``
    :param bare: forwarded to ``_get_repo``
    """
    self.path = abspath(repo_path)
    opened = self._get_repo(create, src_url, update_after_clone, bare)
    self.repo = opened
    # ``self.bare`` reflects what the opened repository reports, not the
    # ``bare`` argument.
    self.bare = opened.bare
@property
def _config_files(self):
    """
    Return the list of git config file paths for this repository: the
    repository's own ``config`` file (directly under ``self.path`` for a
    bare repository, under ``.git`` otherwise) followed by ``.gitconfig``
    in the user's home directory.
    """
    # Explicit branch instead of the ``cond and a or b`` idiom, which picks
    # the wrong value whenever ``a`` is falsy.
    if self.bare:
        repo_config = abspath(self.path, 'config')
    else:
        repo_config = abspath(self.path, '.git', 'config')
    return [
        repo_config,
        abspath(get_user_home(), '.gitconfig'),
    ]
def _repo(self):
return self.repo
def head(self):
    """
    Return the result of ``self._repo.head()``, or ``None`` when that
    lookup raises ``KeyError``.
    """
    try:
        current = self._repo.head()
    except KeyError:
        current = None
    return current
def _empty(self):
Checks if repository is empty ie. without any changesets
self.revisions[0]
except (KeyError, IndexError):
return True
return False
@LazyProperty
def revisions(self):
    """
    Returns list of revisions' ids, in ascending order. Being lazy
    attribute allows external tools to inject shas from cache.
    """
    all_ids = self._get_all_revisions()
    return all_ids
@classmethod
def _run_git_command(cls, cmd, cwd=None):
    """
    Runs given ``cmd`` as git command and returns output bytes in a tuple
    (stdout, stderr) ... or raise RepositoryError.

    :param cmd: git command to be executed, as a list of arguments
        (without the git executable itself)
    :param cwd: passed directly to subprocess
    :raises RepositoryError: if the subprocess could not be started
    """
    # need to clean fix GIT_DIR !
    # Work on a copy of the environment so os.environ is not modified.
    gitenv = dict(os.environ)
    gitenv.pop('GIT_DIR', None)
    gitenv['GIT_CONFIG_NOGLOBAL'] = '1'

    assert isinstance(cmd, list), cmd
    cmd = [settings.GIT_EXECUTABLE_PATH, '-c', 'core.quotepath=false'] + cmd
    try:
        p = subprocessio.SubprocessIOChunker(cmd, cwd=cwd, env=gitenv, shell=False)
    except (EnvironmentError, OSError) as err:
        # output from the failing process is in str(EnvironmentError)
        msg = ("Couldn't run git command %s.\n"
               "Subprocess failed with '%s': %s\n" %
               (cmd, type(err).__name__, err)
        ).strip()
        log.error(msg)
        raise RepositoryError(msg) from err

    try:
        stdout = b''.join(p.output)
        stderr = b''.join(p.error)
    finally:
        # Close the chunker whether or not reading the output succeeded.
        p.close()
    # TODO: introduce option to make commands fail if they have any stderr output?
    if stderr:
        log.debug('stderr from %s:\n%s', cmd, stderr)
    else:
        log.debug('stderr from %s: None', cmd)
    return stdout, stderr
def run_git_command(self, cmd):
    """
    Runs given ``cmd`` as git command with cwd set to current repo.
    Returns stdout as unicode str ... or raise RepositoryError.

    When ``self.path`` is not an existing directory, no cwd is passed.
    """
    workdir = self.path if os.path.isdir(self.path) else None
    out, _err = self._run_git_command(cmd, cwd=workdir)
    return safe_str(out)
def _check_url(cls, url):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that git will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, exception is also thrown
when the return code is non 200
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
if url.startswith('git://'):
if '+' in url[:url.find('://')]:
url = url[url.find('+') + 1:]
handlers = []
url_obj = mercurial.util.url(safe_bytes(url))
test_uri, authinfo = url_obj.authinfo()
if not test_uri.endswith(b'info/refs'):
test_uri = test_uri.rstrip(b'/') + b'/info/refs'
url_obj.passwd = b'*****'
cleaned_uri = str(url_obj)
if authinfo:
# create a password manager
passmgr = urllib.request.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((mercurial.url.httpbasicauthhandler(passmgr),
mercurial.url.httpdigestauthhandler(passmgr)))
o = urllib.request.build_opener(*handlers)
o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
req = urllib.request.Request(
"%s?%s" % (
test_uri,
safe_str(test_uri),
urllib.parse.urlencode({"service": 'git-upload-pack'})
))
resp = o.open(req)
if resp.code != 200:
raise Exception('Return Code is not 200')
except Exception as e:
# means it cannot be cloned
raise urllib.error.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
# now detect if it's proper git repo
gitdata = resp.read()
if b'service=git-upload-pack' not in gitdata:
raise urllib.error.URLError(
"url [%s] does not look like an git" % cleaned_uri)
def _get_repo(self, create, src_url=None, update_after_clone=False,
              bare=False):
    """
    Return the dulwich ``Repo`` for ``self.path``, creating or cloning it
    first when requested.

    :param create: create a new repository at ``self.path``
    :param src_url: when given together with ``create``, clone from this
        url instead of initializing an empty repository
    :param update_after_clone: passed to ``self.clone``
    :param bare: initialize (or clone) as a bare repository
    :raises RepositoryError: if the location already exists while creating,
        if ``src_url`` is given without ``create``, or if any step raises
        ``NotGitRepository`` or ``OSError``
    """
    if create and os.path.exists(self.path):
        raise RepositoryError("Location already exist")
    if src_url and not create:
        raise RepositoryError("Create should be set to True if src_url is "
                              "given (clone operation creates repository)")
    try:
        if create and src_url:
            GitRepository._check_url(src_url)
            self.clone(src_url, update_after_clone, bare)
            return Repo(self.path)
        elif create:
            os.makedirs(self.path)
            if bare:
                return Repo.init_bare(self.path)
            else:
                return Repo.init(self.path)
        else:
            # Nothing to create: open the repository that is already there.
            return Repo(self.path)
    except (NotGitRepository, OSError) as err:
        raise RepositoryError(err)
def _get_all_revisions(self):
# we must check if this repo is not empty, since later command
# fails if it is. And it's cheaper to ask than throw the subprocess
# errors
self._repo.head()
return []
rev_filter = settings.GIT_REV_FILTER
cmd = ['rev-list', rev_filter, '--reverse', '--date-order']
so = self.run_git_command(cmd)
except RepositoryError:
# Can be raised for empty repositories
return so.splitlines()
def _get_all_revisions2(self):
    """
    Alternate implementation using dulwich: walk from every parsed ref
    whose type is not ``b'T'`` and return the commit ids the walker yields.
    """
    start_points = []
    for _ref, (sha, kind) in self._parsed_refs.items():
        if kind != b'T':
            start_points.append(ascii_str(sha))
    walker = self._repo.get_walker(include=start_points)
    return [entry.commit.id for entry in walker]
def _get_revision(self, revision):
Given any revision identifier, returns a 40 char string with revision hash.
if self._empty:
raise EmptyRepositoryError("There are no changesets yet")
if revision in (None, '', 'tip', 'HEAD', 'head', -1):
revision = -1
if isinstance(revision, int):
return self.revisions[revision]
except IndexError:
msg = "Revision %r does not exist for %s" % (revision, self.name)
raise ChangesetDoesNotExistError(msg)
if isinstance(revision, str):
if revision.isdigit() and (len(revision) < 12 or len(revision) == revision.count('0')):
return self.revisions[int(revision)]
msg = "Revision %r does not exist for %s" % (revision, self)
# get by branch/tag name
_ref_revision = self._parsed_refs.get(safe_bytes(revision))
if _ref_revision: # and _ref_revision[1] in [b'H', b'RH', b'T']:
return ascii_str(_ref_revision[0])
if revision in self.revisions:
return revision
# maybe it's a tag ? we don't have them in self.revisions
if revision in self.tags.values():
if SHA_PATTERN.match(revision):
raise ChangesetDoesNotExistError("Given revision %r not recognized" % revision)
def get_ref_revision(self, ref_type, ref_name):
    """
    Resolve ``ref_name`` through ``_get_revision`` and return the result.

    ``ref_type`` is accepted but not used by this implementation.
    """
    resolved = self._get_revision(ref_name)
    return resolved
def _get_archives(self, archive_name='tip'):
for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
yield {"type": i[0], "extension": i[1], "node": archive_name}
def _get_url(self, url):
Returns normalized url. If schema is not given, would fall to
filesystem (``file:///``) schema.
if url != 'default' and '://' not in url:
url = ':///'.join(('file', url))
return url
@LazyProperty
def name(self):
    """
    Return the last path component of ``self.path``.

    Exposed as a lazy attribute because ``_get_revision`` reads it as
    ``self.name`` when building error messages.
    """
    return os.path.basename(self.path)
def last_change(self):
    """
    Returns last change made on this repository as datetime object
    """
    mtime = self._get_mtime()
    tz_offset = makedate()[1]
    return date_fromtimestamp(mtime, tz_offset)
def _get_mtime(self):
return time.mktime(self.get_changeset().date.timetuple())
idx_loc = '' if self.bare else '.git'
# fallback to filesystem
in_path = os.path.join(self.path, idx_loc, "index")
he_path = os.path.join(self.path, idx_loc, "HEAD")
if os.path.exists(in_path):
return os.stat(in_path).st_mtime
return os.stat(he_path).st_mtime
def description(self):
    """
    Return the repository description as str, or ``'unknown'`` when the
    underlying repository reports none.
    """
    raw = self._repo.get_description()
    if not raw:
        raw = b'unknown'
    return safe_str(raw)
def contact(self):
    """
    Return the contact for this repository; always ``'Unknown'`` here.
    """
    return 'Unknown'
def branches(self):
    """
    Return an ``OrderedDict`` mapping name to sha for every parsed ref of
    type ``b'H'``, sorted by name in ascending order. Returns an empty dict
    when the repository has no revisions.
    """
    if not self.revisions:
        return {}
    heads = []
    for ref, (sha, kind) in self._parsed_refs.items():
        if kind == b'H':
            heads.append((safe_str(ref), ascii_str(sha)))
    heads.sort(key=lambda pair: pair[0], reverse=False)
    return OrderedDict(heads)
def closed_branches(self):
    """
    Return the closed branches of this repository.

    NOTE(review): the body is not visible in this copy of the file; an
    empty dict (the same "nothing" value ``branches`` uses) is assumed -
    confirm against the upstream file.
    """
    return {}
@LazyProperty
def tags(self):
    """
    Return the tags mapping built by ``_get_tags``.

    Exposed as a lazy attribute because ``_get_revision`` reads it as
    ``self.tags.values()``.
    """
    return self._get_tags()
def _get_tags(self):
    """
    Return an ``OrderedDict`` mapping name to sha for every parsed ref of
    type ``b'T'``, sorted by name in descending order.
    """
    found = []
    for ref, (sha, kind) in self._parsed_refs.items():
        if kind == b'T':
            found.append((safe_str(ref), ascii_str(sha)))
    found.sort(key=lambda pair: pair[0], reverse=True)
    return OrderedDict(found)
def tag(self, name, user, revision=None, message=None, date=None,
**kwargs):
Creates and returns a tag for the given ``revision``.
:param name: name for new tag
Status change: