selfupdate.py 4.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. #!/usr/bin/env python3
  2. from typing import final
  3. from flipper.app import App
  4. from flipper.storage import FlipperStorage
  5. from flipper.utils.cdc import resolve_port
  6. import logging
  7. import os
  8. import pathlib
  9. import serial.tools.list_ports as list_ports
  10. class Main(App):
  11. def init(self):
  12. self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
  13. self.parser.add_argument(
  14. "-b",
  15. "--baud",
  16. help="Port Baud rate",
  17. required=False,
  18. default=115200 * 4,
  19. type=int,
  20. )
  21. self.parser.add_argument("manifest_path", help="Manifest path")
  22. self.parser.add_argument(
  23. "--pkg_dir_name", help="Update dir name", default="pcbundle", required=False
  24. )
  25. self.parser.set_defaults(func=self.install)
  26. # logging
  27. self.logger = logging.getLogger()
  28. # make directory with exist check
  29. def mkdir_on_storage(self, storage, flipper_dir_path):
  30. if not storage.exist_dir(flipper_dir_path):
  31. self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
  32. if not storage.mkdir(flipper_dir_path):
  33. self.logger.error(f"Error: {storage.last_error}")
  34. return False
  35. else:
  36. self.logger.debug(f'"{flipper_dir_path}" already exists')
  37. return True
  38. # send file with exist check and hash check
  39. def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
  40. exists = storage.exist_file(flipper_file_path)
  41. do_upload = not exists
  42. if exists:
  43. hash_local = storage.hash_local(local_file_path)
  44. hash_flipper = storage.hash_flipper(flipper_file_path)
  45. self.logger.debug(f"hash check: local {hash_local}, flipper {hash_flipper}")
  46. do_upload = force or (hash_local != hash_flipper)
  47. if do_upload:
  48. self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
  49. if not storage.send_file(local_file_path, flipper_file_path):
  50. self.logger.error(f"Error: {storage.last_error}")
  51. return False
  52. return True
  53. def install(self):
  54. if not (port := resolve_port(self.logger, self.args.port)):
  55. return 1
  56. storage = FlipperStorage(port, self.args.baud)
  57. storage.start()
  58. try:
  59. if not os.path.isfile(self.args.manifest_path):
  60. self.logger.error("Error: manifest not found")
  61. return 2
  62. manifest_path = pathlib.Path(os.path.abspath(self.args.manifest_path))
  63. manifest_name, pkg_name = manifest_path.parts[-1], manifest_path.parts[-2]
  64. pkg_dir_name = self.args.pkg_dir_name or pkg_name
  65. flipper_update_path = f"/ext/update/{pkg_dir_name}"
  66. self.logger.info(f'Installing "{pkg_name}" from {flipper_update_path}')
  67. # if not os.path.exists(self.args.manifest_path):
  68. # self.logger.error("Error: package not found")
  69. if not self.mkdir_on_storage(storage, flipper_update_path):
  70. self.logger.error(f"Error: cannot create {storage.last_error}")
  71. return -2
  72. for dirpath, dirnames, filenames in os.walk(manifest_path.parents[0]):
  73. for fname in filenames:
  74. self.logger.debug(f"Uploading {fname}")
  75. local_file_path = os.path.join(dirpath, fname)
  76. flipper_file_path = f"{flipper_update_path}/{fname}"
  77. if not self.send_file_to_storage(
  78. storage, flipper_file_path, local_file_path, False
  79. ):
  80. self.logger.error(f"Error: {storage.last_error}")
  81. return -3
  82. storage.send_and_wait_eol(
  83. f"update install {flipper_update_path}/{manifest_name}\r"
  84. )
  85. break
  86. return 0
  87. finally:
  88. storage.stop()
  89. if __name__ == "__main__":
  90. Main()()