| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257 |
- """GitLab backend — implements GitProviderBackend using the GitLab REST API v4."""
- import base64
- import json
- import logging
- import re
- import urllib.parse
- from datetime import datetime, timezone
- import httpx
- from backend.app.services.git_providers.base import GitProviderBackend
- logger = logging.getLogger(__name__)
- class GitLabBackend(GitProviderBackend):
- """Backend for gitlab.com and self-hosted GitLab instances."""
- def get_api_base(self, repo_url: str) -> str:
- match = re.match(r"(https?://[\w.\-]+(:\d+)?)/", repo_url)
- if not match:
- raise ValueError(f"Cannot derive API base from URL: {repo_url}")
- return f"{match.group(1)}/api/v4"
- def get_headers(self, token: str) -> dict:
- return {"Authorization": f"Bearer {token}", "Content-Type": "application/json"}
- def parse_repo_url(self, url: str) -> tuple[str, str]:
- """Return (namespace, repo) from HTTPS or SSH URL.
- namespace may include subgroups, e.g. 'group/subgroup' for
- gitlab.com/group/subgroup/project. Callers join them with '/' and
- URL-encode the result for /api/v4/projects/{encoded_path}.
- """
- if not url or len(url) > 500:
- raise ValueError("Invalid Git URL: URL too long or empty")
- match = re.match(r"https?://[\w.\-]+(:\d+)?/(.+?)(?:\.git)?/?$", url)
- if match:
- full_path = match.group(2)
- if "/" not in full_path:
- raise ValueError(f"Cannot parse repository URL: {url}")
- namespace, _, repo = full_path.rpartition("/")
- return namespace, repo
- match = re.match(r"git@[\w.\-]+:(.+?)(?:\.git)?$", url)
- if match:
- full_path = match.group(1)
- if "/" not in full_path:
- raise ValueError(f"Cannot parse repository URL: {url}")
- namespace, _, repo = full_path.rpartition("/")
- return namespace, repo
- raise ValueError(f"Cannot parse repository URL: {url}")
- async def test_connection(self, repo_url: str, token: str, client: httpx.AsyncClient) -> dict:
- try:
- owner, repo = self.parse_repo_url(repo_url)
- api_base = self.get_api_base(repo_url)
- headers = self.get_headers(token)
- encoded_path = urllib.parse.quote(f"{owner}/{repo}", safe="")
- response = await client.get(f"{api_base}/projects/{encoded_path}", headers=headers)
- if response.status_code == 401:
- return {"success": False, "message": "Invalid access token", "repo_name": None, "permissions": None}
- if response.status_code == 404:
- return {
- "success": False,
- "message": "Repository not found. Check URL and token permissions.",
- "repo_name": None,
- "permissions": None,
- }
- if response.status_code != 200:
- return {
- "success": False,
- "message": f"API error: {response.status_code}",
- "repo_name": None,
- "permissions": None,
- }
- data = response.json()
- perms = data.get("permissions") or {}
- project_level = (perms.get("project_access") or {}).get("access_level", 0)
- group_level = (perms.get("group_access") or {}).get("access_level", 0)
- effective = max(project_level, group_level)
- if effective < 30: # Developer = 30, Maintainer = 40, Owner = 50
- return {
- "success": False,
- "message": "Token requires Developer access or higher to push",
- "repo_name": data.get("name_with_namespace"),
- "permissions": perms,
- }
- return {
- "success": True,
- "message": "Connection successful",
- "repo_name": data.get("name_with_namespace"),
- "permissions": perms,
- }
- except Exception as e:
- logger.error("GitLab connection test failed: %s", e)
- return {
- "success": False,
- "message": f"Connection failed: {type(e).__name__}",
- "repo_name": None,
- "permissions": None,
- }
- async def push_files(
- self,
- repo_url: str,
- token: str,
- branch: str,
- files: dict,
- client: httpx.AsyncClient,
- ) -> dict:
- try:
- owner, repo = self.parse_repo_url(repo_url)
- api_base = self.get_api_base(repo_url)
- headers = self.get_headers(token)
- encoded_path = urllib.parse.quote(f"{owner}/{repo}", safe="")
- encoded_branch = urllib.parse.quote(branch, safe="")
- branch_response = await client.get(
- f"{api_base}/projects/{encoded_path}/repository/branches/{encoded_branch}",
- headers=headers,
- )
- if branch_response.status_code == 404:
- proj_response = await client.get(f"{api_base}/projects/{encoded_path}", headers=headers)
- if proj_response.status_code != 200:
- return {"status": "failed", "message": "Failed to get project info"}
- default_branch = proj_response.json().get("default_branch", "main")
- default_encoded = urllib.parse.quote(default_branch, safe="")
- default_response = await client.get(
- f"{api_base}/projects/{encoded_path}/repository/branches/{default_encoded}",
- headers=headers,
- )
- if default_response.status_code != 200:
- return await self._create_initial_commit(client, headers, api_base, encoded_path, branch, files)
- create_response = await client.post(
- f"{api_base}/projects/{encoded_path}/repository/branches",
- headers=headers,
- json={"branch": branch, "ref": default_branch},
- )
- if create_response.status_code not in (200, 201):
- return {"status": "failed", "message": f"Failed to create branch: {create_response.status_code}"}
- elif branch_response.status_code != 200:
- return {"status": "failed", "message": f"Failed to check branch: {branch_response.status_code}"}
- existing_blobs: dict[str, str] = {}
- page = 1
- while True:
- tree_response = await client.get(
- f"{api_base}/projects/{encoded_path}/repository/tree",
- headers=headers,
- params={"recursive": "true", "ref": branch, "per_page": 100, "page": page},
- )
- if tree_response.status_code != 200:
- break
- items = tree_response.json()
- if not items:
- break
- for item in items:
- if item.get("type") == "blob":
- existing_blobs[item["path"]] = item["id"]
- page += 1
- actions = []
- for path, content in files.items():
- content_str = json.dumps(content, indent=2, default=str)
- content_bytes = content_str.encode("utf-8")
- content_sha = self._blob_sha(content_bytes)
- if path in existing_blobs and existing_blobs[path] == content_sha:
- continue
- actions.append(
- {
- "action": "update" if path in existing_blobs else "create",
- "file_path": path,
- "content": base64.b64encode(content_bytes).decode(),
- "encoding": "base64",
- }
- )
- if not actions:
- return {"status": "skipped", "message": "No changes to commit", "commit_sha": None, "files_changed": 0}
- commit_message = f"Bambuddy backup - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}"
- commit_response = await client.post(
- f"{api_base}/projects/{encoded_path}/repository/commits",
- headers=headers,
- json={"branch": branch, "commit_message": commit_message, "actions": actions},
- )
- if commit_response.status_code not in (200, 201):
- return {
- "status": "failed",
- "message": f"Failed to create commit: {self._truncated_response_text(commit_response)}",
- }
- return {
- "status": "success",
- "message": f"Backup successful - {len(actions)} files updated",
- "commit_sha": commit_response.json().get("id"),
- "files_changed": len(actions),
- }
- except Exception as e:
- logger.error("Push to GitLab failed: %s", e)
- return {"status": "failed", "message": str(e), "error": str(e)}
- async def _create_initial_commit(
- self,
- client: httpx.AsyncClient,
- headers: dict,
- api_base: str,
- encoded_path: str,
- branch: str,
- files: dict,
- ) -> dict:
- """Create the first commit in an empty repository."""
- try:
- actions = []
- for path, content in files.items():
- content_str = json.dumps(content, indent=2, default=str)
- actions.append(
- {
- "action": "create",
- "file_path": path,
- "content": base64.b64encode(content_str.encode()).decode(),
- "encoding": "base64",
- }
- )
- commit_message = f"Initial Bambuddy backup - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}"
- commit_response = await client.post(
- f"{api_base}/projects/{encoded_path}/repository/commits",
- headers=headers,
- json={"branch": branch, "commit_message": commit_message, "actions": actions, "start_branch": branch},
- )
- if commit_response.status_code not in (200, 201):
- return {
- "status": "failed",
- "message": f"Failed to create initial commit: {self._truncated_response_text(commit_response)}",
- }
- return {
- "status": "success",
- "message": f"Initial backup created - {len(files)} files",
- "commit_sha": commit_response.json().get("id"),
- "files_changed": len(files),
- }
- except Exception as e:
- return {"status": "failed", "message": str(e)}
|