| 1234567891011121314151617181920212223242526272829303132333435363738394041424344454647484950515253545556575859606162636465666768697071727374757677787980818283848586 |
- """Tests for daemon.main._deploy_ssh_key — Bambuddy key sync.
- Background: Bambuddy generates an ed25519 keypair under its data dir and ships
- the public half to the SpoolBuddy daemon over the registration/heartbeat
- response. The daemon writes that key into ~/.ssh/authorized_keys so Bambuddy
- can SSH in to drive remote updates. Whenever Bambuddy's keypair rotates (data
- volume wiped, container recreated, fresh deploy) the device's authorized_keys
- must drop the old entries and pick up the new one — otherwise:
- 1. SSH updates start failing silently with permission-denied
- 2. Stale Bambuddy-tagged keys pile up over time, eroding the security
- boundary (any prior keypair Bambuddy held is permanently authorized).
- These tests pin the replace-not-append semantics of the deploy helper.
- """
- from unittest.mock import patch
- from daemon.main import _deploy_ssh_key
- CURRENT_KEY = "ssh-ed25519 AAAACURRENT bambuddy-spoolbuddy"
- STALE_KEY_1 = "ssh-ed25519 AAAASTALE1 bambuddy-spoolbuddy"
- STALE_KEY_2 = "ssh-ed25519 AAAASTALE2 bambuddy-spoolbuddy"
- USER_KEY = "ssh-ed25519 AAAAUSER alice@laptop"
- class TestDeploySshKey:
- def test_creates_authorized_keys_when_missing(self, tmp_path):
- with patch("daemon.main.Path.home", return_value=tmp_path):
- _deploy_ssh_key(CURRENT_KEY)
- auth_keys = tmp_path / ".ssh" / "authorized_keys"
- assert auth_keys.exists()
- assert auth_keys.read_text().strip() == CURRENT_KEY
- assert auth_keys.stat().st_mode & 0o777 == 0o600
- def test_replaces_all_prior_bambuddy_tagged_keys(self, tmp_path):
- """The pile-up scenario: 6+ stale keys accumulated over rotations.
- After deploy, only the current key remains — no growth."""
- ssh_dir = tmp_path / ".ssh"
- ssh_dir.mkdir()
- auth_keys = ssh_dir / "authorized_keys"
- auth_keys.write_text(f"{STALE_KEY_1}\n{STALE_KEY_2}\n")
- with patch("daemon.main.Path.home", return_value=tmp_path):
- _deploy_ssh_key(CURRENT_KEY)
- lines = auth_keys.read_text().strip().splitlines()
- assert lines == [CURRENT_KEY]
- def test_preserves_unrelated_user_keys(self, tmp_path):
- """Only Bambuddy-tagged keys get replaced — user's own keys stay."""
- ssh_dir = tmp_path / ".ssh"
- ssh_dir.mkdir()
- auth_keys = ssh_dir / "authorized_keys"
- auth_keys.write_text(f"{USER_KEY}\n{STALE_KEY_1}\n")
- with patch("daemon.main.Path.home", return_value=tmp_path):
- _deploy_ssh_key(CURRENT_KEY)
- lines = auth_keys.read_text().strip().splitlines()
- assert USER_KEY in lines
- assert STALE_KEY_1 not in lines
- assert CURRENT_KEY in lines
- def test_idempotent_when_already_in_sync(self, tmp_path):
- """No-op when authorized_keys already matches the desired state —
- avoids needless writes on every heartbeat."""
- ssh_dir = tmp_path / ".ssh"
- ssh_dir.mkdir()
- auth_keys = ssh_dir / "authorized_keys"
- auth_keys.write_text(f"{USER_KEY}\n{CURRENT_KEY}\n")
- original_mtime = auth_keys.stat().st_mtime_ns
- with patch("daemon.main.Path.home", return_value=tmp_path):
- _deploy_ssh_key(CURRENT_KEY)
- assert auth_keys.stat().st_mtime_ns == original_mtime
- def test_swallows_write_errors(self, tmp_path):
- """A failed deploy must not crash the heartbeat loop."""
- with (
- patch("daemon.main.Path.home", return_value=tmp_path),
- patch("daemon.main.Path.mkdir", side_effect=PermissionError("readonly fs")),
- ):
- _deploy_ssh_key(CURRENT_KEY) # should not raise
|