|
|
@@ -0,0 +1,86 @@
|
|
|
+"""Tests for daemon.main._deploy_ssh_key — Bambuddy key sync.
|
|
|
+
|
|
|
+Background: Bambuddy generates an ed25519 keypair under its data dir and ships
|
|
|
+the public half to the SpoolBuddy daemon over the registration/heartbeat
|
|
|
+response. The daemon writes that key into ~/.ssh/authorized_keys so Bambuddy
|
|
|
+can SSH in to drive remote updates. Whenever Bambuddy's keypair rotates (data
|
|
|
+volume wiped, container recreated, fresh deploy) the device's authorized_keys
|
|
|
+must drop the old entries and pick up the new one — otherwise:
|
|
|
+
|
|
|
+ 1. SSH updates start failing silently with permission-denied
|
|
|
+ 2. Stale Bambuddy-tagged keys pile up over time, eroding the security
|
|
|
+ boundary (any prior keypair Bambuddy held is permanently authorized).
|
|
|
+
|
|
|
+These tests pin the replace-not-append semantics of the deploy helper.
|
|
|
+"""
|
|
|
+
|
|
|
+from unittest.mock import patch
|
|
|
+
|
|
|
+from daemon.main import _deploy_ssh_key
|
|
|
+
|
|
|
+CURRENT_KEY = "ssh-ed25519 AAAACURRENT bambuddy-spoolbuddy"
|
|
|
+STALE_KEY_1 = "ssh-ed25519 AAAASTALE1 bambuddy-spoolbuddy"
|
|
|
+STALE_KEY_2 = "ssh-ed25519 AAAASTALE2 bambuddy-spoolbuddy"
|
|
|
+USER_KEY = "ssh-ed25519 AAAAUSER alice@laptop"
|
|
|
+
|
|
|
+
|
|
|
+class TestDeploySshKey:
|
|
|
+ def test_creates_authorized_keys_when_missing(self, tmp_path):
|
|
|
+ with patch("daemon.main.Path.home", return_value=tmp_path):
|
|
|
+ _deploy_ssh_key(CURRENT_KEY)
|
|
|
+
|
|
|
+ auth_keys = tmp_path / ".ssh" / "authorized_keys"
|
|
|
+ assert auth_keys.exists()
|
|
|
+ assert auth_keys.read_text().strip() == CURRENT_KEY
|
|
|
+ assert auth_keys.stat().st_mode & 0o777 == 0o600
|
|
|
+
|
|
|
+ def test_replaces_all_prior_bambuddy_tagged_keys(self, tmp_path):
|
|
|
+ """The pile-up scenario: 6+ stale keys accumulated over rotations.
|
|
|
+ After deploy, only the current key remains — no growth."""
|
|
|
+ ssh_dir = tmp_path / ".ssh"
|
|
|
+ ssh_dir.mkdir()
|
|
|
+ auth_keys = ssh_dir / "authorized_keys"
|
|
|
+ auth_keys.write_text(f"{STALE_KEY_1}\n{STALE_KEY_2}\n")
|
|
|
+
|
|
|
+ with patch("daemon.main.Path.home", return_value=tmp_path):
|
|
|
+ _deploy_ssh_key(CURRENT_KEY)
|
|
|
+
|
|
|
+ lines = auth_keys.read_text().strip().splitlines()
|
|
|
+ assert lines == [CURRENT_KEY]
|
|
|
+
|
|
|
+ def test_preserves_unrelated_user_keys(self, tmp_path):
|
|
|
+ """Only Bambuddy-tagged keys get replaced — user's own keys stay."""
|
|
|
+ ssh_dir = tmp_path / ".ssh"
|
|
|
+ ssh_dir.mkdir()
|
|
|
+ auth_keys = ssh_dir / "authorized_keys"
|
|
|
+ auth_keys.write_text(f"{USER_KEY}\n{STALE_KEY_1}\n")
|
|
|
+
|
|
|
+ with patch("daemon.main.Path.home", return_value=tmp_path):
|
|
|
+ _deploy_ssh_key(CURRENT_KEY)
|
|
|
+
|
|
|
+ lines = auth_keys.read_text().strip().splitlines()
|
|
|
+ assert USER_KEY in lines
|
|
|
+ assert STALE_KEY_1 not in lines
|
|
|
+ assert CURRENT_KEY in lines
|
|
|
+
|
|
|
+ def test_idempotent_when_already_in_sync(self, tmp_path):
|
|
|
+ """No-op when authorized_keys already matches the desired state —
|
|
|
+ avoids needless writes on every heartbeat."""
|
|
|
+ ssh_dir = tmp_path / ".ssh"
|
|
|
+ ssh_dir.mkdir()
|
|
|
+ auth_keys = ssh_dir / "authorized_keys"
|
|
|
+ auth_keys.write_text(f"{USER_KEY}\n{CURRENT_KEY}\n")
|
|
|
+ original_mtime = auth_keys.stat().st_mtime_ns
|
|
|
+
|
|
|
+ with patch("daemon.main.Path.home", return_value=tmp_path):
|
|
|
+ _deploy_ssh_key(CURRENT_KEY)
|
|
|
+
|
|
|
+ assert auth_keys.stat().st_mtime_ns == original_mtime
|
|
|
+
|
|
|
+ def test_swallows_write_errors(self, tmp_path):
|
|
|
+ """A failed deploy must not crash the heartbeat loop."""
|
|
|
+ with (
|
|
|
+ patch("daemon.main.Path.home", return_value=tmp_path),
|
|
|
+ patch("daemon.main.Path.mkdir", side_effect=PermissionError("readonly fs")),
|
|
|
+ ):
|
|
|
+ _deploy_ssh_key(CURRENT_KEY) # should not raise
|