cache.py 8.5 KB


  1. import csv
  2. import operator
  3. import os
  4. from dataclasses import dataclass
  5. from enum import Enum, auto
  6. from typing import Any, ClassVar, Set
  7. from ansi.color import fg
  8. from . import ApiEntries, ApiEntryFunction, ApiEntryVariable, ApiHeader
  9. @dataclass(frozen=True)
  10. class SdkVersion:
  11. major: int = 0
  12. minor: int = 0
  13. csv_type: ClassVar[str] = "Version"
  14. def __str__(self) -> str:
  15. return f"{self.major}.{self.minor}"
  16. def as_int(self) -> int:
  17. return ((self.major & 0xFFFF) << 16) | (self.minor & 0xFFFF)
  18. @staticmethod
  19. def from_str(s: str) -> "SdkVersion":
  20. major, minor = s.split(".")
  21. return SdkVersion(int(major), int(minor))
  22. def dictify(self) -> dict:
  23. return dict(name=str(self), type=None, params=None)
  24. class VersionBump(Enum):
  25. NONE = auto()
  26. MAJOR = auto()
  27. MINOR = auto()
  28. class ApiEntryState(Enum):
  29. PENDING = "?"
  30. APPROVED = "+"
  31. DISABLED = "-"
  32. # Special value for API version entry so users have less incentive to edit it
  33. VERSION_PENDING = "v"
  34. # Class that stores all known API entries, both enabled and disabled.
  35. # Also keeps track of API versioning
  36. # Allows comparison and update from newly-generated API
  37. class SdkCache:
  38. CSV_FIELD_NAMES = ("entry", "status", "name", "type", "params")
  39. def __init__(self, cache_file: str, load_version_only=False):
  40. self.cache_file_name = cache_file
  41. self.version = SdkVersion(0, 0)
  42. self.sdk = ApiEntries()
  43. self.disabled_entries = set()
  44. self.new_entries = set()
  45. self.loaded_dirty_version = False
  46. self.version_action = VersionBump.NONE
  47. self._load_version_only = load_version_only
  48. self.load_cache()
  49. def is_buildable(self) -> bool:
  50. return (
  51. self.version != SdkVersion(0, 0)
  52. and self.version_action == VersionBump.NONE
  53. and not self._have_pending_entries()
  54. )
  55. def _filter_enabled(self, sdk_entries):
  56. return sorted(
  57. filter(lambda e: e not in self.disabled_entries, sdk_entries),
  58. key=operator.attrgetter("name"),
  59. )
  60. def get_valid_names(self):
  61. syms = set(map(lambda e: e.name, self.get_functions()))
  62. syms.update(map(lambda e: e.name, self.get_variables()))
  63. return syms
  64. def get_disabled_names(self):
  65. return set(map(lambda e: e.name, self.disabled_entries))
  66. def get_functions(self):
  67. return self._filter_enabled(self.sdk.functions)
  68. def get_variables(self):
  69. return self._filter_enabled(self.sdk.variables)
  70. def get_headers(self):
  71. return self._filter_enabled(self.sdk.headers)
  72. def _get_entry_status(self, entry) -> str:
  73. if entry in self.disabled_entries:
  74. return ApiEntryState.DISABLED
  75. elif entry in self.new_entries:
  76. if isinstance(entry, SdkVersion):
  77. return ApiEntryState.VERSION_PENDING
  78. return ApiEntryState.PENDING
  79. else:
  80. return ApiEntryState.APPROVED
  81. def _format_entry(self, obj):
  82. obj_dict = obj.dictify()
  83. obj_dict.update(
  84. dict(
  85. entry=obj.csv_type,
  86. status=self._get_entry_status(obj).value,
  87. )
  88. )
  89. return obj_dict
  90. def save(self) -> None:
  91. if self._load_version_only:
  92. raise Exception("Only SDK version was loaded, cannot save")
  93. if self.version_action == VersionBump.MINOR:
  94. self.version = SdkVersion(self.version.major, self.version.minor + 1)
  95. elif self.version_action == VersionBump.MAJOR:
  96. self.version = SdkVersion(self.version.major + 1, 0)
  97. if self._have_pending_entries():
  98. self.new_entries.add(self.version)
  99. print(
  100. fg.red(
  101. f"API version is still WIP: {self.version}. Review the changes and re-run command."
  102. )
  103. )
  104. print("CSV file entries to mark up:")
  105. print(
  106. fg.yellow(
  107. "\n".join(
  108. map(
  109. str,
  110. filter(
  111. lambda e: not isinstance(e, SdkVersion),
  112. self.new_entries,
  113. ),
  114. )
  115. )
  116. )
  117. )
  118. else:
  119. print(fg.green(f"API version {self.version} is up to date"))
  120. regenerate_csv = (
  121. self.loaded_dirty_version
  122. or self._have_pending_entries()
  123. or self.version_action != VersionBump.NONE
  124. )
  125. if regenerate_csv:
  126. str_cache_entries = [self.version]
  127. name_getter = operator.attrgetter("name")
  128. str_cache_entries.extend(sorted(self.sdk.headers, key=name_getter))
  129. str_cache_entries.extend(sorted(self.sdk.functions, key=name_getter))
  130. str_cache_entries.extend(sorted(self.sdk.variables, key=name_getter))
  131. with open(self.cache_file_name, "wt", newline="") as f:
  132. writer = csv.DictWriter(f, fieldnames=SdkCache.CSV_FIELD_NAMES)
  133. writer.writeheader()
  134. for entry in str_cache_entries:
  135. writer.writerow(self._format_entry(entry))
  136. def _process_entry(self, entry_dict: dict) -> None:
  137. entry_class = entry_dict["entry"]
  138. entry_status = entry_dict["status"]
  139. entry_name = entry_dict["name"]
  140. entry = None
  141. if entry_class == SdkVersion.csv_type:
  142. self.version = SdkVersion.from_str(entry_name)
  143. if entry_status == ApiEntryState.VERSION_PENDING.value:
  144. self.loaded_dirty_version = True
  145. elif entry_class == ApiHeader.csv_type:
  146. self.sdk.headers.add(entry := ApiHeader(entry_name))
  147. elif entry_class == ApiEntryFunction.csv_type:
  148. self.sdk.functions.add(
  149. entry := ApiEntryFunction(
  150. entry_name,
  151. entry_dict["type"],
  152. entry_dict["params"],
  153. )
  154. )
  155. elif entry_class == ApiEntryVariable.csv_type:
  156. self.sdk.variables.add(
  157. entry := ApiEntryVariable(entry_name, entry_dict["type"])
  158. )
  159. else:
  160. print(entry_dict)
  161. raise Exception("Unknown entry type: %s" % entry_class)
  162. if entry is None:
  163. return
  164. if entry_status == ApiEntryState.DISABLED.value:
  165. self.disabled_entries.add(entry)
  166. elif entry_status == ApiEntryState.PENDING.value:
  167. self.new_entries.add(entry)
  168. def load_cache(self) -> None:
  169. if not os.path.exists(self.cache_file_name):
  170. raise Exception(
  171. f"Cannot load symbol cache '{self.cache_file_name}'! File does not exist"
  172. )
  173. with open(self.cache_file_name, "rt") as f:
  174. reader = csv.DictReader(f)
  175. for row in reader:
  176. self._process_entry(row)
  177. if self._load_version_only and row.get("entry") == SdkVersion.csv_type:
  178. break
  179. def _have_pending_entries(self) -> bool:
  180. return any(
  181. filter(
  182. lambda e: not isinstance(e, SdkVersion),
  183. self.new_entries,
  184. )
  185. )
  186. def sync_sets(
  187. self, known_set: Set[Any], new_set: Set[Any], update_version: bool = True
  188. ):
  189. new_entries = new_set - known_set
  190. if new_entries:
  191. print(f"New: {new_entries}")
  192. known_set |= new_entries
  193. self.new_entries |= new_entries
  194. if update_version and self.version_action == VersionBump.NONE:
  195. self.version_action = VersionBump.MINOR
  196. removed_entries = known_set - new_set
  197. if removed_entries:
  198. print(f"Removed: {removed_entries}")
  199. known_set -= removed_entries
  200. # If any of removed entries was a part of active API, that's a major bump
  201. if update_version and any(
  202. filter(
  203. lambda e: e not in self.disabled_entries
  204. and e not in self.new_entries,
  205. removed_entries,
  206. )
  207. ):
  208. self.version_action = VersionBump.MAJOR
  209. self.disabled_entries -= removed_entries
  210. self.new_entries -= removed_entries
  211. def validate_api(self, api: ApiEntries) -> None:
  212. self.sync_sets(self.sdk.headers, api.headers, False)
  213. self.sync_sets(self.sdk.functions, api.functions)
  214. self.sync_sets(self.sdk.variables, api.variables)