# -*- coding: utf-8 -*-
"""
vcs.backends.git.repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Git repository implementation.
:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
import os
import re
import time
import urllib
import urllib2
import logging
import posixpath
import string
try:
# Python <=2.7
from pipes import quote
except ImportError:
# Python 3.3+
from shlex import quote
import sys
if sys.platform == "win32":
from subprocess import list2cmdline
def quote(s):
return list2cmdline([s])
else:
from dulwich.objects import Tag
from dulwich.repo import Repo, NotGitRepository
from dulwich.config import ConfigFile
from kallithea.lib.vcs import subprocessio
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
from kallithea.lib.vcs.conf import settings
from kallithea.lib.vcs.exceptions import (
BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError,
RepositoryError, TagAlreadyExistError, TagDoesNotExistError
)
from kallithea.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
from kallithea.lib.vcs.utils.lazy import LazyProperty
from kallithea.lib.vcs.utils.ordered_dict import OrderedDict
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
from kallithea.lib.vcs.utils.hgcompat import (
hg_url, httpbasicauthhandler, httpdigestauthhandler
from .changeset import GitChangeset
from .inmemory import GitInMemoryChangeset
from .workdir import GitWorkdir
SHA_PATTERN = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
log = logging.getLogger(__name__)
class GitRepository(BaseRepository):
Git repository backend.
DEFAULT_BRANCH_NAME = 'master'
scm = 'git'
def __init__(self, repo_path, create=False, src_url=None,
update_after_clone=False, bare=False):
self.path = abspath(repo_path)
repo = self._get_repo(create, src_url, update_after_clone, bare)
self.bare = repo.bare
@property
def _config_files(self):
return [
self.bare and abspath(self.path, 'config')
or abspath(self.path, '.git', 'config'),
abspath(get_user_home(), '.gitconfig'),
]
def _repo(self):
return Repo(self.path)
def head(self):
return self._repo.head()
except KeyError:
return None
def _empty(self):
Checks if repository is empty ie. without any changesets
self.revisions[0]
except (KeyError, IndexError):
return True
return False
@LazyProperty
def revisions(self):
Returns list of revisions' ids, in ascending order. Being lazy
attribute allows external tools to inject shas from cache.
return self._get_all_revisions()
@classmethod
def _run_git_command(cls, cmd, **opts):
Runs given ``cmd`` as git command and returns tuple
(stdout, stderr).
:param cmd: git command to be executed
:param opts: env options to pass into Subprocess command
if '_bare' in opts:
_copts = []
del opts['_bare']
_copts = ['-c', 'core.quotepath=false', ]
safe_call = False
if '_safe' in opts:
#no exc on failure
del opts['_safe']
safe_call = True
_str_cmd = False
if isinstance(cmd, basestring):
cmd = [cmd]
_str_cmd = True
gitenv = os.environ
# need to clean fix GIT_DIR !
if 'GIT_DIR' in gitenv:
del gitenv['GIT_DIR']
gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
_git_path = settings.GIT_EXECUTABLE_PATH
cmd = [_git_path] + _copts + cmd
if _str_cmd:
cmd = ' '.join(cmd)
_opts = dict(
env=gitenv,
shell=True,
_opts.update(opts)
p = subprocessio.SubprocessIOChunker(cmd, **_opts)
except (EnvironmentError, OSError), err:
tb_err = ("Couldn't run git command (%s).\n"
"Original error was:%s\n" % (cmd, err))
log.error(tb_err)
if safe_call:
return '', err
raise RepositoryError(tb_err)
return ''.join(p.output), ''.join(p.error)
def run_git_command(self, cmd):
opts = {}
if os.path.isdir(self.path):
opts['cwd'] = self.path
return self._run_git_command(cmd, **opts)
def _check_url(cls, url):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that git will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, exception is also thrown
when the return code is non 200
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
if '+' in url[:url.find('://')]:
url = url[url.find('+') + 1:]
handlers = []
url_obj = hg_url(url)
test_uri, authinfo = url_obj.authinfo()
url_obj.passwd = '*****'
cleaned_uri = str(url_obj)
if not test_uri.endswith('info/refs'):
test_uri = test_uri.rstrip('/') + '/info/refs'
if authinfo:
#create a password manager
passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((httpbasicauthhandler(passmgr),
httpdigestauthhandler(passmgr)))
o = urllib2.build_opener(*handlers)
o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
q = {"service": 'git-upload-pack'}
qs = '?%s' % urllib.urlencode(q)
cu = "%s%s" % (test_uri, qs)
req = urllib2.Request(cu, None, {})
resp = o.open(req)
if resp.code != 200:
raise Exception('Return Code is not 200')
except Exception, e:
# means it cannot be cloned
raise urllib2.URLError("[%s] org_exc: %s" % (cleaned_uri, e))
# now detect if it's proper git repo
gitdata = resp.read()
if not 'service=git-upload-pack' in gitdata:
raise urllib2.URLError(
"url [%s] does not look like an git" % (cleaned_uri))
def _get_repo(self, create, src_url=None, update_after_clone=False,
bare=False):
if create and os.path.exists(self.path):
raise RepositoryError("Location already exist")
if src_url and not create:
raise RepositoryError("Create should be set to True if src_url is "
"given (clone operation creates repository)")
if create and src_url:
GitRepository._check_url(src_url)
self.clone(src_url, update_after_clone, bare)
elif create:
os.makedirs(self.path)
if bare:
return Repo.init_bare(self.path)
return Repo.init(self.path)
return self._repo
except (NotGitRepository, OSError), err:
raise RepositoryError(err)
def _get_all_revisions(self):
# we must check if this repo is not empty, since later command
# fails if it is. And it's cheaper to ask than throw the subprocess
# errors
self._repo.head()
return []
rev_filter = settings.GIT_REV_FILTER
cmd = 'rev-list %s --reverse --date-order' % (rev_filter)
so, se = self.run_git_command(cmd)
except RepositoryError:
# Can be raised for empty repositories
return so.splitlines()
def _get_all_revisions2(self):
#alternate implementation using dulwich
includes = [x[1][0] for x in self._parsed_refs.iteritems()
if x[1][1] != 'T']
return [c.commit.id for c in self._repo.get_walker(include=includes)]
def _get_revision(self, revision):
For git backend we always return integer here. This way we ensure
that changeset's revision attribute would become integer.
is_null = lambda o: len(o) == revision.count('0')
if self._empty:
raise EmptyRepositoryError("There are no changesets yet")
if revision in (None, '', 'tip', 'HEAD', 'head', -1):
return self.revisions[-1]
is_bstr = isinstance(revision, (str, unicode))
if ((is_bstr and revision.isdigit() and len(revision) < 12)
or isinstance(revision, int) or is_null(revision)):
revision = self.revisions[int(revision)]
except IndexError:
msg = ("Revision %s does not exist for %s" % (revision, self))
raise ChangesetDoesNotExistError(msg)
elif is_bstr:
# get by branch/tag name
_ref_revision = self._parsed_refs.get(revision)
if _ref_revision: # and _ref_revision[1] in ['H', 'RH', 'T']:
return _ref_revision[0]
_tags_shas = self.tags.values()
# maybe it's a tag ? we don't have them in self.revisions
if revision in _tags_shas:
return _tags_shas[_tags_shas.index(revision)]
elif not SHA_PATTERN.match(revision) or revision not in self.revisions:
# Ensure we return full id
if not SHA_PATTERN.match(str(revision)):
raise ChangesetDoesNotExistError("Given revision %s not recognized"
% revision)
return revision
def get_ref_revision(self, ref_type, ref_name):
Returns ``MercurialChangeset`` object representing repository's
changeset at the given ``revision``.
return self._get_revision(ref_name)
def _get_archives(self, archive_name='tip'):
for i in [('zip', '.zip'), ('gz', '.tar.gz'), ('bz2', '.tar.bz2')]:
yield {"type": i[0], "extension": i[1], "node": archive_name}
def _get_url(self, url):
Returns normalized url. If schema is not given, would fall to
filesystem (``file:///``) schema.
url = str(url)
if url != 'default' and not '://' in url:
url = ':///'.join(('file', url))
return url
def get_hook_location(self):
returns absolute path to location where hooks are stored
loc = os.path.join(self.path, 'hooks')
if not self.bare:
loc = os.path.join(self.path, '.git', 'hooks')
return loc
def name(self):
return os.path.basename(self.path)
def last_change(self):
Returns last change made on this repository as datetime object
return date_fromtimestamp(self._get_mtime(), makedate()[1])
def _get_mtime(self):
return time.mktime(self.get_changeset().date.timetuple())
idx_loc = '' if self.bare else '.git'
# fallback to filesystem
in_path = os.path.join(self.path, idx_loc, "index")
he_path = os.path.join(self.path, idx_loc, "HEAD")
if os.path.exists(in_path):
return os.stat(in_path).st_mtime
return os.stat(he_path).st_mtime
def description(self):
undefined_description = u'unknown'
_desc = self._repo.get_description()
return safe_unicode(_desc or undefined_description)
def contact(self):
undefined_contact = u'Unknown'
return undefined_contact
def branches(self):
if not self.revisions:
return {}
sortkey = lambda ctx: ctx[0]
_branches = [(x[0], x[1][0])
for x in self._parsed_refs.iteritems() if x[1][1] == 'H']
return OrderedDict(sorted(_branches, key=sortkey, reverse=False))
def closed_branches(self):
def tags(self):
return self._get_tags()
def _get_tags(self):
Status change: