|
|
@@ -16,6 +16,7 @@ from backend.app.services.ldap_service import (
|
|
|
LDAPConfig,
|
|
|
LDAPUserInfo,
|
|
|
_ldap_escape,
|
|
|
+ authenticate_ldap_user,
|
|
|
parse_ldap_config,
|
|
|
resolve_group_mapping,
|
|
|
)
|
|
|
@@ -55,6 +56,7 @@ class TestParseConfig:
|
|
|
assert config.group_mapping == {}
|
|
|
assert config.auto_provision is False
|
|
|
assert config.ca_cert_path == ""
|
|
|
+ assert config.default_group == ""
|
|
|
|
|
|
def test_parses_full_config(self):
|
|
|
settings = {
|
|
|
@@ -68,6 +70,7 @@ class TestParseConfig:
|
|
|
"ldap_group_mapping": '{"cn=admins,dc=example,dc=com": "Administrators"}',
|
|
|
"ldap_auto_provision": "true",
|
|
|
"ldap_ca_cert_path": "/path/to/ca.pem",
|
|
|
+ "ldap_default_group": "Viewers",
|
|
|
}
|
|
|
config = parse_ldap_config(settings)
|
|
|
assert config is not None
|
|
|
@@ -79,6 +82,7 @@ class TestParseConfig:
|
|
|
assert config.group_mapping == {"cn=admins,dc=example,dc=com": "Administrators"}
|
|
|
assert config.auto_provision is True
|
|
|
assert config.ca_cert_path == "/path/to/ca.pem"
|
|
|
+ assert config.default_group == "Viewers"
|
|
|
|
|
|
def test_handles_invalid_group_mapping_json(self):
|
|
|
settings = {
|
|
|
@@ -113,11 +117,13 @@ class TestParseConfig:
|
|
|
"ldap_server_url": " ldaps://ldap.example.com ",
|
|
|
"ldap_bind_dn": " cn=admin,dc=example,dc=com ",
|
|
|
"ldap_search_base": " dc=example,dc=com ",
|
|
|
+ "ldap_default_group": " Viewers ",
|
|
|
}
|
|
|
config = parse_ldap_config(settings)
|
|
|
assert config.server_url == "ldaps://ldap.example.com"
|
|
|
assert config.bind_dn == "cn=admin,dc=example,dc=com"
|
|
|
assert config.search_base == "dc=example,dc=com"
|
|
|
+ assert config.default_group == "Viewers"
|
|
|
|
|
|
|
|
|
class TestLDAPEscape:
|
|
|
@@ -225,6 +231,197 @@ class TestDataclasses:
|
|
|
group_mapping={"cn=admins": "Administrators"},
|
|
|
auto_provision=True,
|
|
|
ca_cert_path="",
|
|
|
+ default_group="Viewers",
|
|
|
)
|
|
|
assert config.server_url == "ldaps://ldap.example.com:636"
|
|
|
assert config.auto_provision is True
|
|
|
+ assert config.default_group == "Viewers"
|
|
|
+
|
|
|
+
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# Mocked authenticate_ldap_user group-discovery tests
|
|
|
+# ---------------------------------------------------------------------------
|
|
|
+# These tests mock ldap3.Connection to exercise the group-discovery logic in
|
|
|
+# authenticate_ldap_user without a live LDAP server. Added after a bug where
|
|
|
+# POSIX primary-group membership (via gidNumber) was ignored — see CHANGELOG.
|
|
|
+
|
|
|
+
|
|
|
+class _MockAttr:
|
|
|
+ """Minimal stand-in for ldap3 Attribute objects.
|
|
|
+
|
|
|
+ Supports str(), bool(), .value, .values, and iteration — the operations
|
|
|
+ used by ldap_service against user entry attributes.
|
|
|
+ """
|
|
|
+
|
|
|
+ def __init__(self, value):
|
|
|
+ self._value = value
|
|
|
+
|
|
|
+ @property
|
|
|
+ def value(self):
|
|
|
+ return self._value
|
|
|
+
|
|
|
+ @property
|
|
|
+ def values(self):
|
|
|
+ return self._value if isinstance(self._value, list) else [self._value]
|
|
|
+
|
|
|
+ def __str__(self):
|
|
|
+ return str(self._value)
|
|
|
+
|
|
|
+ def __bool__(self):
|
|
|
+ return bool(self._value)
|
|
|
+
|
|
|
+ def __iter__(self):
|
|
|
+ if isinstance(self._value, list):
|
|
|
+ return iter(self._value)
|
|
|
+ return iter([self._value])
|
|
|
+
|
|
|
+
|
|
|
+class _MockEntry:
|
|
|
+ """Minimal stand-in for ldap3 Entry. Only attributes passed at construction exist."""
|
|
|
+
|
|
|
+ def __init__(self, dn, **attrs):
|
|
|
+ self.entry_dn = dn
|
|
|
+ for key, val in attrs.items():
|
|
|
+ setattr(self, key, _MockAttr(val))
|
|
|
+
|
|
|
+
|
|
|
+class _MockConnection:
|
|
|
+ """Mock ldap3 Connection that returns pre-configured entries based on filter substring match.
|
|
|
+
|
|
|
+ Every Connection() instance shares a class-level fixture dict so the service-account
|
|
|
+ connection and the user-bind connection both see the same fake directory.
|
|
|
+ """
|
|
|
+
|
|
|
+ _search_fixture: dict[str, list] = {}
|
|
|
+ _instances: list["_MockConnection"] = []
|
|
|
+
|
|
|
+ def __init__(self, *args, **kwargs):
|
|
|
+ self.entries: list = []
|
|
|
+ self.search_calls: list[str] = []
|
|
|
+ _MockConnection._instances.append(self)
|
|
|
+
|
|
|
+ def open(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def start_tls(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def bind(self):
|
|
|
+ return True
|
|
|
+
|
|
|
+ def unbind(self):
|
|
|
+ pass
|
|
|
+
|
|
|
+ def search(self, search_base=None, search_filter=None, search_scope=None, attributes=None):
|
|
|
+ self.search_calls.append(search_filter or "")
|
|
|
+ for needle, entries in _MockConnection._search_fixture.items():
|
|
|
+ if needle in (search_filter or ""):
|
|
|
+ self.entries = entries
|
|
|
+ return True
|
|
|
+ self.entries = []
|
|
|
+ return True
|
|
|
+
|
|
|
+
|
|
|
+@pytest.fixture
|
|
|
+def mock_ldap(monkeypatch):
|
|
|
+ """Patch Connection + _create_server in ldap_service so authenticate_ldap_user can run offline."""
|
|
|
+ _MockConnection._search_fixture = {}
|
|
|
+ _MockConnection._instances = []
|
|
|
+ monkeypatch.setattr("backend.app.services.ldap_service.Connection", _MockConnection)
|
|
|
+ monkeypatch.setattr("backend.app.services.ldap_service._create_server", lambda config: None)
|
|
|
+ return _MockConnection
|
|
|
+
|
|
|
+
|
|
|
+def _base_config(**overrides):
|
|
|
+ """Build a minimal LDAPConfig for mocked tests."""
|
|
|
+ defaults = {
|
|
|
+ "server_url": "ldaps://test.example.com:636",
|
|
|
+ "bind_dn": "cn=admin,dc=test,dc=com",
|
|
|
+ "bind_password": "x",
|
|
|
+ "search_base": "dc=test,dc=com",
|
|
|
+ "user_filter": "(uid={username})",
|
|
|
+ "security": "ldaps",
|
|
|
+ "group_mapping": {},
|
|
|
+ "auto_provision": False,
|
|
|
+ "ca_cert_path": "",
|
|
|
+ "default_group": "",
|
|
|
+ }
|
|
|
+ defaults.update(overrides)
|
|
|
+ return LDAPConfig(**defaults)
|
|
|
+
|
|
|
+
|
|
|
+class TestAuthenticateLdapUserGroups:
|
|
|
+ """Group-discovery behaviour in authenticate_ldap_user.
|
|
|
+
|
|
|
+ Covers the POSIX primary gidNumber lookup and case-insensitive dedupe added
|
|
|
+ to fix a bug where users whose role came from their primary group were
|
|
|
+ authenticated without the correct group membership.
|
|
|
+ """
|
|
|
+
|
|
|
+ def test_primary_gidnumber_group_found(self, mock_ldap):
|
|
|
+ """Regression: POSIX primary group (gidNumber match) must be included in the result."""
|
|
|
+ user_entry = _MockEntry("cn=mz,dc=test,dc=com", uid="mz", gidNumber=10002)
|
|
|
+ operators_group = _MockEntry("cn=bambuddy-operators,ou=groups,dc=test,dc=com")
|
|
|
+
|
|
|
+ mock_ldap._search_fixture = {
|
|
|
+ "(uid=mz)": [user_entry],
|
|
|
+ "memberUid=mz": [], # no supplementary memberships
|
|
|
+ "gidNumber=10002": [operators_group],
|
|
|
+ }
|
|
|
+
|
|
|
+ info = authenticate_ldap_user(_base_config(), "mz", "password")
|
|
|
+
|
|
|
+ assert info is not None
|
|
|
+ assert info.groups == ["cn=bambuddy-operators,ou=groups,dc=test,dc=com"]
|
|
|
+
|
|
|
+ def test_dedupes_group_found_via_both_memberuid_and_primary_gid(self, mock_ldap):
|
|
|
+ """A user in the same group via BOTH memberUid and primary gidNumber should appear once."""
|
|
|
+ user_entry = _MockEntry("cn=mz,dc=test,dc=com", uid="mz", gidNumber=10002)
|
|
|
+ group_entry = _MockEntry("cn=bambuddy-operators,ou=groups,dc=test,dc=com")
|
|
|
+
|
|
|
+ mock_ldap._search_fixture = {
|
|
|
+ "(uid=mz)": [user_entry],
|
|
|
+ "memberUid=mz": [group_entry], # supplementary membership
|
|
|
+ "gidNumber=10002": [group_entry], # primary group — same DN
|
|
|
+ }
|
|
|
+
|
|
|
+ info = authenticate_ldap_user(_base_config(), "mz", "password")
|
|
|
+
|
|
|
+ assert info.groups == ["cn=bambuddy-operators,ou=groups,dc=test,dc=com"]
|
|
|
+
|
|
|
+ def test_case_insensitive_dedupe(self, mock_ldap):
|
|
|
+ """DNs differing only in case should collapse to a single entry (LDAP DNs are case-insensitive)."""
|
|
|
+ user_entry = _MockEntry("cn=mz,dc=test,dc=com", uid="mz", gidNumber=10002)
|
|
|
+ upper_dn = _MockEntry("CN=Bambuddy-Operators,OU=Groups,DC=Test,DC=Com")
|
|
|
+ lower_dn = _MockEntry("cn=bambuddy-operators,ou=groups,dc=test,dc=com")
|
|
|
+
|
|
|
+ mock_ldap._search_fixture = {
|
|
|
+ "(uid=mz)": [user_entry],
|
|
|
+ "memberUid=mz": [upper_dn],
|
|
|
+ "gidNumber=10002": [lower_dn],
|
|
|
+ }
|
|
|
+
|
|
|
+ info = authenticate_ldap_user(_base_config(), "mz", "password")
|
|
|
+
|
|
|
+ assert len(info.groups) == 1
|
|
|
+ # The first-seen casing (memberUid result) is kept.
|
|
|
+ assert info.groups[0] == "CN=Bambuddy-Operators,OU=Groups,DC=Test,DC=Com"
|
|
|
+
|
|
|
+ def test_no_gidnumber_skips_primary_search(self, mock_ldap):
|
|
|
+ """User entries without a gidNumber attribute should not crash and should not issue the primary-gid query."""
|
|
|
+ user_entry = _MockEntry("cn=tester,dc=test,dc=com", uid="tester") # no gidNumber
|
|
|
+ viewers_group = _MockEntry("cn=bambuddy-viewers,ou=groups,dc=test,dc=com")
|
|
|
+
|
|
|
+ mock_ldap._search_fixture = {
|
|
|
+ "(uid=tester)": [user_entry],
|
|
|
+ "memberUid=tester": [viewers_group],
|
|
|
+ }
|
|
|
+
|
|
|
+ info = authenticate_ldap_user(_base_config(), "tester", "password")
|
|
|
+
|
|
|
+ assert info is not None
|
|
|
+ assert info.groups == ["cn=bambuddy-viewers,ou=groups,dc=test,dc=com"]
|
|
|
+ # Ensure the primary-gidNumber search was never issued — verifying the guard works.
|
|
|
+ service_conn = _MockConnection._instances[0]
|
|
|
+ gidnumber_searches = [call for call in service_conn.search_calls if "gidNumber=" in call]
|
|
|
+ assert gidnumber_searches == []
|