manifest.py 4.9 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176
  1. import datetime
  2. import logging
  3. import os
  4. import posixpath
  5. from pathlib import Path
  6. from flipper.utils import *
  7. from flipper.utils.fstree import *
  8. MANIFEST_VERSION = 0
  9. class ManifestRecord:
  10. tag = None
  11. @staticmethod
  12. def fromLine(line):
  13. raise NotImplementedError
  14. def toLine(self):
  15. raise NotImplementedError
  16. def _unpack(self, manifest, key, nodetype):
  17. key, value = manifest.readline().split(":", 1)
  18. assert key == key
  19. return nodetype(value)
  20. MANIFEST_TAGS_RECORDS = {}
  21. def addManifestRecord(record: ManifestRecord):
  22. assert record.tag
  23. MANIFEST_TAGS_RECORDS[record.tag] = record
  24. class ManifestRecordVersion(ManifestRecord):
  25. tag = "V"
  26. def __init__(self, version):
  27. self.version = version
  28. @staticmethod
  29. def fromLine(line):
  30. return ManifestRecordVersion(int(line))
  31. def toLine(self):
  32. return f"{self.tag}:{self.version}\n"
  33. addManifestRecord(ManifestRecordVersion)
  34. class ManifestRecordTimestamp(ManifestRecord):
  35. tag = "T"
  36. def __init__(self, timestamp: int):
  37. self.timestamp = int(timestamp)
  38. @staticmethod
  39. def fromLine(line):
  40. return ManifestRecordTimestamp(int(line))
  41. def toLine(self):
  42. return f"{self.tag}:{self.timestamp}\n"
  43. addManifestRecord(ManifestRecordTimestamp)
  44. class ManifestRecordDirectory(ManifestRecord):
  45. tag = "D"
  46. def __init__(self, path: str):
  47. self.path = path
  48. @staticmethod
  49. def fromLine(line):
  50. return ManifestRecordDirectory(line)
  51. def toLine(self):
  52. return f"{self.tag}:{self.path}\n"
  53. addManifestRecord(ManifestRecordDirectory)
  54. class ManifestRecordFile(ManifestRecord):
  55. tag = "F"
  56. def __init__(self, path: str, md5: str, size: int):
  57. self.path = path
  58. self.md5 = md5
  59. self.size = size
  60. @staticmethod
  61. def fromLine(line):
  62. data = line.split(":", 3)
  63. return ManifestRecordFile(data[2], data[0], int(data[1]))
  64. def toLine(self):
  65. return f"{self.tag}:{self.md5}:{self.size}:{self.path}\n"
  66. addManifestRecord(ManifestRecordFile)
  67. class Manifest:
  68. def __init__(self, timestamp_value=None):
  69. self.version = None
  70. self.records = []
  71. self.records.append(ManifestRecordVersion(MANIFEST_VERSION))
  72. self.records.append(ManifestRecordTimestamp(timestamp_value or timestamp()))
  73. self.logger = logging.getLogger(self.__class__.__name__)
  74. def load(self, filename):
  75. with open(filename, "r") as manifest:
  76. for line in manifest.readlines():
  77. line = line.strip()
  78. if len(line) == 0:
  79. continue
  80. tag, line = line.split(":", 1)
  81. record = MANIFEST_TAGS_RECORDS[tag].fromLine(line)
  82. self.records.append(record)
  83. def save(self, filename):
  84. with open(filename, "w+", newline="\n") as manifest:
  85. for record in self.records:
  86. manifest.write(record.toLine())
  87. def addDirectory(self, path):
  88. self.records.append(ManifestRecordDirectory(path))
  89. def addFile(self, path, md5, size):
  90. self.records.append(ManifestRecordFile(path, md5, size))
  91. def create(self, directory_path, ignore_files=["Manifest"]):
  92. for root, dirs, files in os.walk(directory_path):
  93. dirs.sort()
  94. files.sort()
  95. relative_root = root.replace(directory_path, "", 1)
  96. if relative_root:
  97. relative_root = Path(relative_root).as_posix()
  98. if relative_root.startswith("/"):
  99. relative_root = relative_root[1:]
  100. # process directories
  101. for dirname in dirs:
  102. relative_dir_path = posixpath.join(relative_root, dirname)
  103. self.logger.debug(f'Adding directory: "{relative_dir_path}"')
  104. self.addDirectory(relative_dir_path)
  105. # Process files
  106. for file in files:
  107. relative_file_path = posixpath.join(relative_root, file)
  108. if file in ignore_files:
  109. self.logger.info(f'Skipping file "{relative_file_path}"')
  110. continue
  111. full_file_path = posixpath.join(root, file)
  112. self.logger.debug(f'Adding file: "{relative_file_path}"')
  113. self.addFile(
  114. relative_file_path,
  115. file_md5(full_file_path),
  116. os.path.getsize(full_file_path),
  117. )
  118. def toFsTree(self):
  119. root = FsNode("", FsNode.NodeType.Directory)
  120. for record in self.records:
  121. if isinstance(record, ManifestRecordDirectory):
  122. root.addDirectory(record.path)
  123. elif isinstance(record, ManifestRecordFile):
  124. root.addFile(record.path, record.md5, record.size)
  125. return root
  126. @staticmethod
  127. def compare(left: "Manifest", right: "Manifest"):
  128. return compare_fs_trees(left.toFsTree(), right.toFsTree())