QueueTools.py 3.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110
  1. # File: Queue.py
  2. # Author: Carl Allendorph
  3. # Date: 05NOV2014
  4. #
  5. # Description:
  6. # This file contains the implementation of a Queue Inspector
  7. # class.
  8. #
  9. import gdb
  10. from .List import ListInspector
  11. from .Task import TaskInspector
  12. class QueueMode:
  13. QUEUE = 0
  14. MUTEX = 1
  15. COUNTING = 2
  16. BINARY = 3
  17. RECURSIVE = 4
  18. Map = None
  19. @staticmethod
  20. def IsValid(qType):
  21. if (
  22. qType == QueueMode.QUEUE
  23. or qType == QueueMode.MUTEX
  24. or qType == QueueMode.COUNTING
  25. or qType == QueueMode.BINARY
  26. or qType == QueueMode.RECURSIVE
  27. ):
  28. return True
  29. else:
  30. return False
  31. QueueMap = {
  32. "mutex": QueueMode.MUTEX,
  33. "queue": QueueMode.QUEUE,
  34. "semaphore": QueueMode.BINARY,
  35. "counting": QueueMode.COUNTING,
  36. "recursive": QueueMode.RECURSIVE,
  37. }
  38. QueueMode.Map = QueueMap
  39. class QueueInspector:
  40. QueueType = gdb.lookup_type("Queue_t")
  41. def __init__(self, handle):
  42. """"""
  43. # print("Queue: Handle: %s" % handle)
  44. self.name = None
  45. queueObjPtr = None
  46. if type(handle) == gdb.Value:
  47. queueObjPtr = handle.cast(QueueInspector.QueueType.pointer())
  48. self._queue = queueObjPtr.dereference()
  49. else:
  50. queueObjPtr = gdb.Value(handle).cast(QueueInspector.QueueType.pointer())
  51. self._queue = queueObjPtr.dereference()
  52. def GetName(self):
  53. return self.name
  54. def SetName(self, name):
  55. self.name = name
  56. def GetTasksWaitingToSend(self):
  57. """Retrieve a list of gdb.Value objects of type
  58. TCB that are the tasks that are currently waiting to
  59. send data on this queue object.
  60. """
  61. sendList = ListInspector(self._queue["xTasksWaitingToSend"])
  62. return sendList.GetElements(TaskInspector.TCBType)
  63. def GetTasksWaitingToReceive(self):
  64. """Retrieve a list of gdb.Value objects of Type
  65. TCB that are the tasks that are currently waiting to
  66. receive data on this queue object.
  67. """
  68. rxList = ListInspector(self._queue["xTasksWaitingToReceive"])
  69. return rxList.GetElements(TaskInspector.TCBType)
  70. def GetQueueMessagesWaiting(self):
  71. """Return the number of messages waiting as a
  72. L{gdb.Value} object
  73. """
  74. return self._queue["uxMessagesWaiting"]
  75. def GetQueueType(self):
  76. """Return the Type of the Queue as a enumerated number"""
  77. try:
  78. qType = self._queue["ucQueueType"]
  79. if QueueMode.IsValid(int(qType)):
  80. return qType
  81. else:
  82. raise ValueError(
  83. "Invalid Queue Type In Queue Object! Are you sure this is a Queue Handle?"
  84. )
  85. except Exception as exc:
  86. # If the TRACE functionality of the RTOS is not enabled,
  87. # then the queue type will not be availabe in the queue
  88. # handle - so we return None
  89. print("Failed to get Type: %s" % str(exc))
  90. return None