# -*- coding: utf-8 -*-
"""
vcs.backends.git.repository
~~~~~~~~~~~~~~~~~~~~~~~~~~~
Git repository implementation.
:created_on: Apr 8, 2010
:copyright: (c) 2010-2011 by Marcin Kuzminski, Lukasz Balcerzak.
import os
import re
import time
import urllib
import urllib2
import logging
import posixpath
import string
try:
# Python <=2.7
from pipes import quote
except ImportError:
# Python 3.3+
from shlex import quote
import sys
if sys.platform == "win32":
from subprocess import list2cmdline
def quote(s):
return list2cmdline([s])
else:
from dulwich.objects import Tag
from dulwich.repo import Repo, NotGitRepository
from dulwich.config import ConfigFile
from kallithea.lib.vcs import subprocessio
from kallithea.lib.vcs.backends.base import BaseRepository, CollectionGenerator
from kallithea.lib.vcs.conf import settings
from kallithea.lib.vcs.exceptions import (
BranchDoesNotExistError, ChangesetDoesNotExistError, EmptyRepositoryError,
RepositoryError, TagAlreadyExistError, TagDoesNotExistError
)
from kallithea.lib.vcs.utils import safe_unicode, makedate, date_fromtimestamp
from kallithea.lib.vcs.utils.lazy import LazyProperty
from kallithea.lib.vcs.utils.ordered_dict import OrderedDict
from kallithea.lib.vcs.utils.paths import abspath, get_user_home
from kallithea.lib.vcs.utils.hgcompat import (
hg_url, httpbasicauthhandler, httpdigestauthhandler
from .changeset import GitChangeset
from .inmemory import GitInMemoryChangeset
from .workdir import GitWorkdir
SHA_PATTERN = re.compile(r'^[[0-9a-fA-F]{12}|[0-9a-fA-F]{40}]$')
log = logging.getLogger(__name__)
class GitRepository(BaseRepository):
Git repository backend.
DEFAULT_BRANCH_NAME = 'master'
scm = 'git'
def __init__(self, repo_path, create=False, src_url=None,
update_after_clone=False, bare=False):
self.path = abspath(repo_path)
repo = self._get_repo(create, src_url, update_after_clone, bare)
self.bare = repo.bare
@property
def _config_files(self):
return [
self.bare and abspath(self.path, 'config')
or abspath(self.path, '.git', 'config'),
abspath(get_user_home(), '.gitconfig'),
]
def _repo(self):
return Repo(self.path)
def head(self):
return self._repo.head()
except KeyError:
return None
def _empty(self):
Checks if repository is empty ie. without any changesets
self.revisions[0]
except (KeyError, IndexError):
return True
return False
@LazyProperty
def revisions(self):
Returns list of revisions' ids, in ascending order. Being lazy
attribute allows external tools to inject shas from cache.
return self._get_all_revisions()
@classmethod
def _run_git_command(cls, cmd, **opts):
Runs given ``cmd`` as git command and returns tuple
(stdout, stderr).
:param cmd: git command to be executed
:param opts: env options to pass into Subprocess command
if '_bare' in opts:
_copts = []
del opts['_bare']
_copts = ['-c', 'core.quotepath=false', ]
safe_call = False
if '_safe' in opts:
#no exc on failure
del opts['_safe']
safe_call = True
_str_cmd = False
if isinstance(cmd, basestring):
cmd = [cmd]
_str_cmd = True
gitenv = os.environ
# need to clean fix GIT_DIR !
if 'GIT_DIR' in gitenv:
del gitenv['GIT_DIR']
gitenv['GIT_CONFIG_NOGLOBAL'] = '1'
_git_path = settings.GIT_EXECUTABLE_PATH
cmd = [_git_path] + _copts + cmd
if _str_cmd:
cmd = ' '.join(cmd)
_opts = dict(
env=gitenv,
shell=True,
_opts.update(opts)
p = subprocessio.SubprocessIOChunker(cmd, **_opts)
except (EnvironmentError, OSError), err:
tb_err = ("Couldn't run git command (%s).\n"
"Original error was:%s\n" % (cmd, err))
log.error(tb_err)
if safe_call:
return '', err
raise RepositoryError(tb_err)
return ''.join(p.output), ''.join(p.error)
def run_git_command(self, cmd):
opts = {}
if os.path.isdir(self.path):
opts['cwd'] = self.path
return self._run_git_command(cmd, **opts)
def _check_url(cls, url):
Function will check given url and try to verify if it's a valid
link. Sometimes it may happened that git will issue basic
auth request that can cause whole API to hang when used from python
or other external calls.
On failures it'll raise urllib2.HTTPError, exception is also thrown
when the return code is non 200
# check first if it's not an local url
if os.path.isdir(url) or url.startswith('file:'):
if '+' in url[:url.find('://')]:
url = url[url.find('+') + 1:]
handlers = []
url_obj = hg_url(url)
test_uri, authinfo = url_obj.authinfo()
url_obj.passwd = '*****'
cleaned_uri = str(url_obj)
if not test_uri.endswith('info/refs'):
test_uri = test_uri.rstrip('/') + '/info/refs'
if authinfo:
#create a password manager
passmgr = urllib2.HTTPPasswordMgrWithDefaultRealm()
passmgr.add_password(*authinfo)
handlers.extend((httpbasicauthhandler(passmgr),
httpdigestauthhandler(passmgr)))
o = urllib2.build_opener(*handlers)
o.addheaders = [('User-Agent', 'git/1.7.8.0')] # fake some git
q = {"service": 'git-upload-pack'}
qs = '?%s' % urllib.urlencode(q)
cu = "%s%s" % (test_uri, qs)
req = urllib2.Request(cu, None, {})
resp = o.open(req)
if resp.code != 200:
Status change: