|
@@ -4,6 +4,7 @@ import asyncio
|
|
|
import os
|
|
import os
|
|
|
import platform
|
|
import platform
|
|
|
import time
|
|
import time
|
|
|
|
|
+from collections.abc import Callable
|
|
|
from datetime import datetime
|
|
from datetime import datetime
|
|
|
from pathlib import Path
|
|
from pathlib import Path
|
|
|
|
|
|
|
@@ -116,13 +117,6 @@ def _get_data_dirs() -> list[Path]:
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _get_storage_scan_roots() -> list[Path]:
|
|
|
|
|
- roots = []
|
|
|
|
|
- for path in _get_data_dirs():
|
|
|
|
|
- roots.append(path)
|
|
|
|
|
- return roots
|
|
|
|
|
-
|
|
|
|
|
-
|
|
|
|
|
def _is_system_path(path: Path) -> bool:
|
|
def _is_system_path(path: Path) -> bool:
|
|
|
app_dir = _get_app_dir()
|
|
app_dir = _get_app_dir()
|
|
|
if not _is_under(path, app_dir):
|
|
if not _is_under(path, app_dir):
|
|
@@ -130,7 +124,7 @@ def _is_system_path(path: Path) -> bool:
|
|
|
return all(not _is_under(path, data_dir) for data_dir in _get_data_dirs())
|
|
return all(not _is_under(path, data_dir) for data_dir in _get_data_dirs())
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _get_storage_rules() -> list[tuple[str, str, callable]]:
|
|
|
|
|
|
|
+def _get_storage_rules() -> list[tuple[str, str, Callable]]:
|
|
|
base_dir = settings.base_dir
|
|
base_dir = settings.base_dir
|
|
|
archive_dir = settings.archive_dir
|
|
archive_dir = settings.archive_dir
|
|
|
library_dir = archive_dir / "library"
|
|
library_dir = archive_dir / "library"
|
|
@@ -213,7 +207,7 @@ def _get_storage_rules() -> list[tuple[str, str, callable]]:
|
|
|
]
|
|
]
|
|
|
|
|
|
|
|
|
|
|
|
|
-def _classify_file(path: Path, rules: list[tuple[str, str, callable]]) -> tuple[str, str]:
|
|
|
|
|
|
|
+def _classify_file(path: Path, rules: list[tuple[str, str, Callable]]) -> tuple[str, str]:
|
|
|
for key, label, matcher in rules:
|
|
for key, label, matcher in rules:
|
|
|
try:
|
|
try:
|
|
|
if matcher(path):
|
|
if matcher(path):
|
|
@@ -236,11 +230,7 @@ def _get_other_bucket(path: Path, base_dir: Path) -> str:
|
|
|
return path.parent.name or path.name
|
|
return path.parent.name or path.name
|
|
|
|
|
|
|
|
parts = relative.parts
|
|
parts = relative.parts
|
|
|
- if len(parts) > 1:
|
|
|
|
|
- return parts[0]
|
|
|
|
|
- if parts:
|
|
|
|
|
- return parts[0]
|
|
|
|
|
- return path.name
|
|
|
|
|
|
|
+ return parts[0] if parts else path.name
|
|
|
|
|
|
|
|
|
|
|
|
|
def _walk_files(roots: list[Path]) -> list[Path]:
|
|
def _walk_files(roots: list[Path]) -> list[Path]:
|
|
@@ -269,7 +259,7 @@ def _scan_storage_usage() -> dict:
|
|
|
base_dir = settings.base_dir
|
|
base_dir = settings.base_dir
|
|
|
rules = _get_storage_rules()
|
|
rules = _get_storage_rules()
|
|
|
|
|
|
|
|
- roots = _get_storage_scan_roots()
|
|
|
|
|
|
|
+ roots = _get_data_dirs()
|
|
|
|
|
|
|
|
seen_roots = set()
|
|
seen_roots = set()
|
|
|
unique_roots = []
|
|
unique_roots = []
|