smc_rx.c 14 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527
  1. // SPDX-License-Identifier: GPL-2.0
  2. /*
  3. * Shared Memory Communications over RDMA (SMC-R) and RoCE
  4. *
  5. * Manage RMBE
  6. * copy new RMBE data into user space
  7. *
  8. * Copyright IBM Corp. 2016
  9. *
  10. * Author(s): Ursula Braun <ubraun@linux.vnet.ibm.com>
  11. */
  12. #include <linux/net.h>
  13. #include <linux/rcupdate.h>
  14. #include <linux/sched/signal.h>
  15. #include <linux/splice.h>
  16. #include <net/sock.h>
  17. #include <trace/events/sock.h>
  18. #include "smc.h"
  19. #include "smc_core.h"
  20. #include "smc_cdc.h"
  21. #include "smc_tx.h" /* smc_tx_consumer_update() */
  22. #include "smc_rx.h"
  23. #include "smc_stats.h"
  24. #include "smc_tracepoint.h"
  25. /* callback implementation to wakeup consumers blocked with smc_rx_wait().
  26. * indirectly called by smc_cdc_msg_recv_action().
  27. */
  28. static void smc_rx_wake_up(struct sock *sk)
  29. {
  30. struct socket_wq *wq;
  31. trace_sk_data_ready(sk);
  32. /* derived from sock_def_readable() */
  33. /* called already in smc_listen_work() */
  34. rcu_read_lock();
  35. wq = rcu_dereference(sk->sk_wq);
  36. if (skwq_has_sleeper(wq))
  37. wake_up_interruptible_sync_poll(&wq->wait, EPOLLIN | EPOLLPRI |
  38. EPOLLRDNORM | EPOLLRDBAND);
  39. sk_wake_async_rcu(sk, SOCK_WAKE_WAITD, POLL_IN);
  40. if ((sk->sk_shutdown == SHUTDOWN_MASK) ||
  41. (sk->sk_state == SMC_CLOSED))
  42. sk_wake_async_rcu(sk, SOCK_WAKE_WAITD, POLL_HUP);
  43. rcu_read_unlock();
  44. }
  45. /* Update consumer cursor
  46. * @conn connection to update
  47. * @cons consumer cursor
  48. * @len number of Bytes consumed
  49. * Returns:
  50. * 1 if we should end our receive, 0 otherwise
  51. */
  52. static int smc_rx_update_consumer(struct smc_sock *smc,
  53. union smc_host_cursor cons, size_t len)
  54. {
  55. struct smc_connection *conn = &smc->conn;
  56. struct sock *sk = &smc->sk;
  57. bool force = false;
  58. int diff, rc = 0;
  59. smc_curs_add(conn->rmb_desc->len, &cons, len);
  60. /* did we process urgent data? */
  61. if (conn->urg_state == SMC_URG_VALID || conn->urg_rx_skip_pend) {
  62. diff = smc_curs_comp(conn->rmb_desc->len, &cons,
  63. &conn->urg_curs);
  64. if (sock_flag(sk, SOCK_URGINLINE)) {
  65. if (diff == 0) {
  66. force = true;
  67. rc = 1;
  68. conn->urg_state = SMC_URG_READ;
  69. }
  70. } else {
  71. if (diff == 1) {
  72. /* skip urgent byte */
  73. force = true;
  74. smc_curs_add(conn->rmb_desc->len, &cons, 1);
  75. conn->urg_rx_skip_pend = false;
  76. } else if (diff < -1)
  77. /* we read past urgent byte */
  78. conn->urg_state = SMC_URG_READ;
  79. }
  80. }
  81. smc_curs_copy(&conn->local_tx_ctrl.cons, &cons, conn);
  82. /* send consumer cursor update if required */
  83. /* similar to advertising new TCP rcv_wnd if required */
  84. smc_tx_consumer_update(conn, force);
  85. return rc;
  86. }
  87. static void smc_rx_update_cons(struct smc_sock *smc, size_t len)
  88. {
  89. struct smc_connection *conn = &smc->conn;
  90. union smc_host_cursor cons;
  91. smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
  92. smc_rx_update_consumer(smc, cons, len);
  93. }
  /* Per pipe-buffer context of spliced RMBE data, kept in
   * pipe_buffer->private and freed by smc_rx_pipe_buf_release().
   */
  struct smc_spd_priv {
  	struct smc_sock *smc;	/* socket the spliced bytes belong to */
  	size_t len;		/* bytes covered by this pipe buffer */
  };
  98. static void smc_rx_pipe_buf_release(struct pipe_inode_info *pipe,
  99. struct pipe_buffer *buf)
  100. {
  101. struct smc_spd_priv *priv = (struct smc_spd_priv *)buf->private;
  102. struct smc_sock *smc = priv->smc;
  103. struct smc_connection *conn;
  104. struct sock *sk = &smc->sk;
  105. if (sk->sk_state == SMC_CLOSED ||
  106. sk->sk_state == SMC_PEERFINCLOSEWAIT ||
  107. sk->sk_state == SMC_APPFINCLOSEWAIT)
  108. goto out;
  109. conn = &smc->conn;
  110. lock_sock(sk);
  111. smc_rx_update_cons(smc, priv->len);
  112. release_sock(sk);
  113. if (atomic_sub_and_test(priv->len, &conn->splice_pending))
  114. smc_rx_wake_up(sk);
  115. out:
  116. kfree(priv);
  117. put_page(buf->page);
  118. sock_put(sk);
  119. }
  /* pipe_buf_operations.get: refuse to duplicate an SMC pipe buffer. Its
   * smc_spd_priv in buf->private has exactly one owner, which frees it in
   * smc_rx_pipe_buf_release(), so it cannot be shared by a clone.
   */
  static bool smc_rx_pipe_buf_get(struct pipe_inode_info *pipe,
  				struct pipe_buffer *buf)
  {
  	const bool clone_allowed = false;

  	return clone_allowed;
  }
  126. static const struct pipe_buf_operations smc_pipe_ops = {
  127. .release = smc_rx_pipe_buf_release,
  128. .get = smc_rx_pipe_buf_get,
  129. };
  130. static void smc_rx_spd_release(struct splice_pipe_desc *spd,
  131. unsigned int i)
  132. {
  133. put_page(spd->pages[i]);
  134. }
  135. static int smc_rx_splice(struct pipe_inode_info *pipe, char *src, size_t len,
  136. struct smc_sock *smc)
  137. {
  138. struct smc_link_group *lgr = smc->conn.lgr;
  139. int offset = offset_in_page(src);
  140. struct partial_page *partial;
  141. struct splice_pipe_desc spd;
  142. struct smc_spd_priv **priv;
  143. struct page **pages;
  144. int bytes, nr_pages;
  145. int i;
  146. nr_pages = !lgr->is_smcd && smc->conn.rmb_desc->is_vm ?
  147. PAGE_ALIGN(len + offset) / PAGE_SIZE : 1;
  148. pages = kzalloc_objs(*pages, nr_pages);
  149. if (!pages)
  150. goto out;
  151. partial = kzalloc_objs(*partial, nr_pages);
  152. if (!partial)
  153. goto out_page;
  154. priv = kzalloc_objs(*priv, nr_pages);
  155. if (!priv)
  156. goto out_part;
  157. for (i = 0; i < nr_pages; i++) {
  158. priv[i] = kzalloc_obj(**priv);
  159. if (!priv[i])
  160. goto out_priv;
  161. }
  162. if (lgr->is_smcd ||
  163. (!lgr->is_smcd && !smc->conn.rmb_desc->is_vm)) {
  164. /* smcd or smcr that uses physically contiguous RMBs */
  165. priv[0]->len = len;
  166. priv[0]->smc = smc;
  167. partial[0].offset = src - (char *)smc->conn.rmb_desc->cpu_addr;
  168. partial[0].len = len;
  169. partial[0].private = (unsigned long)priv[0];
  170. pages[0] = smc->conn.rmb_desc->pages;
  171. } else {
  172. int size, left = len;
  173. void *buf = src;
  174. /* smcr that uses virtually contiguous RMBs*/
  175. for (i = 0; i < nr_pages; i++) {
  176. size = min_t(int, PAGE_SIZE - offset, left);
  177. priv[i]->len = size;
  178. priv[i]->smc = smc;
  179. pages[i] = vmalloc_to_page(buf);
  180. partial[i].offset = offset;
  181. partial[i].len = size;
  182. partial[i].private = (unsigned long)priv[i];
  183. buf += size;
  184. left -= size;
  185. offset = 0;
  186. }
  187. }
  188. spd.nr_pages_max = nr_pages;
  189. spd.nr_pages = nr_pages;
  190. spd.pages = pages;
  191. spd.partial = partial;
  192. spd.ops = &smc_pipe_ops;
  193. spd.spd_release = smc_rx_spd_release;
  194. bytes = splice_to_pipe(pipe, &spd);
  195. if (bytes > 0) {
  196. sock_hold(&smc->sk);
  197. if (!lgr->is_smcd && smc->conn.rmb_desc->is_vm) {
  198. for (i = 0; i < PAGE_ALIGN(bytes + offset) / PAGE_SIZE; i++)
  199. get_page(pages[i]);
  200. } else {
  201. get_page(smc->conn.rmb_desc->pages);
  202. }
  203. atomic_add(bytes, &smc->conn.splice_pending);
  204. }
  205. kfree(priv);
  206. kfree(partial);
  207. kfree(pages);
  208. return bytes;
  209. out_priv:
  210. for (i = (i - 1); i >= 0; i--)
  211. kfree(priv[i]);
  212. kfree(priv);
  213. out_part:
  214. kfree(partial);
  215. out_page:
  216. kfree(pages);
  217. out:
  218. return -ENOMEM;
  219. }
  220. static int smc_rx_data_available_and_no_splice_pend(struct smc_connection *conn, size_t peeked)
  221. {
  222. return smc_rx_data_available(conn, peeked) &&
  223. !atomic_read(&conn->splice_pending);
  224. }
  225. /* blocks rcvbuf consumer until >=len bytes available or timeout or interrupted
  226. * @smc smc socket
  227. * @timeo pointer to max seconds to wait, pointer to value 0 for no timeout
  228. * @peeked number of bytes already peeked
  229. * @fcrit add'l criterion to evaluate as function pointer
  230. * Returns:
  231. * 1 if at least 1 byte available in rcvbuf or if socket error/shutdown.
  232. * 0 otherwise (nothing in rcvbuf nor timeout, e.g. interrupted).
  233. */
  234. int smc_rx_wait(struct smc_sock *smc, long *timeo, size_t peeked,
  235. int (*fcrit)(struct smc_connection *conn, size_t baseline))
  236. {
  237. DEFINE_WAIT_FUNC(wait, woken_wake_function);
  238. struct smc_connection *conn = &smc->conn;
  239. struct smc_cdc_conn_state_flags *cflags =
  240. &conn->local_tx_ctrl.conn_state_flags;
  241. struct sock *sk = &smc->sk;
  242. int rc;
  243. if (fcrit(conn, peeked))
  244. return 1;
  245. sk_set_bit(SOCKWQ_ASYNC_WAITDATA, sk);
  246. add_wait_queue(sk_sleep(sk), &wait);
  247. rc = sk_wait_event(sk, timeo,
  248. READ_ONCE(sk->sk_err) ||
  249. cflags->peer_conn_abort ||
  250. READ_ONCE(sk->sk_shutdown) & RCV_SHUTDOWN ||
  251. conn->killed ||
  252. fcrit(conn, peeked),
  253. &wait);
  254. remove_wait_queue(sk_sleep(sk), &wait);
  255. sk_clear_bit(SOCKWQ_ASYNC_WAITDATA, sk);
  256. return rc;
  257. }
  258. static int smc_rx_recv_urg(struct smc_sock *smc, struct msghdr *msg, int len,
  259. int flags)
  260. {
  261. struct smc_connection *conn = &smc->conn;
  262. union smc_host_cursor cons;
  263. struct sock *sk = &smc->sk;
  264. int rc = 0;
  265. if (sock_flag(sk, SOCK_URGINLINE) ||
  266. !(conn->urg_state == SMC_URG_VALID) ||
  267. conn->urg_state == SMC_URG_READ)
  268. return -EINVAL;
  269. SMC_STAT_INC(smc, urg_data_cnt);
  270. if (conn->urg_state == SMC_URG_VALID) {
  271. if (!(flags & MSG_PEEK))
  272. smc->conn.urg_state = SMC_URG_READ;
  273. msg->msg_flags |= MSG_OOB;
  274. if (len > 0) {
  275. if (!(flags & MSG_TRUNC))
  276. rc = memcpy_to_msg(msg, &conn->urg_rx_byte, 1);
  277. len = 1;
  278. smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
  279. if (smc_curs_diff(conn->rmb_desc->len, &cons,
  280. &conn->urg_curs) > 1)
  281. conn->urg_rx_skip_pend = true;
  282. /* Urgent Byte was already accounted for, but trigger
  283. * skipping the urgent byte in non-inline case
  284. */
  285. if (!(flags & MSG_PEEK))
  286. smc_rx_update_consumer(smc, cons, 0);
  287. } else {
  288. msg->msg_flags |= MSG_TRUNC;
  289. }
  290. return rc ? -EFAULT : len;
  291. }
  292. if (sk->sk_state == SMC_CLOSED || sk->sk_shutdown & RCV_SHUTDOWN)
  293. return 0;
  294. return -EAGAIN;
  295. }
  296. static bool smc_rx_recvmsg_data_available(struct smc_sock *smc, size_t peeked)
  297. {
  298. struct smc_connection *conn = &smc->conn;
  299. if (smc_rx_data_available(conn, peeked))
  300. return true;
  301. else if (conn->urg_state == SMC_URG_VALID)
  302. /* we received a single urgent Byte - skip */
  303. smc_rx_update_cons(smc, 0);
  304. return false;
  305. }
  306. /* smc_rx_recvmsg - receive data from RMBE
  307. * @msg: copy data to receive buffer
  308. * @pipe: copy data to pipe if set - indicates splice() call
  309. *
  310. * rcvbuf consumer: main API called by socket layer.
  311. * Called under sk lock.
  312. */
  313. int smc_rx_recvmsg(struct smc_sock *smc, struct msghdr *msg,
  314. struct pipe_inode_info *pipe, size_t len, int flags)
  315. {
  316. size_t copylen, read_done = 0, read_remaining = len, peeked_bytes = 0;
  317. size_t chunk_len, chunk_off, chunk_len_sum;
  318. struct smc_connection *conn = &smc->conn;
  319. int (*func)(struct smc_connection *conn, size_t baseline);
  320. union smc_host_cursor cons;
  321. int readable, chunk;
  322. char *rcvbuf_base;
  323. struct sock *sk;
  324. int splbytes;
  325. long timeo;
  326. int target; /* Read at least these many bytes */
  327. int rc;
  328. if (unlikely(flags & MSG_ERRQUEUE))
  329. return -EINVAL; /* future work for sk.sk_family == AF_SMC */
  330. sk = &smc->sk;
  331. if (sk->sk_state == SMC_LISTEN)
  332. return -ENOTCONN;
  333. if (flags & MSG_OOB)
  334. return smc_rx_recv_urg(smc, msg, len, flags);
  335. timeo = sock_rcvtimeo(sk, flags & MSG_DONTWAIT);
  336. target = sock_rcvlowat(sk, flags & MSG_WAITALL, len);
  337. readable = atomic_read(&conn->bytes_to_rcv);
  338. if (readable >= conn->rmb_desc->len)
  339. SMC_STAT_RMB_RX_FULL(smc, !conn->lnk);
  340. if (len < readable)
  341. SMC_STAT_RMB_RX_SIZE_SMALL(smc, !conn->lnk);
  342. /* we currently use 1 RMBE per RMB, so RMBE == RMB base addr */
  343. rcvbuf_base = conn->rx_off + conn->rmb_desc->cpu_addr;
  344. do { /* while (read_remaining) */
  345. if (read_done >= target || (pipe && read_done))
  346. break;
  347. if (conn->killed)
  348. break;
  349. if (smc_rx_recvmsg_data_available(smc, peeked_bytes))
  350. goto copy;
  351. if (sk->sk_shutdown & RCV_SHUTDOWN) {
  352. /* smc_cdc_msg_recv_action() could have run after
  353. * above smc_rx_recvmsg_data_available()
  354. */
  355. if (smc_rx_recvmsg_data_available(smc, peeked_bytes))
  356. goto copy;
  357. break;
  358. }
  359. if (read_done) {
  360. if (sk->sk_err ||
  361. sk->sk_state == SMC_CLOSED ||
  362. !timeo ||
  363. signal_pending(current))
  364. break;
  365. } else {
  366. if (sk->sk_err) {
  367. read_done = sock_error(sk);
  368. break;
  369. }
  370. if (sk->sk_state == SMC_CLOSED) {
  371. if (!sock_flag(sk, SOCK_DONE)) {
  372. /* This occurs when user tries to read
  373. * from never connected socket.
  374. */
  375. read_done = -ENOTCONN;
  376. break;
  377. }
  378. break;
  379. }
  380. if (!timeo)
  381. return -EAGAIN;
  382. if (signal_pending(current)) {
  383. read_done = sock_intr_errno(timeo);
  384. break;
  385. }
  386. }
  387. if (!smc_rx_data_available(conn, peeked_bytes)) {
  388. smc_rx_wait(smc, &timeo, peeked_bytes, smc_rx_data_available);
  389. continue;
  390. }
  391. copy:
  392. /* initialize variables for 1st iteration of subsequent loop */
  393. /* could be just 1 byte, even after waiting on data above */
  394. readable = smc_rx_data_available(conn, peeked_bytes);
  395. splbytes = atomic_read(&conn->splice_pending);
  396. if (!readable || (msg && splbytes)) {
  397. if (splbytes)
  398. func = smc_rx_data_available_and_no_splice_pend;
  399. else
  400. func = smc_rx_data_available;
  401. smc_rx_wait(smc, &timeo, peeked_bytes, func);
  402. continue;
  403. }
  404. smc_curs_copy(&cons, &conn->local_tx_ctrl.cons, conn);
  405. if ((flags & MSG_PEEK) && peeked_bytes)
  406. smc_curs_add(conn->rmb_desc->len, &cons, peeked_bytes);
  407. /* subsequent splice() calls pick up where previous left */
  408. if (splbytes)
  409. smc_curs_add(conn->rmb_desc->len, &cons, splbytes);
  410. if (conn->urg_state == SMC_URG_VALID &&
  411. sock_flag(&smc->sk, SOCK_URGINLINE) &&
  412. readable > 1)
  413. readable--; /* always stop at urgent Byte */
  414. /* not more than what user space asked for */
  415. copylen = min_t(size_t, read_remaining, readable);
  416. /* determine chunks where to read from rcvbuf */
  417. /* either unwrapped case, or 1st chunk of wrapped case */
  418. chunk_len = min_t(size_t, copylen, conn->rmb_desc->len -
  419. cons.count);
  420. chunk_len_sum = chunk_len;
  421. chunk_off = cons.count;
  422. smc_rmb_sync_sg_for_cpu(conn);
  423. for (chunk = 0; chunk < 2; chunk++) {
  424. if (!(flags & MSG_TRUNC)) {
  425. if (msg) {
  426. rc = memcpy_to_msg(msg, rcvbuf_base +
  427. chunk_off,
  428. chunk_len);
  429. } else {
  430. rc = smc_rx_splice(pipe, rcvbuf_base +
  431. chunk_off, chunk_len,
  432. smc);
  433. }
  434. if (rc < 0) {
  435. if (!read_done)
  436. read_done = -EFAULT;
  437. goto out;
  438. }
  439. }
  440. read_remaining -= chunk_len;
  441. read_done += chunk_len;
  442. if (flags & MSG_PEEK)
  443. peeked_bytes += chunk_len;
  444. if (chunk_len_sum == copylen)
  445. break; /* either on 1st or 2nd iteration */
  446. /* prepare next (== 2nd) iteration */
  447. chunk_len = copylen - chunk_len; /* remainder */
  448. chunk_len_sum += chunk_len;
  449. chunk_off = 0; /* modulo offset in recv ring buffer */
  450. }
  451. /* update cursors */
  452. if (!(flags & MSG_PEEK)) {
  453. /* increased in recv tasklet smc_cdc_msg_rcv() */
  454. smp_mb__before_atomic();
  455. atomic_sub(copylen, &conn->bytes_to_rcv);
  456. /* guarantee 0 <= bytes_to_rcv <= rmb_desc->len */
  457. smp_mb__after_atomic();
  458. if (msg && smc_rx_update_consumer(smc, cons, copylen))
  459. goto out;
  460. }
  461. trace_smc_rx_recvmsg(smc, copylen);
  462. } while (read_remaining);
  463. out:
  464. return read_done;
  465. }
  466. /* Initialize receive properties on connection establishment. NB: not __init! */
  467. void smc_rx_init(struct smc_sock *smc)
  468. {
  469. smc->sk.sk_data_ready = smc_rx_wake_up;
  470. atomic_set(&smc->conn.splice_pending, 0);
  471. smc->conn.urg_state = SMC_URG_READ;
  472. }