file.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122
  1. from enum import Enum
  2. import serial
  3. class RemoteIOError(IOError):
  4. pass
  5. class UnsupportedOperation(RemoteIOError):
  6. # TODO: Tell more on error
  7. pass
  8. class ReadWriteError(RemoteIOError):
  9. pass
  10. class FileMode(Enum):
  11. READ = "r"
  12. WRITE = "w"
  13. # APPEND = "a" - Not Impl
  14. # READ_WRITE = "w+"
  15. # READ_APPEND = "a+"
  16. class File:
  17. opened: bool = False
  18. filename: str
  19. conn: serial.Serial
  20. mode = FileMode.READ
  21. def _open(self):
  22. # Opening file on Flipper's end
  23. self.conn.write(f"f_open {self.filename} {self.mode}\n".encode())
  24. # Checking, if everything is OK.
  25. self.conn.write(b"f_valid\n")
  26. return bool(int(self.conn.read_until().decode()))
  27. def __init__(self, conn: serial.Serial, filename: str, mode=FileMode.READ):
  28. self.conn = conn
  29. self.mode = mode
  30. self.filename = filename
  31. if not self._open():
  32. raise RemoteIOError
  33. self.opened = True
  34. def lseek(self, size: int):
  35. self.conn.write(f"f_lseek {size}\n".encode())
  36. def tell(self):
  37. self.conn.write(b"f_tell\n")
  38. return int(self.conn.read_until().decode())
  39. def size(self):
  40. self.conn.write(b"f_size\n")
  41. return int(self.conn.read_until().decode())
  42. def write(self, data: bytes):
  43. if self.mode != FileMode.WRITE:
  44. raise UnsupportedOperation
  45. self.conn.write(f"f_write {len(data)}\n".encode())
  46. self.conn.write(data)
  47. self.conn.write(b"\n")
  48. if self.conn.read_until().decode().strip() != "OK":
  49. raise ReadWriteError
  50. def read(self, size: int) -> bytes:
  51. self.conn.write(f"f_read {size}\n".encode())
  52. return self.conn.read(size)
  53. def _close(self):
  54. self.conn.write(b"f_close\n")
  55. def close(self):
  56. self._close()
  57. # TODO: Add termination of remote file process.
  58. del self
  59. def read_all(self, block_size=128) -> bytes:
  60. """
  61. Function to simplify reading of file over serial.
  62. :return: content of file represented in `bytes`
  63. """
  64. size = self.size()
  65. a = bytearray()
  66. seek = 0
  67. # Reading in blocks of block_size
  68. while seek < block_size * (size // block_size - 1):
  69. seek += block_size
  70. self.lseek(seek)
  71. a.extend(self.read(block_size))
  72. seek += block_size
  73. self.lseek(seek)
  74. a.extend(self.read(size - seek)) # Appending mod
  75. self.lseek(0) # Resetting seek to default-position
  76. return bytes(a)
  77. def push(ser: serial.Serial, local_path, remote_path):
  78. with open(local_path, "rb") as local_f: # Reading local file
  79. a = local_f.read()
  80. remote_f = File(ser, remote_path, FileMode.WRITE)
  81. remote_f.write(a)
  82. remote_f.close()
  83. def pull(ser: serial.Serial, remote_path, local_path):
  84. remote_f = File(ser, remote_path, FileMode.READ)
  85. with open(local_path, "wb") as f:
  86. f.write(remote_f.read_all())
  87. remote_f.close()