sqpoll.c 13 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563
  1. // SPDX-License-Identifier: GPL-2.0
  2. /*
  3. * Contains the code associated with submission side polling of the SQ
  4. * ring, offloading submissions from the application to a kernel thread.
  5. */
  6. #include <linux/kernel.h>
  7. #include <linux/errno.h>
  8. #include <linux/file.h>
  9. #include <linux/mm.h>
  10. #include <linux/slab.h>
  11. #include <linux/audit.h>
  12. #include <linux/security.h>
  13. #include <linux/cpuset.h>
  14. #include <linux/sched/cputime.h>
  15. #include <linux/io_uring.h>
  16. #include <uapi/linux/io_uring.h>
  17. #include "io_uring.h"
  18. #include "tctx.h"
  19. #include "napi.h"
  20. #include "cancel.h"
  21. #include "sqpoll.h"
  /* Per-pass work limits of the SQPOLL loop. */
  enum {
  	/* max SQEs submitted per ring per pass when the thread serves several rings */
  	IORING_SQPOLL_CAP_ENTRIES_VALUE	= 8,
  	/* max io_uring task_work items run per pass */
  	IORING_TW_CAP_ENTRIES_VALUE	= 32,
  };

  /* Bit numbers used in io_sq_data::state. */
  enum {
  	IO_SQ_THREAD_SHOULD_STOP	= 0,
  	IO_SQ_THREAD_SHOULD_PARK	= 1,
  };
  /*
   * io_sq_thread_unpark - drop one park request and release sqd->lock
   * @sqd: SQPOLL data previously parked with io_sq_thread_park()
   *
   * Decrements sqd->park_pending. IO_SQ_THREAD_SHOULD_PARK stays set while
   * other parkers are still pending. Releases sqd->lock (acquired by
   * io_sq_thread_park()) and wakes sqd->wait, where the SQPOLL thread waits
   * in io_sqd_handle_event() for park_pending to drop to zero.
   * Must not be called from the SQPOLL thread itself.
   */
  void io_sq_thread_unpark(struct io_sq_data *sqd)
  	__releases(&sqd->lock)
  {
  	WARN_ON_ONCE(sqpoll_task_locked(sqd) == current);

  	/*
  	 * Do the dance but not conditional clear_bit() because it'd race with
  	 * other threads incrementing park_pending and setting the bit.
  	 */
  	clear_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
  	if (atomic_dec_return(&sqd->park_pending))
  		set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
  	mutex_unlock(&sqd->lock);
  	wake_up(&sqd->wait);
  }
  /*
   * io_sq_thread_park - ask the SQPOLL thread to park and take sqd->lock
   * @sqd: SQPOLL data to park
   *
   * park_pending is bumped and IO_SQ_THREAD_SHOULD_PARK set *before* blocking
   * on sqd->lock. The SQPOLL thread holds that lock while it runs, so it
   * sees the request via io_sqd_events_pending(), drops the lock in
   * io_sqd_handle_event() and waits until all parkers have unparked.
   * Returns with sqd->lock held; pair with io_sq_thread_unpark().
   */
  void io_sq_thread_park(struct io_sq_data *sqd)
  	__acquires(&sqd->lock)
  {
  	struct task_struct *tsk;

  	atomic_inc(&sqd->park_pending);
  	set_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state);
  	mutex_lock(&sqd->lock);
  	tsk = sqpoll_task_locked(sqd);
  	if (tsk) {
  		WARN_ON_ONCE(tsk == current);
  		/* kick the thread in case it is sleeping in schedule() */
  		wake_up_process(tsk);
  	}
  }
  /*
   * io_sq_thread_stop - tell the SQPOLL thread to exit and wait for it
   * @sqd: SQPOLL data whose thread should stop
   *
   * Sets IO_SQ_THREAD_SHOULD_STOP (warns if it was already set) and wakes
   * the thread if one is still attached. Then blocks until sqd->exited is
   * completed. That happens either by io_sq_thread() on its way out or by
   * io_sq_offload_create()'s error path when no thread was started.
   */
  void io_sq_thread_stop(struct io_sq_data *sqd)
  {
  	struct task_struct *tsk;

  	WARN_ON_ONCE(test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state));
  	set_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
  	mutex_lock(&sqd->lock);
  	tsk = sqpoll_task_locked(sqd);
  	if (tsk) {
  		WARN_ON_ONCE(tsk == current);
  		wake_up_process(tsk);
  	}
  	mutex_unlock(&sqd->lock);
  	wait_for_completion(&sqd->exited);
  }
  69. void io_put_sq_data(struct io_sq_data *sqd)
  70. {
  71. if (refcount_dec_and_test(&sqd->refs)) {
  72. WARN_ON_ONCE(atomic_read(&sqd->park_pending));
  73. io_sq_thread_stop(sqd);
  74. kfree(sqd);
  75. }
  76. }
  77. static __cold void io_sqd_update_thread_idle(struct io_sq_data *sqd)
  78. {
  79. struct io_ring_ctx *ctx;
  80. unsigned sq_thread_idle = 0;
  81. list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
  82. sq_thread_idle = max(sq_thread_idle, ctx->sq_thread_idle);
  83. sqd->sq_thread_idle = sq_thread_idle;
  84. }
  85. void io_sq_thread_finish(struct io_ring_ctx *ctx)
  86. {
  87. struct io_sq_data *sqd = ctx->sq_data;
  88. if (sqd) {
  89. io_sq_thread_park(sqd);
  90. list_del_init(&ctx->sqd_list);
  91. io_sqd_update_thread_idle(sqd);
  92. io_sq_thread_unpark(sqd);
  93. io_put_sq_data(sqd);
  94. ctx->sq_data = NULL;
  95. }
  96. }
  97. static struct io_sq_data *io_attach_sq_data(struct io_uring_params *p)
  98. {
  99. struct io_ring_ctx *ctx_attach;
  100. struct io_sq_data *sqd;
  101. CLASS(fd, f)(p->wq_fd);
  102. if (fd_empty(f))
  103. return ERR_PTR(-ENXIO);
  104. if (!io_is_uring_fops(fd_file(f)))
  105. return ERR_PTR(-EINVAL);
  106. ctx_attach = fd_file(f)->private_data;
  107. sqd = ctx_attach->sq_data;
  108. if (!sqd)
  109. return ERR_PTR(-EINVAL);
  110. if (sqd->task_tgid != current->tgid)
  111. return ERR_PTR(-EPERM);
  112. refcount_inc(&sqd->refs);
  113. return sqd;
  114. }
  115. static struct io_sq_data *io_get_sq_data(struct io_uring_params *p,
  116. bool *attached)
  117. {
  118. struct io_sq_data *sqd;
  119. *attached = false;
  120. if (p->flags & IORING_SETUP_ATTACH_WQ) {
  121. sqd = io_attach_sq_data(p);
  122. if (!IS_ERR(sqd)) {
  123. *attached = true;
  124. return sqd;
  125. }
  126. /* fall through for EPERM case, setup new sqd/task */
  127. if (PTR_ERR(sqd) != -EPERM)
  128. return sqd;
  129. }
  130. sqd = kzalloc_obj(*sqd);
  131. if (!sqd)
  132. return ERR_PTR(-ENOMEM);
  133. atomic_set(&sqd->park_pending, 0);
  134. refcount_set(&sqd->refs, 1);
  135. INIT_LIST_HEAD(&sqd->ctx_list);
  136. mutex_init(&sqd->lock);
  137. init_waitqueue_head(&sqd->wait);
  138. init_completion(&sqd->exited);
  139. return sqd;
  140. }
  141. static inline bool io_sqd_events_pending(struct io_sq_data *sqd)
  142. {
  143. return READ_ONCE(sqd->state);
  144. }
  145. struct io_sq_time {
  146. bool started;
  147. u64 usec;
  148. };
  149. u64 io_sq_cpu_usec(struct task_struct *tsk)
  150. {
  151. u64 utime, stime;
  152. task_cputime_adjusted(tsk, &utime, &stime);
  153. do_div(stime, 1000);
  154. return stime;
  155. }
  156. static void io_sq_update_worktime(struct io_sq_data *sqd, struct io_sq_time *ist)
  157. {
  158. if (!ist->started)
  159. return;
  160. ist->started = false;
  161. sqd->work_time += io_sq_cpu_usec(current) - ist->usec;
  162. }
  163. static void io_sq_start_worktime(struct io_sq_time *ist)
  164. {
  165. if (ist->started)
  166. return;
  167. ist->started = true;
  168. ist->usec = io_sq_cpu_usec(current);
  169. }
  /*
   * __io_sq_thread - one submission/IOPOLL pass over a single ring
   * @ctx: ring to service
   * @sqd: SQPOLL data the ring is attached to
   * @cap_entries: limit submission to IORING_SQPOLL_CAP_ENTRIES_VALUE SQEs
   *	(the caller sets this when the thread serves more than one ring)
   * @ist: per-pass work-time sample, opened here if there is work
   *
   * Work runs with ctx->sq_creds installed and ctx->uring_lock held.
   * Returns io_submit_sqes()'s result, or 0 if nothing was submitted.
   */
  static int __io_sq_thread(struct io_ring_ctx *ctx, struct io_sq_data *sqd,
  			  bool cap_entries, struct io_sq_time *ist)
  {
  	unsigned int to_submit;
  	int ret = 0;

  	to_submit = io_sqring_entries(ctx);
  	/* if we're handling multiple rings, cap submit size for fairness */
  	if (cap_entries && to_submit > IORING_SQPOLL_CAP_ENTRIES_VALUE)
  		to_submit = IORING_SQPOLL_CAP_ENTRIES_VALUE;

  	if (to_submit || !list_empty(&ctx->iopoll_list)) {
  		const struct cred *creds = NULL;

  		io_sq_start_worktime(ist);

  		/* submit with the creds of the task that set up the ring */
  		if (ctx->sq_creds != current_cred())
  			creds = override_creds(ctx->sq_creds);

  		mutex_lock(&ctx->uring_lock);
  		if (!list_empty(&ctx->iopoll_list))
  			io_do_iopoll(ctx, true);

  		/*
  		 * Don't submit if refs are dying, good for io_uring_register(),
  		 * but also it is relied upon by io_ring_exit_work()
  		 */
  		if (to_submit && likely(!percpu_ref_is_dying(&ctx->refs)) &&
  		    !(ctx->flags & IORING_SETUP_R_DISABLED))
  			ret = io_submit_sqes(ctx, to_submit);
  		mutex_unlock(&ctx->uring_lock);

  		/* SQ space was freed: wake submitters in io_sqpoll_wait_sq() */
  		if (to_submit && wq_has_sleeper(&ctx->sqo_sq_wait))
  			wake_up(&ctx->sqo_sq_wait);
  		if (creds)
  			revert_creds(creds);
  	}

  	return ret;
  }
  /*
   * io_sqd_handle_event - service park requests and signals in the SQPOLL thread
   * @sqd: SQPOLL data; sqd->lock must be held on entry and is held on return
   *
   * If a park is requested or a signal is pending, the thread drops
   * sqd->lock and dequeues the signal (if any). It then waits on sqd->wait
   * until park_pending reaches zero, re-takes the lock and records the
   * current CPU in sqd->sq_cpu.
   * Returns true if the thread should exit: get_signal() returned true or
   * IO_SQ_THREAD_SHOULD_STOP is set.
   */
  static bool io_sqd_handle_event(struct io_sq_data *sqd)
  {
  	bool did_sig = false;
  	struct ksignal ksig;

  	if (test_bit(IO_SQ_THREAD_SHOULD_PARK, &sqd->state) ||
  	    signal_pending(current)) {
  		mutex_unlock(&sqd->lock);
  		if (signal_pending(current))
  			did_sig = get_signal(&ksig);
  		wait_event(sqd->wait, !atomic_read(&sqd->park_pending));
  		mutex_lock(&sqd->lock);
  		sqd->sq_cpu = raw_smp_processor_id();
  	}
  	return did_sig || test_bit(IO_SQ_THREAD_SHOULD_STOP, &sqd->state);
  }
  217. /*
  218. * Run task_work, processing the retry_list first. The retry_list holds
  219. * entries that we passed on in the previous run, if we had more task_work
  220. * than we were asked to process. Newly queued task_work isn't run until the
  221. * retry list has been fully processed.
  222. */
  223. static unsigned int io_sq_tw(struct llist_node **retry_list, int max_entries)
  224. {
  225. struct io_uring_task *tctx = current->io_uring;
  226. unsigned int count = 0;
  227. if (*retry_list) {
  228. *retry_list = io_handle_tw_list(*retry_list, &count, max_entries);
  229. if (count >= max_entries)
  230. goto out;
  231. max_entries -= count;
  232. }
  233. *retry_list = tctx_task_work_run(tctx, max_entries, &count);
  234. out:
  235. if (task_work_pending(current))
  236. task_work_run();
  237. return count;
  238. }
  239. static bool io_sq_tw_pending(struct llist_node *retry_list)
  240. {
  241. struct io_uring_task *tctx = current->io_uring;
  242. return retry_list || !llist_empty(&tctx->task_list);
  243. }
  /*
   * io_sq_thread - main loop of the SQPOLL kernel thread
   * @data: the struct io_sq_data this thread services
   *
   * Holds sqd->lock while running. It drops the lock only to handle
   * park/signal events, to reschedule, or to sleep. Each pass:
   *   - submits SQEs and reaps IOPOLL completions for every attached ring;
   *   - runs up to IORING_TW_CAP_ENTRIES_VALUE io_uring task_work items;
   *   - performs NAPI busy polling.
   * After sqd->sq_thread_idle jiffies without work, it sets
   * IORING_SQ_NEED_WAKEUP on every ring and sleeps on sqd->wait.
   * Does not return: exits through do_exit() after completing sqd->exited.
   */
  static int io_sq_thread(void *data)
  {
  	struct llist_node *retry_list = NULL;
  	struct io_sq_data *sqd = data;
  	struct io_ring_ctx *ctx;
  	unsigned long timeout = 0;
  	char buf[TASK_COMM_LEN] = {};
  	DEFINE_WAIT(wait);

  	/* offload context creation failed, just exit */
  	if (!current->io_uring) {
  		mutex_lock(&sqd->lock);
  		rcu_assign_pointer(sqd->thread, NULL);
  		/* drops the reference io_sq_offload_create() took via get_task_struct() */
  		put_task_struct(current);
  		mutex_unlock(&sqd->lock);
  		goto err_out;
  	}

  	/* sqd->task_pid still holds the creating task's pid at this point */
  	snprintf(buf, sizeof(buf), "iou-sqp-%d", sqd->task_pid);
  	set_task_comm(current, buf);

  	/* reset to our pid after we've set task_comm, for fdinfo */
  	sqd->task_pid = current->pid;

  	/* sq_cpu == -1: no IORING_SETUP_SQ_AFF pinning was requested */
  	if (sqd->sq_cpu != -1) {
  		set_cpus_allowed_ptr(current, cpumask_of(sqd->sq_cpu));
  	} else {
  		set_cpus_allowed_ptr(current, cpu_online_mask);
  		sqd->sq_cpu = raw_smp_processor_id();
  	}

  	/*
  	 * Force audit context to get setup, in case we do prep side async
  	 * operations that would trigger an audit call before any issue side
  	 * audit has been done.
  	 */
  	audit_uring_entry(IORING_OP_NOP);
  	audit_uring_exit(true, 0);

  	mutex_lock(&sqd->lock);
  	while (1) {
  		/* sqt_spin: work was found in this pass, keep polling */
  		bool cap_entries, sqt_spin = false;
  		/* work-time sample for this pass, folded into sqd->work_time */
  		struct io_sq_time ist = { };

  		if (io_sqd_events_pending(sqd) || signal_pending(current)) {
  			if (io_sqd_handle_event(sqd))
  				break;
  			timeout = jiffies + sqd->sq_thread_idle;
  		}

  		/* cap per-ring submissions unless this is the only ring */
  		cap_entries = !list_is_singular(&sqd->ctx_list);
  		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
  			int ret = __io_sq_thread(ctx, sqd, cap_entries, &ist);

  			if (!sqt_spin && (ret > 0 || !list_empty(&ctx->iopoll_list)))
  				sqt_spin = true;
  		}
  		if (io_sq_tw(&retry_list, IORING_TW_CAP_ENTRIES_VALUE))
  			sqt_spin = true;

  		list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
  			if (io_napi(ctx)) {
  				io_sq_start_worktime(&ist);
  				io_napi_sqpoll_busy_poll(ctx);
  			}
  		}

  		io_sq_update_worktime(sqd, &ist);

  		/* busy, or idle window not yet expired: poll again without sleeping */
  		if (sqt_spin || !time_after(jiffies, timeout)) {
  			if (sqt_spin)
  				timeout = jiffies + sqd->sq_thread_idle;
  			if (unlikely(need_resched())) {
  				mutex_unlock(&sqd->lock);
  				cond_resched();
  				mutex_lock(&sqd->lock);
  				sqd->sq_cpu = raw_smp_processor_id();
  			}
  			continue;
  		}

  		/*
  		 * Idle period expired. Queue on sqd->wait before the final
  		 * checks so a wake_up() issued after them is not missed.
  		 */
  		prepare_to_wait(&sqd->wait, &wait, TASK_INTERRUPTIBLE);
  		if (!io_sqd_events_pending(sqd) && !io_sq_tw_pending(retry_list)) {
  			bool needs_sched = true;

  			/*
  			 * Advertise NEED_WAKEUP on each ring, then re-check it
  			 * for IOPOLL or SQ work that arrived meanwhile.
  			 */
  			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list) {
  				atomic_or(IORING_SQ_NEED_WAKEUP,
  						&ctx->rings->sq_flags);
  				if ((ctx->flags & IORING_SETUP_IOPOLL) &&
  				    !list_empty(&ctx->iopoll_list)) {
  					needs_sched = false;
  					break;
  				}

  				/*
  				 * Ensure the store of the wakeup flag is not
  				 * reordered with the load of the SQ tail
  				 */
  				smp_mb__after_atomic();

  				if (io_sqring_entries(ctx)) {
  					needs_sched = false;
  					break;
  				}
  			}

  			if (needs_sched) {
  				mutex_unlock(&sqd->lock);
  				schedule();
  				mutex_lock(&sqd->lock);
  				sqd->sq_cpu = raw_smp_processor_id();
  			}
  			/* awake again (or never slept): polling resumes */
  			list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
  				atomic_andnot(IORING_SQ_NEED_WAKEUP,
  						&ctx->rings->sq_flags);
  		}

  		finish_wait(&sqd->wait, &wait);
  		timeout = jiffies + sqd->sq_thread_idle;
  	}

  	/* exiting: drain task_work deferred on retry_list, this time uncapped */
  	if (retry_list)
  		io_sq_tw(&retry_list, UINT_MAX);

  	io_uring_cancel_generic(true, sqd);
  	rcu_assign_pointer(sqd->thread, NULL);
  	put_task_struct(current);
  	/* no thread polls these rings any more; leave NEED_WAKEUP set */
  	list_for_each_entry(ctx, &sqd->ctx_list, sqd_list)
  		atomic_or(IORING_SQ_NEED_WAKEUP, &ctx->rings->sq_flags);
  	io_run_task_work();
  	mutex_unlock(&sqd->lock);
  err_out:
  	complete(&sqd->exited);
  	do_exit(0);
  }
  359. void io_sqpoll_wait_sq(struct io_ring_ctx *ctx)
  360. {
  361. DEFINE_WAIT(wait);
  362. do {
  363. if (!io_sqring_full(ctx))
  364. break;
  365. prepare_to_wait(&ctx->sqo_sq_wait, &wait, TASK_INTERRUPTIBLE);
  366. if (!io_sqring_full(ctx))
  367. break;
  368. schedule();
  369. } while (!signal_pending(current));
  370. finish_wait(&ctx->sqo_sq_wait, &wait);
  371. }
  /*
   * io_sq_offload_create - set up submission-queue polling for a new ring
   * @ctx: ring being created
   * @p: setup parameters supplied by userspace
   *
   * Without IORING_SETUP_SQPOLL this only validates an ATTACH_WQ wq_fd and
   * rejects IORING_SETUP_SQ_AFF. With SQPOLL, @ctx either joins an existing
   * io_sq_data (sharing its thread) or gets a new io_sq_data plus a SQPOLL
   * thread. The new thread is pinned to p->sq_thread_cpu when SQ_AFF is set.
   * Returns 0 or a negative errno. On failure, io_sq_thread_finish() drops
   * any io_sq_data reference that was taken.
   */
  __cold int io_sq_offload_create(struct io_ring_ctx *ctx,
  				struct io_uring_params *p)
  {
  	int ret;

  	/* Retain compatibility with failing for an invalid attach attempt */
  	if ((ctx->flags & (IORING_SETUP_ATTACH_WQ | IORING_SETUP_SQPOLL)) ==
  				IORING_SETUP_ATTACH_WQ) {
  		CLASS(fd, f)(p->wq_fd);
  		if (fd_empty(f))
  			return -ENXIO;
  		if (!io_is_uring_fops(fd_file(f)))
  			return -EINVAL;
  	}
  	if (ctx->flags & IORING_SETUP_SQPOLL) {
  		struct task_struct *tsk;
  		struct io_sq_data *sqd;
  		bool attached;

  		ret = security_uring_sqpoll();
  		if (ret)
  			return ret;

  		/* share wq_fd's io_sq_data (attached == true) or allocate one */
  		sqd = io_get_sq_data(p, &attached);
  		if (IS_ERR(sqd)) {
  			ret = PTR_ERR(sqd);
  			goto err;
  		}

  		ctx->sq_creds = get_current_cred();
  		ctx->sq_data = sqd;
  		/* an idle period of 0 means the default of HZ jiffies (one second) */
  		ctx->sq_thread_idle = msecs_to_jiffies(p->sq_thread_idle);
  		if (!ctx->sq_thread_idle)
  			ctx->sq_thread_idle = HZ;

  		io_sq_thread_park(sqd);
  		list_add(&ctx->sqd_list, &sqd->ctx_list);
  		io_sqd_update_thread_idle(sqd);
  		/*
  		 * don't attach to a dying SQPOLL thread, would be racy
  		 *
  		 * NOTE(review): sqd->thread is read directly here while
  		 * sqd->lock is held (via io_sq_thread_park()). Other call
  		 * sites in this file use sqpoll_task_locked() for such reads;
  		 * confirm which accessor is intended.
  		 */
  		ret = (attached && !sqd->thread) ? -ENXIO : 0;
  		io_sq_thread_unpark(sqd);

  		if (ret < 0)
  			goto err;
  		/* an existing thread already services this io_sq_data */
  		if (attached)
  			return 0;

  		if (p->flags & IORING_SETUP_SQ_AFF) {
  			cpumask_var_t allowed_mask;
  			int cpu = p->sq_thread_cpu;

  			ret = -EINVAL;
  			if (cpu >= nr_cpu_ids || !cpu_online(cpu))
  				goto err_sqpoll;
  			ret = -ENOMEM;
  			if (!alloc_cpumask_var(&allowed_mask, GFP_KERNEL))
  				goto err_sqpoll;
  			ret = -EINVAL;
  			/* the requested CPU must also be in the caller's cpuset */
  			cpuset_cpus_allowed(current, allowed_mask);
  			if (!cpumask_test_cpu(cpu, allowed_mask)) {
  				free_cpumask_var(allowed_mask);
  				goto err_sqpoll;
  			}
  			free_cpumask_var(allowed_mask);
  			sqd->sq_cpu = cpu;
  		} else {
  			/* -1: io_sq_thread() allows all online CPUs */
  			sqd->sq_cpu = -1;
  		}

  		sqd->task_pid = current->pid;
  		sqd->task_tgid = current->tgid;
  		tsk = create_io_thread(io_sq_thread, sqd, NUMA_NO_NODE);
  		if (IS_ERR(tsk)) {
  			ret = PTR_ERR(tsk);
  			goto err_sqpoll;
  		}

  		mutex_lock(&sqd->lock);
  		rcu_assign_pointer(sqd->thread, tsk);
  		mutex_unlock(&sqd->lock);

  		/* released by the thread itself via put_task_struct(current) */
  		get_task_struct(tsk);
  		ret = io_uring_alloc_task_context(tsk, ctx);
  		/*
  		 * Start the thread even if the allocation failed: it then sees
  		 * !current->io_uring and exits on its own.
  		 */
  		wake_up_new_task(tsk);
  		if (ret)
  			goto err;
  	} else if (p->flags & IORING_SETUP_SQ_AFF) {
  		/* Can't have SQ_AFF without SQPOLL */
  		ret = -EINVAL;
  		goto err;
  	}

  	return 0;
  err_sqpoll:
  	/*
  	 * No thread was started, so nothing else completes ->exited. When the
  	 * last reference is dropped below, io_sq_thread_stop() waits on it.
  	 */
  	complete(&ctx->sq_data->exited);
  err:
  	io_sq_thread_finish(ctx);
  	return ret;
  }
  459. __cold int io_sqpoll_wq_cpu_affinity(struct io_ring_ctx *ctx,
  460. cpumask_var_t mask)
  461. {
  462. struct io_sq_data *sqd = ctx->sq_data;
  463. int ret = -EINVAL;
  464. if (sqd) {
  465. struct task_struct *tsk;
  466. io_sq_thread_park(sqd);
  467. /* Don't set affinity for a dying thread */
  468. tsk = sqpoll_task_locked(sqd);
  469. if (tsk)
  470. ret = io_wq_cpu_affinity(tsk->io_uring, mask);
  471. io_sq_thread_unpark(sqd);
  472. }
  473. return ret;
  474. }