storage.py 7.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185
  1. #!/usr/bin/env python3
  2. import binascii
  3. import filecmp
  4. import os
  5. import tempfile
  6. from flipper.app import App
  7. from flipper.storage import FlipperStorage, FlipperStorageOperations
  8. from flipper.utils.cdc import resolve_port
  9. def WrapStorageOp(func):
  10. def wrapper(*args, **kwargs):
  11. try:
  12. func(*args, **kwargs)
  13. return 0
  14. except Exception as e:
  15. print(f"Error: {e}")
  16. # raise # uncomment to debug
  17. return 1
  18. return wrapper
  19. class Main(App):
  20. def init(self):
  21. self.parser.add_argument("-p", "--port", help="CDC Port", default="auto")
  22. self.subparsers = self.parser.add_subparsers(help="sub-command help")
  23. self.parser_mkdir = self.subparsers.add_parser("mkdir", help="Create directory")
  24. self.parser_mkdir.add_argument("flipper_path", help="Flipper path")
  25. self.parser_mkdir.set_defaults(func=self.mkdir)
  26. self.parser_format = self.subparsers.add_parser(
  27. "format_ext", help="Format flash card"
  28. )
  29. self.parser_format.set_defaults(func=self.format_ext)
  30. self.parser_remove = self.subparsers.add_parser(
  31. "remove", help="Remove file/directory"
  32. )
  33. self.parser_remove.add_argument("flipper_path", help="Flipper path")
  34. self.parser_remove.set_defaults(func=self.remove)
  35. self.parser_read = self.subparsers.add_parser("read", help="Read file")
  36. self.parser_read.add_argument("flipper_path", help="Flipper path")
  37. self.parser_read.set_defaults(func=self.read)
  38. self.parser_size = self.subparsers.add_parser("size", help="Size of file")
  39. self.parser_size.add_argument("flipper_path", help="Flipper path")
  40. self.parser_size.set_defaults(func=self.size)
  41. self.parser_receive = self.subparsers.add_parser("receive", help="Receive file")
  42. self.parser_receive.add_argument("flipper_path", help="Flipper path")
  43. self.parser_receive.add_argument("local_path", help="Local path")
  44. self.parser_receive.set_defaults(func=self.receive)
  45. self.parser_send = self.subparsers.add_parser(
  46. "send", help="Send file or directory"
  47. )
  48. self.parser_send.add_argument(
  49. "-f", "--force", help="Force sending", action="store_true"
  50. )
  51. self.parser_send.add_argument("local_path", help="Local path")
  52. self.parser_send.add_argument("flipper_path", help="Flipper path")
  53. self.parser_send.set_defaults(func=self.send)
  54. self.parser_list = self.subparsers.add_parser(
  55. "list", help="Recursively list files and dirs"
  56. )
  57. self.parser_list.add_argument("flipper_path", help="Flipper path", default="/")
  58. self.parser_list.set_defaults(func=self.list)
  59. self.parser_stress = self.subparsers.add_parser("stress", help="Stress test")
  60. self.parser.add_argument(
  61. "-c", "--count", type=int, default=10, help="Iteration count"
  62. )
  63. self.parser_stress.add_argument("flipper_path", help="Flipper path")
  64. self.parser_stress.add_argument(
  65. "file_size", type=int, help="Test file size in bytes"
  66. )
  67. self.parser_stress.set_defaults(func=self.stress)
  68. def _get_port(self):
  69. if not (port := resolve_port(self.logger, self.args.port)):
  70. raise Exception("Failed to resolve port")
  71. return port
  72. @WrapStorageOp
  73. def mkdir(self):
  74. self.logger.debug(f'Creating "{self.args.flipper_path}"')
  75. with FlipperStorage(self._get_port()) as storage:
  76. storage.mkdir(self.args.flipper_path)
  77. @WrapStorageOp
  78. def remove(self):
  79. self.logger.debug(f'Removing "{self.args.flipper_path}"')
  80. with FlipperStorage(self._get_port()) as storage:
  81. storage.remove(self.args.flipper_path)
  82. @WrapStorageOp
  83. def receive(self):
  84. with FlipperStorage(self._get_port()) as storage:
  85. FlipperStorageOperations(storage).recursive_receive(
  86. self.args.flipper_path, self.args.local_path
  87. )
  88. @WrapStorageOp
  89. def send(self):
  90. with FlipperStorage(self._get_port()) as storage:
  91. FlipperStorageOperations(storage).recursive_send(
  92. self.args.flipper_path, self.args.local_path, self.args.force
  93. )
  94. @WrapStorageOp
  95. def read(self):
  96. self.logger.debug(f'Reading "{self.args.flipper_path}"')
  97. with FlipperStorage(self._get_port()) as storage:
  98. data = storage.read_file(self.args.flipper_path)
  99. try:
  100. print("Text data:")
  101. print(data.decode())
  102. except Exception:
  103. print("Binary hexadecimal data:")
  104. print(binascii.hexlify(data).decode())
  105. @WrapStorageOp
  106. def size(self):
  107. self.logger.debug(f'Getting size of "{self.args.flipper_path}"')
  108. with FlipperStorage(self._get_port()) as storage:
  109. print(storage.size(self.args.flipper_path))
  110. @WrapStorageOp
  111. def list(self):
  112. self.logger.debug(f'Listing "{self.args.flipper_path}"')
  113. with FlipperStorage(self._get_port()) as storage:
  114. storage.list_tree(self.args.flipper_path)
  115. @WrapStorageOp
  116. def format_ext(self):
  117. self.logger.debug("Formatting /ext SD card")
  118. with FlipperStorage(self._get_port()) as storage:
  119. storage.format_ext()
  120. @WrapStorageOp
  121. def stress(self):
  122. self.logger.error("This test is wearing out flash memory.")
  123. self.logger.error("Never use it with internal storage (/int)")
  124. if self.args.flipper_path.startswith(
  125. "/int"
  126. ) or self.args.flipper_path.startswith("/any"):
  127. self.logger.error("Stop at this point or device warranty will be void")
  128. say = input("Anything to say? ").strip().lower()
  129. if say != "void":
  130. return 2
  131. say = input("Why, Mr. Anderson? ").strip().lower()
  132. if say != "because":
  133. return 3
  134. with tempfile.TemporaryDirectory() as tmpdirname:
  135. send_file_name = os.path.join(tmpdirname, "send")
  136. receive_file_name = os.path.join(tmpdirname, "receive")
  137. with open(send_file_name, "w") as fout:
  138. fout.write("A" * self.args.file_size)
  139. with FlipperStorage(self._get_port()) as storage:
  140. if storage.exist_file(self.args.flipper_path):
  141. self.logger.error("File exists, remove it first")
  142. return
  143. while self.args.count > 0:
  144. storage.send_file(send_file_name, self.args.flipper_path)
  145. storage.receive_file(self.args.flipper_path, receive_file_name)
  146. if not filecmp.cmp(receive_file_name, send_file_name):
  147. self.logger.error("Files mismatch")
  148. break
  149. storage.remove(self.args.flipper_path)
  150. os.unlink(receive_file_name)
  151. self.args.count -= 1
  152. if __name__ == "__main__":
  153. Main()()