Changeset - 141066b8a89a
[Not reviewed]
default
0 6 0
Mads Kiilerich (mads) - 6 years ago 2020-01-25 20:06:36
mads@kiilerich.com
Grafted from: de3c11c9e9a7
tests: minor doctest updates for py3
6 files changed with 32 insertions and 32 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/inifile.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
kallithea.lib.inifile
 
~~~~~~~~~~~~~~~~~~~~~
 

	
 
Handling of .ini files, mainly creating them from Mako templates and adding
 
other custom values.
 
"""
 

	
 
import logging
 
import os
 
import re
 

	
 
import mako.template
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
template_file = os.path.join(
 
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
 
    'kallithea/lib/paster_commands/template.ini.mako')
 

	
 
default_variables = {
 
    'database_engine': 'sqlite',
 
    'http_server': 'waitress',
 
    'host': '127.0.0.1',
 
    'port': '5000',
 
    'uuid': lambda: 'VERY-SECRET',
 
}
 

	
 

	
 
def expand(template, mako_variable_values, settings):
 
    """Expand mako template and tweak it.
 
    Not entirely stable for random templates as input, but good enough for our
 
    single template.
 

	
 
    >>> template = '''
 
    ... [first-section]
 
    ...
 
    ... variable=${mako_variable}
 
    ... variable2  =\tvalue after tab
 
    ... ## This section had some whitespace and stuff
 
    ...
 
    ...
 
    ... # ${mako_function()}
 
    ... [second-section]
 
    ... %if conditional_options == 'option-a':
 
    ... # option a was chosen
 
    ... %elif conditional_options == 'option-b':
 
    ... some_variable = "never mind - option-b will not be used anyway ..."
 
    ... %endif
 
    ... '''
 
    >>> selected_mako_conditionals = []
 
    >>> mako_variable_values = {'mako_variable': 'VALUE', 'mako_function': (lambda: 'FUNCTION RESULT'),
 
    ...                         'conditional_options': 'option-a'}
 
    >>> settings = { # only partially used
 
    ...     '[first-section]': {'variable2': 'VAL2', 'first_extra': 'EXTRA'},
 
    ...     '[third-section]': {'third_extra': ' 3'},
 
    ...     '[fourth-section]': {'fourth_extra': '4', 'fourth': '"four"'},
 
    ... }
 
    >>> print expand(template, mako_variable_values, settings)
 
    >>> print(expand(template, mako_variable_values, settings))
 
    <BLANKLINE>
 
    [first-section]
 
    <BLANKLINE>
 
    variable=VALUE
 
    #variable2  =    value after tab
 
    variable2 = VAL2
 
    <BLANKLINE>
 
    first_extra = EXTRA
 
    <BLANKLINE>
 
    <BLANKLINE>
 
    # FUNCTION RESULT
 
    [second-section]
 
    # option a was chosen
 
    <BLANKLINE>
 
    [fourth-section]
 
    fourth = "four"
 
    fourth_extra = 4
 
    <BLANKLINE>
 
    [third-section]
 
    third_extra =  3
 
    <BLANKLINE>
 
    """
 
    mako_variables = dict(default_variables)
 
    mako_variables.update(mako_variable_values or {})
 
    settings = dict((k, dict(v)) for k, v in settings.items()) # deep copy before mutating
 

	
 
    ini_lines = mako.template.Template(template).render(**mako_variables)
 

	
 
    def process_section(m):
 
        """process a ini section, replacing values as necessary"""
 
        sectionname, lines = m.groups()
 
        if sectionname in settings:
 
            section_settings = settings.pop(sectionname)
 

	
 
            def process_line(m):
 
                """process a section line and update value if necessary"""
 
                key, value = m.groups()
 
                line = m.group(0)
 
                if key in section_settings:
 
                    new_line = '%s = %s' % (key, section_settings.pop(key))
 
                    if new_line != line:
 
                        # keep old entry as example - comments might refer to it
 
                        line = '#%s\n%s' % (line, new_line)
 
                return line.rstrip()
 

	
 
            # process lines that not are comments or empty and look like name=value
 
            lines = re.sub(r'^([^#\n\s]*)[ \t]*=[ \t]*(.*)$', process_line, lines, flags=re.MULTILINE)
 
            # add unused section settings
 
            if section_settings:
 
                lines += '\n' + ''.join('%s = %s\n' % (key, value) for key, value in sorted(section_settings.items()))
 

	
 
        return sectionname + '\n' + lines
 

	
 
    # process sections until comments before next section or end
 
    ini_lines = re.sub(r'''^
 
        (\[.*\])\n
 
        # after the section name, a number of chunks with:
 
        (
 
            (?:
 
                # a number of comments or empty lines
 
                (?:[#].*\n|\n)*
 
                # one or more non-empty non-comments non-section-start lines
 
                (?:[^\n#[].*\n)+
 
                # a number of comments - not empty lines
 
                (?:[#].*\n)*
 
            )*
 
        )
 
        ''',
 
        process_section, ini_lines, flags=re.MULTILINE | re.VERBOSE) \
 
        + \
 
        ''.join(
 
            '\n' + sectionname + '\n' + ''.join('%s = %s\n' % (key, value) for key, value in sorted(section_settings.items()))
 
            for sectionname, section_settings in sorted(settings.items())
 
            if section_settings)
 

	
 
    return ini_lines
 

	
 

	
 
def create(dest_file, mako_variable_values, settings):
 
    """Create an ini file at dest_file"""
 
    with open(template_file, 'rb') as f:
 
        template = f.read().decode('utf-8')
 

	
 
    ini_lines = expand(template, mako_variable_values, settings)
 

	
 
    with open(dest_file, 'wb') as f:
 
        f.write(ini_lines.encode('utf-8'))
kallithea/lib/markup_renderer.py
Show inline comments
 
@@ -26,223 +26,223 @@ Original author and date, and relevant c
 
"""
 

	
 

	
 
import logging
 
import re
 
import traceback
 

	
 
import bleach
 
import markdown as markdown_mod
 

	
 
from kallithea.lib.utils2 import MENTIONS_REGEX, safe_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
url_re = re.compile(r'''\bhttps?://(?:[\da-zA-Z0-9@:.-]+)'''
 
                    r'''(?:[/a-zA-Z0-9_=@#~&+%.,:;?!*()-]*[/a-zA-Z0-9_=@#~])?''')
 

	
 

	
 
class MarkupRenderer(object):
 
    RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES = ['include', 'meta', 'raw']
 

	
 
    MARKDOWN_PAT = re.compile(r'md|mkdn?|mdown|markdown', re.IGNORECASE)
 
    RST_PAT = re.compile(r're?st', re.IGNORECASE)
 
    PLAIN_PAT = re.compile(r'readme', re.IGNORECASE)
 

	
 
    @classmethod
 
    def _detect_renderer(cls, source, filename):
 
        """
 
        runs detection of what renderer should be used for generating html
 
        from a markup language
 

	
 
        filename can be also explicitly a renderer name
 
        """
 

	
 
        if cls.MARKDOWN_PAT.findall(filename):
 
            return cls.markdown
 
        elif cls.RST_PAT.findall(filename):
 
            return cls.rst
 
        elif cls.PLAIN_PAT.findall(filename):
 
            return cls.rst
 
        return cls.plain
 

	
 
    @classmethod
 
    def _flavored_markdown(cls, text):
 
        """
 
        Github style flavored markdown
 

	
 
        :param text:
 
        """
 
        from hashlib import md5
 

	
 
        # Extract pre blocks.
 
        extractions = {}
 

	
 
        def pre_extraction_callback(matchobj):
 
            digest = md5(matchobj.group(0)).hexdigest()
 
            extractions[digest] = matchobj.group(0)
 
            return "{gfm-extraction-%s}" % digest
 
        pattern = re.compile(r'<pre>.*?</pre>', re.MULTILINE | re.DOTALL)
 
        text = re.sub(pattern, pre_extraction_callback, text)
 

	
 
        # Prevent foo_bar_baz from ending up with an italic word in the middle.
 
        def italic_callback(matchobj):
 
            s = matchobj.group(0)
 
            if list(s).count('_') >= 2:
 
                return s.replace('_', r'\_')
 
            return s
 
        text = re.sub(r'^(?! {4}|\t)\w+_\w+_\w[\w_]*', italic_callback, text)
 

	
 
        # In very clear cases, let newlines become <br /> tags.
 
        def newline_callback(matchobj):
 
            if len(matchobj.group(1)) == 1:
 
                return matchobj.group(0).rstrip() + '  \n'
 
            else:
 
                return matchobj.group(0)
 
        pattern = re.compile(r'^[\w\<][^\n]*(\n+)', re.MULTILINE)
 
        text = re.sub(pattern, newline_callback, text)
 

	
 
        # Insert pre block extractions.
 
        def pre_insert_callback(matchobj):
 
            return '\n\n' + extractions[matchobj.group(1)]
 
        text = re.sub(r'{gfm-extraction-([0-9a-f]{32})\}',
 
                      pre_insert_callback, text)
 

	
 
        return text
 

	
 
    @classmethod
 
    def render(cls, source, filename=None):
 
        """
 
        Renders a given filename using detected renderer
 
        it detects renderers based on file extension or mimetype.
 
        At last it will just do a simple html replacing new lines with <br/>
 

	
 
        >>> MarkupRenderer.render('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''', '.md')
 
        u'<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        '<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        >>> MarkupRenderer.render('''<img class="c d" src="file://localhost/test.jpg">''', 'b.mkd')
 
        u'<p><img class="c d"></p>'
 
        '<p><img class="c d"></p>'
 
        >>> MarkupRenderer.render('''<a href="foo">foo</a>''', 'c.mkdn')
 
        u'<p><a href="foo">foo</a></p>'
 
        '<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.render('''<script>alert(1)</script>''', 'd.mdown')
 
        u'&lt;script&gt;alert(1)&lt;/script&gt;'
 
        '&lt;script&gt;alert(1)&lt;/script&gt;'
 
        >>> MarkupRenderer.render('''<div onclick="alert(2)">yo</div>''', 'markdown')
 
        u'<div>yo</div>'
 
        '<div>yo</div>'
 
        >>> MarkupRenderer.render('''<a href="javascript:alert(3)">yo</a>''', 'md')
 
        u'<p><a>yo</a></p>'
 
        '<p><a>yo</a></p>'
 
        """
 

	
 
        renderer = cls._detect_renderer(source, filename)
 
        readme_data = renderer(source)
 
        # Allow most HTML, while preventing XSS issues:
 
        # no <script> tags, no onclick attributes, no javascript
 
        # "protocol", and also limit styling to prevent defacing.
 
        return bleach.clean(readme_data,
 
            tags=['a', 'abbr', 'b', 'blockquote', 'br', 'code', 'dd',
 
                  'div', 'dl', 'dt', 'em', 'h1', 'h2', 'h3', 'h4', 'h5',
 
                  'h6', 'hr', 'i', 'img', 'li', 'ol', 'p', 'pre', 'span',
 
                  'strong', 'sub', 'sup', 'table', 'tbody', 'td', 'th',
 
                  'thead', 'tr', 'ul'],
 
            attributes=['class', 'id', 'style', 'label', 'title', 'alt', 'href', 'src'],
 
            styles=['color'],
 
            protocols=['http', 'https', 'mailto'],
 
            )
 

	
 
    @classmethod
 
    def plain(cls, source, universal_newline=True):
 
        source = safe_str(source)
 
        if universal_newline:
 
            newline = '\n'
 
            source = newline.join(source.splitlines())
 

	
 
        def url_func(match_obj):
 
            url_full = match_obj.group(0)
 
            return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
        source = url_re.sub(url_func, source)
 
        return '<br />' + source.replace("\n", '<br />')
 

	
 
    @classmethod
 
    def markdown(cls, source, safe=True, flavored=False):
 
        """
 
        Convert Markdown (possibly GitHub Flavored) to INSECURE HTML, possibly
 
        with "safe" fall-back to plaintext. Output from this method should be sanitized before use.
 

	
 
        >>> MarkupRenderer.markdown('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''')
 
        u'<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        '<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<img class="c d" src="file://localhost/test.jpg">''')
 
        u'<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        '<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<a href="foo">foo</a>''')
 
        u'<p><a href="foo">foo</a></p>'
 
        '<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.markdown('''<script>alert(1)</script>''')
 
        u'<script>alert(1)</script>'
 
        '<script>alert(1)</script>'
 
        >>> MarkupRenderer.markdown('''<div onclick="alert(2)">yo</div>''')
 
        u'<div onclick="alert(2)">yo</div>'
 
        '<div onclick="alert(2)">yo</div>'
 
        >>> MarkupRenderer.markdown('''<a href="javascript:alert(3)">yo</a>''')
 
        u'<p><a href="javascript:alert(3)">yo</a></p>'
 
        '<p><a href="javascript:alert(3)">yo</a></p>'
 
        >>> MarkupRenderer.markdown('''## Foo''')
 
        u'<h2>Foo</h2>'
 
        >>> print MarkupRenderer.markdown('''
 
        '<h2>Foo</h2>'
 
        >>> print(MarkupRenderer.markdown('''
 
        ...     #!/bin/bash
 
        ...     echo "hello"
 
        ... ''')
 
        ... '''))
 
        <table class="code-highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre>1
 
        2</pre></div></td><td class="code"><div class="code-highlight"><pre><span></span><span class="ch">#!/bin/bash</span>
 
        <span class="nb">echo</span> <span class="s2">&quot;hello&quot;</span>
 
        </pre></div>
 
        </td></tr></table>
 
        """
 
        source = safe_str(source)
 
        try:
 
            if flavored:
 
                source = cls._flavored_markdown(source)
 
            return markdown_mod.markdown(
 
                source,
 
                extensions=['markdown.extensions.codehilite', 'markdown.extensions.extra'],
 
                extension_configs={'markdown.extensions.codehilite': {'css_class': 'code-highlight'}})
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst(cls, source, safe=True):
 
        source = safe_str(source)
 
        try:
 
            from docutils.core import publish_parts
 
            from docutils.parsers.rst import directives
 
            docutils_settings = dict([(alias, None) for alias in
 
                                cls.RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES])
 

	
 
            docutils_settings.update({'input_encoding': 'unicode',
 
                                      'report_level': 4})
 

	
 
            for k, v in docutils_settings.items():
 
                directives.register_directive(k, v)
 

	
 
            parts = publish_parts(source=source,
 
                                  writer_name="html4css1",
 
                                  settings_overrides=docutils_settings)
 

	
 
            return parts['html_title'] + parts["fragment"]
 
        except ImportError:
 
            log.warning('Install docutils to use this function')
 
            return cls.plain(source)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst_with_mentions(cls, source):
 

	
 
        def wrapp(match_obj):
 
            uname = match_obj.groups()[0]
 
            return r'\ **@%(uname)s**\ ' % {'uname': uname}
 
        mention_hl = MENTIONS_REGEX.sub(wrapp, source).strip()
 
        return cls.rst(mention_hl)
kallithea/lib/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    kallithea.lib.ssh
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    :created_on: Dec 10, 2012
 
    :author: ir4y
 
    :copyright: (C) 2012 Ilya Beda <ir4y.ix@gmail.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import base64
 
import logging
 
import re
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyParseError(Exception):
 
    """Exception raised by parse_pub_key"""
 

	
 

	
 
def parse_pub_key(ssh_key):
 
    r"""Parse SSH public key string, raise SshKeyParseError or return decoded keytype, data and comment
 

	
 
    >>> getfixture('doctest_mock_ugettext')
 
    >>> parse_pub_key('')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: SSH key is missing
 
    kallithea.lib.ssh.SshKeyParseError: SSH key is missing
 
    >>> parse_pub_key('''AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part, like 'ssh-rsa ASRNeaZu4FA...xlJp='
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part, like 'ssh-rsa ASRNeaZu4FA...xlJp='
 
    >>> parse_pub_key('''abc AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB2NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
 
    >>> parse_pub_key(''' ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment
 
    ... ''')
 
    ('ssh-rsa', '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
 
    ('ssh-rsa', b'\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
 
    >>> parse_pub_key('''ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIP1NA2kBQIKe74afUXmIWD9ByDYQJqUwW44Y4gJOBRuo''')
 
    ('ssh-ed25519', '\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xfdM\x03i\x01@\x82\x9e\xef\x86\x9fQy\x88X?A\xc86\x10&\xa50[\x8e\x18\xe2\x02N\x05\x1b\xa8', '')
 
    ('ssh-ed25519', b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xfdM\x03i\x01@\x82\x9e\xef\x86\x9fQy\x88X?A\xc86\x10&\xa50[\x8e\x18\xe2\x02N\x05\x1b\xa8', '')
 
    """
 
    if not ssh_key:
 
        raise SshKeyParseError(_("SSH key is missing"))
 

	
 
    parts = ssh_key.split(None, 2)
 
    if len(parts) < 2:
 
        raise SshKeyParseError(_("Incorrect SSH key - it must have both a key type and a base64 part, like 'ssh-rsa ASRNeaZu4FA...xlJp='"))
 

	
 
    keytype, keyvalue, comment = (parts + [''])[:3]
 
    if keytype not in ('ssh-rsa', 'ssh-dss', 'ssh-ed25519'):
 
        raise SshKeyParseError(_("Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'"))
 

	
 
    if re.search(r'[^a-zA-Z0-9+/=]', keyvalue):
 
        raise SshKeyParseError(_("Incorrect SSH key - unexpected characters in base64 part %r") % keyvalue)
 

	
 
    try:
 
        key_bytes = base64.b64decode(keyvalue)
 
    except base64.binascii.Error:
 
        raise SshKeyParseError(_("Incorrect SSH key - failed to decode base64 part %r") % keyvalue)
 

	
 
    if not key_bytes.startswith(b'\x00\x00\x00%c%s\x00' % (len(keytype), ascii_bytes(keytype))):
 
        raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (keytype, ascii_str(key_bytes[4:].split(b'\0', 1)[0])))
 

	
 
    return keytype, key_bytes, comment
 

	
 

	
 
SSH_OPTIONS = 'no-pty,no-port-forwarding,no-X11-forwarding,no-agent-forwarding'
 

	
 

	
 
def _safe_check(s, rec = re.compile('^[a-zA-Z0-9+/]+={0,2}$')):
 
    """Return true if s really has the right content for base64 encoding and only contains safe characters
 
    >>> _safe_check('asdf')
 
    True
 
    >>> _safe_check('as df')
 
    False
 
    >>> _safe_check('AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==')
 
    True
 
    """
 
    return rec.match(s) is not None
 

	
 

	
 
def authorized_keys_line(kallithea_cli_path, config_file, key):
 
    """
 
    Return a line as it would appear in .authorized_keys
 

	
 
    >>> from kallithea.model.db import UserSshKeys, User
 
    >>> user = User(user_id=7, username='uu')
 
    >>> key = UserSshKeys(user_ssh_key_id=17, user=user, description='test key')
 
    >>> key.public_key='''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment'''
 
    >>> authorized_keys_line('/srv/kallithea/venv/bin/kallithea-cli', '/srv/kallithea/my.ini', key)
 
    'no-pty,no-port-forwarding,no-X11-forwarding,no-agent-forwarding,command="/srv/kallithea/venv/bin/kallithea-cli ssh-serve -c /srv/kallithea/my.ini 7 17" ssh-rsa AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==\\n'
 
    """
 
    try:
 
        keytype, key_bytes, comment = parse_pub_key(key.public_key)
 
    except SshKeyParseError:
 
        return '# Invalid Kallithea SSH key: %s %s\n' % (key.user.user_id, key.user_ssh_key_id)
 
    base64_key = ascii_str(base64.b64encode(key_bytes))
 
    assert '\n' not in base64_key
 
    if not _safe_check(base64_key):
 
        return '# Invalid Kallithea SSH key - bad base64 encoding: %s %s\n' % (key.user.user_id, key.user_ssh_key_id)
 
    return '%s,command="%s ssh-serve -c %s %s %s" %s %s\n' % (
 
        SSH_OPTIONS, kallithea_cli_path, config_file,
 
        key.user.user_id, key.user_ssh_key_id,
 
        keytype, base64_key)
kallithea/lib/vcs/backends/git/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 

	
 
from kallithea.lib.hooks import log_pull_action
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitSshHandler(BaseSshHandler):
 
    vcs_type = 'git'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).repo_name
 
        u'foo bar'
 
        'foo bar'
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).verb
 
        'git-upload-pack'
 
        >>> GitSshHandler.make(shlex.split(" git-upload-pack /blåbærgrød ")).repo_name # might not be necessary to support no quoting ... but we can
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        'bl\xe5b\xe6rgr\xf8d'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack "/foo'bar"''')).repo_name
 
        u"foo'bar"
 
        "foo'bar"
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).repo_name
 
        u'foo'
 
        'foo'
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).verb
 
        'git-receive-pack'
 

	
 
        >>> GitSshHandler.make(shlex.split("/bin/git-upload-pack '/foo'")) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack /foo bar''')) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> shlex.split("git-upload-pack '/foo'bar' x") # ssh-serve will report: Error parsing SSH command "...": No closing quotation
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> GitSshHandler.make(shlex.split('hg -R foo serve --stdio')) # not handled here
 
        """
 
        if (len(ssh_command_parts) == 2 and
 
            ssh_command_parts[0] in ['git-upload-pack', 'git-receive-pack'] and
 
            ssh_command_parts[1].startswith('/')
 
        ):
 
            return cls(ssh_command_parts[1][1:], ssh_command_parts[0])
 

	
 
        return None
 

	
 
    def __init__(self, repo_name, verb):
 
        BaseSshHandler.__init__(self, repo_name)
 
        self.verb = verb
 

	
 
    def _serve(self):
 
        if self.verb == 'git-upload-pack': # action 'pull'
 
            # base class called set_hook_environment - action is hardcoded to 'pull'
 
            log_pull_action(ui=make_ui(), repo=self.db_repo.scm_instance._repo)
 
        else: # probably verb 'git-receive-pack', action 'push'
 
            if not self.allow_push:
 
                self.exit('Push access to %r denied' % self.repo_name)
 
            # Note: push logging is handled by Git post-receive hook
 

	
 
        # git shell is not a real shell but use shell inspired quoting *inside* the argument.
 
        # Per https://github.com/git/git/blob/v2.22.0/quote.c#L12 :
 
        # The path must be "'" quoted, but "'" and "!" must exit the quoting and be "\" escaped
 
        quoted_abspath = "'%s'" % self.db_repo.repo_full_path.replace("'", r"'\''").replace("!", r"'\!'")
 
        newcmd = ['git', 'shell', '-c', "%s %s" % (self.verb, quoted_abspath)]
 
        log.debug('Serving: %s', newcmd)
 
        os.execvp(newcmd[0], newcmd)
 
        self.exit("Failed to exec 'git' as %s" % newcmd)
kallithea/lib/vcs/backends/hg/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 
import mercurial.hg
 
import mercurial.wireprotoserver
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_bytes
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialSshHandler(BaseSshHandler):
 
    vcs_type = 'hg'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> MercurialSshHandler.make(shlex.split('hg -R "foo bar" serve --stdio')).repo_name
 
        u'foo bar'
 
        'foo bar'
 
        >>> MercurialSshHandler.make(shlex.split(' hg -R blåbærgrød serve --stdio ')).repo_name
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        'bl\xe5b\xe6rgr\xf8d'
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R 'foo"bar' serve --stdio''')).repo_name
 
        u'foo"bar'
 
        'foo"bar'
 

	
 
        >>> MercurialSshHandler.make(shlex.split('/bin/hg -R "foo" serve --stdio'))
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R "foo"bar" serve --stdio''')) # ssh-serve will report: Error parsing SSH command "...": invalid syntax
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> MercurialSshHandler.make(shlex.split('git-upload-pack "/foo"')) # not handled here
 
        """
 
        if ssh_command_parts[:2] == ['hg', '-R'] and ssh_command_parts[3:] == ['serve', '--stdio']:
 
            return cls(ssh_command_parts[2])
 

	
 
        return None
 

	
 
    def _serve(self):
 
        # Note: we want a repo with config based on .hg/hgrc and can thus not use self.db_repo.scm_instance._repo.ui
 
        baseui = make_ui(repo_path=self.db_repo.repo_full_path)
 
        if not self.allow_push:
 
            baseui.setconfig(b'hooks', b'pretxnopen._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'prepushkey._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 

	
 
        repo = mercurial.hg.repository(baseui, safe_bytes(self.db_repo.repo_full_path))
 
        log.debug("Starting Mercurial sshserver for %s", self.db_repo.repo_full_path)
 
        mercurial.wireprotoserver.sshserver(baseui, repo).serve_forever()
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
@@ -38,183 +38,183 @@ def get_scm(path, search_up=False, expli
 
        if found_scms or not search_up:
 
            break
 
        newpath = abspath(path, '..')
 
        if newpath == path:
 
            break
 
        path = newpath
 

	
 
    if len(found_scms) > 1:
 
        for scm in found_scms:
 
            if scm[0] == explicit_alias:
 
                return scm
 
        raise VCSError('More than one [%s] scm found at given path %s'
 
                       % (', '.join((x[0] for x in found_scms)), path))
 

	
 
    if len(found_scms) == 0:
 
        raise VCSError('No scm found at given path %s' % path)
 

	
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    from kallithea.lib.vcs.backends import get_backend
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
 
    """
 
    If pygments are available on the system
 
    then returned output is colored. Otherwise
 
    unchanged content is returned.
 
    """
 
    import logging
 
    try:
 
        import pygments
 
        pygments
 
    except ImportError:
 
        return code
 
    from pygments import highlight
 
    from pygments.lexers import guess_lexer_for_filename, ClassNotFound
 
    from pygments.formatters import TerminalFormatter
 

	
 
    try:
 
        lexer = guess_lexer_for_filename(name, code)
 
        formatter = TerminalFormatter()
 
        content = highlight(code, lexer, formatter)
 
    except ClassNotFound:
 
        logging.debug("Couldn't guess Lexer, will not use pygments.")
 
        content = code
 
    return content
 

	
 

	
 
def parse_changesets(text):
 
    """
 
    Returns dictionary with *start*, *main* and *end* ids.
 

	
 
    Examples::
 

	
 
        >>> parse_changesets('aaabbb')
 
        {'start': None, 'main': 'aaabbb', 'end': None}
 
        >>> parse_changesets('aaabbb..cccddd')
 
        {'start': 'aaabbb', 'main': None, 'end': 'cccddd'}
 
        {'start': 'aaabbb', 'end': 'cccddd', 'main': None}
 

	
 
    """
 
    text = text.strip()
 
    CID_RE = r'[a-zA-Z0-9]+'
 
    if '..' not in text:
 
        m = re.match(r'^(?P<cid>%s)$' % CID_RE, text)
 
        if m:
 
            return {
 
                'start': None,
 
                'main': text,
 
                'end': None,
 
            }
 
    else:
 
        RE = r'^(?P<start>%s)?\.{2,3}(?P<end>%s)?$' % (CID_RE, CID_RE)
 
        m = re.match(RE, text)
 
        if m:
 
            result = m.groupdict()
 
            result['main'] = None
 
            return result
 
    raise ValueError("IDs not recognized")
 

	
 

	
 
def parse_datetime(text):
 
    """
 
    Parses given text and returns ``datetime.datetime`` instance or raises
 
    ``ValueError``.
 

	
 
    :param text: string of desired date/datetime or something more verbose,
 
      like *yesterday*, *2weeks 3days*, etc.
 
    """
 

	
 
    text = text.strip().lower()
 

	
 
    INPUT_FORMATS = (
 
        '%Y-%m-%d %H:%M:%S',
 
        '%Y-%m-%d %H:%M',
 
        '%Y-%m-%d',
 
        '%m/%d/%Y %H:%M:%S',
 
        '%m/%d/%Y %H:%M',
 
        '%m/%d/%Y',
 
        '%m/%d/%y %H:%M:%S',
 
        '%m/%d/%y %H:%M',
 
        '%m/%d/%y',
 
    )
 
    for format in INPUT_FORMATS:
 
        try:
 
            return datetime.datetime(*time.strptime(text, format)[:6])
 
        except ValueError:
 
            pass
 

	
 
    # Try descriptive texts
 
    if text == 'tomorrow':
 
        future = datetime.datetime.now() + datetime.timedelta(days=1)
 
        args = future.timetuple()[:3] + (23, 59, 59)
 
        return datetime.datetime(*args)
 
    elif text == 'today':
 
        return datetime.datetime(*datetime.datetime.today().timetuple()[:3])
 
    elif text == 'now':
 
        return datetime.datetime.now()
 
    elif text == 'yesterday':
 
        past = datetime.datetime.now() - datetime.timedelta(days=1)
 
        return datetime.datetime(*past.timetuple()[:3])
 
    else:
 
        days = 0
 
        matched = re.match(
 
            r'^((?P<weeks>\d+) ?w(eeks?)?)? ?((?P<days>\d+) ?d(ays?)?)?$', text)
 
        if matched:
 
            groupdict = matched.groupdict()
 
            if groupdict['days']:
 
                days += int(matched.groupdict()['days'])
 
            if groupdict['weeks']:
 
                days += int(matched.groupdict()['weeks']) * 7
 
            past = datetime.datetime.now() - datetime.timedelta(days=days)
 
            return datetime.datetime(*past.timetuple()[:3])
 

	
 
    raise ValueError('Wrong date: "%s"' % text)
 

	
 

	
 
def get_dict_for_attrs(obj, attrs):
 
    """
 
    Returns dictionary for each attribute from given ``obj``.
 
    """
 
    data = {}
 
    for attr in attrs:
 
        data[attr] = getattr(obj, attr)
 
    return data
0 comments (0 inline, 0 general)