cache.py 8.4 KB


  1. import operator
  2. import os
  3. import csv
  4. import operator
  5. from enum import Enum, auto
  6. from typing import Set, ClassVar, Any
  7. from dataclasses import dataclass
  8. from ansi.color import fg
  9. from . import (
  10. ApiEntries,
  11. ApiEntryFunction,
  12. ApiEntryVariable,
  13. ApiHeader,
  14. )
  15. @dataclass(frozen=True)
  16. class SdkVersion:
  17. major: int = 0
  18. minor: int = 0
  19. csv_type: ClassVar[str] = "Version"
  20. def __str__(self) -> str:
  21. return f"{self.major}.{self.minor}"
  22. def as_int(self) -> int:
  23. return ((self.major & 0xFFFF) << 16) | (self.minor & 0xFFFF)
  24. @staticmethod
  25. def from_str(s: str) -> "SdkVersion":
  26. major, minor = s.split(".")
  27. return SdkVersion(int(major), int(minor))
  28. def dictify(self) -> dict:
  29. return dict(name=str(self), type=None, params=None)
  30. class VersionBump(Enum):
  31. NONE = auto()
  32. MAJOR = auto()
  33. MINOR = auto()
  34. class ApiEntryState(Enum):
  35. PENDING = "?"
  36. APPROVED = "+"
  37. DISABLED = "-"
  38. # Special value for API version entry so users have less incentive to edit it
  39. VERSION_PENDING = "v"
  40. # Class that stores all known API entries, both enabled and disabled.
  41. # Also keeps track of API versioning
  42. # Allows comparison and update from newly-generated API
  43. class SdkCache:
  44. CSV_FIELD_NAMES = ("entry", "status", "name", "type", "params")
  45. def __init__(self, cache_file: str, load_version_only=False):
  46. self.cache_file_name = cache_file
  47. self.version = SdkVersion(0, 0)
  48. self.sdk = ApiEntries()
  49. self.disabled_entries = set()
  50. self.new_entries = set()
  51. self.loaded_dirty_version = False
  52. self.version_action = VersionBump.NONE
  53. self._load_version_only = load_version_only
  54. self.load_cache()
  55. def is_buildable(self) -> bool:
  56. return (
  57. self.version != SdkVersion(0, 0)
  58. and self.version_action == VersionBump.NONE
  59. and not self._have_pending_entries()
  60. )
  61. def _filter_enabled(self, sdk_entries):
  62. return sorted(
  63. filter(lambda e: e not in self.disabled_entries, sdk_entries),
  64. key=operator.attrgetter("name"),
  65. )
  66. def get_valid_names(self):
  67. syms = set(map(lambda e: e.name, self.get_functions()))
  68. syms.update(map(lambda e: e.name, self.get_variables()))
  69. return syms
  70. def get_functions(self):
  71. return self._filter_enabled(self.sdk.functions)
  72. def get_variables(self):
  73. return self._filter_enabled(self.sdk.variables)
  74. def get_headers(self):
  75. return self._filter_enabled(self.sdk.headers)
  76. def _get_entry_status(self, entry) -> str:
  77. if entry in self.disabled_entries:
  78. return ApiEntryState.DISABLED
  79. elif entry in self.new_entries:
  80. if isinstance(entry, SdkVersion):
  81. return ApiEntryState.VERSION_PENDING
  82. return ApiEntryState.PENDING
  83. else:
  84. return ApiEntryState.APPROVED
  85. def _format_entry(self, obj):
  86. obj_dict = obj.dictify()
  87. obj_dict.update(
  88. dict(
  89. entry=obj.csv_type,
  90. status=self._get_entry_status(obj).value,
  91. )
  92. )
  93. return obj_dict
  94. def save(self) -> None:
  95. if self._load_version_only:
  96. raise Exception("Only SDK version was loaded, cannot save")
  97. if self.version_action == VersionBump.MINOR:
  98. self.version = SdkVersion(self.version.major, self.version.minor + 1)
  99. elif self.version_action == VersionBump.MAJOR:
  100. self.version = SdkVersion(self.version.major + 1, 0)
  101. if self._have_pending_entries():
  102. self.new_entries.add(self.version)
  103. print(
  104. fg.red(
  105. f"API version is still WIP: {self.version}. Review the changes and re-run command."
  106. )
  107. )
  108. print(f"CSV file entries to mark up:")
  109. print(
  110. fg.yellow(
  111. "\n".join(
  112. map(
  113. str,
  114. filter(
  115. lambda e: not isinstance(e, SdkVersion),
  116. self.new_entries,
  117. ),
  118. )
  119. )
  120. )
  121. )
  122. else:
  123. print(fg.green(f"API version {self.version} is up to date"))
  124. regenerate_csv = (
  125. self.loaded_dirty_version
  126. or self._have_pending_entries()
  127. or self.version_action != VersionBump.NONE
  128. )
  129. if regenerate_csv:
  130. str_cache_entries = [self.version]
  131. name_getter = operator.attrgetter("name")
  132. str_cache_entries.extend(sorted(self.sdk.headers, key=name_getter))
  133. str_cache_entries.extend(sorted(self.sdk.functions, key=name_getter))
  134. str_cache_entries.extend(sorted(self.sdk.variables, key=name_getter))
  135. with open(self.cache_file_name, "wt", newline="") as f:
  136. writer = csv.DictWriter(f, fieldnames=SdkCache.CSV_FIELD_NAMES)
  137. writer.writeheader()
  138. for entry in str_cache_entries:
  139. writer.writerow(self._format_entry(entry))
  140. def _process_entry(self, entry_dict: dict) -> None:
  141. entry_class = entry_dict["entry"]
  142. entry_status = entry_dict["status"]
  143. entry_name = entry_dict["name"]
  144. entry = None
  145. if entry_class == SdkVersion.csv_type:
  146. self.version = SdkVersion.from_str(entry_name)
  147. if entry_status == ApiEntryState.VERSION_PENDING.value:
  148. self.loaded_dirty_version = True
  149. elif entry_class == ApiHeader.csv_type:
  150. self.sdk.headers.add(entry := ApiHeader(entry_name))
  151. elif entry_class == ApiEntryFunction.csv_type:
  152. self.sdk.functions.add(
  153. entry := ApiEntryFunction(
  154. entry_name,
  155. entry_dict["type"],
  156. entry_dict["params"],
  157. )
  158. )
  159. elif entry_class == ApiEntryVariable.csv_type:
  160. self.sdk.variables.add(
  161. entry := ApiEntryVariable(entry_name, entry_dict["type"])
  162. )
  163. else:
  164. print(entry_dict)
  165. raise Exception("Unknown entry type: %s" % entry_class)
  166. if entry is None:
  167. return
  168. if entry_status == ApiEntryState.DISABLED.value:
  169. self.disabled_entries.add(entry)
  170. elif entry_status == ApiEntryState.PENDING.value:
  171. self.new_entries.add(entry)
  172. def load_cache(self) -> None:
  173. if not os.path.exists(self.cache_file_name):
  174. raise Exception(
  175. f"Cannot load symbol cache '{self.cache_file_name}'! File does not exist"
  176. )
  177. with open(self.cache_file_name, "rt") as f:
  178. reader = csv.DictReader(f)
  179. for row in reader:
  180. self._process_entry(row)
  181. if self._load_version_only and row.get("entry") == SdkVersion.csv_type:
  182. break
  183. def _have_pending_entries(self) -> bool:
  184. return any(
  185. filter(
  186. lambda e: not isinstance(e, SdkVersion),
  187. self.new_entries,
  188. )
  189. )
  190. def sync_sets(
  191. self, known_set: Set[Any], new_set: Set[Any], update_version: bool = True
  192. ):
  193. new_entries = new_set - known_set
  194. if new_entries:
  195. print(f"New: {new_entries}")
  196. known_set |= new_entries
  197. self.new_entries |= new_entries
  198. if update_version and self.version_action == VersionBump.NONE:
  199. self.version_action = VersionBump.MINOR
  200. removed_entries = known_set - new_set
  201. if removed_entries:
  202. print(f"Removed: {removed_entries}")
  203. known_set -= removed_entries
  204. # If any of removed entries was a part of active API, that's a major bump
  205. if update_version and any(
  206. filter(
  207. lambda e: e not in self.disabled_entries
  208. and e not in self.new_entries,
  209. removed_entries,
  210. )
  211. ):
  212. self.version_action = VersionBump.MAJOR
  213. self.disabled_entries -= removed_entries
  214. self.new_entries -= removed_entries
  215. def validate_api(self, api: ApiEntries) -> None:
  216. self.sync_sets(self.sdk.headers, api.headers, False)
  217. self.sync_sets(self.sdk.functions, api.functions)
  218. self.sync_sets(self.sdk.variables, api.variables)