wait.c 8.8 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320
  1. // SPDX-License-Identifier: GPL-2.0
  2. /*
  3. * Waiting for completion events
  4. */
  5. #include <linux/kernel.h>
  6. #include <linux/sched/signal.h>
  7. #include <linux/io_uring.h>
  8. #include <trace/events/io_uring.h>
  9. #include <uapi/linux/io_uring.h>
  10. #include "io_uring.h"
  11. #include "napi.h"
  12. #include "wait.h"
  13. static int io_wake_function(struct wait_queue_entry *curr, unsigned int mode,
  14. int wake_flags, void *key)
  15. {
  16. struct io_wait_queue *iowq = container_of(curr, struct io_wait_queue, wq);
  17. /*
  18. * Cannot safely flush overflowed CQEs from here, ensure we wake up
  19. * the task, and the next invocation will do it.
  20. */
  21. if (io_should_wake(iowq) || io_has_work(iowq->ctx))
  22. return autoremove_wake_function(curr, mode, wake_flags, key);
  23. return -1;
  24. }
  25. int io_run_task_work_sig(struct io_ring_ctx *ctx)
  26. {
  27. if (io_local_work_pending(ctx)) {
  28. __set_current_state(TASK_RUNNING);
  29. if (io_run_local_work(ctx, INT_MAX, IO_LOCAL_TW_DEFAULT_MAX) > 0)
  30. return 0;
  31. }
  32. if (io_run_task_work() > 0)
  33. return 0;
  34. if (task_sigpending(current))
  35. return -EINTR;
  36. return 0;
  37. }
  38. static bool current_pending_io(void)
  39. {
  40. struct io_uring_task *tctx = current->io_uring;
  41. if (!tctx)
  42. return false;
  43. return percpu_counter_read_positive(&tctx->inflight);
  44. }
  45. static enum hrtimer_restart io_cqring_timer_wakeup(struct hrtimer *timer)
  46. {
  47. struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
  48. WRITE_ONCE(iowq->hit_timeout, 1);
  49. iowq->min_timeout = 0;
  50. wake_up_process(iowq->wq.private);
  51. return HRTIMER_NORESTART;
  52. }
  53. /*
  54. * Doing min_timeout portion. If we saw any timeouts, events, or have work,
  55. * wake up. If not, and we have a normal timeout, switch to that and keep
  56. * sleeping.
  57. */
  58. static enum hrtimer_restart io_cqring_min_timer_wakeup(struct hrtimer *timer)
  59. {
  60. struct io_wait_queue *iowq = container_of(timer, struct io_wait_queue, t);
  61. struct io_ring_ctx *ctx = iowq->ctx;
  62. /* no general timeout, or shorter (or equal), we are done */
  63. if (iowq->timeout == KTIME_MAX ||
  64. ktime_compare(iowq->min_timeout, iowq->timeout) >= 0)
  65. goto out_wake;
  66. /* work we may need to run, wake function will see if we need to wake */
  67. if (io_has_work(ctx))
  68. goto out_wake;
  69. /* got events since we started waiting, min timeout is done */
  70. scoped_guard(rcu) {
  71. struct io_rings *rings = io_get_rings(ctx);
  72. if (iowq->cq_min_tail != READ_ONCE(rings->cq.tail))
  73. goto out_wake;
  74. /* if we have any events and min timeout expired, we're done */
  75. if (io_cqring_events(ctx))
  76. goto out_wake;
  77. }
  78. /*
  79. * If using deferred task_work running and application is waiting on
  80. * more than one request, ensure we reset it now where we are switching
  81. * to normal sleeps. Any request completion post min_wait should wake
  82. * the task and return.
  83. */
  84. if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
  85. atomic_set(&ctx->cq_wait_nr, 1);
  86. smp_mb();
  87. if (!llist_empty(&ctx->work_llist))
  88. goto out_wake;
  89. }
  90. /* any generated CQE posted past this time should wake us up */
  91. iowq->cq_tail = iowq->cq_min_tail;
  92. hrtimer_update_function(&iowq->t, io_cqring_timer_wakeup);
  93. hrtimer_set_expires(timer, iowq->timeout);
  94. return HRTIMER_RESTART;
  95. out_wake:
  96. return io_cqring_timer_wakeup(timer);
  97. }
  98. static int io_cqring_schedule_timeout(struct io_wait_queue *iowq,
  99. clockid_t clock_id, ktime_t start_time)
  100. {
  101. ktime_t timeout;
  102. if (iowq->min_timeout) {
  103. timeout = ktime_add_ns(iowq->min_timeout, start_time);
  104. hrtimer_setup_on_stack(&iowq->t, io_cqring_min_timer_wakeup, clock_id,
  105. HRTIMER_MODE_ABS);
  106. } else {
  107. timeout = iowq->timeout;
  108. hrtimer_setup_on_stack(&iowq->t, io_cqring_timer_wakeup, clock_id,
  109. HRTIMER_MODE_ABS);
  110. }
  111. hrtimer_set_expires_range_ns(&iowq->t, timeout, 0);
  112. hrtimer_start_expires(&iowq->t, HRTIMER_MODE_ABS);
  113. if (!READ_ONCE(iowq->hit_timeout))
  114. schedule();
  115. hrtimer_cancel(&iowq->t);
  116. destroy_hrtimer_on_stack(&iowq->t);
  117. __set_current_state(TASK_RUNNING);
  118. return READ_ONCE(iowq->hit_timeout) ? -ETIME : 0;
  119. }
  120. static int __io_cqring_wait_schedule(struct io_ring_ctx *ctx,
  121. struct io_wait_queue *iowq,
  122. struct ext_arg *ext_arg,
  123. ktime_t start_time)
  124. {
  125. int ret = 0;
  126. /*
  127. * Mark us as being in io_wait if we have pending requests, so cpufreq
  128. * can take into account that the task is waiting for IO - turns out
  129. * to be important for low QD IO.
  130. */
  131. if (ext_arg->iowait && current_pending_io())
  132. current->in_iowait = 1;
  133. if (iowq->timeout != KTIME_MAX || iowq->min_timeout)
  134. ret = io_cqring_schedule_timeout(iowq, ctx->clockid, start_time);
  135. else
  136. schedule();
  137. current->in_iowait = 0;
  138. return ret;
  139. }
  140. /* If this returns > 0, the caller should retry */
  141. static inline int io_cqring_wait_schedule(struct io_ring_ctx *ctx,
  142. struct io_wait_queue *iowq,
  143. struct ext_arg *ext_arg,
  144. ktime_t start_time)
  145. {
  146. if (unlikely(READ_ONCE(ctx->check_cq)))
  147. return 1;
  148. if (unlikely(io_local_work_pending(ctx)))
  149. return 1;
  150. if (unlikely(task_work_pending(current)))
  151. return 1;
  152. if (unlikely(task_sigpending(current)))
  153. return -EINTR;
  154. if (unlikely(io_should_wake(iowq)))
  155. return 0;
  156. return __io_cqring_wait_schedule(ctx, iowq, ext_arg, start_time);
  157. }
  158. /*
  159. * Wait until events become available, if we don't already have some. The
  160. * application must reap them itself, as they reside on the shared cq ring.
  161. */
  162. int io_cqring_wait(struct io_ring_ctx *ctx, int min_events, u32 flags,
  163. struct ext_arg *ext_arg)
  164. {
  165. struct io_wait_queue iowq;
  166. struct io_rings *rings;
  167. ktime_t start_time;
  168. int ret, nr_wait;
  169. min_events = min_t(int, min_events, ctx->cq_entries);
  170. if (!io_allowed_run_tw(ctx))
  171. return -EEXIST;
  172. if (io_local_work_pending(ctx))
  173. io_run_local_work(ctx, min_events,
  174. max(IO_LOCAL_TW_DEFAULT_MAX, min_events));
  175. io_run_task_work();
  176. if (unlikely(test_bit(IO_CHECK_CQ_OVERFLOW_BIT, &ctx->check_cq)))
  177. io_cqring_do_overflow_flush(ctx);
  178. rcu_read_lock();
  179. rings = io_get_rings(ctx);
  180. if (__io_cqring_events_user(ctx) >= min_events) {
  181. rcu_read_unlock();
  182. return 0;
  183. }
  184. init_waitqueue_func_entry(&iowq.wq, io_wake_function);
  185. iowq.wq.private = current;
  186. INIT_LIST_HEAD(&iowq.wq.entry);
  187. iowq.ctx = ctx;
  188. iowq.cq_tail = READ_ONCE(rings->cq.head) + min_events;
  189. iowq.cq_min_tail = READ_ONCE(rings->cq.tail);
  190. nr_wait = (int) iowq.cq_tail - READ_ONCE(rings->cq.tail);
  191. rcu_read_unlock();
  192. rings = NULL;
  193. iowq.nr_timeouts = atomic_read(&ctx->cq_timeouts);
  194. iowq.hit_timeout = 0;
  195. iowq.min_timeout = ext_arg->min_time;
  196. iowq.timeout = KTIME_MAX;
  197. start_time = io_get_time(ctx);
  198. if (ext_arg->ts_set) {
  199. iowq.timeout = timespec64_to_ktime(ext_arg->ts);
  200. if (!(flags & IORING_ENTER_ABS_TIMER))
  201. iowq.timeout = ktime_add(iowq.timeout, start_time);
  202. }
  203. if (ext_arg->sig) {
  204. #ifdef CONFIG_COMPAT
  205. if (in_compat_syscall())
  206. ret = set_compat_user_sigmask((const compat_sigset_t __user *)ext_arg->sig,
  207. ext_arg->argsz);
  208. else
  209. #endif
  210. ret = set_user_sigmask(ext_arg->sig, ext_arg->argsz);
  211. if (ret)
  212. return ret;
  213. }
  214. io_napi_busy_loop(ctx, &iowq);
  215. trace_io_uring_cqring_wait(ctx, min_events);
  216. do {
  217. unsigned long check_cq;
  218. if (ctx->flags & IORING_SETUP_DEFER_TASKRUN) {
  219. atomic_set(&ctx->cq_wait_nr, nr_wait);
  220. set_current_state(TASK_INTERRUPTIBLE);
  221. } else {
  222. prepare_to_wait_exclusive(&ctx->cq_wait, &iowq.wq,
  223. TASK_INTERRUPTIBLE);
  224. }
  225. ret = io_cqring_wait_schedule(ctx, &iowq, ext_arg, start_time);
  226. __set_current_state(TASK_RUNNING);
  227. atomic_set(&ctx->cq_wait_nr, IO_CQ_WAKE_INIT);
  228. /*
  229. * Run task_work after scheduling and before io_should_wake().
  230. * If we got woken because of task_work being processed, run it
  231. * now rather than let the caller do another wait loop.
  232. */
  233. if (io_local_work_pending(ctx))
  234. io_run_local_work(ctx, nr_wait, nr_wait);
  235. io_run_task_work();
  236. /*
  237. * Non-local task_work will be run on exit to userspace, but
  238. * if we're using DEFER_TASKRUN, then we could have waited
  239. * with a timeout for a number of requests. If the timeout
  240. * hits, we could have some requests ready to process. Ensure
  241. * this break is _after_ we have run task_work, to avoid
  242. * deferring running potentially pending requests until the
  243. * next time we wait for events.
  244. */
  245. if (ret < 0)
  246. break;
  247. check_cq = READ_ONCE(ctx->check_cq);
  248. if (unlikely(check_cq)) {
  249. /* let the caller flush overflows, retry */
  250. if (check_cq & BIT(IO_CHECK_CQ_OVERFLOW_BIT))
  251. io_cqring_do_overflow_flush(ctx);
  252. if (check_cq & BIT(IO_CHECK_CQ_DROPPED_BIT)) {
  253. ret = -EBADR;
  254. break;
  255. }
  256. }
  257. if (io_should_wake(&iowq)) {
  258. ret = 0;
  259. break;
  260. }
  261. cond_resched();
  262. /* if min timeout has been hit, don't reset wait count */
  263. if (!iowq.hit_timeout)
  264. scoped_guard(rcu)
  265. nr_wait = (int) iowq.cq_tail -
  266. READ_ONCE(io_get_rings(ctx)->cq.tail);
  267. else
  268. nr_wait = 1;
  269. } while (1);
  270. if (!(ctx->flags & IORING_SETUP_DEFER_TASKRUN))
  271. finish_wait(&ctx->cq_wait, &iowq.wq);
  272. restore_saved_sigmask_unless(ret == -EINTR);
  273. guard(rcu)();
  274. return READ_ONCE(io_get_rings(ctx)->cq.head) == READ_ONCE(io_get_rings(ctx)->cq.tail) ? ret : 0;
  275. }