storage.py 13 KB


  1. #!/usr/bin/env python3
  2. from flipper.storage import FlipperStorage
  3. import logging
  4. import argparse
  5. import os
  6. import sys
  7. import binascii
  8. import posixpath
  9. import filecmp
  10. import tempfile
  11. class Main:
  12. def __init__(self):
  13. # command args
  14. self.parser = argparse.ArgumentParser()
  15. self.parser.add_argument("-d", "--debug", action="store_true", help="Debug")
  16. self.parser.add_argument("-p", "--port", help="CDC Port", required=True)
  17. self.parser.add_argument(
  18. "-b",
  19. "--baud",
  20. help="Port Baud rate",
  21. required=False,
  22. default=115200 * 4,
  23. type=int,
  24. )
  25. self.subparsers = self.parser.add_subparsers(help="sub-command help")
  26. self.parser_mkdir = self.subparsers.add_parser("mkdir", help="Create directory")
  27. self.parser_mkdir.add_argument("flipper_path", help="Flipper path")
  28. self.parser_mkdir.set_defaults(func=self.mkdir)
  29. self.parser_remove = self.subparsers.add_parser(
  30. "remove", help="Remove file/directory"
  31. )
  32. self.parser_remove.add_argument("flipper_path", help="Flipper path")
  33. self.parser_remove.set_defaults(func=self.remove)
  34. self.parser_read = self.subparsers.add_parser("read", help="Read file")
  35. self.parser_read.add_argument("flipper_path", help="Flipper path")
  36. self.parser_read.set_defaults(func=self.read)
  37. self.parser_size = self.subparsers.add_parser("size", help="Size of file")
  38. self.parser_size.add_argument("flipper_path", help="Flipper path")
  39. self.parser_size.set_defaults(func=self.size)
  40. self.parser_receive = self.subparsers.add_parser("receive", help="Receive file")
  41. self.parser_receive.add_argument("flipper_path", help="Flipper path")
  42. self.parser_receive.add_argument("local_path", help="Local path")
  43. self.parser_receive.set_defaults(func=self.receive)
  44. self.parser_send = self.subparsers.add_parser(
  45. "send", help="Send file or directory"
  46. )
  47. self.parser_send.add_argument(
  48. "-f", "--force", help="Force sending", action="store_true"
  49. )
  50. self.parser_send.add_argument("local_path", help="Local path")
  51. self.parser_send.add_argument("flipper_path", help="Flipper path")
  52. self.parser_send.set_defaults(func=self.send)
  53. self.parser_list = self.subparsers.add_parser(
  54. "list", help="Recursively list files and dirs"
  55. )
  56. self.parser_list.add_argument("flipper_path", help="Flipper path", default="/")
  57. self.parser_list.set_defaults(func=self.list)
  58. self.parser_stress = self.subparsers.add_parser("stress", help="Stress test")
  59. self.parser.add_argument(
  60. "-c", "--count", type=int, default=10, help="Iteration count"
  61. )
  62. self.parser_stress.add_argument("flipper_path", help="Flipper path")
  63. self.parser_stress.add_argument(
  64. "file_size", type=int, help="Test file size in bytes"
  65. )
  66. self.parser_stress.set_defaults(func=self.stress)
  67. # logging
  68. self.logger = logging.getLogger()
  69. def __call__(self):
  70. self.args = self.parser.parse_args()
  71. if "func" not in self.args:
  72. self.parser.error("Choose something to do")
  73. # configure log output
  74. self.log_level = logging.DEBUG if self.args.debug else logging.INFO
  75. self.logger.setLevel(self.log_level)
  76. self.handler = logging.StreamHandler(sys.stdout)
  77. self.handler.setLevel(self.log_level)
  78. self.formatter = logging.Formatter("%(asctime)s [%(levelname)s] %(message)s")
  79. self.handler.setFormatter(self.formatter)
  80. self.logger.addHandler(self.handler)
  81. # execute requested function
  82. self.args.func()
  83. def mkdir(self):
  84. storage = FlipperStorage(self.args.port)
  85. storage.start()
  86. self.logger.debug(f'Creating "{self.args.flipper_path}"')
  87. if not storage.mkdir(self.args.flipper_path):
  88. self.logger.error(f"Error: {storage.last_error}")
  89. storage.stop()
  90. def remove(self):
  91. storage = FlipperStorage(self.args.port)
  92. storage.start()
  93. self.logger.debug(f'Removing "{self.args.flipper_path}"')
  94. if not storage.remove(self.args.flipper_path):
  95. self.logger.error(f"Error: {storage.last_error}")
  96. storage.stop()
  97. def receive(self):
  98. storage = FlipperStorage(self.args.port)
  99. storage.start()
  100. if storage.exist_dir(self.args.flipper_path):
  101. for dirpath, dirnames, filenames in storage.walk(self.args.flipper_path):
  102. self.logger.debug(
  103. f'Processing directory "{os.path.normpath(dirpath)}"'.replace(
  104. os.sep, "/"
  105. )
  106. )
  107. dirnames.sort()
  108. filenames.sort()
  109. rel_path = os.path.relpath(dirpath, self.args.flipper_path)
  110. for dirname in dirnames:
  111. local_dir_path = os.path.join(
  112. self.args.local_path, rel_path, dirname
  113. )
  114. local_dir_path = os.path.normpath(local_dir_path)
  115. os.makedirs(local_dir_path, exist_ok=True)
  116. for filename in filenames:
  117. local_file_path = os.path.join(
  118. self.args.local_path, rel_path, filename
  119. )
  120. local_file_path = os.path.normpath(local_file_path)
  121. flipper_file_path = os.path.normpath(
  122. os.path.join(dirpath, filename)
  123. ).replace(os.sep, "/")
  124. self.logger.info(
  125. f'Receiving "{flipper_file_path}" to "{local_file_path}"'
  126. )
  127. if not storage.receive_file(flipper_file_path, local_file_path):
  128. self.logger.error(f"Error: {storage.last_error}")
  129. else:
  130. self.logger.info(
  131. f'Receiving "{self.args.flipper_path}" to "{self.args.local_path}"'
  132. )
  133. if not storage.receive_file(self.args.flipper_path, self.args.local_path):
  134. self.logger.error(f"Error: {storage.last_error}")
  135. storage.stop()
  136. def send(self):
  137. storage = FlipperStorage(self.args.port)
  138. storage.start()
  139. self.send_to_storage(
  140. storage, self.args.flipper_path, self.args.local_path, self.args.force
  141. )
  142. storage.stop()
  143. # send file or folder recursively
  144. def send_to_storage(self, storage, flipper_path, local_path, force):
  145. if not os.path.exists(local_path):
  146. self.logger.error(f'Error: "{local_path}" is not exist')
  147. if os.path.isdir(local_path):
  148. # create parent dir
  149. self.mkdir_on_storage(storage, flipper_path)
  150. for dirpath, dirnames, filenames in os.walk(local_path):
  151. self.logger.debug(f'Processing directory "{os.path.normpath(dirpath)}"')
  152. dirnames.sort()
  153. filenames.sort()
  154. rel_path = os.path.relpath(dirpath, local_path)
  155. # create subdirs
  156. for dirname in dirnames:
  157. flipper_dir_path = os.path.join(flipper_path, rel_path, dirname)
  158. flipper_dir_path = os.path.normpath(flipper_dir_path).replace(
  159. os.sep, "/"
  160. )
  161. self.mkdir_on_storage(storage, flipper_dir_path)
  162. # send files
  163. for filename in filenames:
  164. flipper_file_path = os.path.join(flipper_path, rel_path, filename)
  165. flipper_file_path = os.path.normpath(flipper_file_path).replace(
  166. os.sep, "/"
  167. )
  168. local_file_path = os.path.normpath(os.path.join(dirpath, filename))
  169. self.send_file_to_storage(
  170. storage, flipper_file_path, local_file_path, force
  171. )
  172. else:
  173. self.send_file_to_storage(storage, flipper_path, local_path, force)
  174. # make directory with exist check
  175. def mkdir_on_storage(self, storage, flipper_dir_path):
  176. if not storage.exist_dir(flipper_dir_path):
  177. self.logger.debug(f'"{flipper_dir_path}" does not exist, creating')
  178. if not storage.mkdir(flipper_dir_path):
  179. self.logger.error(f"Error: {storage.last_error}")
  180. else:
  181. self.logger.debug(f'"{flipper_dir_path}" already exists')
  182. # send file with exist check and hash check
  183. def send_file_to_storage(self, storage, flipper_file_path, local_file_path, force):
  184. if not storage.exist_file(flipper_file_path):
  185. self.logger.debug(
  186. f'"{flipper_file_path}" does not exist, sending "{local_file_path}"'
  187. )
  188. self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
  189. if not storage.send_file(local_file_path, flipper_file_path):
  190. self.logger.error(f"Error: {storage.last_error}")
  191. elif force:
  192. self.logger.debug(
  193. f'"{flipper_file_path}" exists, but will be overwritten by "{local_file_path}"'
  194. )
  195. self.logger.info(f'Sending "{local_file_path}" to "{flipper_file_path}"')
  196. if not storage.send_file(local_file_path, flipper_file_path):
  197. self.logger.error(f"Error: {storage.last_error}")
  198. else:
  199. self.logger.debug(
  200. f'"{flipper_file_path}" exists, compare hash with "{local_file_path}"'
  201. )
  202. hash_local = storage.hash_local(local_file_path)
  203. hash_flipper = storage.hash_flipper(flipper_file_path)
  204. if not hash_flipper:
  205. self.logger.error(f"Error: {storage.last_error}")
  206. if hash_local == hash_flipper:
  207. self.logger.debug(
  208. f'"{flipper_file_path}" is equal to "{local_file_path}"'
  209. )
  210. else:
  211. self.logger.debug(
  212. f'"{flipper_file_path}" is NOT equal to "{local_file_path}"'
  213. )
  214. self.logger.info(
  215. f'Sending "{local_file_path}" to "{flipper_file_path}"'
  216. )
  217. if not storage.send_file(local_file_path, flipper_file_path):
  218. self.logger.error(f"Error: {storage.last_error}")
  219. def read(self):
  220. storage = FlipperStorage(self.args.port, self.args.baud)
  221. storage.start()
  222. self.logger.debug(f'Reading "{self.args.flipper_path}"')
  223. data = storage.read_file(self.args.flipper_path)
  224. if not data:
  225. self.logger.error(f"Error: {storage.last_error}")
  226. else:
  227. try:
  228. print("Text data:")
  229. print(data.decode())
  230. except:
  231. print("Binary hexadecimal data:")
  232. print(binascii.hexlify(data).decode())
  233. storage.stop()
  234. def size(self):
  235. storage = FlipperStorage(self.args.port)
  236. storage.start()
  237. self.logger.debug(f'Getting size of "{self.args.flipper_path}"')
  238. size = storage.size(self.args.flipper_path)
  239. if size < 0:
  240. self.logger.error(f"Error: {storage.last_error}")
  241. else:
  242. print(size)
  243. storage.stop()
  244. def list(self):
  245. storage = FlipperStorage(self.args.port)
  246. storage.start()
  247. self.logger.debug(f'Listing "{self.args.flipper_path}"')
  248. storage.list_tree(self.args.flipper_path)
  249. storage.stop()
  250. def stress(self):
  251. self.logger.error("This test is wearing out flash memory.")
  252. self.logger.error("Never use it with internal storage(/int)")
  253. if self.args.flipper_path.startswith(
  254. "/int"
  255. ) or self.args.flipper_path.startswith("/any"):
  256. self.logger.error("Stop at this point or device warranty will be void")
  257. say = input("Anything to say? ").strip().lower()
  258. if say != "void":
  259. return
  260. say = input("Why, Mr. Anderson? ").strip().lower()
  261. if say != "because":
  262. return
  263. with tempfile.TemporaryDirectory() as tmpdirname:
  264. send_file_name = os.path.join(tmpdirname, "send")
  265. receive_file_name = os.path.join(tmpdirname, "receive")
  266. with open(send_file_name, "w") as fout:
  267. fout.write("A" * self.args.file_size)
  268. storage = FlipperStorage(self.args.port)
  269. storage.start()
  270. if storage.exist_file(self.args.flipper_path):
  271. self.logger.error("File exists, remove it first")
  272. return
  273. while self.args.count > 0:
  274. storage.send_file(send_file_name, self.args.flipper_path)
  275. storage.receive_file(self.args.flipper_path, receive_file_name)
  276. if not filecmp.cmp(receive_file_name, send_file_name):
  277. self.logger.error("Files mismatch")
  278. break
  279. storage.remove(self.args.flipper_path)
  280. os.unlink(receive_file_name)
  281. self.args.count -= 1
  282. storage.stop()
  283. if __name__ == "__main__":
  284. Main()()