funnel-queue.c 5.0 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /*
  3. * Copyright 2023 Red Hat
  4. */
  5. #include "funnel-queue.h"
  6. #include "cpu.h"
  7. #include "memory-alloc.h"
  8. #include "permassert.h"
  9. int vdo_make_funnel_queue(struct funnel_queue **queue_ptr)
  10. {
  11. int result;
  12. struct funnel_queue *queue;
  13. result = vdo_allocate(1, struct funnel_queue, "funnel queue", &queue);
  14. if (result != VDO_SUCCESS)
  15. return result;
  16. /*
  17. * Initialize the stub entry and put it in the queue, establishing the invariant that
  18. * queue->newest and queue->oldest are never null.
  19. */
  20. queue->stub.next = NULL;
  21. queue->newest = &queue->stub;
  22. queue->oldest = &queue->stub;
  23. *queue_ptr = queue;
  24. return VDO_SUCCESS;
  25. }
  /*
   * Release a funnel queue by handing the queue structure to vdo_free().
   *
   * Only the queue structure itself is released here; this function does not walk or release
   * any entries that may still be linked into the queue.
   */
  void vdo_free_funnel_queue(struct funnel_queue *queue)
  {
      vdo_free(queue);
  }
  30. static struct funnel_queue_entry *get_oldest(struct funnel_queue *queue)
  31. {
  32. /*
  33. * Barrier requirements: We need a read barrier between reading a "next" field pointer
  34. * value and reading anything it points to. There's an accompanying barrier in
  35. * vdo_funnel_queue_put() between its caller setting up the entry and making it visible.
  36. */
  37. struct funnel_queue_entry *oldest = queue->oldest;
  38. struct funnel_queue_entry *next = READ_ONCE(oldest->next);
  39. if (oldest == &queue->stub) {
  40. /*
  41. * When the oldest entry is the stub and it has no successor, the queue is
  42. * logically empty.
  43. */
  44. if (next == NULL)
  45. return NULL;
  46. /*
  47. * The stub entry has a successor, so the stub can be dequeued and ignored without
  48. * breaking the queue invariants.
  49. */
  50. oldest = next;
  51. queue->oldest = oldest;
  52. next = READ_ONCE(oldest->next);
  53. }
  54. /*
  55. * We have a non-stub candidate to dequeue. If it lacks a successor, we'll need to put the
  56. * stub entry back on the queue first.
  57. */
  58. if (next == NULL) {
  59. struct funnel_queue_entry *newest = READ_ONCE(queue->newest);
  60. if (oldest != newest) {
  61. /*
  62. * Another thread has already swung queue->newest atomically, but not yet
  63. * assigned previous->next. The queue is really still empty.
  64. */
  65. return NULL;
  66. }
  67. /*
  68. * Put the stub entry back on the queue, ensuring a successor will eventually be
  69. * seen.
  70. */
  71. vdo_funnel_queue_put(queue, &queue->stub);
  72. /* Check again for a successor. */
  73. next = READ_ONCE(oldest->next);
  74. if (next == NULL) {
  75. /*
  76. * We lost a race with a producer who swapped queue->newest before we did,
  77. * but who hasn't yet updated previous->next. Try again later.
  78. */
  79. return NULL;
  80. }
  81. }
  82. return oldest;
  83. }
  84. /*
  85. * Poll a queue, removing the oldest entry if the queue is not empty. This function must only be
  86. * called from a single consumer thread.
  87. */
  88. struct funnel_queue_entry *vdo_funnel_queue_poll(struct funnel_queue *queue)
  89. {
  90. struct funnel_queue_entry *oldest = get_oldest(queue);
  91. if (oldest == NULL)
  92. return oldest;
  93. /*
  94. * Dequeue the oldest entry and return it. Only one consumer thread may call this function,
  95. * so no locking, atomic operations, or fences are needed; queue->oldest is owned by the
  96. * consumer and oldest->next is never used by a producer thread after it is swung from NULL
  97. * to non-NULL.
  98. */
  99. queue->oldest = READ_ONCE(oldest->next);
  100. /*
  101. * Make sure the caller sees the proper stored data for this entry. Since we've already
  102. * fetched the entry pointer we stored in "queue->oldest", this also ensures that on entry
  103. * to the next call we'll properly see the dependent data.
  104. */
  105. smp_rmb();
  106. /*
  107. * If "oldest" is a very light-weight work item, we'll be looking for the next one very
  108. * soon, so prefetch it now.
  109. */
  110. uds_prefetch_address(queue->oldest, true);
  111. WRITE_ONCE(oldest->next, NULL);
  112. return oldest;
  113. }
  114. /*
  115. * Check whether the funnel queue is empty or not. If the queue is in a transition state with one
  116. * or more entries being added such that the list view is incomplete, this function will report the
  117. * queue as empty.
  118. */
  119. bool vdo_is_funnel_queue_empty(struct funnel_queue *queue)
  120. {
  121. return get_oldest(queue) == NULL;
  122. }
  123. /*
  124. * Check whether the funnel queue is idle or not. If the queue has entries available to be
  125. * retrieved, it is not idle. If the queue is in a transition state with one or more entries being
  126. * added such that the list view is incomplete, it may not be possible to retrieve an entry with
  127. * the vdo_funnel_queue_poll() function, but the queue will not be considered idle.
  128. */
  129. bool vdo_is_funnel_queue_idle(struct funnel_queue *queue)
  130. {
  131. /*
  132. * Oldest is not the stub, so there's another entry, though if next is NULL we can't
  133. * retrieve it yet.
  134. */
  135. if (queue->oldest != &queue->stub)
  136. return false;
  137. /*
  138. * Oldest is the stub, but newest has been updated by _put(); either there's another,
  139. * retrievable entry in the list, or the list is officially empty but in the intermediate
  140. * state of having an entry added.
  141. *
  142. * Whether anything is retrievable depends on whether stub.next has been updated and become
  143. * visible to us, but for idleness we don't care. And due to memory ordering in _put(), the
  144. * update to newest would be visible to us at the same time or sooner.
  145. */
  146. if (READ_ONCE(queue->newest) != &queue->stub)
  147. return false;
  148. return true;
  149. }