backchannel_rqst.c 11 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399
  1. // SPDX-License-Identifier: GPL-2.0-only
  2. /******************************************************************************
  3. (c) 2007 Network Appliance, Inc. All Rights Reserved.
  4. (c) 2009 NetApp. All Rights Reserved.
  5. ******************************************************************************/
  6. #include <linux/tcp.h>
  7. #include <linux/slab.h>
  8. #include <linux/sunrpc/xprt.h>
  9. #include <linux/export.h>
  10. #include <linux/sunrpc/bc_xprt.h>
  11. #if IS_ENABLED(CONFIG_SUNRPC_DEBUG)
  12. #define RPCDBG_FACILITY RPCDBG_TRANS
  13. #endif
  /* Ceiling on the number of backchannel request slots per transport. */
  #define BC_MAX_SLOTS 64U

  /**
   * xprt_bc_max_slots - report the backchannel slot ceiling
   * @xprt: transport being queried (not consulted by this implementation)
   *
   * Return: BC_MAX_SLOTS, independent of the transport passed in.
   */
  unsigned int xprt_bc_max_slots(struct rpc_xprt *xprt)
  {
  	const unsigned int slot_limit = BC_MAX_SLOTS;

  	return slot_limit;
  }
  19. /*
  20. * Helper function to nullify backchannel server pointer in transport.
  21. * We need to synchronize setting the pointer to NULL (done so after
  22. * the backchannel server is shutdown) with the usage of that pointer
  23. * by the backchannel request processing routines
  24. * xprt_complete_bc_request() and rpcrdma_bc_receive_call().
  25. */
  26. void xprt_svc_destroy_nullify_bc(struct rpc_xprt *xprt, struct svc_serv **serv)
  27. {
  28. spin_lock(&xprt->bc_pa_lock);
  29. svc_destroy(serv);
  30. xprt->bc_serv = NULL;
  31. spin_unlock(&xprt->bc_pa_lock);
  32. }
  33. EXPORT_SYMBOL_GPL(xprt_svc_destroy_nullify_bc);
  34. /*
  35. * Helper routines that track the number of preallocation elements
  36. * on the transport.
  37. */
  38. static inline int xprt_need_to_requeue(struct rpc_xprt *xprt)
  39. {
  40. return xprt->bc_alloc_count < xprt->bc_alloc_max;
  41. }
  42. /*
  43. * Free the preallocated rpc_rqst structure and the memory
  44. * buffers hanging off of it.
  45. */
  46. static void xprt_free_allocation(struct rpc_rqst *req)
  47. {
  48. struct xdr_buf *xbufp;
  49. dprintk("RPC: free allocations for req= %p\n", req);
  50. WARN_ON_ONCE(test_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state));
  51. xbufp = &req->rq_rcv_buf;
  52. free_page((unsigned long)xbufp->head[0].iov_base);
  53. xbufp = &req->rq_snd_buf;
  54. free_page((unsigned long)xbufp->head[0].iov_base);
  55. kfree(req);
  56. }
  57. static void xprt_bc_reinit_xdr_buf(struct xdr_buf *buf)
  58. {
  59. buf->head[0].iov_len = PAGE_SIZE;
  60. buf->tail[0].iov_len = 0;
  61. buf->pages = NULL;
  62. buf->page_len = 0;
  63. buf->flags = 0;
  64. buf->len = 0;
  65. buf->buflen = PAGE_SIZE;
  66. }
  67. static int xprt_alloc_xdr_buf(struct xdr_buf *buf, gfp_t gfp_flags)
  68. {
  69. struct page *page;
  70. /* Preallocate one XDR receive buffer */
  71. page = alloc_page(gfp_flags);
  72. if (page == NULL)
  73. return -ENOMEM;
  74. xdr_buf_init(buf, page_address(page), PAGE_SIZE);
  75. return 0;
  76. }
  77. static struct rpc_rqst *xprt_alloc_bc_req(struct rpc_xprt *xprt)
  78. {
  79. gfp_t gfp_flags = GFP_KERNEL | __GFP_NORETRY | __GFP_NOWARN;
  80. struct rpc_rqst *req;
  81. /* Pre-allocate one backchannel rpc_rqst */
  82. req = kzalloc_obj(*req, gfp_flags);
  83. if (req == NULL)
  84. return NULL;
  85. req->rq_xprt = xprt;
  86. /* Preallocate one XDR receive buffer */
  87. if (xprt_alloc_xdr_buf(&req->rq_rcv_buf, gfp_flags) < 0) {
  88. printk(KERN_ERR "Failed to create bc receive xbuf\n");
  89. goto out_free;
  90. }
  91. req->rq_rcv_buf.len = PAGE_SIZE;
  92. /* Preallocate one XDR send buffer */
  93. if (xprt_alloc_xdr_buf(&req->rq_snd_buf, gfp_flags) < 0) {
  94. printk(KERN_ERR "Failed to create bc snd xbuf\n");
  95. goto out_free;
  96. }
  97. return req;
  98. out_free:
  99. xprt_free_allocation(req);
  100. return NULL;
  101. }
  102. /*
  103. * Preallocate up to min_reqs structures and related buffers for use
  104. * by the backchannel. This function can be called multiple times
  105. * when creating new sessions that use the same rpc_xprt. The
  106. * preallocated buffers are added to the pool of resources used by
  107. * the rpc_xprt. Any one of these resources may be used by an
  108. * incoming callback request. It's up to the higher levels in the
  109. * stack to enforce that the maximum number of session slots is not
  110. * being exceeded.
  111. *
  112. * Some callback arguments can be large. For example, a pNFS server
  113. * using multiple deviceids. The list can be unbound, but the client
  114. * has the ability to tell the server the maximum size of the callback
  115. * requests. Each deviceID is 16 bytes, so allocate one page
  116. * for the arguments to have enough room to receive a number of these
  117. * deviceIDs. The NFS client indicates to the pNFS server that its
  118. * callback requests can be up to 4096 bytes in size.
  119. */
  120. int xprt_setup_backchannel(struct rpc_xprt *xprt, unsigned int min_reqs)
  121. {
  122. if (!xprt->ops->bc_setup)
  123. return 0;
  124. return xprt->ops->bc_setup(xprt, min_reqs);
  125. }
  126. EXPORT_SYMBOL_GPL(xprt_setup_backchannel);
  127. int xprt_setup_bc(struct rpc_xprt *xprt, unsigned int min_reqs)
  128. {
  129. struct rpc_rqst *req;
  130. LIST_HEAD(tmp_list);
  131. int i;
  132. dprintk("RPC: setup backchannel transport\n");
  133. if (min_reqs > BC_MAX_SLOTS)
  134. min_reqs = BC_MAX_SLOTS;
  135. /*
  136. * We use a temporary list to keep track of the preallocated
  137. * buffers. Once we're done building the list we splice it
  138. * into the backchannel preallocation list off of the rpc_xprt
  139. * struct. This helps minimize the amount of time the list
  140. * lock is held on the rpc_xprt struct. It also makes cleanup
  141. * easier in case of memory allocation errors.
  142. */
  143. for (i = 0; i < min_reqs; i++) {
  144. /* Pre-allocate one backchannel rpc_rqst */
  145. req = xprt_alloc_bc_req(xprt);
  146. if (req == NULL) {
  147. printk(KERN_ERR "Failed to create bc rpc_rqst\n");
  148. goto out_free;
  149. }
  150. /* Add the allocated buffer to the tmp list */
  151. dprintk("RPC: adding req= %p\n", req);
  152. list_add(&req->rq_bc_pa_list, &tmp_list);
  153. }
  154. /*
  155. * Add the temporary list to the backchannel preallocation list
  156. */
  157. spin_lock(&xprt->bc_pa_lock);
  158. list_splice(&tmp_list, &xprt->bc_pa_list);
  159. xprt->bc_alloc_count += min_reqs;
  160. xprt->bc_alloc_max += min_reqs;
  161. atomic_add(min_reqs, &xprt->bc_slot_count);
  162. spin_unlock(&xprt->bc_pa_lock);
  163. dprintk("RPC: setup backchannel transport done\n");
  164. return 0;
  165. out_free:
  166. /*
  167. * Memory allocation failed, free the temporary list
  168. */
  169. while (!list_empty(&tmp_list)) {
  170. req = list_first_entry(&tmp_list,
  171. struct rpc_rqst,
  172. rq_bc_pa_list);
  173. list_del(&req->rq_bc_pa_list);
  174. xprt_free_allocation(req);
  175. }
  176. dprintk("RPC: setup backchannel transport failed\n");
  177. return -ENOMEM;
  178. }
  179. /**
  180. * xprt_destroy_backchannel - Destroys the backchannel preallocated structures.
  181. * @xprt: the transport holding the preallocated strucures
  182. * @max_reqs: the maximum number of preallocated structures to destroy
  183. *
  184. * Since these structures may have been allocated by multiple calls
  185. * to xprt_setup_backchannel, we only destroy up to the maximum number
  186. * of reqs specified by the caller.
  187. */
  188. void xprt_destroy_backchannel(struct rpc_xprt *xprt, unsigned int max_reqs)
  189. {
  190. if (xprt->ops->bc_destroy)
  191. xprt->ops->bc_destroy(xprt, max_reqs);
  192. }
  193. EXPORT_SYMBOL_GPL(xprt_destroy_backchannel);
  194. void xprt_destroy_bc(struct rpc_xprt *xprt, unsigned int max_reqs)
  195. {
  196. struct rpc_rqst *req = NULL, *tmp = NULL;
  197. dprintk("RPC: destroy backchannel transport\n");
  198. if (max_reqs == 0)
  199. goto out;
  200. spin_lock_bh(&xprt->bc_pa_lock);
  201. xprt->bc_alloc_max -= min(max_reqs, xprt->bc_alloc_max);
  202. list_for_each_entry_safe(req, tmp, &xprt->bc_pa_list, rq_bc_pa_list) {
  203. dprintk("RPC: req=%p\n", req);
  204. list_del(&req->rq_bc_pa_list);
  205. xprt_free_allocation(req);
  206. xprt->bc_alloc_count--;
  207. atomic_dec(&xprt->bc_slot_count);
  208. if (--max_reqs == 0)
  209. break;
  210. }
  211. spin_unlock_bh(&xprt->bc_pa_lock);
  212. out:
  213. dprintk("RPC: backchannel list empty= %s\n",
  214. list_empty(&xprt->bc_pa_list) ? "true" : "false");
  215. }
  216. static struct rpc_rqst *xprt_get_bc_request(struct rpc_xprt *xprt, __be32 xid,
  217. struct rpc_rqst *new)
  218. {
  219. struct rpc_rqst *req = NULL;
  220. dprintk("RPC: allocate a backchannel request\n");
  221. if (list_empty(&xprt->bc_pa_list)) {
  222. if (!new)
  223. goto not_found;
  224. if (atomic_read(&xprt->bc_slot_count) >= BC_MAX_SLOTS)
  225. goto not_found;
  226. list_add_tail(&new->rq_bc_pa_list, &xprt->bc_pa_list);
  227. xprt->bc_alloc_count++;
  228. atomic_inc(&xprt->bc_slot_count);
  229. }
  230. req = list_first_entry(&xprt->bc_pa_list, struct rpc_rqst,
  231. rq_bc_pa_list);
  232. req->rq_reply_bytes_recvd = 0;
  233. memcpy(&req->rq_private_buf, &req->rq_rcv_buf,
  234. sizeof(req->rq_private_buf));
  235. req->rq_xid = xid;
  236. req->rq_connect_cookie = xprt->connect_cookie;
  237. dprintk("RPC: backchannel req=%p\n", req);
  238. not_found:
  239. return req;
  240. }
  241. /*
  242. * Return the preallocated rpc_rqst structure and XDR buffers
  243. * associated with this rpc_task.
  244. */
  245. void xprt_free_bc_request(struct rpc_rqst *req)
  246. {
  247. struct rpc_xprt *xprt = req->rq_xprt;
  248. xprt->ops->bc_free_rqst(req);
  249. }
  250. void xprt_free_bc_rqst(struct rpc_rqst *req)
  251. {
  252. struct rpc_xprt *xprt = req->rq_xprt;
  253. dprintk("RPC: free backchannel req=%p\n", req);
  254. req->rq_connect_cookie = xprt->connect_cookie - 1;
  255. smp_mb__before_atomic();
  256. clear_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
  257. smp_mb__after_atomic();
  258. /*
  259. * Return it to the list of preallocations so that it
  260. * may be reused by a new callback request.
  261. */
  262. spin_lock_bh(&xprt->bc_pa_lock);
  263. if (xprt_need_to_requeue(xprt)) {
  264. xprt_bc_reinit_xdr_buf(&req->rq_snd_buf);
  265. xprt_bc_reinit_xdr_buf(&req->rq_rcv_buf);
  266. req->rq_rcv_buf.len = PAGE_SIZE;
  267. list_add_tail(&req->rq_bc_pa_list, &xprt->bc_pa_list);
  268. xprt->bc_alloc_count++;
  269. atomic_inc(&xprt->bc_slot_count);
  270. req = NULL;
  271. }
  272. spin_unlock_bh(&xprt->bc_pa_lock);
  273. if (req != NULL) {
  274. /*
  275. * The last remaining session was destroyed while this
  276. * entry was in use. Free the entry and don't attempt
  277. * to add back to the list because there is no need to
  278. * have anymore preallocated entries.
  279. */
  280. dprintk("RPC: Last session removed req=%p\n", req);
  281. xprt_free_allocation(req);
  282. }
  283. xprt_put(xprt);
  284. }
  285. /*
  286. * One or more rpc_rqst structure have been preallocated during the
  287. * backchannel setup. Buffer space for the send and private XDR buffers
  288. * has been preallocated as well. Use xprt_alloc_bc_request to allocate
  289. * to this request. Use xprt_free_bc_request to return it.
  290. *
  291. * We know that we're called in soft interrupt context, grab the spin_lock
  292. * since there is no need to grab the bottom half spin_lock.
  293. *
  294. * Return an available rpc_rqst, otherwise NULL if non are available.
  295. */
  296. struct rpc_rqst *xprt_lookup_bc_request(struct rpc_xprt *xprt, __be32 xid)
  297. {
  298. struct rpc_rqst *req, *new = NULL;
  299. do {
  300. spin_lock(&xprt->bc_pa_lock);
  301. list_for_each_entry(req, &xprt->bc_pa_list, rq_bc_pa_list) {
  302. if (req->rq_connect_cookie != xprt->connect_cookie)
  303. continue;
  304. if (req->rq_xid == xid)
  305. goto found;
  306. }
  307. req = xprt_get_bc_request(xprt, xid, new);
  308. found:
  309. spin_unlock(&xprt->bc_pa_lock);
  310. if (new) {
  311. if (req != new)
  312. xprt_free_allocation(new);
  313. break;
  314. } else if (req)
  315. break;
  316. new = xprt_alloc_bc_req(xprt);
  317. } while (new);
  318. return req;
  319. }
  320. /*
  321. * Add callback request to callback list. Wake a thread
  322. * on the first pool (usually the only pool) to handle it.
  323. */
  324. void xprt_complete_bc_request(struct rpc_rqst *req, uint32_t copied)
  325. {
  326. struct rpc_xprt *xprt = req->rq_xprt;
  327. spin_lock(&xprt->bc_pa_lock);
  328. list_del(&req->rq_bc_pa_list);
  329. xprt->bc_alloc_count--;
  330. spin_unlock(&xprt->bc_pa_lock);
  331. req->rq_private_buf.len = copied;
  332. set_bit(RPC_BC_PA_IN_USE, &req->rq_bc_pa_state);
  333. dprintk("RPC: add callback request to list\n");
  334. xprt_enqueue_bc_request(req);
  335. }
  336. void xprt_enqueue_bc_request(struct rpc_rqst *req)
  337. {
  338. struct rpc_xprt *xprt = req->rq_xprt;
  339. struct svc_serv *bc_serv;
  340. xprt_get(xprt);
  341. spin_lock(&xprt->bc_pa_lock);
  342. bc_serv = xprt->bc_serv;
  343. if (bc_serv) {
  344. lwq_enqueue(&req->rq_bc_list, &bc_serv->sv_cb_list);
  345. svc_pool_wake_idle_thread(&bc_serv->sv_pools[0]);
  346. }
  347. spin_unlock(&xprt->bc_pa_lock);
  348. }
  349. EXPORT_SYMBOL_GPL(xprt_enqueue_bc_request);