Changeset - 141066b8a89a
[Not reviewed]
default
0 6 0
Mads Kiilerich (mads) - 6 years ago 2020-01-25 20:06:36
mads@kiilerich.com
Grafted from: de3c11c9e9a7
tests: minor doctest updates for py3
6 files changed with 32 insertions and 32 deletions:
0 comments (0 inline, 0 general)
kallithea/lib/inifile.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
"""
 
kallithea.lib.inifile
 
~~~~~~~~~~~~~~~~~~~~~
 

	
 
Handling of .ini files, mainly creating them from Mako templates and adding
 
other custom values.
 
"""
 

	
 
import logging
 
import os
 
import re
 

	
 
import mako.template
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
template_file = os.path.join(
 
    os.path.dirname(os.path.dirname(os.path.dirname(__file__))),
 
    'kallithea/lib/paster_commands/template.ini.mako')
 

	
 
default_variables = {
 
    'database_engine': 'sqlite',
 
    'http_server': 'waitress',
 
    'host': '127.0.0.1',
 
    'port': '5000',
 
    'uuid': lambda: 'VERY-SECRET',
 
}
 

	
 

	
 
def expand(template, mako_variable_values, settings):
 
    """Expand mako template and tweak it.
 
    Not entirely stable for random templates as input, but good enough for our
 
    single template.
 

	
 
    >>> template = '''
 
    ... [first-section]
 
    ...
 
    ... variable=${mako_variable}
 
    ... variable2  =\tvalue after tab
 
    ... ## This section had some whitespace and stuff
 
    ...
 
    ...
 
    ... # ${mako_function()}
 
    ... [second-section]
 
    ... %if conditional_options == 'option-a':
 
    ... # option a was chosen
 
    ... %elif conditional_options == 'option-b':
 
    ... some_variable = "never mind - option-b will not be used anyway ..."
 
    ... %endif
 
    ... '''
 
    >>> selected_mako_conditionals = []
 
    >>> mako_variable_values = {'mako_variable': 'VALUE', 'mako_function': (lambda: 'FUNCTION RESULT'),
 
    ...                         'conditional_options': 'option-a'}
 
    >>> settings = { # only partially used
 
    ...     '[first-section]': {'variable2': 'VAL2', 'first_extra': 'EXTRA'},
 
    ...     '[third-section]': {'third_extra': ' 3'},
 
    ...     '[fourth-section]': {'fourth_extra': '4', 'fourth': '"four"'},
 
    ... }
 
    >>> print expand(template, mako_variable_values, settings)
 
    >>> print(expand(template, mako_variable_values, settings))
 
    <BLANKLINE>
 
    [first-section]
 
    <BLANKLINE>
 
    variable=VALUE
 
    #variable2  =    value after tab
 
    variable2 = VAL2
 
    <BLANKLINE>
 
    first_extra = EXTRA
 
    <BLANKLINE>
 
    <BLANKLINE>
 
    # FUNCTION RESULT
 
    [second-section]
 
    # option a was chosen
 
    <BLANKLINE>
 
    [fourth-section]
 
    fourth = "four"
 
    fourth_extra = 4
 
    <BLANKLINE>
 
    [third-section]
 
    third_extra =  3
 
    <BLANKLINE>
 
    """
 
    mako_variables = dict(default_variables)
 
    mako_variables.update(mako_variable_values or {})
 
    settings = dict((k, dict(v)) for k, v in settings.items()) # deep copy before mutating
 

	
 
    ini_lines = mako.template.Template(template).render(**mako_variables)
 

	
 
    def process_section(m):
 
        """process a ini section, replacing values as necessary"""
 
        sectionname, lines = m.groups()
 
        if sectionname in settings:
 
            section_settings = settings.pop(sectionname)
 

	
 
            def process_line(m):
 
                """process a section line and update value if necessary"""
 
                key, value = m.groups()
 
                line = m.group(0)
 
                if key in section_settings:
 
                    new_line = '%s = %s' % (key, section_settings.pop(key))
 
                    if new_line != line:
 
                        # keep old entry as example - comments might refer to it
 
                        line = '#%s\n%s' % (line, new_line)
 
                return line.rstrip()
 

	
 
            # process lines that not are comments or empty and look like name=value
 
            lines = re.sub(r'^([^#\n\s]*)[ \t]*=[ \t]*(.*)$', process_line, lines, flags=re.MULTILINE)
 
            # add unused section settings
 
            if section_settings:
 
                lines += '\n' + ''.join('%s = %s\n' % (key, value) for key, value in sorted(section_settings.items()))
 

	
 
        return sectionname + '\n' + lines
 

	
 
    # process sections until comments before next section or end
 
    ini_lines = re.sub(r'''^
 
        (\[.*\])\n
 
        # after the section name, a number of chunks with:
 
        (
 
            (?:
 
                # a number of comments or empty lines
 
                (?:[#].*\n|\n)*
 
                # one or more non-empty non-comments non-section-start lines
 
                (?:[^\n#[].*\n)+
 
                # a number of comments - not empty lines
 
                (?:[#].*\n)*
 
            )*
 
        )
 
        ''',
 
        process_section, ini_lines, flags=re.MULTILINE | re.VERBOSE) \
 
        + \
 
        ''.join(
 
            '\n' + sectionname + '\n' + ''.join('%s = %s\n' % (key, value) for key, value in sorted(section_settings.items()))
 
            for sectionname, section_settings in sorted(settings.items())
 
            if section_settings)
 

	
 
    return ini_lines
 

	
 

	
 
def create(dest_file, mako_variable_values, settings):
 
    """Create an ini file at dest_file"""
 
    with open(template_file, 'rb') as f:
 
        template = f.read().decode('utf-8')
 

	
 
    ini_lines = expand(template, mako_variable_values, settings)
 

	
 
    with open(dest_file, 'wb') as f:
 
        f.write(ini_lines.encode('utf-8'))
kallithea/lib/markup_renderer.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 
"""
 
kallithea.lib.markup_renderer
 
~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
Renderer for markup languages with ability to parse using rst or markdown
 

	
 
This file was forked by the Kallithea project in July 2014.
 
Original author and date, and relevant copyright and licensing information is below:
 
:created_on: Oct 27, 2011
 
:author: marcink
 
:copyright: (c) 2013 RhodeCode GmbH, and others.
 
:license: GPLv3, see LICENSE.md for more details.
 
"""
 

	
 

	
 
import logging
 
import re
 
import traceback
 

	
 
import bleach
 
import markdown as markdown_mod
 

	
 
from kallithea.lib.utils2 import MENTIONS_REGEX, safe_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
url_re = re.compile(r'''\bhttps?://(?:[\da-zA-Z0-9@:.-]+)'''
 
                    r'''(?:[/a-zA-Z0-9_=@#~&+%.,:;?!*()-]*[/a-zA-Z0-9_=@#~])?''')
 

	
 

	
 
class MarkupRenderer(object):
 
    RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES = ['include', 'meta', 'raw']
 

	
 
    MARKDOWN_PAT = re.compile(r'md|mkdn?|mdown|markdown', re.IGNORECASE)
 
    RST_PAT = re.compile(r're?st', re.IGNORECASE)
 
    PLAIN_PAT = re.compile(r'readme', re.IGNORECASE)
 

	
 
    @classmethod
 
    def _detect_renderer(cls, source, filename):
 
        """
 
        runs detection of what renderer should be used for generating html
 
        from a markup language
 

	
 
        filename can be also explicitly a renderer name
 
        """
 

	
 
        if cls.MARKDOWN_PAT.findall(filename):
 
            return cls.markdown
 
        elif cls.RST_PAT.findall(filename):
 
            return cls.rst
 
        elif cls.PLAIN_PAT.findall(filename):
 
            return cls.rst
 
        return cls.plain
 

	
 
    @classmethod
 
    def _flavored_markdown(cls, text):
 
        """
 
        Github style flavored markdown
 

	
 
        :param text:
 
        """
 
        from hashlib import md5
 

	
 
        # Extract pre blocks.
 
        extractions = {}
 

	
 
        def pre_extraction_callback(matchobj):
 
            digest = md5(matchobj.group(0)).hexdigest()
 
            extractions[digest] = matchobj.group(0)
 
            return "{gfm-extraction-%s}" % digest
 
        pattern = re.compile(r'<pre>.*?</pre>', re.MULTILINE | re.DOTALL)
 
        text = re.sub(pattern, pre_extraction_callback, text)
 

	
 
        # Prevent foo_bar_baz from ending up with an italic word in the middle.
 
        def italic_callback(matchobj):
 
            s = matchobj.group(0)
 
            if list(s).count('_') >= 2:
 
                return s.replace('_', r'\_')
 
            return s
 
        text = re.sub(r'^(?! {4}|\t)\w+_\w+_\w[\w_]*', italic_callback, text)
 

	
 
        # In very clear cases, let newlines become <br /> tags.
 
        def newline_callback(matchobj):
 
            if len(matchobj.group(1)) == 1:
 
                return matchobj.group(0).rstrip() + '  \n'
 
            else:
 
                return matchobj.group(0)
 
        pattern = re.compile(r'^[\w\<][^\n]*(\n+)', re.MULTILINE)
 
        text = re.sub(pattern, newline_callback, text)
 

	
 
        # Insert pre block extractions.
 
        def pre_insert_callback(matchobj):
 
            return '\n\n' + extractions[matchobj.group(1)]
 
        text = re.sub(r'{gfm-extraction-([0-9a-f]{32})\}',
 
                      pre_insert_callback, text)
 

	
 
        return text
 

	
 
    @classmethod
 
    def render(cls, source, filename=None):
 
        """
 
        Renders a given filename using detected renderer
 
        it detects renderers based on file extension or mimetype.
 
        At last it will just do a simple html replacing new lines with <br/>
 

	
 
        >>> MarkupRenderer.render('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''', '.md')
 
        u'<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        '<p><img id="a" src="http://example.com/test.jpg" style="color: red;"></p>'
 
        >>> MarkupRenderer.render('''<img class="c d" src="file://localhost/test.jpg">''', 'b.mkd')
 
        u'<p><img class="c d"></p>'
 
        '<p><img class="c d"></p>'
 
        >>> MarkupRenderer.render('''<a href="foo">foo</a>''', 'c.mkdn')
 
        u'<p><a href="foo">foo</a></p>'
 
        '<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.render('''<script>alert(1)</script>''', 'd.mdown')
 
        u'&lt;script&gt;alert(1)&lt;/script&gt;'
 
        '&lt;script&gt;alert(1)&lt;/script&gt;'
 
        >>> MarkupRenderer.render('''<div onclick="alert(2)">yo</div>''', 'markdown')
 
        u'<div>yo</div>'
 
        '<div>yo</div>'
 
        >>> MarkupRenderer.render('''<a href="javascript:alert(3)">yo</a>''', 'md')
 
        u'<p><a>yo</a></p>'
 
        '<p><a>yo</a></p>'
 
        """
 

	
 
        renderer = cls._detect_renderer(source, filename)
 
        readme_data = renderer(source)
 
        # Allow most HTML, while preventing XSS issues:
 
        # no <script> tags, no onclick attributes, no javascript
 
        # "protocol", and also limit styling to prevent defacing.
 
        return bleach.clean(readme_data,
 
            tags=['a', 'abbr', 'b', 'blockquote', 'br', 'code', 'dd',
 
                  'div', 'dl', 'dt', 'em', 'h1', 'h2', 'h3', 'h4', 'h5',
 
                  'h6', 'hr', 'i', 'img', 'li', 'ol', 'p', 'pre', 'span',
 
                  'strong', 'sub', 'sup', 'table', 'tbody', 'td', 'th',
 
                  'thead', 'tr', 'ul'],
 
            attributes=['class', 'id', 'style', 'label', 'title', 'alt', 'href', 'src'],
 
            styles=['color'],
 
            protocols=['http', 'https', 'mailto'],
 
            )
 

	
 
    @classmethod
 
    def plain(cls, source, universal_newline=True):
 
        source = safe_str(source)
 
        if universal_newline:
 
            newline = '\n'
 
            source = newline.join(source.splitlines())
 

	
 
        def url_func(match_obj):
 
            url_full = match_obj.group(0)
 
            return '<a href="%(url)s">%(url)s</a>' % ({'url': url_full})
 
        source = url_re.sub(url_func, source)
 
        return '<br />' + source.replace("\n", '<br />')
 

	
 
    @classmethod
 
    def markdown(cls, source, safe=True, flavored=False):
 
        """
 
        Convert Markdown (possibly GitHub Flavored) to INSECURE HTML, possibly
 
        with "safe" fall-back to plaintext. Output from this method should be sanitized before use.
 

	
 
        >>> MarkupRenderer.markdown('''<img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg">''')
 
        u'<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        '<p><img id="a" style="margin-top:-1000px;color:red" src="http://example.com/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<img class="c d" src="file://localhost/test.jpg">''')
 
        u'<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        '<p><img class="c d" src="file://localhost/test.jpg"></p>'
 
        >>> MarkupRenderer.markdown('''<a href="foo">foo</a>''')
 
        u'<p><a href="foo">foo</a></p>'
 
        '<p><a href="foo">foo</a></p>'
 
        >>> MarkupRenderer.markdown('''<script>alert(1)</script>''')
 
        u'<script>alert(1)</script>'
 
        '<script>alert(1)</script>'
 
        >>> MarkupRenderer.markdown('''<div onclick="alert(2)">yo</div>''')
 
        u'<div onclick="alert(2)">yo</div>'
 
        '<div onclick="alert(2)">yo</div>'
 
        >>> MarkupRenderer.markdown('''<a href="javascript:alert(3)">yo</a>''')
 
        u'<p><a href="javascript:alert(3)">yo</a></p>'
 
        '<p><a href="javascript:alert(3)">yo</a></p>'
 
        >>> MarkupRenderer.markdown('''## Foo''')
 
        u'<h2>Foo</h2>'
 
        >>> print MarkupRenderer.markdown('''
 
        '<h2>Foo</h2>'
 
        >>> print(MarkupRenderer.markdown('''
 
        ...     #!/bin/bash
 
        ...     echo "hello"
 
        ... ''')
 
        ... '''))
 
        <table class="code-highlighttable"><tr><td class="linenos"><div class="linenodiv"><pre>1
 
        2</pre></div></td><td class="code"><div class="code-highlight"><pre><span></span><span class="ch">#!/bin/bash</span>
 
        <span class="nb">echo</span> <span class="s2">&quot;hello&quot;</span>
 
        </pre></div>
 
        </td></tr></table>
 
        """
 
        source = safe_str(source)
 
        try:
 
            if flavored:
 
                source = cls._flavored_markdown(source)
 
            return markdown_mod.markdown(
 
                source,
 
                extensions=['markdown.extensions.codehilite', 'markdown.extensions.extra'],
 
                extension_configs={'markdown.extensions.codehilite': {'css_class': 'code-highlight'}})
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst(cls, source, safe=True):
 
        source = safe_str(source)
 
        try:
 
            from docutils.core import publish_parts
 
            from docutils.parsers.rst import directives
 
            docutils_settings = dict([(alias, None) for alias in
 
                                cls.RESTRUCTUREDTEXT_DISALLOWED_DIRECTIVES])
 

	
 
            docutils_settings.update({'input_encoding': 'unicode',
 
                                      'report_level': 4})
 

	
 
            for k, v in docutils_settings.items():
 
                directives.register_directive(k, v)
 

	
 
            parts = publish_parts(source=source,
 
                                  writer_name="html4css1",
 
                                  settings_overrides=docutils_settings)
 

	
 
            return parts['html_title'] + parts["fragment"]
 
        except ImportError:
 
            log.warning('Install docutils to use this function')
 
            return cls.plain(source)
 
        except Exception:
 
            log.error(traceback.format_exc())
 
            if safe:
 
                log.debug('Falling back to render in plain mode')
 
                return cls.plain(source)
 
            else:
 
                raise
 

	
 
    @classmethod
 
    def rst_with_mentions(cls, source):
 

	
 
        def wrapp(match_obj):
 
            uname = match_obj.groups()[0]
 
            return r'\ **@%(uname)s**\ ' % {'uname': uname}
 
        mention_hl = MENTIONS_REGEX.sub(wrapp, source).strip()
 
        return cls.rst(mention_hl)
kallithea/lib/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
"""
 
    kallithea.lib.ssh
 
    ~~~~~~~~~~~~~~~~~~~~~~~~~~~~~
 

	
 
    :created_on: Dec 10, 2012
 
    :author: ir4y
 
    :copyright: (C) 2012 Ilya Beda <ir4y.ix@gmail.com>
 
    :license: GPLv3, see COPYING for more details.
 
"""
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import base64
 
import logging
 
import re
 

	
 
from tg.i18n import ugettext as _
 

	
 
from kallithea.lib.utils2 import ascii_bytes, ascii_str
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class SshKeyParseError(Exception):
 
    """Exception raised by parse_pub_key"""
 

	
 

	
 
def parse_pub_key(ssh_key):
 
    r"""Parse SSH public key string, raise SshKeyParseError or return decoded keytype, data and comment
 

	
 
    >>> getfixture('doctest_mock_ugettext')
 
    >>> parse_pub_key('')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: SSH key is missing
 
    kallithea.lib.ssh.SshKeyParseError: SSH key is missing
 
    >>> parse_pub_key('''AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part, like 'ssh-rsa ASRNeaZu4FA...xlJp='
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - it must have both a key type and a base64 part, like 'ssh-rsa ASRNeaZu4FA...xlJp='
 
    >>> parse_pub_key('''abc AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - failed to decode base64 part 'AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB2NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - base64 part is not 'ssh-rsa' as claimed but 'csh-rsa'
 
    >>> parse_pub_key('''ssh-rsa  AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ''')
 
    Traceback (most recent call last):
 
    ...
 
    SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
 
    kallithea.lib.ssh.SshKeyParseError: Incorrect SSH key - unexpected characters in base64 part "AAAAB3NzaC1yc2EAAAA'LVGhpcyBpcyBmYWtlIQ"
 
    >>> parse_pub_key(''' ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment
 
    ... ''')
 
    ('ssh-rsa', '\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
 
    ('ssh-rsa', b'\x00\x00\x00\x07ssh-rsa\x00\x00\x00\x0bThis is fake!', 'and a comment\n')
 
    >>> parse_pub_key('''ssh-ed25519 AAAAC3NzaC1lZDI1NTE5AAAAIP1NA2kBQIKe74afUXmIWD9ByDYQJqUwW44Y4gJOBRuo''')
 
    ('ssh-ed25519', '\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xfdM\x03i\x01@\x82\x9e\xef\x86\x9fQy\x88X?A\xc86\x10&\xa50[\x8e\x18\xe2\x02N\x05\x1b\xa8', '')
 
    ('ssh-ed25519', b'\x00\x00\x00\x0bssh-ed25519\x00\x00\x00 \xfdM\x03i\x01@\x82\x9e\xef\x86\x9fQy\x88X?A\xc86\x10&\xa50[\x8e\x18\xe2\x02N\x05\x1b\xa8', '')
 
    """
 
    if not ssh_key:
 
        raise SshKeyParseError(_("SSH key is missing"))
 

	
 
    parts = ssh_key.split(None, 2)
 
    if len(parts) < 2:
 
        raise SshKeyParseError(_("Incorrect SSH key - it must have both a key type and a base64 part, like 'ssh-rsa ASRNeaZu4FA...xlJp='"))
 

	
 
    keytype, keyvalue, comment = (parts + [''])[:3]
 
    if keytype not in ('ssh-rsa', 'ssh-dss', 'ssh-ed25519'):
 
        raise SshKeyParseError(_("Incorrect SSH key - it must start with 'ssh-(rsa|dss|ed25519)'"))
 

	
 
    if re.search(r'[^a-zA-Z0-9+/=]', keyvalue):
 
        raise SshKeyParseError(_("Incorrect SSH key - unexpected characters in base64 part %r") % keyvalue)
 

	
 
    try:
 
        key_bytes = base64.b64decode(keyvalue)
 
    except base64.binascii.Error:
 
        raise SshKeyParseError(_("Incorrect SSH key - failed to decode base64 part %r") % keyvalue)
 

	
 
    if not key_bytes.startswith(b'\x00\x00\x00%c%s\x00' % (len(keytype), ascii_bytes(keytype))):
 
        raise SshKeyParseError(_("Incorrect SSH key - base64 part is not %r as claimed but %r") % (keytype, ascii_str(key_bytes[4:].split(b'\0', 1)[0])))
 

	
 
    return keytype, key_bytes, comment
 

	
 

	
 
SSH_OPTIONS = 'no-pty,no-port-forwarding,no-X11-forwarding,no-agent-forwarding'
 

	
 

	
 
def _safe_check(s, rec = re.compile('^[a-zA-Z0-9+/]+={0,2}$')):
 
    """Return true if s really has the right content for base64 encoding and only contains safe characters
 
    >>> _safe_check('asdf')
 
    True
 
    >>> _safe_check('as df')
 
    False
 
    >>> _safe_check('AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==')
 
    True
 
    """
 
    return rec.match(s) is not None
 

	
 

	
 
def authorized_keys_line(kallithea_cli_path, config_file, key):
 
    """
 
    Return a line as it would appear in .authorized_keys
 

	
 
    >>> from kallithea.model.db import UserSshKeys, User
 
    >>> user = User(user_id=7, username='uu')
 
    >>> key = UserSshKeys(user_ssh_key_id=17, user=user, description='test key')
 
    >>> key.public_key='''ssh-rsa  AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ== and a comment'''
 
    >>> authorized_keys_line('/srv/kallithea/venv/bin/kallithea-cli', '/srv/kallithea/my.ini', key)
 
    'no-pty,no-port-forwarding,no-X11-forwarding,no-agent-forwarding,command="/srv/kallithea/venv/bin/kallithea-cli ssh-serve -c /srv/kallithea/my.ini 7 17" ssh-rsa AAAAB3NzaC1yc2EAAAALVGhpcyBpcyBmYWtlIQ==\\n'
 
    """
 
    try:
 
        keytype, key_bytes, comment = parse_pub_key(key.public_key)
 
    except SshKeyParseError:
 
        return '# Invalid Kallithea SSH key: %s %s\n' % (key.user.user_id, key.user_ssh_key_id)
 
    base64_key = ascii_str(base64.b64encode(key_bytes))
 
    assert '\n' not in base64_key
 
    if not _safe_check(base64_key):
 
        return '# Invalid Kallithea SSH key - bad base64 encoding: %s %s\n' % (key.user.user_id, key.user_ssh_key_id)
 
    return '%s,command="%s ssh-serve -c %s %s %s" %s %s\n' % (
 
        SSH_OPTIONS, kallithea_cli_path, config_file,
 
        key.user.user_id, key.user_ssh_key_id,
 
        keytype, base64_key)
kallithea/lib/vcs/backends/git/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 
import os
 

	
 
from kallithea.lib.hooks import log_pull_action
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class GitSshHandler(BaseSshHandler):
 
    vcs_type = 'git'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).repo_name
 
        u'foo bar'
 
        'foo bar'
 
        >>> GitSshHandler.make(shlex.split("git-upload-pack '/foo bar'")).verb
 
        'git-upload-pack'
 
        >>> GitSshHandler.make(shlex.split(" git-upload-pack /blåbærgrød ")).repo_name # might not be necessary to support no quoting ... but we can
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        'bl\xe5b\xe6rgr\xf8d'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack "/foo'bar"''')).repo_name
 
        u"foo'bar"
 
        "foo'bar"
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).repo_name
 
        u'foo'
 
        'foo'
 
        >>> GitSshHandler.make(shlex.split("git-receive-pack '/foo'")).verb
 
        'git-receive-pack'
 

	
 
        >>> GitSshHandler.make(shlex.split("/bin/git-upload-pack '/foo'")) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> GitSshHandler.make(shlex.split('''git-upload-pack /foo bar''')) # ssh-serve will report 'SSH command %r is not supported'
 
        >>> shlex.split("git-upload-pack '/foo'bar' x") # ssh-serve will report: Error parsing SSH command "...": No closing quotation
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> GitSshHandler.make(shlex.split('hg -R foo serve --stdio')) # not handled here
 
        """
 
        if (len(ssh_command_parts) == 2 and
 
            ssh_command_parts[0] in ['git-upload-pack', 'git-receive-pack'] and
 
            ssh_command_parts[1].startswith('/')
 
        ):
 
            return cls(ssh_command_parts[1][1:], ssh_command_parts[0])
 

	
 
        return None
 

	
 
    def __init__(self, repo_name, verb):
 
        BaseSshHandler.__init__(self, repo_name)
 
        self.verb = verb
 

	
 
    def _serve(self):
 
        if self.verb == 'git-upload-pack': # action 'pull'
 
            # base class called set_hook_environment - action is hardcoded to 'pull'
 
            log_pull_action(ui=make_ui(), repo=self.db_repo.scm_instance._repo)
 
        else: # probably verb 'git-receive-pack', action 'push'
 
            if not self.allow_push:
 
                self.exit('Push access to %r denied' % self.repo_name)
 
            # Note: push logging is handled by Git post-receive hook
 

	
 
        # git shell is not a real shell but use shell inspired quoting *inside* the argument.
 
        # Per https://github.com/git/git/blob/v2.22.0/quote.c#L12 :
 
        # The path must be "'" quoted, but "'" and "!" must exit the quoting and be "\" escaped
 
        quoted_abspath = "'%s'" % self.db_repo.repo_full_path.replace("'", r"'\''").replace("!", r"'\!'")
 
        newcmd = ['git', 'shell', '-c', "%s %s" % (self.verb, quoted_abspath)]
 
        log.debug('Serving: %s', newcmd)
 
        os.execvp(newcmd[0], newcmd)
 
        self.exit("Failed to exec 'git' as %s" % newcmd)
kallithea/lib/vcs/backends/hg/ssh.py
Show inline comments
 
# -*- coding: utf-8 -*-
 
# This program is free software: you can redistribute it and/or modify
 
# it under the terms of the GNU General Public License as published by
 
# the Free Software Foundation, either version 3 of the License, or
 
# (at your option) any later version.
 
#
 
# This program is distributed in the hope that it will be useful,
 
# but WITHOUT ANY WARRANTY; without even the implied warranty of
 
# MERCHANTABILITY or FITNESS FOR A PARTICULAR PURPOSE.  See the
 
# GNU General Public License for more details.
 
#
 
# You should have received a copy of the GNU General Public License
 
# along with this program.  If not, see <http://www.gnu.org/licenses/>.
 

	
 
import logging
 

	
 
import mercurial.hg
 
import mercurial.wireprotoserver
 

	
 
from kallithea.lib.utils import make_ui
 
from kallithea.lib.vcs.backends.ssh import BaseSshHandler
 
from kallithea.lib.vcs.utils import safe_bytes
 

	
 

	
 
log = logging.getLogger(__name__)
 

	
 

	
 
class MercurialSshHandler(BaseSshHandler):
 
    vcs_type = 'hg'
 

	
 
    @classmethod
 
    def make(cls, ssh_command_parts):
 
        r"""
 
        >>> import shlex
 

	
 
        >>> MercurialSshHandler.make(shlex.split('hg -R "foo bar" serve --stdio')).repo_name
 
        u'foo bar'
 
        'foo bar'
 
        >>> MercurialSshHandler.make(shlex.split(' hg -R blåbærgrød serve --stdio ')).repo_name
 
        u'bl\xe5b\xe6rgr\xf8d'
 
        'bl\xe5b\xe6rgr\xf8d'
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R 'foo"bar' serve --stdio''')).repo_name
 
        u'foo"bar'
 
        'foo"bar'
 

	
 
        >>> MercurialSshHandler.make(shlex.split('/bin/hg -R "foo" serve --stdio'))
 
        >>> MercurialSshHandler.make(shlex.split('''hg -R "foo"bar" serve --stdio''')) # ssh-serve will report: Error parsing SSH command "...": invalid syntax
 
        Traceback (most recent call last):
 
        ValueError: No closing quotation
 
        >>> MercurialSshHandler.make(shlex.split('git-upload-pack "/foo"')) # not handled here
 
        """
 
        if ssh_command_parts[:2] == ['hg', '-R'] and ssh_command_parts[3:] == ['serve', '--stdio']:
 
            return cls(ssh_command_parts[2])
 

	
 
        return None
 

	
 
    def _serve(self):
 
        # Note: we want a repo with config based on .hg/hgrc and can thus not use self.db_repo.scm_instance._repo.ui
 
        baseui = make_ui(repo_path=self.db_repo.repo_full_path)
 
        if not self.allow_push:
 
            baseui.setconfig(b'hooks', b'pretxnopen._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 
            baseui.setconfig(b'hooks', b'prepushkey._ssh_reject', b'python:kallithea.lib.hooks.rejectpush')
 

	
 
        repo = mercurial.hg.repository(baseui, safe_bytes(self.db_repo.repo_full_path))
 
        log.debug("Starting Mercurial sshserver for %s", self.db_repo.repo_full_path)
 
        mercurial.wireprotoserver.sshserver(baseui, repo).serve_forever()
kallithea/lib/vcs/utils/helpers.py
Show inline comments
 
"""
 
Utilities aimed to help achieve mostly basic tasks.
 
"""
 
from __future__ import division
 

	
 
import datetime
 
import os
 
import re
 
import time
 

	
 
from kallithea.lib.vcs.exceptions import RepositoryError, VCSError
 
from kallithea.lib.vcs.utils.paths import abspath
 

	
 

	
 
ALIASES = ['hg', 'git']
 

	
 

	
 
def get_scm(path, search_up=False, explicit_alias=None):
 
    """
 
    Returns one of alias from ``ALIASES`` (in order of precedence same as
 
    shortcuts given in ``ALIASES``) and top working dir path for the given
 
    argument. If no scm-specific directory is found or more than one scm is
 
    found at that directory, ``VCSError`` is raised.
 

	
 
    :param search_up: if set to ``True``, this function would try to
 
      move up to parent directory every time no scm is recognized for the
 
      currently checked path. Default: ``False``.
 
    :param explicit_alias: can be one of available backend aliases, when given
 
      it will return given explicit alias in repositories under more than one
 
      version control, if explicit_alias is different than found it will raise
 
      VCSError
 
    """
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %s is not a directory" % path)
 

	
 
    while True:
 
        found_scms = [(scm, path) for scm in get_scms_for_path(path)]
 
        if found_scms or not search_up:
 
            break
 
        newpath = abspath(path, '..')
 
        if newpath == path:
 
            break
 
        path = newpath
 

	
 
    if len(found_scms) > 1:
 
        for scm in found_scms:
 
            if scm[0] == explicit_alias:
 
                return scm
 
        raise VCSError('More than one [%s] scm found at given path %s'
 
                       % (', '.join((x[0] for x in found_scms)), path))
 

	
 
    if len(found_scms) == 0:
 
        raise VCSError('No scm found at given path %s' % path)
 

	
 
    return found_scms[0]
 

	
 

	
 
def get_scms_for_path(path):
 
    """
 
    Returns all scm's found at the given path. If no scm is recognized
 
    - empty list is returned.
 

	
 
    :param path: path to directory which should be checked. May be callable.
 

	
 
    :raises VCSError: if given ``path`` is not a directory
 
    """
 
    from kallithea.lib.vcs.backends import get_backend
 
    if hasattr(path, '__call__'):
 
        path = path()
 
    if not os.path.isdir(path):
 
        raise VCSError("Given path %r is not a directory" % path)
 

	
 
    result = []
 
    for key in ALIASES:
 
        # find .hg / .git
 
        dirname = os.path.join(path, '.' + key)
 
        if os.path.isdir(dirname):
 
            result.append(key)
 
            continue
 
        # find rm__.hg / rm__.git too - left overs from old method for deleting
 
        dirname = os.path.join(path, 'rm__.' + key)
 
        if os.path.isdir(dirname):
 
            return result
 
        # We still need to check if it's not bare repository as
 
        # bare repos don't have working directories
 
        try:
 
            get_backend(key)(path)
 
            result.append(key)
 
            continue
 
        except RepositoryError:
 
            # Wrong backend
 
            pass
 
        except VCSError:
 
            # No backend at all
 
            pass
 
    return result
 

	
 

	
 
def get_highlighted_code(name, code, type='terminal'):
 
    """
 
    If pygments are available on the system
 
    then returned output is colored. Otherwise
 
    unchanged content is returned.
 
    """
 
    import logging
 
    try:
 
        import pygments
 
        pygments
 
    except ImportError:
 
        return code
 
    from pygments import highlight
 
    from pygments.lexers import guess_lexer_for_filename, ClassNotFound
 
    from pygments.formatters import TerminalFormatter
 

	
 
    try:
 
        lexer = guess_lexer_for_filename(name, code)
 
        formatter = TerminalFormatter()
 
        content = highlight(code, lexer, formatter)
 
    except ClassNotFound:
 
        logging.debug("Couldn't guess Lexer, will not use pygments.")
 
        content = code
 
    return content
 

	
 

	
 
def parse_changesets(text):
 
    """
 
    Returns dictionary with *start*, *main* and *end* ids.
 

	
 
    Examples::
 

	
 
        >>> parse_changesets('aaabbb')
 
        {'start': None, 'main': 'aaabbb', 'end': None}
 
        >>> parse_changesets('aaabbb..cccddd')
 
        {'start': 'aaabbb', 'main': None, 'end': 'cccddd'}
 
        {'start': 'aaabbb', 'end': 'cccddd', 'main': None}
 

	
 
    """
 
    text = text.strip()
 
    CID_RE = r'[a-zA-Z0-9]+'
 
    if '..' not in text:
 
        m = re.match(r'^(?P<cid>%s)$' % CID_RE, text)
 
        if m:
 
            return {
 
                'start': None,
 
                'main': text,
 
                'end': None,
 
            }
 
    else:
 
        RE = r'^(?P<start>%s)?\.{2,3}(?P<end>%s)?$' % (CID_RE, CID_RE)
 
        m = re.match(RE, text)
 
        if m:
 
            result = m.groupdict()
 
            result['main'] = None
 
            return result
 
    raise ValueError("IDs not recognized")
 

	
 

	
 
def parse_datetime(text):
 
    """
 
    Parses given text and returns ``datetime.datetime`` instance or raises
 
    ``ValueError``.
 

	
 
    :param text: string of desired date/datetime or something more verbose,
 
      like *yesterday*, *2weeks 3days*, etc.
 
    """
 

	
 
    text = text.strip().lower()
 

	
 
    INPUT_FORMATS = (
 
        '%Y-%m-%d %H:%M:%S',
 
        '%Y-%m-%d %H:%M',
 
        '%Y-%m-%d',
 
        '%m/%d/%Y %H:%M:%S',
 
        '%m/%d/%Y %H:%M',
 
        '%m/%d/%Y',
 
        '%m/%d/%y %H:%M:%S',
 
        '%m/%d/%y %H:%M',
 
        '%m/%d/%y',
 
    )
 
    for format in INPUT_FORMATS:
 
        try:
 
            return datetime.datetime(*time.strptime(text, format)[:6])
 
        except ValueError:
 
            pass
 

	
 
    # Try descriptive texts
 
    if text == 'tomorrow':
 
        future = datetime.datetime.now() + datetime.timedelta(days=1)
 
        args = future.timetuple()[:3] + (23, 59, 59)
 
        return datetime.datetime(*args)
 
    elif text == 'today':
 
        return datetime.datetime(*datetime.datetime.today().timetuple()[:3])
 
    elif text == 'now':
 
        return datetime.datetime.now()
 
    elif text == 'yesterday':
 
        past = datetime.datetime.now() - datetime.timedelta(days=1)
 
        return datetime.datetime(*past.timetuple()[:3])
 
    else:
 
        days = 0
 
        matched = re.match(
 
            r'^((?P<weeks>\d+) ?w(eeks?)?)? ?((?P<days>\d+) ?d(ays?)?)?$', text)
 
        if matched:
 
            groupdict = matched.groupdict()
 
            if groupdict['days']:
 
                days += int(matched.groupdict()['days'])
 
            if groupdict['weeks']:
 
                days += int(matched.groupdict()['weeks']) * 7
 
            past = datetime.datetime.now() - datetime.timedelta(days=days)
 
            return datetime.datetime(*past.timetuple()[:3])
 

	
 
    raise ValueError('Wrong date: "%s"' % text)
 

	
 

	
 
def get_dict_for_attrs(obj, attrs):
 
    """
 
    Returns dictionary for each attribute from given ``obj``.
 
    """
 
    data = {}
 
    for attr in attrs:
 
        data[attr] = getattr(obj, attr)
 
    return data
0 comments (0 inline, 0 general)