selfupdate.py 4.3 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113
  1. #!/usr/bin/env python3
  2. from typing import final
  3. from flipper.app import App
  4. from flipper.storage import FlipperStorage
  5. from flipper.utils.cdc import resolve_port
  6. import logging
  7. import os
  8. import pathlib
  9. import serial.tools.list_ports as list_ports
  10. class Main(App):
  11. def init(self):
  12. self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
  13. self.parser.add_argument("manifest_path", help="Manifest path")
  14. self.parser.add_argument(
  15. "--pkg_dir_name", help="Update dir name", default="pcbundle", required=False
  16. )
  17. self.parser.set_defaults(func=self.install)
  18. # logging
  19. self.logger = logging.getLogger()
  20. # make directory with exist check
  21. def mkdir_on_storage(self, storage, flipper_dir_path):
  22. if not storage.exist_dir(flipper_dir_path):
  23. self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
  24. if not storage.mkdir(flipper_dir_path):
  25. self.logger.error(f"Error: {storage.last_error}")
  26. return False
  27. else:
  28. self.logger.debug(f'"{flipper_dir_path}" already exists')
  29. return True
  30. # send file with exist check and hash check
  31. def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
  32. exists = storage.exist_file(flipper_file_path)
  33. do_upload = not exists
  34. if exists:
  35. hash_local = storage.hash_local(local_file_path)
  36. hash_flipper = storage.hash_flipper(flipper_file_path)
  37. self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
  38. do_upload = force or (hash_local != hash_flipper)
  39. if do_upload:
  40. self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
  41. if not storage.send_file(local_file_path, flipper_file_path):
  42. self.logger.error(f"Error: {storage.last_error}")
  43. return False
  44. return True
  45. def install(self):
  46. if not (port := resolve_port(self.logger, self.args.port)):
  47. return 1
  48. storage = FlipperStorage(port)
  49. storage.start()
  50. try:
  51. if not os.path.isfile(self.args.manifest_path):
  52. self.logger.error("Error: manifest not found")
  53. return 2
  54. manifest_path = pathlib.Path(os.path.abspath(self.args.manifest_path))
  55. manifest_name, pkg_name = manifest_path.parts[-1], manifest_path.parts[-2]
  56. pkg_dir_name = self.args.pkg_dir_name or pkg_name
  57. update_root = "/ext/update"
  58. flipper_update_path = f"{update_root}/{pkg_dir_name}"
  59. self.logger.info(f'Installing "{pkg_name}" from {flipper_update_path}')
  60. # if not os.path.exists(self.args.manifest_path):
  61. # self.logger.error("Error: package not found")
  62. if not self.mkdir_on_storage(
  63. storage, update_root
  64. ) or not self.mkdir_on_storage(storage, flipper_update_path):
  65. self.logger.error(f"Error: cannot create {storage.last_error}")
  66. return -2
  67. for dirpath, dirnames, filenames in os.walk(manifest_path.parents[0]):
  68. for fname in filenames:
  69. self.logger.debug(f"Uploading {fname}")
  70. local_file_path = os.path.join(dirpath, fname)
  71. flipper_file_path = f"{flipper_update_path}/{fname}"
  72. if not self.send_file_to_storage(
  73. storage, flipper_file_path, local_file_path, False
  74. ):
  75. self.logger.error(f"Error: {storage.last_error}")
  76. return -3
  77. # return -11
  78. storage.send_and_wait_eol(
  79. f"update install {flipper_update_path}/{manifest_name}\r"
  80. )
  81. result = storage.read.until(storage.CLI_EOL)
  82. if not b"Verifying" in result:
  83. self.logger.error(f"Unexpected response: {result.decode('ascii')}")
  84. return -4
  85. result = storage.read.until(storage.CLI_EOL)
  86. if not result.startswith(b"OK"):
  87. self.logger.error(result.decode("ascii"))
  88. return -5
  89. break
  90. return 0
  91. finally:
  92. storage.stop()
  93. if __name__ == "__main__":
  94. Main()()