@@ -145,219 +145,219 @@ class AuthLdap(object):
if dn is None:
continue
try:
log.debug('Trying simple bind with %s' % dn)
server.simple_bind_s(dn, safe_str(password))
attrs = server.search_ext_s(dn, ldap.SCOPE_BASE,
'(objectClass=*)')[0][1]
break
except ldap.INVALID_CREDENTIALS:
log.debug("LDAP rejected password for user '%s' (%s): %s"
% (uid, username, dn))
else:
log.debug("No matching LDAP objects for authentication "
"of '%s' (%s)", uid, username)
raise LdapPasswordError()
except ldap.NO_SUCH_OBJECT:
log.debug("LDAP says no such user '%s' (%s)" % (uid, username))
raise LdapUsernameError()
except ldap.SERVER_DOWN:
raise LdapConnectionError("LDAP can't access authentication server")
return dn, attrs
class KallitheaAuthPlugin(auth_modules.KallitheaExternalAuthPlugin):
def __init__(self):
self._logger = logging.getLogger(__name__)
self._tls_kind_values = ["PLAIN", "LDAPS", "START_TLS"]
self._tls_reqcert_values = ["NEVER", "ALLOW", "TRY", "DEMAND", "HARD"]
self._search_scopes = ["BASE", "ONELEVEL", "SUBTREE"]
@hybrid_property
def name(self):
return "ldap"
def settings(self):
settings = [
{
"name": "host",
"validator": self.validators.UnicodeString(strip=True),
"type": "string",
"description": "Host of the LDAP Server",
"formname": "LDAP Host"
},
"name": "port",
"validator": self.validators.Number(strip=True, not_empty=True),
"description": "Port that the LDAP server is listening on",
"default": 389,
"formname": "Port"
"name": "dn_user",
"description": "User to connect to LDAP",
"formname": "Account"
"name": "dn_pass",
"type": "password",
"description": "Password to connect to LDAP",
"formname": "Password"
"name": "tls_kind",
"validator": self.validators.OneOf(self._tls_kind_values),
"type": "select",
"values": self._tls_kind_values,
"description": "TLS Type",
"default": 'PLAIN',
"formname": "Connection Security"
"name": "tls_reqcert",
"validator": self.validators.OneOf(self._tls_reqcert_values),
"values": self._tls_reqcert_values,
"description": "Require Cert over TLS?",
"formname": "Certificate Checks"
"name": "base_dn",
"description": "Base DN to search (e.g., dc=mydomain,dc=com)",
"formname": "Base DN"
"name": "filter",
"description": "Filter to narrow results (e.g., ou=Users, etc)",
"formname": "LDAP Search Filter"
"name": "search_scope",
"validator": self.validators.OneOf(self._search_scopes),
"values": self._search_scopes,
"description": "How deep to search LDAP",
"formname": "LDAP Search Scope"
"name": "attr_login",
"validator": self.validators.AttrLoginValidator(not_empty=True, strip=True),
"description": "LDAP Attribute to map to user name",
"formname": "Login Attribute"
"name": "attr_firstname",
"description": "LDAP Attribute to map to first name",
"formname": "First Name Attribute"
"name": "attr_lastname",
"description": "LDAP Attribute to map to last name",
"formname": "Last Name Attribute"
"name": "attr_email",
"description": "LDAP Attribute to map to email address",
"formname": "Email Attribute"
}
]
return settings
def use_fake_password(self):
return True
def user_activation_state(self):
def_user_perms = User.get_default_user().AuthUser.permissions['global']
return 'hg.extern_activate.auto' in def_user_perms
def auth(self, userobj, username, password, settings, **kwargs):
"""
Given a user object (which may be null), username, a plaintext password,
and a settings object (containing all the keys needed as listed in settings()),
authenticate this user's login attempt.
Return None on failure. On success, return a dictionary of the form:
see: KallitheaAuthPluginBase.auth_func_attrs
This is later validated for correctness
if not username or not password:
log.debug('Empty username or password skipping...')
return None
kwargs = {
'server': settings.get('host', ''),
'base_dn': settings.get('base_dn', ''),
'port': settings.get('port'),
'bind_dn': settings.get('dn_user'),
'bind_pass': settings.get('dn_pass'),
'tls_kind': settings.get('tls_kind'),
'tls_reqcert': settings.get('tls_reqcert'),
'ldap_filter': settings.get('filter'),
'search_scope': settings.get('search_scope'),
'attr_login': settings.get('attr_login'),
'ldap_version': 3,
if kwargs['bind_dn'] and not kwargs['bind_pass']:
log.debug('Using dynamic binding.')
kwargs['bind_dn'] = kwargs['bind_dn'].replace('$login', username)
kwargs['bind_pass'] = password
log.debug('Checking for ldap authentication')
aldap = AuthLdap(**kwargs)
(user_dn, ldap_attrs) = aldap.authenticate_ldap(username, password)
log.debug('Got ldap DN response %s' % user_dn)
get_ldap_attr = lambda k: ldap_attrs.get(settings.get(k), [''])[0]
# old attrs fetched from Kallithea database
admin = getattr(userobj, 'admin', False)
active = getattr(userobj, 'active', True)
active = getattr(userobj, 'active', self.user_activation_state())
email = getattr(userobj, 'email', '')
firstname = getattr(userobj, 'firstname', '')
lastname = getattr(userobj, 'lastname', '')
extern_type = getattr(userobj, 'extern_type', '')
user_attrs = {
'username': username,
'firstname': safe_unicode(get_ldap_attr('attr_firstname') or firstname),
'lastname': safe_unicode(get_ldap_attr('attr_lastname') or lastname),
'groups': [],
'email': get_ldap_attr('attr_email' or email),
'admin': admin,
'active': active,
"active_from_extern": None,
'extern_name': user_dn,
'extern_type': extern_type,
log.info('user %s authenticated correctly' % user_attrs['username'])
return user_attrs
except (LdapUsernameError, LdapPasswordError, LdapImportError):
log.error(traceback.format_exc())
except (Exception,):
Status change: