verbs.c 36 KB

12345678910111213141516171819202122232425262728293031323334353637383940414243444546474849505152535455565758596061626364656667686970717273747576777879808182838485868788899091929394959697989910010110210310410510610710810911011111211311411511611711811912012112212312412512612712812913013113213313413513613713813914014114214314414514614714814915015115215315415515615715815916016116216316416516616716816917017117217317417517617717817918018118218318418518618718818919019119219319419519619719819920020120220320420520620720820921021121221321421521621721821922022122222322422522622722822923023123223323423523623723823924024124224324424524624724824925025125225325425525625725825926026126226326426526626726826927027127227327427527627727827928028128228328428528628728828929029129229329429529629729829930030130230330430530630730830931031131231331431531631731831932032132232332432532632732832933033133233333433533633733833934034134234334434534634734834935035135235335435535635735835936036136236336436536636736836937037137237337437537637737837938038138238338438538638738838939039139239339439539639739839940040140240340440540640740840941041141241341441541641741841942042142242342442542642742842943043143243343443543643743843944044144244344444544644744844945045145245345445545645745845946046146246346446546646746846947047147247347447547647747847948048148248348448548648748848949049149249349449549649749849950050150250350450550650750850951051151251351451551651751851952052152252352452552652752852953053153253353453553653753853954054154254354454554654754854955055155255355455555655755855956056156256356456556656756856957057157257357457557657757857958058158258358458558658758858959059159259359459559659759859960060160260360460560660760860961061161261361461561661761861962062162262362462562662762862963063163263363463563663763863964064164264364464564664764864965065165265365465565665765865966066166266366466566666766866967067167267367467567667767867968068168268368468568668768868969069169269369469569669769869970070170270
37047057067077087097107117127137147157167177187197207217227237247257267277287297307317327337347357367377387397407417427437447457467477487497507517527537547557567577587597607617627637647657667677687697707717727737747757767777787797807817827837847857867877887897907917927937947957967977987998008018028038048058068078088098108118128138148158168178188198208218228238248258268278288298308318328338348358368378388398408418428438448458468478488498508518528538548558568578588598608618628638648658668678688698708718728738748758768778788798808818828838848858868878888898908918928938948958968978988999009019029039049059069079089099109119129139149159169179189199209219229239249259269279289299309319329339349359369379389399409419429439449459469479489499509519529539549559569579589599609619629639649659669679689699709719729739749759769779789799809819829839849859869879889899909919929939949959969979989991000100110021003100410051006100710081009101010111012101310141015101610171018101910201021102210231024102510261027102810291030103110321033103410351036103710381039104010411042104310441045104610471048104910501051105210531054105510561057105810591060106110621063106410651066106710681069107010711072107310741075107610771078107910801081108210831084108510861087108810891090109110921093109410951096109710981099110011011102110311041105110611071108110911101111111211131114111511161117111811191120112111221123112411251126112711281129113011311132113311341135113611371138113911401141114211431144114511461147114811491150115111521153115411551156115711581159116011611162116311641165116611671168116911701171117211731174117511761177117811791180118111821183118411851186118711881189119011911192119311941195119611971198119912001201120212031204120512061207120812091210121112121213121412151216121712181219122012211222122312241225122612271228122912301231123212331234123512361237123812391240124112421243124412451246124712481249125012511252125312541255125612571258125912601261126212631264126512661267126812691270127112721273127412751276127
712781279128012811282128312841285128612871288128912901291129212931294129512961297129812991300130113021303130413051306130713081309131013111312131313141315131613171318131913201321132213231324132513261327132813291330133113321333133413351336133713381339134013411342134313441345134613471348134913501351135213531354135513561357135813591360136113621363136413651366136713681369137013711372137313741375137613771378137913801381138213831384138513861387138813891390139113921393139413951396139713981399140014011402140314041405140614071408140914101411
  1. // SPDX-License-Identifier: GPL-2.0 OR BSD-3-Clause
  2. /*
  3. * Copyright (c) 2014-2017 Oracle. All rights reserved.
  4. * Copyright (c) 2003-2007 Network Appliance, Inc. All rights reserved.
  5. *
  6. * This software is available to you under a choice of one of two
  7. * licenses. You may choose to be licensed under the terms of the GNU
  8. * General Public License (GPL) Version 2, available from the file
  9. * COPYING in the main directory of this source tree, or the BSD-type
  10. * license below:
  11. *
  12. * Redistribution and use in source and binary forms, with or without
  13. * modification, are permitted provided that the following conditions
  14. * are met:
  15. *
  16. * Redistributions of source code must retain the above copyright
  17. * notice, this list of conditions and the following disclaimer.
  18. *
  19. * Redistributions in binary form must reproduce the above
  20. * copyright notice, this list of conditions and the following
  21. * disclaimer in the documentation and/or other materials provided
  22. * with the distribution.
  23. *
  24. * Neither the name of the Network Appliance, Inc. nor the names of
  25. * its contributors may be used to endorse or promote products
  26. * derived from this software without specific prior written
  27. * permission.
  28. *
  29. * THIS SOFTWARE IS PROVIDED BY THE COPYRIGHT HOLDERS AND CONTRIBUTORS
  30. * "AS IS" AND ANY EXPRESS OR IMPLIED WARRANTIES, INCLUDING, BUT NOT
  31. * LIMITED TO, THE IMPLIED WARRANTIES OF MERCHANTABILITY AND FITNESS FOR
  32. * A PARTICULAR PURPOSE ARE DISCLAIMED. IN NO EVENT SHALL THE COPYRIGHT
  33. * OWNER OR CONTRIBUTORS BE LIABLE FOR ANY DIRECT, INDIRECT, INCIDENTAL,
  34. * SPECIAL, EXEMPLARY, OR CONSEQUENTIAL DAMAGES (INCLUDING, BUT NOT
  35. * LIMITED TO, PROCUREMENT OF SUBSTITUTE GOODS OR SERVICES; LOSS OF USE,
  36. * DATA, OR PROFITS; OR BUSINESS INTERRUPTION) HOWEVER CAUSED AND ON ANY
  37. * THEORY OF LIABILITY, WHETHER IN CONTRACT, STRICT LIABILITY, OR TORT
  38. * (INCLUDING NEGLIGENCE OR OTHERWISE) ARISING IN ANY WAY OUT OF THE USE
  39. * OF THIS SOFTWARE, EVEN IF ADVISED OF THE POSSIBILITY OF SUCH DAMAGE.
  40. */
  41. /*
  42. * verbs.c
  43. *
  44. * Encapsulates the major functions managing:
  45. * o adapters
  46. * o endpoints
  47. * o connections
  48. * o buffer memory
  49. */
  50. #include <linux/bitops.h>
  51. #include <linux/interrupt.h>
  52. #include <linux/slab.h>
  53. #include <linux/sunrpc/addr.h>
  54. #include <linux/sunrpc/svc_rdma.h>
  55. #include <linux/log2.h>
  56. #include <asm/barrier.h>
  57. #include <rdma/ib_cm.h>
  58. #include "xprt_rdma.h"
  59. #include <trace/events/rpcrdma.h>
  60. static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt);
  61. static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt);
  62. static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
  63. struct rpcrdma_sendctx *sc);
  64. static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt);
  65. static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt);
  66. static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt);
  67. static void rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt);
  68. static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt);
  69. static void rpcrdma_ep_get(struct rpcrdma_ep *ep);
  70. static int rpcrdma_ep_put(struct rpcrdma_ep *ep);
  71. static struct rpcrdma_regbuf *
  72. rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
  73. int node);
  74. static struct rpcrdma_regbuf *
  75. rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction);
  76. static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb);
  77. static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb);
  78. /* Wait for outstanding transport work to finish. ib_drain_qp
  79. * handles the drains in the wrong order for us, so open code
  80. * them here.
  81. */
  82. static void rpcrdma_xprt_drain(struct rpcrdma_xprt *r_xprt)
  83. {
  84. struct rpcrdma_ep *ep = r_xprt->rx_ep;
  85. struct rdma_cm_id *id = ep->re_id;
  86. /* Wait for rpcrdma_post_recvs() to leave its critical
  87. * section.
  88. */
  89. if (atomic_inc_return(&ep->re_receiving) > 1)
  90. wait_for_completion(&ep->re_done);
  91. /* Flush Receives, then wait for deferred Reply work
  92. * to complete.
  93. */
  94. ib_drain_rq(id->qp);
  95. /* Deferred Reply processing might have scheduled
  96. * local invalidations.
  97. */
  98. ib_drain_sq(id->qp);
  99. rpcrdma_ep_put(ep);
  100. }
  101. /* Ensure xprt_force_disconnect() is invoked exactly once when a
  102. * connection is closed or lost. (The important thing is it needs
  103. * to be invoked "at least" once).
  104. */
  105. void rpcrdma_force_disconnect(struct rpcrdma_ep *ep)
  106. {
  107. if (atomic_add_unless(&ep->re_force_disconnect, 1, 1))
  108. xprt_force_disconnect(ep->re_xprt);
  109. }
  110. /**
  111. * rpcrdma_flush_disconnect - Disconnect on flushed completion
  112. * @r_xprt: transport to disconnect
  113. * @wc: work completion entry
  114. *
  115. * Must be called in process context.
  116. */
  117. void rpcrdma_flush_disconnect(struct rpcrdma_xprt *r_xprt, struct ib_wc *wc)
  118. {
  119. if (wc->status != IB_WC_SUCCESS)
  120. rpcrdma_force_disconnect(r_xprt->rx_ep);
  121. }
  122. /**
  123. * rpcrdma_wc_send - Invoked by RDMA provider for each polled Send WC
  124. * @cq: completion queue
  125. * @wc: WCE for a completed Send WR
  126. *
  127. */
  128. static void rpcrdma_wc_send(struct ib_cq *cq, struct ib_wc *wc)
  129. {
  130. struct ib_cqe *cqe = wc->wr_cqe;
  131. struct rpcrdma_sendctx *sc =
  132. container_of(cqe, struct rpcrdma_sendctx, sc_cqe);
  133. struct rpcrdma_xprt *r_xprt = cq->cq_context;
  134. /* WARNING: Only wr_cqe and status are reliable at this point */
  135. trace_xprtrdma_wc_send(wc, &sc->sc_cid);
  136. rpcrdma_sendctx_put_locked(r_xprt, sc);
  137. rpcrdma_flush_disconnect(r_xprt, wc);
  138. }
  139. /**
  140. * rpcrdma_wc_receive - Invoked by RDMA provider for each polled Receive WC
  141. * @cq: completion queue
  142. * @wc: WCE for a completed Receive WR
  143. *
  144. */
  145. static void rpcrdma_wc_receive(struct ib_cq *cq, struct ib_wc *wc)
  146. {
  147. struct ib_cqe *cqe = wc->wr_cqe;
  148. struct rpcrdma_rep *rep = container_of(cqe, struct rpcrdma_rep,
  149. rr_cqe);
  150. struct rpcrdma_xprt *r_xprt = cq->cq_context;
  151. /* WARNING: Only wr_cqe and status are reliable at this point */
  152. trace_xprtrdma_wc_receive(wc, &rep->rr_cid);
  153. --r_xprt->rx_ep->re_receive_count;
  154. if (wc->status != IB_WC_SUCCESS)
  155. goto out_flushed;
  156. /* status == SUCCESS means all fields in wc are trustworthy */
  157. rpcrdma_set_xdrlen(&rep->rr_hdrbuf, wc->byte_len);
  158. rep->rr_wc_flags = wc->wc_flags;
  159. rep->rr_inv_rkey = wc->ex.invalidate_rkey;
  160. ib_dma_sync_single_for_cpu(rdmab_device(rep->rr_rdmabuf),
  161. rdmab_addr(rep->rr_rdmabuf),
  162. wc->byte_len, DMA_FROM_DEVICE);
  163. rpcrdma_reply_handler(rep);
  164. return;
  165. out_flushed:
  166. rpcrdma_flush_disconnect(r_xprt, wc);
  167. rpcrdma_rep_put(&r_xprt->rx_buf, rep);
  168. }
  169. static void rpcrdma_update_cm_private(struct rpcrdma_ep *ep,
  170. struct rdma_conn_param *param)
  171. {
  172. const struct rpcrdma_connect_private *pmsg = param->private_data;
  173. unsigned int rsize, wsize;
  174. /* Default settings for RPC-over-RDMA Version One */
  175. rsize = RPCRDMA_V1_DEF_INLINE_SIZE;
  176. wsize = RPCRDMA_V1_DEF_INLINE_SIZE;
  177. if (pmsg &&
  178. pmsg->cp_magic == rpcrdma_cmp_magic &&
  179. pmsg->cp_version == RPCRDMA_CMP_VERSION) {
  180. rsize = rpcrdma_decode_buffer_size(pmsg->cp_send_size);
  181. wsize = rpcrdma_decode_buffer_size(pmsg->cp_recv_size);
  182. }
  183. if (rsize < ep->re_inline_recv)
  184. ep->re_inline_recv = rsize;
  185. if (wsize < ep->re_inline_send)
  186. ep->re_inline_send = wsize;
  187. rpcrdma_set_max_header_sizes(ep);
  188. }
  /**
   * rpcrdma_cm_event_handler - Handle RDMA CM events
   * @id: rdma_cm_id on which an event has occurred
   * @event: details of the event
   *
   * Resolution events record their outcome in re_async_rc and
   * complete re_done. Connection events update re_connect_status
   * and wake waiters on re_connect_wait. Disconnection events force
   * a transport disconnect and drop an endpoint reference.
   *
   * Called with @id's mutex held. Returns 1 if caller should
   * destroy @id, otherwise 0.
   */
  static int
  rpcrdma_cm_event_handler(struct rdma_cm_id *id, struct rdma_cm_event *event)
  {
      struct rpcrdma_ep *ep = id->context;

      might_sleep();

      switch (event->event) {
      /* Address and route resolution outcomes */
      case RDMA_CM_EVENT_ADDR_RESOLVED:
      case RDMA_CM_EVENT_ROUTE_RESOLVED:
          ep->re_async_rc = 0;
          complete(&ep->re_done);
          return 0;

      case RDMA_CM_EVENT_ADDR_ERROR:
          ep->re_async_rc = -EPROTO;
          complete(&ep->re_done);
          return 0;

      case RDMA_CM_EVENT_ROUTE_ERROR:
          ep->re_async_rc = -ENETUNREACH;
          complete(&ep->re_done);
          return 0;

      /* Connection established: take a reference on the endpoint */
      case RDMA_CM_EVENT_ESTABLISHED:
          rpcrdma_ep_get(ep);
          ep->re_connect_status = 1;
          rpcrdma_update_cm_private(ep, &event->param.conn);
          trace_xprtrdma_inline_thresh(ep);
          wake_up_all(&ep->re_connect_wait);
          return 0;

      /* Connection attempt failed: report why and wake the waiter */
      case RDMA_CM_EVENT_CONNECT_ERROR:
          ep->re_connect_status = -ENOTCONN;
          wake_up_all(&ep->re_connect_wait);
          return 0;

      case RDMA_CM_EVENT_UNREACHABLE:
          ep->re_connect_status = -ENETUNREACH;
          wake_up_all(&ep->re_connect_wait);
          return 0;

      case RDMA_CM_EVENT_REJECTED:
          ep->re_connect_status = -ECONNREFUSED;
          /* A stale-connection rejection is reported as -ENOTCONN */
          if (event->status == IB_CM_REJ_STALE_CONN)
              ep->re_connect_status = -ENOTCONN;
          wake_up_all(&ep->re_connect_wait);
          return 0;

      /* Connection lost: the return value of rpcrdma_ep_put()
       * tells the caller whether @id must be destroyed.
       */
      case RDMA_CM_EVENT_ADDR_CHANGE:
          ep->re_connect_status = -ENODEV;
          rpcrdma_force_disconnect(ep);
          return rpcrdma_ep_put(ep);

      case RDMA_CM_EVENT_DISCONNECTED:
          ep->re_connect_status = -ECONNABORTED;
          rpcrdma_force_disconnect(ep);
          return rpcrdma_ep_put(ep);

      default:
          break;
      }

      return 0;
  }
  249. static void rpcrdma_ep_removal_done(struct rpcrdma_notification *rn)
  250. {
  251. struct rpcrdma_ep *ep = container_of(rn, struct rpcrdma_ep, re_rn);
  252. trace_xprtrdma_device_removal(ep->re_id);
  253. xprt_force_disconnect(ep->re_xprt);
  254. }
  255. static struct rdma_cm_id *rpcrdma_create_id(struct rpcrdma_xprt *r_xprt,
  256. struct rpcrdma_ep *ep)
  257. {
  258. unsigned long wtimeout = msecs_to_jiffies(RDMA_RESOLVE_TIMEOUT) + 1;
  259. struct rpc_xprt *xprt = &r_xprt->rx_xprt;
  260. struct rdma_cm_id *id;
  261. int rc;
  262. init_completion(&ep->re_done);
  263. id = rdma_create_id(xprt->xprt_net, rpcrdma_cm_event_handler, ep,
  264. RDMA_PS_TCP, IB_QPT_RC);
  265. if (IS_ERR(id))
  266. return id;
  267. ep->re_async_rc = -ETIMEDOUT;
  268. rc = rdma_resolve_addr(id, NULL, (struct sockaddr *)&xprt->addr,
  269. RDMA_RESOLVE_TIMEOUT);
  270. if (rc)
  271. goto out;
  272. rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
  273. if (rc < 0)
  274. goto out;
  275. rc = ep->re_async_rc;
  276. if (rc)
  277. goto out;
  278. ep->re_async_rc = -ETIMEDOUT;
  279. rc = rdma_resolve_route(id, RDMA_RESOLVE_TIMEOUT);
  280. if (rc)
  281. goto out;
  282. rc = wait_for_completion_interruptible_timeout(&ep->re_done, wtimeout);
  283. if (rc < 0)
  284. goto out;
  285. rc = ep->re_async_rc;
  286. if (rc)
  287. goto out;
  288. rc = rpcrdma_rn_register(id->device, &ep->re_rn, rpcrdma_ep_removal_done);
  289. if (rc)
  290. goto out;
  291. return id;
  292. out:
  293. rdma_destroy_id(id);
  294. return ERR_PTR(rc);
  295. }
  296. static void rpcrdma_ep_destroy(struct kref *kref)
  297. {
  298. struct rpcrdma_ep *ep = container_of(kref, struct rpcrdma_ep, re_kref);
  299. if (ep->re_id->qp) {
  300. rdma_destroy_qp(ep->re_id);
  301. ep->re_id->qp = NULL;
  302. }
  303. if (ep->re_attr.recv_cq)
  304. ib_free_cq(ep->re_attr.recv_cq);
  305. ep->re_attr.recv_cq = NULL;
  306. if (ep->re_attr.send_cq)
  307. ib_free_cq(ep->re_attr.send_cq);
  308. ep->re_attr.send_cq = NULL;
  309. if (ep->re_pd)
  310. ib_dealloc_pd(ep->re_pd);
  311. ep->re_pd = NULL;
  312. rpcrdma_rn_unregister(ep->re_id->device, &ep->re_rn);
  313. kfree(ep);
  314. module_put(THIS_MODULE);
  315. }
  316. static noinline void rpcrdma_ep_get(struct rpcrdma_ep *ep)
  317. {
  318. kref_get(&ep->re_kref);
  319. }
  320. /* Returns:
  321. * %0 if @ep still has a positive kref count, or
  322. * %1 if @ep was destroyed successfully.
  323. */
  324. static noinline int rpcrdma_ep_put(struct rpcrdma_ep *ep)
  325. {
  326. return kref_put(&ep->re_kref, rpcrdma_ep_destroy);
  327. }
  328. static int rpcrdma_ep_create(struct rpcrdma_xprt *r_xprt)
  329. {
  330. struct rpcrdma_connect_private *pmsg;
  331. struct ib_device *device;
  332. struct rdma_cm_id *id;
  333. struct rpcrdma_ep *ep;
  334. int rc;
  335. ep = kzalloc_obj(*ep, XPRTRDMA_GFP_FLAGS);
  336. if (!ep)
  337. return -ENOTCONN;
  338. ep->re_xprt = &r_xprt->rx_xprt;
  339. kref_init(&ep->re_kref);
  340. id = rpcrdma_create_id(r_xprt, ep);
  341. if (IS_ERR(id)) {
  342. kfree(ep);
  343. return PTR_ERR(id);
  344. }
  345. __module_get(THIS_MODULE);
  346. device = id->device;
  347. ep->re_id = id;
  348. reinit_completion(&ep->re_done);
  349. ep->re_max_requests = r_xprt->rx_xprt.max_reqs;
  350. ep->re_inline_send = xprt_rdma_max_inline_write;
  351. ep->re_inline_recv = xprt_rdma_max_inline_read;
  352. rc = frwr_query_device(ep, device);
  353. if (rc)
  354. goto out_destroy;
  355. r_xprt->rx_buf.rb_max_requests = cpu_to_be32(ep->re_max_requests);
  356. ep->re_attr.srq = NULL;
  357. ep->re_attr.cap.max_inline_data = 0;
  358. ep->re_attr.sq_sig_type = IB_SIGNAL_REQ_WR;
  359. ep->re_attr.qp_type = IB_QPT_RC;
  360. ep->re_attr.port_num = ~0;
  361. ep->re_send_batch = ep->re_max_requests >> 3;
  362. ep->re_send_count = ep->re_send_batch;
  363. init_waitqueue_head(&ep->re_connect_wait);
  364. ep->re_attr.send_cq = ib_alloc_cq_any(device, r_xprt,
  365. ep->re_attr.cap.max_send_wr,
  366. IB_POLL_WORKQUEUE);
  367. if (IS_ERR(ep->re_attr.send_cq)) {
  368. rc = PTR_ERR(ep->re_attr.send_cq);
  369. ep->re_attr.send_cq = NULL;
  370. goto out_destroy;
  371. }
  372. ep->re_attr.recv_cq = ib_alloc_cq_any(device, r_xprt,
  373. ep->re_attr.cap.max_recv_wr,
  374. IB_POLL_WORKQUEUE);
  375. if (IS_ERR(ep->re_attr.recv_cq)) {
  376. rc = PTR_ERR(ep->re_attr.recv_cq);
  377. ep->re_attr.recv_cq = NULL;
  378. goto out_destroy;
  379. }
  380. ep->re_receive_count = 0;
  381. /* Initialize cma parameters */
  382. memset(&ep->re_remote_cma, 0, sizeof(ep->re_remote_cma));
  383. /* Prepare RDMA-CM private message */
  384. pmsg = &ep->re_cm_private;
  385. pmsg->cp_magic = rpcrdma_cmp_magic;
  386. pmsg->cp_version = RPCRDMA_CMP_VERSION;
  387. pmsg->cp_flags |= RPCRDMA_CMP_F_SND_W_INV_OK;
  388. pmsg->cp_send_size = rpcrdma_encode_buffer_size(ep->re_inline_send);
  389. pmsg->cp_recv_size = rpcrdma_encode_buffer_size(ep->re_inline_recv);
  390. ep->re_remote_cma.private_data = pmsg;
  391. ep->re_remote_cma.private_data_len = sizeof(*pmsg);
  392. /* Client offers RDMA Read but does not initiate */
  393. ep->re_remote_cma.initiator_depth = 0;
  394. ep->re_remote_cma.responder_resources =
  395. min_t(int, U8_MAX, device->attrs.max_qp_rd_atom);
  396. /* Limit transport retries so client can detect server
  397. * GID changes quickly. RPC layer handles re-establishing
  398. * transport connection and retransmission.
  399. */
  400. ep->re_remote_cma.retry_count = 6;
  401. /* RPC-over-RDMA handles its own flow control. In addition,
  402. * make all RNR NAKs visible so we know that RPC-over-RDMA
  403. * flow control is working correctly (no NAKs should be seen).
  404. */
  405. ep->re_remote_cma.flow_control = 0;
  406. ep->re_remote_cma.rnr_retry_count = 0;
  407. ep->re_pd = ib_alloc_pd(device, 0);
  408. if (IS_ERR(ep->re_pd)) {
  409. rc = PTR_ERR(ep->re_pd);
  410. ep->re_pd = NULL;
  411. goto out_destroy;
  412. }
  413. rc = rdma_create_qp(id, ep->re_pd, &ep->re_attr);
  414. if (rc)
  415. goto out_destroy;
  416. r_xprt->rx_ep = ep;
  417. return 0;
  418. out_destroy:
  419. rpcrdma_ep_put(ep);
  420. rdma_destroy_id(id);
  421. return rc;
  422. }
  423. /**
  424. * rpcrdma_xprt_connect - Connect an unconnected transport
  425. * @r_xprt: controlling transport instance
  426. *
  427. * Returns 0 on success or a negative errno.
  428. */
  429. int rpcrdma_xprt_connect(struct rpcrdma_xprt *r_xprt)
  430. {
  431. struct rpc_xprt *xprt = &r_xprt->rx_xprt;
  432. struct rpcrdma_ep *ep;
  433. int rc;
  434. rc = rpcrdma_ep_create(r_xprt);
  435. if (rc)
  436. return rc;
  437. ep = r_xprt->rx_ep;
  438. xprt_clear_connected(xprt);
  439. rpcrdma_reset_cwnd(r_xprt);
  440. /* Bump the ep's reference count while there are
  441. * outstanding Receives.
  442. */
  443. rpcrdma_ep_get(ep);
  444. rpcrdma_post_recvs(r_xprt, 1);
  445. rc = rdma_connect(ep->re_id, &ep->re_remote_cma);
  446. if (rc)
  447. goto out;
  448. if (xprt->reestablish_timeout < RPCRDMA_INIT_REEST_TO)
  449. xprt->reestablish_timeout = RPCRDMA_INIT_REEST_TO;
  450. wait_event_interruptible(ep->re_connect_wait,
  451. ep->re_connect_status != 0);
  452. if (ep->re_connect_status <= 0) {
  453. rc = ep->re_connect_status;
  454. goto out;
  455. }
  456. rc = rpcrdma_sendctxs_create(r_xprt);
  457. if (rc) {
  458. rc = -ENOTCONN;
  459. goto out;
  460. }
  461. rc = rpcrdma_reqs_setup(r_xprt);
  462. if (rc) {
  463. rc = -ENOTCONN;
  464. goto out;
  465. }
  466. rpcrdma_mrs_create(r_xprt);
  467. frwr_wp_create(r_xprt);
  468. out:
  469. trace_xprtrdma_connect(r_xprt, rc);
  470. return rc;
  471. }
  472. /**
  473. * rpcrdma_xprt_disconnect - Disconnect underlying transport
  474. * @r_xprt: controlling transport instance
  475. *
  476. * Caller serializes. Either the transport send lock is held,
  477. * or we're being called to destroy the transport.
  478. *
  479. * On return, @r_xprt is completely divested of all hardware
  480. * resources and prepared for the next ->connect operation.
  481. */
  482. void rpcrdma_xprt_disconnect(struct rpcrdma_xprt *r_xprt)
  483. {
  484. struct rpcrdma_ep *ep = r_xprt->rx_ep;
  485. struct rdma_cm_id *id;
  486. int rc;
  487. if (!ep)
  488. return;
  489. id = ep->re_id;
  490. rc = rdma_disconnect(id);
  491. trace_xprtrdma_disconnect(r_xprt, rc);
  492. rpcrdma_xprt_drain(r_xprt);
  493. rpcrdma_reps_unmap(r_xprt);
  494. rpcrdma_reqs_reset(r_xprt);
  495. rpcrdma_mrs_destroy(r_xprt);
  496. rpcrdma_sendctxs_destroy(r_xprt);
  497. if (rpcrdma_ep_put(ep))
  498. rdma_destroy_id(id);
  499. r_xprt->rx_ep = NULL;
  500. }
  501. /* Fixed-size circular FIFO queue. This implementation is wait-free and
  502. * lock-free.
  503. *
  504. * Consumer is the code path that posts Sends. This path dequeues a
  505. * sendctx for use by a Send operation. Multiple consumer threads
  506. * are serialized by the RPC transport lock, which allows only one
  507. * ->send_request call at a time.
  508. *
  509. * Producer is the code path that handles Send completions. This path
  510. * enqueues a sendctx that has been completed. Multiple producer
  511. * threads are serialized by the ib_poll_cq() function.
  512. */
  513. /* rpcrdma_sendctxs_destroy() assumes caller has already quiesced
  514. * queue activity, and rpcrdma_xprt_drain has flushed all remaining
  515. * Send requests.
  516. */
  517. static void rpcrdma_sendctxs_destroy(struct rpcrdma_xprt *r_xprt)
  518. {
  519. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  520. unsigned long i;
  521. if (!buf->rb_sc_ctxs)
  522. return;
  523. for (i = 0; i <= buf->rb_sc_last; i++)
  524. kfree(buf->rb_sc_ctxs[i]);
  525. kfree(buf->rb_sc_ctxs);
  526. buf->rb_sc_ctxs = NULL;
  527. }
  528. static struct rpcrdma_sendctx *rpcrdma_sendctx_create(struct rpcrdma_ep *ep)
  529. {
  530. struct rpcrdma_sendctx *sc;
  531. sc = kzalloc_flex(*sc, sc_sges, ep->re_attr.cap.max_send_sge,
  532. XPRTRDMA_GFP_FLAGS);
  533. if (!sc)
  534. return NULL;
  535. sc->sc_cqe.done = rpcrdma_wc_send;
  536. sc->sc_cid.ci_queue_id = ep->re_attr.send_cq->res.id;
  537. sc->sc_cid.ci_completion_id =
  538. atomic_inc_return(&ep->re_completion_ids);
  539. return sc;
  540. }
  541. static int rpcrdma_sendctxs_create(struct rpcrdma_xprt *r_xprt)
  542. {
  543. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  544. struct rpcrdma_sendctx *sc;
  545. unsigned long i;
  546. /* Maximum number of concurrent outstanding Send WRs. Capping
  547. * the circular queue size stops Send Queue overflow by causing
  548. * the ->send_request call to fail temporarily before too many
  549. * Sends are posted.
  550. */
  551. i = r_xprt->rx_ep->re_max_requests + RPCRDMA_MAX_BC_REQUESTS;
  552. buf->rb_sc_ctxs = kzalloc_objs(sc, i, XPRTRDMA_GFP_FLAGS);
  553. if (!buf->rb_sc_ctxs)
  554. return -ENOMEM;
  555. buf->rb_sc_last = i - 1;
  556. for (i = 0; i <= buf->rb_sc_last; i++) {
  557. sc = rpcrdma_sendctx_create(r_xprt->rx_ep);
  558. if (!sc)
  559. return -ENOMEM;
  560. buf->rb_sc_ctxs[i] = sc;
  561. }
  562. buf->rb_sc_head = 0;
  563. buf->rb_sc_tail = 0;
  564. return 0;
  565. }
  566. /* The sendctx queue is not guaranteed to have a size that is a
  567. * power of two, thus the helpers in circ_buf.h cannot be used.
  568. * The other option is to use modulus (%), which can be expensive.
  569. */
  570. static unsigned long rpcrdma_sendctx_next(struct rpcrdma_buffer *buf,
  571. unsigned long item)
  572. {
  573. return likely(item < buf->rb_sc_last) ? item + 1 : 0;
  574. }
  575. /**
  576. * rpcrdma_sendctx_get_locked - Acquire a send context
  577. * @r_xprt: controlling transport instance
  578. *
  579. * Returns pointer to a free send completion context; or NULL if
  580. * the queue is empty.
  581. *
  582. * Usage: Called to acquire an SGE array before preparing a Send WR.
  583. *
  584. * The caller serializes calls to this function (per transport), and
  585. * provides an effective memory barrier that flushes the new value
  586. * of rb_sc_head.
  587. */
  588. struct rpcrdma_sendctx *rpcrdma_sendctx_get_locked(struct rpcrdma_xprt *r_xprt)
  589. {
  590. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  591. struct rpcrdma_sendctx *sc;
  592. unsigned long next_head;
  593. next_head = rpcrdma_sendctx_next(buf, buf->rb_sc_head);
  594. if (next_head == READ_ONCE(buf->rb_sc_tail))
  595. goto out_emptyq;
  596. /* ORDER: item must be accessed _before_ head is updated */
  597. sc = buf->rb_sc_ctxs[next_head];
  598. /* Releasing the lock in the caller acts as a memory
  599. * barrier that flushes rb_sc_head.
  600. */
  601. buf->rb_sc_head = next_head;
  602. return sc;
  603. out_emptyq:
  604. /* The queue is "empty" if there have not been enough Send
  605. * completions recently. This is a sign the Send Queue is
  606. * backing up. Cause the caller to pause and try again.
  607. */
  608. xprt_wait_for_buffer_space(&r_xprt->rx_xprt);
  609. r_xprt->rx_stats.empty_sendctx_q++;
  610. return NULL;
  611. }
  612. /**
  613. * rpcrdma_sendctx_put_locked - Release a send context
  614. * @r_xprt: controlling transport instance
  615. * @sc: send context to release
  616. *
  617. * Usage: Called from Send completion to return a sendctxt
  618. * to the queue.
  619. *
  620. * The caller serializes calls to this function (per transport).
  621. */
  622. static void rpcrdma_sendctx_put_locked(struct rpcrdma_xprt *r_xprt,
  623. struct rpcrdma_sendctx *sc)
  624. {
  625. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  626. unsigned long next_tail;
  627. /* Unmap SGEs of previously completed but unsignaled
  628. * Sends by walking up the queue until @sc is found.
  629. */
  630. next_tail = buf->rb_sc_tail;
  631. do {
  632. next_tail = rpcrdma_sendctx_next(buf, next_tail);
  633. /* ORDER: item must be accessed _before_ tail is updated */
  634. rpcrdma_sendctx_unmap(buf->rb_sc_ctxs[next_tail]);
  635. } while (buf->rb_sc_ctxs[next_tail] != sc);
  636. /* Paired with READ_ONCE */
  637. smp_store_release(&buf->rb_sc_tail, next_tail);
  638. xprt_write_space(&r_xprt->rx_xprt);
  639. }
  640. static void
  641. rpcrdma_mrs_create(struct rpcrdma_xprt *r_xprt)
  642. {
  643. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  644. struct rpcrdma_ep *ep = r_xprt->rx_ep;
  645. struct ib_device *device = ep->re_id->device;
  646. unsigned int count;
  647. /* Try to allocate enough to perform one full-sized I/O */
  648. for (count = 0; count < ep->re_max_rdma_segs; count++) {
  649. struct rpcrdma_mr *mr;
  650. int rc;
  651. mr = kzalloc_node(sizeof(*mr), XPRTRDMA_GFP_FLAGS,
  652. ibdev_to_node(device));
  653. if (!mr)
  654. break;
  655. rc = frwr_mr_init(r_xprt, mr);
  656. if (rc) {
  657. kfree(mr);
  658. break;
  659. }
  660. spin_lock(&buf->rb_lock);
  661. rpcrdma_mr_push(mr, &buf->rb_mrs);
  662. list_add(&mr->mr_all, &buf->rb_all_mrs);
  663. spin_unlock(&buf->rb_lock);
  664. }
  665. r_xprt->rx_stats.mrs_allocated += count;
  666. trace_xprtrdma_createmrs(r_xprt, count);
  667. }
  668. static void
  669. rpcrdma_mr_refresh_worker(struct work_struct *work)
  670. {
  671. struct rpcrdma_buffer *buf = container_of(work, struct rpcrdma_buffer,
  672. rb_refresh_worker);
  673. struct rpcrdma_xprt *r_xprt = container_of(buf, struct rpcrdma_xprt,
  674. rx_buf);
  675. rpcrdma_mrs_create(r_xprt);
  676. xprt_write_space(&r_xprt->rx_xprt);
  677. }
  678. /**
  679. * rpcrdma_mrs_refresh - Wake the MR refresh worker
  680. * @r_xprt: controlling transport instance
  681. *
  682. */
  683. void rpcrdma_mrs_refresh(struct rpcrdma_xprt *r_xprt)
  684. {
  685. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  686. struct rpcrdma_ep *ep = r_xprt->rx_ep;
  687. /* If there is no underlying connection, it's no use
  688. * to wake the refresh worker.
  689. */
  690. if (ep->re_connect_status != 1)
  691. return;
  692. queue_work(system_highpri_wq, &buf->rb_refresh_worker);
  693. }
  694. /**
  695. * rpcrdma_req_create - Allocate an rpcrdma_req object
  696. * @r_xprt: controlling r_xprt
  697. * @size: initial size, in bytes, of send and receive buffers
  698. *
  699. * Returns an allocated and fully initialized rpcrdma_req or NULL.
  700. */
  701. struct rpcrdma_req *rpcrdma_req_create(struct rpcrdma_xprt *r_xprt,
  702. size_t size)
  703. {
  704. struct rpcrdma_buffer *buffer = &r_xprt->rx_buf;
  705. struct rpcrdma_req *req;
  706. req = kzalloc_obj(*req, XPRTRDMA_GFP_FLAGS);
  707. if (req == NULL)
  708. goto out1;
  709. req->rl_sendbuf = rpcrdma_regbuf_alloc(size, DMA_TO_DEVICE);
  710. if (!req->rl_sendbuf)
  711. goto out2;
  712. req->rl_recvbuf = rpcrdma_regbuf_alloc(size, DMA_NONE);
  713. if (!req->rl_recvbuf)
  714. goto out3;
  715. INIT_LIST_HEAD(&req->rl_free_mrs);
  716. INIT_LIST_HEAD(&req->rl_registered);
  717. spin_lock(&buffer->rb_lock);
  718. list_add(&req->rl_all, &buffer->rb_allreqs);
  719. spin_unlock(&buffer->rb_lock);
  720. return req;
  721. out3:
  722. rpcrdma_regbuf_free(req->rl_sendbuf);
  723. out2:
  724. kfree(req);
  725. out1:
  726. return NULL;
  727. }
  728. /**
  729. * rpcrdma_req_setup - Per-connection instance setup of an rpcrdma_req object
  730. * @r_xprt: controlling transport instance
  731. * @req: rpcrdma_req object to set up
  732. *
  733. * Returns zero on success, and a negative errno on failure.
  734. */
  735. int rpcrdma_req_setup(struct rpcrdma_xprt *r_xprt, struct rpcrdma_req *req)
  736. {
  737. struct rpcrdma_regbuf *rb;
  738. size_t maxhdrsize;
  739. /* Compute maximum header buffer size in bytes */
  740. maxhdrsize = rpcrdma_fixed_maxsz + 3 +
  741. r_xprt->rx_ep->re_max_rdma_segs * rpcrdma_readchunk_maxsz;
  742. maxhdrsize *= sizeof(__be32);
  743. rb = rpcrdma_regbuf_alloc(__roundup_pow_of_two(maxhdrsize),
  744. DMA_TO_DEVICE);
  745. if (!rb)
  746. goto out;
  747. if (!__rpcrdma_regbuf_dma_map(r_xprt, rb))
  748. goto out_free;
  749. req->rl_rdmabuf = rb;
  750. xdr_buf_init(&req->rl_hdrbuf, rdmab_data(rb), rdmab_length(rb));
  751. return 0;
  752. out_free:
  753. rpcrdma_regbuf_free(rb);
  754. out:
  755. return -ENOMEM;
  756. }
  757. /* ASSUMPTION: the rb_allreqs list is stable for the duration,
  758. * and thus can be walked without holding rb_lock. Eg. the
  759. * caller is holding the transport send lock to exclude
  760. * device removal or disconnection.
  761. */
  762. static int rpcrdma_reqs_setup(struct rpcrdma_xprt *r_xprt)
  763. {
  764. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  765. struct rpcrdma_req *req;
  766. int rc;
  767. list_for_each_entry(req, &buf->rb_allreqs, rl_all) {
  768. rc = rpcrdma_req_setup(r_xprt, req);
  769. if (rc)
  770. return rc;
  771. }
  772. return 0;
  773. }
  774. static void rpcrdma_req_reset(struct rpcrdma_req *req)
  775. {
  776. struct rpcrdma_mr *mr;
  777. /* Credits are valid for only one connection */
  778. req->rl_slot.rq_cong = 0;
  779. rpcrdma_regbuf_free(req->rl_rdmabuf);
  780. req->rl_rdmabuf = NULL;
  781. rpcrdma_regbuf_dma_unmap(req->rl_sendbuf);
  782. rpcrdma_regbuf_dma_unmap(req->rl_recvbuf);
  783. /* The verbs consumer can't know the state of an MR on the
  784. * req->rl_registered list unless a successful completion
  785. * has occurred, so they cannot be re-used.
  786. */
  787. while ((mr = rpcrdma_mr_pop(&req->rl_registered))) {
  788. struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
  789. spin_lock(&buf->rb_lock);
  790. list_del(&mr->mr_all);
  791. spin_unlock(&buf->rb_lock);
  792. frwr_mr_release(mr);
  793. }
  794. }
  795. /* ASSUMPTION: the rb_allreqs list is stable for the duration,
  796. * and thus can be walked without holding rb_lock. Eg. the
  797. * caller is holding the transport send lock to exclude
  798. * device removal or disconnection.
  799. */
  800. static void rpcrdma_reqs_reset(struct rpcrdma_xprt *r_xprt)
  801. {
  802. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  803. struct rpcrdma_req *req;
  804. list_for_each_entry(req, &buf->rb_allreqs, rl_all)
  805. rpcrdma_req_reset(req);
  806. }
  807. static noinline
  808. struct rpcrdma_rep *rpcrdma_rep_create(struct rpcrdma_xprt *r_xprt)
  809. {
  810. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  811. struct rpcrdma_ep *ep = r_xprt->rx_ep;
  812. struct ib_device *device = ep->re_id->device;
  813. struct rpcrdma_rep *rep;
  814. rep = kzalloc_obj(*rep, XPRTRDMA_GFP_FLAGS);
  815. if (rep == NULL)
  816. goto out;
  817. rep->rr_rdmabuf = rpcrdma_regbuf_alloc_node(ep->re_inline_recv,
  818. DMA_FROM_DEVICE,
  819. ibdev_to_node(device));
  820. if (!rep->rr_rdmabuf)
  821. goto out_free;
  822. rep->rr_cid.ci_completion_id =
  823. atomic_inc_return(&r_xprt->rx_ep->re_completion_ids);
  824. xdr_buf_init(&rep->rr_hdrbuf, rdmab_data(rep->rr_rdmabuf),
  825. rdmab_length(rep->rr_rdmabuf));
  826. rep->rr_cqe.done = rpcrdma_wc_receive;
  827. rep->rr_rxprt = r_xprt;
  828. rep->rr_recv_wr.next = NULL;
  829. rep->rr_recv_wr.wr_cqe = &rep->rr_cqe;
  830. rep->rr_recv_wr.sg_list = &rep->rr_rdmabuf->rg_iov;
  831. rep->rr_recv_wr.num_sge = 1;
  832. spin_lock(&buf->rb_lock);
  833. list_add(&rep->rr_all, &buf->rb_all_reps);
  834. spin_unlock(&buf->rb_lock);
  835. return rep;
  836. out_free:
  837. kfree(rep);
  838. out:
  839. return NULL;
  840. }
  841. static void rpcrdma_rep_free(struct rpcrdma_rep *rep)
  842. {
  843. rpcrdma_regbuf_free(rep->rr_rdmabuf);
  844. kfree(rep);
  845. }
  846. static struct rpcrdma_rep *rpcrdma_rep_get_locked(struct rpcrdma_buffer *buf)
  847. {
  848. struct llist_node *node;
  849. /* Calls to llist_del_first are required to be serialized */
  850. node = llist_del_first(&buf->rb_free_reps);
  851. if (!node)
  852. return NULL;
  853. return llist_entry(node, struct rpcrdma_rep, rr_node);
  854. }
  855. /**
  856. * rpcrdma_rep_put - Release rpcrdma_rep back to free list
  857. * @buf: buffer pool
  858. * @rep: rep to release
  859. *
  860. */
  861. void rpcrdma_rep_put(struct rpcrdma_buffer *buf, struct rpcrdma_rep *rep)
  862. {
  863. llist_add(&rep->rr_node, &buf->rb_free_reps);
  864. }
  865. /* Caller must ensure the QP is quiescent (RQ is drained) before
  866. * invoking this function, to guarantee rb_all_reps is not
  867. * changing.
  868. */
  869. static void rpcrdma_reps_unmap(struct rpcrdma_xprt *r_xprt)
  870. {
  871. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  872. struct rpcrdma_rep *rep;
  873. list_for_each_entry(rep, &buf->rb_all_reps, rr_all)
  874. rpcrdma_regbuf_dma_unmap(rep->rr_rdmabuf);
  875. }
  876. static void rpcrdma_reps_destroy(struct rpcrdma_buffer *buf)
  877. {
  878. struct rpcrdma_rep *rep;
  879. spin_lock(&buf->rb_lock);
  880. while ((rep = list_first_entry_or_null(&buf->rb_all_reps,
  881. struct rpcrdma_rep,
  882. rr_all)) != NULL) {
  883. list_del(&rep->rr_all);
  884. spin_unlock(&buf->rb_lock);
  885. rpcrdma_rep_free(rep);
  886. spin_lock(&buf->rb_lock);
  887. }
  888. spin_unlock(&buf->rb_lock);
  889. }
  890. /**
  891. * rpcrdma_buffer_create - Create initial set of req/rep objects
  892. * @r_xprt: transport instance to (re)initialize
  893. *
  894. * Returns zero on success, otherwise a negative errno.
  895. */
  896. int rpcrdma_buffer_create(struct rpcrdma_xprt *r_xprt)
  897. {
  898. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  899. int i, rc;
  900. buf->rb_bc_srv_max_requests = 0;
  901. spin_lock_init(&buf->rb_lock);
  902. INIT_LIST_HEAD(&buf->rb_mrs);
  903. INIT_LIST_HEAD(&buf->rb_all_mrs);
  904. INIT_WORK(&buf->rb_refresh_worker, rpcrdma_mr_refresh_worker);
  905. INIT_LIST_HEAD(&buf->rb_send_bufs);
  906. INIT_LIST_HEAD(&buf->rb_allreqs);
  907. INIT_LIST_HEAD(&buf->rb_all_reps);
  908. rc = -ENOMEM;
  909. for (i = 0; i < r_xprt->rx_xprt.max_reqs; i++) {
  910. struct rpcrdma_req *req;
  911. req = rpcrdma_req_create(r_xprt,
  912. RPCRDMA_V1_DEF_INLINE_SIZE * 2);
  913. if (!req)
  914. goto out;
  915. list_add(&req->rl_list, &buf->rb_send_bufs);
  916. }
  917. init_llist_head(&buf->rb_free_reps);
  918. return 0;
  919. out:
  920. rpcrdma_buffer_destroy(buf);
  921. return rc;
  922. }
  923. /**
  924. * rpcrdma_req_destroy - Destroy an rpcrdma_req object
  925. * @req: unused object to be destroyed
  926. *
  927. * Relies on caller holding the transport send lock to protect
  928. * removing req->rl_all from buf->rb_all_reqs safely.
  929. */
  930. void rpcrdma_req_destroy(struct rpcrdma_req *req)
  931. {
  932. struct rpcrdma_mr *mr;
  933. list_del(&req->rl_all);
  934. while ((mr = rpcrdma_mr_pop(&req->rl_free_mrs))) {
  935. struct rpcrdma_buffer *buf = &mr->mr_xprt->rx_buf;
  936. spin_lock(&buf->rb_lock);
  937. list_del(&mr->mr_all);
  938. spin_unlock(&buf->rb_lock);
  939. frwr_mr_release(mr);
  940. }
  941. rpcrdma_regbuf_free(req->rl_recvbuf);
  942. rpcrdma_regbuf_free(req->rl_sendbuf);
  943. rpcrdma_regbuf_free(req->rl_rdmabuf);
  944. kfree(req);
  945. }
  946. /**
  947. * rpcrdma_mrs_destroy - Release all of a transport's MRs
  948. * @r_xprt: controlling transport instance
  949. *
  950. * Relies on caller holding the transport send lock to protect
  951. * removing mr->mr_list from req->rl_free_mrs safely.
  952. */
  953. static void rpcrdma_mrs_destroy(struct rpcrdma_xprt *r_xprt)
  954. {
  955. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  956. struct rpcrdma_mr *mr;
  957. cancel_work_sync(&buf->rb_refresh_worker);
  958. spin_lock(&buf->rb_lock);
  959. while ((mr = list_first_entry_or_null(&buf->rb_all_mrs,
  960. struct rpcrdma_mr,
  961. mr_all)) != NULL) {
  962. list_del(&mr->mr_list);
  963. list_del(&mr->mr_all);
  964. spin_unlock(&buf->rb_lock);
  965. frwr_mr_release(mr);
  966. spin_lock(&buf->rb_lock);
  967. }
  968. spin_unlock(&buf->rb_lock);
  969. }
  970. /**
  971. * rpcrdma_buffer_destroy - Release all hw resources
  972. * @buf: root control block for resources
  973. *
  974. * ORDERING: relies on a prior rpcrdma_xprt_drain :
  975. * - No more Send or Receive completions can occur
  976. * - All MRs, reps, and reqs are returned to their free lists
  977. */
  978. void
  979. rpcrdma_buffer_destroy(struct rpcrdma_buffer *buf)
  980. {
  981. rpcrdma_reps_destroy(buf);
  982. while (!list_empty(&buf->rb_send_bufs)) {
  983. struct rpcrdma_req *req;
  984. req = list_first_entry(&buf->rb_send_bufs,
  985. struct rpcrdma_req, rl_list);
  986. list_del(&req->rl_list);
  987. rpcrdma_req_destroy(req);
  988. }
  989. }
  990. /**
  991. * rpcrdma_mr_get - Allocate an rpcrdma_mr object
  992. * @r_xprt: controlling transport
  993. *
  994. * Returns an initialized rpcrdma_mr or NULL if no free
  995. * rpcrdma_mr objects are available.
  996. */
  997. struct rpcrdma_mr *
  998. rpcrdma_mr_get(struct rpcrdma_xprt *r_xprt)
  999. {
  1000. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  1001. struct rpcrdma_mr *mr;
  1002. spin_lock(&buf->rb_lock);
  1003. mr = rpcrdma_mr_pop(&buf->rb_mrs);
  1004. spin_unlock(&buf->rb_lock);
  1005. return mr;
  1006. }
  1007. /**
  1008. * rpcrdma_reply_put - Put reply buffers back into pool
  1009. * @buffers: buffer pool
  1010. * @req: object to return
  1011. *
  1012. */
  1013. void rpcrdma_reply_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
  1014. {
  1015. if (req->rl_reply) {
  1016. rpcrdma_rep_put(buffers, req->rl_reply);
  1017. req->rl_reply = NULL;
  1018. }
  1019. }
  1020. /**
  1021. * rpcrdma_buffer_get - Get a request buffer
  1022. * @buffers: Buffer pool from which to obtain a buffer
  1023. *
  1024. * Returns a fresh rpcrdma_req, or NULL if none are available.
  1025. */
  1026. struct rpcrdma_req *
  1027. rpcrdma_buffer_get(struct rpcrdma_buffer *buffers)
  1028. {
  1029. struct rpcrdma_req *req;
  1030. spin_lock(&buffers->rb_lock);
  1031. req = list_first_entry_or_null(&buffers->rb_send_bufs,
  1032. struct rpcrdma_req, rl_list);
  1033. if (req)
  1034. list_del_init(&req->rl_list);
  1035. spin_unlock(&buffers->rb_lock);
  1036. return req;
  1037. }
  1038. /**
  1039. * rpcrdma_buffer_put - Put request/reply buffers back into pool
  1040. * @buffers: buffer pool
  1041. * @req: object to return
  1042. *
  1043. */
  1044. void rpcrdma_buffer_put(struct rpcrdma_buffer *buffers, struct rpcrdma_req *req)
  1045. {
  1046. rpcrdma_reply_put(buffers, req);
  1047. spin_lock(&buffers->rb_lock);
  1048. list_add(&req->rl_list, &buffers->rb_send_bufs);
  1049. spin_unlock(&buffers->rb_lock);
  1050. }
  1051. /* Returns a pointer to a rpcrdma_regbuf object, or NULL.
  1052. *
  1053. * xprtrdma uses a regbuf for posting an outgoing RDMA SEND, or for
  1054. * receiving the payload of RDMA RECV operations. During Long Calls
  1055. * or Replies they may be registered externally via frwr_map.
  1056. */
  1057. static struct rpcrdma_regbuf *
  1058. rpcrdma_regbuf_alloc_node(size_t size, enum dma_data_direction direction,
  1059. int node)
  1060. {
  1061. struct rpcrdma_regbuf *rb;
  1062. rb = kmalloc_node(sizeof(*rb), XPRTRDMA_GFP_FLAGS, node);
  1063. if (!rb)
  1064. return NULL;
  1065. rb->rg_data = kmalloc_node(size, XPRTRDMA_GFP_FLAGS, node);
  1066. if (!rb->rg_data) {
  1067. kfree(rb);
  1068. return NULL;
  1069. }
  1070. rb->rg_device = NULL;
  1071. rb->rg_direction = direction;
  1072. rb->rg_iov.length = size;
  1073. return rb;
  1074. }
  1075. static struct rpcrdma_regbuf *
  1076. rpcrdma_regbuf_alloc(size_t size, enum dma_data_direction direction)
  1077. {
  1078. return rpcrdma_regbuf_alloc_node(size, direction, NUMA_NO_NODE);
  1079. }
  1080. /**
  1081. * rpcrdma_regbuf_realloc - re-allocate a SEND/RECV buffer
  1082. * @rb: regbuf to reallocate
  1083. * @size: size of buffer to be allocated, in bytes
  1084. * @flags: GFP flags
  1085. *
  1086. * Returns true if reallocation was successful. If false is
  1087. * returned, @rb is left untouched.
  1088. */
  1089. bool rpcrdma_regbuf_realloc(struct rpcrdma_regbuf *rb, size_t size, gfp_t flags)
  1090. {
  1091. void *buf;
  1092. buf = kmalloc(size, flags);
  1093. if (!buf)
  1094. return false;
  1095. rpcrdma_regbuf_dma_unmap(rb);
  1096. kfree(rb->rg_data);
  1097. rb->rg_data = buf;
  1098. rb->rg_iov.length = size;
  1099. return true;
  1100. }
  1101. /**
  1102. * __rpcrdma_regbuf_dma_map - DMA-map a regbuf
  1103. * @r_xprt: controlling transport instance
  1104. * @rb: regbuf to be mapped
  1105. *
  1106. * Returns true if the buffer is now DMA mapped to @r_xprt's device
  1107. */
  1108. bool __rpcrdma_regbuf_dma_map(struct rpcrdma_xprt *r_xprt,
  1109. struct rpcrdma_regbuf *rb)
  1110. {
  1111. struct ib_device *device = r_xprt->rx_ep->re_id->device;
  1112. if (rb->rg_direction == DMA_NONE)
  1113. return false;
  1114. rb->rg_iov.addr = ib_dma_map_single(device, rdmab_data(rb),
  1115. rdmab_length(rb), rb->rg_direction);
  1116. if (ib_dma_mapping_error(device, rdmab_addr(rb))) {
  1117. trace_xprtrdma_dma_maperr(rdmab_addr(rb));
  1118. return false;
  1119. }
  1120. rb->rg_device = device;
  1121. rb->rg_iov.lkey = r_xprt->rx_ep->re_pd->local_dma_lkey;
  1122. return true;
  1123. }
  1124. static void rpcrdma_regbuf_dma_unmap(struct rpcrdma_regbuf *rb)
  1125. {
  1126. if (!rb)
  1127. return;
  1128. if (!rpcrdma_regbuf_is_mapped(rb))
  1129. return;
  1130. ib_dma_unmap_single(rb->rg_device, rdmab_addr(rb), rdmab_length(rb),
  1131. rb->rg_direction);
  1132. rb->rg_device = NULL;
  1133. }
  1134. static void rpcrdma_regbuf_free(struct rpcrdma_regbuf *rb)
  1135. {
  1136. rpcrdma_regbuf_dma_unmap(rb);
  1137. if (rb)
  1138. kfree(rb->rg_data);
  1139. kfree(rb);
  1140. }
  1141. /**
  1142. * rpcrdma_post_recvs - Refill the Receive Queue
  1143. * @r_xprt: controlling transport instance
  1144. * @needed: current credit grant
  1145. *
  1146. */
  1147. void rpcrdma_post_recvs(struct rpcrdma_xprt *r_xprt, int needed)
  1148. {
  1149. struct rpcrdma_buffer *buf = &r_xprt->rx_buf;
  1150. struct rpcrdma_ep *ep = r_xprt->rx_ep;
  1151. struct ib_recv_wr *wr, *bad_wr;
  1152. struct rpcrdma_rep *rep;
  1153. int count, rc;
  1154. rc = 0;
  1155. count = 0;
  1156. if (likely(ep->re_receive_count > needed))
  1157. goto out;
  1158. needed -= ep->re_receive_count;
  1159. needed += RPCRDMA_MAX_RECV_BATCH;
  1160. if (atomic_inc_return(&ep->re_receiving) > 1)
  1161. goto out_dec;
  1162. /* fast path: all needed reps can be found on the free list */
  1163. wr = NULL;
  1164. while (needed) {
  1165. rep = rpcrdma_rep_get_locked(buf);
  1166. if (!rep)
  1167. rep = rpcrdma_rep_create(r_xprt);
  1168. if (!rep)
  1169. break;
  1170. if (!rpcrdma_regbuf_dma_map(r_xprt, rep->rr_rdmabuf)) {
  1171. rpcrdma_rep_put(buf, rep);
  1172. break;
  1173. }
  1174. rep->rr_cid.ci_queue_id = ep->re_attr.recv_cq->res.id;
  1175. trace_xprtrdma_post_recv(&rep->rr_cid);
  1176. rep->rr_recv_wr.next = wr;
  1177. wr = &rep->rr_recv_wr;
  1178. --needed;
  1179. ++count;
  1180. }
  1181. if (!wr)
  1182. goto out_dec;
  1183. rc = ib_post_recv(ep->re_id->qp, wr,
  1184. (const struct ib_recv_wr **)&bad_wr);
  1185. if (rc) {
  1186. trace_xprtrdma_post_recvs_err(r_xprt, rc);
  1187. for (wr = bad_wr; wr;) {
  1188. struct rpcrdma_rep *rep;
  1189. rep = container_of(wr, struct rpcrdma_rep, rr_recv_wr);
  1190. wr = wr->next;
  1191. rpcrdma_rep_put(buf, rep);
  1192. --count;
  1193. }
  1194. }
  1195. out_dec:
  1196. if (atomic_dec_return(&ep->re_receiving) > 0)
  1197. complete(&ep->re_done);
  1198. out:
  1199. trace_xprtrdma_post_recvs(r_xprt, count);
  1200. ep->re_receive_count += count;
  1201. return;
  1202. }