|
@@ -1,5 +1,6 @@
|
|
|
"""Unit tests for SpoolBuddy SSH update service."""
|
|
"""Unit tests for SpoolBuddy SSH update service."""
|
|
|
|
|
|
|
|
|
|
+import asyncio
|
|
|
import os
|
|
import os
|
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
from unittest.mock import AsyncMock, MagicMock, patch
|
|
|
|
|
|
|
@@ -113,15 +114,49 @@ async def test_get_public_key(tmp_path):
|
|
|
# -- detect_current_branch ----------------------------------------------------
|
|
# -- detect_current_branch ----------------------------------------------------
|
|
|
|
|
|
|
|
|
|
|
|
|
-def test_detect_branch_from_git(tmp_path):
|
|
|
|
|
- (tmp_path / ".git").mkdir()
|
|
|
|
|
|
|
+def test_detect_branch_from_git_head(tmp_path):
|
|
|
|
|
+ """Read branch directly from .git/HEAD — no subprocess."""
|
|
|
|
|
+ git_dir = tmp_path / ".git"
|
|
|
|
|
+ git_dir.mkdir()
|
|
|
|
|
+ (git_dir / "HEAD").write_text("ref: refs/heads/dev\n")
|
|
|
|
|
+
|
|
|
with (
|
|
with (
|
|
|
patch("backend.app.services.spoolbuddy_ssh.settings") as mock_settings,
|
|
patch("backend.app.services.spoolbuddy_ssh.settings") as mock_settings,
|
|
|
|
|
+ patch("asyncio.create_subprocess_exec") as mock_exec,
|
|
|
patch("subprocess.run") as mock_run,
|
|
patch("subprocess.run") as mock_run,
|
|
|
):
|
|
):
|
|
|
mock_settings.base_dir = tmp_path
|
|
mock_settings.base_dir = tmp_path
|
|
|
- mock_run.return_value = MagicMock(returncode=0, stdout="dev\n")
|
|
|
|
|
assert detect_current_branch() == "dev"
|
|
assert detect_current_branch() == "dev"
|
|
|
|
|
+ # Regression guard: must not shell out (fails with getpwuid under
|
|
|
|
|
+ # arbitrary Docker PUIDs if ever reintroduced).
|
|
|
|
|
+ mock_exec.assert_not_called()
|
|
|
|
|
+ mock_run.assert_not_called()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def test_detect_branch_worktree_gitdir_file(tmp_path):
|
|
|
|
|
+ """Git worktrees store a `gitdir:` pointer instead of a dir — follow it."""
|
|
|
|
|
+ real_git_dir = tmp_path / "real-git"
|
|
|
|
|
+ real_git_dir.mkdir()
|
|
|
|
|
+ (real_git_dir / "HEAD").write_text("ref: refs/heads/feature-x\n")
|
|
|
|
|
+ (tmp_path / ".git").write_text(f"gitdir: {real_git_dir}\n")
|
|
|
|
|
+
|
|
|
|
|
+ with patch("backend.app.services.spoolbuddy_ssh.settings") as mock_settings:
|
|
|
|
|
+ mock_settings.base_dir = tmp_path
|
|
|
|
|
+ assert detect_current_branch() == "feature-x"
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+def test_detect_branch_detached_head_falls_back(tmp_path):
|
|
|
|
|
+ """Detached HEAD (raw commit hash) should fall through to the env var."""
|
|
|
|
|
+ git_dir = tmp_path / ".git"
|
|
|
|
|
+ git_dir.mkdir()
|
|
|
|
|
+ (git_dir / "HEAD").write_text("deadbeef1234\n")
|
|
|
|
|
+
|
|
|
|
|
+ with (
|
|
|
|
|
+ patch("backend.app.services.spoolbuddy_ssh.settings") as mock_settings,
|
|
|
|
|
+ patch.dict(os.environ, {"GIT_BRANCH": "release"}),
|
|
|
|
|
+ ):
|
|
|
|
|
+ mock_settings.base_dir = tmp_path
|
|
|
|
|
+ assert detect_current_branch() == "release"
|
|
|
|
|
|
|
|
|
|
|
|
|
def test_detect_branch_env_fallback(tmp_path):
|
|
def test_detect_branch_env_fallback(tmp_path):
|
|
@@ -145,6 +180,12 @@ def test_detect_branch_default_main(tmp_path):
|
|
|
|
|
|
|
|
|
|
|
|
|
# -- _run_ssh_command ----------------------------------------------------------
|
|
# -- _run_ssh_command ----------------------------------------------------------
|
|
|
|
|
+#
|
|
|
|
|
+# _run_ssh_command uses asyncssh (pure Python) rather than the OpenSSH `ssh`
|
|
|
|
|
+# binary. Both `ssh` and `ssh-keygen` call getpwuid(getuid()) during startup
|
|
|
|
|
+# and abort with "No user exists for uid <N>" when the container runs under
|
|
|
|
|
+# an arbitrary PUID that is not listed in /etc/passwd — asyncssh avoids the
|
|
|
|
|
+# subprocess entirely.
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.asyncio
|
|
@@ -152,61 +193,121 @@ async def test_run_ssh_command_success(tmp_path):
|
|
|
key_file = tmp_path / "key"
|
|
key_file = tmp_path / "key"
|
|
|
key_file.write_text("KEY")
|
|
key_file.write_text("KEY")
|
|
|
|
|
|
|
|
- mock_proc = AsyncMock()
|
|
|
|
|
- mock_proc.communicate = AsyncMock(return_value=(b"hello\n", b""))
|
|
|
|
|
- mock_proc.returncode = 0
|
|
|
|
|
|
|
+ mock_result = MagicMock()
|
|
|
|
|
+ mock_result.stdout = "hello\n"
|
|
|
|
|
+ mock_result.stderr = ""
|
|
|
|
|
+ mock_result.exit_status = 0
|
|
|
|
|
+
|
|
|
|
|
+ mock_conn = AsyncMock()
|
|
|
|
|
+ mock_conn.run = AsyncMock(return_value=mock_result)
|
|
|
|
|
+ mock_conn.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
|
|
|
+ mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
|
|
|
|
- with patch("asyncio.create_subprocess_exec", return_value=mock_proc) as mock_exec:
|
|
|
|
|
|
|
+ with patch("backend.app.services.spoolbuddy_ssh.asyncssh.connect", return_value=mock_conn) as mock_connect:
|
|
|
rc, stdout, stderr = await _run_ssh_command("10.0.0.1", "echo hello", key_file)
|
|
rc, stdout, stderr = await _run_ssh_command("10.0.0.1", "echo hello", key_file)
|
|
|
|
|
|
|
|
assert rc == 0
|
|
assert rc == 0
|
|
|
assert stdout == "hello\n"
|
|
assert stdout == "hello\n"
|
|
|
assert stderr == ""
|
|
assert stderr == ""
|
|
|
- args = mock_exec.call_args[0]
|
|
|
|
|
- assert "spoolbuddy@10.0.0.1" in args
|
|
|
|
|
- assert "echo hello" in args
|
|
|
|
|
- assert "BatchMode=yes" in args
|
|
|
|
|
|
|
+ kwargs = mock_connect.call_args.kwargs
|
|
|
|
|
+ assert kwargs["host"] == "10.0.0.1"
|
|
|
|
|
+ assert kwargs["username"] == "spoolbuddy"
|
|
|
|
|
+ assert kwargs["client_keys"] == [str(key_file)]
|
|
|
|
|
+ # Host-key verification is disabled (equivalent to StrictHostKeyChecking=no)
|
|
|
|
|
+ assert kwargs["known_hosts"] is None
|
|
|
|
|
+ mock_conn.run.assert_awaited_once()
|
|
|
|
|
+ run_args = mock_conn.run.call_args
|
|
|
|
|
+ assert run_args.args[0] == "echo hello"
|
|
|
|
|
+ # check=False — we handle non-zero exit codes ourselves
|
|
|
|
|
+ assert run_args.kwargs.get("check") is False
|
|
|
|
|
|
|
|
|
|
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.asyncio
|
|
|
-async def test_run_ssh_command_failure(tmp_path):
|
|
|
|
|
|
|
+async def test_run_ssh_command_no_subprocess(tmp_path):
|
|
|
|
|
+ """Regression guard: _run_ssh_command must not spawn any subprocess.
|
|
|
|
|
+
|
|
|
|
|
+ The whole point of switching to asyncssh is to avoid `ssh`/`ssh-keygen`
|
|
|
|
|
+ calling getpwuid() inside Docker containers with arbitrary PUIDs.
|
|
|
|
|
+ """
|
|
|
key_file = tmp_path / "key"
|
|
key_file = tmp_path / "key"
|
|
|
key_file.write_text("KEY")
|
|
key_file.write_text("KEY")
|
|
|
|
|
|
|
|
- mock_proc = AsyncMock()
|
|
|
|
|
- mock_proc.communicate = AsyncMock(return_value=(b"", b"Connection refused"))
|
|
|
|
|
- mock_proc.returncode = 255
|
|
|
|
|
|
|
+ mock_result = MagicMock()
|
|
|
|
|
+ mock_result.stdout = ""
|
|
|
|
|
+ mock_result.stderr = ""
|
|
|
|
|
+ mock_result.exit_status = 0
|
|
|
|
|
+
|
|
|
|
|
+ mock_conn = AsyncMock()
|
|
|
|
|
+ mock_conn.run = AsyncMock(return_value=mock_result)
|
|
|
|
|
+ mock_conn.__aenter__ = AsyncMock(return_value=mock_conn)
|
|
|
|
|
+ mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
|
|
|
|
- with patch("asyncio.create_subprocess_exec", return_value=mock_proc):
|
|
|
|
|
|
|
+ with (
|
|
|
|
|
+ patch("backend.app.services.spoolbuddy_ssh.asyncssh.connect", return_value=mock_conn),
|
|
|
|
|
+ patch("asyncio.create_subprocess_exec") as mock_exec,
|
|
|
|
|
+ ):
|
|
|
|
|
+ await _run_ssh_command("10.0.0.1", "echo hi", key_file)
|
|
|
|
|
+
|
|
|
|
|
+ mock_exec.assert_not_called()
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
|
|
+@pytest.mark.asyncio
|
|
|
|
|
+async def test_run_ssh_command_connection_failure(tmp_path):
|
|
|
|
|
+ """Connection errors should surface as rc=255 with the asyncssh message."""
|
|
|
|
|
+ import asyncssh
|
|
|
|
|
+
|
|
|
|
|
+ key_file = tmp_path / "key"
|
|
|
|
|
+ key_file.write_text("KEY")
|
|
|
|
|
+
|
|
|
|
|
+ with patch(
|
|
|
|
|
+ "backend.app.services.spoolbuddy_ssh.asyncssh.connect",
|
|
|
|
|
+ side_effect=asyncssh.Error(code=0, reason="Connection refused"),
|
|
|
|
|
+ ):
|
|
|
rc, stdout, stderr = await _run_ssh_command("10.0.0.1", "echo hello", key_file)
|
|
rc, stdout, stderr = await _run_ssh_command("10.0.0.1", "echo hello", key_file)
|
|
|
|
|
|
|
|
assert rc == 255
|
|
assert rc == 255
|
|
|
|
|
+ assert stdout == ""
|
|
|
assert "Connection refused" in stderr
|
|
assert "Connection refused" in stderr
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
+@pytest.mark.asyncio
|
|
|
|
|
+async def test_run_ssh_command_os_error(tmp_path):
|
|
|
|
|
+ """OS-level connection errors (DNS, route) also map to rc=255."""
|
|
|
|
|
+ key_file = tmp_path / "key"
|
|
|
|
|
+ key_file.write_text("KEY")
|
|
|
|
|
+
|
|
|
|
|
+ with patch(
|
|
|
|
|
+ "backend.app.services.spoolbuddy_ssh.asyncssh.connect",
|
|
|
|
|
+ side_effect=OSError("Network is unreachable"),
|
|
|
|
|
+ ):
|
|
|
|
|
+ rc, _, stderr = await _run_ssh_command("10.0.0.1", "echo hello", key_file)
|
|
|
|
|
+
|
|
|
|
|
+ assert rc == 255
|
|
|
|
|
+ assert "Network is unreachable" in stderr
|
|
|
|
|
+
|
|
|
|
|
+
|
|
|
@pytest.mark.asyncio
|
|
@pytest.mark.asyncio
|
|
|
async def test_run_ssh_command_timeout(tmp_path):
|
|
async def test_run_ssh_command_timeout(tmp_path):
|
|
|
|
|
+ """asyncio.timeout should convert long-running commands into rc=-1."""
|
|
|
key_file = tmp_path / "key"
|
|
key_file = tmp_path / "key"
|
|
|
key_file.write_text("KEY")
|
|
key_file.write_text("KEY")
|
|
|
|
|
|
|
|
- mock_proc = AsyncMock()
|
|
|
|
|
- mock_proc.communicate = AsyncMock(return_value=(b"", b""))
|
|
|
|
|
- mock_proc.kill = MagicMock()
|
|
|
|
|
|
|
+ # asyncssh.connect() returns a _ConnectionManager synchronously; the hang
|
|
|
|
|
+ # must happen inside __aenter__ so the surrounding asyncio.timeout can
|
|
|
|
|
+ # cancel it.
|
|
|
|
|
+ mock_conn = AsyncMock()
|
|
|
|
|
|
|
|
- async def fake_wait_for(coro, timeout):
|
|
|
|
|
- # Consume the coroutine to avoid warning
|
|
|
|
|
- coro.close()
|
|
|
|
|
- raise TimeoutError
|
|
|
|
|
|
|
+ async def hang_enter():
|
|
|
|
|
+ await asyncio.sleep(10)
|
|
|
|
|
|
|
|
- with (
|
|
|
|
|
- patch("asyncio.create_subprocess_exec", return_value=mock_proc),
|
|
|
|
|
- patch("backend.app.services.spoolbuddy_ssh.asyncio.wait_for", side_effect=fake_wait_for),
|
|
|
|
|
- ):
|
|
|
|
|
- rc, stdout, stderr = await _run_ssh_command("10.0.0.1", "sleep 999", key_file, timeout=1)
|
|
|
|
|
|
|
+ mock_conn.__aenter__ = AsyncMock(side_effect=hang_enter)
|
|
|
|
|
+ mock_conn.__aexit__ = AsyncMock(return_value=False)
|
|
|
|
|
+
|
|
|
|
|
+ with patch("backend.app.services.spoolbuddy_ssh.asyncssh.connect", return_value=mock_conn):
|
|
|
|
|
+ rc, _, stderr = await _run_ssh_command("10.0.0.1", "sleep 999", key_file, timeout=0.05)
|
|
|
|
|
|
|
|
assert rc == -1
|
|
assert rc == -1
|
|
|
assert "timed out" in stderr
|
|
assert "timed out" in stderr
|
|
|
- mock_proc.kill.assert_called_once()
|
|
|
|
|
|
|
|
|
|
|
|
|
|
|
# -- perform_ssh_update --------------------------------------------------------
|
|
# -- perform_ssh_update --------------------------------------------------------
|