selfupdate.py 5.4 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143
  1. #!/usr/bin/env python3
  2. from flipper.storage import FlipperStorage
  3. import logging
  4. import argparse
  5. import os
  6. import sys
  7. import pathlib
  8. import serial.tools.list_ports as list_ports
  9. class Main:
  10. def __init__(self):
  11. # command args
  12. self.parser = argparse.ArgumentParser()
  13. self.parser.add_argument("-d", "--debug", action="store_true", help="Debug")
  14. self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
  15. self.parser.add_argument(
  16. "-b",
  17. "--baud",
  18. help="Port Baud rate",
  19. required=False,
  20. default=115200 * 4,
  21. type=int,
  22. )
  23. self.subparsers = self.parser.add_subparsers(help="sub-command help")
  24. self.parser_install = self.subparsers.add_parser(
  25. "install", help="Install OTA package"
  26. )
  27. self.parser_install.add_argument("manifest_path", help="Manifest path")
  28. self.parser_install.add_argument(
  29. "--pkg_dir_name", help="Update dir name", default="pcbundle", required=False
  30. )
  31. self.parser_install.set_defaults(func=self.install)
  32. # logging
  33. self.logger = logging.getLogger()
  34. def __call__(self):
  35. self.args = self.parser.parse_args()
  36. if "func" not in self.args:
  37. self.parser.error("Choose something to do")
  38. # configure log output
  39. self.log_level = logging.DEBUG if self.args.debug else logging.INFO
  40. self.logger.setLevel(self.log_level)
  41. self.handler = logging.StreamHandler(sys.stdout)
  42. self.handler.setLevel(self.log_level)
  43. self.formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
  44. self.handler.setFormatter(self.formatter)
  45. self.logger.addHandler(self.handler)
  46. # execute requested function
  47. self.args.func()
  48. # make directory with exist check
  49. def mkdir_on_storage(self, storage, flipper_dir_path):
  50. if not storage.exist_dir(flipper_dir_path):
  51. self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
  52. if not storage.mkdir(flipper_dir_path):
  53. self.logger.error(f"Error: {storage.last_error}")
  54. return False
  55. else:
  56. self.logger.debug(f'"{flipper_dir_path}" already exists')
  57. return True
  58. # send file with exist check and hash check
  59. def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
  60. exists = storage.exist_file(flipper_file_path)
  61. do_upload = not exists
  62. if exists:
  63. hash_local = storage.hash_local(local_file_path)
  64. hash_flipper = storage.hash_flipper(flipper_file_path)
  65. self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
  66. do_upload = force or (hash_local != hash_flipper)
  67. if do_upload:
  68. self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
  69. if not storage.send_file(local_file_path, flipper_file_path):
  70. self.logger.error(f"Error: {storage.last_error}")
  71. return False
  72. return True
  73. def _get_port(self):
  74. if self.args.port != "auto":
  75. return self.args.port
  76. # Try guessing
  77. flippers = list(list_ports.grep("flip"))
  78. if len(flippers) == 1:
  79. flipper = flippers[0]
  80. self.logger.info(f"Using {flipper.serial_number} on {flipper.device}")
  81. return flipper.device
  82. elif len(flippers) == 0:
  83. self.logger.error("Failed to find connected Flipper")
  84. elif len(flippers) > 1:
  85. self.logger.error("More than one Flipper is attached")
  86. self.logger.error("Failed to guess which port to use. Specify --port")
  87. def install(self):
  88. if not (port := self._get_port()):
  89. return 1
  90. storage = FlipperStorage(port, self.args.baud)
  91. storage.start()
  92. if not os.path.isfile(self.args.manifest_path):
  93. self.logger.error("Error: manifest not found")
  94. return 2
  95. manifest_path = pathlib.Path(os.path.abspath(self.args.manifest_path))
  96. manifest_name, pkg_name = manifest_path.parts[-1], manifest_path.parts[-2]
  97. pkg_dir_name = self.args.pkg_dir_name or pkg_name
  98. flipper_update_path = f"/ext/update/{pkg_dir_name}"
  99. self.logger.info(f'Installing "{pkg_name}" from {flipper_update_path}')
  100. # if not os.path.exists(self.args.manifest_path):
  101. # self.logger.error("Error: package not found")
  102. if not self.mkdir_on_storage(storage, flipper_update_path):
  103. self.logger.error(f"Error: cannot create {storage.last_error}")
  104. return -2
  105. for dirpath, dirnames, filenames in os.walk(manifest_path.parents[0]):
  106. for fname in filenames:
  107. self.logger.debug(f"Uploading {fname}")
  108. local_file_path = os.path.join(dirpath, fname)
  109. flipper_file_path = f"{flipper_update_path}/{fname}"
  110. if not self.send_file_to_storage(
  111. storage, flipper_file_path, local_file_path, False
  112. ):
  113. self.logger.error(f"Error: {storage.last_error}")
  114. return -3
  115. storage.send_and_wait_eol(
  116. f"update install {flipper_update_path}/{manifest_name}\r"
  117. )
  118. break
  119. storage.stop()
  120. if __name__ == "__main__":
  121. Main()()