QueueTools.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109
  1. # File: Queue.py
  2. # Author: Carl Allendorph
  3. # Date: 05NOV2014
  4. #
  5. # Description:
  6. # This file contains the implementation of a Queue Inspector
  7. # class.
  8. #
  9. import gdb
  10. from .List import ListInspector
  11. from .Task import TaskInspector
  12. class QueueMode:
  13. QUEUE = 0
  14. MUTEX = 1
  15. COUNTING = 2
  16. BINARY = 3
  17. RECURSIVE = 4
  18. Map = None
  19. @staticmethod
  20. def IsValid(qType):
  21. if (
  22. qType == QueueMode.QUEUE
  23. or qType == QueueMode.MUTEX
  24. or qType == QueueMode.COUNTING
  25. or qType == QueueMode.BINARY
  26. or qType == QueueMode.RECURSIVE
  27. ):
  28. return True
  29. else:
  30. return False
  31. QueueMap = {
  32. "mutex": QueueMode.MUTEX,
  33. "queue": QueueMode.QUEUE,
  34. "semaphore": QueueMode.BINARY,
  35. "counting": QueueMode.COUNTING,
  36. "recursive": QueueMode.RECURSIVE,
  37. }
  38. QueueMode.Map = QueueMap
  39. class QueueInspector:
  40. QueueType = gdb.lookup_type("Queue_t")
  41. def __init__(self, handle):
  42. # print("Queue: Handle: %s" % handle)
  43. self.name = None
  44. queueObjPtr = None
  45. if type(handle) == gdb.Value:
  46. queueObjPtr = handle.cast(QueueInspector.QueueType.pointer())
  47. self._queue = queueObjPtr.dereference()
  48. else:
  49. queueObjPtr = gdb.Value(handle).cast(QueueInspector.QueueType.pointer())
  50. self._queue = queueObjPtr.dereference()
  51. def GetName(self):
  52. return self.name
  53. def SetName(self, name):
  54. self.name = name
  55. def GetTasksWaitingToSend(self):
  56. """Retrieve a list of gdb.Value objects of type
  57. TCB that are the tasks that are currently waiting to
  58. send data on this queue object.
  59. """
  60. sendList = ListInspector(self._queue["xTasksWaitingToSend"])
  61. return sendList.GetElements(TaskInspector.TCBType)
  62. def GetTasksWaitingToReceive(self):
  63. """Retrieve a list of gdb.Value objects of Type
  64. TCB that are the tasks that are currently waiting to
  65. receive data on this queue object.
  66. """
  67. rxList = ListInspector(self._queue["xTasksWaitingToReceive"])
  68. return rxList.GetElements(TaskInspector.TCBType)
  69. def GetQueueMessagesWaiting(self):
  70. """Return the number of messages waiting as a
  71. L{gdb.Value} object
  72. """
  73. return self._queue["uxMessagesWaiting"]
  74. def GetQueueType(self):
  75. """Return the Type of the Queue as a enumerated number"""
  76. try:
  77. qType = self._queue["ucQueueType"]
  78. if QueueMode.IsValid(int(qType)):
  79. return qType
  80. else:
  81. raise ValueError(
  82. "Invalid Queue Type In Queue Object! Are you sure this is a Queue Handle?"
  83. )
  84. except Exception as exc:
  85. # If the TRACE functionality of the RTOS is not enabled,
  86. # then the queue type will not be availabe in the queue
  87. # handle - so we return None
  88. print("Failed to get Type: %s" % str(exc))
  89. return None