openocd.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173
  1. import socket
  2. import subprocess
  3. import logging
  4. class OpenOCD:
  5. """OpenOCD cli wrapper"""
  6. COMMAND_TOKEN = "\x1a"
  7. def __init__(self, config: dict = {}) -> None:
  8. assert isinstance(config, dict)
  9. # Params base
  10. self.params = []
  11. self.gdb_port = 3333
  12. self.telnet_port = 4444
  13. self.tcl_port = 6666
  14. # Port
  15. if port_base := config.get("port_base", None):
  16. self.gdb_port = port_base
  17. self.tcl_port = port_base + 1
  18. self.telnet_port = port_base + 2
  19. self._add_command(f"gdb_port {self.gdb_port}")
  20. self._add_command(f"tcl_port {self.tcl_port}")
  21. self._add_command(f"telnet_port {self.telnet_port}")
  22. # Config files
  23. if interface := config.get("interface", None):
  24. pass
  25. else:
  26. interface = "interface/stlink.cfg"
  27. if target := config.get("target", None):
  28. pass
  29. else:
  30. target = "target/stm32wbx.cfg"
  31. self._add_file(interface)
  32. self._add_file(target)
  33. # Programmer settings
  34. if serial := config.get("serial", None):
  35. self._add_command(f"{serial}")
  36. # Other params
  37. if "params" in config:
  38. self.params += config["params"]
  39. # logging
  40. self.logger = logging.getLogger()
  41. def _add_command(self, command: str):
  42. self.params.append("-c")
  43. self.params.append(command)
  44. def _add_file(self, file: str):
  45. self.params.append("-f")
  46. self.params.append(file)
  47. def start(self, args: list[str] = []):
  48. """Start OpenOCD process"""
  49. params = ["openocd", *self.params, *args]
  50. self.logger.debug(f"_execute: {params}")
  51. self.process = subprocess.Popen(
  52. params, stderr=subprocess.PIPE, stdout=subprocess.PIPE
  53. )
  54. self._wait_for_openocd_tcl()
  55. self.socket = socket.socket(socket.AF_INET, socket.SOCK_STREAM)
  56. self.socket.connect(("127.0.0.1", self.tcl_port))
  57. def _wait_for_openocd_tcl(self):
  58. """Wait for OpenOCD to start"""
  59. # TODO: timeout
  60. while True:
  61. stderr = self.process.stderr
  62. if not stderr:
  63. break
  64. line = stderr.readline()
  65. if not line:
  66. break
  67. line = line.decode("utf-8").strip()
  68. self.logger.debug(f"OpenOCD: {line}")
  69. if "Listening on port" in line and "for tcl connections" in line:
  70. break
  71. def stop(self):
  72. self.send_tcl("exit")
  73. self.send_tcl("shutdown")
  74. self.socket.close()
  75. try:
  76. self.process.wait(timeout=10)
  77. except subprocess.TimeoutExpired as e:
  78. self.process.kill()
  79. self.logger.error("Failed to stop OpenOCD")
  80. self.logger.exception(e)
  81. self.postmortem()
  82. def send_tcl(self, cmd) -> str:
  83. """Send a command string to TCL RPC. Return the result that was read."""
  84. try:
  85. data = (cmd + OpenOCD.COMMAND_TOKEN).encode("utf-8")
  86. self.logger.debug(f"<- {data}")
  87. self.socket.send(data)
  88. except Exception as e:
  89. self.logger.error("Failed to send command to OpenOCD")
  90. self.logger.exception(e)
  91. self.postmortem()
  92. raise
  93. try:
  94. data = self._recv()
  95. return data
  96. except Exception as e:
  97. self.logger.error("Failed to receive response from OpenOCD")
  98. self.logger.exception(e)
  99. self.postmortem()
  100. raise
  101. def _recv(self):
  102. """Read from the stream until the token (\x1a) was received."""
  103. # TODO: timeout
  104. data = bytes()
  105. while True:
  106. chunk = self.socket.recv(4096)
  107. data += chunk
  108. if bytes(OpenOCD.COMMAND_TOKEN, encoding="utf-8") in chunk:
  109. break
  110. self.logger.debug(f"-> {data}")
  111. data = data.decode("utf-8").strip()
  112. data = data[:-1] # strip trailing \x1a
  113. return data
  114. def postmortem(self) -> None:
  115. """Postmortem analysis of the OpenOCD process"""
  116. stdout, stderr = self.process.communicate()
  117. log = self.logger.error
  118. if self.process.returncode == 0:
  119. log = self.logger.debug
  120. log("OpenOCD exited normally")
  121. else:
  122. log("OpenOCD exited with error")
  123. log(f"Exit code: {self.process.returncode}")
  124. for line in stdout.decode("utf-8").splitlines():
  125. log(f"Stdout: {line}")
  126. for line in stderr.decode("utf-8").splitlines():
  127. log(f"Stderr: {line}")
  128. def read_32(self, addr: int) -> int:
  129. """Read 32-bit value from memory"""
  130. data = self.send_tcl(f"mdw {addr}").strip()
  131. data = data.split(": ")[-1]
  132. data = int(data, 16)
  133. return data
  134. def write_32(self, addr: int, value: int) -> None:
  135. """Write 32-bit value to memory"""
  136. self.send_tcl(f"mww {addr} {value}")