# test_deploy_ssh_key.py
"""Tests for daemon.main._deploy_ssh_key — Bambuddy key sync.

Background: Bambuddy generates an ed25519 keypair under its data dir and ships
the public half to the SpoolBuddy daemon over the registration/heartbeat
response. The daemon writes that key into ~/.ssh/authorized_keys so Bambuddy
can SSH in to drive remote updates. Whenever Bambuddy's keypair rotates (data
volume wiped, container recreated, fresh deploy) the device's authorized_keys
must drop the old entries and pick up the new one — otherwise:

1. SSH updates start failing silently with permission-denied
2. Stale Bambuddy-tagged keys pile up over time, eroding the security
   boundary (any prior keypair Bambuddy held is permanently authorized).

These tests pin the replace-not-append semantics of the deploy helper.
"""
import os
from unittest.mock import patch

from daemon.main import _deploy_ssh_key
  15. CURRENT_KEY = "ssh-ed25519 AAAACURRENT bambuddy-spoolbuddy"
  16. STALE_KEY_1 = "ssh-ed25519 AAAASTALE1 bambuddy-spoolbuddy"
  17. STALE_KEY_2 = "ssh-ed25519 AAAASTALE2 bambuddy-spoolbuddy"
  18. USER_KEY = "ssh-ed25519 AAAAUSER alice@laptop"
  19. class TestDeploySshKey:
  20. def test_creates_authorized_keys_when_missing(self, tmp_path):
  21. with patch("daemon.main.Path.home", return_value=tmp_path):
  22. _deploy_ssh_key(CURRENT_KEY)
  23. auth_keys = tmp_path / ".ssh" / "authorized_keys"
  24. assert auth_keys.exists()
  25. assert auth_keys.read_text().strip() == CURRENT_KEY
  26. assert auth_keys.stat().st_mode & 0o777 == 0o600
  27. def test_replaces_all_prior_bambuddy_tagged_keys(self, tmp_path):
  28. """The pile-up scenario: 6+ stale keys accumulated over rotations.
  29. After deploy, only the current key remains — no growth."""
  30. ssh_dir = tmp_path / ".ssh"
  31. ssh_dir.mkdir()
  32. auth_keys = ssh_dir / "authorized_keys"
  33. auth_keys.write_text(f"{STALE_KEY_1}\n{STALE_KEY_2}\n")
  34. with patch("daemon.main.Path.home", return_value=tmp_path):
  35. _deploy_ssh_key(CURRENT_KEY)
  36. lines = auth_keys.read_text().strip().splitlines()
  37. assert lines == [CURRENT_KEY]
  38. def test_preserves_unrelated_user_keys(self, tmp_path):
  39. """Only Bambuddy-tagged keys get replaced — user's own keys stay."""
  40. ssh_dir = tmp_path / ".ssh"
  41. ssh_dir.mkdir()
  42. auth_keys = ssh_dir / "authorized_keys"
  43. auth_keys.write_text(f"{USER_KEY}\n{STALE_KEY_1}\n")
  44. with patch("daemon.main.Path.home", return_value=tmp_path):
  45. _deploy_ssh_key(CURRENT_KEY)
  46. lines = auth_keys.read_text().strip().splitlines()
  47. assert USER_KEY in lines
  48. assert STALE_KEY_1 not in lines
  49. assert CURRENT_KEY in lines
  50. def test_idempotent_when_already_in_sync(self, tmp_path):
  51. """No-op when authorized_keys already matches the desired state —
  52. avoids needless writes on every heartbeat."""
  53. ssh_dir = tmp_path / ".ssh"
  54. ssh_dir.mkdir()
  55. auth_keys = ssh_dir / "authorized_keys"
  56. auth_keys.write_text(f"{USER_KEY}\n{CURRENT_KEY}\n")
  57. original_mtime = auth_keys.stat().st_mtime_ns
  58. with patch("daemon.main.Path.home", return_value=tmp_path):
  59. _deploy_ssh_key(CURRENT_KEY)
  60. assert auth_keys.stat().st_mtime_ns == original_mtime
  61. def test_swallows_write_errors(self, tmp_path):
  62. """A failed deploy must not crash the heartbeat loop."""
  63. with (
  64. patch("daemon.main.Path.home", return_value=tmp_path),
  65. patch("daemon.main.Path.mkdir", side_effect=PermissionError("readonly fs")),
  66. ):
  67. _deploy_ssh_key(CURRENT_KEY) # should not raise