# network_utils.py (3.4 KB)
  1. """Network utility functions for interface detection."""
  2. import ipaddress
  3. import logging
  4. import socket
  5. import struct
  6. logger = logging.getLogger(__name__)
  7. # Interfaces to exclude from selection
  8. EXCLUDED_INTERFACE_PREFIXES = ("lo", "docker", "br-", "veth", "virbr")
  9. def get_network_interfaces() -> list[dict]:
  10. """Get all network interfaces with their IPs and subnets.
  11. Returns:
  12. List of dicts with name, ip, netmask, subnet, broadcast
  13. """
  14. interfaces = []
  15. try:
  16. import fcntl
  17. for iface in socket.if_nameindex():
  18. name = iface[1]
  19. # Skip excluded interfaces
  20. if any(name.startswith(prefix) for prefix in EXCLUDED_INTERFACE_PREFIXES):
  21. continue
  22. try:
  23. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  24. # Get IP address
  25. ip_bytes = fcntl.ioctl(
  26. s.fileno(),
  27. 0x8915, # SIOCGIFADDR
  28. struct.pack("256s", name[:15].encode()),
  29. )[20:24]
  30. ip = socket.inet_ntoa(ip_bytes)
  31. # Get netmask
  32. netmask_bytes = fcntl.ioctl(
  33. s.fileno(),
  34. 0x891B, # SIOCGIFNETMASK
  35. struct.pack("256s", name[:15].encode()),
  36. )[20:24]
  37. netmask = socket.inet_ntoa(netmask_bytes)
  38. # Calculate subnet
  39. network = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
  40. interfaces.append(
  41. {
  42. "name": name,
  43. "ip": ip,
  44. "netmask": netmask,
  45. "subnet": str(network),
  46. }
  47. )
  48. s.close()
  49. except OSError:
  50. # Interface doesn't have an IP or other error
  51. pass
  52. except Exception as e:
  53. logger.debug(f"Error getting info for interface {name}: {e}")
  54. except ImportError:
  55. # fcntl not available (Windows)
  56. logger.warning("fcntl not available, interface detection limited")
  57. except Exception as e:
  58. logger.error(f"Error enumerating interfaces: {e}")
  59. return interfaces
  60. def find_interface_for_ip(target_ip: str) -> dict | None:
  61. """Find which interface is on the same subnet as the target IP.
  62. Args:
  63. target_ip: IP address to find the matching interface for
  64. Returns:
  65. Interface dict or None if not found
  66. """
  67. try:
  68. target = ipaddress.IPv4Address(target_ip)
  69. except ValueError:
  70. logger.error(f"Invalid target IP: {target_ip}")
  71. return None
  72. interfaces = get_network_interfaces()
  73. for iface in interfaces:
  74. try:
  75. network = ipaddress.IPv4Network(iface["subnet"], strict=False)
  76. if target in network:
  77. logger.debug(f"Found interface {iface['name']} ({iface['ip']}) for target {target_ip}")
  78. return iface
  79. except ValueError:
  80. continue
  81. logger.warning(f"No interface found for target IP {target_ip}")
  82. return None
  83. def get_other_interfaces(exclude_ip: str) -> list[dict]:
  84. """Get all interfaces except the one with the given IP.
  85. Args:
  86. exclude_ip: IP address of interface to exclude
  87. Returns:
  88. List of interface dicts
  89. """
  90. interfaces = get_network_interfaces()
  91. return [iface for iface in interfaces if iface["ip"] != exclude_ip]