| 123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335 |
- """GitHub backend — implements GitProviderBackend using the GitHub Git Data API."""
- import base64
- import json
- import logging
- import re
- from datetime import datetime, timezone
- import httpx
- from backend.app.services.git_providers.base import GitProviderBackend
- logger = logging.getLogger(__name__)
- class GitHubBackend(GitProviderBackend):
- """Backend for github.com using the GitHub Git Data API."""
def get_api_base(self, repo_url: str) -> str:
    """Return the REST API base URL for the host named in *repo_url*.

    Both ``http(s)://<host>[:<port>]/...`` and ``git@<host>:...`` URLs are
    recognised. github.com (in any letter case, with or without ``www.`` or
    an explicit port) maps to ``https://api.github.com``; any other host is
    treated as a self-hosted instance whose API lives under ``/api/v3`` on
    that host. URLs matching neither shape fall back to
    ``https://api.github.com``.

    Args:
        repo_url: Repository URL in HTTPS or SSH form.

    Returns:
        The API base URL, without a trailing slash.
    """
    # Group 1 is the host; for HTTP(S) URLs it includes an optional ":port".
    host_patterns = (
        r"https?://([\w.\-]+(:\d+)?)/",
        r"git@([\w.\-]+):",
    )
    for pattern in host_patterns:
        m = re.match(pattern, repo_url)
        if not m:
            continue
        host = m.group(1)
        # Host names are case-insensitive, and the port is irrelevant when
        # deciding whether this is the public github.com service.
        hostname = host.partition(":")[0].lower()
        if hostname in ("github.com", "www.github.com"):
            return "https://api.github.com"
        # Keep the host exactly as written (including any port) for
        # self-hosted instances.
        return f"https://{host}/api/v3"
    return "https://api.github.com"
def parse_repo_url(self, url: str) -> tuple[str, str]:
    """Return (owner, repo) from a Git HTTPS or SSH URL.

    Accepted shapes:
        HTTPS: ``https://<host>[:<port>]/<owner>/<repo>[.git][/]``
        SSH:   ``git@<host>:<owner>/<repo>[.git]``

    A trailing ``.git`` is removed from the repository name.

    Args:
        url: Repository URL; must be non-empty and at most 500 characters.

    Returns:
        The ``(owner, repo)`` pair.

    Raises:
        ValueError: If *url* is empty, longer than 500 characters, matches
            neither shape, or yields an unusable owner/repo segment (an
            empty repo name, or ``.`` / ``..``).
    """
    if not url or len(url) > 500:
        raise ValueError("Invalid Git URL: URL too long or empty")

    # In both patterns group 1 is the owner and group 2 the repository.
    patterns = (
        r"https://[\w.\-]+(?::\d+)?/([\w.\-]{1,100})/([\w.\-]{1,100})(?:\.git)?/?$",
        r"git@[\w.\-]+:([\w.\-]{1,100})/([\w.\-]{1,100})(?:\.git)?$",
    )
    dot_segments = (".", "..")
    for pattern in patterns:
        match = re.match(pattern, url)
        if match is None:
            continue
        owner = match.group(1)
        # The repo group is greedy and may have swallowed ".git" itself.
        repo = match.group(2).removesuffix(".git")
        # The character class allows dots, so "owner/.git" would give an
        # empty repo and "../.." would put dot segments into every API path
        # built from the result. Reject those instead of returning them.
        if repo and owner not in dot_segments and repo not in dot_segments:
            return owner, repo

    raise ValueError(f"Cannot parse repository URL: {url}")
async def test_connection(self, repo_url: str, token: str, client: httpx.AsyncClient) -> dict:
    """Check that *token* can reach the repository and is allowed to push.

    Issues a single ``GET {api_base}/repos/{owner}/{repo}`` and inspects the
    status code and the ``permissions`` object of the response.

    Args:
        repo_url: Repository URL, parsed with ``parse_repo_url``.
        token: Access token handed to ``get_headers``.
        client: HTTP client used for the request.

    Returns:
        A dict with the keys ``success``, ``message``, ``repo_name`` and
        ``permissions``. ``repo_name`` and ``permissions`` are ``None``
        unless the repository lookup itself returned 200. Any ``Exception``
        is logged and reported as a failed result naming the exception type.
    """

    def outcome(success, message, repo_name=None, permissions=None):
        # Every exit path reports the same four keys.
        return {
            "success": success,
            "message": message,
            "repo_name": repo_name,
            "permissions": permissions,
        }

    # Status codes that get a dedicated explanation instead of the generic one.
    explained = {
        401: "Invalid access token",
        404: "Repository not found. Check URL and token permissions.",
    }

    try:
        owner, repo = self.parse_repo_url(repo_url)
        api_base = self.get_api_base(repo_url)
        headers = self.get_headers(token)

        response = await client.get(f"{api_base}/repos/{owner}/{repo}", headers=headers)
        status = response.status_code
        if status in explained:
            return outcome(False, explained[status])
        if status != 200:
            return outcome(False, f"API error: {status}")

        data = response.json()
        permissions = data.get("permissions", {})
        can_push = permissions.get("push", False)
        if can_push:
            return outcome(True, "Connection successful", data.get("full_name"), permissions)
        return outcome(
            False,
            "Token does not have push permission to this repository",
            data.get("full_name"),
            permissions,
        )
    except Exception as e:
        logger.error("Git connection test failed: %s", e)
        return outcome(False, f"Connection failed: {type(e).__name__}")
async def push_files(
    self,
    repo_url: str,
    token: str,
    branch: str,
    files: dict,
    client: httpx.AsyncClient,
) -> dict:
    """Push files to the repository using the Git Data API.

    Steps: resolve the branch head, read its tree, upload a blob for every
    file whose blob SHA differs from the one already in the tree, create a
    tree on top of the current one, create a commit, then move the branch
    ref to that commit. If the branch does not exist, the work is delegated
    to ``_create_branch_and_push``.

    Args:
        repo_url: Repository URL, parsed with ``parse_repo_url``.
        token: Access token handed to ``get_headers``.
        branch: Name of the branch to commit to.
        files: Mapping of repository path to content; each value is
            serialised with ``json.dumps(..., indent=2, default=str)``.
        client: HTTP client used for all requests.

    Returns:
        A dict whose ``status`` is ``"success"``, ``"skipped"`` (nothing
        differs from the branch) or ``"failed"``, plus a ``message``.
        Success and skipped results also carry ``commit_sha`` and
        ``files_changed``. Any ``Exception`` is logged and returned as a
        failed result rather than raised.
    """
    # Local import so this method does not depend on the module import block.
    from urllib.parse import quote

    try:
        owner, repo = self.parse_repo_url(repo_url)
        api_base = self.get_api_base(repo_url)
        headers = self.get_headers(token)
        repo_api = f"{api_base}/repos/{owner}/{repo}"
        # Branch names may contain characters such as '#' or '%' that would
        # otherwise be read as URL syntax; '/' must stay a path separator.
        encoded_branch = quote(branch, safe="/")
        branch_ref_url = f"{repo_api}/git/refs/heads/{encoded_branch}"

        ref_response = await client.get(branch_ref_url, headers=headers)
        ref_data = None
        if ref_response.status_code == 200:
            ref_data = ref_response.json()
            if isinstance(ref_data, list):
                # A list body means the server answered with refs that merely
                # share the prefix; only an exact match is our branch.
                wanted_ref = f"refs/heads/{branch}"
                ref_data = next(
                    (r for r in ref_data if isinstance(r, dict) and r.get("ref") == wanted_ref),
                    None,
                )
        elif ref_response.status_code != 404:
            return {
                "status": "failed",
                "message": f"Failed to get branch ref: {ref_response.status_code}",
                "error": self._truncated_response_text(ref_response),
            }

        if ref_data is None:
            # Branch does not exist yet.
            return await self._create_branch_and_push(
                client, headers, api_base, owner, repo, branch, files, repo_url, token
            )

        current_commit_sha = ref_data["object"]["sha"]
        commit_response = await client.get(f"{repo_api}/git/commits/{current_commit_sha}", headers=headers)
        if commit_response.status_code != 200:
            return {"status": "failed", "message": "Failed to get current commit"}
        current_tree_sha = commit_response.json()["tree"]["sha"]

        # Map of path -> blob SHA for what is already on the branch. If the
        # listing fails, every file is simply treated as changed.
        tree_response = await client.get(f"{repo_api}/git/trees/{current_tree_sha}?recursive=1", headers=headers)
        existing_files: dict[str, str] = {}
        if tree_response.status_code == 200:
            for item in tree_response.json().get("tree", []):
                if item["type"] == "blob":
                    existing_files[item["path"]] = item["sha"]

        tree_items = []
        failed_paths = []
        for path, content in files.items():
            content_bytes = json.dumps(content, indent=2, default=str).encode("utf-8")
            if existing_files.get(path) == self._blob_sha(content_bytes):
                continue  # identical content is already on the branch
            blob_response = await client.post(
                f"{repo_api}/git/blobs",
                headers=headers,
                json={"content": base64.b64encode(content_bytes).decode(), "encoding": "base64"},
            )
            if blob_response.status_code != 201:
                # Best effort: keep going with the other files, but remember
                # the failure so the result does not hide it.
                logger.error("Failed to create blob for %s: %s", path, self._truncated_response_text(blob_response))
                failed_paths.append(path)
                continue
            tree_items.append({"path": path, "mode": "100644", "type": "blob", "sha": blob_response.json()["sha"]})
        files_changed = len(tree_items)

        if not tree_items:
            if failed_paths:
                # Nothing could be uploaded: this is a failure, not "no changes".
                return {
                    "status": "failed",
                    "message": f"Failed to upload {len(failed_paths)} file(s)",
                    "commit_sha": None,
                    "files_changed": 0,
                }
            return {"status": "skipped", "message": "No changes to commit", "commit_sha": None, "files_changed": 0}

        tree_response = await client.post(
            f"{repo_api}/git/trees",
            headers=headers,
            json={"base_tree": current_tree_sha, "tree": tree_items},
        )
        if tree_response.status_code != 201:
            return {
                "status": "failed",
                "message": f"Failed to create tree: {self._truncated_response_text(tree_response)}",
            }
        new_tree_sha = tree_response.json()["sha"]

        commit_message = f"Bambuddy backup - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}"
        commit_response = await client.post(
            f"{repo_api}/git/commits",
            headers=headers,
            json={"message": commit_message, "tree": new_tree_sha, "parents": [current_commit_sha]},
        )
        if commit_response.status_code != 201:
            return {
                "status": "failed",
                "message": f"Failed to create commit: {self._truncated_response_text(commit_response)}",
            }
        new_commit_sha = commit_response.json()["sha"]

        ref_update = await client.patch(branch_ref_url, headers=headers, json={"sha": new_commit_sha})
        if ref_update.status_code != 200:
            return {
                "status": "failed",
                "message": f"Failed to update branch: {self._truncated_response_text(ref_update)}",
            }

        message = f"Backup successful - {files_changed} files updated"
        if failed_paths:
            message += f" ({len(failed_paths)} failed to upload)"
        return {
            "status": "success",
            "message": message,
            "commit_sha": new_commit_sha,
            "files_changed": files_changed,
        }
    except Exception as e:
        logger.error("Push to Git failed: %s", e)
        return {"status": "failed", "message": str(e), "error": str(e)}
async def _create_branch_and_push(
    self,
    client: httpx.AsyncClient,
    headers: dict,
    api_base: str,
    owner: str,
    repo: str,
    branch: str,
    files: dict,
    repo_url: str,
    token: str,
) -> dict:
    """Create branch (from default branch or as initial commit) then push.

    Looks up the repository's default branch. If that branch has a head,
    *branch* is created at the same commit and ``push_files`` is called
    again. If the default branch ref is reported missing (404) or the
    request conflicts (409), the files are committed as a root commit via
    ``_create_initial_commit``. Any other status is returned as a failure so
    that a transient or permission error never produces an unrelated root
    commit.

    Args:
        client: HTTP client used for all requests.
        headers: Request headers to send with every call.
        api_base: API base URL for the repository's host.
        owner: Repository owner.
        repo: Repository name.
        branch: Name of the branch to create.
        files: Mapping of path to content, passed through unchanged.
        repo_url: Original repository URL, needed to re-enter ``push_files``.
        token: Access token, needed to re-enter ``push_files``.

    Returns:
        The result dict of ``push_files`` / ``_create_initial_commit``, or a
        ``{"status": "failed", ...}`` dict. Any ``Exception`` is logged and
        returned as a failed result rather than raised.
    """
    # Local import so this method does not depend on the module import block.
    from urllib.parse import quote

    try:
        repo_api = f"{api_base}/repos/{owner}/{repo}"
        repo_response = await client.get(repo_api, headers=headers)
        if repo_response.status_code != 200:
            return {"status": "failed", "message": "Failed to get repo info"}
        # "or" also covers a present-but-null value, not just a missing key.
        default_branch = repo_response.json().get("default_branch") or "main"

        ref_response = await client.get(
            f"{repo_api}/git/refs/heads/{quote(default_branch, safe='/')}", headers=headers
        )
        if ref_response.status_code in (404, 409):
            # No head to branch from: start the history with these files.
            return await self._create_initial_commit(client, headers, api_base, owner, repo, branch, files)
        if ref_response.status_code != 200:
            return {
                "status": "failed",
                "message": f"Failed to get default branch ref: {ref_response.status_code}",
                "error": self._truncated_response_text(ref_response),
            }
        base_sha = ref_response.json()["object"]["sha"]

        create_ref = await client.post(
            f"{repo_api}/git/refs",
            headers=headers,
            json={"ref": f"refs/heads/{branch}", "sha": base_sha},
        )
        if create_ref.status_code != 201:
            return {
                "status": "failed",
                "message": f"Failed to create branch: {self._truncated_response_text(create_ref)}",
            }
        # The branch now exists, so the regular push path can take over.
        return await self.push_files(repo_url, token, branch, files, client)
    except Exception as e:
        logger.error("Creating branch for Git push failed: %s", e)
        return {"status": "failed", "message": str(e)}
async def _create_initial_commit(
    self,
    client: httpx.AsyncClient,
    headers: dict,
    api_base: str,
    owner: str,
    repo: str,
    branch: str,
    files: dict,
) -> dict:
    """Create the first commit in an empty repository.

    Uploads one blob per file, builds a tree from the blobs that were
    accepted, creates a parentless commit for that tree and finally creates
    ``refs/heads/<branch>`` pointing at it.

    Args:
        client: HTTP client used for all requests.
        headers: Request headers to send with every call.
        api_base: API base URL for the repository's host.
        owner: Repository owner.
        repo: Repository name.
        branch: Name of the branch to create.
        files: Mapping of repository path to content; each value is
            serialised with ``json.dumps(..., indent=2, default=str)``.

    Returns:
        ``{"status": "success", ...}`` with ``commit_sha`` and
        ``files_changed`` (the number of files actually committed),
        ``{"status": "skipped", ...}`` when *files* is empty, or
        ``{"status": "failed", "message": ...}``. Any ``Exception`` is
        logged and returned as a failed result rather than raised.
    """
    try:
        repo_api = f"{api_base}/repos/{owner}/{repo}"

        tree_items = []
        for path, content in files.items():
            content_str = json.dumps(content, indent=2, default=str)
            blob_response = await client.post(
                f"{repo_api}/git/blobs",
                headers=headers,
                json={"content": base64.b64encode(content_str.encode()).decode(), "encoding": "base64"},
            )
            if blob_response.status_code != 201:
                # Best effort: carry on with the remaining files, but leave a
                # trace instead of dropping the file silently.
                logger.error("Failed to create blob for %s: %s", path, self._truncated_response_text(blob_response))
                continue
            tree_items.append({"path": path, "mode": "100644", "type": "blob", "sha": blob_response.json()["sha"]})

        if not tree_items:
            # Do not ask the API for an empty tree; say why nothing was committed.
            if files:
                return {"status": "failed", "message": "Failed to upload any files"}
            return {"status": "skipped", "message": "No changes to commit", "commit_sha": None, "files_changed": 0}

        tree_response = await client.post(
            f"{repo_api}/git/trees",
            headers=headers,
            json={"tree": tree_items},
        )
        if tree_response.status_code != 201:
            return {"status": "failed", "message": "Failed to create tree"}
        tree_sha = tree_response.json()["sha"]

        # No "parents" key: this commit starts the history.
        commit_response = await client.post(
            f"{repo_api}/git/commits",
            headers=headers,
            json={
                "message": f"Initial Bambuddy backup - {datetime.now(timezone.utc).strftime('%Y-%m-%d %H:%M:%S UTC')}",
                "tree": tree_sha,
            },
        )
        if commit_response.status_code != 201:
            return {"status": "failed", "message": "Failed to create commit"}
        commit_sha = commit_response.json()["sha"]

        ref_response = await client.post(
            f"{repo_api}/git/refs",
            headers=headers,
            json={"ref": f"refs/heads/{branch}", "sha": commit_sha},
        )
        if ref_response.status_code != 201:
            return {"status": "failed", "message": "Failed to create branch ref"}

        # Report what was really committed, not how many files were requested.
        committed = len(tree_items)
        return {
            "status": "success",
            "message": f"Initial backup created - {committed} files",
            "commit_sha": commit_sha,
            "files_changed": committed,
        }
    except Exception as e:
        logger.error("Initial Git commit failed: %s", e)
        return {"status": "failed", "message": str(e)}
|