mqtt_sniffer.py 3.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. #!/usr/bin/env python3
"""MQTT Sniffer for Bambu Lab printers.

Connects to a printer and logs all MQTT messages to capture the exact
command format used by OrcaSlicer or Bambu Studio.

Usage:
    python mqtt_sniffer.py <printer_ip> <serial_number> <access_code>

Example:
    python mqtt_sniffer.py 192.168.1.100 0948BB540200427 12345678
"""
  10. import json
  11. import ssl
  12. import sys
  13. import time
  14. from datetime import datetime
  15. import paho.mqtt.client as mqtt
  16. def on_connect(client, userdata, flags, rc):
  17. """Called when connected to the MQTT broker."""
  18. if rc == 0:
  19. print(f"[{datetime.now().strftime('%H:%M:%S')}] Connected to printer!")
  20. serial = userdata["serial"]
  21. # Subscribe to all topics for this printer
  22. topic_report = f"device/{serial}/report"
  23. client.subscribe(topic_report)
  24. print(f"[{datetime.now().strftime('%H:%M:%S')}] Subscribed to: {topic_report}")
  25. print("-" * 80)
  26. print("Listening for MQTT messages... Press Ctrl+C to stop.")
  27. print("Now use OrcaSlicer to add a K-profile and the command will be logged here.")
  28. print("-" * 80)
  29. else:
  30. print(f"[{datetime.now().strftime('%H:%M:%S')}] Connection failed with code: {rc}")
  31. def on_message(client, userdata, msg):
  32. """Called when a message is received."""
  33. try:
  34. payload = json.loads(msg.payload.decode("utf-8"))
  35. # Check if this is an extrusion_cali related message
  36. is_cali_msg = False
  37. command = None
  38. if "print" in payload:
  39. command = payload["print"].get("command", "")
  40. if "extrusion_cali" in command:
  41. is_cali_msg = True
  42. # Always log calibration messages with full detail
  43. if is_cali_msg:
  44. print(f"\n{'='*80}")
  45. print(f"[{datetime.now().strftime('%H:%M:%S.%f')[:-3]}] *** CALIBRATION COMMAND: {command} ***")
  46. print(f"Topic: {msg.topic}")
  47. print(f"Full payload:")
  48. print(json.dumps(payload, indent=2))
  49. print(f"{'='*80}\n")
  50. else:
  51. # For other messages, just show a brief summary
  52. if "print" in payload:
  53. cmd = payload["print"].get("command", "unknown")
  54. # Skip noisy status messages
  55. if cmd not in ["push_status"]:
  56. print(f"[{datetime.now().strftime('%H:%M:%S')}] Command: {cmd}")
  57. except json.JSONDecodeError:
  58. print(f"[{datetime.now().strftime('%H:%M:%S')}] Non-JSON message on {msg.topic}")
  59. except Exception as e:
  60. print(f"[{datetime.now().strftime('%H:%M:%S')}] Error processing message: {e}")
  61. def on_disconnect(client, userdata, rc):
  62. """Called when disconnected from the MQTT broker."""
  63. print(f"[{datetime.now().strftime('%H:%M:%S')}] Disconnected with code: {rc}")
  64. def main():
  65. if len(sys.argv) != 4:
  66. print("Usage: python mqtt_sniffer.py <printer_ip> <serial_number> <access_code>")
  67. print("\nExample:")
  68. print(" python mqtt_sniffer.py 192.168.1.100 0948BB540200427 12345678")
  69. sys.exit(1)
  70. printer_ip = sys.argv[1]
  71. serial_number = sys.argv[2]
  72. access_code = sys.argv[3]
  73. print(f"Connecting to printer at {printer_ip}...")
  74. print(f"Serial: {serial_number}")
  75. # Create MQTT client
  76. client = mqtt.Client(userdata={"serial": serial_number})
  77. client.username_pw_set("bblp", access_code)
  78. # Configure TLS
  79. ssl_context = ssl.create_default_context()
  80. ssl_context.check_hostname = False
  81. ssl_context.verify_mode = ssl.CERT_NONE
  82. client.tls_set_context(ssl_context)
  83. # Set callbacks
  84. client.on_connect = on_connect
  85. client.on_message = on_message
  86. client.on_disconnect = on_disconnect
  87. try:
  88. client.connect(printer_ip, 8883, 60)
  89. client.loop_forever()
  90. except KeyboardInterrupt:
  91. print("\n\nStopping sniffer...")
  92. client.disconnect()
  93. except Exception as e:
  94. print(f"Error: {e}")
  95. sys.exit(1)
  96. if __name__ == "__main__":
  97. main()