Browse Source

Remove 30 redundant function-level imports

These modules were already imported at the top of each file.
Removes re-imports of re, json, zipfile, and logging from
inside functions in archive.py, library.py, main.py,
printers.py, support.py, and test_library_api.py.

Resolves all 30 CodeQL py/repeated-import findings.
maziggy 3 months ago
parent
commit
5dcabbdda8

+ 0 - 11
backend/app/api/routes/library.py

@@ -851,7 +851,6 @@ async def extract_zip_file(
         generate_stl_thumbnails: If True, generate thumbnails for STL files
         generate_stl_thumbnails: If True, generate thumbnails for STL files
     """
     """
     import tempfile
     import tempfile
-    import zipfile
 
 
     if not file.filename or not file.filename.lower().endswith(".zip"):
     if not file.filename or not file.filename.lower().endswith(".zip"):
         raise HTTPException(status_code=400, detail="Only ZIP files are supported")
         raise HTTPException(status_code=400, detail="Only ZIP files are supported")
@@ -1313,8 +1312,6 @@ async def get_library_file_plates(
     and filament requirements. For single-plate exports, returns a single plate.
     and filament requirements. For single-plate exports, returns a single plate.
     """
     """
     import json
     import json
-    import re
-    import zipfile
 
 
     import defusedxml.ElementTree as ET
     import defusedxml.ElementTree as ET
 
 
@@ -1574,8 +1571,6 @@ async def get_library_file_plate_thumbnail(
     db: AsyncSession = Depends(get_db),
     db: AsyncSession = Depends(get_db),
 ):
 ):
     """Get the thumbnail image for a specific plate from a library file."""
     """Get the thumbnail image for a specific plate from a library file."""
-    import zipfile
-
     from starlette.responses import Response
     from starlette.responses import Response
 
 
     result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
     result = await db.execute(select(LibraryFile).where(LibraryFile.id == file_id))
@@ -1616,8 +1611,6 @@ async def get_library_file_filament_requirements(
         file_id: The library file ID
         file_id: The library file ID
         plate_id: Optional plate index to get filaments for a specific plate
         plate_id: Optional plate index to get filaments for a specific plate
     """
     """
-    import zipfile
-
     import defusedxml.ElementTree as ET
     import defusedxml.ElementTree as ET
 
 
     # Get the library file
     # Get the library file
@@ -1746,8 +1739,6 @@ async def print_library_file(
 
 
     Only sliced files (.gcode or .gcode.3mf) can be printed.
     Only sliced files (.gcode or .gcode.3mf) can be printed.
     """
     """
-    import zipfile
-
     from backend.app.main import register_expected_print
     from backend.app.main import register_expected_print
     from backend.app.models.printer import Printer
     from backend.app.models.printer import Printer
     from backend.app.services.bambu_ftp import (
     from backend.app.services.bambu_ftp import (
@@ -2188,8 +2179,6 @@ async def get_gcode(
         return FastAPIFileResponse(str(abs_path), media_type="text/plain")
         return FastAPIFileResponse(str(abs_path), media_type="text/plain")
     elif file.file_type == "3mf":
     elif file.file_type == "3mf":
         # Extract gcode from 3mf
         # Extract gcode from 3mf
-        import zipfile
-
         try:
         try:
             with zipfile.ZipFile(str(abs_path), "r") as zf:
             with zipfile.ZipFile(str(abs_path), "r") as zf:
                 # Find gcode file
                 # Find gcode file

+ 0 - 4
backend/app/api/routes/printers.py

@@ -852,7 +852,6 @@ async def get_printer_file_plates(
     """Get available plates from a multi-plate 3MF file stored on a printer."""
     """Get available plates from a multi-plate 3MF file stored on a printer."""
     import io
     import io
     import json
     import json
-    import zipfile
 
 
     import defusedxml.ElementTree as ET
     import defusedxml.ElementTree as ET
 
 
@@ -1097,7 +1096,6 @@ async def get_printer_file_plate_thumbnail(
 ):
 ):
     """Get a plate thumbnail image from a printer-stored 3MF file."""
     """Get a plate thumbnail image from a printer-stored 3MF file."""
     import io
     import io
-    import zipfile
 
 
     result = await db.execute(select(Printer).where(Printer.id == printer_id))
     result = await db.execute(select(Printer).where(Printer.id == printer_id))
     printer = result.scalar_one_or_none()
     printer = result.scalar_one_or_none()
@@ -1594,8 +1592,6 @@ async def configure_ams_slot(
         kprofile_filament_id: K profile's filament_id for proper K profile linking
         kprofile_filament_id: K profile's filament_id for proper K profile linking
         k_value: Direct K value to set (0.0 to skip direct K value setting)
         k_value: Direct K value to set (0.0 to skip direct K value setting)
     """
     """
-    import logging
-
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
     logger.info("[configure_ams_slot] printer_id=%s, ams_id=%s, tray_id=%s", printer_id, ams_id, tray_id)
     logger.info("[configure_ams_slot] printer_id=%s, ams_id=%s, tray_id=%s", printer_id, ams_id, tray_id)
     logger.info(
     logger.info(

+ 0 - 2
backend/app/api/routes/support.py

@@ -413,8 +413,6 @@ async def _collect_support_info() -> dict:
 
 
 def _sanitize_log_content(content: str) -> str:
 def _sanitize_log_content(content: str) -> str:
     """Remove sensitive data from log content."""
     """Remove sensitive data from log content."""
-    import re
-
     # Replace IP addresses with [IP]
     # Replace IP addresses with [IP]
     content = re.sub(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", "[IP]", content)
     content = re.sub(r"\b\d{1,3}\.\d{1,3}\.\d{1,3}\.\d{1,3}\b", "[IP]", content)
 
 

+ 0 - 17
backend/app/main.py

@@ -495,8 +495,6 @@ async def on_printer_status_change(printer_id: int, state: PrinterState):
 
 
 async def on_ams_change(printer_id: int, ams_data: list):
 async def on_ams_change(printer_id: int, ams_data: list):
     """Handle AMS data changes - sync to Spoolman if enabled and auto mode."""
     """Handle AMS data changes - sync to Spoolman if enabled and auto mode."""
-    import logging
-
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
 
 
     # MQTT relay - publish AMS change
     # MQTT relay - publish AMS change
@@ -581,8 +579,6 @@ async def on_ams_change(printer_id: int, ams_data: list):
                 logger.info("Auto-synced %s AMS trays to Spoolman for printer %s", synced, printer_id)
                 logger.info("Auto-synced %s AMS trays to Spoolman for printer %s", synced, printer_id)
 
 
     except Exception as e:
     except Exception as e:
-        import logging
-
         logging.getLogger(__name__).warning(f"Spoolman AMS sync failed: {e}")
         logging.getLogger(__name__).warning(f"Spoolman AMS sync failed: {e}")
 
 
 
 
@@ -651,8 +647,6 @@ async def _send_print_start_notification(
 ):
 ):
     """Helper to send print start notification with optional archive data."""
     """Helper to send print start notification with optional archive data."""
     if logger is None:
     if logger is None:
-        import logging
-
         logger = logging.getLogger(__name__)
         logger = logging.getLogger(__name__)
 
 
     try:
     try:
@@ -699,8 +693,6 @@ def _load_objects_from_archive(archive, printer_id: int, logger) -> None:
 
 
 async def on_print_start(printer_id: int, data: dict):
 async def on_print_start(printer_id: int, data: dict):
     """Handle print start - archive the 3MF file immediately."""
     """Handle print start - archive the 3MF file immediately."""
-    import logging
-
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
 
 
     logger.info("[CALLBACK] on_print_start called for printer %s, data keys: %s", printer_id, list(data.keys()))
     logger.info("[CALLBACK] on_print_start called for printer %s, data keys: %s", printer_id, list(data.keys()))
@@ -1378,8 +1370,6 @@ async def _scan_for_timelapse_with_retries(archive_id: int):
     Since we KNOW timelapse was active (from MQTT ipcam data), the most recent
     Since we KNOW timelapse was active (from MQTT ipcam data), the most recent
     file in /timelapse is our target. Retries handle FTP connection issues.
     file in /timelapse is our target. Retries handle FTP connection issues.
     """
     """
-    import logging
-
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
 
 
     # Short delays - printer usually finishes encoding within seconds
     # Short delays - printer usually finishes encoding within seconds
@@ -1487,7 +1477,6 @@ async def _scan_for_timelapse_with_retries(archive_id: int):
 
 
 async def on_print_complete(printer_id: int, data: dict):
 async def on_print_complete(printer_id: int, data: dict):
     """Handle print completion - update the archive status."""
     """Handle print completion - update the archive status."""
-    import logging
     import time
     import time
 
 
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
@@ -2141,8 +2130,6 @@ async def on_print_complete(printer_id: int, data: dict):
 
 
                         asyncio.create_task(cooldown_and_poweroff(printer_id, plug.id))
                         asyncio.create_task(cooldown_and_poweroff(printer_id, plug.id))
     except Exception as e:
     except Exception as e:
-        import logging
-
         logging.getLogger(__name__).warning(f"Queue item update failed: {e}")
         logging.getLogger(__name__).warning(f"Queue item update failed: {e}")
 
 
     log_timing("Queue item update")
     log_timing("Queue item update")
@@ -2160,8 +2147,6 @@ AMS_ALARM_COOLDOWN_MINUTES = 60  # Don't send same alarm more than once per hour
 
 
 async def record_ams_history():
 async def record_ams_history():
     """Background task to record AMS humidity and temperature data."""
     """Background task to record AMS humidity and temperature data."""
-    import logging
-
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
 
 
     # Wait a short time for MQTT connections to establish on startup
     # Wait a short time for MQTT connections to establish on startup
@@ -2367,8 +2352,6 @@ RUNTIME_TRACKING_INTERVAL = 30  # Update every 30 seconds
 
 
 async def track_printer_runtime():
 async def track_printer_runtime():
     """Background task to track printer active runtime (RUNNING/PAUSE states)."""
     """Background task to track printer active runtime (RUNNING/PAUSE states)."""
-    import logging
-
     logger = logging.getLogger(__name__)
     logger = logging.getLogger(__name__)
 
 
     # Wait for MQTT connections to establish on startup
     # Wait for MQTT connections to establish on startup

+ 0 - 7
backend/app/services/archive.py

@@ -171,8 +171,6 @@ class ThreeMFParser:
 
 
     def _parse_gcode_header(self, zf: zipfile.ZipFile):
     def _parse_gcode_header(self, zf: zipfile.ZipFile):
         """Parse G-code file header for total layer count and printer model."""
         """Parse G-code file header for total layer count and printer model."""
-        import re
-
         try:
         try:
             # Look for plate_1.gcode or similar
             # Look for plate_1.gcode or similar
             gcode_files = [f for f in zf.namelist() if f.endswith(".gcode")]
             gcode_files = [f for f in zf.namelist() if f.endswith(".gcode")]
@@ -315,8 +313,6 @@ class ThreeMFParser:
 
 
     def _parse_3dmodel(self, zf: zipfile.ZipFile):
     def _parse_3dmodel(self, zf: zipfile.ZipFile):
         """Parse 3D/3dmodel.model for MakerWorld metadata."""
         """Parse 3D/3dmodel.model for MakerWorld metadata."""
-        import re
-
         try:
         try:
             model_path = "3D/3dmodel.model"
             model_path = "3D/3dmodel.model"
             if model_path not in zf.namelist():
             if model_path not in zf.namelist():
@@ -404,7 +400,6 @@ def extract_printable_objects_from_3mf(
         If include_positions=False: Dictionary mapping identify_id (int) to object name (str)
         If include_positions=False: Dictionary mapping identify_id (int) to object name (str)
         If include_positions=True: Tuple of (dict mapping identify_id to {name, x, y}, bbox_all list or None)
         If include_positions=True: Tuple of (dict mapping identify_id to {name, x, y}, bbox_all list or None)
     """
     """
-    import json
     from io import BytesIO
     from io import BytesIO
 
 
     printable_objects: dict = {}
     printable_objects: dict = {}
@@ -500,7 +495,6 @@ class ProjectPageParser:
     def parse(self, archive_id: int) -> dict:
     def parse(self, archive_id: int) -> dict:
         """Extract project page metadata and images from 3MF file."""
         """Extract project page metadata and images from 3MF file."""
         import html
         import html
-        import re
 
 
         result = {
         result = {
             "title": None,
             "title": None,
@@ -643,7 +637,6 @@ class ProjectPageParser:
             True if successful, False otherwise.
             True if successful, False otherwise.
         """
         """
         import html
         import html
-        import re
         import tempfile
         import tempfile
 
 
         try:
         try:

+ 0 - 5
backend/tests/integration/test_library_api.py

@@ -435,7 +435,6 @@ class TestLibraryZipExtractAPI:
     async def test_extract_zip_basic(self, async_client: AsyncClient, db_session):
     async def test_extract_zip_basic(self, async_client: AsyncClient, db_session):
         """Verify basic ZIP extraction works."""
         """Verify basic ZIP extraction works."""
         import io
         import io
-        import zipfile
 
 
         # Create a simple ZIP file in memory
         # Create a simple ZIP file in memory
         zip_buffer = io.BytesIO()
         zip_buffer = io.BytesIO()
@@ -457,7 +456,6 @@ class TestLibraryZipExtractAPI:
     async def test_extract_zip_with_folders(self, async_client: AsyncClient, db_session):
     async def test_extract_zip_with_folders(self, async_client: AsyncClient, db_session):
         """Verify ZIP extraction preserves folder structure."""
         """Verify ZIP extraction preserves folder structure."""
         import io
         import io
-        import zipfile
 
 
         # Create a ZIP file with folder structure
         # Create a ZIP file with folder structure
         zip_buffer = io.BytesIO()
         zip_buffer = io.BytesIO()
@@ -480,7 +478,6 @@ class TestLibraryZipExtractAPI:
     async def test_extract_zip_flat(self, async_client: AsyncClient, db_session):
     async def test_extract_zip_flat(self, async_client: AsyncClient, db_session):
         """Verify ZIP extraction can extract flat (no folders)."""
         """Verify ZIP extraction can extract flat (no folders)."""
         import io
         import io
-        import zipfile
 
 
         # Create a ZIP file with folder structure
         # Create a ZIP file with folder structure
         zip_buffer = io.BytesIO()
         zip_buffer = io.BytesIO()
@@ -502,7 +499,6 @@ class TestLibraryZipExtractAPI:
     async def test_extract_zip_skips_macos_files(self, async_client: AsyncClient, db_session):
     async def test_extract_zip_skips_macos_files(self, async_client: AsyncClient, db_session):
         """Verify ZIP extraction skips __MACOSX and hidden files."""
         """Verify ZIP extraction skips __MACOSX and hidden files."""
         import io
         import io
-        import zipfile
 
 
         # Create a ZIP file with macOS junk files
         # Create a ZIP file with macOS junk files
         zip_buffer = io.BytesIO()
         zip_buffer = io.BytesIO()
@@ -524,7 +520,6 @@ class TestLibraryZipExtractAPI:
     async def test_extract_zip_create_folder_from_zip(self, async_client: AsyncClient, db_session):
     async def test_extract_zip_create_folder_from_zip(self, async_client: AsyncClient, db_session):
         """Verify ZIP extraction creates a folder from the ZIP filename."""
         """Verify ZIP extraction creates a folder from the ZIP filename."""
         import io
         import io
-        import zipfile
 
 
         # Create a ZIP file with some files
         # Create a ZIP file with some files
         zip_buffer = io.BytesIO()
         zip_buffer = io.BytesIO()