selfupdate.py 4.5 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120
  1. #!/usr/bin/env python3
  2. from typing import final
  3. from flipper.app import App
  4. from flipper.storage import FlipperStorage
  5. from flipper.utils.cdc import resolve_port
  6. import logging
  7. import os
  8. import pathlib
  9. import serial.tools.list_ports as list_ports
  10. class Main(App):
  11. def init(self):
  12. self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
  13. self.parser.add_argument(
  14. "-b",
  15. "--baud",
  16. help="Port Baud rate",
  17. required=False,
  18. default=115200 * 4,
  19. type=int,
  20. )
  21. self.parser.add_argument("manifest_path", help="Manifest path")
  22. self.parser.add_argument(
  23. "--pkg_dir_name", help="Update dir name", default="pcbundle", required=False
  24. )
  25. self.parser.set_defaults(func=self.install)
  26. # logging
  27. self.logger = logging.getLogger()
  28. # make directory with exist check
  29. def mkdir_on_storage(self, storage, flipper_dir_path):
  30. if not storage.exist_dir(flipper_dir_path):
  31. self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
  32. if not storage.mkdir(flipper_dir_path):
  33. self.logger.error(f"Error: {storage.last_error}")
  34. return False
  35. else:
  36. self.logger.debug(f'"{flipper_dir_path}" already exists')
  37. return True
  38. # send file with exist check and hash check
  39. def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
  40. exists = storage.exist_file(flipper_file_path)
  41. do_upload = not exists
  42. if exists:
  43. hash_local = storage.hash_local(local_file_path)
  44. hash_flipper = storage.hash_flipper(flipper_file_path)
  45. self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
  46. do_upload = force or (hash_local != hash_flipper)
  47. if do_upload:
  48. self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
  49. if not storage.send_file(local_file_path, flipper_file_path):
  50. self.logger.error(f"Error: {storage.last_error}")
  51. return False
  52. return True
  53. def install(self):
  54. if not (port := resolve_port(self.logger, self.args.port)):
  55. return 1
  56. storage = FlipperStorage(port, self.args.baud)
  57. storage.start()
  58. try:
  59. if not os.path.isfile(self.args.manifest_path):
  60. self.logger.error("Error: manifest not found")
  61. return 2
  62. manifest_path = pathlib.Path(os.path.abspath(self.args.manifest_path))
  63. manifest_name, pkg_name = manifest_path.parts[-1], manifest_path.parts[-2]
  64. pkg_dir_name = self.args.pkg_dir_name or pkg_name
  65. update_root = "/ext/update"
  66. flipper_update_path = f"{update_root}/{pkg_dir_name}"
  67. self.logger.info(f'Installing "{pkg_name}" from {flipper_update_path}')
  68. # if not os.path.exists(self.args.manifest_path):
  69. # self.logger.error("Error: package not found")
  70. if not self.mkdir_on_storage(
  71. storage, update_root
  72. ) or not self.mkdir_on_storage(storage, flipper_update_path):
  73. self.logger.error(f"Error: cannot create {storage.last_error}")
  74. return -2
  75. for dirpath, dirnames, filenames in os.walk(manifest_path.parents[0]):
  76. for fname in filenames:
  77. self.logger.debug(f"Uploading {fname}")
  78. local_file_path = os.path.join(dirpath, fname)
  79. flipper_file_path = f"{flipper_update_path}/{fname}"
  80. if not self.send_file_to_storage(
  81. storage, flipper_file_path, local_file_path, False
  82. ):
  83. self.logger.error(f"Error: {storage.last_error}")
  84. return -3
  85. storage.send_and_wait_eol(
  86. f"update install {flipper_update_path}/{manifest_name}\r"
  87. )
  88. result = storage.read.until(storage.CLI_EOL)
  89. if not b"Verifying" in result:
  90. self.logger.error(f"Unexpected response: {result.decode('ascii')}")
  91. return -4
  92. result = storage.read.until(storage.CLI_EOL)
  93. if not result.startswith(b"OK"):
  94. self.logger.error(result.decode("ascii"))
  95. return -5
  96. break
  97. return 0
  98. finally:
  99. storage.stop()
  100. if __name__ == "__main__":
  101. Main()()