network_utils.py 6.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214
  1. """Network utility functions for interface detection."""
  2. import ipaddress
  3. import json
  4. import logging
  5. import shutil
  6. import socket
  7. import struct
  8. import subprocess
  9. logger = logging.getLogger(__name__)
  10. # Interfaces to exclude from selection
  11. EXCLUDED_INTERFACE_PREFIXES = ("lo", "docker", "br-", "veth", "virbr")
  12. # Resolve full path to `ip` command (may not be in PATH for service users)
  13. _IP_CMD: str | None = shutil.which("ip") or shutil.which("ip", path="/usr/sbin:/sbin:/usr/bin:/bin")
  14. def _is_excluded(name: str) -> bool:
  15. """Check if an interface name should be excluded."""
  16. return any(name.startswith(prefix) for prefix in EXCLUDED_INTERFACE_PREFIXES)
  17. def get_network_interfaces() -> list[dict]:
  18. """Get all network interfaces with their IPs and subnets.
  19. Returns:
  20. List of dicts with name, ip, netmask, subnet, broadcast
  21. """
  22. interfaces = []
  23. try:
  24. import fcntl
  25. for iface in socket.if_nameindex():
  26. name = iface[1]
  27. # Skip excluded interfaces
  28. if _is_excluded(name):
  29. continue
  30. try:
  31. s = socket.socket(socket.AF_INET, socket.SOCK_DGRAM)
  32. # Get IP address
  33. ip_bytes = fcntl.ioctl(
  34. s.fileno(),
  35. 0x8915, # SIOCGIFADDR
  36. struct.pack("256s", name[:15].encode()),
  37. )[20:24]
  38. ip = socket.inet_ntoa(ip_bytes)
  39. # Get netmask
  40. netmask_bytes = fcntl.ioctl(
  41. s.fileno(),
  42. 0x891B, # SIOCGIFNETMASK
  43. struct.pack("256s", name[:15].encode()),
  44. )[20:24]
  45. netmask = socket.inet_ntoa(netmask_bytes)
  46. # Calculate subnet
  47. network = ipaddress.IPv4Network(f"{ip}/{netmask}", strict=False)
  48. interfaces.append(
  49. {
  50. "name": name,
  51. "ip": ip,
  52. "netmask": netmask,
  53. "subnet": str(network),
  54. }
  55. )
  56. s.close()
  57. except OSError:
  58. # Interface doesn't have an IP or other error
  59. pass
  60. except Exception as e:
  61. logger.debug("Error getting info for interface %s: %s", name, e)
  62. except ImportError:
  63. # fcntl not available (Windows)
  64. logger.warning("fcntl not available, interface detection limited")
  65. except Exception as e:
  66. logger.error("Error enumerating interfaces: %s", e)
  67. return interfaces
  68. def get_all_interface_ips() -> list[dict]:
  69. """Get all IPs (primary + aliases) for all non-excluded interfaces.
  70. Uses `ip -j addr show` to see secondary/alias IPs that ioctl misses.
  71. Falls back to ioctl-based get_network_interfaces() if `ip` is unavailable.
  72. Returns:
  73. List of dicts with name, ip, netmask, subnet, is_alias, label
  74. """
  75. if not _IP_CMD:
  76. logger.debug("ip command not found, using ioctl fallback")
  77. return _fallback_get_all_ips()
  78. try:
  79. result = subprocess.run(
  80. [_IP_CMD, "-j", "addr", "show"],
  81. capture_output=True,
  82. text=True,
  83. timeout=5,
  84. )
  85. if result.returncode != 0:
  86. logger.warning("ip addr show failed: %s", result.stderr)
  87. return _fallback_get_all_ips()
  88. interfaces_data = json.loads(result.stdout)
  89. except (subprocess.TimeoutExpired, json.JSONDecodeError, FileNotFoundError) as e:
  90. logger.warning("Failed to run ip -j addr show: %s", e)
  91. return _fallback_get_all_ips()
  92. entries = []
  93. for iface in interfaces_data:
  94. ifname = iface.get("ifname", "")
  95. if _is_excluded(ifname):
  96. continue
  97. ipv4_count = 0
  98. for addr_info in iface.get("addr_info", []):
  99. if addr_info.get("family") != "inet":
  100. continue
  101. ip = addr_info.get("local", "")
  102. prefix = addr_info.get("prefixlen", 24)
  103. label = addr_info.get("label", ifname)
  104. try:
  105. network = ipaddress.IPv4Network(f"{ip}/{prefix}", strict=False)
  106. netmask = str(network.netmask)
  107. except ValueError:
  108. continue
  109. # An alias has ":" in label (e.g. eth0:vp1) or is not the first IPv4
  110. is_alias = ":" in label or ipv4_count > 0
  111. entries.append(
  112. {
  113. "name": ifname,
  114. "ip": ip,
  115. "netmask": netmask,
  116. "subnet": str(network),
  117. "is_alias": is_alias,
  118. "label": label,
  119. }
  120. )
  121. ipv4_count += 1
  122. # Sort: primary IPs first per interface, then by interface name
  123. entries.sort(key=lambda e: (e["name"], e["is_alias"], e["ip"]))
  124. return entries
  125. def _fallback_get_all_ips() -> list[dict]:
  126. """Fallback: wrap get_network_interfaces() result with alias fields."""
  127. return [
  128. {
  129. **iface,
  130. "is_alias": False,
  131. "label": iface["name"],
  132. }
  133. for iface in get_network_interfaces()
  134. ]
  135. def find_interface_for_ip(target_ip: str) -> dict | None:
  136. """Find which interface is on the same subnet as the target IP.
  137. Args:
  138. target_ip: IP address to find the matching interface for
  139. Returns:
  140. Interface dict or None if not found
  141. """
  142. try:
  143. target = ipaddress.IPv4Address(target_ip)
  144. except ValueError:
  145. logger.error("Invalid target IP: %s", target_ip)
  146. return None
  147. interfaces = get_all_interface_ips()
  148. for iface in interfaces:
  149. if iface.get("is_alias"):
  150. continue
  151. try:
  152. network = ipaddress.IPv4Network(iface["subnet"], strict=False)
  153. if target in network:
  154. logger.debug("Found interface %s (%s) for target %s", iface["name"], iface["ip"], target_ip)
  155. return iface
  156. except ValueError:
  157. continue
  158. logger.warning("No interface found for target IP %s", target_ip)
  159. return None
  160. def get_other_interfaces(exclude_ip: str) -> list[dict]:
  161. """Get all interfaces except the one with the given IP.
  162. Args:
  163. exclude_ip: IP address of interface to exclude
  164. Returns:
  165. List of interface dicts
  166. """
  167. interfaces = get_network_interfaces()
  168. return [iface for iface in interfaces if iface["ip"] != exclude_ip]