recvmsg.c 17 KB

123456789101112131415161718192021222324252627282930313233343536373839404142434445464748495051525354555657585960616263646566676869707172737475767778798081828384858687888990919293949596979899100101102103104105106107108109110111112113114115116117118119120121122123124125126127128129130131132133134135136137138139140141142143144145146147148149150151152153154155156157158159160161162163164165166167168169170171172173174175176177178179180181182183184185186187188189190191192193194195196197198199200201202203204205206207208209210211212213214215216217218219220221222223224225226227228229230231232233234235236237238239240241242243244245246247248249250251252253254255256257258259260261262263264265266267268269270271272273274275276277278279280281282283284285286287288289290291292293294295296297298299300301302303304305306307308309310311312313314315316317318319320321322323324325326327328329330331332333334335336337338339340341342343344345346347348349350351352353354355356357358359360361362363364365366367368369370371372373374375376377378379380381382383384385386387388389390391392393394395396397398399400401402403404405406407408409410411412413414415416417418419420421422423424425426427428429430431432433434435436437438439440441442443444445446447448449450451452453454455456457458459460461462463464465466467468469470471472473474475476477478479480481482483484485486487488489490491492493494495496497498499500501502503504505506507508509510511512513514515516517518519520521522523524525526527528529530531532533534535536537538539540541542543544545546547548549550551552553554555556557558559560561562563564565566567568569570571572573574575576577578579580581582583584585586587588589590591592593594595596597598599600601602603604605606607608609610611612613614615616617618619620621622623624625626627628629630631632633634635636637638639640641642643644645646647648649650651652653654655656657658659660661662663664665666667668669
  1. // SPDX-License-Identifier: GPL-2.0-or-later
  2. /* RxRPC recvmsg() implementation
  3. *
  4. * Copyright (C) 2007 Red Hat, Inc. All Rights Reserved.
  5. * Written by David Howells (dhowells@redhat.com)
  6. */
  7. #define pr_fmt(fmt) KBUILD_MODNAME ": " fmt
  8. #include <linux/net.h>
  9. #include <linux/skbuff.h>
  10. #include <linux/export.h>
  11. #include <linux/sched/signal.h>
  12. #include <net/sock.h>
  13. #include <net/af_rxrpc.h>
  14. #include "ar-internal.h"
  15. /*
  16. * Post a call for attention by the socket or kernel service. Further
  17. * notifications are suppressed by putting recvmsg_link on a dummy queue.
  18. */
  19. void rxrpc_notify_socket(struct rxrpc_call *call)
  20. {
  21. struct rxrpc_sock *rx;
  22. struct sock *sk;
  23. _enter("%d", call->debug_id);
  24. if (!list_empty(&call->recvmsg_link))
  25. return;
  26. if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
  27. rxrpc_see_call(call, rxrpc_call_see_notify_released);
  28. return;
  29. }
  30. rcu_read_lock();
  31. rx = rcu_dereference(call->socket);
  32. sk = &rx->sk;
  33. if (rx && sk->sk_state < RXRPC_CLOSE) {
  34. if (call->notify_rx) {
  35. spin_lock_irq(&call->notify_lock);
  36. call->notify_rx(sk, call, call->user_call_ID);
  37. spin_unlock_irq(&call->notify_lock);
  38. } else {
  39. spin_lock_irq(&rx->recvmsg_lock);
  40. if (list_empty(&call->recvmsg_link)) {
  41. rxrpc_get_call(call, rxrpc_call_get_notify_socket);
  42. list_add_tail(&call->recvmsg_link, &rx->recvmsg_q);
  43. }
  44. spin_unlock_irq(&rx->recvmsg_lock);
  45. if (!sock_flag(sk, SOCK_DEAD)) {
  46. _debug("call %ps", sk->sk_data_ready);
  47. sk->sk_data_ready(sk);
  48. }
  49. }
  50. }
  51. rcu_read_unlock();
  52. _leave("");
  53. }
  54. /*
  55. * Pass a call terminating message to userspace.
  56. */
  57. static int rxrpc_recvmsg_term(struct rxrpc_call *call, struct msghdr *msg)
  58. {
  59. u32 tmp = 0;
  60. int ret;
  61. switch (call->completion) {
  62. case RXRPC_CALL_SUCCEEDED:
  63. ret = 0;
  64. if (rxrpc_is_service_call(call))
  65. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ACK, 0, &tmp);
  66. break;
  67. case RXRPC_CALL_REMOTELY_ABORTED:
  68. tmp = call->abort_code;
  69. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  70. break;
  71. case RXRPC_CALL_LOCALLY_ABORTED:
  72. tmp = call->abort_code;
  73. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_ABORT, 4, &tmp);
  74. break;
  75. case RXRPC_CALL_NETWORK_ERROR:
  76. tmp = -call->error;
  77. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_NET_ERROR, 4, &tmp);
  78. break;
  79. case RXRPC_CALL_LOCAL_ERROR:
  80. tmp = -call->error;
  81. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_LOCAL_ERROR, 4, &tmp);
  82. break;
  83. default:
  84. pr_err("Invalid terminal call state %u\n", call->completion);
  85. BUG();
  86. break;
  87. }
  88. trace_rxrpc_recvdata(call, rxrpc_recvmsg_terminal,
  89. call->ackr_window - 1,
  90. call->rx_pkt_offset, call->rx_pkt_len, ret);
  91. return ret;
  92. }
  93. /*
  94. * Discard a packet we've used up and advance the Rx window by one.
  95. */
  96. static void rxrpc_rotate_rx_window(struct rxrpc_call *call)
  97. {
  98. struct rxrpc_skb_priv *sp;
  99. struct sk_buff *skb;
  100. rxrpc_serial_t serial;
  101. rxrpc_seq_t old_consumed = call->rx_consumed, tseq;
  102. bool last;
  103. int acked;
  104. _enter("%d", call->debug_id);
  105. skb = skb_dequeue(&call->recvmsg_queue);
  106. rxrpc_see_skb(skb, rxrpc_skb_see_rotate);
  107. sp = rxrpc_skb(skb);
  108. tseq = sp->hdr.seq;
  109. serial = sp->hdr.serial;
  110. last = sp->hdr.flags & RXRPC_LAST_PACKET;
  111. /* Barrier against rxrpc_input_data(). */
  112. if (after(tseq, call->rx_consumed))
  113. smp_store_release(&call->rx_consumed, tseq);
  114. rxrpc_free_skb(skb, rxrpc_skb_put_rotate);
  115. trace_rxrpc_receive(call, last ? rxrpc_receive_rotate_last : rxrpc_receive_rotate,
  116. serial, call->rx_consumed);
  117. if (last)
  118. set_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags);
  119. /* Check to see if there's an ACK that needs sending. */
  120. acked = atomic_add_return(call->rx_consumed - old_consumed,
  121. &call->ackr_nr_consumed);
  122. if (acked > 8 &&
  123. !test_and_set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags))
  124. rxrpc_poke_call(call, rxrpc_call_poke_idle);
  125. }
  126. /*
  127. * Decrypt and verify a DATA packet.
  128. */
  129. static int rxrpc_verify_data(struct rxrpc_call *call, struct sk_buff *skb)
  130. {
  131. struct rxrpc_skb_priv *sp = rxrpc_skb(skb);
  132. if (sp->flags & RXRPC_RX_VERIFIED)
  133. return 0;
  134. return call->security->verify_packet(call, skb);
  135. }
  136. /*
  137. * Transcribe a call's user ID to a control message.
  138. */
  139. static int rxrpc_recvmsg_user_id(struct rxrpc_call *call, struct msghdr *msg,
  140. int flags)
  141. {
  142. if (!test_bit(RXRPC_CALL_HAS_USERID, &call->flags))
  143. return 0;
  144. if (flags & MSG_CMSG_COMPAT) {
  145. unsigned int id32 = call->user_call_ID;
  146. return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
  147. sizeof(unsigned int), &id32);
  148. } else {
  149. unsigned long idl = call->user_call_ID;
  150. return put_cmsg(msg, SOL_RXRPC, RXRPC_USER_CALL_ID,
  151. sizeof(unsigned long), &idl);
  152. }
  153. }
  154. /*
  155. * Deal with a CHALLENGE packet.
  156. */
  157. static int rxrpc_recvmsg_challenge(struct socket *sock, struct msghdr *msg,
  158. struct sk_buff *challenge, unsigned int flags)
  159. {
  160. struct rxrpc_skb_priv *sp = rxrpc_skb(challenge);
  161. struct rxrpc_connection *conn = sp->chall.conn;
  162. return conn->security->challenge_to_recvmsg(conn, challenge, msg);
  163. }
  164. /*
  165. * Process OOB packets. Called with the socket locked.
  166. */
  167. static int rxrpc_recvmsg_oob(struct socket *sock, struct msghdr *msg,
  168. unsigned int flags)
  169. {
  170. struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  171. struct sk_buff *skb;
  172. bool need_response = false;
  173. int ret;
  174. skb = skb_peek(&rx->recvmsg_oobq);
  175. if (!skb)
  176. return -EAGAIN;
  177. rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
  178. ret = put_cmsg(msg, SOL_RXRPC, RXRPC_OOB_ID, sizeof(u64),
  179. &skb->skb_mstamp_ns);
  180. if (ret < 0)
  181. return ret;
  182. switch ((enum rxrpc_oob_type)skb->mark) {
  183. case RXRPC_OOB_CHALLENGE:
  184. need_response = true;
  185. ret = rxrpc_recvmsg_challenge(sock, msg, skb, flags);
  186. break;
  187. default:
  188. WARN_ONCE(1, "recvmsg() can't process unknown OOB type %u\n",
  189. skb->mark);
  190. ret = -EIO;
  191. break;
  192. }
  193. if (!(flags & MSG_PEEK))
  194. skb_unlink(skb, &rx->recvmsg_oobq);
  195. if (need_response)
  196. rxrpc_add_pending_oob(rx, skb);
  197. else
  198. rxrpc_free_skb(skb, rxrpc_skb_put_oob);
  199. return ret;
  200. }
  201. /*
  202. * Deliver messages to a call. This keeps processing packets until the buffer
  203. * is filled and we find either more DATA (returns 0) or the end of the DATA
  204. * (returns 1). If more packets are required, it returns -EAGAIN and if the
  205. * call has failed it returns -EIO.
  206. */
  207. static int rxrpc_recvmsg_data(struct socket *sock, struct rxrpc_call *call,
  208. struct msghdr *msg, struct iov_iter *iter,
  209. size_t len, int flags, size_t *_offset)
  210. {
  211. struct rxrpc_skb_priv *sp;
  212. struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  213. struct sk_buff *skb;
  214. rxrpc_seq_t seq = 0;
  215. size_t remain;
  216. unsigned int rx_pkt_offset, rx_pkt_len;
  217. int copy, ret = -EAGAIN, ret2;
  218. rx_pkt_offset = call->rx_pkt_offset;
  219. rx_pkt_len = call->rx_pkt_len;
  220. if (rxrpc_call_has_failed(call)) {
  221. seq = call->ackr_window - 1;
  222. ret = -EIO;
  223. goto done;
  224. }
  225. if (test_bit(RXRPC_CALL_RECVMSG_READ_ALL, &call->flags)) {
  226. seq = call->ackr_window - 1;
  227. ret = 1;
  228. goto done;
  229. }
  230. /* No one else can be removing stuff from the queue, so we shouldn't
  231. * need the Rx lock to walk it.
  232. */
  233. skb = skb_peek(&call->recvmsg_queue);
  234. while (skb) {
  235. rxrpc_see_skb(skb, rxrpc_skb_see_recvmsg);
  236. sp = rxrpc_skb(skb);
  237. seq = sp->hdr.seq;
  238. if (!(flags & MSG_PEEK))
  239. trace_rxrpc_receive(call, rxrpc_receive_front,
  240. sp->hdr.serial, seq);
  241. if (msg)
  242. sock_recv_timestamp(msg, sock->sk, skb);
  243. if (rx_pkt_offset == 0) {
  244. ret2 = rxrpc_verify_data(call, skb);
  245. trace_rxrpc_recvdata(call, rxrpc_recvmsg_next, seq,
  246. sp->offset, sp->len, ret2);
  247. if (ret2 < 0) {
  248. ret = ret2;
  249. goto out;
  250. }
  251. rx_pkt_offset = sp->offset;
  252. rx_pkt_len = sp->len;
  253. } else {
  254. trace_rxrpc_recvdata(call, rxrpc_recvmsg_cont, seq,
  255. rx_pkt_offset, rx_pkt_len, 0);
  256. }
  257. /* We have to handle short, empty and used-up DATA packets. */
  258. remain = len - *_offset;
  259. copy = rx_pkt_len;
  260. if (copy > remain)
  261. copy = remain;
  262. if (copy > 0) {
  263. ret2 = skb_copy_datagram_iter(skb, rx_pkt_offset, iter,
  264. copy);
  265. if (ret2 < 0) {
  266. ret = ret2;
  267. goto out;
  268. }
  269. /* handle piecemeal consumption of data packets */
  270. rx_pkt_offset += copy;
  271. rx_pkt_len -= copy;
  272. *_offset += copy;
  273. }
  274. if (rx_pkt_len > 0) {
  275. trace_rxrpc_recvdata(call, rxrpc_recvmsg_full, seq,
  276. rx_pkt_offset, rx_pkt_len, 0);
  277. ASSERTCMP(*_offset, ==, len);
  278. ret = 0;
  279. break;
  280. }
  281. /* The whole packet has been transferred. */
  282. if (sp->hdr.flags & RXRPC_LAST_PACKET)
  283. ret = 1;
  284. rx_pkt_offset = 0;
  285. rx_pkt_len = 0;
  286. skb = skb_peek_next(skb, &call->recvmsg_queue);
  287. if (!(flags & MSG_PEEK))
  288. rxrpc_rotate_rx_window(call);
  289. if (!rx->app_ops &&
  290. !skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
  291. trace_rxrpc_recvdata(call, rxrpc_recvmsg_oobq, seq,
  292. rx_pkt_offset, rx_pkt_len, ret);
  293. break;
  294. }
  295. }
  296. out:
  297. if (!(flags & MSG_PEEK)) {
  298. call->rx_pkt_offset = rx_pkt_offset;
  299. call->rx_pkt_len = rx_pkt_len;
  300. }
  301. done:
  302. trace_rxrpc_recvdata(call, rxrpc_recvmsg_data_return, seq,
  303. rx_pkt_offset, rx_pkt_len, ret);
  304. if (ret == -EAGAIN)
  305. set_bit(RXRPC_CALL_RX_IS_IDLE, &call->flags);
  306. return ret;
  307. }
  308. /*
  309. * Receive a message from an RxRPC socket
  310. * - we need to be careful about two or more threads calling recvmsg
  311. * simultaneously
  312. */
  313. int rxrpc_recvmsg(struct socket *sock, struct msghdr *msg, size_t len,
  314. int flags)
  315. {
  316. struct rxrpc_call *call;
  317. struct rxrpc_sock *rx = rxrpc_sk(sock->sk);
  318. struct list_head *l;
  319. unsigned int call_debug_id = 0;
  320. size_t copied = 0;
  321. long timeo;
  322. int ret;
  323. DEFINE_WAIT(wait);
  324. trace_rxrpc_recvmsg(0, rxrpc_recvmsg_enter, 0);
  325. if (flags & (MSG_OOB | MSG_TRUNC))
  326. return -EOPNOTSUPP;
  327. timeo = sock_rcvtimeo(&rx->sk, flags & MSG_DONTWAIT);
  328. try_again:
  329. lock_sock(&rx->sk);
  330. /* Return immediately if a client socket has no outstanding calls */
  331. if (RB_EMPTY_ROOT(&rx->calls) &&
  332. list_empty(&rx->recvmsg_q) &&
  333. skb_queue_empty_lockless(&rx->recvmsg_oobq) &&
  334. rx->sk.sk_state != RXRPC_SERVER_LISTENING) {
  335. release_sock(&rx->sk);
  336. return -EAGAIN;
  337. }
  338. if (list_empty(&rx->recvmsg_q)) {
  339. ret = -EWOULDBLOCK;
  340. if (timeo == 0) {
  341. call = NULL;
  342. goto error_no_call;
  343. }
  344. release_sock(&rx->sk);
  345. /* Wait for something to happen */
  346. prepare_to_wait_exclusive(sk_sleep(&rx->sk), &wait,
  347. TASK_INTERRUPTIBLE);
  348. ret = sock_error(&rx->sk);
  349. if (ret)
  350. goto wait_error;
  351. if (list_empty(&rx->recvmsg_q) &&
  352. skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
  353. if (signal_pending(current))
  354. goto wait_interrupted;
  355. trace_rxrpc_recvmsg(0, rxrpc_recvmsg_wait, 0);
  356. timeo = schedule_timeout(timeo);
  357. }
  358. finish_wait(sk_sleep(&rx->sk), &wait);
  359. goto try_again;
  360. }
  361. /* Deal with OOB messages before we consider getting normal data. */
  362. if (!skb_queue_empty_lockless(&rx->recvmsg_oobq)) {
  363. ret = rxrpc_recvmsg_oob(sock, msg, flags);
  364. release_sock(&rx->sk);
  365. if (ret == -EAGAIN)
  366. goto try_again;
  367. goto error_no_call;
  368. }
  369. /* Find the next call and dequeue it if we're not just peeking. If we
  370. * do dequeue it, that comes with a ref that we will need to release.
  371. * We also want to weed out calls that got requeued whilst we were
  372. * shovelling data out.
  373. */
  374. spin_lock_irq(&rx->recvmsg_lock);
  375. l = rx->recvmsg_q.next;
  376. call = list_entry(l, struct rxrpc_call, recvmsg_link);
  377. if (!rxrpc_call_is_complete(call) &&
  378. skb_queue_empty(&call->recvmsg_queue) &&
  379. skb_queue_empty(&rx->recvmsg_oobq)) {
  380. list_del_init(&call->recvmsg_link);
  381. spin_unlock_irq(&rx->recvmsg_lock);
  382. release_sock(&rx->sk);
  383. trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
  384. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  385. goto try_again;
  386. }
  387. rxrpc_see_call(call, rxrpc_call_see_recvmsg);
  388. if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
  389. rxrpc_see_call(call, rxrpc_call_see_already_released);
  390. list_del_init(&call->recvmsg_link);
  391. spin_unlock_irq(&rx->recvmsg_lock);
  392. release_sock(&rx->sk);
  393. trace_rxrpc_recvmsg(call->debug_id, rxrpc_recvmsg_unqueue, 0);
  394. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  395. goto try_again;
  396. }
  397. if (!(flags & MSG_PEEK))
  398. list_del_init(&call->recvmsg_link);
  399. else
  400. rxrpc_get_call(call, rxrpc_call_get_recvmsg);
  401. spin_unlock_irq(&rx->recvmsg_lock);
  402. call_debug_id = call->debug_id;
  403. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_dequeue, 0);
  404. /* We're going to drop the socket lock, so we need to lock the call
  405. * against interference by sendmsg.
  406. */
  407. if (!mutex_trylock(&call->user_mutex)) {
  408. ret = -EWOULDBLOCK;
  409. if (flags & MSG_DONTWAIT)
  410. goto error_requeue_call;
  411. ret = -ERESTARTSYS;
  412. if (mutex_lock_interruptible(&call->user_mutex) < 0)
  413. goto error_requeue_call;
  414. }
  415. release_sock(&rx->sk);
  416. if (test_bit(RXRPC_CALL_RELEASED, &call->flags)) {
  417. rxrpc_see_call(call, rxrpc_call_see_already_released);
  418. mutex_unlock(&call->user_mutex);
  419. if (!(flags & MSG_PEEK))
  420. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  421. goto try_again;
  422. }
  423. ret = rxrpc_recvmsg_user_id(call, msg, flags);
  424. if (ret < 0)
  425. goto error_unlock_call;
  426. if (msg->msg_name && call->peer) {
  427. size_t len = sizeof(call->dest_srx);
  428. memcpy(msg->msg_name, &call->dest_srx, len);
  429. msg->msg_namelen = len;
  430. }
  431. ret = rxrpc_recvmsg_data(sock, call, msg, &msg->msg_iter, len,
  432. flags, &copied);
  433. if (ret == -EAGAIN)
  434. ret = 0;
  435. if (ret == -EIO)
  436. goto call_failed;
  437. if (ret < 0)
  438. goto error_unlock_call;
  439. if (rxrpc_call_is_complete(call) &&
  440. skb_queue_empty(&call->recvmsg_queue))
  441. goto call_complete;
  442. if (rxrpc_call_has_failed(call))
  443. goto call_failed;
  444. if (!(flags & MSG_PEEK) &&
  445. !skb_queue_empty(&call->recvmsg_queue))
  446. rxrpc_notify_socket(call);
  447. goto not_yet_complete;
  448. call_failed:
  449. rxrpc_purge_queue(&call->recvmsg_queue);
  450. call_complete:
  451. ret = rxrpc_recvmsg_term(call, msg);
  452. if (ret < 0)
  453. goto error_unlock_call;
  454. if (!(flags & MSG_PEEK))
  455. rxrpc_release_call(rx, call);
  456. msg->msg_flags |= MSG_EOR;
  457. ret = 1;
  458. not_yet_complete:
  459. if (ret == 0)
  460. msg->msg_flags |= MSG_MORE;
  461. else
  462. msg->msg_flags &= ~MSG_MORE;
  463. ret = copied;
  464. error_unlock_call:
  465. mutex_unlock(&call->user_mutex);
  466. rxrpc_put_call(call, rxrpc_call_put_recvmsg);
  467. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
  468. return ret;
  469. error_requeue_call:
  470. if (!(flags & MSG_PEEK)) {
  471. spin_lock_irq(&rx->recvmsg_lock);
  472. if (list_empty(&call->recvmsg_link)) {
  473. list_add(&call->recvmsg_link, &rx->recvmsg_q);
  474. rxrpc_see_call(call, rxrpc_call_see_recvmsg_requeue);
  475. spin_unlock_irq(&rx->recvmsg_lock);
  476. } else if (list_is_first(&call->recvmsg_link, &rx->recvmsg_q)) {
  477. spin_unlock_irq(&rx->recvmsg_lock);
  478. rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_first);
  479. } else {
  480. list_move(&call->recvmsg_link, &rx->recvmsg_q);
  481. spin_unlock_irq(&rx->recvmsg_lock);
  482. rxrpc_put_call(call, rxrpc_call_see_recvmsg_requeue_move);
  483. }
  484. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_requeue, 0);
  485. } else {
  486. rxrpc_put_call(call, rxrpc_call_put_recvmsg_peek_nowait);
  487. }
  488. error_no_call:
  489. release_sock(&rx->sk);
  490. error_trace:
  491. trace_rxrpc_recvmsg(call_debug_id, rxrpc_recvmsg_return, ret);
  492. return ret;
  493. wait_interrupted:
  494. ret = sock_intr_errno(timeo);
  495. wait_error:
  496. finish_wait(sk_sleep(&rx->sk), &wait);
  497. call = NULL;
  498. goto error_trace;
  499. }
  500. /**
  501. * rxrpc_kernel_recv_data - Allow a kernel service to receive data/info
  502. * @sock: The socket that the call exists on
  503. * @call: The call to send data through
  504. * @iter: The buffer to receive into
  505. * @_len: The amount of data we want to receive (decreased on return)
  506. * @want_more: True if more data is expected to be read
  507. * @_abort: Where the abort code is stored if -ECONNABORTED is returned
  508. * @_service: Where to store the actual service ID (may be upgraded)
  509. *
  510. * Allow a kernel service to receive data and pick up information about the
  511. * state of a call. Note that *@_abort should also be initialised to %0.
  512. *
  513. * Note that we may return %-EAGAIN to drain empty packets at the end
  514. * of the data, even if we've already copied over the requested data.
  515. *
  516. * Return: %0 if got what was asked for and there's more available, %1
  517. * if we got what was asked for and we're at the end of the data and
  518. * %-EAGAIN if we need more data.
  519. */
  520. int rxrpc_kernel_recv_data(struct socket *sock, struct rxrpc_call *call,
  521. struct iov_iter *iter, size_t *_len,
  522. bool want_more, u32 *_abort, u16 *_service)
  523. {
  524. size_t offset = 0;
  525. int ret;
  526. _enter("{%d},%zu,%d", call->debug_id, *_len, want_more);
  527. mutex_lock(&call->user_mutex);
  528. ret = rxrpc_recvmsg_data(sock, call, NULL, iter, *_len, 0, &offset);
  529. *_len -= offset;
  530. if (ret == -EIO)
  531. goto call_failed;
  532. if (ret < 0)
  533. goto out;
  534. /* We can only reach here with a partially full buffer if we have
  535. * reached the end of the data. We must otherwise have a full buffer
  536. * or have been given -EAGAIN.
  537. */
  538. if (ret == 1) {
  539. if (iov_iter_count(iter) > 0)
  540. goto short_data;
  541. if (!want_more)
  542. goto read_phase_complete;
  543. ret = 0;
  544. goto out;
  545. }
  546. if (!want_more)
  547. goto excess_data;
  548. goto out;
  549. read_phase_complete:
  550. ret = 1;
  551. out:
  552. if (_service)
  553. *_service = call->dest_srx.srx_service;
  554. mutex_unlock(&call->user_mutex);
  555. _leave(" = %d [%zu,%d]", ret, iov_iter_count(iter), *_abort);
  556. return ret;
  557. short_data:
  558. trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_short_data,
  559. call->cid, call->call_id, call->rx_consumed,
  560. 0, -EBADMSG);
  561. ret = -EBADMSG;
  562. goto out;
  563. excess_data:
  564. trace_rxrpc_abort(call->debug_id, rxrpc_recvmsg_excess_data,
  565. call->cid, call->call_id, call->rx_consumed,
  566. 0, -EMSGSIZE);
  567. ret = -EMSGSIZE;
  568. goto out;
  569. call_failed:
  570. *_abort = call->abort_code;
  571. ret = call->error;
  572. if (call->completion == RXRPC_CALL_SUCCEEDED) {
  573. ret = 1;
  574. if (iov_iter_count(iter) > 0)
  575. ret = -ECONNRESET;
  576. }
  577. goto out;
  578. }
  579. EXPORT_SYMBOL(rxrpc_kernel_recv_data);